Skip to main content

vortex_layout/scan/
arrow.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_array::RecordBatchReader;
8use arrow_array::cast::AsArray;
9use arrow_schema::ArrowError;
10use arrow_schema::Field;
11use arrow_schema::SchemaRef;
12use futures::Stream;
13use futures::TryStreamExt;
14use vortex_array::ArrayRef;
15use vortex_array::ExecutionCtx;
16use vortex_array::VortexSessionExecute;
17use vortex_array::arrow::ArrowSessionExt;
18use vortex_error::VortexResult;
19use vortex_io::runtime::BlockingRuntime;
20
21use crate::scan::scan_builder::ScanBuilder;
22
23impl ScanBuilder<ArrayRef> {
24    /// Creates a new `RecordBatchReader` from the scan builder.
25    ///
26    /// The `schema` parameter is used to define the schema of the resulting record batches. In
27    /// general, it is not possible to exactly infer an Arrow schema from a Vortex
28    /// [`vortex_array::dtype::DType`], therefore it is required to be provided explicitly.
29    pub fn into_record_batch_reader<B: BlockingRuntime>(
30        self,
31        schema: SchemaRef,
32        runtime: &B,
33    ) -> VortexResult<impl RecordBatchReader + 'static> {
34        let struct_field = Field::new_struct("", schema.fields().clone(), false);
35        let session = self.session().clone();
36
37        let iter = self
38            .map(move |chunk| {
39                let mut ctx = session.create_execution_ctx();
40                to_record_batch(chunk, &struct_field, &mut ctx)
41            })
42            .into_iter(runtime)?
43            .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
44
45        Ok(RecordBatchIteratorAdapter { iter, schema })
46    }
47
48    pub fn into_record_batch_stream(
49        self,
50        schema: SchemaRef,
51    ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
52        let struct_field = Field::new_struct("", schema.fields().clone(), false);
53        let session = self.session().clone();
54
55        let stream = self
56            .map(move |chunk| {
57                let mut ctx = session.create_execution_ctx();
58                to_record_batch(chunk, &struct_field, &mut ctx)
59            })
60            .into_stream()?
61            .map_err(|e| ArrowError::ExternalError(Box::new(e)));
62
63        Ok(stream)
64    }
65}
66
67fn to_record_batch(
68    chunk: ArrayRef,
69    data_type: &Field,
70    ctx: &mut ExecutionCtx,
71) -> VortexResult<RecordBatch> {
72    let session = ctx.session().clone();
73    let arrow = session.arrow().execute_arrow(chunk, Some(data_type), ctx)?;
74    Ok(RecordBatch::from(arrow.as_struct().clone()))
75}
76
77/// We create an adapter for record batch iterators that supports clone.
78/// This allows us to create thread-safe [`arrow_array::RecordBatchIterator`].
79#[derive(Clone)]
80pub struct RecordBatchIteratorAdapter<I> {
81    iter: I,
82    schema: SchemaRef,
83}
84
85impl<I> RecordBatchIteratorAdapter<I> {
86    /// Creates a new `RecordBatchIteratorAdapter`.
87    pub fn new(iter: I, schema: SchemaRef) -> Self {
88        Self { iter, schema }
89    }
90}
91
92impl<I> Iterator for RecordBatchIteratorAdapter<I>
93where
94    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
95{
96    type Item = Result<RecordBatch, ArrowError>;
97
98    #[inline]
99    fn next(&mut self) -> Option<Self::Item> {
100        self.iter.next()
101    }
102}
103
104impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
105where
106    I: Iterator<Item = Result<RecordBatch, ArrowError>>,
107{
108    #[inline]
109    fn schema(&self) -> SchemaRef {
110        Arc::clone(&self.schema)
111    }
112}
113
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Array;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::Int32Array;
    use arrow_array::RecordBatch;
    use arrow_array::StringArray;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::ArrowError;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;
    use crate::scan::test::SCAN_SESSION;

    /// Arrow schema shared by every test: nullable `id: Int32` and nullable `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a four-row Arrow struct array laid out like [`create_arrow_schema`] and converts
    /// it to a Vortex array.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        // Reuse the shared schema so the struct layout cannot drift from what the tests expect.
        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        // Convert to Vortex
        ArrayRef::from_arrow(&struct_array, true)
    }

    /// Builds a record batch with the shared test schema from parallel `ids` / `names` columns.
    fn create_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> Result<RecordBatch, ArrowError> {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let struct_field = Field::new_struct("", schema.fields().clone(), false);
        let mut ctx = SCAN_SESSION.create_execution_ctx();

        let batch = to_record_batch(vortex_array, &struct_field, &mut ctx)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // Check id column
        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        // Check name column
        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let batch1 = create_batch(
            &schema,
            vec![Some(1), Some(2)],
            vec![Some("Alice"), Some("Bob")],
        )?;
        let batch2 = create_batch(&schema, vec![None, Some(4)], vec![Some("Charlie"), None])?;

        let results: Vec<Result<RecordBatch, ArrowError>> = vec![Ok(batch1), Ok(batch2)];
        // Go through the public constructor so that it is covered as well.
        let mut adapter = RecordBatchIteratorAdapter::new(results.into_iter(), Arc::clone(&schema));

        // Test RecordBatchReader trait
        assert_eq!(adapter.schema(), schema);

        // Test Iterator trait
        let first = adapter.next().unwrap()?;
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().unwrap()?;
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());

        Ok(())
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let error = ArrowError::ComputeError("test error".to_string());

        let results: Vec<Result<RecordBatch, ArrowError>> = vec![Err(error)];
        let mut adapter = RecordBatchIteratorAdapter::new(results.into_iter(), Arc::clone(&schema));

        assert_eq!(adapter.schema(), schema);

        // The error must come through unchanged, not merely as "some error".
        assert!(matches!(
            adapter.next(),
            Some(Err(ArrowError::ComputeError(msg))) if msg == "test error"
        ));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = create_batch(&schema, vec![Some(1)], vec![Some("Test")]).unwrap();

        let error = ArrowError::ComputeError("test error".to_string());

        let results: Vec<Result<RecordBatch, ArrowError>> =
            vec![Ok(batch.clone()), Err(error), Ok(batch)];
        let mut adapter = RecordBatchIteratorAdapter::new(results.into_iter(), schema);

        // First batch succeeds
        let first = adapter.next().unwrap();
        assert!(first.is_ok());

        // Second batch errors
        let second = adapter.next().unwrap();
        assert!(second.is_err());

        // Third batch succeeds: an error does not terminate the adapter
        let third = adapter.next().unwrap();
        assert!(third.is_ok());

        // Nothing is left after the three items
        assert!(adapter.next().is_none());
    }
}