vibesql_executor/select/executor/fast_path/
streaming_agg.rs1use vibesql_ast::{SelectItem, SelectStmt};
7use vibesql_storage::Row;
8use vibesql_types::SqlValue;
9
10use super::analysis::extract_simple_aggregate;
11use crate::errors::ExecutorError;
12use crate::select::executor::builder::SelectExecutor;
13use crate::select::grouping::AggregateAccumulator;
14
15impl SelectExecutor<'_> {
16 pub fn execute_streaming_aggregate(&self, stmt: &SelectStmt) -> Result<Vec<Row>, ExecutorError> {
30 let table_name = match &stmt.from {
32 Some(vibesql_ast::FromClause::Table { name, .. }) => name.as_str(),
33 _ => {
34 return Err(ExecutorError::Other(
35 "Streaming aggregate requires simple table FROM".to_string(),
36 ))
37 }
38 };
39
40 let table = match self.database.get_table(table_name) {
42 Some(t) => t,
43 None => {
44 return Err(ExecutorError::TableNotFound(table_name.to_string()));
45 }
46 };
47
48 let pk_columns = match &table.schema.primary_key {
50 Some(cols) if cols.len() == 1 => cols,
51 _ => {
52 return Err(ExecutorError::Other(
53 "Streaming aggregate requires single-column PK".to_string(),
54 ))
55 }
56 };
57 let pk_col = &pk_columns[0];
58
59 let where_clause = stmt.where_clause.as_ref().ok_or_else(|| {
61 ExecutorError::Other("Streaming aggregate requires WHERE clause".to_string())
62 })?;
63
64 let (low_value, high_value) = match self.extract_between_bounds(where_clause, pk_col) {
65 Some(bounds) => bounds,
66 None => {
67 return Err(ExecutorError::Other(
68 "Streaming aggregate requires BETWEEN predicate on PK".to_string(),
69 ))
70 }
71 };
72
73 let index_names = self.database.list_indexes_for_table(table_name);
75 let pk_index_data = index_names.iter().find_map(|idx_name| {
76 let metadata = self.database.get_index(idx_name)?;
77 if metadata.columns.len() == 1
78 && metadata.columns[0].column_name.eq_ignore_ascii_case(pk_col)
79 {
80 self.database.get_index_data(idx_name)
81 } else {
82 None
83 }
84 });
85 let pk_index_data = match pk_index_data {
86 Some(idx) => idx,
87 None => {
88 return Err(ExecutorError::Other(
89 "Streaming aggregate requires index on PK".to_string(),
90 ))
91 }
92 };
93
94 let mut accumulators: Vec<(AggregateAccumulator, usize)> =
96 Vec::with_capacity(stmt.select_list.len());
97
98 for item in &stmt.select_list {
99 match item {
100 SelectItem::Expression { expr, .. } => {
101 let (func_name, col_idx) =
102 extract_simple_aggregate(expr, &table.schema).ok_or_else(|| {
103 ExecutorError::Other(
104 "Streaming aggregate: invalid aggregate expression".to_string(),
105 )
106 })?;
107 let accumulator = AggregateAccumulator::new(&func_name, false)?;
108 accumulators.push((accumulator, col_idx));
109 }
110 _ => {
111 return Err(ExecutorError::Other(
112 "Streaming aggregate: SELECT must contain only aggregates".to_string(),
113 ))
114 }
115 }
116 }
117
118 if let Some(stream) = pk_index_data.range_scan_streaming(
120 Some(&low_value),
121 Some(&high_value),
122 true, true, ) {
125 for row_idx in stream {
126 if let Some(row) = table.get_row(row_idx) {
127 for (accumulator, col_idx) in &mut accumulators {
128 accumulator.accumulate(&row.values[*col_idx]);
129 }
130 }
131 }
132 }
133
134 let result_values: Vec<SqlValue> =
136 accumulators.iter().map(|(acc, _)| acc.finalize()).collect();
137
138 Ok(vec![Row::from_vec(result_values)])
139 }
140}