1use arrow_array::cast::AsArray;
5use arrow_array::{RecordBatch, RecordBatchReader};
6use arrow_schema::{ArrowError, DataType, SchemaRef};
7use vortex_array::ArrayRef;
8use vortex_array::arrow::IntoArrowArray;
9use vortex_error::{VortexError, VortexResult};
10
11use crate::ScanBuilder;
12
13impl ScanBuilder<ArrayRef> {
14 pub fn into_record_batch_reader(
22 self,
23 schema: SchemaRef,
24 ) -> VortexResult<impl RecordBatchReader + Send + Clone + 'static> {
25 let data_type = DataType::Struct(schema.fields().clone());
26
27 let iter = self
28 .into_array_iter()?
29 .map(move |chunk| to_record_batch(chunk, &data_type));
30
31 Ok(RecordBatchIteratorAdapter { iter, schema })
32 }
33
34 #[cfg(feature = "tokio")]
37 pub fn into_record_batch_reader_multithread(
38 self,
39 schema: SchemaRef,
40 ) -> VortexResult<impl RecordBatchReader + Send + 'static> {
41 use arrow_array::RecordBatchIterator;
42
43 let data_type = DataType::Struct(schema.fields().clone());
44
45 let iter = self.into_iter_multithread(move |chunk| to_record_batch(chunk, &data_type))?;
46
47 Ok(RecordBatchIterator::new(iter, schema))
48 }
49}
50
51fn to_record_batch(
52 chunk: VortexResult<ArrayRef>,
53 data_type: &DataType,
54) -> Result<RecordBatch, ArrowError> {
55 chunk
56 .and_then(|array| {
57 let arrow = array.into_arrow(data_type)?;
58 Ok::<_, VortexError>(RecordBatch::from(arrow.as_struct().clone()))
59 })
60 .map_err(|e| ArrowError::ExternalError(Box::new(e)))
61}
62
63#[derive(Clone)]
66struct RecordBatchIteratorAdapter<I> {
67 iter: I,
68 schema: SchemaRef,
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75 type Item = Result<RecordBatch, ArrowError>;
76
77 fn next(&mut self) -> Option<Self::Item> {
78 self.iter.next()
79 }
80}
81
82impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
83where
84 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
85{
86 fn schema(&self) -> SchemaRef {
87 self.schema.clone()
88 }
89}