vibesql_executor/select/columnar/
mod.rs

1//! Columnar execution for high-performance aggregation queries
2//!
3//! This module implements column-oriented query execution that avoids
4//! materializing full Row objects during table scans, providing 8-10x speedup
5//! for aggregation-heavy workloads.
6//!
7//! ## Architecture
8//!
9//! Instead of:
10//! ```text
11//! TableScan → Row{Vec<SqlValue>} → Filter(Row) → Aggregate(Row) → Vec<Row>
12//! ```
13//!
14//! We use:
15//! ```text
16//! TableScan → ColumnRefs → Filter(native types) → Aggregate → Row
17//! ```
18//!
19//! ## Benefits
20//!
21//! - **Zero-copy**: Work with `&SqlValue` references instead of cloning
22//! - **Cache-friendly**: Access contiguous column data instead of scattered row data
23//! - **Type-specialized**: Skip SqlValue enum matching overhead for filters/aggregates
24//! - **Minimal allocations**: Only allocate result rows, not intermediate data
25//!
26//! ## Usage
27//!
28//! This path is automatically selected for simple aggregate queries that:
29//! - Have a single table scan (no JOINs)
30//! - Use simple WHERE predicates
31//! - Compute aggregates (SUM, COUNT, AVG, MIN, MAX)
32//! - Don't use window functions or complex subqueries
33
34mod aggregate;
35pub mod batch;
36mod executor;
37pub mod filter;
38mod scan;
39mod string_ops;
40
41// Auto-vectorized SIMD operations - replaces the `wide` crate dependency
42// See simd_ops.rs for documentation on why these patterns are structured this way
43pub mod simd_ops;
44
45mod simd_aggregate;
46pub mod simd_filter;
47mod simd_join;
48
49pub use aggregate::{
50    columnar_group_by, columnar_group_by_batch, compute_multiple_aggregates,
51    evaluate_expression_to_column, evaluate_expression_with_cached_column, extract_aggregates,
52    AggregateOp, AggregateSource, AggregateSpec,
53};
54pub use batch::{ColumnArray, ColumnarBatch};
55pub use executor::execute_columnar_batch;
56pub use filter::{
57    apply_columnar_filter, apply_columnar_filter_simd_streaming, create_filter_bitmap,
58    create_filter_bitmap_tree, evaluate_predicate_tree, extract_column_predicates,
59    extract_predicate_tree, ColumnPredicate, PredicateTree,
60};
61pub use scan::ColumnarScan;
62
63pub use aggregate::compute_aggregates_from_batch;
64pub use simd_aggregate::{can_use_simd_for_column, simd_aggregate_f64, simd_aggregate_i64};
65pub use simd_filter::{simd_create_filter_mask, simd_create_filter_mask_packed, simd_filter_batch};
66pub use simd_join::columnar_hash_join_inner;
67pub use simd_ops::PackedMask;
68
69use crate::errors::ExecutorError;
70use crate::schema::CombinedSchema;
71use log;
72use vibesql_storage::Row;
73use vibesql_types::SqlValue;
74
75/// Execute a columnar aggregate query with filtering
76///
77/// This is a simplified entry point for columnar execution that demonstrates
78/// the full pipeline: scan → filter → aggregate.
79///
80/// # Arguments
81///
82/// * `rows` - Input rows to process
83/// * `predicates` - Column predicates for filtering (optional)
84/// * `aggregates` - List of (column_index, aggregate_op) pairs to compute
85///
86/// # Returns
87///
88/// A single Row containing the computed aggregate values
89///
90/// # Example
91///
92/// ```text
93/// // Compute SUM(col0), AVG(col1) WHERE col2 < 100
94/// let predicates = vec![
95///     ColumnPredicate::LessThan {
96///         column_idx: 2,
97///         value: SqlValue::Integer(100),
98///     },
99/// ];
100/// let aggregates = vec![
101///     (0, AggregateOp::Sum),
102///     (1, AggregateOp::Avg),
103/// ];
104///
105/// let result = execute_columnar_aggregate(&rows, &predicates, &aggregates)?;
106/// ```
107///
108/// Note: This function provides SIMD-accelerated filtering and aggregation through
109/// LLVM auto-vectorization of batch-native operations.
110pub fn execute_columnar_aggregate(
111    rows: &[Row],
112    predicates: &[ColumnPredicate],
113    aggregates: &[aggregate::AggregateSpec],
114    schema: Option<&CombinedSchema>,
115) -> Result<Vec<Row>, ExecutorError> {
116    // Early return for empty input
117    // SQL standard: COUNT returns 0 for empty input, other aggregates return NULL
118    if rows.is_empty() {
119        let values: Vec<SqlValue> = aggregates
120            .iter()
121            .map(|spec| match spec.op {
122                aggregate::AggregateOp::Count => SqlValue::Integer(0),
123                _ => SqlValue::Null,
124            })
125            .collect();
126        return Ok(vec![Row::new(values)]);
127    }
128
129    // Phase 1: Convert to columnar batch for SIMD acceleration
130    #[cfg(feature = "profile-q6")]
131    let batch_start = std::time::Instant::now();
132
133    let batch = ColumnarBatch::from_rows(rows)?;
134
135    #[cfg(feature = "profile-q6")]
136    {
137        let batch_time = batch_start.elapsed();
138        eprintln!("[PROFILE-Q6]   Phase 1 - Convert to batch: {:?}", batch_time);
139    }
140
141    // Phase 2: Apply SIMD-accelerated filtering
142    #[cfg(feature = "profile-q6")]
143    let filter_start = std::time::Instant::now();
144
145    let filtered_batch =
146        if predicates.is_empty() { batch.clone() } else { simd_filter_batch(&batch, predicates)? };
147
148    #[cfg(feature = "profile-q6")]
149    {
150        let filter_time = filter_start.elapsed();
151        eprintln!(
152            "[PROFILE-Q6]   Phase 2 - SIMD filter: {:?} ({}/{} rows passed)",
153            filter_time,
154            filtered_batch.row_count(),
155            rows.len()
156        );
157    }
158
159    // Phase 3: Compute aggregates directly on batch (no row conversion!)
160    #[cfg(feature = "profile-q6")]
161    let agg_start = std::time::Instant::now();
162
163    // Use batch-native aggregation to avoid to_rows() conversion overhead
164    let results = compute_aggregates_from_batch(&filtered_batch, aggregates, schema)?;
165
166    #[cfg(feature = "profile-q6")]
167    {
168        let agg_time = agg_start.elapsed();
169        eprintln!(
170            "[PROFILE-Q6]   Phase 3 - Batch-native aggregate: {:?} ({} aggregates)",
171            agg_time,
172            aggregates.len()
173        );
174    }
175
176    // Return as single row
177    Ok(vec![Row::new(results)])
178}
179
180/// Fast single-pass aggregate on rows - avoids batch conversion overhead
181///
182/// This function performs filtering and aggregation in a single pass over the input rows,
183/// without converting to columnar format. It's 3-5x faster than `execute_columnar_aggregate`
184/// for queries that come from row-based storage.
185///
186/// # Use Cases
187///
188/// Best suited for:
189/// - Simple aggregate queries without GROUP BY
190/// - When data arrives as Vec<Row> (not native columnar)
191/// - TPC-H style queries: SUM(price * discount) WHERE ...
192///
193/// # Arguments
194///
195/// * `rows` - Input rows to process
196/// * `predicates` - Column predicates for filtering
197/// * `aggregates` - Aggregate specifications
198///
199/// # Returns
200///
201/// A single Row containing the computed aggregate values
202pub fn fast_aggregate_on_rows(
203    rows: &[Row],
204    predicates: &[ColumnPredicate],
205    aggregates: &[aggregate::AggregateSpec],
206) -> Result<Vec<Row>, ExecutorError> {
207    use aggregate::{AggregateOp, AggregateSource};
208
209    // Early return for empty input
210    if rows.is_empty() {
211        let values: Vec<SqlValue> = aggregates
212            .iter()
213            .map(|spec| match spec.op {
214                AggregateOp::Count => SqlValue::Integer(0),
215                _ => SqlValue::Null,
216            })
217            .collect();
218        return Ok(vec![Row::new(values)]);
219    }
220
221    // Initialize accumulators for each aggregate
222    struct Accumulator {
223        sum_f64: f64,
224        sum_i64: i64,
225        count: i64,
226        min_f64: Option<f64>,
227        max_f64: Option<f64>,
228        min_i64: Option<i64>,
229        max_i64: Option<i64>,
230        is_integer: bool,
231    }
232
233    let mut accumulators: Vec<Accumulator> = aggregates
234        .iter()
235        .map(|_| Accumulator {
236            sum_f64: 0.0,
237            sum_i64: 0,
238            count: 0,
239            min_f64: None,
240            max_f64: None,
241            min_i64: None,
242            max_i64: None,
243            is_integer: true,
244        })
245        .collect();
246
247    // Single pass: filter and accumulate
248    for row in rows {
249        // Check all predicates
250        let passes_filter = predicates.iter().all(|pred| evaluate_predicate(row, pred));
251
252        if !passes_filter {
253            continue;
254        }
255
256        // Accumulate values for each aggregate
257        for (i, spec) in aggregates.iter().enumerate() {
258            let acc = &mut accumulators[i];
259
260            match &spec.source {
261                AggregateSource::CountStar => {
262                    acc.count += 1;
263                }
264                AggregateSource::Column(col_idx) => {
265                    if let Some(value) = row.get(*col_idx) {
266                        if !matches!(value, SqlValue::Null) {
267                            acc.count += 1;
268                            match value {
269                                SqlValue::Integer(v) => {
270                                    acc.sum_i64 += v;
271                                    acc.sum_f64 += *v as f64;
272                                    acc.min_i64 = Some(acc.min_i64.map_or(*v, |m| m.min(*v)));
273                                    acc.max_i64 = Some(acc.max_i64.map_or(*v, |m| m.max(*v)));
274                                    acc.min_f64 =
275                                        Some(acc.min_f64.map_or(*v as f64, |m| m.min(*v as f64)));
276                                    acc.max_f64 =
277                                        Some(acc.max_f64.map_or(*v as f64, |m| m.max(*v as f64)));
278                                }
279                                SqlValue::Double(v) => {
280                                    acc.is_integer = false;
281                                    acc.sum_f64 += v;
282                                    acc.min_f64 = Some(acc.min_f64.map_or(*v, |m| m.min(*v)));
283                                    acc.max_f64 = Some(acc.max_f64.map_or(*v, |m| m.max(*v)));
284                                }
285                                SqlValue::Float(v) => {
286                                    acc.is_integer = false;
287                                    acc.sum_f64 += *v as f64;
288                                    acc.min_f64 =
289                                        Some(acc.min_f64.map_or(*v as f64, |m| m.min(*v as f64)));
290                                    acc.max_f64 =
291                                        Some(acc.max_f64.map_or(*v as f64, |m| m.max(*v as f64)));
292                                }
293                                SqlValue::Bigint(v) => {
294                                    acc.sum_i64 += v;
295                                    acc.sum_f64 += *v as f64;
296                                    acc.min_i64 = Some(acc.min_i64.map_or(*v, |m| m.min(*v)));
297                                    acc.max_i64 = Some(acc.max_i64.map_or(*v, |m| m.max(*v)));
298                                    acc.min_f64 =
299                                        Some(acc.min_f64.map_or(*v as f64, |m| m.min(*v as f64)));
300                                    acc.max_f64 =
301                                        Some(acc.max_f64.map_or(*v as f64, |m| m.max(*v as f64)));
302                                }
303                                SqlValue::Numeric(v) => {
304                                    acc.is_integer = false;
305                                    acc.sum_f64 += v;
306                                    acc.min_f64 = Some(acc.min_f64.map_or(*v, |m| m.min(*v)));
307                                    acc.max_f64 = Some(acc.max_f64.map_or(*v, |m| m.max(*v)));
308                                }
309                                _ => {}
310                            }
311                        }
312                    }
313                }
314                AggregateSource::Expression(expr) => {
315                    // For expression aggregates like SUM(a * b), evaluate the expression
316                    // This is a simplified evaluator for common binary operations
317                    if let Some(value) = eval_simple_expression(row, expr) {
318                        acc.count += 1;
319                        acc.is_integer = false;
320                        acc.sum_f64 += value;
321                        acc.min_f64 = Some(acc.min_f64.map_or(value, |m| m.min(value)));
322                        acc.max_f64 = Some(acc.max_f64.map_or(value, |m| m.max(value)));
323                    }
324                }
325            }
326        }
327    }
328
329    // Build result row from accumulators
330    let values: Vec<SqlValue> = aggregates
331        .iter()
332        .zip(accumulators.iter())
333        .map(|(spec, acc)| match spec.op {
334            AggregateOp::Count => SqlValue::Integer(acc.count),
335            AggregateOp::Sum => {
336                if acc.count == 0 {
337                    SqlValue::Null
338                } else if acc.is_integer {
339                    SqlValue::Integer(acc.sum_i64)
340                } else {
341                    SqlValue::Double(acc.sum_f64)
342                }
343            }
344            AggregateOp::Avg => {
345                if acc.count == 0 {
346                    SqlValue::Null
347                } else {
348                    SqlValue::Double(acc.sum_f64 / acc.count as f64)
349                }
350            }
351            AggregateOp::Min => {
352                if acc.is_integer {
353                    acc.min_i64.map(SqlValue::Integer).unwrap_or(SqlValue::Null)
354                } else {
355                    acc.min_f64.map(SqlValue::Double).unwrap_or(SqlValue::Null)
356                }
357            }
358            AggregateOp::Max => {
359                if acc.is_integer {
360                    acc.max_i64.map(SqlValue::Integer).unwrap_or(SqlValue::Null)
361                } else {
362                    acc.max_f64.map(SqlValue::Double).unwrap_or(SqlValue::Null)
363                }
364            }
365        })
366        .collect();
367
368    Ok(vec![Row::new(values)])
369}
370
371/// Evaluate a column predicate against a row
372fn evaluate_predicate(row: &Row, predicate: &ColumnPredicate) -> bool {
373    match predicate {
374        ColumnPredicate::LessThan { column_idx, value } => row
375            .get(*column_idx)
376            .map(|v| compare_values(v, value) == std::cmp::Ordering::Less)
377            .unwrap_or(false),
378        ColumnPredicate::LessThanOrEqual { column_idx, value } => row
379            .get(*column_idx)
380            .map(|v| compare_values(v, value) != std::cmp::Ordering::Greater)
381            .unwrap_or(false),
382        ColumnPredicate::GreaterThan { column_idx, value } => row
383            .get(*column_idx)
384            .map(|v| compare_values(v, value) == std::cmp::Ordering::Greater)
385            .unwrap_or(false),
386        ColumnPredicate::GreaterThanOrEqual { column_idx, value } => row
387            .get(*column_idx)
388            .map(|v| compare_values(v, value) != std::cmp::Ordering::Less)
389            .unwrap_or(false),
390        ColumnPredicate::Equal { column_idx, value } => row
391            .get(*column_idx)
392            .map(|v| compare_values(v, value) == std::cmp::Ordering::Equal)
393            .unwrap_or(false),
394        ColumnPredicate::NotEqual { column_idx, value } => row
395            .get(*column_idx)
396            .map(|v| compare_values(v, value) != std::cmp::Ordering::Equal)
397            .unwrap_or(false),
398        ColumnPredicate::Between { column_idx, low, high } => row
399            .get(*column_idx)
400            .map(|v| {
401                compare_values(v, low) != std::cmp::Ordering::Less
402                    && compare_values(v, high) != std::cmp::Ordering::Greater
403            })
404            .unwrap_or(false),
405        ColumnPredicate::Like { column_idx, pattern, negated } => {
406            // Simple LIKE pattern matching for row-based fast path
407            let matches = row
408                .get(*column_idx)
409                .map(|v| {
410                    if let SqlValue::Varchar(s) = v {
411                        // Convert SQL LIKE pattern to simple check
412                        // This is a simplified version - full LIKE support is in simd_filter
413                        let pattern_str = pattern.as_str();
414                        if let Some(inner) =
415                            pattern_str.strip_prefix('%').and_then(|s| s.strip_suffix('%'))
416                        {
417                            s.contains(inner)
418                        } else if let Some(suffix) = pattern_str.strip_prefix('%') {
419                            s.ends_with(suffix)
420                        } else if let Some(prefix) = pattern_str.strip_suffix('%') {
421                            s.starts_with(prefix)
422                        } else {
423                            s == pattern_str
424                        }
425                    } else {
426                        false
427                    }
428                })
429                .unwrap_or(false);
430            if *negated {
431                !matches
432            } else {
433                matches
434            }
435        }
436        ColumnPredicate::InList { column_idx, values, negated } => {
437            // Check if column value matches any value in the list
438            let matches = row
439                .get(*column_idx)
440                .map(|v| {
441                    values
442                        .iter()
443                        .any(|list_val| compare_values(v, list_val) == std::cmp::Ordering::Equal)
444                })
445                .unwrap_or(false);
446            if *negated {
447                !matches
448            } else {
449                matches
450            }
451        }
452    }
453}
454
455/// Compare two SqlValues
456fn compare_values(a: &SqlValue, b: &SqlValue) -> std::cmp::Ordering {
457    use std::cmp::Ordering;
458
459    match (a, b) {
460        (SqlValue::Integer(a), SqlValue::Integer(b)) => a.cmp(b),
461        (SqlValue::Bigint(a), SqlValue::Bigint(b)) => a.cmp(b),
462        (SqlValue::Double(a), SqlValue::Double(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
463        (SqlValue::Float(a), SqlValue::Float(b)) => a.partial_cmp(b).unwrap_or(Ordering::Equal),
464        // Cross-type comparisons
465        (SqlValue::Integer(a), SqlValue::Double(b)) => {
466            (*a as f64).partial_cmp(b).unwrap_or(Ordering::Equal)
467        }
468        (SqlValue::Double(a), SqlValue::Integer(b)) => {
469            a.partial_cmp(&(*b as f64)).unwrap_or(Ordering::Equal)
470        }
471        (SqlValue::Integer(a), SqlValue::Bigint(b)) => (*a).cmp(b),
472        (SqlValue::Bigint(a), SqlValue::Integer(b)) => a.cmp(&{ *b }),
473        // String comparisons
474        (SqlValue::Varchar(a), SqlValue::Varchar(b)) => a.cmp(b),
475        // Date comparisons
476        (SqlValue::Date(a), SqlValue::Date(b)) => {
477            // Compare year, month, day
478            match a.year.cmp(&b.year) {
479                Ordering::Equal => match a.month.cmp(&b.month) {
480                    Ordering::Equal => a.day.cmp(&b.day),
481                    other => other,
482                },
483                other => other,
484            }
485        }
486        // NULL handling
487        (SqlValue::Null, _) | (_, SqlValue::Null) => Ordering::Equal, // NULL comparisons are undefined
488        _ => Ordering::Equal,                                         // Incompatible types
489    }
490}
491
492/// Evaluate a simple expression (for expression aggregates)
493#[allow(clippy::only_used_in_recursion)]
494fn eval_simple_expression(row: &Row, expr: &vibesql_ast::Expression) -> Option<f64> {
495    use vibesql_ast::{BinaryOperator, Expression};
496
497    match expr {
498        Expression::BinaryOp { left, op, right } => {
499            let left_val = eval_simple_expression(row, left)?;
500            let right_val = eval_simple_expression(row, right)?;
501            match op {
502                BinaryOperator::Multiply => Some(left_val * right_val),
503                BinaryOperator::Divide => Some(left_val / right_val),
504                BinaryOperator::Plus => Some(left_val + right_val),
505                BinaryOperator::Minus => Some(left_val - right_val),
506                _ => None,
507            }
508        }
509        Expression::ColumnRef { column, .. } => {
510            // Cannot resolve column names without a schema - return None to skip
511            // the fast path and fall back to the columnar execution path which
512            // properly handles expression aggregates with schema resolution.
513            log::debug!(
514                "fast_aggregate_on_rows: ColumnRef '{}' requires schema resolution, skipping fast path",
515                column
516            );
517            None
518        }
519        Expression::Literal(val) => match val {
520            SqlValue::Integer(v) => Some(*v as f64),
521            SqlValue::Double(v) => Some(*v),
522            SqlValue::Float(v) => Some(*v as f64),
523            SqlValue::Bigint(v) => Some(*v as f64),
524            SqlValue::Numeric(v) => Some(*v),
525            _ => None,
526        },
527        _ => None,
528    }
529}
530
531/// Execute a query using columnar processing (AST-based interface)
532///
533/// This is the entry point for columnar execution that accepts AST expressions
534/// and converts them to the columnar execution pipeline.
535///
536/// # Arguments
537///
538/// * `rows` - The rows to process
539/// * `filter` - Optional WHERE clause expression
540/// * `aggregates` - SELECT list aggregate expressions
541/// * `schema` - Schema for resolving column names to indices
542///
543/// # Returns
544///
545/// Some(Result) if the query can be optimized using columnar execution,
546/// None if the expressions are too complex for columnar optimization.
547///
548/// Note: This function uses LLVM auto-vectorization for vectorized execution.
549pub fn execute_columnar(
550    rows: &[Row],
551    filter: Option<&vibesql_ast::Expression>,
552    aggregates: &[vibesql_ast::Expression],
553    schema: &CombinedSchema,
554) -> Option<Result<Vec<Row>, ExecutorError>> {
555    log::debug!("  Executing columnar query with {} rows", rows.len());
556
557    // Extract column predicates from filter expression
558    let predicates = if let Some(filter_expr) = filter {
559        match extract_column_predicates(filter_expr, schema) {
560            Some(preds) => {
561                log::debug!("    ✓ Extracted {} column predicates for SIMD filtering", preds.len());
562                preds
563            }
564            None => {
565                log::debug!("    ✗ Filter too complex for columnar optimization");
566                return None; // Too complex for columnar optimization
567            }
568        }
569    } else {
570        log::debug!("    No filter predicates");
571        vec![] // No filter
572    };
573
574    // Extract aggregates from SELECT list
575    let agg_specs = match extract_aggregates(aggregates, schema) {
576        Some(specs) => {
577            log::debug!("    ✓ Extracted {} aggregate operations", specs.len());
578            for (i, spec) in specs.iter().enumerate() {
579                log::debug!("      Aggregate {}: {:?}", i + 1, spec.op);
580            }
581            specs
582        }
583        None => {
584            log::debug!("    ✗ Aggregates too complex for columnar optimization");
585            return None; // Too complex for columnar optimization
586        }
587    };
588
589    // Call the simplified interface, passing schema if any aggregates use expressions
590    let needs_schema = agg_specs
591        .iter()
592        .any(|spec| matches!(spec.source, aggregate::AggregateSource::Expression(_)));
593    let schema_ref = if needs_schema { Some(schema) } else { None };
594
595    log::debug!("    Executing SIMD-accelerated columnar aggregation");
596    Some(execute_columnar_aggregate(rows, &predicates, &agg_specs, schema_ref))
597}
598
599#[cfg(test)]
600mod tests {
601    use super::*;
602    use vibesql_types::Date;
603
604    /// Test the full columnar pipeline: filter + aggregation
605    #[test]
606    fn test_columnar_pipeline_filtered_sum() {
607        // Create test data: TPC-H Q6 style query
608        // SELECT SUM(l_extendedprice * l_discount)
609        // WHERE l_shipdate >= '1994-01-01'
610        //   AND l_shipdate < '1995-01-01'
611        //   AND l_discount BETWEEN 0.05 AND 0.07
612        //   AND l_quantity < 24
613
614        let rows = vec![
615            Row::new(vec![
616                SqlValue::Integer(10),   // quantity
617                SqlValue::Double(100.0), // extendedprice
618                SqlValue::Double(0.06),  // discount
619                SqlValue::Date(Date::new(1994, 6, 1).unwrap()),
620            ]),
621            Row::new(vec![
622                SqlValue::Integer(25), // quantity (filtered out: > 24)
623                SqlValue::Double(200.0),
624                SqlValue::Double(0.06),
625                SqlValue::Date(Date::new(1994, 7, 1).unwrap()),
626            ]),
627            Row::new(vec![
628                SqlValue::Integer(15), // quantity
629                SqlValue::Double(150.0),
630                SqlValue::Double(0.05), // discount
631                SqlValue::Date(Date::new(1994, 8, 1).unwrap()),
632            ]),
633            Row::new(vec![
634                SqlValue::Integer(20), // quantity
635                SqlValue::Double(180.0),
636                SqlValue::Double(0.08), // discount (filtered out: > 0.07)
637                SqlValue::Date(Date::new(1994, 9, 1).unwrap()),
638            ]),
639        ];
640
641        // Predicates: quantity < 24 AND discount BETWEEN 0.05 AND 0.07
642        let predicates = vec![
643            ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(24) },
644            ColumnPredicate::Between {
645                column_idx: 2,
646                low: SqlValue::Double(0.05),
647                high: SqlValue::Double(0.07),
648            },
649        ];
650
651        // Aggregates: SUM(extendedprice), COUNT(*)
652        let aggregates = vec![
653            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) }, // SUM(extendedprice)
654            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::Column(0) }, // COUNT(*)
655        ];
656
657        let result = execute_columnar_aggregate(&rows, &predicates, &aggregates, None).unwrap();
658
659        assert_eq!(result.len(), 1);
660        let result_row = &result[0];
661
662        // Only rows 0 and 2 pass the filter (quantity < 24 AND discount in range)
663        // SUM(extendedprice) = 100.0 + 150.0 = 250.0
664        assert!(
665            matches!(result_row.get(0), Some(&SqlValue::Double(sum)) if (sum - 250.0).abs() < 0.001)
666        );
667        // COUNT(*) = 2
668        assert_eq!(result_row.get(1), Some(&SqlValue::Integer(2)));
669    }
670
671    /// Test columnar execution with no filtering
672    #[test]
673    fn test_columnar_pipeline_no_filter() {
674        let rows = vec![
675            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
676            Row::new(vec![SqlValue::Integer(20), SqlValue::Double(2.5)]),
677            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.5)]),
678        ];
679
680        let predicates = vec![];
681        let aggregates = vec![
682            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
683            AggregateSpec { op: AggregateOp::Avg, source: AggregateSource::Column(1) },
684            AggregateSpec { op: AggregateOp::Max, source: AggregateSource::Column(0) },
685        ];
686
687        let result = execute_columnar_aggregate(&rows, &predicates, &aggregates, None).unwrap();
688
689        assert_eq!(result.len(), 1);
690        let result_row = &result[0];
691
692        // SUM(col0) = 60 (preserves integer type per #2545)
693        assert_eq!(result_row.get(0), Some(&SqlValue::Integer(60)));
694        // AVG(col1) = 2.5
695        assert!(
696            matches!(result_row.get(1), Some(&SqlValue::Double(avg)) if (avg - 2.5).abs() < 0.001)
697        );
698        // MAX(col0) = 30
699        assert_eq!(result_row.get(2), Some(&SqlValue::Integer(30)));
700    }
701
702    /// Test columnar execution with empty result set
703    #[test]
704    fn test_columnar_pipeline_empty_result() {
705        let rows =
706            vec![Row::new(vec![SqlValue::Integer(100)]), Row::new(vec![SqlValue::Integer(200)])];
707
708        // Filter that matches nothing
709        let predicates =
710            vec![ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(50) }];
711
712        let aggregates = vec![
713            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
714            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::Column(0) },
715        ];
716
717        let result = execute_columnar_aggregate(&rows, &predicates, &aggregates, None).unwrap();
718
719        assert_eq!(result.len(), 1);
720        let result_row = &result[0];
721
722        // SUM of empty set = NULL
723        assert_eq!(result_row.get(0), Some(&SqlValue::Null));
724        // COUNT of empty set = 0
725        assert_eq!(result_row.get(1), Some(&SqlValue::Integer(0)));
726    }
727
728    // AST Integration Tests
729
730    use crate::schema::CombinedSchema;
731    use vibesql_ast::{BinaryOperator, Expression};
732    use vibesql_catalog::{ColumnSchema, TableSchema};
733    use vibesql_types::DataType;
734
735    fn make_test_schema() -> CombinedSchema {
736        let schema = TableSchema::new(
737            "test".to_string(),
738            vec![
739                ColumnSchema::new("quantity".to_string(), DataType::Integer, false),
740                ColumnSchema::new("price".to_string(), DataType::DoublePrecision, false),
741            ],
742        );
743        CombinedSchema::from_table("test".to_string(), schema)
744    }
745
746    fn make_test_rows_for_ast() -> Vec<Row> {
747        vec![
748            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
749            Row::new(vec![SqlValue::Integer(20), SqlValue::Double(2.5)]),
750            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.5)]),
751            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
752        ]
753    }
754
755    #[test]
756    fn test_execute_columnar_simple_aggregate() {
757        let rows = make_test_rows_for_ast();
758        let schema = make_test_schema();
759
760        // SELECT SUM(price) FROM test
761        let aggregates = vec![Expression::AggregateFunction {
762            name: "SUM".to_string(),
763            distinct: false,
764            args: vec![Expression::ColumnRef { table: None, column: "price".to_string() }],
765        }];
766
767        let result = execute_columnar(&rows, None, &aggregates, &schema);
768        assert!(result.is_some());
769
770        let rows_result = result.unwrap();
771        assert!(rows_result.is_ok());
772
773        let result_rows = rows_result.unwrap();
774        assert_eq!(result_rows.len(), 1);
775        assert_eq!(result_rows[0].len(), 1);
776
777        // Sum should be 1.5 + 2.5 + 3.5 + 4.5 = 12.0
778        if let Some(SqlValue::Double(sum)) = result_rows[0].get(0) {
779            assert!((sum - 12.0).abs() < 0.001);
780        } else {
781            panic!("Expected Double value");
782        }
783    }
784
785    #[test]
786    fn test_execute_columnar_with_filter() {
787        let rows = make_test_rows_for_ast();
788        let schema = make_test_schema();
789
790        // SELECT SUM(price) FROM test WHERE quantity < 25
791        let filter = Expression::BinaryOp {
792            left: Box::new(Expression::ColumnRef { table: None, column: "quantity".to_string() }),
793            op: BinaryOperator::LessThan,
794            right: Box::new(Expression::Literal(SqlValue::Integer(25))),
795        };
796
797        let aggregates = vec![Expression::AggregateFunction {
798            name: "SUM".to_string(),
799            distinct: false,
800            args: vec![Expression::ColumnRef { table: None, column: "price".to_string() }],
801        }];
802
803        let result = execute_columnar(&rows, Some(&filter), &aggregates, &schema);
804        assert!(result.is_some());
805
806        let rows_result = result.unwrap();
807        assert!(rows_result.is_ok());
808
809        let result_rows = rows_result.unwrap();
810        assert_eq!(result_rows.len(), 1);
811        assert_eq!(result_rows[0].len(), 1);
812
813        // Sum of rows where quantity < 25: 1.5 + 2.5 = 4.0
814        if let Some(SqlValue::Double(sum)) = result_rows[0].get(0) {
815            assert!((sum - 4.0).abs() < 0.001);
816        } else {
817            panic!("Expected Double value");
818        }
819    }
820
821    #[test]
822    fn test_execute_columnar_multiple_aggregates() {
823        let rows = make_test_rows_for_ast();
824        let schema = make_test_schema();
825
826        // SELECT SUM(price), COUNT(*), AVG(quantity) FROM test
827        let aggregates = vec![
828            Expression::AggregateFunction {
829                name: "SUM".to_string(),
830                distinct: false,
831                args: vec![Expression::ColumnRef { table: None, column: "price".to_string() }],
832            },
833            Expression::AggregateFunction {
834                name: "COUNT".to_string(),
835                distinct: false,
836                args: vec![Expression::Wildcard],
837            },
838            Expression::AggregateFunction {
839                name: "AVG".to_string(),
840                distinct: false,
841                args: vec![Expression::ColumnRef { table: None, column: "quantity".to_string() }],
842            },
843        ];
844
845        let result = execute_columnar(&rows, None, &aggregates, &schema);
846        assert!(result.is_some());
847
848        let rows_result = result.unwrap();
849        assert!(rows_result.is_ok());
850
851        let result_rows = rows_result.unwrap();
852        assert_eq!(result_rows.len(), 1);
853        assert_eq!(result_rows[0].len(), 3);
854
855        // Check SUM(price) = 12.0
856        if let Some(SqlValue::Double(sum)) = result_rows[0].get(0) {
857            assert!((sum - 12.0).abs() < 0.001);
858        } else {
859            panic!("Expected Double value for SUM");
860        }
861
862        // Check COUNT(*) = 4
863        assert_eq!(result_rows[0].get(1), Some(&SqlValue::Integer(4)));
864
865        // Check AVG(quantity) = (10+20+30+40)/4 = 25.0
866        if let Some(SqlValue::Double(avg)) = result_rows[0].get(2) {
867            assert!((avg - 25.0).abs() < 0.001);
868        } else {
869            panic!("Expected Double value for AVG");
870        }
871    }
872
873    #[test]
874    fn test_execute_columnar_unsupported_distinct() {
875        let rows = make_test_rows_for_ast();
876        let schema = make_test_schema();
877
878        // SELECT SUM(DISTINCT price) FROM test - should return None
879        let aggregates = vec![Expression::AggregateFunction {
880            name: "SUM".to_string(),
881            distinct: true,
882            args: vec![Expression::ColumnRef { table: None, column: "price".to_string() }],
883        }];
884
885        let result = execute_columnar(&rows, None, &aggregates, &schema);
886        assert!(result.is_none());
887    }
888
889    #[test]
890    fn test_execute_columnar_unsupported_complex_filter() {
891        let rows = make_test_rows_for_ast();
892        let schema = make_test_schema();
893
894        // SELECT SUM(price) FROM test WHERE quantity IN (SELECT ...) - unsupported
895        let filter = Expression::ScalarSubquery(Box::new(vibesql_ast::SelectStmt {
896            with_clause: None,
897            distinct: false,
898            select_list: vec![],
899            into_table: None,
900            into_variables: None,
901            from: None,
902            where_clause: None,
903            group_by: None,
904            having: None,
905            order_by: None,
906            limit: None,
907            offset: None,
908            set_operation: None,
909        }));
910
911        let aggregates = vec![Expression::AggregateFunction {
912            name: "SUM".to_string(),
913            distinct: false,
914            args: vec![Expression::ColumnRef { table: None, column: "price".to_string() }],
915        }];
916
917        let result = execute_columnar(&rows, Some(&filter), &aggregates, &schema);
918        assert!(result.is_none());
919    }
920}