vibesql_executor/select/columnar/executor/
mod.rs

1//! Native columnar executor - end-to-end columnar query execution
2//!
3//! This module provides true columnar query execution that operates on `ColumnarBatch`
4//! throughout the entire pipeline, avoiding row materialization until final output.
5
6#![allow(clippy::needless_range_loop, clippy::unnecessary_map_or)]
7//!
8//! ## Architecture
9//!
10//! ```text
11//! Storage → ColumnarBatch → SIMD Filter → SIMD Aggregate → Vec<Row> (only at output)
12//!          ↑ Zero-copy     ↑ 4-8x faster  ↑ 10x faster   ↑ Minimal materialization
13//! ```
14//!
15//! ## Key Benefits
16//!
17//! - **Zero-copy**: ColumnarBatch flows through without row materialization
18//! - **SIMD acceleration**: All filtering and aggregation uses vectorized instructions
19//! - **Cache efficiency**: Columnar data access patterns are cache-friendly
20//! - **Minimal allocations**: Only allocate result rows at the end
21//!
22//! ## Module Organization
23//!
24//! - `fused` - Fused filter+aggregate optimization path
25//! - `aggregate` - Standard aggregation functions
26//! - `expression` - Expression evaluation helpers
27
28mod aggregate;
29mod expression;
30mod fused;
31
32use super::aggregate::{AggregateOp, AggregateSpec};
33use super::batch::ColumnarBatch;
34use super::filter::ColumnPredicate;
35use super::simd_filter::simd_filter_batch;
36use crate::errors::ExecutorError;
37use crate::schema::CombinedSchema;
38use vibesql_storage::Row;
39use vibesql_types::SqlValue;
40
41use aggregate::compute_batch_aggregates;
42use fused::{can_use_fused_aggregation, execute_fused_filter_aggregate};
43
44/// Execute a columnar query end-to-end on a ColumnarBatch
45///
46/// This is the main entry point for native columnar execution. It accepts
47/// a ColumnarBatch from storage and executes filtering and aggregation
48/// entirely in columnar format.
49///
50/// # Arguments
51///
52/// * `batch` - Input ColumnarBatch from storage layer
53/// * `predicates` - Column predicates for SIMD filtering
54/// * `aggregates` - Aggregate specifications (SUM, COUNT, etc.)
55/// * `schema` - Optional schema for expression evaluation
56///
57/// # Returns
58///
59/// A vector of rows containing the aggregated results
60pub fn execute_columnar_batch(
61    batch: &ColumnarBatch,
62    predicates: &[ColumnPredicate],
63    aggregates: &[AggregateSpec],
64    _schema: Option<&CombinedSchema>,
65) -> Result<Vec<Row>, ExecutorError> {
66    // Early return for empty input
67    if batch.row_count() == 0 {
68        let values: Vec<SqlValue> = aggregates
69            .iter()
70            .map(|spec| match spec.op {
71                AggregateOp::Count => SqlValue::Integer(0),
72                _ => SqlValue::Null,
73            })
74            .collect();
75        return Ok(vec![Row::new(values)]);
76    }
77
78    // Try fused filter+aggregate path first (avoids intermediate batch allocation)
79    // This is faster for simple aggregate queries like TPC-H Q6
80    if !predicates.is_empty() && can_use_fused_aggregation(aggregates) {
81        if let Ok(results) = execute_fused_filter_aggregate(batch, predicates, aggregates) {
82            return Ok(vec![Row::new(results)]);
83        }
84        // Fall through to standard path on failure
85    }
86
87    // Standard path: filter first, then aggregate
88    // Used when fused path is not applicable or fails
89
90    // Phase 1: Apply auto-vectorized filtering
91    #[cfg(feature = "profile-q6")]
92    let filter_start = std::time::Instant::now();
93
94    let filtered_batch =
95        if predicates.is_empty() { batch.clone() } else { simd_filter_batch(batch, predicates)? };
96
97    #[cfg(feature = "profile-q6")]
98    {
99        let filter_time = filter_start.elapsed();
100        eprintln!(
101            "[PROFILE-Q6]   Phase 1 - SIMD Filter: {:?} ({}/{} rows passed)",
102            filter_time,
103            filtered_batch.row_count(),
104            batch.row_count()
105        );
106    }
107
108    // Phase 2: Compute aggregates on filtered batch
109    #[cfg(feature = "profile-q6")]
110    let agg_start = std::time::Instant::now();
111
112    let results = compute_batch_aggregates(&filtered_batch, aggregates)?;
113
114    #[cfg(feature = "profile-q6")]
115    {
116        let agg_time = agg_start.elapsed();
117        eprintln!(
118            "[PROFILE-Q6]   Phase 2 - SIMD Aggregate: {:?} ({} aggregates)",
119            agg_time,
120            aggregates.len()
121        );
122    }
123
124    // Phase 3: Convert to output rows (only materialization point)
125    Ok(vec![Row::new(results)])
126}
127
#[cfg(test)]
mod tests {
    use super::super::aggregate::AggregateSource;
    use super::super::batch::ColumnarBatch;
    use super::*;

