vortex_array/arrow/compute/to_arrow/
mod.rs1mod canonical;
5mod list;
6mod null_buffer;
7mod temporal;
8mod varbin;
9
10use std::any::Any;
11use std::sync::LazyLock;
12
13use arcref::ArcRef;
14use arrow_array::ArrayRef as ArrowArrayRef;
15use arrow_schema::DataType;
16use vortex_dtype::DType;
17use vortex_dtype::arrow::FromArrowType;
18use vortex_error::VortexError;
19use vortex_error::VortexExpect;
20use vortex_error::VortexResult;
21use vortex_error::vortex_bail;
22use vortex_error::vortex_err;
23
24use crate::Array;
25use crate::arrow::array::ArrowArray;
26use crate::arrow::array::ArrowVTable;
27use crate::compute::ComputeFn;
28use crate::compute::ComputeFnVTable;
29use crate::compute::InvocationArgs;
30use crate::compute::Kernel;
31use crate::compute::Options;
32use crate::compute::Output;
33use crate::vtable::VTable;
34
35static TO_ARROW_FN: LazyLock<ComputeFn> = LazyLock::new(|| {
36 let compute = ComputeFn::new("to_arrow".into(), ArcRef::new_ref(&ToArrow));
37
38 compute.register_kernel(ArcRef::new_ref(&canonical::ToArrowCanonical));
40 compute.register_kernel(ArcRef::new_ref(&temporal::ToArrowTemporal));
41
42 for kernel in inventory::iter::<ToArrowKernelRef> {
43 compute.register_kernel(kernel.0.clone());
44 }
45 compute
46});
47
48pub(crate) fn warm_up_vtable() -> usize {
49 TO_ARROW_FN.kernels().len()
50}
51
52pub fn to_arrow_preferred(array: &dyn Array) -> VortexResult<ArrowArrayRef> {
60 to_arrow_opts(array, &ToArrowOptions { arrow_type: None })
61}
62
63pub fn to_arrow(array: &dyn Array, arrow_type: &DataType) -> VortexResult<ArrowArrayRef> {
65 to_arrow_opts(
66 array,
67 &ToArrowOptions {
68 arrow_type: Some(arrow_type.clone()),
69 },
70 )
71}
72
73pub fn to_arrow_opts(array: &dyn Array, options: &ToArrowOptions) -> VortexResult<ArrowArrayRef> {
74 let arrow = TO_ARROW_FN
75 .invoke(&InvocationArgs {
76 inputs: &[array.into()],
77 options,
78 })?
79 .unwrap_array()?
80 .as_opt::<ArrowVTable>()
81 .ok_or_else(|| vortex_err!("ToArrow compute kernels must return a Vortex ArrowArray"))?
82 .inner()
83 .clone();
84
85 if let Some(arrow_type) = &options.arrow_type
86 && arrow.data_type() != arrow_type
87 {
88 vortex_bail!(
89 "Arrow array type mismatch: expected {:?}, got {:?}",
90 &options.arrow_type,
91 arrow.data_type()
92 );
93 }
94
95 Ok(arrow)
96}
97
98pub struct ToArrowOptions {
99 pub arrow_type: Option<DataType>,
101}
102
103impl Options for ToArrowOptions {
104 fn as_any(&self) -> &dyn Any {
105 self
106 }
107}
108
/// Marker type carrying the [`ComputeFnVTable`] implementation of the `to_arrow` function.
struct ToArrow;
110
111impl ComputeFnVTable for ToArrow {
112 fn invoke(
113 &self,
114 args: &InvocationArgs,
115 kernels: &[ArcRef<dyn Kernel>],
116 ) -> VortexResult<Output> {
117 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
118
119 for kernel in kernels {
120 if let Some(output) = kernel.invoke(args)? {
121 return Ok(output);
122 }
123 }
124 if let Some(output) = array.invoke(&TO_ARROW_FN, args)? {
125 return Ok(output);
126 }
127
128 if !array.is_canonical() {
130 let canonical_array = array.to_canonical();
131 let arrow_array = to_arrow_opts(
132 canonical_array.as_ref(),
133 &ToArrowOptions {
134 arrow_type: arrow_type.cloned(),
135 },
136 )?;
137 return Ok(ArrowArray::new(arrow_array, array.dtype().nullability())
138 .to_array()
139 .into());
140 }
141
142 vortex_bail!(
143 "Failed to convert array {} to Arrow {:?}",
144 array.encoding_id(),
145 arrow_type
146 );
147 }
148
149 fn return_dtype(&self, args: &InvocationArgs) -> VortexResult<DType> {
150 let ToArrowArgs { array, arrow_type } = ToArrowArgs::try_from(args)?;
151 Ok(arrow_type
152 .map(|arrow_type| DType::from_arrow((arrow_type, array.dtype().nullability())))
153 .unwrap_or_else(|| array.dtype().clone()))
154 }
155
156 fn return_len(&self, args: &InvocationArgs) -> VortexResult<usize> {
157 let ToArrowArgs { array, .. } = ToArrowArgs::try_from(args)?;
158 Ok(array.len())
159 }
160
161 fn is_elementwise(&self) -> bool {
162 false
163 }
164}
165
166pub struct ToArrowArgs<'a> {
167 array: &'a dyn Array,
168 arrow_type: Option<&'a DataType>,
169}
170
171impl<'a> TryFrom<&InvocationArgs<'a>> for ToArrowArgs<'a> {
172 type Error = VortexError;
173
174 fn try_from(value: &InvocationArgs<'a>) -> Result<Self, Self::Error> {
175 if value.inputs.len() != 1 {
176 vortex_bail!("Expected 1 input, found {}", value.inputs.len());
177 }
178 let array = value.inputs[0]
179 .array()
180 .ok_or_else(|| vortex_err!("Expected input 0 to be an array"))?;
181 let options = value
182 .options
183 .as_any()
184 .downcast_ref::<ToArrowOptions>()
185 .vortex_expect("Expected options to be ToArrowOptions");
186
187 Ok(ToArrowArgs {
188 array,
189 arrow_type: options.arrow_type.as_ref(),
190 })
191 }
192}
193
194pub struct ToArrowKernelRef(pub ArcRef<dyn Kernel>);
195inventory::collect!(ToArrowKernelRef);
196
197pub trait ToArrowKernel: VTable {
198 fn to_arrow(
199 &self,
200 arr: &Self::Array,
201 arrow_type: Option<&DataType>,
202 ) -> VortexResult<Option<ArrowArrayRef>>;
203}
204
205#[derive(Debug)]
206pub struct ToArrowKernelAdapter<V: VTable>(pub V);
207
208impl<V: VTable + ToArrowKernel> ToArrowKernelAdapter<V> {
209 pub const fn lift(&'static self) -> ToArrowKernelRef {
210 ToArrowKernelRef(ArcRef::new_ref(self))
211 }
212}
213
214impl<V: VTable + ToArrowKernel> Kernel for ToArrowKernelAdapter<V> {
215 fn invoke(&self, args: &InvocationArgs) -> VortexResult<Option<Output>> {
216 let inputs = ToArrowArgs::try_from(args)?;
217 let Some(array) = inputs.array.as_opt::<V>() else {
218 return Ok(None);
219 };
220
221 let Some(arrow_array) = V::to_arrow(&self.0, array, inputs.arrow_type)? else {
222 return Ok(None);
223 };
224
225 Ok(Some(
226 ArrowArray::new(arrow_array, array.dtype().nullability())
227 .to_array()
228 .into(),
229 ))
230 }
231}
232
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::ArrayRef;
    use arrow_array::PrimitiveArray;
    use arrow_array::StringViewArray;
    use arrow_array::StructArray;
    use arrow_array::types::Int32Type;
    use arrow_buffer::NullBuffer;

    use super::to_arrow;
    use crate::IntoArray;
    use crate::arrays;

    /// A struct array with a nullable integer field and a string-view field converts to the
    /// equivalent Arrow `StructArray`.
    #[test]
    fn test_to_arrow() {
        // Vortex input: { a: [1, null, 2], b: ["a", "b", "c"] }.
        let field_a =
            arrays::PrimitiveArray::from_option_iter(vec![Some(1), None, Some(2)]).into_array();
        let field_b = arrays::VarBinViewArray::from_iter_str(vec!["a", "b", "c"]).into_array();
        let array =
            arrays::StructArray::from_fields([("a", field_a), ("b", field_b)].as_slice()).unwrap();

        // Arrow expectation with the same contents; the null slot in `a` holds a 0 placeholder.
        let expected_a: ArrayRef = Arc::new(
            PrimitiveArray::<Int32Type>::from_iter_values_with_nulls(
                vec![1, 0, 2],
                Some(NullBuffer::from(vec![true, false, true])),
            ),
        );
        let expected_b: ArrayRef = Arc::new(StringViewArray::from(vec![
            Some("a"),
            Some("b"),
            Some("c"),
        ]));
        let arrow_array: ArrayRef = Arc::new(
            StructArray::try_from(vec![("a", expected_a), ("b", expected_b)]).unwrap(),
        );

        let target_type = array.dtype().to_arrow_dtype().unwrap();
        let converted = to_arrow(array.as_ref(), &target_type).unwrap();

        assert_eq!(&converted, &arrow_array);
    }
}