// lance_arrow/lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend Arrow Functionality
5//!
6//! To improve Arrow-RS ergonomic
7
8use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12    Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray, GenericListArray,
13    LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray,
14    UInt8Array, UInt32Array, cast::AsArray,
15};
16use arrow_array::{
17    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod cast;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod scalar;
38pub mod stream;
39pub mod r#struct;
40
/// Arrow extension metadata key for extension name
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Arrow extension metadata key for extension metadata
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Key used by lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Arrow extension type name for Lance blob v2 columns
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
/// Metadata key for overriding the dedicated blob size threshold (in bytes)
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

/// Crate-local result alias: fallible helpers in this module surface [`ArrowError`].
type Result<T> = std::result::Result<T, ArrowError>;
57
/// Extension helpers on Arrow's [`DataType`].
pub trait DataTypeExt {
    /// Returns true if the data type is binary-like, such as (Large)Utf8 and (Large)Binary.
    ///
    /// ```
    /// use lance_arrow::*;
    /// use arrow_schema::DataType;
    ///
    /// assert!(DataType::Utf8.is_binary_like());
    /// assert!(DataType::Binary.is_binary_like());
    /// assert!(DataType::LargeUtf8.is_binary_like());
    /// assert!(DataType::LargeBinary.is_binary_like());
    /// assert!(!DataType::Int32.is_binary_like());
    /// ```
    fn is_binary_like(&self) -> bool;

    /// Returns true if the data type is a struct.
    fn is_struct(&self) -> bool;

    /// Check whether the given Arrow DataType is fixed stride.
    ///
    /// A fixed stride type has the same byte width for all array elements.
    /// This includes all primitive types as well as Boolean, FixedSizeList,
    /// FixedSizeBinary, and Decimals.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the [DataType] is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Returns the byte width of the data type.
    ///
    /// # Panics
    ///
    /// Panics if the data type is not fixed stride.
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the data type, if it is fixed stride.
    /// Returns None if the data type is not fixed stride.
    fn byte_width_opt(&self) -> Option<usize>;
}
93
94impl DataTypeExt for DataType {
95    fn is_binary_like(&self) -> bool {
96        use DataType::*;
97        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
98    }
99
100    fn is_struct(&self) -> bool {
101        matches!(self, Self::Struct(_))
102    }
103
104    fn is_fixed_stride(&self) -> bool {
105        use DataType::*;
106        matches!(
107            self,
108            Boolean
109                | UInt8
110                | UInt16
111                | UInt32
112                | UInt64
113                | Int8
114                | Int16
115                | Int32
116                | Int64
117                | Float16
118                | Float32
119                | Float64
120                | Decimal128(_, _)
121                | Decimal256(_, _)
122                | FixedSizeList(_, _)
123                | FixedSizeBinary(_)
124                | Duration(_)
125                | Timestamp(_, _)
126                | Date32
127                | Date64
128                | Time32(_)
129                | Time64(_)
130        )
131    }
132
133    fn is_dictionary(&self) -> bool {
134        matches!(self, Self::Dictionary(_, _))
135    }
136
137    fn byte_width_opt(&self) -> Option<usize> {
138        match self {
139            Self::Int8 => Some(1),
140            Self::Int16 => Some(2),
141            Self::Int32 => Some(4),
142            Self::Int64 => Some(8),
143            Self::UInt8 => Some(1),
144            Self::UInt16 => Some(2),
145            Self::UInt32 => Some(4),
146            Self::UInt64 => Some(8),
147            Self::Float16 => Some(2),
148            Self::Float32 => Some(4),
149            Self::Float64 => Some(8),
150            Self::Date32 => Some(4),
151            Self::Date64 => Some(8),
152            Self::Time32(_) => Some(4),
153            Self::Time64(_) => Some(8),
154            Self::Timestamp(_, _) => Some(8),
155            Self::Duration(_) => Some(8),
156            Self::Decimal128(_, _) => Some(16),
157            Self::Decimal256(_, _) => Some(32),
158            Self::Interval(unit) => match unit {
159                IntervalUnit::YearMonth => Some(4),
160                IntervalUnit::DayTime => Some(8),
161                IntervalUnit::MonthDayNano => Some(16),
162            },
163            Self::FixedSizeBinary(s) => Some(*s as usize),
164            Self::FixedSizeList(dt, s) => dt
165                .data_type()
166                .byte_width_opt()
167                .map(|width| width * *s as usize),
168            _ => None,
169        }
170    }
171
172    fn byte_width(&self) -> usize {
173        self.byte_width_opt()
174            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
175    }
176}
177
178/// Create an [`GenericListArray`] from values and offsets.
179///
180/// ```
181/// use arrow_array::{Int32Array, Int64Array, ListArray};
182/// use arrow_array::types::Int64Type;
183/// use lance_arrow::try_new_generic_list_array;
184///
185/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
186/// let int_values = Int64Array::from_iter(0..10);
187/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
188/// assert_eq!(list_arr,
189///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
190///         Some(vec![Some(0), Some(1)]),
191///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
192///         Some(vec![Some(7), Some(8), Some(9)]),
193/// ]))
194/// ```
195pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
196    values: T,
197    offsets: &PrimitiveArray<Offset>,
198) -> Result<GenericListArray<Offset::Native>>
199where
200    Offset::Native: OffsetSizeTrait,
201{
202    let data_type = if Offset::Native::IS_LARGE {
203        DataType::LargeList(Arc::new(Field::new(
204            "item",
205            values.data_type().clone(),
206            true,
207        )))
208    } else {
209        DataType::List(Arc::new(Field::new(
210            "item",
211            values.data_type().clone(),
212            true,
213        )))
214    };
215    let data = ArrayDataBuilder::new(data_type)
216        .len(offsets.len() - 1)
217        .add_buffer(offsets.into_data().buffers()[0].clone())
218        .add_child_data(values.into_data())
219        .build()?;
220
221    Ok(GenericListArray::from(data))
222}
223
/// Build a [`DataType::FixedSizeList`] with `list_width` elements of `inner_type`.
///
/// The child field is named "item" and marked nullable, matching the Arrow
/// convention used throughout this crate.
pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
}
227
/// Extension helpers on [`FixedSizeListArray`].
pub trait FixedSizeListArrayExt {
    /// Create an [`FixedSizeListArray`] from values and list size.
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray};
    /// use arrow_array::types::Int64Type;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
    ///         Some(vec![Some(0), Some(1)]),
    ///         Some(vec![Some(2), Some(3)]),
    ///         Some(vec![Some(4), Some(5)]),
    ///         Some(vec![Some(6), Some(7)]),
    ///         Some(vec![Some(8), Some(9)])
    /// ], 2))
    /// ```
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Sample `n` rows from the [FixedSizeListArray]
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..256);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
    /// assert_eq!(sampled.len(), 10);
    /// assert_eq!(sampled.value_length(), 16);
    /// assert_eq!(sampled.values().len(), 160);
    /// ```
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert a [FixedSizeListArray] whose child type is one of Int8, Int16,
    /// Int32, Int64, UInt8, or UInt32 to its closest floating point type.
    /// A list whose child is already Float16, Float32 or Float64 is returned
    /// unchanged.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
