1use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12 Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray, GenericListArray,
13 LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray,
14 UInt8Array, UInt32Array, cast::AsArray,
15};
16use arrow_array::{
17 Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod cast;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod scalar;
38pub mod stream;
39pub mod r#struct;
40
/// Schema/field metadata key under which Arrow stores an extension type's name.
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Schema/field metadata key under which Arrow stores an extension type's
/// serialized metadata.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Field metadata key marking a column as blob-encoded.
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Extension type name for the v2 blob encoding.
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
// NOTE(review): key name suggests this holds the size threshold above which a
// blob gets dedicated storage — confirm against the encoding that reads it.
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

/// Crate-local result alias; fallible helpers in this module surface [`ArrowError`].
type Result<T> = std::result::Result<T, ArrowError>;
57
/// Convenience predicates and size helpers on [`DataType`].
pub trait DataTypeExt {
    /// Returns true for variable-length string/binary types
    /// (`Utf8`, `Binary`, `LargeUtf8`, `LargeBinary`).
    fn is_binary_like(&self) -> bool;

    /// Returns true if this is a `Struct` type.
    fn is_struct(&self) -> bool;

    /// Returns true for types whose values occupy a fixed amount of space
    /// per element (see the impl for the exact set; includes `Boolean`).
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if this is a `Dictionary` type.
    fn is_dictionary(&self) -> bool;

    /// Byte width of a single value.
    ///
    /// # Panics
    /// Panics if the type does not have a fixed byte width.
    fn byte_width(&self) -> usize;

    /// Byte width of a single value, or `None` for types without a fixed
    /// byte width.
    fn byte_width_opt(&self) -> Option<usize>;
}
93
94impl DataTypeExt for DataType {
95 fn is_binary_like(&self) -> bool {
96 use DataType::*;
97 matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
98 }
99
100 fn is_struct(&self) -> bool {
101 matches!(self, Self::Struct(_))
102 }
103
104 fn is_fixed_stride(&self) -> bool {
105 use DataType::*;
106 matches!(
107 self,
108 Boolean
109 | UInt8
110 | UInt16
111 | UInt32
112 | UInt64
113 | Int8
114 | Int16
115 | Int32
116 | Int64
117 | Float16
118 | Float32
119 | Float64
120 | Decimal128(_, _)
121 | Decimal256(_, _)
122 | FixedSizeList(_, _)
123 | FixedSizeBinary(_)
124 | Duration(_)
125 | Timestamp(_, _)
126 | Date32
127 | Date64
128 | Time32(_)
129 | Time64(_)
130 )
131 }
132
133 fn is_dictionary(&self) -> bool {
134 matches!(self, Self::Dictionary(_, _))
135 }
136
137 fn byte_width_opt(&self) -> Option<usize> {
138 match self {
139 Self::Int8 => Some(1),
140 Self::Int16 => Some(2),
141 Self::Int32 => Some(4),
142 Self::Int64 => Some(8),
143 Self::UInt8 => Some(1),
144 Self::UInt16 => Some(2),
145 Self::UInt32 => Some(4),
146 Self::UInt64 => Some(8),
147 Self::Float16 => Some(2),
148 Self::Float32 => Some(4),
149 Self::Float64 => Some(8),
150 Self::Date32 => Some(4),
151 Self::Date64 => Some(8),
152 Self::Time32(_) => Some(4),
153 Self::Time64(_) => Some(8),
154 Self::Timestamp(_, _) => Some(8),
155 Self::Duration(_) => Some(8),
156 Self::Decimal128(_, _) => Some(16),
157 Self::Decimal256(_, _) => Some(32),
158 Self::Interval(unit) => match unit {
159 IntervalUnit::YearMonth => Some(4),
160 IntervalUnit::DayTime => Some(8),
161 IntervalUnit::MonthDayNano => Some(16),
162 },
163 Self::FixedSizeBinary(s) => Some(*s as usize),
164 Self::FixedSizeList(dt, s) => dt
165 .data_type()
166 .byte_width_opt()
167 .map(|width| width * *s as usize),
168 _ => None,
169 }
170 }
171
172 fn byte_width(&self) -> usize {
173 self.byte_width_opt()
174 .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
175 }
176}
177
178pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
196 values: T,
197 offsets: &PrimitiveArray<Offset>,
198) -> Result<GenericListArray<Offset::Native>>
199where
200 Offset::Native: OffsetSizeTrait,
201{
202 let data_type = if Offset::Native::IS_LARGE {
203 DataType::LargeList(Arc::new(Field::new(
204 "item",
205 values.data_type().clone(),
206 true,
207 )))
208 } else {
209 DataType::List(Arc::new(Field::new(
210 "item",
211 values.data_type().clone(),
212 true,
213 )))
214 };
215 let data = ArrayDataBuilder::new(data_type)
216 .len(offsets.len() - 1)
217 .add_buffer(offsets.into_data().buffers()[0].clone())
218 .add_child_data(values.into_data())
219 .build()?;
220
221 Ok(GenericListArray::from(data))
222}
223
224pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
225 DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
226}
227
/// Construction, sampling and casting helpers for [`FixedSizeListArray`].
pub trait FixedSizeListArrayExt {
    /// Wraps a flat values array into a fixed-size-list array with
    /// `list_size` values per list element.
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Returns `n` rows sampled uniformly at random without replacement
    /// (the whole array unchanged when `n >= len`).
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Casts integer-typed items to floating point (Int8/16/32 -> Float32,
    /// Int64/UInt8/UInt32 -> Float64); float inputs are returned unchanged.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
271
272impl FixedSizeListArrayExt for FixedSizeListArray {
273 fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
274 let field = Arc::new(Field::new("item", values.data_type().clone(), true));
275 let values = Arc::new(values);
276
277 Self::try_new(field, list_size, values, None)
278 }
279
280 fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
281 if n >= self.len() {
282 return Ok(self.clone());
283 }
284 let mut rng = SmallRng::from_os_rng();
285 let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
286 take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
287 }
288
289 fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
290 match self.data_type() {
291 DataType::FixedSizeList(field, size) => match field.data_type() {
292 DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
293 DataType::Int8 => Ok(Self::new(
294 Arc::new(arrow_schema::Field::new(
295 field.name(),
296 DataType::Float32,
297 field.is_nullable(),
298 )),
299 *size,
300 Arc::new(Float32Array::from_iter_values(
301 self.values()
302 .as_any()
303 .downcast_ref::<Int8Array>()
304 .ok_or(ArrowError::ParseError(
305 "Fail to cast primitive array to Int8Type".to_string(),
306 ))?
307 .into_iter()
308 .filter_map(|x| x.map(|y| y as f32)),
309 )),
310 self.nulls().cloned(),
311 )),
312 DataType::Int16 => Ok(Self::new(
313 Arc::new(arrow_schema::Field::new(
314 field.name(),
315 DataType::Float32,
316 field.is_nullable(),
317 )),
318 *size,
319 Arc::new(Float32Array::from_iter_values(
320 self.values()
321 .as_any()
322 .downcast_ref::<Int16Array>()
323 .ok_or(ArrowError::ParseError(
324 "Fail to cast primitive array to Int16Type".to_string(),
325 ))?
326 .into_iter()
327 .filter_map(|x| x.map(|y| y as f32)),
328 )),
329 self.nulls().cloned(),
330 )),
331 DataType::Int32 => Ok(Self::new(
332 Arc::new(arrow_schema::Field::new(
333 field.name(),
334 DataType::Float32,
335 field.is_nullable(),
336 )),
337 *size,
338 Arc::new(Float32Array::from_iter_values(
339 self.values()
340 .as_any()
341 .downcast_ref::<Int32Array>()
342 .ok_or(ArrowError::ParseError(
343 "Fail to cast primitive array to Int32Type".to_string(),
344 ))?
345 .into_iter()
346 .filter_map(|x| x.map(|y| y as f32)),
347 )),
348 self.nulls().cloned(),
349 )),
350 DataType::Int64 => Ok(Self::new(
351 Arc::new(arrow_schema::Field::new(
352 field.name(),
353 DataType::Float64,
354 field.is_nullable(),
355 )),
356 *size,
357 Arc::new(Float64Array::from_iter_values(
358 self.values()
359 .as_any()
360 .downcast_ref::<Int64Array>()
361 .ok_or(ArrowError::ParseError(
362 "Fail to cast primitive array to Int64Type".to_string(),
363 ))?
364 .into_iter()
365 .filter_map(|x| x.map(|y| y as f64)),
366 )),
367 self.nulls().cloned(),
368 )),
369 DataType::UInt8 => Ok(Self::new(
370 Arc::new(arrow_schema::Field::new(
371 field.name(),
372 DataType::Float64,
373 field.is_nullable(),
374 )),
375 *size,
376 Arc::new(Float64Array::from_iter_values(
377 self.values()
378 .as_any()
379 .downcast_ref::<UInt8Array>()
380 .ok_or(ArrowError::ParseError(
381 "Fail to cast primitive array to UInt8Type".to_string(),
382 ))?
383 .into_iter()
384 .filter_map(|x| x.map(|y| y as f64)),
385 )),
386 self.nulls().cloned(),
387 )),
388 DataType::UInt32 => Ok(Self::new(
389 Arc::new(arrow_schema::Field::new(
390 field.name(),
391 DataType::Float64,
392 field.is_nullable(),
393 )),
394 *size,
395 Arc::new(Float64Array::from_iter_values(
396 self.values()
397 .as_any()
398 .downcast_ref::<UInt32Array>()
399 .ok_or(ArrowError::ParseError(
400 "Fail to cast primitive array to UInt32Type".to_string(),
401 ))?
402 .into_iter()
403 .filter_map(|x| x.map(|y| y as f64)),
404 )),
405 self.nulls().cloned(),
406 )),
407 data_type => Err(ArrowError::ParseError(format!(
408 "Expect either floating type or integer got {:?}",
409 data_type
410 ))),
411 },
412 data_type => Err(ArrowError::ParseError(format!(
413 "Expect either FixedSizeList got {:?}",
414 data_type
415 ))),
416 }
417 }
418}
419
420pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
423 arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
424}
425
/// Construction helpers for [`FixedSizeBinaryArray`].
pub trait FixedSizeBinaryArrayExt {
    /// Reinterprets a flat `UInt8Array` as fixed-size binary values of
    /// `stride` bytes each.
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
447
448impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
449 fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
450 let data_type = DataType::FixedSizeBinary(stride);
451 let data = ArrayDataBuilder::new(data_type)
452 .len(values.len() / stride as usize)
453 .add_buffer(values.into_data().buffers()[0].clone())
454 .build()?;
455 Ok(Self::from(data))
456 }
457}
458
459pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
460 arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
461}
462
463pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
464 match arr.data_type() {
465 DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
466 DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
467 _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
468 }
469}
470
/// Schema- and column-manipulation helpers for [`RecordBatch`].
pub trait RecordBatchExt {
    /// Appends `arr` as a new trailing column described by `field`.
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Inserts `arr` as a new column at position `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Rebuilds a batch from a [`StructArray`], keeping this batch's
    /// schema-level metadata.
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merges the columns of two equal-length batches into one batch.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Merges two equal-length batches, laying out the result per `schema`.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Returns a batch without any top-level column named `name`.
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replaces the column named `name`, keeping the schema unchanged.
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replaces the column named `name` together with its declared data type.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Renames the column at `index`; errors if `index` is out of bounds.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Looks up a (possibly nested) column by a dot-separated path
    /// such as `"a.b.c"`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Projects this batch onto `schema`, recursing into nested fields.
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// Returns the schema-level metadata map.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Returns a batch with `key`/`value` added to the schema metadata.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Returns a batch whose schema metadata is replaced by `metadata`.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Returns a batch containing the rows selected by `indices`.
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Returns a deep copy of the batch, dropping excess buffer capacity.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;

    /// Returns the batch sorted by the column at index `column`.
    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
}
616
impl RecordBatchExt for RecordBatch {
    // Append a column at the end; schema extension is delegated to the
    // Schema extension trait (defined in the `schema` module).
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    // Insert a column at `index`; out-of-bounds handling is left to the
    // schema insertion and `Vec::insert` (the latter panics).
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    // Rebuild a batch from a struct array while carrying over this batch's
    // schema-level metadata (the struct array itself has none).
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    // Column-wise merge of two batches via the struct-array `merge` helper
    // defined later in this file; requires equal row counts.
    fn merge(&self, other: &Self) -> Result<Self> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    // Like `merge`, but the output field order/shape follows `schema`.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    // Keeps every column whose name differs from `name` (drops ALL matches,
    // not just the first). Succeeds even if no column matched.
    fn drop_column(&self, name: &str) -> Result<Self> {
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    // Rename the field at `index`.
    // NOTE(review): `Field::new` rebuilds the field from name/type/nullability
    // only, so field-level metadata on the renamed field is dropped — confirm
    // that is intended.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    // Swap the data of the first column named `name`; schema is unchanged,
    // so the new array must match the declared type.
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(self.schema(), columns)
    }

    // Swap both the data and the declared type of the column named `name`.
    // NOTE(review): like rename_column, the rebuilt field loses any
    // field-level metadata — confirm that is acceptable.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    // Resolve a dot-separated path: the first segment is a top-level column,
    // the remainder is walked through nested structs by `get_sub_array`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    // Delegates to the recursive struct-array projection below.
    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    // Replace the schema metadata wholesale, keeping fields and columns.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    // Row selection via arrow's `take` kernel on the struct representation.
    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }

    // Deep copy releases any over-allocated / shared buffer capacity.
    fn shrink_to_fit(&self) -> Result<Self> {
        crate::deepcopy::deep_copy_batch_sliced(self)
    }

    // Sort rows by one column: compute the permutation, then `take` it.
    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
        if column >= self.num_columns() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Column index out of bounds: {}",
                column
            )));
        }
        let column = self.column(column);
        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
        self.take(&sorted)
    }
}
802
803fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
806 match target_field.data_type() {
807 DataType::Struct(subfields) => {
808 let struct_arr = array.as_struct();
809 let projected = project(struct_arr, subfields)?;
810 Ok(Arc::new(projected))
811 }
812 DataType::List(inner_field) => {
813 let list_arr: &ListArray = array.as_list();
814 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
815 Ok(Arc::new(ListArray::new(
816 inner_field.clone(),
817 list_arr.offsets().clone(),
818 projected_values,
819 list_arr.nulls().cloned(),
820 )))
821 }
822 DataType::LargeList(inner_field) => {
823 let list_arr: &LargeListArray = array.as_list();
824 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
825 Ok(Arc::new(LargeListArray::new(
826 inner_field.clone(),
827 list_arr.offsets().clone(),
828 projected_values,
829 list_arr.nulls().cloned(),
830 )))
831 }
832 DataType::FixedSizeList(inner_field, size) => {
833 let list_arr = array.as_fixed_size_list();
834 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
835 Ok(Arc::new(FixedSizeListArray::new(
836 inner_field.clone(),
837 *size,
838 projected_values,
839 list_arr.nulls().cloned(),
840 )))
841 }
842 _ => Ok(array.clone()),
843 }
844}
845
846fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
847 if fields.is_empty() {
848 return Ok(StructArray::new_empty_fields(
849 struct_array.len(),
850 struct_array.nulls().cloned(),
851 ));
852 }
853 let mut columns: Vec<ArrayRef> = vec![];
854 for field in fields.iter() {
855 if let Some(col) = struct_array.column_by_name(field.name()) {
856 let projected = project_array(col, field.as_ref())?;
857 columns.push(projected);
858 } else {
859 return Err(ArrowError::SchemaError(format!(
860 "field {} does not exist in the RecordBatch",
861 field.name()
862 )));
863 }
864 }
865 StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
867}
868
869fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
870 let left_list: &GenericListArray<T> = left.as_list();
871 let right_list: &GenericListArray<T> = right.as_list();
872 left_list.offsets().inner() == right_list.offsets().inner()
873}
874
875fn merge_list_structs_helper<T: OffsetSizeTrait>(
876 left: &dyn Array,
877 right: &dyn Array,
878 items_field_name: impl Into<String>,
879 items_nullable: bool,
880) -> Arc<dyn Array> {
881 let left_list: &GenericListArray<T> = left.as_list();
882 let right_list: &GenericListArray<T> = right.as_list();
883 let left_struct = left_list.values();
884 let right_struct = right_list.values();
885 let left_struct_arr = left_struct.as_struct();
886 let right_struct_arr = right_struct.as_struct();
887 let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
888 let items_field = Arc::new(Field::new(
889 items_field_name,
890 merged_items.data_type().clone(),
891 items_nullable,
892 ));
893 Arc::new(GenericListArray::<T>::new(
894 items_field,
895 left_list.offsets().clone(),
896 merged_items,
897 left_list.nulls().cloned(),
898 ))
899}
900
901fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
902 left: &dyn Array,
903 right: &dyn Array,
904 not_null: &dyn Array,
905 items_field_name: impl Into<String>,
906) -> Arc<dyn Array> {
907 let left_list: &GenericListArray<T> = left.as_list::<T>();
908 let not_null_list = not_null.as_list::<T>();
909 let right_list = right.as_list::<T>();
910
911 let left_struct = left_list.values().as_struct();
912 let not_null_struct: &StructArray = not_null_list.values().as_struct();
913 let right_struct = right_list.values().as_struct();
914
915 let values_len = not_null_list.values().len();
916 let mut merged_fields =
917 Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
918 let mut merged_columns =
919 Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
920
921 for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
922 merged_fields.push(field.clone());
923 if let Some(val) = not_null_struct.column_by_name(field.name()) {
924 merged_columns.push(val.clone());
925 } else {
926 merged_columns.push(new_null_array(field.data_type(), values_len))
927 }
928 }
929 for (_, field) in right_struct
930 .columns()
931 .iter()
932 .zip(right_struct.fields())
933 .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
934 {
935 merged_fields.push(field.clone());
936 if let Some(val) = not_null_struct.column_by_name(field.name()) {
937 merged_columns.push(val.clone());
938 } else {
939 merged_columns.push(new_null_array(field.data_type(), values_len));
940 }
941 }
942
943 let merged_struct = Arc::new(StructArray::new(
944 Fields::from(merged_fields),
945 merged_columns,
946 not_null_struct.nulls().cloned(),
947 ));
948 let items_field = Arc::new(Field::new(
949 items_field_name,
950 merged_struct.data_type().clone(),
951 true,
952 ));
953 Arc::new(GenericListArray::<T>::new(
954 items_field,
955 not_null_list.offsets().clone(),
956 merged_struct,
957 not_null_list.nulls().cloned(),
958 ))
959}
960
961fn merge_list_struct_null(
962 left: &dyn Array,
963 right: &dyn Array,
964 not_null: &dyn Array,
965) -> Arc<dyn Array> {
966 match left.data_type() {
967 DataType::List(left_field) => {
968 merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
969 }
970 DataType::LargeList(left_field) => {
971 merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
972 }
973 _ => unreachable!(),
974 }
975}
976
977fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
978 if left.null_count() == left.len() {
982 return merge_list_struct_null(left, right, right);
983 } else if right.null_count() == right.len() {
984 return merge_list_struct_null(left, right, left);
985 }
986 match (left.data_type(), right.data_type()) {
987 (DataType::List(left_field), DataType::List(_)) => {
988 if !lists_have_same_offsets_helper::<i32>(left, right) {
989 panic!("Attempt to merge list struct arrays which do not have same offsets");
990 }
991 merge_list_structs_helper::<i32>(
992 left,
993 right,
994 left_field.name(),
995 left_field.is_nullable(),
996 )
997 }
998 (DataType::LargeList(left_field), DataType::LargeList(_)) => {
999 if !lists_have_same_offsets_helper::<i64>(left, right) {
1000 panic!("Attempt to merge list struct arrays which do not have same offsets");
1001 }
1002 merge_list_structs_helper::<i64>(
1003 left,
1004 right,
1005 left_field.name(),
1006 left_field.is_nullable(),
1007 )
1008 }
1009 _ => unreachable!(),
1010 }
1011}
1012
1013fn normalize_validity(
1016 validity: Option<&arrow_buffer::NullBuffer>,
1017) -> Option<&arrow_buffer::NullBuffer> {
1018 validity.and_then(|v| {
1019 if v.null_count() == v.len() {
1020 None
1021 } else {
1022 Some(v)
1023 }
1024 })
1025}
1026
1027fn merge_struct_validity(
1032 left_validity: Option<&arrow_buffer::NullBuffer>,
1033 right_validity: Option<&arrow_buffer::NullBuffer>,
1034) -> Option<arrow_buffer::NullBuffer> {
1035 let left_normalized = normalize_validity(left_validity);
1037 let right_normalized = normalize_validity(right_validity);
1038
1039 match (left_normalized, right_normalized) {
1040 (None, None) => None,
1042 (Some(left), None) => Some(left.clone()),
1043 (None, Some(right)) => Some(right.clone()),
1044 (Some(left), Some(right)) => {
1045 if left.null_count() == 0 && right.null_count() == 0 {
1047 return Some(left.clone());
1048 }
1049
1050 let left_buffer = left.inner();
1051 let right_buffer = right.inner();
1052
1053 let merged_buffer = left_buffer | right_buffer;
1056
1057 Some(arrow_buffer::NullBuffer::from(merged_buffer))
1058 }
1059 }
1060}
1061
1062fn merge_list_child_values(
1063 child_field: &Field,
1064 left_values: ArrayRef,
1065 right_values: ArrayRef,
1066) -> ArrayRef {
1067 match child_field.data_type() {
1068 DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1069 left_values.as_struct(),
1070 right_values.as_struct(),
1071 child_fields,
1072 )) as ArrayRef,
1073 DataType::List(grandchild) => {
1074 let left_list = left_values
1075 .as_any()
1076 .downcast_ref::<ListArray>()
1077 .expect("left list values should be ListArray");
1078 let right_list = right_values
1079 .as_any()
1080 .downcast_ref::<ListArray>()
1081 .expect("right list values should be ListArray");
1082 let merged_values = merge_list_child_values(
1083 grandchild.as_ref(),
1084 left_list.values().clone(),
1085 right_list.values().clone(),
1086 );
1087 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1088 Arc::new(ListArray::new(
1089 grandchild.clone(),
1090 left_list.offsets().clone(),
1091 merged_values,
1092 merged_validity,
1093 )) as ArrayRef
1094 }
1095 DataType::LargeList(grandchild) => {
1096 let left_list = left_values
1097 .as_any()
1098 .downcast_ref::<LargeListArray>()
1099 .expect("left list values should be LargeListArray");
1100 let right_list = right_values
1101 .as_any()
1102 .downcast_ref::<LargeListArray>()
1103 .expect("right list values should be LargeListArray");
1104 let merged_values = merge_list_child_values(
1105 grandchild.as_ref(),
1106 left_list.values().clone(),
1107 right_list.values().clone(),
1108 );
1109 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1110 Arc::new(LargeListArray::new(
1111 grandchild.clone(),
1112 left_list.offsets().clone(),
1113 merged_values,
1114 merged_validity,
1115 )) as ArrayRef
1116 }
1117 DataType::FixedSizeList(grandchild, list_size) => {
1118 let left_list = left_values
1119 .as_any()
1120 .downcast_ref::<FixedSizeListArray>()
1121 .expect("left list values should be FixedSizeListArray");
1122 let right_list = right_values
1123 .as_any()
1124 .downcast_ref::<FixedSizeListArray>()
1125 .expect("right list values should be FixedSizeListArray");
1126 let merged_values = merge_list_child_values(
1127 grandchild.as_ref(),
1128 left_list.values().clone(),
1129 right_list.values().clone(),
1130 );
1131 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1132 Arc::new(FixedSizeListArray::new(
1133 grandchild.clone(),
1134 *list_size,
1135 merged_values,
1136 merged_validity,
1137 )) as ArrayRef
1138 }
1139 _ => left_values.clone(),
1140 }
1141}
1142
// Re-masks a struct child column so that slots the parent struct marks null
// are also null on the child (bitwise AND of the two validity buffers).
// Returns the child untouched when the parent has no nulls to propagate,
// or when the child is the Null type (always null already).
//
// NOTE(review): the rebuild passes `child.offset()` alongside buffers taken
// from `child.to_data()` while the new validity comes from `child.nulls()`
// (already offset-adjusted) — for sliced arrays this looks like the offset
// could be applied to the validity twice. Confirm callers only pass
// zero-offset children.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    // Nothing to propagate: no parent buffer, or a fully-valid one.
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(), Some(p) => p,
    };

    // The Null type is all-null by definition; masking is a no-op.
    if child.data_type() == &DataType::Null {
        return child.clone();
    }

    let child_validity = child.nulls();

    let new_validity = match child_validity {
        None => {
            // Child was fully valid: adopt the parent's mask directly.
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Valid only where BOTH child and parent are valid.
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    // Rebuild the array data with the merged validity, reusing all value
    // buffers and child data as-is.
    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}
1196
1197fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1198 let mut fields: Vec<Field> = vec![];
1199 let mut columns: Vec<ArrayRef> = vec![];
1200 let right_fields = right_struct_array.fields();
1201 let right_columns = right_struct_array.columns();
1202
1203 let left_validity = left_struct_array.nulls();
1205 let right_validity = right_struct_array.nulls();
1206
1207 let merged_validity = merge_struct_validity(left_validity, right_validity);
1209
1210 for (left_field, left_column) in left_struct_array
1212 .fields()
1213 .iter()
1214 .zip(left_struct_array.columns().iter())
1215 {
1216 match right_fields
1217 .iter()
1218 .position(|f| f.name() == left_field.name())
1219 {
1220 Some(right_index) => {
1222 let right_field = right_fields.get(right_index).unwrap();
1223 let right_column = right_columns.get(right_index).unwrap();
1224 match (left_field.data_type(), right_field.data_type()) {
1226 (DataType::Struct(_), DataType::Struct(_)) => {
1227 let left_sub_array = left_column.as_struct();
1228 let right_sub_array = right_column.as_struct();
1229 let merged_sub_array = merge(left_sub_array, right_sub_array);
1230 fields.push(Field::new(
1231 left_field.name(),
1232 merged_sub_array.data_type().clone(),
1233 left_field.is_nullable(),
1234 ));
1235 columns.push(Arc::new(merged_sub_array) as ArrayRef);
1236 }
1237 (DataType::List(left_list), DataType::List(right_list))
1238 if left_list.data_type().is_struct()
1239 && right_list.data_type().is_struct() =>
1240 {
1241 if left_list.data_type() == right_list.data_type() {
1243 fields.push(left_field.as_ref().clone());
1244 columns.push(left_column.clone());
1245 }
1246 let merged_sub_array = merge_list_struct(&left_column, &right_column);
1250
1251 fields.push(Field::new(
1252 left_field.name(),
1253 merged_sub_array.data_type().clone(),
1254 left_field.is_nullable(),
1255 ));
1256 columns.push(merged_sub_array);
1257 }
1258 _ => {
1260 fields.push(left_field.as_ref().clone());
1262 let adjusted_column = adjust_child_validity(left_column, left_validity);
1264 columns.push(adjusted_column);
1265 }
1266 }
1267 }
1268 None => {
1269 fields.push(left_field.as_ref().clone());
1270 let adjusted_column = adjust_child_validity(left_column, left_validity);
1272 columns.push(adjusted_column);
1273 }
1274 }
1275 }
1276
1277 right_fields
1279 .iter()
1280 .zip(right_columns.iter())
1281 .for_each(|(field, column)| {
1282 if !left_struct_array
1284 .fields()
1285 .iter()
1286 .any(|f| f.name() == field.name())
1287 {
1288 fields.push(field.as_ref().clone());
1289 let adjusted_column = adjust_child_validity(column, right_validity);
1292 columns.push(adjusted_column);
1293 }
1294 });
1295
1296 StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1297}
1298
/// Merge two [`StructArray`]s into one, projecting the result onto the field
/// names and ordering given by `fields`.
///
/// For each target field: a field found on only one side is taken from that
/// side (with its values masked by that side's struct-level validity); a field
/// found on both sides is merged recursively for struct/list-of-struct types,
/// and taken from the *left* side for all other types. Fields matched on
/// neither side are silently dropped from the output.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Two data types are the same "kind" if they are both structs or both
    // non-structs; non-struct types are not compared any further here.
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    // Struct-level validity of each input; used both to mask child columns
    // taken from only one side and to compute the output's validity.
    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        // Match by name, but only if the struct-ness of the types agrees —
        // a struct on one side never matches a non-struct on the other.
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            // Present only on the right: take it, masking values where the
            // right struct itself was null.
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            // Present only on the left: symmetric to the case above.
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    // Nested structs are merged recursively against the
                    // target's child fields.
                    DataType::Struct(child_fields) => {
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    // Lists: merge the child value arrays, then rebuild the
                    // list using the LEFT side's offsets.
                    // NOTE(review): this assumes both lists share identical
                    // offsets/lengths — TODO confirm callers guarantee this.
                    DataType::List(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        // Shadows the struct-level `merged_validity`: this one
                        // is the per-list validity of the two list columns.
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    // Same as the List branch, for 64-bit offsets.
                    DataType::LargeList(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    // Fixed-size lists have no offsets to trim; the raw value
                    // arrays are merged directly.
                    DataType::FixedSizeList(child_field, list_size) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    // Non-nested type present on both sides: the left column
                    // wins; the right duplicate is discarded.
                    _ => {
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            // Target field absent from both inputs: dropped from the output,
            // so the result may have fewer fields than `fields`.
            (None, None) => {
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}
1457
1458fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1459 if components.is_empty() {
1460 return Some(array);
1461 }
1462 if !matches!(array.data_type(), DataType::Struct(_)) {
1463 return None;
1464 }
1465 let struct_arr = array.as_struct();
1466 struct_arr
1467 .column_by_name(components[0])
1468 .and_then(|arr| get_sub_array(arr, &components[1..]))
1469}
1470
1471pub fn interleave_batches(
1475 batches: &[RecordBatch],
1476 indices: &[(usize, usize)],
1477) -> Result<RecordBatch> {
1478 let first_batch = batches.first().ok_or_else(|| {
1479 ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1480 })?;
1481 let schema = first_batch.schema();
1482 let num_columns = first_batch.num_columns();
1483 let mut columns = Vec::with_capacity(num_columns);
1484 let mut chunks = Vec::with_capacity(batches.len());
1485
1486 for i in 0..num_columns {
1487 for batch in batches {
1488 chunks.push(batch.column(i).as_ref());
1489 }
1490 let new_column = interleave(&chunks, indices)?;
1491 columns.push(new_column);
1492 chunks.clear();
1493 }
1494
1495 RecordBatch::try_new(schema, columns)
1496}
1497
/// Extension methods for building [`arrow_buffer::Buffer`]s from [`bytes::Bytes`].
pub trait BufferExt {
    /// Wrap `bytes` in a buffer, zero-copy when the data is suitably aligned
    /// for values of `bytes_per_value` bytes; otherwise the data is copied.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Copy `bytes` into a newly allocated buffer of `size_bytes` bytes,
    /// zero-padding the tail. `size_bytes` must be at least `bytes.len()`.
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1525
/// Returns `true` if `n` is a power of two.
///
/// The previous implementation (`n & (n - 1) == 0`) underflowed for `n == 0`:
/// it panicked in debug builds and wrongly reported `true` in release builds,
/// even though zero is not a power of two (and is not a valid alignment for
/// the `align_offset` check performed by `from_bytes_bytes`).
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1529
impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        // Copy only when we can check alignment (power-of-two width) and the
        // source pointer is misaligned for that width; otherwise wrap the
        // existing allocation without copying.
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // SAFETY: the pointer/length describe the `bytes` allocation, and
            // `bytes` itself is moved into the `Arc` passed as the custom
            // owner, so the memory stays alive for the buffer's lifetime.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        // Zero-pad the tail so the buffer is exactly `size_bytes` long.
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        buf.extend(std::iter::repeat_n(0_u8, to_fill));

        buf.shrink_to_fit();

        Self::from(buf)
    }
}
1564
1565#[cfg(test)]
1566mod tests {
1567 use super::*;
1568 use arrow_array::{Float32Array, Int32Array, NullArray, StructArray};
1569 use arrow_array::{ListArray, StringArray, new_empty_array, new_null_array};
1570 use arrow_buffer::OffsetBuffer;
1571
    #[test]
    fn test_merge_recursive() {
        // Merging {a, b: {c}} with {e, b: {d}} should recursively merge the
        // shared struct "b" into {c, d} and append the right-only field "e".
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        // Left batch: top-level "a" plus struct "b" containing only "c".
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        // Right batch: top-level "e" plus struct "b" containing only "d".
        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected: left field order preserved, "b" deep-merged, "e" appended.
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }
1656
    #[test]
    fn test_merge_with_schema() {
        // Builds an empty batch with a single struct column whose children
        // have the given names/types.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // merge_with_schema follows the target schema's field order exactly.
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // Plain merge keeps left order and appends right-only fields at the end.
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }
1699
    #[test]
    fn test_merge_list_struct() {
        // Merging list<struct{x}> with list<struct{y}> should yield
        // list<struct{x, y}>; a fully-null right list should yield null "y".
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        // One row, one element per list on each side.
        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        // Expected result when the right list is all-null: "y" becomes null.
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        // Case 1: both sides populated — children zipped into one struct.
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // Case 2: right side is an all-null list of the same type.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }
1788
1789 #[test]
1790 fn test_byte_width_opt() {
1791 assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1792 assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1793 assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1794 assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1795 assert_eq!(DataType::Utf8.byte_width_opt(), None);
1796 assert_eq!(DataType::Binary.byte_width_opt(), None);
1797 assert_eq!(
1798 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1799 None
1800 );
1801 assert_eq!(
1802 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1803 .byte_width_opt(),
1804 Some(12)
1805 );
1806 assert_eq!(
1807 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1808 .byte_width_opt(),
1809 Some(16)
1810 );
1811 assert_eq!(
1812 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1813 .byte_width_opt(),
1814 None
1815 );
1816 }
1817
1818 #[test]
1819 fn test_take_record_batch() {
1820 let schema = Arc::new(Schema::new(vec![
1821 Field::new("a", DataType::Int32, true),
1822 Field::new("b", DataType::Utf8, true),
1823 ]));
1824 let batch = RecordBatch::try_new(
1825 schema.clone(),
1826 vec![
1827 Arc::new(Int32Array::from_iter_values(0..20)),
1828 Arc::new(StringArray::from_iter_values(
1829 (0..20).map(|i| format!("str-{}", i)),
1830 )),
1831 ],
1832 )
1833 .unwrap();
1834 let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1835 assert_eq!(
1836 taken,
1837 RecordBatch::try_new(
1838 schema,
1839 vec![
1840 Arc::new(Int32Array::from(vec![1, 5, 10])),
1841 Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1842 ],
1843 )
1844 .unwrap()
1845 )
1846 }
1847
    #[test]
    fn test_schema_project_by_schema() {
        // Schema-level metadata should survive every projection below.
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Projecting onto an empty schema keeps row count and metadata.
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Projection reorders columns to match the target schema.
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Projection onto a subset drops the unselected columns.
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }
1914
1915 #[test]
1916 fn test_project_preserves_struct_validity() {
1917 let fields = Fields::from(vec![
1919 Field::new("id", DataType::Int32, false),
1920 Field::new("value", DataType::Float32, true),
1921 ]);
1922
1923 let id_array = Int32Array::from(vec![1, 2, 3]);
1925 let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1926 let struct_array = StructArray::new(
1927 fields.clone(),
1928 vec![
1929 Arc::new(id_array) as ArrayRef,
1930 Arc::new(value_array) as ArrayRef,
1931 ],
1932 Some(vec![true, false, true].into()), );
1934
1935 let projected = project(&struct_array, &fields).unwrap();
1937
1938 assert_eq!(projected.null_count(), 1);
1940 assert!(!projected.is_null(0));
1941 assert!(projected.is_null(1));
1942 assert!(!projected.is_null(2));
1943 }
1944
    #[test]
    fn test_merge_struct_with_different_validity() {
        // Left struct {height}: valid at rows 0 and 2.
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            Some(vec![true, false, true, false].into()),
        );

        // Right struct {width}: valid at rows 0 and 1.
        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            Some(vec![true, true, false, false].into()),
        );

        let merged = merge(&left_struct, &right_struct);

        // The merged struct is null only where BOTH inputs were null (row 3).
        assert_eq!(merged.null_count(), 1);
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        // Child values under a null parent are masked: left was null at row 1,
        // so "height" is null there even though the merged row is valid.
        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1));
        assert_eq!(height_values.value(2), 600);

        // Symmetrically, "width" is masked where the right parent was null.
        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2));
    }
1994
1995 #[test]
1996 fn test_merge_null_typed_column_with_parent_validity() {
1997 let left_struct = StructArray::new(
2001 Fields::from(vec![Field::new("a", DataType::Int32, true)]),
2002 vec![Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef],
2003 Some(vec![true, false].into()),
2004 );
2005 let right_struct = StructArray::new(
2006 Fields::from(vec![Field::new("b", DataType::Null, true)]),
2007 vec![Arc::new(NullArray::new(2)) as ArrayRef],
2008 Some(vec![true, false].into()),
2009 );
2010
2011 let merged = merge(&left_struct, &right_struct);
2013 assert_eq!(merged.len(), 2);
2014 let b_col = merged.column_by_name("b").unwrap();
2015 assert_eq!(b_col.data_type(), &DataType::Null);
2017 assert_eq!(b_col.len(), 2);
2018 }
2019
    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        // Left list<struct{company_id, count}> with all-null child values.
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        // Right list<struct{company_name}> with populated values.
        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        // Target schema: union of both sides' child fields, in its own order.
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        // All three target child fields must be present in the merged struct.
        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        // Left-side values (all nulls) come through unchanged.
        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        // Right-side values come through unchanged.
        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
2143
    #[test]
    fn test_merge_struct_lists() {
        // `List` (32-bit offsets) flavor of the list-of-struct merge test.
        test_merge_struct_lists_generic::<i32>();
    }
2148
    #[test]
    fn test_merge_struct_large_lists() {
        // `LargeList` (64-bit offsets) flavor of the list-of-struct merge test.
        test_merge_struct_lists_generic::<i64>();
    }
2153
    // Exercises merge_with_schema over list<struct>/large-list<struct> columns,
    // parameterized by the offset width `O` (i32 for List, i64 for LargeList).
    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
        // Left child values: 20 rows, though only the first 4 are referenced
        // by the list offsets below.
        let left_company_id = Arc::new(Int32Array::from(vec![
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
            Some(10),
            Some(11),
            Some(12),
            Some(13),
            Some(14),
            Some(15),
            Some(16),
            Some(17),
            Some(18),
            Some(19),
            Some(20),
        ]));
        let left_count = Arc::new(Int32Array::from(vec![
            Some(10),
            Some(20),
            Some(30),
            Some(40),
            Some(50),
            Some(60),
            Some(70),
            Some(80),
            Some(90),
            Some(100),
            Some(110),
            Some(120),
            Some(130),
            Some(140),
            Some(150),
            Some(160),
            Some(170),
            Some(180),
            Some(190),
            Some(200),
        ]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));

        // Two rows of lists: lengths 3 and 1.
        let left_list = Arc::new(GenericListArray::<O>::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([3, 1]),
            left_struct.clone(),
            None,
        ));

        // Wrap in a struct column, picking List vs LargeList from `O`.
        let left_list_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new(
                "companies",
                if O::IS_LARGE {
                    DataType::LargeList(Arc::new(Field::new(
                        "item",
                        DataType::Struct(left_struct.fields().clone()),
                        true,
                    )))
                } else {
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(left_struct.fields().clone()),
                        true,
                    )))
                },
                true,
            )]),
            vec![left_list as ArrayRef],
            None,
        ));

        // Right side: same list shape, different struct child field.
        let right_company_name = Arc::new(StringArray::from(vec![
            "Google",
            "Microsoft",
            "Apple",
            "Facebook",
        ]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(GenericListArray::<O>::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([3, 1]),
            right_struct.clone(),
            None,
        ));

        let right_list_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new(
                "companies",
                if O::IS_LARGE {
                    DataType::LargeList(Arc::new(Field::new(
                        "item",
                        DataType::Struct(right_struct.fields().clone()),
                        true,
                    )))
                } else {
                    DataType::List(Arc::new(Field::new(
                        "item",
                        DataType::Struct(right_struct.fields().clone()),
                        true,
                    )))
                },
                true,
            )]),
            vec![right_list as ArrayRef],
            None,
        ));

        // Target: union of both child structs under one list column.
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            if O::IS_LARGE {
                DataType::LargeList(Arc::new(Field::new(
                    "item",
                    DataType::Struct(Fields::from(vec![
                        Field::new("company_id", DataType::Int32, true),
                        Field::new("company_name", DataType::Utf8, true),
                        Field::new("count", DataType::Int32, true),
                    ])),
                    true,
                )))
            } else {
                DataType::List(Arc::new(Field::new(
                    "item",
                    DataType::Struct(Fields::from(vec![
                        Field::new("company_id", DataType::Int32, true),
                        Field::new("company_name", DataType::Utf8, true),
                        Field::new("count", DataType::Int32, true),
                    ])),
                    true,
                )))
            },
            true,
        )]);

        // The merge must succeed and preserve the two-row length.
        let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
        assert_eq!(merged_array.len(), 2);
    }
2318
    #[test]
    fn test_project_by_schema_list_struct_reorder() {
        // Source stores struct children in order (c, b, a); the target schema
        // asks for (a, b, c). Projection must reorder the children inside the
        // list's struct, not just relabel them.
        let source_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("c", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Utf8, true),
        ]));
        let source_schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::List(Arc::new(Field::new(
                    "item",
                    source_inner_struct.clone(),
                    true,
                ))),
                true,
            ),
        ]));

        let c_array = StringArray::from(vec!["c1", "c2"]);
        let b_array = StringArray::from(vec!["b1", "b2"]);
        let a_array = StringArray::from(vec!["a1", "a2"]);
        let inner_struct = StructArray::from(vec![
            (
                Arc::new(Field::new("c", DataType::Utf8, true)),
                Arc::new(c_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("b", DataType::Utf8, true)),
                Arc::new(b_array) as ArrayRef,
            ),
            (
                Arc::new(Field::new("a", DataType::Utf8, true)),
                Arc::new(a_array) as ArrayRef,
            ),
        ]);

        // Two rows, one struct element each.
        let list_array = ListArray::new(
            Arc::new(Field::new("item", source_inner_struct, true)),
            OffsetBuffer::from_lengths([1, 1]),
            Arc::new(inner_struct),
            None,
        );

        let batch = RecordBatch::try_new(
            source_schema,
            vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
        )
        .unwrap();

        // Target: same fields, alphabetical child order (a, b, c).
        let target_inner_struct = DataType::Struct(Fields::from(vec![
            Field::new("a", DataType::Utf8, true),
            Field::new("b", DataType::Utf8, true),
            Field::new("c", DataType::Utf8, true),
        ]));
        let target_schema = Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new(
                "data",
                DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
                true,
            ),
        ]);

        let projected = batch.project_by_schema(&target_schema).unwrap();

        assert_eq!(projected.schema().as_ref(), &target_schema);

        let projected_list = projected.column(1).as_list::<i32>();
        let projected_struct = projected_list.values().as_struct();

        // Each named child still maps to its own values after the reorder.
        assert_eq!(
            projected_struct.column_by_name("a").unwrap().as_ref(),
            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column_by_name("b").unwrap().as_ref(),
            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column_by_name("c").unwrap().as_ref(),
            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
        );

        // Positional order now matches the target schema: a, b, c.
        assert_eq!(
            projected_struct.column(0).as_ref(),
            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column(1).as_ref(),
            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
        );
        assert_eq!(
            projected_struct.column(2).as_ref(),
            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
        );
    }
2428
2429 #[test]
2430 fn test_project_by_schema_nested_list_struct() {
2431 let inner_struct = DataType::Struct(Fields::from(vec![
2433 Field::new("y", DataType::Int32, true),
2434 Field::new("x", DataType::Int32, true),
2435 ]));
2436 let source_schema = Arc::new(Schema::new(vec![Field::new(
2437 "outer",
2438 DataType::List(Arc::new(Field::new(
2439 "item",
2440 DataType::Struct(Fields::from(vec![
2441 Field::new("b", DataType::Utf8, true),
2442 Field::new(
2443 "inner_list",
2444 DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
2445 true,
2446 ),
2447 Field::new("a", DataType::Utf8, true),
2448 ])),
2449 true,
2450 ))),
2451 true,
2452 )]));
2453
2454 let y_array = Int32Array::from(vec![1, 2]);
2456 let x_array = Int32Array::from(vec![3, 4]);
2457 let innermost_struct = StructArray::from(vec![
2458 (
2459 Arc::new(Field::new("y", DataType::Int32, true)),
2460 Arc::new(y_array) as ArrayRef,
2461 ),
2462 (
2463 Arc::new(Field::new("x", DataType::Int32, true)),
2464 Arc::new(x_array) as ArrayRef,
2465 ),
2466 ]);
2467 let inner_list = ListArray::new(
2468 Arc::new(Field::new("item", inner_struct.clone(), true)),
2469 OffsetBuffer::from_lengths([2]),
2470 Arc::new(innermost_struct),
2471 None,
2472 );
2473
2474 let b_array = StringArray::from(vec!["b1"]);
2475 let a_array = StringArray::from(vec!["a1"]);
2476 let middle_struct = StructArray::from(vec![
2477 (
2478 Arc::new(Field::new("b", DataType::Utf8, true)),
2479 Arc::new(b_array) as ArrayRef,
2480 ),
2481 (
2482 Arc::new(Field::new(
2483 "inner_list",
2484 DataType::List(Arc::new(Field::new("item", inner_struct, true))),
2485 true,
2486 )),
2487 Arc::new(inner_list) as ArrayRef,
2488 ),
2489 (
2490 Arc::new(Field::new("a", DataType::Utf8, true)),
2491 Arc::new(a_array) as ArrayRef,
2492 ),
2493 ]);
2494
2495 let outer_list = ListArray::new(
2496 Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
2497 OffsetBuffer::from_lengths([1]),
2498 Arc::new(middle_struct),
2499 None,
2500 );
2501
2502 let batch =
2503 RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();
2504
2505 let target_inner_struct = DataType::Struct(Fields::from(vec![
2507 Field::new("x", DataType::Int32, true), Field::new("y", DataType::Int32, true),
2509 ]));
2510 let target_schema = Schema::new(vec![Field::new(
2511 "outer",
2512 DataType::List(Arc::new(Field::new(
2513 "item",
2514 DataType::Struct(Fields::from(vec![
2515 Field::new("a", DataType::Utf8, true), Field::new(
2517 "inner_list",
2518 DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2519 true,
2520 ),
2521 Field::new("b", DataType::Utf8, true),
2522 ])),
2523 true,
2524 ))),
2525 true,
2526 )]);
2527
2528 let projected = batch.project_by_schema(&target_schema).unwrap();
2529
2530 assert_eq!(projected.schema().as_ref(), &target_schema);
2532
2533 let outer_list = projected.column(0).as_list::<i32>();
2535 let middle_struct = outer_list.values().as_struct();
2536
2537 assert_eq!(
2539 middle_struct.column(0).as_ref(),
2540 &StringArray::from(vec!["a1"]) as &dyn Array
2541 );
2542 assert_eq!(
2543 middle_struct.column(2).as_ref(),
2544 &StringArray::from(vec!["b1"]) as &dyn Array
2545 );
2546
2547 let inner_list = middle_struct.column(1).as_list::<i32>();
2549 let innermost_struct = inner_list.values().as_struct();
2550 assert_eq!(
2551 innermost_struct.column(0).as_ref(),
2552 &Int32Array::from(vec![3, 4]) as &dyn Array
2553 );
2554 assert_eq!(
2555 innermost_struct.column(1).as_ref(),
2556 &Int32Array::from(vec![1, 2]) as &dyn Array
2557 );
2558 }
2559}