vortex_scan/
arrow.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_array::cast::AsArray;
5use arrow_array::{RecordBatch, RecordBatchReader};
6use arrow_schema::{ArrowError, DataType, SchemaRef};
7use vortex_array::ArrayRef;
8use vortex_array::arrow::IntoArrowArray;
9use vortex_error::{VortexError, VortexResult};
10
11use crate::ScanBuilder;
12
13impl ScanBuilder<ArrayRef> {
14    /// Creates a new thread-safe `RecordBatchReader` from the scan builder.
15    ///
16    /// This reader can be cloned and passed to multiple threads for concurrent processing.
17    ///
18    /// The `schema` parameter is used to define the schema of the resulting record batches. In
19    /// general, it is not possible to exactly infer an Arrow schema from a Vortex
20    /// [`vortex_dtype::DType`], therefore it is required to be provided explicitly.
21    pub fn into_record_batch_reader(
22        self,
23        schema: SchemaRef,
24    ) -> VortexResult<impl RecordBatchReader + Send + Clone + 'static> {
25        let data_type = DataType::Struct(schema.fields().clone());
26
27        let iter = self
28            .into_array_iter()?
29            .map(move |chunk| to_record_batch(chunk, &data_type));
30
31        Ok(RecordBatchIteratorAdapter { iter, schema })
32    }
33
34    /// Creates a new `RecordBatchReader` from the scan builder that internally drives the scan
35    /// on multithreaded pool of workers.
36    #[cfg(feature = "tokio")]
37    pub fn into_record_batch_reader_multithread(
38        self,
39        schema: SchemaRef,
40    ) -> VortexResult<impl RecordBatchReader + Send + 'static> {
41        use arrow_array::RecordBatchIterator;
42
43        let data_type = DataType::Struct(schema.fields().clone());
44
45        let iter = self.into_iter_multithread(move |chunk| to_record_batch(chunk, &data_type))?;
46
47        Ok(RecordBatchIterator::new(iter, schema))
48    }
49}
50
51fn to_record_batch(
52    chunk: VortexResult<ArrayRef>,
53    data_type: &DataType,
54) -> Result<RecordBatch, ArrowError> {
55    chunk
56        .and_then(|array| {
57            let arrow = array.into_arrow(data_type)?;
58            Ok::<_, VortexError>(RecordBatch::from(arrow.as_struct().clone()))
59        })
60        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
61}
62
63/// We create an adapter for record batch iterators that supports clone.
64/// This allows us to create thread-safe [`RecordBatchIterator`].
65#[derive(Clone)]
66struct RecordBatchIteratorAdapter<I> {
67    iter: I,
68    schema: SchemaRef,
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75    type Item = Result<RecordBatch, ArrowError>;
76
77    fn next(&mut self) -> Option<Self::Item> {
78        self.iter.next()
79    }
80}
81
82impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
83where
84    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
85{
86    fn schema(&self) -> SchemaRef {
87        self.schema.clone()
88    }
89}
90
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, Int32Array, RecordBatch, StringArray, StructArray,
    };
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::{VortexResult, vortex_err};

    use super::*;

    /// Schema shared by every test: nullable `id: Int32` and nullable `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a 4-row Vortex struct array matching [`create_arrow_schema`], with one null in
    /// each column.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![
                    Some("Alice"),
                    Some("Bob"),
                    Some("Charlie"),
                    None,
                ])) as ArrowArrayRef,
            ],
            None,
        );

        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    /// Builds a record batch for `schema` from parallel `id` / `name` columns.
    fn make_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
        .expect("columns match the test schema")
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let data_type = DataType::Struct(create_arrow_schema().fields().clone());

        // `expect` reports the underlying ArrowError if the conversion fails.
        let batch = to_record_batch(Ok(vortex_array), &data_type)
            .expect("a struct array should convert into a RecordBatch");
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_conversion_error() {
        let data_type = DataType::Struct(create_arrow_schema().fields().clone());

        let result = to_record_batch(Err(vortex_err!("test error")), &data_type);
        assert!(matches!(result, Err(ArrowError::ExternalError(_))));
    }

    #[test]
    fn test_record_batch_iterator_adapter() {
        let schema = create_arrow_schema();
        let batch1 = make_batch(&schema, vec![Some(1), Some(2)], vec![Some("Alice"), Some("Bob")]);
        let batch2 = make_batch(&schema, vec![None, Some(4)], vec![Some("Charlie"), None]);

        let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter {
            iter,
            schema: Arc::clone(&schema),
        };

        // RecordBatchReader side.
        assert_eq!(adapter.schema(), schema);

        // Iterator side.
        let first = adapter.next().expect("first item").expect("first batch");
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().expect("second item").expect("second batch");
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let items: Vec<Result<RecordBatch, ArrowError>> =
            vec![Err(ArrowError::ComputeError("test error".to_string()))];

        let mut adapter = RecordBatchIteratorAdapter {
            iter: items.into_iter(),
            schema: Arc::clone(&schema),
        };

        // The error is forwarded unchanged, then the reader is exhausted.
        assert_eq!(adapter.schema(), schema);
        assert!(matches!(
            adapter.next(),
            Some(Err(ArrowError::ComputeError(_)))
        ));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = make_batch(&schema, vec![Some(1)], vec![Some("Test")]);
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Ok(batch.clone()), Err(error), Ok(batch)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter { iter, schema };

        // An error item does not end iteration; later batches are still yielded.
        assert!(matches!(adapter.next(), Some(Ok(_))));
        assert!(matches!(adapter.next(), Some(Err(_))));
        assert!(matches!(adapter.next(), Some(Ok(_))));
        assert!(adapter.next().is_none());
    }
}