lance_arrow/
lib.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Extend Arrow Functionality
5//!
//! To improve Arrow-RS ergonomics.
7
8use std::sync::Arc;
9use std::{collections::HashMap, ptr::NonNull};
10
11use arrow_array::{
12    cast::AsArray, Array, ArrayRef, ArrowNumericType, FixedSizeBinaryArray, FixedSizeListArray,
13    GenericListArray, OffsetSizeTrait, PrimitiveArray, RecordBatch, StructArray, UInt32Array,
14    UInt8Array,
15};
16use arrow_array::{
17    new_null_array, Float32Array, Float64Array, Int16Array, Int32Array, Int64Array, Int8Array,
18};
19use arrow_buffer::MutableBuffer;
20use arrow_data::ArrayDataBuilder;
21use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
22use arrow_select::{interleave::interleave, take::take};
23use rand::prelude::*;
24
25pub mod deepcopy;
26pub mod schema;
27pub use schema::*;
28pub mod bfloat16;
29pub mod floats;
30pub use floats::*;
31pub mod cast;
32pub mod list;
33pub mod memory;
34
/// Crate-local shorthand for results whose error type is [`ArrowError`].
type Result<T> = std::result::Result<T, ArrowError>;
36
pub trait DataTypeExt {
    /// Returns true if the data type is binary-like, such as (Large)Utf8 and (Large)Binary.
    ///
    /// ```
    /// use lance_arrow::*;
    /// use arrow_schema::DataType;
    ///
    /// assert!(DataType::Utf8.is_binary_like());
    /// assert!(DataType::Binary.is_binary_like());
    /// assert!(DataType::LargeUtf8.is_binary_like());
    /// assert!(DataType::LargeBinary.is_binary_like());
    /// assert!(!DataType::Int32.is_binary_like());
    /// ```
    fn is_binary_like(&self) -> bool;

    /// Returns true if the data type is a struct.
    fn is_struct(&self) -> bool;

    /// Check whether the given Arrow DataType is fixed stride.
    ///
    /// A fixed stride type has the same byte width for all array elements.
    /// This includes all primitive types, Boolean, FixedSizeList, FixedSizeBinary,
    /// and decimals.
    ///
    /// NOTE(review): `Boolean` is reported as fixed stride here but has no byte
    /// width in `byte_width_opt` because it is bit-packed — confirm callers
    /// expect this asymmetry.
    fn is_fixed_stride(&self) -> bool;

    /// Returns true if the [DataType] is a dictionary type.
    fn is_dictionary(&self) -> bool;

    /// Returns the byte width of the data type.
    ///
    /// Panics if the data type has no fixed byte width (see [`Self::byte_width_opt`]).
    fn byte_width(&self) -> usize;

