use std::sync::Arc;
use std::{collections::HashMap, ptr::NonNull};

use arrow_array::{
    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
    GenericListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray, UInt32Array,
    UInt8Array,
};
use arrow_array::{
    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
use arrow_select::{interleave::interleave, take::take};
use rand::prelude::*;

pub mod deepcopy;
pub mod schema;
pub use schema::*;
pub mod bfloat16;
pub mod floats;
pub use floats::*;
pub mod cast;
pub mod list;
pub mod memory;

type Result<T> = std::result::Result<T, ArrowError>;

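/// Extension methods on [`DataType`] for checking layout properties.
///
/// Illustrative usage (a minimal sketch added for documentation, marked
/// `ignore` because it is not compiled as a doctest; it only uses the methods
/// declared below):
///
/// ```ignore
/// use arrow_schema::DataType;
///
/// // Fixed-stride types report their width in bytes.
/// assert_eq!(DataType::Float32.byte_width(), 4);
/// // Variable-length types have no fixed width.
/// assert_eq!(DataType::Utf8.byte_width_opt(), None);
/// assert!(DataType::LargeUtf8.is_binary_like());
/// ```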
pub trait DataTypeExt {
    /// Returns true if the type is a variable-length binary-like type
    /// (`Utf8`, `Binary`, `LargeUtf8`, `LargeBinary`).
    fn is_binary_like(&self) -> bool;

    /// Returns true if the type is a struct.
    fn is_struct(&self) -> bool;

    /// Returns true if the type stores its values with a fixed stride.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the type is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Returns the byte width of the type, panicking if it is not fixed-stride.
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the type, or `None` if it is not fixed-stride.
    fn byte_width_opt(&self) -> Option<usize>;
}

impl DataTypeExt for DataType {
    fn is_binary_like(&self) -> bool {
        use DataType::*;
        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
    }

    fn is_struct(&self) -> bool {
        matches!(self, Self::Struct(_))
    }

    fn is_fixed_stride(&self) -> bool {
        use DataType::*;
        matches!(
            self,
            Boolean
                | UInt8
                | UInt16
                | UInt32
                | UInt64
                | Int8
                | Int16
                | Int32
                | Int64
                | Float16
                | Float32
                | Float64
                | Decimal128(_, _)
                | Decimal256(_, _)
                | FixedSizeList(_, _)
                | FixedSizeBinary(_)
                | Duration(_)
                | Timestamp(_, _)
                | Date32
                | Date64
                | Time32(_)
                | Time64(_)
        )
    }

    fn is_dictionary(&self) -> bool {
        matches!(self, Self::Dictionary(_, _))
    }

    fn byte_width_opt(&self) -> Option<usize> {
        match self {
            Self::Int8 => Some(1),
            Self::Int16 => Some(2),
            Self::Int32 => Some(4),
            Self::Int64 => Some(8),
            Self::UInt8 => Some(1),
            Self::UInt16 => Some(2),
            Self::UInt32 => Some(4),
            Self::UInt64 => Some(8),
            Self::Float16 => Some(2),
            Self::Float32 => Some(4),
            Self::Float64 => Some(8),
            Self::Date32 => Some(4),
            Self::Date64 => Some(8),
            Self::Time32(_) => Some(4),
            Self::Time64(_) => Some(8),
            Self::Timestamp(_, _) => Some(8),
            Self::Duration(_) => Some(8),
            Self::Decimal128(_, _) => Some(16),
            Self::Decimal256(_, _) => Some(32),
            Self::Interval(unit) => match unit {
                IntervalUnit::YearMonth => Some(4),
                IntervalUnit::DayTime => Some(8),
                IntervalUnit::MonthDayNano => Some(16),
            },
            Self::FixedSizeBinary(s) => Some(*s as usize),
            Self::FixedSizeList(dt, s) => dt
                .data_type()
                .byte_width_opt()
                .map(|width| width * *s as usize),
            _ => None,
        }
    }

    fn byte_width(&self) -> usize {
        self.byte_width_opt()
            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
    }
}

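/// Build a [`GenericListArray`] from a flat values array and an offsets array.
///
/// A hedged sketch of how this helper can be called (not taken from the
/// original documentation): `Int32Type` offsets yield a `ListArray`,
/// `Int64Type` offsets a `LargeListArray`.
///
/// ```ignore
/// use arrow_array::{types::Int32Type, Int32Array};
///
/// // Two lists: [1, 2] and [3, 4, 5].
/// let values = Int32Array::from(vec![1, 2, 3, 4, 5]);
/// let offsets = Int32Array::from(vec![0, 2, 5]);
/// let list = try_new_generic_list_array::<_, Int32Type>(values, &offsets)?;
/// assert_eq!(list.len(), 2);
/// ```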
pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
    values: T,
    offsets: &PrimitiveArray<Offset>,
) -> Result<GenericListArray<Offset::Native>>
where
    Offset::Native: OffsetSizeTrait,
{
    let data_type = if Offset::Native::IS_LARGE {
        DataType::LargeList(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    } else {
        DataType::List(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    };
    let data = ArrayDataBuilder::new(data_type)
        .len(offsets.len() - 1)
        .add_buffer(offsets.into_data().buffers()[0].clone())
        .add_child_data(values.into_data())
        .build()?;

    Ok(GenericListArray::from(data))
}

pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
}

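/// Helpers for constructing and transforming [`FixedSizeListArray`]s.
///
/// Rough usage sketch (illustrative only, assumes the trait is in scope; not
/// from the original docs):
///
/// ```ignore
/// use arrow_array::{FixedSizeListArray, Float32Array};
///
/// // Six values reshaped into three fixed-size lists of width 2.
/// let values = Float32Array::from(vec![1.0, 2.0, 3.0, 4.0, 5.0, 6.0]);
/// let fsl = FixedSizeListArray::try_new_from_values(values, 2)?;
/// assert_eq!(fsl.len(), 3);
/// ```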
pub trait FixedSizeListArrayExt {
    /// Build a [`FixedSizeListArray`] from a flat values array and a list size.
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Randomly sample `n` lists from the array, or return the whole array if
    /// it contains no more than `n` lists.
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert integer-valued lists to floating point lists, leaving floating
    /// point lists unchanged.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}

impl FixedSizeListArrayExt for FixedSizeListArray {
    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
        let values = Arc::new(values);

        Self::try_new(field, list_size, values, None)
    }

    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
        if n >= self.len() {
            return Ok(self.clone());
        }
        let mut rng = SmallRng::from_entropy();
        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
    }

    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
        match self.data_type() {
            DataType::FixedSizeList(field, size) => match field.data_type() {
                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
                DataType::Int8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int16 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int16Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int16Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int64 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to Int64Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to UInt8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Failed to cast primitive array to UInt32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                data_type => Err(ArrowError::ParseError(format!(
                    "Expected either a floating point or integer type, got {:?}",
                    data_type
                ))),
            },
            data_type => Err(ArrowError::ParseError(format!(
                "Expected FixedSizeList, got {:?}",
                data_type
            ))),
        }
    }
}

pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
}

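/// Helper for building a [`FixedSizeBinaryArray`] from a flat byte array.
///
/// Sketch of the intended call pattern (illustrative, not from the original
/// docs):
///
/// ```ignore
/// use arrow_array::{FixedSizeBinaryArray, UInt8Array};
///
/// // Eight bytes interpreted as two 4-byte values.
/// let bytes = UInt8Array::from(vec![0u8, 1, 2, 3, 4, 5, 6, 7]);
/// let fsb = FixedSizeBinaryArray::try_new_from_values(&bytes, 4)?;
/// assert_eq!(fsb.len(), 2);
/// ```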
pub trait FixedSizeBinaryArrayExt {
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}

impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
        let data_type = DataType::FixedSizeBinary(stride);
        let data = ArrayDataBuilder::new(data_type)
            .len(values.len() / stride as usize)
            .add_buffer(values.into_data().buffers()[0].clone())
            .build()?;
        Ok(Self::from(data))
    }
}

pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
}

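/// Iterate the values of a string array as `Option<&str>`, regardless of
/// whether it is `Utf8` or `LargeUtf8`.
///
/// Illustrative sketch (not from the original docs):
///
/// ```ignore
/// use arrow_array::StringArray;
///
/// let arr = StringArray::from(vec![Some("a"), None, Some("c")]);
/// let values: Vec<Option<&str>> = iter_str_array(&arr).collect();
/// assert_eq!(values, vec![Some("a"), None, Some("c")]);
/// ```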
pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
    match arr.data_type() {
        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
    }
}

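/// Extension methods on [`RecordBatch`].
///
/// A hedged sketch of the column-manipulation helpers declared below (the
/// field names are made up for illustration; not from the original docs):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2, 3]))])?;
///
/// // Append a column, then drop it again.
/// let with_b = batch.try_with_column(
///     Field::new("b", DataType::Int32, true),
///     Arc::new(Int32Array::from(vec![4, 5, 6])),
/// )?;
/// assert_eq!(with_b.num_columns(), 2);
/// let without_b = with_b.drop_column("b")?;
/// assert_eq!(without_b.num_columns(), 1);
/// ```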
pub trait RecordBatchExt {
    /// Append a column to the end of the batch.
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Insert a column at the given index.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Build a new batch from a [`StructArray`], keeping this batch's metadata.
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge the columns of `other` into this batch, recursively merging
    /// struct columns that appear on both sides.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Merge with `other`, selecting and ordering the output columns by `schema`.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Drop the column with the given name, if it exists.
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace the column with the given name, keeping its schema.
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace the column with the given name, also updating its data type in the schema.
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename the column at `index`.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Look up a (possibly nested) column by dot-separated qualified name,
    /// e.g. `"parent.child"`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the batch so that it matches the given schema, including nested fields.
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// The schema metadata of this batch.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add a single metadata entry to the schema.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Select rows by index, in the given order.
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;
}

impl RecordBatchExt for RecordBatch {
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    fn merge(&self, other: &Self) -> Result<Self> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatches with different row counts: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatches with different row counts: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    fn drop_column(&self, name: &str) -> Result<Self> {
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(self.schema(), columns)
    }

    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }
}

fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
    if fields.is_empty() {
        return Ok(StructArray::new_empty_fields(
            struct_array.len(),
            struct_array.nulls().cloned(),
        ));
    }
    let mut columns: Vec<ArrayRef> = vec![];
    for field in fields.iter() {
        if let Some(col) = struct_array.column_by_name(field.name()) {
            match field.data_type() {
                DataType::Struct(subfields) => {
                    // Recurse into nested structs so nested projections work too.
                    let projected = project(col.as_struct(), subfields)?;
                    columns.push(Arc::new(projected));
                }
                _ => {
                    columns.push(col.clone());
                }
            }
        } else {
            return Err(ArrowError::SchemaError(format!(
                "field {} does not exist in the RecordBatch",
                field.name()
            )));
        }
    }
    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
}

fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    left_list.offsets().inner() == right_list.offsets().inner()
}

fn merge_list_structs_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    items_field_name: impl Into<String>,
    items_nullable: bool,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    let left_struct = left_list.values();
    let right_struct = right_list.values();
    let left_struct_arr = left_struct.as_struct();
    let right_struct_arr = right_struct.as_struct();
    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_items.data_type().clone(),
        items_nullable,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        left_list.offsets().clone(),
        merged_items,
        left_list.nulls().cloned(),
    ))
}

fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
    items_field_name: impl Into<String>,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list::<T>();
    let not_null_list = not_null.as_list::<T>();
    let right_list = right.as_list::<T>();

    let left_struct = left_list.values().as_struct();
    let not_null_struct: &StructArray = not_null_list.values().as_struct();
    let right_struct = right_list.values().as_struct();

    let values_len = not_null_list.values().len();
    let mut merged_fields =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
    let mut merged_columns =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());

    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len))
        }
    }
    for (_, field) in right_struct
        .columns()
        .iter()
        .zip(right_struct.fields())
        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
    {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len));
        }
    }

    let merged_struct = Arc::new(StructArray::new(
        Fields::from(merged_fields),
        merged_columns,
        not_null_struct.nulls().cloned(),
    ));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_struct.data_type().clone(),
        true,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        not_null_list.offsets().clone(),
        merged_struct,
        not_null_list.nulls().cloned(),
    ))
}

fn merge_list_struct_null(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
) -> Arc<dyn Array> {
    match left.data_type() {
        DataType::List(left_field) => {
            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
        }
        DataType::LargeList(left_field) => {
            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
        }
        _ => unreachable!(),
    }
}

fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
    // If either side is entirely null we cannot rely on its offsets; fill its
    // fields with nulls following the non-null side's layout instead.
    if left.null_count() == left.len() {
        return merge_list_struct_null(left, right, right);
    } else if right.null_count() == right.len() {
        return merge_list_struct_null(left, right, left);
    }
    match (left.data_type(), right.data_type()) {
        (DataType::List(left_field), DataType::List(_)) => {
            if !lists_have_same_offsets_helper::<i32>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have the same offsets");
            }
            merge_list_structs_helper::<i32>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
            if !lists_have_same_offsets_helper::<i64>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have the same offsets");
            }
            merge_list_structs_helper::<i64>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        _ => unreachable!(),
    }
}

/// Treat an all-null validity buffer as if no validity buffer were present.
fn normalize_validity(
    validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<&arrow_buffer::NullBuffer> {
    validity.and_then(|v| {
        if v.null_count() == v.len() {
            None
        } else {
            Some(v)
        }
    })
}

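/// Combine the top-level validity buffers of two struct arrays being merged.
///
/// A sketch of the semantics implemented below (stated here as an inferred
/// summary, not original documentation): an all-null buffer is ignored, and a
/// merged row is valid if it is valid on either side.
///
/// ```ignore
/// // left:   [valid, null,  valid, null]
/// // right:  [valid, valid, null,  null]
/// // merged: [valid, valid, valid, null]
/// ```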
fn merge_struct_validity(
    left_validity: Option<&arrow_buffer::NullBuffer>,
    right_validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<arrow_buffer::NullBuffer> {
    // An all-null buffer carries no information for the merge, so drop it first.
    let left_normalized = normalize_validity(left_validity);
    let right_normalized = normalize_validity(right_validity);

    match (left_normalized, right_normalized) {
        (None, None) => None,
        (Some(left), None) => Some(left.clone()),
        (None, Some(right)) => Some(right.clone()),
        (Some(left), Some(right)) => {
            if left.null_count() == 0 && right.null_count() == 0 {
                return Some(left.clone());
            }

            let left_buffer = left.inner();
            let right_buffer = right.inner();

            // A merged row is valid if it is valid on either side (bitwise OR
            // of the validity bits).
            let merged_buffer = left_buffer | right_buffer;

            Some(arrow_buffer::NullBuffer::from(merged_buffer))
        }
    }
}

fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(),
        Some(p) => p,
    };

    let child_validity = child.nulls();

    let new_validity = match child_validity {
        None => {
            // The child had no validity of its own; inherit the parent's.
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // A child value stays valid only if both the child and its parent
            // consider it valid (bitwise AND of the validity bits).
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}

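/// Recursively merge the columns of two [`StructArray`]s.
///
/// A rough sketch of the behaviour, inferred from the code and the tests below
/// rather than from original documentation: columns present on both sides are
/// merged field-by-field when they are structs (or lists of structs) and taken
/// from the left side otherwise; columns unique to either side are appended.
///
/// ```ignore
/// // left:  {a: Int32, b: {c: Int32}}
/// // right: {e: Int32, b: {d: Utf8}}
/// // merge(left, right) -> {a: Int32, b: {c: Int32, d: Utf8}, e: Int32}
/// ```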
fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
    let mut fields: Vec<Field> = vec![];
    let mut columns: Vec<ArrayRef> = vec![];
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for (left_field, left_column) in left_struct_array
        .fields()
        .iter()
        .zip(left_struct_array.columns().iter())
    {
        match right_fields
            .iter()
            .position(|f| f.name() == left_field.name())
        {
            Some(right_index) => {
                let right_field = right_fields.get(right_index).unwrap();
                let right_column = right_columns.get(right_index).unwrap();
                match (left_field.data_type(), right_field.data_type()) {
                    (DataType::Struct(_), DataType::Struct(_)) => {
                        let left_sub_array = left_column.as_struct();
                        let right_sub_array = right_column.as_struct();
                        let merged_sub_array = merge(left_sub_array, right_sub_array);
                        fields.push(Field::new(
                            left_field.name(),
                            merged_sub_array.data_type().clone(),
                            left_field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    (DataType::List(left_list), DataType::List(right_list))
                        if left_list.data_type().is_struct()
                            && right_list.data_type().is_struct() =>
                    {
                        if left_list.data_type() == right_list.data_type() {
                            // Identical list-of-struct columns: keep the left one
                            // as-is and move on, so the column is not emitted twice.
                            fields.push(left_field.as_ref().clone());
                            columns.push(left_column.clone());
                            continue;
                        }
                        let merged_sub_array = merge_list_struct(&left_column, &right_column);

                        fields.push(Field::new(
                            left_field.name(),
                            merged_sub_array.data_type().clone(),
                            left_field.is_nullable(),
                        ));
                        columns.push(merged_sub_array);
                    }
                    _ => {
                        fields.push(left_field.as_ref().clone());
                        let adjusted_column = adjust_child_validity(left_column, left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            None => {
                fields.push(left_field.as_ref().clone());
                let adjusted_column = adjust_child_validity(left_column, left_validity);
                columns.push(adjusted_column);
            }
        }
    }

    right_fields
        .iter()
        .zip(right_columns.iter())
        .for_each(|(field, column)| {
            if !left_struct_array
                .fields()
                .iter()
                .any(|f| f.name() == field.name())
            {
                fields.push(field.as_ref().clone());
                let adjusted_column = adjust_child_validity(column, right_validity);
                columns.push(adjusted_column);
            }
        });

    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
}

fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Two types "match" here if they are both structs or both non-structs; the
    // actual leaf types are taken from the input arrays, not from `fields`.
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                if let DataType::Struct(child_fields) = field.data_type() {
                    let left_sub_array = left_columns[left_idx].as_struct();
                    let right_sub_array = right_columns[right_idx].as_struct();
                    let merged_sub_array =
                        merge_with_schema(left_sub_array, right_sub_array, child_fields);
                    output_fields.push(Field::new(
                        field.name(),
                        merged_sub_array.data_type().clone(),
                        field.is_nullable(),
                    ));
                    columns.push(Arc::new(merged_sub_array) as ArrayRef);
                } else {
                    output_fields.push(left_fields[left_idx].as_ref().clone());
                    let adjusted_column =
                        adjust_child_validity(&left_columns[left_idx], left_validity);
                    columns.push(adjusted_column);
                }
            }
            (None, None) => {
                // The field is present in neither input; skip it.
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}

fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
    if components.is_empty() {
        return Some(array);
    }
    if !matches!(array.data_type(), DataType::Struct(_)) {
        return None;
    }
    let struct_arr = array.as_struct();
    struct_arr
        .column_by_name(components[0])
        .and_then(|arr| get_sub_array(arr, &components[1..]))
}

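/// Interleave rows drawn from several [`RecordBatch`]es that share a schema.
///
/// Each `(batch_index, row_index)` pair in `indices` selects one output row.
/// Rough sketch (illustrative, not from the original docs):
///
/// ```ignore
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let b0 = RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))])?;
/// let b1 = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![3, 4]))])?;
///
/// // Rows b1[1], b0[0], b1[0]  ->  column "a" becomes [4, 1, 3].
/// let out = interleave_batches(&[b0, b1], &[(1, 1), (0, 0), (1, 0)])?;
/// assert_eq!(out.num_rows(), 3);
/// ```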
pub fn interleave_batches(
    batches: &[RecordBatch],
    indices: &[(usize, usize)],
) -> Result<RecordBatch> {
    let first_batch = batches.first().ok_or_else(|| {
        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
    })?;
    let schema = first_batch.schema();
    let num_columns = first_batch.num_columns();
    let mut columns = Vec::with_capacity(num_columns);
    let mut chunks = Vec::with_capacity(batches.len());

    for i in 0..num_columns {
        for batch in batches {
            chunks.push(batch.column(i).as_ref());
        }
        let new_column = interleave(&chunks, indices)?;
        columns.push(new_column);
        chunks.clear();
    }

    RecordBatch::try_new(schema, columns)
}

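/// Helpers for turning [`bytes::Bytes`] into an [`arrow_buffer::Buffer`].
///
/// `from_bytes_bytes` tries to reuse the existing allocation and only copies
/// when a power-of-two `bytes_per_value` is not satisfied by the pointer's
/// alignment. A rough usage sketch (illustrative only, not from the original
/// docs):
///
/// ```ignore
/// use arrow_buffer::Buffer;
///
/// let bytes = bytes::Bytes::from(vec![0u8; 64]);
/// // 8-byte values: zero-copy if the allocation happens to be 8-byte aligned,
/// // otherwise the bytes are copied into a fresh, aligned buffer.
/// let buffer = Buffer::from_bytes_bytes(bytes, 8);
/// assert_eq!(buffer.len(), 64);
/// ```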
pub trait BufferExt {
    /// Wrap a [`bytes::Bytes`] in a `Buffer` without copying when the
    /// allocation is already suitably aligned for `bytes_per_value`.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Copy `bytes` into a newly allocated buffer of `size_bytes` bytes,
    /// zero-padding the remainder.
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}

fn is_pwr_two(n: u64) -> bool {
    n & (n - 1) == 0
}

impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The values need alignment but the underlying allocation is not
            // aligned; fall back to copying into a fresh buffer.
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // Zero-copy: keep the `Bytes` alive as the custom allocation that
            // backs the arrow buffer.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        buf.extend(std::iter::repeat_n(0_u8, to_fill));
        Self::from(buf)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
    use arrow_array::{Float32Array, Int32Array, StructArray};
    use arrow_buffer::OffsetBuffer;

    #[test]
    fn test_merge_recursive() {
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }

    #[test]
    fn test_merge_with_schema() {
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }

    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }

    #[test]
    fn test_byte_width_opt() {
        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Utf8.byte_width_opt(), None);
        assert_eq!(DataType::Binary.byte_width_opt(), None);
        assert_eq!(
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
            None
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
                .byte_width_opt(),
            Some(12)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
                .byte_width_opt(),
            Some(16)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
                .byte_width_opt(),
            None
        );
    }

    #[test]
    fn test_take_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();
        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
        assert_eq!(
            taken,
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int32Array::from(vec![1, 5, 10])),
                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
                ],
            )
            .unwrap()
        )
    }

    #[test]
    fn test_schema_project_by_schema() {
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }

    #[test]
    fn test_project_preserves_struct_validity() {
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, true),
        ]);

        let id_array = Int32Array::from(vec![1, 2, 3]);
        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
        let struct_array = StructArray::new(
            fields.clone(),
            vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(value_array) as ArrayRef,
            ],
            // Row 1 is null at the struct level.
            Some(vec![true, false, true].into()),
        );

        let projected = project(&struct_array, &fields).unwrap();

        assert_eq!(projected.null_count(), 1);
        assert!(!projected.is_null(0));
        assert!(projected.is_null(1));
        assert!(!projected.is_null(2));
    }

    #[test]
    fn test_merge_struct_with_different_validity() {
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            // Left side is valid on rows 0 and 2.
            Some(vec![true, false, true, false].into()),
        );

        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            // Right side is valid on rows 0 and 1.
            Some(vec![true, true, false, false].into()),
        );

        let merged = merge(&left_struct, &right_struct);

        // The merged validity is the union of both sides, so only row 3
        // (null on both sides) stays null.
        assert_eq!(merged.null_count(), 1);
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1));
        assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2));
    }
}