vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
2mod temporal;
3mod varbin;
4
5use std::any::Any;
6use std::sync::LazyLock;
7
8use arcref::ArcRef;
9use arrow_array::ArrayRef as ArrowArrayRef;
10use arrow_schema::DataType;
11use vortex_dtype::DType;
12use vortex_dtype::arrow::FromArrowType;
13use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
14
15use crate::Array;
16use crate::arrow::array::{ArrowArray, ArrowVTable};
17use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
18use crate::vtable::VTable;
19
20pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
28    to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
29}
30
31pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
33    to_arrow_opts(
34        array,
35        &ToArrowOptions {
36            arrow_type: Some(arrow_type.clone()),
37        },
38    )
39}
40
41pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
42    let arrow = TO_ARROW_FN
43        .invoke(&InvocationArgs {
44            inputs: &[array.into()],
45            options,
46        })?
47        .unwrap_array()?
48        .as_opt::<ArrowVTable>()
49        .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
50        .inner()
51        .clone();
52
53    if let Some(arrow_type) = &options.arrow_type {
54        if arrow.data_type() != arrow_type {
55            vortex_bail!(
56                "Arrow array type mismatch: expected {:?}, got {:?}",
57                &options.arrow_type,
58                arrow.data_type()
59            );
60        }
61    }
62
63    Ok(arrow)
64}
65
66pub struct ToArrowOptions {
67    pub arrow_type: Option<DataType>,
69}
70
71impl Options for ToArrowOptions {
72    fn as_any(&self) -> &dyn Any {
73        self
74    }
75}
76
77struct ToArrow;
78
79impl ComputeFnVTable for ToArrow {
80    fn invoke(
81        &self,
82        args: &InvocationArgs,
83        kernels: &[ArcRef<dyn Kernel>],
84    ) -> VortexResult<Output> {
85        let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
86
87        for kernel in kernels {
88            if let Some(output) = kernel.invoke(args)? {
89                return Ok(output);
90            }
91        }
92        if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
93            return Ok(output);
94        }
95
96        if !array.is_canonical() {
98            let canonical_array = array.to_canonical()?;
99            let arrow_array = to_arrow_opts(
100                canonical_array.as_ref(),
101                &ToArrowOptions {
102                    arrow_type: arrow_type.cloned(),
103                },
104            )?;
105            return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
106                .to_array()
107                .into());
108        }
109
110        vortex_bail!(
111            "Failed to convert array {} to Arrow {:?}",
112            array.encoding_id(),
113            arrow_type
114        );
115    }
116
117    fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
118        let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
119        Ok(arrow_type
120            .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
121            .unwrap_or_else(|| array.dtype().clone()))
122    }
123
124    fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
125        let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
126        Ok(array.len())
127    }
128
129    fn is_elementwise(&self) -> bool {
130        false
131    }
132}
133
134pub static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
135    let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
136
137    compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
139    compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
140
141    for kernel in inventory::iter::<ToArrowKernelRef> {
142        compute.register_kernel(kernel.0.clone());
143    }
144    compute
145});
146
147pub struct ToArrowArgs<'a> {
148    array: &'a dyn Array,
149    arrow_type: Option<&'a DataType>,
150}
151
152impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
153    type Error = VortexError;
154
155    fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
156        if value.inputs.len() != 1 {
157            vortex_bail!("Expected 1 input, found {}", value.inputs.len());
158        }
159        let array = value.inputs[0]
160            .array()
161            .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
162        let options = value
163            .options
164            .as_any()
165            .downcast_ref::<ToArrowOptions>()
166            .vortex_expect("Expected options to be ToArrowOptions");
167
168        Ok(ToArrowArgs {
169            array,
170            arrow_type: options.arrow_type.as_ref(),
171        })
172    }
173}
174
175pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
176inventory::collect!(ToArrowKernelRef);
177
178pub trait ToArrowKernel: VTable {
179    fn to_arrow(
180        &self,
181        arr: &Self::Array,
182        arrow_type: Option<&DataType>,
183    ) -> VortexResult<Option<ArrowArrayRef>>;
184}
185
186#[derive(Debug)]
187pub struct ToArrowKernelAdapter<V: VTable>(pub V);
188
189impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
190    pub const fn lift(&'static self) -> ToArrowKernelRef {
191        ToArrowKernelRef(ArcRef::new_ref(self))
192    }
193}
194
195impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
196    fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
197        let inputs = ToArrowArgs::try_from(args)?;
198        let Some(array) = inputs.array.as_opt::<V>() else {
199            return Ok(None);
200        };
201
202        let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
203            return Ok(None);
204        };
205
206        Ok(Some(
207            ArrowArray::new(arrow_array, array.dtype().nullability())
208                .to_array()
209                .into(),
210        ))
211    }
212}
213
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, PrimitiveArray, StringViewArray, StructArray};
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::{IntoArray, arrays};

    /// A two-field Vortex struct (nullable ints, strings) converts to the equivalent Arrow
    /// struct when its own Arrow dtype is requested.
    #[test]
    fn test_to_arrow() {
        // Vortex input columns.
        let vortex_ints =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let vortex_strs = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let vortex_struct = arrays::StructArray::from_fields(
            vec![("a", vortex_ints), ("b", vortex_strs)].as_slice(),
        )
        .unwrap();

        // Expected Arrow columns; the null slot in "a" carries a placeholder value of 0.
        let expected_ints: ArrayRef =
            Arc::new(PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ));
        let expected_strs: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let expected: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_ints), ("b", expected_strs)]).unwrap(),
        );

        let target_type = vortex_struct.dtype().to_arrow_dtype().unwrap();
        let actual = to_arrow(vortex_struct.as_ref(), &target_type).unwrap();

        assert_eq!(&actual, &expected);
    }
}