Skip to main content

icydb_core/db/session/sql/
aggregate.rs

1//! Module: db::session::sql::aggregate
2//! Responsibility: module-local ownership and contracts for db::session::sql::aggregate.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, MissingRowPolicy, PersistedRow, QueryError,
9        executor::{ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest},
10        query::plan::AggregateKind,
11        session::sql::surface::sql_statement_route_from_statement,
12        session::sql::{SqlParsedStatement, SqlStatementRoute},
13        sql::lowering::{
14            TypedSqlGlobalAggregateTerminal, compile_sql_global_aggregate_command_from_prepared,
15            is_sql_global_aggregate_statement, prepare_sql_statement,
16        },
17        sql::parser::{SqlStatement, parse_sql},
18    },
19    traits::{CanisterKind, EntityValue},
20    value::Value,
21};
22
23#[derive(Clone, Copy, Debug, Eq, PartialEq)]
24pub(in crate::db::session::sql) enum SqlAggregateSurface {
25    QueryFrom,
26    ExecuteSql,
27    ExecuteSqlGrouped,
28    ExecuteSqlDispatch,
29    GeneratedQuerySurface,
30}
31
32pub(in crate::db::session::sql) fn parsed_requires_dedicated_sql_aggregate_lane(
33    parsed: &SqlParsedStatement,
34) -> bool {
35    is_sql_global_aggregate_statement(&parsed.statement)
36}
37
38pub(in crate::db::session::sql) const fn unsupported_sql_aggregate_lane_message(
39    surface: SqlAggregateSurface,
40) -> &'static str {
41    match surface {
42        SqlAggregateSurface::QueryFrom => {
43            "query_from_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
44        }
45        SqlAggregateSurface::ExecuteSql => {
46            "execute_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
47        }
48        SqlAggregateSurface::ExecuteSqlGrouped => {
49            "execute_sql_grouped rejects global aggregate SELECT; use execute_sql_aggregate(...)"
50        }
51        SqlAggregateSurface::ExecuteSqlDispatch => {
52            "execute_sql_dispatch rejects global aggregate SELECT; use execute_sql_aggregate(...)"
53        }
54        SqlAggregateSurface::GeneratedQuerySurface => {
55            "generated SQL query surface rejects global aggregate SELECT; use execute_sql_aggregate(...)"
56        }
57    }
58}
59
60const fn unsupported_sql_aggregate_surface_lane_message(route: &SqlStatementRoute) -> &'static str {
61    match route {
62        SqlStatementRoute::Query { .. } => {
63            "execute_sql_aggregate requires constrained global aggregate SELECT"
64        }
65        SqlStatementRoute::Explain { .. } => {
66            "execute_sql_aggregate rejects EXPLAIN; use execute_sql_dispatch"
67        }
68        SqlStatementRoute::Describe { .. } => {
69            "execute_sql_aggregate rejects DESCRIBE; use execute_sql_dispatch"
70        }
71        SqlStatementRoute::ShowIndexes { .. } => {
72            "execute_sql_aggregate rejects SHOW INDEXES; use execute_sql_dispatch"
73        }
74        SqlStatementRoute::ShowColumns { .. } => {
75            "execute_sql_aggregate rejects SHOW COLUMNS; use execute_sql_dispatch"
76        }
77        SqlStatementRoute::ShowEntities => {
78            "execute_sql_aggregate rejects SHOW ENTITIES; use execute_sql_dispatch"
79        }
80    }
81}
82
83const fn unsupported_sql_aggregate_grouped_message() -> &'static str {
84    "execute_sql_aggregate rejects grouped SELECT; use execute_sql_grouped(...)"
85}
86
87impl<C: CanisterKind> DbSession<C> {
88    /// Execute one reduced SQL global aggregate `SELECT` statement.
89    ///
90    /// This entrypoint is intentionally constrained to one aggregate terminal
91    /// shape per statement and preserves existing terminal semantics.
92    pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
93    where
94        E: PersistedRow<Canister = C> + EntityValue,
95    {
96        // Parse once into one owned statement so the aggregate lane can keep
97        // its surface checks and lowering on the same statement instance.
98        let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
99
100        // First keep wrong-lane traffic on an explicit aggregate-surface
101        // contract instead of relying on generic lowering failures.
102        match &statement {
103            SqlStatement::Select(_) if is_sql_global_aggregate_statement(&statement) => {}
104            SqlStatement::Select(statement) if !statement.group_by.is_empty() => {
105                return Err(QueryError::unsupported_query(
106                    unsupported_sql_aggregate_grouped_message(),
107                ));
108            }
109            SqlStatement::Delete(_) => {
110                return Err(QueryError::unsupported_query(
111                    "execute_sql_aggregate rejects DELETE; use execute_sql_dispatch",
112                ));
113            }
114            _ => {
115                let route = sql_statement_route_from_statement(&statement);
116
117                return Err(QueryError::unsupported_query(
118                    unsupported_sql_aggregate_surface_lane_message(&route),
119                ));
120            }
121        }
122
123        // First lower the SQL surface onto the existing single-terminal
124        // aggregate command authority so execution never has to rediscover the
125        // accepted aggregate shape family.
126        let command = compile_sql_global_aggregate_command_from_prepared::<E>(
127            prepare_sql_statement(statement, E::MODEL.name())
128                .map_err(QueryError::from_sql_lowering_error)?,
129            MissingRowPolicy::Ignore,
130        )
131        .map_err(QueryError::from_sql_lowering_error)?;
132
133        // Then dispatch each accepted terminal onto the existing load/query
134        // boundaries instead of reopening aggregate execution ownership here.
135        match command.terminal() {
136            TypedSqlGlobalAggregateTerminal::CountRows => self
137                .execute_load_query_with(command.query(), |load, plan| {
138                    load.execute_scalar_terminal_request(
139                        plan,
140                        crate::db::executor::ScalarTerminalBoundaryRequest::Count,
141                    )?
142                    .into_count()
143                })
144                .map(|count| Value::Uint(u64::from(count))),
145            TypedSqlGlobalAggregateTerminal::CountField(target_slot) => self
146                .execute_load_query_with(command.query(), |load, plan| {
147                    load.execute_scalar_projection_boundary(
148                        plan,
149                        target_slot.clone(),
150                        ScalarProjectionBoundaryRequest::CountNonNull,
151                    )?
152                    .into_count()
153                })
154                .map(|count| Value::Uint(u64::from(count))),
155            TypedSqlGlobalAggregateTerminal::SumField(target_slot) => self
156                .execute_load_query_with(command.query(), |load, plan| {
157                    load.execute_numeric_field_boundary(
158                        plan,
159                        target_slot.clone(),
160                        ScalarNumericFieldBoundaryRequest::Sum,
161                    )
162                })
163                .map(|value| value.map_or(Value::Null, Value::Decimal)),
164            TypedSqlGlobalAggregateTerminal::AvgField(target_slot) => self
165                .execute_load_query_with(command.query(), |load, plan| {
166                    load.execute_numeric_field_boundary(
167                        plan,
168                        target_slot.clone(),
169                        ScalarNumericFieldBoundaryRequest::Avg,
170                    )
171                })
172                .map(|value| value.map_or(Value::Null, Value::Decimal)),
173            TypedSqlGlobalAggregateTerminal::MinField(target_slot) => self
174                .execute_load_query_with(command.query(), |load, plan| {
175                    load.execute_scalar_extrema_value_boundary(
176                        plan,
177                        target_slot.clone(),
178                        AggregateKind::Min,
179                    )
180                })
181                .map(|value| value.unwrap_or(Value::Null)),
182            TypedSqlGlobalAggregateTerminal::MaxField(target_slot) => self
183                .execute_load_query_with(command.query(), |load, plan| {
184                    load.execute_scalar_extrema_value_boundary(
185                        plan,
186                        target_slot.clone(),
187                        AggregateKind::Max,
188                    )
189                })
190                .map(|value| value.unwrap_or(Value::Null)),
191        }
192    }
193}