vortex_array/arrow/convert.rs

1use arrow_array::array::{
2    Array as ArrowArray, ArrowPrimitiveType, BooleanArray as ArrowBooleanArray, GenericByteArray,
3    NullArray as ArrowNullArray, OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray,
4    StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8    ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9    Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
13};
14use arrow_array::{
15    BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray, make_array,
16};
17use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
18use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
19use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
20use itertools::Itertools;
21use vortex_buffer::{Alignment, Buffer, ByteBuffer};
22use vortex_dtype::datetime::TimeUnit;
23use vortex_dtype::{DType, DecimalDType, NativePType, PType};
24use vortex_error::{VortexExpect as _, vortex_panic};
25use vortex_scalar::i256;
26
27use crate::arrays::{
28    BoolArray, DecimalArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray,
29    VarBinArray, VarBinViewArray,
30};
31use crate::arrow::FromArrowArray;
32use crate::validity::Validity;
33use crate::{ArrayRef, IntoArray};
34
35impl IntoArray for ArrowBuffer {
36    fn into_array(self) -> ArrayRef {
37        PrimitiveArray::from_byte_buffer(
38            ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
39            PType::U8,
40            Validity::NonNullable,
41        )
42        .into_array()
43    }
44}
45
46impl IntoArray for BooleanBuffer {
47    fn into_array(self) -> ArrayRef {
48        BoolArray::new(self, Validity::NonNullable).into_array()
49    }
50}
51
52impl<T> IntoArray for ScalarBuffer<T>
53where
54    T: ArrowNativeType + NativePType,
55{
56    fn into_array(self) -> ArrayRef {
57        PrimitiveArray::new(
58            Buffer::<T>::from_arrow_scalar_buffer(self),
59            Validity::NonNullable,
60        )
61        .into_array()
62    }
63}
64
65impl<O> IntoArray for OffsetBuffer<O>
66where
67    O: NativePType + OffsetSizeTrait,
68{
69    fn into_array(self) -> ArrayRef {
70        let primitive = PrimitiveArray::new(
71            Buffer::from_arrow_scalar_buffer(self.into_inner()),
72            Validity::NonNullable,
73        );
74
75        primitive.into_array()
76    }
77}
78
79macro_rules! impl_from_arrow_primitive {
80    ($ty:path) => {
81        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
82            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
83                let buffer = Buffer::from_arrow_scalar_buffer(value.values().clone());
84                let validity = nulls(value.nulls(), nullable);
85                PrimitiveArray::new(buffer, validity).into_array()
86            }
87        }
88    };
89}
90
91impl_from_arrow_primitive!(Int8Type);
92impl_from_arrow_primitive!(Int16Type);
93impl_from_arrow_primitive!(Int32Type);
94impl_from_arrow_primitive!(Int64Type);
95impl_from_arrow_primitive!(UInt8Type);
96impl_from_arrow_primitive!(UInt16Type);
97impl_from_arrow_primitive!(UInt32Type);
98impl_from_arrow_primitive!(UInt64Type);
99impl_from_arrow_primitive!(Float16Type);
100impl_from_arrow_primitive!(Float32Type);
101impl_from_arrow_primitive!(Float64Type);
102
103impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
104    fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, nullable: bool) -> Self {
105        let decimal_type = DecimalDType::new(array.precision(), array.scale());
106        let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
107        let validity = nulls(array.nulls(), nullable);
108        DecimalArray::new(buffer, decimal_type, validity).into_array()
109    }
110}
111
112impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
113    fn from_arrow(array: &ArrowPrimitiveArray<Decimal256Type>, nullable: bool) -> Self {
114        let decimal_type = DecimalDType::new(array.precision(), array.scale());
115        let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
116        // SAFETY: Our i256 implementation has the same bit-pattern representation of the
117        //  arrow_buffer::i256 type. It is safe to treat values held inside the buffer as values
118        //  of either type.
119        let buffer =
120            unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
121        let validity = nulls(array.nulls(), nullable);
122        DecimalArray::new(buffer, decimal_type, validity).into_array()
123    }
124}
125
126macro_rules! impl_from_arrow_temporal {
127    ($ty:path) => {
128        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
129            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
130                temporal_array(value, nullable)
131            }
132        }
133    };
134}
135
136// timestamp
137impl_from_arrow_temporal!(TimestampSecondType);
138impl_from_arrow_temporal!(TimestampMillisecondType);
139impl_from_arrow_temporal!(TimestampMicrosecondType);
140impl_from_arrow_temporal!(TimestampNanosecondType);
141
142// time
143impl_from_arrow_temporal!(Time32SecondType);
144impl_from_arrow_temporal!(Time32MillisecondType);
145impl_from_arrow_temporal!(Time64MicrosecondType);
146impl_from_arrow_temporal!(Time64NanosecondType);
147
148// date
149impl_from_arrow_temporal!(Date32Type);
150impl_from_arrow_temporal!(Date64Type);
151
152fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
153where
154    T::Native: NativePType,
155{
156    let arr = PrimitiveArray::new(
157        Buffer::from_arrow_scalar_buffer(value.values().clone()),
158        nulls(value.nulls(), nullable),
159    )
160    .into_array();
161
162    match T::DATA_TYPE {
163        DataType::Timestamp(time_unit, tz) => {
164            let tz = tz.map(|s| s.to_string());
165            TemporalArray::new_timestamp(arr, time_unit.into(), tz).into()
166        }
167        DataType::Time32(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
168        DataType::Time64(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
169        DataType::Date32 => TemporalArray::new_date(arr, TimeUnit::D).into(),
170        DataType::Date64 => TemporalArray::new_date(arr, TimeUnit::Ms).into(),
171        DataType::Duration(_) => unimplemented!(),
172        DataType::Interval(_) => unimplemented!(),
173        _ => vortex_panic!("Invalid temporal type: {}", T::DATA_TYPE),
174    }
175}
176
177impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
178where
179    <T as ByteArrayType>::Offset: NativePType,
180{
181    fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
182        let dtype = match T::DATA_TYPE {
183            DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
184            DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
185            _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
186        };
187        VarBinArray::try_new(
188            value.offsets().clone().into_array(),
189            ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
190            dtype,
191            nulls(value.nulls(), nullable),
192        )
193        .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
194        .into_array()
195    }
196}
197
198impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
199    fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
200        let dtype = match T::DATA_TYPE {
201            DataType::BinaryView => DType::Binary(nullable.into()),
202            DataType::Utf8View => DType::Utf8(nullable.into()),
203            _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
204        };
205
206        let views_buffer = Buffer::from_byte_buffer(
207            Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
208        );
209
210        VarBinViewArray::try_new(
211            views_buffer,
212            value
213                .data_buffers()
214                .iter()
215                .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
216                .collect::<Vec<_>>(),
217            dtype,
218            nulls(value.nulls(), nullable),
219        )
220        .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
221        .into_array()
222    }
223}
224
225impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
226    fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
227        BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
228    }
229}
230
231/// Strip out the nulls from this array and return a new array without nulls.
232fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData {
233    if data.null_count() == 0 {
234        // No nulls to remove, return the array as is
235        return data;
236    }
237
238    let children = match data.data_type() {
239        DataType::Struct(fields) => Some(
240            fields
241                .iter()
242                .zip(data.child_data().iter())
243                .map(|(field, child_data)| {
244                    if field.is_nullable() {
245                        child_data.clone()
246                    } else {
247                        remove_nulls(child_data.clone())
248                    }
249                })
250                .collect_vec(),
251        ),
252        DataType::List(f)
253        | DataType::LargeList(f)
254        | DataType::ListView(f)
255        | DataType::LargeListView(f)
256        | DataType::FixedSizeList(f, _)
257            if !f.is_nullable() =>
258        {
259            // All list types only have one child
260            assert_eq!(
261                data.child_data().len(),
262                1,
263                "List types should have one child"
264            );
265            Some(vec![remove_nulls(data.child_data()[0].clone())])
266        }
267        _ => None,
268    };
269
270    let mut builder = data.into_builder().nulls(None);
271    if let Some(children) = children {
272        builder = builder.child_data(children);
273    }
274    builder
275        .build()
276        .vortex_expect("reconstructing array without nulls")
277}
278
279impl FromArrowArray<&ArrowStructArray> for ArrayRef {
280    fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
281        StructArray::try_new(
282            value.column_names().iter().copied().collect(),
283            value
284                .columns()
285                .iter()
286                .zip(value.fields())
287                .map(|(c, field)| {
288                    // Arrow pushes down nulls, even into non-nullable fields. So we strip them
289                    // out here because Vortex is a little more strict.
290                    if c.null_count() > 0 && !field.is_nullable() {
291                        let stripped = make_array(remove_nulls(c.into_data()));
292                        Self::from_arrow(stripped.as_ref(), false)
293                    } else {
294                        Self::from_arrow(c.as_ref(), field.is_nullable())
295                    }
296                })
297                .collect(),
298            value.len(),
299            nulls(value.nulls(), nullable),
300        )
301        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
302        .into_array()
303    }
304}
305
306impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
307    fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
308        // Extract the validity of the underlying element array
309        let elem_nullable = match value.data_type() {
310            DataType::List(field) => field.is_nullable(),
311            DataType::LargeList(field) => field.is_nullable(),
312            dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
313        };
314        ListArray::try_new(
315            Self::from_arrow(value.values().as_ref(), elem_nullable),
316            // offsets are always non-nullable
317            value.offsets().clone().into_array(),
318            nulls(value.nulls(), nullable),
319        )
320        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
321        .into_array()
322    }
323}
324
325impl FromArrowArray<&ArrowNullArray> for ArrayRef {
326    fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
327        assert!(nullable);
328        NullArray::new(value.len()).into_array()
329    }
330}
331
332fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
333    if nullable {
334        nulls
335            .map(|nulls| {
336                if nulls.null_count() == nulls.len() {
337                    Validity::AllInvalid
338                } else {
339                    Validity::from(nulls.inner().clone())
340                }
341            })
342            .unwrap_or_else(|| Validity::AllValid)
343    } else {
344        assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
345        Validity::NonNullable
346    }
347}
348
349impl FromArrowArray<&dyn ArrowArray> for ArrayRef {
350    fn from_arrow(array: &dyn ArrowArray, nullable: bool) -> Self {
351        match array.data_type() {
352            DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
353            DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
354            DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
355            DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
356            DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
357            DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
358            DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
359            DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
360            DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
361            DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
362            DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
363            DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
364            DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
365            DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
366            DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
367            DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
368            DataType::BinaryView => Self::from_arrow(
369                array
370                    .as_any()
371                    .downcast_ref::<BinaryViewArray>()
372                    .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
373                nullable,
374            ),
375            DataType::Utf8View => Self::from_arrow(
376                array
377                    .as_any()
378                    .downcast_ref::<StringViewArray>()
379                    .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
380                nullable,
381            ),
382            DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
383            DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
384            DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
385            DataType::Null => Self::from_arrow(as_null_array(array), nullable),
386            DataType::Timestamp(u, _) => match u {
387                ArrowTimeUnit::Second => {
388                    Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
389                }
390                ArrowTimeUnit::Millisecond => {
391                    Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
392                }
393                ArrowTimeUnit::Microsecond => {
394                    Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
395                }
396                ArrowTimeUnit::Nanosecond => {
397                    Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
398                }
399            },
400            DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
401            DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
402            DataType::Time32(u) => match u {
403                ArrowTimeUnit::Second => {
404                    Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
405                }
406                ArrowTimeUnit::Millisecond => {
407                    Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
408                }
409                _ => unreachable!(),
410            },
411            DataType::Time64(u) => match u {
412                ArrowTimeUnit::Microsecond => {
413                    Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
414                }
415                ArrowTimeUnit::Nanosecond => {
416                    Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
417                }
418                _ => unreachable!(),
419            },
420            DataType::Decimal128(..) => {
421                Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
422            }
423            DataType::Decimal256(..) => {
424                Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
425            }
426            _ => vortex_panic!(
427                "Array encoding not implemented for Arrow data type {}",
428                array.data_type().clone()
429            ),
430        }
431    }
432}
433
#[cfg(test)]
mod tests {
    use arrow_array::new_null_array;
    use arrow_schema::{DataType, Field, Fields};