    /// Builds the shared fixture: four rows of `(Integer, Double)` with
    /// col0 = 10, 20, 30, 40 and col1 = 1.5, 2.5, 3.5, 4.5 and no NULLs.
    fn make_test_batch() -> ColumnarBatch {
        let rows: Vec<Row> = [(10, 1.5), (20, 2.5), (30, 3.5), (40, 4.5)]
            .into_iter()
            .map(|(int_val, dbl_val)| {
                Row::new(vec![SqlValue::Integer(int_val), SqlValue::Double(dbl_val)])
            })
            .collect();
        ColumnarBatch::from_rows(&rows).unwrap()
    }

    /// Unfiltered SUM over the integer column.
    #[test]
    fn test_execute_columnar_batch_sum() {
        let specs = [AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) }];

        let output = execute_columnar_batch(&make_test_batch(), &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        // 10 + 20 + 30 + 40
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));
    }

    /// SUM over the integer column restricted by a `col0 < 25` predicate.
    #[test]
    fn test_execute_columnar_batch_with_filter() {
        let input = make_test_batch();
        let filters = [ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(25) }];
        let specs = [AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) }];

        let output = execute_columnar_batch(&input, &filters, &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        // Only the rows holding 10 and 20 satisfy the predicate
        assert_eq!(row.get(0), Some(&SqlValue::Integer(30)));
    }

    /// Several aggregates evaluated in one call come back as one row,
    /// in the order they were requested.
    #[test]
    fn test_execute_columnar_batch_multiple_aggregates() {
        let input = make_test_batch();
        let specs = [
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Avg, source: AggregateSource::Column(1) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        assert_eq!(row.len(), 3);

        // SUM(col0) = 100
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));

        // AVG(col1) = (1.5 + 2.5 + 3.5 + 4.5) / 4 = 3.0
        match row.get(1) {
            Some(SqlValue::Double(avg)) => assert!((avg - 3.0).abs() < 0.001),
            _ => panic!("Expected Double for AVG"),
        }

        // COUNT(*) = 4
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    /// A batch with zero rows takes the early-return path:
    /// SUM is NULL and COUNT is 0.
    #[test]
    fn test_execute_columnar_batch_empty() {
        let input = ColumnarBatch::new(2);
        let specs = [
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];
        assert_eq!(row.get(0), Some(&SqlValue::Null)); // SUM of empty = NULL
        assert_eq!(row.get(1), Some(&SqlValue::Integer(0))); // COUNT of empty = 0
    }

    /// NULL inputs must be skipped by SUM while COUNT(*) still counts the row.
    /// No predicates are supplied, so this goes through the standard
    /// (non-fused) path.
    #[test]
    fn test_execute_columnar_batch_with_nulls() {
        // One NULL in each column, on different rows
        let rows = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
            Row::new(vec![SqlValue::Null, SqlValue::Double(2.5)]), // col0 is NULL
            Row::new(vec![SqlValue::Integer(30), SqlValue::Null]), // col1 is NULL
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
        ];
        let input = ColumnarBatch::from_rows(&rows).unwrap();

        let specs = [
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(0) },
            AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) },
            AggregateSpec { op: AggregateOp::Count, source: AggregateSource::CountStar },
        ];

        let output = execute_columnar_batch(&input, &[], &specs, None).unwrap();

        assert_eq!(output.len(), 1);
        let row = &output[0];

        // SUM(col0) = 10 + 30 + 40 = 80, the NULL row contributes nothing
        assert_eq!(row.get(0), Some(&SqlValue::Integer(80)));

        // SUM(col1) = 1.5 + 2.5 + 4.5 = 8.5, the NULL row contributes nothing
        match row.get(1) {
            Some(SqlValue::Double(sum)) => assert!((sum - 8.5).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }

        // COUNT(*) = 4: every row counts, NULLs included
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    /// NULL handling combined with a filter predicate.
    /// With a predicate present the fused path may be attempted first;
    /// NOTE(review): whether it handles NULLs itself or falls back to the
    /// standard path is decided in `fused` - confirm there. Either way the
    /// result asserted here must hold.
    #[test]
    fn test_execute_columnar_batch_with_nulls_and_filter() {
        let rows = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.0)]),
            Row::new(vec![SqlValue::Integer(20), SqlValue::Null]), // passes filter, NULL in col1
            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.0)]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.0)]),
        ];
        let input = ColumnarBatch::from_rows(&rows).unwrap();

        // col0 < 35 keeps the first three rows
        let filters = [ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(35) }];
        let specs = [AggregateSpec { op: AggregateOp::Sum, source: AggregateSource::Column(1) }];

        let output = execute_columnar_batch(&input, &filters, &specs, None).unwrap();

        assert_eq!(output.len(), 1);

        // SUM(col1) where col0 < 35: 1.0 + 3.0 = 4.0 (the NULL is skipped)
        match output[0].get(0) {
            Some(SqlValue::Double(sum)) => assert!((sum - 4.0).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }
    }
}
275}