vibesql_executor/select/columnar/executor/
mod.rs

1//! Native columnar executor - end-to-end columnar query execution
2//!
3//! This module provides true columnar query execution that operates on `ColumnarBatch`
4//! throughout the entire pipeline, avoiding row materialization until final output.
5
6#![allow(clippy::needless_range_loop, clippy::unnecessary_map_or)]
7//!
8//! ## Architecture
9//!
10//! ```text
11//! Storage → ColumnarBatch → SIMD Filter → SIMD Aggregate → Vec<Row> (only at output)
12//!          ↑ Zero-copy     ↑ 4-8x faster  ↑ 10x faster   ↑ Minimal materialization
13//! ```
14//!
15//! ## Key Benefits
16//!
17//! - **Zero-copy**: ColumnarBatch flows through without row materialization
//! - **SIMD acceleration**: Filtering and aggregation use vectorized (SIMD / auto-vectorized) kernels
19//! - **Cache efficiency**: Columnar data access patterns are cache-friendly
20//! - **Minimal allocations**: Only allocate result rows at the end
21//!
22//! ## Module Organization
23//!
24//! - `fused` - Fused filter+aggregate optimization path
25//! - `aggregate` - Standard aggregation functions
26//! - `expression` - Expression evaluation helpers
27
28mod aggregate;
29mod expression;
30mod fused;
31
32use aggregate::compute_batch_aggregates;
33use fused::{can_use_fused_aggregation, execute_fused_filter_aggregate};
34use vibesql_storage::Row;
35use vibesql_types::SqlValue;
36
37use super::{
38    aggregate::{AggregateOp, AggregateSpec},
39    batch::ColumnarBatch,
40    filter::ColumnPredicate,
41    simd_filter::simd_filter_batch,
42};
43use crate::{errors::ExecutorError, schema::CombinedSchema};
44
45/// Execute a columnar query end-to-end on a ColumnarBatch
46///
47/// This is the main entry point for native columnar execution. It accepts
48/// a ColumnarBatch from storage and executes filtering and aggregation
49/// entirely in columnar format.
50///
51/// # Arguments
52///
53/// * `batch` - Input ColumnarBatch from storage layer
54/// * `predicates` - Column predicates for SIMD filtering
55/// * `aggregates` - Aggregate specifications (SUM, COUNT, etc.)
56/// * `schema` - Optional schema for expression evaluation
57///
58/// # Returns
59///
60/// A vector of rows containing the aggregated results
61pub fn execute_columnar_batch(
62    batch: &ColumnarBatch,
63    predicates: &[ColumnPredicate],
64    aggregates: &[AggregateSpec],
65    _schema: Option<&CombinedSchema>,
66) -> Result<Vec<Row>, ExecutorError> {
67    // Early return for empty input
68    if batch.row_count() == 0 {
69        let values: Vec<SqlValue> = aggregates
70            .iter()
71            .map(|spec| match spec.op {
72                AggregateOp::Count => SqlValue::Integer(0),
73                _ => SqlValue::Null,
74            })
75            .collect();
76        return Ok(vec![Row::new(values)]);
77    }
78
79    // Try fused filter+aggregate path first (avoids intermediate batch allocation)
80    // This is faster for simple aggregate queries like TPC-H Q6
81    if !predicates.is_empty() && can_use_fused_aggregation(aggregates) {
82        if let Ok(results) = execute_fused_filter_aggregate(batch, predicates, aggregates) {
83            return Ok(vec![Row::new(results)]);
84        }
85        // Fall through to standard path on failure
86    }
87
88    // Standard path: filter first, then aggregate
89    // Used when fused path is not applicable or fails
90
91    // Phase 1: Apply auto-vectorized filtering
92    #[cfg(feature = "profile-q6")]
93    let filter_start = std::time::Instant::now();
94
95    let filtered_batch =
96        if predicates.is_empty() { batch.clone() } else { simd_filter_batch(batch, predicates)? };
97
98    #[cfg(feature = "profile-q6")]
99    {
100        let filter_time = filter_start.elapsed();
101        eprintln!(
102            "[PROFILE-Q6]   Phase 1 - SIMD Filter: {:?} ({}/{} rows passed)",
103            filter_time,
104            filtered_batch.row_count(),
105            batch.row_count()
106        );
107    }
108
109    // Phase 2: Compute aggregates on filtered batch
110    #[cfg(feature = "profile-q6")]
111    let agg_start = std::time::Instant::now();
112
113    let results = compute_batch_aggregates(&filtered_batch, aggregates)?;
114
115    #[cfg(feature = "profile-q6")]
116    {
117        let agg_time = agg_start.elapsed();
118        eprintln!(
119            "[PROFILE-Q6]   Phase 2 - SIMD Aggregate: {:?} ({} aggregates)",
120            agg_time,
121            aggregates.len()
122        );
123    }
124
125    // Phase 3: Convert to output rows (only materialization point)
126    Ok(vec![Row::new(results)])
127}
128
#[cfg(test)]
mod tests {
    use super::{
        super::{aggregate::AggregateSource, batch::ColumnarBatch},
        *,
    };

    /// Build an aggregate specification from its operator and input source.
    fn agg(op: AggregateOp, source: AggregateSource) -> AggregateSpec {
        AggregateSpec { op, source }
    }

    /// Convert literal rows into a columnar batch, panicking on conversion failure.
    fn batch_from(rows: Vec<Row>) -> ColumnarBatch {
        ColumnarBatch::from_rows(&rows).unwrap()
    }

