Skip to main content

icydb_core/db/session/sql/
aggregate.rs

1//! Module: db::session::sql::aggregate
2//! Responsibility: module-local ownership and contracts for db::session::sql::aggregate.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6use crate::{
7    db::{
8        DbSession, MissingRowPolicy, PersistedRow, QueryError,
9        executor::{ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest},
10        session::sql::surface::sql_statement_route_from_statement,
11        session::sql::{SqlParsedStatement, SqlStatementRoute},
12        sql::lowering::{
13            PreparedSqlScalarAggregateRuntimeDescriptor, SqlGlobalAggregateCommand,
14            compile_sql_global_aggregate_command_from_prepared, is_sql_global_aggregate_statement,
15            prepare_sql_statement,
16        },
17        sql::parser::{SqlStatement, parse_sql},
18    },
19    traits::{CanisterKind, EntityValue},
20    value::Value,
21};
22
23#[derive(Clone, Copy, Debug, Eq, PartialEq)]
24pub(in crate::db::session::sql) enum SqlAggregateSurface {
25    QueryFrom,
26    ExecuteSql,
27    ExecuteSqlGrouped,
28    ExecuteSqlDispatch,
29    GeneratedQuerySurface,
30}
31
32pub(in crate::db::session::sql) fn parsed_requires_dedicated_sql_aggregate_lane(
33    parsed: &SqlParsedStatement,
34) -> bool {
35    is_sql_global_aggregate_statement(&parsed.statement)
36}
37
38pub(in crate::db::session::sql) const fn unsupported_sql_aggregate_lane_message(
39    surface: SqlAggregateSurface,
40) -> &'static str {
41    match surface {
42        SqlAggregateSurface::QueryFrom => {
43            "query_from_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
44        }
45        SqlAggregateSurface::ExecuteSql => {
46            "execute_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
47        }
48        SqlAggregateSurface::ExecuteSqlGrouped => {
49            "execute_sql_grouped rejects global aggregate SELECT; use execute_sql_aggregate(...)"
50        }
51        SqlAggregateSurface::ExecuteSqlDispatch => {
52            "execute_sql_dispatch rejects global aggregate SELECT; use execute_sql_aggregate(...)"
53        }
54        SqlAggregateSurface::GeneratedQuerySurface => {
55            "generated SQL query surface rejects global aggregate SELECT; use execute_sql_aggregate(...)"
56        }
57    }
58}
59
60const fn unsupported_sql_aggregate_surface_lane_message(route: &SqlStatementRoute) -> &'static str {
61    match route {
62        SqlStatementRoute::Query { .. } => {
63            "execute_sql_aggregate requires constrained global aggregate SELECT"
64        }
65        SqlStatementRoute::Explain { .. } => {
66            "execute_sql_aggregate rejects EXPLAIN; use execute_sql_dispatch"
67        }
68        SqlStatementRoute::Describe { .. } => {
69            "execute_sql_aggregate rejects DESCRIBE; use execute_sql_dispatch"
70        }
71        SqlStatementRoute::ShowIndexes { .. } => {
72            "execute_sql_aggregate rejects SHOW INDEXES; use execute_sql_dispatch"
73        }
74        SqlStatementRoute::ShowColumns { .. } => {
75            "execute_sql_aggregate rejects SHOW COLUMNS; use execute_sql_dispatch"
76        }
77        SqlStatementRoute::ShowEntities => {
78            "execute_sql_aggregate rejects SHOW ENTITIES; use execute_sql_dispatch"
79        }
80    }
81}
82
83const fn unsupported_sql_aggregate_grouped_message() -> &'static str {
84    "execute_sql_aggregate rejects grouped SELECT; use execute_sql_grouped(...)"
85}
86
87impl<C: CanisterKind> DbSession<C> {
88    // Require one resolved target slot from a prepared field-target SQL
89    // aggregate strategy before dispatching into execution families.
90    fn prepared_sql_scalar_target_slot_required(
91        strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
92        message: &'static str,
93    ) -> Result<crate::db::query::plan::FieldSlot, QueryError> {
94        strategy
95            .target_slot()
96            .cloned()
97            .ok_or_else(|| QueryError::invariant(message))
98    }
99
100    // Execute prepared COUNT(*) through the shared existing-rows scalar
101    // terminal boundary.
102    fn execute_prepared_sql_scalar_count_rows<E>(
103        &self,
104        command: &SqlGlobalAggregateCommand<E>,
105    ) -> Result<Value, QueryError>
106    where
107        E: PersistedRow<Canister = C> + EntityValue,
108    {
109        self.execute_load_query_with(command.query(), |load, plan| {
110            load.execute_scalar_terminal_request(
111                plan,
112                crate::db::executor::ScalarTerminalBoundaryRequest::Count,
113            )?
114            .into_count()
115        })
116        .map(|count| Value::Uint(u64::from(count)))
117    }
118
119    // Execute prepared COUNT(field) through the shared scalar projection
120    // boundary.
121    fn execute_prepared_sql_scalar_count_field<E>(
122        &self,
123        command: &SqlGlobalAggregateCommand<E>,
124        strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
125    ) -> Result<Value, QueryError>
126    where
127        E: PersistedRow<Canister = C> + EntityValue,
128    {
129        let target_slot = Self::prepared_sql_scalar_target_slot_required(
130            strategy,
131            "prepared COUNT(field) SQL aggregate strategy requires target slot",
132        )?;
133
134        self.execute_load_query_with(command.query(), |load, plan| {
135            load.execute_scalar_projection_boundary(
136                plan,
137                target_slot.clone(),
138                ScalarProjectionBoundaryRequest::CountNonNull,
139            )?
140            .into_count()
141        })
142        .map(|count| Value::Uint(u64::from(count)))
143    }
144
145    // Execute prepared SUM/AVG(field) through the shared numeric field
146    // boundary.
147    fn execute_prepared_sql_scalar_numeric_field<E>(
148        &self,
149        command: &SqlGlobalAggregateCommand<E>,
150        strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
151        request: ScalarNumericFieldBoundaryRequest,
152        message: &'static str,
153    ) -> Result<Value, QueryError>
154    where
155        E: PersistedRow<Canister = C> + EntityValue,
156    {
157        let target_slot = Self::prepared_sql_scalar_target_slot_required(strategy, message)?;
158
159        self.execute_load_query_with(command.query(), |load, plan| {
160            load.execute_numeric_field_boundary(plan, target_slot.clone(), request)
161        })
162        .map(|value| value.map_or(Value::Null, Value::Decimal))
163    }
164
165    // Execute prepared MIN/MAX(field) through the shared extrema-value
166    // boundary.
167    fn execute_prepared_sql_scalar_extrema_field<E>(
168        &self,
169        command: &SqlGlobalAggregateCommand<E>,
170        strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
171        kind: crate::db::query::plan::AggregateKind,
172    ) -> Result<Value, QueryError>
173    where
174        E: PersistedRow<Canister = C> + EntityValue,
175    {
176        let target_slot = Self::prepared_sql_scalar_target_slot_required(
177            strategy,
178            "prepared extrema SQL aggregate strategy requires target slot",
179        )?;
180
181        self.execute_load_query_with(command.query(), |load, plan| {
182            load.execute_scalar_extrema_value_boundary(plan, target_slot.clone(), kind)
183        })
184        .map(|value| value.unwrap_or(Value::Null))
185    }
186
187    // Execute one prepared typed SQL scalar aggregate strategy through the
188    // existing aggregate boundary families without rediscovering behavior from
189    // raw SQL terminal variants at the session layer.
190    fn execute_prepared_sql_scalar_aggregate<E>(
191        &self,
192        command: &SqlGlobalAggregateCommand<E>,
193    ) -> Result<Value, QueryError>
194    where
195        E: PersistedRow<Canister = C> + EntityValue,
196    {
197        let strategy = command.prepared_scalar_strategy();
198
199        match strategy.runtime_descriptor() {
200            PreparedSqlScalarAggregateRuntimeDescriptor::CountRows => {
201                self.execute_prepared_sql_scalar_count_rows(command)
202            }
203            PreparedSqlScalarAggregateRuntimeDescriptor::CountField => {
204                self.execute_prepared_sql_scalar_count_field(command, &strategy)
205            }
206            PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
207                kind: crate::db::query::plan::AggregateKind::Sum,
208            } => self.execute_prepared_sql_scalar_numeric_field(
209                command,
210                &strategy,
211                ScalarNumericFieldBoundaryRequest::Sum,
212                "prepared SUM(field) SQL aggregate strategy requires target slot",
213            ),
214            PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
215                kind: crate::db::query::plan::AggregateKind::Avg,
216            } => self.execute_prepared_sql_scalar_numeric_field(
217                command,
218                &strategy,
219                ScalarNumericFieldBoundaryRequest::Avg,
220                "prepared AVG(field) SQL aggregate strategy requires target slot",
221            ),
222            PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField { kind } => {
223                self.execute_prepared_sql_scalar_extrema_field(command, &strategy, kind)
224            }
225            PreparedSqlScalarAggregateRuntimeDescriptor::NumericField { .. } => {
226                Err(QueryError::invariant(
227                    "prepared SQL scalar aggregate numeric runtime descriptor drift",
228                ))
229            }
230        }
231    }
232
233    /// Execute one reduced SQL global aggregate `SELECT` statement.
234    ///
235    /// This entrypoint is intentionally constrained to one aggregate terminal
236    /// shape per statement and preserves existing terminal semantics.
237    pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
238    where
239        E: PersistedRow<Canister = C> + EntityValue,
240    {
241        // Parse once into one owned statement so the aggregate lane can keep
242        // its surface checks and lowering on the same statement instance.
243        let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
244
245        // First keep wrong-lane traffic on an explicit aggregate-surface
246        // contract instead of relying on generic lowering failures.
247        match &statement {
248            SqlStatement::Select(_) if is_sql_global_aggregate_statement(&statement) => {}
249            SqlStatement::Select(statement) if !statement.group_by.is_empty() => {
250                return Err(QueryError::unsupported_query(
251                    unsupported_sql_aggregate_grouped_message(),
252                ));
253            }
254            SqlStatement::Delete(_) => {
255                return Err(QueryError::unsupported_query(
256                    "execute_sql_aggregate rejects DELETE; use execute_sql_dispatch",
257                ));
258            }
259            _ => {
260                let route = sql_statement_route_from_statement(&statement);
261
262                return Err(QueryError::unsupported_query(
263                    unsupported_sql_aggregate_surface_lane_message(&route),
264                ));
265            }
266        }
267
268        // First lower the SQL surface onto the existing single-terminal
269        // aggregate command authority so execution never has to rediscover the
270        // accepted aggregate shape family.
271        let command = compile_sql_global_aggregate_command_from_prepared::<E>(
272            prepare_sql_statement(statement, E::MODEL.name())
273                .map_err(QueryError::from_sql_lowering_error)?,
274            MissingRowPolicy::Ignore,
275        )
276        .map_err(QueryError::from_sql_lowering_error)?;
277
278        // Then dispatch through one prepared typed-scalar aggregate strategy so
279        // SQL aggregate execution and SQL aggregate explain consume the same
280        // behavioral source instead of matching raw terminal variants twice.
281        self.execute_prepared_sql_scalar_aggregate(&command)
282    }
283}