vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
5mod list;
6mod null_buffer;
7mod temporal;
8mod varbin;
9
10use std::any::Any;
11use std::sync::LazyLock;
12
13use arcref::ArcRef;
14use arrow_array::ArrayRef as ArrowArrayRef;
15use arrow_schema::DataType;
16use vortex_dtype::DType;
17use vortex_dtype::arrow::FromArrowType;
18use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
19
20use crate::Array;
21use crate::arrow::array::{ArrowArray, ArrowVTable};
22use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
23use crate::vtable::VTable;
24
25static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
26 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
27
28 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
30 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
31
32 for kernel in inventory::iter::<ToArrowKernelRef> {
33 compute.register_kernel(kernel.0.clone());
34 }
35 compute
36});
37
38pub(crate) fn warm_up_vtable() -> usize {
39 TO_ARROW_FN.kernels().len()
40}
41
42pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
50 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
51}
52
53pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
55 to_arrow_opts(
56 array,
57 &ToArrowOptions {
58 arrow_type: Some(arrow_type.clone()),
59 },
60 )
61}
62
63pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
64 let arrow = TO_ARROW_FN
65 .invoke(&InvocationArgs {
66 inputs: &[array.into()],
67 options,
68 })?
69 .unwrap_array()?
70 .as_opt::<ArrowVTable>()
71 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
72 .inner()
73 .clone();
74
75 if let Some(arrow_type) = &options.arrow_type
76 && arrow.data_type() != arrow_type
77 {
78 vortex_bail!(
79 "Arrow array type mismatch: expected {:?}, got {:?}",
80 &options.arrow_type,
81 arrow.data_type()
82 );
83 }
84
85 Ok(arrow)
86}
87
88pub struct ToArrowOptions {
89 pub arrow_type: Option<DataType>,
91}
92
93impl Options for ToArrowOptions {
94 fn as_any(&self) -> &dyn Any {
95 self
96 }
97}
98
99struct ToArrow;
100
101impl ComputeFnVTable for ToArrow {
102 fn invoke(
103 &self,
104 args: &InvocationArgs,
105 kernels: &[ArcRef<dyn Kernel>],
106 ) -> VortexResult<Output> {
107 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
108
109 for kernel in kernels {
110 if let Some(output) = kernel.invoke(args)? {
111 return Ok(output);
112 }
113 }
114 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
115 return Ok(output);
116 }
117
118 if !array.is_canonical() {
120 let canonical_array = array.to_canonical();
121 let arrow_array = to_arrow_opts(
122 canonical_array.as_ref(),
123 &ToArrowOptions {
124 arrow_type: arrow_type.cloned(),
125 },
126 )?;
127 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
128 .to_array()
129 .into());
130 }
131
132 vortex_bail!(
133 "Failed to convert array {} to Arrow {:?}",
134 array.encoding_id(),
135 arrow_type
136 );
137 }
138
139 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
140 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
141 Ok(arrow_type
142 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
143 .unwrap_or_else(|| array.dtype().clone()))
144 }
145
146 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
147 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
148 Ok(array.len())
149 }
150
151 fn is_elementwise(&self) -> bool {
152 false
153 }
154}
155
156pub struct ToArrowArgs<'a> {
157 array: &'a dyn Array,
158 arrow_type: Option<&'a DataType>,
159}
160
161impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
162 type Error = VortexError;
163
164 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
165 if value.inputs.len() != 1 {
166 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
167 }
168 let array = value.inputs[0]
169 .array()
170 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
171 let options = value
172 .options
173 .as_any()
174 .downcast_ref::<ToArrowOptions>()
175 .vortex_expect("Expected options to be ToArrowOptions");
176
177 Ok(ToArrowArgs {
178 array,
179 arrow_type: options.arrow_type.as_ref(),
180 })
181 }
182}
183
184pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
185inventory::collect!(ToArrowKernelRef);
186
187pub trait ToArrowKernel: VTable {
188 fn to_arrow(
189 &self,
190 arr: &Self::Array,
191 arrow_type: Option<&DataType>,
192 ) -> VortexResult<Option<ArrowArrayRef>>;
193}
194
195#[derive(Debug)]
196pub struct ToArrowKernelAdapter<V: VTable>(pub V);
197
198impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
199 pub const fn lift(&'static self) -> ToArrowKernelRef {
200 ToArrowKernelRef(ArcRef::new_ref(self))
201 }
202}
203
204impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
205 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
206 let inputs = ToArrowArgs::try_from(args)?;
207 let Some(array) = inputs.array.as_opt::<V>() else {
208 return Ok(None);
209 };
210
211 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
212 return Ok(None);
213 };
214
215 Ok(Some(
216 ArrowArray::new(arrow_array, array.dtype().nullability())
217 .to_array()
218 .into(),
219 ))
220 }
221}
222
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, PrimitiveArray, StringViewArray, StructArray};
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::{IntoArray, arrays};

    /// A struct with a nullable int column and a string-view column converts to the
    /// equivalent Arrow `StructArray`.
    #[test]
    fn test_to_arrow() {
        // Vortex input: { a: [1, null, 2], b: ["a", "b", "c"] }
        let vortex_fields = vec![
            (
                "a",
                arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)])
                    .into_array(),
            ),
            (
                "b",
                arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array(),
            ),
        ];
        let array = arrays::StructArray::from_fields(vortex_fields.as_slice()).unwrap();

        // Expected Arrow output with the same values and validity.
        let expected_a: ArrayRef = Arc::new(
            PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ),
        );
        let expected_b: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let arrow_array: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_a), ("b", expected_b)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let converted = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&converted, &arrow_array);
    }
}