    /// Returns the byte width of the data type, if it has one.
    ///
    /// Returns `None` for variable-width types and bit-packed `Boolean`.
    fn byte_width_opt(&self) -> Option<usize>;
}
72
73impl DataTypeExt for DataType {
74    fn is_binary_like(&self) -> bool {
75        use DataType::*;
76        matches!(self, Utf8 | Binary | LargeUtf8 | LargeBinary)
77    }
78
79    fn is_struct(&self) -> bool {
80        matches!(self, Self::Struct(_))
81    }
82
83    fn is_fixed_stride(&self) -> bool {
84        use DataType::*;
85        matches!(
86            self,
87            Boolean
88                | UInt8
89                | UInt16
90                | UInt32
91                | UInt64
92                | Int8
93                | Int16
94                | Int32
95                | Int64
96                | Float16
97                | Float32
98                | Float64
99                | Decimal128(_, _)
100                | Decimal256(_, _)
101                | FixedSizeList(_, _)
102                | FixedSizeBinary(_)
103                | Duration(_)
104                | Timestamp(_, _)
105                | Date32
106                | Date64
107                | Time32(_)
108                | Time64(_)
109        )
110    }
111
112    fn is_dictionary(&self) -> bool {
113        matches!(self, Self::Dictionary(_, _))
114    }
115
116    fn byte_width_opt(&self) -> Option<usize> {
117        match self {
118            Self::Int8 => Some(1),
119            Self::Int16 => Some(2),
120            Self::Int32 => Some(4),
121            Self::Int64 => Some(8),
122            Self::UInt8 => Some(1),
123            Self::UInt16 => Some(2),
124            Self::UInt32 => Some(4),
125            Self::UInt64 => Some(8),
126            Self::Float16 => Some(2),
127            Self::Float32 => Some(4),
128            Self::Float64 => Some(8),
129            Self::Date32 => Some(4),
130            Self::Date64 => Some(8),
131            Self::Time32(_) => Some(4),
132            Self::Time64(_) => Some(8),
133            Self::Timestamp(_, _) => Some(8),
134            Self::Duration(_) => Some(8),
135            Self::Decimal128(_, _) => Some(16),
136            Self::Decimal256(_, _) => Some(32),
137            Self::Interval(unit) => match unit {
138                IntervalUnit::YearMonth => Some(4),
139                IntervalUnit::DayTime => Some(8),
140                IntervalUnit::MonthDayNano => Some(16),
141            },
142            Self::FixedSizeBinary(s) => Some(*s as usize),
143            Self::FixedSizeList(dt, s) => dt
144                .data_type()
145                .byte_width_opt()
146                .map(|width| width * *s as usize),
147            _ => None,
148        }
149    }
150
151    fn byte_width(&self) -> usize {
152        self.byte_width_opt()
153            .unwrap_or_else(|| panic!("Expecting fixed stride data type, found {:?}", self))
154    }
155}
156
157/// Create an [`GenericListArray`] from values and offsets.
158///
159/// ```
160/// use arrow_array::{Int32Array, Int64Array, ListArray};
161/// use arrow_array::types::Int64Type;
162/// use lance_arrow::try_new_generic_list_array;
163///
164/// let offsets = Int32Array::from_iter([0, 2, 7, 10]);
165/// let int_values = Int64Array::from_iter(0..10);
166/// let list_arr = try_new_generic_list_array(int_values, &offsets).unwrap();
167/// assert_eq!(list_arr,
168///     ListArray::from_iter_primitive::<Int64Type, _, _>(vec![
169///         Some(vec![Some(0), Some(1)]),
170///         Some(vec![Some(2), Some(3), Some(4), Some(5), Some(6)]),
171///         Some(vec![Some(7), Some(8), Some(9)]),
172/// ]))
173/// ```
174pub fn try_new_generic_list_array<T: Array, Offset: ArrowNumericType>(
175    values: T,
176    offsets: &PrimitiveArray<Offset>,
177) -> Result<GenericListArray<Offset::Native>>
178where
179    Offset::Native: OffsetSizeTrait,
180{
181    let data_type = if Offset::Native::IS_LARGE {
182        DataType::LargeList(Arc::new(Field::new(
183            "item",
184            values.data_type().clone(),
185            true,
186        )))
187    } else {
188        DataType::List(Arc::new(Field::new(
189            "item",
190            values.data_type().clone(),
191            true,
192        )))
193    };
194    let data = ArrayDataBuilder::new(data_type)
195        .len(offsets.len() - 1)
196        .add_buffer(offsets.into_data().buffers()[0].clone())
197        .add_child_data(values.into_data())
198        .build()?;
199
200    Ok(GenericListArray::from(data))
201}
202
203pub fn fixed_size_list_type(list_width: i32, inner_type: DataType) -> DataType {
204    DataType::FixedSizeList(Arc::new(Field::new("item", inner_type, true)), list_width)
205}
206
pub trait FixedSizeListArrayExt {
    /// Create an [`FixedSizeListArray`] from values and list size.
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray};
    /// use arrow_array::types::Int64Type;
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..10);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 2).unwrap();
    /// assert_eq!(fixed_size_list_arr,
    ///     FixedSizeListArray::from_iter_primitive::<Int64Type, _, _>(vec![
    ///         Some(vec![Some(0), Some(1)]),
    ///         Some(vec![Some(2), Some(3)]),
    ///         Some(vec![Some(4), Some(5)]),
    ///         Some(vec![Some(6), Some(7)]),
    ///         Some(vec![Some(8), Some(9)])
    /// ], 2))
    /// ```
    fn try_new_from_values<T: Array + 'static>(
        values: T,
        list_size: i32,
    ) -> Result<FixedSizeListArray>;

    /// Sample `n` rows from the [FixedSizeListArray]
    ///
    /// ```
    /// use arrow_array::{Int64Array, FixedSizeListArray, Array};
    /// use lance_arrow::FixedSizeListArrayExt;
    ///
    /// let int_values = Int64Array::from_iter(0..256);
    /// let fixed_size_list_arr = FixedSizeListArray::try_new_from_values(int_values, 16).unwrap();
    /// let sampled = fixed_size_list_arr.sample(10).unwrap();
    /// assert_eq!(sampled.len(), 10);
    /// assert_eq!(sampled.value_length(), 16);
    /// assert_eq!(sampled.values().len(), 160);
    /// ```
    fn sample(&self, n: usize) -> Result<FixedSizeListArray>;

    /// Convert the [FixedSizeListArray] to its closest floating-point type:
    ///
    /// * Float16/Float32/Float64 lists are returned unchanged
    /// * Int8/Int16/Int32 items become Float32
    /// * Int64/UInt8/UInt32 items become Float64
    ///
    /// Any other item type is an error.
    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray>;
}
250
251impl FixedSizeListArrayExt for FixedSizeListArray {
252    fn try_new_from_values<T: Array + 'static>(values: T, list_size: i32) -> Result<Self> {
253        let field = Arc::new(Field::new("item", values.data_type().clone(), true));
254        let values = Arc::new(values);
255
256        Self::try_new(field, list_size, values, None)
257    }
258
259    fn sample(&self, n: usize) -> Result<FixedSizeListArray> {
260        if n >= self.len() {
261            return Ok(self.clone());
262        }
263        let mut rng = SmallRng::from_entropy();
264        let chosen = (0..self.len() as u32).choose_multiple(&mut rng, n);
265        take(self, &UInt32Array::from(chosen), None).map(|arr| arr.as_fixed_size_list().clone())
266    }
267
268    fn convert_to_floating_point(&self) -> Result<FixedSizeListArray> {
269        match self.data_type() {
270            DataType::FixedSizeList(field, size) => match field.data_type() {
271                DataType::Float16 | DataType::Float32 | DataType::Float64 => Ok(self.clone()),
272                DataType::Int8 => Ok(Self::new(
273                    Arc::new(arrow_schema::Field::new(
274                        field.name(),
275                        DataType::Float32,
276                        field.is_nullable(),
277                    )),
278                    *size,
279                    Arc::new(Float32Array::from_iter_values(
280                        self.values()
281                            .as_any()
282                            .downcast_ref::<Int8Array>()
283                            .ok_or(ArrowError::ParseError(
284                                "Fail to cast primitive array to Int8Type".to_string(),
285                            ))?
286                            .into_iter()
287                            .filter_map(|x| x.map(|y| y as f32)),
288                    )),
289                    self.nulls().cloned(),
290                )),
291                DataType::Int16 => Ok(Self::new(
292                    Arc::new(arrow_schema::Field::new(
293                        field.name(),
294                        DataType::Float32,
295                        field.is_nullable(),
296                    )),
297                    *size,
298                    Arc::new(Float32Array::from_iter_values(
299                        self.values()
300                            .as_any()
301                            .downcast_ref::<Int16Array>()
302                            .ok_or(ArrowError::ParseError(
303                                "Fail to cast primitive array to Int8Type".to_string(),
304                            ))?
305                            .into_iter()
306                            .filter_map(|x| x.map(|y| y as f32)),
307                    )),
308                    self.nulls().cloned(),
309                )),
310                DataType::Int32 => Ok(Self::new(
311                    Arc::new(arrow_schema::Field::new(
312                        field.name(),
313                        DataType::Float32,
314                        field.is_nullable(),
315                    )),
316                    *size,
317                    Arc::new(Float32Array::from_iter_values(
318                        self.values()
319                            .as_any()
320                            .downcast_ref::<Int32Array>()
321                            .ok_or(ArrowError::ParseError(
322                                "Fail to cast primitive array to Int8Type".to_string(),
323                            ))?
324                            .into_iter()
325                            .filter_map(|x| x.map(|y| y as f32)),
326                    )),
327                    self.nulls().cloned(),
328                )),
329                DataType::Int64 => Ok(Self::new(
330                    Arc::new(arrow_schema::Field::new(
331                        field.name(),
332                        DataType::Float64,
333                        field.is_nullable(),
334                    )),
335                    *size,
336                    Arc::new(Float64Array::from_iter_values(
337                        self.values()
338                            .as_any()
339                            .downcast_ref::<Int64Array>()
340                            .ok_or(ArrowError::ParseError(
341                                "Fail to cast primitive array to Int8Type".to_string(),
342                            ))?
343                            .into_iter()
344                            .filter_map(|x| x.map(|y| y as f64)),
345                    )),
346                    self.nulls().cloned(),
347                )),
348                DataType::UInt8 => Ok(Self::new(
349                    Arc::new(arrow_schema::Field::new(
350                        field.name(),
351                        DataType::Float64,
352                        field.is_nullable(),
353                    )),
354                    *size,
355                    Arc::new(Float64Array::from_iter_values(
356                        self.values()
357                            .as_any()
358                            .downcast_ref::<UInt8Array>()
359                            .ok_or(ArrowError::ParseError(
360                                "Fail to cast primitive array to Int8Type".to_string(),
361                            ))?
362                            .into_iter()
363                            .filter_map(|x| x.map(|y| y as f64)),
364                    )),
365                    self.nulls().cloned(),
366                )),
367                DataType::UInt32 => Ok(Self::new(
368                    Arc::new(arrow_schema::Field::new(
369                        field.name(),
370                        DataType::Float64,
371                        field.is_nullable(),
372                    )),
373                    *size,
374                    Arc::new(Float64Array::from_iter_values(
375                        self.values()
376                            .as_any()
377                            .downcast_ref::<UInt32Array>()
378                            .ok_or(ArrowError::ParseError(
379                                "Fail to cast primitive array to Int8Type".to_string(),
380                            ))?
381                            .into_iter()
382                            .filter_map(|x| x.map(|y| y as f64)),
383                    )),
384                    self.nulls().cloned(),
385                )),
386                data_type => Err(ArrowError::ParseError(format!(
387                    "Expect either floating type or integer got {:?}",
388                    data_type
389                ))),
390            },
391            data_type => Err(ArrowError::ParseError(format!(
392                "Expect either FixedSizeList got {:?}",
393                data_type
394            ))),
395        }
396    }
397}
398
399/// Force downcast of an [`Array`], such as an [`ArrayRef`], to
400/// [`FixedSizeListArray`], panic'ing on failure.
401pub fn as_fixed_size_list_array(arr: &dyn Array) -> &FixedSizeListArray {
402    arr.as_any().downcast_ref::<FixedSizeListArray>().unwrap()
403}
404
pub trait FixedSizeBinaryArrayExt {
    /// Create an [`FixedSizeBinaryArray`] from values and stride.
    ///
    /// The resulting array has `values.len() / stride` entries, each `stride`
    /// bytes wide.
    ///
    /// ```
    /// use arrow_array::{UInt8Array, FixedSizeBinaryArray};
    /// use arrow_array::types::UInt8Type;
    /// use lance_arrow::FixedSizeBinaryArrayExt;
    ///
    /// let int_values = UInt8Array::from_iter(0..10);
    /// let fixed_size_binary_arr = FixedSizeBinaryArray::try_new_from_values(&int_values, 2).unwrap();
    /// assert_eq!(fixed_size_binary_arr,
    ///     FixedSizeBinaryArray::from(vec![
    ///         Some(vec![0, 1].as_slice()),
    ///         Some(vec![2, 3].as_slice()),
    ///         Some(vec![4, 5].as_slice()),
    ///         Some(vec![6, 7].as_slice()),
    ///         Some(vec![8, 9].as_slice())
    /// ]))
    /// ```
    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<FixedSizeBinaryArray>;
}
426
427impl FixedSizeBinaryArrayExt for FixedSizeBinaryArray {
428    fn try_new_from_values(values: &UInt8Array, stride: i32) -> Result<Self> {
429        let data_type = DataType::FixedSizeBinary(stride);
430        let data = ArrayDataBuilder::new(data_type)
431            .len(values.len() / stride as usize)
432            .add_buffer(values.into_data().buffers()[0].clone())
433            .build()?;
434        Ok(Self::from(data))
435    }
436}
437
438pub fn as_fixed_size_binary_array(arr: &dyn Array) -> &FixedSizeBinaryArray {
439    arr.as_any().downcast_ref::<FixedSizeBinaryArray>().unwrap()
440}
441
442pub fn iter_str_array(arr: &dyn Array) -> Box<dyn Iterator<Item = Option<&str>> + Send + '_> {
443    match arr.data_type() {
444        DataType::Utf8 => Box::new(arr.as_string::<i32>().iter()),
445        DataType::LargeUtf8 => Box::new(arr.as_string::<i64>().iter()),
446        _ => panic!("Expecting Utf8 or LargeUtf8, found {:?}", arr.data_type()),
447    }
448}
449
/// Extends Arrow's [RecordBatch].
pub trait RecordBatchExt {
    /// Append a new column to this [`RecordBatch`] and returns a new RecordBatch.
    ///
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::{RecordBatch, Int32Array, StringArray};
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let record_batch = RecordBatch::try_new(schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let new_field = Field::new("s", DataType::Utf8, true);
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let new_record_batch = record_batch.try_with_column(new_field, str_arr.clone()).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Create a new RecordBatch with the column inserted at `index`.
    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<RecordBatch>;

    /// Creates a new [`RecordBatch`] from the provided [`StructArray`].
    ///
    /// The fields on the [`StructArray`] need to match this [`RecordBatch`] schema
    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<RecordBatch>;

    /// Merge with another [`RecordBatch`] and returns a new one.
    ///
    /// Fields are merged based on name.  First we iterate the left columns.  If a matching
    /// name is found in the right then we merge the two columns.  If there is no match then
    /// we add the left column to the output.
    ///
    /// To merge two columns we consider the type.  If both arrays are struct arrays we recurse.
    /// Otherwise we use the left array.
    ///
    /// Afterwards we add all non-matching right columns to the output.
    ///
    /// Note: This method likely does not handle nested fields correctly and you may want to consider
    /// using [`merge_with_schema`] instead.
    /// ```
    /// use std::sync::Arc;
    /// use arrow_array::*;
    /// use arrow_schema::{Schema, Field, DataType};
    /// use lance_arrow::*;
    ///
    /// let left_schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, true)]));
    /// let int_arr = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
    /// let left = RecordBatch::try_new(left_schema, vec![int_arr.clone()]).unwrap();
    ///
    /// let right_schema = Arc::new(Schema::new(vec![Field::new("s", DataType::Utf8, true)]));
    /// let str_arr = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
    /// let right = RecordBatch::try_new(right_schema, vec![str_arr.clone()]).unwrap();
    ///
    /// let new_record_batch = left.merge(&right).unwrap();
    ///
    /// assert_eq!(
    ///     new_record_batch,
    ///     RecordBatch::try_new(
    ///         Arc::new(Schema::new(
    ///             vec![
    ///                 Field::new("a", DataType::Int32, true),
    ///                 Field::new("s", DataType::Utf8, true)
    ///             ])
    ///         ),
    ///         vec![int_arr, str_arr],
    ///     ).unwrap()
    /// )
    /// ```
    ///
    /// TODO: add merge nested fields support.
    fn merge(&self, other: &RecordBatch) -> Result<RecordBatch>;

    /// Create a batch by merging columns between two batches with a given schema.
    ///
    /// A reference schema is used to determine the proper ordering of nested fields.
    ///
    /// For each field in the reference schema we look for corresponding fields in
    /// the left and right batches.  If a field is found in both batches we recursively merge
    /// it.
    ///
    /// If a field is only in the left or right batch we take it as it is.
    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch>;

    /// Drop one column specified with the name and return the new [`RecordBatch`].
    ///
    /// If the named column does not exist, it returns a copy of this [`RecordBatch`].
    fn drop_column(&self, name: &str) -> Result<RecordBatch>;

    /// Replace a column (specified by name) and return the new [`RecordBatch`].
    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch>;

    /// Replace a column's data type and array (specified by name) and return the
    /// new [`RecordBatch`].
    fn replace_column_schema_by_name(
        &self,
        name: &str,
        new_data_type: DataType,
        column: Arc<dyn Array>,
    ) -> Result<RecordBatch>;

    /// Rename a column at a given index.
    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch>;

    /// Get (potentially nested) column by qualified name, e.g. `outer.inner`.
    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef>;

    /// Project the schema over the [RecordBatch].
    fn project_by_schema(&self, schema: &Schema) -> Result<RecordBatch>;

    /// metadata of the schema.
    fn metadata(&self) -> &HashMap<String, String>;

    /// Add a single metadata key/value pair to the schema.
    fn add_metadata(&self, key: String, value: String) -> Result<RecordBatch> {
        let mut metadata = self.metadata().clone();
        metadata.insert(key, value);
        self.with_metadata(metadata)
    }

    /// Replace the schema metadata with the provided one.
    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch>;

    /// Take selected rows from the [RecordBatch].
    fn take(&self, indices: &UInt32Array) -> Result<RecordBatch>;
}
589
590impl RecordBatchExt for RecordBatch {
591    fn try_with_column(&self, field: Field, arr: ArrayRef) -> Result<Self> {
592        let new_schema = Arc::new(self.schema().as_ref().try_with_column(field)?);
593        let mut new_columns = self.columns().to_vec();
594        new_columns.push(arr);
595        Self::try_new(new_schema, new_columns)
596    }
597
598    fn try_with_column_at(&self, index: usize, field: Field, arr: ArrayRef) -> Result<Self> {
599        let new_schema = Arc::new(self.schema().as_ref().try_with_column_at(index, field)?);
600        let mut new_columns = self.columns().to_vec();
601        new_columns.insert(index, arr);
602        Self::try_new(new_schema, new_columns)
603    }
604
605    fn try_new_from_struct_array(&self, arr: StructArray) -> Result<Self> {
606        let schema = Arc::new(Schema::new_with_metadata(
607            arr.fields().to_vec(),
608            self.schema().metadata.clone(),
609        ));
610        let batch = Self::from(arr);
611        batch.with_schema(schema)
612    }
613
614    fn merge(&self, other: &Self) -> Result<Self> {
615        if self.num_rows() != other.num_rows() {
616            return Err(ArrowError::InvalidArgumentError(format!(
617                "Attempt to merge two RecordBatch with different sizes: {} != {}",
618                self.num_rows(),
619                other.num_rows()
620            )));
621        }
622        let left_struct_array: StructArray = self.clone().into();
623        let right_struct_array: StructArray = other.clone().into();
624        self.try_new_from_struct_array(merge(&left_struct_array, &right_struct_array))
625    }
626
627    fn merge_with_schema(&self, other: &RecordBatch, schema: &Schema) -> Result<RecordBatch> {
628        if self.num_rows() != other.num_rows() {
629            return Err(ArrowError::InvalidArgumentError(format!(
630                "Attempt to merge two RecordBatch with different sizes: {} != {}",
631                self.num_rows(),
632                other.num_rows()
633            )));
634        }
635        let left_struct_array: StructArray = self.clone().into();
636        let right_struct_array: StructArray = other.clone().into();
637        self.try_new_from_struct_array(merge_with_schema(
638            &left_struct_array,
639            &right_struct_array,
640            schema.fields(),
641        ))
642    }
643
644    fn drop_column(&self, name: &str) -> Result<Self> {
645        let mut fields = vec![];
646        let mut columns = vec![];
647        for i in 0..self.schema().fields.len() {
648            if self.schema().field(i).name() != name {
649                fields.push(self.schema().field(i).clone());
650                columns.push(self.column(i).clone());
651            }
652        }
653        Self::try_new(
654            Arc::new(Schema::new_with_metadata(
655                fields,
656                self.schema().metadata().clone(),
657            )),
658            columns,
659        )
660    }
661
662    fn rename_column(&self, index: usize, new_name: &str) -> Result<RecordBatch> {
663        let mut fields = self.schema().fields().to_vec();
664        if index >= fields.len() {
665            return Err(ArrowError::InvalidArgumentError(format!(
666                "Index out of bounds: {}",
667                index
668            )));
669        }
670        fields[index] = Arc::new(Field::new(
671            new_name,
672            fields[index].data_type().clone(),
673            fields[index].is_nullable(),
674        ));
675        Self::try_new(
676            Arc::new(Schema::new_with_metadata(
677                fields,
678                self.schema().metadata().clone(),
679            )),
680            self.columns().to_vec(),
681        )
682    }
683
684    fn replace_column_by_name(&self, name: &str, column: Arc<dyn Array>) -> Result<RecordBatch> {
685        let mut columns = self.columns().to_vec();
686        let field_i = self
687            .schema()
688            .fields()
689            .iter()
690            .position(|f| f.name() == name)
691            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
692        columns[field_i] = column;
693        Self::try_new(self.schema(), columns)
694    }
695
696    fn replace_column_schema_by_name(
697        &self,
698        name: &str,
699        new_data_type: DataType,
700        column: Arc<dyn Array>,
701    ) -> Result<RecordBatch> {
702        let fields = self
703            .schema()
704            .fields()
705            .iter()
706            .map(|x| {
707                if x.name() != name {
708                    x.clone()
709                } else {
710                    let new_field = Field::new(name, new_data_type.clone(), x.is_nullable());
711                    Arc::new(new_field)
712                }
713            })
714            .collect::<Vec<_>>();
715        let schema = Schema::new_with_metadata(fields, self.schema().metadata.clone());
716        let mut columns = self.columns().to_vec();
717        let field_i = self
718            .schema()
719            .fields()
720            .iter()
721            .position(|f| f.name() == name)
722            .ok_or_else(|| ArrowError::SchemaError(format!("Field {} does not exist", name)))?;
723        columns[field_i] = column;
724        Self::try_new(Arc::new(schema), columns)
725    }
726
727    fn column_by_qualified_name(&self, name: &str) -> Option<&ArrayRef> {
728        let split = name.split('.').collect::<Vec<_>>();
729        if split.is_empty() {
730            return None;
731        }
732
733        self.column_by_name(split[0])
734            .and_then(|arr| get_sub_array(arr, &split[1..]))
735    }
736
737    fn project_by_schema(&self, schema: &Schema) -> Result<Self> {
738        let struct_array: StructArray = self.clone().into();
739        self.try_new_from_struct_array(project(&struct_array, schema.fields())?)
740    }
741
742    fn metadata(&self) -> &HashMap<String, String> {
743        self.schema_ref().metadata()
744    }
745
746    fn with_metadata(&self, metadata: HashMap<String, String>) -> Result<RecordBatch> {
747        let mut schema = self.schema_ref().as_ref().clone();
748        schema.metadata = metadata;
749        Self::try_new(schema.into(), self.columns().into())
750    }
751
752    fn take(&self, indices: &UInt32Array) -> Result<Self> {
753        let struct_array: StructArray = self.clone().into();
754        let taken = take(&struct_array, indices, None)?;
755        self.try_new_from_struct_array(taken.as_struct().clone())
756    }
757}
758
759fn project(struct_array: &StructArray, fields: &Fields) -> Result<StructArray> {
760    if fields.is_empty() {
761        return Ok(StructArray::new_empty_fields(
762            struct_array.len(),
763            struct_array.nulls().cloned(),
764        ));
765    }
766    let mut columns: Vec<ArrayRef> = vec![];
767    for field in fields.iter() {
768        if let Some(col) = struct_array.column_by_name(field.name()) {
769            match field.data_type() {
770                // TODO handle list-of-struct
771                DataType::Struct(subfields) => {
772                    let projected = project(col.as_struct(), subfields)?;
773                    columns.push(Arc::new(projected));
774                }
775                _ => {
776                    columns.push(col.clone());
777                }
778            }
779        } else {
780            return Err(ArrowError::SchemaError(format!(
781                "field {} does not exist in the RecordBatch",
782                field.name()
783            )));
784        }
785    }
786    // Preserve the struct's validity when projecting
787    StructArray::try_new(fields.clone(), columns, struct_array.nulls().cloned())
788}
789
790fn lists_have_same_offsets_helper<T: OffsetSizeTrait>(left: &dyn Array, right: &dyn Array) -> bool {
791    let left_list: &GenericListArray<T> = left.as_list();
792    let right_list: &GenericListArray<T> = right.as_list();
793    left_list.offsets().inner() == right_list.offsets().inner()
794}
795
796fn merge_list_structs_helper<T: OffsetSizeTrait>(
797    left: &dyn Array,
798    right: &dyn Array,
799    items_field_name: impl Into<String>,
800    items_nullable: bool,
801) -> Arc<dyn Array> {
802    let left_list: &GenericListArray<T> = left.as_list();
803    let right_list: &GenericListArray<T> = right.as_list();
804    let left_struct = left_list.values();
805    let right_struct = right_list.values();
806    let left_struct_arr = left_struct.as_struct();
807    let right_struct_arr = right_struct.as_struct();
808    let merged_items = Arc::new(merge(left_struct_arr, right_struct_arr));
809    let items_field = Arc::new(Field::new(
810        items_field_name,
811        merged_items.data_type().clone(),
812        items_nullable,
813    ));
814    Arc::new(GenericListArray::<T>::new(
815        items_field,
816        left_list.offsets().clone(),
817        merged_items,
818        left_list.nulls().cloned(),
819    ))
820}
821
822fn merge_list_struct_null_helper<T: OffsetSizeTrait>(
823    left: &dyn Array,
824    right: &dyn Array,
825    not_null: &dyn Array,
826    items_field_name: impl Into<String>,
827) -> Arc<dyn Array> {
828    let left_list: &GenericListArray<T> = left.as_list::<T>();
829    let not_null_list = not_null.as_list::<T>();
830    let right_list = right.as_list::<T>();
831
832    let left_struct = left_list.values().as_struct();
833    let not_null_struct: &StructArray = not_null_list.values().as_struct();
834    let right_struct = right_list.values().as_struct();
835
836    let values_len = not_null_list.values().len();
837    let mut merged_fields =
838        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
839    let mut merged_columns =
840        Vec::with_capacity(not_null_struct.num_columns() + right_struct.num_columns());
841
842    for (_, field) in left_struct.columns().iter().zip(left_struct.fields()) {
843        merged_fields.push(field.clone());
844        if let Some(val) = not_null_struct.column_by_name(field.name()) {
845            merged_columns.push(val.clone());
846        } else {
847            merged_columns.push(new_null_array(field.data_type(), values_len))
848        }
849    }
850    for (_, field) in right_struct
851        .columns()
852        .iter()
853        .zip(right_struct.fields())
854        .filter(|(_, field)| left_struct.column_by_name(field.name()).is_none())
855    {
856        merged_fields.push(field.clone());
857        if let Some(val) = not_null_struct.column_by_name(field.name()) {
858            merged_columns.push(val.clone());
859        } else {
860            merged_columns.push(new_null_array(field.data_type(), values_len));
861        }
862    }
863
864    let merged_struct = Arc::new(StructArray::new(
865        Fields::from(merged_fields),
866        merged_columns,
867        not_null_struct.nulls().cloned(),
868    ));
869    let items_field = Arc::new(Field::new(
870        items_field_name,
871        merged_struct.data_type().clone(),
872        true,
873    ));
874    Arc::new(GenericListArray::<T>::new(
875        items_field,
876        not_null_list.offsets().clone(),
877        merged_struct,
878        not_null_list.nulls().cloned(),
879    ))
880}
881
882fn merge_list_struct_null(
883    left: &dyn Array,
884    right: &dyn Array,
885    not_null: &dyn Array,
886) -> Arc<dyn Array> {
887    match left.data_type() {
888        DataType::List(left_field) => {
889            merge_list_struct_null_helper::<i32>(left, right, not_null, left_field.name())
890        }
891        DataType::LargeList(left_field) => {
892            merge_list_struct_null_helper::<i64>(left, right, not_null, left_field.name())
893        }
894        _ => unreachable!(),
895    }
896}
897
898fn merge_list_struct(left: &dyn Array, right: &dyn Array) -> Arc<dyn Array> {
899    // Merging fields into a list<struct<...>> is tricky and can only succeed
900    // in two ways.  First, if both lists have the same offsets.  Second, if
901    // one of the lists is all-null
902    if left.null_count() == left.len() {
903        return merge_list_struct_null(left, right, right);
904    } else if right.null_count() == right.len() {
905        return merge_list_struct_null(left, right, left);
906    }
907    match (left.data_type(), right.data_type()) {
908        (DataType::List(left_field), DataType::List(_)) => {
909            if !lists_have_same_offsets_helper::<i32>(left, right) {
910                panic!("Attempt to merge list struct arrays which do not have same offsets");
911            }
912            merge_list_structs_helper::<i32>(
913                left,
914                right,
915                left_field.name(),
916                left_field.is_nullable(),
917            )
918        }
919        (DataType::LargeList(left_field), DataType::LargeList(_)) => {
920            if !lists_have_same_offsets_helper::<i64>(left, right) {
921                panic!("Attempt to merge list struct arrays which do not have same offsets");
922            }
923            merge_list_structs_helper::<i64>(
924                left,
925                right,
926                left_field.name(),
927                left_field.is_nullable(),
928            )
929        }
930        _ => unreachable!(),
931    }
932}
933
934/// Helper function to normalize validity buffers
935/// Returns None for all-null validity (placeholder structs)
936fn normalize_validity(
937    validity: Option<&arrow_buffer::NullBuffer>,
938) -> Option<&arrow_buffer::NullBuffer> {
939    validity.and_then(|v| {
940        if v.null_count() == v.len() {
941            None
942        } else {
943            Some(v)
944        }
945    })
946}
947
948/// Helper function to merge validity buffers from two struct arrays
949/// Returns None only if both arrays are null at the same position
950///
951/// Special handling for placeholder structs (all-null validity)
952fn merge_struct_validity(
953    left_validity: Option<&arrow_buffer::NullBuffer>,
954    right_validity: Option<&arrow_buffer::NullBuffer>,
955) -> Option<arrow_buffer::NullBuffer> {
956    // Normalize both validity buffers (convert all-null to None)
957    let left_normalized = normalize_validity(left_validity);
958    let right_normalized = normalize_validity(right_validity);
959
960    match (left_normalized, right_normalized) {
961        // Fast paths: no computation needed
962        (None, None) => None,
963        (Some(left), None) => Some(left.clone()),
964        (None, Some(right)) => Some(right.clone()),
965        (Some(left), Some(right)) => {
966            // Fast path: if both have no nulls, can return either one
967            if left.null_count() == 0 && right.null_count() == 0 {
968                return Some(left.clone());
969            }
970
971            let left_buffer = left.inner();
972            let right_buffer = right.inner();
973
974            // Perform bitwise OR directly on BooleanBuffers
975            // This preserves the correct semantics: 1 = valid, 0 = null
976            let merged_buffer = left_buffer | right_buffer;
977
978            Some(arrow_buffer::NullBuffer::from(merged_buffer))
979        }
980    }
981}
982
983// Helper function to adjust child array validity based on parent struct validity
984// When parent struct is null, propagates null to child array
985// Optimized with fast paths and SIMD operations
986fn adjust_child_validity(
987    child: &ArrayRef,
988    parent_validity: Option<&arrow_buffer::NullBuffer>,
989) -> ArrayRef {
990    // Fast path: no parent validity means no adjustment needed
991    let parent_validity = match parent_validity {
992        None => return child.clone(),
993        Some(p) if p.null_count() == 0 => return child.clone(), // No nulls to propagate
994        Some(p) => p,
995    };
996
997    let child_validity = child.nulls();
998
999    // Compute the new validity: child_validity AND parent_validity
1000    let new_validity = match child_validity {
1001        None => {
1002            // Fast path: child has no existing validity, just use parent's
1003            parent_validity.clone()
1004        }
1005        Some(child_nulls) => {
1006            let child_buffer = child_nulls.inner();
1007            let parent_buffer = parent_validity.inner();
1008
1009            // Perform bitwise AND directly on BooleanBuffers
1010            // This preserves the correct semantics: 1 = valid, 0 = null
1011            let merged_buffer = child_buffer & parent_buffer;
1012
1013            arrow_buffer::NullBuffer::from(merged_buffer)
1014        }
1015    };
1016
1017    // Create new array with adjusted validity
1018    arrow_array::make_array(
1019        arrow_data::ArrayData::try_new(
1020            child.data_type().clone(),
1021            child.len(),
1022            Some(new_validity.into_inner().into_inner()),
1023            child.offset(),
1024            child.to_data().buffers().to_vec(),
1025            child.to_data().child_data().to_vec(),
1026        )
1027        .unwrap(),
1028    )
1029}
1030
1031fn merge(left_struct_array: &StructArray, right_struct_array: &StructArray) -> StructArray {
1032    let mut fields: Vec<Field> = vec![];
1033    let mut columns: Vec<ArrayRef> = vec![];
1034    let right_fields = right_struct_array.fields();
1035    let right_columns = right_struct_array.columns();
1036
1037    // Get the validity buffers from both structs
1038    let left_validity = left_struct_array.nulls();
1039    let right_validity = right_struct_array.nulls();
1040
1041    // Compute merged validity
1042    let merged_validity = merge_struct_validity(left_validity, right_validity);
1043
1044    // iterate through the fields on the left hand side
1045    for (left_field, left_column) in left_struct_array
1046        .fields()
1047        .iter()
1048        .zip(left_struct_array.columns().iter())
1049    {
1050        match right_fields
1051            .iter()
1052            .position(|f| f.name() == left_field.name())
1053        {
1054            // if the field exists on the right hand side, merge them recursively if appropriate
1055            Some(right_index) => {
1056                let right_field = right_fields.get(right_index).unwrap();
1057                let right_column = right_columns.get(right_index).unwrap();
1058                // if both fields are struct, merge them recursively
1059                match (left_field.data_type(), right_field.data_type()) {
1060                    (DataType::Struct(_), DataType::Struct(_)) => {
1061                        let left_sub_array = left_column.as_struct();
1062                        let right_sub_array = right_column.as_struct();
1063                        let merged_sub_array = merge(left_sub_array, right_sub_array);
1064                        fields.push(Field::new(
1065                            left_field.name(),
1066                            merged_sub_array.data_type().clone(),
1067                            left_field.is_nullable(),
1068                        ));
1069                        columns.push(Arc::new(merged_sub_array) as ArrayRef);
1070                    }
1071                    (DataType::List(left_list), DataType::List(right_list))
1072                        if left_list.data_type().is_struct()
1073                            && right_list.data_type().is_struct() =>
1074                    {
1075                        // If there is nothing to merge just use the left field
1076                        if left_list.data_type() == right_list.data_type() {
1077                            fields.push(left_field.as_ref().clone());
1078                            columns.push(left_column.clone());
1079                        }
1080                        // If we have two List<Struct> and they have different sets of fields then
1081                        // we can merge them if the offsets arrays are the same.  Otherwise, we
1082                        // have to consider it an error.
1083                        let merged_sub_array = merge_list_struct(&left_column, &right_column);
1084
1085                        fields.push(Field::new(
1086                            left_field.name(),
1087                            merged_sub_array.data_type().clone(),
1088                            left_field.is_nullable(),
1089                        ));
1090                        columns.push(merged_sub_array);
1091                    }
1092                    // otherwise, just use the field on the left hand side
1093                    _ => {
1094                        // TODO handle list-of-struct and other types
1095                        fields.push(left_field.as_ref().clone());
1096                        // Adjust the column validity: if left struct was null, propagate to child
1097                        let adjusted_column = adjust_child_validity(left_column, left_validity);
1098                        columns.push(adjusted_column);
1099                    }
1100                }
1101            }
1102            None => {
1103                fields.push(left_field.as_ref().clone());
1104                // Adjust the column validity: if left struct was null, propagate to child
1105                let adjusted_column = adjust_child_validity(left_column, left_validity);
1106                columns.push(adjusted_column);
1107            }
1108        }
1109    }
1110
1111    // now iterate through the fields on the right hand side
1112    right_fields
1113        .iter()
1114        .zip(right_columns.iter())
1115        .for_each(|(field, column)| {
1116            // add new columns on the right
1117            if !left_struct_array
1118                .fields()
1119                .iter()
1120                .any(|f| f.name() == field.name())
1121            {
1122                fields.push(field.as_ref().clone());
1123                // This field doesn't exist on the left
1124                // We use the right's column but need to adjust for struct validity
1125                let adjusted_column = adjust_child_validity(column, right_validity);
1126                columns.push(adjusted_column);
1127            }
1128        });
1129
1130    StructArray::try_new(Fields::from(fields), columns, merged_validity).unwrap()
1131}
1132
/// Merge two struct arrays, keeping only (and exactly ordering by) the fields
/// listed in `fields`.
///
/// Unlike [`merge`], the output field order comes from `fields`, not from the
/// left side; fields found on neither side are silently dropped.  Struct
/// fields present on both sides are merged recursively against the requested
/// child fields.
fn merge_with_schema(
    left_struct_array: &StructArray,
    right_struct_array: &StructArray,
    fields: &Fields,
) -> StructArray {
    // Helper function that returns true if both types are struct or both are non-struct
    fn same_type_kind(left: &DataType, right: &DataType) -> bool {
        match (left, right) {
            (DataType::Struct(_), DataType::Struct(_)) => true,
            (DataType::Struct(_), _) => false,
            (_, DataType::Struct(_)) => false,
            _ => true,
        }
    }

    let mut output_fields: Vec<Field> = Vec::with_capacity(fields.len());
    let mut columns: Vec<ArrayRef> = Vec::with_capacity(fields.len());

    let left_fields = left_struct_array.fields();
    let left_columns = left_struct_array.columns();
    let right_fields = right_struct_array.fields();
    let right_columns = right_struct_array.columns();

    // Get the validity buffers from both structs
    let left_validity = left_struct_array.nulls();
    let right_validity = right_struct_array.nulls();

    // Compute merged validity (null only where both sides are null)
    let merged_validity = merge_struct_validity(left_validity, right_validity);

    for field in fields {
        // A side "matches" only when both the name and the struct-ness of the
        // type agree, so a struct is never merged with a same-named non-struct.
        let left_match_idx = left_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });
        let right_match_idx = right_fields.iter().position(|f| {
            f.name() == field.name() && same_type_kind(f.data_type(), field.data_type())
        });

