vortex_array/arrow/compute/to_arrow/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod canonical;
5mod list;
6mod null_buffer;
7mod temporal;
8mod varbin;
9
10use std::any::Any;
11use std::sync::LazyLock;
12
13use arcref::ArcRef;
14use arrow_array::ArrayRef as ArrowArrayRef;
15use arrow_schema::DataType;
16use vortex_dtype::DType;
17use vortex_dtype::arrow::FromArrowType;
18use vortex_error::VortexError;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_bail;
22use vortex_error::vortex_err;
23
24use crate::Array;
25use crate::arrow::array::ArrowArray;
26use crate::arrow::array::ArrowVTable;
27use crate::compute::ComputeFn;
28use crate::compute::ComputeFnVTable;
29use crate::compute::InvocationArgs;
30use crate::compute::Kernel;
31use crate::compute::Options;
32use crate::compute::Output;
33use crate::vtable::VTable;
34
35static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
36    let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
37
38    // Register the kernels we ship ourselves
39    compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
40    compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
41
42    for kernel in inventory::iter::<ToArrowKernelRef> {
43        compute.register_kernel(kernel.0.clone());
44    }
45    compute
46});
47
48pub(crate) fn warm_up_vtable() -> usize {
49    TO_ARROW_FN.kernels().len()
50}
51
52/// Convert a Vortex array to an Arrow array with the encoding's preferred `DataType`.
53///
54/// For example, a `VarBinArray` will be converted to an Arrow `VarBin` array, instead of the
55/// canonical `VarBinViewArray`.
56///
57/// Warning: do not use this to convert a Vortex [`crate::stream::ArrayStream`] since each array
58/// may have a different preferred Arrow type. Use [`to_arrow`] instead.
59pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
60    to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
61}
62
63/// Convert a Vortex array to an Arrow array of the given type.
64pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
65    to_arrow_opts(
66        array,
67        &ToArrowOptions {
68            arrow_type: Some(arrow_type.clone()),
69        },
70    )
71}
72
73pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
74    let arrow = TO_ARROW_FN
75        .invoke(&InvocationArgs {
76            inputs: &[array.into()],
77            options,
78        })?
79        .unwrap_array()?
80        .as_opt::<ArrowVTable>()
81        .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
82        .inner()
83        .clone();
84
85    if let Some(arrow_type) = &options.arrow_type
86        && arrow.data_type() != arrow_type
87    {
88        vortex_bail!(
89            "Arrow array type mismatch: expected {:?}, got {:?}",
90            &options.arrow_type,
91            arrow.data_type()
92        );
93    }
94
95    Ok(arrow)
96}
97
98pub struct ToArrowOptions {
99    /// The Arrow data type to convert to, if specified.
100    pub arrow_type: Option<DataType>,
101}
102
103impl Options for ToArrowOptions {
104    fn as_any(&self) -> &dyn Any {
105        self
106    }
107}
108
/// Marker type carrying the [`ComputeFnVTable`] implementation behind `TO_ARROW_FN`.
struct ToArrow;
110
111impl ComputeFnVTable for ToArrow {
112    fn invoke(
113        &self,
114        args: &InvocationArgs,
115        kernels: &[ArcRef<dyn Kernel>],
116    ) -> VortexResult<Output> {
117        let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
118
119        for kernel in kernels {
120            if let Some(output) = kernel.invoke(args)? {
121                return Ok(output);
122            }
123        }
124        if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
125            return Ok(output);
126        }
127
128        // Fall back to canonicalizing and then converting.
129        if !array.is_canonical() {
130            let canonical_array = array.to_canonical();
131            let arrow_array = to_arrow_opts(
132                canonical_array.as_ref(),
133                &ToArrowOptions {
134                    arrow_type: arrow_type.cloned(),
135                },
136            )?;
137            return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
138                .to_array()
139                .into());
140        }
141
142        vortex_bail!(
143            "Failed to convert array {} to Arrow {:?}",
144            array.encoding_id(),
145            arrow_type
146        );
147    }
148
149    fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
150        let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
151        Ok(arrow_type
152            .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
153            .unwrap_or_else(|| array.dtype().clone()))
154    }
155
156    fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
157        let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
158        Ok(array.len())
159    }
160
161    fn is_elementwise(&self) -> bool {
162        false
163    }
164}
165
166pub struct ToArrowArgs<'a> {
167    array: &'a dyn Array,
168    arrow_type: Option<&'a DataType>,
169}
170
171impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
172    type Error = VortexError;
173
174    fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
175        if value.inputs.len() != 1 {
176            vortex_bail!("Expected 1 input, found {}", value.inputs.len());
177        }
178        let array = value.inputs[0]
179            .array()
180            .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
181        let options = value
182            .options
183            .as_any()
184            .downcast_ref::<ToArrowOptions>()
185            .vortex_expect("Expected options to be ToArrowOptions");
186
187        Ok(ToArrowArgs {
188            array,
189            arrow_type: options.arrow_type.as_ref(),
190        })
191    }
192}
193
194pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
195inventory::collect!(ToArrowKernelRef);
196
197pub trait ToArrowKernel: VTable {
198    fn to_arrow(
199        &self,
200        arr: &Self::Array,
201        arrow_type: Option<&DataType>,
202    ) -> VortexResult<Option<ArrowArrayRef>>;
203}
204
205#[derive(Debug)]
206pub struct ToArrowKernelAdapter<V: VTable>(pub V);
207
208impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
209    pub const fn lift(&'static self) -> ToArrowKernelRef {
210        ToArrowKernelRef(ArcRef::new_ref(self))
211    }
212}
213
214impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
215    fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
216        let inputs = ToArrowArgs::try_from(args)?;
217        let Some(array) = inputs.array.as_opt::<V>() else {
218            return Ok(None);
219        };
220
221        let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
222            return Ok(None);
223        };
224
225        Ok(Some(
226            ArrowArray::new(arrow_array, array.dtype().nullability())
227                .to_array()
228                .into(),
229        ))
230    }
231}
232
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::ArrayRef;
    use arrow_array::PrimitiveArray;
    use arrow_array::StringViewArray;
    use arrow_array::StructArray;
    use arrow_array::types::Int32Type;
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::IntoArray;
    use crate::arrays;

    /// Converts a two-field Vortex struct (nullable ints plus strings) to Arrow and compares
    /// it with a hand-built Arrow `StructArray`.
    #[test]
    fn test_to_arrow() {
        // Vortex input: field "a" has a null in the middle, field "b" is all-valid strings.
        let ints =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let strs = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let array =
            arrays::StructArray::from_fields(vec![("a", ints), ("b", strs)].as_slice()).unwrap();

        // Expected Arrow output: the null slot of "a" holds a placeholder value of 0.
        let expected_ints: ArrayRef = Arc::new(
            PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ),
        );
        let expected_strs: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let arrow_array: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_ints), ("b", expected_strs)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let converted = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&converted, &arrow_array);
    }
}