lance_arrow/lib.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

//! Extensions to Arrow functionality
//!
//! Improves the ergonomics of working with arrow-rs.

use std::sync::Arc;
use std::{collections::HashMap, ptr::NonNull};

use arrow_array::{
    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
    GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
    StructArray, UInt32Array, UInt8Array,
};
use arrow_array::{
    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
};
use arrow_buffer::MutableBuffer;
use arrow_data::ArrayDataBuilder;
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
use arrow_select::{interleave::interleave, take::take};
use rand::prelude::*;

pub mod deepcopy;
pub mod schema;
pub use schema::*;
pub mod bfloat16;
pub mod floats;
use crate::list::ListArrayExt;
pub use floats::*;

pub mod cast;
pub mod json;
pub mod list;
pub mod memory;
pub mod r#struct;

/// Arrow extension metadata key for extension name
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Arrow extension metadata key for extension metadata
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Key used by lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Arrow extension type name for Lance blob v2 columns
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";

type Result<T> = std::result::Result<T, ArrowError>;

pub trait DataTypeExt {
    /// Returns true if the data type is binary-like, such as (Large)Utf8 and (Large)Binary.
    ///
    /// ```
    /// use lance_arrow::*;
    /// use arrow_schema::DataType;
    ///
    /// assert!(DataType::Utf8.is_binary_like());
    /// assert!(DataType::Binary.is_binary_like());
    /// assert!(DataType::LargeUtf8.is_binary_like());
    /// assert!(DataType::LargeBinary.is_binary_like());
    /// assert!(!DataType::Int32.is_binary_like());
    /// ```
    fn is_binary_like(&self) -> bool;

    /// Returns true if the data type is a struct.
    fn is_struct(&self) -> bool;

    /// Check whether the given Arrow DataType is fixed stride.
    ///
    /// A fixed stride type has the same byte width for all array elements.
    /// This includes all primitive types, as well as Boolean, FixedSizeList, FixedSizeBinary, and Decimal types.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the [DataType] is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Returns the byte width of the data type
    /// Panics if the data type is not fixed stride.
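    ///
    /// A small sketch of the mapping implemented below:
    ///
    /// ```
    /// use arrow_schema::DataType;
    /// use lance_arrow::DataTypeExt;
    ///
    /// assert_eq!(DataType::Int32.byte_width(), 4);
    /// assert_eq!(DataType::Float64.byte_width(), 8);
    /// assert_eq!(DataType::FixedSizeBinary(16).byte_width(), 16);
    /// ```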
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the data type, if it is fixed stride.
    /// Returns None if the data type is not fixed stride.
    fn byte_width_opt(&self) -> Option<usize>;
}

impl DataTypeExt for DataType {
    fn is_binary_like(&self) -> bool {
        use DataType::*;
        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
    }

    fn is_struct(&self) -> bool {
        matches!(self, Self::Struct(_))
    }

    fn is_fixed_stride(&self) -> bool {
        use DataType::*;
        matches!(
            self,
            Boolean
                | UInt8
                | UInt16
                | UInt32
                | UInt64
                | Int8
                | Int16
                | Int32
                | Int64
                | Float16
                | Float32
                | Float64
                | Decimal128(_, _)
                | Decimal256(_, _)
                | FixedSizeList(_, _)
                | FixedSizeBinary(_)
                | Duration(_)
                | Timestamp(_, _)
                | Date32
                | Date64
                | Time32(_)
                | Time64(_)
        )
    }

    fn is_dictionary(&self) -> bool {
        matches!(self, Self::Dictionary(_, _))
    }

    fn byte_width_opt(&self) -> Option<usize> {
        match self {
            Self::Int8 => Some(1),
            Self::Int16 => Some(2),
            Self::Int32 => Some(4),
            Self::Int64 => Some(8),
            Self::UInt8 => Some(1),
            Self::UInt16 => Some(2),
            Self::UInt32 => Some(4),
            Self::UInt64 => Some(8),
            Self::Float16 => Some(2),
            Self::Float32 => Some(4),
            Self::Float64 => Some(8),
            Self::Date32 => Some(4),
            Self::Date64 => Some(8),
            Self::Time32(_) => Some(4),
            Self::Time64(_) => Some(8),
            Self::Timestamp(_, _) => Some(8),
            Self::Duration(_) => Some(8),
            Self::Decimal128(_, _) => Some(16),
            Self::Decimal256(_, _) => Some(32),
            Self::Interval(unit) => match unit {
                IntervalUnit::YearMonth => Some(4),
                IntervalUnit::DayTime => Some(8),
                IntervalUnit::MonthDayNano => Some(16),
            },
            Self::FixedSizeBinary(s) => Some(*s as usize),
            Self::FixedSizeList(dt, s) => dt
                .data_type()
                .byte_width_opt()
                .map(|width| width * *s as usize),
            _ => None,
        }
    }

    fn byte_width(&self) -> usize {
        self.byte_width_opt()
            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
    }
}

/// Create a [`GenericListArray`] from values and offsets.
///
/// ```
/// use arrow_array::{Int32Array, Int64Array, ListArray};
/// use arrow_array::types::Int64Type;
/// use lance_arrow::try_new_generic_list_array;
///
/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
/// let int_values = Int64Array::from_iter(0..10);
/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
/// assert_eq!(list_arr,
///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
///         Some(vec![Some(0), Some(1)]),
///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
///         Some(vec![Some(7), Some(8), Some(9)]),
/// ]))
/// ```
pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
    values: T,
    offsets: &PrimitiveArray<Offset>,
) -> Result<GenericListArray<Offset::Native>>
where
    Offset::Native: OffsetSizeTrait,
{
    let data_type = if Offset::Native::IS_LARGE {
        DataType::LargeList(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    } else {
        DataType::List(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    };
    let data = ArrayDataBuilder::new(data_type)
        .len(offsets.len() - 1)
        .add_buffer(offsets.into_data().buffers()[0].clone())
        .add_child_data(values.into_data())
        .build()?;

    Ok(GenericListArray::from(data))
}

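/// Shorthand to build a [`DataType::FixedSizeList`] with a nullable `"item"` child field.
///
/// A minimal sketch of the shape this helper produces:
///
/// ```
/// use std::sync::Arc;
/// use arrow_schema::{DataType, Field};
/// use lance_arrow::fixed_size_list_type;
///
/// let dt = fixed_size_list_type(2, DataType::Float32);
/// let expected =
///     DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Float32, true)), 2);
/// assert_eq!(dt, expected);
/// ```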
pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
}

pub trait FixedSizeListArrayExt {
    /// Create a [`FixedSizeListArray`] from values and list size.
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray};
    /// use arrow_array::types::Int64Type;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
    ///         Some(vec![Some(0), Some(1)]),
    ///         Some(vec![Some(2), Some(3)]),
    ///         Some(vec![Some(4), Some(5)]),
    ///         Some(vec![Some(6), Some(7)]),
    ///         Some(vec![Some(8), Some(9)])
    /// ], 2))
    /// ```
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Sample `n` rows from the [FixedSizeListArray]
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..256);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
    /// assert_eq!(sampled.len(), 10);
    /// assert_eq!(sampled.value_length(), 16);
    /// assert_eq!(sampled.values().len(), 160);
    /// ```
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert a [FixedSizeListArray] of Int8, Int16, Int32, Int64, UInt8, or UInt32 values
    /// to its closest floating point type. Float16, Float32, and Float64 arrays are returned unchanged.
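    ///
    /// A minimal sketch (`Int8` values become `Float32` in the implementation below):
    ///
    /// ```
    /// use arrow_array::{Int8Array, FixedSizeListArray};
    /// use arrow_schema::DataType;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let ints = Int8Array::from_iter_values(0..6);
    /// let fsl = FixedSizeListArray::try_new_from_values(ints, 3).unwrap();
    /// let floats = fsl.convert_to_floating_point().unwrap();
    /// assert_eq!(floats.value_type(), DataType::Float32);
    /// ```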
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}

impl FixedSizeListArrayExt for FixedSizeListArray {
    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
        let values = Arc::new(values);

        Self::try_new(field, list_size, values, None)
    }

    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
        if n >= self.len() {
            return Ok(self.clone());
        }
        let mut rng = SmallRng::from_os_rng();
        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
    }

    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
        match self.data_type() {
            DataType::FixedSizeList(field, size) => match field.data_type() {
                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
                DataType::Int8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Fail to cast primitive array to Int8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int16 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int16Array>()
                            .ok_or(ArrowError::ParseError(
                                "Fail to cast primitive array to Int16Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float32,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float32Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Fail to cast primitive array to Int32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f32)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::Int64 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<Int64Array>()
                            .ok_or(ArrowError::ParseError(
                                "Fail to cast primitive array to Int64Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt8 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt8Array>()
                            .ok_or(ArrowError::ParseError(
                                "Fail to cast primitive array to UInt8Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                DataType::UInt32 => Ok(Self::new(
                    Arc::new(arrow_schema::Field::new(
                        field.name(),
                        DataType::Float64,
                        field.is_nullable(),
                    )),
                    *size,
                    Arc::new(Float64Array::from_iter_values(
                        self.values()
                            .as_any()
                            .downcast_ref::<UInt32Array>()
                            .ok_or(ArrowError::ParseError(
                                "Fail to cast primitive array to UInt32Type".to_string(),
                            ))?
                            .into_iter()
                            .filter_map(|x| x.map(|y| y as f64)),
                    )),
                    self.nulls().cloned(),
                )),
                data_type => Err(ArrowError::ParseError(format!(
                    "Expected a floating point or integer type, got {:?}",
                    data_type
                ))),
            },
            data_type => Err(ArrowError::ParseError(format!(
                "Expected FixedSizeList, got {:?}",
                data_type
            ))),
        }
    }
}

/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
/// [`FixedSizeListArray`], panicking on failure.
pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
}

pub trait FixedSizeBinaryArrayExt {
    /// Create a [`FixedSizeBinaryArray`] from values and stride.
    ///
    /// ```
    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
    /// use arrow_array::types::UInt8Type;
    /// use lance_arrow::FixedSizeBinaryArrayExt;
    ///
    /// let int_values = UInt8Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeBinaryArray::from(vec![
    ///         Some(vec![0, 1].as_slice()),
    ///         Some(vec![2, 3].as_slice()),
    ///         Some(vec![4, 5].as_slice()),
    ///         Some(vec![6, 7].as_slice()),
    ///         Some(vec![8, 9].as_slice())
    /// ]))
    /// ```
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}

impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
        let data_type = DataType::FixedSizeBinary(stride);
        let data = ArrayDataBuilder::new(data_type)
            .len(values.len() / stride as usize)
            .add_buffer(values.into_data().buffers()[0].clone())
            .build()?;
        Ok(Self::from(data))
    }
}

pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
}

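/// Iterate over a `Utf8` or `LargeUtf8` array as `Option<&str>` items.
///
/// Panics if the array is neither `Utf8` nor `LargeUtf8`.
///
/// A minimal usage sketch:
///
/// ```
/// use arrow_array::StringArray;
/// use lance_arrow::iter_str_array;
///
/// let arr = StringArray::from(vec![Some("a"), None, Some("c")]);
/// let values: Vec<Option<&str>> = iter_str_array(&arr).collect();
/// assert_eq!(values, vec![Some("a"), None, Some("c")]);
/// ```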
pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
    match arr.data_type() {
        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
    }
}

/// Extends Arrow's [RecordBatch].
pub trait RecordBatchExt {
    /// Append a new column to this [`RecordBatch`] and return a new [`RecordBatch`].
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let new_field = Field::new("s", DataType::Utf8, true);
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Create a new [`RecordBatch`] with the column inserted at the given index.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new [`RecordBatch`] from the provided [`StructArray`].
    ///
    /// The fields on the [`StructArray`] need to match this [`RecordBatch`] schema
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge with another [`RecordBatch`] and return a new one.
    ///
    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
    /// name is found in the right then we merge the two columns.  If there is no match then
    /// we add the left column to the output.
    ///
    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
    /// Otherwise we use the left array.
    ///
    /// Afterwards we add all non-matching right columns to the output.
    ///
    /// Note: This method likely does not handle nested fields correctly and you may want to consider
    /// using [`merge_with_schema`] instead.
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::*;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
    ///
    /// let new_record_batch = left.merge(&right).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    ///
    /// TODO: add merge nested fields support.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Create a batch by merging columns between two batches with a given schema.
    ///
    /// A reference schema is used to determine the proper ordering of nested fields.
    ///
    /// For each field in the reference schema we look for corresponding fields in
    /// the left and right batches.  If a field is found in both batches we recursively merge
    /// it.
    ///
    /// If a field is only in the left or right batch we take it as it is.
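    ///
    /// A minimal sketch, using the reference schema only to order the output columns:
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch, StringArray};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let left = RecordBatch::try_new(
    ///     Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
    ///     vec![Arc::new(Int32Array::from(vec![1, 2])) as ArrayRef],
    /// ).unwrap();
    /// let right = RecordBatch::try_new(
    ///     Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)])),
    ///     vec![Arc::new(StringArray::from(vec!["x", "y"])) as ArrayRef],
    /// ).unwrap();
    ///
    /// // The reference schema lists "s" before "a", so the merged batch does too.
    /// let reference = Schema::new(vec![
    ///     Field::new("s", DataType::Utf8, true),
    ///     Field::new("a", DataType::Int32, true),
    /// ]);
    /// let merged = left.merge_with_schema(&right, &reference).unwrap();
    /// assert_eq!(merged.schema().field(0).name(), "s");
    /// assert_eq!(merged.schema().field(1).name(), "a");
    /// ```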
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Drop one column specified with the name and return the new [`RecordBatch`].
    ///
    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace a column (specified by name) and return the new [`RecordBatch`].
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace a column schema (specified by name) and return the new [`RecordBatch`].
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename a column at a given index.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Get (potentially nested) column by qualified name.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the schema over the [RecordBatch].
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// Returns the metadata of the schema.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add metadata to the schema.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata with the provided one.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take selected rows from the [RecordBatch].
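    ///
    /// A minimal sketch:
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{ArrayRef, Int32Array, RecordBatch, UInt32Array};
    /// use arrow_schema::{DataType, Field, Schema};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let arr: ArrayRef = Arc::new(Int32Array::from(vec![10, 20, 30, 40]));
    /// let batch = RecordBatch::try_new(schema, vec![arr]).unwrap();
    /// let taken = batch.take(&UInt32Array::from(vec![0, 2])).unwrap();
    /// assert_eq!(taken.num_rows(), 2);
    /// ```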
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Create a new RecordBatch with compacted memory after slicing.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;
}

impl RecordBatchExt for RecordBatch {
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    fn merge(&self, other: &Self) -> Result<Self> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    fn drop_column(&self, name: &str) -> Result<Self> {
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(self.schema(), columns)
    }

    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }

    fn shrink_to_fit(&self) -> Result<Self> {
        // Deep copy the sliced record batch, instead of whole batch
        crate::deepcopy::deep_copy_batch_sliced(self)
    }
}

fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
    if fields.is_empty() {
        return Ok(StructArray::new_empty_fields(
            struct_array.len(),
            struct_array.nulls().cloned(),
        ));
    }
    let mut columns: Vec<ArrayRef> = vec![];
    for field in fields.iter() {
        if let Some(col) = struct_array.column_by_name(field.name()) {
            match field.data_type() {
                // TODO handle list-of-struct
                DataType::Struct(subfields) => {
                    let projected = project(col.as_struct(), subfields)?;
                    columns.push(Arc::new(projected));
                }
                _ => {
                    columns.push(col.clone());
                }
            }
        } else {
            return Err(ArrowError::SchemaError(format!(
                "field {} does not exist in the RecordBatch",
                field.name()
            )));
        }
    }
    // Preserve the struct's validity when projecting
    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
}

fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    left_list.offsets().inner() == right_list.offsets().inner()
}

fn merge_list_structs_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    items_field_name: impl Into<String>,
    items_nullable: bool,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list();
    let right_list: &GenericListArray<T> = right.as_list();
    let left_struct = left_list.values();
    let right_struct = right_list.values();
    let left_struct_arr = left_struct.as_struct();
    let right_struct_arr = right_struct.as_struct();
    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_items.data_type().clone(),
        items_nullable,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        left_list.offsets().clone(),
        merged_items,
        left_list.nulls().cloned(),
    ))
}

fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
    items_field_name: impl Into<String>,
) -> Arc<dyn Array> {
    let left_list: &GenericListArray<T> = left.as_list::<T>();
    let not_null_list = not_null.as_list::<T>();
    let right_list = right.as_list::<T>();

    let left_struct = left_list.values().as_struct();
    let not_null_struct: &StructArray = not_null_list.values().as_struct();
    let right_struct = right_list.values().as_struct();

    let values_len = not_null_list.values().len();
    let mut merged_fields =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
    let mut merged_columns =
        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());

    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len))
        }
    }
    for (_, field) in right_struct
        .columns()
        .iter()
        .zip(right_struct.fields())
        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
    {
        merged_fields.push(field.clone());
        if let Some(val) = not_null_struct.column_by_name(field.name()) {
            merged_columns.push(val.clone());
        } else {
            merged_columns.push(new_null_array(field.data_type(), values_len));
        }
    }

    let merged_struct = Arc::new(StructArray::new(
        Fields::from(merged_fields),
        merged_columns,
        not_null_struct.nulls().cloned(),
    ));
    let items_field = Arc::new(Field::new(
        items_field_name,
        merged_struct.data_type().clone(),
        true,
    ));
    Arc::new(GenericListArray::<T>::new(
        items_field,
        not_null_list.offsets().clone(),
        merged_struct,
        not_null_list.nulls().cloned(),
    ))
}

fn merge_list_struct_null(
    left: &dyn Array,
    right: &dyn Array,
    not_null: &dyn Array,
) -> Arc<dyn Array> {
    match left.data_type() {
        DataType::List(left_field) => {
            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
        }
        DataType::LargeList(left_field) => {
            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
        }
        _ => unreachable!(),
    }
}

fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
    // Merging fields into a list<struct<...>> is tricky and can only succeed
    // in two ways.  First, if both lists have the same offsets.  Second, if
    // one of the lists is all-null
    if left.null_count() == left.len() {
        return merge_list_struct_null(left, right, right);
    } else if right.null_count() == right.len() {
        return merge_list_struct_null(left, right, left);
    }
    match (left.data_type(), right.data_type()) {
        (DataType::List(left_field), DataType::List(_)) => {
            if !lists_have_same_offsets_helper::<i32>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have same offsets");
            }
            merge_list_structs_helper::<i32>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
            if !lists_have_same_offsets_helper::<i64>(left, right) {
                panic!("Attempt to merge list struct arrays which do not have same offsets");
            }
            merge_list_structs_helper::<i64>(
                left,
                right,
                left_field.name(),
                left_field.is_nullable(),
            )
        }
        _ => unreachable!(),
    }
}

/// Helper function to normalize validity buffers
/// Returns None for all-null validity (placeholder structs)
fn normalize_validity(
    validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<&arrow_buffer::NullBuffer> {
    validity.and_then(|v| {
        if v.null_count() == v.len() {
            None
        } else {
            Some(v)
        }
    })
}

/// Helper function to merge validity buffers from two struct arrays
/// Returns None only if both arrays are null at the same position
///
/// Special handling for placeholder structs (all-null validity)
fn merge_struct_validity(
    left_validity: Option<&arrow_buffer::NullBuffer>,
    right_validity: Option<&arrow_buffer::NullBuffer>,
) -> Option<arrow_buffer::NullBuffer> {
    // Normalize both validity buffers (convert all-null to None)
    let left_normalized = normalize_validity(left_validity);
    let right_normalized = normalize_validity(right_validity);

    match (left_normalized, right_normalized) {
        // Fast paths: no computation needed
        (None, None) => None,
        (Some(left), None) => Some(left.clone()),
        (None, Some(right)) => Some(right.clone()),
        (Some(left), Some(right)) => {
            // Fast path: if both have no nulls, can return either one
            if left.null_count() == 0 && right.null_count() == 0 {
                return Some(left.clone());
            }

            let left_buffer = left.inner();
            let right_buffer = right.inner();

            // Perform bitwise OR directly on BooleanBuffers
            // This preserves the correct semantics: 1 = valid, 0 = null
            let merged_buffer = left_buffer | right_buffer;

            Some(arrow_buffer::NullBuffer::from(merged_buffer))
        }
    }
}

fn merge_list_child_values(
    child_field: &Field,
    left_values: ArrayRef,
    right_values: ArrayRef,
) -> ArrayRef {
    match child_field.data_type() {
        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
            left_values.as_struct(),
            right_values.as_struct(),
            child_fields,
        )) as ArrayRef,
        DataType::List(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("left list values should be ListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("right list values should be ListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(ListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::LargeList(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("left list values should be LargeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("right list values should be LargeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(LargeListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::FixedSizeList(grandchild, list_size) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("left list values should be FixedSizeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("right list values should be FixedSizeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(FixedSizeListArray::new(
                grandchild.clone(),
                *list_size,
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        _ => left_values.clone(),
    }
}

// Helper function to adjust child array validity based on parent struct validity
// When parent struct is null, propagates null to child array
// Optimized with fast paths and SIMD operations
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    // Fast path: no parent validity means no adjustment needed
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
        Some(p) => p,
    };

    let child_validity = child.nulls();

    // Compute the new validity: child_validity AND parent_validity
    let new_validity = match child_validity {
        None => {
            // Fast path: child has no existing validity, just use parent's
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Perform bitwise AND directly on BooleanBuffers
            // This preserves the correct semantics: 1 = valid, 0 = null
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    // Create new array with adjusted validity
    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}

fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
    let mut fields: Vec<Field> = vec![];
    let mut columns: Vec<ArrayRef> = vec![];
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    // Get the validity buffers from both structs
    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    // Compute merged validity
    let merged_validity = merge_struct_validity(left_validity, right_validity);

    // iterate through the fields on the left hand side
    for (left_field, left_column) in left_struct_array
        .fields()
        .iter()
        .zip(left_struct_array.columns().iter())
    {
        match right_fields
            .iter()
            .position(|f| f.name() == left_field.name())
        {
            // if the field exists on the right hand side, merge them recursively if appropriate
            Some(right_index) => {
                let right_field = right_fields.get(right_index).unwrap();
                let right_column = right_columns.get(right_index).unwrap();
                // if both fields are struct, merge them recursively
                match (left_field.data_type(), right_field.data_type()) {
                    (DataType::Struct(_), DataType::Struct(_)) => {
                        let left_sub_array = left_column.as_struct();
                        let right_sub_array = right_column.as_struct();
                        let merged_sub_array = merge(left_sub_array, right_sub_array);
                        fields.push(Field::new(
                            left_field.name(),
                            merged_sub_array.data_type().clone(),
                            left_field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    (DataType::List(left_list), DataType::List(right_list))
                        if left_list.data_type().is_struct()
                            && right_list.data_type().is_struct() =>
                    {
                        // If there is nothing to merge just use the left field
                        if left_list.data_type() == right_list.data_type() {
                            fields.push(left_field.as_ref().clone());
                            columns.push(left_column.clone());
                        } else {
                            // If we have two List<Struct> and they have different sets of fields then
                            // we can merge them if the offsets arrays are the same.  Otherwise, we
                            // have to consider it an error.
                            let merged_sub_array = merge_list_struct(&left_column, &right_column);

                            fields.push(Field::new(
                                left_field.name(),
                                merged_sub_array.data_type().clone(),
                                left_field.is_nullable(),
                            ));
                            columns.push(merged_sub_array);
                        }
                    }
1197                    // otherwise, just use the field on the left hand side
1198                    _ => {
1199                        // TODO handle list-of-struct and other types
1200                        fields.push(left_field.as_ref().clone());
1201                        // Adjust the column validity: if left struct was null, propagate to child
1202                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1203                        columns.push(adjusted_column);
1204                    }
1205                }
1206            }
1207            None => {
1208                fields.push(left_field.as_ref().clone());
1209                // Adjust the column validity: if left struct was null, propagate to child
1210                let adjusted_column = adjust_child_validity(left_column, left_validity);
1211                columns.push(adjusted_column);
1212            }
1213        }
1214    }
1215
1216    // now iterate through the fields on the right hand side
1217    right_fields
1218        .iter()
1219        .zip(right_columns.iter())
1220        .for_each(|(field, column)| {
1221            // add new columns on the right
1222            if !left_struct_array
1223                .fields()
1224                .iter()
1225                .any(|f| f.name() == field.name())
1226            {
1227                fields.push(field.as_ref().clone());
1228                // This field doesn't exist on the left
1229                // We use the right's column but need to adjust for struct validity
1230                let adjusted_column = adjust_child_validity(column, right_validity);
1231                columns.push(adjusted_column);
1232            }
1233        });
1234
1235    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1236}
1237
1238fn merge_with_schema(
1239    left_struct_array: &StructArray,
1240    right_struct_array: &StructArray,
1241    fields: &Fields,
1242) -> StructArray {
1243    // Helper function that returns true if both types are struct or both are non-struct
1244    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
1245        match (left, right) {
1246            (DataType::Struct(_), DataType::Struct(_)) => true,
1247            (DataType::Struct(_), _) => false,
1248            (_, DataType::Struct(_)) => false,
1249            _ => true,
1250        }
1251    }
1252
1253    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
1254    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());
1255
1256    let left_fields = left_struct_array.fields();
1257    let left_columns = left_struct_array.columns();
1258    let right_fields = right_struct_array.fields();
1259    let right_columns = right_struct_array.columns();
1260
1261    // Get the validity buffers from both structs
1262    let left_validity = left_struct_array.nulls();
1263    let right_validity = right_struct_array.nulls();
1264
1265    // Compute merged validity
1266    let merged_validity = merge_struct_validity(left_validity, right_validity);
1267
1268    for field in fields {
1269        let left_match_idx = left_fields.iter().position(|f| {
1270            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1271        });
1272        let right_match_idx = right_fields.iter().position(|f| {
1273            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1274        });
1275
1276        match (left_match_idx, right_match_idx) {
1277            (None, Some(right_idx)) => {
1278                output_fields.push(right_fields[right_idx].as_ref().clone());
1279                // Adjust validity if the right struct was null
1280                let adjusted_column =
1281                    adjust_child_validity(&right_columns[right_idx], right_validity);
1282                columns.push(adjusted_column);
1283            }
1284            (Some(left_idx), None) => {
1285                output_fields.push(left_fields[left_idx].as_ref().clone());
1286                // Adjust validity if the left struct was null
1287                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
1288                columns.push(adjusted_column);
1289            }
1290            (Some(left_idx), Some(right_idx)) => {
1291                match field.data_type() {
1292                    DataType::Struct(child_fields) => {
1293                        let left_sub_array = left_columns[left_idx].as_struct();
1294                        let right_sub_array = right_columns[right_idx].as_struct();
1295                        let merged_sub_array =
1296                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
1297                        output_fields.push(Field::new(
1298                            field.name(),
1299                            merged_sub_array.data_type().clone(),
1300                            field.is_nullable(),
1301                        ));
1302                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1303                    }
1304                    DataType::List(child_field) => {
1305                        let left_list = left_columns[left_idx]
1306                            .as_any()
1307                            .downcast_ref::<ListArray>()
1308                            .unwrap();
1309                        let right_list = right_columns[right_idx]
1310                            .as_any()
1311                            .downcast_ref::<ListArray>()
1312                            .unwrap();
1313                        let merged_values = merge_list_child_values(
1314                            child_field.as_ref(),
1315                            left_list.trimmed_values(),
1316                            right_list.trimmed_values(),
1317                        );
1318                        let merged_validity =
1319                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1320                        let merged_list = ListArray::new(
1321                            child_field.clone(),
1322                            left_list.offsets().clone(),
1323                            merged_values,
1324                            merged_validity,
1325                        );
1326                        output_fields.push(field.as_ref().clone());
1327                        columns.push(Arc::new(merged_list) as ArrayRef);
1328                    }
1329                    DataType::LargeList(child_field) => {
1330                        let left_list = left_columns[left_idx]
1331                            .as_any()
1332                            .downcast_ref::<LargeListArray>()
1333                            .unwrap();
1334                        let right_list = right_columns[right_idx]
1335                            .as_any()
1336                            .downcast_ref::<LargeListArray>()
1337                            .unwrap();
1338                        let merged_values = merge_list_child_values(
1339                            child_field.as_ref(),
1340                            left_list.trimmed_values(),
1341                            right_list.trimmed_values(),
1342                        );
1343                        let merged_validity =
1344                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1345                        let merged_list = LargeListArray::new(
1346                            child_field.clone(),
1347                            left_list.offsets().clone(),
1348                            merged_values,
1349                            merged_validity,
1350                        );
1351                        output_fields.push(field.as_ref().clone());
1352                        columns.push(Arc::new(merged_list) as ArrayRef);
1353                    }
1354                    DataType::FixedSizeList(child_field, list_size) => {
1355                        let left_list = left_columns[left_idx]
1356                            .as_any()
1357                            .downcast_ref::<FixedSizeListArray>()
1358                            .unwrap();
1359                        let right_list = right_columns[right_idx]
1360                            .as_any()
1361                            .downcast_ref::<FixedSizeListArray>()
1362                            .unwrap();
1363                        let merged_values = merge_list_child_values(
1364                            child_field.as_ref(),
1365                            left_list.values().clone(),
1366                            right_list.values().clone(),
1367                        );
1368                        let merged_validity =
1369                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1370                        let merged_list = FixedSizeListArray::new(
1371                            child_field.clone(),
1372                            *list_size,
1373                            merged_values,
1374                            merged_validity,
1375                        );
1376                        output_fields.push(field.as_ref().clone());
1377                        columns.push(Arc::new(merged_list) as ArrayRef);
1378                    }
1379                    _ => {
1380                        output_fields.push(left_fields[left_idx].as_ref().clone());
1381                        // For fields that exist in both, use left but adjust validity
1382                        let adjusted_column =
1383                            adjust_child_validity(&left_columns[left_idx], left_validity);
1384                        columns.push(adjusted_column);
1385                    }
1386                }
1387            }
1388            (None, None) => {
1389                // The field will not be included in the output
1390            }
1391        }
1392    }
1393
1394    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
1395}
1396
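/// Walk `components` (one struct field name per nesting level) down through nested
/// struct arrays, returning the matching descendant array if every component resolves.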
1397fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1398    if components.is_empty() {
1399        return Some(array);
1400    }
1401    if !matches!(array.data_type(), DataType::Struct(_)) {
1402        return None;
1403    }
1404    let struct_arr = array.as_struct();
1405    struct_arr
1406        .column_by_name(components[0])
1407        .and_then(|arr| get_sub_array(arr, &components[1..]))
1408}
1409
1410/// Interleave multiple RecordBatches into a single RecordBatch.
1411///
1412/// Behaves like [`arrow::compute::interleave`], but for RecordBatches.
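///
/// A minimal usage sketch (a single Int32 column with illustrative values):
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use lance_arrow::interleave_batches;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let batch1 =
///     RecordBatch::try_new(schema.clone(), vec![Arc::new(Int32Array::from(vec![1, 2]))]).unwrap();
/// let batch2 =
///     RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![3, 4]))]).unwrap();
/// // Indices are (batch index, row index): take row 0 of batch2, then row 1 of batch1.
/// let interleaved = interleave_batches(&[batch1, batch2], &[(1, 0), (0, 1)]).unwrap();
/// assert_eq!(interleaved.num_rows(), 2);
/// ```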
1413pub fn interleave_batches(
1414    batches: &[RecordBatch],
1415    indices: &[(usize, usize)],
1416) -> Result<RecordBatch> {
1417    let first_batch = batches.first().ok_or_else(|| {
1418        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1419    })?;
1420    let schema = first_batch.schema();
1421    let num_columns = first_batch.num_columns();
1422    let mut columns = Vec::with_capacity(num_columns);
1423    let mut chunks = Vec::with_capacity(batches.len());
1424
1425    for i in 0..num_columns {
1426        for batch in batches {
1427            chunks.push(batch.column(i).as_ref());
1428        }
1429        let new_column = interleave(&chunks, indices)?;
1430        columns.push(new_column);
1431        chunks.clear();
1432    }
1433
1434    RecordBatch::try_new(schema, columns)
1435}
1436
1437pub trait BufferExt {
1438    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
1439    ///
1440    /// The alignment must be specified (as `bytes_per_value`) since we want to make
1441    /// sure we can safely reinterpret the buffer.
1442    ///
1443    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
1444    /// will be made and an owned buffer returned.
1445    ///
1446    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
1447    /// never going to be reinterpreted into another type and we can safely
1448    /// ignore the alignment.
1449    ///
1450    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
1451    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
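    ///
    /// A minimal usage sketch (illustrative values; `arrow_buffer` and `bytes` are
    /// already dependencies of this crate):
    ///
    /// ```
    /// use arrow_buffer::Buffer;
    /// use lance_arrow::BufferExt;
    ///
    /// // Eight bytes that we intend to reinterpret as two u32 values, so we ask
    /// // for 4-byte alignment; if the source is misaligned, a copy is made.
    /// let bytes = bytes::Bytes::from(vec![0u8; 8]);
    /// let buf = Buffer::from_bytes_bytes(bytes, 4);
    /// assert_eq!(buf.len(), 8);
    /// ```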
1452    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;
1453
1454    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
1455    ///
1456    /// `size_bytes` can be larger than `bytes` and, if so, the trailing bytes will
1457    /// be zeroed out.
1458    ///
1459    /// # Panics
1460    ///
1461    /// Panics if `size_bytes` is less than `bytes.len()`
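    ///
    /// A minimal usage sketch (illustrative values):
    ///
    /// ```
    /// use arrow_buffer::Buffer;
    /// use lance_arrow::BufferExt;
    ///
    /// // Copy three bytes into an 8-byte buffer; the trailing five bytes are zeroed.
    /// let buf = Buffer::copy_bytes_bytes(bytes::Bytes::from(vec![1u8, 2, 3]), 8);
    /// assert_eq!(buf.as_slice(), &[1u8, 2, 3, 0, 0, 0, 0, 0]);
    /// ```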
1462    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
1463}
1464
1465fn is_pwr_two(n: u64) -> bool {
1466    n & (n - 1) == 0
1467}
1468
1469impl BufferExt for arrow_buffer::Buffer {
1470    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
1471        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
1472        {
1473            // The original buffer is not aligned, cannot zero-copy
1474            let size_bytes = bytes.len();
1475            Self::copy_bytes_bytes(bytes, size_bytes)
1476        } else {
1477            // The original buffer is aligned, can zero-copy
1478            // SAFETY: the alignment is correct, so we can make this conversion
1479            unsafe {
1480                Self::from_custom_allocation(
1481                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
1482                    bytes.len(),
1483                    Arc::new(bytes),
1484                )
1485            }
1486        }
1487    }
1488
1489    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
1490        assert!(size_bytes >= bytes.len());
1491        let mut buf = MutableBuffer::with_capacity(size_bytes);
1492        let to_fill = size_bytes - bytes.len();
1493        buf.extend(bytes);
1494        buf.extend(std::iter::repeat_n(0_u8, to_fill));
1499
1500        Self::from(buf)
1501    }
1502}
1503
1504#[cfg(test)]
1505mod tests {
1506    use super::*;
1507    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
1508    use arrow_array::{Float32Array, Int32Array, StructArray};
1509    use arrow_buffer::OffsetBuffer;
1510
1511    #[test]
1512    fn test_merge_recursive() {
1513        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
1514        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
1515        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
1516        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
1517
1518        let left_schema = Schema::new(vec![
1519            Field::new("a", DataType::Int32, true),
1520            Field::new(
1521                "b",
1522                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
1523                true,
1524            ),
1525        ]);
1526        let left_batch = RecordBatch::try_new(
1527            Arc::new(left_schema),
1528            vec![
1529                Arc::new(a_array.clone()),
1530                Arc::new(StructArray::from(vec![(
1531                    Arc::new(Field::new("c", DataType::Int32, true)),
1532                    Arc::new(c_array.clone()) as ArrayRef,
1533                )])),
1534            ],
1535        )
1536        .unwrap();
1537
1538        let right_schema = Schema::new(vec![
1539            Field::new("e", DataType::Int32, true),
1540            Field::new(
1541                "b",
1542                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
1543                true,
1544            ),
1545        ]);
1546        let right_batch = RecordBatch::try_new(
1547            Arc::new(right_schema),
1548            vec![
1549                Arc::new(e_array.clone()),
1550                Arc::new(StructArray::from(vec![(
1551                    Arc::new(Field::new("d", DataType::Utf8, true)),
1552                    Arc::new(d_array.clone()) as ArrayRef,
1553                )])) as ArrayRef,
1554            ],
1555        )
1556        .unwrap();
1557
1558        let merged_schema = Schema::new(vec![
1559            Field::new("a", DataType::Int32, true),
1560            Field::new(
1561                "b",
1562                DataType::Struct(
1563                    vec![
1564                        Field::new("c", DataType::Int32, true),
1565                        Field::new("d", DataType::Utf8, true),
1566                    ]
1567                    .into(),
1568                ),
1569                true,
1570            ),
1571            Field::new("e", DataType::Int32, true),
1572        ]);
1573        let merged_batch = RecordBatch::try_new(
1574            Arc::new(merged_schema),
1575            vec![
1576                Arc::new(a_array) as ArrayRef,
1577                Arc::new(StructArray::from(vec![
1578                    (
1579                        Arc::new(Field::new("c", DataType::Int32, true)),
1580                        Arc::new(c_array) as ArrayRef,
1581                    ),
1582                    (
1583                        Arc::new(Field::new("d", DataType::Utf8, true)),
1584                        Arc::new(d_array) as ArrayRef,
1585                    ),
1586                ])) as ArrayRef,
1587                Arc::new(e_array) as ArrayRef,
1588            ],
1589        )
1590        .unwrap();
1591
1592        let result = left_batch.merge(&right_batch).unwrap();
1593        assert_eq!(result, merged_batch);
1594    }
1595
1596    #[test]
1597    fn test_merge_with_schema() {
1598        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
1599            let fields: Fields = names
1600                .iter()
1601                .zip(types)
1602                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
1603                .collect();
1604            let schema = Schema::new(vec![Field::new(
1605                "struct",
1606                DataType::Struct(fields.clone()),
1607                false,
1608            )]);
1609            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
1610            let batch = RecordBatch::try_new(
1611                Arc::new(schema.clone()),
1612                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
1613            );
1614            (schema, batch.unwrap())
1615        }
1616
1617        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
1618        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
1619        let (output_schema, _) = test_batch(
1620            &["b", "a", "c"],
1621            &[DataType::Int64, DataType::Int32, DataType::Int32],
1622        );
1623
1624        // If we use merge_with_schema the schema is respected
1625        let merged = left_batch
1626            .merge_with_schema(&right_batch, &output_schema)
1627            .unwrap();
1628        assert_eq!(merged.schema().as_ref(), &output_schema);
1629
1630        // If we use merge we get first-come, first-served ordering based on the left batch
1631        let (naive_schema, _) = test_batch(
1632            &["a", "b", "c"],
1633            &[DataType::Int32, DataType::Int64, DataType::Int32],
1634        );
1635        let merged = left_batch.merge(&right_batch).unwrap();
1636        assert_eq!(merged.schema().as_ref(), &naive_schema);
1637    }
1638
1639    #[test]
1640    fn test_merge_list_struct() {
1641        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
1642        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
1643        let x_struct_field = Arc::new(Field::new(
1644            "item",
1645            DataType::Struct(Fields::from(vec![x_field.clone()])),
1646            true,
1647        ));
1648        let y_struct_field = Arc::new(Field::new(
1649            "item",
1650            DataType::Struct(Fields::from(vec![y_field.clone()])),
1651            true,
1652        ));
1653        let both_struct_field = Arc::new(Field::new(
1654            "item",
1655            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
1656            true,
1657        ));
1658        let left_schema = Schema::new(vec![Field::new(
1659            "list_struct",
1660            DataType::List(x_struct_field.clone()),
1661            true,
1662        )]);
1663        let right_schema = Schema::new(vec![Field::new(
1664            "list_struct",
1665            DataType::List(y_struct_field.clone()),
1666            true,
1667        )]);
1668        let both_schema = Schema::new(vec![Field::new(
1669            "list_struct",
1670            DataType::List(both_struct_field.clone()),
1671            true,
1672        )]);
1673
1674        let x = Arc::new(Int32Array::from(vec![1]));
1675        let y = Arc::new(Int32Array::from(vec![2]));
1676        let x_struct = Arc::new(StructArray::new(
1677            Fields::from(vec![x_field.clone()]),
1678            vec![x.clone()],
1679            None,
1680        ));
1681        let y_struct = Arc::new(StructArray::new(
1682            Fields::from(vec![y_field.clone()]),
1683            vec![y.clone()],
1684            None,
1685        ));
1686        let both_struct = Arc::new(StructArray::new(
1687            Fields::from(vec![x_field.clone(), y_field.clone()]),
1688            vec![x.clone(), y],
1689            None,
1690        ));
1691        let both_null_struct = Arc::new(StructArray::new(
1692            Fields::from(vec![x_field, y_field]),
1693            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
1694            None,
1695        ));
1696        let offsets = OffsetBuffer::from_lengths([1]);
1697        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
1698        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
1699        let both_list = ListArray::new(
1700            both_struct_field.clone(),
1701            offsets.clone(),
1702            both_struct,
1703            None,
1704        );
1705        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
1706        let x_batch =
1707            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
1708        let y_batch = RecordBatch::try_new(
1709            Arc::new(right_schema.clone()),
1710            vec![Arc::new(y_s_list.clone())],
1711        )
1712        .unwrap();
1713        let merged = x_batch.merge(&y_batch).unwrap();
1714        let expected =
1715            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
1716        assert_eq!(merged, expected);
1717
1718        let y_null_list = new_null_array(y_s_list.data_type(), 1);
1719        let y_null_batch =
1720            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
1721                .unwrap();
1722        let expected =
1723            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
1724        let merged = x_batch.merge(&y_null_batch).unwrap();
1725        assert_eq!(merged, expected);
1726    }
1727
1728    #[test]
1729    fn test_byte_width_opt() {
1730        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1731        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1732        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1733        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1734        assert_eq!(DataType::Utf8.byte_width_opt(), None);
1735        assert_eq!(DataType::Binary.byte_width_opt(), None);
1736        assert_eq!(
1737            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1738            None
1739        );
1740        assert_eq!(
1741            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1742                .byte_width_opt(),
1743            Some(12)
1744        );
1745        assert_eq!(
1746            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1747                .byte_width_opt(),
1748            Some(16)
1749        );
1750        assert_eq!(
1751            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1752                .byte_width_opt(),
1753            None
1754        );
1755    }
1756
1757    #[test]
1758    fn test_take_record_batch() {
1759        let schema = Arc::new(Schema::new(vec![
1760            Field::new("a", DataType::Int32, true),
1761            Field::new("b", DataType::Utf8, true),
1762        ]));
1763        let batch = RecordBatch::try_new(
1764            schema.clone(),
1765            vec![
1766                Arc::new(Int32Array::from_iter_values(0..20)),
1767                Arc::new(StringArray::from_iter_values(
1768                    (0..20).map(|i| format!("str-{}", i)),
1769                )),
1770            ],
1771        )
1772        .unwrap();
1773        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1774        assert_eq!(
1775            taken,
1776            RecordBatch::try_new(
1777                schema,
1778                vec![
1779                    Arc::new(Int32Array::from(vec![1, 5, 10])),
1780                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1781                ],
1782            )
1783            .unwrap()
1784        )
1785    }
1786
1787    #[test]
1788    fn test_schema_project_by_schema() {
1789        let metadata = [("key".to_string(), "value".to_string())];
1790        let schema = Arc::new(
1791            Schema::new(vec![
1792                Field::new("a", DataType::Int32, true),
1793                Field::new("b", DataType::Utf8, true),
1794            ])
1795            .with_metadata(metadata.clone().into()),
1796        );
1797        let batch = RecordBatch::try_new(
1798            schema,
1799            vec![
1800                Arc::new(Int32Array::from_iter_values(0..20)),
1801                Arc::new(StringArray::from_iter_values(
1802                    (0..20).map(|i| format!("str-{}", i)),
1803                )),
1804            ],
1805        )
1806        .unwrap();
1807
1808        // Empty schema
1809        let empty_schema = Schema::empty();
1810        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
1811        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
1812        assert_eq!(
1813            empty_projected,
1814            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
1815                .with_schema(Arc::new(expected_schema))
1816                .unwrap()
1817        );
1818
1819        // Re-ordered schema
1820        let reordered_schema = Schema::new(vec![
1821            Field::new("b", DataType::Utf8, true),
1822            Field::new("a", DataType::Int32, true),
1823        ]);
1824        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
1825        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
1826        assert_eq!(
1827            reordered_projected,
1828            RecordBatch::try_new(
1829                expected_schema,
1830                vec![
1831                    Arc::new(StringArray::from_iter_values(
1832                        (0..20).map(|i| format!("str-{}", i)),
1833                    )),
1834                    Arc::new(Int32Array::from_iter_values(0..20)),
1835                ],
1836            )
1837            .unwrap()
1838        );
1839
1840        // Sub schema
1841        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1842        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
1843        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
1844        assert_eq!(
1845            sub_projected,
1846            RecordBatch::try_new(
1847                expected_schema,
1848                vec![Arc::new(Int32Array::from_iter_values(0..20))],
1849            )
1850            .unwrap()
1851        );
1852    }
1853
1854    #[test]
1855    fn test_project_preserves_struct_validity() {
1856        // Test that projecting a struct array preserves its validity (fix for issue #4385)
1857        let fields = Fields::from(vec![
1858            Field::new("id", DataType::Int32, false),
1859            Field::new("value", DataType::Float32, true),
1860        ]);
1861
1862        // Create a struct array with validity
1863        let id_array = Int32Array::from(vec![1, 2, 3]);
1864        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1865        let struct_array = StructArray::new(
1866            fields.clone(),
1867            vec![
1868                Arc::new(id_array) as ArrayRef,
1869                Arc::new(value_array) as ArrayRef,
1870            ],
1871            Some(vec![true, false, true].into()), // Second struct is null
1872        );
1873
1874        // Project the struct array
1875        let projected = project(&struct_array, &fields).unwrap();
1876
1877        // Verify the validity is preserved
1878        assert_eq!(projected.null_count(), 1);
1879        assert!(!projected.is_null(0));
1880        assert!(projected.is_null(1));
1881        assert!(!projected.is_null(2));
1882    }
1883
1884    #[test]
1885    fn test_merge_struct_with_different_validity() {
1886        // Test case from Weston's review comment
1887        // File 1 has height field with some nulls
1888        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
1889        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
1890        let left_struct = StructArray::new(
1891            left_fields,
1892            vec![Arc::new(height_array) as ArrayRef],
1893            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
1894        );
1895
1896        // File 2 has width field with some nulls
1897        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
1898        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
1899        let right_struct = StructArray::new(
1900            right_fields,
1901            vec![Arc::new(width_array) as ArrayRef],
1902            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
1903        );
1904
1905        // Merge the two structs
1906        let merged = merge(&left_struct, &right_struct);
1907
1908        // Expected:
1909        // Row 1: both non-null -> {width: 300, height: 500}
1910        // Row 2: left null, right non-null -> {width: 200, height: null}
1911        // Row 3: left non-null, right null -> {width: null, height: 600}
1912        // Row 4: both null -> null struct
1913
1914        assert_eq!(merged.null_count(), 1); // Only row 4 is null
1915        assert!(!merged.is_null(0));
1916        assert!(!merged.is_null(1));
1917        assert!(!merged.is_null(2));
1918        assert!(merged.is_null(3));
1919
1920        // Check field values
1921        let height_col = merged.column_by_name("height").unwrap();
1922        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
1923        assert_eq!(height_values.value(0), 500);
1924        assert!(height_values.is_null(1)); // height is null when left struct was null
1925        assert_eq!(height_values.value(2), 600);
1926
1927        let width_col = merged.column_by_name("width").unwrap();
1928        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
1929        assert_eq!(width_values.value(0), 300);
1930        assert_eq!(width_values.value(1), 200);
1931        assert!(width_values.is_null(2)); // width is null when right struct was null
1932    }
1933
1934    #[test]
1935    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
1936        // left_list setup
1937        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
1938        let left_count = Arc::new(Int32Array::from(vec![None, None]));
1939        let left_struct = Arc::new(StructArray::new(
1940            Fields::from(vec![
1941                Field::new("company_id", DataType::Int32, true),
1942                Field::new("count", DataType::Int32, true),
1943            ]),
1944            vec![left_company_id, left_count],
1945            None,
1946        ));
1947        let left_list = Arc::new(ListArray::new(
1948            Arc::new(Field::new(
1949                "item",
1950                DataType::Struct(left_struct.fields().clone()),
1951                true,
1952            )),
1953            OffsetBuffer::from_lengths([2]),
1954            left_struct,
1955            None,
1956        ));
1957
1958        // right_list setup
1959        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
1960        let right_struct = Arc::new(StructArray::new(
1961            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
1962            vec![right_company_name],
1963            None,
1964        ));
1965        let right_list = Arc::new(ListArray::new(
1966            Arc::new(Field::new(
1967                "item",
1968                DataType::Struct(right_struct.fields().clone()),
1969                true,
1970            )),
1971            OffsetBuffer::from_lengths([2]),
1972            right_struct,
1973            None,
1974        ));
1975
1976        let target_fields = Fields::from(vec![Field::new(
1977            "companies",
1978            DataType::List(Arc::new(Field::new(
1979                "item",
1980                DataType::Struct(Fields::from(vec![
1981                    Field::new("company_id", DataType::Int32, true),
1982                    Field::new("company_name", DataType::Utf8, true),
1983                    Field::new("count", DataType::Int32, true),
1984                ])),
1985                true,
1986            ))),
1987            true,
1988        )]);
1989
1990        let left_batch = RecordBatch::try_new(
1991            Arc::new(Schema::new(vec![Field::new(
1992                "companies",
1993                left_list.data_type().clone(),
1994                true,
1995            )])),
1996            vec![left_list as ArrayRef],
1997        )
1998        .unwrap();
1999
2000        let right_batch = RecordBatch::try_new(
2001            Arc::new(Schema::new(vec![Field::new(
2002                "companies",
2003                right_list.data_type().clone(),
2004                true,
2005            )])),
2006            vec![right_list as ArrayRef],
2007        )
2008        .unwrap();
2009
2010        let merged = left_batch
2011            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
2012            .unwrap();
2013
2014        // Verify the merged structure
2015        let merged_list = merged
2016            .column_by_name("companies")
2017            .unwrap()
2018            .as_any()
2019            .downcast_ref::<ListArray>()
2020            .unwrap();
2021        let merged_struct = merged_list.values().as_struct();
2022
2023        // Should have all 3 fields
2024        assert_eq!(merged_struct.num_columns(), 3);
2025        assert!(merged_struct.column_by_name("company_id").is_some());
2026        assert!(merged_struct.column_by_name("company_name").is_some());
2027        assert!(merged_struct.column_by_name("count").is_some());
2028
2029        // Verify values
2030        let company_id = merged_struct
2031            .column_by_name("company_id")
2032            .unwrap()
2033            .as_any()
2034            .downcast_ref::<Int32Array>()
2035            .unwrap();
2036        assert!(company_id.is_null(0));
2037        assert!(company_id.is_null(1));
2038
2039        let company_name = merged_struct
2040            .column_by_name("company_name")
2041            .unwrap()
2042            .as_any()
2043            .downcast_ref::<StringArray>()
2044            .unwrap();
2045        assert_eq!(company_name.value(0), "Google");
2046        assert_eq!(company_name.value(1), "Microsoft");
2047
2048        let count = merged_struct
2049            .column_by_name("count")
2050            .unwrap()
2051            .as_any()
2052            .downcast_ref::<Int32Array>()
2053            .unwrap();
2054        assert!(count.is_null(0));
2055        assert!(count.is_null(1));
2056    }
2057
2058    #[test]
2059    fn test_merge_struct_lists() {
2060        test_merge_struct_lists_generic::<i32>();
2061    }
2062
2063    #[test]
2064    fn test_merge_struct_large_lists() {
2065        test_merge_struct_lists_generic::<i64>();
2066    }
2067
2068    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2069        // left_list setup
2070        let left_company_id = Arc::new(Int32Array::from(vec![
2071            Some(1),
2072            Some(2),
2073            Some(3),
2074            Some(4),
2075            Some(5),
2076            Some(6),
2077            Some(7),
2078            Some(8),
2079            Some(9),
2080            Some(10),
2081            Some(11),
2082            Some(12),
2083            Some(13),
2084            Some(14),
2085            Some(15),
2086            Some(16),
2087            Some(17),
2088            Some(18),
2089            Some(19),
2090            Some(20),
2091        ]));
2092        let left_count = Arc::new(Int32Array::from(vec![
2093            Some(10),
2094            Some(20),
2095            Some(30),
2096            Some(40),
2097            Some(50),
2098            Some(60),
2099            Some(70),
2100            Some(80),
2101            Some(90),
2102            Some(100),
2103            Some(110),
2104            Some(120),
2105            Some(130),
2106            Some(140),
2107            Some(150),
2108            Some(160),
2109            Some(170),
2110            Some(180),
2111            Some(190),
2112            Some(200),
2113        ]));
2114        let left_struct = Arc::new(StructArray::new(
2115            Fields::from(vec![
2116                Field::new("company_id", DataType::Int32, true),
2117                Field::new("count", DataType::Int32, true),
2118            ]),
2119            vec![left_company_id, left_count],
2120            None,
2121        ));
2122
2123        let left_list = Arc::new(GenericListArray::<O>::new(
2124            Arc::new(Field::new(
2125                "item",
2126                DataType::Struct(left_struct.fields().clone()),
2127                true,
2128            )),
2129            OffsetBuffer::from_lengths([3, 1]),
2130            left_struct.clone(),
2131            None,
2132        ));
2133
2134        let left_list_struct = Arc::new(StructArray::new(
2135            Fields::from(vec![Field::new(
2136                "companies",
2137                if O::IS_LARGE {
2138                    DataType::LargeList(Arc::new(Field::new(
2139                        "item",
2140                        DataType::Struct(left_struct.fields().clone()),
2141                        true,
2142                    )))
2143                } else {
2144                    DataType::List(Arc::new(Field::new(
2145                        "item",
2146                        DataType::Struct(left_struct.fields().clone()),
2147                        true,
2148                    )))
2149                },
2150                true,
2151            )]),
2152            vec![left_list as ArrayRef],
2153            None,
2154        ));
2155
2156        // right_list setup
2157        let right_company_name = Arc::new(StringArray::from(vec![
2158            "Google",
2159            "Microsoft",
2160            "Apple",
2161            "Facebook",
2162        ]));
2163        let right_struct = Arc::new(StructArray::new(
2164            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2165            vec![right_company_name],
2166            None,
2167        ));
2168        let right_list = Arc::new(GenericListArray::<O>::new(
2169            Arc::new(Field::new(
2170                "item",
2171                DataType::Struct(right_struct.fields().clone()),
2172                true,
2173            )),
2174            OffsetBuffer::from_lengths([3, 1]),
2175            right_struct.clone(),
2176            None,
2177        ));
2178
2179        let right_list_struct = Arc::new(StructArray::new(
2180            Fields::from(vec![Field::new(
2181                "companies",
2182                if O::IS_LARGE {
2183                    DataType::LargeList(Arc::new(Field::new(
2184                        "item",
2185                        DataType::Struct(right_struct.fields().clone()),
2186                        true,
2187                    )))
2188                } else {
2189                    DataType::List(Arc::new(Field::new(
2190                        "item",
2191                        DataType::Struct(right_struct.fields().clone()),
2192                        true,
2193                    )))
2194                },
2195                true,
2196            )]),
2197            vec![right_list as ArrayRef],
2198            None,
2199        ));
2200
2201        // prepare schema
2202        let target_fields = Fields::from(vec![Field::new(
2203            "companies",
2204            if O::IS_LARGE {
2205                DataType::LargeList(Arc::new(Field::new(
2206                    "item",
2207                    DataType::Struct(Fields::from(vec![
2208                        Field::new("company_id", DataType::Int32, true),
2209                        Field::new("company_name", DataType::Utf8, true),
2210                        Field::new("count", DataType::Int32, true),
2211                    ])),
2212                    true,
2213                )))
2214            } else {
2215                DataType::List(Arc::new(Field::new(
2216                    "item",
2217                    DataType::Struct(Fields::from(vec![
2218                        Field::new("company_id", DataType::Int32, true),
2219                        Field::new("company_name", DataType::Utf8, true),
2220                        Field::new("count", DataType::Int32, true),
2221                    ])),
2222                    true,
2223                )))
2224            },
2225            true,
2226        )]);
2227
2228        // merge left_list and right_list
2229        let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2230        assert_eq!(merged_array.len(), 2);
2231    }
2232}