        match (left_match_idx, right_match_idx) {
            (None, Some(right_idx)) => {
                output_fields.push(right_fields[right_idx].as_ref().clone());
                // Adjust validity if the right struct was null
                let adjusted_column =
                    adjust_child_validity(&right_columns[right_idx], right_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), None) => {
                output_fields.push(left_fields[left_idx].as_ref().clone());
                // Adjust validity if the left struct was null
                let adjusted_column = adjust_child_validity(&left_columns[left_idx], left_validity);
                columns.push(adjusted_column);
            }
            (Some(left_idx), Some(right_idx)) => {
                if let DataType::Struct(child_fields) = field.data_type() {
                    // Both sides have the struct: merge recursively, restricted
                    // to the requested child fields.
                    let left_sub_array = left_columns[left_idx].as_struct();
                    let right_sub_array = right_columns[right_idx].as_struct();
                    let merged_sub_array =
                        merge_with_schema(left_sub_array, right_sub_array, child_fields);
                    output_fields.push(Field::new(
                        field.name(),
                        merged_sub_array.data_type().clone(),
                        field.is_nullable(),
                    ));
                    columns.push(Arc::new(merged_sub_array) as ArrayRef);
                } else {
                    output_fields.push(left_fields[left_idx].as_ref().clone());
                    // For fields that exist in both, use left but adjust validity
                    let adjusted_column =
                        adjust_child_validity(&left_columns[left_idx], left_validity);
                    columns.push(adjusted_column);
                }
            }
            (None, None) => {
                // The field will not be included in the output
            }
        }
    }

    StructArray::try_new(Fields::from(output_fields), columns, merged_validity).unwrap()
}
1213
1214fn get_sub_array<'a>(array: &'a ArrayRef, components: &[&str]) -> Option<&'a ArrayRef> {
1215    if components.is_empty() {
1216        return Some(array);
1217    }
1218    if !matches!(array.data_type(), DataType::Struct(_)) {
1219        return None;
1220    }
1221    let struct_arr = array.as_struct();
1222    struct_arr
1223        .column_by_name(components[0])
1224        .and_then(|arr| get_sub_array(arr, &components[1..]))
1225}
1226
1227/// Interleave multiple RecordBatches into a single RecordBatch.
1228///
1229/// Behaves like [`arrow::compute::interleave`], but for RecordBatches.
1230pub fn interleave_batches(
1231    batches: &[RecordBatch],
1232    indices: &[(usize, usize)],
1233) -> Result<RecordBatch> {
1234    let first_batch = batches.first().ok_or_else(|| {
1235        ArrowError::InvalidArgumentError("Cannot interleave zero RecordBatches".to_string())
1236    })?;
1237    let schema = first_batch.schema();
1238    let num_columns = first_batch.num_columns();
1239    let mut columns = Vec::with_capacity(num_columns);
1240    let mut chunks = Vec::with_capacity(batches.len());
1241
1242    for i in 0..num_columns {
1243        for batch in batches {
1244            chunks.push(batch.column(i).as_ref());
1245        }
1246        let new_column = interleave(&chunks, indices)?;
1247        columns.push(new_column);
1248        chunks.clear();
1249    }
1250
1251    RecordBatch::try_new(schema, columns)
1252}
1253
pub trait BufferExt {
    /// Create an `arrow_buffer::Buffer` from a `bytes::Bytes` object
    ///
    /// The alignment must be specified (as `bytes_per_value`) since we want to make
    /// sure we can safely reinterpret the buffer.
    ///
    /// If the buffer is properly aligned this will be zero-copy.  If not, a copy
    /// will be made and an owned buffer returned.
    ///
    /// If `bytes_per_value` is not a power of two, then we assume the buffer is
    /// never going to be reinterpreted into another type and we can safely
    /// ignore the alignment.
    ///
    /// Yes, the method name is odd.  It's because there is already a `from_bytes`
    /// which converts from `arrow_buffer::bytes::Bytes` (not `bytes::Bytes`)
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self;

