vibesql_executor/select/columnar/batch/
arrow.rs1use std::sync::Arc;
7
8use arrow::array::{Array, ArrayRef};
9use arrow::record_batch::RecordBatch;
10
11use crate::errors::ExecutorError;
12
13use super::types::{ColumnArray, ColumnarBatch};
14
15impl ColumnarBatch {
16 pub fn from_arrow_batch(batch: &RecordBatch) -> Result<Self, ExecutorError> {
36 let row_count = batch.num_rows();
37 let column_count = batch.num_columns();
38
39 let mut columns = Vec::with_capacity(column_count);
40 let mut column_names = Vec::with_capacity(column_count);
41
42 for (idx, field) in batch.schema().fields().iter().enumerate() {
44 column_names.push(field.name().clone());
45 let array = batch.column(idx);
46
47 let column = convert_arrow_array(array, field.data_type())?;
48 columns.push(column);
49 }
50
51 Ok(Self { row_count, columns, column_names: Some(column_names) })
52 }
53}
54
55fn convert_arrow_array(
57 array: &ArrayRef,
58 data_type: &arrow::datatypes::DataType,
59) -> Result<ColumnArray, ExecutorError> {
60 use arrow::array::{
61 BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array, StringArray,
62 TimestampMicrosecondArray,
63 };
64 use arrow::datatypes::DataType as ArrowDataType;
65
66 match data_type {
67 ArrowDataType::Int64 => {
68 let arr = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
69 ExecutorError::ArrowDowncastError {
70 expected_type: "Int64Array".to_string(),
71 context: "Arrow batch conversion".to_string(),
72 }
73 })?;
74
75 let values: Vec<i64> = (0..arr.len()).map(|i| arr.value(i)).collect();
76 let nulls = if arr.null_count() > 0 {
77 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
78 } else {
79 None
80 };
81
82 Ok(ColumnArray::Int64(Arc::new(values), nulls))
83 }
84
85 ArrowDataType::Int32 => {
86 let arr = array.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
87 ExecutorError::ArrowDowncastError {
88 expected_type: "Int32Array".to_string(),
89 context: "Arrow batch conversion".to_string(),
90 }
91 })?;
92
93 let values: Vec<i32> = (0..arr.len()).map(|i| arr.value(i)).collect();
94 let nulls = if arr.null_count() > 0 {
95 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
96 } else {
97 None
98 };
99
100 Ok(ColumnArray::Int32(Arc::new(values), nulls))
101 }
102
103 ArrowDataType::Float64 => {
104 let arr = array.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
105 ExecutorError::ArrowDowncastError {
106 expected_type: "Float64Array".to_string(),
107 context: "Arrow batch conversion".to_string(),
108 }
109 })?;
110
111 let values: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
112 let nulls = if arr.null_count() > 0 {
113 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
114 } else {
115 None
116 };
117
118 Ok(ColumnArray::Float64(Arc::new(values), nulls))
119 }
120
121 ArrowDataType::Float32 => {
122 let arr = array.as_any().downcast_ref::<Float32Array>().ok_or_else(|| {
123 ExecutorError::ArrowDowncastError {
124 expected_type: "Float32Array".to_string(),
125 context: "Arrow batch conversion".to_string(),
126 }
127 })?;
128
129 let values: Vec<f32> = (0..arr.len()).map(|i| arr.value(i)).collect();
130 let nulls = if arr.null_count() > 0 {
131 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
132 } else {
133 None
134 };
135
136 Ok(ColumnArray::Float32(Arc::new(values), nulls))
137 }
138
139 ArrowDataType::Utf8 => {
140 let arr = array.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
141 ExecutorError::ArrowDowncastError {
142 expected_type: "StringArray".to_string(),
143 context: "Arrow batch conversion".to_string(),
144 }
145 })?;
146
147 let values: Vec<String> = (0..arr.len()).map(|i| arr.value(i).to_string()).collect();
148 let nulls = if arr.null_count() > 0 {
149 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
150 } else {
151 None
152 };
153
154 Ok(ColumnArray::String(Arc::new(values), nulls))
155 }
156
157 ArrowDataType::Boolean => {
158 let arr = array.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
159 ExecutorError::ArrowDowncastError {
160 expected_type: "BooleanArray".to_string(),
161 context: "Arrow batch conversion".to_string(),
162 }
163 })?;
164
165 let values: Vec<u8> =
166 (0..arr.len()).map(|i| if arr.value(i) { 1 } else { 0 }).collect();
167 let nulls = if arr.null_count() > 0 {
168 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
169 } else {
170 None
171 };
172
173 Ok(ColumnArray::Boolean(Arc::new(values), nulls))
174 }
175
176 ArrowDataType::Date32 => {
177 let arr = array.as_any().downcast_ref::<Date32Array>().ok_or_else(|| {
178 ExecutorError::ArrowDowncastError {
179 expected_type: "Date32Array".to_string(),
180 context: "Arrow batch conversion".to_string(),
181 }
182 })?;
183
184 let values: Vec<i32> = (0..arr.len()).map(|i| arr.value(i)).collect();
185 let nulls = if arr.null_count() > 0 {
186 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
187 } else {
188 None
189 };
190
191 Ok(ColumnArray::Date(Arc::new(values), nulls))
192 }
193
194 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, _) => {
195 let arr =
196 array.as_any().downcast_ref::<TimestampMicrosecondArray>().ok_or_else(|| {
197 ExecutorError::ArrowDowncastError {
198 expected_type: "TimestampMicrosecondArray".to_string(),
199 context: "Arrow batch conversion".to_string(),
200 }
201 })?;
202
203 let values: Vec<i64> = (0..arr.len()).map(|i| arr.value(i)).collect();
204 let nulls = if arr.null_count() > 0 {
205 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
206 } else {
207 None
208 };
209
210 Ok(ColumnArray::Timestamp(Arc::new(values), nulls))
211 }
212
213 _ => Err(ExecutorError::UnsupportedArrayType {
214 operation: "Arrow batch conversion".to_string(),
215 array_type: format!("{:?}", data_type),
216 }),
217 }
218}
219
#[cfg(test)]
mod tests {
    use super::*;