    use crate::ArrayRef;
    use crate::arrow::FromArrowArray as _;

    /// Builds a struct type whose only field, `name`, is non-nullable and of type `inner`.
    fn single_non_nullable_field(name: &str, inner: DataType) -> DataType {
        DataType::Struct(Fields::from(vec![Field::new(name, inner, false)]))
    }

    /// Converts a one-row, all-null Arrow array of `data_type` into a nullable Vortex array.
    fn convert_one_null_row(data_type: DataType) {
        let all_null = new_null_array(&data_type, 1);
        ArrayRef::from_arrow(all_null.as_ref(), true);
    }

    #[test]
    pub fn nullable_may_contain_non_nullable() {
        convert_one_null_row(single_non_nullable_field("non_nullable_inner", DataType::Int32));
    }

    #[test]
    pub fn nullable_may_contain_deeply_nested_non_nullable() {
        let inner = single_non_nullable_field("non_nullable_deeper_inner", DataType::Int32);
        convert_one_null_row(single_non_nullable_field("non_nullable_inner", inner));
    }

    #[test]
    #[should_panic]
    pub fn cannot_handle_nullable_struct_containing_non_nullable_dictionary() {
        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        convert_one_null_row(single_non_nullable_field("non_nullable_deeper_inner", dictionary));
    }
}