vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
5mod list;
6mod temporal;
7mod varbin;
8
9use std::any::Any;
10use std::sync::LazyLock;
11
12use arcref::ArcRef;
13use arrow_array::ArrayRef as ArrowArrayRef;
14use arrow_schema::DataType;
15use vortex_dtype::DType;
16use vortex_dtype::arrow::FromArrowType;
17use vortex_error::VortexError;
18use vortex_error::VortexExpect;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21use vortex_error::vortex_err;
22
23use crate::Array;
24use crate::LEGACY_SESSION;
25use crate::USE_VORTEX_OPERATORS;
26use crate::arrow::ArrowArrayExecutor;
27use crate::arrow::array::ArrowArray;
28use crate::arrow::array::ArrowVTable;
29use crate::compute::ComputeFn;
30use crate::compute::ComputeFnVTable;
31use crate::compute::InvocationArgs;
32use crate::compute::Kernel;
33use crate::compute::Options;
34use crate::compute::Output;
35use crate::vtable::VTable;
36
37static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
38 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
39
40 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
42 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
43
44 for kernel in inventory::iter::<ToArrowKernelRef> {
45 compute.register_kernel(kernel.0.clone());
46 }
47 compute
48});
49
50pub(crate) fn warm_up_vtable() -> usize {
51 TO_ARROW_FN.kernels().len()
52}
53
54pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
62 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
63}
64
65pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
67 to_arrow_opts(
68 array,
69 &ToArrowOptions {
70 arrow_type: Some(arrow_type.clone()),
71 },
72 )
73}
74
75pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
76 let arrow = TO_ARROW_FN
77 .invoke(&InvocationArgs {
78 inputs: &[array.into()],
79 options,
80 })?
81 .unwrap_array()?
82 .as_opt::<ArrowVTable>()
83 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
84 .inner()
85 .clone();
86
87 if let Some(arrow_type) = &options.arrow_type
88 && arrow.data_type() != arrow_type
89 {
90 vortex_bail!(
91 "Arrow array type mismatch: expected {:?}, got {:?}",
92 &options.arrow_type,
93 arrow.data_type()
94 );
95 }
96
97 Ok(arrow)
98}
99
100pub struct ToArrowOptions {
101 pub arrow_type: Option<DataType>,
103}
104
105impl Options for ToArrowOptions {
106 fn as_any(&self) -> &dyn Any {
107 self
108 }
109}
110
111struct ToArrow;
112
113impl ComputeFnVTable for ToArrow {
114 fn invoke(
115 &self,
116 args: &InvocationArgs,
117 kernels: &[ArcRef<dyn Kernel>],
118 ) -> VortexResult<Output> {
119 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
120
121 for kernel in kernels {
122 if let Some(output) = kernel.invoke(args)? {
123 return Ok(output);
124 }
125 }
126 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
127 return Ok(output);
128 }
129
130 if !array.is_canonical() {
131 let arrow_array = if *USE_VORTEX_OPERATORS {
132 let arrow_type = arrow_type
133 .cloned()
134 .map(Ok)
135 .unwrap_or_else(|| array.dtype().to_arrow_dtype())?;
136 array
137 .to_array()
138 .execute_arrow(&arrow_type, &LEGACY_SESSION)?
139 } else {
140 let canonical_array = array.to_canonical();
142 to_arrow_opts(
143 canonical_array.as_ref(),
144 &ToArrowOptions {
145 arrow_type: arrow_type.cloned(),
146 },
147 )?
148 };
149
150 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
151 .to_array()
152 .into());
153 }
154
155 vortex_bail!(
156 "Failed to convert array {} to Arrow {:?}",
157 array.encoding_id(),
158 arrow_type
159 );
160 }
161
162 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
163 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
164 Ok(arrow_type
165 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
166 .unwrap_or_else(|| array.dtype().clone()))
167 }
168
169 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
170 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
171 Ok(array.len())
172 }
173
174 fn is_elementwise(&self) -> bool {
175 false
176 }
177}
178
179pub struct ToArrowArgs<'a> {
180 array: &'a dyn Array,
181 arrow_type: Option<&'a DataType>,
182}
183
184impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
185 type Error = VortexError;
186
187 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
188 if value.inputs.len() != 1 {
189 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
190 }
191 let array = value.inputs[0]
192 .array()
193 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
194 let options = value
195 .options
196 .as_any()
197 .downcast_ref::<ToArrowOptions>()
198 .vortex_expect("Expected options to be ToArrowOptions");
199
200 Ok(ToArrowArgs {
201 array,
202 arrow_type: options.arrow_type.as_ref(),
203 })
204 }
205}
206
207pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
208inventory::collect!(ToArrowKernelRef);
209
210pub trait ToArrowKernel: VTable {
211 fn to_arrow(
212 &self,
213 arr: &Self::Array,
214 arrow_type: Option<&DataType>,
215 ) -> VortexResult<Option<ArrowArrayRef>>;
216}
217
218#[derive(Debug)]
219pub struct ToArrowKernelAdapter<V: VTable>(pub V);
220
221impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
222 pub const fn lift(&'static self) -> ToArrowKernelRef {
223 ToArrowKernelRef(ArcRef::new_ref(self))
224 }
225}
226
227impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
228 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
229 let inputs = ToArrowArgs::try_from(args)?;
230 let Some(array) = inputs.array.as_opt::<V>() else {
231 return Ok(None);
232 };
233
234 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
235 return Ok(None);
236 };
237
238 Ok(Some(
239 ArrowArray::new(arrow_array, array.dtype().nullability())
240 .to_array()
241 .into(),
242 ))
243 }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::ArrayRef;
    use arrow_array::PrimitiveArray;
    use arrow_array::StringViewArray;
    use arrow_array::StructArray;
    use arrow_array::types::Int32Type;

    use super::to_arrow;
    use crate::IntoArray;
    use crate::arrays;

    /// A struct with a nullable int column and a non-nullable string column round-trips to the
    /// equivalent Arrow `StructArray`.
    #[test]
    fn test_to_arrow() {
        let column_a =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let column_b = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let vortex_struct =
            arrays::StructArray::from_fields([("a", column_a), ("b", column_b)].as_slice())
                .unwrap();

        let expected_a: ArrayRef = Arc::new(PrimitiveArray::<Int32Type>::from(vec![
            Some(1),
            None,
            Some(2),
        ]));
        let expected_b: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let expected: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_a), ("b", expected_b)]).unwrap(),
        );

        let target_type = vortex_struct.dtype().to_arrow_dtype().unwrap();
        let actual = to_arrow(vortex_struct.as_ref(), &target_type).unwrap();

        assert_eq!(&actual, &expected);
    }
}