1use arrow_array::cast::AsArray;
5use arrow_array::{RecordBatch, RecordBatchReader};
6use arrow_schema::{ArrowError, DataType, SchemaRef};
7use vortex_array::ArrayRef;
8use vortex_array::arrow::IntoArrowArray;
9use vortex_error::{VortexError, VortexResult};
10
11use crate::ScanBuilder;
12
13impl ScanBuilder<ArrayRef> {
14 pub fn into_record_batch_reader(
22 self,
23 schema: SchemaRef,
24 ) -> VortexResult<impl RecordBatchReader + Send + Clone + 'static> {
25 let data_type = DataType::Struct(schema.fields().clone());
26
27 let iter = self
28 .into_array_iter()?
29 .map(move |chunk| to_record_batch(chunk, &data_type));
30
31 Ok(RecordBatchIteratorAdapter { iter, schema })
32 }
33
34 #[cfg(feature = "tokio")]
37 pub fn into_record_batch_reader_multithread(
38 self,
39 schema: SchemaRef,
40 ) -> VortexResult<impl RecordBatchReader + Send + 'static> {
41 use arrow_array::RecordBatchIterator;
42
43 let data_type = DataType::Struct(schema.fields().clone());
44
45 let iter = self.into_iter_multithread(move |chunk| to_record_batch(chunk, &data_type))?;
46
47 Ok(RecordBatchIterator::new(iter, schema))
48 }
49}
50
51fn to_record_batch(
52 chunk: VortexResult<ArrayRef>,
53 data_type: &DataType,
54) -> Result<RecordBatch, ArrowError> {
55 chunk
56 .and_then(|array| {
57 let arrow = array.into_arrow(data_type)?;
58 Ok::<_, VortexError>(RecordBatch::from(arrow.as_struct().clone()))
59 })
60 .map_err(|e| ArrowError::ExternalError(Box::new(e)))
61}
62
63#[derive(Clone)]
66struct RecordBatchIteratorAdapter<I> {
67 iter: I,
68 schema: SchemaRef,
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75 type Item = Result<RecordBatch, ArrowError>;
76
77 #[inline]
78 fn next(&mut self) -> Option<Self::Item> {
79 self.iter.next()
80 }
81}
82
83impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
84where
85 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
86{
87 #[inline]
88 fn schema(&self) -> SchemaRef {
89 self.schema.clone()
90 }
91}
92
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, Int32Array, RecordBatch, StringArray, StructArray,
    };
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::{VortexResult, vortex_err};

    use super::*;

    /// Schema shared by every test: nullable `id: Int32` and nullable `name: Utf8`.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a 4-row Vortex struct array with the fields of [`create_arrow_schema`].
    ///
    /// `id` is null at row 2 and `name` is null at row 3. The struct itself has
    /// no validity buffer.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    /// Builds an Arrow batch conforming to `schema` from `id` / `name` column values.
    fn create_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
        .expect("test columns must match the test schema")
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let data_type = DataType::Struct(schema.fields().clone());

        // `expect` (rather than `assert!(is_ok())`) reports the actual error on failure.
        let batch = to_record_batch(Ok(vortex_array), &data_type)
            .expect("a struct chunk should convert to a RecordBatch");
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_conversion_error() {
        let data_type = DataType::Struct(create_arrow_schema().fields().clone());

        let result = to_record_batch(Err(vortex_err!("test error")), &data_type);
        assert!(
            matches!(result, Err(ArrowError::ExternalError(_))),
            "expected ExternalError, got {result:?}"
        );
    }

    #[test]
    fn test_record_batch_iterator_adapter() {
        let schema = create_arrow_schema();
        let batch1 = create_batch(&schema, vec![Some(1), Some(2)], vec![Some("Alice"), Some("Bob")]);
        let batch2 = create_batch(&schema, vec![None, Some(4)], vec![Some("Charlie"), None]);

        let mut adapter = RecordBatchIteratorAdapter {
            iter: vec![Ok(batch1.clone()), Ok(batch2.clone())].into_iter(),
            schema: Arc::clone(&schema),
        };

        assert_eq!(adapter.schema(), schema);

        // Batches must come back unchanged and in order.
        let first = adapter.next().expect("first item").expect("first batch");
        assert_eq!(first, batch1);

        let second = adapter.next().expect("second item").expect("second batch");
        assert_eq!(second, batch2);

        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let mut adapter = RecordBatchIteratorAdapter {
            iter: vec![Err(ArrowError::ComputeError("test error".to_string()))].into_iter(),
            schema: Arc::clone(&schema),
        };

        assert_eq!(adapter.schema(), schema);

        // The error must be passed through as-is, not wrapped or swallowed.
        assert!(matches!(
            adapter.next(),
            Some(Err(ArrowError::ComputeError(ref msg))) if msg == "test error"
        ));
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = create_batch(&schema, vec![Some(1)], vec![Some("Test")]);
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Ok(batch.clone()), Err(error), Ok(batch.clone())].into_iter();
        let mut adapter = RecordBatchIteratorAdapter { iter, schema };

        // An error in the middle must not end iteration early.
        assert_eq!(adapter.next().expect("first item").expect("first batch"), batch);
        assert!(matches!(adapter.next(), Some(Err(ArrowError::ComputeError(_)))));
        assert_eq!(adapter.next().expect("third item").expect("third batch"), batch);
        assert!(adapter.next().is_none());
    }
}