vibesql_executor/select/columnar/batch/
operations.rs

1//! Batch manipulation operations
2//!
3//! This module contains methods for accessing and manipulating
4//! `ColumnarBatch` and `ColumnArray` instances.
5
6#![allow(clippy::needless_range_loop)]
7
8use crate::errors::ExecutorError;
9use vibesql_storage::Row;
10use vibesql_types::{DataType, Date, SqlValue, Time, Timestamp};
11
12use super::types::{ColumnArray, ColumnarBatch};
13
14impl ColumnarBatch {
15    /// Get the number of rows in this batch
16    pub fn row_count(&self) -> usize {
17        self.row_count
18    }
19
20    /// Get the number of columns in this batch
21    pub fn column_count(&self) -> usize {
22        self.columns.len()
23    }
24
25    /// Get a reference to a column array
26    pub fn column(&self, index: usize) -> Option<&ColumnArray> {
27        self.columns.get(index)
28    }
29
30    /// Get a mutable reference to a column array
31    pub fn column_mut(&mut self, index: usize) -> Option<&mut ColumnArray> {
32        self.columns.get_mut(index)
33    }
34
35    /// Add a column to the batch
36    pub fn add_column(&mut self, column: ColumnArray) -> Result<(), ExecutorError> {
37        // Verify column has correct length
38        let col_len = column.len();
39        if self.row_count > 0 && col_len != self.row_count {
40            return Err(ExecutorError::ColumnarLengthMismatch {
41                context: "add_column".to_string(),
42                expected: self.row_count,
43                actual: col_len,
44            });
45        }
46
47        if self.row_count == 0 {
48            self.row_count = col_len;
49        }
50
51        self.columns.push(column);
52        Ok(())
53    }
54
55    /// Set column names (for debugging)
56    pub fn set_column_names(&mut self, names: Vec<String>) {
57        self.column_names = Some(names);
58    }
59
60    /// Get column names
61    pub fn column_names(&self) -> Option<&[String]> {
62        self.column_names.as_deref()
63    }
64
65    /// Get column index by name
66    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
67        self.column_names.as_ref()?.iter().position(|n| n == name)
68    }
69
70    /// Get a value at a specific (row, column) position
71    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
72        let column = self
73            .column(col_idx)
74            .ok_or(ExecutorError::ColumnarColumnNotFound {
75                column_index: col_idx,
76                batch_columns: self.columns.len(),
77            })?;
78        column.get_value(row_idx)
79    }
80
81    /// Convert columnar batch back to row-oriented storage
82    pub fn to_rows(&self) -> Result<Vec<Row>, ExecutorError> {
83        let mut rows = Vec::with_capacity(self.row_count);
84
85        for row_idx in 0..self.row_count {
86            let mut values = Vec::with_capacity(self.columns.len());
87
88            for column in &self.columns {
89                let value = column.get_value(row_idx)?;
90                values.push(value);
91            }
92
93            rows.push(Row::new(values));
94        }
95
96        Ok(rows)
97    }
98}
99
100impl ColumnArray {
101    /// Get the number of values in this column
102    pub fn len(&self) -> usize {
103        match self {
104            Self::Int64(v, _) => v.len(),
105            Self::Int32(v, _) => v.len(),
106            Self::Float64(v, _) => v.len(),
107            Self::Float32(v, _) => v.len(),
108            Self::String(v, _) => v.len(),
109            Self::FixedString(v, _) => v.len(),
110            Self::Date(v, _) => v.len(),
111            Self::Timestamp(v, _) => v.len(),
112            Self::Boolean(v, _) => v.len(),
113            Self::Mixed(v) => v.len(),
114        }
115    }
116
117    /// Check if column is empty
118    pub fn is_empty(&self) -> bool {
119        self.len() == 0
120    }
121
122    /// Get a value at the specified index as SqlValue
123    pub fn get_value(&self, index: usize) -> Result<SqlValue, ExecutorError> {
124        match self {
125            Self::Int64(values, nulls) => {
126                if let Some(null_mask) = nulls {
127                    if null_mask.get(index).copied().unwrap_or(false) {
128                        return Ok(SqlValue::Null);
129                    }
130                }
131                values
132                    .get(index)
133                    .map(|v| SqlValue::Integer(*v))
134                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
135            }
136
137            Self::Float64(values, nulls) => {
138                if let Some(null_mask) = nulls {
139                    if null_mask.get(index).copied().unwrap_or(false) {
140                        return Ok(SqlValue::Null);
141                    }
142                }
143                values
144                    .get(index)
145                    .map(|v| SqlValue::Double(*v))
146                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
147            }
148
149            Self::String(values, nulls) => {
150                if let Some(null_mask) = nulls {
151                    if null_mask.get(index).copied().unwrap_or(false) {
152                        return Ok(SqlValue::Null);
153                    }
154                }
155                values
156                    .get(index)
157                    .map(|v| SqlValue::Varchar(v.clone()))
158                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
159            }
160
161            Self::Boolean(values, nulls) => {
162                if let Some(null_mask) = nulls {
163                    if null_mask.get(index).copied().unwrap_or(false) {
164                        return Ok(SqlValue::Null);
165                    }
166                }
167                values
168                    .get(index)
169                    .map(|v| SqlValue::Boolean(*v != 0))
170                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
171            }
172
173            Self::Mixed(values) => values
174                .get(index)
175                .cloned()
176                .ok_or(ExecutorError::ColumnIndexOutOfBounds { index }),
177
178            Self::Int32(values, nulls) => {
179                if let Some(null_mask) = nulls {
180                    if null_mask.get(index).copied().unwrap_or(false) {
181                        return Ok(SqlValue::Null);
182                    }
183                }
184                values
185                    .get(index)
186                    .map(|v| SqlValue::Integer(*v as i64))
187                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
188            }
189
190            Self::Float32(values, nulls) => {
191                if let Some(null_mask) = nulls {
192                    if null_mask.get(index).copied().unwrap_or(false) {
193                        return Ok(SqlValue::Null);
194                    }
195                }
196                values
197                    .get(index)
198                    .map(|v| SqlValue::Real(*v))
199                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
200            }
201
202            Self::FixedString(values, nulls) => {
203                if let Some(null_mask) = nulls {
204                    if null_mask.get(index).copied().unwrap_or(false) {
205                        return Ok(SqlValue::Null);
206                    }
207                }
208                values
209                    .get(index)
210                    .map(|v| SqlValue::Character(v.clone()))
211                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
212            }
213
214            Self::Date(values, nulls) => {
215                if let Some(null_mask) = nulls {
216                    if null_mask.get(index).copied().unwrap_or(false) {
217                        return Ok(SqlValue::Null);
218                    }
219                }
220                values
221                    .get(index)
222                    .map(|v| SqlValue::Date(days_since_epoch_to_date(*v)))
223                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
224            }
225
226            Self::Timestamp(values, nulls) => {
227                if let Some(null_mask) = nulls {
228                    if null_mask.get(index).copied().unwrap_or(false) {
229                        return Ok(SqlValue::Null);
230                    }
231                }
232                values
233                    .get(index)
234                    .map(|v| SqlValue::Timestamp(microseconds_to_timestamp(*v)))
235                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
236            }
237        }
238    }
239
240    /// Get the data type of this column
241    pub fn data_type(&self) -> DataType {
242        match self {
243            Self::Int64(_, _) => DataType::Integer,
244            Self::Int32(_, _) => DataType::Integer,
245            Self::Float64(_, _) => DataType::DoublePrecision,
246            Self::Float32(_, _) => DataType::Real,
247            Self::String(_, _) => DataType::Varchar { max_length: None },
248            Self::FixedString(_, _) => DataType::Character { length: 255 },
249            Self::Date(_, _) => DataType::Date,
250            Self::Timestamp(_, _) => DataType::Timestamp { with_timezone: false },
251            Self::Boolean(_, _) => DataType::Boolean,
252            Self::Mixed(_) => DataType::Varchar { max_length: None }, // fallback
253        }
254    }
255
256    /// Get raw i64 slice (for SIMD operations)
257    pub fn as_i64(&self) -> Option<(&[i64], Option<&[bool]>)> {
258        match self {
259            Self::Int64(values, nulls) => {
260                Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
261            }
262            _ => None,
263        }
264    }
265
266    /// Get raw f64 slice (for SIMD operations)
267    pub fn as_f64(&self) -> Option<(&[f64], Option<&[bool]>)> {
268        match self {
269            Self::Float64(values, nulls) => {
270                Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
271            }
272            _ => None,
273        }
274    }
275}
276
277/// Convert days since Unix epoch to Date
278fn days_since_epoch_to_date(days: i32) -> Date {
279    // Simplified conversion: start from 1970-01-01 and count forward
280    let mut year = 1970;
281    let mut remaining_days = days;
282
283    // Handle years
284    loop {
285        let year_days = if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) {
286            366
287        } else {
288            365
289        };
290        if remaining_days < year_days {
291            break;
292        }
293        remaining_days -= year_days;
294        year += 1;
295    }
296
297    // Handle months
298    let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
299    let month_lengths = if is_leap {
300        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
301    } else {
302        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
303    };
304
305    let mut month = 1;
306    for &days_in_month in &month_lengths {
307        if remaining_days < days_in_month {
308            break;
309        }
310        remaining_days -= days_in_month;
311        month += 1;
312    }
313
314    let day = remaining_days + 1;
315
316    Date::new(year, month as u8, day as u8).unwrap_or_else(|_| Date::new(1970, 1, 1).unwrap())
317}
318
319/// Convert microseconds since Unix epoch to Timestamp
320fn microseconds_to_timestamp(micros: i64) -> Timestamp {
321    let days = (micros / 86_400_000_000) as i32;
322    let remaining_micros = micros % 86_400_000_000;
323
324    let date = days_since_epoch_to_date(days);
325
326    let hours = (remaining_micros / 3_600_000_000) as u8;
327    let remaining_micros = remaining_micros % 3_600_000_000;
328    let minutes = (remaining_micros / 60_000_000) as u8;
329    let remaining_micros = remaining_micros % 60_000_000;
330    let seconds = (remaining_micros / 1_000_000) as u8;
331    let nanoseconds = ((remaining_micros % 1_000_000) * 1_000) as u32;
332
333    let time =
334        Time::new(hours, minutes, seconds, nanoseconds).unwrap_or_else(|_| Time::new(0, 0, 0, 0).unwrap());
335
336    Timestamp::new(date, time)
337}
338
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a two-column row: an integer followed by a double.
    fn int_double_row(int_value: i64, double_value: f64) -> Row {
        Row::new(vec![SqlValue::Integer(int_value), SqlValue::Double(double_value)])
    }

    /// Rows converted to a batch and back must come out unchanged.
    #[test]
    fn test_batch_to_rows_roundtrip() {
        let source = vec![int_double_row(1, 10.5), int_double_row(2, 20.5)];

        let restored = ColumnarBatch::from_rows(&source).unwrap().to_rows().unwrap();

        assert_eq!(restored.len(), source.len());
        for (expected, actual) in source.iter().zip(restored.iter()) {
            assert_eq!(expected.len(), actual.len());
            for col in 0..expected.len() {
                assert_eq!(expected.get(col), actual.get(col));
            }
        }
    }

    /// Typed columns expose their raw slices, with no null mask when the
    /// input contained no NULLs.
    #[test]
    fn test_simd_column_access() {
        let rows = vec![
            int_double_row(1, 10.5),
            int_double_row(2, 20.5),
            int_double_row(3, 30.5),
        ];
        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        // Column 0: raw i64 access.
        let (ints, int_nulls) = batch
            .column(0)
            .unwrap()
            .as_i64()
            .expect("Expected i64 slice");
        assert_eq!(ints, &[1, 2, 3]);
        assert!(int_nulls.is_none());

        // Column 1: raw f64 access.
        let (floats, float_nulls) = batch
            .column(1)
            .unwrap()
            .as_f64()
            .expect("Expected f64 slice");
        assert_eq!(floats, &[10.5, 20.5, 30.5]);
        assert!(float_nulls.is_none());
    }
}