// lance_arrow/lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend Arrow Functionality
5//!
//! To improve Arrow-RS ergonomics
7
8use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
13    GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
14    StructArray, UInt32Array, UInt8Array,
15};
16use arrow_array::{
17    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30pub use floats::*;
31pub mod cast;
32pub mod json;
33pub mod list;
34pub mod memory;
35pub mod r#struct;
36
/// Arrow extension metadata key for the extension name
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Arrow extension metadata key for the extension metadata
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Key used by lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";

// Crate-local result alias: every fallible helper in this module surfaces `ArrowError`.
type Result<T> = std::result::Result<T, ArrowError>;
48
/// Extension trait adding convenience predicates and size queries to [`DataType`].
pub trait DataTypeExt {
    /// Returns true if the data type is binary-like, such as (Large)Utf8 and (Large)Binary.
    ///
    /// ```
    /// use lance_arrow::*;
    /// use arrow_schema::DataType;
    ///
    /// assert!(DataType::Utf8.is_binary_like());
    /// assert!(DataType::Binary.is_binary_like());
    /// assert!(DataType::LargeUtf8.is_binary_like());
    /// assert!(DataType::LargeBinary.is_binary_like());
    /// assert!(!DataType::Int32.is_binary_like());
    /// ```
    fn is_binary_like(&self) -> bool;

    /// Returns true if the data type is a struct.
    fn is_struct(&self) -> bool;

    /// Check whether the given Arrow DataType is fixed stride.
    ///
    /// A fixed stride type has the same byte width for all array elements.
    /// This includes all PrimitiveType's, Boolean, FixedSizeList, FixedSizeBinary, and Decimals.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the [DataType] is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Returns the byte width of the data type.
    /// Panics if the data type is not fixed stride.
    ///
    /// NOTE(review): `Boolean` is reported as fixed stride by
    /// [`Self::is_fixed_stride`] but is bit-packed and has no whole-byte
    /// width, so this panics for it — confirm callers never combine the two.
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the data type, if it is fixed stride.
    /// Returns None if the data type is not fixed stride.
    fn byte_width_opt(&self) -> Option<usize>;
}
84
85impl DataTypeExt for DataType {
86    fn is_binary_like(&self) -> bool {
87        use DataType::*;
88        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
89    }
90
91    fn is_struct(&self) -> bool {
92        matches!(self, Self::Struct(_))
93    }
94
95    fn is_fixed_stride(&self) -> bool {
96        use DataType::*;
97        matches!(
98            self,
99            Boolean
100                | UInt8
101                | UInt16
102                | UInt32
103                | UInt64
104                | Int8
105                | Int16
106                | Int32
107                | Int64
108                | Float16
109                | Float32
110                | Float64
111                | Decimal128(_, _)
112                | Decimal256(_, _)
113                | FixedSizeList(_, _)
114                | FixedSizeBinary(_)
115                | Duration(_)
116                | Timestamp(_, _)
117                | Date32
118                | Date64
119                | Time32(_)
120                | Time64(_)
121        )
122    }
123
124    fn is_dictionary(&self) -> bool {
125        matches!(self, Self::Dictionary(_, _))
126    }
127
128    fn byte_width_opt(&self) -> Option<usize> {
129        match self {
130            Self::Int8 => Some(1),
131            Self::Int16 => Some(2),
132            Self::Int32 => Some(4),
133            Self::Int64 => Some(8),
134            Self::UInt8 => Some(1),
135            Self::UInt16 => Some(2),
136            Self::UInt32 => Some(4),
137            Self::UInt64 => Some(8),
138            Self::Float16 => Some(2),
139            Self::Float32 => Some(4),
140            Self::Float64 => Some(8),
141            Self::Date32 => Some(4),
142            Self::Date64 => Some(8),
143            Self::Time32(_) => Some(4),
144            Self::Time64(_) => Some(8),
145            Self::Timestamp(_, _) => Some(8),
146            Self::Duration(_) => Some(8),
147            Self::Decimal128(_, _) => Some(16),
148            Self::Decimal256(_, _) => Some(32),
149            Self::Interval(unit) => match unit {
150                IntervalUnit::YearMonth => Some(4),
151                IntervalUnit::DayTime => Some(8),
152                IntervalUnit::MonthDayNano => Some(16),
153            },
154            Self::FixedSizeBinary(s) => Some(*s as usize),
155            Self::FixedSizeList(dt, s) => dt
156                .data_type()
157                .byte_width_opt()
158                .map(|width| width * *s as usize),
159            _ => None,
160        }
161    }
162
163    fn byte_width(&self) -> usize {
164        self.byte_width_opt()
165            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
166    }
167}
168
/// Create an [`GenericListArray`] from values and offsets.
///
/// ```
/// use arrow_array::{Int32Array, Int64Array, ListArray};
/// use arrow_array::types::Int64Type;
/// use lance_arrow::try_new_generic_list_array;
///
/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
/// let int_values = Int64Array::from_iter(0..10);
/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
/// assert_eq!(list_arr,
///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
///         Some(vec![Some(0), Some(1)]),
///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
///         Some(vec![Some(7), Some(8), Some(9)]),
/// ]))
/// ```
///
/// NOTE(review): the raw offsets buffer is reused verbatim, so this assumes
/// `offsets` is not sliced and carries no validity bitmap — confirm at call
/// sites.
pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
    values: T,
    offsets: &PrimitiveArray<Offset>,
) -> Result<GenericListArray<Offset::Native>>
where
    Offset::Native: OffsetSizeTrait,
{
    // Choose List vs LargeList from the offset width (i32 vs i64); the child
    // field is always named "item" and nullable.
    let data_type = if Offset::Native::IS_LARGE {
        DataType::LargeList(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    } else {
        DataType::List(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    };
    // N offsets delimit N-1 lists; `values` becomes the single child array.
    let data = ArrayDataBuilder::new(data_type)
        .len(offsets.len() - 1)
        .add_buffer(offsets.into_data().buffers()[0].clone())
        .add_child_data(values.into_data())
        .build()?;

    Ok(GenericListArray::from(data))
}
214
215pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
216    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
217}
218
/// Helpers for constructing, sampling, and converting [`FixedSizeListArray`]s.
pub trait FixedSizeListArrayExt {
    /// Create an [`FixedSizeListArray`] from values and list size.
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray};
    /// use arrow_array::types::Int64Type;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
    ///         Some(vec![Some(0), Some(1)]),
    ///         Some(vec![Some(2), Some(3)]),
    ///         Some(vec![Some(4), Some(5)]),
    ///         Some(vec![Some(6), Some(7)]),
    ///         Some(vec![Some(8), Some(9)])
    /// ], 2))
    /// ```
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Sample `n` rows from the [FixedSizeListArray]
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..256);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
    /// assert_eq!(sampled.len(), 10);
    /// assert_eq!(sampled.value_length(), 16);
    /// assert_eq!(sampled.values().len(), 160);
    /// ```
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert a [FixedSizeListArray] of Float16, Float32, Float64,
    /// Int8, Int16, Int32, Int64, UInt8, or UInt32 values to its closest
    /// floating point type. Floating-point input is returned unchanged;
    /// integer child types are widened per the implementation's mapping.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
