1use arrow_array::RecordBatch;
5use arrow_array::RecordBatchReader;
6use arrow_array::cast::AsArray;
7use arrow_schema::ArrowError;
8use arrow_schema::DataType;
9use arrow_schema::SchemaRef;
10use futures::Stream;
11use futures::TryStreamExt;
12use vortex_array::ArrayRef;
13use vortex_array::arrow::ArrowArrayExecutor;
14use vortex_array::arrow::IntoArrowArray;
15use vortex_error::VortexResult;
16use vortex_io::runtime::BlockingRuntime;
17use vortex_layout::layouts::USE_VORTEX_OPERATORS;
18use vortex_session::VortexSession;
19
20use crate::ScanBuilder;
21
22impl ScanBuilder<ArrayRef> {
23 pub fn into_record_batch_reader<B: BlockingRuntime>(
29 self,
30 schema: SchemaRef,
31 runtime: &B,
32 ) -> VortexResult<impl RecordBatchReader + 'static> {
33 let data_type = DataType::Struct(schema.fields().clone());
34 let session = self.session().clone();
35
36 let iter = self
37 .map(move |chunk| to_record_batch(chunk, &data_type, &session))
38 .into_iter(runtime)?
39 .map(|result| result.map_err(|e| ArrowError::ExternalError(Box::new(e))));
40
41 Ok(RecordBatchIteratorAdapter { iter, schema })
42 }
43
44 pub fn into_record_batch_stream(
45 self,
46 schema: SchemaRef,
47 ) -> VortexResult<impl Stream<Item = Result<RecordBatch, ArrowError>> + Send + 'static> {
48 let data_type = DataType::Struct(schema.fields().clone());
49 let session = self.session().clone();
50
51 let stream = self
52 .map(move |chunk| to_record_batch(chunk, &data_type, &session))
53 .into_stream()?
54 .map_err(|e| ArrowError::ExternalError(Box::new(e)));
55
56 Ok(stream)
57 }
58}
59
60fn to_record_batch(
61 chunk: ArrayRef,
62 data_type: &DataType,
63 session: &VortexSession,
64) -> VortexResult<RecordBatch> {
65 if *USE_VORTEX_OPERATORS {
66 let arrow = chunk.execute_arrow(data_type, session)?;
67 Ok(RecordBatch::from(arrow.as_struct().clone()))
68 } else {
69 let arrow = chunk.into_arrow(data_type)?;
70 Ok(RecordBatch::from(arrow.as_struct().clone()))
71 }
72}
73
74#[derive(Clone)]
77pub struct RecordBatchIteratorAdapter<I> {
78 iter: I,
79 schema: SchemaRef,
80}
81
82impl<I> RecordBatchIteratorAdapter<I> {
83 pub fn new(iter: I, schema: SchemaRef) -> Self {
85 Self { iter, schema }
86 }
87}
88
89impl<I> Iterator for RecordBatchIteratorAdapter<I>
90where
91 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
92{
93 type Item = Result<RecordBatch, ArrowError>;
94
95 #[inline]
96 fn next(&mut self) -> Option<Self::Item> {
97 self.iter.next()
98 }
99}
100
101impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
102where
103 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
104{
105 #[inline]
106 fn schema(&self) -> SchemaRef {
107 self.schema.clone()
108 }
109}
110
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::Array;
    use arrow_array::ArrayRef as ArrowArrayRef;
    use arrow_array::Int32Array;
    use arrow_array::RecordBatch;
    use arrow_array::StringArray;
    use arrow_array::StructArray;
    use arrow_array::cast::AsArray;
    use arrow_schema::ArrowError;
    use arrow_schema::DataType;
    use arrow_schema::Field;
    use arrow_schema::Schema;
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::VortexResult;

    use super::*;
    use crate::test::SESSION;

    /// Schema shared by every test: a nullable `id: Int32` and a nullable `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a four-row Vortex struct array with the fields of `create_arrow_schema`;
    /// row 2 has a null `id` and row 3 has a null `name`.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        // Reuse the shared schema so the struct's fields cannot drift from the test schema.
        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let data_type = DataType::Struct(schema.fields().clone());

        let batch = to_record_batch(vortex_array, &data_type, &SESSION)?;
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_iterator_adapter() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let batch1 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![Some(1), Some(2)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![Some("Alice"), Some("Bob")])) as ArrowArrayRef,
            ],
        )?;
        let batch2 = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![None, Some(4)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![Some("Charlie"), None])) as ArrowArrayRef,
            ],
        )?;

        // Go through the public constructor so it is covered by the tests.
        let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter::new(iter, schema.clone());

        assert_eq!(adapter.schema(), schema);

        let first = adapter.next().unwrap()?;
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().unwrap()?;
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());

        Ok(())
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Err(error)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter::new(iter, schema.clone());

        assert_eq!(adapter.schema(), schema);

        // The error must come through unchanged, not merely "some" error.
        let result = adapter.next().unwrap();
        assert!(matches!(
            &result,
            Err(ArrowError::ComputeError(msg)) if msg.as_str() == "test error"
        ));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() -> VortexResult<()> {
        let schema = create_arrow_schema();
        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![Some(1)])) as ArrowArrayRef,
                Arc::new(StringArray::from(vec![Some("Test")])) as ArrowArrayRef,
            ],
        )?;

        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Ok(batch.clone()), Err(error), Ok(batch)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter::new(iter, schema);

        // Errors are forwarded in place; items after an error are still yielded.
        assert_eq!(adapter.next().unwrap()?.num_rows(), 1);
        assert!(matches!(
            adapter.next().unwrap(),
            Err(ArrowError::ComputeError(_))
        ));
        assert_eq!(adapter.next().unwrap()?.num_rows(), 1);
        assert!(adapter.next().is_none());

        Ok(())
    }
}