vibesql_executor/select/columnar/batch/
operations.rs1#![allow(clippy::needless_range_loop)]
7
8use crate::errors::ExecutorError;
9use vibesql_storage::Row;
10use vibesql_types::{DataType, Date, SqlValue, Time, Timestamp};
11
12use super::types::{ColumnArray, ColumnarBatch};
13
14impl ColumnarBatch {
15 pub fn row_count(&self) -> usize {
17 self.row_count
18 }
19
20 pub fn column_count(&self) -> usize {
22 self.columns.len()
23 }
24
25 pub fn column(&self, index: usize) -> Option<&ColumnArray> {
27 self.columns.get(index)
28 }
29
30 pub fn column_mut(&mut self, index: usize) -> Option<&mut ColumnArray> {
32 self.columns.get_mut(index)
33 }
34
35 pub fn add_column(&mut self, column: ColumnArray) -> Result<(), ExecutorError> {
37 let col_len = column.len();
39 if self.row_count > 0 && col_len != self.row_count {
40 return Err(ExecutorError::ColumnarLengthMismatch {
41 context: "add_column".to_string(),
42 expected: self.row_count,
43 actual: col_len,
44 });
45 }
46
47 if self.row_count == 0 {
48 self.row_count = col_len;
49 }
50
51 self.columns.push(column);
52 Ok(())
53 }
54
55 pub fn set_column_names(&mut self, names: Vec<String>) {
57 self.column_names = Some(names);
58 }
59
60 pub fn column_names(&self) -> Option<&[String]> {
62 self.column_names.as_deref()
63 }
64
65 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
67 self.column_names.as_ref()?.iter().position(|n| n == name)
68 }
69
70 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
72 let column = self
73 .column(col_idx)
74 .ok_or(ExecutorError::ColumnarColumnNotFound {
75 column_index: col_idx,
76 batch_columns: self.columns.len(),
77 })?;
78 column.get_value(row_idx)
79 }
80
81 pub fn to_rows(&self) -> Result<Vec<Row>, ExecutorError> {
83 let mut rows = Vec::with_capacity(self.row_count);
84
85 for row_idx in 0..self.row_count {
86 let mut values = Vec::with_capacity(self.columns.len());
87
88 for column in &self.columns {
89 let value = column.get_value(row_idx)?;
90 values.push(value);
91 }
92
93 rows.push(Row::new(values));
94 }
95
96 Ok(rows)
97 }
98}
99
100impl ColumnArray {
101 pub fn len(&self) -> usize {
103 match self {
104 Self::Int64(v, _) => v.len(),
105 Self::Int32(v, _) => v.len(),
106 Self::Float64(v, _) => v.len(),
107 Self::Float32(v, _) => v.len(),
108 Self::String(v, _) => v.len(),
109 Self::FixedString(v, _) => v.len(),
110 Self::Date(v, _) => v.len(),
111 Self::Timestamp(v, _) => v.len(),
112 Self::Boolean(v, _) => v.len(),
113 Self::Mixed(v) => v.len(),
114 }
115 }
116
117 pub fn is_empty(&self) -> bool {
119 self.len() == 0
120 }
121
122 pub fn get_value(&self, index: usize) -> Result<SqlValue, ExecutorError> {
124 match self {
125 Self::Int64(values, nulls) => {
126 if let Some(null_mask) = nulls {
127 if null_mask.get(index).copied().unwrap_or(false) {
128 return Ok(SqlValue::Null);
129 }
130 }
131 values
132 .get(index)
133 .map(|v| SqlValue::Integer(*v))
134 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
135 }
136
137 Self::Float64(values, nulls) => {
138 if let Some(null_mask) = nulls {
139 if null_mask.get(index).copied().unwrap_or(false) {
140 return Ok(SqlValue::Null);
141 }
142 }
143 values
144 .get(index)
145 .map(|v| SqlValue::Double(*v))
146 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
147 }
148
149 Self::String(values, nulls) => {
150 if let Some(null_mask) = nulls {
151 if null_mask.get(index).copied().unwrap_or(false) {
152 return Ok(SqlValue::Null);
153 }
154 }
155 values
156 .get(index)
157 .map(|v| SqlValue::Varchar(v.clone()))
158 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
159 }
160
161 Self::Boolean(values, nulls) => {
162 if let Some(null_mask) = nulls {
163 if null_mask.get(index).copied().unwrap_or(false) {
164 return Ok(SqlValue::Null);
165 }
166 }
167 values
168 .get(index)
169 .map(|v| SqlValue::Boolean(*v != 0))
170 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
171 }
172
173 Self::Mixed(values) => values
174 .get(index)
175 .cloned()
176 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index }),
177
178 Self::Int32(values, nulls) => {
179 if let Some(null_mask) = nulls {
180 if null_mask.get(index).copied().unwrap_or(false) {
181 return Ok(SqlValue::Null);
182 }
183 }
184 values
185 .get(index)
186 .map(|v| SqlValue::Integer(*v as i64))
187 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
188 }
189
190 Self::Float32(values, nulls) => {
191 if let Some(null_mask) = nulls {
192 if null_mask.get(index).copied().unwrap_or(false) {
193 return Ok(SqlValue::Null);
194 }
195 }
196 values
197 .get(index)
198 .map(|v| SqlValue::Real(*v))
199 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
200 }
201
202 Self::FixedString(values, nulls) => {
203 if let Some(null_mask) = nulls {
204 if null_mask.get(index).copied().unwrap_or(false) {
205 return Ok(SqlValue::Null);
206 }
207 }
208 values
209 .get(index)
210 .map(|v| SqlValue::Character(v.clone()))
211 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
212 }
213
214 Self::Date(values, nulls) => {
215 if let Some(null_mask) = nulls {
216 if null_mask.get(index).copied().unwrap_or(false) {
217 return Ok(SqlValue::Null);
218 }
219 }
220 values
221 .get(index)
222 .map(|v| SqlValue::Date(days_since_epoch_to_date(*v)))
223 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
224 }
225
226 Self::Timestamp(values, nulls) => {
227 if let Some(null_mask) = nulls {
228 if null_mask.get(index).copied().unwrap_or(false) {
229 return Ok(SqlValue::Null);
230 }
231 }
232 values
233 .get(index)
234 .map(|v| SqlValue::Timestamp(microseconds_to_timestamp(*v)))
235 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
236 }
237 }
238 }
239
240 pub fn data_type(&self) -> DataType {
242 match self {
243 Self::Int64(_, _) => DataType::Integer,
244 Self::Int32(_, _) => DataType::Integer,
245 Self::Float64(_, _) => DataType::DoublePrecision,
246 Self::Float32(_, _) => DataType::Real,
247 Self::String(_, _) => DataType::Varchar { max_length: None },
248 Self::FixedString(_, _) => DataType::Character { length: 255 },
249 Self::Date(_, _) => DataType::Date,
250 Self::Timestamp(_, _) => DataType::Timestamp { with_timezone: false },
251 Self::Boolean(_, _) => DataType::Boolean,
252 Self::Mixed(_) => DataType::Varchar { max_length: None }, }
254 }
255
256 pub fn as_i64(&self) -> Option<(&[i64], Option<&[bool]>)> {
258 match self {
259 Self::Int64(values, nulls) => {
260 Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
261 }
262 _ => None,
263 }
264 }
265
266 pub fn as_f64(&self) -> Option<(&[f64], Option<&[bool]>)> {
268 match self {
269 Self::Float64(values, nulls) => {
270 Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
271 }
272 _ => None,
273 }
274 }
275}
276
277fn days_since_epoch_to_date(days: i32) -> Date {
279 let mut year = 1970;
281 let mut remaining_days = days;
282
283 loop {
285 let year_days = if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) {
286 366
287 } else {
288 365
289 };
290 if remaining_days < year_days {
291 break;
292 }
293 remaining_days -= year_days;
294 year += 1;
295 }
296
297 let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
299 let month_lengths = if is_leap {
300 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
301 } else {
302 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
303 };
304
305 let mut month = 1;
306 for &days_in_month in &month_lengths {
307 if remaining_days < days_in_month {
308 break;
309 }
310 remaining_days -= days_in_month;
311 month += 1;
312 }
313
314 let day = remaining_days + 1;
315
316 Date::new(year, month as u8, day as u8).unwrap_or_else(|_| Date::new(1970, 1, 1).unwrap())
317}
318
319fn microseconds_to_timestamp(micros: i64) -> Timestamp {
321 let days = (micros / 86_400_000_000) as i32;
322 let remaining_micros = micros % 86_400_000_000;
323
324 let date = days_since_epoch_to_date(days);
325
326 let hours = (remaining_micros / 3_600_000_000) as u8;
327 let remaining_micros = remaining_micros % 3_600_000_000;
328 let minutes = (remaining_micros / 60_000_000) as u8;
329 let remaining_micros = remaining_micros % 60_000_000;
330 let seconds = (remaining_micros / 1_000_000) as u8;
331 let nanoseconds = ((remaining_micros % 1_000_000) * 1_000) as u32;
332
333 let time =
334 Time::new(hours, minutes, seconds, nanoseconds).unwrap_or_else(|_| Time::new(0, 0, 0, 0).unwrap());
335
336 Timestamp::new(date, time)
337}
338
339#[cfg(test)]
340mod tests {
341 use super::*;
342
343 #[test]
344 fn test_batch_to_rows_roundtrip() {
345 let original_rows = vec![
346 Row::new(vec![SqlValue::Integer(1), SqlValue::Double(10.5)]),
347 Row::new(vec![SqlValue::Integer(2), SqlValue::Double(20.5)]),
348 ];
349
350 let batch = ColumnarBatch::from_rows(&original_rows).unwrap();
351 let converted_rows = batch.to_rows().unwrap();
352
353 assert_eq!(converted_rows.len(), original_rows.len());
354 for (original, converted) in original_rows.iter().zip(converted_rows.iter()) {
355 assert_eq!(original.len(), converted.len());
356 for i in 0..original.len() {
357 assert_eq!(original.get(i), converted.get(i));
358 }
359 }
360 }
361
362 #[test]
363 fn test_simd_column_access() {
364 let rows = vec![
365 Row::new(vec![SqlValue::Integer(1), SqlValue::Double(10.5)]),
366 Row::new(vec![SqlValue::Integer(2), SqlValue::Double(20.5)]),
367 Row::new(vec![SqlValue::Integer(3), SqlValue::Double(30.5)]),
368 ];
369
370 let batch = ColumnarBatch::from_rows(&rows).unwrap();
371
372 let col0 = batch.column(0).unwrap();
374 if let Some((values, nulls)) = col0.as_i64() {
375 assert_eq!(values, &[1, 2, 3]);
376 assert!(nulls.is_none());
377 } else {
378 panic!("Expected i64 slice");
379 }
380
381 let col1 = batch.column(1).unwrap();
383 if let Some((values, nulls)) = col1.as_f64() {
384 assert_eq!(values, &[10.5, 20.5, 30.5]);
385 assert!(nulls.is_none());
386 } else {
387 panic!("Expected f64 slice");
388 }
389 }
390}