vibesql_executor/select/executor/fast_path/
streaming_agg.rs1use vibesql_ast::{SelectItem, SelectStmt};
7use vibesql_storage::Row;
8use vibesql_types::SqlValue;
9
10use super::analysis::extract_simple_aggregate;
11use crate::{
12 errors::ExecutorError,
13 select::{executor::builder::SelectExecutor, grouping::AggregateAccumulator},
14};
15
16impl SelectExecutor<'_> {
17 pub fn execute_streaming_aggregate(
31 &self,
32 stmt: &SelectStmt,
33 ) -> Result<Vec<Row>, ExecutorError> {
34 let table_name = match &stmt.from {
36 Some(vibesql_ast::FromClause::Table { name, .. }) => name.as_str(),
37 _ => {
38 return Err(ExecutorError::Other(
39 "Streaming aggregate requires simple table FROM".to_string(),
40 ))
41 }
42 };
43
44 let table = match self.database.get_table(table_name) {
46 Some(t) => t,
47 None => {
48 return Err(ExecutorError::TableNotFound(table_name.to_string()));
49 }
50 };
51
52 let pk_columns = match &table.schema.primary_key {
54 Some(cols) if cols.len() == 1 => cols,
55 _ => {
56 return Err(ExecutorError::Other(
57 "Streaming aggregate requires single-column PK".to_string(),
58 ))
59 }
60 };
61 let pk_col = &pk_columns[0];
62
63 let where_clause = stmt.where_clause.as_ref().ok_or_else(|| {
65 ExecutorError::Other("Streaming aggregate requires WHERE clause".to_string())
66 })?;
67
68 let (low_value, high_value) = match self.extract_between_bounds(where_clause, pk_col) {
69 Some(bounds) => bounds,
70 None => {
71 return Err(ExecutorError::Other(
72 "Streaming aggregate requires BETWEEN predicate on PK".to_string(),
73 ))
74 }
75 };
76
77 let index_names = self.database.list_indexes_for_table(table_name);
79 let pk_index_data = index_names.iter().find_map(|idx_name| {
80 let metadata = self.database.get_index(idx_name)?;
81 if metadata.columns.len() == 1
82 && metadata.columns[0].expect_column_name().eq_ignore_ascii_case(pk_col)
83 {
84 self.database.get_index_data(idx_name)
85 } else {
86 None
87 }
88 });
89 let pk_index_data = match pk_index_data {
90 Some(idx) => idx,
91 None => {
92 return Err(ExecutorError::Other(
93 "Streaming aggregate requires index on PK".to_string(),
94 ))
95 }
96 };
97
98 let mut accumulators: Vec<(AggregateAccumulator, usize)> =
100 Vec::with_capacity(stmt.select_list.len());
101
102 for item in &stmt.select_list {
103 match item {
104 SelectItem::Expression { expr, .. } => {
105 let (func_name, col_idx) = extract_simple_aggregate(expr, &table.schema)
106 .ok_or_else(|| {
107 ExecutorError::Other(
108 "Streaming aggregate: invalid aggregate expression".to_string(),
109 )
110 })?;
111 let accumulator = AggregateAccumulator::new(&func_name, false)?;
112 accumulators.push((accumulator, col_idx));
113 }
114 _ => {
115 return Err(ExecutorError::Other(
116 "Streaming aggregate: SELECT must contain only aggregates".to_string(),
117 ))
118 }
119 }
120 }
121
122 if let Some(stream) = pk_index_data.range_scan_streaming(
124 Some(&low_value),
125 Some(&high_value),
126 true, true, ) {
129 for row_idx in stream {
130 if let Some(row) = table.get_row(row_idx) {
131 for (accumulator, col_idx) in &mut accumulators {
132 accumulator.accumulate(&row.values[*col_idx]);
133 }
134 }
135 }
136 }
137
138 let result_values: Vec<SqlValue> = accumulators
140 .iter()
141 .map(|(acc, _)| acc.finalize())
142 .collect::<Result<Vec<_>, _>>()?;
143
144 Ok(vec![Row::from_vec(result_values)])
145 }
146}