Skip to main content

lance_arrow/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend Arrow Functionality
5//!
//! To improve Arrow-RS ergonomics
7
8use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12    Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray, GenericListArray,
13    LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray,
14    UInt8Array, UInt32Array, cast::AsArray,
15};
16use arrow_array::{
17    Float32Array, Float64Array, Int8Array, Int16Array, Int32Array, Int64Array, new_null_array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod cast;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod scalar;
38pub mod stream;
39pub mod r#struct;
40
/// Arrow extension metadata key for extension name
pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";

/// Arrow extension metadata key for extension metadata
pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";

/// Key used by lance to mark a field as a blob
/// TODO: Use Arrow extension mechanism instead?
pub const BLOB_META_KEY: &str = "lance-encoding:blob";
/// Arrow extension type name for Lance blob v2 columns
pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
/// Metadata key for overriding the dedicated blob size threshold (in bytes)
pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
    "lance-encoding:blob-dedicated-size-threshold";

/// Crate-local result alias: every fallible helper in this module reports
/// failures as [`ArrowError`].
type Result<T> = std::result::Result<T, ArrowError>;
57
/// Convenience predicates and size queries on Arrow [`DataType`].
pub trait DataTypeExt {
    /// Returns true if the data type is binary-like, such as (Large)Utf8 and (Large)Binary.
    ///
    /// ```
    /// use lance_arrow::*;
    /// use arrow_schema::DataType;
    ///
    /// assert!(DataType::Utf8.is_binary_like());
    /// assert!(DataType::Binary.is_binary_like());
    /// assert!(DataType::LargeUtf8.is_binary_like());
    /// assert!(DataType::LargeBinary.is_binary_like());
    /// assert!(!DataType::Int32.is_binary_like());
    /// ```
    fn is_binary_like(&self) -> bool;

    /// Returns true if the data type is a struct.
    fn is_struct(&self) -> bool;

    /// Check whether the given Arrow DataType is fixed stride.
    ///
    /// A fixed stride type has the same byte width for all array elements
    /// This includes all PrimitiveType's Boolean, FixedSizeList, FixedSizeBinary, and Decimals
    fn is_fixed_stride(&self) -> bool;

    /// Returns the byte width of the data type
    /// Panics if the data type is not fixed stride.
    ///
    /// Note: `Boolean` is considered fixed stride but is bit-packed, so it has
    /// no whole byte width and this method panics for it (see
    /// [`Self::byte_width_opt`], which returns `None` for `Boolean`).
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the data type, if it is fixed stride.
    /// Returns None if the data type is not fixed stride.
    fn byte_width_opt(&self) -> Option<usize>;

    /// Returns true if the [DataType] is a dictionary type.
    fn is_dictionary(&self) -> bool;
}
93
94impl DataTypeExt for DataType {
95    fn is_binary_like(&self) -> bool {
96        use DataType::*;
97        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
98    }
99
100    fn is_struct(&self) -> bool {
101        matches!(self, Self::Struct(_))
102    }
103
104    fn is_fixed_stride(&self) -> bool {
105        use DataType::*;
106        matches!(
107            self,
108            Boolean
109                | UInt8
110                | UInt16
111                | UInt32
112                | UInt64
113                | Int8
114                | Int16
115                | Int32
116                | Int64
117                | Float16
118                | Float32
119                | Float64
120                | Decimal128(_, _)
121                | Decimal256(_, _)
122                | FixedSizeList(_, _)
123                | FixedSizeBinary(_)
124                | Duration(_)
125                | Timestamp(_, _)
126                | Date32
127                | Date64
128                | Time32(_)
129                | Time64(_)
130        )
131    }
132
133    fn is_dictionary(&self) -> bool {
134        matches!(self, Self::Dictionary(_, _))
135    }
136
137    fn byte_width_opt(&self) -> Option<usize> {
138        match self {
139            Self::Int8 => Some(1),
140            Self::Int16 => Some(2),
141            Self::Int32 => Some(4),
142            Self::Int64 => Some(8),
143            Self::UInt8 => Some(1),
144            Self::UInt16 => Some(2),
145            Self::UInt32 => Some(4),
146            Self::UInt64 => Some(8),
147            Self::Float16 => Some(2),
148            Self::Float32 => Some(4),
149            Self::Float64 => Some(8),
150            Self::Date32 => Some(4),
151            Self::Date64 => Some(8),
152            Self::Time32(_) => Some(4),
153            Self::Time64(_) => Some(8),
154            Self::Timestamp(_, _) => Some(8),
155            Self::Duration(_) => Some(8),
156            Self::Decimal128(_, _) => Some(16),
157            Self::Decimal256(_, _) => Some(32),
158            Self::Interval(unit) => match unit {
159                IntervalUnit::YearMonth => Some(4),
160                IntervalUnit::DayTime => Some(8),
161                IntervalUnit::MonthDayNano => Some(16),
162            },
163            Self::FixedSizeBinary(s) => Some(*s as usize),
164            Self::FixedSizeList(dt, s) => dt
165                .data_type()
166                .byte_width_opt()
167                .map(|width| width * *s as usize),
168            _ => None,
169        }
170    }
171
172    fn byte_width(&self) -> usize {
173        self.byte_width_opt()
174            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
175    }
176}
177
/// Create an [`GenericListArray`] from values and offsets.
///
/// `offsets` follows the standard Arrow list layout: it has one more entry
/// than the number of lists, and entries `i` and `i + 1` bound the values of
/// list `i`.
///
/// NOTE(review): the offsets' validity bitmap is not propagated, and the raw
/// offsets buffer is used without adjusting for any array slicing offset —
/// presumably callers always pass freshly built, non-null, unsliced offsets;
/// confirm at call sites.
///
/// ```
/// use arrow_array::{Int32Array, Int64Array, ListArray};
/// use arrow_array::types::Int64Type;
/// use lance_arrow::try_new_generic_list_array;
///
/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
/// let int_values = Int64Array::from_iter(0..10);
/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
/// assert_eq!(list_arr,
///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
///         Some(vec![Some(0), Some(1)]),
///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
///         Some(vec![Some(7), Some(8), Some(9)]),
/// ]))
/// ```
pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
    values: T,
    offsets: &PrimitiveArray<Offset>,
) -> Result<GenericListArray<Offset::Native>>
where
    Offset::Native: OffsetSizeTrait,
{
    // LargeList uses 64-bit offsets, List uses 32-bit; choose based on the
    // offset native width. The child field is named "item" and nullable,
    // matching Arrow's default for list types.
    let data_type = if Offset::Native::IS_LARGE {
        DataType::LargeList(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    } else {
        DataType::List(Arc::new(Field::new(
            "item",
            values.data_type().clone(),
            true,
        )))
    };
    // len is offsets.len() - 1 per the Arrow list layout; build() validates
    // the assembled ArrayData.
    let data = ArrayDataBuilder::new(data_type)
        .len(offsets.len() - 1)
        .add_buffer(offsets.into_data().buffers()[0].clone())
        .add_child_data(values.into_data())
        .build()?;

    Ok(GenericListArray::from(data))
}
223
224pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
225    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
226}
227
/// Extension helpers for building, sampling and converting
/// [`FixedSizeListArray`]s.
pub trait FixedSizeListArrayExt {
    /// Create an [`FixedSizeListArray`] from values and list size.
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray};
    /// use arrow_array::types::Int64Type;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
    ///         Some(vec![Some(0), Some(1)]),
    ///         Some(vec![Some(2), Some(3)]),
    ///         Some(vec![Some(4), Some(5)]),
    ///         Some(vec![Some(6), Some(7)]),
    ///         Some(vec![Some(8), Some(9)])
    /// ], 2))
    /// ```
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Sample `n` rows from the [FixedSizeListArray]
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..256);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
    /// assert_eq!(sampled.len(), 10);
    /// assert_eq!(sampled.value_length(), 16);
    /// assert_eq!(sampled.values().len(), 160);
    /// ```
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert a [FixedSizeListArray] of integer values to its closest
    /// floating point type: Int8/Int16/Int32 become Float32; Int64, UInt8 and
    /// UInt32 become Float64. Arrays whose values are already
    /// Float16/Float32/Float64 are returned unchanged; other value types are
    /// an error.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
