1use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12 Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray, GenericListArray,
13 LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray,
14 UInt8Array, UInt32Array, cast::AsArray,
15};
16use arrow_array::{
17 Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod ipc;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod scalar;
38pub mod stream;
39pub mod r#struct;
40
/// Field-metadata key holding the name of an Arrow extension type (Arrow columnar spec).
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Field-metadata key holding the serialized metadata of an Arrow extension type.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Field-metadata key `lance-encoding:blob`; presumably marks a field as a blob column
/// (confirm against lance-encoding).
pub const BLOB_META_KEY: &str = "lance-encoding:blob";

/// Extension-type name used for version-2 blob columns.
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";

/// Field-metadata key carrying the size threshold at which a blob is stored in a dedicated
/// location (semantics defined by lance-encoding — TODO confirm).
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

/// Field-metadata key carrying the size threshold below which a blob is stored inline
/// (semantics defined by lance-encoding — TODO confirm).
pub const BLOB_INLINE_SIZE_THRESHOLD_META_KEY: &str = "lance-encoding:blob-inline-size-threshold";
57
58type Result<T> = std::result::Result<T, ArrowError>;
59
/// Convenience predicates and size queries on Arrow [`DataType`]s.
pub trait DataTypeExt {
    /// Returns `true` for the variable-width string and binary types:
    /// `Utf8`, `LargeUtf8`, `Utf8View`, `Binary`, `LargeBinary` and `BinaryView`.
    fn is_binary_like(&self) -> bool;

    /// Returns `true` if the type is a `Struct`.
    fn is_struct(&self) -> bool;

    /// Returns `true` if every value of the type occupies a fixed amount of storage
    /// (primitive numerics, temporals, decimals, `Boolean`, `FixedSizeBinary`,
    /// `FixedSizeList`).
    fn is_fixed_stride(&self) -> bool;

    /// Returns `true` if the type is a `Dictionary`.
    fn is_dictionary(&self) -> bool;

    /// Returns the number of bytes occupied by one value.
    ///
    /// # Panics
    /// Panics when [`DataTypeExt::byte_width_opt`] returns `None` for this type.
    fn byte_width(&self) -> usize;

    /// Returns the number of bytes occupied by one value, or `None` when the type has no
    /// byte-addressable fixed width (variable-width types, `Boolean`, nested types other
    /// than `FixedSizeList` of a fixed-width item, ...).
    fn byte_width_opt(&self) -> Option<usize>;
}
84
85impl DataTypeExt for DataType {
86 fn is_binary_like(&self) -> bool {
87 use DataType::*;
88 matches!(
89 self,
90 Utf8 | Binary | LargeUtf8 | LargeBinary | Utf8View | BinaryView
91 )
92 }
93
94 fn is_struct(&self) -> bool {
95 matches!(self, Self::Struct(_))
96 }
97
98 fn is_fixed_stride(&self) -> bool {
99 use DataType::*;
100 matches!(
101 self,
102 Boolean
103 | UInt8
104 | UInt16
105 | UInt32
106 | UInt64
107 | Int8
108 | Int16
109 | Int32
110 | Int64
111 | Float16
112 | Float32
113 | Float64
114 | Decimal128(_, _)
115 | Decimal256(_, _)
116 | FixedSizeList(_, _)
117 | FixedSizeBinary(_)
118 | Duration(_)
119 | Timestamp(_, _)
120 | Date32
121 | Date64
122 | Time32(_)
123 | Time64(_)
124 )
125 }
126
127 fn is_dictionary(&self) -> bool {
128 matches!(self, Self::Dictionary(_, _))
129 }
130
131 fn byte_width_opt(&self) -> Option<usize> {
132 match self {
133 Self::Int8 => Some(1),
134 Self::Int16 => Some(2),
135 Self::Int32 => Some(4),
136 Self::Int64 => Some(8),
137 Self::UInt8 => Some(1),
138 Self::UInt16 => Some(2),
139 Self::UInt32 => Some(4),
140 Self::UInt64 => Some(8),
141 Self::Float16 => Some(2),
142 Self::Float32 => Some(4),
143 Self::Float64 => Some(8),
144 Self::Date32 => Some(4),
145 Self::Date64 => Some(8),
146 Self::Time32(_) => Some(4),
147 Self::Time64(_) => Some(8),
148 Self::Timestamp(_, _) => Some(8),
149 Self::Duration(_) => Some(8),
150 Self::Decimal128(_, _) => Some(16),
151 Self::Decimal256(_, _) => Some(32),
152 Self::Interval(unit) => match unit {
153 IntervalUnit::YearMonth => Some(4),
154 IntervalUnit::DayTime => Some(8),
155 IntervalUnit::MonthDayNano => Some(16),
156 },
157 Self::FixedSizeBinary(s) => Some(*s as usize),
158 Self::FixedSizeList(dt, s) => dt
159 .data_type()
160 .byte_width_opt()
161 .map(|width| width * *s as usize),
162 _ => None,
163 }
164 }
165
166 fn byte_width(&self) -> usize {
167 self.byte_width_opt()
168 .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
169 }
170}
171
172pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
190 values: T,
191 offsets: &PrimitiveArray<Offset>,
192) -> Result<GenericListArray<Offset::Native>>
193where
194 Offset::Native: OffsetSizeTrait,
195{
196 let data_type = if Offset::Native::IS_LARGE {
197 DataType::LargeList(Arc::new(Field::new(
198 "item",
199 values.data_type().clone(),
200 true,
201 )))
202 } else {
203 DataType::List(Arc::new(Field::new(
204 "item",
205 values.data_type().clone(),
206 true,
207 )))
208 };
209 let data = ArrayDataBuilder::new(data_type)
210 .len(offsets.len() - 1)
211 .add_buffer(offsets.into_data().buffers()[0].clone())
212 .add_child_data(values.into_data())
213 .build()?;
214
215 Ok(GenericListArray::from(data))
216}
217
218pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
219 DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
220}
221
222pub trait FixedSizeListArrayExt {
223 fn try_new_from_values<T: Array + 'static>(
242 values: T,
243 list_size: i32,
244 ) -> Result<FixedSizeListArray>;
245
246 fn sample(&self, n: usize) -> Result<FixedSizeListArray>;
260
261 fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
264}
265
266impl FixedSizeListArrayExt for FixedSizeListArray {
267 fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
268 let field = Arc::new(Field::new("item", values.data_type().clone(), true));
269 let values = Arc::new(values);
270
271 Self::try_new(field, list_size, values, None)
272 }
273
274 fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
275 if n >= self.len() {
276 return Ok(self.clone());
277 }
278 let mut rng = SmallRng::from_os_rng();
279 let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
280 take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
281 }
282
283 fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
284 match self.data_type() {
285 DataType::FixedSizeList(field, size) => match field.data_type() {
286 DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
287 DataType::Int8 => Ok(Self::new(
288 Arc::new(arrow_schema::Field::new(
289 field.name(),
290 DataType::Float32,
291 field.is_nullable(),
292 )),
293 *size,
294 Arc::new(Float32Array::from_iter_values(
295 self.values()
296 .as_any()
297 .downcast_ref::<Int8Array>()
298 .ok_or(ArrowError::ParseError(
299 "Fail to cast primitive array to Int8Type".to_string(),
300 ))?
301 .into_iter()
302 .filter_map(|x| x.map(|y| y as f32)),
303 )),
304 self.nulls().cloned(),
305 )),
306 DataType::Int16 => Ok(Self::new(
307 Arc::new(arrow_schema::Field::new(
308 field.name(),
309 DataType::Float32,
310 field.is_nullable(),
311 )),
312 *size,
313 Arc::new(Float32Array::from_iter_values(
314 self.values()
315 .as_any()
316 .downcast_ref::<Int16Array>()
317 .ok_or(ArrowError::ParseError(
318 "Fail to cast primitive array to Int16Type".to_string(),
319 ))?
320 .into_iter()
321 .filter_map(|x| x.map(|y| y as f32)),
322 )),
323 self.nulls().cloned(),
324 )),
325 DataType::Int32 => Ok(Self::new(
326 Arc::new(arrow_schema::Field::new(
327 field.name(),
328 DataType::Float32,
329 field.is_nullable(),
330 )),
331 *size,
332 Arc::new(Float32Array::from_iter_values(
333 self.values()
334 .as_any()
335 .downcast_ref::<Int32Array>()
336 .ok_or(ArrowError::ParseError(
337 "Fail to cast primitive array to Int32Type".to_string(),
338 ))?
339 .into_iter()
340 .filter_map(|x| x.map(|y| y as f32)),
341 )),
342 self.nulls().cloned(),
343 )),
344 DataType::Int64 => Ok(Self::new(
345 Arc::new(arrow_schema::Field::new(
346 field.name(),
347 DataType::Float64,
348 field.is_nullable(),
349 )),
350 *size,
351 Arc::new(Float64Array::from_iter_values(
352 self.values()
353 .as_any()
354 .downcast_ref::<Int64Array>()
355 .ok_or(ArrowError::ParseError(
356 "Fail to cast primitive array to Int64Type".to_string(),
357 ))?
358 .into_iter()
359 .filter_map(|x| x.map(|y| y as f64)),
360 )),
361 self.nulls().cloned(),
362 )),
363 DataType::UInt8 => Ok(Self::new(
364 Arc::new(arrow_schema::Field::new(
365 field.name(),
366 DataType::Float64,
367 field.is_nullable(),
368 )),
369 *size,
370 Arc::new(Float64Array::from_iter_values(
371 self.values()
372 .as_any()
373 .downcast_ref::<UInt8Array>()
374 .ok_or(ArrowError::ParseError(
375 "Fail to cast primitive array to UInt8Type".to_string(),
376 ))?
377 .into_iter()
378 .filter_map(|x| x.map(|y| y as f64)),
379 )),
380 self.nulls().cloned(),
381 )),
382 DataType::UInt32 => Ok(Self::new(
383 Arc::new(arrow_schema::Field::new(
384 field.name(),
385 DataType::Float64,
386 field.is_nullable(),
387 )),
388 *size,
389 Arc::new(Float64Array::from_iter_values(
390 self.values()
391 .as_any()
392 .downcast_ref::<UInt32Array>()
393 .ok_or(ArrowError::ParseError(
394 "Fail to cast primitive array to UInt32Type".to_string(),
395 ))?
396 .into_iter()
397 .filter_map(|x| x.map(|y| y as f64)),
398 )),
399 self.nulls().cloned(),
400 )),
401 data_type => Err(ArrowError::ParseError(format!(
402 "Expect either floating type or integer got {:?}",
403 data_type
404 ))),
405 },
406 data_type => Err(ArrowError::ParseError(format!(
407 "Expect either FixedSizeList got {:?}",
408 data_type
409 ))),
410 }
411 }
412}
413
414pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
417 arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
418}
419
420pub trait FixedSizeBinaryArrayExt {
421 fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
440}
441
442impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
443 fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
444 let data_type = DataType::FixedSizeBinary(stride);
445 let data = ArrayDataBuilder::new(data_type)
446 .len(values.len() / stride as usize)
447 .add_buffer(values.into_data().buffers()[0].clone())
448 .build()?;
449 Ok(Self::from(data))
450 }
451}
452
453pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
454 arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
455}
456
457pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
458 match arr.data_type() {
459 DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
460 DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
461 _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
462 }
463}
464
465pub trait RecordBatchExt {
467 fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;
497
498 fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;
500
501 fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;
505
506 fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;
551
552 fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;
562
563 fn drop_column(&self, name: &str) -> Result<RecordBatch>;
567
568 fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;
570
571 fn replace_column_schema_by_name(
573 &self,
574 name: &str,
575 new_data_type: DataType,
576 column: Arc<dyn Array>,
577 ) -> Result<RecordBatch>;
578
579 fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;
581
582 fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;
584
585 fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;
587
588 fn metadata(&self) -> &HashMap<String, String>;
590
591 fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
593 let mut metadata = self.metadata().clone();
594 metadata.insert(key, value);
595 self.with_metadata(metadata)
596 }
597
598 fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;
600
601 fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;
603
604 fn shrink_to_fit(&self) -> Result<RecordBatch>;
606
607 fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
609}
610
611impl RecordBatchExt for RecordBatch {
612 fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
613 let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
614 let mut new_columns = self.columns().to_vec();
615 new_columns.push(arr);
616 Self::try_new(new_schema, new_columns)
617 }
618
619 fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
620 let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
621 let mut new_columns = self.columns().to_vec();
622 new_columns.insert(index, arr);
623 Self::try_new(new_schema, new_columns)
624 }
625
626 fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
627 let schema = Arc::new(Schema::new_with_metadata(
628 arr.fields().to_vec(),
629 self.schema().metadata.clone(),
630 ));
631 let batch = Self::from(arr);
632 batch.with_schema(schema)
633 }
634
635 fn merge(&self, other: &Self) -> Result<Self> {
636 if self.num_rows() != other.num_rows() {
637 return Err(ArrowError::InvalidArgumentError(format!(
638 "Attempt to merge two RecordBatch with different sizes: {} != {}",
639 self.num_rows(),
640 other.num_rows()
641 )));
642 }
643 let left_struct_array: StructArray = self.clone().into();
644 let right_struct_array: StructArray = other.clone().into();
645 self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
646 }
647
648 fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
649 if self.num_rows() != other.num_rows() {
650 return Err(ArrowError::InvalidArgumentError(format!(
651 "Attempt to merge two RecordBatch with different sizes: {} != {}",
652 self.num_rows(),
653 other.num_rows()
654 )));
655 }
656 let left_struct_array: StructArray = self.clone().into();
657 let right_struct_array: StructArray = other.clone().into();
658 self.try_new_from_struct_array(merge_with_schema(
659 &left_struct_array,
660 &right_struct_array,
661 schema.fields(),
662 ))
663 }
664
665 fn drop_column(&self, name: &str) -> Result<Self> {
666 let mut fields = vec![];
667 let mut columns = vec![];
668 for i in 0..self.schema().fields.len() {
669 if self.schema().field(i).name() != name {
670 fields.push(self.schema().field(i).clone());
671 columns.push(self.column(i).clone());
672 }
673 }
674 Self::try_new(
675 Arc::new(Schema::new_with_metadata(
676 fields,
677 self.schema().metadata().clone(),
678 )),
679 columns,
680 )
681 }
682
683 fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
684 let mut fields = self.schema().fields().to_vec();
685 if index >= fields.len() {
686 return Err(ArrowError::InvalidArgumentError(format!(
687 "Index out of bounds: {}",
688 index
689 )));
690 }
691 fields[index] = Arc::new(Field::new(
692 new_name,
693 fields[index].data_type().clone(),
694 fields[index].is_nullable(),
695 ));
696 Self::try_new(
697 Arc::new(Schema::new_with_metadata(
698 fields,
699 self.schema().metadata().clone(),
700 )),
701 self.columns().to_vec(),
702 )
703 }
704
705 fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
706 let mut columns = self.columns().to_vec();
707 let field_i = self
708 .schema()
709 .fields()
710 .iter()
711 .position(|f| f.name() == name)
712 .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
713 columns[field_i] = column;
714 Self::try_new(self.schema(), columns)
715 }
716
717 fn replace_column_schema_by_name(
718 &self,
719 name: &str,
720 new_data_type: DataType,
721 column: Arc<dyn Array>,
722 ) -> Result<RecordBatch> {
723 let fields = self
724 .schema()
725 .fields()
726 .iter()
727 .map(|x| {
728 if x.name() != name {
729 x.clone()
730 } else {
731 let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
732 Arc::new(new_field)
733 }
734 })
735 .collect::<Vec<_>>();
736 let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
737 let mut columns = self.columns().to_vec();
738 let field_i = self
739 .schema()
740 .fields()
741 .iter()
742 .position(|f| f.name() == name)
743 .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
744 columns[field_i] = column;
745 Self::try_new(Arc::new(schema), columns)
746 }
747
748 fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
749 let split = name.split('.').collect::<Vec<_>>();
750 if split.is_empty() {
751 return None;
752 }
753
754 self.column_by_name(split[0])
755 .and_then(|arr| get_sub_array(arr, &split[1..]))
756 }
757
758 fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
759 let struct_array: StructArray = self.clone().into();
760 self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
761 }
762
763 fn metadata(&self) -> &HashMap<String, String> {
764 self.schema_ref().metadata()
765 }
766
767 fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
768 let mut schema = self.schema_ref().as_ref().clone();
769 schema.metadata = metadata;
770 Self::try_new(schema.into(), self.columns().into())
771 }
772
773 fn take(&self, indices: &UInt32Array) -> Result<Self> {
774 let struct_array: StructArray = self.clone().into();
775 let taken = take(&struct_array, indices, None)?;
776 self.try_new_from_struct_array(taken.as_struct().clone())
777 }
778
779 fn shrink_to_fit(&self) -> Result<Self> {
780 crate::deepcopy::deep_copy_batch_sliced(self)
782 }
783
784 fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
785 if column >= self.num_columns() {
786 return Err(ArrowError::InvalidArgumentError(format!(
787 "Column index out of bounds: {}",
788 column
789 )));
790 }
791 let column = self.column(column);
792 let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
793 self.take(&sorted)
794 }
795}
796
797fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
800 match target_field.data_type() {
801 DataType::Struct(subfields) => {
802 let struct_arr = array.as_struct();
803 let projected = project(struct_arr, subfields)?;
804 Ok(Arc::new(projected))
805 }
806 DataType::List(inner_field) => {
807 let list_arr: &ListArray = array.as_list();
808 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
809 Ok(Arc::new(ListArray::new(
810 inner_field.clone(),
811 list_arr.offsets().clone(),
812 projected_values,
813 list_arr.nulls().cloned(),
814 )))
815 }
816 DataType::LargeList(inner_field) => {
817 let list_arr: &LargeListArray = array.as_list();
818 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
819 Ok(Arc::new(LargeListArray::new(
820 inner_field.clone(),
821 list_arr.offsets().clone(),
822 projected_values,
823 list_arr.nulls().cloned(),
824 )))
825 }
826 DataType::FixedSizeList(inner_field, size) => {
827 let list_arr = array.as_fixed_size_list();
828 let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
829 Ok(Arc::new(FixedSizeListArray::new(
830 inner_field.clone(),
831 *size,
832 projected_values,
833 list_arr.nulls().cloned(),
834 )))
835 }
836 _ => Ok(array.clone()),
837 }
838}
839
840fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
841 if fields.is_empty() {
842 return Ok(StructArray::new_empty_fields(
843 struct_array.len(),
844 struct_array.nulls().cloned(),
845 ));
846 }
847 let mut columns: Vec<ArrayRef> = vec![];
848 for field in fields.iter() {
849 if let Some(col) = struct_array.column_by_name(field.name()) {
850 let projected = project_array(col, field.as_ref())?;
851 columns.push(projected);
852 } else {
853 return Err(ArrowError::SchemaError(format!(
854 "field {} does not exist in the RecordBatch",
855 field.name()
856 )));
857 }
858 }
859 StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
861}
862
863fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
864 let left_list: &GenericListArray<T> = left.as_list();
865 let right_list: &GenericListArray<T> = right.as_list();
866 left_list.offsets().inner() == right_list.offsets().inner()
867}
868
869fn merge_list_structs_helper<T: OffsetSizeTrait>(
870 left: &dyn Array,
871 right: &dyn Array,
872 items_field_name: impl Into<String>,
873 items_nullable: bool,
874) -> Arc<dyn Array> {
875 let left_list: &GenericListArray<T> = left.as_list();
876 let right_list: &GenericListArray<T> = right.as_list();
877 let left_struct = left_list.values();
878 let right_struct = right_list.values();
879 let left_struct_arr = left_struct.as_struct();
880 let right_struct_arr = right_struct.as_struct();
881 let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
882 let items_field = Arc::new(Field::new(
883 items_field_name,
884 merged_items.data_type().clone(),
885 items_nullable,
886 ));
887 Arc::new(GenericListArray::<T>::new(
888 items_field,
889 left_list.offsets().clone(),
890 merged_items,
891 left_list.nulls().cloned(),
892 ))
893}
894
895fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
896 left: &dyn Array,
897 right: &dyn Array,
898 not_null: &dyn Array,
899 items_field_name: impl Into<String>,
900) -> Arc<dyn Array> {
901 let left_list: &GenericListArray<T> = left.as_list::<T>();
902 let not_null_list = not_null.as_list::<T>();
903 let right_list = right.as_list::<T>();
904
905 let left_struct = left_list.values().as_struct();
906 let not_null_struct: &StructArray = not_null_list.values().as_struct();
907 let right_struct = right_list.values().as_struct();
908
909 let values_len = not_null_list.values().len();
910 let mut merged_fields =
911 Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
912 let mut merged_columns =
913 Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
914
915 for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
916 merged_fields.push(field.clone());
917 if let Some(val) = not_null_struct.column_by_name(field.name()) {
918 merged_columns.push(val.clone());
919 } else {
920 merged_columns.push(new_null_array(field.data_type(), values_len))
921 }
922 }
923 for (_, field) in right_struct
924 .columns()
925 .iter()
926 .zip(right_struct.fields())
927 .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
928 {
929 merged_fields.push(field.clone());
930 if let Some(val) = not_null_struct.column_by_name(field.name()) {
931 merged_columns.push(val.clone());
932 } else {
933 merged_columns.push(new_null_array(field.data_type(), values_len));
934 }
935 }
936
937 let merged_struct = Arc::new(StructArray::new(
938 Fields::from(merged_fields),
939 merged_columns,
940 not_null_struct.nulls().cloned(),
941 ));
942 let items_field = Arc::new(Field::new(
943 items_field_name,
944 merged_struct.data_type().clone(),
945 true,
946 ));
947 Arc::new(GenericListArray::<T>::new(
948 items_field,
949 not_null_list.offsets().clone(),
950 merged_struct,
951 not_null_list.nulls().cloned(),
952 ))
953}
954
955fn merge_list_struct_null(
956 left: &dyn Array,
957 right: &dyn Array,
958 not_null: &dyn Array,
959) -> Arc<dyn Array> {
960 match left.data_type() {
961 DataType::List(left_field) => {
962 merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
963 }
964 DataType::LargeList(left_field) => {
965 merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
966 }
967 _ => unreachable!(),
968 }
969}
970
971fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
972 if left.null_count() == left.len() {
976 return merge_list_struct_null(left, right, right);
977 } else if right.null_count() == right.len() {
978 return merge_list_struct_null(left, right, left);
979 }
980 match (left.data_type(), right.data_type()) {
981 (DataType::List(left_field), DataType::List(_)) => {
982 if !lists_have_same_offsets_helper::<i32>(left, right) {
983 panic!("Attempt to merge list struct arrays which do not have same offsets");
984 }
985 merge_list_structs_helper::<i32>(
986 left,
987 right,
988 left_field.name(),
989 left_field.is_nullable(),
990 )
991 }
992 (DataType::LargeList(left_field), DataType::LargeList(_)) => {
993 if !lists_have_same_offsets_helper::<i64>(left, right) {
994 panic!("Attempt to merge list struct arrays which do not have same offsets");
995 }
996 merge_list_structs_helper::<i64>(
997 left,
998 right,
999 left_field.name(),
1000 left_field.is_nullable(),
1001 )
1002 }
1003 _ => unreachable!(),
1004 }
1005}
1006
1007fn normalize_validity(
1010 validity: Option<&arrow_buffer::NullBuffer>,
1011) -> Option<&arrow_buffer::NullBuffer> {
1012 validity.and_then(|v| {
1013 if v.null_count() == v.len() {
1014 None
1015 } else {
1016 Some(v)
1017 }
1018 })
1019}
1020
1021fn merge_struct_validity(
1026 left_validity: Option<&arrow_buffer::NullBuffer>,
1027 right_validity: Option<&arrow_buffer::NullBuffer>,
1028) -> Option<arrow_buffer::NullBuffer> {
1029 let left_normalized = normalize_validity(left_validity);
1031 let right_normalized = normalize_validity(right_validity);
1032
1033 match (left_normalized, right_normalized) {
1034 (None, None) => None,
1036 (Some(left), None) => Some(left.clone()),
1037 (None, Some(right)) => Some(right.clone()),
1038 (Some(left), Some(right)) => {
1039 if left.null_count() == 0 && right.null_count() == 0 {
1041 return Some(left.clone());
1042 }
1043
1044 let left_buffer = left.inner();
1045 let right_buffer = right.inner();
1046
1047 let merged_buffer = left_buffer | right_buffer;
1050
1051 Some(arrow_buffer::NullBuffer::from(merged_buffer))
1052 }
1053 }
1054}
1055
1056fn merge_list_child_values(
1057 child_field: &Field,
1058 left_values: ArrayRef,
1059 right_values: ArrayRef,
1060) -> ArrayRef {
1061 match child_field.data_type() {
1062 DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1063 left_values.as_struct(),
1064 right_values.as_struct(),
1065 child_fields,
1066 )) as ArrayRef,
1067 DataType::List(grandchild) => {
1068 let left_list = left_values
1069 .as_any()
1070 .downcast_ref::<ListArray>()
1071 .expect("left list values should be ListArray");
1072 let right_list = right_values
1073 .as_any()
1074 .downcast_ref::<ListArray>()
1075 .expect("right list values should be ListArray");
1076 let merged_values = merge_list_child_values(
1077 grandchild.as_ref(),
1078 left_list.values().clone(),
1079 right_list.values().clone(),
1080 );
1081 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1082 Arc::new(ListArray::new(
1083 grandchild.clone(),
1084 left_list.offsets().clone(),
1085 merged_values,
1086 merged_validity,
1087 )) as ArrayRef
1088 }
1089 DataType::LargeList(grandchild) => {
1090 let left_list = left_values
1091 .as_any()
1092 .downcast_ref::<LargeListArray>()
1093 .expect("left list values should be LargeListArray");
1094 let right_list = right_values
1095 .as_any()
1096 .downcast_ref::<LargeListArray>()
1097 .expect("right list values should be LargeListArray");
1098 let merged_values = merge_list_child_values(
1099 grandchild.as_ref(),
1100 left_list.values().clone(),
1101 right_list.values().clone(),
1102 );
1103 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1104 Arc::new(LargeListArray::new(
1105 grandchild.clone(),
1106 left_list.offsets().clone(),
1107 merged_values,
1108 merged_validity,
1109 )) as ArrayRef
1110 }
1111 DataType::FixedSizeList(grandchild, list_size) => {
1112 let left_list = left_values
1113 .as_any()
1114 .downcast_ref::<FixedSizeListArray>()
1115 .expect("left list values should be FixedSizeListArray");
1116 let right_list = right_values
1117 .as_any()
1118 .downcast_ref::<FixedSizeListArray>()
1119 .expect("right list values should be FixedSizeListArray");
1120 let merged_values = merge_list_child_values(
1121 grandchild.as_ref(),
1122 left_list.values().clone(),
1123 right_list.values().clone(),
1124 );
1125 let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1126 Arc::new(FixedSizeListArray::new(
1127 grandchild.clone(),
1128 *list_size,
1129 merged_values,
1130 merged_validity,
1131 )) as ArrayRef
1132 }
1133 _ => left_values.clone(),
1134 }
1135}
1136
1137fn adjust_child_validity(
1141 child: &ArrayRef,
1142 parent_validity: Option<&arrow_buffer::NullBuffer>,
1143) -> ArrayRef {
1144 let parent_validity = match parent_validity {
1146 None => return child.clone(),
1147 Some(p) if p.null_count() == 0 => return child.clone(), Some(p) => p,
1149 };
1150
1151 if child.data_type() == &DataType::Null {
1154 return child.clone();
1155 }
1156
1157 let child_validity = child.nulls();
1158
1159 let new_validity = match child_validity {
1161 None => {
1162 parent_validity.clone()
1164 }
1165 Some(child_nulls) => {
1166 let child_buffer = child_nulls.inner();
1167 let parent_buffer = parent_validity.inner();
1168
1169 let merged_buffer = child_buffer & parent_buffer;
1172
1173 arrow_buffer::NullBuffer::from(merged_buffer)
1174 }
1175 };
1176
1177 arrow_array::make_array(
1179 arrow_data::ArrayData::try_new(
1180 child.data_type().clone(),
1181 child.len(),
1182 Some(new_validity.into_inner().into_inner()),
1183 child.offset(),
1184 child.to_data().buffers().to_vec(),
1185 child.to_data().child_data().to_vec(),
1186 )
1187 .unwrap(),
1188 )
1189}
1190
1191fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1192 let mut fields: Vec<Field> = vec![];
1193 let mut columns: Vec<ArrayRef> = vec![];
1194 let right_fields = right_struct_array.fields();
1195 let right_columns = right_struct_array.columns();
1196
1197 let left_validity = left_struct_array.nulls();
1199 let right_validity = right_struct_array.nulls();
1200
1201 let merged_validity = merge_struct_validity(left_validity, right_validity);
1203
1204 for (left_field, left_column) in left_struct_array
1206 .fields()
1207 .iter()
1208 .zip(left_struct_array.columns().iter())
1209 {
1210 match right_fields
1211 .iter()
1212 .position(|f| f.name() == left_field.name())
1213 {
1214 Some(right_index) => {
1216 let right_field = right_fields.get(right_index).unwrap();
1217 let right_column = right_columns.get(right_index).unwrap();
1218 match (left_field.data_type(), right_field.data_type()) {
1220 (DataType::Struct(_), DataType::Struct(_)) => {
1221 let left_sub_array = left_column.as_struct();
1222 let right_sub_array = right_column.as_struct();
1223 let merged_sub_array = merge(left_sub_array, right_sub_array);
1224 fields.push(Field::new(
1225 left_field.name(),
1226 merged_sub_array.data_type().clone(),
1227 left_field.is_nullable(),
1228 ));
1229 columns.push(Arc::new(merged_sub_array) as ArrayRef);
1230 }
1231 (DataType::List(left_list), DataType::List(right_list))
1232 if left_list.data_type().is_struct()
1233 && right_list.data_type().is_struct() =>
1234 {
1235 if left_list.data_type() == right_list.data_type() {
1237 fields.push(left_field.as_ref().clone());
1238 columns.push(left_column.clone());
1239 }
1240 let merged_sub_array = merge_list_struct(&left_column, &right_column);
1244
1245 fields.push(Field::new(
1246 left_field.name(),
1247 merged_sub_array.data_type().clone(),
1248 left_field.is_nullable(),
1249 ));
1250 columns.push(merged_sub_array);
1251 }
1252 _ => {
1254 fields.push(left_field.as_ref().clone());
1256 let adjusted_column = adjust_child_validity(left_column, left_validity);
1258 columns.push(adjusted_column);
1259 }
1260 }
1261 }
1262 None => {
1263 fields.push(left_field.as_ref().clone());
1264 let adjusted_column = adjust_child_validity(left_column, left_validity);
1266 columns.push(adjusted_column);
1267 }
1268 }
1269 }
1270
1271 right_fields
1273 .iter()
1274 .zip(right_columns.iter())
1275 .for_each(|(field, column)| {
1276 if !left_struct_array
1278 .fields()
1279 .iter()
1280 .any(|f| f.name() == field.name())
1281 {
1282 fields.push(field.as_ref().clone());
1283 let adjusted_column = adjust_child_validity(column, right_validity);
1286 columns.push(adjusted_column);
1287 }
1288 });
1289
1290 StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1291}
1292
1293fn merge_with_schema(
1294 left_struct_array: &StructArray,
1295 right_struct_array: &StructArray,
1296 fields: &Fields,
1297) -> StructArray {
1298 fn same_type_kind(left: &DataType, right: &DataType) -> bool {
1300 match (left, right) {
1301 (DataType::Struct(_), DataType::Struct(_)) => true,
1302 (DataType::Struct(_), _) => false,
1303 (_, DataType::Struct(_)) => false,
1304 _ => true,
1305 }
1306 }
1307
1308 let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
1309 let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());
1310
1311 let left_fields = left_struct_array.fields();
1312 let left_columns = left_struct_array.columns();
1313 let right_fields = right_struct_array.fields();
1314 let right_columns = right_struct_array.columns();
1315
1316 let left_validity = left_struct_array.nulls();
1318 let right_validity = right_struct_array.nulls();
1319
1320 let merged_validity = merge_struct_validity(left_validity, right_validity);
1322
1323 for field in fields {
1324 let left_match_idx = left_fields.iter().position(|f| {
1325 f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1326 });
1327 let right_match_idx = right_fields.iter().position(|f| {
1328 f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1329 });
1330
1331 match (left_match_idx, right_match_idx) {
1332 (None, Some(right_idx)) => {
1333 output_fields.push(right_fields[right_idx].as_ref().clone());
1334 let adjusted_column =
1336 adjust_child_validity(&right_columns[right_idx], right_validity);
1337 columns.push(adjusted_column);
1338 }
1339 (Some(left_idx), None) => {
1340 output_fields.push(left_fields[left_idx].as_ref().clone());
1341 let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
1343 columns.push(adjusted_column);
1344 }
1345 (Some(left_idx), Some(right_idx)) => {
1346 match field.data_type() {
1347 DataType::Struct(child_fields) => {
1348 let left_sub_array = left_columns[left_idx].as_struct();
1349 let right_sub_array = right_columns[right_idx].as_struct();
1350 let merged_sub_array =
1351 merge_with_schema(left_sub_array, right_sub_array, child_fields);
1352 output_fields.push(Field::new(
1353 field.name(),
1354 merged_sub_array.data_type().clone(),
1355 field.is_nullable(),
1356 ));
1357 columns.push(Arc::new(merged_sub_array) as ArrayRef);
1358 }
1359 DataType::List(child_field) => {
1360 let left_list = left_columns[left_idx]
1361 .as_any()
1362 .downcast_ref::<ListArray>()
1363 .unwrap();
1364 let right_list = right_columns[right_idx]
1365 .as_any()
1366 .downcast_ref::<ListArray>()
1367 .unwrap();
1368 let merged_values = merge_list_child_values(
1369 child_field.as_ref(),
1370 left_list.trimmed_values(),
1371 right_list.trimmed_values(),
1372 );
1373 let merged_validity =
1374 merge_struct_validity(left_list.nulls(), right_list.nulls());
1375 let merged_list = ListArray::new(
1376 child_field.clone(),
1377 left_list.offsets().clone(),
1378 merged_values,
1379 merged_validity,
1380 );
1381 output_fields.push(field.as_ref().clone());
1382 columns.push(Arc::new(merged_list) as ArrayRef);
1383 }
1384 DataType::LargeList(child_field) => {
1385 let left_list = left_columns[left_idx]
1386 .as_any()
1387 .downcast_ref::<LargeListArray>()
1388 .unwrap();
1389 let right_list = right_columns[right_idx]
1390 .as_any()
1391 .downcast_ref::<LargeListArray>()
1392 .unwrap();
1393 let merged_values = merge_list_child_values(
1394 child_field.as_ref(),
1395 left_list.trimmed_values(),
1396 right_list.trimmed_values(),
1397 );
1398 let merged_validity =
1399 merge_struct_validity(left_list.nulls(), right_list.nulls());
1400 let merged_list = LargeListArray::new(
1401 child_field.clone(),
1402 left_list.offsets().clone(),
1403 merged_values,
1404 merged_validity,
1405 );
1406 output_fields.push(field.as_ref().clone());
1407 columns.push(Arc::new(merged_list) as ArrayRef);
1408 }
1409 DataType::FixedSizeList(child_field, list_size) => {
1410 let left_list = left_columns[left_idx]
1411 .as_any()
1412 .downcast_ref::<FixedSizeListArray>()
1413 .unwrap();
1414 let right_list = right_columns[right_idx]
1415 .as_any()
1416 .downcast_ref::<FixedSizeListArray>()
1417 .unwrap();
1418 let merged_values = merge_list_child_values(
1419 child_field.as_ref(),
1420 left_list.values().clone(),
1421 right_list.values().clone(),
1422 );
1423 let merged_validity =
1424 merge_struct_validity(left_list.nulls(), right_list.nulls());
1425 let merged_list = FixedSizeListArray::new(
1426 child_field.clone(),
1427 *list_size,
1428 merged_values,
1429 merged_validity,
1430 );
1431 output_fields.push(field.as_ref().clone());
1432 columns.push(Arc::new(merged_list) as ArrayRef);
1433 }
1434 _ => {
1435 output_fields.push(left_fields[left_idx].as_ref().clone());
1436 let adjusted_column =
1438 adjust_child_validity(&left_columns[left_idx], left_validity);
1439 columns.push(adjusted_column);
1440 }
1441 }
1442 }
1443 (None, None) => {
1444 }
1446 }
1447 }
1448
1449 StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
1450}
1451
1452fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1453 if components.is_empty() {
1454 return Some(array);
1455 }
1456 if !matches!(array.data_type(), DataType::Struct(_)) {
1457 return None;
1458 }
1459 let struct_arr = array.as_struct();
1460 struct_arr
1461 .column_by_name(components[0])
1462 .and_then(|arr| get_sub_array(arr, &components[1..]))
1463}
1464
1465pub fn interleave_batches(
1469 batches: &[RecordBatch],
1470 indices: &[(usize, usize)],
1471) -> Result<RecordBatch> {
1472 let first_batch = batches.first().ok_or_else(|| {
1473 ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1474 })?;
1475 let schema = first_batch.schema();
1476 let num_columns = first_batch.num_columns();
1477 let mut columns = Vec::with_capacity(num_columns);
1478 let mut chunks = Vec::with_capacity(batches.len());
1479
1480 for i in 0..num_columns {
1481 for batch in batches {
1482 chunks.push(batch.column(i).as_ref());
1483 }
1484 let new_column = interleave(&chunks, indices)?;
1485 columns.push(new_column);
1486 chunks.clear();
1487 }
1488
1489 RecordBatch::try_new(schema, columns)
1490}
1491
/// Constructors for Arrow buffers backed by `bytes::Bytes`.
pub trait BufferExt {
    /// Creates a buffer from `bytes`, avoiding a copy when possible.
    ///
    /// `bytes_per_value` is the width of the values the buffer will hold. The implementation
    /// for `arrow_buffer::Buffer` copies the data only when that width is a power of two and
    /// the data pointer is not aligned to it; otherwise it shares the memory of `bytes`.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Creates a buffer by copying `bytes` into a new allocation of `size_bytes` bytes,
    /// zero-filling everything past `bytes.len()`.
    ///
    /// The implementation for `arrow_buffer::Buffer` panics if `size_bytes < bytes.len()`.
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1519
/// Returns `true` if `n` is a power of two (1, 2, 4, ...).
///
/// Zero is not a power of two and yields `false`; unlike the bit-trick `n & (n - 1) == 0`,
/// this never underflows.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1523
1524impl BufferExt for arrow_buffer::Buffer {
1525 fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
1526 if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
1527 {
1528 let size_bytes = bytes.len();
1530 Self::copy_bytes_bytes(bytes, size_bytes)
1531 } else {
1532 unsafe {
1535 Self::from_custom_allocation(
1536 NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
1537 bytes.len(),
1538 Arc::new(bytes),
1539 )
1540 }
1541 }
1542 }
1543
1544 fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
1545 assert!(size_bytes >= bytes.len());
1546 let mut buf = MutableBuffer::with_capacity(size_bytes);
1547 let to_fill = size_bytes - bytes.len();
1548 buf.extend(bytes);
1549 buf.extend(std::iter::repeat_n(0_u8, to_fill));
1550
1551 buf.shrink_to_fit();
1554
1555 Self::from(buf)
1556 }
1557}
1558
1559#[cfg(test)]
1560mod tests {
1561 use super::*;
1562 use arrow_array::{Float32Array, Int32Array, NullArray, StructArray};
1563 use arrow_array::{ListArray, StringArray, new_empty_array, new_null_array};
1564 use arrow_buffer::OffsetBuffer;
1565
1566 #[test]
1567 fn test_merge_recursive() {
1568 let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
1569 let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
1570 let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
1571 let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
1572
1573 let left_schema = Schema::new(vec![
1574 Field::new("a", DataType::Int32, true),
1575 Field::new(
1576 "b",
1577 DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
1578 true,
1579 ),
1580 ]);
1581 let left_batch = RecordBatch::try_new(
1582 Arc::new(left_schema),
1583 vec![
1584 Arc::new(a_array.clone()),
1585 Arc::new(StructArray::from(vec![(
1586 Arc::new(Field::new("c", DataType::Int32, true)),
1587 Arc::new(c_array.clone()) as ArrayRef,
1588 )])),
1589 ],
1590 )
1591 .unwrap();
1592
1593 let right_schema = Schema::new(vec![
1594 Field::new("e", DataType::Int32, true),
1595 Field::new(
1596 "b",
1597 DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
1598 true,
1599 ),
1600 ]);
1601 let right_batch = RecordBatch::try_new(
1602 Arc::new(right_schema),
1603 vec![
1604 Arc::new(e_array.clone()),
1605 Arc::new(StructArray::from(vec![(
1606 Arc::new(Field::new("d", DataType::Utf8, true)),
1607 Arc::new(d_array.clone()) as ArrayRef,
1608 )])) as ArrayRef,
1609 ],
1610 )
1611 .unwrap();
1612
1613 let merged_schema = Schema::new(vec![
1614 Field::new("a", DataType::Int32, true),
1615 Field::new(
1616 "b",
1617 DataType::Struct(
1618 vec![
1619 Field::new("c", DataType::Int32, true),
1620 Field::new("d", DataType::Utf8, true),
1621 ]
1622 .into(),
1623 ),
1624 true,
1625 ),
1626 Field::new("e", DataType::Int32, true),
1627 ]);
1628 let merged_batch = RecordBatch::try_new(
1629 Arc::new(merged_schema),
1630 vec![
1631 Arc::new(a_array) as ArrayRef,
1632 Arc::new(StructArray::from(vec![
1633 (
1634 Arc::new(Field::new("c", DataType::Int32, true)),
1635 Arc::new(c_array) as ArrayRef,
1636 ),
1637 (
1638 Arc::new(Field::new("d", DataType::Utf8, true)),
1639 Arc::new(d_array) as ArrayRef,
1640 ),
1641 ])) as ArrayRef,
1642 Arc::new(e_array) as ArrayRef,
1643 ],
1644 )
1645 .unwrap();
1646
1647 let result = left_batch.merge(&right_batch).unwrap();
1648 assert_eq!(result, merged_batch);
1649 }
1650
1651 #[test]
1652 fn test_merge_with_schema() {
1653 fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
1654 let fields: Fields = names
1655 .iter()
1656 .zip(types)
1657 .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
1658 .collect();
1659 let schema = Schema::new(vec![Field::new(
1660 "struct",
1661 DataType::Struct(fields.clone()),
1662 false,
1663 )]);
1664 let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
1665 let batch = RecordBatch::try_new(
1666 Arc::new(schema.clone()),
1667 vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
1668 );
1669 (schema, batch.unwrap())
1670 }
1671
1672 let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
1673 let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
1674 let (output_schema, _) = test_batch(
1675 &["b", "a", "c"],
1676 &[DataType::Int64, DataType::Int32, DataType::Int32],
1677 );
1678
1679 let merged = left_batch
1681 .merge_with_schema(&right_batch, &output_schema)
1682 .unwrap();
1683 assert_eq!(merged.schema().as_ref(), &output_schema);
1684
1685 let (naive_schema, _) = test_batch(
1687 &["a", "b", "c"],
1688 &[DataType::Int32, DataType::Int64, DataType::Int32],
1689 );
1690 let merged = left_batch.merge(&right_batch).unwrap();
1691 assert_eq!(merged.schema().as_ref(), &naive_schema);
1692 }
1693
1694 #[test]
1695 fn test_merge_list_struct() {
1696 let x_field = Arc::new(Field::new("x", DataType::Int32, true));
1697 let y_field = Arc::new(Field::new("y", DataType::Int32, true));
1698 let x_struct_field = Arc::new(Field::new(
1699 "item",
1700 DataType::Struct(Fields::from(vec![x_field.clone()])),
1701 true,
1702 ));
1703 let y_struct_field = Arc::new(Field::new(
1704 "item",
1705 DataType::Struct(Fields::from(vec![y_field.clone()])),
1706 true,
1707 ));
1708 let both_struct_field = Arc::new(Field::new(
1709 "item",
1710 DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
1711 true,
1712 ));
1713 let left_schema = Schema::new(vec![Field::new(
1714 "list_struct",
1715 DataType::List(x_struct_field.clone()),
1716 true,
1717 )]);
1718 let right_schema = Schema::new(vec![Field::new(
1719 "list_struct",
1720 DataType::List(y_struct_field.clone()),
1721 true,
1722 )]);
1723 let both_schema = Schema::new(vec![Field::new(
1724 "list_struct",
1725 DataType::List(both_struct_field.clone()),
1726 true,
1727 )]);
1728
1729 let x = Arc::new(Int32Array::from(vec![1]));
1730 let y = Arc::new(Int32Array::from(vec![2]));
1731 let x_struct = Arc::new(StructArray::new(
1732 Fields::from(vec![x_field.clone()]),
1733 vec![x.clone()],
1734 None,
1735 ));
1736 let y_struct = Arc::new(StructArray::new(
1737 Fields::from(vec![y_field.clone()]),
1738 vec![y.clone()],
1739 None,
1740 ));
1741 let both_struct = Arc::new(StructArray::new(
1742 Fields::from(vec![x_field.clone(), y_field.clone()]),
1743 vec![x.clone(), y],
1744 None,
1745 ));
1746 let both_null_struct = Arc::new(StructArray::new(
1747 Fields::from(vec![x_field, y_field]),
1748 vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
1749 None,
1750 ));
1751 let offsets = OffsetBuffer::from_lengths([1]);
1752 let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
1753 let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
1754 let both_list = ListArray::new(
1755 both_struct_field.clone(),
1756 offsets.clone(),
1757 both_struct,
1758 None,
1759 );
1760 let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
1761 let x_batch =
1762 RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
1763 let y_batch = RecordBatch::try_new(
1764 Arc::new(right_schema.clone()),
1765 vec![Arc::new(y_s_list.clone())],
1766 )
1767 .unwrap();
1768 let merged = x_batch.merge(&y_batch).unwrap();
1769 let expected =
1770 RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
1771 assert_eq!(merged, expected);
1772
1773 let y_null_list = new_null_array(y_s_list.data_type(), 1);
1774 let y_null_batch =
1775 RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
1776 .unwrap();
1777 let expected =
1778 RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
1779 let merged = x_batch.merge(&y_null_batch).unwrap();
1780 assert_eq!(merged, expected);
1781 }
1782
1783 #[test]
1784 fn test_byte_width_opt() {
1785 assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1786 assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1787 assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1788 assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1789 assert_eq!(DataType::Utf8.byte_width_opt(), None);
1790 assert_eq!(DataType::Binary.byte_width_opt(), None);
1791 assert_eq!(
1792 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1793 None
1794 );
1795 assert_eq!(
1796 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1797 .byte_width_opt(),
1798 Some(12)
1799 );
1800 assert_eq!(
1801 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1802 .byte_width_opt(),
1803 Some(16)
1804 );
1805 assert_eq!(
1806 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1807 .byte_width_opt(),
1808 None
1809 );
1810 }
1811
1812 #[test]
1813 fn test_take_record_batch() {
1814 let schema = Arc::new(Schema::new(vec![
1815 Field::new("a", DataType::Int32, true),
1816 Field::new("b", DataType::Utf8, true),
1817 ]));
1818 let batch = RecordBatch::try_new(
1819 schema.clone(),
1820 vec![
1821 Arc::new(Int32Array::from_iter_values(0..20)),
1822 Arc::new(StringArray::from_iter_values(
1823 (0..20).map(|i| format!("str-{}", i)),
1824 )),
1825 ],
1826 )
1827 .unwrap();
1828 let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1829 assert_eq!(
1830 taken,
1831 RecordBatch::try_new(
1832 schema,
1833 vec![
1834 Arc::new(Int32Array::from(vec![1, 5, 10])),
1835 Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1836 ],
1837 )
1838 .unwrap()
1839 )
1840 }
1841
1842 #[test]
1843 fn test_schema_project_by_schema() {
1844 let metadata = [("key".to_string(), "value".to_string())];
1845 let schema = Arc::new(
1846 Schema::new(vec![
1847 Field::new("a", DataType::Int32, true),
1848 Field::new("b", DataType::Utf8, true),
1849 ])
1850 .with_metadata(metadata.clone().into()),
1851 );
1852 let batch = RecordBatch::try_new(
1853 schema,
1854 vec![
1855 Arc::new(Int32Array::from_iter_values(0..20)),
1856 Arc::new(StringArray::from_iter_values(
1857 (0..20).map(|i| format!("str-{}", i)),
1858 )),
1859 ],
1860 )
1861 .unwrap();
1862
1863 let empty_schema = Schema::empty();
1865 let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
1866 let expected_schema = empty_schema.with_metadata(metadata.clone().into());
1867 assert_eq!(
1868 empty_projected,
1869 RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
1870 .with_schema(Arc::new(expected_schema))
1871 .unwrap()
1872 );
1873
1874 let reordered_schema = Schema::new(vec![
1876 Field::new("b", DataType::Utf8, true),
1877 Field::new("a", DataType::Int32, true),
1878 ]);
1879 let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
1880 let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
1881 assert_eq!(
1882 reordered_projected,
1883 RecordBatch::try_new(
1884 expected_schema,
1885 vec![
1886 Arc::new(StringArray::from_iter_values(
1887 (0..20).map(|i| format!("str-{}", i)),
1888 )),
1889 Arc::new(Int32Array::from_iter_values(0..20)),
1890 ],
1891 )
1892 .unwrap()
1893 );
1894
1895 let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1897 let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
1898 let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
1899 assert_eq!(
1900 sub_projected,
1901 RecordBatch::try_new(
1902 expected_schema,
1903 vec![Arc::new(Int32Array::from_iter_values(0..20))],
1904 )
1905 .unwrap()
1906 );
1907 }
1908
1909 #[test]
1910 fn test_project_preserves_struct_validity() {
1911 let fields = Fields::from(vec![
1913 Field::new("id", DataType::Int32, false),
1914 Field::new("value", DataType::Float32, true),
1915 ]);
1916
1917 let id_array = Int32Array::from(vec![1, 2, 3]);
1919 let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1920 let struct_array = StructArray::new(
1921 fields.clone(),
1922 vec![
1923 Arc::new(id_array) as ArrayRef,
1924 Arc::new(value_array) as ArrayRef,
1925 ],
1926 Some(vec![true, false, true].into()), );
1928
1929 let projected = project(&struct_array, &fields).unwrap();
1931
1932 assert_eq!(projected.null_count(), 1);
1934 assert!(!projected.is_null(0));
1935 assert!(projected.is_null(1));
1936 assert!(!projected.is_null(2));
1937 }
1938
1939 #[test]
1940 fn test_merge_struct_with_different_validity() {
1941 let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
1944 let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
1945 let left_struct = StructArray::new(
1946 left_fields,
1947 vec![Arc::new(height_array) as ArrayRef],
1948 Some(vec![true, false, true, false].into()), );
1950
1951 let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
1953 let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
1954 let right_struct = StructArray::new(
1955 right_fields,
1956 vec![Arc::new(width_array) as ArrayRef],
1957 Some(vec![true, true, false, false].into()), );
1959
1960 let merged = merge(&left_struct, &right_struct);
1962
1963 assert_eq!(merged.null_count(), 1); assert!(!merged.is_null(0));
1971 assert!(!merged.is_null(1));
1972 assert!(!merged.is_null(2));
1973 assert!(merged.is_null(3));
1974
1975 let height_col = merged.column_by_name("height").unwrap();
1977 let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
1978 assert_eq!(height_values.value(0), 500);
1979 assert!(height_values.is_null(1)); assert_eq!(height_values.value(2), 600);
1981
1982 let width_col = merged.column_by_name("width").unwrap();
1983 let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
1984 assert_eq!(width_values.value(0), 300);
1985 assert_eq!(width_values.value(1), 200);
1986 assert!(width_values.is_null(2)); }
1988
1989 #[test]
1990 fn test_merge_null_typed_column_with_parent_validity() {
1991 let left_struct = StructArray::new(
1995 Fields::from(vec![Field::new("a", DataType::Int32, true)]),
1996 vec![Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef],
1997 Some(vec![true, false].into()),
1998 );
1999 let right_struct = StructArray::new(
2000 Fields::from(vec![Field::new("b", DataType::Null, true)]),
2001 vec![Arc::new(NullArray::new(2)) as ArrayRef],
2002 Some(vec![true, false].into()),
2003 );
2004
2005 let merged = merge(&left_struct, &right_struct);
2007 assert_eq!(merged.len(), 2);
2008 let b_col = merged.column_by_name("b").unwrap();
2009 assert_eq!(b_col.data_type(), &DataType::Null);
2011 assert_eq!(b_col.len(), 2);
2012 }
2013
2014 #[test]
2015 fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
2016 let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
2018 let left_count = Arc::new(Int32Array::from(vec![None, None]));
2019 let left_struct = Arc::new(StructArray::new(
2020 Fields::from(vec![
2021 Field::new("company_id", DataType::Int32, true),
2022 Field::new("count", DataType::Int32, true),
2023 ]),
2024 vec![left_company_id, left_count],
2025 None,
2026 ));
2027 let left_list = Arc::new(ListArray::new(
2028 Arc::new(Field::new(
2029 "item",
2030 DataType::Struct(left_struct.fields().clone()),
2031 true,
2032 )),
2033 OffsetBuffer::from_lengths([2]),
2034 left_struct,
2035 None,
2036 ));
2037
2038 let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
2040 let right_struct = Arc::new(StructArray::new(
2041 Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2042 vec![right_company_name],
2043 None,
2044 ));
2045 let right_list = Arc::new(ListArray::new(
2046 Arc::new(Field::new(
2047 "item",
2048 DataType::Struct(right_struct.fields().clone()),
2049 true,
2050 )),
2051 OffsetBuffer::from_lengths([2]),
2052 right_struct,
2053 None,
2054 ));
2055
2056 let target_fields = Fields::from(vec![Field::new(
2057 "companies",
2058 DataType::List(Arc::new(Field::new(
2059 "item",
2060 DataType::Struct(Fields::from(vec![
2061 Field::new("company_id", DataType::Int32, true),
2062 Field::new("company_name", DataType::Utf8, true),
2063 Field::new("count", DataType::Int32, true),
2064 ])),
2065 true,
2066 ))),
2067 true,
2068 )]);
2069
2070 let left_batch = RecordBatch::try_new(
2071 Arc::new(Schema::new(vec![Field::new(
2072 "companies",
2073 left_list.data_type().clone(),
2074 true,
2075 )])),
2076 vec![left_list as ArrayRef],
2077 )
2078 .unwrap();
2079
2080 let right_batch = RecordBatch::try_new(
2081 Arc::new(Schema::new(vec![Field::new(
2082 "companies",
2083 right_list.data_type().clone(),
2084 true,
2085 )])),
2086 vec![right_list as ArrayRef],
2087 )
2088 .unwrap();
2089
2090 let merged = left_batch
2091 .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
2092 .unwrap();
2093
2094 let merged_list = merged
2096 .column_by_name("companies")
2097 .unwrap()
2098 .as_any()
2099 .downcast_ref::<ListArray>()
2100 .unwrap();
2101 let merged_struct = merged_list.values().as_struct();
2102
2103 assert_eq!(merged_struct.num_columns(), 3);
2105 assert!(merged_struct.column_by_name("company_id").is_some());
2106 assert!(merged_struct.column_by_name("company_name").is_some());
2107 assert!(merged_struct.column_by_name("count").is_some());
2108
2109 let company_id = merged_struct
2111 .column_by_name("company_id")
2112 .unwrap()
2113 .as_any()
2114 .downcast_ref::<Int32Array>()
2115 .unwrap();
2116 assert!(company_id.is_null(0));
2117 assert!(company_id.is_null(1));
2118
2119 let company_name = merged_struct
2120 .column_by_name("company_name")
2121 .unwrap()
2122 .as_any()
2123 .downcast_ref::<StringArray>()
2124 .unwrap();
2125 assert_eq!(company_name.value(0), "Google");
2126 assert_eq!(company_name.value(1), "Microsoft");
2127
2128 let count = merged_struct
2129 .column_by_name("count")
2130 .unwrap()
2131 .as_any()
2132 .downcast_ref::<Int32Array>()
2133 .unwrap();
2134 assert!(count.is_null(0));
2135 assert!(count.is_null(1));
2136 }
2137
2138 #[test]
2139 fn test_merge_struct_lists() {
2140 test_merge_struct_lists_generic::<i32>();
2141 }
2142
2143 #[test]
2144 fn test_merge_struct_large_lists() {
2145 test_merge_struct_lists_generic::<i64>();
2146 }
2147
2148 fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2149 let left_company_id = Arc::new(Int32Array::from(vec![
2151 Some(1),
2152 Some(2),
2153 Some(3),
2154 Some(4),
2155 Some(5),
2156 Some(6),
2157 Some(7),
2158 Some(8),
2159 Some(9),
2160 Some(10),
2161 Some(11),
2162 Some(12),
2163 Some(13),
2164 Some(14),
2165 Some(15),
2166 Some(16),
2167 Some(17),
2168 Some(18),
2169 Some(19),
2170 Some(20),
2171 ]));
2172 let left_count = Arc::new(Int32Array::from(vec![
2173 Some(10),
2174 Some(20),
2175 Some(30),
2176 Some(40),
2177 Some(50),
2178 Some(60),
2179 Some(70),
2180 Some(80),
2181 Some(90),
2182 Some(100),
2183 Some(110),
2184 Some(120),
2185 Some(130),
2186 Some(140),
2187 Some(150),
2188 Some(160),
2189 Some(170),
2190 Some(180),
2191 Some(190),
2192 Some(200),
2193 ]));
2194 let left_struct = Arc::new(StructArray::new(
2195 Fields::from(vec![
2196 Field::new("company_id", DataType::Int32, true),
2197 Field::new("count", DataType::Int32, true),
2198 ]),
2199 vec![left_company_id, left_count],
2200 None,
2201 ));
2202
2203 let left_list = Arc::new(GenericListArray::<O>::new(
2204 Arc::new(Field::new(
2205 "item",
2206 DataType::Struct(left_struct.fields().clone()),
2207 true,
2208 )),
2209 OffsetBuffer::from_lengths([3, 1]),
2210 left_struct.clone(),
2211 None,
2212 ));
2213
2214 let left_list_struct = Arc::new(StructArray::new(
2215 Fields::from(vec![Field::new(
2216 "companies",
2217 if O::IS_LARGE {
2218 DataType::LargeList(Arc::new(Field::new(
2219 "item",
2220 DataType::Struct(left_struct.fields().clone()),
2221 true,
2222 )))
2223 } else {
2224 DataType::List(Arc::new(Field::new(
2225 "item",
2226 DataType::Struct(left_struct.fields().clone()),
2227 true,
2228 )))
2229 },
2230 true,
2231 )]),
2232 vec![left_list as ArrayRef],
2233 None,
2234 ));
2235
2236 let right_company_name = Arc::new(StringArray::from(vec![
2238 "Google",
2239 "Microsoft",
2240 "Apple",
2241 "Facebook",
2242 ]));
2243 let right_struct = Arc::new(StructArray::new(
2244 Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2245 vec![right_company_name],
2246 None,
2247 ));
2248 let right_list = Arc::new(GenericListArray::<O>::new(
2249 Arc::new(Field::new(
2250 "item",
2251 DataType::Struct(right_struct.fields().clone()),
2252 true,
2253 )),
2254 OffsetBuffer::from_lengths([3, 1]),
2255 right_struct.clone(),
2256 None,
2257 ));
2258
2259 let right_list_struct = Arc::new(StructArray::new(
2260 Fields::from(vec![Field::new(
2261 "companies",
2262 if O::IS_LARGE {
2263 DataType::LargeList(Arc::new(Field::new(
2264 "item",
2265 DataType::Struct(right_struct.fields().clone()),
2266 true,
2267 )))
2268 } else {
2269 DataType::List(Arc::new(Field::new(
2270 "item",
2271 DataType::Struct(right_struct.fields().clone()),
2272 true,
2273 )))
2274 },
2275 true,
2276 )]),
2277 vec![right_list as ArrayRef],
2278 None,
2279 ));
2280
2281 let target_fields = Fields::from(vec![Field::new(
2283 "companies",
2284 if O::IS_LARGE {
2285 DataType::LargeList(Arc::new(Field::new(
2286 "item",
2287 DataType::Struct(Fields::from(vec![
2288 Field::new("company_id", DataType::Int32, true),
2289 Field::new("company_name", DataType::Utf8, true),
2290 Field::new("count", DataType::Int32, true),
2291 ])),
2292 true,
2293 )))
2294 } else {
2295 DataType::List(Arc::new(Field::new(
2296 "item",
2297 DataType::Struct(Fields::from(vec![
2298 Field::new("company_id", DataType::Int32, true),
2299 Field::new("company_name", DataType::Utf8, true),
2300 Field::new("count", DataType::Int32, true),
2301 ])),
2302 true,
2303 )))
2304 },
2305 true,
2306 )]);
2307
2308 let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2310 assert_eq!(merged_array.len(), 2);
2311 }
2312
2313 #[test]
2314 fn test_project_by_schema_list_struct_reorder() {
2315 let source_inner_struct = DataType::Struct(Fields::from(vec![
2320 Field::new("c", DataType::Utf8, true),
2321 Field::new("b", DataType::Utf8, true),
2322 Field::new("a", DataType::Utf8, true),
2323 ]));
2324 let source_schema = Arc::new(Schema::new(vec![
2325 Field::new("id", DataType::Int32, false),
2326 Field::new(
2327 "data",
2328 DataType::List(Arc::new(Field::new(
2329 "item",
2330 source_inner_struct.clone(),
2331 true,
2332 ))),
2333 true,
2334 ),
2335 ]));
2336
2337 let c_array = StringArray::from(vec!["c1", "c2"]);
2339 let b_array = StringArray::from(vec!["b1", "b2"]);
2340 let a_array = StringArray::from(vec!["a1", "a2"]);
2341 let inner_struct = StructArray::from(vec![
2342 (
2343 Arc::new(Field::new("c", DataType::Utf8, true)),
2344 Arc::new(c_array) as ArrayRef,
2345 ),
2346 (
2347 Arc::new(Field::new("b", DataType::Utf8, true)),
2348 Arc::new(b_array) as ArrayRef,
2349 ),
2350 (
2351 Arc::new(Field::new("a", DataType::Utf8, true)),
2352 Arc::new(a_array) as ArrayRef,
2353 ),
2354 ]);
2355
2356 let list_array = ListArray::new(
2357 Arc::new(Field::new("item", source_inner_struct, true)),
2358 OffsetBuffer::from_lengths([1, 1]),
2359 Arc::new(inner_struct),
2360 None,
2361 );
2362
2363 let batch = RecordBatch::try_new(
2364 source_schema,
2365 vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
2366 )
2367 .unwrap();
2368
2369 let target_inner_struct = DataType::Struct(Fields::from(vec![
2371 Field::new("a", DataType::Utf8, true),
2372 Field::new("b", DataType::Utf8, true),
2373 Field::new("c", DataType::Utf8, true),
2374 ]));
2375 let target_schema = Schema::new(vec![
2376 Field::new("id", DataType::Int32, false),
2377 Field::new(
2378 "data",
2379 DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2380 true,
2381 ),
2382 ]);
2383
2384 let projected = batch.project_by_schema(&target_schema).unwrap();
2386
2387 assert_eq!(projected.schema().as_ref(), &target_schema);
2389
2390 let projected_list = projected.column(1).as_list::<i32>();
2392 let projected_struct = projected_list.values().as_struct();
2393
2394 assert_eq!(
2396 projected_struct.column_by_name("a").unwrap().as_ref(),
2397 &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2398 );
2399 assert_eq!(
2400 projected_struct.column_by_name("b").unwrap().as_ref(),
2401 &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2402 );
2403 assert_eq!(
2404 projected_struct.column_by_name("c").unwrap().as_ref(),
2405 &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2406 );
2407
2408 assert_eq!(
2410 projected_struct.column(0).as_ref(),
2411 &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2412 );
2413 assert_eq!(
2414 projected_struct.column(1).as_ref(),
2415 &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2416 );
2417 assert_eq!(
2418 projected_struct.column(2).as_ref(),
2419 &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2420 );
2421 }
2422
2423 #[test]
2424 fn test_project_by_schema_nested_list_struct() {
2425 let inner_struct = DataType::Struct(Fields::from(vec![
2427 Field::new("y", DataType::Int32, true),
2428 Field::new("x", DataType::Int32, true),
2429 ]));
2430 let source_schema = Arc::new(Schema::new(vec![Field::new(
2431 "outer",
2432 DataType::List(Arc::new(Field::new(
2433 "item",
2434 DataType::Struct(Fields::from(vec![
2435 Field::new("b", DataType::Utf8, true),
2436 Field::new(
2437 "inner_list",
2438 DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
2439 true,
2440 ),
2441 Field::new("a", DataType::Utf8, true),
2442 ])),
2443 true,
2444 ))),
2445 true,
2446 )]));
2447
2448 let y_array = Int32Array::from(vec![1, 2]);
2450 let x_array = Int32Array::from(vec![3, 4]);
2451 let innermost_struct = StructArray::from(vec![
2452 (
2453 Arc::new(Field::new("y", DataType::Int32, true)),
2454 Arc::new(y_array) as ArrayRef,
2455 ),
2456 (
2457 Arc::new(Field::new("x", DataType::Int32, true)),
2458 Arc::new(x_array) as ArrayRef,
2459 ),
2460 ]);
2461 let inner_list = ListArray::new(
2462 Arc::new(Field::new("item", inner_struct.clone(), true)),
2463 OffsetBuffer::from_lengths([2]),
2464 Arc::new(innermost_struct),
2465 None,
2466 );
2467
2468 let b_array = StringArray::from(vec!["b1"]);
2469 let a_array = StringArray::from(vec!["a1"]);
2470 let middle_struct = StructArray::from(vec![
2471 (
2472 Arc::new(Field::new("b", DataType::Utf8, true)),
2473 Arc::new(b_array) as ArrayRef,
2474 ),
2475 (
2476 Arc::new(Field::new(
2477 "inner_list",
2478 DataType::List(Arc::new(Field::new("item", inner_struct, true))),
2479 true,
2480 )),
2481 Arc::new(inner_list) as ArrayRef,
2482 ),
2483 (
2484 Arc::new(Field::new("a", DataType::Utf8, true)),
2485 Arc::new(a_array) as ArrayRef,
2486 ),
2487 ]);
2488
2489 let outer_list = ListArray::new(
2490 Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
2491 OffsetBuffer::from_lengths([1]),
2492 Arc::new(middle_struct),
2493 None,
2494 );
2495
2496 let batch =
2497 RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();
2498
2499 let target_inner_struct = DataType::Struct(Fields::from(vec![
2501 Field::new("x", DataType::Int32, true), Field::new("y", DataType::Int32, true),
2503 ]));
2504 let target_schema = Schema::new(vec![Field::new(
2505 "outer",
2506 DataType::List(Arc::new(Field::new(
2507 "item",
2508 DataType::Struct(Fields::from(vec![
2509 Field::new("a", DataType::Utf8, true), Field::new(
2511 "inner_list",
2512 DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2513 true,
2514 ),
2515 Field::new("b", DataType::Utf8, true),
2516 ])),
2517 true,
2518 ))),
2519 true,
2520 )]);
2521
2522 let projected = batch.project_by_schema(&target_schema).unwrap();
2523
2524 assert_eq!(projected.schema().as_ref(), &target_schema);
2526
2527 let outer_list = projected.column(0).as_list::<i32>();
2529 let middle_struct = outer_list.values().as_struct();
2530
2531 assert_eq!(
2533 middle_struct.column(0).as_ref(),
2534 &StringArray::from(vec!["a1"]) as &dyn Array
2535 );
2536 assert_eq!(
2537 middle_struct.column(2).as_ref(),
2538 &StringArray::from(vec!["b1"]) as &dyn Array
2539 );
2540
2541 let inner_list = middle_struct.column(1).as_list::<i32>();
2543 let innermost_struct = inner_list.values().as_struct();
2544 assert_eq!(
2545 innermost_struct.column(0).as_ref(),
2546 &Int32Array::from(vec![3, 4]) as &dyn Array
2547 );
2548 assert_eq!(
2549 innermost_struct.column(1).as_ref(),
2550 &Int32Array::from(vec![1, 2]) as &dyn Array
2551 );
2552 }
2553}