vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
2mod temporal;
3mod varbin;
4
5use std::any::Any;
6use std::sync::LazyLock;
7
8use arcref::ArcRef;
9use arrow_array::ArrayRef as ArrowArrayRef;
10use arrow_schema::DataType;
11use vortex_dtype::DType;
12use vortex_dtype::arrow::FromArrowType;
13use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
14
15use crate::Array;
16use crate::arrow::array::{ArrowArray, ArrowVTable};
17use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
18use crate::vtable::VTable;
19
20pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
28 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
29}
30
31pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
33 to_arrow_opts(
34 array,
35 &ToArrowOptions {
36 arrow_type: Some(arrow_type.clone()),
37 },
38 )
39}
40
41pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
42 let arrow = TO_ARROW_FN
43 .invoke(&InvocationArgs {
44 inputs: &[array.into()],
45 options,
46 })?
47 .unwrap_array()?
48 .as_opt::<ArrowVTable>()
49 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
50 .inner()
51 .clone();
52
53 if let Some(arrow_type) = &options.arrow_type {
54 if arrow.data_type() != arrow_type {
55 vortex_bail!(
56 "Arrow array type mismatch: expected {:?}, got {:?}",
57 &options.arrow_type,
58 arrow.data_type()
59 );
60 }
61 }
62
63 Ok(arrow)
64}
65
66pub struct ToArrowOptions {
67 pub arrow_type: Option<DataType>,
69}
70
71impl Options for ToArrowOptions {
72 fn as_any(&self) -> &dyn Any {
73 self
74 }
75}
76
77struct ToArrow;
78
79impl ComputeFnVTable for ToArrow {
80 fn invoke(
81 &self,
82 args: &InvocationArgs,
83 kernels: &[ArcRef<dyn Kernel>],
84 ) -> VortexResult<Output> {
85 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
86
87 for kernel in kernels {
88 if let Some(output) = kernel.invoke(args)? {
89 return Ok(output);
90 }
91 }
92 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
93 return Ok(output);
94 }
95
96 if !array.is_canonical() {
98 let canonical_array = array.to_canonical()?;
99 let arrow_array = to_arrow_opts(
100 canonical_array.as_ref(),
101 &ToArrowOptions {
102 arrow_type: arrow_type.cloned(),
103 },
104 )?;
105 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
106 .to_array()
107 .into());
108 }
109
110 vortex_bail!(
111 "Failed to convert array {} to Arrow {:?}",
112 array.encoding_id(),
113 arrow_type
114 );
115 }
116
117 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
118 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
119 Ok(arrow_type
120 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
121 .unwrap_or_else(|| array.dtype().clone()))
122 }
123
124 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
125 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
126 Ok(array.len())
127 }
128
129 fn is_elementwise(&self) -> bool {
130 false
131 }
132}
133
134pub static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
135 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
136
137 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
139 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
140
141 for kernel in inventory::iter::<ToArrowKernelRef> {
142 compute.register_kernel(kernel.0.clone());
143 }
144 compute
145});
146
147pub struct ToArrowArgs<'a> {
148 array: &'a dyn Array,
149 arrow_type: Option<&'a DataType>,
150}
151
152impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
153 type Error = VortexError;
154
155 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
156 if value.inputs.len() != 1 {
157 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
158 }
159 let array = value.inputs[0]
160 .array()
161 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
162 let options = value
163 .options
164 .as_any()
165 .downcast_ref::<ToArrowOptions>()
166 .vortex_expect("Expected options to be ToArrowOptions");
167
168 Ok(ToArrowArgs {
169 array,
170 arrow_type: options.arrow_type.as_ref(),
171 })
172 }
173}
174
175pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
176inventory::collect!(ToArrowKernelRef);
177
178pub trait ToArrowKernel: VTable {
179 fn to_arrow(
180 &self,
181 arr: &Self::Array,
182 arrow_type: Option<&DataType>,
183 ) -> VortexResult<Option<ArrowArrayRef>>;
184}
185
186#[derive(Debug)]
187pub struct ToArrowKernelAdapter<V: VTable>(pub V);
188
189impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
190 pub const fn lift(&'static self) -> ToArrowKernelRef {
191 ToArrowKernelRef(ArcRef::new_ref(self))
192 }
193}
194
195impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
196 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
197 let inputs = ToArrowArgs::try_from(args)?;
198 let Some(array) = inputs.array.as_opt::<V>() else {
199 return Ok(None);
200 };
201
202 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
203 return Ok(None);
204 };
205
206 Ok(Some(
207 ArrowArray::new(arrow_array, array.dtype().nullability())
208 .to_array()
209 .into(),
210 ))
211 }
212}
213
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, PrimitiveArray, StringViewArray, StructArray};
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::{IntoArray, arrays};

    /// A struct with a nullable int column and a string-view column converts
    /// to the equivalent Arrow struct array.
    #[test]
    fn test_to_arrow() {
        // Vortex input.
        let vortex_fields = vec![
            (
                "a",
                arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)])
                    .into_array(),
            ),
            (
                "b",
                arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array(),
            ),
        ];
        let array = arrays::StructArray::from_fields(vortex_fields.as_slice()).unwrap();

        // Expected Arrow output.
        let expected_a: ArrayRef = Arc::new(
            PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ),
        );
        let expected_b: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let arrow_array: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_a), ("b", expected_b)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let actual = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&actual, &arrow_array);
    }
}