vortex_array/arrow/
convert.rs

1use arrow_array::array::{
2    Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType,
3    BooleanArray as ArrowBooleanArray, GenericByteArray, NullArray as ArrowNullArray,
4    OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8    ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9    Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
13};
14use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray};
15use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
16use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
17use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
18use vortex_buffer::{Alignment, Buffer, ByteBuffer};
19use vortex_dtype::datetime::TimeUnit;
20use vortex_dtype::{DType, DecimalDType, NativePType, PType};
21use vortex_error::{VortexExpect as _, vortex_panic};
22use vortex_scalar::i256;
23
24use crate::arrays::{
25    BoolArray, DecimalArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray,
26    VarBinArray, VarBinViewArray,
27};
28use crate::arrow::FromArrowArray;
29use crate::validity::Validity;
30use crate::{ArrayRef, IntoArray};
31
32impl IntoArray for ArrowBuffer {
33    fn into_array(self) -> ArrayRef {
34        PrimitiveArray::from_byte_buffer(
35            ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
36            PType::U8,
37            Validity::NonNullable,
38        )
39        .into_array()
40    }
41}
42
43impl IntoArray for BooleanBuffer {
44    fn into_array(self) -> ArrayRef {
45        BoolArray::new(self, Validity::NonNullable).into_array()
46    }
47}
48
49impl<T> IntoArray for ScalarBuffer<T>
50where
51    T: ArrowNativeType + NativePType,
52{
53    fn into_array(self) -> ArrayRef {
54        PrimitiveArray::new(
55            Buffer::<T>::from_arrow_scalar_buffer(self),
56            Validity::NonNullable,
57        )
58        .into_array()
59    }
60}
61
62impl<O> IntoArray for OffsetBuffer<O>
63where
64    O: NativePType + OffsetSizeTrait,
65{
66    fn into_array(self) -> ArrayRef {
67        let primitive = PrimitiveArray::new(
68            Buffer::from_arrow_scalar_buffer(self.into_inner()),
69            Validity::NonNullable,
70        );
71
72        primitive.into_array()
73    }
74}
75
76macro_rules! impl_from_arrow_primitive {
77    ($ty:path) => {
78        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
79            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
80                let buffer = Buffer::from_arrow_scalar_buffer(value.values().clone());
81                let validity = nulls(value.nulls(), nullable);
82                PrimitiveArray::new(buffer, validity).into_array()
83            }
84        }
85    };
86}
87
88impl_from_arrow_primitive!(Int8Type);
89impl_from_arrow_primitive!(Int16Type);
90impl_from_arrow_primitive!(Int32Type);
91impl_from_arrow_primitive!(Int64Type);
92impl_from_arrow_primitive!(UInt8Type);
93impl_from_arrow_primitive!(UInt16Type);
94impl_from_arrow_primitive!(UInt32Type);
95impl_from_arrow_primitive!(UInt64Type);
96impl_from_arrow_primitive!(Float16Type);
97impl_from_arrow_primitive!(Float32Type);
98impl_from_arrow_primitive!(Float64Type);
99
100impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
101    fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, _nullable: bool) -> Self {
102        let decimal_type = DecimalDType::new(array.precision(), array.scale());
103        let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
104        let validity = nulls(array.nulls(), false);
105        DecimalArray::new(buffer, decimal_type, validity).into_array()
106    }
107}
108
109impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
110    fn from_arrow(array: &ArrowPrimitiveArray<Decimal256Type>, _nullable: bool) -> Self {
111        let decimal_type = DecimalDType::new(array.precision(), array.scale());
112        let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
113        // SAFETY: Our i256 implementation has the same bit-pattern representation of the
114        //  arrow_buffer::i256 type. It is safe to treat values held inside the buffer as values
115        //  of either type.
116        let buffer =
117            unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
118        let validity = nulls(array.nulls(), false);
119        DecimalArray::new(buffer, decimal_type, validity).into_array()
120    }
121}
122
123macro_rules! impl_from_arrow_temporal {
124    ($ty:path) => {
125        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
126            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
127                temporal_array(value, nullable)
128            }
129        }
130    };
131}
132
133// timestamp
134impl_from_arrow_temporal!(TimestampSecondType);
135impl_from_arrow_temporal!(TimestampMillisecondType);
136impl_from_arrow_temporal!(TimestampMicrosecondType);
137impl_from_arrow_temporal!(TimestampNanosecondType);
138
139// time
140impl_from_arrow_temporal!(Time32SecondType);
141impl_from_arrow_temporal!(Time32MillisecondType);
142impl_from_arrow_temporal!(Time64MicrosecondType);
143impl_from_arrow_temporal!(Time64NanosecondType);
144
145// date
146impl_from_arrow_temporal!(Date32Type);
147impl_from_arrow_temporal!(Date64Type);
148
149fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
150where
151    T::Native: NativePType,
152{
153    let arr = PrimitiveArray::new(
154        Buffer::from_arrow_scalar_buffer(value.values().clone()),
155        nulls(value.nulls(), nullable),
156    )
157    .into_array();
158
159    match T::DATA_TYPE {
160        DataType::Timestamp(time_unit, tz) => {
161            let tz = tz.map(|s| s.to_string());
162            TemporalArray::new_timestamp(arr, time_unit.into(), tz).into()
163        }
164        DataType::Time32(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
165        DataType::Time64(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
166        DataType::Date32 => TemporalArray::new_date(arr, TimeUnit::D).into(),
167        DataType::Date64 => TemporalArray::new_date(arr, TimeUnit::Ms).into(),
168        DataType::Duration(_) => unimplemented!(),
169        DataType::Interval(_) => unimplemented!(),
170        _ => vortex_panic!("Invalid temporal type: {}", T::DATA_TYPE),
171    }
172}
173
174impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
175where
176    <T as ByteArrayType>::Offset: NativePType,
177{
178    fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
179        let dtype = match T::DATA_TYPE {
180            DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
181            DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
182            _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
183        };
184        VarBinArray::try_new(
185            value.offsets().clone().into_array(),
186            ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
187            dtype,
188            nulls(value.nulls(), nullable),
189        )
190        .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
191        .into_array()
192    }
193}
194
195impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
196    fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
197        let dtype = match T::DATA_TYPE {
198            DataType::BinaryView => DType::Binary(nullable.into()),
199            DataType::Utf8View => DType::Utf8(nullable.into()),
200            _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
201        };
202
203        let views_buffer = Buffer::from_byte_buffer(
204            Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
205        );
206
207        VarBinViewArray::try_new(
208            views_buffer,
209            value
210                .data_buffers()
211                .iter()
212                .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
213                .collect::<Vec<_>>(),
214            dtype,
215            nulls(value.nulls(), nullable),
216        )
217        .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
218        .into_array()
219    }
220}
221
222impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
223    fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
224        BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
225    }
226}
227
228impl FromArrowArray<&ArrowStructArray> for ArrayRef {
229    fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
230        StructArray::try_new(
231            value.column_names().iter().map(|s| (*s).into()).collect(),
232            value
233                .columns()
234                .iter()
235                .zip(value.fields())
236                .map(|(c, field)| Self::from_arrow(c.clone(), field.is_nullable()))
237                .collect(),
238            value.len(),
239            nulls(value.nulls(), nullable),
240        )
241        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
242        .into_array()
243    }
244}
245
246impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
247    fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
248        // Extract the validity of the underlying element array
249        let elem_nullable = match value.data_type() {
250            DataType::List(field) => field.is_nullable(),
251            DataType::LargeList(field) => field.is_nullable(),
252            dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
253        };
254        ListArray::try_new(
255            Self::from_arrow(value.values().clone(), elem_nullable),
256            // offsets are always non-nullable
257            value.offsets().clone().into_array(),
258            nulls(value.nulls(), nullable),
259        )
260        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
261        .into_array()
262    }
263}
264
265impl FromArrowArray<&ArrowNullArray> for ArrayRef {
266    fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
267        assert!(nullable);
268        NullArray::new(value.len()).into_array()
269    }
270}
271
272fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
273    if nullable {
274        nulls
275            .map(|nulls| {
276                if nulls.null_count() == nulls.len() {
277                    Validity::AllInvalid
278                } else {
279                    Validity::from(nulls.inner().clone())
280                }
281            })
282            .unwrap_or_else(|| Validity::AllValid)
283    } else {
284        assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
285        Validity::NonNullable
286    }
287}
288
289impl FromArrowArray<ArrowArrayRef> for ArrayRef {
290    fn from_arrow(array: ArrowArrayRef, nullable: bool) -> Self {
291        match array.data_type() {
292            DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
293            DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
294            DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
295            DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
296            DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
297            DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
298            DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
299            DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
300            DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
301            DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
302            DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
303            DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
304            DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
305            DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
306            DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
307            DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
308            DataType::BinaryView => Self::from_arrow(
309                array
310                    .as_any()
311                    .downcast_ref::<BinaryViewArray>()
312                    .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
313                nullable,
314            ),
315            DataType::Utf8View => Self::from_arrow(
316                array
317                    .as_any()
318                    .downcast_ref::<StringViewArray>()
319                    .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
320                nullable,
321            ),
322            DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
323            DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
324            DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
325            DataType::Null => Self::from_arrow(as_null_array(&array), nullable),
326            DataType::Timestamp(u, _) => match u {
327                ArrowTimeUnit::Second => {
328                    Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
329                }
330                ArrowTimeUnit::Millisecond => {
331                    Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
332                }
333                ArrowTimeUnit::Microsecond => {
334                    Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
335                }
336                ArrowTimeUnit::Nanosecond => {
337                    Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
338                }
339            },
340            DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
341            DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
342            DataType::Time32(u) => match u {
343                ArrowTimeUnit::Second => {
344                    Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
345                }
346                ArrowTimeUnit::Millisecond => {
347                    Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
348                }
349                _ => unreachable!(),
350            },
351            DataType::Time64(u) => match u {
352                ArrowTimeUnit::Microsecond => {
353                    Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
354                }
355                ArrowTimeUnit::Nanosecond => {
356                    Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
357                }
358                _ => unreachable!(),
359            },
360            DataType::Decimal128(..) => {
361                Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
362            }
363            DataType::Decimal256(..) => {
364                Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
365            }
366            _ => vortex_panic!(
367                "Array encoding not implemented for Arrow data type {}",
368                array.data_type().clone()
369            ),
370        }
371    }
372}