    /// Allocates a new properly aligned arrow buffer and copies `bytes` into it
    ///
    /// `size_bytes` can be larger than `bytes` and, if so, the trailing bytes will
    /// be zeroed out.
    ///
    /// # Panics
    ///
    /// Panics if `size_bytes` is less than `bytes.len()`
    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self;
}
1281
/// Returns true if `n` is a power of two.
///
/// Fix: the previous `n & (n - 1) == 0` check underflows for `n == 0`
/// (panicking in debug builds) and wrongly reports `true` in release builds;
/// `u64::is_power_of_two` correctly returns `false` for zero.
fn is_pwr_two(n: u64) -> bool {
    n.is_power_of_two()
}
1285
impl BufferExt for arrow_buffer::Buffer {
    fn from_bytes_bytes(bytes: bytes::Bytes, bytes_per_value: u64) -> Self {
        // Only power-of-two strides require alignment; a non-power-of-two
        // stride is assumed never to be reinterpreted as another type, so it
        // always takes the zero-copy path regardless of pointer alignment.
        if is_pwr_two(bytes_per_value) && bytes.as_ptr().align_offset(bytes_per_value as usize) != 0
        {
            // The original buffer is not aligned, cannot zero-copy
            let size_bytes = bytes.len();
            Self::copy_bytes_bytes(bytes, size_bytes)
        } else {
            // The original buffer is aligned, can zero-copy
            // SAFETY: the alignment is correct we can make this conversion.
            // The `Arc<bytes::Bytes>` passed as the custom allocation keeps
            // the underlying memory alive for the lifetime of the Buffer.
            unsafe {
                Self::from_custom_allocation(
                    NonNull::new(bytes.as_ptr() as _).expect("should be a valid pointer"),
                    bytes.len(),
                    Arc::new(bytes),
                )
            }
        }
    }

