vibesql_executor/select/columnar/batch/
arrow.rs1use std::sync::Arc;
7
8use arrow::array::{Array, ArrayRef};
9use arrow::record_batch::RecordBatch;
10
11use crate::errors::ExecutorError;
12
13use super::types::{ColumnArray, ColumnarBatch};
14
15impl ColumnarBatch {
16 pub fn from_arrow_batch(batch: &RecordBatch) -> Result<Self, ExecutorError> {
36 let row_count = batch.num_rows();
37 let column_count = batch.num_columns();
38
39 let mut columns = Vec::with_capacity(column_count);
40 let mut column_names = Vec::with_capacity(column_count);
41
42 for (idx, field) in batch.schema().fields().iter().enumerate() {
44 column_names.push(field.name().clone());
45 let array = batch.column(idx);
46
47 let column = convert_arrow_array(array, field.data_type())?;
48 columns.push(column);
49 }
50
51 Ok(Self {
52 row_count,
53 columns,
54 column_names: Some(column_names),
55 })
56 }
57}
58
59fn convert_arrow_array(
61 array: &ArrayRef,
62 data_type: &arrow::datatypes::DataType,
63) -> Result<ColumnArray, ExecutorError> {
64 use arrow::array::{
65 BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, StringArray,
66 TimestampMicrosecondArray,
67 };
68 use arrow::datatypes::DataType as ArrowDataType;
69
70 match data_type {
71 ArrowDataType::Int64 => {
72 let arr = array
73 .as_any()
74 .downcast_ref::<Int64Array>()
75 .ok_or_else(|| ExecutorError::ArrowDowncastError {
76 expected_type: "Int64Array".to_string(),
77 context: "Arrow batch conversion".to_string(),
78 })?;
79
80 let values: Vec<i64> = (0..arr.len()).map(|i| arr.value(i)).collect();
81 let nulls = if arr.null_count() > 0 {
82 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
83 } else {
84 None
85 };
86
87 Ok(ColumnArray::Int64(Arc::new(values), nulls))
88 }
89
90 ArrowDataType::Int32 => {
91 let arr = array
92 .as_any()
93 .downcast_ref::<Int32Array>()
94 .ok_or_else(|| ExecutorError::ArrowDowncastError {
95 expected_type: "Int32Array".to_string(),
96 context: "Arrow batch conversion".to_string(),
97 })?;
98
99 let values: Vec<i32> = (0..arr.len()).map(|i| arr.value(i)).collect();
100 let nulls = if arr.null_count() > 0 {
101 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
102 } else {
103 None
104 };
105
106 Ok(ColumnArray::Int32(Arc::new(values), nulls))
107 }
108
109 ArrowDataType::Float64 => {
110 let arr = array
111 .as_any()
112 .downcast_ref::<Float64Array>()
113 .ok_or_else(|| ExecutorError::ArrowDowncastError {
114 expected_type: "Float64Array".to_string(),
115 context: "Arrow batch conversion".to_string(),
116 })?;
117
118 let values: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
119 let nulls = if arr.null_count() > 0 {
120 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
121 } else {
122 None
123 };
124
125 Ok(ColumnArray::Float64(Arc::new(values), nulls))
126 }
127
128 ArrowDataType::Float32 => {
129 let arr = array
130 .as_any()
131 .downcast_ref::<Float32Array>()
132 .ok_or_else(|| ExecutorError::ArrowDowncastError {
133 expected_type: "Float32Array".to_string(),
134 context: "Arrow batch conversion".to_string(),
135 })?;
136
137 let values: Vec<f32> = (0..arr.len()).map(|i| arr.value(i)).collect();
138 let nulls = if arr.null_count() > 0 {
139 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
140 } else {
141 None
142 };
143
144 Ok(ColumnArray::Float32(Arc::new(values), nulls))
145 }
146
147 ArrowDataType::Utf8 => {
148 let arr = array
149 .as_any()
150 .downcast_ref::<StringArray>()
151 .ok_or_else(|| ExecutorError::ArrowDowncastError {
152 expected_type: "StringArray".to_string(),
153 context: "Arrow batch conversion".to_string(),
154 })?;
155
156 let values: Vec<String> = (0..arr.len()).map(|i| arr.value(i).to_string()).collect();
157 let nulls = if arr.null_count() > 0 {
158 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
159 } else {
160 None
161 };
162
163 Ok(ColumnArray::String(Arc::new(values), nulls))
164 }
165
166 ArrowDataType::Boolean => {
167 let arr = array
168 .as_any()
169 .downcast_ref::<BooleanArray>()
170 .ok_or_else(|| ExecutorError::ArrowDowncastError {
171 expected_type: "BooleanArray".to_string(),
172 context: "Arrow batch conversion".to_string(),
173 })?;
174
175 let values: Vec<u8> = (0..arr.len())
176 .map(|i| if arr.value(i) { 1 } else { 0 })
177 .collect();
178 let nulls = if arr.null_count() > 0 {
179 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
180 } else {
181 None
182 };
183
184 Ok(ColumnArray::Boolean(Arc::new(values), nulls))
185 }
186
187 ArrowDataType::Date32 => {
188 let arr = array
189 .as_any()
190 .downcast_ref::<Date32Array>()
191 .ok_or_else(|| ExecutorError::ArrowDowncastError {
192 expected_type: "Date32Array".to_string(),
193 context: "Arrow batch conversion".to_string(),
194 })?;
195
196 let values: Vec<i32> = (0..arr.len()).map(|i| arr.value(i)).collect();
197 let nulls = if arr.null_count() > 0 {
198 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
199 } else {
200 None
201 };
202
203 Ok(ColumnArray::Date(Arc::new(values), nulls))
204 }
205
206 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, _) => {
207 let arr = array
208 .as_any()
209 .downcast_ref::<TimestampMicrosecondArray>()
210 .ok_or_else(|| ExecutorError::ArrowDowncastError {
211 expected_type: "TimestampMicrosecondArray".to_string(),
212 context: "Arrow batch conversion".to_string(),
213 })?;
214
215 let values: Vec<i64> = (0..arr.len()).map(|i| arr.value(i)).collect();
216 let nulls = if arr.null_count() > 0 {
217 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
218 } else {
219 None
220 };
221
222 Ok(ColumnArray::Timestamp(Arc::new(values), nulls))
223 }
224
225 _ => Err(ExecutorError::UnsupportedArrayType {
226 operation: "Arrow batch conversion".to_string(),
227 array_type: format!("{:?}", data_type),
228 }),
229 }
230}
231
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::array::{ArrayRef, Float64Array, Int64Array};
    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    use super::*;

    /// Builds a two-column Arrow batch (`id: Int64`, `value: Float64`) whose fields
    /// are both declared with the given nullability.
    fn id_value_batch(nullable: bool, ids: Int64Array, values: Float64Array) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", ArrowDataType::Int64, nullable),
            Field::new("value", ArrowDataType::Float64, nullable),
        ]));
        let columns: Vec<ArrayRef> = vec![Arc::new(ids), Arc::new(values)];

        RecordBatch::try_new(schema, columns).unwrap()
    }

    /// A batch without nulls converts to columns with the same values, names and
    /// counts, and carries no null masks.
    #[test]
    fn test_arrow_integration() {
        let arrow_batch = id_value_batch(
            false,
            Int64Array::from(vec![1, 2, 3]),
            Float64Array::from(vec![10.5, 20.5, 30.5]),
        );

        let converted = ColumnarBatch::from_arrow_batch(&arrow_batch).unwrap();

        assert_eq!(converted.row_count(), 3);
        assert_eq!(converted.column_count(), 2);

        let names = converted.column_names().unwrap();
        assert_eq!(names, &["id", "value"]);

        let id_column = converted.column(0).unwrap();
        match id_column.as_i64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[1, 2, 3]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected i64 column"),
        }

        let value_column = converted.column(1).unwrap();
        match value_column.as_f64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[10.5, 20.5, 30.5]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected f64 column"),
        }
    }

    /// A batch containing nulls converts to columns whose null masks flag exactly
    /// the null slots with `true`.
    #[test]
    fn test_arrow_integration_with_nulls() {
        let arrow_batch = id_value_batch(
            true,
            Int64Array::from(vec![Some(1), None, Some(3)]),
            Float64Array::from(vec![Some(10.5), Some(20.5), None]),
        );

        let converted = ColumnarBatch::from_arrow_batch(&arrow_batch).unwrap();

        let id_column = converted.column(0).unwrap();
        match id_column.as_i64() {
            Some((values, Some(nulls))) => {
                assert_eq!(values.len(), 3);
                assert_eq!(nulls, &[false, true, false]);
            }
            _ => panic!("Expected i64 column with nulls"),
        }

        let value_column = converted.column(1).unwrap();
        match value_column.as_f64() {
            Some((values, Some(nulls))) => {
                assert_eq!(values.len(), 3);
                assert_eq!(nulls, &[false, false, true]);
            }
            _ => panic!("Expected f64 column with nulls"),
        }
    }
}