use std::sync::Arc;
use std::{collections::HashMap, ptr::NonNull};

use arrow_array::{
    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
    GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
    StructArray, UInt32Array, UInt8Array,
};
use arrow_array::{
    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
use arrow_select::{interleave::interleave, take::take};
use rand::prelude::*;

pub mod deepcopy;
pub mod schema;
pub use schema::*;
pub mod bfloat16;
pub mod floats;
pub use floats::*;
pub mod cast;
pub mod json;
pub mod list;
pub mod memory;
pub mod r#struct;

/// Schema metadata key holding the Arrow extension type name.
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Schema metadata key holding the Arrow extension type metadata.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Field metadata key used to mark blob-encoded columns.
pub const BLOB_META_KEY: &str = "lance-encoding:blob";

type Result<T> = std::result::Result<T, ArrowError>;

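/// Extension helpers on [`DataType`].
///
/// A minimal usage sketch; the `lance_arrow` crate path is an assumption and
/// the example is marked `ignore` because it is illustrative rather than a
/// compiled doctest:
///
/// ```ignore
/// use arrow_schema::DataType;
/// use lance_arrow::DataTypeExt;
///
/// assert!(DataType::Utf8.is_binary_like());
/// assert!(DataType::Int32.is_fixed_stride());
/// assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
/// ```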
pub trait DataTypeExt {
    /// Returns true for binary-like types: `Utf8`, `Binary`, `LargeUtf8`, `LargeBinary`.
    fn is_binary_like(&self) -> bool;

    /// Returns true if the type is a struct.
    fn is_struct(&self) -> bool;

    /// Returns true if values of this type are stored with a fixed stride.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the type is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Byte width of a single value. Panics if the type is not fixed-width.
    fn byte_width(&self) -> usize;

    /// Byte width of a single value, or `None` if the type is not fixed-width.
    fn byte_width_opt(&self) -> Option<usize>;
}

impl DataTypeExt for DataType {
    fn is_binary_like(&self) -> bool {
        use DataType::*;
        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
    }

    fn is_struct(&self) -> bool {
        matches!(self, Self::Struct(_))
    }

    fn is_fixed_stride(&self) -> bool {
        use DataType::*;
        matches!(
            self,
            Boolean
                | UInt8
                | UInt16
                | UInt32
                | UInt64
                | Int8
                | Int16
                | Int32
                | Int64
                | Float16
                | Float32
                | Float64
                | Decimal128(_, _)
                | Decimal256(_, _)
                | FixedSizeList(_, _)
                | FixedSizeBinary(_)
                | Duration(_)
                | Timestamp(_, _)
                | Date32
                | Date64
                | Time32(_)
                | Time64(_)
        )
    }

    fn is_dictionary(&self) -> bool {
        matches!(self, Self::Dictionary(_, _))
    }

    fn byte_width_opt(&self) -> Option<usize> {
        match self {
            Self::Int8 => Some(1),
            Self::Int16 => Some(2),
            Self::Int32 => Some(4),
            Self::Int64 => Some(8),
            Self::UInt8 => Some(1),
            Self::UInt16 => Some(2),
            Self::UInt32 => Some(4),
            Self::UInt64 => Some(8),
            Self::Float16 => Some(2),
            Self::Float32 => Some(4),
            Self::Float64 => Some(8),
            Self::Date32 => Some(4),
            Self::Date64 => Some(8),
            Self::Time32(_) => Some(4),
            Self::Time64(_) => Some(8),
            Self::Timestamp(_, _) => Some(8),
            Self::Duration(_) => Some(8),
            Self::Decimal128(_, _) => Some(16),
            Self::Decimal256(_, _) => Some(32),
            Self::Interval(unit) => match unit {
                IntervalUnit::YearMonth => Some(4),
                IntervalUnit::DayTime => Some(8),
                IntervalUnit::MonthDayNano => Some(16),
            },
            Self::FixedSizeBinary(s) => Some(*s as usize),
            Self::FixedSizeList(dt, s) => dt
                .data_type()
                .byte_width_opt()
                .map(|width| width * *s as usize),
            _ => None,
        }
    }

    fn byte_width(&self) -> usize {
        self.byte_width_opt()
            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
    }
}

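/// Create a [`GenericListArray`] from a flat `values` array and an `offsets`
/// array with `len + 1` entries.
///
/// A minimal sketch; the `lance_arrow` crate path is an assumption and the
/// example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use arrow_array::{Int32Array, Int64Array};
/// use lance_arrow::try_new_generic_list_array;
///
/// // Three lists: [0, 1], [2, 3, 4], [5]
/// let values = Int32Array::from_iter_values(0..6);
/// let offsets = Int64Array::from_iter_values([0_i64, 2, 5, 6]);
/// let list_arr = try_new_generic_list_array(values, &offsets).unwrap();
/// assert_eq!(list_arr.len(), 3);
/// ```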
pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
    values: T,
    offsets: &PrimitiveArray<Offset>,
) -> Result<GenericListArray<Offset::Native>>
where
    Offset::Native: OffsetSizeTrait,
{
    let data_type = if Offset::Native::IS_LARGE {
        DataType::LargeList(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    } else {
        DataType::List(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    };
    let data = ArrayDataBuilder::new(data_type)
        .len(offsets.len() - 1)
        .add_buffer(offsets.into_data().buffers()[0].clone())
        .add_child_data(values.into_data())
        .build()?;

    Ok(GenericListArray::from(data))
}

/// Build the [`DataType`] for a fixed-size list with the standard nullable "item" child field.
pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
}

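/// Extension helpers for constructing and transforming [`FixedSizeListArray`]s.
///
/// A minimal sketch; the `lance_arrow` crate path is an assumption and the
/// example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use arrow_array::{Array, FixedSizeListArray, Int64Array};
/// use lance_arrow::FixedSizeListArrayExt;
///
/// // Wrap 10 values into 5 lists of 2 elements each.
/// let values = Int64Array::from_iter_values(0..10);
/// let fsl = FixedSizeListArray::try_new_from_values(values, 2).unwrap();
/// assert_eq!(fsl.len(), 5);
/// assert_eq!(fsl.value_length(), 2);
/// ```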
pub trait FixedSizeListArrayExt {
    /// Build a [`FixedSizeListArray`] by chunking `values` into lists of `list_size` elements.
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Randomly sample `n` lists. If `n >= len`, the whole array is returned unchanged.
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Cast integer-valued lists to floating point; float-valued lists are returned as-is.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}

impl FixedSizeListArrayExt for FixedSizeListArray {
    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
        let values = Arc::new(values);

        Self::try_new(field, list_size, values, None)
    }

    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
        if n >= self.len() {
            return Ok(self.clone());
        }
        let mut rng = SmallRng::from_os_rng();
        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
    }

    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
        match self.data_type() {
            DataType::FixedSizeList(field, size) => match field.data_type() {
                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
                DataType::Int8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int16 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int16Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int16Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int64 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int64Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to UInt8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to UInt32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                data_type => Err(ArrowError::ParseError(format!(
                    "Expected a floating point or integer type, got {:?}",
                    data_type
                ))),
            },
            data_type => Err(ArrowError::ParseError(format!(
                "Expected FixedSizeList, got {:?}",
                data_type
            ))),
        }
    }
}

/// Downcast an array to [`FixedSizeListArray`]. Panics if the array has a different type.
pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
}

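/// Extension helpers for constructing [`FixedSizeBinaryArray`]s.
///
/// A minimal sketch; the `lance_arrow` crate path is an assumption and the
/// example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use arrow_array::{Array, FixedSizeBinaryArray, UInt8Array};
/// use lance_arrow::FixedSizeBinaryArrayExt;
///
/// // Six bytes with a stride of 3 yield two fixed-size binary values.
/// let bytes = UInt8Array::from(vec![0u8, 1, 2, 3, 4, 5]);
/// let fsb = FixedSizeBinaryArray::try_new_from_values(&bytes, 3).unwrap();
/// assert_eq!(fsb.len(), 2);
/// assert_eq!(fsb.value_length(), 3);
/// ```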
pub trait FixedSizeBinaryArrayExt {
    /// Build a [`FixedSizeBinaryArray`] by chunking `values` into entries of `stride` bytes.
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}

impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
        let data_type = DataType::FixedSizeBinary(stride);
        let data = ArrayDataBuilder::new(data_type)
            .len(values.len() / stride as usize)
            .add_buffer(values.into_data().buffers()[0].clone())
            .build()?;
        Ok(Self::from(data))
    }
}

/// Downcast an array to [`FixedSizeBinaryArray`]. Panics if the array has a different type.
pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
}

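/// Iterate over a string array as `Option<&str>`, regardless of whether it is
/// `Utf8` or `LargeUtf8`. Panics on any other data type.
///
/// A minimal sketch; the `lance_arrow` crate path is an assumption and the
/// example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use arrow_array::StringArray;
/// use lance_arrow::iter_str_array;
///
/// let arr = StringArray::from(vec![Some("a"), None, Some("c")]);
/// let values: Vec<Option<&str>> = iter_str_array(&arr).collect();
/// assert_eq!(values, vec![Some("a"), None, Some("c")]);
/// ```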
pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
    match arr.data_type() {
        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
    }
}

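/// Extension helpers on [`RecordBatch`]: adding, replacing, renaming and
/// dropping columns, merging two batches, projecting by schema, and editing
/// schema metadata.
///
/// A minimal sketch of appending a column; the `lance_arrow` crate path is an
/// assumption and the example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use lance_arrow::RecordBatchExt;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let batch =
///     RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))]).unwrap();
/// let batch = batch
///     .try_with_column(
///         Field::new("b", DataType::Int32, true),
///         Arc::new(Int32Array::from(vec![4, 5, 6])),
///     )
///     .unwrap();
/// assert_eq!(batch.num_columns(), 2);
/// ```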
pub trait RecordBatchExt {
    /// Append `arr` as a new column described by `field`.
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Insert `arr` as a new column described by `field` at position `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Build a new batch from a [`StructArray`], preserving this batch's schema metadata.
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge the columns of two batches with the same number of rows, recursing into structs.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Like [`Self::merge`], but the output columns follow the order of `schema`.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Remove the top-level column with the given name, if present.
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace the data of the column with the given name.
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace both the declared data type and the data of the column with the given name.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename the column at `index`.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Look up a column by a dot-separated path, e.g. `"a.b.c"` for nested structs.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the batch onto `schema`, supporting nested struct projection.
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// Schema metadata of this batch.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add a single key/value pair to the schema metadata.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata wholesale.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take rows by index, analogous to [`arrow_select::take::take`].
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Deep-copy the batch so the backing buffers hold no unused capacity.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;
}

impl RecordBatchExt for RecordBatch {
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    fn merge(&self, other: &Self) -> Result<Self> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    fn drop_column(&self, name: &str) -> Result<Self> {
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(self.schema(), columns)
    }

    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }

    fn shrink_to_fit(&self) -> Result<Self> {
        crate::deepcopy::deep_copy_batch_sliced(self)
    }
}

/// Project a [`StructArray`] onto `fields`, recursing into nested structs and
/// preserving the struct-level validity.
fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
    if fields.is_empty() {
        return Ok(StructArray::new_empty_fields(
            struct_array.len(),
            struct_array.nulls().cloned(),
        ));
    }
    let mut columns: Vec<ArrayRef> = vec![];
    for field in fields.iter() {
        if let Some(col) = struct_array.column_by_name(field.name()) {
            match field.data_type() {
                DataType::Struct(subfields) => {
                    let projected = project(col.as_struct(), subfields)?;
                    columns.push(Arc::new(projected));
                }
                _ => {
                    columns.push(col.clone());
                }
            }
        } else {
            return Err(ArrowError::SchemaError(format!(
                "field {} does not exist in the RecordBatch",
                field.name()
            )));
        }
    }
    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
}

/// Returns true if two list arrays have identical offsets buffers.
fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    left_list.offsets().inner() == right_list.offsets().inner()
}

/// Merge two `List<Struct>` arrays that share the same offsets by merging their
/// child struct arrays field-by-field.
fn merge_list_structs_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    items_field_name: impl Into<String>,
    items_nullable: bool,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    let left_struct = left_list.values();
    let right_struct = right_list.values();
    let left_struct_arr = left_struct.as_struct();
    let right_struct_arr = right_struct.as_struct();
    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_items.data_type().clone(),
        items_nullable,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        left_list.offsets().clone(),
        merged_items,
        left_list.nulls().cloned(),
    ))
}

/// Merge a `List<Struct>` array with another whose rows are entirely null.
///
/// `not_null` is whichever of `left`/`right` actually has values; children that
/// only exist on the all-null side are filled with null arrays of the right length.
fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
    items_field_name: impl Into<String>,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list::<T>();
    let not_null_list = not_null.as_list::<T>();
    let right_list = right.as_list::<T>();

    let left_struct = left_list.values().as_struct();
    let not_null_struct: &StructArray = not_null_list.values().as_struct();
    let right_struct = right_list.values().as_struct();

    let values_len = not_null_list.values().len();
    let mut merged_fields =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
    let mut merged_columns =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());

    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len))
        }
    }
    for (_, field) in right_struct
        .columns()
        .iter()
        .zip(right_struct.fields())
        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
    {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len));
        }
    }

    let merged_struct = Arc::new(StructArray::new(
        Fields::from(merged_fields),
        merged_columns,
        not_null_struct.nulls().cloned(),
    ));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_struct.data_type().clone(),
        true,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        not_null_list.offsets().clone(),
        merged_struct,
        not_null_list.nulls().cloned(),
    ))
}

/// Dispatch [`merge_list_struct_null_helper`] on the list offset width.
fn merge_list_struct_null(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
) -> Arc<dyn Array> {
    match left.data_type() {
        DataType::List(left_field) => {
            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
        }
        DataType::LargeList(left_field) => {
            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
        }
        _ => unreachable!(),
    }
}

/// Merge two `List<Struct>` arrays into a single `List<Struct>` containing the
/// union of their child fields. Panics if the offsets differ.
fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
    // If one side is entirely null we cannot zip the two child structs row by
    // row; merge against the non-null side and fill missing children with nulls.
    if left.null_count() == left.len() {
        return merge_list_struct_null(left, right, right);
    } else if right.null_count() == right.len() {
        return merge_list_struct_null(left, right, left);
    }
    match (left.data_type(), right.data_type()) {
        (DataType::List(left_field), DataType::List(_)) => {
            if !lists_have_same_offsets_helper::<i32>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have same offsets");
            }
            merge_list_structs_helper::<i32>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
            if !lists_have_same_offsets_helper::<i64>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have same offsets");
            }
            merge_list_structs_helper::<i64>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        _ => unreachable!(),
    }
}

/// Treat an all-null validity buffer as "no validity" so that it does not
/// poison the merged struct's validity.
fn normalize_validity(
    validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<&arrow_buffer::NullBuffer> {
    validity.and_then(|v| {
        if v.null_count() == v.len() {
            None
        } else {
            Some(v)
        }
    })
}

/// Union the struct-level validity of the two sides being merged: a row of the
/// merged struct is valid if it is valid on either side.
fn merge_struct_validity(
    left_validity: Option<&arrow_buffer::NullBuffer>,
    right_validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<arrow_buffer::NullBuffer> {
    let left_normalized = normalize_validity(left_validity);
    let right_normalized = normalize_validity(right_validity);

    match (left_normalized, right_normalized) {
        (None, None) => None,
        (Some(left), None) => Some(left.clone()),
        (None, Some(right)) => Some(right.clone()),
        (Some(left), Some(right)) => {
            if left.null_count() == 0 && right.null_count() == 0 {
                return Some(left.clone());
            }

            let left_buffer = left.inner();
            let right_buffer = right.inner();

            // Bitwise OR of the validity bitmaps: valid on either side => valid.
            let merged_buffer = left_buffer | right_buffer;

            Some(arrow_buffer::NullBuffer::from(merged_buffer))
        }
    }
}

/// Recursively merge the child values of two list-like columns that share the
/// same offsets, merging struct children and descending through nested lists.
fn merge_list_child_values(
    child_field: &Field,
    left_values: ArrayRef,
    right_values: ArrayRef,
) -> ArrayRef {
    match child_field.data_type() {
        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
            left_values.as_struct(),
            right_values.as_struct(),
            child_fields,
        )) as ArrayRef,
        DataType::List(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("left list values should be ListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("right list values should be ListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(ListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::LargeList(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("left list values should be LargeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("right list values should be LargeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(LargeListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::FixedSizeList(grandchild, list_size) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("left list values should be FixedSizeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("right list values should be FixedSizeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(FixedSizeListArray::new(
                grandchild.clone(),
                *list_size,
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        _ => left_values.clone(),
    }
}

/// Push a parent struct's validity down into one of its child columns by
/// AND-ing the parent and child validity bitmaps.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    let parent_validity = match parent_validity {
        None => return child.clone(),
        // A parent with no nulls cannot invalidate any child rows.
        Some(p) if p.null_count() == 0 => return child.clone(),
        Some(p) => p,
    };

    let child_validity = child.nulls();

    let new_validity = match child_validity {
        None => {
            // The child had no validity of its own; inherit the parent's.
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Bitwise AND: a row is valid only if both parent and child are valid.
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}

/// Merge two [`StructArray`]s column-by-column, taking the union of their
/// fields and recursing into nested structs and `List<Struct>` columns.
fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
    let mut fields: Vec<Field> = vec![];
    let mut columns: Vec<ArrayRef> = vec![];
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for (left_field, left_column) in left_struct_array
        .fields()
        .iter()
        .zip(left_struct_array.columns().iter())
    {
        match right_fields
            .iter()
            .position(|f| f.name() == left_field.name())
        {
            Some(right_index) => {
                let right_field = right_fields.get(right_index).unwrap();
                let right_column = right_columns.get(right_index).unwrap();
                match (left_field.data_type(), right_field.data_type()) {
                    (DataType::Struct(_), DataType::Struct(_)) => {
                        let left_sub_array = left_column.as_struct();
                        let right_sub_array = right_column.as_struct();
                        let merged_sub_array = merge(left_sub_array, right_sub_array);
                        fields.push(Field::new(
                            left_field.name(),
                            merged_sub_array.data_type().clone(),
                            left_field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    (DataType::List(left_list), DataType::List(right_list))
                        if left_list.data_type().is_struct()
                            && right_list.data_type().is_struct() =>
                    {
                        if left_list.data_type() == right_list.data_type() {
                            // Identical list<struct> types: keep the left column as-is.
                            fields.push(left_field.as_ref().clone());
                            columns.push(left_column.clone());
                        } else {
                            // Different child fields: zip the two list<struct> columns
                            // into one list whose struct has the union of the fields.
                            let merged_sub_array = merge_list_struct(&left_column, &right_column);

                            fields.push(Field::new(
                                left_field.name(),
                                merged_sub_array.data_type().clone(),
                                left_field.is_nullable(),
                            ));
                            columns.push(merged_sub_array);
                        }
                    }
                    _ => {
                        fields.push(left_field.as_ref().clone());
                        let adjusted_column = adjust_child_validity(left_column, left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            None => {
                fields.push(left_field.as_ref().clone());
                let adjusted_column = adjust_child_validity(left_column, left_validity);
                columns.push(adjusted_column);
            }
        }
    }

    right_fields
        .iter()
        .zip(right_columns.iter())
        .for_each(|(field, column)| {
            if !left_struct_array
                .fields()
                .iter()
                .any(|f| f.name() == field.name())
            {
                fields.push(field.as_ref().clone());
                let adjusted_column = adjust_child_validity(column, right_validity);
                columns.push(adjusted_column);
            }
        });

    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
}

/// Merge two [`StructArray`]s, producing the columns in the order given by
/// `fields`. Fields present on only one side are taken from that side; fields
/// missing from both sides are skipped.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // A struct field can only match another struct field; everything else matches by name.
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    DataType::Struct(child_fields) => {
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    DataType::List(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::LargeList(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::FixedSizeList(child_field, list_size) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    _ => {
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            (None, None) => {
                // The field exists on neither side; skip it.
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}

/// Walk a dot-separated path of struct child names starting at `array`.
fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
    if components.is_empty() {
        return Some(array);
    }
    if !matches!(array.data_type(), DataType::Struct(_)) {
        return None;
    }
    let struct_arr = array.as_struct();
    struct_arr
        .column_by_name(components[0])
        .and_then(|arr| get_sub_array(arr, &components[1..]))
}

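/// Interleave rows from several [`RecordBatch`]es that share one schema.
///
/// `indices` is a list of `(batch_index, row_index)` pairs, as accepted by
/// [`arrow_select::interleave::interleave`].
///
/// A minimal sketch; the `lance_arrow` crate path is an assumption and the
/// example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use lance_arrow::interleave_batches;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let b1 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))]).unwrap();
/// let b2 = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![3, 4]))]).unwrap();
/// // Rows: b2[1], b1[0], b2[0]  ->  [4, 1, 3]
/// let out = interleave_batches(&[b1, b2], &[(1, 1), (0, 0), (1, 0)]).unwrap();
/// assert_eq!(out.num_rows(), 3);
/// ```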
pub fn interleave_batches(
    batches: &[RecordBatch],
    indices: &[(usize, usize)],
) -> Result<RecordBatch> {
    let first_batch = batches.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
    })?;
    let schema = first_batch.schema();
    let num_columns = first_batch.num_columns();
    let mut columns = Vec::with_capacity(num_columns);
    let mut chunks = Vec::with_capacity(batches.len());

    for i in 0..num_columns {
        for batch in batches {
            chunks.push(batch.column(i).as_ref());
        }
        let new_column = interleave(&chunks, indices)?;
        columns.push(new_column);
        chunks.clear();
    }

    RecordBatch::try_new(schema, columns)
}

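/// Helpers for constructing [`arrow_buffer::Buffer`] from [`bytes::Bytes`].
///
/// A minimal sketch; the `lance_arrow` crate path is an assumption and the
/// example is marked `ignore` because it is illustrative only:
///
/// ```ignore
/// use arrow_buffer::Buffer;
/// use lance_arrow::BufferExt;
///
/// let bytes = bytes::Bytes::from(vec![0u8; 32]);
/// // Treat the payload as 8-byte values; the data is copied only if the
/// // pointer is misaligned for that width.
/// let buffer = Buffer::from_bytes_bytes(bytes, 8);
/// assert_eq!(buffer.len(), 32);
/// ```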
pub trait BufferExt {
    /// Wrap `bytes` as a buffer without copying. If `bytes_per_value` is a power
    /// of two and the data is not aligned to it, the bytes are copied into a new
    /// allocation instead.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Copy `bytes` into a newly allocated buffer of `size_bytes` bytes,
    /// zero-padding the tail. `size_bytes` must be at least `bytes.len()`.
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}

/// Returns true if `n` is a power of two. Assumes `n > 0`.
fn is_pwr_two(n: u64) -> bool {
    n & (n - 1) == 0
}

impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The data is not aligned for its value width; copy into a fresh buffer.
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // SAFETY: the `Bytes` instance is kept alive as the custom allocation,
            // so the pointed-to memory remains valid for the buffer's lifetime.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        buf.extend(std::iter::repeat_n(0_u8, to_fill));

        buf.shrink_to_fit();

        Self::from(buf)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
    use arrow_array::{Float32Array, Int32Array, StructArray};
    use arrow_buffer::OffsetBuffer;

    #[test]
    fn test_merge_recursive() {
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }

    #[test]
    fn test_merge_with_schema() {
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }

    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }

    #[test]
    fn test_byte_width_opt() {
        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Utf8.byte_width_opt(), None);
        assert_eq!(DataType::Binary.byte_width_opt(), None);
        assert_eq!(
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
            None
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
                .byte_width_opt(),
            Some(12)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
                .byte_width_opt(),
            Some(16)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
                .byte_width_opt(),
            None
        );
    }

    #[test]
    fn test_take_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();
        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
        assert_eq!(
            taken,
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int32Array::from(vec![1, 5, 10])),
                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
                ],
            )
            .unwrap()
        )
    }

    #[test]
    fn test_schema_project_by_schema() {
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Empty projection.
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Re-ordered projection.
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Sub-schema projection.
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }

    #[test]
    fn test_project_preserves_struct_validity() {
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, true),
        ]);

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
        let struct_array = StructArray::new(
            fields.clone(),
            vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(value_array) as ArrayRef,
            ],
            // Struct-level validity: row 1 is null.
            Some(vec![true, false, true].into()),
        );

        let projected = project(&struct_array, &fields).unwrap();

        assert_eq!(projected.null_count(), 1);
        assert!(!projected.is_null(0));
        assert!(projected.is_null(1));
        assert!(!projected.is_null(2));
    }

    #[test]
    fn test_merge_struct_with_different_validity() {
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            // Left struct is valid for rows 0 and 2.
            Some(vec![true, false, true, false].into()),
        );

        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            // Right struct is valid for rows 0 and 1.
            Some(vec![true, true, false, false].into()),
        );

        let merged = merge(&left_struct, &right_struct);

        // The merged validity is the union: only row 3 is null on both sides.
        assert_eq!(merged.null_count(), 1);
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1));
        assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2));
    }

    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
}