vortex_array/arrow/
array.rs

1use arrow_array::array::{
2    Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType,
3    BooleanArray as ArrowBooleanArray, GenericByteArray, NullArray as ArrowNullArray,
4    OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8    ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9    Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
13};
14use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray};
15use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
16use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
17use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
18use vortex_buffer::{Alignment, Buffer, ByteBuffer};
19use vortex_dtype::datetime::TimeUnit;
20use vortex_dtype::{DType, DecimalDType, NativePType, PType};
21use vortex_error::{VortexExpect as _, vortex_panic};
22use vortex_scalar::i256;
23
24use crate::arrays::{
25    BoolArray, DecimalArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray,
26    VarBinArray, VarBinViewArray,
27};
28use crate::arrow::FromArrowArray;
29use crate::validity::Validity;
30use crate::{Array, ArrayRef, IntoArray};
31
32impl IntoArray for ArrowBuffer {
33    fn into_array(self) -> ArrayRef {
34        PrimitiveArray::from_byte_buffer(
35            ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
36            PType::U8,
37            Validity::NonNullable,
38        )
39        .into_array()
40    }
41}
42
43impl IntoArray for BooleanBuffer {
44    fn into_array(self) -> ArrayRef {
45        BoolArray::new(self, Validity::NonNullable).into_array()
46    }
47}
48
49impl<T> IntoArray for ScalarBuffer<T>
50where
51    T: ArrowNativeType + NativePType,
52{
53    fn into_array(self) -> ArrayRef {
54        PrimitiveArray::new(
55            Buffer::<T>::from_arrow_scalar_buffer(self),
56            Validity::NonNullable,
57        )
58        .into_array()
59    }
60}
61
62impl<O> IntoArray for OffsetBuffer<O>
63where
64    O: NativePType + OffsetSizeTrait,
65{
66    fn into_array(self) -> ArrayRef {
67        let primitive = PrimitiveArray::new(
68            Buffer::from_arrow_scalar_buffer(self.into_inner()),
69            Validity::NonNullable,
70        );
71
72        primitive.into_array()
73    }
74}
75
76macro_rules! impl_from_arrow_primitive {
77    ($ty:path) => {
78        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
79            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
80                let buffer = Buffer::from_arrow_scalar_buffer(value.values().clone());
81                let validity = nulls(value.nulls(), nullable);
82                PrimitiveArray::new(buffer, validity).into_array()
83            }
84        }
85    };
86}
87
88impl_from_arrow_primitive!(Int8Type);
89impl_from_arrow_primitive!(Int16Type);
90impl_from_arrow_primitive!(Int32Type);
91impl_from_arrow_primitive!(Int64Type);
92impl_from_arrow_primitive!(UInt8Type);
93impl_from_arrow_primitive!(UInt16Type);
94impl_from_arrow_primitive!(UInt32Type);
95impl_from_arrow_primitive!(UInt64Type);
96impl_from_arrow_primitive!(Float16Type);
97impl_from_arrow_primitive!(Float32Type);
98impl_from_arrow_primitive!(Float64Type);
99
100impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
101    fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, _nullable: bool) -> Self {
102        let decimal_type = DecimalDType::new(array.precision(), array.scale());
103        let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
104        let validity = nulls(array.nulls(), false);
105        DecimalArray::new(buffer, decimal_type, validity).into_array()
106    }
107}
108
109impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
110    fn from_arrow(array: &ArrowPrimitiveArray<Decimal256Type>, _nullable: bool) -> Self {
111        let decimal_type = DecimalDType::new(array.precision(), array.scale());
112        let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
113        // SAFETY: Our i256 implementation has the same bit-pattern representation of the
114        //  arrow_buffer::i256 type. It is safe to treat values held inside the buffer as values
115        //  of either type.
116        let buffer =
117            unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
118        let validity = nulls(array.nulls(), false);
119        DecimalArray::new(buffer, decimal_type, validity).into_array()
120    }
121}
122
123macro_rules! impl_from_arrow_temporal {
124    ($ty:path) => {
125        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
126            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
127                temporal_array(value, nullable)
128            }
129        }
130    };
131}
132
133// timestamp
134impl_from_arrow_temporal!(TimestampSecondType);
135impl_from_arrow_temporal!(TimestampMillisecondType);
136impl_from_arrow_temporal!(TimestampMicrosecondType);
137impl_from_arrow_temporal!(TimestampNanosecondType);
138
139// time
140impl_from_arrow_temporal!(Time32SecondType);
141impl_from_arrow_temporal!(Time32MillisecondType);
142impl_from_arrow_temporal!(Time64MicrosecondType);
143impl_from_arrow_temporal!(Time64NanosecondType);
144
145// date
146impl_from_arrow_temporal!(Date32Type);
147impl_from_arrow_temporal!(Date64Type);
148
149fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
150where
151    T::Native: NativePType,
152{
153    let arr = PrimitiveArray::new(
154        Buffer::from_arrow_scalar_buffer(value.values().clone()),
155        nulls(value.nulls(), nullable),
156    )
157    .into_array();
158
159    match T::DATA_TYPE {
160        DataType::Timestamp(time_unit, tz) => {
161            let tz = tz.map(|s| s.to_string());
162            TemporalArray::new_timestamp(arr.into_array(), time_unit.into(), tz).into()
163        }
164        DataType::Time32(time_unit) => {
165            TemporalArray::new_time(arr.into_array(), time_unit.into()).into()
166        }
167        DataType::Time64(time_unit) => {
168            TemporalArray::new_time(arr.into_array(), time_unit.into()).into()
169        }
170        DataType::Date32 => TemporalArray::new_date(arr.into_array(), TimeUnit::D).into(),
171        DataType::Date64 => TemporalArray::new_date(arr.into_array(), TimeUnit::Ms).into(),
172        DataType::Duration(_) => unimplemented!(),
173        DataType::Interval(_) => unimplemented!(),
174        _ => vortex_panic!("Invalid temporal type: {}", T::DATA_TYPE),
175    }
176}
177
178impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
179where
180    <T as ByteArrayType>::Offset: NativePType,
181{
182    fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
183        let dtype = match T::DATA_TYPE {
184            DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
185            DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
186            _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
187        };
188        VarBinArray::try_new(
189            value.offsets().clone().into_array(),
190            ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
191            dtype,
192            nulls(value.nulls(), nullable),
193        )
194        .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
195        .into_array()
196    }
197}
198
199impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
200    fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
201        let dtype = match T::DATA_TYPE {
202            DataType::BinaryView => DType::Binary(nullable.into()),
203            DataType::Utf8View => DType::Utf8(nullable.into()),
204            _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
205        };
206
207        let views_buffer = Buffer::from_byte_buffer(
208            Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
209        );
210
211        VarBinViewArray::try_new(
212            views_buffer,
213            value
214                .data_buffers()
215                .iter()
216                .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
217                .collect::<Vec<_>>(),
218            dtype,
219            nulls(value.nulls(), nullable),
220        )
221        .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
222        .into_array()
223    }
224}
225
226impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
227    fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
228        BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
229    }
230}
231
232impl FromArrowArray<&ArrowStructArray> for ArrayRef {
233    fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
234        StructArray::try_new(
235            value.column_names().iter().map(|s| (*s).into()).collect(),
236            value
237                .columns()
238                .iter()
239                .zip(value.fields())
240                .map(|(c, field)| Self::from_arrow(c.clone(), field.is_nullable()))
241                .collect(),
242            value.len(),
243            nulls(value.nulls(), nullable),
244        )
245        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
246        .into_array()
247    }
248}
249
250impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
251    fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
252        // Extract the validity of the underlying element array
253        let elem_nullable = match value.data_type() {
254            DataType::List(field) => field.is_nullable(),
255            DataType::LargeList(field) => field.is_nullable(),
256            dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
257        };
258        ListArray::try_new(
259            Self::from_arrow(value.values().clone(), elem_nullable),
260            // offsets are always non-nullable
261            value.offsets().clone().into_array(),
262            nulls(value.nulls(), nullable),
263        )
264        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
265        .into_array()
266    }
267}
268
269impl FromArrowArray<&ArrowNullArray> for ArrayRef {
270    fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
271        assert!(nullable);
272        NullArray::new(value.len()).into_array()
273    }
274}
275
276fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
277    if nullable {
278        nulls
279            .map(|nulls| {
280                if nulls.null_count() == nulls.len() {
281                    Validity::AllInvalid
282                } else {
283                    Validity::from(nulls.inner().clone())
284                }
285            })
286            .unwrap_or_else(|| Validity::AllValid)
287    } else {
288        assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
289        Validity::NonNullable
290    }
291}
292
293impl FromArrowArray<ArrowArrayRef> for ArrayRef {
294    fn from_arrow(array: ArrowArrayRef, nullable: bool) -> Self {
295        match array.data_type() {
296            DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
297            DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
298            DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
299            DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
300            DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
301            DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
302            DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
303            DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
304            DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
305            DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
306            DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
307            DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
308            DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
309            DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
310            DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
311            DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
312            DataType::BinaryView => Self::from_arrow(
313                array
314                    .as_any()
315                    .downcast_ref::<BinaryViewArray>()
316                    .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
317                nullable,
318            ),
319            DataType::Utf8View => Self::from_arrow(
320                array
321                    .as_any()
322                    .downcast_ref::<StringViewArray>()
323                    .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
324                nullable,
325            ),
326            DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
327            DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
328            DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
329            DataType::Null => Self::from_arrow(as_null_array(&array), nullable),
330            DataType::Timestamp(u, _) => match u {
331                ArrowTimeUnit::Second => {
332                    Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
333                }
334                ArrowTimeUnit::Millisecond => {
335                    Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
336                }
337                ArrowTimeUnit::Microsecond => {
338                    Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
339                }
340                ArrowTimeUnit::Nanosecond => {
341                    Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
342                }
343            },
344            DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
345            DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
346            DataType::Time32(u) => match u {
347                ArrowTimeUnit::Second => {
348                    Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
349                }
350                ArrowTimeUnit::Millisecond => {
351                    Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
352                }
353                _ => unreachable!(),
354            },
355            DataType::Time64(u) => match u {
356                ArrowTimeUnit::Microsecond => {
357                    Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
358                }
359                ArrowTimeUnit::Nanosecond => {
360                    Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
361                }
362                _ => unreachable!(),
363            },
364            DataType::Decimal128(..) => {
365                Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
366            }
367            DataType::Decimal256(..) => {
368                Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
369            }
370            _ => vortex_panic!(
371                "Array encoding not implemented for Arrow data type {}",
372                array.data_type().clone()
373            ),
374        }
375    }
376}