1use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12 cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
13 GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
14 StructArray, UInt32Array, UInt8Array,
15};
16use arrow_array::{
17 new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod cast;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod r#struct;
38
/// Schema metadata key holding an Arrow extension type's name.
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Schema metadata key holding an Arrow extension type's serialized metadata.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Field metadata key marking a column as blob-encoded (lance encoding).
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Extension type name for lance blob v2 columns.
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";

// Crate-local result type: all fallible helpers here surface `ArrowError`.
type Result<T> = std::result::Result<T, ArrowError>;
52
/// Extra predicates and size helpers on [`DataType`].
pub trait DataTypeExt {
    /// Returns true for the four variable-length string/binary types
    /// (`Utf8`, `Binary`, `LargeUtf8`, `LargeBinary`).
    fn is_binary_like(&self) -> bool;

    /// Returns true if the type is a `Struct`.
    fn is_struct(&self) -> bool;

    /// Returns true for types whose values occupy a fixed number of bytes
    /// per element (no offsets buffer needed).
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the type is a `Dictionary`.
    fn is_dictionary(&self) -> bool;

    /// Byte width of one element. Panics if the type is not fixed-stride;
    /// prefer [`Self::byte_width_opt`] when the type is not known in advance.
    fn byte_width(&self) -> usize;

    /// Byte width of one element, or `None` for variable-length types.
    fn byte_width_opt(&self) -> Option<usize>;
}
88
89impl DataTypeExt for DataType {
90 fn is_binary_like(&self) -> bool {
91 use DataType::*;
92 matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
93 }
94
95 fn is_struct(&self) -> bool {
96 matches!(self, Self::Struct(_))
97 }
98
99 fn is_fixed_stride(&self) -> bool {
100 use DataType::*;
101 matches!(
102 self,
103 Boolean
104 | UInt8
105 | UInt16
106 | UInt32
107 | UInt64
108 | Int8
109 | Int16
110 | Int32
111 | Int64
112 | Float16
113 | Float32
114 | Float64
115 | Decimal128(_, _)
116 | Decimal256(_, _)
117 | FixedSizeList(_, _)
118 | FixedSizeBinary(_)
119 | Duration(_)
120 | Timestamp(_, _)
121 | Date32
122 | Date64
123 | Time32(_)
124 | Time64(_)
125 )
126 }
127
128 fn is_dictionary(&self) -> bool {
129 matches!(self, Self::Dictionary(_, _))
130 }
131
132 fn byte_width_opt(&self) -> Option<usize> {
133 match self {
134 Self::Int8 => Some(1),
135 Self::Int16 => Some(2),
136 Self::Int32 => Some(4),
137 Self::Int64 => Some(8),
138 Self::UInt8 => Some(1),
139 Self::UInt16 => Some(2),
140 Self::UInt32 => Some(4),
141 Self::UInt64 => Some(8),
142 Self::Float16 => Some(2),
143 Self::Float32 => Some(4),
144 Self::Float64 => Some(8),
145 Self::Date32 => Some(4),
146 Self::Date64 => Some(8),
147 Self::Time32(_) => Some(4),
148 Self::Time64(_) => Some(8),
149 Self::Timestamp(_, _) => Some(8),
150 Self::Duration(_) => Some(8),
151 Self::Decimal128(_, _) => Some(16),
152 Self::Decimal256(_, _) => Some(32),
153 Self::Interval(unit) => match unit {
154 IntervalUnit::YearMonth => Some(4),
155 IntervalUnit::DayTime => Some(8),
156 IntervalUnit::MonthDayNano => Some(16),
157 },
158 Self::FixedSizeBinary(s) => Some(*s as usize),
159 Self::FixedSizeList(dt, s) => dt
160 .data_type()
161 .byte_width_opt()
162 .map(|width| width * *s as usize),
163 _ => None,
164 }
165 }
166
167 fn byte_width(&self) -> usize {
168 self.byte_width_opt()
169 .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
170 }
171}
172
173pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
191 values: T,
192 offsets: &PrimitiveArray<Offset>,
193) -> Result<GenericListArray<Offset::Native>>
194where
195 Offset::Native: OffsetSizeTrait,
196{
197 let data_type = if Offset::Native::IS_LARGE {
198 DataType::LargeList(Arc::new(Field::new(
199 "item",
200 values.data_type().clone(),
201 true,
202 )))
203 } else {
204 DataType::List(Arc::new(Field::new(
205 "item",
206 values.data_type().clone(),
207 true,
208 )))
209 };
210 let data = ArrayDataBuilder::new(data_type)
211 .len(offsets.len() - 1)
212 .add_buffer(offsets.into_data().buffers()[0].clone())
213 .add_child_data(values.into_data())
214 .build()?;
215
216 Ok(GenericListArray::from(data))
217}
218
219pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
220 DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
221}
222
/// Convenience constructors and transforms for [`FixedSizeListArray`].
pub trait FixedSizeListArrayExt {
    /// Wrap a flat `values` array into a fixed-size list with `list_size`
    /// elements per row. The child field is named `"item"` and nullable.
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Randomly sample `n` rows (without replacement). Returns the whole
    /// array unchanged when `n >= len`.
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert integer-valued lists to floating point lists; float-valued
    /// lists are returned as-is.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
266
267impl FixedSizeListArrayExt for FixedSizeListArray {
268 fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
269 let field = Arc::new(Field::new("item", values.data_type().clone(), true));
270 let values = Arc::new(values);
271
272 Self::try_new(field, list_size, values, None)
273 }
274
275 fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
276 if n >= self.len() {
277 return Ok(self.clone());
278 }
279 let mut rng = SmallRng::from_os_rng();
280 let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
281 take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
282 }
283
284 fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
285 match self.data_type() {
286 DataType::FixedSizeList(field, size) => match field.data_type() {
287 DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
288 DataType::Int8 => Ok(Self::new(
289 Arc::new(arrow_schema::Field::new(
290 field.name(),
291 DataType::Float32,
292 field.is_nullable(),
293 )),
294 *size,
295 Arc::new(Float32Array::from_iter_values(
296 self.values()
297 .as_any()
298 .downcast_ref::<Int8Array>()
299 .ok_or(ArrowError::ParseError(
300 "Fail to cast primitive array to Int8Type".to_string(),
301 ))?
302 .into_iter()
303 .filter_map(|x| x.map(|y| y as f32)),
304 )),
305 self.nulls().cloned(),
306 )),
307 DataType::Int16 => Ok(Self::new(
308 Arc::new(arrow_schema::Field::new(
309 field.name(),
310 DataType::Float32,
311 field.is_nullable(),
312 )),
313 *size,
314 Arc::new(Float32Array::from_iter_values(
315 self.values()
316 .as_any()
317 .downcast_ref::<Int16Array>()
318 .ok_or(ArrowError::ParseError(
319 "Fail to cast primitive array to Int8Type".to_string(),
320 ))?
321 .into_iter()
322 .filter_map(|x| x.map(|y| y as f32)),
323 )),
324 self.nulls().cloned(),
325 )),
326 DataType::Int32 => Ok(Self::new(
327 Arc::new(arrow_schema::Field::new(
328 field.name(),
329 DataType::Float32,
330 field.is_nullable(),
331 )),
332 *size,
333 Arc::new(Float32Array::from_iter_values(
334 self.values()
335 .as_any()
336 .downcast_ref::<Int32Array>()
337 .ok_or(ArrowError::ParseError(
338 "Fail to cast primitive array to Int8Type".to_string(),
339 ))?
340 .into_iter()
341 .filter_map(|x| x.map(|y| y as f32)),
342 )),
343 self.nulls().cloned(),
344 )),
345 DataType::Int64 => Ok(Self::new(
346 Arc::new(arrow_schema::Field::new(
347 field.name(),
348 DataType::Float64,
349 field.is_nullable(),
350 )),
351 *size,
352 Arc::new(Float64Array::from_iter_values(
353 self.values()
354 .as_any()
355 .downcast_ref::<Int64Array>()
356 .ok_or(ArrowError::ParseError(
357 "Fail to cast primitive array to Int8Type".to_string(),
358 ))?
359 .into_iter()
360 .filter_map(|x| x.map(|y| y as f64)),
361 )),
362 self.nulls().cloned(),
363 )),
364 DataType::UInt8 => Ok(Self::new(
365 Arc::new(arrow_schema::Field::new(
366 field.name(),
367 DataType::Float64,
368 field.is_nullable(),
369 )),
370 *size,
371 Arc::new(Float64Array::from_iter_values(
372 self.values()
373 .as_any()
374 .downcast_ref::<UInt8Array>()
375 .ok_or(ArrowError::ParseError(
376 "Fail to cast primitive array to Int8Type".to_string(),
377 ))?
378 .into_iter()
379 .filter_map(|x| x.map(|y| y as f64)),
380 )),
381 self.nulls().cloned(),
382 )),
383 DataType::UInt32 => Ok(Self::new(
384 Arc::new(arrow_schema::Field::new(
385 field.name(),
386 DataType::Float64,
387 field.is_nullable(),
388 )),
389 *size,
390 Arc::new(Float64Array::from_iter_values(
391 self.values()
392 .as_any()
393 .downcast_ref::<UInt32Array>()
394 .ok_or(ArrowError::ParseError(
395 "Fail to cast primitive array to Int8Type".to_string(),
396 ))?
397 .into_iter()
398 .filter_map(|x| x.map(|y| y as f64)),
399 )),
400 self.nulls().cloned(),
401 )),
402 data_type => Err(ArrowError::ParseError(format!(
403 "Expect either floating type or integer got {:?}",
404 data_type
405 ))),
406 },
407 data_type => Err(ArrowError::ParseError(format!(
408 "Expect either FixedSizeList got {:?}",
409 data_type
410 ))),
411 }
412 }
413}
414
415pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
418 arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
419}
420
/// Convenience constructors for [`FixedSizeBinaryArray`].
pub trait FixedSizeBinaryArrayExt {
    /// Reinterpret a flat `UInt8Array` as fixed-size binary values of
    /// `stride` bytes each.
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
442
443impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
444 fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
445 let data_type = DataType::FixedSizeBinary(stride);
446 let data = ArrayDataBuilder::new(data_type)
447 .len(values.len() / stride as usize)
448 .add_buffer(values.into_data().buffers()[0].clone())
449 .build()?;
450 Ok(Self::from(data))
451 }
452}
453
454pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
455 arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
456}
457
458pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
459 match arr.data_type() {
460 DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
461 DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
462 _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
463 }
464}
465
/// Immutable, builder-style helpers on [`RecordBatch`]: every method returns
/// a new batch and leaves `self` untouched.
pub trait RecordBatchExt {
    /// Append `arr` as a new column described by `field`.
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Insert `arr` as a new column described by `field` at position `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Build a batch from `arr`'s children, keeping this batch's schema metadata.
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge columns of two batches with the same row count (left wins on
    /// name collisions; struct columns are merged recursively).
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Like [`Self::merge`] but the output column set/order follows `schema`.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Remove every top-level column named `name`.
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace the data of the column named `name` (schema unchanged).
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace the column named `name` with `column`, also updating the
    /// field's data type to `new_data_type`.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename the column at `index` to `new_name`.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Look up a (possibly nested) column by dot-separated path, e.g. `"a.b.c"`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project (and reorder) columns to match `schema`, recursing into structs.
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// The schema-level metadata map of this batch.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Return a batch whose schema metadata additionally maps `key` to `value`.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Return a batch whose schema metadata is replaced by `metadata`.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take the rows at `indices` (in order) into a new batch.
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Deep-copy the batch so its buffers hold no excess capacity or slices.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;
}
608
609impl RecordBatchExt for RecordBatch {
610 fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
611 let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
612 let mut new_columns = self.columns().to_vec();
613 new_columns.push(arr);
614 Self::try_new(new_schema, new_columns)
615 }
616
617 fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
618 let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
619 let mut new_columns = self.columns().to_vec();
620 new_columns.insert(index, arr);
621 Self::try_new(new_schema, new_columns)
622 }
623
624 fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
625 let schema = Arc::new(Schema::new_with_metadata(
626 arr.fields().to_vec(),
627 self.schema().metadata.clone(),
628 ));
629 let batch = Self::from(arr);
630 batch.with_schema(schema)
631 }
632
633 fn merge(&self, other: &Self) -> Result<Self> {
634 if self.num_rows() != other.num_rows() {
635 return Err(ArrowError::InvalidArgumentError(format!(
636 "Attempt to merge two RecordBatch with different sizes: {} != {}",
637 self.num_rows(),
638 other.num_rows()
639 )));
640 }
641 let left_struct_array: StructArray = self.clone().into();
642 let right_struct_array: StructArray = other.clone().into();
643 self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
644 }
645
646 fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
647 if self.num_rows() != other.num_rows() {
648 return Err(ArrowError::InvalidArgumentError(format!(
649 "Attempt to merge two RecordBatch with different sizes: {} != {}",
650 self.num_rows(),
651 other.num_rows()
652 )));
653 }
654 let left_struct_array: StructArray = self.clone().into();
655 let right_struct_array: StructArray = other.clone().into();
656 self.try_new_from_struct_array(merge_with_schema(
657 &left_struct_array,
658 &right_struct_array,
659 schema.fields(),
660 ))
661 }
662
663 fn drop_column(&self, name: &str) -> Result<Self> {
664 let mut fields = vec![];
665 let mut columns = vec![];
666 for i in 0..self.schema().fields.len() {
667 if self.schema().field(i).name() != name {
668 fields.push(self.schema().field(i).clone());
669 columns.push(self.column(i).clone());
670 }
671 }
672 Self::try_new(
673 Arc::new(Schema::new_with_metadata(
674 fields,
675 self.schema().metadata().clone(),
676 )),
677 columns,
678 )
679 }
680
681 fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
682 let mut fields = self.schema().fields().to_vec();
683 if index >= fields.len() {
684 return Err(ArrowError::InvalidArgumentError(format!(
685 "Index out of bounds: {}",
686 index
687 )));
688 }
689 fields[index] = Arc::new(Field::new(
690 new_name,
691 fields[index].data_type().clone(),
692 fields[index].is_nullable(),
693 ));
694 Self::try_new(
695 Arc::new(Schema::new_with_metadata(
696 fields,
697 self.schema().metadata().clone(),
698 )),
699 self.columns().to_vec(),
700 )
701 }
702
703 fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
704 let mut columns = self.columns().to_vec();
705 let field_i = self
706 .schema()
707 .fields()
708 .iter()
709 .position(|f| f.name() == name)
710 .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
711 columns[field_i] = column;
712 Self::try_new(self.schema(), columns)
713 }
714
715 fn replace_column_schema_by_name(
716 &self,
717 name: &str,
718 new_data_type: DataType,
719 column: Arc<dyn Array>,
720 ) -> Result<RecordBatch> {
721 let fields = self
722 .schema()
723 .fields()
724 .iter()
725 .map(|x| {
726 if x.name() != name {
727 x.clone()
728 } else {
729 let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
730 Arc::new(new_field)
731 }
732 })
733 .collect::<Vec<_>>();
734 let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
735 let mut columns = self.columns().to_vec();
736 let field_i = self
737 .schema()
738 .fields()
739 .iter()
740 .position(|f| f.name() == name)
741 .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
742 columns[field_i] = column;
743 Self::try_new(Arc::new(schema), columns)
744 }
745
746 fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
747 let split = name.split('.').collect::<Vec<_>>();
748 if split.is_empty() {
749 return None;
750 }
751
752 self.column_by_name(split[0])
753 .and_then(|arr| get_sub_array(arr, &split[1..]))
754 }
755
756 fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
757 let struct_array: StructArray = self.clone().into();
758 self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
759 }
760
761 fn metadata(&self) -> &HashMap<String, String> {
762 self.schema_ref().metadata()
763 }
764
765 fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
766 let mut schema = self.schema_ref().as_ref().clone();
767 schema.metadata = metadata;
768 Self::try_new(schema.into(), self.columns().into())
769 }
770
771 fn take(&self, indices: &UInt32Array) -> Result<Self> {
772 let struct_array: StructArray = self.clone().into();
773 let taken = take(&struct_array, indices, None)?;
774 self.try_new_from_struct_array(taken.as_struct().clone())
775 }
776
777 fn shrink_to_fit(&self) -> Result<Self> {
778 crate::deepcopy::deep_copy_batch_sliced(self)
780 }
781}
782
783fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
784 if fields.is_empty() {
785 return Ok(StructArray::new_empty_fields(
786 struct_array.len(),
787 struct_array.nulls().cloned(),
788 ));
789 }
790 let mut columns: Vec<ArrayRef> = vec![];
791 for field in fields.iter() {
792 if let Some(col) = struct_array.column_by_name(field.name()) {
793 match field.data_type() {
794 DataType::Struct(subfields) => {
796 let projected = project(col.as_struct(), subfields)?;
797 columns.push(Arc::new(projected));
798 }
799 _ => {
800 columns.push(col.clone());
801 }
802 }
803 } else {
804 return Err(ArrowError::SchemaError(format!(
805 "field {} does not exist in the RecordBatch",
806 field.name()
807 )));
808 }
809 }
810 StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
812}
813
814fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
815 let left_list: &GenericListArray<T> = left.as_list();
816 let right_list: &GenericListArray<T> = right.as_list();
817 left_list.offsets().inner() == right_list.offsets().inner()
818}
819
820fn merge_list_structs_helper<T: OffsetSizeTrait>(
821 left: &dyn Array,
822 right: &dyn Array,
823 items_field_name: impl Into<String>,
824 items_nullable: bool,
825) -> Arc<dyn Array> {
826 let left_list: &GenericListArray<T> = left.as_list();
827 let right_list: &GenericListArray<T> = right.as_list();
828 let left_struct = left_list.values();
829 let right_struct = right_list.values();
830 let left_struct_arr = left_struct.as_struct();
831 let right_struct_arr = right_struct.as_struct();
832 let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
833 let items_field = Arc::new(Field::new(
834 items_field_name,
835 merged_items.data_type().clone(),
836 items_nullable,
837 ));
838 Arc::new(GenericListArray::<T>::new(
839 items_field,
840 left_list.offsets().clone(),
841 merged_items,
842 left_list.nulls().cloned(),
843 ))
844}
845
846fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
847 left: &dyn Array,
848 right: &dyn Array,
849 not_null: &dyn Array,
850 items_field_name: impl Into<String>,
851) -> Arc<dyn Array> {
852 let left_list: &GenericListArray<T> = left.as_list::<T>();
853 let not_null_list = not_null.as_list::<T>();
854 let right_list = right.as_list::<T>();
855
856 let left_struct = left_list.values().as_struct();
857 let not_null_struct: &StructArray = not_null_list.values().as_struct();
858 let right_struct = right_list.values().as_struct();
859
860 let values_len = not_null_list.values().len();
861 let mut merged_fields =
862 Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
863 let mut merged_columns =
864 Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
865
866 for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
867 merged_fields.push(field.clone());
868 if let Some(val) = not_null_struct.column_by_name(field.name()) {
869 merged_columns.push(val.clone());
870 } else {
871 merged_columns.push(new_null_array(field.data_type(), values_len))
872 }
873 }
874 for (_, field) in right_struct
875 .columns()
876 .iter()
877 .zip(right_struct.fields())
878 .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
879 {
880 merged_fields.push(field.clone());
881 if let Some(val) = not_null_struct.column_by_name(field.name()) {
882 merged_columns.push(val.clone());
883 } else {
884 merged_columns.push(new_null_array(field.data_type(), values_len));
885 }
886 }
887
888 let merged_struct = Arc::new(StructArray::new(
889 Fields::from(merged_fields),
890 merged_columns,
891 not_null_struct.nulls().cloned(),
892 ));
893 let items_field = Arc::new(Field::new(
894 items_field_name,
895 merged_struct.data_type().clone(),
896 true,
897 ));
898 Arc::new(GenericListArray::<T>::new(
899 items_field,
900 not_null_list.offsets().clone(),
901 merged_struct,
902 not_null_list.nulls().cloned(),
903 ))
904}
905
906fn merge_list_struct_null(
907 left: &dyn Array,
908 right: &dyn Array,
909 not_null: &dyn Array,
910) -> Arc<dyn Array> {
911 match left.data_type() {
912 DataType::List(left_field) => {
913 merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
914 }
915 DataType::LargeList(left_field) => {
916 merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
917 }
918 _ => unreachable!(),
919 }
920}
921
922fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
923 if left.null_count() == left.len() {
927 return merge_list_struct_null(left, right, right);
928 } else if right.null_count() == right.len() {
929 return merge_list_struct_null(left, right, left);
930 }
931 match (left.data_type(), right.data_type()) {
932 (DataType::List(left_field), DataType::List(_)) => {
933 if !lists_have_same_offsets_helper::<i32>(left, right) {
934 panic!("Attempt to merge list struct arrays which do not have same offsets");
935 }
936 merge_list_structs_helper::<i32>(
937 left,
938 right,
939 left_field.name(),
940 left_field.is_nullable(),
941 )
942 }
943 (DataType::LargeList(left_field), DataType::LargeList(_)) => {
944 if !lists_have_same_offsets_helper::<i64>(left, right) {
945 panic!("Attempt to merge list struct arrays which do not have same offsets");
946 }
947 merge_list_structs_helper::<i64>(
948 left,
949 right,
950 left_field.name(),
951 left_field.is_nullable(),
952 )
953 }
954 _ => unreachable!(),
955 }
956}
957
958fn normalize_validity(
961 validity: Option<&arrow_buffer::NullBuffer>,
962) -> Option<&arrow_buffer::NullBuffer> {
963 validity.and_then(|v| {
964 if v.null_count() == v.len() {
965 None
966 } else {
967 Some(v)
968 }
969 })
970}
971
/// Combine the top-level validity of two struct arrays being merged.
///
/// A row in the merged struct is valid when it is valid on EITHER side
/// (bitwise OR of the validity bits). All-null buffers are first normalized
/// away so they behave like an absent buffer.
fn merge_struct_validity(
    left_validity: Option<&arrow_buffer::NullBuffer>,
    right_validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<arrow_buffer::NullBuffer> {
    let left_normalized = normalize_validity(left_validity);
    let right_normalized = normalize_validity(right_validity);

    match (left_normalized, right_normalized) {
        // Neither side tracks nulls: the merged struct has no validity buffer.
        (None, None) => None,
        (Some(left), None) => Some(left.clone()),
        (None, Some(right)) => Some(right.clone()),
        (Some(left), Some(right)) => {
            // Both all-valid: either buffer works, so reuse the left one.
            if left.null_count() == 0 && right.null_count() == 0 {
                return Some(left.clone());
            }

            let left_buffer = left.inner();
            let right_buffer = right.inner();

            // Validity bit is 1 for valid, so OR yields "valid on either side".
            let merged_buffer = left_buffer | right_buffer;

            Some(arrow_buffer::NullBuffer::from(merged_buffer))
        }
    }
}
1006
/// Recursively merge the child values of two list-typed columns that share
/// the same nesting shape, according to `child_field`'s type.
///
/// Struct children are merged via `merge_with_schema`; nested list children
/// recurse, reusing the LEFT side's offsets (assumes both sides have the same
/// list layout — TODO confirm against callers); any other type keeps the left
/// values unchanged.
fn merge_list_child_values(
    child_field: &Field,
    left_values: ArrayRef,
    right_values: ArrayRef,
) -> ArrayRef {
    match child_field.data_type() {
        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
            left_values.as_struct(),
            right_values.as_struct(),
            child_fields,
        )) as ArrayRef,
        DataType::List(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("left list values should be ListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("right list values should be ListArray");
            // Recurse one level deeper into the list nesting.
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            // Rebuilt with the left side's offsets.
            Arc::new(ListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::LargeList(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("left list values should be LargeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("right list values should be LargeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(LargeListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::FixedSizeList(grandchild, list_size) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("left list values should be FixedSizeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("right list values should be FixedSizeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(FixedSizeListArray::new(
                grandchild.clone(),
                *list_size,
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        // Non-nested child: nothing to merge, keep the left values.
        _ => left_values.clone(),
    }
}
1087
/// Propagate a parent struct's validity down into `child`: the returned array
/// is null wherever the child OR the parent is null (bitwise AND of validity
/// bits), leaving the child's buffers otherwise untouched.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    // No parent nulls to push down: the child can be reused as-is.
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(), Some(p) => p,
    };

    let child_validity = child.nulls();

    let new_validity = match child_validity {
        None => {
            // Child tracked no nulls: adopt the parent's buffer directly.
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Validity bit is 1 for valid, so AND keeps rows valid on BOTH sides.
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    // Rebuild the array data with the combined validity, reusing the child's
    // existing value buffers and child data.
    // NOTE(review): `child.offset()` is passed alongside buffers taken from
    // `child.to_data()` — assumes the two agree for sliced arrays; confirm
    // with a sliced-input test.
    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}
1135
1136fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1137 let mut fields: Vec<Field> = vec![];
1138 let mut columns: Vec<ArrayRef> = vec![];
1139 let right_fields = right_struct_array.fields();
1140 let right_columns = right_struct_array.columns();
1141
1142 let left_validity = left_struct_array.nulls();
1144 let right_validity = right_struct_array.nulls();
1145
1146 let merged_validity = merge_struct_validity(left_validity, right_validity);
1148
1149 for (left_field, left_column) in left_struct_array
1151 .fields()
1152 .iter()
1153 .zip(left_struct_array.columns().iter())
1154 {
1155 match right_fields
1156 .iter()
1157 .position(|f| f.name() == left_field.name())
1158 {
1159 Some(right_index) => {
1161 let right_field = right_fields.get(right_index).unwrap();
1162 let right_column = right_columns.get(right_index).unwrap();
1163 match (left_field.data_type(), right_field.data_type()) {
1165 (DataType::Struct(_), DataType::Struct(_)) => {
1166 let left_sub_array = left_column.as_struct();
1167 let right_sub_array = right_column.as_struct();
1168 let merged_sub_array = merge(left_sub_array, right_sub_array);
1169 fields.push(Field::new(
1170 left_field.name(),
1171 merged_sub_array.data_type().clone(),
1172 left_field.is_nullable(),
1173 ));
1174 columns.push(Arc::new(merged_sub_array) as ArrayRef);
1175 }
1176 (DataType::List(left_list), DataType::List(right_list))
1177 if left_list.data_type().is_struct()
1178 && right_list.data_type().is_struct() =>
1179 {
1180 if left_list.data_type() == right_list.data_type() {
1182 fields.push(left_field.as_ref().clone());
1183 columns.push(left_column.clone());
1184 }
1185 let merged_sub_array = merge_list_struct(&left_column, &right_column);
1189
1190 fields.push(Field::new(
1191 left_field.name(),
1192 merged_sub_array.data_type().clone(),
1193 left_field.is_nullable(),
1194 ));
1195 columns.push(merged_sub_array);
1196 }
1197 _ => {
1199 fields.push(left_field.as_ref().clone());
1201 let adjusted_column = adjust_child_validity(left_column, left_validity);
1203 columns.push(adjusted_column);
1204 }
1205 }
1206 }
1207 None => {
1208 fields.push(left_field.as_ref().clone());
1209 let adjusted_column = adjust_child_validity(left_column, left_validity);
1211 columns.push(adjusted_column);
1212 }
1213 }
1214 }
1215
1216 right_fields
1218 .iter()
1219 .zip(right_columns.iter())
1220 .for_each(|(field, column)| {
1221 if !left_struct_array
1223 .fields()
1224 .iter()
1225 .any(|f| f.name() == field.name())
1226 {
1227 fields.push(field.as_ref().clone());
1228 let adjusted_column = adjust_child_validity(column, right_validity);
1231 columns.push(adjusted_column);
1232 }
1233 });
1234
1235 StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1236}
1237
1238fn merge_with_schema(
1239 left_struct_array: &StructArray,
1240 right_struct_array: &StructArray,
1241 fields: &Fields,
1242) -> StructArray {
1243 fn same_type_kind(left: &DataType, right: &DataType) -> bool {
1245 match (left, right) {
1246 (DataType::Struct(_), DataType::Struct(_)) => true,
1247 (DataType::Struct(_), _) => false,
1248 (_, DataType::Struct(_)) => false,
1249 _ => true,
1250 }
1251 }
1252
1253 let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
1254 let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());
1255
1256 let left_fields = left_struct_array.fields();
1257 let left_columns = left_struct_array.columns();
1258 let right_fields = right_struct_array.fields();
1259 let right_columns = right_struct_array.columns();
1260
1261 let left_validity = left_struct_array.nulls();
1263 let right_validity = right_struct_array.nulls();
1264
1265 let merged_validity = merge_struct_validity(left_validity, right_validity);
1267
1268 for field in fields {
1269 let left_match_idx = left_fields.iter().position(|f| {
1270 f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1271 });
1272 let right_match_idx = right_fields.iter().position(|f| {
1273 f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1274 });
1275
1276 match (left_match_idx, right_match_idx) {
1277 (None, Some(right_idx)) => {
1278 output_fields.push(right_fields[right_idx].as_ref().clone());
1279 let adjusted_column =
1281 adjust_child_validity(&right_columns[right_idx], right_validity);
1282 columns.push(adjusted_column);
1283 }
1284 (Some(left_idx), None) => {
1285 output_fields.push(left_fields[left_idx].as_ref().clone());
1286 let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
1288 columns.push(adjusted_column);
1289 }
1290 (Some(left_idx), Some(right_idx)) => {
1291 match field.data_type() {
1292 DataType::Struct(child_fields) => {
1293 let left_sub_array = left_columns[left_idx].as_struct();
1294 let right_sub_array = right_columns[right_idx].as_struct();
1295 let merged_sub_array =
1296 merge_with_schema(left_sub_array, right_sub_array, child_fields);
1297 output_fields.push(Field::new(
1298 field.name(),
1299 merged_sub_array.data_type().clone(),
1300 field.is_nullable(),
1301 ));
1302 columns.push(Arc::new(merged_sub_array) as ArrayRef);
1303 }
1304 DataType::List(child_field) => {
1305 let left_list = left_columns[left_idx]
1306 .as_any()
1307 .downcast_ref::<ListArray>()
1308 .unwrap();
1309 let right_list = right_columns[right_idx]
1310 .as_any()
1311 .downcast_ref::<ListArray>()
1312 .unwrap();
1313 let merged_values = merge_list_child_values(
1314 child_field.as_ref(),
1315 left_list.trimmed_values(),
1316 right_list.trimmed_values(),
1317 );
1318 let merged_validity =
1319 merge_struct_validity(left_list.nulls(), right_list.nulls());
1320 let merged_list = ListArray::new(
1321 child_field.clone(),
1322 left_list.offsets().clone(),
1323 merged_values,
1324 merged_validity,
1325 );
1326 output_fields.push(field.as_ref().clone());
1327 columns.push(Arc::new(merged_list) as ArrayRef);
1328 }
1329 DataType::LargeList(child_field) => {
1330 let left_list = left_columns[left_idx]
1331 .as_any()
1332 .downcast_ref::<LargeListArray>()
1333 .unwrap();
1334 let right_list = right_columns[right_idx]
1335 .as_any()
1336 .downcast_ref::<LargeListArray>()
1337 .unwrap();
1338 let merged_values = merge_list_child_values(
1339 child_field.as_ref(),
1340 left_list.trimmed_values(),
1341 right_list.trimmed_values(),
1342 );
1343 let merged_validity =
1344 merge_struct_validity(left_list.nulls(), right_list.nulls());
1345 let merged_list = LargeListArray::new(
1346 child_field.clone(),
1347 left_list.offsets().clone(),
1348 merged_values,
1349 merged_validity,
1350 );
1351 output_fields.push(field.as_ref().clone());
1352 columns.push(Arc::new(merged_list) as ArrayRef);
1353 }
1354 DataType::FixedSizeList(child_field, list_size) => {
1355 let left_list = left_columns[left_idx]
1356 .as_any()
1357 .downcast_ref::<FixedSizeListArray>()
1358 .unwrap();
1359 let right_list = right_columns[right_idx]
1360 .as_any()
1361 .downcast_ref::<FixedSizeListArray>()
1362 .unwrap();
1363 let merged_values = merge_list_child_values(
1364 child_field.as_ref(),
1365 left_list.values().clone(),
1366 right_list.values().clone(),
1367 );
1368 let merged_validity =
1369 merge_struct_validity(left_list.nulls(), right_list.nulls());
1370 let merged_list = FixedSizeListArray::new(
1371 child_field.clone(),
1372 *list_size,
1373 merged_values,
1374 merged_validity,
1375 );
1376 output_fields.push(field.as_ref().clone());
1377 columns.push(Arc::new(merged_list) as ArrayRef);
1378 }
1379 _ => {
1380 output_fields.push(left_fields[left_idx].as_ref().clone());
1381 let adjusted_column =
1383 adjust_child_validity(&left_columns[left_idx], left_validity);
1384 columns.push(adjusted_column);
1385 }
1386 }
1387 }
1388 (None, None) => {
1389 }
1391 }
1392 }
1393
1394 StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
1395}
1396
1397fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1398 if components.is_empty() {
1399 return Some(array);
1400 }
1401 if !matches!(array.data_type(), DataType::Struct(_)) {
1402 return None;
1403 }
1404 let struct_arr = array.as_struct();
1405 struct_arr
1406 .column_by_name(components[0])
1407 .and_then(|arr| get_sub_array(arr, &components[1..]))
1408}
1409
1410pub fn interleave_batches(
1414 batches: &[RecordBatch],
1415 indices: &[(usize, usize)],
1416) -> Result<RecordBatch> {
1417 let first_batch = batches.first().ok_or_else(|| {
1418 ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1419 })?;
1420 let schema = first_batch.schema();
1421 let num_columns = first_batch.num_columns();
1422 let mut columns = Vec::with_capacity(num_columns);
1423 let mut chunks = Vec::with_capacity(batches.len());
1424
1425 for i in 0..num_columns {
1426 for batch in batches {
1427 chunks.push(batch.column(i).as_ref());
1428 }
1429 let new_column = interleave(&chunks, indices)?;
1430 columns.push(new_column);
1431 chunks.clear();
1432 }
1433
1434 RecordBatch::try_new(schema, columns)
1435}
1436
pub trait BufferExt {
    /// Wrap a `bytes::Bytes` as a buffer, zero-copy when possible.
    ///
    /// `bytes_per_value` is the byte width of a single value. When it is a
    /// power of two and the incoming allocation is misaligned for that width,
    /// the data is copied into a freshly allocated buffer instead of being
    /// wrapped zero-copy (see the impl below).
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Copy `bytes` into a newly allocated buffer of `size_bytes` bytes,
    /// zero-padding the tail.
    ///
    /// Panics if `size_bytes` is smaller than `bytes.len()`.
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1464
/// Returns true if `n` is a power of two.
///
/// The previous hand-rolled `n & (n - 1) == 0` check was wrong for `n == 0`:
/// it underflows (panicking in debug builds) and evaluates to `true` in
/// release builds, even though zero is not a power of two. The standard
/// library method handles the zero case correctly.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1468
impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        // Power-of-two value widths require the data to be aligned to that
        // width; if the incoming allocation is misaligned we must copy into a
        // fresh buffer. Non-power-of-two widths skip the alignment check and
        // are wrapped zero-copy below.
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // Zero-copy path: wrap the existing allocation.
            // SAFETY: the pointer and length come from `bytes`, and `bytes`
            // itself is moved into the `Arc` passed as the custom-allocation
            // owner, so the memory stays alive (and, since `bytes::Bytes` is
            // immutable, unchanged) for the lifetime of the returned buffer.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        // Copy the payload, then zero-pad up to the requested size.
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        buf.extend(std::iter::repeat_n(0_u8, to_fill));

        // MutableBuffer may round its capacity up, so shrink to release any
        // over-allocation before freezing into an immutable Buffer.
        buf.shrink_to_fit();

        Self::from(buf)
    }
}
1503
1504#[cfg(test)]
1505mod tests {
1506 use super::*;
1507 use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
1508 use arrow_array::{Float32Array, Int32Array, StructArray};
1509 use arrow_buffer::OffsetBuffer;
1510
    // Merging two batches that share a struct column ("b") should merge the
    // struct's children recursively, while non-overlapping top-level columns
    // ("a", "e") are simply placed side by side.
    #[test]
    fn test_merge_recursive() {
        // Child arrays used on both sides of the merge.
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        // Left batch: top-level "a" plus struct "b" containing only "c".
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        // Right batch: top-level "e" plus struct "b" containing only "d".
        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected: struct "b" holds both children ("c" then "d"); "e" is last.
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }
1595
    // merge_with_schema should order output fields per the provided schema,
    // while plain merge keeps left-side order and appends new right fields.
    #[test]
    fn test_merge_with_schema() {
        // Builds a one-column batch whose single column is an empty struct
        // with the given child field names and types.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // Schema-directed merge: output follows the target schema's order.
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // Naive merge: left-side order, then new right-side fields appended.
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }
1638
    // Merging two List<Struct> columns should merge the struct items per list
    // element: left lists hold {x}, right lists hold {y}, and the merged lists
    // hold {x, y}. Also checks that an all-null right list yields null "y"s.
    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        // Expected items when the right side is entirely null: "y" comes back
        // as all-null values.
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        // Normal case: both lists populated, items merged element-wise.
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // All-null right list: merged items get null "y" values.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }
1727
1728 #[test]
1729 fn test_byte_width_opt() {
1730 assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1731 assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1732 assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1733 assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1734 assert_eq!(DataType::Utf8.byte_width_opt(), None);
1735 assert_eq!(DataType::Binary.byte_width_opt(), None);
1736 assert_eq!(
1737 DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1738 None
1739 );
1740 assert_eq!(
1741 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1742 .byte_width_opt(),
1743 Some(12)
1744 );
1745 assert_eq!(
1746 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1747 .byte_width_opt(),
1748 Some(16)
1749 );
1750 assert_eq!(
1751 DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1752 .byte_width_opt(),
1753 None
1754 );
1755 }
1756
1757 #[test]
1758 fn test_take_record_batch() {
1759 let schema = Arc::new(Schema::new(vec![
1760 Field::new("a", DataType::Int32, true),
1761 Field::new("b", DataType::Utf8, true),
1762 ]));
1763 let batch = RecordBatch::try_new(
1764 schema.clone(),
1765 vec![
1766 Arc::new(Int32Array::from_iter_values(0..20)),
1767 Arc::new(StringArray::from_iter_values(
1768 (0..20).map(|i| format!("str-{}", i)),
1769 )),
1770 ],
1771 )
1772 .unwrap();
1773 let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1774 assert_eq!(
1775 taken,
1776 RecordBatch::try_new(
1777 schema,
1778 vec![
1779 Arc::new(Int32Array::from(vec![1, 5, 10])),
1780 Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1781 ],
1782 )
1783 .unwrap()
1784 )
1785 }
1786
    // project_by_schema should follow the target schema's column selection and
    // order while preserving the source batch's schema-level metadata.
    #[test]
    fn test_schema_project_by_schema() {
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Projecting to an empty schema keeps the row count and the metadata.
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Projection follows the order of the target schema, not the source.
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Projecting to a subset keeps only the requested columns.
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }
1853
1854 #[test]
1855 fn test_project_preserves_struct_validity() {
1856 let fields = Fields::from(vec![
1858 Field::new("id", DataType::Int32, false),
1859 Field::new("value", DataType::Float32, true),
1860 ]);
1861
1862 let id_array = Int32Array::from(vec![1, 2, 3]);
1864 let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1865 let struct_array = StructArray::new(
1866 fields.clone(),
1867 vec![
1868 Arc::new(id_array) as ArrayRef,
1869 Arc::new(value_array) as ArrayRef,
1870 ],
1871 Some(vec![true, false, true].into()), );
1873
1874 let projected = project(&struct_array, &fields).unwrap();
1876
1877 assert_eq!(projected.null_count(), 1);
1879 assert!(!projected.is_null(0));
1880 assert!(projected.is_null(1));
1881 assert!(!projected.is_null(2));
1882 }
1883
1884 #[test]
1885 fn test_merge_struct_with_different_validity() {
1886 let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
1889 let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
1890 let left_struct = StructArray::new(
1891 left_fields,
1892 vec![Arc::new(height_array) as ArrayRef],
1893 Some(vec![true, false, true, false].into()), );
1895
1896 let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
1898 let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
1899 let right_struct = StructArray::new(
1900 right_fields,
1901 vec![Arc::new(width_array) as ArrayRef],
1902 Some(vec![true, true, false, false].into()), );
1904
1905 let merged = merge(&left_struct, &right_struct);
1907
1908 assert_eq!(merged.null_count(), 1); assert!(!merged.is_null(0));
1916 assert!(!merged.is_null(1));
1917 assert!(!merged.is_null(2));
1918 assert!(merged.is_null(3));
1919
1920 let height_col = merged.column_by_name("height").unwrap();
1922 let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
1923 assert_eq!(height_values.value(0), 500);
1924 assert!(height_values.is_null(1)); assert_eq!(height_values.value(2), 600);
1926
1927 let width_col = merged.column_by_name("width").unwrap();
1928 let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
1929 assert_eq!(width_values.value(0), 300);
1930 assert_eq!(width_values.value(1), 200);
1931 assert!(width_values.is_null(2)); }
1933
    // Regression test: merging two List<Struct> columns whose item structs
    // have disjoint field sets must produce the union of the fields, keeping
    // each side's values and filling the fields missing on a side with nulls.
    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        // Left side: list of {company_id, count}, all values null.
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        // Right side: list of {company_name} with actual values.
        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        // Target schema: the union of both sides' item fields.
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        // The merged item struct must contain all three fields.
        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        // Left-only fields keep their (null) values...
        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        // ...and right-only fields keep their real values.
        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
2057
    // Exercises merging List<Struct> columns with 32-bit offsets.
    #[test]
    fn test_merge_struct_lists() {
        test_merge_struct_lists_generic::<i32>();
    }
2062
    // Exercises merging LargeList<Struct> columns with 64-bit offsets.
    #[test]
    fn test_merge_struct_large_lists() {
        test_merge_struct_lists_generic::<i64>();
    }
2067
2068 fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2069 let left_company_id = Arc::new(Int32Array::from(vec![
2071 Some(1),
2072 Some(2),
2073 Some(3),
2074 Some(4),
2075 Some(5),
2076 Some(6),
2077 Some(7),
2078 Some(8),
2079 Some(9),
2080 Some(10),
2081 Some(11),
2082 Some(12),
2083 Some(13),
2084 Some(14),
2085 Some(15),
2086 Some(16),
2087 Some(17),
2088 Some(18),
2089 Some(19),
2090 Some(20),
2091 ]));
2092 let left_count = Arc::new(Int32Array::from(vec![
2093 Some(10),
2094 Some(20),
2095 Some(30),
2096 Some(40),
2097 Some(50),
2098 Some(60),
2099 Some(70),
2100 Some(80),
2101 Some(90),
2102 Some(100),
2103 Some(110),
2104 Some(120),
2105 Some(130),
2106 Some(140),
2107 Some(150),
2108 Some(160),
2109 Some(170),
2110 Some(180),
2111 Some(190),
2112 Some(200),
2113 ]));
2114 let left_struct = Arc::new(StructArray::new(
2115 Fields::from(vec![
2116 Field::new("company_id", DataType::Int32, true),
2117 Field::new("count", DataType::Int32, true),
2118 ]),
2119 vec![left_company_id, left_count],
2120 None,
2121 ));
2122
2123 let left_list = Arc::new(GenericListArray::<O>::new(
2124 Arc::new(Field::new(
2125 "item",
2126 DataType::Struct(left_struct.fields().clone()),
2127 true,
2128 )),
2129 OffsetBuffer::from_lengths([3, 1]),
2130 left_struct.clone(),
2131 None,
2132 ));
2133
2134 let left_list_struct = Arc::new(StructArray::new(
2135 Fields::from(vec![Field::new(
2136 "companies",
2137 if O::IS_LARGE {
2138 DataType::LargeList(Arc::new(Field::new(
2139 "item",
2140 DataType::Struct(left_struct.fields().clone()),
2141 true,
2142 )))
2143 } else {
2144 DataType::List(Arc::new(Field::new(
2145 "item",
2146 DataType::Struct(left_struct.fields().clone()),
2147 true,
2148 )))
2149 },
2150 true,
2151 )]),
2152 vec![left_list as ArrayRef],
2153 None,
2154 ));
2155
2156 let right_company_name = Arc::new(StringArray::from(vec![
2158 "Google",
2159 "Microsoft",
2160 "Apple",
2161 "Facebook",
2162 ]));
2163 let right_struct = Arc::new(StructArray::new(
2164 Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2165 vec![right_company_name],
2166 None,
2167 ));
2168 let right_list = Arc::new(GenericListArray::<O>::new(
2169 Arc::new(Field::new(
2170 "item",
2171 DataType::Struct(right_struct.fields().clone()),
2172 true,
2173 )),
2174 OffsetBuffer::from_lengths([3, 1]),
2175 right_struct.clone(),
2176 None,
2177 ));
2178
2179 let right_list_struct = Arc::new(StructArray::new(
2180 Fields::from(vec![Field::new(
2181 "companies",
2182 if O::IS_LARGE {
2183 DataType::LargeList(Arc::new(Field::new(
2184 "item",
2185 DataType::Struct(right_struct.fields().clone()),
2186 true,
2187 )))
2188 } else {
2189 DataType::List(Arc::new(Field::new(
2190 "item",
2191 DataType::Struct(right_struct.fields().clone()),
2192 true,
2193 )))
2194 },
2195 true,
2196 )]),
2197 vec![right_list as ArrayRef],
2198 None,
2199 ));
2200
2201 let target_fields = Fields::from(vec![Field::new(
2203 "companies",
2204 if O::IS_LARGE {
2205 DataType::LargeList(Arc::new(Field::new(
2206 "item",
2207 DataType::Struct(Fields::from(vec![
2208 Field::new("company_id", DataType::Int32, true),
2209 Field::new("company_name", DataType::Utf8, true),
2210 Field::new("count", DataType::Int32, true),
2211 ])),
2212 true,
2213 )))
2214 } else {
2215 DataType::List(Arc::new(Field::new(
2216 "item",
2217 DataType::Struct(Fields::from(vec![
2218 Field::new("company_id", DataType::Int32, true),
2219 Field::new("company_name", DataType::Utf8, true),
2220 Field::new("count", DataType::Int32, true),
2221 ])),
2222 true,
2223 )))
2224 },
2225 true,
2226 )]);
2227
2228 let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2230 assert_eq!(merged_array.len(), 2);
2231 }
2232}