    fn copy_bytes_bytes(bytes: bytes::Bytes, size_bytes: usize) -> Self {
        assert!(size_bytes >= bytes.len());
        // Copy into a freshly allocated (hence well-aligned) mutable buffer,
        // zero-padding any space beyond the source bytes.
        let mut buf = MutableBuffer::with_capacity(size_bytes);
        let to_fill = size_bytes - bytes.len();
        buf.extend(bytes);
        buf.extend(std::iter::repeat_n(0_u8, to_fill));
        Self::from(buf)
    }
}
1315
1316#[cfg(test)]
1317mod tests {
1318    use super::*;
1319    use arrow_array::{new_empty_array, new_null_array, ListArray, StringArray};
1320    use arrow_array::{Float32Array, Int32Array, StructArray};
1321    use arrow_buffer::OffsetBuffer;
1322
    // Verifies that `merge` combines top-level fields from both batches and
    // recursively merges the children of a struct field ("b") that exists on
    // both sides with different sub-fields.
    #[test]
    fn test_merge_recursive() {
        let a_array = Int32Array::from(vec![Some(1), Some(2), Some(3)]);
        let e_array = Int32Array::from(vec![Some(4), Some(5), Some(6)]);
        let c_array = Int32Array::from(vec![Some(7), Some(8), Some(9)]);
        let d_array = StringArray::from(vec![Some("a"), Some("b"), Some("c")]);

        // Left: top-level "a" plus struct "b" containing only "c".
        let left_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("c", DataType::Int32, true)].into()),
                true,
            ),
        ]);
        let left_batch = RecordBatch::try_new(
            Arc::new(left_schema),
            vec![
                Arc::new(a_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("c", DataType::Int32, true)),
                    Arc::new(c_array.clone()) as ArrayRef,
                )])),
            ],
        )
        .unwrap();

        // Right: top-level "e" plus struct "b" containing only "d".
        let right_schema = Schema::new(vec![
            Field::new("e", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(vec![Field::new("d", DataType::Utf8, true)].into()),
                true,
            ),
        ]);
        let right_batch = RecordBatch::try_new(
            Arc::new(right_schema),
            vec![
                Arc::new(e_array.clone()),
                Arc::new(StructArray::from(vec![(
                    Arc::new(Field::new("d", DataType::Utf8, true)),
                    Arc::new(d_array.clone()) as ArrayRef,
                )])) as ArrayRef,
            ],
        )
        .unwrap();

        // Expected: left fields first ("a", then "b" with "c"+"d" merged),
        // right-only field "e" appended last.
        let merged_schema = Schema::new(vec![
            Field::new("a", DataType::Int32, true),
            Field::new(
                "b",
                DataType::Struct(
                    vec![
                        Field::new("c", DataType::Int32, true),
                        Field::new("d", DataType::Utf8, true),
                    ]
                    .into(),
                ),
                true,
            ),
            Field::new("e", DataType::Int32, true),
        ]);
        let merged_batch = RecordBatch::try_new(
            Arc::new(merged_schema),
            vec![
                Arc::new(a_array) as ArrayRef,
                Arc::new(StructArray::from(vec![
                    (
                        Arc::new(Field::new("c", DataType::Int32, true)),
                        Arc::new(c_array) as ArrayRef,
                    ),
                    (
                        Arc::new(Field::new("d", DataType::Utf8, true)),
                        Arc::new(d_array) as ArrayRef,
                    ),
                ])) as ArrayRef,
                Arc::new(e_array) as ArrayRef,
            ],
        )
        .unwrap();

        let result = left_batch.merge(&right_batch).unwrap();
        assert_eq!(result, merged_batch);
    }
