vibesql_executor/select/columnar/batch/
operations.rs1#![allow(clippy::needless_range_loop)]
7
8use crate::errors::ExecutorError;
9use vibesql_storage::Row;
10use vibesql_types::{DataType, Date, SqlValue, Time, Timestamp};
11
12use super::types::{ColumnArray, ColumnarBatch};
13
14impl ColumnarBatch {
15 pub fn row_count(&self) -> usize {
17 self.row_count
18 }
19
20 pub fn column_count(&self) -> usize {
22 self.columns.len()
23 }
24
25 pub fn column(&self, index: usize) -> Option<&ColumnArray> {
27 self.columns.get(index)
28 }
29
30 pub fn column_mut(&mut self, index: usize) -> Option<&mut ColumnArray> {
32 self.columns.get_mut(index)
33 }
34
35 pub fn add_column(&mut self, column: ColumnArray) -> Result<(), ExecutorError> {
37 let col_len = column.len();
39 if self.row_count > 0 && col_len != self.row_count {
40 return Err(ExecutorError::ColumnarLengthMismatch {
41 context: "add_column".to_string(),
42 expected: self.row_count,
43 actual: col_len,
44 });
45 }
46
47 if self.row_count == 0 {
48 self.row_count = col_len;
49 }
50
51 self.columns.push(column);
52 Ok(())
53 }
54
55 pub fn set_column_names(&mut self, names: Vec<String>) {
57 self.column_names = Some(names);
58 }
59
60 pub fn column_names(&self) -> Option<&[String]> {
62 self.column_names.as_deref()
63 }
64
65 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
67 self.column_names.as_ref()?.iter().position(|n| n == name)
68 }
69
70 pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
72 let column = self.column(col_idx).ok_or(ExecutorError::ColumnarColumnNotFound {
73 column_index: col_idx,
74 batch_columns: self.columns.len(),
75 })?;
76 column.get_value(row_idx)
77 }
78
79 pub fn to_rows(&self) -> Result<Vec<Row>, ExecutorError> {
81 let mut rows = Vec::with_capacity(self.row_count);
82
83 for row_idx in 0..self.row_count {
84 let mut values = Vec::with_capacity(self.columns.len());
85
86 for column in &self.columns {
87 let value = column.get_value(row_idx)?;
88 values.push(value);
89 }
90
91 rows.push(Row::new(values));
92 }
93
94 Ok(rows)
95 }
96}
97
98impl ColumnArray {
99 pub fn len(&self) -> usize {
101 match self {
102 Self::Int64(v, _) => v.len(),
103 Self::Int32(v, _) => v.len(),
104 Self::Float64(v, _) => v.len(),
105 Self::Float32(v, _) => v.len(),
106 Self::String(v, _) => v.len(),
107 Self::FixedString(v, _) => v.len(),
108 Self::Date(v, _) => v.len(),
109 Self::Timestamp(v, _) => v.len(),
110 Self::Boolean(v, _) => v.len(),
111 Self::Mixed(v) => v.len(),
112 }
113 }
114
115 pub fn is_empty(&self) -> bool {
117 self.len() == 0
118 }
119
120 pub fn get_value(&self, index: usize) -> Result<SqlValue, ExecutorError> {
122 match self {
123 Self::Int64(values, nulls) => {
124 if let Some(null_mask) = nulls {
125 if null_mask.get(index).copied().unwrap_or(false) {
126 return Ok(SqlValue::Null);
127 }
128 }
129 values
130 .get(index)
131 .map(|v| SqlValue::Integer(*v))
132 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
133 }
134
135 Self::Float64(values, nulls) => {
136 if let Some(null_mask) = nulls {
137 if null_mask.get(index).copied().unwrap_or(false) {
138 return Ok(SqlValue::Null);
139 }
140 }
141 values
142 .get(index)
143 .map(|v| SqlValue::Double(*v))
144 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
145 }
146
147 Self::String(values, nulls) => {
148 if let Some(null_mask) = nulls {
149 if null_mask.get(index).copied().unwrap_or(false) {
150 return Ok(SqlValue::Null);
151 }
152 }
153 values
154 .get(index)
155 .map(|v| SqlValue::Varchar(v.clone()))
156 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
157 }
158
159 Self::Boolean(values, nulls) => {
160 if let Some(null_mask) = nulls {
161 if null_mask.get(index).copied().unwrap_or(false) {
162 return Ok(SqlValue::Null);
163 }
164 }
165 values
166 .get(index)
167 .map(|v| SqlValue::Boolean(*v != 0))
168 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
169 }
170
171 Self::Mixed(values) => {
172 values.get(index).cloned().ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
173 }
174
175 Self::Int32(values, nulls) => {
176 if let Some(null_mask) = nulls {
177 if null_mask.get(index).copied().unwrap_or(false) {
178 return Ok(SqlValue::Null);
179 }
180 }
181 values
182 .get(index)
183 .map(|v| SqlValue::Integer(*v as i64))
184 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
185 }
186
187 Self::Float32(values, nulls) => {
188 if let Some(null_mask) = nulls {
189 if null_mask.get(index).copied().unwrap_or(false) {
190 return Ok(SqlValue::Null);
191 }
192 }
193 values
194 .get(index)
195 .map(|v| SqlValue::Real(*v))
196 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
197 }
198
199 Self::FixedString(values, nulls) => {
200 if let Some(null_mask) = nulls {
201 if null_mask.get(index).copied().unwrap_or(false) {
202 return Ok(SqlValue::Null);
203 }
204 }
205 values
206 .get(index)
207 .map(|v| SqlValue::Character(v.clone()))
208 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
209 }
210
211 Self::Date(values, nulls) => {
212 if let Some(null_mask) = nulls {
213 if null_mask.get(index).copied().unwrap_or(false) {
214 return Ok(SqlValue::Null);
215 }
216 }
217 values
218 .get(index)
219 .map(|v| SqlValue::Date(days_since_epoch_to_date(*v)))
220 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
221 }
222
223 Self::Timestamp(values, nulls) => {
224 if let Some(null_mask) = nulls {
225 if null_mask.get(index).copied().unwrap_or(false) {
226 return Ok(SqlValue::Null);
227 }
228 }
229 values
230 .get(index)
231 .map(|v| SqlValue::Timestamp(microseconds_to_timestamp(*v)))
232 .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
233 }
234 }
235 }
236
237 pub fn data_type(&self) -> DataType {
239 match self {
240 Self::Int64(_, _) => DataType::Integer,
241 Self::Int32(_, _) => DataType::Integer,
242 Self::Float64(_, _) => DataType::DoublePrecision,
243 Self::Float32(_, _) => DataType::Real,
244 Self::String(_, _) => DataType::Varchar { max_length: None },
245 Self::FixedString(_, _) => DataType::Character { length: 255 },
246 Self::Date(_, _) => DataType::Date,
247 Self::Timestamp(_, _) => DataType::Timestamp { with_timezone: false },
248 Self::Boolean(_, _) => DataType::Boolean,
249 Self::Mixed(_) => DataType::Varchar { max_length: None }, }
251 }
252
253 pub fn as_i64(&self) -> Option<(&[i64], Option<&[bool]>)> {
255 match self {
256 Self::Int64(values, nulls) => {
257 Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
258 }
259 _ => None,
260 }
261 }
262
263 pub fn as_f64(&self) -> Option<(&[f64], Option<&[bool]>)> {
265 match self {
266 Self::Float64(values, nulls) => {
267 Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
268 }
269 _ => None,
270 }
271 }
272}
273
274fn days_since_epoch_to_date(days: i32) -> Date {
276 let mut year = 1970;
278 let mut remaining_days = days;
279
280 loop {
282 let year_days =
283 if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) { 366 } else { 365 };
284 if remaining_days < year_days {
285 break;
286 }
287 remaining_days -= year_days;
288 year += 1;
289 }
290
291 let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
293 let month_lengths = if is_leap {
294 [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
295 } else {
296 [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
297 };
298
299 let mut month = 1;
300 for &days_in_month in &month_lengths {
301 if remaining_days < days_in_month {
302 break;
303 }
304 remaining_days -= days_in_month;
305 month += 1;
306 }
307
308 let day = remaining_days + 1;
309
310 Date::new(year, month as u8, day as u8).unwrap_or_else(|_| Date::new(1970, 1, 1).unwrap())
311}
312
313fn microseconds_to_timestamp(micros: i64) -> Timestamp {
315 let days = (micros / 86_400_000_000) as i32;
316 let remaining_micros = micros % 86_400_000_000;
317
318 let date = days_since_epoch_to_date(days);
319
320 let hours = (remaining_micros / 3_600_000_000) as u8;
321 let remaining_micros = remaining_micros % 3_600_000_000;
322 let minutes = (remaining_micros / 60_000_000) as u8;
323 let remaining_micros = remaining_micros % 60_000_000;
324 let seconds = (remaining_micros / 1_000_000) as u8;
325 let nanoseconds = ((remaining_micros % 1_000_000) * 1_000) as u32;
326
327 let time = Time::new(hours, minutes, seconds, nanoseconds)
328 .unwrap_or_else(|_| Time::new(0, 0, 0, 0).unwrap());
329
330 Timestamp::new(date, time)
331}
332
#[cfg(test)]
mod tests {
    use super::*;

    /// Rows turned into a batch and back must come out cell-for-cell equal.
    #[test]
    fn test_batch_to_rows_roundtrip() {
        let original_rows = vec![
            Row::new(vec![SqlValue::Integer(1), SqlValue::Double(10.5)]),
            Row::new(vec![SqlValue::Integer(2), SqlValue::Double(20.5)]),
        ];

        let converted_rows = ColumnarBatch::from_rows(&original_rows).unwrap().to_rows().unwrap();

        assert_eq!(converted_rows.len(), original_rows.len());
        for (expected, actual) in original_rows.iter().zip(&converted_rows) {
            assert_eq!(expected.len(), actual.len());
            (0..expected.len()).for_each(|idx| assert_eq!(expected.get(idx), actual.get(idx)));
        }
    }

    /// Typed slice accessors expose the raw column data without a null mask
    /// when the input rows contain no NULLs.
    #[test]
    fn test_simd_column_access() {
        let batch = ColumnarBatch::from_rows(&vec![
            Row::new(vec![SqlValue::Integer(1), SqlValue::Double(10.5)]),
            Row::new(vec![SqlValue::Integer(2), SqlValue::Double(20.5)]),
            Row::new(vec![SqlValue::Integer(3), SqlValue::Double(30.5)]),
        ])
        .unwrap();

        // First column: integers.
        match batch.column(0).unwrap().as_i64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[1, 2, 3]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected i64 slice"),
        }

        // Second column: doubles.
        match batch.column(1).unwrap().as_f64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[10.5, 20.5, 30.5]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected f64 slice"),
        }
    }
}