vortex_array/arrow/executor/
mod.rs1mod bool;
5mod byte;
6mod byte_view;
7mod decimal;
8mod dictionary;
9mod fixed_size_list;
10mod list;
11mod list_view;
12mod null;
13mod primitive;
14mod run_end;
15mod struct_;
16mod temporal;
17mod validity;
18
19use arrow_array::ArrayRef as ArrowArrayRef;
20use arrow_array::RecordBatch;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_schema::DataType;
24use arrow_schema::Schema;
25use itertools::Itertools;
26use vortex_error::VortexResult;
27use vortex_error::vortex_bail;
28use vortex_error::vortex_ensure;
29use vortex_session::VortexSession;
30
31use crate::Array;
32use crate::ArrayRef;
33use crate::arrow::executor::bool::to_arrow_bool;
34use crate::arrow::executor::byte::to_arrow_byte_array;
35use crate::arrow::executor::byte_view::to_arrow_byte_view;
36use crate::arrow::executor::decimal::to_arrow_decimal;
37use crate::arrow::executor::dictionary::to_arrow_dictionary;
38use crate::arrow::executor::fixed_size_list::to_arrow_fixed_list;
39use crate::arrow::executor::list::to_arrow_list;
40use crate::arrow::executor::list_view::to_arrow_list_view;
41use crate::arrow::executor::null::to_arrow_null;
42use crate::arrow::executor::primitive::to_arrow_primitive;
43use crate::arrow::executor::run_end::to_arrow_run_end;
44use crate::arrow::executor::struct_::to_arrow_struct;
45use crate::arrow::executor::temporal::to_arrow_temporal;
46
47pub trait ArrowArrayExecutor: Sized {
49 fn execute_arrow(
53 self,
54 data_type: &DataType,
55 session: &VortexSession,
56 ) -> VortexResult<ArrowArrayRef>;
57
58 fn execute_record_batch(
60 self,
61 schema: &Schema,
62 session: &VortexSession,
63 ) -> VortexResult<RecordBatch> {
64 let array = self.execute_arrow(&DataType::Struct(schema.fields.clone()), session)?;
65 Ok(RecordBatch::from(array.as_struct()))
66 }
67
68 fn execute_record_batches(
70 self,
71 schema: &Schema,
72 session: &VortexSession,
73 ) -> VortexResult<Vec<RecordBatch>>;
74}
75
76impl ArrowArrayExecutor for ArrayRef {
77 fn execute_arrow(
78 self,
79 data_type: &DataType,
80 session: &VortexSession,
81 ) -> VortexResult<ArrowArrayRef> {
82 let len = self.len();
83
84 let arrow = match data_type {
85 DataType::Null => to_arrow_null(self, session),
86 DataType::Boolean => to_arrow_bool(self, session),
87 DataType::Int8 => to_arrow_primitive::<Int8Type>(self, session),
88 DataType::Int16 => to_arrow_primitive::<Int16Type>(self, session),
89 DataType::Int32 => to_arrow_primitive::<Int32Type>(self, session),
90 DataType::Int64 => to_arrow_primitive::<Int64Type>(self, session),
91 DataType::UInt8 => to_arrow_primitive::<UInt8Type>(self, session),
92 DataType::UInt16 => to_arrow_primitive::<UInt16Type>(self, session),
93 DataType::UInt32 => to_arrow_primitive::<UInt32Type>(self, session),
94 DataType::UInt64 => to_arrow_primitive::<UInt64Type>(self, session),
95 DataType::Float16 => to_arrow_primitive::<Float16Type>(self, session),
96 DataType::Float32 => to_arrow_primitive::<Float32Type>(self, session),
97 DataType::Float64 => to_arrow_primitive::<Float64Type>(self, session),
98 DataType::Timestamp(..)
99 | DataType::Date32
100 | DataType::Date64
101 | DataType::Time32(_)
102 | DataType::Time64(_) => to_arrow_temporal(self, data_type, session),
103 DataType::Binary => to_arrow_byte_array::<BinaryType>(self, session),
104 DataType::LargeBinary => to_arrow_byte_array::<LargeBinaryType>(self, session),
105 DataType::Utf8 => to_arrow_byte_array::<Utf8Type>(self, session),
106 DataType::LargeUtf8 => to_arrow_byte_array::<LargeUtf8Type>(self, session),
107 DataType::BinaryView => to_arrow_byte_view::<BinaryViewType>(self, session),
108 DataType::Utf8View => to_arrow_byte_view::<StringViewType>(self, session),
109 DataType::List(elements_field) => to_arrow_list::<i32>(self, elements_field, session),
110 DataType::LargeList(elements_field) => {
111 to_arrow_list::<i64>(self, elements_field, session)
112 }
113 DataType::FixedSizeList(elements_field, list_size) => {
114 to_arrow_fixed_list(self, *list_size, elements_field, session)
115 }
116 DataType::ListView(elements_field) => {
117 to_arrow_list_view::<i32>(self, elements_field, session)
118 }
119 DataType::LargeListView(elements_field) => {
120 to_arrow_list_view::<i64>(self, elements_field, session)
121 }
122 DataType::Struct(fields) => to_arrow_struct(self, fields, session),
123 DataType::Dictionary(codes_type, values_type) => {
124 to_arrow_dictionary(self, codes_type, values_type, session)
125 }
126 DataType::Decimal32(p, s) => {
127 to_arrow_decimal::<Decimal32Type, i32>(self, *p, *s, session)
128 }
129 DataType::Decimal64(p, s) => {
130 to_arrow_decimal::<Decimal64Type, i64>(self, *p, *s, session)
131 }
132 DataType::Decimal128(p, s) => {
133 to_arrow_decimal::<Decimal128Type, i128>(self, *p, *s, session)
134 }
135 DataType::Decimal256(p, s) => {
136 to_arrow_decimal::<Decimal256Type, vortex_dtype::i256>(self, *p, *s, session)
137 }
138 DataType::RunEndEncoded(ends_type, values_type) => {
139 to_arrow_run_end(self, ends_type.data_type(), values_type, session)
140 }
141 DataType::FixedSizeBinary(_)
142 | DataType::Map(..)
143 | DataType::Duration(_)
144 | DataType::Interval(_)
145 | DataType::Union(..) => {
146 vortex_bail!("Conversion to Arrow type {data_type} is not supported");
147 }
148 }?;
149
150 vortex_ensure!(
151 arrow.len() == len,
152 "Arrow array length does not match Vortex array length after conversion to {:?}",
153 arrow
154 );
155
156 Ok(arrow)
157 }
158
159 fn execute_record_batches(
160 self,
161 schema: &Schema,
162 session: &VortexSession,
163 ) -> VortexResult<Vec<RecordBatch>> {
164 self.to_array_iterator()
165 .map(|a| a?.execute_record_batch(schema, session))
166 .try_collect()
167 }
168}