1407
    // Contrasts `merge_with_schema` (output field order dictated by the given
    // schema) against plain `merge` (first-come-first-serve from the left).
    #[test]
    fn test_merge_with_schema() {
        // Builds a single-column batch whose "struct" column has the given
        // child field names/types, with empty child arrays.
        fn test_batch(names: &[&str], types: &[DataType]) -> (Schema, RecordBatch) {
            let fields: Fields = names
                .iter()
                .zip(types)
                .map(|(name, ty)| Field::new(name.to_string(), ty.clone(), false))
                .collect();
            let schema = Schema::new(vec![Field::new(
                "struct",
                DataType::Struct(fields.clone()),
                false,
            )]);
            let children = types.iter().map(new_empty_array).collect::<Vec<_>>();
            let batch = RecordBatch::try_new(
                Arc::new(schema.clone()),
                vec![Arc::new(StructArray::new(fields, children, None)) as ArrayRef],
            );
            (schema, batch.unwrap())
        }

        let (_, left_batch) = test_batch(&["a", "b"], &[DataType::Int32, DataType::Int64]);
        let (_, right_batch) = test_batch(&["c", "b"], &[DataType::Int32, DataType::Int64]);
        let (output_schema, _) = test_batch(
            &["b", "a", "c"],
            &[DataType::Int64, DataType::Int32, DataType::Int32],
        );

        // If we use merge_with_schema the schema is respected
        let merged = left_batch
            .merge_with_schema(&right_batch, &output_schema)
            .unwrap();
        assert_eq!(merged.schema().as_ref(), &output_schema);

        // If we use merge we get first-come first-serve based on the left batch
        let (naive_schema, _) = test_batch(
            &["a", "b", "c"],
            &[DataType::Int32, DataType::Int64, DataType::Int32],
        );
        let merged = left_batch.merge(&right_batch).unwrap();
        assert_eq!(merged.schema().as_ref(), &naive_schema);
    }
