1use arrow_array::cast::AsArray;
5use arrow_array::{RecordBatch, RecordBatchReader};
6use arrow_schema::{ArrowError, DataType, SchemaRef};
7use vortex_array::ArrayRef;
8use vortex_array::arrow::IntoArrowArray;
9use vortex_error::{VortexError, VortexResult};
10
11use crate::ScanBuilder;
12
13impl ScanBuilder<ArrayRef> {
14 pub fn into_record_batch_reader(
22 self,
23 schema: SchemaRef,
24 ) -> VortexResult<impl RecordBatchReader + Send + Clone + 'static> {
25 let data_type = DataType::Struct(schema.fields().clone());
26
27 let iter = self
28 .into_array_iter()?
29 .map(move |chunk| to_record_batch(chunk, &data_type));
30
31 Ok(RecordBatchIteratorAdapter { iter, schema })
32 }
33
34 #[cfg(feature = "tokio")]
37 pub fn into_record_batch_reader_multithread(
38 self,
39 schema: SchemaRef,
40 ) -> VortexResult<impl RecordBatchReader + Send + 'static> {
41 use arrow_array::RecordBatchIterator;
42
43 let data_type = DataType::Struct(schema.fields().clone());
44
45 let iter = self.into_iter_multithread(move |chunk| to_record_batch(chunk, &data_type))?;
46
47 Ok(RecordBatchIterator::new(iter, schema))
48 }
49}
50
51fn to_record_batch(
52 chunk: VortexResult<ArrayRef>,
53 data_type: &DataType,
54) -> Result<RecordBatch, ArrowError> {
55 chunk
56 .and_then(|array| {
57 let arrow = array.into_arrow(data_type)?;
58 Ok::<_, VortexError>(RecordBatch::from(arrow.as_struct().clone()))
59 })
60 .map_err(|e| ArrowError::ExternalError(Box::new(e)))
61}
62
63#[derive(Clone)]
66struct RecordBatchIteratorAdapter<I> {
67 iter: I,
68 schema: SchemaRef,
69}
70
71impl<I> Iterator for RecordBatchIteratorAdapter<I>
72where
73 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
74{
75 type Item = Result<RecordBatch, ArrowError>;
76
77 fn next(&mut self) -> Option<Self::Item> {
78 self.iter.next()
79 }
80}
81
82impl<I> RecordBatchReader for RecordBatchIteratorAdapter<I>
83where
84 I: Iterator<Item = Result<RecordBatch, ArrowError>>,
85{
86 fn schema(&self) -> SchemaRef {
87 self.schema.clone()
88 }
89}
90
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::cast::AsArray;
    use arrow_array::{
        Array, ArrayRef as ArrowArrayRef, Int32Array, RecordBatch, StringArray, StructArray,
    };
    use arrow_schema::{ArrowError, DataType, Field, Schema};
    use vortex_array::ArrayRef;
    use vortex_array::arrow::FromArrowArray;
    use vortex_error::{VortexResult, vortex_err};

    use super::*;

    /// Schema shared by every test: nullable `id: Int32` and nullable `name: Utf8` columns.
    fn create_arrow_schema() -> Arc<Schema> {
        Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int32, true),
            Field::new("name", DataType::Utf8, true),
        ]))
    }

    /// Builds a 4-row Vortex struct array over [`create_arrow_schema`]'s fields, with one
    /// null in each column and no top-level nulls.
    fn create_test_struct_array() -> VortexResult<ArrayRef> {
        let id_array = Int32Array::from(vec![Some(1), Some(2), None, Some(4)]);
        let name_array = StringArray::from(vec![Some("Alice"), Some("Bob"), Some("Charlie"), None]);

        let struct_array = StructArray::new(
            create_arrow_schema().fields().clone(),
            vec![
                Arc::new(id_array) as ArrowArrayRef,
                Arc::new(name_array) as ArrowArrayRef,
            ],
            None,
        );

        Ok(ArrayRef::from_arrow(&struct_array, true))
    }

    /// Builds an Arrow batch over `schema` (expected to be [`create_arrow_schema`]).
    fn make_batch(
        schema: &Arc<Schema>,
        ids: Vec<Option<i32>>,
        names: Vec<Option<&str>>,
    ) -> RecordBatch {
        RecordBatch::try_new(
            Arc::clone(schema),
            vec![
                Arc::new(Int32Array::from(ids)) as ArrowArrayRef,
                Arc::new(StringArray::from(names)) as ArrowArrayRef,
            ],
        )
        .expect("columns match the test schema")
    }

    #[test]
    fn test_record_batch_conversion() -> VortexResult<()> {
        let vortex_array = create_test_struct_array()?;
        let schema = create_arrow_schema();
        let data_type = DataType::Struct(schema.fields().clone());

        let batch = to_record_batch(Ok(vortex_array), &data_type)
            .expect("a valid struct chunk converts to a RecordBatch");
        assert_eq!(batch.num_columns(), 2);
        assert_eq!(batch.num_rows(), 4);

        let id_col = batch
            .column(0)
            .as_primitive::<arrow_array::types::Int32Type>();
        assert_eq!(id_col.value(0), 1);
        assert_eq!(id_col.value(1), 2);
        assert!(id_col.is_null(2));
        assert_eq!(id_col.value(3), 4);

        let name_col = batch.column(1).as_string::<i32>();
        assert_eq!(name_col.value(0), "Alice");
        assert_eq!(name_col.value(1), "Bob");
        assert_eq!(name_col.value(2), "Charlie");
        assert!(name_col.is_null(3));

        Ok(())
    }

    #[test]
    fn test_record_batch_conversion_error() {
        let data_type = DataType::Struct(create_arrow_schema().fields().clone());

        let result = to_record_batch(Err(vortex_err!("test error")), &data_type);
        assert!(
            matches!(result, Err(ArrowError::ExternalError(_))),
            "expected ExternalError, got {result:?}"
        );
    }

    #[test]
    fn test_record_batch_iterator_adapter() {
        let schema = create_arrow_schema();
        let batch1 = make_batch(&schema, vec![Some(1), Some(2)], vec![Some("Alice"), Some("Bob")]);
        let batch2 = make_batch(&schema, vec![None, Some(4)], vec![Some("Charlie"), None]);

        let iter = vec![Ok(batch1), Ok(batch2)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter {
            iter,
            schema: Arc::clone(&schema),
        };

        assert_eq!(adapter.schema(), schema);

        let first = adapter.next().expect("first item").expect("first batch");
        assert_eq!(first.num_rows(), 2);

        let second = adapter.next().expect("second item").expect("second batch");
        assert_eq!(second.num_rows(), 2);

        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_error_in_iterator() {
        let schema = create_arrow_schema();
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Err(error)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter {
            iter,
            schema: Arc::clone(&schema),
        };

        assert_eq!(adapter.schema(), schema);

        let result = adapter.next().expect("adapter yields the error item");
        assert!(
            matches!(&result, Err(ArrowError::ComputeError(msg)) if msg == "test error"),
            "expected the original ComputeError, got {result:?}"
        );
        assert!(adapter.next().is_none());
    }

    #[test]
    fn test_mixed_success_and_error() {
        let schema = create_arrow_schema();
        let batch = make_batch(&schema, vec![Some(1)], vec![Some("Test")]);
        let error = ArrowError::ComputeError("test error".to_string());

        let iter = vec![Ok(batch.clone()), Err(error), Ok(batch)].into_iter();
        let mut adapter = RecordBatchIteratorAdapter { iter, schema };

        // Errors are passed through in position without terminating the stream.
        assert!(adapter.next().expect("first item").is_ok());
        assert!(adapter.next().expect("second item").is_err());
        assert!(adapter.next().expect("third item").is_ok());
        assert!(adapter.next().is_none());
    }
}