1use arrow_array::array::{
2 Array as ArrowArray, ArrayRef as ArrowArrayRef, ArrowPrimitiveType,
3 BooleanArray as ArrowBooleanArray, GenericByteArray, NullArray as ArrowNullArray,
4 OffsetSizeTrait, PrimitiveArray as ArrowPrimitiveArray, StructArray as ArrowStructArray,
5};
6use arrow_array::cast::{AsArray, as_null_array};
7use arrow_array::types::{
8 ByteArrayType, ByteViewType, Date32Type, Date64Type, Decimal128Type, Decimal256Type,
9 Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, Int64Type,
10 Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
11 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
12 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
13};
14use arrow_array::{BinaryViewArray, GenericByteViewArray, GenericListArray, StringViewArray};
15use arrow_buffer::buffer::{NullBuffer, OffsetBuffer};
16use arrow_buffer::{ArrowNativeType, BooleanBuffer, Buffer as ArrowBuffer, ScalarBuffer};
17use arrow_schema::{DataType, TimeUnit as ArrowTimeUnit};
18use vortex_buffer::{Alignment, Buffer, ByteBuffer};
19use vortex_dtype::datetime::TimeUnit;
20use vortex_dtype::{DType, DecimalDType, NativePType, PType};
21use vortex_error::{VortexExpect as _, vortex_panic};
22use vortex_scalar::i256;
23
24use crate::arrays::{
25 BoolArray, DecimalArray, ListArray, NullArray, PrimitiveArray, StructArray, TemporalArray,
26 VarBinArray, VarBinViewArray,
27};
28use crate::arrow::FromArrowArray;
29use crate::validity::Validity;
30use crate::{Array, ArrayRef, IntoArray};
31
32impl IntoArray for ArrowBuffer {
33 fn into_array(self) -> ArrayRef {
34 PrimitiveArray::from_byte_buffer(
35 ByteBuffer::from_arrow_buffer(self, Alignment::of::<u8>()),
36 PType::U8,
37 Validity::NonNullable,
38 )
39 .into_array()
40 }
41}
42
43impl IntoArray for BooleanBuffer {
44 fn into_array(self) -> ArrayRef {
45 BoolArray::new(self, Validity::NonNullable).into_array()
46 }
47}
48
49impl<T> IntoArray for ScalarBuffer<T>
50where
51 T: ArrowNativeType + NativePType,
52{
53 fn into_array(self) -> ArrayRef {
54 PrimitiveArray::new(
55 Buffer::<T>::from_arrow_scalar_buffer(self),
56 Validity::NonNullable,
57 )
58 .into_array()
59 }
60}
61
62impl<O> IntoArray for OffsetBuffer<O>
63where
64 O: NativePType + OffsetSizeTrait,
65{
66 fn into_array(self) -> ArrayRef {
67 let primitive = PrimitiveArray::new(
68 Buffer::from_arrow_scalar_buffer(self.into_inner()),
69 Validity::NonNullable,
70 );
71
72 primitive.into_array()
73 }
74}
75
76macro_rules! impl_from_arrow_primitive {
77 ($ty:path) => {
78 impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
79 fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
80 let buffer = Buffer::from_arrow_scalar_buffer(value.values().clone());
81 let validity = nulls(value.nulls(), nullable);
82 PrimitiveArray::new(buffer, validity).into_array()
83 }
84 }
85 };
86}
87
88impl_from_arrow_primitive!(Int8Type);
89impl_from_arrow_primitive!(Int16Type);
90impl_from_arrow_primitive!(Int32Type);
91impl_from_arrow_primitive!(Int64Type);
92impl_from_arrow_primitive!(UInt8Type);
93impl_from_arrow_primitive!(UInt16Type);
94impl_from_arrow_primitive!(UInt32Type);
95impl_from_arrow_primitive!(UInt64Type);
96impl_from_arrow_primitive!(Float16Type);
97impl_from_arrow_primitive!(Float32Type);
98impl_from_arrow_primitive!(Float64Type);
99
100impl FromArrowArray<&ArrowPrimitiveArray<Decimal128Type>> for ArrayRef {
101 fn from_arrow(array: &ArrowPrimitiveArray<Decimal128Type>, _nullable: bool) -> Self {
102 let decimal_type = DecimalDType::new(array.precision(), array.scale());
103 let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
104 let validity = nulls(array.nulls(), false);
105 DecimalArray::new(buffer, decimal_type, validity).into_array()
106 }
107}
108
109impl FromArrowArray<&ArrowPrimitiveArray<Decimal256Type>> for ArrayRef {
110 fn from_arrow(array: &ArrowPrimitiveArray<Decimal256Type>, _nullable: bool) -> Self {
111 let decimal_type = DecimalDType::new(array.precision(), array.scale());
112 let buffer = Buffer::from_arrow_scalar_buffer(array.values().clone());
113 let buffer =
117 unsafe { std::mem::transmute::<Buffer<arrow_buffer::i256>, Buffer<i256>>(buffer) };
118 let validity = nulls(array.nulls(), false);
119 DecimalArray::new(buffer, decimal_type, validity).into_array()
120 }
121}
122
123macro_rules! impl_from_arrow_temporal {
124 ($ty:path) => {
125 impl FromArrowArray<&ArrowPrimitiveArray<$ty>> for ArrayRef {
126 fn from_arrow(value: &ArrowPrimitiveArray<$ty>, nullable: bool) -> Self {
127 temporal_array(value, nullable)
128 }
129 }
130 };
131}
132
133impl_from_arrow_temporal!(TimestampSecondType);
135impl_from_arrow_temporal!(TimestampMillisecondType);
136impl_from_arrow_temporal!(TimestampMicrosecondType);
137impl_from_arrow_temporal!(TimestampNanosecondType);
138
139impl_from_arrow_temporal!(Time32SecondType);
141impl_from_arrow_temporal!(Time32MillisecondType);
142impl_from_arrow_temporal!(Time64MicrosecondType);
143impl_from_arrow_temporal!(Time64NanosecondType);
144
145impl_from_arrow_temporal!(Date32Type);
147impl_from_arrow_temporal!(Date64Type);
148
149fn temporal_array<T: ArrowPrimitiveType>(value: &ArrowPrimitiveArray<T>, nullable: bool) -> ArrayRef
150where
151 T::Native: NativePType,
152{
153 let arr = PrimitiveArray::new(
154 Buffer::from_arrow_scalar_buffer(value.values().clone()),
155 nulls(value.nulls(), nullable),
156 )
157 .into_array();
158
159 match T::DATA_TYPE {
160 DataType::Timestamp(time_unit, tz) => {
161 let tz = tz.map(|s| s.to_string());
162 TemporalArray::new_timestamp(arr.into_array(), time_unit.into(), tz).into()
163 }
164 DataType::Time32(time_unit) => {
165 TemporalArray::new_time(arr.into_array(), time_unit.into()).into()
166 }
167 DataType::Time64(time_unit) => {
168 TemporalArray::new_time(arr.into_array(), time_unit.into()).into()
169 }
170 DataType::Date32 => TemporalArray::new_date(arr.into_array(), TimeUnit::D).into(),
171 DataType::Date64 => TemporalArray::new_date(arr.into_array(), TimeUnit::Ms).into(),
172 DataType::Duration(_) => unimplemented!(),
173 DataType::Interval(_) => unimplemented!(),
174 _ => vortex_panic!("Invalid temporal type: {}", T::DATA_TYPE),
175 }
176}
177
178impl<T: ByteArrayType> FromArrowArray<&GenericByteArray<T>> for ArrayRef
179where
180 <T as ByteArrayType>::Offset: NativePType,
181{
182 fn from_arrow(value: &GenericByteArray<T>, nullable: bool) -> Self {
183 let dtype = match T::DATA_TYPE {
184 DataType::Binary | DataType::LargeBinary => DType::Binary(nullable.into()),
185 DataType::Utf8 | DataType::LargeUtf8 => DType::Utf8(nullable.into()),
186 _ => vortex_panic!("Invalid data type for ByteArray: {}", T::DATA_TYPE),
187 };
188 VarBinArray::try_new(
189 value.offsets().clone().into_array(),
190 ByteBuffer::from_arrow_buffer(value.values().clone(), Alignment::of::<u8>()),
191 dtype,
192 nulls(value.nulls(), nullable),
193 )
194 .vortex_expect("Failed to convert Arrow GenericByteArray to Vortex VarBinArray")
195 .into_array()
196 }
197}
198
199impl<T: ByteViewType> FromArrowArray<&GenericByteViewArray<T>> for ArrayRef {
200 fn from_arrow(value: &GenericByteViewArray<T>, nullable: bool) -> Self {
201 let dtype = match T::DATA_TYPE {
202 DataType::BinaryView => DType::Binary(nullable.into()),
203 DataType::Utf8View => DType::Utf8(nullable.into()),
204 _ => vortex_panic!("Invalid data type for ByteViewArray: {}", T::DATA_TYPE),
205 };
206
207 let views_buffer = Buffer::from_byte_buffer(
208 Buffer::from_arrow_scalar_buffer(value.views().clone()).into_byte_buffer(),
209 );
210
211 VarBinViewArray::try_new(
212 views_buffer,
213 value
214 .data_buffers()
215 .iter()
216 .map(|b| ByteBuffer::from_arrow_buffer(b.clone(), Alignment::of::<u8>()))
217 .collect::<Vec<_>>(),
218 dtype,
219 nulls(value.nulls(), nullable),
220 )
221 .vortex_expect("Failed to convert Arrow GenericByteViewArray to Vortex VarBinViewArray")
222 .into_array()
223 }
224}
225
226impl FromArrowArray<&ArrowBooleanArray> for ArrayRef {
227 fn from_arrow(value: &ArrowBooleanArray, nullable: bool) -> Self {
228 BoolArray::new(value.values().clone(), nulls(value.nulls(), nullable)).into_array()
229 }
230}
231
232impl FromArrowArray<&ArrowStructArray> for ArrayRef {
233 fn from_arrow(value: &ArrowStructArray, nullable: bool) -> Self {
234 StructArray::try_new(
235 value.column_names().iter().map(|s| (*s).into()).collect(),
236 value
237 .columns()
238 .iter()
239 .zip(value.fields())
240 .map(|(c, field)| Self::from_arrow(c.clone(), field.is_nullable()))
241 .collect(),
242 value.len(),
243 nulls(value.nulls(), nullable),
244 )
245 .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
246 .into_array()
247 }
248}
249
250impl<O: OffsetSizeTrait + NativePType> FromArrowArray<&GenericListArray<O>> for ArrayRef {
251 fn from_arrow(value: &GenericListArray<O>, nullable: bool) -> Self {
252 let elem_nullable = match value.data_type() {
254 DataType::List(field) => field.is_nullable(),
255 DataType::LargeList(field) => field.is_nullable(),
256 dt => vortex_panic!("Invalid data type for ListArray: {dt}"),
257 };
258 ListArray::try_new(
259 Self::from_arrow(value.values().clone(), elem_nullable),
260 value.offsets().clone().into_array(),
262 nulls(value.nulls(), nullable),
263 )
264 .vortex_expect("Failed to convert Arrow StructArray to Vortex StructArray")
265 .into_array()
266 }
267}
268
269impl FromArrowArray<&ArrowNullArray> for ArrayRef {
270 fn from_arrow(value: &ArrowNullArray, nullable: bool) -> Self {
271 assert!(nullable);
272 NullArray::new(value.len()).into_array()
273 }
274}
275
276fn nulls(nulls: Option<&NullBuffer>, nullable: bool) -> Validity {
277 if nullable {
278 nulls
279 .map(|nulls| {
280 if nulls.null_count() == nulls.len() {
281 Validity::AllInvalid
282 } else {
283 Validity::from(nulls.inner().clone())
284 }
285 })
286 .unwrap_or_else(|| Validity::AllValid)
287 } else {
288 assert!(nulls.map(|x| x.null_count() == 0).unwrap_or(true));
289 Validity::NonNullable
290 }
291}
292
293impl FromArrowArray<ArrowArrayRef> for ArrayRef {
294 fn from_arrow(array: ArrowArrayRef, nullable: bool) -> Self {
295 match array.data_type() {
296 DataType::Boolean => Self::from_arrow(array.as_boolean(), nullable),
297 DataType::UInt8 => Self::from_arrow(array.as_primitive::<UInt8Type>(), nullable),
298 DataType::UInt16 => Self::from_arrow(array.as_primitive::<UInt16Type>(), nullable),
299 DataType::UInt32 => Self::from_arrow(array.as_primitive::<UInt32Type>(), nullable),
300 DataType::UInt64 => Self::from_arrow(array.as_primitive::<UInt64Type>(), nullable),
301 DataType::Int8 => Self::from_arrow(array.as_primitive::<Int8Type>(), nullable),
302 DataType::Int16 => Self::from_arrow(array.as_primitive::<Int16Type>(), nullable),
303 DataType::Int32 => Self::from_arrow(array.as_primitive::<Int32Type>(), nullable),
304 DataType::Int64 => Self::from_arrow(array.as_primitive::<Int64Type>(), nullable),
305 DataType::Float16 => Self::from_arrow(array.as_primitive::<Float16Type>(), nullable),
306 DataType::Float32 => Self::from_arrow(array.as_primitive::<Float32Type>(), nullable),
307 DataType::Float64 => Self::from_arrow(array.as_primitive::<Float64Type>(), nullable),
308 DataType::Utf8 => Self::from_arrow(array.as_string::<i32>(), nullable),
309 DataType::LargeUtf8 => Self::from_arrow(array.as_string::<i64>(), nullable),
310 DataType::Binary => Self::from_arrow(array.as_binary::<i32>(), nullable),
311 DataType::LargeBinary => Self::from_arrow(array.as_binary::<i64>(), nullable),
312 DataType::BinaryView => Self::from_arrow(
313 array
314 .as_any()
315 .downcast_ref::<BinaryViewArray>()
316 .vortex_expect("Expected Arrow BinaryViewArray for DataType::BinaryView"),
317 nullable,
318 ),
319 DataType::Utf8View => Self::from_arrow(
320 array
321 .as_any()
322 .downcast_ref::<StringViewArray>()
323 .vortex_expect("Expected Arrow StringViewArray for DataType::Utf8View"),
324 nullable,
325 ),
326 DataType::Struct(_) => Self::from_arrow(array.as_struct(), nullable),
327 DataType::List(_) => Self::from_arrow(array.as_list::<i32>(), nullable),
328 DataType::LargeList(_) => Self::from_arrow(array.as_list::<i64>(), nullable),
329 DataType::Null => Self::from_arrow(as_null_array(&array), nullable),
330 DataType::Timestamp(u, _) => match u {
331 ArrowTimeUnit::Second => {
332 Self::from_arrow(array.as_primitive::<TimestampSecondType>(), nullable)
333 }
334 ArrowTimeUnit::Millisecond => {
335 Self::from_arrow(array.as_primitive::<TimestampMillisecondType>(), nullable)
336 }
337 ArrowTimeUnit::Microsecond => {
338 Self::from_arrow(array.as_primitive::<TimestampMicrosecondType>(), nullable)
339 }
340 ArrowTimeUnit::Nanosecond => {
341 Self::from_arrow(array.as_primitive::<TimestampNanosecondType>(), nullable)
342 }
343 },
344 DataType::Date32 => Self::from_arrow(array.as_primitive::<Date32Type>(), nullable),
345 DataType::Date64 => Self::from_arrow(array.as_primitive::<Date64Type>(), nullable),
346 DataType::Time32(u) => match u {
347 ArrowTimeUnit::Second => {
348 Self::from_arrow(array.as_primitive::<Time32SecondType>(), nullable)
349 }
350 ArrowTimeUnit::Millisecond => {
351 Self::from_arrow(array.as_primitive::<Time32MillisecondType>(), nullable)
352 }
353 _ => unreachable!(),
354 },
355 DataType::Time64(u) => match u {
356 ArrowTimeUnit::Microsecond => {
357 Self::from_arrow(array.as_primitive::<Time64MicrosecondType>(), nullable)
358 }
359 ArrowTimeUnit::Nanosecond => {
360 Self::from_arrow(array.as_primitive::<Time64NanosecondType>(), nullable)
361 }
362 _ => unreachable!(),
363 },
364 DataType::Decimal128(..) => {
365 Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
366 }
367 DataType::Decimal256(..) => {
368 Self::from_arrow(array.as_primitive::<Decimal128Type>(), nullable)
369 }
370 _ => vortex_panic!(
371 "Array encoding not implemented for Arrow data type {}",
372 array.data_type().clone()
373 ),
374 }
375 }
376}