262
263impl FixedSizeListArrayExt for FixedSizeListArray {
264    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
265        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
266        let values = Arc::new(values);
267
268        Self::try_new(field, list_size, values, None)
269    }
270
271    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
272        if n >= self.len() {
273            return Ok(self.clone());
274        }
275        let mut rng = SmallRng::from_os_rng();
276        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
277        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
278    }
279
280    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
281        match self.data_type() {
282            DataType::FixedSizeList(field, size) => match field.data_type() {
283                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
284                DataType::Int8 => Ok(Self::new(
285                    Arc::new(arrow_schema::Field::new(
286                        field.name(),
287                        DataType::Float32,
288                        field.is_nullable(),
289                    )),
290                    *size,
291                    Arc::new(Float32Array::from_iter_values(
292                        self.values()
293                            .as_any()
294                            .downcast_ref::<Int8Array>()
295                            .ok_or(ArrowError::ParseError(
296                                "Fail to cast primitive array to Int8Type".to_string(),
297                            ))?
298                            .into_iter()
299                            .filter_map(|x| x.map(|y| y as f32)),
300                    )),
301                    self.nulls().cloned(),
302                )),
303                DataType::Int16 => Ok(Self::new(
304                    Arc::new(arrow_schema::Field::new(
305                        field.name(),
306                        DataType::Float32,
307                        field.is_nullable(),
308                    )),
309                    *size,
310                    Arc::new(Float32Array::from_iter_values(
311                        self.values()
312                            .as_any()
313                            .downcast_ref::<Int16Array>()
314                            .ok_or(ArrowError::ParseError(
315                                "Fail to cast primitive array to Int8Type".to_string(),
316                            ))?
317                            .into_iter()
318                            .filter_map(|x| x.map(|y| y as f32)),
319                    )),
320                    self.nulls().cloned(),
321                )),
322                DataType::Int32 => Ok(Self::new(
323                    Arc::new(arrow_schema::Field::new(
324                        field.name(),
325                        DataType::Float32,
326                        field.is_nullable(),
327                    )),
328                    *size,
329                    Arc::new(Float32Array::from_iter_values(
330                        self.values()
331                            .as_any()
332                            .downcast_ref::<Int32Array>()
333                            .ok_or(ArrowError::ParseError(
334                                "Fail to cast primitive array to Int8Type".to_string(),
335                            ))?
336                            .into_iter()
337                            .filter_map(|x| x.map(|y| y as f32)),
338                    )),
339                    self.nulls().cloned(),
340                )),
341                DataType::Int64 => Ok(Self::new(
342                    Arc::new(arrow_schema::Field::new(
343                        field.name(),
344                        DataType::Float64,
345                        field.is_nullable(),
346                    )),
347                    *size,
348                    Arc::new(Float64Array::from_iter_values(
349                        self.values()
350                            .as_any()
351                            .downcast_ref::<Int64Array>()
352                            .ok_or(ArrowError::ParseError(
353                                "Fail to cast primitive array to Int8Type".to_string(),
354                            ))?
355                            .into_iter()
356                            .filter_map(|x| x.map(|y| y as f64)),
357                    )),
358                    self.nulls().cloned(),
359                )),
360                DataType::UInt8 => Ok(Self::new(
361                    Arc::new(arrow_schema::Field::new(
362                        field.name(),
363                        DataType::Float64,
364                        field.is_nullable(),
365                    )),
366                    *size,
367                    Arc::new(Float64Array::from_iter_values(
368                        self.values()
369                            .as_any()
370                            .downcast_ref::<UInt8Array>()
371                            .ok_or(ArrowError::ParseError(
372                                "Fail to cast primitive array to Int8Type".to_string(),
373                            ))?
374                            .into_iter()
375                            .filter_map(|x| x.map(|y| y as f64)),
376                    )),
377                    self.nulls().cloned(),
378                )),
379                DataType::UInt32 => Ok(Self::new(
380                    Arc::new(arrow_schema::Field::new(
381                        field.name(),
382                        DataType::Float64,
383                        field.is_nullable(),
384                    )),
385                    *size,
386                    Arc::new(Float64Array::from_iter_values(
387                        self.values()
388                            .as_any()
389                            .downcast_ref::<UInt32Array>()
390                            .ok_or(ArrowError::ParseError(
391                                "Fail to cast primitive array to Int8Type".to_string(),
392                            ))?
393                            .into_iter()
394                            .filter_map(|x| x.map(|y| y as f64)),
395                    )),
396                    self.nulls().cloned(),
397                )),
398                data_type => Err(ArrowError::ParseError(format!(
399                    "Expect either floating type or integer got {:?}",
400                    data_type
401                ))),
402            },
403            data_type => Err(ArrowError::ParseError(format!(
404                "Expect either FixedSizeList got {:?}",
405                data_type
406            ))),
407        }
408    }
409}
410
411/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
412/// [`FixedSizeListArray`], panic'ing on failure.
413pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
414    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
415}
416
/// Helpers for constructing [`FixedSizeBinaryArray`]s.
pub trait FixedSizeBinaryArrayExt {
    /// Create an [`FixedSizeBinaryArray`] from values and stride.
    ///
    /// ```
    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
    /// use arrow_array::types::UInt8Type;
    /// use lance_arrow::FixedSizeBinaryArrayExt;
    ///
    /// let int_values = UInt8Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeBinaryArray::from(vec![
    ///         Some(vec![0, 1].as_slice()),
    ///         Some(vec![2, 3].as_slice()),
    ///         Some(vec![4, 5].as_slice()),
    ///         Some(vec![6, 7].as_slice()),
    ///         Some(vec![8, 9].as_slice())
    /// ]))
    /// ```
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
438
439impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
440    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
441        let data_type = DataType::FixedSizeBinary(stride);
442        let data = ArrayDataBuilder::new(data_type)
443            .len(values.len() / stride as usize)
444            .add_buffer(values.into_data().buffers()[0].clone())
445            .build()?;
446        Ok(Self::from(data))
447    }
448}
449
450pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
451    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
452}
453
454pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
455    match arr.data_type() {
456        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
457        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
458        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
459    }
460}
461
/// Extends Arrow's [RecordBatch].
pub trait RecordBatchExt {
    /// Append a new column to this [`RecordBatch`] and returns a new RecordBatch.
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let new_field = Field::new("s", DataType::Utf8, true);
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new RecordBatch with the column inserted at `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new [`RecordBatch`] from the provided [`StructArray`].
    ///
    /// The fields on the [`StructArray`] need to match this [`RecordBatch`] schema
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge with another [`RecordBatch`] and returns a new one.
    ///
    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
    /// name is found in the right then we merge the two columns.  If there is no match then
    /// we add the left column to the output.
    ///
    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
    /// Otherwise we use the left array.
    ///
    /// Afterwards we add all non-matching right columns to the output.
    ///
    /// Note: This method likely does not handle nested fields correctly and you may want to consider
    /// using [`merge_with_schema`] instead.
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::*;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
    ///
    /// let new_record_batch = left.merge(&right).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    ///
    /// TODO: add merge nested fields support.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Create a batch by merging columns between two batches with a given schema.
    ///
    /// A reference schema is used to determine the proper ordering of nested fields.
    ///
    /// For each field in the reference schema we look for corresponding fields in
    /// the left and right batches.  If a field is found in both batches we recursively merge
    /// it.
    ///
    /// If a field is only in the left or right batch we take it as it is.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Drop one column specified with the name and return the new [`RecordBatch`].
    ///
    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace a column (specified by name) and return the new [`RecordBatch`].
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace a column's data type and array (specified by name) and return the
    /// new [`RecordBatch`].
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename a column at a given index.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Get (potentially nested) column by qualified name.
    ///
    /// Nested path segments are separated by `.` (e.g. `"outer.inner"`).
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the schema over the [RecordBatch].
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// metadata of the schema.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add metadata to the schema.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata with the provided one.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take selected rows from the [RecordBatch].
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Create a new RecordBatch with compacted memory after slicing.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;
}
604
impl RecordBatchExt for RecordBatch {
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
        // Extend the schema via the `schema` module helper, then append the
        // array so columns stay aligned with fields.
        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.push(arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
        // Same as `try_with_column`, but inserts at `index` instead of appending.
        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
        let mut new_columns = self.columns().to_vec();
        new_columns.insert(index, arr);
        Self::try_new(new_schema, new_columns)
    }

    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
        // The struct's fields become the new top-level schema; this batch's
        // schema metadata is carried over.
        let schema = Arc::new(Schema::new_with_metadata(
            arr.fields().to_vec(),
            self.schema().metadata.clone(),
        ));
        let batch = Self::from(arr);
        batch.with_schema(schema)
    }

    fn merge(&self, other: &Self) -> Result<Self> {
        // Merging is row-wise aligned, so both batches must have equal length.
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        // Delegate field-wise merging to the StructArray-based `merge` helper.
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
    }

    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
        if self.num_rows() != other.num_rows() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Attempt to merge two RecordBatch with different sizes: {} != {}",
                self.num_rows(),
                other.num_rows()
            )));
        }
        // `schema` supplies the reference field ordering (see trait docs).
        let left_struct_array: StructArray = self.clone().into();
        let right_struct_array: StructArray = other.clone().into();
        self.try_new_from_struct_array(merge_with_schema(
            &left_struct_array,
            &right_struct_array,
            schema.fields(),
        ))
    }

    fn drop_column(&self, name: &str) -> Result<Self> {
        // Keep every (field, column) pair whose name differs from `name`.
        // Note: if several columns share the name, all of them are dropped.
        let mut fields = vec![];
        let mut columns = vec![];
        for i in 0..self.schema().fields.len() {
            if self.schema().field(i).name() != name {
                fields.push(self.schema().field(i).clone());
                columns.push(self.column(i).clone());
            }
        }
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            columns,
        )
    }

    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
        let mut fields = self.schema().fields().to_vec();
        if index >= fields.len() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Index out of bounds: {}",
                index
            )));
        }
        // NOTE(review): `Field::new` is given only name/type/nullability, so
        // any per-field metadata on the renamed field is not preserved —
        // confirm this is intended.
        fields[index] = Arc::new(Field::new(
            new_name,
            fields[index].data_type().clone(),
            fields[index].is_nullable(),
        ));
        Self::try_new(
            Arc::new(Schema::new_with_metadata(
                fields,
                self.schema().metadata().clone(),
            )),
            self.columns().to_vec(),
        )
    }

    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
        // Swap the array in place; the schema is unchanged, so `try_new`
        // validates that the replacement matches the existing field type.
        let mut columns = self.columns().to_vec();
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(self.schema(), columns)
    }

    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch> {
        // Rebuild the field list, substituting the named field's data type.
        // NOTE(review): nullability is kept but per-field metadata is dropped
        // for the replaced field (`Field::new`) — confirm this is intended.
        let fields = self
            .schema()
            .fields()
            .iter()
            .map(|x| {
                if x.name() != name {
                    x.clone()
                } else {
                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
                    Arc::new(new_field)
                }
            })
            .collect::<Vec<_>>();
        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
        let mut columns = self.columns().to_vec();
        // Second scan locates the column index; also serves as the existence
        // check that surfaces the SchemaError.
        let field_i = self
            .schema()
            .fields()
            .iter()
            .position(|f| f.name() == name)
            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
        columns[field_i] = column;
        Self::try_new(Arc::new(schema), columns)
    }

    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
        // Split "a.b.c" into segments; the first names a top-level column and
        // the rest are resolved recursively by `get_sub_array`.
        let split = name.split('.').collect::<Vec<_>>();
        if split.is_empty() {
            return None;
        }

        self.column_by_name(split[0])
            .and_then(|arr| get_sub_array(arr, &split[1..]))
    }

    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
        // Reuse the StructArray-based `project` helper for (nested) selection.
        let struct_array: StructArray = self.clone().into();
        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
    }

    fn metadata(&self) -> &HashMap<String, String> {
        self.schema_ref().metadata()
    }

    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
        // Clone the schema, replace its metadata wholesale, and rebuild.
        let mut schema = self.schema_ref().as_ref().clone();
        schema.metadata = metadata;
        Self::try_new(schema.into(), self.columns().into())
    }

    fn take(&self, indices: &UInt32Array) -> Result<Self> {
        // Route through StructArray so all columns are taken consistently.
        let struct_array: StructArray = self.clone().into();
        let taken = take(&struct_array, indices, None)?;
        self.try_new_from_struct_array(taken.as_struct().clone())
    }

    fn shrink_to_fit(&self) -> Result<Self> {
        // Deep copy the sliced record batch, instead of whole batch
        crate::deepcopy::deep_copy_batch_sliced(self)
    }
}
778
779fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
780    if fields.is_empty() {
781        return Ok(StructArray::new_empty_fields(
782            struct_array.len(),
783            struct_array.nulls().cloned(),
784        ));
785    }
786    let mut columns: Vec<ArrayRef> = vec![];
787    for field in fields.iter() {
788        if let Some(col) = struct_array.column_by_name(field.name()) {
789            match field.data_type() {
790                // TODO handle list-of-struct
791                DataType::Struct(subfields) => {
792                    let projected = project(col.as_struct(), subfields)?;
793                    columns.push(Arc::new(projected));
794                }
795                _ => {
796                    columns.push(col.clone());
797                }
798            }
799        } else {
800            return Err(ArrowError::SchemaError(format!(
801                "field {} does not exist in the RecordBatch",
802                field.name()
803            )));
804        }
805    }
806    // Preserve the struct's validity when projecting
807    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
808}
809
810fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
811    let left_list: &GenericListArray<T> = left.as_list();
812    let right_list: &GenericListArray<T> = right.as_list();
813    left_list.offsets().inner() == right_list.offsets().inner()
814}
815
816fn merge_list_structs_helper<T: OffsetSizeTrait>(
817    left: &dyn Array,
818    right: &dyn Array,
819    items_field_name: impl Into<String>,
820    items_nullable: bool,
821) -> Arc<dyn Array> {
822    let left_list: &GenericListArray<T> = left.as_list();
823    let right_list: &GenericListArray<T> = right.as_list();
824    let left_struct = left_list.values();
825    let right_struct = right_list.values();
826    let left_struct_arr = left_struct.as_struct();
827    let right_struct_arr = right_struct.as_struct();
828    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
829    let items_field = Arc::new(Field::new(
830        items_field_name,
831        merged_items.data_type().clone(),
832        items_nullable,
833    ));
834    Arc::new(GenericListArray::<T>::new(
835        items_field,
836        left_list.offsets().clone(),
837        merged_items,
838        left_list.nulls().cloned(),
839    ))
840}
841
842fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
843    left: &dyn Array,
844    right: &dyn Array,
845    not_null: &dyn Array,
846    items_field_name: impl Into<String>,
847) -> Arc<dyn Array> {
848    let left_list: &GenericListArray<T> = left.as_list::<T>();
849    let not_null_list = not_null.as_list::<T>();
850    let right_list = right.as_list::<T>();
851
852    let left_struct = left_list.values().as_struct();
853    let not_null_struct: &StructArray = not_null_list.values().as_struct();
854    let right_struct = right_list.values().as_struct();
855
856    let values_len = not_null_list.values().len();
857    let mut merged_fields =
858        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
859    let mut merged_columns =
860        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
861
862    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
863        merged_fields.push(field.clone());
864        if let Some(val) = not_null_struct.column_by_name(field.name()) {
865            merged_columns.push(val.clone());
866        } else {
867            merged_columns.push(new_null_array(field.data_type(), values_len))
868        }
869    }
870    for (_, field) in right_struct
871        .columns()
872        .iter()
873        .zip(right_struct.fields())
874        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
875    {
876        merged_fields.push(field.clone());
877        if let Some(val) = not_null_struct.column_by_name(field.name()) {
878            merged_columns.push(val.clone());
879        } else {
880            merged_columns.push(new_null_array(field.data_type(), values_len));
881        }
882    }
883
884    let merged_struct = Arc::new(StructArray::new(
885        Fields::from(merged_fields),
886        merged_columns,
887        not_null_struct.nulls().cloned(),
888    ));
889    let items_field = Arc::new(Field::new(
890        items_field_name,
891        merged_struct.data_type().clone(),
892        true,
893    ));
894    Arc::new(GenericListArray::<T>::new(
895        items_field,
896        not_null_list.offsets().clone(),
897        merged_struct,
898        not_null_list.nulls().cloned(),
899    ))
900}
901
902fn merge_list_struct_null(
903    left: &dyn Array,
904    right: &dyn Array,
905    not_null: &dyn Array,
906) -> Arc<dyn Array> {
907    match left.data_type() {
908        DataType::List(left_field) => {
909            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
910        }
911        DataType::LargeList(left_field) => {
912            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
913        }
914        _ => unreachable!(),
915    }
916}
917
918fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
919    // Merging fields into a list<struct<...>> is tricky and can only succeed
920    // in two ways.  First, if both lists have the same offsets.  Second, if
921    // one of the lists is all-null
922    if left.null_count() == left.len() {
923        return merge_list_struct_null(left, right, right);
924    } else if right.null_count() == right.len() {
925        return merge_list_struct_null(left, right, left);
926    }
927    match (left.data_type(), right.data_type()) {
928        (DataType::List(left_field), DataType::List(_)) => {
929            if !lists_have_same_offsets_helper::<i32>(left, right) {
930                panic!("Attempt to merge list struct arrays which do not have same offsets");
931            }
932            merge_list_structs_helper::<i32>(
933                left,
934                right,
935                left_field.name(),
936                left_field.is_nullable(),
937            )
938        }
939        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
940            if !lists_have_same_offsets_helper::<i64>(left, right) {
941                panic!("Attempt to merge list struct arrays which do not have same offsets");
942            }
943            merge_list_structs_helper::<i64>(
944                left,
945                right,
946                left_field.name(),
947                left_field.is_nullable(),
948            )
949        }
950        _ => unreachable!(),
951    }
952}
953
954/// Helper function to normalize validity buffers
955/// Returns None for all-null validity (placeholder structs)
956fn normalize_validity(
957    validity: Option<&arrow_buffer::NullBuffer>,
958) -> Option<&arrow_buffer::NullBuffer> {
959    validity.and_then(|v| {
960        if v.null_count() == v.len() {
961            None
962        } else {
963            Some(v)
964        }
965    })
966}
967
968/// Helper function to merge validity buffers from two struct arrays
969/// Returns None only if both arrays are null at the same position
970///
971/// Special handling for placeholder structs (all-null validity)
972fn merge_struct_validity(
973    left_validity: Option<&arrow_buffer::NullBuffer>,
974    right_validity: Option<&arrow_buffer::NullBuffer>,
975) -> Option<arrow_buffer::NullBuffer> {
976    // Normalize both validity buffers (convert all-null to None)
977    let left_normalized = normalize_validity(left_validity);
978    let right_normalized = normalize_validity(right_validity);
979
980    match (left_normalized, right_normalized) {
981        // Fast paths: no computation needed
982        (None, None) => None,
983        (Some(left), None) => Some(left.clone()),
984        (None, Some(right)) => Some(right.clone()),
985        (Some(left), Some(right)) => {
986            // Fast path: if both have no nulls, can return either one
987            if left.null_count() == 0 && right.null_count() == 0 {
988                return Some(left.clone());
989            }
990
991            let left_buffer = left.inner();
992            let right_buffer = right.inner();
993
994            // Perform bitwise OR directly on BooleanBuffers
995            // This preserves the correct semantics: 1 = valid, 0 = null
996            let merged_buffer = left_buffer | right_buffer;
997
998            Some(arrow_buffer::NullBuffer::from(merged_buffer))
999        }
1000    }
1001}
1002
/// Recursively merge the child values of two list-typed columns.
///
/// For struct children the two sides are merged via [`merge_with_schema`]
/// against `child_field`'s declared subfields; for nested (Large/FixedSize)
/// lists the recursion continues one level down, reusing the LEFT side's
/// offsets and OR-ing the two validity buffers. Any other child type falls
/// through to the left values unchanged.
///
/// NOTE(review): the non-struct/non-list fallback returns `left_values`
/// as-is, silently discarding `right_values` — presumably callers guarantee
/// both sides are identical there; confirm against call sites.
fn merge_list_child_values(
    child_field: &Field,
    left_values: ArrayRef,
    right_values: ArrayRef,
) -> ArrayRef {
    match child_field.data_type() {
        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
            left_values.as_struct(),
            right_values.as_struct(),
            child_fields,
        )) as ArrayRef,
        DataType::List(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("left list values should be ListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<ListArray>()
                .expect("right list values should be ListArray");
            // Recurse into the nested list's own values.
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            // Row valid if valid on either side (placeholder-aware OR).
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            // Reassemble with the LEFT list's offsets.
            Arc::new(ListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::LargeList(grandchild) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("left list values should be LargeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<LargeListArray>()
                .expect("right list values should be LargeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(LargeListArray::new(
                grandchild.clone(),
                left_list.offsets().clone(),
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        DataType::FixedSizeList(grandchild, list_size) => {
            let left_list = left_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("left list values should be FixedSizeListArray");
            let right_list = right_values
                .as_any()
                .downcast_ref::<FixedSizeListArray>()
                .expect("right list values should be FixedSizeListArray");
            let merged_values = merge_list_child_values(
                grandchild.as_ref(),
                left_list.values().clone(),
                right_list.values().clone(),
            );
            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
            Arc::new(FixedSizeListArray::new(
                grandchild.clone(),
                *list_size,
                merged_values,
                merged_validity,
            )) as ArrayRef
        }
        // Non-nested child: nothing to merge, keep left.
        _ => left_values.clone(),
    }
}
1083
// Helper function to adjust child array validity based on parent struct validity
// When parent struct is null, propagates null to child array (validity AND)
// Optimized with fast paths and SIMD operations
//
// NOTE(review): the new validity is computed from the LOGICAL (already-sliced)
// `child.nulls()` / parent buffers, but is then installed together with
// `child.offset()` into ArrayData rebuilt from `child.to_data()`'s physical
// buffers. For a child with a non-zero offset this looks like it would
// mis-align the validity bits — TODO confirm with a sliced-array test.
fn adjust_child_validity(
    child: &ArrayRef,
    parent_validity: Option<&arrow_buffer::NullBuffer>,
) -> ArrayRef {
    // Fast path: no parent validity means no adjustment needed
    let parent_validity = match parent_validity {
        None => return child.clone(),
        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
        Some(p) => p,
    };

    let child_validity = child.nulls();

    // Compute the new validity: child_validity AND parent_validity
    let new_validity = match child_validity {
        None => {
            // Fast path: child has no existing validity, just use parent's
            parent_validity.clone()
        }
        Some(child_nulls) => {
            let child_buffer = child_nulls.inner();
            let parent_buffer = parent_validity.inner();

            // Perform bitwise AND directly on BooleanBuffers
            // This preserves the correct semantics: 1 = valid, 0 = null
            let merged_buffer = child_buffer & parent_buffer;

            arrow_buffer::NullBuffer::from(merged_buffer)
        }
    };

    // Create new array with adjusted validity, reusing the child's data
    // buffers and nested children verbatim.
    arrow_array::make_array(
        arrow_data::ArrayData::try_new(
            child.data_type().clone(),
            child.len(),
            Some(new_validity.into_inner().into_inner()),
            child.offset(),
            child.to_data().buffers().to_vec(),
            child.to_data().child_data().to_vec(),
        )
        .unwrap(),
    )
}
1131
1132fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1133    let mut fields: Vec<Field> = vec![];
1134    let mut columns: Vec<ArrayRef> = vec![];
1135    let right_fields = right_struct_array.fields();
1136    let right_columns = right_struct_array.columns();
1137
1138    // Get the validity buffers from both structs
1139    let left_validity = left_struct_array.nulls();
1140    let right_validity = right_struct_array.nulls();
1141
1142    // Compute merged validity
1143    let merged_validity = merge_struct_validity(left_validity, right_validity);
1144
1145    // iterate through the fields on the left hand side
1146    for (left_field, left_column) in left_struct_array
1147        .fields()
1148        .iter()
1149        .zip(left_struct_array.columns().iter())
1150    {
1151        match right_fields
1152            .iter()
1153            .position(|f| f.name() == left_field.name())
1154        {
1155            // if the field exists on the right hand side, merge them recursively if appropriate
1156            Some(right_index) => {
1157                let right_field = right_fields.get(right_index).unwrap();
1158                let right_column = right_columns.get(right_index).unwrap();
1159                // if both fields are struct, merge them recursively
1160                match (left_field.data_type(), right_field.data_type()) {
1161                    (DataType::Struct(_), DataType::Struct(_)) => {
1162                        let left_sub_array = left_column.as_struct();
1163                        let right_sub_array = right_column.as_struct();
1164                        let merged_sub_array = merge(left_sub_array, right_sub_array);
1165                        fields.push(Field::new(
1166                            left_field.name(),
1167                            merged_sub_array.data_type().clone(),
1168                            left_field.is_nullable(),
1169                        ));
1170                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1171                    }
1172                    (DataType::List(left_list), DataType::List(right_list))
1173                        if left_list.data_type().is_struct()
1174                            && right_list.data_type().is_struct() =>
1175                    {
1176                        // If there is nothing to merge just use the left field
1177                        if left_list.data_type() == right_list.data_type() {
1178                            fields.push(left_field.as_ref().clone());
1179                            columns.push(left_column.clone());
1180                        }
1181                        // If we have two List<Struct> and they have different sets of fields then
1182                        // we can merge them if the offsets arrays are the same.  Otherwise, we
1183                        // have to consider it an error.
1184                        let merged_sub_array = merge_list_struct(&left_column, &right_column);
1185
1186                        fields.push(Field::new(
1187                            left_field.name(),
1188                            merged_sub_array.data_type().clone(),
1189                            left_field.is_nullable(),
1190                        ));
1191                        columns.push(merged_sub_array);
1192                    }
1193                    // otherwise, just use the field on the left hand side
1194                    _ => {
1195                        // TODO handle list-of-struct and other types
1196                        fields.push(left_field.as_ref().clone());
1197                        // Adjust the column validity: if left struct was null, propagate to child
1198                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1199                        columns.push(adjusted_column);
1200                    }
1201                }
1202            }
1203            None => {
1204                fields.push(left_field.as_ref().clone());
1205                // Adjust the column validity: if left struct was null, propagate to child
1206                let adjusted_column = adjust_child_validity(left_column, left_validity);
1207                columns.push(adjusted_column);
1208            }
1209        }
1210    }
1211
1212    // now iterate through the fields on the right hand side
1213    right_fields
1214        .iter()
1215        .zip(right_columns.iter())
1216        .for_each(|(field, column)| {
1217            // add new columns on the right
1218            if !left_struct_array
1219                .fields()
1220                .iter()
1221                .any(|f| f.name() == field.name())
1222            {
1223                fields.push(field.as_ref().clone());
1224                // This field doesn't exist on the left
1225                // We use the right's column but need to adjust for struct validity
1226                let adjusted_column = adjust_child_validity(column, right_validity);
1227                columns.push(adjusted_column);
1228            }
1229        });
1230
1231    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1232}
1233
/// Merge two struct arrays, keeping only (and exactly) the fields listed in
/// `fields`, in that order.
///
/// For each requested field: if it exists on only one side (matching by name
/// AND struct/non-struct kind), that side's column is used with validity
/// adjusted for the owning struct's nulls; if it exists on both sides,
/// structs and (Large/FixedSize) lists are merged recursively and any other
/// type is taken from the left; if it exists on neither side it is dropped.
/// The result's validity is the union (OR) of both inputs' validity.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Helper function that returns true if both types are struct or both are non-struct
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    // Get the validity buffers from both structs
    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    // Compute merged validity (a row is valid if either side is valid)
    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        // Match by name AND kind so a struct field never pairs with a
        // same-named scalar field.
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                // Adjust validity if the right struct was null
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                // Adjust validity if the left struct was null
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    DataType::Struct(child_fields) => {
                        // Recurse with the requested subfield list.
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    DataType::List(child_field) => {
                        // Merge list children; reuse the LEFT offsets and OR
                        // the two lists' validity buffers.
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::LargeList(child_field) => {
                        // Same as the List arm, for 64-bit offsets.
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::FixedSizeList(child_field, list_size) => {
                        // Same as the List arm; fixed width, so no offsets.
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    _ => {
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        // For fields that exist in both, use left but adjust validity
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            (None, None) => {
                // The field will not be included in the output
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}
1392
1393fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1394    if components.is_empty() {
1395        return Some(array);
1396    }
1397    if !matches!(array.data_type(), DataType::Struct(_)) {
1398        return None;
1399    }
1400    let struct_arr = array.as_struct();
1401    struct_arr
1402        .column_by_name(components[0])
1403        .and_then(|arr| get_sub_array(arr, &components[1..]))
1404}
1405
1406/// Interleave multiple RecordBatches into a single RecordBatch.
1407///
1408/// Behaves like [`arrow::compute::interleave`], but for RecordBatches.
1409pub fn interleave_batches(
1410    batches: &[RecordBatch],
1411    indices: &[(usize, usize)],
1412) -> Result<RecordBatch> {
1413    let first_batch = batches.first().ok_or_else(|| {
1414        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1415    })?;
1416    let schema = first_batch.schema();
1417    let num_columns = first_batch.num_columns();
1418    let mut columns = Vec::with_capacity(num_columns);
1419    let mut chunks = Vec::with_capacity(batches.len());
1420
1421    for i in 0..num_columns {
1422        for batch in batches {
1423            chunks.push(batch.column(i).as_ref());
1424        }
1425        let new_column = interleave(&chunks, indices)?;
1426        columns.push(new_column);
1427        chunks.clear();
1428    }
1429
1430    RecordBatch::try_new(schema, columns)
1431}
1432
pub trait BufferExt {
    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made and an owned buffer returned.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
    ///
    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
    ///
    /// `size_bytes` can be larger than `bytes` and, if so, the trailing bytes will
    /// be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `size_bytes` is less than `bytes.len()`
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1460
/// Returns true if `n` is a power of two.
///
/// Uses `u64::is_power_of_two` rather than the `n & (n - 1) == 0` bit trick:
/// the latter underflows for `n == 0` (panicking in debug builds) and wrongly
/// returns `true` in release builds, which would then feed an invalid
/// alignment of 0 into `align_offset` in `from_bytes_bytes`.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1464
impl BufferExt for arrow_buffer::Buffer {
    // Zero-copy when `bytes` is already aligned for `bytes_per_value`
    // (or alignment is irrelevant because it isn't a power of two);
    // otherwise falls back to an aligned copy.
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The original buffer is not aligned, cannot zero-copy
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // The original buffer is aligned, can zero-copy
            // SAFETY: the alignment is correct we can make this conversion;
            // the Arc'd `bytes` passed as the custom allocation keeps the
            // underlying memory alive for the lifetime of the Buffer.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        // MutableBuffer allocations are aligned for any Arrow primitive type.
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        // Zero-pad up to the requested size.
        buf.extend(std::iter::repeat_n(0_u8, to_fill));

        // FIX for issue #4512: Shrink buffer to actual size before converting to immutable
        // This reduces memory overhead from capacity over-allocation
        // NOTE(review): capacity was requested as exactly `size_bytes` and the
        // buffer is filled to that length, so this is normally a no-op —
        // kept as a defensive measure; confirm against issue #4512.
        buf.shrink_to_fit();

        Self::from(buf)
    }
}
1499
// Unit tests for the RecordBatch / StructArray merge, project, take, and
// byte-width helpers defined earlier in this file.
#[cfg(test)]
mod tests {
    use super::*;
    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
    use arrow_array::{Float32Array, Int32Array, StructArray};
    use arrow_buffer::OffsetBuffer;

    // Merging two batches that share a struct column "b" should recursively
    // union the struct's children (c from the left, d from the right) rather
    // than treating "b" as a conflicting column.
    #[test]
    fn test_merge_recursive() {
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        // Left batch: top-level "a" plus struct "b" containing only "c".
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        // Right batch: top-level "e" plus struct "b" containing only "d".
        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected result: "b" contains both "c" and "d"; "e" is appended
        // after the left batch's columns.
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }

    // merge_with_schema should order the merged struct fields according to the
    // caller-supplied schema, unlike plain merge which is first-come
    // first-serve from the left batch.
    #[test]
    fn test_merge_with_schema() {
        // Helper: builds an empty single-row-group batch with one struct
        // column whose children have the given names/types.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // If we use merge_with_schema the schema is respected
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // If we use merge we get first-come first-serve based on the left batch
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }

    // Merging List<Struct> columns should union the struct fields inside the
    // list items; when the right-hand list is entirely null, the missing
    // fields should be filled with nulls rather than failing.
    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        // Single-element lists: left holds {x: 1}, right holds {y: 2}.
        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        // Expected result when the right side is all-null: x kept, y null.
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        // Case 1: both sides populated -> {x: 1, y: 2}.
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // Case 2: right side entirely null -> {x: 1, y: null}.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }

    // byte_width_opt: fixed-width types report their size, variable-width
    // types report None, and fixed-size lists multiply child width by length.
    #[test]
    fn test_byte_width_opt() {
        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
        assert_eq!(DataType::Utf8.byte_width_opt(), None);
        assert_eq!(DataType::Binary.byte_width_opt(), None);
        assert_eq!(
            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
            None
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
                .byte_width_opt(),
            Some(12)
        );
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
                .byte_width_opt(),
            Some(16)
        );
        // Variable-width child means the fixed-size list has no fixed width.
        assert_eq!(
            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
                .byte_width_opt(),
            None
        );
    }

    // RecordBatch::take should gather the rows at the given indices across
    // every column.
    #[test]
    fn test_take_record_batch() {
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();
        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
        assert_eq!(
            taken,
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int32Array::from(vec![1, 5, 10])),
                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
                ],
            )
            .unwrap()
        )
    }

    // project_by_schema: handles the empty schema, reordered columns, and a
    // strict subset of columns; schema metadata is carried over in all cases.
    #[test]
    fn test_schema_project_by_schema() {
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Empty schema
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Re-ordered schema
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Sub schema
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }

    #[test]
    fn test_project_preserves_struct_validity() {
        // Test that projecting a struct array preserves its validity (fix for issue #4385)
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, true),
        ]);

        // Create a struct array with validity
        let id_array = Int32Array::from(vec![1, 2, 3]);
        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
        let struct_array = StructArray::new(
            fields.clone(),
            vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(value_array) as ArrayRef,
            ],
            Some(vec![true, false, true].into()), // Second struct is null
        );

        // Project the struct array
        let projected = project(&struct_array, &fields).unwrap();

        // Verify the validity is preserved
        assert_eq!(projected.null_count(), 1);
        assert!(!projected.is_null(0));
        assert!(projected.is_null(1));
        assert!(!projected.is_null(2));
    }

    #[test]
    fn test_merge_struct_with_different_validity() {
        // Test case from Weston's review comment
        // File 1 has height field with some nulls
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
        );

        // File 2 has width field with some nulls
        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
        );

        // Merge the two structs
        let merged = merge(&left_struct, &right_struct);

        // Expected:
        // Row 1: both non-null -> {width: 300, height: 500}
        // Row 2: left null, right non-null -> {width: 200, height: null}
        // Row 3: left non-null, right null -> {width: null, height: 600}
        // Row 4: both null -> null struct

        assert_eq!(merged.null_count(), 1); // Only row 4 is null
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        // Check field values
        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1)); // height is null when left struct was null
        assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2)); // width is null when right struct was null
    }

    // merge_with_schema on List<Struct> columns whose item structs have
    // disjoint fields: the target schema names the union of fields, and the
    // missing fields on each side must come back as nulls.
    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        // left_list setup
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        // Right List Setup
        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        // Target schema: union of both sides' item fields.
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        // Verify the merged structure
        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        // Should have all 3 fields
        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        // Verify values
        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
}