1450
    // Exercises merging two `List<Struct>` columns with disjoint struct
    // fields ("x" vs "y"): first with matching offsets on both sides, then
    // with an all-null right side (whose missing field must be null-filled).
    #[test]
    fn test_merge_list_struct() {
        let x_field = Arc::new(Field::new("x", DataType::Int32, true));
        let y_field = Arc::new(Field::new("y", DataType::Int32, true));
        let x_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone()])),
            true,
        ));
        let y_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![y_field.clone()])),
            true,
        ));
        let both_struct_field = Arc::new(Field::new(
            "item",
            DataType::Struct(Fields::from(vec![x_field.clone(), y_field.clone()])),
            true,
        ));
        let left_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(x_struct_field.clone()),
            true,
        )]);
        let right_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(y_struct_field.clone()),
            true,
        )]);
        let both_schema = Schema::new(vec![Field::new(
            "list_struct",
            DataType::List(both_struct_field.clone()),
            true,
        )]);

        // Single-row lists: left holds {x: 1}, right holds {y: 2}.
        let x = Arc::new(Int32Array::from(vec![1]));
        let y = Arc::new(Int32Array::from(vec![2]));
        let x_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone()]),
            vec![x.clone()],
            None,
        ));
        let y_struct = Arc::new(StructArray::new(
            Fields::from(vec![y_field.clone()]),
            vec![y.clone()],
            None,
        ));
        let both_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field.clone(), y_field.clone()]),
            vec![x.clone(), y],
            None,
        ));
        // Expected result of merging against an all-null right side: "y" is
        // filled with nulls.
        let both_null_struct = Arc::new(StructArray::new(
            Fields::from(vec![x_field, y_field]),
            vec![x, Arc::new(new_null_array(&DataType::Int32, 1))],
            None,
        ));
        let offsets = OffsetBuffer::from_lengths([1]);
        let x_s_list = ListArray::new(x_struct_field, offsets.clone(), x_struct, None);
        let y_s_list = ListArray::new(y_struct_field, offsets.clone(), y_struct, None);
        let both_list = ListArray::new(
            both_struct_field.clone(),
            offsets.clone(),
            both_struct,
            None,
        );
        let both_null_list = ListArray::new(both_struct_field, offsets, both_null_struct, None);
        let x_batch =
            RecordBatch::try_new(Arc::new(left_schema), vec![Arc::new(x_s_list)]).unwrap();
        let y_batch = RecordBatch::try_new(
            Arc::new(right_schema.clone()),
            vec![Arc::new(y_s_list.clone())],
        )
        .unwrap();
        // Same offsets on both sides: struct fields are zipped together.
        let merged = x_batch.merge(&y_batch).unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema.clone()), vec![Arc::new(both_list)]).unwrap();
        assert_eq!(merged, expected);

        // All-null right side: left offsets/values reused, "y" null-filled.
        let y_null_list = new_null_array(y_s_list.data_type(), 1);
        let y_null_batch =
            RecordBatch::try_new(Arc::new(right_schema), vec![Arc::new(y_null_list.clone())])
                .unwrap();
        let expected =
            RecordBatch::try_new(Arc::new(both_schema), vec![Arc::new(both_null_list)]).unwrap();
        let merged = x_batch.merge(&y_null_batch).unwrap();
        assert_eq!(merged, expected);
    }
