lance_arrow/lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend Arrow Functionality
5//!
6//! Utilities to improve the ergonomics of arrow-rs
7
8use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
13    GenericListArray, LargeListArray, ListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch,
14    StructArray, UInt32Array, UInt8Array,
15};
16use arrow_array::{
17    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema, SortOptions};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30use crate::list::ListArrayExt;
31pub use floats::*;
32
33pub mod cast;
34pub mod json;
35pub mod list;
36pub mod memory;
37pub mod r#struct;
38
39/// Arrow extension metadata key for extension name
40pub const ARROW_EXT_NAME_KEY: &str = "ARROW:extension:name";
41
42/// Arrow extension metadata key for extension metadata
43pub const ARROW_EXT_META_KEY: &str = "ARROW:extension:metadata";
44
45/// Key used by lance to mark a field as a blob
46/// TODO: Use Arrow extension mechanism instead?
47pub const BLOB_META_KEY: &str = "lance-encoding:blob";
48/// Arrow extension type name for Lance blob v2 columns
49pub const BLOB_V2_EXT_NAME: &str = "lance.blob.v2";
50/// Metadata key for overriding the dedicated blob size threshold (in bytes)
51pub const BLOB_DEDICATED_SIZE_THRESHOLD_META_KEY: &str =
52    "lance-encoding:blob-dedicated-size-threshold";
53
54type Result<T> = std::result::Result<T, ArrowError>;
55
56pub trait DataTypeExt {
57    /// Returns true if the data type is binary-like, such as (Large)Utf8 and (Large)Binary.
58    ///
59    /// ```
60    /// use lance_arrow::*;
61    /// use arrow_schema::DataType;
62    ///
63    /// assert!(DataType::Utf8.is_binary_like());
64    /// assert!(DataType::Binary.is_binary_like());
65    /// assert!(DataType::LargeUtf8.is_binary_like());
66    /// assert!(DataType::LargeBinary.is_binary_like());
67    /// assert!(!DataType::Int32.is_binary_like());
68    /// ```
69    fn is_binary_like(&self) -> bool;
70
71    /// Returns true if the data type is a struct.
72    fn is_struct(&self) -> bool;
73
74    /// Check whether the given Arrow DataType is fixed stride.
75    ///
76    /// A fixed stride type has the same byte width for all array elements.
77    /// This includes all primitive types, Boolean, FixedSizeList, FixedSizeBinary, and Decimal types.
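    ///
    /// A couple of illustrative checks (results follow the match arms in the implementation below):
    ///
    /// ```
    /// use lance_arrow::DataTypeExt;
    /// use arrow_schema::DataType;
    ///
    /// assert!(DataType::Float32.is_fixed_stride());
    /// assert!(!DataType::Utf8.is_fixed_stride());
    /// ```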
78    fn is_fixed_stride(&self) -> bool;
79
80    /// Returns true if the [DataType] is a dictionary type.
81    fn is_dictionary(&self) -> bool;
82
83    /// Returns the byte width of the data type.
84    /// Panics if the data type is not fixed stride.
85    fn byte_width(&self) -> usize;
86
87    /// Returns the byte width of the data type, if it is fixed stride.
88    /// Returns None if the data type is not fixed stride.
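    ///
    /// A small illustrative example (widths follow the match arms in the implementation below):
    ///
    /// ```
    /// use lance_arrow::DataTypeExt;
    /// use arrow_schema::DataType;
    ///
    /// assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
    /// assert_eq!(DataType::Utf8.byte_width_opt(), None);
    /// ```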
89    fn byte_width_opt(&self) -> Option<usize>;
90}
91
92impl DataTypeExt for DataType {
93    fn is_binary_like(&self) -> bool {
94        use DataType::*;
95        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
96    }
97
98    fn is_struct(&self) -> bool {
99        matches!(self, Self::Struct(_))
100    }
101
102    fn is_fixed_stride(&self) -> bool {
103        use DataType::*;
104        matches!(
105            self,
106            Boolean
107                | UInt8
108                | UInt16
109                | UInt32
110                | UInt64
111                | Int8
112                | Int16
113                | Int32
114                | Int64
115                | Float16
116                | Float32
117                | Float64
118                | Decimal128(_, _)
119                | Decimal256(_, _)
120                | FixedSizeList(_, _)
121                | FixedSizeBinary(_)
122                | Duration(_)
123                | Timestamp(_, _)
124                | Date32
125                | Date64
126                | Time32(_)
127                | Time64(_)
128        )
129    }
130
131    fn is_dictionary(&self) -> bool {
132        matches!(self, Self::Dictionary(_, _))
133    }
134
135    fn byte_width_opt(&self) -> Option<usize> {
136        match self {
137            Self::Int8 => Some(1),
138            Self::Int16 => Some(2),
139            Self::Int32 => Some(4),
140            Self::Int64 => Some(8),
141            Self::UInt8 => Some(1),
142            Self::UInt16 => Some(2),
143            Self::UInt32 => Some(4),
144            Self::UInt64 => Some(8),
145            Self::Float16 => Some(2),
146            Self::Float32 => Some(4),
147            Self::Float64 => Some(8),
148            Self::Date32 => Some(4),
149            Self::Date64 => Some(8),
150            Self::Time32(_) => Some(4),
151            Self::Time64(_) => Some(8),
152            Self::Timestamp(_, _) => Some(8),
153            Self::Duration(_) => Some(8),
154            Self::Decimal128(_, _) => Some(16),
155            Self::Decimal256(_, _) => Some(32),
156            Self::Interval(unit) => match unit {
157                IntervalUnit::YearMonth => Some(4),
158                IntervalUnit::DayTime => Some(8),
159                IntervalUnit::MonthDayNano => Some(16),
160            },
161            Self::FixedSizeBinary(s) => Some(*s as usize),
162            Self::FixedSizeList(dt, s) => dt
163                .data_type()
164                .byte_width_opt()
165                .map(|width| width * *s as usize),
166            _ => None,
167        }
168    }
169
170    fn byte_width(&self) -> usize {
171        self.byte_width_opt()
172            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
173    }
174}
175
176/// Create a [`GenericListArray`] from values and offsets.
177///
178/// ```
179/// use arrow_array::{Int32Array, Int64Array, ListArray};
180/// use arrow_array::types::Int64Type;
181/// use lance_arrow::try_new_generic_list_array;
182///
183/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
184/// let int_values = Int64Array::from_iter(0..10);
185/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
186/// assert_eq!(list_arr,
187///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
188///         Some(vec![Some(0), Some(1)]),
189///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
190///         Some(vec![Some(7), Some(8), Some(9)]),
191/// ]))
192/// ```
193pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
194    values: T,
195    offsets: &PrimitiveArray<Offset>,
196) -> Result<GenericListArray<Offset::Native>>
197where
198    Offset::Native: OffsetSizeTrait,
199{
200    let data_type = if Offset::Native::IS_LARGE {
201        DataType::LargeList(Arc::new(Field::new(
202            "item",
203            values.data_type().clone(),
204            true,
205        )))
206    } else {
207        DataType::List(Arc::new(Field::new(
208            "item",
209            values.data_type().clone(),
210            true,
211        )))
212    };
213    let data = ArrayDataBuilder::new(data_type)
214        .len(offsets.len() - 1)
215        .add_buffer(offsets.into_data().buffers()[0].clone())
216        .add_child_data(values.into_data())
217        .build()?;
218
219    Ok(GenericListArray::from(data))
220}
221
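/// Shorthand to build a [`DataType::FixedSizeList`] with a nullable `"item"` child field.
///
/// A brief illustrative example:
///
/// ```
/// use lance_arrow::fixed_size_list_type;
/// use arrow_schema::DataType;
///
/// let dt = fixed_size_list_type(8, DataType::Float32);
/// assert!(matches!(dt, DataType::FixedSizeList(_, 8)));
/// ```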
222pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
223    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
224}
225
226pub trait FixedSizeListArrayExt {
227    /// Create a [`FixedSizeListArray`] from values and list size.
228    ///
229    /// ```
230    /// use arrow_array::{Int64Array, FixedSizeListArray};
231    /// use arrow_array::types::Int64Type;
232    /// use lance_arrow::FixedSizeListArrayExt;
233    ///
234    /// let int_values = Int64Array::from_iter(0..10);
235    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
236    /// assert_eq!(fixed_size_list_arr,
237    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
238    ///         Some(vec![Some(0), Some(1)]),
239    ///         Some(vec![Some(2), Some(3)]),
240    ///         Some(vec![Some(4), Some(5)]),
241    ///         Some(vec![Some(6), Some(7)]),
242    ///         Some(vec![Some(8), Some(9)])
243    /// ], 2))
244    /// ```
245    fn try_new_from_values<T: Array + 'static>(
246        values: T,
247        list_size: i32,
248    ) -> Result<FixedSizeListArray>;
249
250    /// Sample `n` rows from the [FixedSizeListArray]
251    ///
252    /// ```
253    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
254    /// use lance_arrow::FixedSizeListArrayExt;
255    ///
256    /// let int_values = Int64Array::from_iter(0..256);
257    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
258    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
259    /// assert_eq!(sampled.len(), 10);
260    /// assert_eq!(sampled.value_length(), 16);
261    /// assert_eq!(sampled.values().len(), 160);
262    /// ```
263    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;
264
265    /// Convert a [FixedSizeListArray] of Float16, Float32, Float64,
266    /// Int8, Int16, Int32, Int64, UInt8, or UInt32 values to its closest floating point type.
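    ///
    /// A minimal sketch with hypothetical data (per the implementation below,
    /// `Int32` values are converted to `Float32`):
    ///
    /// ```
    /// use arrow_array::{Int32Array, FixedSizeListArray};
    /// use arrow_schema::DataType;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let ints = Int32Array::from_iter(0..6);
    /// let fsl = FixedSizeListArray::try_new_from_values(ints, 2).unwrap();
    /// let floats = fsl.convert_to_floating_point().unwrap();
    /// assert_eq!(floats.value_type(), DataType::Float32);
    /// ```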
267    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
268}
269
270impl FixedSizeListArrayExt for FixedSizeListArray {
271    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
272        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
273        let values = Arc::new(values);
274
275        Self::try_new(field, list_size, values, None)
276    }
277
278    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
279        if n >= self.len() {
280            return Ok(self.clone());
281        }
282        let mut rng = SmallRng::from_os_rng();
283        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
284        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
285    }
286
287    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
288        match self.data_type() {
289            DataType::FixedSizeList(field, size) => match field.data_type() {
290                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
291                DataType::Int8 => Ok(Self::new(
292                    Arc::new(arrow_schema::Field::new(
293                        field.name(),
294                        DataType::Float32,
295                        field.is_nullable(),
296                    )),
297                    *size,
298                    Arc::new(Float32Array::from_iter_values(
299                        self.values()
300                            .as_any()
301                            .downcast_ref::<Int8Array>()
302                            .ok_or(ArrowError::ParseError(
303                                "Failed to cast primitive array to Int8Type".to_string(),
304                            ))?
305                            .into_iter()
306                            .filter_map(|x| x.map(|y| y as f32)),
307                    )),
308                    self.nulls().cloned(),
309                )),
310                DataType::Int16 => Ok(Self::new(
311                    Arc::new(arrow_schema::Field::new(
312                        field.name(),
313                        DataType::Float32,
314                        field.is_nullable(),
315                    )),
316                    *size,
317                    Arc::new(Float32Array::from_iter_values(
318                        self.values()
319                            .as_any()
320                            .downcast_ref::<Int16Array>()
321                            .ok_or(ArrowError::ParseError(
322                                "Failed to cast primitive array to Int16Type".to_string(),
323                            ))?
324                            .into_iter()
325                            .filter_map(|x| x.map(|y| y as f32)),
326                    )),
327                    self.nulls().cloned(),
328                )),
329                DataType::Int32 => Ok(Self::new(
330                    Arc::new(arrow_schema::Field::new(
331                        field.name(),
332                        DataType::Float32,
333                        field.is_nullable(),
334                    )),
335                    *size,
336                    Arc::new(Float32Array::from_iter_values(
337                        self.values()
338                            .as_any()
339                            .downcast_ref::<Int32Array>()
340                            .ok_or(ArrowError::ParseError(
341                                "Failed to cast primitive array to Int32Type".to_string(),
342                            ))?
343                            .into_iter()
344                            .filter_map(|x| x.map(|y| y as f32)),
345                    )),
346                    self.nulls().cloned(),
347                )),
348                DataType::Int64 => Ok(Self::new(
349                    Arc::new(arrow_schema::Field::new(
350                        field.name(),
351                        DataType::Float64,
352                        field.is_nullable(),
353                    )),
354                    *size,
355                    Arc::new(Float64Array::from_iter_values(
356                        self.values()
357                            .as_any()
358                            .downcast_ref::<Int64Array>()
359                            .ok_or(ArrowError::ParseError(
360                                "Failed to cast primitive array to Int64Type".to_string(),
361                            ))?
362                            .into_iter()
363                            .filter_map(|x| x.map(|y| y as f64)),
364                    )),
365                    self.nulls().cloned(),
366                )),
367                DataType::UInt8 => Ok(Self::new(
368                    Arc::new(arrow_schema::Field::new(
369                        field.name(),
370                        DataType::Float64,
371                        field.is_nullable(),
372                    )),
373                    *size,
374                    Arc::new(Float64Array::from_iter_values(
375                        self.values()
376                            .as_any()
377                            .downcast_ref::<UInt8Array>()
378                            .ok_or(ArrowError::ParseError(
379                                "Failed to cast primitive array to UInt8Type".to_string(),
380                            ))?
381                            .into_iter()
382                            .filter_map(|x| x.map(|y| y as f64)),
383                    )),
384                    self.nulls().cloned(),
385                )),
386                DataType::UInt32 => Ok(Self::new(
387                    Arc::new(arrow_schema::Field::new(
388                        field.name(),
389                        DataType::Float64,
390                        field.is_nullable(),
391                    )),
392                    *size,
393                    Arc::new(Float64Array::from_iter_values(
394                        self.values()
395                            .as_any()
396                            .downcast_ref::<UInt32Array>()
397                            .ok_or(ArrowError::ParseError(
398                                "Failed to cast primitive array to UInt32Type".to_string(),
399                            ))?
400                            .into_iter()
401                            .filter_map(|x| x.map(|y| y as f64)),
402                    )),
403                    self.nulls().cloned(),
404                )),
405                data_type => Err(ArrowError::ParseError(format!(
406                    "Expected a floating point or integer type, got {:?}",
407                    data_type
408                ))),
409            },
410            data_type => Err(ArrowError::ParseError(format!(
411                "Expected FixedSizeList, got {:?}",
412                data_type
413            ))),
414        }
415    }
416}
417
418/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
419/// [`FixedSizeListArray`], panicking on failure.
420pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
421    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
422}
423
424pub trait FixedSizeBinaryArrayExt {
425    /// Create an [`FixedSizeBinaryArray`] from values and stride.
426    ///
427    /// ```
428    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
429    /// use arrow_array::types::UInt8Type;
430    /// use lance_arrow::FixedSizeBinaryArrayExt;
431    ///
432    /// let int_values = UInt8Array::from_iter(0..10);
433    /// let fixed_size_binary_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
434    /// assert_eq!(fixed_size_binary_arr,
435    ///     FixedSizeBinaryArray::from(vec![
436    ///         Some(vec![0, 1].as_slice()),
437    ///         Some(vec![2, 3].as_slice()),
438    ///         Some(vec![4, 5].as_slice()),
439    ///         Some(vec![6, 7].as_slice()),
440    ///         Some(vec![8, 9].as_slice())
441    /// ]))
442    /// ```
443    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
444}
445
446impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
447    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
448        let data_type = DataType::FixedSizeBinary(stride);
449        let data = ArrayDataBuilder::new(data_type)
450            .len(values.len() / stride as usize)
451            .add_buffer(values.into_data().buffers()[0].clone())
452            .build()?;
453        Ok(Self::from(data))
454    }
455}
456
457pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
458    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
459}
460
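/// Iterate the values of a `Utf8` or `LargeUtf8` array as `Option<&str>`.
///
/// Panics if the array is any other type. A short illustrative example:
///
/// ```
/// use arrow_array::StringArray;
/// use lance_arrow::iter_str_array;
///
/// let arr = StringArray::from(vec![Some("a"), None, Some("c")]);
/// let values: Vec<Option<&str>> = iter_str_array(&arr).collect();
/// assert_eq!(values, vec![Some("a"), None, Some("c")]);
/// ```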
461pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
462    match arr.data_type() {
463        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
464        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
465        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
466    }
467}
468
469/// Extends Arrow's [RecordBatch].
470pub trait RecordBatchExt {
471    /// Append a new column to this [`RecordBatch`] and return a new [`RecordBatch`].
472    ///
473    /// ```
474    /// use std::sync::Arc;
475    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
476    /// use arrow_schema::{Schema, Field, DataType};
477    /// use lance_arrow::*;
478    ///
479    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
480    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
481    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
482    ///
483    /// let new_field = Field::new("s", DataType::Utf8, true);
484    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
485    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
486    ///
487    /// assert_eq!(
488    ///     new_record_batch,
489    ///     RecordBatch::try_new(
490    ///         Arc::new(Schema::new(
491    ///             vec![
492    ///                 Field::new("a", DataType::Int32, true),
493    ///                 Field::new("s", DataType::Utf8, true)
494    ///             ])
495    ///         ),
496    ///         vec![int_arr, str_arr],
497    ///     ).unwrap()
498    /// )
499    /// ```
500    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;
501
502    /// Create a new [`RecordBatch`] with the column inserted at the given index.
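    ///
    /// A brief illustrative sketch of inserting a column at index 0 (hypothetical data):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2]))]).unwrap();
    ///
    /// let new_field = Field::new("s", DataType::Utf8, true);
    /// let str_arr = Arc::new(StringArray::from(vec!["x", "y"]));
    /// let with_s = batch.try_with_column_at(0, new_field, str_arr).unwrap();
    /// assert_eq!(with_s.schema().field(0).name(), "s");
    /// assert_eq!(with_s.schema().field(1).name(), "a");
    /// ```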
503    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;
504
505    /// Creates a new [`RecordBatch`] from the provided [`StructArray`].
506    ///
507    /// The fields of the [`StructArray`] need to match this [`RecordBatch`]'s schema.
508    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;
509
510    /// Merge with another [`RecordBatch`] and returns a new one.
511    ///
512    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
513    /// name is found in the right then we merge the two columns.  If there is no match then
514    /// we add the left column to the output.
515    ///
516    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
517    /// Otherwise we use the left array.
518    ///
519    /// Afterwards we add all non-matching right columns to the output.
520    ///
521    /// Note: This method likely does not handle nested fields correctly and you may want to consider
522    /// using [`Self::merge_with_schema`] instead.
523    /// ```
524    /// use std::sync::Arc;
525    /// use arrow_array::*;
526    /// use arrow_schema::{Schema, Field, DataType};
527    /// use lance_arrow::*;
528    ///
529    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
530    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
531    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
532    ///
533    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
534    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
535    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
536    ///
537    /// let new_record_batch = left.merge(&right).unwrap();
538    ///
539    /// assert_eq!(
540    ///     new_record_batch,
541    ///     RecordBatch::try_new(
542    ///         Arc::new(Schema::new(
543    ///             vec![
544    ///                 Field::new("a", DataType::Int32, true),
545    ///                 Field::new("s", DataType::Utf8, true)
546    ///             ])
547    ///         ),
548    ///         vec![int_arr, str_arr],
549    ///     ).unwrap()
550    /// )
551    /// ```
552    ///
553    /// TODO: add support for merging nested fields.
554    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;
555
556    /// Create a batch by merging columns between two batches with a given schema.
557    ///
558    /// A reference schema is used to determine the proper ordering of nested fields.
559    ///
560    /// For each field in the reference schema we look for corresponding fields in
561    /// the left and right batches.  If a field is found in both batches we recursively merge
562    /// it.
563    ///
564    /// If a field is only in the left or right batch we take it as it is.
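    ///
    /// A minimal illustrative sketch, using the reference schema to control the
    /// output column order (hypothetical data):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let left = RecordBatch::try_new(
    ///     Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)])),
    ///     vec![Arc::new(Int32Array::from(vec![1, 2]))],
    /// ).unwrap();
    /// let right = RecordBatch::try_new(
    ///     Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)])),
    ///     vec![Arc::new(StringArray::from(vec!["x", "y"]))],
    /// ).unwrap();
    ///
    /// let reference = Schema::new(vec![
    ///     Field::new("s", DataType::Utf8, true),
    ///     Field::new("a", DataType::Int32, true),
    /// ]);
    /// let merged = left.merge_with_schema(&right, &reference).unwrap();
    /// assert_eq!(merged.schema().field(0).name(), "s");
    /// assert_eq!(merged.schema().field(1).name(), "a");
    /// ```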
565    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;
566
567    /// Drop one column specified with the name and return the new [`RecordBatch`].
568    ///
569    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
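    ///
    /// A short illustrative example (hypothetical data):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("s", DataType::Utf8, true),
    /// ]));
    /// let batch = RecordBatch::try_new(schema, vec![
    ///     Arc::new(Int32Array::from(vec![1, 2])),
    ///     Arc::new(StringArray::from(vec!["x", "y"])),
    /// ]).unwrap();
    /// let dropped = batch.drop_column("s").unwrap();
    /// assert_eq!(dropped.num_columns(), 1);
    /// assert_eq!(dropped.schema().field(0).name(), "a");
    /// ```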
570    fn drop_column(&self, name: &str) -> Result<RecordBatch>;
571
572    /// Replace a column (specified by name) and return the new [`RecordBatch`].
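    ///
    /// The replacement array must have the same type and length as the original
    /// column (the schema is kept as-is); a brief sketch with hypothetical data:
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1, 2]))]).unwrap();
    /// let replaced = batch
    ///     .replace_column_by_name("a", Arc::new(Int32Array::from(vec![10, 20])))
    ///     .unwrap();
    /// assert_eq!(replaced.num_rows(), 2);
    /// ```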
573    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;
574
575    /// Replace a column schema (specified by name) and return the new [`RecordBatch`].
576    fn replace_column_schema_by_name(
577        &self,
578        name: &str,
579        new_data_type: DataType,
580        column: Arc<dyn Array>,
581    ) -> Result<RecordBatch>;
582
583    /// Rename a column at a given index.
584    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;
585
586    /// Get (potentially nested) column by qualified name.
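    ///
    /// Nested fields are addressed with a `.` separator; a brief sketch with
    /// hypothetical data (assumes the usual nested struct-child lookup):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StructArray};
    /// use arrow_schema::{Schema, Field, Fields, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let inner_fields = Fields::from(vec![Field::new("x", DataType::Int32, true)]);
    /// let point = StructArray::new(
    ///     inner_fields.clone(),
    ///     vec![Arc::new(Int32Array::from(vec![1, 2]))],
    ///     None,
    /// );
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("point", DataType::Struct(inner_fields), true),
    /// ]));
    /// let batch = RecordBatch::try_new(schema, vec![Arc::new(point)]).unwrap();
    /// assert!(batch.column_by_qualified_name("point.x").is_some());
    /// ```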
587    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;
588
589    /// Project the schema over the [RecordBatch].
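    ///
    /// A brief sketch selecting a subset of columns by schema (hypothetical data):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![
    ///     Field::new("a", DataType::Int32, true),
    ///     Field::new("s", DataType::Utf8, true),
    /// ]));
    /// let batch = RecordBatch::try_new(schema, vec![
    ///     Arc::new(Int32Array::from(vec![1, 2])),
    ///     Arc::new(StringArray::from(vec!["x", "y"])),
    /// ]).unwrap();
    /// let projection = Schema::new(vec![Field::new("s", DataType::Utf8, true)]);
    /// let projected = batch.project_by_schema(&projection).unwrap();
    /// assert_eq!(projected.num_columns(), 1);
    /// ```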
590    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;
591
592    /// Returns the metadata of the schema.
593    fn metadata(&self) -> &HashMap<String, String>;
594
595    /// Add metadata to the schema.
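    ///
    /// A short illustrative example (the key/value pair is hypothetical):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![1]))]).unwrap();
    /// let batch = batch.add_metadata("origin".to_string(), "test".to_string()).unwrap();
    /// assert_eq!(batch.metadata().get("origin").map(|s| s.as_str()), Some("test"));
    /// ```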
596    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
597        let mut metadata = self.metadata().clone();
598        metadata.insert(key, value);
599        self.with_metadata(metadata)
600    }
601
602    /// Replace the schema metadata with the provided one.
603    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;
604
605    /// Take selected rows from the [RecordBatch].
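    ///
    /// A short illustrative example (hypothetical data):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, UInt32Array};
    /// use arrow_array::cast::AsArray;
    /// use arrow_array::types::Int32Type;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![10, 20, 30]))]).unwrap();
    /// let taken = batch.take(&UInt32Array::from(vec![2, 0])).unwrap();
    /// assert_eq!(taken.num_rows(), 2);
    /// assert_eq!(taken.column(0).as_primitive::<Int32Type>().value(0), 30);
    /// ```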
606    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;
607
608    /// Create a new RecordBatch with compacted memory after slicing.
609    fn shrink_to_fit(&self) -> Result<RecordBatch>;
610
611    /// Helper method to sort the [`RecordBatch`] by a single column.
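    ///
    /// A brief sketch sorting ascending by the first column (hypothetical data;
    /// `None` options means the default sort order):
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array};
    /// use arrow_array::cast::AsArray;
    /// use arrow_array::types::Int32Type;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::RecordBatchExt;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let batch = RecordBatch::try_new(schema, vec![Arc::new(Int32Array::from(vec![3, 1, 2]))]).unwrap();
    /// let sorted = batch.sort_by_column(0, None).unwrap();
    /// let values = sorted.column(0).as_primitive::<Int32Type>();
    /// assert_eq!(values.value(0), 1);
    /// assert_eq!(values.value(2), 3);
    /// ```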
612    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<RecordBatch>;
613}
614
615impl RecordBatchExt for RecordBatch {
616    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
617        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
618        let mut new_columns = self.columns().to_vec();
619        new_columns.push(arr);
620        Self::try_new(new_schema, new_columns)
621    }
622
623    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
624        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
625        let mut new_columns = self.columns().to_vec();
626        new_columns.insert(index, arr);
627        Self::try_new(new_schema, new_columns)
628    }
629
630    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
631        let schema = Arc::new(Schema::new_with_metadata(
632            arr.fields().to_vec(),
633            self.schema().metadata.clone(),
634        ));
635        let batch = Self::from(arr);
636        batch.with_schema(schema)
637    }
638
639    fn merge(&self, other: &Self) -> Result<Self> {
640        if self.num_rows() != other.num_rows() {
641            return Err(ArrowError::InvalidArgumentError(format!(
642                "Attempt to merge two RecordBatch with different sizes: {} != {}",
643                self.num_rows(),
644                other.num_rows()
645            )));
646        }
647        let left_struct_array: StructArray = self.clone().into();
648        let right_struct_array: StructArray = other.clone().into();
649        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
650    }
651
652    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
653        if self.num_rows() != other.num_rows() {
654            return Err(ArrowError::InvalidArgumentError(format!(
655                "Attempt to merge two RecordBatch with different sizes: {} != {}",
656                self.num_rows(),
657                other.num_rows()
658            )));
659        }
660        let left_struct_array: StructArray = self.clone().into();
661        let right_struct_array: StructArray = other.clone().into();
662        self.try_new_from_struct_array(merge_with_schema(
663            &left_struct_array,
664            &right_struct_array,
665            schema.fields(),
666        ))
667    }
668
669    fn drop_column(&self, name: &str) -> Result<Self> {
670        let mut fields = vec![];
671        let mut columns = vec![];
672        for i in 0..self.schema().fields.len() {
673            if self.schema().field(i).name() != name {
674                fields.push(self.schema().field(i).clone());
675                columns.push(self.column(i).clone());
676            }
677        }
678        Self::try_new(
679            Arc::new(Schema::new_with_metadata(
680                fields,
681                self.schema().metadata().clone(),
682            )),
683            columns,
684        )
685    }
686
687    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
688        let mut fields = self.schema().fields().to_vec();
689        if index >= fields.len() {
690            return Err(ArrowError::InvalidArgumentError(format!(
691                "Index out of bounds: {}",
692                index
693            )));
694        }
695        fields[index] = Arc::new(Field::new(
696            new_name,
697            fields[index].data_type().clone(),
698            fields[index].is_nullable(),
699        ));
700        Self::try_new(
701            Arc::new(Schema::new_with_metadata(
702                fields,
703                self.schema().metadata().clone(),
704            )),
705            self.columns().to_vec(),
706        )
707    }
708
709    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
710        let mut columns = self.columns().to_vec();
711        let field_i = self
712            .schema()
713            .fields()
714            .iter()
715            .position(|f| f.name() == name)
716            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
717        columns[field_i] = column;
718        Self::try_new(self.schema(), columns)
719    }
720
721    fn replace_column_schema_by_name(
722        &self,
723        name: &str,
724        new_data_type: DataType,
725        column: Arc<dyn Array>,
726    ) -> Result<RecordBatch> {
727        let fields = self
728            .schema()
729            .fields()
730            .iter()
731            .map(|x| {
732                if x.name() != name {
733                    x.clone()
734                } else {
735                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
736                    Arc::new(new_field)
737                }
738            })
739            .collect::<Vec<_>>();
740        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
741        let mut columns = self.columns().to_vec();
742        let field_i = self
743            .schema()
744            .fields()
745            .iter()
746            .position(|f| f.name() == name)
747            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
748        columns[field_i] = column;
749        Self::try_new(Arc::new(schema), columns)
750    }
751
752    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
753        let split = name.split('.').collect::<Vec<_>>();
754        if split.is_empty() {
755            return None;
756        }
757
758        self.column_by_name(split[0])
759            .and_then(|arr| get_sub_array(arr, &split[1..]))
760    }
761
762    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
763        let struct_array: StructArray = self.clone().into();
764        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
765    }
766
767    fn metadata(&self) -> &HashMap<String, String> {
768        self.schema_ref().metadata()
769    }
770
771    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
772        let mut schema = self.schema_ref().as_ref().clone();
773        schema.metadata = metadata;
774        Self::try_new(schema.into(), self.columns().into())
775    }
776
777    fn take(&self, indices: &UInt32Array) -> Result<Self> {
778        let struct_array: StructArray = self.clone().into();
779        let taken = take(&struct_array, indices, None)?;
780        self.try_new_from_struct_array(taken.as_struct().clone())
781    }
782
783    fn shrink_to_fit(&self) -> Result<Self> {
784        // Deep copy only the sliced record batch, instead of the whole batch
785        crate::deepcopy::deep_copy_batch_sliced(self)
786    }
787
788    fn sort_by_column(&self, column: usize, options: Option<SortOptions>) -> Result<Self> {
789        if column >= self.num_columns() {
790            return Err(ArrowError::InvalidArgumentError(format!(
791                "Column index out of bounds: {}",
792                column
793            )));
794        }
795        let column = self.column(column);
796        let sorted = arrow_ord::sort::sort_to_indices(column, options, None)?;
797        self.take(&sorted)
798    }
799}
800
801/// Recursively projects an array to match the target field's structure.
802/// This handles reordering fields inside nested List<Struct> types.
803fn project_array(array: &ArrayRef, target_field: &Field) -> Result<ArrayRef> {
804    match target_field.data_type() {
805        DataType::Struct(subfields) => {
806            let struct_arr = array.as_struct();
807            let projected = project(struct_arr, subfields)?;
808            Ok(Arc::new(projected))
809        }
810        DataType::List(inner_field) => {
811            let list_arr: &ListArray = array.as_list();
812            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
813            Ok(Arc::new(ListArray::new(
814                inner_field.clone(),
815                list_arr.offsets().clone(),
816                projected_values,
817                list_arr.nulls().cloned(),
818            )))
819        }
820        DataType::LargeList(inner_field) => {
821            let list_arr: &LargeListArray = array.as_list();
822            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
823            Ok(Arc::new(LargeListArray::new(
824                inner_field.clone(),
825                list_arr.offsets().clone(),
826                projected_values,
827                list_arr.nulls().cloned(),
828            )))
829        }
830        DataType::FixedSizeList(inner_field, size) => {
831            let list_arr = array.as_fixed_size_list();
832            let projected_values = project_array(list_arr.values(), inner_field.as_ref())?;
833            Ok(Arc::new(FixedSizeListArray::new(
834                inner_field.clone(),
835                *size,
836                projected_values,
837                list_arr.nulls().cloned(),
838            )))
839        }
840        _ => Ok(array.clone()),
841    }
842}
843
844fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
845    if fields.is_empty() {
846        return Ok(StructArray::new_empty_fields(
847            struct_array.len(),
848            struct_array.nulls().cloned(),
849        ));
850    }
851    let mut columns: Vec<ArrayRef> = vec![];
852    for field in fields.iter() {
853        if let Some(col) = struct_array.column_by_name(field.name()) {
854            let projected = project_array(col, field.as_ref())?;
855            columns.push(projected);
856        } else {
857            return Err(ArrowError::SchemaError(format!(
858                "field {} does not exist in the RecordBatch",
859                field.name()
860            )));
861        }
862    }
863    // Preserve the struct's validity when projecting
864    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
865}
866
867fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
868    let left_list: &GenericListArray<T> = left.as_list();
869    let right_list: &GenericListArray<T> = right.as_list();
870    left_list.offsets().inner() == right_list.offsets().inner()
871}
872
873fn merge_list_structs_helper<T: OffsetSizeTrait>(
874    left: &dyn Array,
875    right: &dyn Array,
876    items_field_name: impl Into<String>,
877    items_nullable: bool,
878) -> Arc<dyn Array> {
879    let left_list: &GenericListArray<T> = left.as_list();
880    let right_list: &GenericListArray<T> = right.as_list();
881    let left_struct = left_list.values();
882    let right_struct = right_list.values();
883    let left_struct_arr = left_struct.as_struct();
884    let right_struct_arr = right_struct.as_struct();
885    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
886    let items_field = Arc::new(Field::new(
887        items_field_name,
888        merged_items.data_type().clone(),
889        items_nullable,
890    ));
891    Arc::new(GenericListArray::<T>::new(
892        items_field,
893        left_list.offsets().clone(),
894        merged_items,
895        left_list.nulls().cloned(),
896    ))
897}
898
899fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
900    left: &dyn Array,
901    right: &dyn Array,
902    not_null: &dyn Array,
903    items_field_name: impl Into<String>,
904) -> Arc<dyn Array> {
905    let left_list: &GenericListArray<T> = left.as_list::<T>();
906    let not_null_list = not_null.as_list::<T>();
907    let right_list = right.as_list::<T>();
908
909    let left_struct = left_list.values().as_struct();
910    let not_null_struct: &StructArray = not_null_list.values().as_struct();
911    let right_struct = right_list.values().as_struct();
912
913    let values_len = not_null_list.values().len();
914    let mut merged_fields =
915        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
916    let mut merged_columns =
917        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
918
919    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
920        merged_fields.push(field.clone());
921        if let Some(val) = not_null_struct.column_by_name(field.name()) {
922            merged_columns.push(val.clone());
923        } else {
924            merged_columns.push(new_null_array(field.data_type(), values_len))
925        }
926    }
927    for (_, field) in right_struct
928        .columns()
929        .iter()
930        .zip(right_struct.fields())
931        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
932    {
933        merged_fields.push(field.clone());
934        if let Some(val) = not_null_struct.column_by_name(field.name()) {
935            merged_columns.push(val.clone());
936        } else {
937            merged_columns.push(new_null_array(field.data_type(), values_len));
938        }
939    }
940
941    let merged_struct = Arc::new(StructArray::new(
942        Fields::from(merged_fields),
943        merged_columns,
944        not_null_struct.nulls().cloned(),
945    ));
946    let items_field = Arc::new(Field::new(
947        items_field_name,
948        merged_struct.data_type().clone(),
949        true,
950    ));
951    Arc::new(GenericListArray::<T>::new(
952        items_field,
953        not_null_list.offsets().clone(),
954        merged_struct,
955        not_null_list.nulls().cloned(),
956    ))
957}
958
959fn merge_list_struct_null(
960    left: &dyn Array,
961    right: &dyn Array,
962    not_null: &dyn Array,
963) -> Arc<dyn Array> {
964    match left.data_type() {
965        DataType::List(left_field) => {
966            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
967        }
968        DataType::LargeList(left_field) => {
969            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
970        }
971        _ => unreachable!(),
972    }
973}
974
975fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
976    // Merging fields into a list<struct<...>> is tricky and can only succeed
977    // in two ways.  First, if both lists have the same offsets.  Second, if
978    // one of the lists is all-null
979    if left.null_count() == left.len() {
980        return merge_list_struct_null(left, right, right);
981    } else if right.null_count() == right.len() {
982        return merge_list_struct_null(left, right, left);
983    }
984    match (left.data_type(), right.data_type()) {
985        (DataType::List(left_field), DataType::List(_)) => {
986            if !lists_have_same_offsets_helper::<i32>(left, right) {
987                panic!("Attempt to merge list struct arrays which do not have same offsets");
988            }
989            merge_list_structs_helper::<i32>(
990                left,
991                right,
992                left_field.name(),
993                left_field.is_nullable(),
994            )
995        }
996        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
997            if !lists_have_same_offsets_helper::<i64>(left, right) {
998                panic!("Attempt to merge list struct arrays which do not have same offsets");
999            }
1000            merge_list_structs_helper::<i64>(
1001                left,
1002                right,
1003                left_field.name(),
1004                left_field.is_nullable(),
1005            )
1006        }
1007        _ => unreachable!(),
1008    }
1009}
1010
1011/// Helper function to normalize validity buffers
1012/// Returns None for all-null validity (placeholder structs)
1013fn normalize_validity(
1014    validity: Option<&arrow_buffer::NullBuffer>,
1015) -> Option<&arrow_buffer::NullBuffer> {
1016    validity.and_then(|v| {
1017        if v.null_count() == v.len() {
1018            None
1019        } else {
1020            Some(v)
1021        }
1022    })
1023}
1024
1025/// Helper function to merge validity buffers from two struct arrays
1026/// Returns None only if both arrays are null at the same position
1027///
1028/// Special handling for placeholder structs (all-null validity)
1029fn merge_struct_validity(
1030    left_validity: Option<&arrow_buffer::NullBuffer>,
1031    right_validity: Option<&arrow_buffer::NullBuffer>,
1032) -> Option<arrow_buffer::NullBuffer> {
1033    // Normalize both validity buffers (convert all-null to None)
1034    let left_normalized = normalize_validity(left_validity);
1035    let right_normalized = normalize_validity(right_validity);
1036
1037    match (left_normalized, right_normalized) {
1038        // Fast paths: no computation needed
1039        (None, None) => None,
1040        (Some(left), None) => Some(left.clone()),
1041        (None, Some(right)) => Some(right.clone()),
1042        (Some(left), Some(right)) => {
1043            // Fast path: if both have no nulls, can return either one
1044            if left.null_count() == 0 && right.null_count() == 0 {
1045                return Some(left.clone());
1046            }
1047
1048            let left_buffer = left.inner();
1049            let right_buffer = right.inner();
1050
1051            // Perform bitwise OR directly on BooleanBuffers
1052            // This preserves the correct semantics: 1 = valid, 0 = null
1053            let merged_buffer = left_buffer | right_buffer;
1054
1055            Some(arrow_buffer::NullBuffer::from(merged_buffer))
1056        }
1057    }
1058}
1059
1060fn merge_list_child_values(
1061    child_field: &Field,
1062    left_values: ArrayRef,
1063    right_values: ArrayRef,
1064) -> ArrayRef {
1065    match child_field.data_type() {
1066        DataType::Struct(child_fields) => Arc::new(merge_with_schema(
1067            left_values.as_struct(),
1068            right_values.as_struct(),
1069            child_fields,
1070        )) as ArrayRef,
1071        DataType::List(grandchild) => {
1072            let left_list = left_values
1073                .as_any()
1074                .downcast_ref::<ListArray>()
1075                .expect("left list values should be ListArray");
1076            let right_list = right_values
1077                .as_any()
1078                .downcast_ref::<ListArray>()
1079                .expect("right list values should be ListArray");
1080            let merged_values = merge_list_child_values(
1081                grandchild.as_ref(),
1082                left_list.values().clone(),
1083                right_list.values().clone(),
1084            );
1085            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1086            Arc::new(ListArray::new(
1087                grandchild.clone(),
1088                left_list.offsets().clone(),
1089                merged_values,
1090                merged_validity,
1091            )) as ArrayRef
1092        }
1093        DataType::LargeList(grandchild) => {
1094            let left_list = left_values
1095                .as_any()
1096                .downcast_ref::<LargeListArray>()
1097                .expect("left list values should be LargeListArray");
1098            let right_list = right_values
1099                .as_any()
1100                .downcast_ref::<LargeListArray>()
1101                .expect("right list values should be LargeListArray");
1102            let merged_values = merge_list_child_values(
1103                grandchild.as_ref(),
1104                left_list.values().clone(),
1105                right_list.values().clone(),
1106            );
1107            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1108            Arc::new(LargeListArray::new(
1109                grandchild.clone(),
1110                left_list.offsets().clone(),
1111                merged_values,
1112                merged_validity,
1113            )) as ArrayRef
1114        }
1115        DataType::FixedSizeList(grandchild, list_size) => {
1116            let left_list = left_values
1117                .as_any()
1118                .downcast_ref::<FixedSizeListArray>()
1119                .expect("left list values should be FixedSizeListArray");
1120            let right_list = right_values
1121                .as_any()
1122                .downcast_ref::<FixedSizeListArray>()
1123                .expect("right list values should be FixedSizeListArray");
1124            let merged_values = merge_list_child_values(
1125                grandchild.as_ref(),
1126                left_list.values().clone(),
1127                right_list.values().clone(),
1128            );
1129            let merged_validity = merge_struct_validity(left_list.nulls(), right_list.nulls());
1130            Arc::new(FixedSizeListArray::new(
1131                grandchild.clone(),
1132                *list_size,
1133                merged_values,
1134                merged_validity,
1135            )) as ArrayRef
1136        }
1137        _ => left_values.clone(),
1138    }
1139}
1140
1141// Helper function to adjust child array validity based on parent struct validity
1142// When parent struct is null, propagates null to child array
1143// Optimized with fast paths and word-level bitwise buffer operations
1144fn adjust_child_validity(
1145    child: &ArrayRef,
1146    parent_validity: Option<&arrow_buffer::NullBuffer>,
1147) -> ArrayRef {
1148    // Fast path: no parent validity means no adjustment needed
1149    let parent_validity = match parent_validity {
1150        None => return child.clone(),
1151        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
1152        Some(p) => p,
1153    };
1154
1155    let child_validity = child.nulls();
1156
1157    // Compute the new validity: child_validity AND parent_validity
1158    let new_validity = match child_validity {
1159        None => {
1160            // Fast path: child has no existing validity, just use parent's
1161            parent_validity.clone()
1162        }
1163        Some(child_nulls) => {
1164            let child_buffer = child_nulls.inner();
1165            let parent_buffer = parent_validity.inner();
1166
1167            // Perform bitwise AND directly on BooleanBuffers
1168            // This preserves the correct semantics: 1 = valid, 0 = null
1169            let merged_buffer = child_buffer & parent_buffer;
1170
1171            arrow_buffer::NullBuffer::from(merged_buffer)
1172        }
1173    };
1174
1175    // Create new array with adjusted validity
1176    arrow_array::make_array(
1177        arrow_data::ArrayData::try_new(
1178            child.data_type().clone(),
1179            child.len(),
1180            Some(new_validity.into_inner().into_inner()),
1181            child.offset(),
1182            child.to_data().buffers().to_vec(),
1183            child.to_data().child_data().to_vec(),
1184        )
1185        .unwrap(),
1186    )
1187}
1188
1189fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1190    let mut fields: Vec<Field> = vec![];
1191    let mut columns: Vec<ArrayRef> = vec![];
1192    let right_fields = right_struct_array.fields();
1193    let right_columns = right_struct_array.columns();
1194
1195    // Get the validity buffers from both structs
1196    let left_validity = left_struct_array.nulls();
1197    let right_validity = right_struct_array.nulls();
1198
1199    // Compute merged validity
1200    let merged_validity = merge_struct_validity(left_validity, right_validity);
1201
1202    // iterate through the fields on the left hand side
1203    for (left_field, left_column) in left_struct_array
1204        .fields()
1205        .iter()
1206        .zip(left_struct_array.columns().iter())
1207    {
1208        match right_fields
1209            .iter()
1210            .position(|f| f.name() == left_field.name())
1211        {
1212            // if the field exists on the right hand side, merge them recursively if appropriate
1213            Some(right_index) => {
1214                let right_field = right_fields.get(right_index).unwrap();
1215                let right_column = right_columns.get(right_index).unwrap();
1216                // if both fields are struct, merge them recursively
1217                match (left_field.data_type(), right_field.data_type()) {
1218                    (DataType::Struct(_), DataType::Struct(_)) => {
1219                        let left_sub_array = left_column.as_struct();
1220                        let right_sub_array = right_column.as_struct();
1221                        let merged_sub_array = merge(left_sub_array, right_sub_array);
1222                        fields.push(Field::new(
1223                            left_field.name(),
1224                            merged_sub_array.data_type().clone(),
1225                            left_field.is_nullable(),
1226                        ));
1227                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1228                    }
1229                    (DataType::List(left_list), DataType::List(right_list))
1230                        if left_list.data_type().is_struct()
1231                            && right_list.data_type().is_struct() =>
1232                    {
1233                        // If there is nothing to merge just use the left field
1234                        if left_list.data_type() == right_list.data_type() {
1235                            fields.push(left_field.as_ref().clone());
1236                            columns.push(left_column.clone());
1237                        } else {
1238                            // If we have two List<Struct> with different sets of fields then
1239                            // we can merge them only if the offsets arrays are the same.
1240                            // Otherwise, we have to consider it an error.
1241                            let merged_sub_array = merge_list_struct(&left_column, &right_column);
1242                            fields.push(Field::new(
1243                                left_field.name(),
1244                                merged_sub_array.data_type().clone(),
1245                                left_field.is_nullable(),
1246                            ));
1247                            columns.push(merged_sub_array);
1248                        }
1249                    }
1250                    // otherwise, just use the field on the left hand side
1251                    _ => {
1252                        // TODO handle list-of-struct and other types
1253                        fields.push(left_field.as_ref().clone());
1254                        // Adjust the column validity: if left struct was null, propagate to child
1255                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1256                        columns.push(adjusted_column);
1257                    }
1258                }
1259            }
1260            None => {
1261                fields.push(left_field.as_ref().clone());
1262                // Adjust the column validity: if left struct was null, propagate to child
1263                let adjusted_column = adjust_child_validity(left_column, left_validity);
1264                columns.push(adjusted_column);
1265            }
1266        }
1267    }
1268
1269    // now iterate through the fields on the right hand side
1270    right_fields
1271        .iter()
1272        .zip(right_columns.iter())
1273        .for_each(|(field, column)| {
1274            // add new columns on the right
1275            if !left_struct_array
1276                .fields()
1277                .iter()
1278                .any(|f| f.name() == field.name())
1279            {
1280                fields.push(field.as_ref().clone());
1281                // This field doesn't exist on the left
1282                // We use the right's column but need to adjust for struct validity
1283                let adjusted_column = adjust_child_validity(column, right_validity);
1284                columns.push(adjusted_column);
1285            }
1286        });
1287
1288    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1289}
1290
1291fn merge_with_schema(
1292    left_struct_array: &StructArray,
1293    right_struct_array: &StructArray,
1294    fields: &Fields,
1295) -> StructArray {
1296    // Helper function that returns true if both types are struct or both are non-struct
1297    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
1298        match (left, right) {
1299            (DataType::Struct(_), DataType::Struct(_)) => true,
1300            (DataType::Struct(_), _) => false,
1301            (_, DataType::Struct(_)) => false,
1302            _ => true,
1303        }
1304    }
1305
1306    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
1307    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());
1308
1309    let left_fields = left_struct_array.fields();
1310    let left_columns = left_struct_array.columns();
1311    let right_fields = right_struct_array.fields();
1312    let right_columns = right_struct_array.columns();
1313
1314    // Get the validity buffers from both structs
1315    let left_validity = left_struct_array.nulls();
1316    let right_validity = right_struct_array.nulls();
1317
1318    // Compute merged validity
1319    let merged_validity = merge_struct_validity(left_validity, right_validity);
1320
1321    for field in fields {
1322        let left_match_idx = left_fields.iter().position(|f| {
1323            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1324        });
1325        let right_match_idx = right_fields.iter().position(|f| {
1326            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
1327        });
1328
1329        match (left_match_idx, right_match_idx) {
1330            (None, Some(right_idx)) => {
1331                output_fields.push(right_fields[right_idx].as_ref().clone());
1332                // Adjust validity if the right struct was null
1333                let adjusted_column =
1334                    adjust_child_validity(&right_columns[right_idx], right_validity);
1335                columns.push(adjusted_column);
1336            }
1337            (Some(left_idx), None) => {
1338                output_fields.push(left_fields[left_idx].as_ref().clone());
1339                // Adjust validity if the left struct was null
1340                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
1341                columns.push(adjusted_column);
1342            }
1343            (Some(left_idx), Some(right_idx)) => {
1344                match field.data_type() {
1345                    DataType::Struct(child_fields) => {
1346                        let left_sub_array = left_columns[left_idx].as_struct();
1347                        let right_sub_array = right_columns[right_idx].as_struct();
1348                        let merged_sub_array =
1349                            merge_with_schema(left_sub_array, right_sub_array, child_fields);
1350                        output_fields.push(Field::new(
1351                            field.name(),
1352                            merged_sub_array.data_type().clone(),
1353                            field.is_nullable(),
1354                        ));
1355                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1356                    }
1357                    DataType::List(child_field) => {
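                        // Both sides have a list child with this name: merge the child values,
                        // then rebuild the list from the left offsets and the merged list-level
                        // validity. (The LargeList and FixedSizeList arms below follow the same
                        // pattern.)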
1358                        let left_list = left_columns[left_idx]
1359                            .as_any()
1360                            .downcast_ref::<ListArray>()
1361                            .unwrap();
1362                        let right_list = right_columns[right_idx]
1363                            .as_any()
1364                            .downcast_ref::<ListArray>()
1365                            .unwrap();
1366                        let merged_values = merge_list_child_values(
1367                            child_field.as_ref(),
1368                            left_list.trimmed_values(),
1369                            right_list.trimmed_values(),
1370                        );
1371                        let merged_validity =
1372                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1373                        let merged_list = ListArray::new(
1374                            child_field.clone(),
1375                            left_list.offsets().clone(),
1376                            merged_values,
1377                            merged_validity,
1378                        );
1379                        output_fields.push(field.as_ref().clone());
1380                        columns.push(Arc::new(merged_list) as ArrayRef);
1381                    }
1382                    DataType::LargeList(child_field) => {
1383                        let left_list = left_columns[left_idx]
1384                            .as_any()
1385                            .downcast_ref::<LargeListArray>()
1386                            .unwrap();
1387                        let right_list = right_columns[right_idx]
1388                            .as_any()
1389                            .downcast_ref::<LargeListArray>()
1390                            .unwrap();
1391                        let merged_values = merge_list_child_values(
1392                            child_field.as_ref(),
1393                            left_list.trimmed_values(),
1394                            right_list.trimmed_values(),
1395                        );
1396                        let merged_validity =
1397                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1398                        let merged_list = LargeListArray::new(
1399                            child_field.clone(),
1400                            left_list.offsets().clone(),
1401                            merged_values,
1402                            merged_validity,
1403                        );
1404                        output_fields.push(field.as_ref().clone());
1405                        columns.push(Arc::new(merged_list) as ArrayRef);
1406                    }
1407                    DataType::FixedSizeList(child_field, list_size) => {
1408                        let left_list = left_columns[left_idx]
1409                            .as_any()
1410                            .downcast_ref::<FixedSizeListArray>()
1411                            .unwrap();
1412                        let right_list = right_columns[right_idx]
1413                            .as_any()
1414                            .downcast_ref::<FixedSizeListArray>()
1415                            .unwrap();
1416                        let merged_values = merge_list_child_values(
1417                            child_field.as_ref(),
1418                            left_list.values().clone(),
1419                            right_list.values().clone(),
1420                        );
1421                        let merged_validity =
1422                            merge_struct_validity(left_list.nulls(), right_list.nulls());
1423                        let merged_list = FixedSizeListArray::new(
1424                            child_field.clone(),
1425                            *list_size,
1426                            merged_values,
1427                            merged_validity,
1428                        );
1429                        output_fields.push(field.as_ref().clone());
1430                        columns.push(Arc::new(merged_list) as ArrayRef);
1431                    }
1432                    _ => {
1433                        output_fields.push(left_fields[left_idx].as_ref().clone());
1434                        // For fields that exist in both, use left but adjust validity
1435                        let adjusted_column =
1436                            adjust_child_validity(&left_columns[left_idx], left_validity);
1437                        columns.push(adjusted_column);
1438                    }
1439                }
1440            }
1441            (None, None) => {
1442                // The field will not be included in the output
1443            }
1444        }
1445    }
1446
1447    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
1448}
1449
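/// Resolve `components` as a path of nested struct field names, returning the
/// innermost sub-array if every component matches, or `None` if a component is
/// missing or an intermediate array is not a struct.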
1450fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1451    if components.is_empty() {
1452        return Some(array);
1453    }
1454    if !matches!(array.data_type(), DataType::Struct(_)) {
1455        return None;
1456    }
1457    let struct_arr = array.as_struct();
1458    struct_arr
1459        .column_by_name(components[0])
1460        .and_then(|arr| get_sub_array(arr, &components[1..]))
1461}
1462
1463/// Interleave multiple RecordBatches into a single RecordBatch.
1464///
1465/// Behaves like [`arrow_select::interleave::interleave`], but for RecordBatches.
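///
/// A minimal sketch of usage, interleaving rows from two single-column batches
/// (the batch and column names here are illustrative only):
///
/// ```
/// use std::sync::Arc;
/// use arrow_array::{Int32Array, RecordBatch};
/// use arrow_schema::{DataType, Field, Schema};
/// use lance_arrow::interleave_batches;
///
/// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
/// let batch0 = RecordBatch::try_new(
///     schema.clone(),
///     vec![Arc::new(Int32Array::from(vec![1, 2]))],
/// )
/// .unwrap();
/// let batch1 = RecordBatch::try_new(
///     schema,
///     vec![Arc::new(Int32Array::from(vec![10, 20]))],
/// )
/// .unwrap();
/// // Each index is (batch index, row index): row 0 of batch1, then row 1 of batch0.
/// let out = interleave_batches(&[batch0, batch1], &[(1, 0), (0, 1)]).unwrap();
/// assert_eq!(out.num_rows(), 2);
/// ```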
1466pub fn interleave_batches(
1467    batches: &[RecordBatch],
1468    indices: &[(usize, usize)],
1469) -> Result<RecordBatch> {
1470    let first_batch = batches.first().ok_or_else(|| {
1471        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1472    })?;
1473    let schema = first_batch.schema();
1474    let num_columns = first_batch.num_columns();
1475    let mut columns = Vec::with_capacity(num_columns);
1476    let mut chunks = Vec::with_capacity(batches.len());
1477
1478    for i in 0..num_columns {
1479        for batch in batches {
1480            chunks.push(batch.column(i).as_ref());
1481        }
1482        let new_column = interleave(&chunks, indices)?;
1483        columns.push(new_column);
1484        chunks.clear();
1485    }
1486
1487    RecordBatch::try_new(schema, columns)
1488}
1489
1490pub trait BufferExt {
1491    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
1492    ///
1493    /// The alignment must be specified (as `bytes_per_value`) since we want to make
1494    /// sure we can safely reinterpret the buffer.
1495    ///
1496    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
1497    /// will be made and an owned buffer returned.
1498    ///
1499    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
1500    /// never going to be reinterpreted into another type and we can safely
1501    /// ignore the alignment.
1502    ///
1503    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
1504    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
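    ///
    /// A minimal usage sketch (illustrative only; whether the zero-copy path is taken
    /// depends on the alignment of the incoming allocation):
    ///
    /// ```
    /// use lance_arrow::BufferExt;
    ///
    /// let bytes = bytes::Bytes::from(vec![0u8; 16]);
    /// // Treat the data as 4-byte values (e.g. i32); a copy is made only if the
    /// // incoming allocation is not 4-byte aligned.
    /// let buf = arrow_buffer::Buffer::from_bytes_bytes(bytes, 4);
    /// assert_eq!(buf.len(), 16);
    /// ```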
1505    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;
1506
1507    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
1508    ///
1509    /// `size_bytes` can be larger than `bytes.len()` and, if so, the trailing bytes will
1510    /// be zeroed out.
1511    ///
1512    /// # Panics
1513    ///
1514    /// Panics if `size_bytes` is less than `bytes.len()`
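    ///
    /// A small sketch of the zero-padding behavior (illustrative only):
    ///
    /// ```
    /// use lance_arrow::BufferExt;
    ///
    /// let bytes = bytes::Bytes::from_static(b"abc");
    /// // Ask for 8 bytes: the 3 input bytes are copied and the remaining 5 are zeroed.
    /// let buf = arrow_buffer::Buffer::copy_bytes_bytes(bytes, 8);
    /// assert_eq!(buf.as_slice(), &[b'a', b'b', b'c', 0, 0, 0, 0, 0][..]);
    /// ```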
1515    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
1516}
1517
1518fn is_pwr_two(n: u64) -> bool {
1519    n != 0 && (n & (n - 1)) == 0 // 0 is not a power of two (and would underflow `n - 1`)
1520}
1521
1522impl BufferExt for arrow_buffer::Buffer {
1523    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
1524        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
1525        {
1526            // The original buffer is not aligned, cannot zero-copy
1527            let size_bytes = bytes.len();
1528            Self::copy_bytes_bytes(bytes, size_bytes)
1529        } else {
1530            // The original buffer is aligned, can zero-copy
1531            // SAFETY: the alignment is correct, so we can make this conversion
1532            unsafe {
1533                Self::from_custom_allocation(
1534                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
1535                    bytes.len(),
1536                    Arc::new(bytes),
1537                )
1538            }
1539        }
1540    }
1541
1542    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
1543        assert!(size_bytes >= bytes.len());
1544        let mut buf = MutableBuffer::with_capacity(size_bytes);
1545        let to_fill = size_bytes - bytes.len();
1546        buf.extend(bytes);
1547        buf.extend(std::iter::repeat_n(0_u8, to_fill));
1548
1549        // FIX for issue #4512: Shrink buffer to actual size before converting to immutable
1550        // This reduces memory overhead from capacity over-allocation
1551        buf.shrink_to_fit();
1552
1553        Self::from(buf)
1554    }
1555}
1556
1557#[cfg(test)]
1558mod tests {
1559    use super::*;
1560    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
1561    use arrow_array::{Float32Array, Int32Array, StructArray};
1562    use arrow_buffer::OffsetBuffer;
1563
1564    #[test]
1565    fn test_merge_recursive() {
1566        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
1567        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
1568        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
1569        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);
1570
1571        let left_schema = Schema::new(vec![
1572            Field::new("a", DataType::Int32, true),
1573            Field::new(
1574                "b",
1575                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
1576                true,
1577            ),
1578        ]);
1579        let left_batch = RecordBatch::try_new(
1580            Arc::new(left_schema),
1581            vec![
1582                Arc::new(a_array.clone()),
1583                Arc::new(StructArray::from(vec![(
1584                    Arc::new(Field::new("c", DataType::Int32, true)),
1585                    Arc::new(c_array.clone()) as ArrayRef,
1586                )])),
1587            ],
1588        )
1589        .unwrap();
1590
1591        let right_schema = Schema::new(vec![
1592            Field::new("e", DataType::Int32, true),
1593            Field::new(
1594                "b",
1595                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
1596                true,
1597            ),
1598        ]);
1599        let right_batch = RecordBatch::try_new(
1600            Arc::new(right_schema),
1601            vec![
1602                Arc::new(e_array.clone()),
1603                Arc::new(StructArray::from(vec![(
1604                    Arc::new(Field::new("d", DataType::Utf8, true)),
1605                    Arc::new(d_array.clone()) as ArrayRef,
1606                )])) as ArrayRef,
1607            ],
1608        )
1609        .unwrap();
1610
1611        let merged_schema = Schema::new(vec![
1612            Field::new("a", DataType::Int32, true),
1613            Field::new(
1614                "b",
1615                DataType::Struct(
1616                    vec![
1617                        Field::new("c", DataType::Int32, true),
1618                        Field::new("d", DataType::Utf8, true),
1619                    ]
1620                    .into(),
1621                ),
1622                true,
1623            ),
1624            Field::new("e", DataType::Int32, true),
1625        ]);
1626        let merged_batch = RecordBatch::try_new(
1627            Arc::new(merged_schema),
1628            vec![
1629                Arc::new(a_array) as ArrayRef,
1630                Arc::new(StructArray::from(vec![
1631                    (
1632                        Arc::new(Field::new("c", DataType::Int32, true)),
1633                        Arc::new(c_array) as ArrayRef,
1634                    ),
1635                    (
1636                        Arc::new(Field::new("d", DataType::Utf8, true)),
1637                        Arc::new(d_array) as ArrayRef,
1638                    ),
1639                ])) as ArrayRef,
1640                Arc::new(e_array) as ArrayRef,
1641            ],
1642        )
1643        .unwrap();
1644
1645        let result = left_batch.merge(&right_batch).unwrap();
1646        assert_eq!(result, merged_batch);
1647    }
1648
1649    #[test]
1650    fn test_merge_with_schema() {
1651        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
1652            let fields: Fields = names
1653                .iter()
1654                .zip(types)
1655                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
1656                .collect();
1657            let schema = Schema::new(vec![Field::new(
1658                "struct",
1659                DataType::Struct(fields.clone()),
1660                false,
1661            )]);
1662            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
1663            let batch = RecordBatch::try_new(
1664                Arc::new(schema.clone()),
1665                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
1666            );
1667            (schema, batch.unwrap())
1668        }
1669
1670        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
1671        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
1672        let (output_schema, _) = test_batch(
1673            &["b", "a", "c"],
1674            &[DataType::Int64, DataType::Int32, DataType::Int32],
1675        );
1676
1677        // If we use merge_with_schema the schema is respected
1678        let merged = left_batch
1679            .merge_with_schema(&right_batch, &output_schema)
1680            .unwrap();
1681        assert_eq!(merged.schema().as_ref(), &output_schema);
1682
1683        // If we use merge, field order is first-come, first-served based on the left batch
1684        let (naive_schema, _) = test_batch(
1685            &["a", "b", "c"],
1686            &[DataType::Int32, DataType::Int64, DataType::Int32],
1687        );
1688        let merged = left_batch.merge(&right_batch).unwrap();
1689        assert_eq!(merged.schema().as_ref(), &naive_schema);
1690    }
1691
1692    #[test]
1693    fn test_merge_list_struct() {
1694        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
1695        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
1696        let x_struct_field = Arc::new(Field::new(
1697            "item",
1698            DataType::Struct(Fields::from(vec![x_field.clone()])),
1699            true,
1700        ));
1701        let y_struct_field = Arc::new(Field::new(
1702            "item",
1703            DataType::Struct(Fields::from(vec![y_field.clone()])),
1704            true,
1705        ));
1706        let both_struct_field = Arc::new(Field::new(
1707            "item",
1708            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
1709            true,
1710        ));
1711        let left_schema = Schema::new(vec![Field::new(
1712            "list_struct",
1713            DataType::List(x_struct_field.clone()),
1714            true,
1715        )]);
1716        let right_schema = Schema::new(vec![Field::new(
1717            "list_struct",
1718            DataType::List(y_struct_field.clone()),
1719            true,
1720        )]);
1721        let both_schema = Schema::new(vec![Field::new(
1722            "list_struct",
1723            DataType::List(both_struct_field.clone()),
1724            true,
1725        )]);
1726
1727        let x = Arc::new(Int32Array::from(vec![1]));
1728        let y = Arc::new(Int32Array::from(vec![2]));
1729        let x_struct = Arc::new(StructArray::new(
1730            Fields::from(vec![x_field.clone()]),
1731            vec![x.clone()],
1732            None,
1733        ));
1734        let y_struct = Arc::new(StructArray::new(
1735            Fields::from(vec![y_field.clone()]),
1736            vec![y.clone()],
1737            None,
1738        ));
1739        let both_struct = Arc::new(StructArray::new(
1740            Fields::from(vec![x_field.clone(), y_field.clone()]),
1741            vec![x.clone(), y],
1742            None,
1743        ));
1744        let both_null_struct = Arc::new(StructArray::new(
1745            Fields::from(vec![x_field, y_field]),
1746            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
1747            None,
1748        ));
1749        let offsets = OffsetBuffer::from_lengths([1]);
1750        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
1751        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
1752        let both_list = ListArray::new(
1753            both_struct_field.clone(),
1754            offsets.clone(),
1755            both_struct,
1756            None,
1757        );
1758        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
1759        let x_batch =
1760            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
1761        let y_batch = RecordBatch::try_new(
1762            Arc::new(right_schema.clone()),
1763            vec![Arc::new(y_s_list.clone())],
1764        )
1765        .unwrap();
1766        let merged = x_batch.merge(&y_batch).unwrap();
1767        let expected =
1768            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
1769        assert_eq!(merged, expected);
1770
1771        let y_null_list = new_null_array(y_s_list.data_type(), 1);
1772        let y_null_batch =
1773            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
1774                .unwrap();
1775        let expected =
1776            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
1777        let merged = x_batch.merge(&y_null_batch).unwrap();
1778        assert_eq!(merged, expected);
1779    }
1780
1781    #[test]
1782    fn test_byte_width_opt() {
1783        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1784        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1785        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1786        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1787        assert_eq!(DataType::Utf8.byte_width_opt(), None);
1788        assert_eq!(DataType::Binary.byte_width_opt(), None);
1789        assert_eq!(
1790            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1791            None
1792        );
1793        assert_eq!(
1794            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1795                .byte_width_opt(),
1796            Some(12)
1797        );
1798        assert_eq!(
1799            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1800                .byte_width_opt(),
1801            Some(16)
1802        );
1803        assert_eq!(
1804            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1805                .byte_width_opt(),
1806            None
1807        );
1808    }
1809
1810    #[test]
1811    fn test_take_record_batch() {
1812        let schema = Arc::new(Schema::new(vec![
1813            Field::new("a", DataType::Int32, true),
1814            Field::new("b", DataType::Utf8, true),
1815        ]));
1816        let batch = RecordBatch::try_new(
1817            schema.clone(),
1818            vec![
1819                Arc::new(Int32Array::from_iter_values(0..20)),
1820                Arc::new(StringArray::from_iter_values(
1821                    (0..20).map(|i| format!("str-{}", i)),
1822                )),
1823            ],
1824        )
1825        .unwrap();
1826        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1827        assert_eq!(
1828            taken,
1829            RecordBatch::try_new(
1830                schema,
1831                vec![
1832                    Arc::new(Int32Array::from(vec![1, 5, 10])),
1833                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1834                ],
1835            )
1836            .unwrap()
1837        )
1838    }
1839
1840    #[test]
1841    fn test_schema_project_by_schema() {
1842        let metadata = [("key".to_string(), "value".to_string())];
1843        let schema = Arc::new(
1844            Schema::new(vec![
1845                Field::new("a", DataType::Int32, true),
1846                Field::new("b", DataType::Utf8, true),
1847            ])
1848            .with_metadata(metadata.clone().into()),
1849        );
1850        let batch = RecordBatch::try_new(
1851            schema,
1852            vec![
1853                Arc::new(Int32Array::from_iter_values(0..20)),
1854                Arc::new(StringArray::from_iter_values(
1855                    (0..20).map(|i| format!("str-{}", i)),
1856                )),
1857            ],
1858        )
1859        .unwrap();
1860
1861        // Empty schema
1862        let empty_schema = Schema::empty();
1863        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
1864        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
1865        assert_eq!(
1866            empty_projected,
1867            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
1868                .with_schema(Arc::new(expected_schema))
1869                .unwrap()
1870        );
1871
1872        // Re-ordered schema
1873        let reordered_schema = Schema::new(vec![
1874            Field::new("b", DataType::Utf8, true),
1875            Field::new("a", DataType::Int32, true),
1876        ]);
1877        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
1878        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
1879        assert_eq!(
1880            reordered_projected,
1881            RecordBatch::try_new(
1882                expected_schema,
1883                vec![
1884                    Arc::new(StringArray::from_iter_values(
1885                        (0..20).map(|i| format!("str-{}", i)),
1886                    )),
1887                    Arc::new(Int32Array::from_iter_values(0..20)),
1888                ],
1889            )
1890            .unwrap()
1891        );
1892
1893        // Sub schema
1894        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1895        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
1896        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
1897        assert_eq!(
1898            sub_projected,
1899            RecordBatch::try_new(
1900                expected_schema,
1901                vec![Arc::new(Int32Array::from_iter_values(0..20))],
1902            )
1903            .unwrap()
1904        );
1905    }
1906
1907    #[test]
1908    fn test_project_preserves_struct_validity() {
1909        // Test that projecting a struct array preserves its validity (fix for issue #4385)
1910        let fields = Fields::from(vec![
1911            Field::new("id", DataType::Int32, false),
1912            Field::new("value", DataType::Float32, true),
1913        ]);
1914
1915        // Create a struct array with validity
1916        let id_array = Int32Array::from(vec![1, 2, 3]);
1917        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1918        let struct_array = StructArray::new(
1919            fields.clone(),
1920            vec![
1921                Arc::new(id_array) as ArrayRef,
1922                Arc::new(value_array) as ArrayRef,
1923            ],
1924            Some(vec![true, false, true].into()), // Second struct is null
1925        );
1926
1927        // Project the struct array
1928        let projected = project(&struct_array, &fields).unwrap();
1929
1930        // Verify the validity is preserved
1931        assert_eq!(projected.null_count(), 1);
1932        assert!(!projected.is_null(0));
1933        assert!(projected.is_null(1));
1934        assert!(!projected.is_null(2));
1935    }
1936
1937    #[test]
1938    fn test_merge_struct_with_different_validity() {
1939        // Test case from Weston's review comment
1940        // File 1 has height field with some nulls
1941        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
1942        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
1943        let left_struct = StructArray::new(
1944            left_fields,
1945            vec![Arc::new(height_array) as ArrayRef],
1946            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
1947        );
1948
1949        // File 2 has width field with some nulls
1950        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
1951        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
1952        let right_struct = StructArray::new(
1953            right_fields,
1954            vec![Arc::new(width_array) as ArrayRef],
1955            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
1956        );
1957
1958        // Merge the two structs
1959        let merged = merge(&left_struct, &right_struct);
1960
1961        // Expected:
1962        // Row 1: both non-null -> {width: 300, height: 500}
1963        // Row 2: left null, right non-null -> {width: 200, height: null}
1964        // Row 3: left non-null, right null -> {width: null, height: 600}
1965        // Row 4: both null -> null struct
1966
1967        assert_eq!(merged.null_count(), 1); // Only row 4 is null
1968        assert!(!merged.is_null(0));
1969        assert!(!merged.is_null(1));
1970        assert!(!merged.is_null(2));
1971        assert!(merged.is_null(3));
1972
1973        // Check field values
1974        let height_col = merged.column_by_name("height").unwrap();
1975        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
1976        assert_eq!(height_values.value(0), 500);
1977        assert!(height_values.is_null(1)); // height is null when left struct was null
1978        assert_eq!(height_values.value(2), 600);
1979
1980        let width_col = merged.column_by_name("width").unwrap();
1981        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
1982        assert_eq!(width_values.value(0), 300);
1983        assert_eq!(width_values.value(1), 200);
1984        assert!(width_values.is_null(2)); // width is null when right struct was null
1985    }
1986
1987    #[test]
1988    fn test_merge_with_schema_with_nullable_struct_list_schema_mismatch() {
1989        // left_list setup
1990        let left_company_id = Arc::new(Int32Array::from(vec![None, None]));
1991        let left_count = Arc::new(Int32Array::from(vec![None, None]));
1992        let left_struct = Arc::new(StructArray::new(
1993            Fields::from(vec![
1994                Field::new("company_id", DataType::Int32, true),
1995                Field::new("count", DataType::Int32, true),
1996            ]),
1997            vec![left_company_id, left_count],
1998            None,
1999        ));
2000        let left_list = Arc::new(ListArray::new(
2001            Arc::new(Field::new(
2002                "item",
2003                DataType::Struct(left_struct.fields().clone()),
2004                true,
2005            )),
2006            OffsetBuffer::from_lengths([2]),
2007            left_struct,
2008            None,
2009        ));
2010
2011        // Right List Setup
2012        let right_company_name = Arc::new(StringArray::from(vec!["Google", "Microsoft"]));
2013        let right_struct = Arc::new(StructArray::new(
2014            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2015            vec![right_company_name],
2016            None,
2017        ));
2018        let right_list = Arc::new(ListArray::new(
2019            Arc::new(Field::new(
2020                "item",
2021                DataType::Struct(right_struct.fields().clone()),
2022                true,
2023            )),
2024            OffsetBuffer::from_lengths([2]),
2025            right_struct,
2026            None,
2027        ));
2028
2029        let target_fields = Fields::from(vec![Field::new(
2030            "companies",
2031            DataType::List(Arc::new(Field::new(
2032                "item",
2033                DataType::Struct(Fields::from(vec![
2034                    Field::new("company_id", DataType::Int32, true),
2035                    Field::new("company_name", DataType::Utf8, true),
2036                    Field::new("count", DataType::Int32, true),
2037                ])),
2038                true,
2039            ))),
2040            true,
2041        )]);
2042
2043        let left_batch = RecordBatch::try_new(
2044            Arc::new(Schema::new(vec![Field::new(
2045                "companies",
2046                left_list.data_type().clone(),
2047                true,
2048            )])),
2049            vec![left_list as ArrayRef],
2050        )
2051        .unwrap();
2052
2053        let right_batch = RecordBatch::try_new(
2054            Arc::new(Schema::new(vec![Field::new(
2055                "companies",
2056                right_list.data_type().clone(),
2057                true,
2058            )])),
2059            vec![right_list as ArrayRef],
2060        )
2061        .unwrap();
2062
2063        let merged = left_batch
2064            .merge_with_schema(&right_batch, &Schema::new(target_fields.to_vec()))
2065            .unwrap();
2066
2067        // Verify the merged structure
2068        let merged_list = merged
2069            .column_by_name("companies")
2070            .unwrap()
2071            .as_any()
2072            .downcast_ref::<ListArray>()
2073            .unwrap();
2074        let merged_struct = merged_list.values().as_struct();
2075
2076        // Should have all 3 fields
2077        assert_eq!(merged_struct.num_columns(), 3);
2078        assert!(merged_struct.column_by_name("company_id").is_some());
2079        assert!(merged_struct.column_by_name("company_name").is_some());
2080        assert!(merged_struct.column_by_name("count").is_some());
2081
2082        // Verify values
2083        let company_id = merged_struct
2084            .column_by_name("company_id")
2085            .unwrap()
2086            .as_any()
2087            .downcast_ref::<Int32Array>()
2088            .unwrap();
2089        assert!(company_id.is_null(0));
2090        assert!(company_id.is_null(1));
2091
2092        let company_name = merged_struct
2093            .column_by_name("company_name")
2094            .unwrap()
2095            .as_any()
2096            .downcast_ref::<StringArray>()
2097            .unwrap();
2098        assert_eq!(company_name.value(0), "Google");
2099        assert_eq!(company_name.value(1), "Microsoft");
2100
2101        let count = merged_struct
2102            .column_by_name("count")
2103            .unwrap()
2104            .as_any()
2105            .downcast_ref::<Int32Array>()
2106            .unwrap();
2107        assert!(count.is_null(0));
2108        assert!(count.is_null(1));
2109    }
2110
2111    #[test]
2112    fn test_merge_struct_lists() {
2113        test_merge_struct_lists_generic::<i32>();
2114    }
2115
2116    #[test]
2117    fn test_merge_struct_large_lists() {
2118        test_merge_struct_lists_generic::<i64>();
2119    }
2120
2121    fn test_merge_struct_lists_generic<O: OffsetSizeTrait>() {
2122        // left_list setup
2123        let left_company_id = Arc::new(Int32Array::from(vec![
2124            Some(1),
2125            Some(2),
2126            Some(3),
2127            Some(4),
2128            Some(5),
2129            Some(6),
2130            Some(7),
2131            Some(8),
2132            Some(9),
2133            Some(10),
2134            Some(11),
2135            Some(12),
2136            Some(13),
2137            Some(14),
2138            Some(15),
2139            Some(16),
2140            Some(17),
2141            Some(18),
2142            Some(19),
2143            Some(20),
2144        ]));
2145        let left_count = Arc::new(Int32Array::from(vec![
2146            Some(10),
2147            Some(20),
2148            Some(30),
2149            Some(40),
2150            Some(50),
2151            Some(60),
2152            Some(70),
2153            Some(80),
2154            Some(90),
2155            Some(100),
2156            Some(110),
2157            Some(120),
2158            Some(130),
2159            Some(140),
2160            Some(150),
2161            Some(160),
2162            Some(170),
2163            Some(180),
2164            Some(190),
2165            Some(200),
2166        ]));
2167        let left_struct = Arc::new(StructArray::new(
2168            Fields::from(vec![
2169                Field::new("company_id", DataType::Int32, true),
2170                Field::new("count", DataType::Int32, true),
2171            ]),
2172            vec![left_company_id, left_count],
2173            None,
2174        ));
2175
2176        let left_list = Arc::new(GenericListArray::<O>::new(
2177            Arc::new(Field::new(
2178                "item",
2179                DataType::Struct(left_struct.fields().clone()),
2180                true,
2181            )),
2182            OffsetBuffer::from_lengths([3, 1]),
2183            left_struct.clone(),
2184            None,
2185        ));
2186
2187        let left_list_struct = Arc::new(StructArray::new(
2188            Fields::from(vec![Field::new(
2189                "companies",
2190                if O::IS_LARGE {
2191                    DataType::LargeList(Arc::new(Field::new(
2192                        "item",
2193                        DataType::Struct(left_struct.fields().clone()),
2194                        true,
2195                    )))
2196                } else {
2197                    DataType::List(Arc::new(Field::new(
2198                        "item",
2199                        DataType::Struct(left_struct.fields().clone()),
2200                        true,
2201                    )))
2202                },
2203                true,
2204            )]),
2205            vec![left_list as ArrayRef],
2206            None,
2207        ));
2208
2209        // right_list setup
2210        let right_company_name = Arc::new(StringArray::from(vec![
2211            "Google",
2212            "Microsoft",
2213            "Apple",
2214            "Facebook",
2215        ]));
2216        let right_struct = Arc::new(StructArray::new(
2217            Fields::from(vec![Field::new("company_name", DataType::Utf8, true)]),
2218            vec![right_company_name],
2219            None,
2220        ));
2221        let right_list = Arc::new(GenericListArray::<O>::new(
2222            Arc::new(Field::new(
2223                "item",
2224                DataType::Struct(right_struct.fields().clone()),
2225                true,
2226            )),
2227            OffsetBuffer::from_lengths([3, 1]),
2228            right_struct.clone(),
2229            None,
2230        ));
2231
2232        let right_list_struct = Arc::new(StructArray::new(
2233            Fields::from(vec![Field::new(
2234                "companies",
2235                if O::IS_LARGE {
2236                    DataType::LargeList(Arc::new(Field::new(
2237                        "item",
2238                        DataType::Struct(right_struct.fields().clone()),
2239                        true,
2240                    )))
2241                } else {
2242                    DataType::List(Arc::new(Field::new(
2243                        "item",
2244                        DataType::Struct(right_struct.fields().clone()),
2245                        true,
2246                    )))
2247                },
2248                true,
2249            )]),
2250            vec![right_list as ArrayRef],
2251            None,
2252        ));
2253
2254        // prepare schema
2255        let target_fields = Fields::from(vec![Field::new(
2256            "companies",
2257            if O::IS_LARGE {
2258                DataType::LargeList(Arc::new(Field::new(
2259                    "item",
2260                    DataType::Struct(Fields::from(vec![
2261                        Field::new("company_id", DataType::Int32, true),
2262                        Field::new("company_name", DataType::Utf8, true),
2263                        Field::new("count", DataType::Int32, true),
2264                    ])),
2265                    true,
2266                )))
2267            } else {
2268                DataType::List(Arc::new(Field::new(
2269                    "item",
2270                    DataType::Struct(Fields::from(vec![
2271                        Field::new("company_id", DataType::Int32, true),
2272                        Field::new("company_name", DataType::Utf8, true),
2273                        Field::new("count", DataType::Int32, true),
2274                    ])),
2275                    true,
2276                )))
2277            },
2278            true,
2279        )]);
2280
2281        // merge left_list and right_list
2282        let merged_array = merge_with_schema(&left_list_struct, &right_list_struct, &target_fields);
2283        assert_eq!(merged_array.len(), 2);
2284    }
2285
2286    #[test]
2287    fn test_project_by_schema_list_struct_reorder() {
2288        // Test that project_by_schema correctly reorders fields inside List<Struct>
2289        // This is a regression test for issue #5702
2290
2291        // Source schema with inner struct fields in order: c, b, a
2292        let source_inner_struct = DataType::Struct(Fields::from(vec![
2293            Field::new("c", DataType::Utf8, true),
2294            Field::new("b", DataType::Utf8, true),
2295            Field::new("a", DataType::Utf8, true),
2296        ]));
2297        let source_schema = Arc::new(Schema::new(vec![
2298            Field::new("id", DataType::Int32, false),
2299            Field::new(
2300                "data",
2301                DataType::List(Arc::new(Field::new(
2302                    "item",
2303                    source_inner_struct.clone(),
2304                    true,
2305                ))),
2306                true,
2307            ),
2308        ]));
2309
2310        // Create source data with c, b, a order
2311        let c_array = StringArray::from(vec!["c1", "c2"]);
2312        let b_array = StringArray::from(vec!["b1", "b2"]);
2313        let a_array = StringArray::from(vec!["a1", "a2"]);
2314        let inner_struct = StructArray::from(vec![
2315            (
2316                Arc::new(Field::new("c", DataType::Utf8, true)),
2317                Arc::new(c_array) as ArrayRef,
2318            ),
2319            (
2320                Arc::new(Field::new("b", DataType::Utf8, true)),
2321                Arc::new(b_array) as ArrayRef,
2322            ),
2323            (
2324                Arc::new(Field::new("a", DataType::Utf8, true)),
2325                Arc::new(a_array) as ArrayRef,
2326            ),
2327        ]);
2328
2329        let list_array = ListArray::new(
2330            Arc::new(Field::new("item", source_inner_struct, true)),
2331            OffsetBuffer::from_lengths([1, 1]),
2332            Arc::new(inner_struct),
2333            None,
2334        );
2335
2336        let batch = RecordBatch::try_new(
2337            source_schema,
2338            vec![Arc::new(Int32Array::from(vec![1, 2])), Arc::new(list_array)],
2339        )
2340        .unwrap();
2341
2342        // Target schema with inner struct fields in order: a, b, c
2343        let target_inner_struct = DataType::Struct(Fields::from(vec![
2344            Field::new("a", DataType::Utf8, true),
2345            Field::new("b", DataType::Utf8, true),
2346            Field::new("c", DataType::Utf8, true),
2347        ]));
2348        let target_schema = Schema::new(vec![
2349            Field::new("id", DataType::Int32, false),
2350            Field::new(
2351                "data",
2352                DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2353                true,
2354            ),
2355        ]);
2356
2357        // Project should reorder the inner struct fields
2358        let projected = batch.project_by_schema(&target_schema).unwrap();
2359
2360        // Verify the schema is correct
2361        assert_eq!(projected.schema().as_ref(), &target_schema);
2362
2363        // Verify the data is correct by checking inner struct field order
2364        let projected_list = projected.column(1).as_list::<i32>();
2365        let projected_struct = projected_list.values().as_struct();
2366
2367        // Fields should now be in order: a, b, c
2368        assert_eq!(
2369            projected_struct.column_by_name("a").unwrap().as_ref(),
2370            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2371        );
2372        assert_eq!(
2373            projected_struct.column_by_name("b").unwrap().as_ref(),
2374            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2375        );
2376        assert_eq!(
2377            projected_struct.column_by_name("c").unwrap().as_ref(),
2378            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2379        );
2380
2381        // Also verify positional access matches expected order (a=0, b=1, c=2)
2382        assert_eq!(
2383            projected_struct.column(0).as_ref(),
2384            &StringArray::from(vec!["a1", "a2"]) as &dyn Array
2385        );
2386        assert_eq!(
2387            projected_struct.column(1).as_ref(),
2388            &StringArray::from(vec!["b1", "b2"]) as &dyn Array
2389        );
2390        assert_eq!(
2391            projected_struct.column(2).as_ref(),
2392            &StringArray::from(vec!["c1", "c2"]) as &dyn Array
2393        );
2394    }
2395
2396    #[test]
2397    fn test_project_by_schema_nested_list_struct() {
2398        // Test deeply nested List<Struct<List<Struct>>> projection
2399        let inner_struct = DataType::Struct(Fields::from(vec![
2400            Field::new("y", DataType::Int32, true),
2401            Field::new("x", DataType::Int32, true),
2402        ]));
2403        let source_schema = Arc::new(Schema::new(vec![Field::new(
2404            "outer",
2405            DataType::List(Arc::new(Field::new(
2406                "item",
2407                DataType::Struct(Fields::from(vec![
2408                    Field::new("b", DataType::Utf8, true),
2409                    Field::new(
2410                        "inner_list",
2411                        DataType::List(Arc::new(Field::new("item", inner_struct.clone(), true))),
2412                        true,
2413                    ),
2414                    Field::new("a", DataType::Utf8, true),
2415                ])),
2416                true,
2417            ))),
2418            true,
2419        )]));
2420
2421        // Create deeply nested data
2422        let y_array = Int32Array::from(vec![1, 2]);
2423        let x_array = Int32Array::from(vec![3, 4]);
2424        let innermost_struct = StructArray::from(vec![
2425            (
2426                Arc::new(Field::new("y", DataType::Int32, true)),
2427                Arc::new(y_array) as ArrayRef,
2428            ),
2429            (
2430                Arc::new(Field::new("x", DataType::Int32, true)),
2431                Arc::new(x_array) as ArrayRef,
2432            ),
2433        ]);
2434        let inner_list = ListArray::new(
2435            Arc::new(Field::new("item", inner_struct.clone(), true)),
2436            OffsetBuffer::from_lengths([2]),
2437            Arc::new(innermost_struct),
2438            None,
2439        );
2440
2441        let b_array = StringArray::from(vec!["b1"]);
2442        let a_array = StringArray::from(vec!["a1"]);
2443        let middle_struct = StructArray::from(vec![
2444            (
2445                Arc::new(Field::new("b", DataType::Utf8, true)),
2446                Arc::new(b_array) as ArrayRef,
2447            ),
2448            (
2449                Arc::new(Field::new(
2450                    "inner_list",
2451                    DataType::List(Arc::new(Field::new("item", inner_struct, true))),
2452                    true,
2453                )),
2454                Arc::new(inner_list) as ArrayRef,
2455            ),
2456            (
2457                Arc::new(Field::new("a", DataType::Utf8, true)),
2458                Arc::new(a_array) as ArrayRef,
2459            ),
2460        ]);
2461
2462        let outer_list = ListArray::new(
2463            Arc::new(Field::new("item", middle_struct.data_type().clone(), true)),
2464            OffsetBuffer::from_lengths([1]),
2465            Arc::new(middle_struct),
2466            None,
2467        );
2468
2469        let batch =
2470            RecordBatch::try_new(source_schema, vec![Arc::new(outer_list) as ArrayRef]).unwrap();
2471
2472        // Target schema with reordered fields at all levels
2473        let target_inner_struct = DataType::Struct(Fields::from(vec![
2474            Field::new("x", DataType::Int32, true), // x before y now
2475            Field::new("y", DataType::Int32, true),
2476        ]));
2477        let target_schema = Schema::new(vec![Field::new(
2478            "outer",
2479            DataType::List(Arc::new(Field::new(
2480                "item",
2481                DataType::Struct(Fields::from(vec![
2482                    Field::new("a", DataType::Utf8, true), // a before b now
2483                    Field::new(
2484                        "inner_list",
2485                        DataType::List(Arc::new(Field::new("item", target_inner_struct, true))),
2486                        true,
2487                    ),
2488                    Field::new("b", DataType::Utf8, true),
2489                ])),
2490                true,
2491            ))),
2492            true,
2493        )]);
2494
2495        let projected = batch.project_by_schema(&target_schema).unwrap();
2496
2497        // Verify schema
2498        assert_eq!(projected.schema().as_ref(), &target_schema);
2499
2500        // Verify deeply nested data is reordered correctly
2501        let outer_list = projected.column(0).as_list::<i32>();
2502        let middle_struct = outer_list.values().as_struct();
2503
2504        // Middle struct should have a first, then inner_list, then b
2505        assert_eq!(
2506            middle_struct.column(0).as_ref(),
2507            &StringArray::from(vec!["a1"]) as &dyn Array
2508        );
2509        assert_eq!(
2510            middle_struct.column(2).as_ref(),
2511            &StringArray::from(vec!["b1"]) as &dyn Array
2512        );
2513
2514        // Inner list's struct should have x first, then y
2515        let inner_list = middle_struct.column(1).as_list::<i32>();
2516        let innermost_struct = inner_list.values().as_struct();
2517        assert_eq!(
2518            innermost_struct.column(0).as_ref(),
2519            &Int32Array::from(vec![3, 4]) as &dyn Array
2520        );
2521        assert_eq!(
2522            innermost_struct.column(1).as_ref(),
2523            &Int32Array::from(vec![1, 2]) as &dyn Array
2524        );
2525    }
2526}