1use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12 Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray, GenericListArray,
13 LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray,
14 UInt8Array, UInt32Array, cast::AsArray,
15};
16use arrow_array::{
17 Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod cast;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod scalar;
38pub mod stream;
39pub mod r#struct;
40
/// Field-metadata key Arrow uses to store an extension type's name.
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Field-metadata key Arrow uses to store serialized extension metadata.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Field-metadata key marking a column as blob-encoded by Lance.
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Arrow extension type name for Lance blob v2 columns.
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
/// Field-metadata key holding the size threshold above which a blob gets
/// dedicated storage. NOTE(review): exact threshold semantics are defined by
/// the encoding layer — confirm against the lance-encoding crate.
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

/// Crate-local result alias with [`ArrowError`] as the error type.
type Result<T> = std::result::Result<T, ArrowError>;
57
/// Convenience predicates and size queries on [`DataType`].
pub trait DataTypeExt {
    /// Returns true for variable-length string/binary types
    /// (`Utf8`, `Binary`, `LargeUtf8`, `LargeBinary`).
    fn is_binary_like(&self) -> bool;

    /// Returns true if the type is a `Struct`.
    fn is_struct(&self) -> bool;

    /// Returns true if every value occupies a fixed number of bytes
    /// (includes `Boolean` and any `FixedSizeList`, even of variable-width
    /// children).
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the type is a `Dictionary`.
    fn is_dictionary(&self) -> bool;

    /// Byte width of a single value. Panics for variable-length types; use
    /// [`Self::byte_width_opt`] for a non-panicking variant.
    fn byte_width(&self) -> usize;

    /// Byte width of a single value, or `None` for variable-length types.
    fn byte_width_opt(&self) -> Option<usize>;
}
93
94impl DataTypeExt for DataType {
95 fn is_binary_like(&self) -> bool {
96 use DataType::*;
97 matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
98 }
99
100 fn is_struct(&self) -> bool {
101 matches!(self, Self::Struct(_))
102 }
103
104 fn is_fixed_stride(&self) -> bool {
105 use DataType::*;
106 matches!(
107 self,
108 Boolean
109 | UInt8
110 | UInt16
111 | UInt32
112 | UInt64
113 | Int8
114 | Int16
115 | Int32
116 | Int64
117 | Float16
118 | Float32
119 | Float64
120 | Decimal128(_, _)
121 | Decimal256(_, _)
122 | FixedSizeList(_, _)
123 | FixedSizeBinary(_)
124 | Duration(_)
125 | Timestamp(_, _)
126 | Date32
127 | Date64
128 | Time32(_)
129 | Time64(_)
130 )
131 }
132
133 fn is_dictionary(&self) -> bool {
134 matches!(self, Self::Dictionary(_, _))
135 }
136
137 fn byte_width_opt(&self) -> Option<usize> {
138 match self {
139 Self::Int8 => Some(1),
140 Self::Int16 => Some(2),
141 Self::Int32 => Some(4),
142 Self::Int64 => Some(8),
143 Self::UInt8 => Some(1),
144 Self::UInt16 => Some(2),
145 Self::UInt32 => Some(4),
146 Self::UInt64 => Some(8),
147 Self::Float16 => Some(2),
148 Self::Float32 => Some(4),
149 Self::Float64 => Some(8),
150 Self::Date32 => Some(4),
151 Self::Date64 => Some(8),
152 Self::Time32(_) => Some(4),
153 Self::Time64(_) => Some(8),
154 Self::Timestamp(_, _) => Some(8),
155 Self::Duration(_) => Some(8),
156 Self::Decimal128(_, _) => Some(16),
157 Self::Decimal256(_, _) => Some(32),
158 Self::Interval(unit) => match unit {
159 IntervalUnit::YearMonth => Some(4),
160 IntervalUnit::DayTime => Some(8),
161 IntervalUnit::MonthDayNano => Some(16),
162 },
163 Self::FixedSizeBinary(s) => Some(*s as usize),
164 Self::FixedSizeList(dt, s) => dt
165 .data_type()
166 .byte_width_opt()
167 .map(|width| width * *s as usize),
168 _ => None,
169 }
170 }
171
172 fn byte_width(&self) -> usize {
173 self.byte_width_opt()
174 .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
175 }
176}
177
178pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
196 values: T,
197 offsets: &PrimitiveArray<Offset>,
198) -> Result<GenericListArray<Offset::Native>>
199where
200 Offset::Native: OffsetSizeTrait,
201{
202 let data_type = if Offset::Native::IS_LARGE {
203 DataType::LargeList(Arc::new(Field::new(
204 "item",
205 values.data_type().clone(),
206 true,
207 )))
208 } else {
209 DataType::List(Arc::new(Field::new(
210 "item",
211 values.data_type().clone(),
212 true,
213 )))
214 };
215 let data = ArrayDataBuilder::new(data_type)
216 .len(offsets.len() - 1)
217 .add_buffer(offsets.into_data().buffers()[0].clone())
218 .add_child_data(values.into_data())
219 .build()?;
220
221 Ok(GenericListArray::from(data))
222}
223
224pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
225 DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
226}
227
/// Construction and transformation helpers for [`FixedSizeListArray`].
pub trait FixedSizeListArrayExt {
    /// Wrap a flat `values` array into a fixed-size list of `list_size`
    /// elements per list; the values length must be a multiple of
    /// `list_size`.
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Randomly sample `n` lists without replacement; returns the whole
    /// array unchanged when `n >= len`.
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Widen integer-valued lists to a floating point type; float-valued
    /// lists are returned as-is.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
271
272impl FixedSizeListArrayExt for FixedSizeListArray {
273 fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
274 let field = Arc::new(Field::new("item", values.data_type().clone(), true));
275 let values = Arc::new(values);
276
277 Self::try_new(field, list_size, values, None)
278 }
279
280 fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
281 if n >= self.len() {
282 return Ok(self.clone());
283 }
284 let mut rng = SmallRng::from_os_rng();
285 let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
286 take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
287 }
288
289 fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
290 match self.data_type() {
291 DataType::FixedSizeList(field, size) => match field.data_type() {
292 DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
293 DataType::Int8 => Ok(Self::new(
294 Arc::new(arrow_schema::Field::new(
295 field.name(),
296 DataType::Float32,
297 field.is_nullable(),
298 )),
299 *size,
300 Arc::new(Float32Array::from_iter_values(
301 self.values()
302 .as_any()
303 .downcast_ref::<Int8Array>()
304 .ok_or(ArrowError::ParseError(
305 "Fail to cast primitive array to Int8Type".to_string(),
306 ))?
307 .into_iter()
308 .filter_map(|x| x.map(|y| y as f32)),
309 )),
310 self.nulls().cloned(),
311 )),
312 DataType::Int16 => Ok(Self::new(
313 Arc::new(arrow_schema::Field::new(
314 field.name(),
315 DataType::Float32,
316 field.is_nullable(),
317 )),
318 *size,
319 Arc::new(Float32Array::from_iter_values(
320 self.values()
321 .as_any()
322 .downcast_ref::<Int16Array>()
323 .ok_or(ArrowError::ParseError(
324 "Fail to cast primitive array to Int16Type".to_string(),
325 ))?
326 .into_iter()
327 .filter_map(|x| x.map(|y| y as f32)),
328 )),
329 self.nulls().cloned(),
330 )),
331 DataType::Int32 => Ok(Self::new(
332 Arc::new(arrow_schema::Field::new(
333 field.name(),
334 DataType::Float32,
335 field.is_nullable(),
336 )),
337 *size,
338 Arc::new(Float32Array::from_iter_values(
339 self.values()
340 .as_any()
341 .downcast_ref::<Int32Array>()
342 .ok_or(ArrowError::ParseError(
343 "Fail to cast primitive array to Int32Type".to_string(),
344 ))?
345 .into_iter()
346 .filter_map(|x| x.map(|y| y as f32)),
347 )),
348 self.nulls().cloned(),
349 )),
350 DataType::Int64 => Ok(Self::new(
351 Arc::new(arrow_schema::Field::new(
352 field.name(),
353 DataType::Float64,
354 field.is_nullable(),
355 )),
356 *size,
357 Arc::new(Float64Array::from_iter_values(
358 self.values()
359 .as_any()
360 .downcast_ref::<Int64Array>()
361 .ok_or(ArrowError::ParseError(
362 "Fail to cast primitive array to Int64Type".to_string(),
363 ))?
364 .into_iter()
365 .filter_map(|x| x.map(|y| y as f64)),
366 )),
367 self.nulls().cloned(),
368 )),
369 DataType::UInt8 => Ok(Self::new(
370 Arc::new(arrow_schema::Field::new(
371 field.name(),
372 DataType::Float64,
373 field.is_nullable(),
374 )),
375 *size,
376 Arc::new(Float64Array::from_iter_values(
377 self.values()
378 .as_any()
379 .downcast_ref::<UInt8Array>()
380 .ok_or(ArrowError::ParseError(
381 "Fail to cast primitive array to UInt8Type".to_string(),
382 ))?
383 .into_iter()
384 .filter_map(|x| x.map(|y| y as f64)),
385 )),
386 self.nulls().cloned(),
387 )),
388 DataType::UInt32 => Ok(Self::new(
389 Arc::new(arrow_schema::Field::new(
390 field.name(),
391 DataType::Float64,
392 field.is_nullable(),
393 )),
394 *size,
395 Arc::new(Float64Array::from_iter_values(
396 self.values()
397 .as_any()
398 .downcast_ref::<UInt32Array>()
399 .ok_or(ArrowError::ParseError(
400 "Fail to cast primitive array to UInt32Type".to_string(),
401 ))?
402 .into_iter()
403 .filter_map(|x| x.map(|y| y as f64)),
404 )),
405 self.nulls().cloned(),
406 )),
407 data_type => Err(ArrowError::ParseError(format!(
408 "Expect either floating type or integer got {:?}",
409 data_type
410 ))),
411 },
412 data_type => Err(ArrowError::ParseError(format!(
413 "Expect either FixedSizeList got {:?}",
414 data_type
415 ))),
416 }
417 }
418}
419
420pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
423 arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
424}
425
/// Construction helpers for [`FixedSizeBinaryArray`].
pub trait FixedSizeBinaryArrayExt {
    /// Reinterpret a flat byte array as fixed-size binary values of
    /// `stride` bytes each.
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
447
448impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
449 fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
450 let data_type = DataType::FixedSizeBinary(stride);
451 let data = ArrayDataBuilder::new(data_type)
452 .len(values.len() / stride as usize)
453 .add_buffer(values.into_data().buffers()[0].clone())
454 .build()?;
455 Ok(Self::from(data))
456 }
457}
458
459pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
460 arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
461}
462
463pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
464 match arr.data_type() {
465 DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
466 DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
467 _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
468 }
469}
470
/// Schema- and column-manipulation helpers on [`RecordBatch`].
pub trait RecordBatchExt {
    /// Append `arr` as a new column described by `field` at the end.
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Insert `arr` as a new column described by `field` at position `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Rebuild a batch from `arr`'s children, keeping this batch's
    /// schema-level metadata.
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge the columns of `other` into this batch; both batches must have
    /// the same number of rows.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Merge the columns of `other` into this batch, shaping the result by
    /// `schema`'s fields; both batches must have the same number of rows.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Remove the top-level column named `name` (no-op if absent).
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace the data of the column named `name`, keeping the schema.
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace the column named `name` with `column`, updating the field's
    /// data type to `new_data_type`.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename the column at `index` to `new_name`.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Look up a possibly nested column by dot-separated path, e.g. `"a.b"`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the batch to the (possibly nested) columns of `schema`.
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// Schema-level metadata of this batch.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Return a copy of this batch with `key`/`value` added to its metadata.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Return a copy of this batch with its metadata replaced by `metadata`.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take the rows selected by `indices`, in the order given.
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Deep-copy the batch so sliced columns stop retaining their original,
    /// larger backing buffers.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;

    /// Sort rows by the column at index `column` using `options`.
    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
}
616
impl RecordBatchExt for RecordBatch {
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        // Schema extension comes from the schema module's helper; the new
        // column is appended at the end to match the field position.
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        // The struct's fields become the batch schema; the original batch's
        // schema-level metadata is carried over.
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    fn merge(&self, other: &Self) -> Result<Self> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        // Delegate to the struct-array merge below; a batch converts
        // losslessly to a StructArray and back.
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    fn drop_column(&self, name: &str) -> Result<Self> {
        // Keep every column except the first-level field matching `name`.
        // If no column matches, the batch is rebuilt unchanged.
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        // NOTE(review): the rebuilt Field keeps data type and nullability but
        // drops any field-level metadata — confirm that is intended.
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        // Schema is unchanged; try_new validates the replacement column's
        // type against the existing field.
        Self::try_new(self.schema(), columns)
    }

    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        // Rebuild the schema with the matching field's data type swapped.
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        // Second lookup also guards against `name` not existing at all.
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        // Split "a.b.c" into path segments; the first names a top-level
        // column, the rest descend through struct children.
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        // Row selection via the StructArray round-trip so nested columns are
        // taken consistently in one pass.
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }

    fn shrink_to_fit(&self) -> Result<Self> {
        // Deep copy releases oversized backing buffers held by slices; see
        // the deepcopy module for the actual copy strategy.
        crate::deepcopy::deep_copy_batch_sliced(self)
    }

    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
        if column >= self.num_columns() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Column index out of bounds: {}",
                column
            )));
        }
        let column = self.column(column);
        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
        self.take(&sorted)
    }
}
802
803fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
806 match target_field.data_type() {
807 DataType::Struct(subfields) => {
808 let struct_arr = array.as_struct();
809 let projected = project(struct_arr, subfields)?;
810 Ok(Arc::new(projected))
811 }
812 DataType::List(inner_field) => {
813 let list_arr: &ListArray = array.as_list();
814 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
815 Ok(Arc::new(ListArray::new(
816 inner_field.clone(),
817 list_arr.offsets().clone(),
818 projected_values,
819 list_arr.nulls().cloned(),
820 )))
821 }
822 DataType::LargeList(inner_field) => {
823 let list_arr: &LargeListArray = array.as_list();
824 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
825 Ok(Arc::new(LargeListArray::new(
826 inner_field.clone(),
827 list_arr.offsets().clone(),
828 projected_values,
829 list_arr.nulls().cloned(),
830 )))
831 }
832 DataType::FixedSizeList(inner_field, size) => {
833 let list_arr = array.as_fixed_size_list();
834 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
835 Ok(Arc::new(FixedSizeListArray::new(
836 inner_field.clone(),
837 *size,
838 projected_values,
839 list_arr.nulls().cloned(),
840 )))
841 }
842 _ => Ok(array.clone()),
843 }
844}
845
846fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
847 if fields.is_empty() {
848 return Ok(StructArray::new_empty_fields(
849 struct_array.len(),
850 struct_array.nulls().cloned(),
851 ));
852 }
853 let mut columns: Vec<ArrayRef> = vec![];
854 for field in fields.iter() {
855 if let Some(col) = struct_array.column_by_name(field.name()) {
856 let projected = project_array(col, field.as_ref())?;
857 columns.push(projected);
858 } else {
859 return Err(ArrowError::SchemaError(format!(
860 "field {} does not exist in the RecordBatch",
861 field.name()
862 )));
863 }
864 }
865 StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
867}
868
869fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
870 let left_list: &GenericListArray<T> = left.as_list();
871 let right_list: &GenericListArray<T> = right.as_list();
872 left_list.offsets().inner() == right_list.offsets().inner()
873}
874
875fn merge_list_structs_helper<T: OffsetSizeTrait>(
876 left: &dyn Array,
877 right: &dyn Array,
878 items_field_name: impl Into<String>,
879 items_nullable: bool,
880) -> Arc<dyn Array> {
881 let left_list: &GenericListArray<T> = left.as_list();
882 let right_list: &GenericListArray<T> = right.as_list();
883 let left_struct = left_list.values();
884 let right_struct = right_list.values();
885 let left_struct_arr = left_struct.as_struct();
886 let right_struct_arr = right_struct.as_struct();
887 let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
888 let items_field = Arc::new(Field::new(
889 items_field_name,
890 merged_items.data_type().clone(),
891 items_nullable,
892 ));
893 Arc::new(GenericListArray::<T>::new(
894 items_field,
895 left_list.offsets().clone(),
896 merged_items,
897 left_list.nulls().cloned(),
898 ))
899}
900
/// Merge two list-of-struct arrays when one side is entirely null.
///
/// `not_null` aliases whichever of `left`/`right` actually carries data; its
/// offsets, validity and columns drive the output, while columns present
/// only on the all-null side are synthesized as null arrays of the matching
/// length.
fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
    items_field_name: impl Into<String>,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list::<T>();
    let not_null_list = not_null.as_list::<T>();
    let right_list = right.as_list::<T>();

    let left_struct = left_list.values().as_struct();
    let not_null_struct: &StructArray = not_null_list.values().as_struct();
    let right_struct = right_list.values().as_struct();

    // Every synthesized null column must match the non-null side's child
    // values length so the list offsets stay valid.
    let values_len = not_null_list.values().len();
    let mut merged_fields =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
    let mut merged_columns =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());

    // First, all left-side fields in left's order; data is pulled from the
    // non-null side where available, otherwise filled with nulls.
    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len))
        }
    }
    // Then right-only fields (names not already taken by the left struct).
    for (_, field) in right_struct
        .columns()
        .iter()
        .zip(right_struct.fields())
        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
    {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len));
        }
    }

    // Rebuild struct + list around the non-null side's validity and offsets.
    // The items field is always marked nullable here.
    let merged_struct = Arc::new(StructArray::new(
        Fields::from(merged_fields),
        merged_columns,
        not_null_struct.nulls().cloned(),
    ));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_struct.data_type().clone(),
        true,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        not_null_list.offsets().clone(),
        merged_struct,
        not_null_list.nulls().cloned(),
    ))
}
960
961fn merge_list_struct_null(
962 left: &dyn Array,
963 right: &dyn Array,
964 not_null: &dyn Array,
965) -> Arc<dyn Array> {
966 match left.data_type() {
967 DataType::List(left_field) => {
968 merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
969 }
970 DataType::LargeList(left_field) => {
971 merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
972 }
973 _ => unreachable!(),
974 }
975}
976
977fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
978 if left.null_count() == left.len() {
982 return merge_list_struct_null(left, right, right);
983 } else if right.null_count() == right.len() {
984 return merge_list_struct_null(left, right, left);
985 }
986 match (left.data_type(), right.data_type()) {
987 (DataType::List(left_field), DataType::List(_)) => {
988 if !lists_have_same_offsets_helper::<i32>(left, right) {
989 panic!("Attempt to merge list struct arrays which do not have same offsets");
990 }
991 merge_list_structs_helper::<i32>(
992 left,
993 right,
994 left_field.name(),
995 left_field.is_nullable(),
996 )
997 }
998 (DataType::LargeList(left_field), DataType::LargeList(_)) => {
999 if !lists_have_same_offsets_helper::<i64>(left, right) {
1000 panic!("Attempt to merge list struct arrays which do not have same offsets");
1001 }
1002 merge_list_structs_helper::<i64>(
1003 left,
1004 right,
1005 left_field.name(),
1006 left_field.is_nullable(),
1007 )
1008 }
1009 _ => unreachable!(),
1010 }
1011}
1012
1013fn normalize_validity(
1016 validity: Option<&arrow_buffer::NullBuffer>,
1017) -> Option<&arrow_buffer::NullBuffer> {
1018 validity.and_then(|v| {
1019 if v.null_count() == v.len() {
1020 None
1021 } else {
1022 Some(v)
1023 }
1024 })
1025}
1026
/// Union two optional struct validity buffers (bit 1 = valid).
///
/// A row in the merged struct is valid if it is valid on EITHER side
/// (bitwise OR), since a row null on one side may still carry data from the
/// other.
///
/// NOTE(review): after normalization an all-null side behaves like an absent
/// buffer, so (all-null, None) merges to None, i.e. "all valid". This relies
/// on callers handling fully-null sides separately (see merge_list_struct) —
/// confirm.
fn merge_struct_validity(
    left_validity: Option<&arrow_buffer::NullBuffer>,
    right_validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<arrow_buffer::NullBuffer> {
    // Drop degenerate all-null buffers so they do not mask the other side.
    let left_normalized = normalize_validity(left_validity);
    let right_normalized = normalize_validity(right_validity);

    match (left_normalized, right_normalized) {
        (None, None) => None,
        (Some(left), None) => Some(left.clone()),
        (None, Some(right)) => Some(right.clone()),
        (Some(left), Some(right)) => {
            // Fast path: no nulls anywhere, nothing to combine.
            if left.null_count() == 0 && right.null_count() == 0 {
                return Some(left.clone());
            }

            let left_buffer = left.inner();
            let right_buffer = right.inner();

            // Bitwise OR of validity bitmaps: valid-if-either-valid.
            let merged_buffer = left_buffer | right_buffer;

            Some(arrow_buffer::NullBuffer::from(merged_buffer))
        }
    }
}
1061
1062fn merge_list_child_values(
1063 child_field: &Field,
1064 left_values: ArrayRef,
1065 right_values: ArrayRef,
1066) -> ArrayRef {
1067 match child_field.data_type() {
1068 DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1069 left_values.as_struct(),
1070 right_values.as_struct(),
1071 child_fields,
1072 )) as ArrayRef,
1073 DataType::List(grandchild) => {
1074 let left_list = left_values
1075 .as_any()
1076 .downcast_ref::<ListArray>()
1077 .expect("left list values should be ListArray");
1078 let right_list = right_values
1079 .as_any()
1080 .downcast_ref::<ListArray>()
1081 .expect("right list values should be ListArray");
1082 let merged_values = merge_list_child_values(
1083 grandchild.as_ref(),
1084 left_list.values().clone(),
1085 right_list.values().clone(),
1086 );
1087 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1088 Arc::new(ListArray::new(
1089 grandchild.clone(),
1090 left_list.offsets().clone(),
1091 merged_values,
1092 merged_validity,
1093 )) as ArrayRef
1094 }
1095 DataType::LargeList(grandchild) => {
1096 let left_list = left_values
1097 .as_any()
1098 .downcast_ref::<LargeListArray>()
1099 .expect("left list values should be LargeListArray");
1100 let right_list = right_values
1101 .as_any()
1102 .downcast_ref::<LargeListArray>()
1103 .expect("right list values should be LargeListArray");
1104 let merged_values = merge_list_child_values(
1105 grandchild.as_ref(),
1106 left_list.values().clone(),
1107 right_list.values().clone(),
1108 );
1109 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1110 Arc::new(LargeListArray::new(
1111 grandchild.clone(),
1112 left_list.offsets().clone(),
1113 merged_values,
1114 merged_validity,
1115 )) as ArrayRef
1116 }
1117 DataType::FixedSizeList(grandchild, list_size) => {
1118 let left_list = left_values
1119 .as_any()
1120 .downcast_ref::<FixedSizeListArray>()
1121 .expect("left list values should be FixedSizeListArray");
1122 let right_list = right_values
1123 .as_any()
1124 .downcast_ref::<FixedSizeListArray>()
1125 .expect("right list values should be FixedSizeListArray");
1126 let merged_values = merge_list_child_values(
1127 grandchild.as_ref(),
1128 left_list.values().clone(),
1129 right_list.values().clone(),
1130 );
1131 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1132 Arc::new(FixedSizeListArray::new(
1133 grandchild.clone(),
1134 *list_size,
1135 merged_values,
1136 merged_validity,
1137 )) as ArrayRef
1138 }
1139 _ => left_values.clone(),
1140 }
1141}
1142
/// Intersect a child column's validity with its parent struct's validity.
///
/// After merging two structs, a child must not claim validity in rows where
/// the (merged) parent is null; this AND-combines the two bitmaps and
/// rebuilds the child array with the tightened validity.
///
/// NOTE(review): the merged bitmap is built from logical (already-sliced)
/// null buffers but is then installed into ArrayData together with
/// `child.offset()`, which re-applies the offset to the new bitmap. For a
/// child with a non-zero offset this looks misaligned — confirm callers only
/// pass offset-0 children.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    // Nothing to tighten when the parent has no nulls at all.
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(), Some(p) => p,
    };

    let child_validity = child.nulls();

    let new_validity = match child_validity {
        None => {
            // Child had no bitmap: the parent's bitmap becomes the child's.
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Bitwise AND: valid only where both child and parent are valid.
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    // Rebuild the array data in place with only the null bitmap replaced.
    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}
1190
1191fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1192 let mut fields: Vec<Field> = vec![];
1193 let mut columns: Vec<ArrayRef> = vec![];
1194 let right_fields = right_struct_array.fields();
1195 let right_columns = right_struct_array.columns();
1196
1197 let left_validity = left_struct_array.nulls();
1199 let right_validity = right_struct_array.nulls();
1200
1201 let merged_validity = merge_struct_validity(left_validity, right_validity);
1203
1204 for (left_field, left_column) in left_struct_array
1206 .fields()
1207 .iter()
1208 .zip(left_struct_array.columns().iter())
1209 {
1210 match right_fields
1211 .iter()
1212 .position(|f| f.name() == left_field.name())
1213 {
1214 Some(right_index) => {
1216 let right_field = right_fields.get(right_index).unwrap();
1217 let right_column = right_columns.get(right_index).unwrap();
1218 match (left_field.data_type(), right_field.data_type()) {
1220 (DataType::Struct(_), DataType::Struct(_)) => {
1221 let left_sub_array = left_column.as_struct();
1222 let right_sub_array = right_column.as_struct();
1223 let merged_sub_array = merge(left_sub_array, right_sub_array);
1224 fields.push(Field::new(
1225 left_field.name(),
1226 merged_sub_array.data_type().clone(),
1227 left_field.is_nullable(),
1228 ));
1229 columns.push(Arc::new(merged_sub_array) as ArrayRef);
1230 }
1231 (DataType::List(left_list), DataType::List(right_list))
1232 if left_list.data_type().is_struct()
1233 && right_list.data_type().is_struct() =>
1234 {
1235 if left_list.data_type() == right_list.data_type() {
1237 fields.push(left_field.as_ref().clone());
1238 columns.push(left_column.clone());
1239 }
1240 let merged_sub_array = merge_list_struct(&left_column, &right_column);
1244
1245 fields.push(Field::new(
1246 left_field.name(),
1247 merged_sub_array.data_type().clone(),
1248 left_field.is_nullable(),
1249 ));
1250 columns.push(merged_sub_array);
1251 }
1252 _ => {
1254 fields.push(left_field.as_ref().clone());
1256 let adjusted_column = adjust_child_validity(left_column, left_validity);
1258 columns.push(adjusted_column);
1259 }
1260 }
1261 }
1262 None => {
1263 fields.push(left_field.as_ref().clone());
1264 let adjusted_column = adjust_child_validity(left_column, left_validity);
1266 columns.push(adjusted_column);
1267 }
1268 }
1269 }
1270
1271 right_fields
1273 .iter()
1274 .zip(right_columns.iter())
1275 .for_each(|(field, column)| {
1276 if !left_struct_array
1278 .fields()
1279 .iter()
1280 .any(|f| f.name() == field.name())
1281 {
1282 fields.push(field.as_ref().clone());
1283 let adjusted_column = adjust_child_validity(column, right_validity);
1286 columns.push(adjusted_column);
1287 }
1288 });
1289
1290 StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1291}
1292
/// Merge two struct arrays into a single struct array laid out according to
/// `fields` (the target schema).
///
/// For every target field, a matching column (same name and same type-kind,
/// struct vs. non-struct) is looked up on each side:
/// * present on one side only: that column is taken, with its validity
///   adjusted by the parent struct's null buffer;
/// * present on both sides: struct children are merged recursively, and
///   list-of-struct children have their value arrays merged;
/// * present on neither: the field is skipped entirely, so the output may
///   contain fewer columns than `fields`.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Structs only match structs; everything else only matches non-structs.
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    // A merged row is null only where both inputs are null.
    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                // Only the right side has this field.
                output_fields.push(right_fields[right_idx].as_ref().clone());
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                // Only the left side has this field.
                output_fields.push(left_fields[left_idx].as_ref().clone());
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    DataType::Struct(child_fields) => {
                        // Both sides are structs: merge children recursively
                        // against the target child fields.
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    DataType::List(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        // Merge the two lists' (trimmed) value arrays.
                        // NOTE(review): the LEFT list's offsets are reused for
                        // the merged list — this assumes both sides have
                        // identical per-row list lengths; confirm with callers.
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        // List-level validity; shadows the struct-level one.
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::LargeList(child_field) => {
                        // Same as the List branch, with 64-bit offsets.
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::FixedSizeList(child_field, list_size) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        // Fixed-size lists have no offsets; merge raw values.
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    _ => {
                        // Same non-nested field on both sides: take the left.
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            (None, None) => {
                // Field exists in neither input: skip it.
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}
1451
1452fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1453 if components.is_empty() {
1454 return Some(array);
1455 }
1456 if !matches!(array.data_type(), DataType::Struct(_)) {
1457 return None;
1458 }
1459 let struct_arr = array.as_struct();
1460 struct_arr
1461 .column_by_name(components[0])
1462 .and_then(|arr| get_sub_array(arr, &components[1..]))
1463}
1464
1465pub fn interleave_batches(
1469 batches: &[RecordBatch],
1470 indices: &[(usize, usize)],
1471) -> Result<RecordBatch> {
1472 let first_batch = batches.first().ok_or_else(|| {
1473 ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1474 })?;
1475 let schema = first_batch.schema();
1476 let num_columns = first_batch.num_columns();
1477 let mut columns = Vec::with_capacity(num_columns);
1478 let mut chunks = Vec::with_capacity(batches.len());
1479
1480 for i in 0..num_columns {
1481 for batch in batches {
1482 chunks.push(batch.column(i).as_ref());
1483 }
1484 let new_column = interleave(&chunks, indices)?;
1485 columns.push(new_column);
1486 chunks.clear();
1487 }
1488
1489 RecordBatch::try_new(schema, columns)
1490}
1491
/// Extension helpers for constructing an [`arrow_buffer::Buffer`] from
/// [`bytes::Bytes`], avoiding a copy when possible.
pub trait BufferExt {
    /// Wrap `bytes` in a buffer. Zero-copy when the data is already suitably
    /// aligned for values that are `bytes_per_value` bytes wide; otherwise
    /// the data is copied into a freshly allocated buffer.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Copy `bytes` into a newly allocated buffer of `size_bytes` bytes,
    /// zero-padding the tail. Panics if `size_bytes < bytes.len()`.
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1519
/// Returns `true` if `n` is a power of two.
///
/// The previous hand-rolled `n & (n - 1) == 0` check underflowed for
/// `n == 0` — panicking in debug builds and wrongly answering `true` in
/// release builds. `u64::is_power_of_two` handles zero correctly (`false`).
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1523
impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // Data is misaligned for its value width: fall back to copying
            // into a freshly allocated (and therefore aligned) buffer.
            // NOTE(review): non-power-of-two widths always take the zero-copy
            // branch below even when misaligned — presumably such widths
            // never require alignment; confirm with callers.
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // SAFETY: the pointed-to data stays valid for the buffer's whole
            // lifetime because the `Bytes` value is stored as the buffer's
            // custom allocation owner (the `Arc` passed below).
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        // Zero-pad up to the requested size.
        buf.extend(std::iter::repeat_n(0_u8, to_fill));

        // `with_capacity` may over-allocate; trim the excess.
        buf.shrink_to_fit();

        Self::from(buf)
    }
}
1558
1559#[cfg(test)]
1560mod tests {
1561 use super::*;
1562 use arrow_array::{Float32Array, Int32Array, StructArray};
1563 use arrow_array::{ListArray, StringArray, new_empty_array, new_null_array};
1564 use arrow_buffer::OffsetBuffer;
1565
    /// Merging two batches combines shared struct columns recursively
    /// ("b" gains both "c" and "d") and appends unshared top-level columns.
    #[test]
    fn test_merge_recursive() {
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected: "a", then "b" with both children, then "e".
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }
1650
    /// `merge_with_schema` orders output columns by the target schema, while
    /// plain `merge` keeps left-then-right field order.
    #[test]
    fn test_merge_with_schema() {
        // Builds a batch with a single "struct" column holding empty child
        // arrays of the given names/types.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // Schema-directed merge follows the target field order (b, a, c).
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // Naive merge appends right-only fields after the left ones (a, b, c).
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }
1693
    /// Merging list-of-struct columns combines the struct items ("x" + "y");
    /// a fully-null right side must yield null values for its fields.
    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // A fully-null right column should merge into null "y" child values.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }
1782
1783 #[test]
1784 fn test_byte_width_opt() {
1785 assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1786 assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1787 assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1788 assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1789 assert_eq!(DataType::Utf8.byte_width_opt(), None);
1790 assert_eq!(DataType::Binary.byte_width_opt(), None);
1791 assert_eq!(
1792 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1793 None
1794 );
1795 assert_eq!(
1796 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1797 .byte_width_opt(),
1798 Some(12)
1799 );
1800 assert_eq!(
1801 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1802 .byte_width_opt(),
1803 Some(16)
1804 );
1805 assert_eq!(
1806 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1807 .byte_width_opt(),
1808 None
1809 );
1810 }
1811
1812 #[test]
1813 fn test_take_record_batch() {
1814 let schema = Arc::new(Schema::new(vec![
1815 Field::new("a", DataType::Int32, true),
1816 Field::new("b", DataType::Utf8, true),
1817 ]));
1818 let batch = RecordBatch::try_new(
1819 schema.clone(),
1820 vec![
1821 Arc::new(Int32Array::from_iter_values(0..20)),
1822 Arc::new(StringArray::from_iter_values(
1823 (0..20).map(|i| format!("str-{}", i)),
1824 )),
1825 ],
1826 )
1827 .unwrap();
1828 let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1829 assert_eq!(
1830 taken,
1831 RecordBatch::try_new(
1832 schema,
1833 vec![
1834 Arc::new(Int32Array::from(vec![1, 5, 10])),
1835 Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1836 ],
1837 )
1838 .unwrap()
1839 )
1840 }
1841
    /// `project_by_schema` supports empty, reordered, and subset projections,
    /// and always carries the batch's schema metadata into the result.
    #[test]
    fn test_schema_project_by_schema() {
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Empty projection: no columns, but row count and metadata survive.
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Reordered projection: columns follow the target schema's order.
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Subset projection: only the requested column is kept.
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }
1908
    /// Projecting a struct array must preserve the struct-level null buffer.
    #[test]
    fn test_project_preserves_struct_validity() {
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, true),
        ]);

        // Row 1 is struct-null even though its children hold values.
        let id_array = Int32Array::from(vec![1, 2, 3]);
        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
        let struct_array = StructArray::new(
            fields.clone(),
            vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(value_array) as ArrayRef,
            ],
            Some(vec![true, false, true].into()), );

        let projected = project(&struct_array, &fields).unwrap();

        // The null at row 1 must survive projection.
        assert_eq!(projected.null_count(), 1);
        assert!(!projected.is_null(0));
        assert!(projected.is_null(1));
        assert!(!projected.is_null(2));
    }
1938
    /// Merged struct rows are null only where BOTH inputs are null; child
    /// values coming from a null parent row become null in the output.
    #[test]
    fn test_merge_struct_with_different_validity() {
        // Left struct is valid at rows 0 and 2.
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            Some(vec![true, false, true, false].into()), );

        // Right struct is valid at rows 0 and 1.
        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            Some(vec![true, true, false, false].into()), );

        let merged = merge(&left_struct, &right_struct);

        // Only row 3 is null on both sides.
        assert_eq!(merged.null_count(), 1); assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1)); assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2)); }
1988
    /// Regression: merging list-of-struct columns whose two sides carry
    /// different struct fields must align values against the target schema,
    /// null-filling the fields each side is missing.
    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        // Left side: {company_id, count}, all values null.
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        // Right side: {company_name} only, with real values.
        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        // All three target fields must be present in the merged items.
        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        // Left-only fields were all-null on input and must stay null.
        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        // Right-only field keeps its values.
        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
2112
    /// 32-bit-offset (`List`) variant of the list-of-struct merge test.
    #[test]
    fn test_merge_struct_lists() {
        test_merge_struct_lists_generic::<i32>();
    }
2117
    /// 64-bit-offset (`LargeList`) variant of the list-of-struct merge test.
    #[test]
    fn test_merge_struct_large_lists() {
        test_merge_struct_lists_generic::<i64>();
    }
2122
2123 fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2124 let left_company_id = Arc::new(Int32Array::from(vec![
2126 Some(1),
2127 Some(2),
2128 Some(3),
2129 Some(4),
2130 Some(5),
2131 Some(6),
2132 Some(7),
2133 Some(8),
2134 Some(9),
2135 Some(10),
2136 Some(11),
2137 Some(12),
2138 Some(13),
2139 Some(14),
2140 Some(15),
2141 Some(16),
2142 Some(17),
2143 Some(18),
2144 Some(19),
2145 Some(20),
2146 ]));
2147 let left_count = Arc::new(Int32Array::from(vec![
2148 Some(10),
2149 Some(20),
2150 Some(30),
2151 Some(40),
2152 Some(50),
2153 Some(60),
2154 Some(70),
2155 Some(80),
2156 Some(90),
2157 Some(100),
2158 Some(110),
2159 Some(120),
2160 Some(130),
2161 Some(140),
2162 Some(150),
2163 Some(160),
2164 Some(170),
2165 Some(180),
2166 Some(190),
2167 Some(200),
2168 ]));
2169 let left_struct = Arc::new(StructArray::new(
2170 Fields::from(vec![
2171 Field::new("company_id", DataType::Int32, true),
2172 Field::new("count", DataType::Int32, true),
2173 ]),
2174 vec![left_company_id, left_count],
2175 None,
2176 ));
2177
2178 let left_list = Arc::new(GenericListArray::<O>::new(
2179 Arc::new(Field::new(
2180 "item",
2181 DataType::Struct(left_struct.fields().clone()),
2182 true,
2183 )),
2184 OffsetBuffer::from_lengths([3, 1]),
2185 left_struct.clone(),
2186 None,
2187 ));
2188
2189 let left_list_struct = Arc::new(StructArray::new(
2190 Fields::from(vec![Field::new(
2191 "companies",
2192 if O::IS_LARGE {
2193 DataType::LargeList(Arc::new(Field::new(
2194 "item",
2195 DataType::Struct(left_struct.fields().clone()),
2196 true,
2197 )))
2198 } else {
2199 DataType::List(Arc::new(Field::new(
2200 "item",
2201 DataType::Struct(left_struct.fields().clone()),
2202 true,
2203 )))
2204 },
2205 true,
2206 )]),
2207 vec![left_list as ArrayRef],
2208 None,
2209 ));
2210
2211 let right_company_name = Arc::new(StringArray::from(vec![
2213 "Google",
2214 "Microsoft",
2215 "Apple",
2216 "Facebook",
2217 ]));
2218 let right_struct = Arc::new(StructArray::new(
2219 Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2220 vec![right_company_name],
2221 None,
2222 ));
2223 let right_list = Arc::new(GenericListArray::<O>::new(
2224 Arc::new(Field::new(
2225 "item",
2226 DataType::Struct(right_struct.fields().clone()),
2227 true,
2228 )),
2229 OffsetBuffer::from_lengths([3, 1]),
2230 right_struct.clone(),
2231 None,
2232 ));
2233
2234 let right_list_struct = Arc::new(StructArray::new(
2235 Fields::from(vec![Field::new(
2236 "companies",
2237 if O::IS_LARGE {
2238 DataType::LargeList(Arc::new(Field::new(
2239 "item",
2240 DataType::Struct(right_struct.fields().clone()),
2241 true,
2242 )))
2243 } else {
2244 DataType::List(Arc::new(Field::new(
2245 "item",
2246 DataType::Struct(right_struct.fields().clone()),
2247 true,
2248 )))
2249 },
2250 true,
2251 )]),
2252 vec![right_list as ArrayRef],
2253 None,
2254 ));
2255
2256 let target_fields = Fields::from(vec![Field::new(
2258 "companies",
2259 if O::IS_LARGE {
2260 DataType::LargeList(Arc::new(Field::new(
2261 "item",
2262 DataType::Struct(Fields::from(vec![
2263 Field::new("company_id", DataType::Int32, true),
2264 Field::new("company_name", DataType::Utf8, true),
2265 Field::new("count", DataType::Int32, true),
2266 ])),
2267 true,
2268 )))
2269 } else {
2270 DataType::List(Arc::new(Field::new(
2271 "item",
2272 DataType::Struct(Fields::from(vec![
2273 Field::new("company_id", DataType::Int32, true),
2274 Field::new("company_name", DataType::Utf8, true),
2275 Field::new("count", DataType::Int32, true),
2276 ])),
2277 true,
2278 )))
2279 },
2280 true,
2281 )]);
2282
2283 let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2285 assert_eq!(merged_array.len(), 2);
2286 }
2287
    /// Projecting a list-of-struct column to a target schema reorders the
    /// struct's children (c,b,a -> a,b,c) while keeping each value attached
    /// to its original field name.
    #[test]
    fn test_project_by_schema_list_struct_reorder() {
        let source_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("c", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Utf8, true),
        ]));
        let source_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::List(Arc::new(Field::new(
                    "item",
                    source_inner_struct.clone(),
                    true,
                ))),
                true,
            ),
        ]));

        let c_array = StringArray::from(vec!["c1", "c2"]);
        let b_array = StringArray::from(vec!["b1", "b2"]);
        let a_array = StringArray::from(vec!["a1", "a2"]);
        let inner_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("c", DataType::Utf8, true)),
                Arc::new(c_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::Utf8, true)),
                Arc::new(b_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("a", DataType::Utf8, true)),
                Arc::new(a_array) as ArrayRef,
            ),
        ]);

        let list_array = ListArray::new(
            Arc::new(Field::new("item", source_inner_struct, true)),
            OffsetBuffer::from_lengths([1, 1]),
            Arc::new(inner_struct),
            None,
        );

        let batch = RecordBatch::try_new(
            source_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
        )
        .unwrap();

        // Target orders the struct children alphabetically.
        let target_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Utf8, true), Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));
        let target_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
                true,
            ),
        ]);

        let projected = batch.project_by_schema(&target_schema).unwrap();

        assert_eq!(projected.schema().as_ref(), &target_schema);

        let projected_list = projected.column(1).as_list::<i32>();
        let projected_struct = projected_list.values().as_struct();

        // Values must still be found under their original names...
        assert_eq!(
            projected_struct.column_by_name("a").unwrap().as_ref(),
            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column_by_name("b").unwrap().as_ref(),
            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column_by_name("c").unwrap().as_ref(),
            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
        );

        // ...and column positions must follow the target order (a, b, c).
        assert_eq!(
            projected_struct.column(0).as_ref(),
            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column(1).as_ref(),
            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column(2).as_ref(),
            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
        );
    }
2397
2398 #[test]
2399 fn test_project_by_schema_nested_list_struct() {
2400 let inner_struct = DataType::Struct(Fields::from(vec![
2402 Field::new("y", DataType::Int32, true),
2403 Field::new("x", DataType::Int32, true),
2404 ]));
2405 let source_schema = Arc::new(Schema::new(vec![Field::new(
2406 "outer",
2407 DataType::List(Arc::new(Field::new(
2408 "item",
2409 DataType::Struct(Fields::from(vec![
2410 Field::new("b", DataType::Utf8, true),
2411 Field::new(
2412 "inner_list",
2413 DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
2414 true,
2415 ),
2416 Field::new("a", DataType::Utf8, true),
2417 ])),
2418 true,
2419 ))),
2420 true,
2421 )]));
2422
2423 let y_array = Int32Array::from(vec![1, 2]);
2425 let x_array = Int32Array::from(vec![3, 4]);
2426 let innermost_struct = StructArray::from(vec![
2427 (
2428 Arc::new(Field::new("y", DataType::Int32, true)),
2429 Arc::new(y_array) as ArrayRef,
2430 ),
2431 (
2432 Arc::new(Field::new("x", DataType::Int32, true)),
2433 Arc::new(x_array) as ArrayRef,
2434 ),
2435 ]);
2436 let inner_list = ListArray::new(
2437 Arc::new(Field::new("item", inner_struct.clone(), true)),
2438 OffsetBuffer::from_lengths([2]),
2439 Arc::new(innermost_struct),
2440 None,
2441 );
2442
2443 let b_array = StringArray::from(vec!["b1"]);
2444 let a_array = StringArray::from(vec!["a1"]);
2445 let middle_struct = StructArray::from(vec![
2446 (
2447 Arc::new(Field::new("b", DataType::Utf8, true)),
2448 Arc::new(b_array) as ArrayRef,
2449 ),
2450 (
2451 Arc::new(Field::new(
2452 "inner_list",
2453 DataType::List(Arc::new(Field::new("item", inner_struct, true))),
2454 true,
2455 )),
2456 Arc::new(inner_list) as ArrayRef,
2457 ),
2458 (
2459 Arc::new(Field::new("a", DataType::Utf8, true)),
2460 Arc::new(a_array) as ArrayRef,
2461 ),
2462 ]);
2463
2464 let outer_list = ListArray::new(
2465 Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
2466 OffsetBuffer::from_lengths([1]),
2467 Arc::new(middle_struct),
2468 None,
2469 );
2470
2471 let batch =
2472 RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();
2473
2474 let target_inner_struct = DataType::Struct(Fields::from(vec![
2476 Field::new("x", DataType::Int32, true), Field::new("y", DataType::Int32, true),
2478 ]));
2479 let target_schema = Schema::new(vec![Field::new(
2480 "outer",
2481 DataType::List(Arc::new(Field::new(
2482 "item",
2483 DataType::Struct(Fields::from(vec![
2484 Field::new("a", DataType::Utf8, true), Field::new(
2486 "inner_list",
2487 DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2488 true,
2489 ),
2490 Field::new("b", DataType::Utf8, true),
2491 ])),
2492 true,
2493 ))),
2494 true,
2495 )]);
2496
2497 let projected = batch.project_by_schema(&target_schema).unwrap();
2498
2499 assert_eq!(projected.schema().as_ref(), &target_schema);
2501
2502 let outer_list = projected.column(0).as_list::<i32>();
2504 let middle_struct = outer_list.values().as_struct();
2505
2506 assert_eq!(
2508 middle_struct.column(0).as_ref(),
2509 &StringArray::from(vec!["a1"]) as &dyn Array
2510 );
2511 assert_eq!(
2512 middle_struct.column(2).as_ref(),
2513 &StringArray::from(vec!["b1"]) as &dyn Array
2514 );
2515
2516 let inner_list = middle_struct.column(1).as_list::<i32>();
2518 let innermost_struct = inner_list.values().as_struct();
2519 assert_eq!(
2520 innermost_struct.column(0).as_ref(),
2521 &Int32Array::from(vec![3, 4]) as &dyn Array
2522 );
2523 assert_eq!(
2524 innermost_struct.column(1).as_ref(),
2525 &Int32Array::from(vec![1, 2]) as &dyn Array
2526 );
2527 }
2528}