vibesql_executor/select/executor/fast_path/
streaming_agg.rs

//! Streaming aggregation for fast path execution
//!
//! This module provides ultra-fast execution for simple aggregate queries
//! that can accumulate results inline during a PK range scan.
5
6use vibesql_ast::{SelectItem, SelectStmt};
7use vibesql_storage::Row;
8use vibesql_types::SqlValue;
9
10use super::analysis::extract_simple_aggregate;
11use crate::{
12    errors::ExecutorError,
13    select::{executor::builder::SelectExecutor, grouping::AggregateAccumulator},
14};
15
16impl SelectExecutor<'_> {
17    /// Execute a streaming aggregate query (#3815)
18    ///
19    /// This provides ultra-fast execution for queries like:
20    /// `SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?`
21    ///
22    /// By accumulating aggregates inline during the PK range scan, we avoid:
23    /// - Materializing intermediate Row objects
24    /// - Going through the full pipeline infrastructure
25    /// - Multiple allocations per row
26    ///
27    /// # Performance
28    /// This achieves SQLite-like performance (~4μs) compared to ~30μs
29    /// for the standard aggregation path.
30    pub fn execute_streaming_aggregate(
31        &self,
32        stmt: &SelectStmt,
33    ) -> Result<Vec<Row>, ExecutorError> {
34        // Extract table name from FROM clause
35        let table_name = match &stmt.from {
36            Some(vibesql_ast::FromClause::Table { name, .. }) => name.as_str(),
37            _ => {
38                return Err(ExecutorError::Other(
39                    "Streaming aggregate requires simple table FROM".to_string(),
40                ))
41            }
42        };
43
44        // Get table
45        let table = match self.database.get_table(table_name) {
46            Some(t) => t,
47            None => {
48                return Err(ExecutorError::TableNotFound(table_name.to_string()));
49            }
50        };
51
52        // Need single-column PK for streaming
53        let pk_columns = match &table.schema.primary_key {
54            Some(cols) if cols.len() == 1 => cols,
55            _ => {
56                return Err(ExecutorError::Other(
57                    "Streaming aggregate requires single-column PK".to_string(),
58                ))
59            }
60        };
61        let pk_col = &pk_columns[0];
62
63        // Extract BETWEEN bounds from WHERE clause
64        let where_clause = stmt.where_clause.as_ref().ok_or_else(|| {
65            ExecutorError::Other("Streaming aggregate requires WHERE clause".to_string())
66        })?;
67
68        let (low_value, high_value) = match self.extract_between_bounds(where_clause, pk_col) {
69            Some(bounds) => bounds,
70            None => {
71                return Err(ExecutorError::Other(
72                    "Streaming aggregate requires BETWEEN predicate on PK".to_string(),
73                ))
74            }
75        };
76
77        // Find an index on the PK column
78        let index_names = self.database.list_indexes_for_table(table_name);
79        let pk_index_data = index_names.iter().find_map(|idx_name| {
80            let metadata = self.database.get_index(idx_name)?;
81            if metadata.columns.len() == 1
82                && metadata.columns[0].expect_column_name().eq_ignore_ascii_case(pk_col)
83            {
84                self.database.get_index_data(idx_name)
85            } else {
86                None
87            }
88        });
89        let pk_index_data = match pk_index_data {
90            Some(idx) => idx,
91            None => {
92                return Err(ExecutorError::Other(
93                    "Streaming aggregate requires index on PK".to_string(),
94                ))
95            }
96        };
97
98        // Create accumulators for each aggregate in the SELECT list
99        let mut accumulators: Vec<(AggregateAccumulator, usize)> =
100            Vec::with_capacity(stmt.select_list.len());
101
102        for item in &stmt.select_list {
103            match item {
104                SelectItem::Expression { expr, .. } => {
105                    let (func_name, col_idx) = extract_simple_aggregate(expr, &table.schema)
106                        .ok_or_else(|| {
107                            ExecutorError::Other(
108                                "Streaming aggregate: invalid aggregate expression".to_string(),
109                            )
110                        })?;
111                    let accumulator = AggregateAccumulator::new(&func_name, false)?;
112                    accumulators.push((accumulator, col_idx));
113                }
114                _ => {
115                    return Err(ExecutorError::Other(
116                        "Streaming aggregate: SELECT must contain only aggregates".to_string(),
117                    ))
118                }
119            }
120        }
121
122        // Use streaming range scan to accumulate inline
123        if let Some(stream) = pk_index_data.range_scan_streaming(
124            Some(&low_value),
125            Some(&high_value),
126            true, // inclusive start (BETWEEN is inclusive)
127            true, // inclusive end
128        ) {
129            for row_idx in stream {
130                if let Some(row) = table.get_row(row_idx) {
131                    for (accumulator, col_idx) in &mut accumulators {
132                        accumulator.accumulate(&row.values[*col_idx]);
133                    }
134                }
135            }
136        }
137
138        // Finalize and return single row
139        let result_values: Vec<SqlValue> = accumulators
140            .iter()
141            .map(|(acc, _)| acc.finalize())
142            .collect::<Result<Vec<_>, _>>()?;
143
144        Ok(vec![Row::from_vec(result_values)])
145    }
146}