vibesql_executor/select/columnar/batch/
arrow.rs1use std::sync::Arc;
7
8use arrow::{
9 array::{Array, ArrayRef},
10 record_batch::RecordBatch,
11};
12
13use super::types::{ColumnArray, ColumnarBatch};
14use crate::errors::ExecutorError;
15
16impl ColumnarBatch {
17 pub fn from_arrow_batch(batch: &RecordBatch) -> Result<Self, ExecutorError> {
37 let row_count = batch.num_rows();
38 let column_count = batch.num_columns();
39
40 let mut columns = Vec::with_capacity(column_count);
41 let mut column_names = Vec::with_capacity(column_count);
42
43 for (idx, field) in batch.schema().fields().iter().enumerate() {
45 column_names.push(field.name().clone());
46 let array = batch.column(idx);
47
48 let column = convert_arrow_array(array, field.data_type())?;
49 columns.push(column);
50 }
51
52 Ok(Self { row_count, columns, column_names: Some(column_names) })
53 }
54}
55
56fn convert_arrow_array(
58 array: &ArrayRef,
59 data_type: &arrow::datatypes::DataType,
60) -> Result<ColumnArray, ExecutorError> {
61 use arrow::{
62 array::{
63 BooleanArray, Date32Array, Float32Array, Float64Array, Int32Array, Int64Array,
64 StringArray, TimestampMicrosecondArray,
65 },
66 datatypes::DataType as ArrowDataType,
67 };
68
69 match data_type {
70 ArrowDataType::Int64 => {
71 let arr = array.as_any().downcast_ref::<Int64Array>().ok_or_else(|| {
72 ExecutorError::ArrowDowncastError {
73 expected_type: "Int64Array".to_string(),
74 context: "Arrow batch conversion".to_string(),
75 }
76 })?;
77
78 let values: Vec<i64> = (0..arr.len()).map(|i| arr.value(i)).collect();
79 let nulls = if arr.null_count() > 0 {
80 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
81 } else {
82 None
83 };
84
85 Ok(ColumnArray::Int64(Arc::new(values), nulls))
86 }
87
88 ArrowDataType::Int32 => {
89 let arr = array.as_any().downcast_ref::<Int32Array>().ok_or_else(|| {
90 ExecutorError::ArrowDowncastError {
91 expected_type: "Int32Array".to_string(),
92 context: "Arrow batch conversion".to_string(),
93 }
94 })?;
95
96 let values: Vec<i32> = (0..arr.len()).map(|i| arr.value(i)).collect();
97 let nulls = if arr.null_count() > 0 {
98 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
99 } else {
100 None
101 };
102
103 Ok(ColumnArray::Int32(Arc::new(values), nulls))
104 }
105
106 ArrowDataType::Float64 => {
107 let arr = array.as_any().downcast_ref::<Float64Array>().ok_or_else(|| {
108 ExecutorError::ArrowDowncastError {
109 expected_type: "Float64Array".to_string(),
110 context: "Arrow batch conversion".to_string(),
111 }
112 })?;
113
114 let values: Vec<f64> = (0..arr.len()).map(|i| arr.value(i)).collect();
115 let nulls = if arr.null_count() > 0 {
116 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
117 } else {
118 None
119 };
120
121 Ok(ColumnArray::Float64(Arc::new(values), nulls))
122 }
123
124 ArrowDataType::Float32 => {
125 let arr = array.as_any().downcast_ref::<Float32Array>().ok_or_else(|| {
126 ExecutorError::ArrowDowncastError {
127 expected_type: "Float32Array".to_string(),
128 context: "Arrow batch conversion".to_string(),
129 }
130 })?;
131
132 let values: Vec<f32> = (0..arr.len()).map(|i| arr.value(i)).collect();
133 let nulls = if arr.null_count() > 0 {
134 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
135 } else {
136 None
137 };
138
139 Ok(ColumnArray::Float32(Arc::new(values), nulls))
140 }
141
142 ArrowDataType::Utf8 => {
143 let arr = array.as_any().downcast_ref::<StringArray>().ok_or_else(|| {
144 ExecutorError::ArrowDowncastError {
145 expected_type: "StringArray".to_string(),
146 context: "Arrow batch conversion".to_string(),
147 }
148 })?;
149
150 let values: Vec<Arc<str>> = (0..arr.len()).map(|i| Arc::from(arr.value(i))).collect();
151 let nulls = if arr.null_count() > 0 {
152 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
153 } else {
154 None
155 };
156
157 Ok(ColumnArray::String(Arc::new(values), nulls))
158 }
159
160 ArrowDataType::Boolean => {
161 let arr = array.as_any().downcast_ref::<BooleanArray>().ok_or_else(|| {
162 ExecutorError::ArrowDowncastError {
163 expected_type: "BooleanArray".to_string(),
164 context: "Arrow batch conversion".to_string(),
165 }
166 })?;
167
168 let values: Vec<u8> =
169 (0..arr.len()).map(|i| if arr.value(i) { 1 } else { 0 }).collect();
170 let nulls = if arr.null_count() > 0 {
171 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
172 } else {
173 None
174 };
175
176 Ok(ColumnArray::Boolean(Arc::new(values), nulls))
177 }
178
179 ArrowDataType::Date32 => {
180 let arr = array.as_any().downcast_ref::<Date32Array>().ok_or_else(|| {
181 ExecutorError::ArrowDowncastError {
182 expected_type: "Date32Array".to_string(),
183 context: "Arrow batch conversion".to_string(),
184 }
185 })?;
186
187 let values: Vec<i32> = (0..arr.len()).map(|i| arr.value(i)).collect();
188 let nulls = if arr.null_count() > 0 {
189 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
190 } else {
191 None
192 };
193
194 Ok(ColumnArray::Date(Arc::new(values), nulls))
195 }
196
197 ArrowDataType::Timestamp(arrow::datatypes::TimeUnit::Microsecond, _) => {
198 let arr =
199 array.as_any().downcast_ref::<TimestampMicrosecondArray>().ok_or_else(|| {
200 ExecutorError::ArrowDowncastError {
201 expected_type: "TimestampMicrosecondArray".to_string(),
202 context: "Arrow batch conversion".to_string(),
203 }
204 })?;
205
206 let values: Vec<i64> = (0..arr.len()).map(|i| arr.value(i)).collect();
207 let nulls = if arr.null_count() > 0 {
208 Some(Arc::new((0..arr.len()).map(|i| arr.is_null(i)).collect()))
209 } else {
210 None
211 };
212
213 Ok(ColumnArray::Timestamp(Arc::new(values), nulls))
214 }
215
216 _ => Err(ExecutorError::UnsupportedArrayType {
217 operation: "Arrow batch conversion".to_string(),
218 array_type: format!("{:?}", data_type),
219 }),
220 }
221}
222
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow::{
        array::{Float64Array, Int64Array},
        datatypes::{DataType as ArrowDataType, Field, Schema},
        record_batch::RecordBatch,
    };

    use super::*;

    /// Assembles a two-column Arrow batch (`id: Int64`, `value: Float64`) whose fields
    /// are declared nullable or not according to `nullable`.
    fn id_value_batch(nullable: bool, ids: Int64Array, values: Float64Array) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![
            Field::new("id", ArrowDataType::Int64, nullable),
            Field::new("value", ArrowDataType::Float64, nullable),
        ]));
        let id_col: ArrayRef = Arc::new(ids);
        let value_col: ArrayRef = Arc::new(values);
        RecordBatch::try_new(schema, vec![id_col, value_col]).unwrap()
    }

    #[test]
    fn test_arrow_integration() {
        let arrow_batch = id_value_batch(
            false,
            Int64Array::from(vec![1, 2, 3]),
            Float64Array::from(vec![10.5, 20.5, 30.5]),
        );
        let columnar_batch = ColumnarBatch::from_arrow_batch(&arrow_batch).unwrap();

        // Shape and metadata survive the conversion.
        assert_eq!(columnar_batch.row_count(), 3);
        assert_eq!(columnar_batch.column_count(), 2);
        let names = columnar_batch.column_names().unwrap();
        assert_eq!(names, &["id", "value"]);

        // Without nulls in the input, no null mask should be attached.
        let first = columnar_batch.column(0).unwrap();
        match first.as_i64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[1, 2, 3]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected i64 column"),
        }

        let second = columnar_batch.column(1).unwrap();
        match second.as_f64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[10.5, 20.5, 30.5]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected f64 column"),
        }
    }

    #[test]
    fn test_arrow_integration_with_nulls() {
        let arrow_batch = id_value_batch(
            true,
            Int64Array::from(vec![Some(1), None, Some(3)]),
            Float64Array::from(vec![Some(10.5), Some(20.5), None]),
        );
        let columnar_batch = ColumnarBatch::from_arrow_batch(&arrow_batch).unwrap();

        // Null positions must be reflected one-for-one in the mask.
        let first = columnar_batch.column(0).unwrap();
        match first.as_i64() {
            Some((values, Some(nulls))) => {
                assert_eq!(values.len(), 3);
                assert_eq!(nulls, &[false, true, false]);
            }
            _ => panic!("Expected i64 column with nulls"),
        }

        let second = columnar_batch.column(1).unwrap();
        match second.as_f64() {
            Some((values, Some(nulls))) => {
                assert_eq!(values.len(), 3);
                assert_eq!(nulls, &[false, false, true]);
            }
            _ => panic!("Expected f64 column with nulls"),
        }
    }
}