vortex_array/arrow/
iter.rs1use std::sync::Arc;
5
6use arrow_array::cast::AsArray;
7use arrow_array::{RecordBatch, RecordBatchReader, ffi_stream};
8use arrow_schema::{ArrowError, DataType, SchemaRef};
9use vortex_dtype::DType;
10use vortex_dtype::arrow::FromArrowType;
11use vortex_error::{VortexError, VortexResult};
12
13use crate::ArrayRef;
14use crate::arrow::FromArrowArray;
15use crate::arrow::compute::to_arrow;
16use crate::iter::ArrayIterator;
17
18pub struct ArrowArrayStreamAdapter {
20 stream: ffi_stream::ArrowArrayStreamReader,
21 dtype: DType,
22}
23
24impl ArrowArrayStreamAdapter {
25 pub fn new(stream: ffi_stream::ArrowArrayStreamReader, dtype: DType) -> Self {
26 Self { stream, dtype }
27 }
28}
29
30impl ArrayIterator for ArrowArrayStreamAdapter {
31 fn dtype(&self) -> &DType {
32 &self.dtype
33 }
34}
35
36impl Iterator for ArrowArrayStreamAdapter {
37 type Item = VortexResult<ArrayRef>;
38
39 fn next(&mut self) -> Option<Self::Item> {
40 let batch = self.stream.next()?;
41
42 Some(batch.map_err(VortexError::from).map(|b| {
43 debug_assert_eq!(&self.dtype, &DType::from_arrow(b.schema()));
44 ArrayRef::from_arrow(b, false)
45 }))
46 }
47}
48
49pub struct VortexRecordBatchReader<I> {
51 iter: I,
52 arrow_schema: SchemaRef,
53 arrow_dtype: DataType,
54}
55
56impl<I: ArrayIterator> VortexRecordBatchReader<I> {
57 pub fn try_new(iter: I) -> VortexResult<Self> {
58 let arrow_schema = Arc::new(iter.dtype().to_arrow_schema()?);
59 let arrow_dtype = DataType::Struct(arrow_schema.fields().clone());
60 Ok(VortexRecordBatchReader {
61 iter,
62 arrow_schema,
63 arrow_dtype,
64 })
65 }
66}
67
68impl<I: ArrayIterator> Iterator for VortexRecordBatchReader<I> {
69 type Item = Result<RecordBatch, ArrowError>;
70
71 fn next(&mut self) -> Option<Self::Item> {
72 self.iter.next().map(|result| {
73 result
74 .and_then(|array| to_arrow(&array, &self.arrow_dtype))
75 .map_err(|e| ArrowError::ExternalError(Box::new(e)))
76 .map(|array| RecordBatch::from(array.as_struct()))
77 })
78 }
79}
80
81impl<I: ArrayIterator> RecordBatchReader for VortexRecordBatchReader<I> {
82 fn schema(&self) -> SchemaRef {
83 self.arrow_schema.clone()
84 }
85}