1use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_array::RecordBatchReader;
8use arrow_array::cast::AsArray;
9use arrow_schema::ArrowError;
10use arrow_schema::DataType;
11use arrow_schema::SchemaRef;
12use futures::Stream;
13use futures::TryStreamExt;
14use vortex_array::ArrayRef;
15use vortex_array::ExecutionCtx;
16use vortex_array::VortexSessionExecute;
17use vortex_array::arrow::ArrowArrayExecutor;
18use vortex_error::VortexResult;
19use vortex_io::runtime::BlockingRuntime;
20
21use crate::scan::scan_builder::ScanBuilder;
22
23impl ScanBuilder<ArrayRef> {
24 pub fn into_record_batch_reader<B: BlockingRuntime>(
30 self,
31 schema: SchemaRef,
32 runtime: &B,
33 ) -> VortexResult<impl RecordBatchReader + 'static> {
34 let data_type = DataType::Struct(schema.fields().clone());
35 let session = self.session().clone();
36
37 let iter = self
38 .map(move |chunk| {
39 let mut ctx = session.create_execution_ctx();
40 to_record_batch(chunk, &data_type, &mut ctx)
41 })
42 .into_iter(runtime)?
43 .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
44
45 Ok(RecordBatchIteratorAdapter { iter, schema })
46 }
47
48 pub fn into_record_batch_stream(
49 self,
50 schema: SchemaRef,
51 ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
52 let data_type = DataType::Struct(schema.fields().clone());
53 let session = self.session().clone();
54
55 let stream = self
56 .map(move |chunk| {
57 let mut ctx = session.create_execution_ctx();
58 to_record_batch(chunk, &data_type, &mut ctx)
59 })
60 .into_stream()?
61 .map_err(|e| ArrowError::ExternalError(Box::new(e)));
62
63 Ok(stream)
64 }
65}
66
67fn to_record_batch(
68 chunk: ArrayRef,
69 data_type: &DataType,
70 ctx: &mut ExecutionCtx,
71) -> VortexResult<RecordBatch> {
72 let arrow = chunk.execute_arrow(Some(data_type), ctx)?;
73 Ok(RecordBatch::from(arrow.as_struct().clone()))
74}
75
76#[derive(Clone)]
79pub struct RecordBatchIteratorAdapter<I> {
80 iter: I,
81 schema: SchemaRef,
82}
83
84impl<I> RecordBatchIteratorAdapter<I> {
85 pub fn new(iter: I, schema: SchemaRef) -> Self {
87 Self { iter, schema }
88 }
89}
90
91impl<I> Iterator for RecordBatchIteratorAdapter<I>
92where
93 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
94{
95 type Item = Result<RecordBatch, ArrowError>;
96
97 #[inline]
98 fn next(&mut self) -> Option<Self::Item> {
99 self.iter.next()
100 }
101}
102
103impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
104where
105 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
106{
107 #[inline]
108 fn schema(&self) -> SchemaRef {
109 Arc::clone(&self.schema)
110 }
111}
112
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Array;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::Int32Array;
    use arrow_array::RecordBatch;
    use arrow_array::StringArray;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_array::types::Int32Type;
    use arrow_schema::ArrowError;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;
    use crate::scan::test::SCAN_SESSION;

    /// Schema shared by every test: nullable `id: Int32` and nullable `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a two-column batch over `schema` from optional `ids` and `names`.
    fn make_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> Result<RecordBatch, ArrowError> {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
    }

    /// Four-row Vortex struct array over `create_arrow_schema()`.
    /// `id` is null at row 2 and `name` is null at row 3.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let columns = vec![
            Arc::new(Int32Array::from(vec![Some(1), Some(2), None, Some(4)])) as ArrowArrayRef,
            Arc::new(StringArray::from(vec![
                Some("Alice"),
                Some("Bob"),
                Some("Charlie"),
                None,
            ])) as ArrowArrayRef,
        ];
        let arrow_struct = StructArray::new(create_arrow_schema().fields().clone(), columns, None);
        ArrayRef::from_arrow(&arrow_struct, true)
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let struct_type = DataType::Struct(schema.fields().clone());
        let mut ctx = SCAN_SESSION.create_execution_ctx();

        let batch = to_record_batch(create_test_struct_array()?, &struct_type, &mut ctx)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let ids = batch.column(0).as_primitive::<Int32Type>();
        for (row, expected) in [(0, 1), (1, 2), (3, 4)] {
            assert_eq!(ids.value(row), expected);
        }
        assert!(ids.is_null(2));

        let names = batch.column(1).as_string::<i32>();
        for (row, expected) in ["Alice", "Bob", "Charlie"].into_iter().enumerate() {
            assert_eq!(names.value(row), expected);
        }
        assert!(names.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let batches = vec![
            Ok(make_batch(
                &schema,
                vec![Some(1), Some(2)],
                vec![Some("Alice"), Some("Bob")],
            )?),
            Ok(make_batch(
                &schema,
                vec![None, Some(4)],
                vec![Some("Charlie"), None],
            )?),
        ];
        let mut adapter = RecordBatchIteratorAdapter {
            iter: batches.into_iter(),
            schema: Arc::clone(&schema),
        };

        assert_eq!(adapter.schema(), schema);

        for _ in 0..2 {
            assert_eq!(adapter.next().unwrap()?.num_rows(), 2);
        }
        assert!(adapter.next().is_none());

        Ok(())
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let mut adapter = RecordBatchIteratorAdapter {
            iter: std::iter::once(Err(ArrowError::ComputeError("test error".to_string()))),
            schema: Arc::clone(&schema),
        };

        assert_eq!(adapter.schema(), schema);
        assert!(adapter.next().unwrap().is_err());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = make_batch(&schema, vec![Some(1)], vec![Some("Test")]).unwrap();
        let error = ArrowError::ComputeError("test error".to_string());

        let mut adapter = RecordBatchIteratorAdapter {
            iter: [Ok(batch.clone()), Err(error), Ok(batch)].into_iter(),
            schema,
        };

        let outcomes: Vec<bool> = (0..3).map(|_| adapter.next().unwrap().is_ok()).collect();
        assert_eq!(outcomes, [true, false, true]);
    }
}