1use arrow_array::array::{
2 Array as ArrowArray, ArrowPrimitiveType, BooleanArray as ArrowBooleanArray, GenericByteArray,
3 NullArray as ArrowNullArray, OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray,
4 StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8 ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9 Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10 Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
13};
14use arrow_array::{
15 BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray, make_array,
16};
17use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
18use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
19use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
20use itertools::Itertools;
21use vortex_buffer::{Alignment, Buffer, ByteBuffer};
22use vortex_dtype::datetime::TimeUnit;
23use vortex_dtype::{DType, DecimalDType, NativePType, PType};
24use vortex_error::{VortexExpect as _, vortex_panic};
25use vortex_scalar::i256;
26
27use crate::arrays::{
28 BoolArray, DecimalArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray,
29 VarBinArray, VarBinViewArray,
30};
31use crate::arrow::FromArrowArray;
32use crate::validity::Validity;
33use crate::{ArrayRef, IntoArray};
34
35impl IntoArray for ArrowBuffer {
36 fn into_array(self) -> ArrayRef {
37 PrimitiveArray::from_byte_buffer(
38 ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
39 PType::U8,
40 Validity::NonNullable,
41 )
42 .into_array()
43 }
44}
45
46impl IntoArray for BooleanBuffer {
47 fn into_array(self) -> ArrayRef {
48 BoolArray::new(self, Validity::NonNullable).into_array()
49 }
50}
51
52impl<T> IntoArray for ScalarBuffer<T>
53where
54 T: ArrowNativeType + NativePType,
55{
56 fn into_array(self) -> ArrayRef {
57 PrimitiveArray::new(
58 Buffer::<T>::from_arrow_scalar_buffer(self),
59 Validity::NonNullable,
60 )
61 .into_array()
62 }
63}
64
65impl<O> IntoArray for OffsetBuffer<O>
66where
67 O: NativePType + OffsetSizeTrait,
68{
69 fn into_array(self) -> ArrayRef {
70 let primitive = PrimitiveArray::new(
71 Buffer::from_arrow_scalar_buffer(self.into_inner()),
72 Validity::NonNullable,
73 );
74
75 primitive.into_array()
76 }
77}
78
79macro_rules! impl_from_arrow_primitive {
80 ($ty:path) => {
81 impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
82 fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
83 let buffer = Buffer::from_arrow_scalar_buffer(value.values().clone());
84 let validity = nulls(value.nulls(), nullable);
85 PrimitiveArray::new(buffer, validity).into_array()
86 }
87 }
88 };
89}
90
91impl_from_arrow_primitive!(Int8Type);
92impl_from_arrow_primitive!(Int16Type);
93impl_from_arrow_primitive!(Int32Type);
94impl_from_arrow_primitive!(Int64Type);
95impl_from_arrow_primitive!(UInt8Type);
96impl_from_arrow_primitive!(UInt16Type);
97impl_from_arrow_primitive!(UInt32Type);
98impl_from_arrow_primitive!(UInt64Type);
99impl_from_arrow_primitive!(Float16Type);
100impl_from_arrow_primitive!(Float32Type);
101impl_from_arrow_primitive!(Float64Type);
102
103impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
104 fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, nullable: bool) -> Self {
105 let decimal_type = DecimalDType::new(array.precision(), array.scale());
106 let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
107 let validity = nulls(array.nulls(), nullable);
108 DecimalArray::new(buffer, decimal_type, validity).into_array()
109 }
110}
111
112impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
113 fn from_arrow(array: &ArrowPrimitiveArray<Decimal256Type>, nullable: bool) -> Self {
114 let decimal_type = DecimalDType::new(array.precision(), array.scale());
115 let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
116 let buffer =
120 unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
121 let validity = nulls(array.nulls(), nullable);
122 DecimalArray::new(buffer, decimal_type, validity).into_array()
123 }
124}
125
126macro_rules! impl_from_arrow_temporal {
127 ($ty:path) => {
128 impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
129 fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
130 temporal_array(value, nullable)
131 }
132 }
133 };
134}
135
136impl_from_arrow_temporal!(TimestampSecondType);
138impl_from_arrow_temporal!(TimestampMillisecondType);
139impl_from_arrow_temporal!(TimestampMicrosecondType);
140impl_from_arrow_temporal!(TimestampNanosecondType);
141
142impl_from_arrow_temporal!(Time32SecondType);
144impl_from_arrow_temporal!(Time32MillisecondType);
145impl_from_arrow_temporal!(Time64MicrosecondType);
146impl_from_arrow_temporal!(Time64NanosecondType);
147
148impl_from_arrow_temporal!(Date32Type);
150impl_from_arrow_temporal!(Date64Type);
151
152fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
153where
154 T::Native: NativePType,
155{
156 let arr = PrimitiveArray::new(
157 Buffer::from_arrow_scalar_buffer(value.values().clone()),
158 nulls(value.nulls(), nullable),
159 )
160 .into_array();
161
162 match T::DATA_TYPE {
163 DataType::Timestamp(time_unit, tz) => {
164 let tz = tz.map(|s| s.to_string());
165 TemporalArray::new_timestamp(arr, time_unit.into(), tz).into()
166 }
167 DataType::Time32(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
168 DataType::Time64(time_unit) => TemporalArray::new_time(arr, time_unit.into()).into(),
169 DataType::Date32 => TemporalArray::new_date(arr, TimeUnit::D).into(),
170 DataType::Date64 => TemporalArray::new_date(arr, TimeUnit::Ms).into(),
171 DataType::Duration(_) => unimplemented!(),
172 DataType::Interval(_) => unimplemented!(),
173 _ => vortex_panic!("Invalid temporal type: {}", T::DATA_TYPE),
174 }
175}
176
177impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
178where
179 <T as ByteArrayType>::Offset: NativePType,
180{
181 fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
182 let dtype = match T::DATA_TYPE {
183 DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
184 DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
185 _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
186 };
187 VarBinArray::try_new(
188 value.offsets().clone().into_array(),
189 ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
190 dtype,
191 nulls(value.nulls(), nullable),
192 )
193 .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
194 .into_array()
195 }
196}
197
198impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
199 fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
200 let dtype = match T::DATA_TYPE {
201 DataType::BinaryView => DType::Binary(nullable.into()),
202 DataType::Utf8View => DType::Utf8(nullable.into()),
203 _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
204 };
205
206 let views_buffer = Buffer::from_byte_buffer(
207 Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
208 );
209
210 VarBinViewArray::try_new(
211 views_buffer,
212 value
213 .data_buffers()
214 .iter()
215 .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
216 .collect::<Vec<_>>(),
217 dtype,
218 nulls(value.nulls(), nullable),
219 )
220 .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
221 .into_array()
222 }
223}
224
225impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
226 fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
227 BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
228 }
229}
230
231fn remove_nulls(data: arrow_data::ArrayData) -> arrow_data::ArrayData {
233 if data.null_count() == 0 {
234 return data;
236 }
237
238 let children = match data.data_type() {
239 DataType::Struct(fields) => Some(
240 fields
241 .iter()
242 .zip(data.child_data().iter())
243 .map(|(field, child_data)| {
244 if field.is_nullable() {
245 child_data.clone()
246 } else {
247 remove_nulls(child_data.clone())
248 }
249 })
250 .collect_vec(),
251 ),
252 DataType::List(f)
253 | DataType::LargeList(f)
254 | DataType::ListView(f)
255 | DataType::LargeListView(f)
256 | DataType::FixedSizeList(f, _)
257 if !f.is_nullable() =>
258 {
259 assert_eq!(
261 data.child_data().len(),
262 1,
263 "List types should have one child"
264 );
265 Some(vec![remove_nulls(data.child_data()[0].clone())])
266 }
267 _ => None,
268 };
269
270 let mut builder = data.into_builder().nulls(None);
271 if let Some(children) = children {
272 builder = builder.child_data(children);
273 }
274 builder
275 .build()
276 .vortex_expect("reconstructing array without nulls")
277}
278
279impl FromArrowArray<&ArrowStructArray> for ArrayRef {
280 fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
281 StructArray::try_new(
282 value.column_names().iter().copied().collect(),
283 value
284 .columns()
285 .iter()
286 .zip(value.fields())
287 .map(|(c, field)| {
288 if c.null_count() > 0 && !field.is_nullable() {
291 let stripped = make_array(remove_nulls(c.into_data()));
292 Self::from_arrow(stripped.as_ref(), false)
293 } else {
294 Self::from_arrow(c.as_ref(), field.is_nullable())
295 }
296 })
297 .collect(),
298 value.len(),
299 nulls(value.nulls(), nullable),
300 )
301 .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
302 .into_array()
303 }
304}
305
306impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
307 fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
308 let elem_nullable = match value.data_type() {
310 DataType::List(field) => field.is_nullable(),
311 DataType::LargeList(field) => field.is_nullable(),
312 dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
313 };
314 ListArray::try_new(
315 Self::from_arrow(value.values().as_ref(), elem_nullable),
316 value.offsets().clone().into_array(),
318 nulls(value.nulls(), nullable),
319 )
320 .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
321 .into_array()
322 }
323}
324
325impl FromArrowArray<&ArrowNullArray> for ArrayRef {
326 fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
327 assert!(nullable);
328 NullArray::new(value.len()).into_array()
329 }
330}
331
332fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
333 if nullable {
334 nulls
335 .map(|nulls| {
336 if nulls.null_count() == nulls.len() {
337 Validity::AllInvalid
338 } else {
339 Validity::from(nulls.inner().clone())
340 }
341 })
342 .unwrap_or_else(|| Validity::AllValid)
343 } else {
344 assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
345 Validity::NonNullable
346 }
347}
348
349impl FromArrowArray<&dyn ArrowArray> for ArrayRef {
350 fn from_arrow(array: &dyn ArrowArray, nullable: bool) -> Self {
351 match array.data_type() {
352 DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
353 DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
354 DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
355 DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
356 DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
357 DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
358 DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
359 DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
360 DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
361 DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
362 DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
363 DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
364 DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
365 DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
366 DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
367 DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
368 DataType::BinaryView => Self::from_arrow(
369 array
370 .as_any()
371 .downcast_ref::<BinaryViewArray>()
372 .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
373 nullable,
374 ),
375 DataType::Utf8View => Self::from_arrow(
376 array
377 .as_any()
378 .downcast_ref::<StringViewArray>()
379 .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
380 nullable,
381 ),
382 DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
383 DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
384 DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
385 DataType::Null => Self::from_arrow(as_null_array(array), nullable),
386 DataType::Timestamp(u, _) => match u {
387 ArrowTimeUnit::Second => {
388 Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
389 }
390 ArrowTimeUnit::Millisecond => {
391 Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
392 }
393 ArrowTimeUnit::Microsecond => {
394 Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
395 }
396 ArrowTimeUnit::Nanosecond => {
397 Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
398 }
399 },
400 DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
401 DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
402 DataType::Time32(u) => match u {
403 ArrowTimeUnit::Second => {
404 Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
405 }
406 ArrowTimeUnit::Millisecond => {
407 Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
408 }
409 _ => unreachable!(),
410 },
411 DataType::Time64(u) => match u {
412 ArrowTimeUnit::Microsecond => {
413 Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
414 }
415 ArrowTimeUnit::Nanosecond => {
416 Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
417 }
418 _ => unreachable!(),
419 },
420 DataType::Decimal128(..) => {
421 Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
422 }
423 DataType::Decimal256(..) => {
424 Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
425 }
426 _ => vortex_panic!(
427 "Array encoding not implemented for Arrow data type {}",
428 array.data_type().clone()
429 ),
430 }
431 }
432}
433
#[cfg(test)]
mod tests {
    use arrow_array::new_null_array;
    use arrow_schema::{DataType, Field, Fields};

