vortex_array/arrow/compute/to_arrow/
mod.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4mod canonical;
5mod list;
6mod temporal;
7mod varbin;
8
9use std::any::Any;
10use std::sync::LazyLock;
11
12use arcref::ArcRef;
13use arrow_array::ArrayRef as ArrowArrayRef;
14use arrow_schema::DataType;
15use vortex_dtype::DType;
16use vortex_dtype::arrow::FromArrowType;
17use vortex_error::VortexError;
18use vortex_error::VortexExpect;
19use vortex_error::VortexResult;
20use vortex_error::vortex_bail;
21use vortex_error::vortex_err;
22
23use crate::Array;
24use crate::LEGACY_SESSION;
25use crate::USE_VORTEX_OPERATORS;
26use crate::arrow::ArrowArrayExecutor;
27use crate::arrow::array::ArrowArray;
28use crate::arrow::array::ArrowVTable;
29use crate::compute::ComputeFn;
30use crate::compute::ComputeFnVTable;
31use crate::compute::InvocationArgs;
32use crate::compute::Kernel;
33use crate::compute::Options;
34use crate::compute::Output;
35use crate::vtable::VTable;
36
37static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
38    let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
39
40    // Register the kernels we ship ourselves
41    compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
42    compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
43
44    for kernel in inventory::iter::<ToArrowKernelRef> {
45        compute.register_kernel(kernel.0.clone());
46    }
47    compute
48});
49
50pub(crate) fn warm_up_vtable() -> usize {
51    TO_ARROW_FN.kernels().len()
52}
53
54/// Convert a Vortex array to an Arrow array with the encoding's preferred `DataType`.
55///
56/// For example, a `VarBinArray` will be converted to an Arrow `VarBin` array, instead of the
57/// canonical `VarBinViewArray`.
58///
59/// Warning: do not use this to convert a Vortex [`crate::stream::ArrayStream`] since each array
60/// may have a different preferred Arrow type. Use [`to_arrow`] instead.
61pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
62    to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
63}
64
65/// Convert a Vortex array to an Arrow array of the given type.
66pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
67    to_arrow_opts(
68        array,
69        &ToArrowOptions {
70            arrow_type: Some(arrow_type.clone()),
71        },
72    )
73}
74
75pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
76    let arrow = TO_ARROW_FN
77        .invoke(&InvocationArgs {
78            inputs: &[array.into()],
79            options,
80        })?
81        .unwrap_array()?
82        .as_opt::<ArrowVTable>()
83        .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
84        .inner()
85        .clone();
86
87    if let Some(arrow_type) = &options.arrow_type
88        && arrow.data_type() != arrow_type
89    {
90        vortex_bail!(
91            "Arrow array type mismatch: expected {:?}, got {:?}",
92            &options.arrow_type,
93            arrow.data_type()
94        );
95    }
96
97    Ok(arrow)
98}
99
100pub struct ToArrowOptions {
101    /// The Arrow data type to convert to, if specified.
102    pub arrow_type: Option<DataType>,
103}
104
105impl Options for ToArrowOptions {
106    fn as_any(&self) -> &dyn Any {
107        self
108    }
109}
110
111struct ToArrow;
112
113impl ComputeFnVTable for ToArrow {
114    fn invoke(
115        &self,
116        args: &InvocationArgs,
117        kernels: &[ArcRef<dyn Kernel>],
118    ) -> VortexResult<Output> {
119        let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
120
121        for kernel in kernels {
122            if let Some(output) = kernel.invoke(args)? {
123                return Ok(output);
124            }
125        }
126        if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
127            return Ok(output);
128        }
129
130        if !array.is_canonical() {
131            let arrow_array = if *USE_VORTEX_OPERATORS {
132                let arrow_type = arrow_type
133                    .cloned()
134                    .map(Ok)
135                    .unwrap_or_else(|| array.dtype().to_arrow_dtype())?;
136                array
137                    .to_array()
138                    .execute_arrow(&arrow_type, &LEGACY_SESSION)?
139            } else {
140                // Fall back to canonicalizing and then converting.
141                let canonical_array = array.to_canonical();
142                to_arrow_opts(
143                    canonical_array.as_ref(),
144                    &ToArrowOptions {
145                        arrow_type: arrow_type.cloned(),
146                    },
147                )?
148            };
149
150            return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
151                .to_array()
152                .into());
153        }
154
155        vortex_bail!(
156            "Failed to convert array {} to Arrow {:?}",
157            array.encoding_id(),
158            arrow_type
159        );
160    }
161
162    fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
163        let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
164        Ok(arrow_type
165            .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
166            .unwrap_or_else(|| array.dtype().clone()))
167    }
168
169    fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
170        let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
171        Ok(array.len())
172    }
173
174    fn is_elementwise(&self) -> bool {
175        false
176    }
177}
178
179pub struct ToArrowArgs<'a> {
180    array: &'a dyn Array,
181    arrow_type: Option<&'a DataType>,
182}
183
184impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
185    type Error = VortexError;
186
187    fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
188        if value.inputs.len() != 1 {
189            vortex_bail!("Expected 1 input, found {}", value.inputs.len());
190        }
191        let array = value.inputs[0]
192            .array()
193            .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
194        let options = value
195            .options
196            .as_any()
197            .downcast_ref::<ToArrowOptions>()
198            .vortex_expect("Expected options to be ToArrowOptions");
199
200        Ok(ToArrowArgs {
201            array,
202            arrow_type: options.arrow_type.as_ref(),
203        })
204    }
205}
206
207pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
208inventory::collect!(ToArrowKernelRef);
209
210pub trait ToArrowKernel: VTable {
211    fn to_arrow(
212        &self,
213        arr: &Self::Array,
214        arrow_type: Option<&DataType>,
215    ) -> VortexResult<Option<ArrowArrayRef>>;
216}
217
218#[derive(Debug)]
219pub struct ToArrowKernelAdapter<V: VTable>(pub V);
220
221impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
222    pub const fn lift(&'static self) -> ToArrowKernelRef {
223        ToArrowKernelRef(ArcRef::new_ref(self))
224    }
225}
226
227impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
228    fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
229        let inputs = ToArrowArgs::try_from(args)?;
230        let Some(array) = inputs.array.as_opt::<V>() else {
231            return Ok(None);
232        };
233
234        let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
235            return Ok(None);
236        };
237
238        Ok(Some(
239            ArrowArray::new(arrow_array, array.dtype().nullability())
240                .to_array()
241                .into(),
242        ))
243    }
244}
245
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::ArrayRef;
    use arrow_array::PrimitiveArray;
    use arrow_array::StringViewArray;
    use arrow_array::StructArray;
    use arrow_array::types::Int32Type;
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::IntoArray;
    use crate::arrays;

    /// A nullable-int + string struct converts to the equivalent Arrow `StructArray`.
    #[test]
    fn test_to_arrow() {
        // Vortex input.
        let ints = arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)])
            .into_array();
        let strings = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let array =
            arrays::StructArray::from_fields([("a", ints), ("b", strings)].as_slice()).unwrap();

        // Expected Arrow output; the value under the null slot is irrelevant.
        let expected_ints: ArrayRef =
            Arc::new(PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ));
        let expected_strings: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let expected: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_ints), ("b", expected_strings)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let actual = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&actual, &expected);
    }
}