1use arrow_array::RecordBatch;
5use arrow_array::RecordBatchReader;
6use arrow_array::cast::AsArray;
7use arrow_schema::ArrowError;
8use arrow_schema::DataType;
9use arrow_schema::SchemaRef;
10use futures::Stream;
11use futures::TryStreamExt;
12use vortex_array::ArrayRef;
13use vortex_array::ExecutionCtx;
14use vortex_array::VortexSessionExecute;
15use vortex_array::arrow::ArrowArrayExecutor;
16use vortex_error::VortexResult;
17use vortex_io::runtime::BlockingRuntime;
18
19use crate::ScanBuilder;
20
21impl ScanBuilder<ArrayRef> {
22 pub fn into_record_batch_reader<B: BlockingRuntime>(
28 self,
29 schema: SchemaRef,
30 runtime: &B,
31 ) -> VortexResult<impl RecordBatchReader + 'static> {
32 let data_type = DataType::Struct(schema.fields().clone());
33 let session = self.session().clone();
34
35 let iter = self
36 .map(move |chunk| {
37 let mut ctx = session.create_execution_ctx();
38 to_record_batch(chunk, &data_type, &mut ctx)
39 })
40 .into_iter(runtime)?
41 .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
42
43 Ok(RecordBatchIteratorAdapter { iter, schema })
44 }
45
46 pub fn into_record_batch_stream(
47 self,
48 schema: SchemaRef,
49 ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
50 let data_type = DataType::Struct(schema.fields().clone());
51 let session = self.session().clone();
52
53 let stream = self
54 .map(move |chunk| {
55 let mut ctx = session.create_execution_ctx();
56 to_record_batch(chunk, &data_type, &mut ctx)
57 })
58 .into_stream()?
59 .map_err(|e| ArrowError::ExternalError(Box::new(e)));
60
61 Ok(stream)
62 }
63}
64
65fn to_record_batch(
66 chunk: ArrayRef,
67 data_type: &DataType,
68 ctx: &mut ExecutionCtx,
69) -> VortexResult<RecordBatch> {
70 let arrow = chunk.execute_arrow(Some(data_type), ctx)?;
71 Ok(RecordBatch::from(arrow.as_struct().clone()))
72}
73
74#[derive(Clone)]
77pub struct RecordBatchIteratorAdapter<I> {
78 iter: I,
79 schema: SchemaRef,
80}
81
82impl<I> RecordBatchIteratorAdapter<I> {
83 pub fn new(iter: I, schema: SchemaRef) -> Self {
85 Self { iter, schema }
86 }
87}
88
89impl<I> Iterator for RecordBatchIteratorAdapter<I>
90where
91 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
92{
93 type Item = Result<RecordBatch, ArrowError>;
94
95 #[inline]
96 fn next(&mut self) -> Option<Self::Item> {
97 self.iter.next()
98 }
99}
100
101impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
102where
103 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
104{
105 #[inline]
106 fn schema(&self) -> SchemaRef {
107 self.schema.clone()
108 }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Array;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::Int32Array;
    use arrow_array::RecordBatch;
    use arrow_array::StringArray;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::ArrowError;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;
    use crate::test::SCAN_SESSION;

    /// Schema shared by every test: nullable `id: Int32` and nullable `name: Utf8` columns.
    fn create_arrow_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds the `id` and `name` Arrow columns matching `create_arrow_schema`.
    fn make_columns(ids: Vec<Option<i32>>, names: Vec<Option<&str>>) -> Vec<ArrowArrayRef> {
        let id_column: ArrowArrayRef = Arc::new(Int32Array::from(ids));
        let name_column: ArrowArrayRef = Arc::new(StringArray::from(names));
        vec![id_column, name_column]
    }

    /// Builds a `RecordBatch` over `schema` from the given column values.
    fn make_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> Result<RecordBatch, ArrowError> {
        RecordBatch::try_new(Arc::clone(schema), make_columns(ids, names))
    }

    /// Builds a four-row Vortex struct array with one null in each column.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let schema = create_arrow_schema();
        let columns = make_columns(
            vec![Some(1), Some(2), None, Some(4)],
            vec![Some("Alice"), Some("Bob"), Some("Charlie"), None],
        );
        let struct_array = StructArray::new(schema.fields().clone(), columns, None);
        ArrayRef::from_arrow(&struct_array, true)
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let struct_type = DataType::Struct(schema.fields().clone());
        let mut ctx = SCAN_SESSION.create_execution_ctx();

        let batch = to_record_batch(create_test_struct_array()?, &struct_type, &mut ctx)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let ids = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(ids.value(0), 1);
        assert_eq!(ids.value(1), 2);
        assert!(ids.is_null(2));
        assert_eq!(ids.value(3), 4);

        let names = batch.column(1).as_string::<i32>();
        for (row, expected) in ["Alice", "Bob", "Charlie"].into_iter().enumerate() {
            assert_eq!(names.value(row), expected);
        }
        assert!(names.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let head = make_batch(
            &schema,
            vec![Some(1), Some(2)],
            vec![Some("Alice"), Some("Bob")],
        )?;
        let tail = make_batch(&schema, vec![None, Some(4)], vec![Some("Charlie"), None])?;

        let items: Vec<Result<RecordBatch, ArrowError>> = vec![Ok(head), Ok(tail)];
        let mut adapter = RecordBatchIteratorAdapter::new(items.into_iter(), Arc::clone(&schema));

        assert_eq!(adapter.schema(), schema);

        for _ in 0..2 {
            let batch = adapter.next().unwrap()?;
            assert_eq!(batch.num_rows(), 2);
        }
        assert!(adapter.next().is_none());

        Ok(())
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let items: Vec<Result<RecordBatch, ArrowError>> =
            vec![Err(ArrowError::ComputeError("test error".to_string()))];
        let mut adapter = RecordBatchIteratorAdapter::new(items.into_iter(), Arc::clone(&schema));

        assert_eq!(adapter.schema(), schema);
        assert!(adapter.next().unwrap().is_err());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = make_batch(&schema, vec![Some(1)], vec![Some("Test")]).unwrap();

        let items: Vec<Result<RecordBatch, ArrowError>> = vec![
            Ok(batch.clone()),
            Err(ArrowError::ComputeError("test error".to_string())),
            Ok(batch),
        ];
        let mut adapter = RecordBatchIteratorAdapter::new(items.into_iter(), schema);

        // Errors are passed through in place; later successes are still yielded.
        assert!(adapter.next().unwrap().is_ok());
        assert!(adapter.next().unwrap().is_err());
        assert!(adapter.next().unwrap().is_ok());
    }
}