    use std::sync::Arc;

    use arrow::array::{Float64Array, Int64Array, Int8Array};
    use arrow::datatypes::{DataType as ArrowDataType, Field, Schema};
    use arrow::record_batch::RecordBatch;

    /// Builds a two-column batch `id: Int64`, `value: Float64` from the given arrays.
    fn id_value_batch(ids: Int64Array, values: Float64Array, nullable: bool) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", ArrowDataType::Int64, nullable),
            Field::new("value", ArrowDataType::Float64, nullable),
        ]));
        let ids: ArrayRef = Arc::new(ids);
        let values: ArrayRef = Arc::new(values);
        RecordBatch::try_new(schema, vec![ids, values]).unwrap()
    }

    #[test]
    fn test_arrow_integration() {
        let arrow_batch = id_value_batch(
            Int64Array::from(vec![1, 2, 3]),
            Float64Array::from(vec![10.5, 20.5, 30.5]),
            false,
        );

        let columnar_batch = ColumnarBatch::from_arrow_batch(&arrow_batch).unwrap();

        assert_eq!(columnar_batch.row_count(), 3);
        assert_eq!(columnar_batch.column_count(), 2);

        let names = columnar_batch.column_names().unwrap();
        assert_eq!(names, &["id", "value"]);

        let col0 = columnar_batch.column(0).unwrap();
        if let Some((values, nulls)) = col0.as_i64() {
            assert_eq!(values, &[1, 2, 3]);
            assert!(nulls.is_none());
        } else {
            panic!("Expected i64 column");
        }

        let col1 = columnar_batch.column(1).unwrap();
        if let Some((values, nulls)) = col1.as_f64() {
            assert_eq!(values, &[10.5, 20.5, 30.5]);
            assert!(nulls.is_none());
        } else {
            panic!("Expected f64 column");
        }
    }

    #[test]
    fn test_arrow_integration_with_nulls() {
        let arrow_batch = id_value_batch(
            Int64Array::from(vec![Some(1), None, Some(3)]),
            Float64Array::from(vec![Some(10.5), Some(20.5), None]),
            true,
        );

        let columnar_batch = ColumnarBatch::from_arrow_batch(&arrow_batch).unwrap();

        let col0 = columnar_batch.column(0).unwrap();
        if let Some((values, Some(nulls))) = col0.as_i64() {
            assert_eq!(values.len(), 3);
            assert_eq!(nulls, &[false, true, false]);
        } else {
            panic!("Expected i64 column with nulls");
        }

        let col1 = columnar_batch.column(1).unwrap();
        if let Some((values, Some(nulls))) = col1.as_f64() {
            assert_eq!(values.len(), 3);
            assert_eq!(nulls, &[false, false, true]);
        } else {
            panic!("Expected f64 column with nulls");
        }
    }

    /// Arrow types without a columnar mapping (here `Int8`) must be rejected, not panic.
    #[test]
    fn test_arrow_unsupported_type_is_rejected() {
        let schema = Arc::new(Schema::new(vec![Field::new("flag", ArrowDataType::Int8, false)]));
        let flags: ArrayRef = Arc::new(Int8Array::from(vec![1i8, 2, 3]));
        let arrow_batch = RecordBatch::try_new(schema, vec![flags]).unwrap();

        let result = ColumnarBatch::from_arrow_batch(&arrow_batch);
        assert!(matches!(result, Err(ExecutorError::UnsupportedArrayType { .. })));
    }
}