use std::sync::Arc;
use std::{collections::HashMap, ptr::NonNull};

use arrow_array::{
    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
    GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
    StructArray, UInt32Array, UInt8Array,
};
use arrow_array::{
    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
use arrow_select::{interleave::interleave, take::take};
use rand::prelude::*;

pub mod deepcopy;
pub mod schema;
pub use schema::*;
pub mod bfloat16;
pub mod floats;
use crate::list::ListArrayExt;
pub use floats::*;

pub mod cast;
pub mod json;
pub mod list;
pub mod memory;
pub mod r#struct;
/// Field metadata key for the Arrow extension type name.
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Field metadata key for the Arrow extension type metadata.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Field metadata key marking a column as blob-encoded.
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Arrow extension type name for blob (v2) columns.
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
/// Field metadata key for the size threshold above which blob values are stored separately.
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

type Result<T> = std::result::Result<T, ArrowError>;

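/// Extension helpers on [`DataType`] used throughout this crate.
///
/// Illustrative example (a minimal sketch; assumes this crate is available as
/// `lance_arrow`):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_schema::{DataType, Field};
/// use lance_arrow::DataTypeExt;
///
/// assert!(DataType::Utf8.is_binary_like());
/// assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
/// // A fixed-size list of 128 x Float32 occupies 128 * 4 bytes per row.
/// let fsl = DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 128);
/// assert_eq!(fsl.byte_width_opt(), Some(512));
/// ```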
pub trait DataTypeExt {
    /// Returns true if the data type is binary-like (`Utf8`, `Binary`, `LargeUtf8`, or `LargeBinary`).
    fn is_binary_like(&self) -> bool;

    /// Returns true if the data type is a struct.
    fn is_struct(&self) -> bool;

    /// Returns true if values of this type are stored with a fixed stride.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the data type is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Returns the byte width of the data type.
    ///
    /// Panics if the type is not fixed-stride; see [`Self::byte_width_opt`].
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the data type, or `None` if it is not fixed-stride.
    fn byte_width_opt(&self) -> Option<usize>;
}

impl DataTypeExt for DataType {
    fn is_binary_like(&self) -> bool {
        use DataType::*;
        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
    }

    fn is_struct(&self) -> bool {
        matches!(self, Self::Struct(_))
    }

    fn is_fixed_stride(&self) -> bool {
        use DataType::*;
        matches!(
            self,
            Boolean
                | UInt8
                | UInt16
                | UInt32
                | UInt64
                | Int8
                | Int16
                | Int32
                | Int64
                | Float16
                | Float32
                | Float64
                | Decimal128(_, _)
                | Decimal256(_, _)
                | FixedSizeList(_, _)
                | FixedSizeBinary(_)
                | Duration(_)
                | Timestamp(_, _)
                | Date32
                | Date64
                | Time32(_)
                | Time64(_)
        )
    }

    fn is_dictionary(&self) -> bool {
        matches!(self, Self::Dictionary(_, _))
    }

    fn byte_width_opt(&self) -> Option<usize> {
        match self {
            Self::Int8 => Some(1),
            Self::Int16 => Some(2),
            Self::Int32 => Some(4),
            Self::Int64 => Some(8),
            Self::UInt8 => Some(1),
            Self::UInt16 => Some(2),
            Self::UInt32 => Some(4),
            Self::UInt64 => Some(8),
            Self::Float16 => Some(2),
            Self::Float32 => Some(4),
            Self::Float64 => Some(8),
            Self::Date32 => Some(4),
            Self::Date64 => Some(8),
            Self::Time32(_) => Some(4),
            Self::Time64(_) => Some(8),
            Self::Timestamp(_, _) => Some(8),
            Self::Duration(_) => Some(8),
            Self::Decimal128(_, _) => Some(16),
            Self::Decimal256(_, _) => Some(32),
            Self::Interval(unit) => match unit {
                IntervalUnit::YearMonth => Some(4),
                IntervalUnit::DayTime => Some(8),
                IntervalUnit::MonthDayNano => Some(16),
            },
            Self::FixedSizeBinary(s) => Some(*s as usize),
            Self::FixedSizeList(dt, s) => dt
                .data_type()
                .byte_width_opt()
                .map(|width| width * *s as usize),
            _ => None,
        }
    }

    fn byte_width(&self) -> usize {
        self.byte_width_opt()
            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
    }
}

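/// Builds a [`GenericListArray`] from a flat `values` array and an offsets array.
///
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use arrow_array::Int32Array;
/// use lance_arrow::try_new_generic_list_array;
///
/// let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// // Offsets [0, 2, 5] describe two lists: [1, 2] and [3, 4, 5].
/// let offsets = Int32Array::from(vec![0, 2, 5]);
/// let list = try_new_generic_list_array(values, &offsets).unwrap();
/// assert_eq!(list.len(), 2);
/// ```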
pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
    values: T,
    offsets: &PrimitiveArray<Offset>,
) -> Result<GenericListArray<Offset::Native>>
where
    Offset::Native: OffsetSizeTrait,
{
    let data_type = if Offset::Native::IS_LARGE {
        DataType::LargeList(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    } else {
        DataType::List(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    };
    let data = ArrayDataBuilder::new(data_type)
        .len(offsets.len() - 1)
        .add_buffer(offsets.into_data().buffers()[0].clone())
        .add_child_data(values.into_data())
        .build()?;

    Ok(GenericListArray::from(data))
}

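/// Returns the [`DataType`] of a fixed-size list with `list_width` items of `inner_type`.
///
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use arrow_schema::DataType;
/// use lance_arrow::fixed_size_list_type;
///
/// // A 768-dimensional Float32 vector column.
/// let dt = fixed_size_list_type(768, DataType::Float32);
/// assert!(matches!(dt, DataType::FixedSizeList(_, 768)));
/// ```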
pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
}

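/// Convenience constructors and helpers for [`FixedSizeListArray`].
///
/// Illustrative example (a minimal sketch; `sample` draws rows at random, so only the
/// lengths are asserted):
///
/// ```ignore
/// use arrow_array::{FixedSizeListArray, Int64Array};
/// use lance_arrow::FixedSizeListArrayExt;
///
/// // Six values reshaped into three rows of two elements each.
/// let values = Int64Array::from_iter_values(0..6i64);
/// let fsl = FixedSizeListArray::try_new_from_values(values, 2).unwrap();
/// assert_eq!(fsl.len(), 3);
/// assert_eq!(fsl.sample(2).unwrap().len(), 2);
/// ```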
pub trait FixedSizeListArrayExt {
    /// Builds a [`FixedSizeListArray`] by reshaping a flat `values` array into rows of
    /// `list_size` elements each.
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Returns `n` rows sampled at random, or a clone of the whole array if `n >= self.len()`.
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Casts integer items to a floating point type, returning the array unchanged if the
    /// items are already floating point.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}

impl FixedSizeListArrayExt for FixedSizeListArray {
    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
        let values = Arc::new(values);

        Self::try_new(field, list_size, values, None)
    }

    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
        if n >= self.len() {
            return Ok(self.clone());
        }
        let mut rng = SmallRng::from_os_rng();
        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
    }

    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
        match self.data_type() {
            DataType::FixedSizeList(field, size) => match field.data_type() {
                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
                DataType::Int8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int16 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int16Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int16Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int64 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int64Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to UInt8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to UInt32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                data_type => Err(ArrowError::ParseError(format!(
                    "Expected a floating point or integer item type, got {:?}",
                    data_type
                ))),
            },
            data_type => Err(ArrowError::ParseError(format!(
                "Expected a FixedSizeList, got {:?}",
                data_type
            ))),
        }
    }
}

pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
}

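/// Convenience constructor for [`FixedSizeBinaryArray`].
///
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use arrow_array::{FixedSizeBinaryArray, UInt8Array};
/// use lance_arrow::FixedSizeBinaryArrayExt;
///
/// // Eight bytes reshaped into two 4-byte binary values.
/// let bytes = UInt8Array::from_iter_values(0..8u8);
/// let arr = FixedSizeBinaryArray::try_new_from_values(&bytes, 4).unwrap();
/// assert_eq!(arr.len(), 2);
/// assert_eq!(arr.value(1), [4, 5, 6, 7]);
/// ```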
pub trait FixedSizeBinaryArrayExt {
    /// Builds a [`FixedSizeBinaryArray`] by reshaping a flat byte array into values of
    /// `stride` bytes each.
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}

impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
        let data_type = DataType::FixedSizeBinary(stride);
        let data = ArrayDataBuilder::new(data_type)
            .len(values.len() / stride as usize)
            .add_buffer(values.into_data().buffers()[0].clone())
            .build()?;
        Ok(Self::from(data))
    }
}

pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
}

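/// Iterates over a `Utf8` or `LargeUtf8` array as `Option<&str>` values; panics on any
/// other array type.
///
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use arrow_array::StringArray;
/// use lance_arrow::iter_str_array;
///
/// let arr = StringArray::from(vec![Some("a"), None, Some("c")]);
/// let collected: Vec<_> = iter_str_array(&arr).collect();
/// assert_eq!(collected, vec![Some("a"), None, Some("c")]);
/// ```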
pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
    match arr.data_type() {
        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
    }
}

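/// Extension helpers on [`RecordBatch`] for adding, dropping, merging, and projecting columns.
///
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch, StringArray};
/// use arrow_schema::{DataType, Field, Schema};
/// use lance_arrow::RecordBatchExt;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("id", DataType::Int32, false)]));
/// let batch =
///     RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2]))]).unwrap();
///
/// // Append a new column, then drop the original one.
/// let with_name = batch
///     .try_with_column(
///         Field::new("name", DataType::Utf8, true),
///         Arc::new(StringArray::from(vec!["a", "b"])),
///     )
///     .unwrap();
/// assert_eq!(with_name.num_columns(), 2);
/// assert_eq!(with_name.drop_column("id").unwrap().num_columns(), 1);
/// ```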
pub trait RecordBatchExt {
    /// Appends `arr` as a new column described by `field`.
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Inserts `arr` as a new column described by `field` at position `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new [`RecordBatch`] from `arr`, carrying over this batch's schema metadata.
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merges the columns of `other` into this batch; both must have the same number of rows.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Merges with `other`, projecting and ordering the output according to `schema`.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Returns a new batch without the column named `name`.
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replaces the column named `name` with `column`, keeping the schema unchanged.
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replaces the column named `name` with `column`, updating its data type to `new_data_type`.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Renames the column at `index` to `new_name`.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Looks up a (possibly nested) column by a dot-separated qualified name, e.g. `"a.b.c"`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Projects the batch onto `schema`, recursing into struct and list columns.
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// Returns the schema metadata of this batch.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Returns a new batch with `key`/`value` added to the schema metadata.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Returns a new batch whose schema metadata is replaced by `metadata`.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Takes the rows at `indices` from every column.
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Returns a deep copy of the batch so that sliced arrays stop retaining unused memory.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;

    /// Returns a new batch with rows sorted by the column at index `column`.
    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
}

impl RecordBatchExt for RecordBatch {
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    fn merge(&self, other: &Self) -> Result<Self> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatches with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatches with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    fn drop_column(&self, name: &str) -> Result<Self> {
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(self.schema(), columns)
    }

    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }

    fn shrink_to_fit(&self) -> Result<Self> {
        crate::deepcopy::deep_copy_batch_sliced(self)
    }

    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
        if column >= self.num_columns() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Column index out of bounds: {}",
                column
            )));
        }
        let column = self.column(column);
        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
        self.take(&sorted)
    }
}

fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
    match target_field.data_type() {
        DataType::Struct(subfields) => {
            let struct_arr = array.as_struct();
            let projected = project(struct_arr, subfields)?;
            Ok(Arc::new(projected))
        }
        DataType::List(inner_field) => {
            let list_arr: &ListArray = array.as_list();
            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
            Ok(Arc::new(ListArray::new(
                inner_field.clone(),
                list_arr.offsets().clone(),
                projected_values,
                list_arr.nulls().cloned(),
            )))
        }
        DataType::LargeList(inner_field) => {
            let list_arr: &LargeListArray = array.as_list();
            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
            Ok(Arc::new(LargeListArray::new(
                inner_field.clone(),
                list_arr.offsets().clone(),
                projected_values,
                list_arr.nulls().cloned(),
            )))
        }
        DataType::FixedSizeList(inner_field, size) => {
            let list_arr = array.as_fixed_size_list();
            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
            Ok(Arc::new(FixedSizeListArray::new(
                inner_field.clone(),
                *size,
                projected_values,
                list_arr.nulls().cloned(),
            )))
        }
        _ => Ok(array.clone()),
    }
}

fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
    if fields.is_empty() {
        return Ok(StructArray::new_empty_fields(
            struct_array.len(),
            struct_array.nulls().cloned(),
        ));
    }
    let mut columns: Vec<ArrayRef> = vec![];
    for field in fields.iter() {
        if let Some(col) = struct_array.column_by_name(field.name()) {
            let projected = project_array(col, field.as_ref())?;
            columns.push(projected);
        } else {
            return Err(ArrowError::SchemaError(format!(
                "field {} does not exist in the RecordBatch",
                field.name()
            )));
        }
    }
    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
}


fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    left_list.offsets().inner() == right_list.offsets().inner()
}

fn merge_list_structs_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    items_field_name: impl Into<String>,
    items_nullable: bool,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    let left_struct = left_list.values();
    let right_struct = right_list.values();
    let left_struct_arr = left_struct.as_struct();
    let right_struct_arr = right_struct.as_struct();
    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_items.data_type().clone(),
        items_nullable,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        left_list.offsets().clone(),
        merged_items,
        left_list.nulls().cloned(),
    ))
}

fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
    items_field_name: impl Into<String>,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list::<T>();
    let not_null_list = not_null.as_list::<T>();
    let right_list = right.as_list::<T>();

    let left_struct = left_list.values().as_struct();
    let not_null_struct: &StructArray = not_null_list.values().as_struct();
    let right_struct = right_list.values().as_struct();

    let values_len = not_null_list.values().len();
    let mut merged_fields =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
    let mut merged_columns =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());

    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len))
        }
    }
    for (_, field) in right_struct
        .columns()
        .iter()
        .zip(right_struct.fields())
        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
    {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len));
        }
    }

    let merged_struct = Arc::new(StructArray::new(
        Fields::from(merged_fields),
        merged_columns,
        not_null_struct.nulls().cloned(),
    ));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_struct.data_type().clone(),
        true,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        not_null_list.offsets().clone(),
        merged_struct,
        not_null_list.nulls().cloned(),
    ))
}

fn merge_list_struct_null(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
) -> Arc<dyn Array> {
    match left.data_type() {
        DataType::List(left_field) => {
            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
        }
        DataType::LargeList(left_field) => {
            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
        }
        _ => unreachable!(),
    }
}


fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
    if left.null_count() == left.len() {
        return merge_list_struct_null(left, right, right);
    } else if right.null_count() == right.len() {
        return merge_list_struct_null(left, right, left);
    }
    match (left.data_type(), right.data_type()) {
        (DataType::List(left_field), DataType::List(_)) => {
            if !lists_have_same_offsets_helper::<i32>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have same offsets");
            }
            merge_list_structs_helper::<i32>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
            if !lists_have_same_offsets_helper::<i64>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have same offsets");
            }
            merge_list_structs_helper::<i64>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        _ => unreachable!(),
    }
}


/// Treats an all-null validity buffer as absent so that the other side's validity is used
/// as-is during merges.
fn normalize_validity(
    validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<&arrow_buffer::NullBuffer> {
    validity.and_then(|v| {
        if v.null_count() == v.len() {
            None
        } else {
            Some(v)
        }
    })
}

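/// Merges the validity (null) buffers of two struct arrays that are being merged side by side.
///
/// The result marks a row valid if it is valid on *either* side (bitwise OR of the validity
/// buffers), so a row that is null in one input but present in the other survives the merge.
/// For example, left validity `[true, false]` and right validity `[false, true]` merge to
/// `[true, true]`.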
fn merge_struct_validity(
    left_validity: Option<&arrow_buffer::NullBuffer>,
    right_validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<arrow_buffer::NullBuffer> {
    let left_normalized = normalize_validity(left_validity);
    let right_normalized = normalize_validity(right_validity);

    match (left_normalized, right_normalized) {
        (None, None) => None,
        (Some(left), None) => Some(left.clone()),
        (None, Some(right)) => Some(right.clone()),
        (Some(left), Some(right)) => {
            if left.null_count() == 0 && right.null_count() == 0 {
                return Some(left.clone());
            }

            let left_buffer = left.inner();
            let right_buffer = right.inner();

            // A row is valid in the merged struct if it is valid on either side.
            let merged_buffer = left_buffer | right_buffer;

            Some(arrow_buffer::NullBuffer::from(merged_buffer))
        }
    }
}


fn merge_list_child_values(
    child_field: &Field,
    left_values: ArrayRef,
    right_values: ArrayRef,
) -> ArrayRef {
    match child_field.data_type() {
        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
            left_values.as_struct(),
            right_values.as_struct(),
            child_fields,
        )) as ArrayRef,
        DataType::List(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("left list values should be ListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("right list values should be ListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(ListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::LargeList(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("left list values should be LargeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("right list values should be LargeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(LargeListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::FixedSizeList(grandchild, list_size) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("left list values should be FixedSizeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("right list values should be FixedSizeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(FixedSizeListArray::new(
                grandchild.clone(),
                *list_size,
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        _ => left_values.clone(),
    }
}


/// Pushes a parent struct's validity down into a child column so the child does not report
/// values as valid in rows where the parent struct itself is null.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(),
        Some(p) => p,
    };

    let child_validity = child.nulls();

    let new_validity = match child_validity {
        None => parent_validity.clone(),
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // A child row is valid only if both the child and the parent say it is.
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}


fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
    let mut fields: Vec<Field> = vec![];
    let mut columns: Vec<ArrayRef> = vec![];
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for (left_field, left_column) in left_struct_array
        .fields()
        .iter()
        .zip(left_struct_array.columns().iter())
    {
        match right_fields
            .iter()
            .position(|f| f.name() == left_field.name())
        {
            Some(right_index) => {
                let right_field = right_fields.get(right_index).unwrap();
                let right_column = right_columns.get(right_index).unwrap();
                match (left_field.data_type(), right_field.data_type()) {
                    (DataType::Struct(_), DataType::Struct(_)) => {
                        let left_sub_array = left_column.as_struct();
                        let right_sub_array = right_column.as_struct();
                        let merged_sub_array = merge(left_sub_array, right_sub_array);
                        fields.push(Field::new(
                            left_field.name(),
                            merged_sub_array.data_type().clone(),
                            left_field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    (DataType::List(left_list), DataType::List(right_list))
                        if left_list.data_type().is_struct()
                            && right_list.data_type().is_struct() =>
                    {
                        if left_list.data_type() == right_list.data_type() {
                            // Identical item types: nothing to merge, take the left column.
                            fields.push(left_field.as_ref().clone());
                            columns.push(left_column.clone());
                        } else {
                            let merged_sub_array =
                                merge_list_struct(&left_column, &right_column);

                            fields.push(Field::new(
                                left_field.name(),
                                merged_sub_array.data_type().clone(),
                                left_field.is_nullable(),
                            ));
                            columns.push(merged_sub_array);
                        }
                    }
                    _ => {
                        fields.push(left_field.as_ref().clone());
                        let adjusted_column = adjust_child_validity(left_column, left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            None => {
                fields.push(left_field.as_ref().clone());
                let adjusted_column = adjust_child_validity(left_column, left_validity);
                columns.push(adjusted_column);
            }
        }
    }

    right_fields
        .iter()
        .zip(right_columns.iter())
        .for_each(|(field, column)| {
            if !left_struct_array
                .fields()
                .iter()
                .any(|f| f.name() == field.name())
            {
                fields.push(field.as_ref().clone());
                let adjusted_column = adjust_child_validity(column, right_validity);
                columns.push(adjusted_column);
            }
        });

    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
}


fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    DataType::Struct(child_fields) => {
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    DataType::List(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::LargeList(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::FixedSizeList(child_field, list_size) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    _ => {
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            (None, None) => {
                // The target field exists in neither input; skip it.
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}


fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
    if components.is_empty() {
        return Some(array);
    }
    if !matches!(array.data_type(), DataType::Struct(_)) {
        return None;
    }
    let struct_arr = array.as_struct();
    struct_arr
        .column_by_name(components[0])
        .and_then(|arr| get_sub_array(arr, &components[1..]))
}

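/// Interleaves rows from several [`RecordBatch`]es that share a schema.
///
/// Each `(batch_index, row_index)` pair in `indices` selects one output row.
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use lance_arrow::interleave_batches;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("v", DataType::Int32, false)]));
/// let b0 =
///     RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))]).unwrap();
/// let b1 =
///     RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![10, 20]))]).unwrap();
///
/// // Output rows: b1[1], b0[0], b1[0] -> [20, 1, 10]
/// let out = interleave_batches(&[b0, b1], &[(1, 1), (0, 0), (1, 0)]).unwrap();
/// assert_eq!(out.num_rows(), 3);
/// ```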
pub fn interleave_batches(
    batches: &[RecordBatch],
    indices: &[(usize, usize)],
) -> Result<RecordBatch> {
    let first_batch = batches.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
    })?;
    let schema = first_batch.schema();
    let num_columns = first_batch.num_columns();
    let mut columns = Vec::with_capacity(num_columns);
    let mut chunks = Vec::with_capacity(batches.len());

    for i in 0..num_columns {
        for batch in batches {
            chunks.push(batch.column(i).as_ref());
        }
        let new_column = interleave(&chunks, indices)?;
        columns.push(new_column);
        chunks.clear();
    }

    RecordBatch::try_new(schema, columns)
}

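/// Conversions from [`bytes::Bytes`] into an Arrow [`arrow_buffer::Buffer`], zero-copy when
/// the allocation is suitably aligned for `bytes_per_value` and copying otherwise.
///
/// Illustrative example (a minimal sketch; assumes the crate is `lance_arrow`):
///
/// ```ignore
/// use arrow_buffer::Buffer;
/// use lance_arrow::BufferExt;
///
/// let data = bytes::Bytes::from_static(&[1u8, 2, 3, 4, 5, 6, 7, 8]);
/// // Interpret the bytes as 4-byte values; copies only if the pointer is misaligned.
/// let buf = Buffer::from_bytes_bytes(data, 4);
/// assert_eq!(buf.len(), 8);
/// ```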
pub trait BufferExt {
    /// Wraps `bytes` in a buffer, zero-copy when the allocation is aligned for
    /// `bytes_per_value`, copying otherwise.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Copies `bytes` into a freshly allocated buffer of `size_bytes` bytes, zero-padding
    /// the tail (requires `size_bytes >= bytes.len()`).
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}

fn is_pwr_two(n: u64) -> bool {
    n & (n - 1) == 0
}

impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The allocation is not aligned for `bytes_per_value`; fall back to a copy.
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // Zero-copy: the `Bytes` is kept alive as the buffer's custom allocation.
            // SAFETY: the pointer is valid for `bytes.len()` bytes and the owning
            // `Arc<Bytes>` keeps the allocation alive for the buffer's lifetime.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        buf.extend(std::iter::repeat_n(0_u8, to_fill));
        buf.shrink_to_fit();
        Self::from(buf)
    }
}


#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
    use arrow_array::{Float32Array, Int32Array, StructArray};
    use arrow_buffer::OffsetBuffer;

    #[test]
    fn test_merge_recursive() {
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }


    #[test]
    fn test_merge_with_schema() {
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }


    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }


    #[test]
    fn test_byte_width_opt() {
        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Utf8.byte_width_opt(), None);
        assert_eq!(DataType::Binary.byte_width_opt(), None);
        assert_eq!(
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
            None
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
                .byte_width_opt(),
            Some(12)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
                .byte_width_opt(),
            Some(16)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
                .byte_width_opt(),
            None
        );
    }


    #[test]
    fn test_take_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();
        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
        assert_eq!(
            taken,
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int32Array::from(vec![1, 5, 10])),
                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
                ],
            )
            .unwrap()
        )
    }


    #[test]
    fn test_schema_project_by_schema() {
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }


    #[test]
    fn test_project_preserves_struct_validity() {
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, true),
        ]);

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
        let struct_array = StructArray::new(
            fields.clone(),
            vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(value_array) as ArrayRef,
            ],
            // The middle row of the struct itself is null.
            Some(vec![true, false, true].into()),
        );

        let projected = project(&struct_array, &fields).unwrap();

        assert_eq!(projected.null_count(), 1);
        assert!(!projected.is_null(0));
        assert!(projected.is_null(1));
        assert!(!projected.is_null(2));
    }


    #[test]
    fn test_merge_struct_with_different_validity() {
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            // The left struct is valid in rows 0 and 2.
            Some(vec![true, false, true, false].into()),
        );

        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            // The right struct is valid in rows 0 and 1.
            Some(vec![true, true, false, false].into()),
        );

        let merged = merge(&left_struct, &right_struct);

        // Only row 3 is null on both sides, so the merged validity keeps a single null.
        assert_eq!(merged.null_count(), 1);
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1));
        assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2));
    }


    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }


    #[test]
    fn test_merge_struct_lists() {
        test_merge_struct_lists_generic::<i32>();
    }

    #[test]
    fn test_merge_struct_large_lists() {
        test_merge_struct_lists_generic::<i64>();
    }

    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
        let left_company_id = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
            Some(10),
            Some(11),
            Some(12),
            Some(13),
            Some(14),
            Some(15),
            Some(16),
            Some(17),
            Some(18),
            Some(19),
            Some(20),
        ]));
        let left_count = Arc::new(Int32Array::from(vec![
            Some(10),
            Some(20),
            Some(30),
            Some(40),
            Some(50),
            Some(60),
            Some(70),
            Some(80),
            Some(90),
            Some(100),
            Some(110),
            Some(120),
            Some(130),
            Some(140),
            Some(150),
            Some(160),
            Some(170),
            Some(180),
            Some(190),
            Some(200),
        ]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));

        let left_list = Arc::new(GenericListArray::<O>::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([3, 1]),
            left_struct.clone(),
            None,
        ));

        let left_list_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new(
                "companies",
                if O::IS_LARGE {
                    DataType::LargeList(Arc::new(Field::new(
                        "item",
                        DataType::Struct(left_struct.fields().clone()),
                        true,
                    )))
                } else {
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(left_struct.fields().clone()),
                        true,
                    )))
                },
                true,
            )]),
            vec![left_list as ArrayRef],
            None,
        ));

        let right_company_name = Arc::new(StringArray::from(vec![
            "Google",
            "Microsoft",
            "Apple",
            "Facebook",
        ]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(GenericListArray::<O>::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([3, 1]),
            right_struct.clone(),
            None,
        ));

        let right_list_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new(
                "companies",
                if O::IS_LARGE {
                    DataType::LargeList(Arc::new(Field::new(
                        "item",
                        DataType::Struct(right_struct.fields().clone()),
                        true,
                    )))
                } else {
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(right_struct.fields().clone()),
                        true,
                    )))
                },
                true,
            )]),
            vec![right_list as ArrayRef],
            None,
        ));

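        // Target schema: the union of the left (company_id, count) and right (company_name)
        // struct fields, wrapped in the List/LargeList type matching the offset width.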
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            if O::IS_LARGE {
                DataType::LargeList(Arc::new(Field::new(
                    "item",
                    DataType::Struct(Fields::from(vec![
                        Field::new("company_id", DataType::Int32, true),
                        Field::new("company_name", DataType::Utf8, true),
                        Field::new("count", DataType::Int32, true),
                    ])),
                    true,
                )))
            } else {
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Struct(Fields::from(vec![
                        Field::new("company_id", DataType::Int32, true),
                        Field::new("company_name", DataType::Utf8, true),
                        Field::new("count", DataType::Int32, true),
                    ])),
                    true,
                )))
            },
            true,
        )]);

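        // Merging should succeed and preserve the two top-level list rows.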
        let merged_array =
            merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
        assert_eq!(merged_array.len(), 2);
    }

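    // project_by_schema should reorder the struct children inside a list so they match the
    // target field order (a, b, c) rather than the source order (c, b, a).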
    #[test]
    fn test_project_by_schema_list_struct_reorder() {
        let source_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("c", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Utf8, true),
        ]));
        let source_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::List(Arc::new(Field::new(
                    "item",
                    source_inner_struct.clone(),
                    true,
                ))),
                true,
            ),
        ]));

        let c_array = StringArray::from(vec!["c1", "c2"]);
        let b_array = StringArray::from(vec!["b1", "b2"]);
        let a_array = StringArray::from(vec!["a1", "a2"]);
        let inner_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("c", DataType::Utf8, true)),
                Arc::new(c_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::Utf8, true)),
                Arc::new(b_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("a", DataType::Utf8, true)),
                Arc::new(a_array) as ArrayRef,
            ),
        ]);

        let list_array = ListArray::new(
            Arc::new(Field::new("item", source_inner_struct, true)),
            OffsetBuffer::from_lengths([1, 1]),
            Arc::new(inner_struct),
            None,
        );

        let batch = RecordBatch::try_new(
            source_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
        )
        .unwrap();

        let target_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));
        let target_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
                true,
            ),
        ]);

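        // Projection should reorder the struct children inside the list to the target order.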
        let projected = batch.project_by_schema(&target_schema).unwrap();

        assert_eq!(projected.schema().as_ref(), &target_schema);

        let projected_list = projected.column(1).as_list::<i32>();
        let projected_struct = projected_list.values().as_struct();

        assert_eq!(
            projected_struct.column_by_name("a").unwrap().as_ref(),
            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column_by_name("b").unwrap().as_ref(),
            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column_by_name("c").unwrap().as_ref(),
            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
        );

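        // Positional access confirms the children were physically reordered, not just renamed.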
        assert_eq!(
            projected_struct.column(0).as_ref(),
            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column(1).as_ref(),
            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column(2).as_ref(),
            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
        );
    }

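    // Same idea one level deeper: the reorder has to apply to struct fields nested inside a
    // list within a struct within a list (outer -> struct -> inner_list -> struct).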
    #[test]
    fn test_project_by_schema_nested_list_struct() {
        let inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("y", DataType::Int32, true),
            Field::new("x", DataType::Int32, true),
        ]));
        let source_schema = Arc::new(Schema::new(vec![Field::new(
            "outer",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("b", DataType::Utf8, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
                        true,
                    ),
                    Field::new("a", DataType::Utf8, true),
                ])),
                true,
            ))),
            true,
        )]));

        let y_array = Int32Array::from(vec![1, 2]);
        let x_array = Int32Array::from(vec![3, 4]);
        let innermost_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("y", DataType::Int32, true)),
                Arc::new(y_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("x", DataType::Int32, true)),
                Arc::new(x_array) as ArrayRef,
            ),
        ]);
        let inner_list = ListArray::new(
            Arc::new(Field::new("item", inner_struct.clone(), true)),
            OffsetBuffer::from_lengths([2]),
            Arc::new(innermost_struct),
            None,
        );

        let b_array = StringArray::from(vec!["b1"]);
        let a_array = StringArray::from(vec!["a1"]);
        let middle_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("b", DataType::Utf8, true)),
                Arc::new(b_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new(
                    "inner_list",
                    DataType::List(Arc::new(Field::new("item", inner_struct, true))),
                    true,
                )),
                Arc::new(inner_list) as ArrayRef,
            ),
            (
                Arc::new(Field::new("a", DataType::Utf8, true)),
                Arc::new(a_array) as ArrayRef,
            ),
        ]);

        let outer_list = ListArray::new(
            Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
            OffsetBuffer::from_lengths([1]),
            Arc::new(middle_struct),
            None,
        );

        let batch =
            RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();

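        // Target schema flips the field order at both nesting levels: (a, inner_list, b) in
        // the middle struct and (x, y) in the innermost struct.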
        let target_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("x", DataType::Int32, true),
            Field::new("y", DataType::Int32, true),
        ]));
        let target_schema = Schema::new(vec![Field::new(
            "outer",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("a", DataType::Utf8, true),
                    Field::new(
                        "inner_list",
                        DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
                        true,
                    ),
                    Field::new("b", DataType::Utf8, true),
                ])),
                true,
            ))),
            true,
        )]);

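        // Project and verify that the values followed their fields to the new positions.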
        let projected = batch.project_by_schema(&target_schema).unwrap();

        assert_eq!(projected.schema().as_ref(), &target_schema);

        let outer_list = projected.column(0).as_list::<i32>();
        let middle_struct = outer_list.values().as_struct();

        assert_eq!(
            middle_struct.column(0).as_ref(),
            &StringArray::from(vec!["a1"]) as &dyn Array
        );
        assert_eq!(
            middle_struct.column(2).as_ref(),
            &StringArray::from(vec!["b1"]) as &dyn Array
        );

        let inner_list = middle_struct.column(1).as_list::<i32>();
        let innermost_struct = inner_list.values().as_struct();
        assert_eq!(
            innermost_struct.column(0).as_ref(),
            &Int32Array::from(vec![3, 4]) as &dyn Array
        );
        assert_eq!(
            innermost_struct.column(1).as_ref(),
            &Int32Array::from(vec![1, 2]) as &dyn Array
        );
    }
}