Skip to main content

lance_arrow/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
//! Extensions to Arrow functionality.
//!
//! Helpers that improve the ergonomics of Arrow-RS.
7
8use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12    Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray, GenericListArray,
13    LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray,
14    UInt8Array, UInt32Array, cast::AsArray,
15};
16use arrow_array::{
17    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod ipc;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod scalar;
38pub mod stream;
39pub mod r#struct;
40
/// Metadata key under which the name of an Arrow extension type is stored.
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Metadata key under which the metadata of an Arrow extension type is stored.
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Metadata key Lance uses to mark a field as a blob.
///
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";

/// Name of the Arrow extension type used for Lance blob v2 columns.
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";

/// Metadata key that overrides the dedicated blob size threshold (in bytes).
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

/// Metadata key that overrides the inline blob size threshold (in bytes).
pub const BLOB_INLINE_SIZE_THRESHOLD_META_KEY: &str = "lance-encoding:blob-inline-size-threshold";
57
58type Result<T> = std::result::Result<T, ArrowError>;
59
/// Convenience predicates and size queries for an Arrow data type.
pub trait DataTypeExt {
    /// Whether the type is binary-like: Utf8, Binary, or one of their
    /// large / view variants.
    fn is_binary_like(&self) -> bool;

    /// Whether the type is a struct.
    fn is_struct(&self) -> bool;

    /// Whether the type is fixed stride, i.e. every array element occupies
    /// the same amount of storage.
    ///
    /// This covers booleans, the primitive numeric and temporal types,
    /// decimals, FixedSizeList and FixedSizeBinary.
    fn is_fixed_stride(&self) -> bool;

    /// Whether the type is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Width of one element in bytes.
    ///
    /// # Panics
    ///
    /// Panics when no byte width is available for the type, that is, whenever
    /// [`Self::byte_width_opt`] would return `None`.
    fn byte_width(&self) -> usize;

    /// Width of one element in bytes, or `None` when the type has no fixed
    /// whole-byte width.
    fn byte_width_opt(&self) -> Option<usize>;
}
84
85impl DataTypeExt for DataType {
86    fn is_binary_like(&self) -> bool {
87        use DataType::*;
88        matches!(
89            self,
90            Utf8 | Binary | LargeUtf8 | LargeBinary | Utf8View | BinaryView
91        )
92    }
93
94    fn is_struct(&self) -> bool {
95        matches!(self, Self::Struct(_))
96    }
97
98    fn is_fixed_stride(&self) -> bool {
99        use DataType::*;
100        matches!(
101            self,
102            Boolean
103                | UInt8
104                | UInt16
105                | UInt32
106                | UInt64
107                | Int8
108                | Int16
109                | Int32
110                | Int64
111                | Float16
112                | Float32
113                | Float64
114                | Decimal128(_, _)
115                | Decimal256(_, _)
116                | FixedSizeList(_, _)
117                | FixedSizeBinary(_)
118                | Duration(_)
119                | Timestamp(_, _)
120                | Date32
121                | Date64
122                | Time32(_)
123                | Time64(_)
124        )
125    }
126
127    fn is_dictionary(&self) -> bool {
128        matches!(self, Self::Dictionary(_, _))
129    }
130
131    fn byte_width_opt(&self) -> Option<usize> {
132        match self {
133            Self::Int8 => Some(1),
134            Self::Int16 => Some(2),
135            Self::Int32 => Some(4),
136            Self::Int64 => Some(8),
137            Self::UInt8 => Some(1),
138            Self::UInt16 => Some(2),
139            Self::UInt32 => Some(4),
140            Self::UInt64 => Some(8),
141            Self::Float16 => Some(2),
142            Self::Float32 => Some(4),
143            Self::Float64 => Some(8),
144            Self::Date32 => Some(4),
145            Self::Date64 => Some(8),
146            Self::Time32(_) => Some(4),
147            Self::Time64(_) => Some(8),
148            Self::Timestamp(_, _) => Some(8),
149            Self::Duration(_) => Some(8),
150            Self::Decimal128(_, _) => Some(16),
151            Self::Decimal256(_, _) => Some(32),
152            Self::Interval(unit) => match unit {
153                IntervalUnit::YearMonth => Some(4),
154                IntervalUnit::DayTime => Some(8),
155                IntervalUnit::MonthDayNano => Some(16),
156            },
157            Self::FixedSizeBinary(s) => Some(*s as usize),
158            Self::FixedSizeList(dt, s) => dt
159                .data_type()
160                .byte_width_opt()
161                .map(|width| width * *s as usize),
162            _ => None,
163        }
164    }
165
166    fn byte_width(&self) -> usize {
167        self.byte_width_opt()
168            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
169    }
170}
171
172/// Create an [`GenericListArray`] from values and offsets.
173///
174/// ```
175/// use arrow_array::{Int32Array, Int64Array, ListArray};
176/// use arrow_array::types::Int64Type;
177/// use lance_arrow::try_new_generic_list_array;
178///
179/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
180/// let int_values = Int64Array::from_iter(0..10);
181/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
182/// assert_eq!(list_arr,
183///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
184///         Some(vec![Some(0), Some(1)]),
185///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
186///         Some(vec![Some(7), Some(8), Some(9)]),
187/// ]))
188/// ```
189pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
190    values: T,
191    offsets: &PrimitiveArray<Offset>,
192) -> Result<GenericListArray<Offset::Native>>
193where
194    Offset::Native: OffsetSizeTrait,
195{
196    let data_type = if Offset::Native::IS_LARGE {
197        DataType::LargeList(Arc::new(Field::new(
198            "item",
199            values.data_type().clone(),
200            true,
201        )))
202    } else {
203        DataType::List(Arc::new(Field::new(
204            "item",
205            values.data_type().clone(),
206            true,
207        )))
208    };
209    let data = ArrayDataBuilder::new(data_type)
210        .len(offsets.len() - 1)
211        .add_buffer(offsets.into_data().buffers()[0].clone())
212        .add_child_data(values.into_data())
213        .build()?;
214
215    Ok(GenericListArray::from(data))
216}
217
218pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
219    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
220}
221
222pub trait FixedSizeListArrayExt {
223    /// Create an [`FixedSizeListArray`] from values and list size.
224    ///
225    /// ```
226    /// use arrow_array::{Int64Array, FixedSizeListArray};
227    /// use arrow_array::types::Int64Type;
228    /// use lance_arrow::FixedSizeListArrayExt;
229    ///
230    /// let int_values = Int64Array::from_iter(0..10);
231    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
232    /// assert_eq!(fixed_size_list_arr,
233    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
234    ///         Some(vec![Some(0), Some(1)]),
235    ///         Some(vec![Some(2), Some(3)]),
236    ///         Some(vec![Some(4), Some(5)]),
237    ///         Some(vec![Some(6), Some(7)]),
238    ///         Some(vec![Some(8), Some(9)])
239    /// ], 2))
240    /// ```
241    fn try_new_from_values<T: Array + 'static>(
242        values: T,
243        list_size: i32,
244    ) -> Result<FixedSizeListArray>;
245
246    /// Sample `n` rows from the [FixedSizeListArray]
247    ///
248    /// ```
249    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
250    /// use lance_arrow::FixedSizeListArrayExt;
251    ///
252    /// let int_values = Int64Array::from_iter(0..256);
253    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
254    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
255    /// assert_eq!(sampled.len(), 10);
256    /// assert_eq!(sampled.value_length(), 16);
257    /// assert_eq!(sampled.values().len(), 160);
258    /// ```
259    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;
260
261    /// Ensure the [FixedSizeListArray] of Float16, Float32, Float64,
262    /// Int8, Int16, Int32, Int64, UInt8, UInt32 type to its closest floating point type.
263    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
264}
265
266impl FixedSizeListArrayExt for FixedSizeListArray {
267    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
268        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
269        let values = Arc::new(values);
270
271        Self::try_new(field, list_size, values, None)
272    }
273
274    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
275        if n >= self.len() {
276            return Ok(self.clone());
277        }
278        let mut rng = SmallRng::from_os_rng();
279        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
280        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
281    }
282
283    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
284        match self.data_type() {
285            DataType::FixedSizeList(field, size) => match field.data_type() {
286                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
287                DataType::Int8 => Ok(Self::new(
288                    Arc::new(arrow_schema::Field::new(
289                        field.name(),
290                        DataType::Float32,
291                        field.is_nullable(),
292                    )),
293                    *size,
294                    Arc::new(Float32Array::from_iter_values(
295                        self.values()
296                            .as_any()
297                            .downcast_ref::<Int8Array>()
298                            .ok_or(ArrowError::ParseError(
299                                "Fail to cast primitive array to Int8Type".to_string(),
300                            ))?
301                            .into_iter()
302                            .filter_map(|x| x.map(|y| y as f32)),
303                    )),
304                    self.nulls().cloned(),
305                )),
306                DataType::Int16 => Ok(Self::new(
307                    Arc::new(arrow_schema::Field::new(
308                        field.name(),
309                        DataType::Float32,
310                        field.is_nullable(),
311                    )),
312                    *size,
313                    Arc::new(Float32Array::from_iter_values(
314                        self.values()
315                            .as_any()
316                            .downcast_ref::<Int16Array>()
317                            .ok_or(ArrowError::ParseError(
318                                "Fail to cast primitive array to Int16Type".to_string(),
319                            ))?
320                            .into_iter()
321                            .filter_map(|x| x.map(|y| y as f32)),
322                    )),
323                    self.nulls().cloned(),
324                )),
325                DataType::Int32 => Ok(Self::new(
326                    Arc::new(arrow_schema::Field::new(
327                        field.name(),
328                        DataType::Float32,
329                        field.is_nullable(),
330                    )),
331                    *size,
332                    Arc::new(Float32Array::from_iter_values(
333                        self.values()
334                            .as_any()
335                            .downcast_ref::<Int32Array>()
336                            .ok_or(ArrowError::ParseError(
337                                "Fail to cast primitive array to Int32Type".to_string(),
338                            ))?
339                            .into_iter()
340                            .filter_map(|x| x.map(|y| y as f32)),
341                    )),
342                    self.nulls().cloned(),
343                )),
344                DataType::Int64 => Ok(Self::new(
345                    Arc::new(arrow_schema::Field::new(
346                        field.name(),
347                        DataType::Float64,
348                        field.is_nullable(),
349                    )),
350                    *size,
351                    Arc::new(Float64Array::from_iter_values(
352                        self.values()
353                            .as_any()
354                            .downcast_ref::<Int64Array>()
355                            .ok_or(ArrowError::ParseError(
356                                "Fail to cast primitive array to Int64Type".to_string(),
357                            ))?
358                            .into_iter()
359                            .filter_map(|x| x.map(|y| y as f64)),
360                    )),
361                    self.nulls().cloned(),
362                )),
363                DataType::UInt8 => Ok(Self::new(
364                    Arc::new(arrow_schema::Field::new(
365                        field.name(),
366                        DataType::Float64,
367                        field.is_nullable(),
368                    )),
369                    *size,
370                    Arc::new(Float64Array::from_iter_values(
371                        self.values()
372                            .as_any()
373                            .downcast_ref::<UInt8Array>()
374                            .ok_or(ArrowError::ParseError(
375                                "Fail to cast primitive array to UInt8Type".to_string(),
376                            ))?
377                            .into_iter()
378                            .filter_map(|x| x.map(|y| y as f64)),
379                    )),
380                    self.nulls().cloned(),
381                )),
382                DataType::UInt32 => Ok(Self::new(
383                    Arc::new(arrow_schema::Field::new(
384                        field.name(),
385                        DataType::Float64,
386                        field.is_nullable(),
387                    )),
388                    *size,
389                    Arc::new(Float64Array::from_iter_values(
390                        self.values()
391                            .as_any()
392                            .downcast_ref::<UInt32Array>()
393                            .ok_or(ArrowError::ParseError(
394                                "Fail to cast primitive array to UInt32Type".to_string(),
395                            ))?
396                            .into_iter()
397                            .filter_map(|x| x.map(|y| y as f64)),
398                    )),
399                    self.nulls().cloned(),
400                )),
401                data_type => Err(ArrowError::ParseError(format!(
402                    "Expect either floating type or integer got {:?}",
403                    data_type
404                ))),
405            },
406            data_type => Err(ArrowError::ParseError(format!(
407                "Expect either FixedSizeList got {:?}",
408                data_type
409            ))),
410        }
411    }
412}
413
414/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
415/// [`FixedSizeListArray`], panic'ing on failure.
416pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
417    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
418}
419
420pub trait FixedSizeBinaryArrayExt {
421    /// Create an [`FixedSizeBinaryArray`] from values and stride.
422    ///
423    /// ```
424    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
425    /// use arrow_array::types::UInt8Type;
426    /// use lance_arrow::FixedSizeBinaryArrayExt;
427    ///
428    /// let int_values = UInt8Array::from_iter(0..10);
429    /// let fixed_size_list_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
430    /// assert_eq!(fixed_size_list_arr,
431    ///     FixedSizeBinaryArray::from(vec![
432    ///         Some(vec![0, 1].as_slice()),
433    ///         Some(vec![2, 3].as_slice()),
434    ///         Some(vec![4, 5].as_slice()),
435    ///         Some(vec![6, 7].as_slice()),
436    ///         Some(vec![8, 9].as_slice())
437    /// ]))
438    /// ```
439    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
440}
441
442impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
443    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
444        let data_type = DataType::FixedSizeBinary(stride);
445        let data = ArrayDataBuilder::new(data_type)
446            .len(values.len() / stride as usize)
447            .add_buffer(values.into_data().buffers()[0].clone())
448            .build()?;
449        Ok(Self::from(data))
450    }
451}
452
453pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
454    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
455}
456
457pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
458    match arr.data_type() {
459        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
460        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
461        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
462    }
463}
464
465/// Extends Arrow's [RecordBatch].
466pub trait RecordBatchExt {
467    /// Append a new column to this [`RecordBatch`] and returns a new RecordBatch.
468    ///
469    /// ```
470    /// use std::sync::Arc;
471    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
472    /// use arrow_schema::{Schema, Field, DataType};
473    /// use lance_arrow::*;
474    ///
475    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
476    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
477    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
478    ///
479    /// let new_field = Field::new("s", DataType::Utf8, true);
480    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
481    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
482    ///
483    /// assert_eq!(
484    ///     new_record_batch,
485    ///     RecordBatch::try_new(
486    ///         Arc::new(Schema::new(
487    ///             vec![
488    ///                 Field::new("a", DataType::Int32, true),
489    ///                 Field::new("s", DataType::Utf8, true)
490    ///             ])
491    ///         ),
492    ///         vec![int_arr, str_arr],
493    ///     ).unwrap()
494    /// )
495    /// ```
496    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;
497
498    /// Created a new RecordBatch with column at index.
499    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;
500
501    /// Creates a new [`RecordBatch`] from the provided  [`StructArray`].
502    ///
503    /// The fields on the [`StructArray`] need to match this [`RecordBatch`] schema
504    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;
505
506    /// Merge with another [`RecordBatch`] and returns a new one.
507    ///
508    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
509    /// name is found in the right then we merge the two columns.  If there is no match then
510    /// we add the left column to the output.
511    ///
512    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
513    /// Otherwise we use the left array.
514    ///
515    /// Afterwards we add all non-matching right columns to the output.
516    ///
517    /// Note: This method likely does not handle nested fields correctly and you may want to consider
518    /// using [`Self::merge_with_schema`] instead.
519    /// ```
520    /// use std::sync::Arc;
521    /// use arrow_array::*;
522    /// use arrow_schema::{Schema, Field, DataType};
523    /// use lance_arrow::*;
524    ///
525    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
526    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
527    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
528    ///
529    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
530    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
531    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
532    ///
533    /// let new_record_batch = left.merge(&right).unwrap();
534    ///
535    /// assert_eq!(
536    ///     new_record_batch,
537    ///     RecordBatch::try_new(
538    ///         Arc::new(Schema::new(
539    ///             vec![
540    ///                 Field::new("a", DataType::Int32, true),
541    ///                 Field::new("s", DataType::Utf8, true)
542    ///             ])
543    ///         ),
544    ///         vec![int_arr, str_arr],
545    ///     ).unwrap()
546    /// )
547    /// ```
548    ///
549    /// TODO: add merge nested fields support.
550    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;
551
552    /// Create a batch by merging columns between two batches with a given schema.
553    ///
554    /// A reference schema is used to determine the proper ordering of nested fields.
555    ///
556    /// For each field in the reference schema we look for corresponding fields in
557    /// the left and right batches.  If a field is found in both batches we recursively merge
558    /// it.
559    ///
560    /// If a field is only in the left or right batch we take it as it is.
561    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;
562
563    /// Drop one column specified with the name and return the new [`RecordBatch`].
564    ///
565    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
566    fn drop_column(&self, name: &str) -> Result<RecordBatch>;
567
568    /// Replace a column (specified by name) and return the new [`RecordBatch`].
569    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;
570
571    /// Replace a column schema (specified by name) and return the new [`RecordBatch`].
572    fn replace_column_schema_by_name(
573        &self,
574        name: &str,
575        new_data_type: DataType,
576        column: Arc<dyn Array>,
577    ) -> Result<RecordBatch>;
578
579    /// Rename a column at a given index.
580    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;
581
582    /// Get (potentially nested) column by qualified name.
583    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;
584
585    /// Project the schema over the [RecordBatch].
586    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;
587
588    /// metadata of the schema.
589    fn metadata(&self) -> &HashMap<String, String>;
590
591    /// Add metadata to the schema.
592    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
593        let mut metadata = self.metadata().clone();
594        metadata.insert(key, value);
595        self.with_metadata(metadata)
596    }
597
598    /// Replace the schema metadata with the provided one.
599    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;
600
601    /// Take selected rows from the [RecordBatch].
602    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;
603
604    /// Create a new RecordBatch with compacted memory after slicing.
605    fn shrink_to_fit(&self) -> Result<RecordBatch>;
606
607    /// Helper method to sort the RecordBatch by a column
608    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
609}
610
611impl RecordBatchExt for RecordBatch {
612    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
613        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
614        let mut new_columns = self.columns().to_vec();
615        new_columns.push(arr);
616        Self::try_new(new_schema, new_columns)
617    }
618
619    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
620        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
621        let mut new_columns = self.columns().to_vec();
622        new_columns.insert(index, arr);
623        Self::try_new(new_schema, new_columns)
624    }
625
626    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
627        let schema = Arc::new(Schema::new_with_metadata(
628            arr.fields().to_vec(),
629            self.schema().metadata.clone(),
630        ));
631        let batch = Self::from(arr);
632        batch.with_schema(schema)
633    }
634
635    fn merge(&self, other: &Self) -> Result<Self> {
636        if self.num_rows() != other.num_rows() {
637            return Err(ArrowError::InvalidArgumentError(format!(
638                "Attempt to merge two RecordBatch with different sizes: {} != {}",
639                self.num_rows(),
640                other.num_rows()
641            )));
642        }
643        let left_struct_array: StructArray = self.clone().into();
644        let right_struct_array: StructArray = other.clone().into();
645        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
646    }
647
648    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
649        if self.num_rows() != other.num_rows() {
650            return Err(ArrowError::InvalidArgumentError(format!(
651                "Attempt to merge two RecordBatch with different sizes: {} != {}",
652                self.num_rows(),
653                other.num_rows()
654            )));
655        }
656        let left_struct_array: StructArray = self.clone().into();
657        let right_struct_array: StructArray = other.clone().into();
658        self.try_new_from_struct_array(merge_with_schema(
659            &left_struct_array,
660            &right_struct_array,
661            schema.fields(),
662        ))
663    }
664
665    fn drop_column(&self, name: &str) -> Result<Self> {
666        let mut fields = vec![];
667        let mut columns = vec![];
668        for i in 0..self.schema().fields.len() {
669            if self.schema().field(i).name() != name {
670                fields.push(self.schema().field(i).clone());
671                columns.push(self.column(i).clone());
672            }
673        }
674        Self::try_new(
675            Arc::new(Schema::new_with_metadata(
676                fields,
677                self.schema().metadata().clone(),
678            )),
679            columns,
680        )
681    }
682
683    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
684        let mut fields = self.schema().fields().to_vec();
685        if index >= fields.len() {
686            return Err(ArrowError::InvalidArgumentError(format!(
687                "Index out of bounds: {}",
688                index
689            )));
690        }
691        fields[index] = Arc::new(Field::new(
692            new_name,
693            fields[index].data_type().clone(),
694            fields[index].is_nullable(),
695        ));
696        Self::try_new(
697            Arc::new(Schema::new_with_metadata(
698                fields,
699                self.schema().metadata().clone(),
700            )),
701            self.columns().to_vec(),
702        )
703    }
704
705    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
706        let mut columns = self.columns().to_vec();
707        let field_i = self
708            .schema()
709            .fields()
710            .iter()
711            .position(|f| f.name() == name)
712            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
713        columns[field_i] = column;
714        Self::try_new(self.schema(), columns)
715    }
716
717    fn replace_column_schema_by_name(
718        &self,
719        name: &str,
720        new_data_type: DataType,
721        column: Arc<dyn Array>,
722    ) -> Result<RecordBatch> {
723        let fields = self
724            .schema()
725            .fields()
726            .iter()
727            .map(|x| {
728                if x.name() != name {
729                    x.clone()
730                } else {
731                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
732                    Arc::new(new_field)
733                }
734            })
735            .collect::<Vec<_>>();
736        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
737        let mut columns = self.columns().to_vec();
738        let field_i = self
739            .schema()
740            .fields()
741            .iter()
742            .position(|f| f.name() == name)
743            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
744        columns[field_i] = column;
745        Self::try_new(Arc::new(schema), columns)
746    }
747
748    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
749        let split = name.split('.').collect::<Vec<_>>();
750        if split.is_empty() {
751            return None;
752        }
753
754        self.column_by_name(split[0])
755            .and_then(|arr| get_sub_array(arr, &split[1..]))
756    }
757
758    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
759        let struct_array: StructArray = self.clone().into();
760        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
761    }
762
763    fn metadata(&self) -> &HashMap<String, String> {
764        self.schema_ref().metadata()
765    }
766
767    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
768        let mut schema = self.schema_ref().as_ref().clone();
769        schema.metadata = metadata;
770        Self::try_new(schema.into(), self.columns().into())
771    }
772
773    fn take(&self, indices: &UInt32Array) -> Result<Self> {
774        let struct_array: StructArray = self.clone().into();
775        let taken = take(&struct_array, indices, None)?;
776        self.try_new_from_struct_array(taken.as_struct().clone())
777    }
778
779    fn shrink_to_fit(&self) -> Result<Self> {
780        // Deep copy the sliced record batch, instead of whole batch
781        crate::deepcopy::deep_copy_batch_sliced(self)
782    }
783
784    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
785        if column >= self.num_columns() {
786            return Err(ArrowError::InvalidArgumentError(format!(
787                "Column index out of bounds: {}",
788                column
789            )));
790        }
791        let column = self.column(column);
792        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
793        self.take(&sorted)
794    }
795}
796
797/// Recursively projects an array to match the target field's structure.
798/// This handles reordering fields inside nested List<Struct> types.
799fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
800    match target_field.data_type() {
801        DataType::Struct(subfields) => {
802            let struct_arr = array.as_struct();
803            let projected = project(struct_arr, subfields)?;
804            Ok(Arc::new(projected))
805        }
806        DataType::List(inner_field) => {
807            let list_arr: &ListArray = array.as_list();
808            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
809            Ok(Arc::new(ListArray::new(
810                inner_field.clone(),
811                list_arr.offsets().clone(),
812                projected_values,
813                list_arr.nulls().cloned(),
814            )))
815        }
816        DataType::LargeList(inner_field) => {
817            let list_arr: &LargeListArray = array.as_list();
818            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
819            Ok(Arc::new(LargeListArray::new(
820                inner_field.clone(),
821                list_arr.offsets().clone(),
822                projected_values,
823                list_arr.nulls().cloned(),
824            )))
825        }
826        DataType::FixedSizeList(inner_field, size) => {
827            let list_arr = array.as_fixed_size_list();
828            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
829            Ok(Arc::new(FixedSizeListArray::new(
830                inner_field.clone(),
831                *size,
832                projected_values,
833                list_arr.nulls().cloned(),
834            )))
835        }
836        _ => Ok(array.clone()),
837    }
838}
839
840fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
841    if fields.is_empty() {
842        return Ok(StructArray::new_empty_fields(
843            struct_array.len(),
844            struct_array.nulls().cloned(),
845        ));
846    }
847    let mut columns: Vec<ArrayRef> = vec![];
848    for field in fields.iter() {
849        if let Some(col) = struct_array.column_by_name(field.name()) {
850            let projected = project_array(col, field.as_ref())?;
851            columns.push(projected);
852        } else {
853            return Err(ArrowError::SchemaError(format!(
854                "field {} does not exist in the RecordBatch",
855                field.name()
856            )));
857        }
858    }
859    // Preserve the struct's validity when projecting
860    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
861}
862
863fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
864    let left_list: &GenericListArray<T> = left.as_list();
865    let right_list: &GenericListArray<T> = right.as_list();
866    left_list.offsets().inner() == right_list.offsets().inner()
867}
868
869fn merge_list_structs_helper<T: OffsetSizeTrait>(
870    left: &dyn Array,
871    right: &dyn Array,
872    items_field_name: impl Into<String>,
873    items_nullable: bool,
874) -> Arc<dyn Array> {
875    let left_list: &GenericListArray<T> = left.as_list();
876    let right_list: &GenericListArray<T> = right.as_list();
877    let left_struct = left_list.values();
878    let right_struct = right_list.values();
879    let left_struct_arr = left_struct.as_struct();
880    let right_struct_arr = right_struct.as_struct();
881    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
882    let items_field = Arc::new(Field::new(
883        items_field_name,
884        merged_items.data_type().clone(),
885        items_nullable,
886    ));
887    Arc::new(GenericListArray::<T>::new(
888        items_field,
889        left_list.offsets().clone(),
890        merged_items,
891        left_list.nulls().cloned(),
892    ))
893}
894
895fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
896    left: &dyn Array,
897    right: &dyn Array,
898    not_null: &dyn Array,
899    items_field_name: impl Into<String>,
900) -> Arc<dyn Array> {
901    let left_list: &GenericListArray<T> = left.as_list::<T>();
902    let not_null_list = not_null.as_list::<T>();
903    let right_list = right.as_list::<T>();
904
905    let left_struct = left_list.values().as_struct();
906    let not_null_struct: &StructArray = not_null_list.values().as_struct();
907    let right_struct = right_list.values().as_struct();
908
909    let values_len = not_null_list.values().len();
910    let mut merged_fields =
911        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
912    let mut merged_columns =
913        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
914
915    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
916        merged_fields.push(field.clone());
917        if let Some(val) = not_null_struct.column_by_name(field.name()) {
918            merged_columns.push(val.clone());
919        } else {
920            merged_columns.push(new_null_array(field.data_type(), values_len))
921        }
922    }
923    for (_, field) in right_struct
924        .columns()
925        .iter()
926        .zip(right_struct.fields())
927        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
928    {
929        merged_fields.push(field.clone());
930        if let Some(val) = not_null_struct.column_by_name(field.name()) {
931            merged_columns.push(val.clone());
932        } else {
933            merged_columns.push(new_null_array(field.data_type(), values_len));
934        }
935    }
936
937    let merged_struct = Arc::new(StructArray::new(
938        Fields::from(merged_fields),
939        merged_columns,
940        not_null_struct.nulls().cloned(),
941    ));
942    let items_field = Arc::new(Field::new(
943        items_field_name,
944        merged_struct.data_type().clone(),
945        true,
946    ));
947    Arc::new(GenericListArray::<T>::new(
948        items_field,
949        not_null_list.offsets().clone(),
950        merged_struct,
951        not_null_list.nulls().cloned(),
952    ))
953}
954
955fn merge_list_struct_null(
956    left: &dyn Array,
957    right: &dyn Array,
958    not_null: &dyn Array,
959) -> Arc<dyn Array> {
960    match left.data_type() {
961        DataType::List(left_field) => {
962            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
963        }
964        DataType::LargeList(left_field) => {
965            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
966        }
967        _ => unreachable!(),
968    }
969}
970
971fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
972    // Merging fields into a list<struct<...>> is tricky and can only succeed
973    // in two ways.  First, if both lists have the same offsets.  Second, if
974    // one of the lists is all-null
975    if left.null_count() == left.len() {
976        return merge_list_struct_null(left, right, right);
977    } else if right.null_count() == right.len() {
978        return merge_list_struct_null(left, right, left);
979    }
980    match (left.data_type(), right.data_type()) {
981        (DataType::List(left_field), DataType::List(_)) => {
982            if !lists_have_same_offsets_helper::<i32>(left, right) {
983                panic!("Attempt to merge list struct arrays which do not have same offsets");
984            }
985            merge_list_structs_helper::<i32>(
986                left,
987                right,
988                left_field.name(),
989                left_field.is_nullable(),
990            )
991        }
992        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
993            if !lists_have_same_offsets_helper::<i64>(left, right) {
994                panic!("Attempt to merge list struct arrays which do not have same offsets");
995            }
996            merge_list_structs_helper::<i64>(
997                left,
998                right,
999                left_field.name(),
1000                left_field.is_nullable(),
1001            )
1002        }
1003        _ => unreachable!(),
1004    }
1005}
1006
1007/// Helper function to normalize validity buffers
1008/// Returns None for all-null validity (placeholder structs)
1009fn normalize_validity(
1010    validity: Option<&arrow_buffer::NullBuffer>,
1011) -> Option<&arrow_buffer::NullBuffer> {
1012    validity.and_then(|v| {
1013        if v.null_count() == v.len() {
1014            None
1015        } else {
1016            Some(v)
1017        }
1018    })
1019}
1020
1021/// Helper function to merge validity buffers from two struct arrays
1022/// Returns None only if both arrays are null at the same position
1023///
1024/// Special handling for placeholder structs (all-null validity)
1025fn merge_struct_validity(
1026    left_validity: Option<&arrow_buffer::NullBuffer>,
1027    right_validity: Option<&arrow_buffer::NullBuffer>,
1028) -> Option<arrow_buffer::NullBuffer> {
1029    // Normalize both validity buffers (convert all-null to None)
1030    let left_normalized = normalize_validity(left_validity);
1031    let right_normalized = normalize_validity(right_validity);
1032
1033    match (left_normalized, right_normalized) {
1034        // Fast paths: no computation needed
1035        (None, None) => None,
1036        (Some(left), None) => Some(left.clone()),
1037        (None, Some(right)) => Some(right.clone()),
1038        (Some(left), Some(right)) => {
1039            // Fast path: if both have no nulls, can return either one
1040            if left.null_count() == 0 && right.null_count() == 0 {
1041                return Some(left.clone());
1042            }
1043
1044            let left_buffer = left.inner();
1045            let right_buffer = right.inner();
1046
1047            // Perform bitwise OR directly on BooleanBuffers
1048            // This preserves the correct semantics: 1 = valid, 0 = null
1049            let merged_buffer = left_buffer | right_buffer;
1050
1051            Some(arrow_buffer::NullBuffer::from(merged_buffer))
1052        }
1053    }
1054}
1055
1056fn merge_list_child_values(
1057    child_field: &Field,
1058    left_values: ArrayRef,
1059    right_values: ArrayRef,
1060) -> ArrayRef {
1061    match child_field.data_type() {
1062        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1063            left_values.as_struct(),
1064            right_values.as_struct(),
1065            child_fields,
1066        )) as ArrayRef,
1067        DataType::List(grandchild) => {
1068            let left_list = left_values
1069                .as_any()
1070                .downcast_ref::<ListArray>()
1071                .expect("left list values should be ListArray");
1072            let right_list = right_values
1073                .as_any()
1074                .downcast_ref::<ListArray>()
1075                .expect("right list values should be ListArray");
1076            let merged_values = merge_list_child_values(
1077                grandchild.as_ref(),
1078                left_list.values().clone(),
1079                right_list.values().clone(),
1080            );
1081            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1082            Arc::new(ListArray::new(
1083                grandchild.clone(),
1084                left_list.offsets().clone(),
1085                merged_values,
1086                merged_validity,
1087            )) as ArrayRef
1088        }
1089        DataType::LargeList(grandchild) => {
1090            let left_list = left_values
1091                .as_any()
1092                .downcast_ref::<LargeListArray>()
1093                .expect("left list values should be LargeListArray");
1094            let right_list = right_values
1095                .as_any()
1096                .downcast_ref::<LargeListArray>()
1097                .expect("right list values should be LargeListArray");
1098            let merged_values = merge_list_child_values(
1099                grandchild.as_ref(),
1100                left_list.values().clone(),
1101                right_list.values().clone(),
1102            );
1103            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1104            Arc::new(LargeListArray::new(
1105                grandchild.clone(),
1106                left_list.offsets().clone(),
1107                merged_values,
1108                merged_validity,
1109            )) as ArrayRef
1110        }
1111        DataType::FixedSizeList(grandchild, list_size) => {
1112            let left_list = left_values
1113                .as_any()
1114                .downcast_ref::<FixedSizeListArray>()
1115                .expect("left list values should be FixedSizeListArray");
1116            let right_list = right_values
1117                .as_any()
1118                .downcast_ref::<FixedSizeListArray>()
1119                .expect("right list values should be FixedSizeListArray");
1120            let merged_values = merge_list_child_values(
1121                grandchild.as_ref(),
1122                left_list.values().clone(),
1123                right_list.values().clone(),
1124            );
1125            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1126            Arc::new(FixedSizeListArray::new(
1127                grandchild.clone(),
1128                *list_size,
1129                merged_values,
1130                merged_validity,
1131            )) as ArrayRef
1132        }
1133        _ => left_values.clone(),
1134    }
1135}
1136
1137// Helper function to adjust child array validity based on parent struct validity
1138// When parent struct is null, propagates null to child array
1139// Optimized with fast paths and SIMD operations
1140fn adjust_child_validity(
1141    child: &ArrayRef,
1142    parent_validity: Option<&arrow_buffer::NullBuffer>,
1143) -> ArrayRef {
1144    // Fast path: no parent validity means no adjustment needed
1145    let parent_validity = match parent_validity {
1146        None => return child.clone(),
1147        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
1148        Some(p) => p,
1149    };
1150
1151    // Fast path: DataType::Null arrays are always entirely null by definition and cannot
1152    // carry an explicit null bitmap (Arrow rejects it). No adjustment is needed.
1153    if child.data_type() == &DataType::Null {
1154        return child.clone();
1155    }
1156
1157    let child_validity = child.nulls();
1158
1159    // Compute the new validity: child_validity AND parent_validity
1160    let new_validity = match child_validity {
1161        None => {
1162            // Fast path: child has no existing validity, just use parent's
1163            parent_validity.clone()
1164        }
1165        Some(child_nulls) => {
1166            let child_buffer = child_nulls.inner();
1167            let parent_buffer = parent_validity.inner();
1168
1169            // Perform bitwise AND directly on BooleanBuffers
1170            // This preserves the correct semantics: 1 = valid, 0 = null
1171            let merged_buffer = child_buffer & parent_buffer;
1172
1173            arrow_buffer::NullBuffer::from(merged_buffer)
1174        }
1175    };
1176
1177    // Create new array with adjusted validity
1178    arrow_array::make_array(
1179        arrow_data::ArrayData::try_new(
1180            child.data_type().clone(),
1181            child.len(),
1182            Some(new_validity.into_inner().into_inner()),
1183            child.offset(),
1184            child.to_data().buffers().to_vec(),
1185            child.to_data().child_data().to_vec(),
1186        )
1187        .unwrap(),
1188    )
1189}
1190
1191fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1192    let mut fields: Vec<Field> = vec![];
1193    let mut columns: Vec<ArrayRef> = vec![];
1194    let right_fields = right_struct_array.fields();
1195    let right_columns = right_struct_array.columns();
1196
1197    // Get the validity buffers from both structs
1198    let left_validity = left_struct_array.nulls();
1199    let right_validity = right_struct_array.nulls();
1200
1201    // Compute merged validity
1202    let merged_validity = merge_struct_validity(left_validity, right_validity);
1203
1204    // iterate through the fields on the left hand side
1205    for (left_field, left_column) in left_struct_array
1206        .fields()
1207        .iter()
1208        .zip(left_struct_array.columns().iter())
1209    {
1210        match right_fields
1211            .iter()
1212            .position(|f| f.name() == left_field.name())
1213        {
1214            // if the field exists on the right hand side, merge them recursively if appropriate
1215            Some(right_index) => {
1216                let right_field = right_fields.get(right_index).unwrap();
1217                let right_column = right_columns.get(right_index).unwrap();
1218                // if both fields are struct, merge them recursively
1219                match (left_field.data_type(), right_field.data_type()) {
1220                    (DataType::Struct(_), DataType::Struct(_)) => {
1221                        let left_sub_array = left_column.as_struct();
1222                        let right_sub_array = right_column.as_struct();
1223                        let merged_sub_array = merge(left_sub_array, right_sub_array);
1224                        fields.push(Field::new(
1225                            left_field.name(),
1226                            merged_sub_array.data_type().clone(),
1227                            left_field.is_nullable(),
1228                        ));
1229                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1230                    }
1231                    (DataType::List(left_list), DataType::List(right_list))
1232                        if left_list.data_type().is_struct()
1233                            && right_list.data_type().is_struct() =>
1234                    {
1235                        // If there is nothing to merge just use the left field
1236                        if left_list.data_type() == right_list.data_type() {
1237                            fields.push(left_field.as_ref().clone());
1238                            columns.push(left_column.clone());
1239                        }
1240                        // If we have two List<Struct> and they have different sets of fields then
1241                        // we can merge them if the offsets arrays are the same.  Otherwise, we
1242                        // have to consider it an error.
1243                        let merged_sub_array = merge_list_struct(&left_column, &right_column);
1244
1245                        fields.push(Field::new(
1246                            left_field.name(),
1247                            merged_sub_array.data_type().clone(),
1248                            left_field.is_nullable(),
1249                        ));
1250                        columns.push(merged_sub_array);
1251                    }
1252                    // otherwise, just use the field on the left hand side
1253                    _ => {
1254                        // TODO handle list-of-struct and other types
1255                        fields.push(left_field.as_ref().clone());
1256                        // Adjust the column validity: if left struct was null, propagate to child
1257                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1258                        columns.push(adjusted_column);
1259                    }
1260                }
1261            }
1262            None => {
1263                fields.push(left_field.as_ref().clone());
1264                // Adjust the column validity: if left struct was null, propagate to child
1265                let adjusted_column = adjust_child_validity(left_column, left_validity);
1266                columns.push(adjusted_column);
1267            }
1268        }
1269    }
1270
1271    // now iterate through the fields on the right hand side
1272    right_fields
1273        .iter()
1274        .zip(right_columns.iter())
1275        .for_each(|(field, column)| {
1276            // add new columns on the right
1277            if !left_struct_array
1278                .fields()
1279                .iter()
1280                .any(|f| f.name() == field.name())
1281            {
1282                fields.push(field.as_ref().clone());
1283                // This field doesn't exist on the left
1284                // We use the right's column but need to adjust for struct validity
1285                let adjusted_column = adjust_child_validity(column, right_validity);
1286                columns.push(adjusted_column);
1287            }
1288        });
1289
1290    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1291}
1292
1293fn merge_with_schema(
1294    left_struct_array: &StructArray,
1295    right_struct_array: &StructArray,
1296    fields: &Fields,
1297) -> StructArray {
1298    // Helper function that returns true if both types are struct or both are non-struct
1299    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
1300        match (left, right) {
1301            (DataType::Struct(_), DataType::Struct(_)) => true,
1302            (DataType::Struct(_), _) => false,
1303            (_, DataType::Struct(_)) => false,
1304            _ => true,
1305        }
1306    }
1307
1308    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
1309    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());
1310
1311    let left_fields = left_struct_array.fields();
1312    let left_columns = left_struct_array.columns();
1313    let right_fields = right_struct_array.fields();
1314    let right_columns = right_struct_array.columns();
1315
1316    // Get the validity buffers from both structs
1317    let left_validity = left_struct_array.nulls();
1318    let right_validity = right_struct_array.nulls();
1319
1320    // Compute merged validity
1321    let merged_validity = merge_struct_validity(left_validity, right_validity);
1322
1323    for field in fields {
1324        let left_match_idx = left_fields.iter().position(|f| {
1325            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1326        });
1327        let right_match_idx = right_fields.iter().position(|f| {
1328            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1329        });
1330
1331        match (left_match_idx, right_match_idx) {
1332            (None, Some(right_idx)) => {
1333                output_fields.push(right_fields[right_idx].as_ref().clone());
1334                // Adjust validity if the right struct was null
1335                let adjusted_column =
1336                    adjust_child_validity(&right_columns[right_idx], right_validity);
1337                columns.push(adjusted_column);
1338            }
1339            (Some(left_idx), None) => {
1340                output_fields.push(left_fields[left_idx].as_ref().clone());
1341                // Adjust validity if the left struct was null
1342                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
1343                columns.push(adjusted_column);
1344            }
1345            (Some(left_idx), Some(right_idx)) => {
1346                match field.data_type() {
1347                    DataType::Struct(child_fields) => {
1348                        let left_sub_array = left_columns[left_idx].as_struct();
1349                        let right_sub_array = right_columns[right_idx].as_struct();
1350                        let merged_sub_array =
1351                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
1352                        output_fields.push(Field::new(
1353                            field.name(),
1354                            merged_sub_array.data_type().clone(),
1355                            field.is_nullable(),
1356                        ));
1357                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1358                    }
1359                    DataType::List(child_field) => {
1360                        let left_list = left_columns[left_idx]
1361                            .as_any()
1362                            .downcast_ref::<ListArray>()
1363                            .unwrap();
1364                        let right_list = right_columns[right_idx]
1365                            .as_any()
1366                            .downcast_ref::<ListArray>()
1367                            .unwrap();
1368                        let merged_values = merge_list_child_values(
1369                            child_field.as_ref(),
1370                            left_list.trimmed_values(),
1371                            right_list.trimmed_values(),
1372                        );
1373                        let merged_validity =
1374                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1375                        let merged_list = ListArray::new(
1376                            child_field.clone(),
1377                            left_list.offsets().clone(),
1378                            merged_values,
1379                            merged_validity,
1380                        );
1381                        output_fields.push(field.as_ref().clone());
1382                        columns.push(Arc::new(merged_list) as ArrayRef);
1383                    }
1384                    DataType::LargeList(child_field) => {
1385                        let left_list = left_columns[left_idx]
1386                            .as_any()
1387                            .downcast_ref::<LargeListArray>()
1388                            .unwrap();
1389                        let right_list = right_columns[right_idx]
1390                            .as_any()
1391                            .downcast_ref::<LargeListArray>()
1392                            .unwrap();
1393                        let merged_values = merge_list_child_values(
1394                            child_field.as_ref(),
1395                            left_list.trimmed_values(),
1396                            right_list.trimmed_values(),
1397                        );
1398                        let merged_validity =
1399                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1400                        let merged_list = LargeListArray::new(
1401                            child_field.clone(),
1402                            left_list.offsets().clone(),
1403                            merged_values,
1404                            merged_validity,
1405                        );
1406                        output_fields.push(field.as_ref().clone());
1407                        columns.push(Arc::new(merged_list) as ArrayRef);
1408                    }
1409                    DataType::FixedSizeList(child_field, list_size) => {
1410                        let left_list = left_columns[left_idx]
1411                            .as_any()
1412                            .downcast_ref::<FixedSizeListArray>()
1413                            .unwrap();
1414                        let right_list = right_columns[right_idx]
1415                            .as_any()
1416                            .downcast_ref::<FixedSizeListArray>()
1417                            .unwrap();
1418                        let merged_values = merge_list_child_values(
1419                            child_field.as_ref(),
1420                            left_list.values().clone(),
1421                            right_list.values().clone(),
1422                        );
1423                        let merged_validity =
1424                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1425                        let merged_list = FixedSizeListArray::new(
1426                            child_field.clone(),
1427                            *list_size,
1428                            merged_values,
1429                            merged_validity,
1430                        );
1431                        output_fields.push(field.as_ref().clone());
1432                        columns.push(Arc::new(merged_list) as ArrayRef);
1433                    }
1434                    _ => {
1435                        output_fields.push(left_fields[left_idx].as_ref().clone());
1436                        // For fields that exist in both, use left but adjust validity
1437                        let adjusted_column =
1438                            adjust_child_validity(&left_columns[left_idx], left_validity);
1439                        columns.push(adjusted_column);
1440                    }
1441                }
1442            }
1443            (None, None) => {
1444                // The field will not be included in the output
1445            }
1446        }
1447    }
1448
1449    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
1450}
1451
1452fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1453    if components.is_empty() {
1454        return Some(array);
1455    }
1456    if !matches!(array.data_type(), DataType::Struct(_)) {
1457        return None;
1458    }
1459    let struct_arr = array.as_struct();
1460    struct_arr
1461        .column_by_name(components[0])
1462        .and_then(|arr| get_sub_array(arr, &components[1..]))
1463}
1464
1465/// Interleave multiple RecordBatches into a single RecordBatch.
1466///
1467/// Behaves like [`arrow_select::interleave::interleave`], but for RecordBatches.
1468pub fn interleave_batches(
1469    batches: &[RecordBatch],
1470    indices: &[(usize, usize)],
1471) -> Result<RecordBatch> {
1472    let first_batch = batches.first().ok_or_else(|| {
1473        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1474    })?;
1475    let schema = first_batch.schema();
1476    let num_columns = first_batch.num_columns();
1477    let mut columns = Vec::with_capacity(num_columns);
1478    let mut chunks = Vec::with_capacity(batches.len());
1479
1480    for i in 0..num_columns {
1481        for batch in batches {
1482            chunks.push(batch.column(i).as_ref());
1483        }
1484        let new_column = interleave(&chunks, indices)?;
1485        columns.push(new_column);
1486        chunks.clear();
1487    }
1488
1489    RecordBatch::try_new(schema, columns)
1490}
1491
/// Constructors for building a buffer from a `bytes::Bytes` object.
pub trait BufferExt {
    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made and an owned buffer returned.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
    ///
    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
    ///
    /// `size_bytes` can be larger than `bytes.len()` and, if so, the trailing
    /// bytes will be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `size_bytes` is less than `bytes.len()`
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1519
/// Returns `true` when `n` is a power of two (exactly one bit set).
///
/// Zero is reported as *not* a power of two.  The standard-library check is
/// used instead of `n & (n - 1) == 0` because that expression underflows for
/// `n == 0`: it panics in builds with overflow checks and yields `true`
/// otherwise.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1523
1524impl BufferExt for arrow_buffer::Buffer {
1525    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
1526        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
1527        {
1528            // The original buffer is not aligned, cannot zero-copy
1529            let size_bytes = bytes.len();
1530            Self::copy_bytes_bytes(bytes, size_bytes)
1531        } else {
1532            // The original buffer is aligned, can zero-copy
1533            // SAFETY: the alignment is correct we can make this conversion
1534            unsafe {
1535                Self::from_custom_allocation(
1536                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
1537                    bytes.len(),
1538                    Arc::new(bytes),
1539                )
1540            }
1541        }
1542    }
1543
1544    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
1545        assert!(size_bytes >= bytes.len());
1546        let mut buf = MutableBuffer::with_capacity(size_bytes);
1547        let to_fill = size_bytes - bytes.len();
1548        buf.extend(bytes);
1549        buf.extend(std::iter::repeat_n(0_u8, to_fill));
1550
1551        // FIX for issue #4512: Shrink buffer to actual size before converting to immutable
1552        // This reduces memory overhead from capacity over-allocation
1553        buf.shrink_to_fit();
1554
1555        Self::from(buf)
1556    }
1557}
1558
1559#[cfg(test)]
1560mod tests {
1561    use super::*;
1562    use arrow_array::{Float32Array, Int32Array, NullArray, StructArray};
1563    use arrow_array::{ListArray, StringArray, new_empty_array, new_null_array};
1564    use arrow_buffer::OffsetBuffer;
1565
1566    #[test]
1567    fn test_merge_recursive() {
1568        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
1569        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
1570        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
1571        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
1572
1573        let left_schema = Schema::new(vec![
1574            Field::new("a", DataType::Int32, true),
1575            Field::new(
1576                "b",
1577                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
1578                true,
1579            ),
1580        ]);
1581        let left_batch = RecordBatch::try_new(
1582            Arc::new(left_schema),
1583            vec![
1584                Arc::new(a_array.clone()),
1585                Arc::new(StructArray::from(vec![(
1586                    Arc::new(Field::new("c", DataType::Int32, true)),
1587                    Arc::new(c_array.clone()) as ArrayRef,
1588                )])),
1589            ],
1590        )
1591        .unwrap();
1592
1593        let right_schema = Schema::new(vec![
1594            Field::new("e", DataType::Int32, true),
1595            Field::new(
1596                "b",
1597                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
1598                true,
1599            ),
1600        ]);
1601        let right_batch = RecordBatch::try_new(
1602            Arc::new(right_schema),
1603            vec![
1604                Arc::new(e_array.clone()),
1605                Arc::new(StructArray::from(vec![(
1606                    Arc::new(Field::new("d", DataType::Utf8, true)),
1607                    Arc::new(d_array.clone()) as ArrayRef,
1608                )])) as ArrayRef,
1609            ],
1610        )
1611        .unwrap();
1612
1613        let merged_schema = Schema::new(vec![
1614            Field::new("a", DataType::Int32, true),
1615            Field::new(
1616                "b",
1617                DataType::Struct(
1618                    vec![
1619                        Field::new("c", DataType::Int32, true),
1620                        Field::new("d", DataType::Utf8, true),
1621                    ]
1622                    .into(),
1623                ),
1624                true,
1625            ),
1626            Field::new("e", DataType::Int32, true),
1627        ]);
1628        let merged_batch = RecordBatch::try_new(
1629            Arc::new(merged_schema),
1630            vec![
1631                Arc::new(a_array) as ArrayRef,
1632                Arc::new(StructArray::from(vec![
1633                    (
1634                        Arc::new(Field::new("c", DataType::Int32, true)),
1635                        Arc::new(c_array) as ArrayRef,
1636                    ),
1637                    (
1638                        Arc::new(Field::new("d", DataType::Utf8, true)),
1639                        Arc::new(d_array) as ArrayRef,
1640                    ),
1641                ])) as ArrayRef,
1642                Arc::new(e_array) as ArrayRef,
1643            ],
1644        )
1645        .unwrap();
1646
1647        let result = left_batch.merge(&right_batch).unwrap();
1648        assert_eq!(result, merged_batch);
1649    }
1650
1651    #[test]
1652    fn test_merge_with_schema() {
1653        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
1654            let fields: Fields = names
1655                .iter()
1656                .zip(types)
1657                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
1658                .collect();
1659            let schema = Schema::new(vec![Field::new(
1660                "struct",
1661                DataType::Struct(fields.clone()),
1662                false,
1663            )]);
1664            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
1665            let batch = RecordBatch::try_new(
1666                Arc::new(schema.clone()),
1667                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
1668            );
1669            (schema, batch.unwrap())
1670        }
1671
1672        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
1673        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
1674        let (output_schema, _) = test_batch(
1675            &["b", "a", "c"],
1676            &[DataType::Int64, DataType::Int32, DataType::Int32],
1677        );
1678
1679        // If we use merge_with_schema the schema is respected
1680        let merged = left_batch
1681            .merge_with_schema(&right_batch, &output_schema)
1682            .unwrap();
1683        assert_eq!(merged.schema().as_ref(), &output_schema);
1684
1685        // If we use merge we get first-come first-serve based on the left batch
1686        let (naive_schema, _) = test_batch(
1687            &["a", "b", "c"],
1688            &[DataType::Int32, DataType::Int64, DataType::Int32],
1689        );
1690        let merged = left_batch.merge(&right_batch).unwrap();
1691        assert_eq!(merged.schema().as_ref(), &naive_schema);
1692    }
1693
1694    #[test]
1695    fn test_merge_list_struct() {
1696        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
1697        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
1698        let x_struct_field = Arc::new(Field::new(
1699            "item",
1700            DataType::Struct(Fields::from(vec![x_field.clone()])),
1701            true,
1702        ));
1703        let y_struct_field = Arc::new(Field::new(
1704            "item",
1705            DataType::Struct(Fields::from(vec![y_field.clone()])),
1706            true,
1707        ));
1708        let both_struct_field = Arc::new(Field::new(
1709            "item",
1710            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
1711            true,
1712        ));
1713        let left_schema = Schema::new(vec![Field::new(
1714            "list_struct",
1715            DataType::List(x_struct_field.clone()),
1716            true,
1717        )]);
1718        let right_schema = Schema::new(vec![Field::new(
1719            "list_struct",
1720            DataType::List(y_struct_field.clone()),
1721            true,
1722        )]);
1723        let both_schema = Schema::new(vec![Field::new(
1724            "list_struct",
1725            DataType::List(both_struct_field.clone()),
1726            true,
1727        )]);
1728
1729        let x = Arc::new(Int32Array::from(vec![1]));
1730        let y = Arc::new(Int32Array::from(vec![2]));
1731        let x_struct = Arc::new(StructArray::new(
1732            Fields::from(vec![x_field.clone()]),
1733            vec![x.clone()],
1734            None,
1735        ));
1736        let y_struct = Arc::new(StructArray::new(
1737            Fields::from(vec![y_field.clone()]),
1738            vec![y.clone()],
1739            None,
1740        ));
1741        let both_struct = Arc::new(StructArray::new(
1742            Fields::from(vec![x_field.clone(), y_field.clone()]),
1743            vec![x.clone(), y],
1744            None,
1745        ));
1746        let both_null_struct = Arc::new(StructArray::new(
1747            Fields::from(vec![x_field, y_field]),
1748            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
1749            None,
1750        ));
1751        let offsets = OffsetBuffer::from_lengths([1]);
1752        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
1753        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
1754        let both_list = ListArray::new(
1755            both_struct_field.clone(),
1756            offsets.clone(),
1757            both_struct,
1758            None,
1759        );
1760        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
1761        let x_batch =
1762            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
1763        let y_batch = RecordBatch::try_new(
1764            Arc::new(right_schema.clone()),
1765            vec![Arc::new(y_s_list.clone())],
1766        )
1767        .unwrap();
1768        let merged = x_batch.merge(&y_batch).unwrap();
1769        let expected =
1770            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
1771        assert_eq!(merged, expected);
1772
1773        let y_null_list = new_null_array(y_s_list.data_type(), 1);
1774        let y_null_batch =
1775            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
1776                .unwrap();
1777        let expected =
1778            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
1779        let merged = x_batch.merge(&y_null_batch).unwrap();
1780        assert_eq!(merged, expected);
1781    }
1782
1783    #[test]
1784    fn test_byte_width_opt() {
1785        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1786        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1787        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1788        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1789        assert_eq!(DataType::Utf8.byte_width_opt(), None);
1790        assert_eq!(DataType::Binary.byte_width_opt(), None);
1791        assert_eq!(
1792            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1793            None
1794        );
1795        assert_eq!(
1796            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1797                .byte_width_opt(),
1798            Some(12)
1799        );
1800        assert_eq!(
1801            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1802                .byte_width_opt(),
1803            Some(16)
1804        );
1805        assert_eq!(
1806            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1807                .byte_width_opt(),
1808            None
1809        );
1810    }
1811
1812    #[test]
1813    fn test_take_record_batch() {
1814        let schema = Arc::new(Schema::new(vec![
1815            Field::new("a", DataType::Int32, true),
1816            Field::new("b", DataType::Utf8, true),
1817        ]));
1818        let batch = RecordBatch::try_new(
1819            schema.clone(),
1820            vec![
1821                Arc::new(Int32Array::from_iter_values(0..20)),
1822                Arc::new(StringArray::from_iter_values(
1823                    (0..20).map(|i| format!("str-{}", i)),
1824                )),
1825            ],
1826        )
1827        .unwrap();
1828        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1829        assert_eq!(
1830            taken,
1831            RecordBatch::try_new(
1832                schema,
1833                vec![
1834                    Arc::new(Int32Array::from(vec![1, 5, 10])),
1835                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1836                ],
1837            )
1838            .unwrap()
1839        )
1840    }
1841
1842    #[test]
1843    fn test_schema_project_by_schema() {
1844        let metadata = [("key".to_string(), "value".to_string())];
1845        let schema = Arc::new(
1846            Schema::new(vec![
1847                Field::new("a", DataType::Int32, true),
1848                Field::new("b", DataType::Utf8, true),
1849            ])
1850            .with_metadata(metadata.clone().into()),
1851        );
1852        let batch = RecordBatch::try_new(
1853            schema,
1854            vec![
1855                Arc::new(Int32Array::from_iter_values(0..20)),
1856                Arc::new(StringArray::from_iter_values(
1857                    (0..20).map(|i| format!("str-{}", i)),
1858                )),
1859            ],
1860        )
1861        .unwrap();
1862
1863        // Empty schema
1864        let empty_schema = Schema::empty();
1865        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
1866        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
1867        assert_eq!(
1868            empty_projected,
1869            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
1870                .with_schema(Arc::new(expected_schema))
1871                .unwrap()
1872        );
1873
1874        // Re-ordered schema
1875        let reordered_schema = Schema::new(vec![
1876            Field::new("b", DataType::Utf8, true),
1877            Field::new("a", DataType::Int32, true),
1878        ]);
1879        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
1880        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
1881        assert_eq!(
1882            reordered_projected,
1883            RecordBatch::try_new(
1884                expected_schema,
1885                vec![
1886                    Arc::new(StringArray::from_iter_values(
1887                        (0..20).map(|i| format!("str-{}", i)),
1888                    )),
1889                    Arc::new(Int32Array::from_iter_values(0..20)),
1890                ],
1891            )
1892            .unwrap()
1893        );
1894
1895        // Sub schema
1896        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1897        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
1898        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
1899        assert_eq!(
1900            sub_projected,
1901            RecordBatch::try_new(
1902                expected_schema,
1903                vec![Arc::new(Int32Array::from_iter_values(0..20))],
1904            )
1905            .unwrap()
1906        );
1907    }
1908
1909    #[test]
1910    fn test_project_preserves_struct_validity() {
1911        // Test that projecting a struct array preserves its validity (fix for issue #4385)
1912        let fields = Fields::from(vec![
1913            Field::new("id", DataType::Int32, false),
1914            Field::new("value", DataType::Float32, true),
1915        ]);
1916
1917        // Create a struct array with validity
1918        let id_array = Int32Array::from(vec![1, 2, 3]);
1919        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1920        let struct_array = StructArray::new(
1921            fields.clone(),
1922            vec![
1923                Arc::new(id_array) as ArrayRef,
1924                Arc::new(value_array) as ArrayRef,
1925            ],
1926            Some(vec![true, false, true].into()), // Second struct is null
1927        );
1928
1929        // Project the struct array
1930        let projected = project(&struct_array, &fields).unwrap();
1931
1932        // Verify the validity is preserved
1933        assert_eq!(projected.null_count(), 1);
1934        assert!(!projected.is_null(0));
1935        assert!(projected.is_null(1));
1936        assert!(!projected.is_null(2));
1937    }
1938
1939    #[test]
1940    fn test_merge_struct_with_different_validity() {
1941        // Test case from Weston's review comment
1942        // File 1 has height field with some nulls
1943        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
1944        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
1945        let left_struct = StructArray::new(
1946            left_fields,
1947            vec![Arc::new(height_array) as ArrayRef],
1948            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
1949        );
1950
1951        // File 2 has width field with some nulls
1952        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
1953        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
1954        let right_struct = StructArray::new(
1955            right_fields,
1956            vec![Arc::new(width_array) as ArrayRef],
1957            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
1958        );
1959
1960        // Merge the two structs
1961        let merged = merge(&left_struct, &right_struct);
1962
1963        // Expected:
1964        // Row 1: both non-null -> {width: 300, height: 500}
1965        // Row 2: left null, right non-null -> {width: 200, height: null}
1966        // Row 3: left non-null, right null -> {width: null, height: 600}
1967        // Row 4: both null -> null struct
1968
1969        assert_eq!(merged.null_count(), 1); // Only row 4 is null
1970        assert!(!merged.is_null(0));
1971        assert!(!merged.is_null(1));
1972        assert!(!merged.is_null(2));
1973        assert!(merged.is_null(3));
1974
1975        // Check field values
1976        let height_col = merged.column_by_name("height").unwrap();
1977        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
1978        assert_eq!(height_values.value(0), 500);
1979        assert!(height_values.is_null(1)); // height is null when left struct was null
1980        assert_eq!(height_values.value(2), 600);
1981
1982        let width_col = merged.column_by_name("width").unwrap();
1983        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
1984        assert_eq!(width_values.value(0), 300);
1985        assert_eq!(width_values.value(1), 200);
1986        assert!(width_values.is_null(2)); // width is null when right struct was null
1987    }
1988
1989    #[test]
1990    fn test_merge_null_typed_column_with_parent_validity() {
1991        // Reproduces ENT-990: panic in adjust_child_validity when a Null-typed column
1992        // exists on one side and the parent struct has null rows.
1993        // Arrow's Null type has no null bitmap, so passing one to ArrayData::try_new panics.
1994        let left_struct = StructArray::new(
1995            Fields::from(vec![Field::new("a", DataType::Int32, true)]),
1996            vec![Arc::new(Int32Array::from(vec![Some(1), None])) as ArrayRef],
1997            Some(vec![true, false].into()),
1998        );
1999        let right_struct = StructArray::new(
2000            Fields::from(vec![Field::new("b", DataType::Null, true)]),
2001            vec![Arc::new(NullArray::new(2)) as ArrayRef],
2002            Some(vec![true, false].into()),
2003        );
2004
2005        // Previously panicked: "Arrays of type Null cannot contain a null bitmask"
2006        let merged = merge(&left_struct, &right_struct);
2007        assert_eq!(merged.len(), 2);
2008        let b_col = merged.column_by_name("b").unwrap();
2009        // DataType::Null implies all-null by definition; no null bitmap is stored.
2010        assert_eq!(b_col.data_type(), &DataType::Null);
2011        assert_eq!(b_col.len(), 2);
2012    }
2013
2014    #[test]
2015    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
2016        // left_list setup
2017        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
2018        let left_count = Arc::new(Int32Array::from(vec![None, None]));
2019        let left_struct = Arc::new(StructArray::new(
2020            Fields::from(vec![
2021                Field::new("company_id", DataType::Int32, true),
2022                Field::new("count", DataType::Int32, true),
2023            ]),
2024            vec![left_company_id, left_count],
2025            None,
2026        ));
2027        let left_list = Arc::new(ListArray::new(
2028            Arc::new(Field::new(
2029                "item",
2030                DataType::Struct(left_struct.fields().clone()),
2031                true,
2032            )),
2033            OffsetBuffer::from_lengths([2]),
2034            left_struct,
2035            None,
2036        ));
2037
2038        // Right List Setup
2039        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
2040        let right_struct = Arc::new(StructArray::new(
2041            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2042            vec![right_company_name],
2043            None,
2044        ));
2045        let right_list = Arc::new(ListArray::new(
2046            Arc::new(Field::new(
2047                "item",
2048                DataType::Struct(right_struct.fields().clone()),
2049                true,
2050            )),
2051            OffsetBuffer::from_lengths([2]),
2052            right_struct,
2053            None,
2054        ));
2055
2056        let target_fields = Fields::from(vec![Field::new(
2057            "companies",
2058            DataType::List(Arc::new(Field::new(
2059                "item",
2060                DataType::Struct(Fields::from(vec![
2061                    Field::new("company_id", DataType::Int32, true),
2062                    Field::new("company_name", DataType::Utf8, true),
2063                    Field::new("count", DataType::Int32, true),
2064                ])),
2065                true,
2066            ))),
2067            true,
2068        )]);
2069
2070        let left_batch = RecordBatch::try_new(
2071            Arc::new(Schema::new(vec![Field::new(
2072                "companies",
2073                left_list.data_type().clone(),
2074                true,
2075            )])),
2076            vec![left_list as ArrayRef],
2077        )
2078        .unwrap();
2079
2080        let right_batch = RecordBatch::try_new(
2081            Arc::new(Schema::new(vec![Field::new(
2082                "companies",
2083                right_list.data_type().clone(),
2084                true,
2085            )])),
2086            vec![right_list as ArrayRef],
2087        )
2088        .unwrap();
2089
2090        let merged = left_batch
2091            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
2092            .unwrap();
2093
2094        // Verify the merged structure
2095        let merged_list = merged
2096            .column_by_name("companies")
2097            .unwrap()
2098            .as_any()
2099            .downcast_ref::<ListArray>()
2100            .unwrap();
2101        let merged_struct = merged_list.values().as_struct();
2102
2103        // Should have all 3 fields
2104        assert_eq!(merged_struct.num_columns(), 3);
2105        assert!(merged_struct.column_by_name("company_id").is_some());
2106        assert!(merged_struct.column_by_name("company_name").is_some());
2107        assert!(merged_struct.column_by_name("count").is_some());
2108
2109        // Verify values
2110        let company_id = merged_struct
2111            .column_by_name("company_id")
2112            .unwrap()
2113            .as_any()
2114            .downcast_ref::<Int32Array>()
2115            .unwrap();
2116        assert!(company_id.is_null(0));
2117        assert!(company_id.is_null(1));
2118
2119        let company_name = merged_struct
2120            .column_by_name("company_name")
2121            .unwrap()
2122            .as_any()
2123            .downcast_ref::<StringArray>()
2124            .unwrap();
2125        assert_eq!(company_name.value(0), "Google");
2126        assert_eq!(company_name.value(1), "Microsoft");
2127
2128        let count = merged_struct
2129            .column_by_name("count")
2130            .unwrap()
2131            .as_any()
2132            .downcast_ref::<Int32Array>()
2133            .unwrap();
2134        assert!(count.is_null(0));
2135        assert!(count.is_null(1));
2136    }
2137
2138    #[test]
2139    fn test_merge_struct_lists() {
2140        test_merge_struct_lists_generic::<i32>();
2141    }
2142
2143    #[test]
2144    fn test_merge_struct_large_lists() {
2145        test_merge_struct_lists_generic::<i64>();
2146    }
2147
2148    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2149        // left_list setup
2150        let left_company_id = Arc::new(Int32Array::from(vec![
2151            Some(1),
2152            Some(2),
2153            Some(3),
2154            Some(4),
2155            Some(5),
2156            Some(6),
2157            Some(7),
2158            Some(8),
2159            Some(9),
2160            Some(10),
2161            Some(11),
2162            Some(12),
2163            Some(13),
2164            Some(14),
2165            Some(15),
2166            Some(16),
2167            Some(17),
2168            Some(18),
2169            Some(19),
2170            Some(20),
2171        ]));
2172        let left_count = Arc::new(Int32Array::from(vec![
2173            Some(10),
2174            Some(20),
2175            Some(30),
2176            Some(40),
2177            Some(50),
2178            Some(60),
2179            Some(70),
2180            Some(80),
2181            Some(90),
2182            Some(100),
2183            Some(110),
2184            Some(120),
2185            Some(130),
2186            Some(140),
2187            Some(150),
2188            Some(160),
2189            Some(170),
2190            Some(180),
2191            Some(190),
2192            Some(200),
2193        ]));
2194        let left_struct = Arc::new(StructArray::new(
2195            Fields::from(vec![
2196                Field::new("company_id", DataType::Int32, true),
2197                Field::new("count", DataType::Int32, true),
2198            ]),
2199            vec![left_company_id, left_count],
2200            None,
2201        ));
2202
2203        let left_list = Arc::new(GenericListArray::<O>::new(
2204            Arc::new(Field::new(
2205                "item",
2206                DataType::Struct(left_struct.fields().clone()),
2207                true,
2208            )),
2209            OffsetBuffer::from_lengths([3, 1]),
2210            left_struct.clone(),
2211            None,
2212        ));
2213
2214        let left_list_struct = Arc::new(StructArray::new(
2215            Fields::from(vec![Field::new(
2216                "companies",
2217                if O::IS_LARGE {
2218                    DataType::LargeList(Arc::new(Field::new(
2219                        "item",
2220                        DataType::Struct(left_struct.fields().clone()),
2221                        true,
2222                    )))
2223                } else {
2224                    DataType::List(Arc::new(Field::new(
2225                        "item",
2226                        DataType::Struct(left_struct.fields().clone()),
2227                        true,
2228                    )))
2229                },
2230                true,
2231            )]),
2232            vec![left_list as ArrayRef],
2233            None,
2234        ));
2235
2236        // right_list setup
2237        let right_company_name = Arc::new(StringArray::from(vec![
2238            "Google",
2239            "Microsoft",
2240            "Apple",
2241            "Facebook",
2242        ]));
2243        let right_struct = Arc::new(StructArray::new(
2244            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2245            vec![right_company_name],
2246            None,
2247        ));
2248        let right_list = Arc::new(GenericListArray::<O>::new(
2249            Arc::new(Field::new(
2250                "item",
2251                DataType::Struct(right_struct.fields().clone()),
2252                true,
2253            )),
2254            OffsetBuffer::from_lengths([3, 1]),
2255            right_struct.clone(),
2256            None,
2257        ));
2258
2259        let right_list_struct = Arc::new(StructArray::new(
2260            Fields::from(vec![Field::new(
2261                "companies",
2262                if O::IS_LARGE {
2263                    DataType::LargeList(Arc::new(Field::new(
2264                        "item",
2265                        DataType::Struct(right_struct.fields().clone()),
2266                        true,
2267                    )))
2268                } else {
2269                    DataType::List(Arc::new(Field::new(
2270                        "item",
2271                        DataType::Struct(right_struct.fields().clone()),
2272                        true,
2273                    )))
2274                },
2275                true,
2276            )]),
2277            vec![right_list as ArrayRef],
2278            None,
2279        ));
2280
2281        // prepare schema
2282        let target_fields = Fields::from(vec![Field::new(
2283            "companies",
2284            if O::IS_LARGE {
2285                DataType::LargeList(Arc::new(Field::new(
2286                    "item",
2287                    DataType::Struct(Fields::from(vec![
2288                        Field::new("company_id", DataType::Int32, true),
2289                        Field::new("company_name", DataType::Utf8, true),
2290                        Field::new("count", DataType::Int32, true),
2291                    ])),
2292                    true,
2293                )))
2294            } else {
2295                DataType::List(Arc::new(Field::new(
2296                    "item",
2297                    DataType::Struct(Fields::from(vec![
2298                        Field::new("company_id", DataType::Int32, true),
2299                        Field::new("company_name", DataType::Utf8, true),
2300                        Field::new("count", DataType::Int32, true),
2301                    ])),
2302                    true,
2303                )))
2304            },
2305            true,
2306        )]);
2307
2308        // merge left_list and right_list
2309        let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2310        assert_eq!(merged_array.len(), 2);
2311    }
2312
#[test]
fn test_project_by_schema_list_struct_reorder() {
    // Regression test for issue #5702: `project_by_schema` has to reorder the
    // children of a List<Struct> column so that they follow the target schema.

    // Nullable Utf8 field; every struct child in this test has this shape.
    fn utf8_field(name: &str) -> Field {
        Field::new(name, DataType::Utf8, true)
    }

    // Nullable "item" field holding a struct whose children appear in `order`.
    fn item_field(order: [&str; 3]) -> Arc<Field> {
        let children: Vec<Field> = order.iter().map(|name| utf8_field(name)).collect();
        Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(children)),
            true,
        ))
    }

    // Two-column schema: a required Int32 "id" plus a nullable List "data".
    fn schema_for(item: Arc<Field>) -> Schema {
        Schema::new(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("data", DataType::List(item), true),
        ])
    }

    // Source side: the struct children are laid out as c, b, a.
    let source_item = item_field(["c", "b", "a"]);
    let source_rows = StructArray::from(vec![
        (
            Arc::new(utf8_field("c")),
            Arc::new(StringArray::from(vec!["c1", "c2"])) as ArrayRef,
        ),
        (
            Arc::new(utf8_field("b")),
            Arc::new(StringArray::from(vec!["b1", "b2"])) as ArrayRef,
        ),
        (
            Arc::new(utf8_field("a")),
            Arc::new(StringArray::from(vec!["a1", "a2"])) as ArrayRef,
        ),
    ]);

    // One struct row per list entry.
    let data_column = ListArray::new(
        source_item.clone(),
        OffsetBuffer::from_lengths([1, 1]),
        Arc::new(source_rows),
        None,
    );
    let id_column = Int32Array::from(vec![1, 2]);

    let batch = RecordBatch::try_new(
        Arc::new(schema_for(source_item)),
        vec![
            Arc::new(id_column) as ArrayRef,
            Arc::new(data_column) as ArrayRef,
        ],
    )
    .unwrap();

    // Target side: same columns, but the struct children are ordered a, b, c.
    let target_schema = schema_for(item_field(["a", "b", "c"]));

    let projected = batch.project_by_schema(&target_schema).unwrap();

    // The projected batch must carry exactly the requested schema.
    assert_eq!(projected.schema().as_ref(), &target_schema);

    let projected_rows = projected.column(1).as_list::<i32>().values().as_struct();

    // Expected child columns, listed in the target order.
    let expected = [
        ("a", StringArray::from(vec!["a1", "a2"])),
        ("b", StringArray::from(vec!["b1", "b2"])),
        ("c", StringArray::from(vec!["c1", "c2"])),
    ];

    // Lookup by name must return the matching data...
    for (name, want) in &expected {
        assert_eq!(
            projected_rows.column_by_name(name).unwrap().as_ref(),
            want as &dyn Array
        );
    }

    // ...and positional access must follow the target order (a=0, b=1, c=2).
    for (position, (_, want)) in expected.iter().enumerate() {
        assert_eq!(
            projected_rows.column(position).as_ref(),
            want as &dyn Array
        );
    }
}
2422
#[test]
fn test_project_by_schema_nested_list_struct() {
    // Exercises projection through List<Struct<List<Struct>>>, with the struct
    // children reordered at both nesting levels.

    // Nullable leaf-field constructors for the two value types used here.
    fn int32_field(name: &str) -> Field {
        Field::new(name, DataType::Int32, true)
    }
    fn utf8_field(name: &str) -> Field {
        Field::new(name, DataType::Utf8, true)
    }

    // Nullable "item" field wrapping `item_type`, and the List type built on it.
    fn item_field(item_type: DataType) -> Arc<Field> {
        Arc::new(Field::new("item", item_type, true))
    }
    fn list_of(item_type: DataType) -> DataType {
        DataType::List(item_field(item_type))
    }

    // Single-column schema: a nullable "outer" list of `row_type` structs.
    fn outer_schema(row_type: DataType) -> Schema {
        Schema::new(vec![Field::new("outer", list_of(row_type), true)])
    }

    // Source types: innermost struct is (y, x); middle struct is (b, inner_list, a).
    let source_point_type =
        DataType::Struct(Fields::from(vec![int32_field("y"), int32_field("x")]));
    let source_inner_list_field =
        Field::new("inner_list", list_of(source_point_type.clone()), true);
    let source_schema = Arc::new(outer_schema(DataType::Struct(Fields::from(vec![
        utf8_field("b"),
        source_inner_list_field.clone(),
        utf8_field("a"),
    ]))));

    // Innermost data: two (y, x) structs, both belonging to one inner list entry.
    let points = StructArray::from(vec![
        (
            Arc::new(int32_field("y")),
            Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef,
        ),
        (
            Arc::new(int32_field("x")),
            Arc::new(Int32Array::from(vec![3, 4])) as ArrayRef,
        ),
    ]);
    let point_lists = ListArray::new(
        item_field(source_point_type),
        OffsetBuffer::from_lengths([2]),
        Arc::new(points),
        None,
    );

    // Middle data: a single (b, inner_list, a) struct row.
    let rows = StructArray::from(vec![
        (
            Arc::new(utf8_field("b")),
            Arc::new(StringArray::from(vec!["b1"])) as ArrayRef,
        ),
        (
            Arc::new(source_inner_list_field),
            Arc::new(point_lists) as ArrayRef,
        ),
        (
            Arc::new(utf8_field("a")),
            Arc::new(StringArray::from(vec!["a1"])) as ArrayRef,
        ),
    ]);

    // Outer list: one entry holding that single row.
    let row_type = rows.data_type().clone();
    let outer_column = ListArray::new(
        item_field(row_type),
        OffsetBuffer::from_lengths([1]),
        Arc::new(rows),
        None,
    );

    let batch =
        RecordBatch::try_new(source_schema, vec![Arc::new(outer_column) as ArrayRef]).unwrap();

    // Target types: children swapped at every level, (x, y) and (a, inner_list, b).
    let target_point_type =
        DataType::Struct(Fields::from(vec![int32_field("x"), int32_field("y")]));
    let target_schema = outer_schema(DataType::Struct(Fields::from(vec![
        utf8_field("a"),
        Field::new("inner_list", list_of(target_point_type), true),
        utf8_field("b"),
    ])));

    let projected = batch.project_by_schema(&target_schema).unwrap();

    // The projected batch must carry exactly the requested schema.
    assert_eq!(projected.schema().as_ref(), &target_schema);

    // Middle struct: `a` now sits at position 0 and `b` at position 2.
    let projected_rows = projected.column(0).as_list::<i32>().values().as_struct();
    let want_a = StringArray::from(vec!["a1"]);
    let want_b = StringArray::from(vec!["b1"]);
    assert_eq!(projected_rows.column(0).as_ref(), &want_a as &dyn Array);
    assert_eq!(projected_rows.column(2).as_ref(), &want_b as &dyn Array);

    // Innermost struct (under position 1): `x` comes first, then `y`.
    let projected_points = projected_rows
        .column(1)
        .as_list::<i32>()
        .values()
        .as_struct();
    let want_x = Int32Array::from(vec![3, 4]);
    let want_y = Int32Array::from(vec![1, 2]);
    assert_eq!(projected_points.column(0).as_ref(), &want_x as &dyn Array);
    assert_eq!(projected_points.column(1).as_ref(), &want_y as &dyn Array);
}
2553}