1use arrow_array::array::{
2 Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType,
3 BooleanArray as ArrowBooleanArray, GenericByteArray, NullArray as ArrowNullArray,
4 OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8 ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9 Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10 Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
13};
14use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray};
15use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
16use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
17use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
18use vortex_buffer::{Alignment, Buffer, ByteBuffer};
19use vortex_dtype::datetime::TimeUnit;
20use vortex_dtype::{DType, DecimalDType, NativePType, PType};
21use vortex_error::{VortexExpect as _, vortex_panic};
22use vortex_scalar::i256;
23
24use crate::arrays::{
25 BoolArray, DecimalArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray,
26 VarBinArray, VarBinViewArray,
27};
28use crate::arrow::FromArrowArray;
29use crate::validity::Validity;
30use crate::{ArrayRef, IntoArray};
31
32impl IntoArray for ArrowBuffer {
33 fn into_array(self) -> ArrayRef {
34 PrimitiveArray::from_byte_buffer(
35 ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
36 PType::U8,
37 Validity::NonNullable,
38 )
39 .into_array()
40 }
41}
42
43impl IntoArray for BooleanBuffer {
44 fn into_array(self) -> ArrayRef {
45 BoolArray::new(self, Validity::NonNullable).into_array()
46 }
47}
48
49impl<T> IntoArray for ScalarBuffer<T>
50where
51 T: ArrowNativeType + NativePType,
52{
53 fn into_array(self) -> ArrayRef {
54 PrimitiveArray::new(
55 Buffer::<T>::from_arrow_scalar_buffer(self),
56 Validity::NonNullable,
57 )
58 .into_array()
59 }
60}
61
62impl<O> IntoArray for OffsetBuffer<O>
63where
64 O: NativePType + OffsetSizeTrait,
65{
66 fn into_array(self) -> ArrayRef {
67 let primitive = PrimitiveArray::new(
68 Buffer::from_arrow_scalar_buffer(self.into_inner()),
69 Validity::NonNullable,
70 );
71
72 primitive.into_array()
73 }
74}
75
/// Generates `FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef` for a plain numeric
/// Arrow primitive type.
///
/// The generated impl converts the values buffer into a Vortex [`PrimitiveArray`] and
/// derives validity from the Arrow null buffer and the requested `nullable` flag.
macro_rules! impl_from_arrow_primitive {
    ($ty:path) => {
        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
                let values = Buffer::from_arrow_scalar_buffer(value.values().clone());
                PrimitiveArray::new(values, nulls(value.nulls(), nullable)).into_array()
            }
        }
    };
}
87
// Instantiate the primitive conversion for every signed integer, unsigned integer, and
// floating-point Arrow type handled by this module.
impl_from_arrow_primitive!(Int8Type);
impl_from_arrow_primitive!(Int16Type);
impl_from_arrow_primitive!(Int32Type);
impl_from_arrow_primitive!(Int64Type);
impl_from_arrow_primitive!(UInt8Type);
impl_from_arrow_primitive!(UInt16Type);
impl_from_arrow_primitive!(UInt32Type);
impl_from_arrow_primitive!(UInt64Type);
impl_from_arrow_primitive!(Float16Type);
impl_from_arrow_primitive!(Float32Type);
impl_from_arrow_primitive!(Float64Type);
99
100impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
101 fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, _nullable: bool) -> Self {
102 let decimal_type = DecimalDType::new(array.precision(), array.scale());
103 let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
104 let validity = nulls(array.nulls(), false);
105 DecimalArray::new(buffer, decimal_type, validity).into_array()
106 }
107}
108
109impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
110 fn from_arrow(array: &ArrowPrimitiveArray<Decimal256Type>, _nullable: bool) -> Self {
111 let decimal_type = DecimalDType::new(array.precision(), array.scale());
112 let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
113 let buffer =
117 unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
118 let validity = nulls(array.nulls(), false);
119 DecimalArray::new(buffer, decimal_type, validity).into_array()
120 }
121}
122
/// Generates `FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef` for a temporal Arrow
/// primitive type by forwarding to `temporal_array`.
macro_rules! impl_from_arrow_temporal {
    ($ty:path) => {
        impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
            fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
                let converted: Self = temporal_array(value, nullable);
                converted
            }
        }
    };
}
132
// Timestamps, one impl per Arrow time unit.
impl_from_arrow_temporal!(TimestampSecondType);
impl_from_arrow_temporal!(TimestampMillisecondType);
impl_from_arrow_temporal!(TimestampMicrosecondType);
impl_from_arrow_temporal!(TimestampNanosecondType);

// Time-of-day types: 32-bit (second, millisecond) and 64-bit (microsecond, nanosecond).
impl_from_arrow_temporal!(Time32SecondType);
impl_from_arrow_temporal!(Time32MillisecondType);
impl_from_arrow_temporal!(Time64MicrosecondType);
impl_from_arrow_temporal!(Time64NanosecondType);