    /// Four rows of `(Integer, Double)`: (10, 1.5), (20, 2.5), (30, 3.5), (40, 4.5).
    fn sample_batch() -> ColumnarBatch {
        batch_from(vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
            Row::new(vec![SqlValue::Integer(20), SqlValue::Double(2.5)]),
            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.5)]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
        ])
    }

    /// Run the executor without a schema, check it produced exactly one row,
    /// and hand that row back.
    fn run_single(
        batch: &ColumnarBatch,
        predicates: &[ColumnPredicate],
        aggregates: &[AggregateSpec],
    ) -> Row {
        let mut output = execute_columnar_batch(batch, predicates, aggregates, None).unwrap();
        assert_eq!(output.len(), 1);
        output.remove(0)
    }

    /// Assert that `value` is a `Double` within 0.001 of `expected`.
    fn assert_double_near(value: Option<&SqlValue>, expected: f64, label: &str) {
        match value {
            Some(SqlValue::Double(actual)) => assert!((actual - expected).abs() < 0.001),
            _ => panic!("Expected Double for {label}"),
        }
    }

    #[test]
    fn test_execute_columnar_batch_sum() {
        let specs = [agg(AggregateOp::Sum, AggregateSource::Column(0))];

        let row = run_single(&sample_batch(), &[], &specs);

        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));
    }

    #[test]
    fn test_execute_columnar_batch_with_filter() {
        let filters =
            [ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(25) }];
        let specs = [agg(AggregateOp::Sum, AggregateSource::Column(0))];

        let row = run_single(&sample_batch(), &filters, &specs);

        // Only 10 and 20 satisfy col0 < 25.
        assert_eq!(row.get(0), Some(&SqlValue::Integer(30)));
    }

    #[test]
    fn test_execute_columnar_batch_multiple_aggregates() {
        let specs = [
            agg(AggregateOp::Sum, AggregateSource::Column(0)),
            agg(AggregateOp::Avg, AggregateSource::Column(1)),
            agg(AggregateOp::Count, AggregateSource::CountStar),
        ];

        let row = run_single(&sample_batch(), &[], &specs);
        assert_eq!(row.len(), 3);

        // SUM(col0) = 10 + 20 + 30 + 40
        assert_eq!(row.get(0), Some(&SqlValue::Integer(100)));
        // AVG(col1) = (1.5 + 2.5 + 3.5 + 4.5) / 4
        assert_double_near(row.get(1), 3.0, "AVG");
        // COUNT(*) counts every row
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    #[test]
    fn test_execute_columnar_batch_empty() {
        let empty = ColumnarBatch::new(2);
        let specs = [
            agg(AggregateOp::Sum, AggregateSource::Column(0)),
            agg(AggregateOp::Count, AggregateSource::CountStar),
        ];

        let row = run_single(&empty, &[], &specs);

        // SUM over zero rows is NULL; COUNT over zero rows is 0.
        assert_eq!(row.get(0), Some(&SqlValue::Null));
        assert_eq!(row.get(1), Some(&SqlValue::Integer(0)));
    }

    /// NULL inputs must be skipped by SUM but still counted by COUNT(*).
    /// Guards the NULL-handling fix in the fused column aggregates.
    #[test]
    fn test_execute_columnar_batch_with_nulls() {
        let batch = batch_from(vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.5)]),
            // NULL in the first column
            Row::new(vec![SqlValue::Null, SqlValue::Double(2.5)]),
            // NULL in the second column
            Row::new(vec![SqlValue::Integer(30), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.5)]),
        ]);
        let specs = [
            agg(AggregateOp::Sum, AggregateSource::Column(0)),
            agg(AggregateOp::Sum, AggregateSource::Column(1)),
            agg(AggregateOp::Count, AggregateSource::CountStar),
        ];

        let row = run_single(&batch, &[], &specs);

        // SUM(col0) = 10 + 30 + 40, the NULL is ignored
        assert_eq!(row.get(0), Some(&SqlValue::Integer(80)));
        // SUM(col1) = 1.5 + 2.5 + 4.5, the NULL is ignored
        assert_double_near(row.get(1), 8.5, "SUM(col1)");
        // COUNT(*) includes rows containing NULLs
        assert_eq!(row.get(2), Some(&SqlValue::Integer(4)));
    }

    /// Filtering combined with a NULL in the aggregated column.
    /// Guards that the fused path falls back correctly when NULLs are present.
    #[test]
    fn test_execute_columnar_batch_with_nulls_and_filter() {
        let batch = batch_from(vec![
            Row::new(vec![SqlValue::Integer(10), SqlValue::Double(1.0)]),
            // NULL value: passes the filter but must not contribute to the sum
            Row::new(vec![SqlValue::Integer(20), SqlValue::Null]),
            Row::new(vec![SqlValue::Integer(30), SqlValue::Double(3.0)]),
            Row::new(vec![SqlValue::Integer(40), SqlValue::Double(4.0)]),
        ]);
        // col0 < 35 keeps the first three rows
        let filters =
            [ColumnPredicate::LessThan { column_idx: 0, value: SqlValue::Integer(35) }];
        let specs = [agg(AggregateOp::Sum, AggregateSource::Column(1))];

        let row = run_single(&batch, &filters, &specs);

        // 1.0 + 3.0; the second row's NULL is excluded
        assert_double_near(row.get(0), 4.0, "SUM(col1)");
    }
}