vibesql_executor/select/columnar/executor/
mod.rs

1//! Native columnar executor - end-to-end columnar query execution
2//!
3//! This module provides true columnar query execution that operates on `ColumnarBatch`
4//! throughout the entire pipeline, avoiding row materialization until final output.
5
6#![allow(clippy::needless_range_loop, clippy::unnecessary_map_or)]
7//!
8//! ## Architecture
9//!
10//! ```text
11//! Storage → ColumnarBatch → SIMD Filter → SIMD Aggregate → Vec<Row> (only at output)
12//!          ↑ Zero-copy     ↑ 4-8x faster  ↑ 10x faster   ↑ Minimal materialization
13//! ```
14//!
15//! ## Key Benefits
16//!
17//! - **Zero-copy**: ColumnarBatch flows through without row materialization
//! - **SIMD acceleration**: Filtering and aggregation use vectorized instructions
19//! - **Cache efficiency**: Columnar data access patterns are cache-friendly
20//! - **Minimal allocations**: Only allocate result rows at the end
21//!
22//! ## Module Organization
23//!
24//! - `fused` - Fused filter+aggregate optimization path
25//! - `aggregate` - Standard aggregation functions
26//! - `expression` - Expression evaluation helpers
27
28mod fused;
29mod aggregate;
30mod expression;
31
32use super::batch::ColumnarBatch;
33use super::aggregate::{AggregateOp, AggregateSpec};
34use super::filter::ColumnPredicate;
35use super::simd_filter::simd_filter_batch;
36use crate::errors::ExecutorError;
37use crate::schema::CombinedSchema;
38use vibesql_storage::Row;
39use vibesql_types::SqlValue;
40
41use fused::{can_use_fused_aggregation, execute_fused_filter_aggregate};
42use aggregate::compute_batch_aggregates;
43
44/// Execute a columnar query end-to-end on a ColumnarBatch
45///
46/// This is the main entry point for native columnar execution. It accepts
47/// a ColumnarBatch from storage and executes filtering and aggregation
48/// entirely in columnar format.
49///
50/// # Arguments
51///
52/// * `batch` - Input ColumnarBatch from storage layer
53/// * `predicates` - Column predicates for SIMD filtering
54/// * `aggregates` - Aggregate specifications (SUM, COUNT, etc.)
55/// * `schema` - Optional schema for expression evaluation
56///
57/// # Returns
58///
59/// A vector of rows containing the aggregated results
60pub fn execute_columnar_batch(
61    batch: &ColumnarBatch,
62    predicates: &[ColumnPredicate],
63    aggregates: &[AggregateSpec],
64    _schema: Option<&CombinedSchema>,
65) -> Result<Vec<Row>, ExecutorError> {
66    // Early return for empty input
67    if batch.row_count() == 0 {
68        let values: Vec<SqlValue> = aggregates
69            .iter()
70            .map(|spec| match spec.op {
71                AggregateOp::Count => SqlValue::Integer(0),
72                _ => SqlValue::Null,
73            })
74            .collect();
75        return Ok(vec![Row::new(values)]);
76    }
77
78    // Try fused filter+aggregate path first (avoids intermediate batch allocation)
79    // This is faster for simple aggregate queries like TPC-H Q6
80    if !predicates.is_empty() && can_use_fused_aggregation(aggregates) {
81        if let Ok(results) = execute_fused_filter_aggregate(batch, predicates, aggregates) {
82            return Ok(vec![Row::new(results)]);
83        }
84        // Fall through to standard path on failure
85    }
86
87    // Standard path: filter first, then aggregate
88    // Used when fused path is not applicable or fails
89
90    // Phase 1: Apply auto-vectorized filtering
91    #[cfg(feature = "profile-q6")]
92    let filter_start = std::time::Instant::now();
93
94    let filtered_batch = if predicates.is_empty() {
95        batch.clone()
96    } else {
97        simd_filter_batch(batch, predicates)?
98    };
99
100    #[cfg(feature = "profile-q6")]
101    {
102        let filter_time = filter_start.elapsed();
103        eprintln!(
104            "[PROFILE-Q6]   Phase 1 - SIMD Filter: {:?} ({}/{} rows passed)",
105            filter_time,
106            filtered_batch.row_count(),
107            batch.row_count()
108        );
109    }
110
111    // Phase 2: Compute aggregates on filtered batch
112    #[cfg(feature = "profile-q6")]
113    let agg_start = std::time::Instant::now();
114
115    let results = compute_batch_aggregates(&filtered_batch, aggregates)?;
116
117    #[cfg(feature = "profile-q6")]
118    {
119        let agg_time = agg_start.elapsed();
120        eprintln!(
121            "[PROFILE-Q6]   Phase 2 - SIMD Aggregate: {:?} ({} aggregates)",
122            agg_time,
123            aggregates.len()
124        );
125    }
126
127    // Phase 3: Convert to output rows (only materialization point)
128    Ok(vec![Row::new(results)])
129}
130
#[cfg(test)]
mod tests {
    use super::super::aggregate::AggregateSource;
    use super::super::batch::ColumnarBatch;
    use super::*;

    /// Shorthand constructor for an `AggregateSpec`.
    fn spec(op: AggregateOp, source: AggregateSource) -> AggregateSpec {
        AggregateSpec { op, source }
    }

