vortex_array/arrow/executor/
mod.rs1pub mod bool;
5mod byte;
6pub mod byte_view;
7mod decimal;
8mod dictionary;
9mod fixed_size_list;
10mod list;
11mod list_view;
12pub mod null;
13pub mod primitive;
14mod run_end;
15mod struct_;
16mod temporal;
17mod validity;
18
19use arrow_array::ArrayRef as ArrowArrayRef;
20use arrow_array::RecordBatch;
21use arrow_array::cast::AsArray;
22use arrow_array::types::*;
23use arrow_schema::DataType;
24use arrow_schema::Field;
25use arrow_schema::FieldRef;
26use arrow_schema::Schema;
27use itertools::Itertools;
28use vortex_error::VortexResult;
29use vortex_error::vortex_bail;
30use vortex_error::vortex_ensure;
31
32use crate::ArrayRef;
33use crate::arrays::List;
34use crate::arrays::VarBin;
35use crate::arrays::list::ListArrayExt;
36use crate::arrays::varbin::VarBinArrayExt;
37use crate::arrow::executor::bool::to_arrow_bool;
38use crate::arrow::executor::byte::to_arrow_byte_array;
39use crate::arrow::executor::byte_view::to_arrow_byte_view;
40use crate::arrow::executor::decimal::to_arrow_decimal;
41use crate::arrow::executor::dictionary::to_arrow_dictionary;
42use crate::arrow::executor::fixed_size_list::to_arrow_fixed_list;
43use crate::arrow::executor::list::to_arrow_list;
44use crate::arrow::executor::list_view::to_arrow_list_view;
45use crate::arrow::executor::null::to_arrow_null;
46use crate::arrow::executor::primitive::to_arrow_primitive;
47use crate::arrow::executor::run_end::to_arrow_run_end;
48use crate::arrow::executor::struct_::to_arrow_struct;
49use crate::arrow::executor::temporal::to_arrow_temporal;
50use crate::dtype::DType;
51use crate::dtype::PType;
52use crate::executor::ExecutionCtx;
53
54pub trait ArrowArrayExecutor: Sized {
56 fn execute_arrow(
61 self,
62 data_type: Option<&DataType>,
63 ctx: &mut ExecutionCtx,
64 ) -> VortexResult<ArrowArrayRef>;
65
66 fn execute_record_batch(
68 self,
69 schema: &Schema,
70 ctx: &mut ExecutionCtx,
71 ) -> VortexResult<RecordBatch> {
72 let array = self.execute_arrow(Some(&DataType::Struct(schema.fields.clone())), ctx)?;
73 Ok(RecordBatch::from(array.as_struct()))
74 }
75
76 fn execute_record_batches(
78 self,
79 schema: &Schema,
80 ctx: &mut ExecutionCtx,
81 ) -> VortexResult<Vec<RecordBatch>>;
82}
83
84impl ArrowArrayExecutor for ArrayRef {
85 fn execute_arrow(
86 self,
87 data_type: Option<&DataType>,
88 ctx: &mut ExecutionCtx,
89 ) -> VortexResult<ArrowArrayRef> {
90 let len = self.len();
91
92 let resolved_type: DataType = match data_type {
95 Some(dt) => dt.clone(),
96 None => preferred_arrow_type(&self)?,
97 };
98
99 let arrow = match &resolved_type {
100 DataType::Null => to_arrow_null(self, ctx),
101 DataType::Boolean => to_arrow_bool(self, ctx),
102 DataType::Int8 => to_arrow_primitive::<Int8Type>(self, ctx),
103 DataType::Int16 => to_arrow_primitive::<Int16Type>(self, ctx),
104 DataType::Int32 => to_arrow_primitive::<Int32Type>(self, ctx),
105 DataType::Int64 => to_arrow_primitive::<Int64Type>(self, ctx),
106 DataType::UInt8 => to_arrow_primitive::<UInt8Type>(self, ctx),
107 DataType::UInt16 => to_arrow_primitive::<UInt16Type>(self, ctx),
108 DataType::UInt32 => to_arrow_primitive::<UInt32Type>(self, ctx),
109 DataType::UInt64 => to_arrow_primitive::<UInt64Type>(self, ctx),
110 DataType::Float16 => to_arrow_primitive::<Float16Type>(self, ctx),
111 DataType::Float32 => to_arrow_primitive::<Float32Type>(self, ctx),
112 DataType::Float64 => to_arrow_primitive::<Float64Type>(self, ctx),
113 DataType::Timestamp(..)
114 | DataType::Date32
115 | DataType::Date64
116 | DataType::Time32(_)
117 | DataType::Time64(_) => to_arrow_temporal(self, &resolved_type, ctx),
118 DataType::Binary => to_arrow_byte_array::<BinaryType>(self, ctx),
119 DataType::LargeBinary => to_arrow_byte_array::<LargeBinaryType>(self, ctx),
120 DataType::Utf8 => to_arrow_byte_array::<Utf8Type>(self, ctx),
121 DataType::LargeUtf8 => to_arrow_byte_array::<LargeUtf8Type>(self, ctx),
122 DataType::BinaryView => to_arrow_byte_view::<BinaryViewType>(self, ctx),
123 DataType::Utf8View => to_arrow_byte_view::<StringViewType>(self, ctx),
124 DataType::List(elements_field) => to_arrow_list::<i32>(self, elements_field, ctx),
126 DataType::LargeList(elements_field) => to_arrow_list::<i64>(self, elements_field, ctx),
128 DataType::FixedSizeList(elements_field, list_size) => {
130 to_arrow_fixed_list(self, *list_size, elements_field, ctx)
131 }
132 DataType::ListView(elements_field) => {
134 to_arrow_list_view::<i32>(self, elements_field, ctx)
135 }
136 DataType::LargeListView(elements_field) => {
138 to_arrow_list_view::<i64>(self, elements_field, ctx)
139 }
140 DataType::Struct(fields) => {
141 let fields = if data_type.is_none() {
142 None
143 } else {
144 Some(fields)
145 };
146 to_arrow_struct(self, fields, ctx)
147 }
148 DataType::Dictionary(codes_type, values_type) => {
150 to_arrow_dictionary(self, codes_type, values_type, ctx)
151 }
152 dt @ DataType::Decimal32(..) => to_arrow_decimal(self, dt, ctx),
153 dt @ DataType::Decimal64(..) => to_arrow_decimal(self, dt, ctx),
154 dt @ DataType::Decimal128(..) => to_arrow_decimal(self, dt, ctx),
155 dt @ DataType::Decimal256(..) => to_arrow_decimal(self, dt, ctx),
156 DataType::RunEndEncoded(ends_type, values_type) => {
158 to_arrow_run_end(self, ends_type.data_type(), values_type, ctx)
159 }
160 DataType::FixedSizeBinary(_)
161 | DataType::Map(..)
162 | DataType::Duration(_)
163 | DataType::Interval(_)
164 | DataType::Union(..) => {
165 vortex_bail!("Conversion to Arrow type {resolved_type} is not supported");
166 }
167 }?;
168
169 vortex_ensure!(
170 arrow.len() == len,
171 "Arrow array length does not match Vortex array length after conversion to {:?}",
172 arrow
173 );
174
175 Ok(arrow)
176 }
177
178 fn execute_record_batches(
179 self,
180 schema: &Schema,
181 ctx: &mut ExecutionCtx,
182 ) -> VortexResult<Vec<RecordBatch>> {
183 self.to_array_iterator()
184 .map(|a| a?.execute_record_batch(schema, ctx))
185 .try_collect()
186 }
187}
188
189fn preferred_arrow_type(array: &ArrayRef) -> VortexResult<DataType> {
196 if let Some(varbin) = array.as_opt::<VarBin>() {
198 let offsets_ptype = PType::try_from(varbin.offsets().dtype())?;
199 let use_large = matches!(offsets_ptype, PType::I64 | PType::U64);
200
201 return Ok(match (varbin.dtype(), use_large) {
202 (DType::Utf8(_), false) => DataType::Utf8,
203 (DType::Utf8(_), true) => DataType::LargeUtf8,
204 (DType::Binary(_), false) => DataType::Binary,
205 (DType::Binary(_), true) => DataType::LargeBinary,
206 _ => unreachable!("VarBinArray must have Utf8 or Binary dtype"),
207 });
208 }
209
210 if let Some(list) = array.as_opt::<List>() {
212 let offsets_ptype = PType::try_from(list.offsets().dtype())?;
213 let use_large = matches!(offsets_ptype, PType::I64 | PType::U64);
214 let elem_dtype = preferred_arrow_type(list.elements())?;
216 let field = FieldRef::new(Field::new_list_field(
217 elem_dtype,
218 list.elements().dtype().is_nullable(),
219 ));
220
221 return Ok(if use_large {
222 DataType::LargeList(field)
223 } else {
224 DataType::List(field)
225 });
226 }
227
228 array.dtype().to_arrow_dtype()
230}