vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
5mod temporal;
6mod varbin;
7
8use std::any::Any;
9use std::sync::LazyLock;
10
11use arcref::ArcRef;
12use arrow_array::ArrayRef as ArrowArrayRef;
13use arrow_schema::DataType;
14use vortex_dtype::DType;
15use vortex_dtype::arrow::FromArrowType;
16use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
17
18use crate::Array;
19use crate::arrow::array::{ArrowArray, ArrowVTable};
20use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
21use crate::vtable::VTable;
22
23static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
24 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
25
26 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
28 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
29
30 for kernel in inventory::iter::<ToArrowKernelRef> {
31 compute.register_kernel(kernel.0.clone());
32 }
33 compute
34});
35
36pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
44 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
45}
46
47pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
49 to_arrow_opts(
50 array,
51 &ToArrowOptions {
52 arrow_type: Some(arrow_type.clone()),
53 },
54 )
55}
56
57pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
58 let arrow = TO_ARROW_FN
59 .invoke(&InvocationArgs {
60 inputs: &[array.into()],
61 options,
62 })?
63 .unwrap_array()?
64 .as_opt::<ArrowVTable>()
65 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
66 .inner()
67 .clone();
68
69 if let Some(arrow_type) = &options.arrow_type
70 && arrow.data_type() != arrow_type
71 {
72 vortex_bail!(
73 "Arrow array type mismatch: expected {:?}, got {:?}",
74 &options.arrow_type,
75 arrow.data_type()
76 );
77 }
78
79 Ok(arrow)
80}
81
82pub struct ToArrowOptions {
83 pub arrow_type: Option<DataType>,
85}
86
87impl Options for ToArrowOptions {
88 fn as_any(&self) -> &dyn Any {
89 self
90 }
91}
92
93struct ToArrow;
94
95impl ComputeFnVTable for ToArrow {
96 fn invoke(
97 &self,
98 args: &InvocationArgs,
99 kernels: &[ArcRef<dyn Kernel>],
100 ) -> VortexResult<Output> {
101 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
102
103 for kernel in kernels {
104 if let Some(output) = kernel.invoke(args)? {
105 return Ok(output);
106 }
107 }
108 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
109 return Ok(output);
110 }
111
112 if !array.is_canonical() {
114 let canonical_array = array.to_canonical()?;
115 let arrow_array = to_arrow_opts(
116 canonical_array.as_ref(),
117 &ToArrowOptions {
118 arrow_type: arrow_type.cloned(),
119 },
120 )?;
121 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
122 .to_array()
123 .into());
124 }
125
126 vortex_bail!(
127 "Failed to convert array {} to Arrow {:?}",
128 array.encoding_id(),
129 arrow_type
130 );
131 }
132
133 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
134 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
135 Ok(arrow_type
136 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
137 .unwrap_or_else(|| array.dtype().clone()))
138 }
139
140 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
141 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
142 Ok(array.len())
143 }
144
145 fn is_elementwise(&self) -> bool {
146 false
147 }
148}
149
150pub struct ToArrowArgs<'a> {
151 array: &'a dyn Array,
152 arrow_type: Option<&'a DataType>,
153}
154
155impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
156 type Error = VortexError;
157
158 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
159 if value.inputs.len() != 1 {
160 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
161 }
162 let array = value.inputs[0]
163 .array()
164 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
165 let options = value
166 .options
167 .as_any()
168 .downcast_ref::<ToArrowOptions>()
169 .vortex_expect("Expected options to be ToArrowOptions");
170
171 Ok(ToArrowArgs {
172 array,
173 arrow_type: options.arrow_type.as_ref(),
174 })
175 }
176}
177
178pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
179inventory::collect!(ToArrowKernelRef);
180
181pub trait ToArrowKernel: VTable {
182 fn to_arrow(
183 &self,
184 arr: &Self::Array,
185 arrow_type: Option<&DataType>,
186 ) -> VortexResult<Option<ArrowArrayRef>>;
187}
188
189#[derive(Debug)]
190pub struct ToArrowKernelAdapter<V: VTable>(pub V);
191
192impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
193 pub const fn lift(&'static self) -> ToArrowKernelRef {
194 ToArrowKernelRef(ArcRef::new_ref(self))
195 }
196}
197
198impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
199 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
200 let inputs = ToArrowArgs::try_from(args)?;
201 let Some(array) = inputs.array.as_opt::<V>() else {
202 return Ok(None);
203 };
204
205 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
206 return Ok(None);
207 };
208
209 Ok(Some(
210 ArrowArray::new(arrow_array, array.dtype().nullability())
211 .to_array()
212 .into(),
213 ))
214 }
215}
216
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, PrimitiveArray, StringViewArray, StructArray};
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::{IntoArray, arrays};

    /// Converts a two-field struct array (nullable ints and strings) to Arrow and compares it
    /// against a hand-built Arrow struct array.
    #[test]
    fn test_to_arrow() {
        // Vortex input: field "a" has a null in the middle, field "b" is all valid.
        let ints =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let strs = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let array =
            arrays::StructArray::from_fields(vec![("a", ints), ("b", strs)].as_slice()).unwrap();

        // Expected Arrow output, built column by column.
        let expected_a: ArrayRef =
            Arc::new(PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ));
        let expected_b: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let expected: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_a), ("b", expected_b)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let actual = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&actual, &expected);
    }
}