vortex_array/arrow/
iter.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright the Vortex contributors
3
4use std::sync::Arc;
5
6use arrow_array::cast::AsArray;
7use arrow_array::{RecordBatch, RecordBatchReader, ffi_stream};
8use arrow_schema::{ArrowError, DataType, SchemaRef};
9use vortex_dtype::DType;
10use vortex_dtype::arrow::FromArrowType;
11use vortex_error::{VortexError, VortexResult};
12
13use crate::ArrayRef;
14use crate::arrow::FromArrowArray;
15use crate::arrow::compute::to_arrow;
16use crate::iter::ArrayIterator;
17
/// An adapter that exposes an Arrow `ArrowArrayStreamReader` as a Vortex [`ArrayIterator`].
///
/// Iterating the adapter pulls record batches from the wrapped stream and converts each one
/// into an [`ArrayRef`].
pub struct ArrowArrayStreamAdapter {
    /// The wrapped Arrow stream reader that record batches are pulled from.
    stream: ffi_stream::ArrowArrayStreamReader,
    /// The dtype reported by [`ArrayIterator::dtype`]; supplied by the caller of `new`.
    dtype: DType,
}
23
24impl ArrowArrayStreamAdapter {
25    pub fn new(stream: ffi_stream::ArrowArrayStreamReader, dtype: DType) -> Self {
26        Self { stream, dtype }
27    }
28}
29
30impl ArrayIterator for ArrowArrayStreamAdapter {
31    fn dtype(&self) -> &DType {
32        &self.dtype
33    }
34}
35
36impl Iterator for ArrowArrayStreamAdapter {
37    type Item = VortexResult<ArrayRef>;
38
39    fn next(&mut self) -> Option<Self::Item> {
40        let batch = self.stream.next()?;
41
42        Some(batch.map_err(VortexError::from).map(|b| {
43            debug_assert_eq!(&self.dtype, &DType::from_arrow(b.schema()));
44            ArrayRef::from_arrow(b, false)
45        }))
46    }
47}
48
/// Adapter for converting an [`ArrayIterator`] into an Arrow [`RecordBatchReader`].
pub struct VortexRecordBatchReader<I> {
    /// The underlying source of Vortex arrays.
    iter: I,
    /// Arrow schema derived from the iterator's dtype in `try_new`; returned by `schema()`.
    arrow_schema: SchemaRef,
    /// Struct data type built from the schema's fields; the conversion target for each array.
    arrow_dtype: DataType,
}
55
56impl<I: ArrayIterator> VortexRecordBatchReader<I> {
57    pub fn try_new(iter: I) -> VortexResult<Self> {
58        let arrow_schema = Arc::new(iter.dtype().to_arrow_schema()?);
59        let arrow_dtype = DataType::Struct(arrow_schema.fields().clone());
60        Ok(VortexRecordBatchReader {
61            iter,
62            arrow_schema,
63            arrow_dtype,
64        })
65    }
66}
67
68impl<I: ArrayIterator> Iterator for VortexRecordBatchReader<I> {
69    type Item = Result<RecordBatch, ArrowError>;
70
71    fn next(&mut self) -> Option<Self::Item> {
72        self.iter.next().map(|result| {
73            result
74                .and_then(|array| to_arrow(&array, &self.arrow_dtype))
75                .map_err(|e| ArrowError::ExternalError(Box::new(e)))
76                .map(|array| RecordBatch::from(array.as_struct()))
77        })
78    }
79}
80
81impl<I: ArrayIterator> RecordBatchReader for VortexRecordBatchReader<I> {
82    fn schema(&self) -> SchemaRef {
83        self.arrow_schema.clone()
84    }
85}