vortex_scan/
arrow.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use futures::{Stream, TryStreamExt};
use vortex_array::ArrayRef;
use vortex_array::arrow::IntoArrowArray;
use vortex_error::{VortexResult, vortex_bail, vortex_err};
use vortex_io::runtime::BlockingRuntime;

use crate::ScanBuilder;
14
15impl ScanBuilder<ArrayRef> {
16    /// Creates a new `RecordBatchReader` from the scan builder.
17    ///
18    /// The `schema` parameter is used to define the schema of the resulting record batches. In
19    /// general, it is not possible to exactly infer an Arrow schema from a Vortex
20    /// [`vortex_dtype::DType`], therefore it is required to be provided explicitly.
21    pub fn into_record_batch_reader<B: BlockingRuntime>(
22        self,
23        schema: SchemaRef,
24        runtime: &B,
25    ) -> VortexResult<impl RecordBatchReader + 'static> {
26        let data_type = DataType::Struct(schema.fields().clone());
27
28        let iter = self
29            .map(move |chunk| to_record_batch(chunk, &data_type))
30            .into_iter(runtime)?
31            .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
32
33        Ok(RecordBatchIteratorAdapter { iter, schema })
34    }
35
36    pub fn into_record_batch_stream(
37        self,
38        schema: SchemaRef,
39    ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
40        let data_type = DataType::Struct(schema.fields().clone());
41
42        let stream = self
43            .map(move |chunk| to_record_batch(chunk, &data_type))
44            .into_stream()?
45            .map_err(|e| ArrowError::ExternalError(Box::new(e)));
46
47        Ok(stream)
48    }
49}
50
51fn to_record_batch(chunk: ArrayRef, data_type: &DataType) -> VortexResult<RecordBatch> {
52    let arrow = chunk.into_arrow(data_type)?;
53    Ok(RecordBatch::from(arrow.as_struct().clone()))
54}
55
56/// We create an adapter for record batch iterators that supports clone.
57/// This allows us to create thread-safe [`arrow_array::RecordBatchIterator`].
58#[derive(Clone)]
59pub struct RecordBatchIteratorAdapter<I> {
60    iter: I,
61    schema: SchemaRef,
62}
63
64impl<I> RecordBatchIteratorAdapter<I> {
65    /// Creates a new `RecordBatchIteratorAdapter`.
66    pub fn new(iter: I, schema: SchemaRef) -> Self {
67        Self { iter, schema }
68    }
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75    type Item = Result<RecordBatch, ArrowError>;
76
77    #[inline]
78    fn next(&mut self) -> Option<Self::Item> {
79        self.iter.next()
80    }
81}
82
83impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
84where
85    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
86{
87    #[inline]
88    fn schema(&self) -> SchemaRef {
89        self.schema.clone()
90    }
91}
92
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, Int32Array, RecordBatch, StringArray, StructArray,
    };
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;

    /// Returns the schema shared by every test: a nullable `id: Int32` column and a nullable
    /// `name: Utf8` column.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a four-row Vortex struct array matching [`create_arrow_schema`].
    ///
    /// Each column contains exactly one null.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        // Reuse the shared schema so the test data and the expected schema cannot drift apart.
        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    /// Builds a record batch conforming to [`create_arrow_schema`] from the given column values.
    fn make_batch(schema: &Arc<Schema>, ids: Vec<Option<i32>>, names: Vec<Option<&str>>) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
        .expect("columns match the test schema")
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let data_type = DataType::Struct(schema.fields().clone());

        let batch = to_record_batch(vortex_array, &data_type)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // Check id column
        let id_col = batch.column(0).as_primitive::<Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        // Check name column
        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() {
        let schema = create_arrow_schema();
        let batch1 = make_batch(&schema, vec![Some(1), Some(2)], vec![Some("Alice"), Some("Bob")]);
        let batch2 = make_batch(&schema, vec![None, Some(4)], vec![Some("Charlie"), None]);

        let mut adapter = RecordBatchIteratorAdapter::new(
            vec![Ok(batch1), Ok(batch2)].into_iter(),
            schema.clone(),
        );

        // RecordBatchReader trait
        assert_eq!(adapter.schema(), schema);

        // Iterator trait
        let first = adapter.next().expect("first item").expect("first item is Ok");
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().expect("second item").expect("second item is Ok");
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let items: Vec<Result<RecordBatch, ArrowError>> =
            vec![Err(ArrowError::ComputeError("test error".to_string()))];

        let mut adapter = RecordBatchIteratorAdapter::new(items.into_iter(), schema.clone());

        // The schema is still reported, and the error is propagated unchanged.
        assert_eq!(adapter.schema(), schema);
        let result = adapter.next().expect("one item");
        assert!(matches!(result, Err(ArrowError::ComputeError(msg)) if msg == "test error"));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = make_batch(&schema, vec![Some(1)], vec![Some("Test")]);
        let error = ArrowError::ComputeError("test error".to_string());

        let mut adapter = RecordBatchIteratorAdapter::new(
            vec![Ok(batch.clone()), Err(error), Ok(batch)].into_iter(),
            schema,
        );

        // An error in the middle does not end iteration: every item is forwarded in order.
        assert!(adapter.next().expect("first item").is_ok());
        assert!(adapter.next().expect("second item").is_err());
        assert!(adapter.next().expect("third item").is_ok());
        assert!(adapter.next().is_none());
    }
}