271
272impl FixedSizeListArrayExt for FixedSizeListArray {
273    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
274        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
275        let values = Arc::new(values);
276
277        Self::try_new(field, list_size, values, None)
278    }
279
280    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
281        if n >= self.len() {
282            return Ok(self.clone());
283        }
284        let mut rng = SmallRng::from_os_rng();
285        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
286        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
287    }
288
289    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
290        match self.data_type() {
291            DataType::FixedSizeList(field, size) => match field.data_type() {
292                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
293                DataType::Int8 => Ok(Self::new(
294                    Arc::new(arrow_schema::Field::new(
295                        field.name(),
296                        DataType::Float32,
297                        field.is_nullable(),
298                    )),
299                    *size,
300                    Arc::new(Float32Array::from_iter_values(
301                        self.values()
302                            .as_any()
303                            .downcast_ref::<Int8Array>()
304                            .ok_or(ArrowError::ParseError(
305                                "Fail to cast primitive array to Int8Type".to_string(),
306                            ))?
307                            .into_iter()
308                            .filter_map(|x| x.map(|y| y as f32)),
309                    )),
310                    self.nulls().cloned(),
311                )),
312                DataType::Int16 => Ok(Self::new(
313                    Arc::new(arrow_schema::Field::new(
314                        field.name(),
315                        DataType::Float32,
316                        field.is_nullable(),
317                    )),
318                    *size,
319                    Arc::new(Float32Array::from_iter_values(
320                        self.values()
321                            .as_any()
322                            .downcast_ref::<Int16Array>()
323                            .ok_or(ArrowError::ParseError(
324                                "Fail to cast primitive array to Int16Type".to_string(),
325                            ))?
326                            .into_iter()
327                            .filter_map(|x| x.map(|y| y as f32)),
328                    )),
329                    self.nulls().cloned(),
330                )),
331                DataType::Int32 => Ok(Self::new(
332                    Arc::new(arrow_schema::Field::new(
333                        field.name(),
334                        DataType::Float32,
335                        field.is_nullable(),
336                    )),
337                    *size,
338                    Arc::new(Float32Array::from_iter_values(
339                        self.values()
340                            .as_any()
341                            .downcast_ref::<Int32Array>()
342                            .ok_or(ArrowError::ParseError(
343                                "Fail to cast primitive array to Int32Type".to_string(),
344                            ))?
345                            .into_iter()
346                            .filter_map(|x| x.map(|y| y as f32)),
347                    )),
348                    self.nulls().cloned(),
349                )),
350                DataType::Int64 => Ok(Self::new(
351                    Arc::new(arrow_schema::Field::new(
352                        field.name(),
353                        DataType::Float64,
354                        field.is_nullable(),
355                    )),
356                    *size,
357                    Arc::new(Float64Array::from_iter_values(
358                        self.values()
359                            .as_any()
360                            .downcast_ref::<Int64Array>()
361                            .ok_or(ArrowError::ParseError(
362                                "Fail to cast primitive array to Int64Type".to_string(),
363                            ))?
364                            .into_iter()
365                            .filter_map(|x| x.map(|y| y as f64)),
366                    )),
367                    self.nulls().cloned(),
368                )),
369                DataType::UInt8 => Ok(Self::new(
370                    Arc::new(arrow_schema::Field::new(
371                        field.name(),
372                        DataType::Float64,
373                        field.is_nullable(),
374                    )),
375                    *size,
376                    Arc::new(Float64Array::from_iter_values(
377                        self.values()
378                            .as_any()
379                            .downcast_ref::<UInt8Array>()
380                            .ok_or(ArrowError::ParseError(
381                                "Fail to cast primitive array to UInt8Type".to_string(),
382                            ))?
383                            .into_iter()
384                            .filter_map(|x| x.map(|y| y as f64)),
385                    )),
386                    self.nulls().cloned(),
387                )),
388                DataType::UInt32 => Ok(Self::new(
389                    Arc::new(arrow_schema::Field::new(
390                        field.name(),
391                        DataType::Float64,
392                        field.is_nullable(),
393                    )),
394                    *size,
395                    Arc::new(Float64Array::from_iter_values(
396                        self.values()
397                            .as_any()
398                            .downcast_ref::<UInt32Array>()
399                            .ok_or(ArrowError::ParseError(
400                                "Fail to cast primitive array to UInt32Type".to_string(),
401                            ))?
402                            .into_iter()
403                            .filter_map(|x| x.map(|y| y as f64)),
404                    )),
405                    self.nulls().cloned(),
406                )),
407                data_type => Err(ArrowError::ParseError(format!(
408                    "Expect either floating type or integer got {:?}",
409                    data_type
410                ))),
411            },
412            data_type => Err(ArrowError::ParseError(format!(
413                "Expect either FixedSizeList got {:?}",
414                data_type
415            ))),
416        }
417    }
418}
419
/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
/// [`FixedSizeListArray`], panic'ing on failure.
///
/// # Panics
///
/// Panics if `arr` is not a [`FixedSizeListArray`].
pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
}
425
/// Extension helpers on [`FixedSizeBinaryArray`].
pub trait FixedSizeBinaryArrayExt {
    /// Create an [`FixedSizeBinaryArray`] from values and stride.
    ///
    /// Callers should pass a `values` array whose length is a multiple of
    /// `stride`.
    ///
    /// ```
    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
    /// use arrow_array::types::UInt8Type;
    /// use lance_arrow::FixedSizeBinaryArrayExt;
    ///
    /// let int_values = UInt8Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeBinaryArray::from(vec![
    ///         Some(vec![0, 1].as_slice()),
    ///         Some(vec![2, 3].as_slice()),
    ///         Some(vec![4, 5].as_slice()),
    ///         Some(vec![6, 7].as_slice()),
    ///         Some(vec![8, 9].as_slice())
    /// ]))
    /// ```
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
447
448impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
449    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
450        let data_type = DataType::FixedSizeBinary(stride);
451        let data = ArrayDataBuilder::new(data_type)
452            .len(values.len() / stride as usize)
453            .add_buffer(values.into_data().buffers()[0].clone())
454            .build()?;
455        Ok(Self::from(data))
456    }
457}
458
/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
/// [`FixedSizeBinaryArray`], panic'ing on failure.
///
/// # Panics
///
/// Panics if `arr` is not a [`FixedSizeBinaryArray`].
pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
}
462
463pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
464    match arr.data_type() {
465        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
466        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
467        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
468    }
469}
470
/// Extends Arrow's [RecordBatch].
pub trait RecordBatchExt {
    /// Append a new column to this [`RecordBatch`] and returns a new RecordBatch.
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let new_field = Field::new("s", DataType::Utf8, true);
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Create a new RecordBatch with the column inserted at `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new [`RecordBatch`] from the provided [`StructArray`].
    ///
    /// The fields on the [`StructArray`] need to match this [`RecordBatch`] schema
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge with another [`RecordBatch`] and returns a new one.
    ///
    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
    /// name is found in the right then we merge the two columns.  If there is no match then
    /// we add the left column to the output.
    ///
    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
    /// Otherwise we use the left array.
    ///
    /// Afterwards we add all non-matching right columns to the output.
    ///
    /// Note: This method likely does not handle nested fields correctly and you may want to consider
    /// using [`Self::merge_with_schema`] instead.
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::*;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
    ///
    /// let new_record_batch = left.merge(&right).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    ///
    /// TODO: add merge nested fields support.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Create a batch by merging columns between two batches with a given schema.
    ///
    /// A reference schema is used to determine the proper ordering of nested fields.
    ///
    /// For each field in the reference schema we look for corresponding fields in
    /// the left and right batches.  If a field is found in both batches we recursively merge
    /// it.
    ///
    /// If a field is only in the left or right batch we take it as it is.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Drop one column specified with the name and return the new [`RecordBatch`].
    ///
    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace a column (specified by name) and return the new [`RecordBatch`].
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace a column schema (specified by name) and return the new [`RecordBatch`].
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename a column at a given index.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Get (potentially nested) column by qualified name, e.g. `a.b.c`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the schema over the [RecordBatch].
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// metadata of the schema.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add metadata to the schema.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata with the provided one.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take selected rows from the [RecordBatch].
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Create a new RecordBatch with compacted memory after slicing.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;

    /// Helper method to sort the RecordBatch by a column
    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
}
616
617impl RecordBatchExt for RecordBatch {
618    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
619        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
620        let mut new_columns = self.columns().to_vec();
621        new_columns.push(arr);
622        Self::try_new(new_schema, new_columns)
623    }
624
625    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
626        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
627        let mut new_columns = self.columns().to_vec();
628        new_columns.insert(index, arr);
629        Self::try_new(new_schema, new_columns)
630    }
631
632    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
633        let schema = Arc::new(Schema::new_with_metadata(
634            arr.fields().to_vec(),
635            self.schema().metadata.clone(),
636        ));
637        let batch = Self::from(arr);
638        batch.with_schema(schema)
639    }
640
641    fn merge(&self, other: &Self) -> Result<Self> {
642        if self.num_rows() != other.num_rows() {
643            return Err(ArrowError::InvalidArgumentError(format!(
644                "Attempt to merge two RecordBatch with different sizes: {} != {}",
645                self.num_rows(),
646                other.num_rows()
647            )));
648        }
649        let left_struct_array: StructArray = self.clone().into();
650        let right_struct_array: StructArray = other.clone().into();
651        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
652    }
653
654    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
655        if self.num_rows() != other.num_rows() {
656            return Err(ArrowError::InvalidArgumentError(format!(
657                "Attempt to merge two RecordBatch with different sizes: {} != {}",
658                self.num_rows(),
659                other.num_rows()
660            )));
661        }
662        let left_struct_array: StructArray = self.clone().into();
663        let right_struct_array: StructArray = other.clone().into();
664        self.try_new_from_struct_array(merge_with_schema(
665            &left_struct_array,
666            &right_struct_array,
667            schema.fields(),
668        ))
669    }
670
671    fn drop_column(&self, name: &str) -> Result<Self> {
672        let mut fields = vec![];
673        let mut columns = vec![];
674        for i in 0..self.schema().fields.len() {
675            if self.schema().field(i).name() != name {
676                fields.push(self.schema().field(i).clone());
677                columns.push(self.column(i).clone());
678            }
679        }
680        Self::try_new(
681            Arc::new(Schema::new_with_metadata(
682                fields,
683                self.schema().metadata().clone(),
684            )),
685            columns,
686        )
687    }
688
689    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
690        let mut fields = self.schema().fields().to_vec();
691        if index >= fields.len() {
692            return Err(ArrowError::InvalidArgumentError(format!(
693                "Index out of bounds: {}",
694                index
695            )));
696        }
697        fields[index] = Arc::new(Field::new(
698            new_name,
699            fields[index].data_type().clone(),
700            fields[index].is_nullable(),
701        ));
702        Self::try_new(
703            Arc::new(Schema::new_with_metadata(
704                fields,
705                self.schema().metadata().clone(),
706            )),
707            self.columns().to_vec(),
708        )
709    }
710
711    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
712        let mut columns = self.columns().to_vec();
713        let field_i = self
714            .schema()
715            .fields()
716            .iter()
717            .position(|f| f.name() == name)
718            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
719        columns[field_i] = column;
720        Self::try_new(self.schema(), columns)
721    }
722
723    fn replace_column_schema_by_name(
724        &self,
725        name: &str,
726        new_data_type: DataType,
727        column: Arc<dyn Array>,
728    ) -> Result<RecordBatch> {
729        let fields = self
730            .schema()
731            .fields()
732            .iter()
733            .map(|x| {
734                if x.name() != name {
735                    x.clone()
736                } else {
737                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
738                    Arc::new(new_field)
739                }
740            })
741            .collect::<Vec<_>>();
742        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
743        let mut columns = self.columns().to_vec();
744        let field_i = self
745            .schema()
746            .fields()
747            .iter()
748            .position(|f| f.name() == name)
749            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
750        columns[field_i] = column;
751        Self::try_new(Arc::new(schema), columns)
752    }
753
754    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
755        let split = name.split('.').collect::<Vec<_>>();
756        if split.is_empty() {
757            return None;
758        }
759
760        self.column_by_name(split[0])
761            .and_then(|arr| get_sub_array(arr, &split[1..]))
762    }
763
764    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
765        let struct_array: StructArray = self.clone().into();
766        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
767    }
768
769    fn metadata(&self) -> &HashMap<String, String> {
770        self.schema_ref().metadata()
771    }
772
773    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
774        let mut schema = self.schema_ref().as_ref().clone();
775        schema.metadata = metadata;
776        Self::try_new(schema.into(), self.columns().into())
777    }
778
779    fn take(&self, indices: &UInt32Array) -> Result<Self> {
780        let struct_array: StructArray = self.clone().into();
781        let taken = take(&struct_array, indices, None)?;
782        self.try_new_from_struct_array(taken.as_struct().clone())
783    }
784
    /// Compact this batch's backing buffers.
    ///
    /// Delegates to [`crate::deepcopy::deep_copy_batch_sliced`], which deep
    /// copies only the sliced portion of the batch rather than the whole
    /// underlying buffers.
    fn shrink_to_fit(&self) -> Result<Self> {
        // Deep copy the sliced record batch, instead of whole batch
        crate::deepcopy::deep_copy_batch_sliced(self)
    }
789
790    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
791        if column >= self.num_columns() {
792            return Err(ArrowError::InvalidArgumentError(format!(
793                "Column index out of bounds: {}",
794                column
795            )));
796        }
797        let column = self.column(column);
798        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
799        self.take(&sorted)
800    }
801}
802
803/// Recursively projects an array to match the target field's structure.
804/// This handles reordering fields inside nested List<Struct> types.
805fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
806    match target_field.data_type() {
807        DataType::Struct(subfields) => {
808            let struct_arr = array.as_struct();
809            let projected = project(struct_arr, subfields)?;
810            Ok(Arc::new(projected))
811        }
812        DataType::List(inner_field) => {
813            let list_arr: &ListArray = array.as_list();
814            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
815            Ok(Arc::new(ListArray::new(
816                inner_field.clone(),
817                list_arr.offsets().clone(),
818                projected_values,
819                list_arr.nulls().cloned(),
820            )))
821        }
822        DataType::LargeList(inner_field) => {
823            let list_arr: &LargeListArray = array.as_list();
824            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
825            Ok(Arc::new(LargeListArray::new(
826                inner_field.clone(),
827                list_arr.offsets().clone(),
828                projected_values,
829                list_arr.nulls().cloned(),
830            )))
831        }
832        DataType::FixedSizeList(inner_field, size) => {
833            let list_arr = array.as_fixed_size_list();
834            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
835            Ok(Arc::new(FixedSizeListArray::new(
836                inner_field.clone(),
837                *size,
838                projected_values,
839                list_arr.nulls().cloned(),
840            )))
841        }
842        _ => Ok(array.clone()),
843    }
844}
845
846fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
847    if fields.is_empty() {
848        return Ok(StructArray::new_empty_fields(
849            struct_array.len(),
850            struct_array.nulls().cloned(),
851        ));
852    }
853    let mut columns: Vec<ArrayRef> = vec![];
854    for field in fields.iter() {
855        if let Some(col) = struct_array.column_by_name(field.name()) {
856            let projected = project_array(col, field.as_ref())?;
857            columns.push(projected);
858        } else {
859            return Err(ArrowError::SchemaError(format!(
860                "field {} does not exist in the RecordBatch",
861                field.name()
862            )));
863        }
864    }
865    // Preserve the struct's validity when projecting
866    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
867}
868
869fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
870    let left_list: &GenericListArray<T> = left.as_list();
871    let right_list: &GenericListArray<T> = right.as_list();
872    left_list.offsets().inner() == right_list.offsets().inner()
873}
874
875fn merge_list_structs_helper<T: OffsetSizeTrait>(
876    left: &dyn Array,
877    right: &dyn Array,
878    items_field_name: impl Into<String>,
879    items_nullable: bool,
880) -> Arc<dyn Array> {
881    let left_list: &GenericListArray<T> = left.as_list();
882    let right_list: &GenericListArray<T> = right.as_list();
883    let left_struct = left_list.values();
884    let right_struct = right_list.values();
885    let left_struct_arr = left_struct.as_struct();
886    let right_struct_arr = right_struct.as_struct();
887    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
888    let items_field = Arc::new(Field::new(
889        items_field_name,
890        merged_items.data_type().clone(),
891        items_nullable,
892    ));
893    Arc::new(GenericListArray::<T>::new(
894        items_field,
895        left_list.offsets().clone(),
896        merged_items,
897        left_list.nulls().cloned(),
898    ))
899}
900
901fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
902    left: &dyn Array,
903    right: &dyn Array,
904    not_null: &dyn Array,
905    items_field_name: impl Into<String>,
906) -> Arc<dyn Array> {
907    let left_list: &GenericListArray<T> = left.as_list::<T>();
908    let not_null_list = not_null.as_list::<T>();
909    let right_list = right.as_list::<T>();
910
911    let left_struct = left_list.values().as_struct();
912    let not_null_struct: &StructArray = not_null_list.values().as_struct();
913    let right_struct = right_list.values().as_struct();
914
915    let values_len = not_null_list.values().len();
916    let mut merged_fields =
917        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
918    let mut merged_columns =
919        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
920
921    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
922        merged_fields.push(field.clone());
923        if let Some(val) = not_null_struct.column_by_name(field.name()) {
924            merged_columns.push(val.clone());
925        } else {
926            merged_columns.push(new_null_array(field.data_type(), values_len))
927        }
928    }
929    for (_, field) in right_struct
930        .columns()
931        .iter()
932        .zip(right_struct.fields())
933        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
934    {
935        merged_fields.push(field.clone());
936        if let Some(val) = not_null_struct.column_by_name(field.name()) {
937            merged_columns.push(val.clone());
938        } else {
939            merged_columns.push(new_null_array(field.data_type(), values_len));
940        }
941    }
942
943    let merged_struct = Arc::new(StructArray::new(
944        Fields::from(merged_fields),
945        merged_columns,
946        not_null_struct.nulls().cloned(),
947    ));
948    let items_field = Arc::new(Field::new(
949        items_field_name,
950        merged_struct.data_type().clone(),
951        true,
952    ));
953    Arc::new(GenericListArray::<T>::new(
954        items_field,
955        not_null_list.offsets().clone(),
956        merged_struct,
957        not_null_list.nulls().cloned(),
958    ))
959}
960
961fn merge_list_struct_null(
962    left: &dyn Array,
963    right: &dyn Array,
964    not_null: &dyn Array,
965) -> Arc<dyn Array> {
966    match left.data_type() {
967        DataType::List(left_field) => {
968            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
969        }
970        DataType::LargeList(left_field) => {
971            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
972        }
973        _ => unreachable!(),
974    }
975}
976
977fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
978    // Merging fields into a list<struct<...>> is tricky and can only succeed
979    // in two ways.  First, if both lists have the same offsets.  Second, if
980    // one of the lists is all-null
981    if left.null_count() == left.len() {
982        return merge_list_struct_null(left, right, right);
983    } else if right.null_count() == right.len() {
984        return merge_list_struct_null(left, right, left);
985    }
986    match (left.data_type(), right.data_type()) {
987        (DataType::List(left_field), DataType::List(_)) => {
988            if !lists_have_same_offsets_helper::<i32>(left, right) {
989                panic!("Attempt to merge list struct arrays which do not have same offsets");
990            }
991            merge_list_structs_helper::<i32>(
992                left,
993                right,
994                left_field.name(),
995                left_field.is_nullable(),
996            )
997        }
998        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
999            if !lists_have_same_offsets_helper::<i64>(left, right) {
1000                panic!("Attempt to merge list struct arrays which do not have same offsets");
1001            }
1002            merge_list_structs_helper::<i64>(
1003                left,
1004                right,
1005                left_field.name(),
1006                left_field.is_nullable(),
1007            )
1008        }
1009        _ => unreachable!(),
1010    }
1011}
1012
1013/// Helper function to normalize validity buffers
1014/// Returns None for all-null validity (placeholder structs)
1015fn normalize_validity(
1016    validity: Option<&arrow_buffer::NullBuffer>,
1017) -> Option<&arrow_buffer::NullBuffer> {
1018    validity.and_then(|v| {
1019        if v.null_count() == v.len() {
1020            None
1021        } else {
1022            Some(v)
1023        }
1024    })
1025}
1026
1027/// Helper function to merge validity buffers from two struct arrays
1028/// Returns None only if both arrays are null at the same position
1029///
1030/// Special handling for placeholder structs (all-null validity)
1031fn merge_struct_validity(
1032    left_validity: Option<&arrow_buffer::NullBuffer>,
1033    right_validity: Option<&arrow_buffer::NullBuffer>,
1034) -> Option<arrow_buffer::NullBuffer> {
1035    // Normalize both validity buffers (convert all-null to None)
1036    let left_normalized = normalize_validity(left_validity);
1037    let right_normalized = normalize_validity(right_validity);
1038
1039    match (left_normalized, right_normalized) {
1040        // Fast paths: no computation needed
1041        (None, None) => None,
1042        (Some(left), None) => Some(left.clone()),
1043        (None, Some(right)) => Some(right.clone()),
1044        (Some(left), Some(right)) => {
1045            // Fast path: if both have no nulls, can return either one
1046            if left.null_count() == 0 && right.null_count() == 0 {
1047                return Some(left.clone());
1048            }
1049
1050            let left_buffer = left.inner();
1051            let right_buffer = right.inner();
1052
1053            // Perform bitwise OR directly on BooleanBuffers
1054            // This preserves the correct semantics: 1 = valid, 0 = null
1055            let merged_buffer = left_buffer | right_buffer;
1056
1057            Some(arrow_buffer::NullBuffer::from(merged_buffer))
1058        }
1059    }
1060}
1061
1062fn merge_list_child_values(
1063    child_field: &Field,
1064    left_values: ArrayRef,
1065    right_values: ArrayRef,
1066) -> ArrayRef {
1067    match child_field.data_type() {
1068        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1069            left_values.as_struct(),
1070            right_values.as_struct(),
1071            child_fields,
1072        )) as ArrayRef,
1073        DataType::List(grandchild) => {
1074            let left_list = left_values
1075                .as_any()
1076                .downcast_ref::<ListArray>()
1077                .expect("left list values should be ListArray");
1078            let right_list = right_values
1079                .as_any()
1080                .downcast_ref::<ListArray>()
1081                .expect("right list values should be ListArray");
1082            let merged_values = merge_list_child_values(
1083                grandchild.as_ref(),
1084                left_list.values().clone(),
1085                right_list.values().clone(),
1086            );
1087            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1088            Arc::new(ListArray::new(
1089                grandchild.clone(),
1090                left_list.offsets().clone(),
1091                merged_values,
1092                merged_validity,
1093            )) as ArrayRef
1094        }
1095        DataType::LargeList(grandchild) => {
1096            let left_list = left_values
1097                .as_any()
1098                .downcast_ref::<LargeListArray>()
1099                .expect("left list values should be LargeListArray");
1100            let right_list = right_values
1101                .as_any()
1102                .downcast_ref::<LargeListArray>()
1103                .expect("right list values should be LargeListArray");
1104            let merged_values = merge_list_child_values(
1105                grandchild.as_ref(),
1106                left_list.values().clone(),
1107                right_list.values().clone(),
1108            );
1109            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1110            Arc::new(LargeListArray::new(
1111                grandchild.clone(),
1112                left_list.offsets().clone(),
1113                merged_values,
1114                merged_validity,
1115            )) as ArrayRef
1116        }
1117        DataType::FixedSizeList(grandchild, list_size) => {
1118            let left_list = left_values
1119                .as_any()
1120                .downcast_ref::<FixedSizeListArray>()
1121                .expect("left list values should be FixedSizeListArray");
1122            let right_list = right_values
1123                .as_any()
1124                .downcast_ref::<FixedSizeListArray>()
1125                .expect("right list values should be FixedSizeListArray");
1126            let merged_values = merge_list_child_values(
1127                grandchild.as_ref(),
1128                left_list.values().clone(),
1129                right_list.values().clone(),
1130            );
1131            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1132            Arc::new(FixedSizeListArray::new(
1133                grandchild.clone(),
1134                *list_size,
1135                merged_values,
1136                merged_validity,
1137            )) as ArrayRef
1138        }
1139        _ => left_values.clone(),
1140    }
1141}
1142
// Push a parent struct's validity down into one of its child arrays.
//
// Where the parent struct is null the child must also read as null, so the
// returned array's validity is `child AND parent`.  Fast paths skip the work
// when the parent has no validity buffer or no nulls, and when the child is
// a `DataType::Null` array (always entirely null by definition; Arrow
// rejects an explicit null bitmap on it).
//
// NOTE(review): the rebuilt ArrayData reuses `child.offset()` together with
// a validity computed from `child.nulls()` / `parent_validity`. For sliced
// children (offset != 0) the interaction between the logical NullBuffer and
// the physical offset looks subtle — confirm callers only pass unsliced
// children, or verify this path with a sliced input.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    // Fast path: no parent validity means no adjustment needed
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
        Some(p) => p,
    };

    // Fast path: DataType::Null arrays are always entirely null by definition and cannot
    // carry an explicit null bitmap (Arrow rejects it). No adjustment is needed.
    if child.data_type() == &DataType::Null {
        return child.clone();
    }

    let child_validity = child.nulls();

    // Compute the new validity: child_validity AND parent_validity
    let new_validity = match child_validity {
        None => {
            // Fast path: child has no existing validity, just use parent's
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Bitwise AND of the BooleanBuffers preserves the Arrow
            // semantics: 1 = valid, 0 = null.
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    // Rebuild the same array bytes with only the validity buffer replaced.
    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}
1196
1197fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1198    let mut fields: Vec<Field> = vec![];
1199    let mut columns: Vec<ArrayRef> = vec![];
1200    let right_fields = right_struct_array.fields();
1201    let right_columns = right_struct_array.columns();
1202
1203    // Get the validity buffers from both structs
1204    let left_validity = left_struct_array.nulls();
1205    let right_validity = right_struct_array.nulls();
1206
1207    // Compute merged validity
1208    let merged_validity = merge_struct_validity(left_validity, right_validity);
1209
1210    // iterate through the fields on the left hand side
1211    for (left_field, left_column) in left_struct_array
1212        .fields()
1213        .iter()
1214        .zip(left_struct_array.columns().iter())
1215    {
1216        match right_fields
1217            .iter()
1218            .position(|f| f.name() == left_field.name())
1219        {
1220            // if the field exists on the right hand side, merge them recursively if appropriate
1221            Some(right_index) => {
1222                let right_field = right_fields.get(right_index).unwrap();
1223                let right_column = right_columns.get(right_index).unwrap();
1224                // if both fields are struct, merge them recursively
1225                match (left_field.data_type(), right_field.data_type()) {
1226                    (DataType::Struct(_), DataType::Struct(_)) => {
1227                        let left_sub_array = left_column.as_struct();
1228                        let right_sub_array = right_column.as_struct();
1229                        let merged_sub_array = merge(left_sub_array, right_sub_array);
1230                        fields.push(Field::new(
1231                            left_field.name(),
1232                            merged_sub_array.data_type().clone(),
1233                            left_field.is_nullable(),
1234                        ));
1235                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1236                    }
1237                    (DataType::List(left_list), DataType::List(right_list))
1238                        if left_list.data_type().is_struct()
1239                            && right_list.data_type().is_struct() =>
1240                    {
1241                        // If there is nothing to merge just use the left field
1242                        if left_list.data_type() == right_list.data_type() {
1243                            fields.push(left_field.as_ref().clone());
1244                            columns.push(left_column.clone());
1245                        }
1246                        // If we have two List<Struct> and they have different sets of fields then
1247                        // we can merge them if the offsets arrays are the same.  Otherwise, we
1248                        // have to consider it an error.
1249                        let merged_sub_array = merge_list_struct(&left_column, &right_column);
1250
1251                        fields.push(Field::new(
1252                            left_field.name(),
1253                            merged_sub_array.data_type().clone(),
1254                            left_field.is_nullable(),
1255                        ));
1256                        columns.push(merged_sub_array);
1257                    }
1258                    // otherwise, just use the field on the left hand side
1259                    _ => {
1260                        // TODO handle list-of-struct and other types
1261                        fields.push(left_field.as_ref().clone());
1262                        // Adjust the column validity: if left struct was null, propagate to child
1263                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1264                        columns.push(adjusted_column);
1265                    }
1266                }
1267            }
1268            None => {
1269                fields.push(left_field.as_ref().clone());
1270                // Adjust the column validity: if left struct was null, propagate to child
1271                let adjusted_column = adjust_child_validity(left_column, left_validity);
1272                columns.push(adjusted_column);
1273            }
1274        }
1275    }
1276
1277    // now iterate through the fields on the right hand side
1278    right_fields
1279        .iter()
1280        .zip(right_columns.iter())
1281        .for_each(|(field, column)| {
1282            // add new columns on the right
1283            if !left_struct_array
1284                .fields()
1285                .iter()
1286                .any(|f| f.name() == field.name())
1287            {
1288                fields.push(field.as_ref().clone());
1289                // This field doesn't exist on the left
1290                // We use the right's column but need to adjust for struct validity
1291                let adjusted_column = adjust_child_validity(column, right_validity);
1292                columns.push(adjusted_column);
1293            }
1294        });
1295
1296    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1297}
1298
/// Merge two struct arrays, keeping only (and ordering by) the fields listed
/// in `fields`.
///
/// For each target field:
/// * present on one side only -> that side's column is used, with the parent
///   struct's validity pushed down into it;
/// * present on both sides -> struct children are merged recursively, list
///   children have their value arrays merged via [`merge_list_child_values`],
///   and any other type takes the left column (validity-adjusted);
/// * present on neither side -> the field is dropped from the output.
///
/// A name only matches when both types are structs or both are non-structs
/// (see `same_type_kind`).  The output struct's validity is the OR of both
/// inputs' validity buffers.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Helper function that returns true if both types are struct or both are non-struct
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    // Get the validity buffers from both structs
    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    // Compute merged validity
    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        // A side "matches" only on name AND struct-vs-non-struct kind.
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                // Adjust validity if the right struct was null
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                // Adjust validity if the left struct was null
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    DataType::Struct(child_fields) => {
                        // Recurse: merge the two child structs under the target's schema.
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    DataType::List(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        // trimmed_values() comes from ListArrayExt; presumably the value
                        // array restricted to the range the offsets reference — confirm.
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        // OR the two lists' validity; rebuild on the left's offsets.
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::LargeList(child_field) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::FixedSizeList(child_field, list_size) => {
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    _ => {
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        // For fields that exist in both, use left but adjust validity
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            (None, None) => {
                // The field will not be included in the output
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}
1457
1458fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1459    if components.is_empty() {
1460        return Some(array);
1461    }
1462    if !matches!(array.data_type(), DataType::Struct(_)) {
1463        return None;
1464    }
1465    let struct_arr = array.as_struct();
1466    struct_arr
1467        .column_by_name(components[0])
1468        .and_then(|arr| get_sub_array(arr, &components[1..]))
1469}
1470
1471/// Interleave multiple RecordBatches into a single RecordBatch.
1472///
1473/// Behaves like [`arrow_select::interleave::interleave`], but for RecordBatches.
1474pub fn interleave_batches(
1475    batches: &[RecordBatch],
1476    indices: &[(usize, usize)],
1477) -> Result<RecordBatch> {
1478    let first_batch = batches.first().ok_or_else(|| {
1479        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1480    })?;
1481    let schema = first_batch.schema();
1482    let num_columns = first_batch.num_columns();
1483    let mut columns = Vec::with_capacity(num_columns);
1484    let mut chunks = Vec::with_capacity(batches.len());
1485
1486    for i in 0..num_columns {
1487        for batch in batches {
1488            chunks.push(batch.column(i).as_ref());
1489        }
1490        let new_column = interleave(&chunks, indices)?;
1491        columns.push(new_column);
1492        chunks.clear();
1493    }
1494
1495    RecordBatch::try_new(schema, columns)
1496}
1497
pub trait BufferExt {
    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made and an owned buffer returned.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
    ///
    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
    ///
    /// `size_bytes` can be larger than `bytes` and, if so, the trailing bytes will
    /// be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `size_bytes` is less than `bytes.len()`
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1525
/// Returns true if `n` is a non-zero power of two.
///
/// Zero is explicitly NOT a power of two.  Callers (`from_bytes_bytes`) use
/// this to decide whether an alignment check is meaningful, and
/// `pointer::align_offset` panics when handed a non-power-of-two (including
/// zero) alignment.  The naive `n & (n - 1) == 0` check would misreport 0 as a
/// power of two and overflow on `0 - 1` in debug builds.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1529
1530impl BufferExt for arrow_buffer::Buffer {
1531    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
1532        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
1533        {
1534            // The original buffer is not aligned, cannot zero-copy
1535            let size_bytes = bytes.len();
1536            Self::copy_bytes_bytes(bytes, size_bytes)
1537        } else {
1538            // The original buffer is aligned, can zero-copy
1539            // SAFETY: the alignment is correct we can make this conversion
1540            unsafe {
1541                Self::from_custom_allocation(
1542                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
1543                    bytes.len(),
1544                    Arc::new(bytes),
1545                )
1546            }
1547        }
1548    }
1549
1550    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
1551        assert!(size_bytes >= bytes.len());
1552        let mut buf = MutableBuffer::with_capacity(size_bytes);
1553        let to_fill = size_bytes - bytes.len();
1554        buf.extend(bytes);
1555        buf.extend(std::iter::repeat_n(0_u8, to_fill));
1556
1557        // FIX for issue #4512: Shrink buffer to actual size before converting to immutable
1558        // This reduces memory overhead from capacity over-allocation
1559        buf.shrink_to_fit();
1560
1561        Self::from(buf)
1562    }
1563}
1564
1565#[cfg(test)]
1566mod tests {
1567    use super::*;
1568    use arrow_array::{Float32Array, Int32Array, NullArray, StructArray};
1569    use arrow_array::{ListArray, StringArray, new_empty_array, new_null_array};
1570    use arrow_buffer::OffsetBuffer;
1571
    #[test]
    fn test_merge_recursive() {
        // `merge` should recursively combine struct columns: top-level fields
        // unique to one side ("a", "e") are carried over as-is, while the shared
        // struct column "b" ends up with the union of its children ("c" from the
        // left, "d" from the right).
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        // Left batch: { a: int32, b: { c: int32 } }
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        // Right batch: { e: int32, b: { d: utf8 } }
        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected result: { a, b: { c, d }, e } — left-side fields first, with
        // the right-only field "e" appended at the end.
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }
1656
    #[test]
    fn test_merge_with_schema() {
        // Builds a one-column batch named "struct" whose struct children have
        // the given names/types.  The arrays are empty: only the resulting
        // schemas matter for this test.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // If we use merge_with_schema the schema is respected
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // If we use merge we get first-come first-serve based on the left batch
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }
1699
    #[test]
    fn test_merge_list_struct() {
        // Merging two batches that share a List<Struct> column should produce a
        // list whose item struct is the union of both sides' fields ("x" and
        // "y").  The second half checks that an all-null list on the right
        // surfaces as null "y" *values* in the merged items, not as null list
        // rows.
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        // Single-row data: left list holds [{x: 1}], right list holds [{y: 2}].
        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        // Expected item when the right side is null: x keeps its value, y is null.
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        // Case 1: both sides populated -> items merge into {x, y}.
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // Case 2: right side entirely null -> merged items have null "y" values.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }
1788
1789    #[test]
1790    fn test_byte_width_opt() {
1791        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1792        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1793        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1794        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1795        assert_eq!(DataType::Utf8.byte_width_opt(), None);
1796        assert_eq!(DataType::Binary.byte_width_opt(), None);
1797        assert_eq!(
1798            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1799            None
1800        );
1801        assert_eq!(
1802            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1803                .byte_width_opt(),
1804            Some(12)
1805        );
1806        assert_eq!(
1807            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1808                .byte_width_opt(),
1809            Some(16)
1810        );
1811        assert_eq!(
1812            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1813                .byte_width_opt(),
1814            None
1815        );
1816    }
1817
1818    #[test]
1819    fn test_take_record_batch() {
1820        let schema = Arc::new(Schema::new(vec![
1821            Field::new("a", DataType::Int32, true),
1822            Field::new("b", DataType::Utf8, true),
1823        ]));
1824        let batch = RecordBatch::try_new(
1825            schema.clone(),
1826            vec![
1827                Arc::new(Int32Array::from_iter_values(0..20)),
1828                Arc::new(StringArray::from_iter_values(
1829                    (0..20).map(|i| format!("str-{}", i)),
1830                )),
1831            ],
1832        )
1833        .unwrap();
1834        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1835        assert_eq!(
1836            taken,
1837            RecordBatch::try_new(
1838                schema,
1839                vec![
1840                    Arc::new(Int32Array::from(vec![1, 5, 10])),
1841                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1842                ],
1843            )
1844            .unwrap()
1845        )
1846    }
1847
    #[test]
    fn test_schema_project_by_schema() {
        // `project_by_schema` must honor the target schema's field subset and
        // order, and carry the batch's schema-level metadata into the result.
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Empty schema
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Re-ordered schema
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Sub schema
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }
1914
1915    #[test]
1916    fn test_project_preserves_struct_validity() {
1917        // Test that projecting a struct array preserves its validity (fix for issue #4385)
1918        let fields = Fields::from(vec![
1919            Field::new("id", DataType::Int32, false),
1920            Field::new("value", DataType::Float32, true),
1921        ]);
1922
1923        // Create a struct array with validity
1924        let id_array = Int32Array::from(vec![1, 2, 3]);
1925        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1926        let struct_array = StructArray::new(
1927            fields.clone(),
1928            vec![
1929                Arc::new(id_array) as ArrayRef,
1930                Arc::new(value_array) as ArrayRef,
1931            ],
1932            Some(vec![true, false, true].into()), // Second struct is null
1933        );
1934
1935        // Project the struct array
1936        let projected = project(&struct_array, &fields).unwrap();
1937
1938        // Verify the validity is preserved
1939        assert_eq!(projected.null_count(), 1);
1940        assert!(!projected.is_null(0));
1941        assert!(projected.is_null(1));
1942        assert!(!projected.is_null(2));
1943    }
1944
    #[test]
    fn test_merge_struct_with_different_validity() {
        // Test case from Weston's review comment
        //
        // The merged struct must be null only where BOTH inputs are null; where
        // exactly one side is null, that side's fields become null *values*
        // inside a non-null merged struct.  (Row numbers in the comments below
        // are 1-based.)
        //
        // File 1 has height field with some nulls
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
        );

        // File 2 has width field with some nulls
        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
        );

        // Merge the two structs
        let merged = merge(&left_struct, &right_struct);

        // Expected:
        // Row 1: both non-null -> {width: 300, height: 500}
        // Row 2: left null, right non-null -> {width: 200, height: null}
        // Row 3: left non-null, right null -> {width: null, height: 600}
        // Row 4: both null -> null struct

        assert_eq!(merged.null_count(), 1); // Only row 4 is null
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        // Check field values
        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1)); // height is null when left struct was null
        assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2)); // width is null when right struct was null
    }
1994
1995    #[test]
1996    fn test_merge_null_typed_column_with_parent_validity() {
1997        // Reproduces ENT-990: panic in adjust_child_validity when a Null-typed column
1998        // exists on one side and the parent struct has null rows.
1999        // Arrow's Null type has no null bitmap, so passing one to ArrayData::try_new panics.
2000        let left_struct = StructArray::new(
2001            Fields::from(vec![Field::new("a", DataType::Int32, true)]),
2002            vec![Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef],
2003            Some(vec![true, false].into()),
2004        );
2005        let right_struct = StructArray::new(
2006            Fields::from(vec![Field::new("b", DataType::Null, true)]),
2007            vec![Arc::new(NullArray::new(2)) as ArrayRef],
2008            Some(vec![true, false].into()),
2009        );
2010
2011        // Previously panicked: "Arrays of type Null cannot contain a null bitmask"
2012        let merged = merge(&left_struct, &right_struct);
2013        assert_eq!(merged.len(), 2);
2014        let b_col = merged.column_by_name("b").unwrap();
2015        // DataType::Null implies all-null by definition; no null bitmap is stored.
2016        assert_eq!(b_col.data_type(), &DataType::Null);
2017        assert_eq!(b_col.len(), 2);
2018    }
2019
    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        // merge_with_schema over a shared List<Struct> column where neither side
        // has every item field from the target schema: fields missing from a
        // side must materialize as nulls, while fields that are present keep
        // their values.
        // left_list setup
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        // Right List Setup
        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        // Target item struct is the union of both sides' fields.
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        // Verify the merged structure
        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        // Should have all 3 fields
        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        // Verify values
        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
2143
    /// Runs the List<Struct> merge scenario with 32-bit (`List`) offsets.
    #[test]
    fn test_merge_struct_lists() {
        test_merge_struct_lists_generic::<i32>();
    }
2148
    /// Runs the List<Struct> merge scenario with 64-bit (`LargeList`) offsets.
    #[test]
    fn test_merge_struct_large_lists() {
        test_merge_struct_lists_generic::<i64>();
    }
2153
2154    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2155        // left_list setup
2156        let left_company_id = Arc::new(Int32Array::from(vec![
2157            Some(1),
2158            Some(2),
2159            Some(3),
2160            Some(4),
2161            Some(5),
2162            Some(6),
2163            Some(7),
2164            Some(8),
2165            Some(9),
2166            Some(10),
2167            Some(11),
2168            Some(12),
2169            Some(13),
2170            Some(14),
2171            Some(15),
2172            Some(16),
2173            Some(17),
2174            Some(18),
2175            Some(19),
2176            Some(20),
2177        ]));
2178        let left_count = Arc::new(Int32Array::from(vec![
2179            Some(10),
2180            Some(20),
2181            Some(30),
2182            Some(40),
2183            Some(50),
2184            Some(60),
2185            Some(70),
2186            Some(80),
2187            Some(90),
2188            Some(100),
2189            Some(110),
2190            Some(120),
2191            Some(130),
2192            Some(140),
2193            Some(150),
2194            Some(160),
2195            Some(170),
2196            Some(180),
2197            Some(190),
2198            Some(200),
2199        ]));
2200        let left_struct = Arc::new(StructArray::new(
2201            Fields::from(vec![
2202                Field::new("company_id", DataType::Int32, true),
2203                Field::new("count", DataType::Int32, true),
2204            ]),
2205            vec![left_company_id, left_count],
2206            None,
2207        ));
2208
2209        let left_list = Arc::new(GenericListArray::<O>::new(
2210            Arc::new(Field::new(
2211                "item",
2212                DataType::Struct(left_struct.fields().clone()),
2213                true,
2214            )),
2215            OffsetBuffer::from_lengths([3, 1]),
2216            left_struct.clone(),
2217            None,
2218        ));
2219
2220        let left_list_struct = Arc::new(StructArray::new(
2221            Fields::from(vec![Field::new(
2222                "companies",
2223                if O::IS_LARGE {
2224                    DataType::LargeList(Arc::new(Field::new(
2225                        "item",
2226                        DataType::Struct(left_struct.fields().clone()),
2227                        true,
2228                    )))
2229                } else {
2230                    DataType::List(Arc::new(Field::new(
2231                        "item",
2232                        DataType::Struct(left_struct.fields().clone()),
2233                        true,
2234                    )))
2235                },
2236                true,
2237            )]),
2238            vec![left_list as ArrayRef],
2239            None,
2240        ));
2241
2242        // right_list setup
2243        let right_company_name = Arc::new(StringArray::from(vec![
2244            "Google",
2245            "Microsoft",
2246            "Apple",
2247            "Facebook",
2248        ]));
2249        let right_struct = Arc::new(StructArray::new(
2250            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2251            vec![right_company_name],
2252            None,
2253        ));
2254        let right_list = Arc::new(GenericListArray::<O>::new(
2255            Arc::new(Field::new(
2256                "item",
2257                DataType::Struct(right_struct.fields().clone()),
2258                true,
2259            )),
2260            OffsetBuffer::from_lengths([3, 1]),
2261            right_struct.clone(),
2262            None,
2263        ));
2264
2265        let right_list_struct = Arc::new(StructArray::new(
2266            Fields::from(vec![Field::new(
2267                "companies",
2268                if O::IS_LARGE {
2269                    DataType::LargeList(Arc::new(Field::new(
2270                        "item",
2271                        DataType::Struct(right_struct.fields().clone()),
2272                        true,
2273                    )))
2274                } else {
2275                    DataType::List(Arc::new(Field::new(
2276                        "item",
2277                        DataType::Struct(right_struct.fields().clone()),
2278                        true,
2279                    )))
2280                },
2281                true,
2282            )]),
2283            vec![right_list as ArrayRef],
2284            None,
2285        ));
2286
2287        // prepare schema
2288        let target_fields = Fields::from(vec![Field::new(
2289            "companies",
2290            if O::IS_LARGE {
2291                DataType::LargeList(Arc::new(Field::new(
2292                    "item",
2293                    DataType::Struct(Fields::from(vec![
2294                        Field::new("company_id", DataType::Int32, true),
2295                        Field::new("company_name", DataType::Utf8, true),
2296                        Field::new("count", DataType::Int32, true),
2297                    ])),
2298                    true,
2299                )))
2300            } else {
2301                DataType::List(Arc::new(Field::new(
2302                    "item",
2303                    DataType::Struct(Fields::from(vec![
2304                        Field::new("company_id", DataType::Int32, true),
2305                        Field::new("company_name", DataType::Utf8, true),
2306                        Field::new("count", DataType::Int32, true),
2307                    ])),
2308                    true,
2309                )))
2310            },
2311            true,
2312        )]);
2313
2314        // merge left_list and right_list
2315        let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2316        assert_eq!(merged_array.len(), 2);
2317    }
2318
2319    #[test]
2320    fn test_project_by_schema_list_struct_reorder() {
2321        // Test that project_by_schema correctly reorders fields inside List<Struct>
2322        // This is a regression test for issue #5702
2323
2324        // Source schema with inner struct fields in order: c, b, a
2325        let source_inner_struct = DataType::Struct(Fields::from(vec![
2326            Field::new("c", DataType::Utf8, true),
2327            Field::new("b", DataType::Utf8, true),
2328            Field::new("a", DataType::Utf8, true),
2329        ]));
2330        let source_schema = Arc::new(Schema::new(vec![
2331            Field::new("id", DataType::Int32, false),
2332            Field::new(
2333                "data",
2334                DataType::List(Arc::new(Field::new(
2335                    "item",
2336                    source_inner_struct.clone(),
2337                    true,
2338                ))),
2339                true,
2340            ),
2341        ]));
2342
2343        // Create source data with c, b, a order
2344        let c_array = StringArray::from(vec!["c1", "c2"]);
2345        let b_array = StringArray::from(vec!["b1", "b2"]);
2346        let a_array = StringArray::from(vec!["a1", "a2"]);
2347        let inner_struct = StructArray::from(vec![
2348            (
2349                Arc::new(Field::new("c", DataType::Utf8, true)),
2350                Arc::new(c_array) as ArrayRef,
2351            ),
2352            (
2353                Arc::new(Field::new("b", DataType::Utf8, true)),
2354                Arc::new(b_array) as ArrayRef,
2355            ),
2356            (
2357                Arc::new(Field::new("a", DataType::Utf8, true)),
2358                Arc::new(a_array) as ArrayRef,
2359            ),
2360        ]);
2361
2362        let list_array = ListArray::new(
2363            Arc::new(Field::new("item", source_inner_struct, true)),
2364            OffsetBuffer::from_lengths([1, 1]),
2365            Arc::new(inner_struct),
2366            None,
2367        );
2368
2369        let batch = RecordBatch::try_new(
2370            source_schema,
2371            vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
2372        )
2373        .unwrap();
2374
2375        // Target schema with inner struct fields in order: a, b, c
2376        let target_inner_struct = DataType::Struct(Fields::from(vec![
2377            Field::new("a", DataType::Utf8, true),
2378            Field::new("b", DataType::Utf8, true),
2379            Field::new("c", DataType::Utf8, true),
2380        ]));
2381        let target_schema = Schema::new(vec![
2382            Field::new("id", DataType::Int32, false),
2383            Field::new(
2384                "data",
2385                DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2386                true,
2387            ),
2388        ]);
2389
2390        // Project should reorder the inner struct fields
2391        let projected = batch.project_by_schema(&target_schema).unwrap();
2392
2393        // Verify the schema is correct
2394        assert_eq!(projected.schema().as_ref(), &target_schema);
2395
2396        // Verify the data is correct by checking inner struct field order
2397        let projected_list = projected.column(1).as_list::<i32>();
2398        let projected_struct = projected_list.values().as_struct();
2399
2400        // Fields should now be in order: a, b, c
2401        assert_eq!(
2402            projected_struct.column_by_name("a").unwrap().as_ref(),
2403            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2404        );
2405        assert_eq!(
2406            projected_struct.column_by_name("b").unwrap().as_ref(),
2407            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2408        );
2409        assert_eq!(
2410            projected_struct.column_by_name("c").unwrap().as_ref(),
2411            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2412        );
2413
2414        // Also verify positional access matches expected order (a=0, b=1, c=2)
2415        assert_eq!(
2416            projected_struct.column(0).as_ref(),
2417            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2418        );
2419        assert_eq!(
2420            projected_struct.column(1).as_ref(),
2421            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2422        );
2423        assert_eq!(
2424            projected_struct.column(2).as_ref(),
2425            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2426        );
2427    }
2428
2429    #[test]
2430    fn test_project_by_schema_nested_list_struct() {
2431        // Test deeply nested List<Struct<List<Struct>>> projection
2432        let inner_struct = DataType::Struct(Fields::from(vec![
2433            Field::new("y", DataType::Int32, true),
2434            Field::new("x", DataType::Int32, true),
2435        ]));
2436        let source_schema = Arc::new(Schema::new(vec![Field::new(
2437            "outer",
2438            DataType::List(Arc::new(Field::new(
2439                "item",
2440                DataType::Struct(Fields::from(vec![
2441                    Field::new("b", DataType::Utf8, true),
2442                    Field::new(
2443                        "inner_list",
2444                        DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
2445                        true,
2446                    ),
2447                    Field::new("a", DataType::Utf8, true),
2448                ])),
2449                true,
2450            ))),
2451            true,
2452        )]));
2453
2454        // Create deeply nested data
2455        let y_array = Int32Array::from(vec![1, 2]);
2456        let x_array = Int32Array::from(vec![3, 4]);
2457        let innermost_struct = StructArray::from(vec![
2458            (
2459                Arc::new(Field::new("y", DataType::Int32, true)),
2460                Arc::new(y_array) as ArrayRef,
2461            ),
2462            (
2463                Arc::new(Field::new("x", DataType::Int32, true)),
2464                Arc::new(x_array) as ArrayRef,
2465            ),
2466        ]);
2467        let inner_list = ListArray::new(
2468            Arc::new(Field::new("item", inner_struct.clone(), true)),
2469            OffsetBuffer::from_lengths([2]),
2470            Arc::new(innermost_struct),
2471            None,
2472        );
2473
2474        let b_array = StringArray::from(vec!["b1"]);
2475        let a_array = StringArray::from(vec!["a1"]);
2476        let middle_struct = StructArray::from(vec![
2477            (
2478                Arc::new(Field::new("b", DataType::Utf8, true)),
2479                Arc::new(b_array) as ArrayRef,
2480            ),
2481            (
2482                Arc::new(Field::new(
2483                    "inner_list",
2484                    DataType::List(Arc::new(Field::new("item", inner_struct, true))),
2485                    true,
2486                )),
2487                Arc::new(inner_list) as ArrayRef,
2488            ),
2489            (
2490                Arc::new(Field::new("a", DataType::Utf8, true)),
2491                Arc::new(a_array) as ArrayRef,
2492            ),
2493        ]);
2494
2495        let outer_list = ListArray::new(
2496            Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
2497            OffsetBuffer::from_lengths([1]),
2498            Arc::new(middle_struct),
2499            None,
2500        );
2501
2502        let batch =
2503            RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();
2504
2505        // Target schema with reordered fields at all levels
2506        let target_inner_struct = DataType::Struct(Fields::from(vec![
2507            Field::new("x", DataType::Int32, true), // x before y now
2508            Field::new("y", DataType::Int32, true),
2509        ]));
2510        let target_schema = Schema::new(vec![Field::new(
2511            "outer",
2512            DataType::List(Arc::new(Field::new(
2513                "item",
2514                DataType::Struct(Fields::from(vec![
2515                    Field::new("a", DataType::Utf8, true), // a before b now
2516                    Field::new(
2517                        "inner_list",
2518                        DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2519                        true,
2520                    ),
2521                    Field::new("b", DataType::Utf8, true),
2522                ])),
2523                true,
2524            ))),
2525            true,
2526        )]);
2527
2528        let projected = batch.project_by_schema(&target_schema).unwrap();
2529
2530        // Verify schema
2531        assert_eq!(projected.schema().as_ref(), &target_schema);
2532
2533        // Verify deeply nested data is reordered correctly
2534        let outer_list = projected.column(0).as_list::<i32>();
2535        let middle_struct = outer_list.values().as_struct();
2536
2537        // Middle struct should have a first, then inner_list, then b
2538        assert_eq!(
2539            middle_struct.column(0).as_ref(),
2540            &StringArray::from(vec!["a1"]) as &dyn Array
2541        );
2542        assert_eq!(
2543            middle_struct.column(2).as_ref(),
2544            &StringArray::from(vec!["b1"]) as &dyn Array
2545        );
2546
2547        // Inner list's struct should have x first, then y
2548        let inner_list = middle_struct.column(1).as_list::<i32>();
2549        let innermost_struct = inner_list.values().as_struct();
2550        assert_eq!(
2551            innermost_struct.column(0).as_ref(),
2552            &Int32Array::from(vec![3, 4]) as &dyn Array
2553        );
2554        assert_eq!(
2555            innermost_struct.column(1).as_ref(),
2556            &Int32Array::from(vec![1, 2]) as &dyn Array
2557        );
2558    }
2559}