vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
5mod temporal;
6mod varbin;
7
8use std::any::Any;
9use std::sync::LazyLock;
10
11use arcref::ArcRef;
12use arrow_array::ArrayRef as ArrowArrayRef;
13use arrow_schema::DataType;
14use vortex_dtype::DType;
15use vortex_dtype::arrow::FromArrowType;
16use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
17
18use crate::Array;
19use crate::arrow::array::{ArrowArray, ArrowVTable};
20use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
21use crate::vtable::VTable;
22
23static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
24 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
25
26 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
28 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
29
30 for kernel in inventory::iter::<ToArrowKernelRef> {
31 compute.register_kernel(kernel.0.clone());
32 }
33 compute
34});
35
36pub(crate) fn warm_up_vtable() -> usize {
37 TO_ARROW_FN.kernels().len()
38}
39
40pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
48 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
49}
50
51pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
53 to_arrow_opts(
54 array,
55 &ToArrowOptions {
56 arrow_type: Some(arrow_type.clone()),
57 },
58 )
59}
60
61pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
62 let arrow = TO_ARROW_FN
63 .invoke(&InvocationArgs {
64 inputs: &[array.into()],
65 options,
66 })?
67 .unwrap_array()?
68 .as_opt::<ArrowVTable>()
69 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
70 .inner()
71 .clone();
72
73 if let Some(arrow_type) = &options.arrow_type
74 && arrow.data_type() != arrow_type
75 {
76 vortex_bail!(
77 "Arrow array type mismatch: expected {:?}, got {:?}",
78 &options.arrow_type,
79 arrow.data_type()
80 );
81 }
82
83 Ok(arrow)
84}
85
86pub struct ToArrowOptions {
87 pub arrow_type: Option<DataType>,
89}
90
91impl Options for ToArrowOptions {
92 fn as_any(&self) -> &dyn Any {
93 self
94 }
95}
96
/// Marker type carrying the [`ComputeFnVTable`] implementation of the `to_arrow` compute
/// function.
struct ToArrow;
98
99impl ComputeFnVTable for ToArrow {
100 fn invoke(
101 &self,
102 args: &InvocationArgs,
103 kernels: &[ArcRef<dyn Kernel>],
104 ) -> VortexResult<Output> {
105 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
106
107 for kernel in kernels {
108 if let Some(output) = kernel.invoke(args)? {
109 return Ok(output);
110 }
111 }
112 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
113 return Ok(output);
114 }
115
116 if !array.is_canonical() {
118 let canonical_array = array.to_canonical();
119 let arrow_array = to_arrow_opts(
120 canonical_array.as_ref(),
121 &ToArrowOptions {
122 arrow_type: arrow_type.cloned(),
123 },
124 )?;
125 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
126 .to_array()
127 .into());
128 }
129
130 vortex_bail!(
131 "Failed to convert array {} to Arrow {:?}",
132 array.encoding_id(),
133 arrow_type
134 );
135 }
136
137 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
138 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
139 Ok(arrow_type
140 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
141 .unwrap_or_else(|| array.dtype().clone()))
142 }
143
144 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
145 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
146 Ok(array.len())
147 }
148
149 fn is_elementwise(&self) -> bool {
150 false
151 }
152}
153
154pub struct ToArrowArgs<'a> {
155 array: &'a dyn Array,
156 arrow_type: Option<&'a DataType>,
157}
158
159impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
160 type Error = VortexError;
161
162 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
163 if value.inputs.len() != 1 {
164 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
165 }
166 let array = value.inputs[0]
167 .array()
168 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
169 let options = value
170 .options
171 .as_any()
172 .downcast_ref::<ToArrowOptions>()
173 .vortex_expect("Expected options to be ToArrowOptions");
174
175 Ok(ToArrowArgs {
176 array,
177 arrow_type: options.arrow_type.as_ref(),
178 })
179 }
180}
181
182pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
183inventory::collect!(ToArrowKernelRef);
184
185pub trait ToArrowKernel: VTable {
186 fn to_arrow(
187 &self,
188 arr: &Self::Array,
189 arrow_type: Option<&DataType>,
190 ) -> VortexResult<Option<ArrowArrayRef>>;
191}
192
193#[derive(Debug)]
194pub struct ToArrowKernelAdapter<V: VTable>(pub V);
195
196impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
197 pub const fn lift(&'static self) -> ToArrowKernelRef {
198 ToArrowKernelRef(ArcRef::new_ref(self))
199 }
200}
201
202impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
203 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
204 let inputs = ToArrowArgs::try_from(args)?;
205 let Some(array) = inputs.array.as_opt::<V>() else {
206 return Ok(None);
207 };
208
209 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
210 return Ok(None);
211 };
212
213 Ok(Some(
214 ArrowArray::new(arrow_array, array.dtype().nullability())
215 .to_array()
216 .into(),
217 ))
218 }
219}
220
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, PrimitiveArray, StringViewArray, StructArray};
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::{IntoArray, arrays};

    /// Converts a two-field struct array (nullable ints, strings) and compares it with the
    /// equivalent hand-built Arrow struct array.
    #[test]
    fn test_to_arrow() {
        // Vortex input: field "a" has a null in the middle, field "b" is all-valid strings.
        let ints =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let strings = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let array =
            arrays::StructArray::from_fields(vec![("a", ints), ("b", strings)].as_slice())
                .unwrap();

        // Expected Arrow output, built directly with arrow-rs.
        let validity = NullBuffer::from(vec![true, false, true]);
        let expected_a: ArrayRef = Arc::new(
            PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(validity),
            ),
        );
        let expected_b: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let arrow_array: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_a), ("b", expected_b)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let converted = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&converted, &arrow_array);
    }
}