vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
5mod temporal;
6mod varbin;
7
8use std::any::Any;
9use std::sync::LazyLock;
10
11use arcref::ArcRef;
12use arrow_array::ArrayRef as ArrowArrayRef;
13use arrow_schema::DataType;
14use vortex_dtype::DType;
15use vortex_dtype::arrow::FromArrowType;
16use vortex_error::{VortexError, VortexExpect, VortexResult, vortex_bail, vortex_err};
17
18use crate::Array;
19use crate::arrow::array::{ArrowArray, ArrowVTable};
20use crate::compute::{ComputeFn, ComputeFnVTable, InvocationArgs, Kernel, Options, Output};
21use crate::vtable::VTable;
22
23pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
31 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
32}
33
34pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
36 to_arrow_opts(
37 array,
38 &ToArrowOptions {
39 arrow_type: Some(arrow_type.clone()),
40 },
41 )
42}
43
44pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
45 let arrow = TO_ARROW_FN
46 .invoke(&InvocationArgs {
47 inputs: &[array.into()],
48 options,
49 })?
50 .unwrap_array()?
51 .as_opt::<ArrowVTable>()
52 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
53 .inner()
54 .clone();
55
56 if let Some(arrow_type) = &options.arrow_type {
57 if arrow.data_type() != arrow_type {
58 vortex_bail!(
59 "Arrow array type mismatch: expected {:?}, got {:?}",
60 &options.arrow_type,
61 arrow.data_type()
62 );
63 }
64 }
65
66 Ok(arrow)
67}
68
69pub struct ToArrowOptions {
70 pub arrow_type: Option<DataType>,
72}
73
74impl Options for ToArrowOptions {
75 fn as_any(&self) -> &dyn Any {
76 self
77 }
78}
79
80struct ToArrow;
81
82impl ComputeFnVTable for ToArrow {
83 fn invoke(
84 &self,
85 args: &InvocationArgs,
86 kernels: &[ArcRef<dyn Kernel>],
87 ) -> VortexResult<Output> {
88 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
89
90 for kernel in kernels {
91 if let Some(output) = kernel.invoke(args)? {
92 return Ok(output);
93 }
94 }
95 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
96 return Ok(output);
97 }
98
99 if !array.is_canonical() {
101 let canonical_array = array.to_canonical()?;
102 let arrow_array = to_arrow_opts(
103 canonical_array.as_ref(),
104 &ToArrowOptions {
105 arrow_type: arrow_type.cloned(),
106 },
107 )?;
108 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
109 .to_array()
110 .into());
111 }
112
113 vortex_bail!(
114 "Failed to convert array {} to Arrow {:?}",
115 array.encoding_id(),
116 arrow_type
117 );
118 }
119
120 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
121 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
122 Ok(arrow_type
123 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
124 .unwrap_or_else(|| array.dtype().clone()))
125 }
126
127 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
128 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
129 Ok(array.len())
130 }
131
132 fn is_elementwise(&self) -> bool {
133 false
134 }
135}
136
137pub static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
138 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
139
140 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
142 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
143
144 for kernel in inventory::iter::<ToArrowKernelRef> {
145 compute.register_kernel(kernel.0.clone());
146 }
147 compute
148});
149
150pub struct ToArrowArgs<'a> {
151 array: &'a dyn Array,
152 arrow_type: Option<&'a DataType>,
153}
154
155impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
156 type Error = VortexError;
157
158 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
159 if value.inputs.len() != 1 {
160 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
161 }
162 let array = value.inputs[0]
163 .array()
164 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
165 let options = value
166 .options
167 .as_any()
168 .downcast_ref::<ToArrowOptions>()
169 .vortex_expect("Expected options to be ToArrowOptions");
170
171 Ok(ToArrowArgs {
172 array,
173 arrow_type: options.arrow_type.as_ref(),
174 })
175 }
176}
177
178pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
179inventory::collect!(ToArrowKernelRef);
180
181pub trait ToArrowKernel: VTable {
182 fn to_arrow(
183 &self,
184 arr: &Self::Array,
185 arrow_type: Option<&DataType>,
186 ) -> VortexResult<Option<ArrowArrayRef>>;
187}
188
189#[derive(Debug)]
190pub struct ToArrowKernelAdapter<V: VTable>(pub V);
191
192impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
193 pub const fn lift(&'static self) -> ToArrowKernelRef {
194 ToArrowKernelRef(ArcRef::new_ref(self))
195 }
196}
197
198impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
199 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
200 let inputs = ToArrowArgs::try_from(args)?;
201 let Some(array) = inputs.array.as_opt::<V>() else {
202 return Ok(None);
203 };
204
205 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
206 return Ok(None);
207 };
208
209 Ok(Some(
210 ArrowArray::new(arrow_array, array.dtype().nullability())
211 .to_array()
212 .into(),
213 ))
214 }
215}
216
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::types::Int32Type;
    use arrow_array::{ArrayRef, PrimitiveArray, StringViewArray, StructArray};
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::{IntoArray, arrays};

    /// A struct of a nullable primitive column and a string-view column should convert into
    /// the equivalent Arrow `StructArray` when targeting its own Arrow data type.
    #[test]
    fn test_to_arrow() {
        let ints =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let strings = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let vortex_struct =
            arrays::StructArray::from_fields([("a", ints), ("b", strings)].as_slice()).unwrap();

        let expected_ints: ArrayRef =
            Arc::new(PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ));
        let expected_strings: ArrayRef =
            Arc::new(StringViewArray::from(vec![Some("a"), Some("b"), Some("c")]));
        let expected: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_ints), ("b", expected_strings)]).unwrap(),
        );

        let target_type = vortex_struct.dtype().to_arrow_dtype().unwrap();
        let actual = to_arrow(vortex_struct.as_ref(), &target_type).unwrap();

        assert_eq!(&actual, &expected);
    }
}