    use crate::ArrayRef;
    use crate::arrow::FromArrowArray as _;

    /// Builds a length-1, all-null Arrow struct array holding one non-nullable field with
    /// the given name and type.
    fn null_struct_with_required_field(name: &str, data_type: DataType) -> arrow_array::ArrayRef {
        let fields = Fields::from(vec![Field::new(name, data_type, false)]);
        new_null_array(&DataType::Struct(fields), 1)
    }

    /// A nullable struct may wrap a non-nullable primitive field whose slot is null.
    #[test]
    pub fn nullable_may_contain_non_nullable() {
        let arrow = null_struct_with_required_field("non_nullable_inner", DataType::Int32);
        ArrayRef::from_arrow(arrow.as_ref(), true);
    }

    /// The same holds when the non-nullable field is itself a struct with a non-nullable
    /// field.
    #[test]
    pub fn nullable_may_contain_deeply_nested_non_nullable() {
        let inner = DataType::Struct(Fields::from(vec![Field::new(
            "non_nullable_deeper_inner",
            DataType::Int32,
            false,
        )]));
        let arrow = null_struct_with_required_field("non_nullable_inner", inner);
        ArrayRef::from_arrow(arrow.as_ref(), true);
    }

    /// Converting a struct that contains a dictionary-typed field is expected to panic.
    #[test]
    #[should_panic]
    pub fn cannot_handle_nullable_struct_containing_non_nullable_dictionary() {
        let dictionary = DataType::Dictionary(Box::new(DataType::Int32), Box::new(DataType::Utf8));
        let arrow = null_struct_with_required_field("non_nullable_deeper_inner", dictionary);

        ArrayRef::from_arrow(arrow.as_ref(), true);
    }
}
486}