271
272impl FixedSizeListArrayExt for FixedSizeListArray {
273    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
274        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
275        let values = Arc::new(values);
276
277        Self::try_new(field, list_size, values, None)
278    }
279
280    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
281        if n >= self.len() {
282            return Ok(self.clone());
283        }
284        let mut rng = SmallRng::from_os_rng();
285        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
286        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
287    }
288
289    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
290        match self.data_type() {
291            DataType::FixedSizeList(field, size) => match field.data_type() {
292                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
293                DataType::Int8 => Ok(Self::new(
294                    Arc::new(arrow_schema::Field::new(
295                        field.name(),
296                        DataType::Float32,
297                        field.is_nullable(),
298                    )),
299                    *size,
300                    Arc::new(Float32Array::from_iter_values(
301                        self.values()
302                            .as_any()
303                            .downcast_ref::<Int8Array>()
304                            .ok_or(ArrowError::ParseError(
305                                "Fail to cast primitive array to Int8Type".to_string(),
306                            ))?
307                            .into_iter()
308                            .filter_map(|x| x.map(|y| y as f32)),
309                    )),
310                    self.nulls().cloned(),
311                )),
312                DataType::Int16 => Ok(Self::new(
313                    Arc::new(arrow_schema::Field::new(
314                        field.name(),
315                        DataType::Float32,
316                        field.is_nullable(),
317                    )),
318                    *size,
319                    Arc::new(Float32Array::from_iter_values(
320                        self.values()
321                            .as_any()
322                            .downcast_ref::<Int16Array>()
323                            .ok_or(ArrowError::ParseError(
324                                "Fail to cast primitive array to Int16Type".to_string(),
325                            ))?
326                            .into_iter()
327                            .filter_map(|x| x.map(|y| y as f32)),
328                    )),
329                    self.nulls().cloned(),
330                )),
331                DataType::Int32 => Ok(Self::new(
332                    Arc::new(arrow_schema::Field::new(
333                        field.name(),
334                        DataType::Float32,
335                        field.is_nullable(),
336                    )),
337                    *size,
338                    Arc::new(Float32Array::from_iter_values(
339                        self.values()
340                            .as_any()
341                            .downcast_ref::<Int32Array>()
342                            .ok_or(ArrowError::ParseError(
343                                "Fail to cast primitive array to Int32Type".to_string(),
344                            ))?
345                            .into_iter()
346                            .filter_map(|x| x.map(|y| y as f32)),
347                    )),
348                    self.nulls().cloned(),
349                )),
350                DataType::Int64 => Ok(Self::new(
351                    Arc::new(arrow_schema::Field::new(
352                        field.name(),
353                        DataType::Float64,
354                        field.is_nullable(),
355                    )),
356                    *size,
357                    Arc::new(Float64Array::from_iter_values(
358                        self.values()
359                            .as_any()
360                            .downcast_ref::<Int64Array>()
361                            .ok_or(ArrowError::ParseError(
362                                "Fail to cast primitive array to Int64Type".to_string(),
363                            ))?
364                            .into_iter()
365                            .filter_map(|x| x.map(|y| y as f64)),
366                    )),
367                    self.nulls().cloned(),
368                )),
369                DataType::UInt8 => Ok(Self::new(
370                    Arc::new(arrow_schema::Field::new(
371                        field.name(),
372                        DataType::Float64,
373                        field.is_nullable(),
374                    )),
375                    *size,
376                    Arc::new(Float64Array::from_iter_values(
377                        self.values()
378                            .as_any()
379                            .downcast_ref::<UInt8Array>()
380                            .ok_or(ArrowError::ParseError(
381                                "Fail to cast primitive array to UInt8Type".to_string(),
382                            ))?
383                            .into_iter()
384                            .filter_map(|x| x.map(|y| y as f64)),
385                    )),
386                    self.nulls().cloned(),
387                )),
388                DataType::UInt32 => Ok(Self::new(
389                    Arc::new(arrow_schema::Field::new(
390                        field.name(),
391                        DataType::Float64,
392                        field.is_nullable(),
393                    )),
394                    *size,
395                    Arc::new(Float64Array::from_iter_values(
396                        self.values()
397                            .as_any()
398                            .downcast_ref::<UInt32Array>()
399                            .ok_or(ArrowError::ParseError(
400                                "Fail to cast primitive array to UInt32Type".to_string(),
401                            ))?
402                            .into_iter()
403                            .filter_map(|x| x.map(|y| y as f64)),
404                    )),
405                    self.nulls().cloned(),
406                )),
407                data_type => Err(ArrowError::ParseError(format!(
408                    "Expect either floating type or integer got {:?}",
409                    data_type
410                ))),
411            },
412            data_type => Err(ArrowError::ParseError(format!(
413                "Expect either FixedSizeList got {:?}",
414                data_type
415            ))),
416        }
417    }
418}
419
420/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
421/// [`FixedSizeListArray`], panic'ing on failure.
422pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
423    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
424}
425
/// Extension helpers for building [`FixedSizeBinaryArray`]s.
pub trait FixedSizeBinaryArrayExt {
    /// Create an [`FixedSizeBinaryArray`] from values and stride.
    ///
    /// The number of resulting values is `values.len() / stride` (integer
    /// division); trailing bytes that do not fill a complete `stride`-sized
    /// value are dropped.
    ///
    /// ```
    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
    /// use arrow_array::types::UInt8Type;
    /// use lance_arrow::FixedSizeBinaryArrayExt;
    ///
    /// let int_values = UInt8Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeBinaryArray::from(vec![
    ///         Some(vec![0, 1].as_slice()),
    ///         Some(vec![2, 3].as_slice()),
    ///         Some(vec![4, 5].as_slice()),
    ///         Some(vec![6, 7].as_slice()),
    ///         Some(vec![8, 9].as_slice())
    /// ]))
    /// ```
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
447
448impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
449    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
450        let data_type = DataType::FixedSizeBinary(stride);
451        let data = ArrayDataBuilder::new(data_type)
452            .len(values.len() / stride as usize)
453            .add_buffer(values.into_data().buffers()[0].clone())
454            .build()?;
455        Ok(Self::from(data))
456    }
457}
458
459pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
460    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
461}
462
463pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
464    match arr.data_type() {
465        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
466        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
467        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
468    }
469}
470
/// Extends Arrow's [RecordBatch].
pub trait RecordBatchExt {
    /// Append a new column to this [`RecordBatch`] and returns a new RecordBatch.
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let new_field = Field::new("s", DataType::Utf8, true);
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new RecordBatch with the column inserted at `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new [`RecordBatch`] from the provided [`StructArray`].
    ///
    /// The fields on the [`StructArray`] need to match this [`RecordBatch`] schema
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge with another [`RecordBatch`] and returns a new one.
    ///
    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
    /// name is found in the right then we merge the two columns.  If there is no match then
    /// we add the left column to the output.
    ///
    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
    /// Otherwise we use the left array.
    ///
    /// Afterwards we add all non-matching right columns to the output.
    ///
    /// Note: This method likely does not handle nested fields correctly and you may want to consider
    /// using [`Self::merge_with_schema`] instead.
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::*;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
    ///
    /// let new_record_batch = left.merge(&right).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    ///
    /// TODO: add merge nested fields support.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Create a batch by merging columns between two batches with a given schema.
    ///
    /// A reference schema is used to determine the proper ordering of nested fields.
    ///
    /// For each field in the reference schema we look for corresponding fields in
    /// the left and right batches.  If a field is found in both batches we recursively merge
    /// it.
    ///
    /// If a field is only in the left or right batch we take it as it is.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Drop one column specified with the name and return the new [`RecordBatch`].
    ///
    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace a column (specified by name) and return the new [`RecordBatch`].
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace a column schema (specified by name) and return the new [`RecordBatch`].
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename a column at a given index.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Get (potentially nested) column by qualified name, e.g. `"outer.inner"`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the schema over the [RecordBatch].
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// metadata of the schema.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add metadata to the schema.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata with the provided one.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take selected rows from the [RecordBatch].
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;

    /// Create a new RecordBatch with compacted memory after slicing.
    fn shrink_to_fit(&self) -> Result<RecordBatch>;

    /// Helper method to sort the RecordBatch by a column
    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
}
616
617impl RecordBatchExt for RecordBatch {
618    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
619        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
620        let mut new_columns = self.columns().to_vec();
621        new_columns.push(arr);
622        Self::try_new(new_schema, new_columns)
623    }
624
625    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
626        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
627        let mut new_columns = self.columns().to_vec();
628        new_columns.insert(index, arr);
629        Self::try_new(new_schema, new_columns)
630    }
631
632    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
633        let schema = Arc::new(Schema::new_with_metadata(
634            arr.fields().to_vec(),
635            self.schema().metadata.clone(),
636        ));
637        let batch = Self::from(arr);
638        batch.with_schema(schema)
639    }
640
641    fn merge(&self, other: &Self) -> Result<Self> {
642        if self.num_rows() != other.num_rows() {
643            return Err(ArrowError::InvalidArgumentError(format!(
644                "Attempt to merge two RecordBatch with different sizes: {} != {}",
645                self.num_rows(),
646                other.num_rows()
647            )));
648        }
649        let left_struct_array: StructArray = self.clone().into();
650        let right_struct_array: StructArray = other.clone().into();
651        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
652    }
653
654    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
655        if self.num_rows() != other.num_rows() {
656            return Err(ArrowError::InvalidArgumentError(format!(
657                "Attempt to merge two RecordBatch with different sizes: {} != {}",
658                self.num_rows(),
659                other.num_rows()
660            )));
661        }
662        let left_struct_array: StructArray = self.clone().into();
663        let right_struct_array: StructArray = other.clone().into();
664        self.try_new_from_struct_array(merge_with_schema(
665            &left_struct_array,
666            &right_struct_array,
667            schema.fields(),
668        ))
669    }
670
671    fn drop_column(&self, name: &str) -> Result<Self> {
672        let mut fields = vec![];
673        let mut columns = vec![];
674        for i in 0..self.schema().fields.len() {
675            if self.schema().field(i).name() != name {
676                fields.push(self.schema().field(i).clone());
677                columns.push(self.column(i).clone());
678            }
679        }
680        Self::try_new(
681            Arc::new(Schema::new_with_metadata(
682                fields,
683                self.schema().metadata().clone(),
684            )),
685            columns,
686        )
687    }
688
689    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
690        let mut fields = self.schema().fields().to_vec();
691        if index >= fields.len() {
692            return Err(ArrowError::InvalidArgumentError(format!(
693                "Index out of bounds: {}",
694                index
695            )));
696        }
697        fields[index] = Arc::new(Field::new(
698            new_name,
699            fields[index].data_type().clone(),
700            fields[index].is_nullable(),
701        ));
702        Self::try_new(
703            Arc::new(Schema::new_with_metadata(
704                fields,
705                self.schema().metadata().clone(),
706            )),
707            self.columns().to_vec(),
708        )
709    }
710
711    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
712        let mut columns = self.columns().to_vec();
713        let field_i = self
714            .schema()
715            .fields()
716            .iter()
717            .position(|f| f.name() == name)
718            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
719        columns[field_i] = column;
720        Self::try_new(self.schema(), columns)
721    }
722
723    fn replace_column_schema_by_name(
724        &self,
725        name: &str,
726        new_data_type: DataType,
727        column: Arc<dyn Array>,
728    ) -> Result<RecordBatch> {
729        let fields = self
730            .schema()
731            .fields()
732            .iter()
733            .map(|x| {
734                if x.name() != name {
735                    x.clone()
736                } else {
737                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
738                    Arc::new(new_field)
739                }
740            })
741            .collect::<Vec<_>>();
742        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
743        let mut columns = self.columns().to_vec();
744        let field_i = self
745            .schema()
746            .fields()
747            .iter()
748            .position(|f| f.name() == name)
749            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
750        columns[field_i] = column;
751        Self::try_new(Arc::new(schema), columns)
752    }
753
754    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
755        let split = name.split('.').collect::<Vec<_>>();
756        if split.is_empty() {
757            return None;
758        }
759
760        self.column_by_name(split[0])
761            .and_then(|arr| get_sub_array(arr, &split[1..]))
762    }
763
764    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
765        let struct_array: StructArray = self.clone().into();
766        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
767    }
768
769    fn metadata(&self) -> &HashMap<String, String> {
770        self.schema_ref().metadata()
771    }
772
773    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
774        let mut schema = self.schema_ref().as_ref().clone();
775        schema.metadata = metadata;
776        Self::try_new(schema.into(), self.columns().into())
777    }
778
779    fn take(&self, indices: &UInt32Array) -> Result<Self> {
780        let struct_array: StructArray = self.clone().into();
781        let taken = take(&struct_array, indices, None)?;
782        self.try_new_from_struct_array(taken.as_struct().clone())
783    }
784
785    fn shrink_to_fit(&self) -> Result<Self> {
786        // Deep copy the sliced record batch, instead of whole batch
787        crate::deepcopy::deep_copy_batch_sliced(self)
788    }
789
790    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
791        if column >= self.num_columns() {
792            return Err(ArrowError::InvalidArgumentError(format!(
793                "Column index out of bounds: {}",
794                column
795            )));
796        }
797        let column = self.column(column);
798        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
799        self.take(&sorted)
800    }
801}
802
803/// Recursively projects an array to match the target field's structure.
804/// This handles reordering fields inside nested List<Struct> types.
805fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
806    match target_field.data_type() {
807        DataType::Struct(subfields) => {
808            let struct_arr = array.as_struct();
809            let projected = project(struct_arr, subfields)?;
810            Ok(Arc::new(projected))
811        }
812        DataType::List(inner_field) => {
813            let list_arr: &ListArray = array.as_list();
814            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
815            Ok(Arc::new(ListArray::new(
816                inner_field.clone(),
817                list_arr.offsets().clone(),
818                projected_values,
819                list_arr.nulls().cloned(),
820            )))
821        }
822        DataType::LargeList(inner_field) => {
823            let list_arr: &LargeListArray = array.as_list();
824            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
825            Ok(Arc::new(LargeListArray::new(
826                inner_field.clone(),
827                list_arr.offsets().clone(),
828                projected_values,
829                list_arr.nulls().cloned(),
830            )))
831        }
832        DataType::FixedSizeList(inner_field, size) => {
833            let list_arr = array.as_fixed_size_list();
834            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
835            Ok(Arc::new(FixedSizeListArray::new(
836                inner_field.clone(),
837                *size,
838                projected_values,
839                list_arr.nulls().cloned(),
840            )))
841        }
842        _ => Ok(array.clone()),
843    }
844}
845
846fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
847    if fields.is_empty() {
848        return Ok(StructArray::new_empty_fields(
849            struct_array.len(),
850            struct_array.nulls().cloned(),
851        ));
852    }
853    let mut columns: Vec<ArrayRef> = vec![];
854    for field in fields.iter() {
855        if let Some(col) = struct_array.column_by_name(field.name()) {
856            let projected = project_array(col, field.as_ref())?;
857            columns.push(projected);
858        } else {
859            return Err(ArrowError::SchemaError(format!(
860                "field {} does not exist in the RecordBatch",
861                field.name()
862            )));
863        }
864    }
865    // Preserve the struct's validity when projecting
866    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
867}
868
869fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
870    let left_list: &GenericListArray<T> = left.as_list();
871    let right_list: &GenericListArray<T> = right.as_list();
872    left_list.offsets().inner() == right_list.offsets().inner()
873}
874
875fn merge_list_structs_helper<T: OffsetSizeTrait>(
876    left: &dyn Array,
877    right: &dyn Array,
878    items_field_name: impl Into<String>,
879    items_nullable: bool,
880) -> Arc<dyn Array> {
881    let left_list: &GenericListArray<T> = left.as_list();
882    let right_list: &GenericListArray<T> = right.as_list();
883    let left_struct = left_list.values();
884    let right_struct = right_list.values();
885    let left_struct_arr = left_struct.as_struct();
886    let right_struct_arr = right_struct.as_struct();
887    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
888    let items_field = Arc::new(Field::new(
889        items_field_name,
890        merged_items.data_type().clone(),
891        items_nullable,
892    ));
893    Arc::new(GenericListArray::<T>::new(
894        items_field,
895        left_list.offsets().clone(),
896        merged_items,
897        left_list.nulls().cloned(),
898    ))
899}
900
901fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
902    left: &dyn Array,
903    right: &dyn Array,
904    not_null: &dyn Array,
905    items_field_name: impl Into<String>,
906) -> Arc<dyn Array> {
907    let left_list: &GenericListArray<T> = left.as_list::<T>();
908    let not_null_list = not_null.as_list::<T>();
909    let right_list = right.as_list::<T>();
910
911    let left_struct = left_list.values().as_struct();
912    let not_null_struct: &StructArray = not_null_list.values().as_struct();
913    let right_struct = right_list.values().as_struct();
914
915    let values_len = not_null_list.values().len();
916    let mut merged_fields =
917        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
918    let mut merged_columns =
919        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
920
921    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
922        merged_fields.push(field.clone());
923        if let Some(val) = not_null_struct.column_by_name(field.name()) {
924            merged_columns.push(val.clone());
925        } else {
926            merged_columns.push(new_null_array(field.data_type(), values_len))
927        }
928    }
929    for (_, field) in right_struct
930        .columns()
931        .iter()
932        .zip(right_struct.fields())
933        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
934    {
935        merged_fields.push(field.clone());
936        if let Some(val) = not_null_struct.column_by_name(field.name()) {
937            merged_columns.push(val.clone());
938        } else {
939            merged_columns.push(new_null_array(field.data_type(), values_len));
940        }
941    }
942
943    let merged_struct = Arc::new(StructArray::new(
944        Fields::from(merged_fields),
945        merged_columns,
946        not_null_struct.nulls().cloned(),
947    ));
948    let items_field = Arc::new(Field::new(
949        items_field_name,
950        merged_struct.data_type().clone(),
951        true,
952    ));
953    Arc::new(GenericListArray::<T>::new(
954        items_field,
955        not_null_list.offsets().clone(),
956        merged_struct,
957        not_null_list.nulls().cloned(),
958    ))
959}
960
961fn merge_list_struct_null(
962    left: &dyn Array,
963    right: &dyn Array,
964    not_null: &dyn Array,
965) -> Arc<dyn Array> {
966    match left.data_type() {
967        DataType::List(left_field) => {
968            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
969        }
970        DataType::LargeList(left_field) => {
971            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
972        }
973        _ => unreachable!(),
974    }
975}
976
977fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
978    // Merging fields into a list<struct<...>> is tricky and can only succeed
979    // in two ways.  First, if both lists have the same offsets.  Second, if
980    // one of the lists is all-null
981    if left.null_count() == left.len() {
982        return merge_list_struct_null(left, right, right);
983    } else if right.null_count() == right.len() {
984        return merge_list_struct_null(left, right, left);
985    }
986    match (left.data_type(), right.data_type()) {
987        (DataType::List(left_field), DataType::List(_)) => {
988            if !lists_have_same_offsets_helper::<i32>(left, right) {
989                panic!("Attempt to merge list struct arrays which do not have same offsets");
990            }
991            merge_list_structs_helper::<i32>(
992                left,
993                right,
994                left_field.name(),
995                left_field.is_nullable(),
996            )
997        }
998        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
999            if !lists_have_same_offsets_helper::<i64>(left, right) {
1000                panic!("Attempt to merge list struct arrays which do not have same offsets");
1001            }
1002            merge_list_structs_helper::<i64>(
1003                left,
1004                right,
1005                left_field.name(),
1006                left_field.is_nullable(),
1007            )
1008        }
1009        _ => unreachable!(),
1010    }
1011}
1012
1013/// Helper function to normalize validity buffers
1014/// Returns None for all-null validity (placeholder structs)
1015fn normalize_validity(
1016    validity: Option<&arrow_buffer::NullBuffer>,
1017) -> Option<&arrow_buffer::NullBuffer> {
1018    validity.and_then(|v| {
1019        if v.null_count() == v.len() {
1020            None
1021        } else {
1022            Some(v)
1023        }
1024    })
1025}
1026
1027/// Helper function to merge validity buffers from two struct arrays
1028/// Returns None only if both arrays are null at the same position
1029///
1030/// Special handling for placeholder structs (all-null validity)
1031fn merge_struct_validity(
1032    left_validity: Option<&arrow_buffer::NullBuffer>,
1033    right_validity: Option<&arrow_buffer::NullBuffer>,
1034) -> Option<arrow_buffer::NullBuffer> {
1035    // Normalize both validity buffers (convert all-null to None)
1036    let left_normalized = normalize_validity(left_validity);
1037    let right_normalized = normalize_validity(right_validity);
1038
1039    match (left_normalized, right_normalized) {
1040        // Fast paths: no computation needed
1041        (None, None) => None,
1042        (Some(left), None) => Some(left.clone()),
1043        (None, Some(right)) => Some(right.clone()),
1044        (Some(left), Some(right)) => {
1045            // Fast path: if both have no nulls, can return either one
1046            if left.null_count() == 0 && right.null_count() == 0 {
1047                return Some(left.clone());
1048            }
1049
1050            let left_buffer = left.inner();
1051            let right_buffer = right.inner();
1052
1053            // Perform bitwise OR directly on BooleanBuffers
1054            // This preserves the correct semantics: 1 = valid, 0 = null
1055            let merged_buffer = left_buffer | right_buffer;
1056
1057            Some(arrow_buffer::NullBuffer::from(merged_buffer))
1058        }
1059    }
1060}
1061
1062fn merge_list_child_values(
1063    child_field: &Field,
1064    left_values: ArrayRef,
1065    right_values: ArrayRef,
1066) -> ArrayRef {
1067    match child_field.data_type() {
1068        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1069            left_values.as_struct(),
1070            right_values.as_struct(),
1071            child_fields,
1072        )) as ArrayRef,
1073        DataType::List(grandchild) => {
1074            let left_list = left_values
1075                .as_any()
1076                .downcast_ref::<ListArray>()
1077                .expect("left list values should be ListArray");
1078            let right_list = right_values
1079                .as_any()
1080                .downcast_ref::<ListArray>()
1081                .expect("right list values should be ListArray");
1082            let merged_values = merge_list_child_values(
1083                grandchild.as_ref(),
1084                left_list.values().clone(),
1085                right_list.values().clone(),
1086            );
1087            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1088            Arc::new(ListArray::new(
1089                grandchild.clone(),
1090                left_list.offsets().clone(),
1091                merged_values,
1092                merged_validity,
1093            )) as ArrayRef
1094        }
1095        DataType::LargeList(grandchild) => {
1096            let left_list = left_values
1097                .as_any()
1098                .downcast_ref::<LargeListArray>()
1099                .expect("left list values should be LargeListArray");
1100            let right_list = right_values
1101                .as_any()
1102                .downcast_ref::<LargeListArray>()
1103                .expect("right list values should be LargeListArray");
1104            let merged_values = merge_list_child_values(
1105                grandchild.as_ref(),
1106                left_list.values().clone(),
1107                right_list.values().clone(),
1108            );
1109            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1110            Arc::new(LargeListArray::new(
1111                grandchild.clone(),
1112                left_list.offsets().clone(),
1113                merged_values,
1114                merged_validity,
1115            )) as ArrayRef
1116        }
1117        DataType::FixedSizeList(grandchild, list_size) => {
1118            let left_list = left_values
1119                .as_any()
1120                .downcast_ref::<FixedSizeListArray>()
1121                .expect("left list values should be FixedSizeListArray");
1122            let right_list = right_values
1123                .as_any()
1124                .downcast_ref::<FixedSizeListArray>()
1125                .expect("right list values should be FixedSizeListArray");
1126            let merged_values = merge_list_child_values(
1127                grandchild.as_ref(),
1128                left_list.values().clone(),
1129                right_list.values().clone(),
1130            );
1131            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1132            Arc::new(FixedSizeListArray::new(
1133                grandchild.clone(),
1134                *list_size,
1135                merged_values,
1136                merged_validity,
1137            )) as ArrayRef
1138        }
1139        _ => left_values.clone(),
1140    }
1141}
1142
1143// Helper function to adjust child array validity based on parent struct validity
1144// When parent struct is null, propagates null to child array
1145// Optimized with fast paths and SIMD operations
1146fn adjust_child_validity(
1147    child: &ArrayRef,
1148    parent_validity: Option<&arrow_buffer::NullBuffer>,
1149) -> ArrayRef {
1150    // Fast path: no parent validity means no adjustment needed
1151    let parent_validity = match parent_validity {
1152        None => return child.clone(),
1153        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
1154        Some(p) => p,
1155    };
1156
1157    let child_validity = child.nulls();
1158
1159    // Compute the new validity: child_validity AND parent_validity
1160    let new_validity = match child_validity {
1161        None => {
1162            // Fast path: child has no existing validity, just use parent's
1163            parent_validity.clone()
1164        }
1165        Some(child_nulls) => {
1166            let child_buffer = child_nulls.inner();
1167            let parent_buffer = parent_validity.inner();
1168
1169            // Perform bitwise AND directly on BooleanBuffers
1170            // This preserves the correct semantics: 1 = valid, 0 = null
1171            let merged_buffer = child_buffer & parent_buffer;
1172
1173            arrow_buffer::NullBuffer::from(merged_buffer)
1174        }
1175    };
1176
1177    // Create new array with adjusted validity
1178    arrow_array::make_array(
1179        arrow_data::ArrayData::try_new(
1180            child.data_type().clone(),
1181            child.len(),
1182            Some(new_validity.into_inner().into_inner()),
1183            child.offset(),
1184            child.to_data().buffers().to_vec(),
1185            child.to_data().child_data().to_vec(),
1186        )
1187        .unwrap(),
1188    )
1189}
1190
1191fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1192    let mut fields: Vec<Field> = vec![];
1193    let mut columns: Vec<ArrayRef> = vec![];
1194    let right_fields = right_struct_array.fields();
1195    let right_columns = right_struct_array.columns();
1196
1197    // Get the validity buffers from both structs
1198    let left_validity = left_struct_array.nulls();
1199    let right_validity = right_struct_array.nulls();
1200
1201    // Compute merged validity
1202    let merged_validity = merge_struct_validity(left_validity, right_validity);
1203
1204    // iterate through the fields on the left hand side
1205    for (left_field, left_column) in left_struct_array
1206        .fields()
1207        .iter()
1208        .zip(left_struct_array.columns().iter())
1209    {
1210        match right_fields
1211            .iter()
1212            .position(|f| f.name() == left_field.name())
1213        {
1214            // if the field exists on the right hand side, merge them recursively if appropriate
1215            Some(right_index) => {
1216                let right_field = right_fields.get(right_index).unwrap();
1217                let right_column = right_columns.get(right_index).unwrap();
1218                // if both fields are struct, merge them recursively
1219                match (left_field.data_type(), right_field.data_type()) {
1220                    (DataType::Struct(_), DataType::Struct(_)) => {
1221                        let left_sub_array = left_column.as_struct();
1222                        let right_sub_array = right_column.as_struct();
1223                        let merged_sub_array = merge(left_sub_array, right_sub_array);
1224                        fields.push(Field::new(
1225                            left_field.name(),
1226                            merged_sub_array.data_type().clone(),
1227                            left_field.is_nullable(),
1228                        ));
1229                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1230                    }
1231                    (DataType::List(left_list), DataType::List(right_list))
1232                        if left_list.data_type().is_struct()
1233                            && right_list.data_type().is_struct() =>
1234                    {
1235                        // If there is nothing to merge just use the left field
1236                        if left_list.data_type() == right_list.data_type() {
1237                            fields.push(left_field.as_ref().clone());
1238                            columns.push(left_column.clone());
1239                        }
1240                        // If we have two List<Struct> and they have different sets of fields then
1241                        // we can merge them if the offsets arrays are the same.  Otherwise, we
1242                        // have to consider it an error.
1243                        let merged_sub_array = merge_list_struct(&left_column, &right_column);
1244
1245                        fields.push(Field::new(
1246                            left_field.name(),
1247                            merged_sub_array.data_type().clone(),
1248                            left_field.is_nullable(),
1249                        ));
1250                        columns.push(merged_sub_array);
1251                    }
1252                    // otherwise, just use the field on the left hand side
1253                    _ => {
1254                        // TODO handle list-of-struct and other types
1255                        fields.push(left_field.as_ref().clone());
1256                        // Adjust the column validity: if left struct was null, propagate to child
1257                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1258                        columns.push(adjusted_column);
1259                    }
1260                }
1261            }
1262            None => {
1263                fields.push(left_field.as_ref().clone());
1264                // Adjust the column validity: if left struct was null, propagate to child
1265                let adjusted_column = adjust_child_validity(left_column, left_validity);
1266                columns.push(adjusted_column);
1267            }
1268        }
1269    }
1270
1271    // now iterate through the fields on the right hand side
1272    right_fields
1273        .iter()
1274        .zip(right_columns.iter())
1275        .for_each(|(field, column)| {
1276            // add new columns on the right
1277            if !left_struct_array
1278                .fields()
1279                .iter()
1280                .any(|f| f.name() == field.name())
1281            {
1282                fields.push(field.as_ref().clone());
1283                // This field doesn't exist on the left
1284                // We use the right's column but need to adjust for struct validity
1285                let adjusted_column = adjust_child_validity(column, right_validity);
1286                columns.push(adjusted_column);
1287            }
1288        });
1289
1290    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1291}
1292
/// Merge two struct arrays, keeping only (and exactly) the fields listed in
/// `fields`.
///
/// For each requested field:
/// * present on one side only -> that side's column is used, with the parent
///   struct's nulls pushed down into it
/// * present on both sides -> struct / list-of-struct children are merged
///   recursively; any other type takes the left column
/// * present on neither side -> the field is dropped from the output
///
/// The output struct's validity is the OR of both inputs' validity buffers.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Helper function that returns true if both types are struct or both are non-struct
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    // Get the validity buffers from both structs
    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    // Compute merged validity
    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        // A side "matches" only when the name matches AND struct-ness agrees;
        // a same-named field of a different kind is ignored on that side.
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                // Adjust validity if the right struct was null
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                // Adjust validity if the left struct was null
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                match field.data_type() {
                    DataType::Struct(child_fields) => {
                        // Recurse into the struct children with the requested sub-schema
                        let left_sub_array = left_columns[left_idx].as_struct();
                        let right_sub_array = right_columns[right_idx].as_struct();
                        let merged_sub_array =
                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
                        output_fields.push(Field::new(
                            field.name(),
                            merged_sub_array.data_type().clone(),
                            field.is_nullable(),
                        ));
                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
                    }
                    DataType::List(child_field) => {
                        // Merge the item values of both lists.  The left list's
                        // offsets are reused for the result.
                        // NOTE(review): this assumes both sides have matching
                        // offsets — confirm with callers.
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<ListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = ListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::LargeList(child_field) => {
                        // Same as the List branch, with 64-bit offsets.
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<LargeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.trimmed_values(),
                            right_list.trimmed_values(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = LargeListArray::new(
                            child_field.clone(),
                            left_list.offsets().clone(),
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    DataType::FixedSizeList(child_field, list_size) => {
                        // Fixed-size lists have no offsets, so the raw values
                        // are merged directly (no trimming).
                        let left_list = left_columns[left_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let right_list = right_columns[right_idx]
                            .as_any()
                            .downcast_ref::<FixedSizeListArray>()
                            .unwrap();
                        let merged_values = merge_list_child_values(
                            child_field.as_ref(),
                            left_list.values().clone(),
                            right_list.values().clone(),
                        );
                        let merged_validity =
                            merge_struct_validity(left_list.nulls(), right_list.nulls());
                        let merged_list = FixedSizeListArray::new(
                            child_field.clone(),
                            *list_size,
                            merged_values,
                            merged_validity,
                        );
                        output_fields.push(field.as_ref().clone());
                        columns.push(Arc::new(merged_list) as ArrayRef);
                    }
                    _ => {
                        output_fields.push(left_fields[left_idx].as_ref().clone());
                        // For fields that exist in both, use left but adjust validity
                        let adjusted_column =
                            adjust_child_validity(&left_columns[left_idx], left_validity);
                        columns.push(adjusted_column);
                    }
                }
            }
            (None, None) => {
                // The field will not be included in the output
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}
1451
1452fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1453    if components.is_empty() {
1454        return Some(array);
1455    }
1456    if !matches!(array.data_type(), DataType::Struct(_)) {
1457        return None;
1458    }
1459    let struct_arr = array.as_struct();
1460    struct_arr
1461        .column_by_name(components[0])
1462        .and_then(|arr| get_sub_array(arr, &components[1..]))
1463}
1464
1465/// Interleave multiple RecordBatches into a single RecordBatch.
1466///
1467/// Behaves like [`arrow_select::interleave::interleave`], but for RecordBatches.
1468pub fn interleave_batches(
1469    batches: &[RecordBatch],
1470    indices: &[(usize, usize)],
1471) -> Result<RecordBatch> {
1472    let first_batch = batches.first().ok_or_else(|| {
1473        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1474    })?;
1475    let schema = first_batch.schema();
1476    let num_columns = first_batch.num_columns();
1477    let mut columns = Vec::with_capacity(num_columns);
1478    let mut chunks = Vec::with_capacity(batches.len());
1479
1480    for i in 0..num_columns {
1481        for batch in batches {
1482            chunks.push(batch.column(i).as_ref());
1483        }
1484        let new_column = interleave(&chunks, indices)?;
1485        columns.push(new_column);
1486        chunks.clear();
1487    }
1488
1489    RecordBatch::try_new(schema, columns)
1490}
1491
/// Extra constructors for [`arrow_buffer::Buffer`] working with `bytes::Bytes`.
pub trait BufferExt {
    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made and an owned buffer returned.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
    ///
    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
    ///
    /// `size_bytes` can be larger than `bytes` and, if so, the trailing bytes will
    /// be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `size_bytes` is less than `bytes.len()`
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1519
/// Returns true if `n` is a power of two.
///
/// Uses the standard library predicate, which correctly reports `false` for
/// zero.  The previous `n & (n - 1) == 0` trick underflowed for `n == 0`
/// (panicking in debug builds and wrongly returning `true` in release).
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1523
1524impl BufferExt for arrow_buffer::Buffer {
1525    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
1526        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
1527        {
1528            // The original buffer is not aligned, cannot zero-copy
1529            let size_bytes = bytes.len();
1530            Self::copy_bytes_bytes(bytes, size_bytes)
1531        } else {
1532            // The original buffer is aligned, can zero-copy
1533            // SAFETY: the alignment is correct we can make this conversion
1534            unsafe {
1535                Self::from_custom_allocation(
1536                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
1537                    bytes.len(),
1538                    Arc::new(bytes),
1539                )
1540            }
1541        }
1542    }
1543
1544    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
1545        assert!(size_bytes >= bytes.len());
1546        let mut buf = MutableBuffer::with_capacity(size_bytes);
1547        let to_fill = size_bytes - bytes.len();
1548        buf.extend(bytes);
1549        buf.extend(std::iter::repeat_n(0_u8, to_fill));
1550
1551        // FIX for issue #4512: Shrink buffer to actual size before converting to immutable
1552        // This reduces memory overhead from capacity over-allocation
1553        buf.shrink_to_fit();
1554
1555        Self::from(buf)
1556    }
1557}
1558
1559#[cfg(test)]
1560mod tests {
1561    use super::*;
1562    use arrow_array::{Float32Array, Int32Array, StructArray};
1563    use arrow_array::{ListArray, StringArray, new_empty_array, new_null_array};
1564    use arrow_buffer::OffsetBuffer;
1565
    #[test]
    fn test_merge_recursive() {
        // `merge` should zip struct fields recursively: the left batch has
        // columns {a, b.{c}} and the right has {e, b.{d}}; the merged batch
        // must combine the shared struct "b" into b.{c, d} and append "e".
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        // Left batch: {a: Int32, b: Struct<c: Int32>}
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        // Right batch: {e: Int32, b: Struct<d: Utf8>}
        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected result: {a, b: Struct<c, d>, e} — left-batch column order
        // first, with right-only columns appended.
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }
1650
    #[test]
    fn test_merge_with_schema() {
        // Builds a one-column batch whose single "struct" column wraps empty
        // arrays of the given types — only the schema/field order matters here.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // If we use merge_with_schema the schema is respected
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // If we use merge we get first-come first-serve based on the left batch
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }
1693
    #[test]
    fn test_merge_list_struct() {
        // Merging two List<Struct> columns with the same name should merge the
        // inner struct items element-wise (x + y -> {x, y}).  Also checks that
        // a fully-null right-hand list column contributes null child values.
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        // Single-row lists: left holds {x: 1}, right holds {y: 2}.
        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        // Expected items when the right side is an all-null list: y is null.
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        // Case 1: both sides populated -> items merged into {x, y}.
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // Case 2: right side is an all-null list -> merged y values are null.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }
1782
1783    #[test]
1784    fn test_byte_width_opt() {
1785        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1786        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1787        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1788        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1789        assert_eq!(DataType::Utf8.byte_width_opt(), None);
1790        assert_eq!(DataType::Binary.byte_width_opt(), None);
1791        assert_eq!(
1792            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1793            None
1794        );
1795        assert_eq!(
1796            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1797                .byte_width_opt(),
1798            Some(12)
1799        );
1800        assert_eq!(
1801            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1802                .byte_width_opt(),
1803            Some(16)
1804        );
1805        assert_eq!(
1806            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1807                .byte_width_opt(),
1808            None
1809        );
1810    }
1811
    #[test]
    fn test_take_record_batch() {
        // `take` on a RecordBatch should gather the requested row indices
        // from every column.
        let schema = Arc::new(Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new("b", DataType::Utf8, true),
        ]));
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();
        // Take rows 1, 5 and 10 (the `into()` builds a UInt32Array of indices).
        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
        assert_eq!(
            taken,
            RecordBatch::try_new(
                schema,
                vec![
                    Arc::new(Int32Array::from(vec![1, 5, 10])),
                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
                ],
            )
            .unwrap()
        )
    }
1841
    #[test]
    fn test_schema_project_by_schema() {
        // `project_by_schema` selects/reorders columns by name and is expected
        // to carry the source batch's schema metadata into the result.
        let metadata = [("key".to_string(), "value".to_string())];
        let schema = Arc::new(
            Schema::new(vec![
                Field::new("a", DataType::Int32, true),
                Field::new("b", DataType::Utf8, true),
            ])
            .with_metadata(metadata.clone().into()),
        );
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int32Array::from_iter_values(0..20)),
                Arc::new(StringArray::from_iter_values(
                    (0..20).map(|i| format!("str-{}", i)),
                )),
            ],
        )
        .unwrap();

        // Empty schema
        // Projecting to no columns keeps the row count and the metadata.
        let empty_schema = Schema::empty();
        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
        assert_eq!(
            empty_projected,
            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
                .with_schema(Arc::new(expected_schema))
                .unwrap()
        );

        // Re-ordered schema
        let reordered_schema = Schema::new(vec![
            Field::new("b", DataType::Utf8, true),
            Field::new("a", DataType::Int32, true),
        ]);
        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
        assert_eq!(
            reordered_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![
                    Arc::new(StringArray::from_iter_values(
                        (0..20).map(|i| format!("str-{}", i)),
                    )),
                    Arc::new(Int32Array::from_iter_values(0..20)),
                ],
            )
            .unwrap()
        );

        // Sub schema
        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
        assert_eq!(
            sub_projected,
            RecordBatch::try_new(
                expected_schema,
                vec![Arc::new(Int32Array::from_iter_values(0..20))],
            )
            .unwrap()
        );
    }
1908
    #[test]
    fn test_project_preserves_struct_validity() {
        // Test that projecting a struct array preserves its validity (fix for issue #4385)
        let fields = Fields::from(vec![
            Field::new("id", DataType::Int32, false),
            Field::new("value", DataType::Float32, true),
        ]);

        // Create a struct array with validity
        let id_array = Int32Array::from(vec![1, 2, 3]);
        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
        let struct_array = StructArray::new(
            fields.clone(),
            vec![
                Arc::new(id_array) as ArrayRef,
                Arc::new(value_array) as ArrayRef,
            ],
            Some(vec![true, false, true].into()), // Second struct is null
        );

        // Project the struct array onto the same set of fields; the top-level
        // null buffer must survive the projection.
        let projected = project(&struct_array, &fields).unwrap();

        // Verify the validity is preserved
        assert_eq!(projected.null_count(), 1);
        assert!(!projected.is_null(0));
        assert!(projected.is_null(1));
        assert!(!projected.is_null(2));
    }
1938
    #[test]
    fn test_merge_struct_with_different_validity() {
        // Test case from Weston's review comment
        // Merging two structs with different null masks: the merged struct is
        // null only where BOTH inputs were null, and a side's child values are
        // null wherever that side's struct was null (asserted below).
        // File 1 has height field with some nulls
        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
        let left_struct = StructArray::new(
            left_fields,
            vec![Arc::new(height_array) as ArrayRef],
            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
        );

        // File 2 has width field with some nulls
        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
        let right_struct = StructArray::new(
            right_fields,
            vec![Arc::new(width_array) as ArrayRef],
            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
        );

        // Merge the two structs
        let merged = merge(&left_struct, &right_struct);

        // Expected:
        // Row 1: both non-null -> {width: 300, height: 500}
        // Row 2: left null, right non-null -> {width: 200, height: null}
        // Row 3: left non-null, right null -> {width: null, height: 600}
        // Row 4: both null -> null struct

        assert_eq!(merged.null_count(), 1); // Only row 4 is null
        assert!(!merged.is_null(0));
        assert!(!merged.is_null(1));
        assert!(!merged.is_null(2));
        assert!(merged.is_null(3));

        // Check field values
        let height_col = merged.column_by_name("height").unwrap();
        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(height_values.value(0), 500);
        assert!(height_values.is_null(1)); // height is null when left struct was null
        assert_eq!(height_values.value(2), 600);

        let width_col = merged.column_by_name("width").unwrap();
        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
        assert_eq!(width_values.value(0), 300);
        assert_eq!(width_values.value(1), 200);
        assert!(width_values.is_null(2)); // width is null when right struct was null
    }
1988
    #[test]
    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
        // Regression test: `merge_with_schema` must handle List<Struct> columns
        // whose inner structs have different field sets on each side — fields
        // missing from one side should come through as null values.
        // left_list setup
        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
        let left_count = Arc::new(Int32Array::from(vec![None, None]));
        let left_struct = Arc::new(StructArray::new(
            Fields::from(vec![
                Field::new("company_id", DataType::Int32, true),
                Field::new("count", DataType::Int32, true),
            ]),
            vec![left_company_id, left_count],
            None,
        ));
        let left_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(left_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            left_struct,
            None,
        ));

        // Right List Setup
        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
        let right_struct = Arc::new(StructArray::new(
            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
            vec![right_company_name],
            None,
        ));
        let right_list = Arc::new(ListArray::new(
            Arc::new(Field::new(
                "item",
                DataType::Struct(right_struct.fields().clone()),
                true,
            )),
            OffsetBuffer::from_lengths([2]),
            right_struct,
            None,
        ));

        // Target: the union of both sides' item fields.
        let target_fields = Fields::from(vec![Field::new(
            "companies",
            DataType::List(Arc::new(Field::new(
                "item",
                DataType::Struct(Fields::from(vec![
                    Field::new("company_id", DataType::Int32, true),
                    Field::new("company_name", DataType::Utf8, true),
                    Field::new("count", DataType::Int32, true),
                ])),
                true,
            ))),
            true,
        )]);

        let left_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                left_list.data_type().clone(),
                true,
            )])),
            vec![left_list as ArrayRef],
        )
        .unwrap();

        let right_batch = RecordBatch::try_new(
            Arc::new(Schema::new(vec![Field::new(
                "companies",
                right_list.data_type().clone(),
                true,
            )])),
            vec![right_list as ArrayRef],
        )
        .unwrap();

        let merged = left_batch
            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
            .unwrap();

        // Verify the merged structure
        let merged_list = merged
            .column_by_name("companies")
            .unwrap()
            .as_any()
            .downcast_ref::<ListArray>()
            .unwrap();
        let merged_struct = merged_list.values().as_struct();

        // Should have all 3 fields
        assert_eq!(merged_struct.num_columns(), 3);
        assert!(merged_struct.column_by_name("company_id").is_some());
        assert!(merged_struct.column_by_name("company_name").is_some());
        assert!(merged_struct.column_by_name("count").is_some());

        // Verify values
        let company_id = merged_struct
            .column_by_name("company_id")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(company_id.is_null(0));
        assert!(company_id.is_null(1));

        let company_name = merged_struct
            .column_by_name("company_name")
            .unwrap()
            .as_any()
            .downcast_ref::<StringArray>()
            .unwrap();
        assert_eq!(company_name.value(0), "Google");
        assert_eq!(company_name.value(1), "Microsoft");

        let count = merged_struct
            .column_by_name("count")
            .unwrap()
            .as_any()
            .downcast_ref::<Int32Array>()
            .unwrap();
        assert!(count.is_null(0));
        assert!(count.is_null(1));
    }
2112
    #[test]
    fn test_merge_struct_lists() {
        // Exercises List<Struct> merging with 32-bit list offsets.
        test_merge_struct_lists_generic::<i32>();
    }
2117
    #[test]
    fn test_merge_struct_large_lists() {
        // Exercises LargeList<Struct> merging with 64-bit list offsets.
        test_merge_struct_lists_generic::<i64>();
    }
2122
2123    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2124        // left_list setup
2125        let left_company_id = Arc::new(Int32Array::from(vec![
2126            Some(1),
2127            Some(2),
2128            Some(3),
2129            Some(4),
2130            Some(5),
2131            Some(6),
2132            Some(7),
2133            Some(8),
2134            Some(9),
2135            Some(10),
2136            Some(11),
2137            Some(12),
2138            Some(13),
2139            Some(14),
2140            Some(15),
2141            Some(16),
2142            Some(17),
2143            Some(18),
2144            Some(19),
2145            Some(20),
2146        ]));
2147        let left_count = Arc::new(Int32Array::from(vec![
2148            Some(10),
2149            Some(20),
2150            Some(30),
2151            Some(40),
2152            Some(50),
2153            Some(60),
2154            Some(70),
2155            Some(80),
2156            Some(90),
2157            Some(100),
2158            Some(110),
2159            Some(120),
2160            Some(130),
2161            Some(140),
2162            Some(150),
2163            Some(160),
2164            Some(170),
2165            Some(180),
2166            Some(190),
2167            Some(200),
2168        ]));
2169        let left_struct = Arc::new(StructArray::new(
2170            Fields::from(vec![
2171                Field::new("company_id", DataType::Int32, true),
2172                Field::new("count", DataType::Int32, true),
2173            ]),
2174            vec![left_company_id, left_count],
2175            None,
2176        ));
2177
2178        let left_list = Arc::new(GenericListArray::<O>::new(
2179            Arc::new(Field::new(
2180                "item",
2181                DataType::Struct(left_struct.fields().clone()),
2182                true,
2183            )),
2184            OffsetBuffer::from_lengths([3, 1]),
2185            left_struct.clone(),
2186            None,
2187        ));
2188
2189        let left_list_struct = Arc::new(StructArray::new(
2190            Fields::from(vec![Field::new(
2191                "companies",
2192                if O::IS_LARGE {
2193                    DataType::LargeList(Arc::new(Field::new(
2194                        "item",
2195                        DataType::Struct(left_struct.fields().clone()),
2196                        true,
2197                    )))
2198                } else {
2199                    DataType::List(Arc::new(Field::new(
2200                        "item",
2201                        DataType::Struct(left_struct.fields().clone()),
2202                        true,
2203                    )))
2204                },
2205                true,
2206            )]),
2207            vec![left_list as ArrayRef],
2208            None,
2209        ));
2210
2211        // right_list setup
2212        let right_company_name = Arc::new(StringArray::from(vec![
2213            "Google",
2214            "Microsoft",
2215            "Apple",
2216            "Facebook",
2217        ]));
2218        let right_struct = Arc::new(StructArray::new(
2219            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2220            vec![right_company_name],
2221            None,
2222        ));
2223        let right_list = Arc::new(GenericListArray::<O>::new(
2224            Arc::new(Field::new(
2225                "item",
2226                DataType::Struct(right_struct.fields().clone()),
2227                true,
2228            )),
2229            OffsetBuffer::from_lengths([3, 1]),
2230            right_struct.clone(),
2231            None,
2232        ));
2233
2234        let right_list_struct = Arc::new(StructArray::new(
2235            Fields::from(vec![Field::new(
2236                "companies",
2237                if O::IS_LARGE {
2238                    DataType::LargeList(Arc::new(Field::new(
2239                        "item",
2240                        DataType::Struct(right_struct.fields().clone()),
2241                        true,
2242                    )))
2243                } else {
2244                    DataType::List(Arc::new(Field::new(
2245                        "item",
2246                        DataType::Struct(right_struct.fields().clone()),
2247                        true,
2248                    )))
2249                },
2250                true,
2251            )]),
2252            vec![right_list as ArrayRef],
2253            None,
2254        ));
2255
2256        // prepare schema
2257        let target_fields = Fields::from(vec![Field::new(
2258            "companies",
2259            if O::IS_LARGE {
2260                DataType::LargeList(Arc::new(Field::new(
2261                    "item",
2262                    DataType::Struct(Fields::from(vec![
2263                        Field::new("company_id", DataType::Int32, true),
2264                        Field::new("company_name", DataType::Utf8, true),
2265                        Field::new("count", DataType::Int32, true),
2266                    ])),
2267                    true,
2268                )))
2269            } else {
2270                DataType::List(Arc::new(Field::new(
2271                    "item",
2272                    DataType::Struct(Fields::from(vec![
2273                        Field::new("company_id", DataType::Int32, true),
2274                        Field::new("company_name", DataType::Utf8, true),
2275                        Field::new("count", DataType::Int32, true),
2276                    ])),
2277                    true,
2278                )))
2279            },
2280            true,
2281        )]);
2282
2283        // merge left_list and right_list
2284        let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2285        assert_eq!(merged_array.len(), 2);
2286    }
2287
2288    #[test]
2289    fn test_project_by_schema_list_struct_reorder() {
2290        // Test that project_by_schema correctly reorders fields inside List<Struct>
2291        // This is a regression test for issue #5702
2292
2293        // Source schema with inner struct fields in order: c, b, a
2294        let source_inner_struct = DataType::Struct(Fields::from(vec![
2295            Field::new("c", DataType::Utf8, true),
2296            Field::new("b", DataType::Utf8, true),
2297            Field::new("a", DataType::Utf8, true),
2298        ]));
2299        let source_schema = Arc::new(Schema::new(vec![
2300            Field::new("id", DataType::Int32, false),
2301            Field::new(
2302                "data",
2303                DataType::List(Arc::new(Field::new(
2304                    "item",
2305                    source_inner_struct.clone(),
2306                    true,
2307                ))),
2308                true,
2309            ),
2310        ]));
2311
2312        // Create source data with c, b, a order
2313        let c_array = StringArray::from(vec!["c1", "c2"]);
2314        let b_array = StringArray::from(vec!["b1", "b2"]);
2315        let a_array = StringArray::from(vec!["a1", "a2"]);
2316        let inner_struct = StructArray::from(vec![
2317            (
2318                Arc::new(Field::new("c", DataType::Utf8, true)),
2319                Arc::new(c_array) as ArrayRef,
2320            ),
2321            (
2322                Arc::new(Field::new("b", DataType::Utf8, true)),
2323                Arc::new(b_array) as ArrayRef,
2324            ),
2325            (
2326                Arc::new(Field::new("a", DataType::Utf8, true)),
2327                Arc::new(a_array) as ArrayRef,
2328            ),
2329        ]);
2330
2331        let list_array = ListArray::new(
2332            Arc::new(Field::new("item", source_inner_struct, true)),
2333            OffsetBuffer::from_lengths([1, 1]),
2334            Arc::new(inner_struct),
2335            None,
2336        );
2337
2338        let batch = RecordBatch::try_new(
2339            source_schema,
2340            vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
2341        )
2342        .unwrap();
2343
2344        // Target schema with inner struct fields in order: a, b, c
2345        let target_inner_struct = DataType::Struct(Fields::from(vec![
2346            Field::new("a", DataType::Utf8, true),
2347            Field::new("b", DataType::Utf8, true),
2348            Field::new("c", DataType::Utf8, true),
2349        ]));
2350        let target_schema = Schema::new(vec![
2351            Field::new("id", DataType::Int32, false),
2352            Field::new(
2353                "data",
2354                DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2355                true,
2356            ),
2357        ]);
2358
2359        // Project should reorder the inner struct fields
2360        let projected = batch.project_by_schema(&target_schema).unwrap();
2361
2362        // Verify the schema is correct
2363        assert_eq!(projected.schema().as_ref(), &target_schema);
2364
2365        // Verify the data is correct by checking inner struct field order
2366        let projected_list = projected.column(1).as_list::<i32>();
2367        let projected_struct = projected_list.values().as_struct();
2368
2369        // Fields should now be in order: a, b, c
2370        assert_eq!(
2371            projected_struct.column_by_name("a").unwrap().as_ref(),
2372            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2373        );
2374        assert_eq!(
2375            projected_struct.column_by_name("b").unwrap().as_ref(),
2376            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2377        );
2378        assert_eq!(
2379            projected_struct.column_by_name("c").unwrap().as_ref(),
2380            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2381        );
2382
2383        // Also verify positional access matches expected order (a=0, b=1, c=2)
2384        assert_eq!(
2385            projected_struct.column(0).as_ref(),
2386            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2387        );
2388        assert_eq!(
2389            projected_struct.column(1).as_ref(),
2390            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2391        );
2392        assert_eq!(
2393            projected_struct.column(2).as_ref(),
2394            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2395        );
2396    }
2397
2398    #[test]
2399    fn test_project_by_schema_nested_list_struct() {
2400        // Test deeply nested List<Struct<List<Struct>>> projection
2401        let inner_struct = DataType::Struct(Fields::from(vec![
2402            Field::new("y", DataType::Int32, true),
2403            Field::new("x", DataType::Int32, true),
2404        ]));
2405        let source_schema = Arc::new(Schema::new(vec![Field::new(
2406            "outer",
2407            DataType::List(Arc::new(Field::new(
2408                "item",
2409                DataType::Struct(Fields::from(vec![
2410                    Field::new("b", DataType::Utf8, true),
2411                    Field::new(
2412                        "inner_list",
2413                        DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
2414                        true,
2415                    ),
2416                    Field::new("a", DataType::Utf8, true),
2417                ])),
2418                true,
2419            ))),
2420            true,
2421        )]));
2422
2423        // Create deeply nested data
2424        let y_array = Int32Array::from(vec![1, 2]);
2425        let x_array = Int32Array::from(vec![3, 4]);
2426        let innermost_struct = StructArray::from(vec![
2427            (
2428                Arc::new(Field::new("y", DataType::Int32, true)),
2429                Arc::new(y_array) as ArrayRef,
2430            ),
2431            (
2432                Arc::new(Field::new("x", DataType::Int32, true)),
2433                Arc::new(x_array) as ArrayRef,
2434            ),
2435        ]);
2436        let inner_list = ListArray::new(
2437            Arc::new(Field::new("item", inner_struct.clone(), true)),
2438            OffsetBuffer::from_lengths([2]),
2439            Arc::new(innermost_struct),
2440            None,
2441        );
2442
2443        let b_array = StringArray::from(vec!["b1"]);
2444        let a_array = StringArray::from(vec!["a1"]);
2445        let middle_struct = StructArray::from(vec![
2446            (
2447                Arc::new(Field::new("b", DataType::Utf8, true)),
2448                Arc::new(b_array) as ArrayRef,
2449            ),
2450            (
2451                Arc::new(Field::new(
2452                    "inner_list",
2453                    DataType::List(Arc::new(Field::new("item", inner_struct, true))),
2454                    true,
2455                )),
2456                Arc::new(inner_list) as ArrayRef,
2457            ),
2458            (
2459                Arc::new(Field::new("a", DataType::Utf8, true)),
2460                Arc::new(a_array) as ArrayRef,
2461            ),
2462        ]);
2463
2464        let outer_list = ListArray::new(
2465            Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
2466            OffsetBuffer::from_lengths([1]),
2467            Arc::new(middle_struct),
2468            None,
2469        );
2470
2471        let batch =
2472            RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();
2473
2474        // Target schema with reordered fields at all levels
2475        let target_inner_struct = DataType::Struct(Fields::from(vec![
2476            Field::new("x", DataType::Int32, true), // x before y now
2477            Field::new("y", DataType::Int32, true),
2478        ]));
2479        let target_schema = Schema::new(vec![Field::new(
2480            "outer",
2481            DataType::List(Arc::new(Field::new(
2482                "item",
2483                DataType::Struct(Fields::from(vec![
2484                    Field::new("a", DataType::Utf8, true), // a before b now
2485                    Field::new(
2486                        "inner_list",
2487                        DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2488                        true,
2489                    ),
2490                    Field::new("b", DataType::Utf8, true),
2491                ])),
2492                true,
2493            ))),
2494            true,
2495        )]);
2496
2497        let projected = batch.project_by_schema(&target_schema).unwrap();
2498
2499        // Verify schema
2500        assert_eq!(projected.schema().as_ref(), &target_schema);
2501
2502        // Verify deeply nested data is reordered correctly
2503        let outer_list = projected.column(0).as_list::<i32>();
2504        let middle_struct = outer_list.values().as_struct();
2505
2506        // Middle struct should have a first, then inner_list, then b
2507        assert_eq!(
2508            middle_struct.column(0).as_ref(),
2509            &StringArray::from(vec!["a1"]) as &dyn Array
2510        );
2511        assert_eq!(
2512            middle_struct.column(2).as_ref(),
2513            &StringArray::from(vec!["b1"]) as &dyn Array
2514        );
2515
2516        // Inner list's struct should have x first, then y
2517        let inner_list = middle_struct.column(1).as_list::<i32>();
2518        let innermost_struct = inner_list.values().as_struct();
2519        assert_eq!(
2520            innermost_struct.column(0).as_ref(),
2521            &Int32Array::from(vec![3, 4]) as &dyn Array
2522        );
2523        assert_eq!(
2524            innermost_struct.column(1).as_ref(),
2525            &Int32Array::from(vec![1, 2]) as &dyn Array
2526        );
2527    }
2528}