vortex_array/arrow/executor/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod bool;
5mod byte;
6mod byte_view;
7mod decimal;
8mod dictionary;
9mod fixed_size_list;
10mod list;
11mod list_view;
12mod null;
13mod primitive;
14mod run_end;
15mod struct_;
16mod temporal;
17mod validity;
18
use std::sync::Arc;

use arrow_array::ArrayRef as ArrowArrayRef;
use arrow_array::RecordBatch;
use arrow_array::RecordBatchOptions;
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_schema::DataType;
use arrow_schema::Schema;
use itertools::Itertools;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_err;
use vortex_session::VortexSession;

use crate::Array;
use crate::ArrayRef;
use crate::arrow::executor::bool::to_arrow_bool;
use crate::arrow::executor::byte::to_arrow_byte_array;
use crate::arrow::executor::byte_view::to_arrow_byte_view;
use crate::arrow::executor::decimal::to_arrow_decimal;
use crate::arrow::executor::dictionary::to_arrow_dictionary;
use crate::arrow::executor::fixed_size_list::to_arrow_fixed_list;
use crate::arrow::executor::list::to_arrow_list;
use crate::arrow::executor::list_view::to_arrow_list_view;
use crate::arrow::executor::null::to_arrow_null;
use crate::arrow::executor::primitive::to_arrow_primitive;
use crate::arrow::executor::run_end::to_arrow_run_end;
use crate::arrow::executor::struct_::to_arrow_struct;
use crate::arrow::executor::temporal::to_arrow_temporal;
46
47/// Trait for executing a Vortex array to produce an Arrow array.
48pub trait ArrowArrayExecutor: Sized {
49    /// Execute the array to produce an Arrow array.
50    ///
51    /// If a [`DataType`] is given, the array will be converted to the desired Arrow type.
52    fn execute_arrow(
53        self,
54        data_type: &DataType,
55        session: &VortexSession,
56    ) -> VortexResult<ArrowArrayRef>;
57
58    /// Execute the array to produce an Arrow `RecordBatch` with the given schema.
59    fn execute_record_batch(
60        self,
61        schema: &Schema,
62        session: &VortexSession,
63    ) -> VortexResult<RecordBatch> {
64        let array = self.execute_arrow(&DataType::Struct(schema.fields.clone()), session)?;
65        Ok(RecordBatch::from(array.as_struct()))
66    }
67
68    /// Execute the array to produce Arrow `RecordBatch`'s with the given schema.
69    fn execute_record_batches(
70        self,
71        schema: &Schema,
72        session: &VortexSession,
73    ) -> VortexResult<Vec<RecordBatch>>;
74}
75
76impl ArrowArrayExecutor for ArrayRef {
77    fn execute_arrow(
78        self,
79        data_type: &DataType,
80        session: &VortexSession,
81    ) -> VortexResult<ArrowArrayRef> {
82        let len = self.len();
83
84        let arrow = match data_type {
85            DataType::Null => to_arrow_null(self, session),
86            DataType::Boolean => to_arrow_bool(self, session),
87            DataType::Int8 => to_arrow_primitive::<Int8Type>(self, session),
88            DataType::Int16 => to_arrow_primitive::<Int16Type>(self, session),
89            DataType::Int32 => to_arrow_primitive::<Int32Type>(self, session),
90            DataType::Int64 => to_arrow_primitive::<Int64Type>(self, session),
91            DataType::UInt8 => to_arrow_primitive::<UInt8Type>(self, session),
92            DataType::UInt16 => to_arrow_primitive::<UInt16Type>(self, session),
93            DataType::UInt32 => to_arrow_primitive::<UInt32Type>(self, session),
94            DataType::UInt64 => to_arrow_primitive::<UInt64Type>(self, session),
95            DataType::Float16 => to_arrow_primitive::<Float16Type>(self, session),
96            DataType::Float32 => to_arrow_primitive::<Float32Type>(self, session),
97            DataType::Float64 => to_arrow_primitive::<Float64Type>(self, session),
98            DataType::Timestamp(..)
99            | DataType::Date32
100            | DataType::Date64
101            | DataType::Time32(_)
102            | DataType::Time64(_) => to_arrow_temporal(self, data_type, session),
103            DataType::Binary => to_arrow_byte_array::<BinaryType>(self, session),
104            DataType::LargeBinary => to_arrow_byte_array::<LargeBinaryType>(self, session),
105            DataType::Utf8 => to_arrow_byte_array::<Utf8Type>(self, session),
106            DataType::LargeUtf8 => to_arrow_byte_array::<LargeUtf8Type>(self, session),
107            DataType::BinaryView => to_arrow_byte_view::<BinaryViewType>(self, session),
108            DataType::Utf8View => to_arrow_byte_view::<StringViewType>(self, session),
109            DataType::List(elements_field) => to_arrow_list::<i32>(self, elements_field, session),
110            DataType::LargeList(elements_field) => {
111                to_arrow_list::<i64>(self, elements_field, session)
112            }
113            DataType::FixedSizeList(elements_field, list_size) => {
114                to_arrow_fixed_list(self, *list_size, elements_field, session)
115            }
116            DataType::ListView(elements_field) => {
117                to_arrow_list_view::<i32>(self, elements_field, session)
118            }
119            DataType::LargeListView(elements_field) => {
120                to_arrow_list_view::<i64>(self, elements_field, session)
121            }
122            DataType::Struct(fields) => to_arrow_struct(self, fields, session),
123            DataType::Dictionary(codes_type, values_type) => {
124                to_arrow_dictionary(self, codes_type, values_type, session)
125            }
126            DataType::Decimal32(p, s) => {
127                to_arrow_decimal::<Decimal32Type, i32>(self, *p, *s, session)
128            }
129            DataType::Decimal64(p, s) => {
130                to_arrow_decimal::<Decimal64Type, i64>(self, *p, *s, session)
131            }
132            DataType::Decimal128(p, s) => {
133                to_arrow_decimal::<Decimal128Type, i128>(self, *p, *s, session)
134            }
135            DataType::Decimal256(p, s) => {
136                to_arrow_decimal::<Decimal256Type, vortex_dtype::i256>(self, *p, *s, session)
137            }
138            DataType::RunEndEncoded(ends_type, values_type) => {
139                to_arrow_run_end(self, ends_type.data_type(), values_type, session)
140            }
141            DataType::FixedSizeBinary(_)
142            | DataType::Map(..)
143            | DataType::Duration(_)
144            | DataType::Interval(_)
145            | DataType::Union(..) => {
146                vortex_bail!("Conversion to Arrow type {data_type} is not supported");
147            }
148        }?;
149
150        vortex_ensure!(
151            arrow.len() == len,
152            "Arrow array length does not match Vortex array length after conversion to {:?}",
153            arrow
154        );
155
156        Ok(arrow)
157    }
158
159    fn execute_record_batches(
160        self,
161        schema: &Schema,
162        session: &VortexSession,
163    ) -> VortexResult<Vec<RecordBatch>> {
164        self.to_array_iterator()
165            .map(|a| a?.execute_record_batch(schema, session))
166            .try_collect()
167    }
168}