vibesql_executor/select/columnar/batch/
operations.rs

1//! Batch manipulation operations
2//!
3//! This module contains methods for accessing and manipulating
4//! `ColumnarBatch` and `ColumnArray` instances.
5
6#![allow(clippy::needless_range_loop)]
7
8use std::sync::Arc;
9
10use ahash::AHashSet;
11use vibesql_storage::Row;
12use vibesql_types::{DataType, Date, SqlValue, Time, Timestamp};
13
14use super::types::{ColumnArray, ColumnarBatch};
15use crate::errors::ExecutorError;
16
17impl ColumnarBatch {
18    /// Get the number of rows in this batch
19    pub fn row_count(&self) -> usize {
20        self.row_count
21    }
22
23    /// Get the number of columns in this batch
24    pub fn column_count(&self) -> usize {
25        self.columns.len()
26    }
27
28    /// Get a reference to a column array
29    pub fn column(&self, index: usize) -> Option<&ColumnArray> {
30        self.columns.get(index)
31    }
32
33    /// Get a mutable reference to a column array
34    pub fn column_mut(&mut self, index: usize) -> Option<&mut ColumnArray> {
35        self.columns.get_mut(index)
36    }
37
38    /// Add a column to the batch
39    pub fn add_column(&mut self, column: ColumnArray) -> Result<(), ExecutorError> {
40        // Verify column has correct length
41        let col_len = column.len();
42        if self.row_count > 0 && col_len != self.row_count {
43            return Err(ExecutorError::ColumnarLengthMismatch {
44                context: "add_column".to_string(),
45                expected: self.row_count,
46                actual: col_len,
47            });
48        }
49
50        if self.row_count == 0 {
51            self.row_count = col_len;
52        }
53
54        self.columns.push(column);
55        Ok(())
56    }
57
58    /// Set column names (for debugging)
59    pub fn set_column_names(&mut self, names: Vec<String>) {
60        self.column_names = Some(names);
61    }
62
63    /// Get column names
64    pub fn column_names(&self) -> Option<&[String]> {
65        self.column_names.as_deref()
66    }
67
68    /// Get column index by name
69    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
70        self.column_names.as_ref()?.iter().position(|n| n == name)
71    }
72
73    /// Get a value at a specific (row, column) position
74    pub fn get_value(&self, row_idx: usize, col_idx: usize) -> Result<SqlValue, ExecutorError> {
75        let column = self.column(col_idx).ok_or(ExecutorError::ColumnarColumnNotFound {
76            column_index: col_idx,
77            batch_columns: self.columns.len(),
78        })?;
79        column.get_value(row_idx)
80    }
81
82    /// Convert columnar batch back to row-oriented storage
83    ///
84    /// This implementation uses column-outer, row-inner iteration for better
85    /// cache locality. Instead of calling `get_value()` for each cell (O(n*m)
86    /// enum matches), we match on each column once and iterate through all
87    /// its values sequentially.
88    ///
89    /// # Performance
90    ///
91    /// For a 60,000 row × 15 column batch:
92    /// - Old approach: 900,000 `get_value()` calls with enum matching per cell
93    /// - New approach: 15 enum matches total, sequential memory access per column
94    ///
95    /// Expected 2-3x speedup due to:
96    /// - Single enum match per column (not per cell)
97    /// - Sequential memory access within each column array
98    /// - Better CPU cache utilization
99    pub fn to_rows(&self) -> Result<Vec<Row>, ExecutorError> {
100        if self.row_count == 0 {
101            return Ok(Vec::new());
102        }
103
104        // Pre-allocate all row value vectors
105        let num_cols = self.columns.len();
106        let mut row_values: Vec<Vec<SqlValue>> =
107            (0..self.row_count).map(|_| Vec::with_capacity(num_cols)).collect();
108
109        // Process column-by-column (cache-friendly)
110        for column in &self.columns {
111            column.append_values_to_rows(&mut row_values)?;
112        }
113
114        // Convert to Row objects
115        Ok(row_values.into_iter().map(Row::new).collect())
116    }
117
118    /// Deduplicate rows in the batch, returning a new batch with unique rows only
119    ///
120    /// Uses hash-based deduplication on all columns, preserving insertion order.
121    /// This implements DISTINCT semantics: NULL == NULL for uniqueness purposes.
122    ///
123    /// # Performance
124    ///
125    /// - O(n) time complexity where n is the number of rows
126    /// - Uses AHashSet for efficient duplicate detection
127    /// - Preserves the first occurrence of each unique row combination
128    ///
129    /// # Example
130    ///
131    /// ```text
132    /// // Original batch:
133    /// // [1, "A"], [2, "B"], [1, "A"], [3, "C"]
134    ///
135    /// // After deduplicate():
136    /// // [1, "A"], [2, "B"], [3, "C"]
137    /// ```
138    pub fn deduplicate(&self) -> Result<Self, ExecutorError> {
139        if self.row_count == 0 {
140            return Ok(self.clone());
141        }
142
143        // Track which row indices to keep (first occurrence of each unique row)
144        let mut seen: AHashSet<Vec<SqlValue>> = AHashSet::with_capacity(self.row_count);
145        let mut keep_indices: Vec<usize> = Vec::with_capacity(self.row_count);
146
147        for row_idx in 0..self.row_count {
148            // Extract all column values for this row as a key
149            let mut row_key = Vec::with_capacity(self.columns.len());
150            for col in &self.columns {
151                let value = col.get_value(row_idx)?;
152                row_key.push(value);
153            }
154
155            // If this row hasn't been seen, keep it
156            if seen.insert(row_key) {
157                keep_indices.push(row_idx);
158            }
159        }
160
161        // If no duplicates found, return self unchanged
162        if keep_indices.len() == self.row_count {
163            return Ok(self.clone());
164        }
165
166        log::debug!(
167            "Columnar deduplicate: {} rows -> {} unique rows",
168            self.row_count,
169            keep_indices.len()
170        );
171
172        // Build new batch with only the unique rows
173        self.select_rows(&keep_indices)
174    }
175
176    /// Select specific rows from the batch by index, returning a new batch
177    ///
178    /// # Arguments
179    ///
180    /// * `indices` - Row indices to select (must be valid for this batch)
181    ///
182    /// # Returns
183    ///
184    /// A new ColumnarBatch containing only the selected rows
185    pub fn select_rows(&self, indices: &[usize]) -> Result<Self, ExecutorError> {
186        if indices.is_empty() {
187            return Self::empty(self.columns.len());
188        }
189
190        let new_row_count = indices.len();
191        let mut new_columns = Vec::with_capacity(self.columns.len());
192
193        for column in &self.columns {
194            let new_column = column.select_rows(indices)?;
195            new_columns.push(new_column);
196        }
197
198        Ok(Self {
199            row_count: new_row_count,
200            columns: new_columns,
201            column_names: self.column_names.clone(),
202        })
203    }
204}
205
206impl ColumnArray {
207    /// Select specific rows from the column by index, returning a new column
208    fn select_rows(&self, indices: &[usize]) -> Result<Self, ExecutorError> {
209        match self {
210            Self::Int64(values, nulls) => {
211                let new_values: Vec<i64> = indices.iter().map(|&i| values[i]).collect();
212                let new_nulls = nulls
213                    .as_ref()
214                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
215                Ok(Self::Int64(Arc::new(new_values), new_nulls))
216            }
217            Self::Int32(values, nulls) => {
218                let new_values: Vec<i32> = indices.iter().map(|&i| values[i]).collect();
219                let new_nulls = nulls
220                    .as_ref()
221                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
222                Ok(Self::Int32(Arc::new(new_values), new_nulls))
223            }
224            Self::Float64(values, nulls) => {
225                let new_values: Vec<f64> = indices.iter().map(|&i| values[i]).collect();
226                let new_nulls = nulls
227                    .as_ref()
228                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
229                Ok(Self::Float64(Arc::new(new_values), new_nulls))
230            }
231            Self::Float32(values, nulls) => {
232                let new_values: Vec<f32> = indices.iter().map(|&i| values[i]).collect();
233                let new_nulls = nulls
234                    .as_ref()
235                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
236                Ok(Self::Float32(Arc::new(new_values), new_nulls))
237            }
238            Self::String(values, nulls) => {
239                let new_values: Vec<Arc<str>> =
240                    indices.iter().map(|&i| values[i].clone()).collect();
241                let new_nulls = nulls
242                    .as_ref()
243                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
244                Ok(Self::String(Arc::new(new_values), new_nulls))
245            }
246            Self::FixedString(values, nulls) => {
247                let new_values: Vec<Arc<str>> =
248                    indices.iter().map(|&i| values[i].clone()).collect();
249                let new_nulls = nulls
250                    .as_ref()
251                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
252                Ok(Self::FixedString(Arc::new(new_values), new_nulls))
253            }
254            Self::Date(values, nulls) => {
255                let new_values: Vec<i32> = indices.iter().map(|&i| values[i]).collect();
256                let new_nulls = nulls
257                    .as_ref()
258                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
259                Ok(Self::Date(Arc::new(new_values), new_nulls))
260            }
261            Self::Timestamp(values, nulls) => {
262                let new_values: Vec<i64> = indices.iter().map(|&i| values[i]).collect();
263                let new_nulls = nulls
264                    .as_ref()
265                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
266                Ok(Self::Timestamp(Arc::new(new_values), new_nulls))
267            }
268            Self::Boolean(values, nulls) => {
269                let new_values: Vec<u8> = indices.iter().map(|&i| values[i]).collect();
270                let new_nulls = nulls
271                    .as_ref()
272                    .map(|n| Arc::new(indices.iter().map(|&i| n[i]).collect::<Vec<_>>()));
273                Ok(Self::Boolean(Arc::new(new_values), new_nulls))
274            }
275            Self::Mixed(values) => {
276                let new_values: Vec<SqlValue> =
277                    indices.iter().map(|&i| values[i].clone()).collect();
278                Ok(Self::Mixed(Arc::new(new_values)))
279            }
280        }
281    }
282
283    /// Get the number of values in this column
284    pub fn len(&self) -> usize {
285        match self {
286            Self::Int64(v, _) => v.len(),
287            Self::Int32(v, _) => v.len(),
288            Self::Float64(v, _) => v.len(),
289            Self::Float32(v, _) => v.len(),
290            Self::String(v, _) => v.len(),
291            Self::FixedString(v, _) => v.len(),
292            Self::Date(v, _) => v.len(),
293            Self::Timestamp(v, _) => v.len(),
294            Self::Boolean(v, _) => v.len(),
295            Self::Mixed(v) => v.len(),
296        }
297    }
298
299    /// Check if column is empty
300    pub fn is_empty(&self) -> bool {
301        self.len() == 0
302    }
303
304    /// Get a value at the specified index as SqlValue
305    pub fn get_value(&self, index: usize) -> Result<SqlValue, ExecutorError> {
306        match self {
307            Self::Int64(values, nulls) => {
308                if let Some(null_mask) = nulls {
309                    if null_mask.get(index).copied().unwrap_or(false) {
310                        return Ok(SqlValue::Null);
311                    }
312                }
313                values
314                    .get(index)
315                    .map(|v| SqlValue::Integer(*v))
316                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
317            }
318
319            Self::Float64(values, nulls) => {
320                if let Some(null_mask) = nulls {
321                    if null_mask.get(index).copied().unwrap_or(false) {
322                        return Ok(SqlValue::Null);
323                    }
324                }
325                values
326                    .get(index)
327                    .map(|v| SqlValue::Double(*v))
328                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
329            }
330
331            Self::String(values, nulls) => {
332                if let Some(null_mask) = nulls {
333                    if null_mask.get(index).copied().unwrap_or(false) {
334                        return Ok(SqlValue::Null);
335                    }
336                }
337                values
338                    .get(index)
339                    .map(|v| SqlValue::Varchar(arcstr::ArcStr::from(v.as_ref())))
340                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
341            }
342
343            Self::Boolean(values, nulls) => {
344                if let Some(null_mask) = nulls {
345                    if null_mask.get(index).copied().unwrap_or(false) {
346                        return Ok(SqlValue::Null);
347                    }
348                }
349                values
350                    .get(index)
351                    .map(|v| SqlValue::Boolean(*v != 0))
352                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
353            }
354
355            Self::Mixed(values) => {
356                values.get(index).cloned().ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
357            }
358
359            Self::Int32(values, nulls) => {
360                if let Some(null_mask) = nulls {
361                    if null_mask.get(index).copied().unwrap_or(false) {
362                        return Ok(SqlValue::Null);
363                    }
364                }
365                values
366                    .get(index)
367                    .map(|v| SqlValue::Integer(*v as i64))
368                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
369            }
370
371            Self::Float32(values, nulls) => {
372                if let Some(null_mask) = nulls {
373                    if null_mask.get(index).copied().unwrap_or(false) {
374                        return Ok(SqlValue::Null);
375                    }
376                }
377                values
378                    .get(index)
379                    .map(|v| SqlValue::Float(*v))  // Float32 → SqlValue::Float (f32)
380                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
381            }
382
383            Self::FixedString(values, nulls) => {
384                if let Some(null_mask) = nulls {
385                    if null_mask.get(index).copied().unwrap_or(false) {
386                        return Ok(SqlValue::Null);
387                    }
388                }
389                values
390                    .get(index)
391                    .map(|v| SqlValue::Character(arcstr::ArcStr::from(v.as_ref())))
392                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
393            }
394
395            Self::Date(values, nulls) => {
396                if let Some(null_mask) = nulls {
397                    if null_mask.get(index).copied().unwrap_or(false) {
398                        return Ok(SqlValue::Null);
399                    }
400                }
401                values
402                    .get(index)
403                    .map(|v| SqlValue::Date(days_since_epoch_to_date(*v)))
404                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
405            }
406
407            Self::Timestamp(values, nulls) => {
408                if let Some(null_mask) = nulls {
409                    if null_mask.get(index).copied().unwrap_or(false) {
410                        return Ok(SqlValue::Null);
411                    }
412                }
413                values
414                    .get(index)
415                    .map(|v| SqlValue::Timestamp(microseconds_to_timestamp(*v)))
416                    .ok_or(ExecutorError::ColumnIndexOutOfBounds { index })
417            }
418        }
419    }
420
421    /// Append all values from this column to the corresponding row vectors.
422    ///
423    /// This is used by `ColumnarBatch::to_rows()` for cache-friendly column-major
424    /// iteration. Instead of calling `get_value()` for each cell, we match on
425    /// the column type once and iterate through all values sequentially.
426    ///
427    /// # Arguments
428    ///
429    /// * `row_values` - Mutable slice of row value vectors to append to
430    ///
431    /// # Performance
432    ///
433    /// This approach is significantly faster than per-cell `get_value()` because:
434    /// - Single enum match per column (not per cell)
435    /// - Sequential memory access within the column array
436    /// - NULL bitmap checked with simple indexing, not Option unwrapping per cell
437    pub(crate) fn append_values_to_rows(
438        &self,
439        row_values: &mut [Vec<SqlValue>],
440    ) -> Result<(), ExecutorError> {
441        match self {
442            Self::Int64(values, nulls) => {
443                if let Some(null_mask) = nulls {
444                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
445                        row_values[i].push(if is_null {
446                            SqlValue::Null
447                        } else {
448                            SqlValue::Integer(v)
449                        });
450                    }
451                } else {
452                    for (i, &v) in values.iter().enumerate() {
453                        row_values[i].push(SqlValue::Integer(v));
454                    }
455                }
456            }
457
458            Self::Int32(values, nulls) => {
459                if let Some(null_mask) = nulls {
460                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
461                        row_values[i].push(if is_null {
462                            SqlValue::Null
463                        } else {
464                            SqlValue::Integer(v as i64)
465                        });
466                    }
467                } else {
468                    for (i, &v) in values.iter().enumerate() {
469                        row_values[i].push(SqlValue::Integer(v as i64));
470                    }
471                }
472            }
473
474            Self::Float64(values, nulls) => {
475                if let Some(null_mask) = nulls {
476                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
477                        row_values[i].push(if is_null {
478                            SqlValue::Null
479                        } else {
480                            SqlValue::Double(v)
481                        });
482                    }
483                } else {
484                    for (i, &v) in values.iter().enumerate() {
485                        row_values[i].push(SqlValue::Double(v));
486                    }
487                }
488            }
489
490            Self::Float32(values, nulls) => {
491                if let Some(null_mask) = nulls {
492                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
493                        row_values[i].push(if is_null {
494                            SqlValue::Null
495                        } else {
496                            SqlValue::Float(v)  // Float32 → SqlValue::Float (f32)
497                        });
498                    }
499                } else {
500                    for (i, &v) in values.iter().enumerate() {
501                        row_values[i].push(SqlValue::Float(v));  // Float32 → SqlValue::Float (f32)
502                    }
503                }
504            }
505
506            Self::String(values, nulls) => {
507                if let Some(null_mask) = nulls {
508                    for (i, (v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
509                        row_values[i].push(if is_null {
510                            SqlValue::Null
511                        } else {
512                            SqlValue::Varchar(arcstr::ArcStr::from(v.as_ref()))
513                        });
514                    }
515                } else {
516                    for (i, v) in values.iter().enumerate() {
517                        row_values[i].push(SqlValue::Varchar(arcstr::ArcStr::from(v.as_ref())));
518                    }
519                }
520            }
521
522            Self::FixedString(values, nulls) => {
523                if let Some(null_mask) = nulls {
524                    for (i, (v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
525                        row_values[i].push(if is_null {
526                            SqlValue::Null
527                        } else {
528                            SqlValue::Character(arcstr::ArcStr::from(v.as_ref()))
529                        });
530                    }
531                } else {
532                    for (i, v) in values.iter().enumerate() {
533                        row_values[i].push(SqlValue::Character(arcstr::ArcStr::from(v.as_ref())));
534                    }
535                }
536            }
537
538            Self::Date(values, nulls) => {
539                if let Some(null_mask) = nulls {
540                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
541                        row_values[i].push(if is_null {
542                            SqlValue::Null
543                        } else {
544                            SqlValue::Date(days_since_epoch_to_date(v))
545                        });
546                    }
547                } else {
548                    for (i, &v) in values.iter().enumerate() {
549                        row_values[i].push(SqlValue::Date(days_since_epoch_to_date(v)));
550                    }
551                }
552            }
553
554            Self::Timestamp(values, nulls) => {
555                if let Some(null_mask) = nulls {
556                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
557                        row_values[i].push(if is_null {
558                            SqlValue::Null
559                        } else {
560                            SqlValue::Timestamp(microseconds_to_timestamp(v))
561                        });
562                    }
563                } else {
564                    for (i, &v) in values.iter().enumerate() {
565                        row_values[i].push(SqlValue::Timestamp(microseconds_to_timestamp(v)));
566                    }
567                }
568            }
569
570            Self::Boolean(values, nulls) => {
571                if let Some(null_mask) = nulls {
572                    for (i, (&v, &is_null)) in values.iter().zip(null_mask.iter()).enumerate() {
573                        row_values[i].push(if is_null {
574                            SqlValue::Null
575                        } else {
576                            SqlValue::Boolean(v != 0)
577                        });
578                    }
579                } else {
580                    for (i, &v) in values.iter().enumerate() {
581                        row_values[i].push(SqlValue::Boolean(v != 0));
582                    }
583                }
584            }
585
586            Self::Mixed(values) => {
587                for (i, v) in values.iter().enumerate() {
588                    row_values[i].push(v.clone());
589                }
590            }
591        }
592
593        Ok(())
594    }
595
596    /// Get the data type of this column
597    pub fn data_type(&self) -> DataType {
598        match self {
599            Self::Int64(_, _) => DataType::Integer,
600            Self::Int32(_, _) => DataType::Integer,
601            Self::Float64(_, _) => DataType::DoublePrecision,
602            Self::Float32(_, _) => DataType::Real,
603            Self::String(_, _) => DataType::Varchar { max_length: None },
604            Self::FixedString(_, _) => DataType::Character { length: 255 },
605            Self::Date(_, _) => DataType::Date,
606            Self::Timestamp(_, _) => DataType::Timestamp { with_timezone: false },
607            Self::Boolean(_, _) => DataType::Boolean,
608            Self::Mixed(_) => DataType::Varchar { max_length: None }, // fallback
609        }
610    }
611
612    /// Get raw i64 slice (for SIMD operations)
613    pub fn as_i64(&self) -> Option<(&[i64], Option<&[bool]>)> {
614        match self {
615            Self::Int64(values, nulls) => {
616                Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
617            }
618            _ => None,
619        }
620    }
621
622    /// Get raw f64 slice (for SIMD operations)
623    pub fn as_f64(&self) -> Option<(&[f64], Option<&[bool]>)> {
624        match self {
625            Self::Float64(values, nulls) => {
626                Some((values.as_slice(), nulls.as_ref().map(|n| n.as_slice())))
627            }
628            _ => None,
629        }
630    }
631}
632
633/// Convert days since Unix epoch to Date
634fn days_since_epoch_to_date(days: i32) -> Date {
635    // Simplified conversion: start from 1970-01-01 and count forward
636    let mut year = 1970;
637    let mut remaining_days = days;
638
639    // Handle years
640    loop {
641        let year_days =
642            if year % 4 == 0 && (year % 100 != 0 || year % 400 == 0) { 366 } else { 365 };
643        if remaining_days < year_days {
644            break;
645        }
646        remaining_days -= year_days;
647        year += 1;
648    }
649
650    // Handle months
651    let is_leap = year % 4 == 0 && (year % 100 != 0 || year % 400 == 0);
652    let month_lengths = if is_leap {
653        [31, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
654    } else {
655        [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
656    };
657
658    let mut month = 1;
659    for &days_in_month in &month_lengths {
660        if remaining_days < days_in_month {
661            break;
662        }
663        remaining_days -= days_in_month;
664        month += 1;
665    }
666
667    let day = remaining_days + 1;
668
669    Date::new(year, month as u8, day as u8).unwrap_or_else(|_| Date::new(1970, 1, 1).unwrap())
670}
671
672/// Convert microseconds since Unix epoch to Timestamp
673fn microseconds_to_timestamp(micros: i64) -> Timestamp {
674    let days = (micros / 86_400_000_000) as i32;
675    let remaining_micros = micros % 86_400_000_000;
676
677    let date = days_since_epoch_to_date(days);
678
679    let hours = (remaining_micros / 3_600_000_000) as u8;
680    let remaining_micros = remaining_micros % 3_600_000_000;
681    let minutes = (remaining_micros / 60_000_000) as u8;
682    let remaining_micros = remaining_micros % 60_000_000;
683    let seconds = (remaining_micros / 1_000_000) as u8;
684    let nanoseconds = ((remaining_micros % 1_000_000) * 1_000) as u32;
685
686    let time = Time::new(hours, minutes, seconds, nanoseconds)
687        .unwrap_or_else(|_| Time::new(0, 0, 0, 0).unwrap());
688
689    Timestamp::new(date, time)
690}
691
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a VARCHAR value from a string slice.
    fn text(s: &str) -> SqlValue {
        SqlValue::Varchar(arcstr::ArcStr::from(s))
    }

    /// Build an (integer, double) row.
    fn int_double_row(id: i64, amount: f64) -> Row {
        Row::new(vec![SqlValue::Integer(id), SqlValue::Double(amount)])
    }

    /// Build an (integer, varchar) row.
    fn int_text_row(id: i64, label: &str) -> Row {
        Row::new(vec![SqlValue::Integer(id), text(label)])
    }

    #[test]
    fn test_batch_to_rows_roundtrip() {
        let original_rows = vec![int_double_row(1, 10.5), int_double_row(2, 20.5)];

        let batch = ColumnarBatch::from_rows(&original_rows).unwrap();
        let converted_rows = batch.to_rows().unwrap();

        assert_eq!(converted_rows.len(), original_rows.len());
        for (before, after) in original_rows.iter().zip(converted_rows.iter()) {
            assert_eq!(before.len(), after.len());
            for col in 0..before.len() {
                assert_eq!(before.get(col), after.get(col));
            }
        }
    }

    #[test]
    fn test_simd_column_access() {
        let rows =
            vec![int_double_row(1, 10.5), int_double_row(2, 20.5), int_double_row(3, 30.5)];

        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        // The integer column must expose a raw i64 slice with no null mask.
        match batch.column(0).unwrap().as_i64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[1, 2, 3]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected i64 slice"),
        }

        // The double column must expose a raw f64 slice with no null mask.
        match batch.column(1).unwrap().as_f64() {
            Some((values, nulls)) => {
                assert_eq!(values, &[10.5, 20.5, 30.5]);
                assert!(nulls.is_none());
            }
            None => panic!("Expected f64 slice"),
        }
    }

    #[test]
    fn test_deduplicate_with_duplicates() {
        // Rows 2 and 4 repeat rows 0 and 1 respectively.
        let rows = vec![
            int_text_row(1, "A"),
            int_text_row(2, "B"),
            int_text_row(1, "A"),
            int_text_row(3, "C"),
            int_text_row(2, "B"),
        ];

        let batch = ColumnarBatch::from_rows(&rows).unwrap();
        assert_eq!(batch.row_count(), 5);

        let deduped = batch.deduplicate().unwrap();
        assert_eq!(deduped.row_count(), 3);

        // First occurrences survive, in their original order.
        let result_rows = deduped.to_rows().unwrap();
        assert_eq!(result_rows[0].get(0), Some(&SqlValue::Integer(1)));
        assert_eq!(result_rows[0].get(1), Some(&text("A")));
        assert_eq!(result_rows[1].get(0), Some(&SqlValue::Integer(2)));
        assert_eq!(result_rows[1].get(1), Some(&text("B")));
        assert_eq!(result_rows[2].get(0), Some(&SqlValue::Integer(3)));
        assert_eq!(result_rows[2].get(1), Some(&text("C")));
    }

    #[test]
    fn test_deduplicate_no_duplicates() {
        // Three distinct single-column rows.
        let rows: Vec<Row> =
            vec![1, 2, 3].into_iter().map(|id| Row::new(vec![SqlValue::Integer(id)])).collect();

        let batch = ColumnarBatch::from_rows(&rows).unwrap();
        let deduped = batch.deduplicate().unwrap();

        assert_eq!(deduped.row_count(), 3);
    }

    #[test]
    fn test_deduplicate_with_nulls() {
        // Two rows with NULL in the same position must collapse into one.
        let rows = vec![
            Row::new(vec![SqlValue::Null, text("A")]),
            int_text_row(1, "B"),
            Row::new(vec![SqlValue::Null, text("A")]),
        ];

        let batch = ColumnarBatch::from_rows(&rows).unwrap();
        let deduped = batch.deduplicate().unwrap();

        assert_eq!(deduped.row_count(), 2);
    }

    #[test]
    fn test_deduplicate_empty_batch() {
        let batch = ColumnarBatch::new(2);
        let deduped = batch.deduplicate().unwrap();
        assert_eq!(deduped.row_count(), 0);
    }

    #[test]
    fn test_select_rows() {
        let rows = vec![
            int_text_row(1, "A"),
            int_text_row(2, "B"),
            int_text_row(3, "C"),
            int_text_row(4, "D"),
        ];

        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        // Pick the first and third rows.
        let selected = batch.select_rows(&[0, 2]).unwrap();
        assert_eq!(selected.row_count(), 2);

        let result_rows = selected.to_rows().unwrap();
        assert_eq!(result_rows[0].get(0), Some(&SqlValue::Integer(1)));
        assert_eq!(result_rows[1].get(0), Some(&SqlValue::Integer(3)));
    }
}
827}