vortex_array/arrow/
array.rs

1use arrow_array::array::{
2    Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType,
3    BooleanArray as ArrowBooleanArray, GenericByteArray, NullArray as ArrowNullArray,
4    OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8    ByteArrayType, ByteViewType, Date32Type, Date64Type, DurationMicrosecondType,
9    DurationMillisecondType, DurationNanosecondType, DurationSecondType, Float16Type, Float32Type,
10    Float64Type, Int8Type, Int16Type, Int32Type, Int64Type, Time32MillisecondType,
11    Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimestampMicrosecondType,
12    TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt8Type, UInt16Type,
13    UInt32Type, UInt64Type,
14};
15use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray};
16use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
17use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
18use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
19use vortex_buffer::{Alignment, Buffer, ByteBuffer};
20use vortex_datetime_dtype::TimeUnit;
21use vortex_dtype::{DType, NativePType, PType};
22use vortex_error::{VortexExpect as _, vortex_panic};
23
24use crate::arrays::{
25    BoolArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray, VarBinArray,
26    VarBinViewArray,
27};
28use crate::arrow::FromArrowArray;
29use crate::validity::Validity;
30use crate::{Array, ArrayRef, IntoArray};
31
32impl IntoArray for ArrowBuffer {
33    fn into_array(self) -> ArrayRef {
34        PrimitiveArray::from_byte_buffer(
35            ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
36            PType::U8,
37            Validity::NonNullable,
38        )
39        .into_array()
40    }
41}
42
43impl IntoArray for BooleanBuffer {
44    fn into_array(self) -> ArrayRef {
45        BoolArray::new(self, Validity::NonNullable).into_array()
46    }
47}
48
49impl<T> IntoArray for ScalarBuffer<T>
50where
51    T: ArrowNativeType + NativePType,
52{
53    fn into_array(self) -> ArrayRef {
54        PrimitiveArray::new(
55            Buffer::<T>::from_arrow_scalar_buffer(self),
56            Validity::NonNullable,
57        )
58        .into_array()
59    }
60}
61
62impl<O> IntoArray for OffsetBuffer<O>
63where
64    O: NativePType + OffsetSizeTrait,
65{
66    fn into_array(self) -> ArrayRef {
67        let primitive = PrimitiveArray::new(
68            Buffer::from_arrow_scalar_buffer(self.into_inner()),
69            Validity::NonNullable,
70        );
71        // primitive.update_statistic(Stat::IsSorted, Precision::exact(true));
72        // primitive.update_statistic(Stat::IsStrictSorted, Precision::exact(true));
73        primitive.into_array()
74    }
75}
76
77impl<T: ArrowPrimitiveType> FromArrowArray<&ArrowPrimitiveArray<T>> for ArrayRef
78where
79    <T as ArrowPrimitiveType>::Native: NativePType,
80{
81    fn from_arrow(value: &ArrowPrimitiveArray<T>, nullable: bool) -> Self {
82        let arr = PrimitiveArray::new(
83            Buffer::from_arrow_scalar_buffer(value.values().clone()),
84            nulls(value.nulls(), nullable),
85        );
86
87        if T::DATA_TYPE.is_numeric() {
88            return arr.into_array();
89        }
90
91        match T::DATA_TYPE {
92            DataType::Timestamp(time_unit, tz) => {
93                let tz = tz.map(|s| s.to_string());
94                TemporalArray::new_timestamp(arr.into_array(), time_unit.into(), tz).into()
95            }
96            DataType::Time32(time_unit) => {
97                TemporalArray::new_time(arr.into_array(), time_unit.into()).into()
98            }
99            DataType::Time64(time_unit) => {
100                TemporalArray::new_time(arr.into_array(), time_unit.into()).into()
101            }
102            DataType::Date32 => TemporalArray::new_date(arr.into_array(), TimeUnit::D).into(),
103            DataType::Date64 => TemporalArray::new_date(arr.into_array(), TimeUnit::Ms).into(),
104            DataType::Duration(_) => unimplemented!(),
105            DataType::Interval(_) => unimplemented!(),
106            _ => vortex_panic!("Invalid data type for PrimitiveArray: {}", T::DATA_TYPE),
107        }
108    }
109}
110
111impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
112where
113    <T as ByteArrayType>::Offset: NativePType,
114{
115    fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
116        let dtype = match T::DATA_TYPE {
117            DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
118            DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
119            _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
120        };
121        VarBinArray::try_new(
122            value.offsets().clone().into_array(),
123            ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
124            dtype,
125            nulls(value.nulls(), nullable),
126        )
127        .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
128        .into_array()
129    }
130}
131
132impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
133    fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
134        let dtype = match T::DATA_TYPE {
135            DataType::BinaryView => DType::Binary(nullable.into()),
136            DataType::Utf8View => DType::Utf8(nullable.into()),
137            _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
138        };
139
140        let views_buffer = Buffer::from_byte_buffer(
141            Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
142        );
143
144        VarBinViewArray::try_new(
145            views_buffer,
146            value
147                .data_buffers()
148                .iter()
149                .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
150                .collect::<Vec<_>>(),
151            dtype,
152            nulls(value.nulls(), nullable),
153        )
154        .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
155        .into_array()
156    }
157}
158
159impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
160    fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
161        BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
162    }
163}
164
165impl FromArrowArray<&ArrowStructArray> for ArrayRef {
166    fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
167        StructArray::try_new(
168            value.column_names().iter().map(|s| (*s).into()).collect(),
169            value
170                .columns()
171                .iter()
172                .zip(value.fields())
173                .map(|(c, field)| Self::from_arrow(c.clone(), field.is_nullable()))
174                .collect(),
175            value.len(),
176            nulls(value.nulls(), nullable),
177        )
178        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
179        .into_array()
180    }
181}
182
183impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
184    fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
185        // Extract the validity of the underlying element array
186        let elem_nullable = match value.data_type() {
187            DataType::List(field) => field.is_nullable(),
188            DataType::LargeList(field) => field.is_nullable(),
189            dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
190        };
191        ListArray::try_new(
192            Self::from_arrow(value.values().clone(), elem_nullable),
193            // offsets are always non-nullable
194            value.offsets().clone().into_array(),
195            nulls(value.nulls(), nullable),
196        )
197        .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
198        .into_array()
199    }
200}
201
202impl FromArrowArray<&ArrowNullArray> for ArrayRef {
203    fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
204        assert!(nullable);
205        NullArray::new(value.len()).into_array()
206    }
207}
208
209fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
210    if nullable {
211        nulls
212            .map(|nulls| {
213                if nulls.null_count() == nulls.len() {
214                    Validity::AllInvalid
215                } else {
216                    Validity::from(nulls.inner().clone())
217                }
218            })
219            .unwrap_or_else(|| Validity::AllValid)
220    } else {
221        assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
222        Validity::NonNullable
223    }
224}
225
226impl FromArrowArray<ArrowArrayRef> for ArrayRef {
227    fn from_arrow(array: ArrowArrayRef, nullable: bool) -> Self {
228        match array.data_type() {
229            DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
230            DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
231            DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
232            DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
233            DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
234            DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
235            DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
236            DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
237            DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
238            DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
239            DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
240            DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
241            DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
242            DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
243            DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
244            DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
245            DataType::BinaryView => Self::from_arrow(
246                array
247                    .as_any()
248                    .downcast_ref::<BinaryViewArray>()
249                    .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
250                nullable,
251            ),
252            DataType::Utf8View => Self::from_arrow(
253                array
254                    .as_any()
255                    .downcast_ref::<StringViewArray>()
256                    .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
257                nullable,
258            ),
259            DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
260            DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
261            DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
262            DataType::Null => Self::from_arrow(as_null_array(&array), nullable),
263            DataType::Timestamp(u, _) => match u {
264                ArrowTimeUnit::Second => {
265                    Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
266                }
267                ArrowTimeUnit::Millisecond => {
268                    Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
269                }
270                ArrowTimeUnit::Microsecond => {
271                    Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
272                }
273                ArrowTimeUnit::Nanosecond => {
274                    Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
275                }
276            },
277            DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
278            DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
279            DataType::Time32(u) => match u {
280                ArrowTimeUnit::Second => {
281                    Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
282                }
283                ArrowTimeUnit::Millisecond => {
284                    Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
285                }
286                _ => unreachable!(),
287            },
288            DataType::Time64(u) => match u {
289                ArrowTimeUnit::Microsecond => {
290                    Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
291                }
292                ArrowTimeUnit::Nanosecond => {
293                    Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
294                }
295                _ => unreachable!(),
296            },
297            DataType::Duration(u) => match u {
298                ArrowTimeUnit::Second => {
299                    Self::from_arrow(array.as_primitive::<DurationSecondType>(), nullable)
300                }
301                ArrowTimeUnit::Millisecond => {
302                    Self::from_arrow(array.as_primitive::<DurationMillisecondType>(), nullable)
303                }
304                ArrowTimeUnit::Microsecond => {
305                    Self::from_arrow(array.as_primitive::<DurationMicrosecondType>(), nullable)
306                }
307                ArrowTimeUnit::Nanosecond => {
308                    Self::from_arrow(array.as_primitive::<DurationNanosecondType>(), nullable)
309                }
310            },
311            _ => vortex_panic!(
312                "Array encoding not implemented for Arrow data type {}",
313                array.data_type().clone()
314            ),
315        }
316    }
317}