1use std::sync::Arc;
5
6use arrow_array::RecordBatch;
7use arrow_array::RecordBatchReader;
8use arrow_array::cast::AsArray;
9use arrow_schema::ArrowError;
10use arrow_schema::Field;
11use arrow_schema::SchemaRef;
12use futures::Stream;
13use futures::TryStreamExt;
14use vortex_array::ArrayRef;
15use vortex_array::ExecutionCtx;
16use vortex_array::VortexSessionExecute;
17use vortex_array::arrow::ArrowSessionExt;
18use vortex_error::VortexResult;
19use vortex_io::runtime::BlockingRuntime;
20
21use crate::scan::scan_builder::ScanBuilder;
22
23impl ScanBuilder<ArrayRef> {
24 pub fn into_record_batch_reader<B: BlockingRuntime>(
30 self,
31 schema: SchemaRef,
32 runtime: &B,
33 ) -> VortexResult<impl RecordBatchReader + 'static> {
34 let struct_field = Field::new_struct("", schema.fields().clone(), false);
35 let session = self.session().clone();
36
37 let iter = self
38 .map(move |chunk| {
39 let mut ctx = session.create_execution_ctx();
40 to_record_batch(chunk, &struct_field, &mut ctx)
41 })
42 .into_iter(runtime)?
43 .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
44
45 Ok(RecordBatchIteratorAdapter { iter, schema })
46 }
47
48 pub fn into_record_batch_stream(
49 self,
50 schema: SchemaRef,
51 ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
52 let struct_field = Field::new_struct("", schema.fields().clone(), false);
53 let session = self.session().clone();
54
55 let stream = self
56 .map(move |chunk| {
57 let mut ctx = session.create_execution_ctx();
58 to_record_batch(chunk, &struct_field, &mut ctx)
59 })
60 .into_stream()?
61 .map_err(|e| ArrowError::ExternalError(Box::new(e)));
62
63 Ok(stream)
64 }
65}
66
67fn to_record_batch(
68 chunk: ArrayRef,
69 data_type: &Field,
70 ctx: &mut ExecutionCtx,
71) -> VortexResult<RecordBatch> {
72 let session = ctx.session().clone();
73 let arrow = session.arrow().execute_arrow(chunk, Some(data_type), ctx)?;
74 Ok(RecordBatch::from(arrow.as_struct().clone()))
75}
76
77#[derive(Clone)]
80pub struct RecordBatchIteratorAdapter<I> {
81 iter: I,
82 schema: SchemaRef,
83}
84
85impl<I> RecordBatchIteratorAdapter<I> {
86 pub fn new(iter: I, schema: SchemaRef) -> Self {
88 Self { iter, schema }
89 }
90}
91
92impl<I> Iterator for RecordBatchIteratorAdapter<I>
93where
94 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
95{
96 type Item = Result<RecordBatch, ArrowError>;
97
98 #[inline]
99 fn next(&mut self) -> Option<Self::Item> {
100 self.iter.next()
101 }
102}
103
104impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
105where
106 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
107{
108 #[inline]
109 fn schema(&self) -> SchemaRef {
110 Arc::clone(&self.schema)
111 }
112}
113
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Array;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::Int32Array;
    use arrow_array::RecordBatch;
    use arrow_array::StringArray;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::ArrowError;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;
    use crate::scan::test::SCAN_SESSION;

    /// Two nullable columns shared by every test: `id: Int32` and `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        let fields = vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ];
        Arc::new(Schema::new(fields))
    }

    /// Builds the `id` / `name` Arrow columns from plain vectors.
    fn create_columns(ids: Vec<Option<i32>>, names: Vec<Option<&str>>) -> Vec<ArrowArrayRef> {
        vec![
            Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
            Arc::new(StringArray::from(names)) as ArrowArrayRef,
        ]
    }

    /// Builds a record batch over `schema` from plain `id` / `name` vectors.
    fn create_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> Result<RecordBatch, ArrowError> {
        RecordBatch::try_new(Arc::clone(schema), create_columns(ids, names))
    }

    /// Four-row Vortex struct array matching [`create_arrow_schema`], with one null per column.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let columns = create_columns(
            vec![Some(1), Some(2), None, Some(4)],
            vec![Some("Alice"), Some("Bob"), Some("Charlie"), None],
        );
        let struct_array = StructArray::new(create_arrow_schema().fields().clone(), columns, None);

        ArrayRef::from_arrow(&struct_array, true)
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let root_field = Field::new_struct("", schema.fields().clone(), false);
        let mut ctx = SCAN_SESSION.create_execution_ctx();

        let batch = to_record_batch(create_test_struct_array()?, &root_field, &mut ctx)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        // Column 0: ids. Non-null rows are compared by value, null rows by validity.
        let ids = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        let expected_ids = [Some(1), Some(2), None, Some(4)];
        for (row, expected) in expected_ids.iter().enumerate() {
            match *expected {
                Some(value) => assert_eq!(ids.value(row), value),
                None => assert!(ids.is_null(row)),
            }
        }

        // Column 1: names, checked the same way.
        let names = batch.column(1).as_string::<i32>();
        let expected_names = [Some("Alice"), Some("Bob"), Some("Charlie"), None];
        for (row, expected) in expected_names.iter().enumerate() {
            match *expected {
                Some(value) => assert_eq!(names.value(row), value),
                None => assert!(names.is_null(row)),
            }
        }

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let batches: Vec<Result<RecordBatch, ArrowError>> = vec![
            Ok(create_batch(
                &schema,
                vec![Some(1), Some(2)],
                vec![Some("Alice"), Some("Bob")],
            )?),
            Ok(create_batch(
                &schema,
                vec![None, Some(4)],
                vec![Some("Charlie"), None],
            )?),
        ];

        let mut adapter = RecordBatchIteratorAdapter::new(batches.into_iter(), Arc::clone(&schema));

        assert_eq!(adapter.schema(), schema);

        // Both batches come through in order, each with two rows, and then the reader ends.
        for _ in 0..2 {
            let batch = adapter.next().unwrap()?;
            assert_eq!(batch.num_rows(), 2);
        }
        assert!(adapter.next().is_none());

        Ok(())
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let failing: Vec<Result<RecordBatch, ArrowError>> =
            vec![Err(ArrowError::ComputeError("test error".to_string()))];

        let mut adapter = RecordBatchIteratorAdapter::new(failing.into_iter(), Arc::clone(&schema));

        assert_eq!(adapter.schema(), schema);
        let outcome = adapter.next().unwrap();
        assert!(outcome.is_err());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = create_batch(&schema, vec![Some(1)], vec![Some("Test")]).unwrap();

        let items: Vec<Result<RecordBatch, ArrowError>> = vec![
            Ok(batch.clone()),
            Err(ArrowError::ComputeError("test error".to_string())),
            Ok(batch),
        ];
        let mut adapter = RecordBatchIteratorAdapter::new(items.into_iter(), schema);

        // An error in the middle must not stop the adapter from yielding later items.
        for expect_ok in [true, false, true] {
            let item = adapter.next().unwrap();
            assert_eq!(item.is_ok(), expect_ok);
        }
    }
}