vibesql_executor/select/columnar/batch/
operations.rs

1//! Batch manipulation operations
2//!
3//! This module contains methods for accessing and manipulating
4//! `ColumnarBatch` and `ColumnArray` instances.
5
6#![allow(clippy::needless_range_loop)]
7
8use crate::errors::ExecutorError;
9use vibesql_storage::Row;
10use vibesql_types::{DataType, Date, SqlValue, Time, Timestamp};
11
12use super::types::{ColumnArray, ColumnarBatch};
13
14impl ColumnarBatch {
15    /// Get the number of rows in this batch
16    pub fn row_count(&self) -> usize {
17        self.row_count
18    }
19
20    /// Get the number of columns in this batch
21    pub fn column_count(&self) -> usize {
22        self.columns.len()
23    }
24
25    /// Get a reference to a column array
26    pub fn column(&self, index: usize) -> Option<&ColumnArray> {
27        self.columns.get(index)
28    }
29
30    /// Get a mutable reference to a column array
31    pub fn column_mut(&mut self, index: usize) -> Option<&mut ColumnArray> {
32        self.columns.get_mut(index)
33    }
34
35    /// Add a column to the batch
36    pub fn add_column(&mut self, column: ColumnArray) -> Result<(), ExecutorError> {
37        // Verify column has correct length
38        let col_len = column.len();
39        if self.row_count > 0 && col_len != self.row_count {
40            return Err(ExecutorError::ColumnarLengthMismatch {
41                context: "add_column".to_string(),
42                expected: self.row_count,
43                actual: col_len,
44            });
45        }
46
47        if self.row_count == 0 {
48            self.row_count = col_len;
49        }
50
51        self.columns.push(column);
52        Ok(())
53    }
54
55    /// Set column names (for debugging)
56    pub fn set_column_names(&mut self, names: Vec<String>) {
57        self.column_names = Some(names);
58    }
59
60    /// Get column names
61    pub fn column_names(&self) -> Option<&[String]> {
62        self.column_names.as_deref()
63    }
64
65    /// Get column index by name
66    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
67        self.column_names.as_ref()?.iter().position(|n| n == name)
68    }
69
70    /// Get a value at a specific (row, column) position
71    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
72        let column = self.column(col_idx).ok_or(ExecutorError::ColumnarColumnNotFound {
73            column_index: col_idx,
74            batch_columns: self.columns.len(),
75        })?;
76        column.get_value(row_idx)
77    }
78
79    /// Convert columnar batch back to row-oriented storage
80    pub fn to_rows(&self) -> Result<Vec<Row>, ExecutorError> {
81        let mut rows = Vec::with_capacity(self.row_count);
82
83        for row_idx in 0..self.row_count {
84            let mut values = Vec::with_capacity(self.columns.len());
85
86            for column in &self.columns {
87                let value = column.get_value(row_idx)?;
88                values.push(value);
89            }
90
91            rows.push(Row::new(values));
92        }
93
94        Ok(rows)
95    }
96}
97
98impl ColumnArray {
99    /// Get the number of values in this column
100    pub fn len(&self) -> usize {
101        match self {
102            Self::Int64(v, _) => v.len(),
103            Self::Int32(v, _) => v.len(),
104            Self::Float64(v, _) => v.len(),
105            Self::Float32(v, _) => v.len(),
106            Self::String(v, _) => v.len(),
107            Self::FixedString(v, _) => v.len(),
108            Self::Date(v, _) => v.len(),
109            Self::Timestamp(v, _) => v.len(),
110            Self::Boolean(v, _) => v.len(),
111            Self::Mixed(v) => v.len(),
112        }
113    }
114
115    /// Check if column is empty
116    pub fn is_empty(&self) -> bool {
117        self.len() == 0
118    }
119
120    /// Get a value at the specified index as SqlValue
121    pub fn get_value(&self, index: usize) -> Result<SqlValue, ExecutorError> {
122        match self {
123            Self::Int64(values, nulls) => {
124                if let Some(null_mask) = nulls {
125                    if null_mask.get(index).copied().unwrap_or(false) {
126                        return Ok(SqlValue::Null);
127                    }
128                }
129                values
130                    .get(index)
131                    .map(|v| SqlValue::Integer(*v))
132                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
133            }
134
135            Self::Float64(values, nulls) => {
136                if let Some(null_mask) = nulls {
137                    if null_mask.get(index).copied().unwrap_or(false) {
138                        return Ok(SqlValue::Null);
139                    }
140                }
141                values
142                    .get(index)
143                    .map(|v| SqlValue::Double(*v))
144                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
145            }
146
147            Self::String(values, nulls) => {
148                if let Some(null_mask) = nulls {
149                    if null_mask.get(index).copied().unwrap_or(false) {
150                        return Ok(SqlValue::Null);
151                    }
152                }
153                values
154                    .get(index)
155                    .map(|v| SqlValue::Varchar(v.clone()))
156                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
157            }
158
159            Self::Boolean(values, nulls) => {
160                if let Some(null_mask) = nulls {
161                    if null_mask.get(index).copied().unwrap_or(false) {
162                        return Ok(SqlValue::Null);
163                    }
164                }
165                values
166                    .get(index)
167                    .map(|v| SqlValue::Boolean(*v != 0))
168                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
169            }
170
171            Self::Mixed(values) => {
172                values.get(index).cloned().ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
173            }
174
175            Self::Int32(values, nulls) => {
176                if let Some(null_mask) = nulls {
177                    if null_mask.get(index).copied().unwrap_or(false) {
178                        return Ok(SqlValue::Null);
179                    }
180                }
181                values
182                    .get(index)
183                    .map(|v| SqlValue::Integer(*v as i64))
184                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
185            }
186
187            Self::Float32(values, nulls) => {
188                if let Some(null_mask) = nulls {
189                    if null_mask.get(index).copied().unwrap_or(false) {
190                        return Ok(SqlValue::Null);
191                    }
192                }
193                values
194                    .get(index)
195                    .map(|v| SqlValue::Real(*v))
196                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
197            }
198
199            Self::FixedString(values, nulls) => {
200                if let Some(null_mask) = nulls {
201                    if null_mask.get(index).copied().unwrap_or(false) {
202                        return Ok(SqlValue::Null);
203                    }
204                }
205                values
206                    .get(index)
207                    .map(|v| SqlValue::Character(v.clone()))
208                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
209            }
210
211            Self::Date(values, nulls) => {
212                if let Some(null_mask) = nulls {
213                    if null_mask.get(index).copied().unwrap_or(false) {
214                        return Ok(SqlValue::Null);
215                    }
216                }
217                values
218                    .get(index)
219                    .map(|v| SqlValue::Date(days_since_epoch_to_date(*v)))
220                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
221            }
222
223            Self::Timestamp(values, nulls) => {
224                if let Some(null_mask) = nulls {
225                    if null_mask.get(index).copied().unwrap_or(false) {
226                        return Ok(SqlValue::Null);
227                    }
228                }
229                values
230                    .get(index)
231                    .map(|v| SqlValue::Timestamp(microseconds_to_timestamp(*v)))
232                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
233            }
234        }
235    }
236
237    /// Get the data type of this column
238    pub fn data_type(&self) -> DataType {
239        match self {
240            Self::Int64(_, _) => DataType::Integer,
241            Self::Int32(_, _) => DataType::Integer,
242            Self::Float64(_, _) => DataType::DoublePrecision,
243            Self::Float32(_, _) => DataType::Real,
244            Self::String(_, _) => DataType::Varchar { max_length: None },
245            Self::FixedString(_, _) => DataType::Character { length: 255 },
246            Self::Date(_, _) => DataType::Date,
247            Self::Timestamp(_, _) => DataType::Timestamp { with_timezone: false },
248            Self::Boolean(_, _) => DataType::Boolean,
249            Self::Mixed(_) => DataType::Varchar { max_length: None }, // fallback
250        }
251    }
252
253    /// Get raw i64 slice (for SIMD operations)
254    pub fn as_i64(&self) -> Option<(&[i64], Option<&[bool]>)> {
255        match self {
256            Self::Int64(values, nulls) => {
257                Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
258            }
259            _ => None,
260        }
261    }
262
263    /// Get raw f64 slice (for SIMD operations)
264    pub fn as_f64(&self) -> Option<(&[f64], Option<&[bool]>)> {
265        match self {
266            Self::Float64(values, nulls) => {
267                Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
268            }
269            _ => None,
270        }
271    }
272}
273
274/// Convert days since Unix epoch to Date
275fn days_since_epoch_to_date(days: i32) -> Date {
276    // Simplified conversion: start from 1970-01-01 and count forward
277    let mut year = 1970;
278    let mut remaining_days = days;
279
280    // Handle years
281    loop {
282        let year_days =
283            if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) { 366 } else { 365 };
284        if remaining_days < year_days {
285            break;
286        }
287        remaining_days -= year_days;
288        year += 1;
289    }
290
291    // Handle months
292    let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
293    let month_lengths = if is_leap {
294        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
295    } else {
296        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
297    };
298
299    let mut month = 1;
300    for &days_in_month in &month_lengths {
301        if remaining_days < days_in_month {
302            break;
303        }
304        remaining_days -= days_in_month;
305        month += 1;
306    }
307
308    let day = remaining_days + 1;
309
310    Date::new(year, month as u8, day as u8).unwrap_or_else(|_| Date::new(1970, 1, 1).unwrap())
311}
312
313/// Convert microseconds since Unix epoch to Timestamp
314fn microseconds_to_timestamp(micros: i64) -> Timestamp {
315    let days = (micros / 86_400_000_000) as i32;
316    let remaining_micros = micros % 86_400_000_000;
317
318    let date = days_since_epoch_to_date(days);
319
320    let hours = (remaining_micros / 3_600_000_000) as u8;
321    let remaining_micros = remaining_micros % 3_600_000_000;
322    let minutes = (remaining_micros / 60_000_000) as u8;
323    let remaining_micros = remaining_micros % 60_000_000;
324    let seconds = (remaining_micros / 1_000_000) as u8;
325    let nanoseconds = ((remaining_micros % 1_000_000) * 1_000) as u32;
326
327    let time = Time::new(hours, minutes, seconds, nanoseconds)
328        .unwrap_or_else(|_| Time::new(0, 0, 0, 0).unwrap());
329
330    Timestamp::new(date, time)
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Rows converted to a batch and back must come out value-for-value equal.
    #[test]
    fn test_batch_to_rows_roundtrip() {
        let original_rows = vec![
            Row::new(vec![SqlValue::Integer(1), SqlValue::Double(10.5)]),
            Row::new(vec![SqlValue::Integer(2), SqlValue::Double(20.5)]),
        ];

        let converted_rows = ColumnarBatch::from_rows(&original_rows).unwrap().to_rows().unwrap();

        assert_eq!(converted_rows.len(), original_rows.len());

        // Compare row by row, then cell by cell within each row.
        for (expected, actual) in original_rows.iter().zip(&converted_rows) {
            assert_eq!(expected.len(), actual.len());
            for cell in 0..expected.len() {
                assert_eq!(expected.get(cell), actual.get(cell));
            }
        }
    }

    /// Typed columns must expose their raw slices, with no null mask when the
    /// input held no nulls.
    #[test]
    fn test_simd_column_access() {
        let rows = vec![
            Row::new(vec![SqlValue::Integer(1), SqlValue::Double(10.5)]),
            Row::new(vec![SqlValue::Integer(2), SqlValue::Double(20.5)]),
            Row::new(vec![SqlValue::Integer(3), SqlValue::Double(30.5)]),
        ];

        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        // Column 0 should be readable as a raw i64 slice.
        match batch.column(0).unwrap().as_i64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[1, 2, 3]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected i64 slice"),
        }

        // Column 1 should be readable as a raw f64 slice.
        match batch.column(1).unwrap().as_f64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[10.5, 20.5, 30.5]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected f64 slice"),
        }
    }
}