vortex_scan/
arrow.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_array::cast::AsArray;
5use arrow_array::{RecordBatch, RecordBatchReader};
6use arrow_schema::{ArrowError, DataType, SchemaRef};
7use vortex_array::ArrayRef;
8use vortex_array::arrow::IntoArrowArray;
9use vortex_error::{VortexError, VortexResult};
10
11use crate::ScanBuilder;
12
13impl ScanBuilder<ArrayRef> {
14    /// Creates a new thread-safe `RecordBatchReader` from the scan builder.
15    ///
16    /// This reader can be cloned and passed to multiple threads for concurrent processing.
17    ///
18    /// The `schema` parameter is used to define the schema of the resulting record batches. In
19    /// general, it is not possible to exactly infer an Arrow schema from a Vortex
20    /// [`vortex_dtype::DType`], therefore it is required to be provided explicitly.
21    pub fn into_record_batch_reader(
22        self,
23        schema: SchemaRef,
24    ) -> VortexResult<impl RecordBatchReader + Send + Clone + 'static> {
25        let data_type = DataType::Struct(schema.fields().clone());
26
27        let iter = self
28            .into_array_iter()?
29            .map(move |chunk| to_record_batch(chunk, &data_type));
30
31        Ok(RecordBatchIteratorAdapter { iter, schema })
32    }
33
34    /// Creates a new `RecordBatchReader` from the scan builder that internally drives the scan
35    /// on multithreaded pool of workers.
36    #[cfg(feature = "tokio")]
37    pub fn into_record_batch_reader_multithread(
38        self,
39        schema: SchemaRef,
40    ) -> VortexResult<impl RecordBatchReader + Send + 'static> {
41        use arrow_array::RecordBatchIterator;
42
43        let data_type = DataType::Struct(schema.fields().clone());
44
45        let iter = self.into_iter_multithread(move |chunk| to_record_batch(chunk, &data_type))?;
46
47        Ok(RecordBatchIterator::new(iter, schema))
48    }
49}
50
51fn to_record_batch(
52    chunk: VortexResult<ArrayRef>,
53    data_type: &DataType,
54) -> Result<RecordBatch, ArrowError> {
55    chunk
56        .and_then(|array| {
57            let arrow = array.into_arrow(data_type)?;
58            Ok::<_, VortexError>(RecordBatch::from(arrow.as_struct().clone()))
59        })
60        .map_err(|e| ArrowError::ExternalError(Box::new(e)))
61}
62
63/// We create an adapter for record batch iterators that supports clone.
64/// This allows us to create thread-safe [`RecordBatchIterator`].
65#[derive(Clone)]
66struct RecordBatchIteratorAdapter<I> {
67    iter: I,
68    schema: SchemaRef,
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75    type Item = Result<RecordBatch, ArrowError>;
76
77    #[inline]
78    fn next(&mut self) -> Option<Self::Item> {
79        self.iter.next()
80    }
81}
82
83impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
84where
85    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
86{
87    #[inline]
88    fn schema(&self) -> SchemaRef {
89        self.schema.clone()
90    }
91}
92
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, Int32Array, RecordBatch, StringArray, StructArray,
    };
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::{VortexResult, vortex_err};

    use super::*;

    /// Schema shared by every fixture: nullable `id: Int32` and nullable `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds a four-row Arrow struct array over the fixture schema and converts it to Vortex.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let ids = Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)]))
            as ArrowArrayRef;
        let names = Arc::new(StringArray::from(vec![
            Some("Alice"),
            Some("Bob"),
            Some("Charlie"),
            None,
        ])) as ArrowArrayRef;

        let fields = create_arrow_schema().fields().clone();
        let arrow_struct = StructArray::new(fields, vec![ids, names], None);

        Ok(ArrayRef::from_arrow(&arrow_struct, true))
    }

    /// Builds a record batch over `schema` from the given `id` and `name` column values.
    fn batch_of(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> RecordBatch {
        let id_column = Arc::new(Int32Array::from(ids)) as ArrowArrayRef;
        let name_column = Arc::new(StringArray::from(names)) as ArrowArrayRef;
        RecordBatch::try_new(Arc::clone(schema), vec![id_column, name_column]).unwrap()
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let chunk = create_test_struct_array()?;
        let struct_type = DataType::Struct(create_arrow_schema().fields().clone());

        let converted = to_record_batch(Ok(chunk), &struct_type);
        assert!(converted.is_ok());

        let batch = converted.unwrap();
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // The id column keeps its values and its single null.
        let ids = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        let expected_ids = [Some(1), Some(2), None, Some(4)];
        for (row, expected) in expected_ids.iter().enumerate() {
            match *expected {
                Some(value) => assert_eq!(ids.value(row), value),
                None => assert!(ids.is_null(row)),
            }
        }

        // The name column keeps its values and its single null.
        let names = batch.column(1).as_string::<i32>();
        let expected_names = [Some("Alice"), Some("Bob"), Some("Charlie"), None];
        for (row, expected) in expected_names.iter().enumerate() {
            match *expected {
                Some(value) => assert_eq!(names.value(row), value),
                None => assert!(names.is_null(row)),
            }
        }

        Ok(())
    }

    #[test]
    fn test_record_batch_conversion_error() {
        let failed_chunk = Err(vortex_err!("test error"));
        let struct_type = DataType::Struct(create_arrow_schema().fields().clone());

        // A failed chunk must surface as an external Arrow error.
        let outcome = to_record_batch(failed_chunk, &struct_type);
        assert!(outcome.is_err());
        assert!(matches!(outcome, Err(ArrowError::ExternalError(_))));
    }

    #[test]
    fn test_record_batch_iterator_adapter() {
        let schema = create_arrow_schema();
        let items: Vec<Result<RecordBatch, ArrowError>> = vec![
            Ok(batch_of(
                &schema,
                vec![Some(1), Some(2)],
                vec![Some("Alice"), Some("Bob")],
            )),
            Ok(batch_of(
                &schema,
                vec![None, Some(4)],
                vec![Some("Charlie"), None],
            )),
        ];

        let mut adapter = RecordBatchIteratorAdapter {
            iter: items.into_iter(),
            schema: schema.clone(),
        };

        // RecordBatchReader: the schema handed in is the schema reported.
        assert_eq!(adapter.schema(), schema);

        // Iterator: both batches come out in order, then the adapter is exhausted.
        for _ in 0..2 {
            let batch = adapter.next().unwrap().unwrap();
            assert_eq!(batch.num_rows(), 2);
        }
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let items: Vec<Result<RecordBatch, ArrowError>> =
            vec![Err(ArrowError::ComputeError("test error".to_string()))];

        let mut adapter = RecordBatchIteratorAdapter {
            iter: items.into_iter(),
            schema: schema.clone(),
        };

        // The error item is passed through untouched.
        assert_eq!(adapter.schema(), schema);
        let item = adapter.next().unwrap();
        assert!(item.is_err());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = batch_of(&schema, vec![Some(1)], vec![Some("Test")]);
        let failure = ArrowError::ComputeError("test error".to_string());

        let items: Vec<Result<RecordBatch, ArrowError>> =
            vec![Ok(batch.clone()), Err(failure), Ok(batch)];
        let mut adapter = RecordBatchIteratorAdapter {
            iter: items.into_iter(),
            schema,
        };

        // An error in the middle does not stop iteration: ok, err, ok.
        for expect_ok in [true, false, true] {
            let item = adapter.next().unwrap();
            assert_eq!(item.is_ok(), expect_ok);
        }
    }
}