1539
1540    #[test]
1541    fn test_byte_width_opt() {
1542        assert_eq!(DataType::Int32.byte_width_opt(), Some(4));
1543        assert_eq!(DataType::Int64.byte_width_opt(), Some(8));
1544        assert_eq!(DataType::Float32.byte_width_opt(), Some(4));
1545        assert_eq!(DataType::Float64.byte_width_opt(), Some(8));
1546        assert_eq!(DataType::Utf8.byte_width_opt(), None);
1547        assert_eq!(DataType::Binary.byte_width_opt(), None);
1548        assert_eq!(
1549            DataType::List(Arc::new(Field::new("item", DataType::Int32, true))).byte_width_opt(),
1550            None
1551        );
1552        assert_eq!(
1553            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 3)
1554                .byte_width_opt(),
1555            Some(12)
1556        );
1557        assert_eq!(
1558            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Int32, true)), 4)
1559                .byte_width_opt(),
1560            Some(16)
1561        );
1562        assert_eq!(
1563            DataType::FixedSizeList(Arc::new(Field::new("item", DataType::Utf8, true)), 5)
1564                .byte_width_opt(),
1565            None
1566        );
1567    }
1568
1569    #[test]
1570    fn test_take_record_batch() {
1571        let schema = Arc::new(Schema::new(vec![
1572            Field::new("a", DataType::Int32, true),
1573            Field::new("b", DataType::Utf8, true),
1574        ]));
1575        let batch = RecordBatch::try_new(
1576            schema.clone(),
1577            vec![
1578                Arc::new(Int32Array::from_iter_values(0..20)),
1579                Arc::new(StringArray::from_iter_values(
1580                    (0..20).map(|i| format!("str-{}", i)),
1581                )),
1582            ],
1583        )
1584        .unwrap();
1585        let taken = batch.take(&(vec![1_u32, 5_u32, 10_u32].into())).unwrap();
1586        assert_eq!(
1587            taken,
1588            RecordBatch::try_new(
1589                schema,
1590                vec![
1591                    Arc::new(Int32Array::from(vec![1, 5, 10])),
1592                    Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1593                ],
1594            )
1595            .unwrap()
1596        )
1597    }
1598
1599    #[test]
1600    fn test_schema_project_by_schema() {
1601        let metadata = [("key".to_string(), "value".to_string())];
1602        let schema = Arc::new(
1603            Schema::new(vec![
1604                Field::new("a", DataType::Int32, true),
1605                Field::new("b", DataType::Utf8, true),
1606            ])
1607            .with_metadata(metadata.clone().into()),
1608        );
1609        let batch = RecordBatch::try_new(
1610            schema,
1611            vec![
1612                Arc::new(Int32Array::from_iter_values(0..20)),
1613                Arc::new(StringArray::from_iter_values(
1614                    (0..20).map(|i| format!("str-{}", i)),
1615                )),
1616            ],
1617        )
1618        .unwrap();
1619
1620        // Empty schema
1621        let empty_schema = Schema::empty();
1622        let empty_projected = batch.project_by_schema(&empty_schema).unwrap();
1623        let expected_schema = empty_schema.with_metadata(metadata.clone().into());
1624        assert_eq!(
1625            empty_projected,
1626            RecordBatch::from(StructArray::new_empty_fields(batch.num_rows(), None))
1627                .with_schema(Arc::new(expected_schema))
1628                .unwrap()
1629        );
1630
1631        // Re-ordered schema
1632        let reordered_schema = Schema::new(vec![
1633            Field::new("b", DataType::Utf8, true),
1634            Field::new("a", DataType::Int32, true),
1635        ]);
1636        let reordered_projected = batch.project_by_schema(&reordered_schema).unwrap();
1637        let expected_schema = Arc::new(reordered_schema.with_metadata(metadata.clone().into()));
1638        assert_eq!(
1639            reordered_projected,
1640            RecordBatch::try_new(
1641                expected_schema,
1642                vec![
1643                    Arc::new(StringArray::from_iter_values(
1644                        (0..20).map(|i| format!("str-{}", i)),
1645                    )),
1646                    Arc::new(Int32Array::from_iter_values(0..20)),
1647                ],
1648            )
1649            .unwrap()
1650        );
1651
1652        // Sub schema
1653        let sub_schema = Schema::new(vec![Field::new("a", DataType::Int32, true)]);
1654        let sub_projected = batch.project_by_schema(&sub_schema).unwrap();
1655        let expected_schema = Arc::new(sub_schema.with_metadata(metadata.into()));
1656        assert_eq!(
1657            sub_projected,
1658            RecordBatch::try_new(
1659                expected_schema,
1660                vec![Arc::new(Int32Array::from_iter_values(0..20))],
1661            )
1662            .unwrap()
1663        );
1664    }
1665
1666    #[test]
1667    fn test_project_preserves_struct_validity() {
1668        // Test that projecting a struct array preserves its validity (fix for issue #4385)
1669        let fields = Fields::from(vec![
1670            Field::new("id", DataType::Int32, false),
1671            Field::new("value", DataType::Float32, true),
1672        ]);
1673
1674        // Create a struct array with validity
1675        let id_array = Int32Array::from(vec![1, 2, 3]);
1676        let value_array = Float32Array::from(vec![Some(1.0), Some(2.0), Some(3.0)]);
1677        let struct_array = StructArray::new(
1678            fields.clone(),
1679            vec![
1680                Arc::new(id_array) as ArrayRef,
1681                Arc::new(value_array) as ArrayRef,
1682            ],
1683            Some(vec![true, false, true].into()), // Second struct is null
1684        );
1685
1686        // Project the struct array
1687        let projected = project(&struct_array, &fields).unwrap();
1688
1689        // Verify the validity is preserved
1690        assert_eq!(projected.null_count(), 1);
1691        assert!(!projected.is_null(0));
1692        assert!(projected.is_null(1));
1693        assert!(!projected.is_null(2));
1694    }
1695
1696    #[test]
1697    fn test_merge_struct_with_different_validity() {
1698        // Test case from Weston's review comment
1699        // File 1 has height field with some nulls
1700        let height_array = Int32Array::from(vec![Some(500), None, Some(600), None]);
1701        let left_fields = Fields::from(vec![Field::new("height", DataType::Int32, true)]);
1702        let left_struct = StructArray::new(
1703            left_fields,
1704            vec![Arc::new(height_array) as ArrayRef],
1705            Some(vec![true, false, true, false].into()), // Rows 2 and 4 are null structs
1706        );
1707
1708        // File 2 has width field with some nulls
1709        let width_array = Int32Array::from(vec![Some(300), Some(200), None, None]);
1710        let right_fields = Fields::from(vec![Field::new("width", DataType::Int32, true)]);
1711        let right_struct = StructArray::new(
1712            right_fields,
1713            vec![Arc::new(width_array) as ArrayRef],
1714            Some(vec![true, true, false, false].into()), // Rows 3 and 4 are null structs
1715        );
1716
1717        // Merge the two structs
1718        let merged = merge(&left_struct, &right_struct);
1719
1720        // Expected:
1721        // Row 1: both non-null -> {width: 300, height: 500}
1722        // Row 2: left null, right non-null -> {width: 200, height: null}
1723        // Row 3: left non-null, right null -> {width: null, height: 600}
1724        // Row 4: both null -> null struct
1725
1726        assert_eq!(merged.null_count(), 1); // Only row 4 is null
1727        assert!(!merged.is_null(0));
1728        assert!(!merged.is_null(1));
1729        assert!(!merged.is_null(2));
1730        assert!(merged.is_null(3));
1731
1732        // Check field values
1733        let height_col = merged.column_by_name("height").unwrap();
1734        let height_values = height_col.as_any().downcast_ref::<Int32Array>().unwrap();
1735        assert_eq!(height_values.value(0), 500);
1736        assert!(height_values.is_null(1)); // height is null when left struct was null
1737        assert_eq!(height_values.value(2), 600);
1738
1739        let width_col = merged.column_by_name("width").unwrap();
1740        let width_values = width_col.as_any().downcast_ref::<Int32Array>().unwrap();
1741        assert_eq!(width_values.value(0), 300);
1742        assert_eq!(width_values.value(1), 200);
1743        assert!(width_values.is_null(2)); // width is null when right struct was null
1744    }
1745}