// Date types.
impl_from_arrow_temporal!(Date32Type);
impl_from_arrow_temporal!(Date64Type);
148
149fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
150where
151 T::Native: NativePType,
152{
153 let arr = PrimitiveArray::new(
154 Buffer::from_arrow_scalar_buffer(value.values().clone()),
155 nulls(value.nulls(), nullable),
156 )
157 .into_array();
158
159 match T::DATA_TYPE {
160 DataType::Timestamp(time_unit, tz) => {
161 let tz = tz.map(|s| s.to_string());
162 TemporalArray::new_timestamp(arr, time_unit.into(), tz).into()
163 }
164 DataType::Time32(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
165 DataType::Time64(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
166 DataType::Date32 => TemporalArray::new_date(arr, TimeUnit::D).into(),
167 DataType::Date64 => TemporalArray::new_date(arr, TimeUnit::Ms).into(),
168 DataType::Duration(_) => unimplemented!(),
169 DataType::Interval(_) => unimplemented!(),
170 _ => vortex_panic!("Invalid temporal type: {}", T::DATA_TYPE),
171 }
172}
173
174impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
175where
176 <T as ByteArrayType>::Offset: NativePType,
177{
178 fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
179 let dtype = match T::DATA_TYPE {
180 DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
181 DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
182 _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
183 };
184 VarBinArray::try_new(
185 value.offsets().clone().into_array(),
186 ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
187 dtype,
188 nulls(value.nulls(), nullable),
189 )
190 .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
191 .into_array()
192 }
193}
194
195impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
196 fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
197 let dtype = match T::DATA_TYPE {
198 DataType::BinaryView => DType::Binary(nullable.into()),
199 DataType::Utf8View => DType::Utf8(nullable.into()),
200 _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
201 };
202
203 let views_buffer = Buffer::from_byte_buffer(
204 Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
205 );
206
207 VarBinViewArray::try_new(
208 views_buffer,
209 value
210 .data_buffers()
211 .iter()
212 .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
213 .collect::<Vec<_>>(),
214 dtype,
215 nulls(value.nulls(), nullable),
216 )
217 .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
218 .into_array()
219 }
220}
221
222impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
223 fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
224 BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
225 }
226}
227
228impl FromArrowArray<&ArrowStructArray> for ArrayRef {
229 fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
230 StructArray::try_new(
231 value.column_names().iter().map(|s| (*s).into()).collect(),
232 value
233 .columns()
234 .iter()
235 .zip(value.fields())
236 .map(|(c, field)| Self::from_arrow(c.clone(), field.is_nullable()))
237 .collect(),
238 value.len(),
239 nulls(value.nulls(), nullable),
240 )
241 .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
242 .into_array()
243 }
244}
245
246impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
247 fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
248 let elem_nullable = match value.data_type() {
250 DataType::List(field) => field.is_nullable(),
251 DataType::LargeList(field) => field.is_nullable(),
252 dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
253 };
254 ListArray::try_new(
255 Self::from_arrow(value.values().clone(), elem_nullable),
256 value.offsets().clone().into_array(),
258 nulls(value.nulls(), nullable),
259 )
260 .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
261 .into_array()
262 }
263}
264
265impl FromArrowArray<&ArrowNullArray> for ArrayRef {
266 fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
267 assert!(nullable);
268 NullArray::new(value.len()).into_array()
269 }
270}
271
272fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
273 if nullable {
274 nulls
275 .map(|nulls| {
276 if nulls.null_count() == nulls.len() {
277 Validity::AllInvalid
278 } else {
279 Validity::from(nulls.inner().clone())
280 }
281 })
282 .unwrap_or_else(|| Validity::AllValid)
283 } else {
284 assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
285 Validity::NonNullable
286 }
287}
288
289impl FromArrowArray<ArrowArrayRef> for ArrayRef {
290 fn from_arrow(array: ArrowArrayRef, nullable: bool) -> Self {
291 match array.data_type() {
292 DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
293 DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
294 DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
295 DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
296 DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
297 DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
298 DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
299 DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
300 DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
301 DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
302 DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
303 DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
304 DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
305 DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
306 DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
307 DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
308 DataType::BinaryView => Self::from_arrow(
309 array
310 .as_any()
311 .downcast_ref::<BinaryViewArray>()
312 .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
313 nullable,
314 ),
315 DataType::Utf8View => Self::from_arrow(
316 array
317 .as_any()
318 .downcast_ref::<StringViewArray>()
319 .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
320 nullable,
321 ),
322 DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
323 DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
324 DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
325 DataType::Null => Self::from_arrow(as_null_array(&array), nullable),
326 DataType::Timestamp(u, _) => match u {
327 ArrowTimeUnit::Second => {
328 Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
329 }
330 ArrowTimeUnit::Millisecond => {
331 Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
332 }
333 ArrowTimeUnit::Microsecond => {
334 Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
335 }
336 ArrowTimeUnit::Nanosecond => {
337 Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
338 }
339 },
340 DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
341 DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
342 DataType::Time32(u) => match u {
343 ArrowTimeUnit::Second => {
344 Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
345 }
346 ArrowTimeUnit::Millisecond => {
347 Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
348 }
349 _ => unreachable!(),
350 },
351 DataType::Time64(u) => match u {
352 ArrowTimeUnit::Microsecond => {
353 Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
354 }
355 ArrowTimeUnit::Nanosecond => {
356 Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
357 }
358 _ => unreachable!(),
359 },
360 DataType::Decimal128(..) => {
361 Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
362 }
363 DataType::Decimal256(..) => {
364 Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
365 }
366 _ => vortex_panic!(
367 "Array encoding not implemented for Arrow data type {}",
368 array.data_type().clone()
369 ),
370 }
371 }
372}