    /// Fixture of four `(Integer, Double)` rows:
    /// (10, 1.5), (20, 2.5), (30, 3.5), (40, 4.5).
    fn make_test_batch() -> ColumnarBatch {
        let rows: Vec<Row> = [(10, 1.5), (20, 2.5), (30, 3.5), (40, 4.5)]
            .iter()
            .map(|&(int_val, dbl_val)| {
                Row::new(vec![SqlValue::Integer(int_val), SqlValue::Double(dbl_val)])
            })
            .collect();
        ColumnarBatch::from_rows(&rows).unwrap()
    }

    #[test]
    fn test_execute_columnar_batch_sum() {
        let aggregates = [spec(AggregateOp::Sum, AggregateSource::Column(0))];

        let output = execute_columnar_batch(&make_test_batch(), &[], &aggregates, None).unwrap();

        assert_eq!(output.len(), 1);
        assert_eq!(output[0].get(0), Some(&SqlValue::Integer(100)));
    }

    #[test]
    fn test_execute_columnar_batch_with_filter() {
        // col0 < 25 keeps only the rows holding 10 and 20.
        let predicates = [ColumnPredicate::LessThan {
            column_idx: 0,
            value: SqlValue::Integer(25),
        }];
        let aggregates = [spec(AggregateOp::Sum, AggregateSource::Column(0))];

        let output =
            execute_columnar_batch(&make_test_batch(), &predicates, &aggregates, None).unwrap();

        assert_eq!(output.len(), 1);
        assert_eq!(output[0].get(0), Some(&SqlValue::Integer(30)));
    }

    #[test]
    fn test_execute_columnar_batch_multiple_aggregates() {
        let aggregates = [
            spec(AggregateOp::Sum, AggregateSource::Column(0)),
            spec(AggregateOp::Avg, AggregateSource::Column(1)),
            spec(AggregateOp::Count, AggregateSource::CountStar),
        ];

        let output = execute_columnar_batch(&make_test_batch(), &[], &aggregates, None).unwrap();
        assert_eq!(output.len(), 1);

        let row = &output[0];
        assert_eq!(row.len(), 3);

        // SUM(col0): 10 + 20 + 30 + 40
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));

        // AVG(col1): (1.5 + 2.5 + 3.5 + 4.5) / 4
        match row.get(1) {
            Some(SqlValue::Double(avg)) => assert!((avg - 3.0).abs() < 0.001),
            _ => panic!("Expected Double for AVG"),
        }

        // COUNT(*): four rows
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    #[test]
    fn test_execute_columnar_batch_empty() {
        let aggregates = [
            spec(AggregateOp::Sum, AggregateSource::Column(0)),
            spec(AggregateOp::Count, AggregateSource::CountStar),
        ];

        let output =
            execute_columnar_batch(&ColumnarBatch::new(2), &[], &aggregates, None).unwrap();

        assert_eq!(output.len(), 1);
        // SUM over no rows is NULL; COUNT over no rows is 0.
        assert_eq!(output[0].get(0), Some(&SqlValue::Null));
        assert_eq!(output[0].get(1), Some(&SqlValue::Integer(0)));
    }

    /// NULL inputs must be skipped by SUM but still counted by COUNT(*).
    #[test]
    fn test_execute_columnar_batch_with_nulls() {
        let rows = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
            Row::new(vec![SqlValue::Null, SqlValue::Double(2.5)]),
            Row::new(vec![SqlValue::Integer(30), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
        ];
        let batch = ColumnarBatch::from_rows(&rows).unwrap();
        let aggregates = [
            spec(AggregateOp::Sum, AggregateSource::Column(0)),
            spec(AggregateOp::Sum, AggregateSource::Column(1)),
            spec(AggregateOp::Count, AggregateSource::CountStar),
        ];

        let output = execute_columnar_batch(&batch, &[], &aggregates, None).unwrap();
        assert_eq!(output.len(), 1);

        // SUM(col0): 10 + 30 + 40, with the NULL skipped.
        assert_eq!(output[0].get(0), Some(&SqlValue::Integer(80)));

        // SUM(col1): 1.5 + 2.5 + 4.5, with the NULL skipped.
        match output[0].get(1) {
            Some(SqlValue::Double(sum)) => assert!((sum - 8.5).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }

        // COUNT(*) counts every row, NULLs included.
        assert_eq!(output[0].get(2), Some(&SqlValue::Integer(4)));
    }

    /// NULL handling combined with a predicate. Because the predicate list is
    /// non-empty, `execute_columnar_batch` attempts the fused path first.
    #[test]
    fn test_execute_columnar_batch_with_nulls_and_filter() {
        let rows = vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.0)]),
            Row::new(vec![SqlValue::Integer(20), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.0)]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.0)]),
        ];
        let batch = ColumnarBatch::from_rows(&rows).unwrap();

        // col0 < 35 keeps rows 0, 1 and 2.
        let predicates = [ColumnPredicate::LessThan {
            column_idx: 0,
            value: SqlValue::Integer(35),
        }];
        let aggregates = [spec(AggregateOp::Sum, AggregateSource::Column(1))];

        let output = execute_columnar_batch(&batch, &predicates, &aggregates, None).unwrap();
        assert_eq!(output.len(), 1);

        // 1.0 + 3.0; row 1's NULL contributes nothing.
        match output[0].get(0) {
            Some(SqlValue::Double(sum)) => assert!((sum - 4.0).abs() < 0.001),
            _ => panic!("Expected Double for SUM(col1)"),
        }
    }
}