use arrow_array::cast::AsArray;
use arrow_array::{Array, RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, SchemaRef};
use futures::{Stream, TryStreamExt};
use vortex_array::ArrayRef;
use vortex_array::arrow::IntoArrowArray;
use vortex_error::{VortexResult, vortex_bail};
use vortex_io::runtime::BlockingRuntime;
12
13use crate::ScanBuilder;
14
15impl ScanBuilder<ArrayRef> {
16 pub fn into_record_batch_reader<B: BlockingRuntime>(
22 self,
23 schema: SchemaRef,
24 runtime: &B,
25 ) -> VortexResult<impl RecordBatchReader + 'static> {
26 let data_type = DataType::Struct(schema.fields().clone());
27
28 let iter = self
29 .map(move |chunk| to_record_batch(chunk, &data_type))
30 .into_iter(runtime)?
31 .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
32
33 Ok(RecordBatchIteratorAdapter { iter, schema })
34 }
35
36 pub fn into_record_batch_stream(
37 self,
38 schema: SchemaRef,
39 ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
40 let data_type = DataType::Struct(schema.fields().clone());
41
42 let stream = self
43 .map(move |chunk| to_record_batch(chunk, &data_type))
44 .into_stream()?
45 .map_err(|e| ArrowError::ExternalError(Box::new(e)));
46
47 Ok(stream)
48 }
49}
50
51fn to_record_batch(chunk: ArrayRef, data_type: &DataType) -> VortexResult<RecordBatch> {
52 let arrow = chunk.into_arrow(data_type)?;
53 Ok(RecordBatch::from(arrow.as_struct().clone()))
54}
55
56#[derive(Clone)]
59pub struct RecordBatchIteratorAdapter<I> {
60 iter: I,
61 schema: SchemaRef,
62}
63
64impl<I> RecordBatchIteratorAdapter<I> {
65 pub fn new(iter: I, schema: SchemaRef) -> Self {
67 Self { iter, schema }
68 }
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75 type Item = Result<RecordBatch, ArrowError>;
76
77 #[inline]
78 fn next(&mut self) -> Option<Self::Item> {
79 self.iter.next()
80 }
81}
82
83impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
84where
85 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
86{
87 #[inline]
88 fn schema(&self) -> SchemaRef {
89 self.schema.clone()
90 }
91}
92
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, Int32Array, RecordBatch, StringArray, StructArray,
    };
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;

    /// Schema shared by all tests: nullable `id: Int32` and nullable `name: Utf8` columns.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a 4-row Vortex struct array with the fields of [`create_arrow_schema`] and one
    /// null in each column (no top-level nulls).
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    /// Builds a batch over [`create_arrow_schema`] from the given column values.
    fn make_batch(ids: Vec<Option<i32>>, names: Vec<Option<&str>>) -> RecordBatch {
        RecordBatch::try_new(
            create_arrow_schema(),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
        .expect("test columns match the test schema")
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let data_type = DataType::Struct(schema.fields().clone());

        // `?` surfaces the actual conversion error if this ever fails.
        let batch = to_record_batch(vortex_array, &data_type)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() {
        let schema = create_arrow_schema();
        let batch1 = make_batch(vec![Some(1), Some(2)], vec![Some("Alice"), Some("Bob")]);
        let batch2 = make_batch(vec![None, Some(4)], vec![Some("Charlie"), None]);

        let iter = [Ok::<_, ArrowError>(batch1), Ok(batch2)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter::new(iter, schema.clone());

        assert_eq!(adapter.schema(), schema);

        let first = adapter.next().unwrap().unwrap();
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().unwrap().unwrap();
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = [Err::<RecordBatch, _>(error)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter::new(iter, schema.clone());

        assert_eq!(adapter.schema(), schema);
        // The original error must come through unchanged, not merely "some error".
        assert!(matches!(
            adapter.next(),
            Some(Err(ArrowError::ComputeError(msg))) if msg == "test error"
        ));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = make_batch(vec![Some(1)], vec![Some("Test")]);
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = [Ok(batch.clone()), Err(error), Ok(batch)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter::new(iter, schema);

        // An error item must not end iteration; later batches are still delivered.
        assert!(adapter.next().unwrap().is_ok());
        assert!(adapter.next().unwrap().is_err());
        assert!(adapter.next().unwrap().is_ok());
        assert!(adapter.next().is_none());
    }
}