vibesql_executor/select/executor/fast_path/
streaming_agg.rs

//! Streaming aggregation for fast-path execution.
//!
//! This module provides ultra-fast execution for simple aggregate queries whose
//! results can be accumulated inline during a primary-key (PK) range scan.

6use vibesql_ast::{SelectItem, SelectStmt};
7use vibesql_storage::Row;
8use vibesql_types::SqlValue;
9
10use super::analysis::extract_simple_aggregate;
11use crate::errors::ExecutorError;
12use crate::select::executor::builder::SelectExecutor;
13use crate::select::grouping::AggregateAccumulator;
14
15impl SelectExecutor<'_> {
16    /// Execute a streaming aggregate query (#3815)
17    ///
18    /// This provides ultra-fast execution for queries like:
19    /// `SELECT SUM(k) FROM sbtest1 WHERE id BETWEEN ? AND ?`
20    ///
21    /// By accumulating aggregates inline during the PK range scan, we avoid:
22    /// - Materializing intermediate Row objects
23    /// - Going through the full pipeline infrastructure
24    /// - Multiple allocations per row
25    ///
26    /// # Performance
27    /// This achieves SQLite-like performance (~4μs) compared to ~30μs
28    /// for the standard aggregation path.
29    pub fn execute_streaming_aggregate(&self, stmt: &SelectStmt) -> Result<Vec<Row>, ExecutorError> {
30        // Extract table name from FROM clause
31        let table_name = match &stmt.from {
32            Some(vibesql_ast::FromClause::Table { name, .. }) => name.as_str(),
33            _ => {
34                return Err(ExecutorError::Other(
35                    "Streaming aggregate requires simple table FROM".to_string(),
36                ))
37            }
38        };
39
40        // Get table
41        let table = match self.database.get_table(table_name) {
42            Some(t) => t,
43            None => {
44                return Err(ExecutorError::TableNotFound(table_name.to_string()));
45            }
46        };
47
48        // Need single-column PK for streaming
49        let pk_columns = match &table.schema.primary_key {
50            Some(cols) if cols.len() == 1 => cols,
51            _ => {
52                return Err(ExecutorError::Other(
53                    "Streaming aggregate requires single-column PK".to_string(),
54                ))
55            }
56        };
57        let pk_col = &pk_columns[0];
58
59        // Extract BETWEEN bounds from WHERE clause
60        let where_clause = stmt.where_clause.as_ref().ok_or_else(|| {
61            ExecutorError::Other("Streaming aggregate requires WHERE clause".to_string())
62        })?;
63
64        let (low_value, high_value) = match self.extract_between_bounds(where_clause, pk_col) {
65            Some(bounds) => bounds,
66            None => {
67                return Err(ExecutorError::Other(
68                    "Streaming aggregate requires BETWEEN predicate on PK".to_string(),
69                ))
70            }
71        };
72
73        // Find an index on the PK column
74        let index_names = self.database.list_indexes_for_table(table_name);
75        let pk_index_data = index_names.iter().find_map(|idx_name| {
76            let metadata = self.database.get_index(idx_name)?;
77            if metadata.columns.len() == 1
78                && metadata.columns[0].column_name.eq_ignore_ascii_case(pk_col)
79            {
80                self.database.get_index_data(idx_name)
81            } else {
82                None
83            }
84        });
85        let pk_index_data = match pk_index_data {
86            Some(idx) => idx,
87            None => {
88                return Err(ExecutorError::Other(
89                    "Streaming aggregate requires index on PK".to_string(),
90                ))
91            }
92        };
93
94        // Create accumulators for each aggregate in the SELECT list
95        let mut accumulators: Vec<(AggregateAccumulator, usize)> =
96            Vec::with_capacity(stmt.select_list.len());
97
98        for item in &stmt.select_list {
99            match item {
100                SelectItem::Expression { expr, .. } => {
101                    let (func_name, col_idx) =
102                        extract_simple_aggregate(expr, &table.schema).ok_or_else(|| {
103                            ExecutorError::Other(
104                                "Streaming aggregate: invalid aggregate expression".to_string(),
105                            )
106                        })?;
107                    let accumulator = AggregateAccumulator::new(&func_name, false)?;
108                    accumulators.push((accumulator, col_idx));
109                }
110                _ => {
111                    return Err(ExecutorError::Other(
112                        "Streaming aggregate: SELECT must contain only aggregates".to_string(),
113                    ))
114                }
115            }
116        }
117
118        // Use streaming range scan to accumulate inline
119        if let Some(stream) = pk_index_data.range_scan_streaming(
120            Some(&low_value),
121            Some(&high_value),
122            true, // inclusive start (BETWEEN is inclusive)
123            true, // inclusive end
124        ) {
125            for row_idx in stream {
126                if let Some(row) = table.get_row(row_idx) {
127                    for (accumulator, col_idx) in &mut accumulators {
128                        accumulator.accumulate(&row.values[*col_idx]);
129                    }
130                }
131            }
132        }
133
134        // Finalize and return single row
135        let result_values: Vec<SqlValue> =
136            accumulators.iter().map(|(acc, _)| acc.finalize()).collect();
137
138        Ok(vec![Row::from_vec(result_values)])
139    }
140}