vortex_scan/
arrow.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use arrow_array::RecordBatch;
5use arrow_array::RecordBatchReader;
6use arrow_array::cast::AsArray;
7use arrow_schema::ArrowError;
8use arrow_schema::DataType;
9use arrow_schema::SchemaRef;
10use futures::Stream;
11use futures::TryStreamExt;
12use vortex_array::ArrayRef;
13use vortex_array::arrow::ArrowArrayExecutor;
14use vortex_array::arrow::IntoArrowArray;
15use vortex_error::VortexResult;
16use vortex_io::runtime::BlockingRuntime;
17use vortex_layout::layouts::USE_VORTEX_OPERATORS;
18use vortex_session::VortexSession;
19
20use crate::ScanBuilder;
21
22impl ScanBuilder<ArrayRef> {
23    /// Creates a new `RecordBatchReader` from the scan builder.
24    ///
25    /// The `schema` parameter is used to define the schema of the resulting record batches. In
26    /// general, it is not possible to exactly infer an Arrow schema from a Vortex
27    /// [`vortex_dtype::DType`], therefore it is required to be provided explicitly.
28    pub fn into_record_batch_reader<B: BlockingRuntime>(
29        self,
30        schema: SchemaRef,
31        runtime: &B,
32    ) -> VortexResult<impl RecordBatchReader + 'static> {
33        let data_type = DataType::Struct(schema.fields().clone());
34        let session = self.session().clone();
35
36        let iter = self
37            .map(move |chunk| to_record_batch(chunk, &data_type, &session))
38            .into_iter(runtime)?
39            .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
40
41        Ok(RecordBatchIteratorAdapter { iter, schema })
42    }
43
44    pub fn into_record_batch_stream(
45        self,
46        schema: SchemaRef,
47    ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
48        let data_type = DataType::Struct(schema.fields().clone());
49        let session = self.session().clone();
50
51        let stream = self
52            .map(move |chunk| to_record_batch(chunk, &data_type, &session))
53            .into_stream()?
54            .map_err(|e| ArrowError::ExternalError(Box::new(e)));
55
56        Ok(stream)
57    }
58}
59
60fn to_record_batch(
61    chunk: ArrayRef,
62    data_type: &DataType,
63    session: &VortexSession,
64) -> VortexResult<RecordBatch> {
65    if *USE_VORTEX_OPERATORS {
66        let arrow = chunk.execute_arrow(data_type, session)?;
67        Ok(RecordBatch::from(arrow.as_struct().clone()))
68    } else {
69        let arrow = chunk.into_arrow(data_type)?;
70        Ok(RecordBatch::from(arrow.as_struct().clone()))
71    }
72}
73
74/// We create an adapter for record batch iterators that supports clone.
75/// This allows us to create thread-safe [`arrow_array::RecordBatchIterator`].
76#[derive(Clone)]
77pub struct RecordBatchIteratorAdapter<I> {
78    iter: I,
79    schema: SchemaRef,
80}
81
82impl<I> RecordBatchIteratorAdapter<I> {
83    /// Creates a new `RecordBatchIteratorAdapter`.
84    pub fn new(iter: I, schema: SchemaRef) -> Self {
85        Self { iter, schema }
86    }
87}
88
89impl<I> Iterator for RecordBatchIteratorAdapter<I>
90where
91    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
92{
93    type Item = Result<RecordBatch, ArrowError>;
94
95    #[inline]
96    fn next(&mut self) -> Option<Self::Item> {
97        self.iter.next()
98    }
99}
100
101impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
102where
103    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
104{
105    #[inline]
106    fn schema(&self) -> SchemaRef {
107        self.schema.clone()
108    }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Array;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::Int32Array;
    use arrow_array::RecordBatch;
    use arrow_array::StringArray;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::ArrowError;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;
    use crate::test::SESSION;

    /// Arrow schema shared by all fixtures: nullable `id: Int32` and `name: Utf8` columns.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a four-row Vortex struct array matching [`create_arrow_schema`].
    ///
    /// Each column contains one null. The array is produced by converting an Arrow
    /// `StructArray`.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        // Create Arrow arrays
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        // Reuse the shared schema so the fixture and the expected schema cannot drift apart.
        let schema = create_arrow_schema();

        let struct_array = StructArray::new(
            schema.fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        // Convert to Vortex
        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let data_type = DataType::Struct(schema.fields().clone());

        let batch = to_record_batch(vortex_array, &data_type, &SESSION)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // Check id column
        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        // Check name column
        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrowArrayRef,
            ],
        )?;
        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![None, Some(4)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![Some("Charlie"), None])) as ArrowArrayRef,
            ],
        )?;

        let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter {
            iter,
            schema: schema.clone(),
        };

        // Test RecordBatchReader trait
        assert_eq!(adapter.schema(), schema);

        // Test Iterator trait
        let first = adapter.next().unwrap()?;
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().unwrap()?;
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());

        Ok(())
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Err(error)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter {
            iter,
            schema: schema.clone(),
        };

        // The schema is still reported even though the only item is an error.
        assert_eq!(adapter.schema(), schema);

        // The error is propagated unchanged, and the adapter is exhausted afterwards.
        let result = adapter.next().unwrap();
        assert!(matches!(
            result,
            Err(ArrowError::ComputeError(ref msg)) if msg == "test error"
        ));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![Some(1)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![Some("Test")])) as ArrowArrayRef,
            ],
        )
        .unwrap();

        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Ok(batch.clone()), Err(error), Ok(batch)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter { iter, schema };

        // First batch succeeds
        let first = adapter.next().unwrap();
        assert!(first.is_ok());

        // Second batch errors
        let second = adapter.next().unwrap();
        assert!(second.is_err());

        // Third batch succeeds: an error does not terminate the adapter.
        let third = adapter.next().unwrap();
        assert!(third.is_ok());

        // All three items have been consumed.
        assert!(adapter.next().is_none());
    }
}