icydb_core/db/session/sql/
aggregate.rs1use crate::{
7 db::{
8 DbSession, MissingRowPolicy, PersistedRow, QueryError,
9 executor::{ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest},
10 session::sql::surface::sql_statement_route_from_statement,
11 session::sql::{SqlParsedStatement, SqlStatementRoute},
12 sql::lowering::{
13 PreparedSqlScalarAggregateRuntimeDescriptor, SqlGlobalAggregateCommand,
14 compile_sql_global_aggregate_command_from_prepared, is_sql_global_aggregate_statement,
15 prepare_sql_statement,
16 },
17 sql::parser::{SqlStatement, parse_sql},
18 },
19 traits::{CanisterKind, EntityValue},
20 value::Value,
21};
22
23#[derive(Clone, Copy, Debug, Eq, PartialEq)]
24pub(in crate::db::session::sql) enum SqlAggregateSurface {
25 QueryFrom,
26 ExecuteSql,
27 ExecuteSqlGrouped,
28 ExecuteSqlDispatch,
29 GeneratedQuerySurface,
30}
31
32pub(in crate::db::session::sql) fn parsed_requires_dedicated_sql_aggregate_lane(
33 parsed: &SqlParsedStatement,
34) -> bool {
35 is_sql_global_aggregate_statement(&parsed.statement)
36}
37
38pub(in crate::db::session::sql) const fn unsupported_sql_aggregate_lane_message(
39 surface: SqlAggregateSurface,
40) -> &'static str {
41 match surface {
42 SqlAggregateSurface::QueryFrom => {
43 "query_from_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
44 }
45 SqlAggregateSurface::ExecuteSql => {
46 "execute_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
47 }
48 SqlAggregateSurface::ExecuteSqlGrouped => {
49 "execute_sql_grouped rejects global aggregate SELECT; use execute_sql_aggregate(...)"
50 }
51 SqlAggregateSurface::ExecuteSqlDispatch => {
52 "execute_sql_dispatch rejects global aggregate SELECT; use execute_sql_aggregate(...)"
53 }
54 SqlAggregateSurface::GeneratedQuerySurface => {
55 "generated SQL query surface rejects global aggregate SELECT; use execute_sql_aggregate(...)"
56 }
57 }
58}
59
60const fn unsupported_sql_aggregate_surface_lane_message(route: &SqlStatementRoute) -> &'static str {
61 match route {
62 SqlStatementRoute::Query { .. } => {
63 "execute_sql_aggregate requires constrained global aggregate SELECT"
64 }
65 SqlStatementRoute::Explain { .. } => {
66 "execute_sql_aggregate rejects EXPLAIN; use execute_sql_dispatch"
67 }
68 SqlStatementRoute::Describe { .. } => {
69 "execute_sql_aggregate rejects DESCRIBE; use execute_sql_dispatch"
70 }
71 SqlStatementRoute::ShowIndexes { .. } => {
72 "execute_sql_aggregate rejects SHOW INDEXES; use execute_sql_dispatch"
73 }
74 SqlStatementRoute::ShowColumns { .. } => {
75 "execute_sql_aggregate rejects SHOW COLUMNS; use execute_sql_dispatch"
76 }
77 SqlStatementRoute::ShowEntities => {
78 "execute_sql_aggregate rejects SHOW ENTITIES; use execute_sql_dispatch"
79 }
80 }
81}
82
83const fn unsupported_sql_aggregate_grouped_message() -> &'static str {
84 "execute_sql_aggregate rejects grouped SELECT; use execute_sql_grouped(...)"
85}
86
87impl<C: CanisterKind> DbSession<C> {
88 fn prepared_sql_scalar_target_slot_required(
91 strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
92 message: &'static str,
93 ) -> Result<crate::db::query::plan::FieldSlot, QueryError> {
94 strategy
95 .target_slot()
96 .cloned()
97 .ok_or_else(|| QueryError::invariant(message))
98 }
99
100 fn execute_prepared_sql_scalar_count_rows<E>(
103 &self,
104 command: &SqlGlobalAggregateCommand<E>,
105 ) -> Result<Value, QueryError>
106 where
107 E: PersistedRow<Canister = C> + EntityValue,
108 {
109 self.execute_load_query_with(command.query(), |load, plan| {
110 load.execute_scalar_terminal_request(
111 plan,
112 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
113 )?
114 .into_count()
115 })
116 .map(|count| Value::Uint(u64::from(count)))
117 }
118
119 fn execute_prepared_sql_scalar_count_field<E>(
122 &self,
123 command: &SqlGlobalAggregateCommand<E>,
124 strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
125 ) -> Result<Value, QueryError>
126 where
127 E: PersistedRow<Canister = C> + EntityValue,
128 {
129 let target_slot = Self::prepared_sql_scalar_target_slot_required(
130 strategy,
131 "prepared COUNT(field) SQL aggregate strategy requires target slot",
132 )?;
133
134 self.execute_load_query_with(command.query(), |load, plan| {
135 load.execute_scalar_projection_boundary(
136 plan,
137 target_slot.clone(),
138 ScalarProjectionBoundaryRequest::CountNonNull,
139 )?
140 .into_count()
141 })
142 .map(|count| Value::Uint(u64::from(count)))
143 }
144
145 fn execute_prepared_sql_scalar_numeric_field<E>(
148 &self,
149 command: &SqlGlobalAggregateCommand<E>,
150 strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
151 request: ScalarNumericFieldBoundaryRequest,
152 message: &'static str,
153 ) -> Result<Value, QueryError>
154 where
155 E: PersistedRow<Canister = C> + EntityValue,
156 {
157 let target_slot = Self::prepared_sql_scalar_target_slot_required(strategy, message)?;
158
159 self.execute_load_query_with(command.query(), |load, plan| {
160 load.execute_numeric_field_boundary(plan, target_slot.clone(), request)
161 })
162 .map(|value| value.map_or(Value::Null, Value::Decimal))
163 }
164
165 fn execute_prepared_sql_scalar_extrema_field<E>(
168 &self,
169 command: &SqlGlobalAggregateCommand<E>,
170 strategy: &crate::db::sql::lowering::PreparedSqlScalarAggregateStrategy,
171 kind: crate::db::query::plan::AggregateKind,
172 ) -> Result<Value, QueryError>
173 where
174 E: PersistedRow<Canister = C> + EntityValue,
175 {
176 let target_slot = Self::prepared_sql_scalar_target_slot_required(
177 strategy,
178 "prepared extrema SQL aggregate strategy requires target slot",
179 )?;
180
181 self.execute_load_query_with(command.query(), |load, plan| {
182 load.execute_scalar_extrema_value_boundary(plan, target_slot.clone(), kind)
183 })
184 .map(|value| value.unwrap_or(Value::Null))
185 }
186
187 fn execute_prepared_sql_scalar_aggregate<E>(
191 &self,
192 command: &SqlGlobalAggregateCommand<E>,
193 ) -> Result<Value, QueryError>
194 where
195 E: PersistedRow<Canister = C> + EntityValue,
196 {
197 let strategy = command.prepared_scalar_strategy();
198
199 match strategy.runtime_descriptor() {
200 PreparedSqlScalarAggregateRuntimeDescriptor::CountRows => {
201 self.execute_prepared_sql_scalar_count_rows(command)
202 }
203 PreparedSqlScalarAggregateRuntimeDescriptor::CountField => {
204 self.execute_prepared_sql_scalar_count_field(command, &strategy)
205 }
206 PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
207 kind: crate::db::query::plan::AggregateKind::Sum,
208 } => self.execute_prepared_sql_scalar_numeric_field(
209 command,
210 &strategy,
211 ScalarNumericFieldBoundaryRequest::Sum,
212 "prepared SUM(field) SQL aggregate strategy requires target slot",
213 ),
214 PreparedSqlScalarAggregateRuntimeDescriptor::NumericField {
215 kind: crate::db::query::plan::AggregateKind::Avg,
216 } => self.execute_prepared_sql_scalar_numeric_field(
217 command,
218 &strategy,
219 ScalarNumericFieldBoundaryRequest::Avg,
220 "prepared AVG(field) SQL aggregate strategy requires target slot",
221 ),
222 PreparedSqlScalarAggregateRuntimeDescriptor::ExtremalWinnerField { kind } => {
223 self.execute_prepared_sql_scalar_extrema_field(command, &strategy, kind)
224 }
225 PreparedSqlScalarAggregateRuntimeDescriptor::NumericField { .. } => {
226 Err(QueryError::invariant(
227 "prepared SQL scalar aggregate numeric runtime descriptor drift",
228 ))
229 }
230 }
231 }
232
233 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
238 where
239 E: PersistedRow<Canister = C> + EntityValue,
240 {
241 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
244
245 match &statement {
248 SqlStatement::Select(_) if is_sql_global_aggregate_statement(&statement) => {}
249 SqlStatement::Select(statement) if !statement.group_by.is_empty() => {
250 return Err(QueryError::unsupported_query(
251 unsupported_sql_aggregate_grouped_message(),
252 ));
253 }
254 SqlStatement::Delete(_) => {
255 return Err(QueryError::unsupported_query(
256 "execute_sql_aggregate rejects DELETE; use execute_sql_dispatch",
257 ));
258 }
259 _ => {
260 let route = sql_statement_route_from_statement(&statement);
261
262 return Err(QueryError::unsupported_query(
263 unsupported_sql_aggregate_surface_lane_message(&route),
264 ));
265 }
266 }
267
268 let command = compile_sql_global_aggregate_command_from_prepared::<E>(
272 prepare_sql_statement(statement, E::MODEL.name())
273 .map_err(QueryError::from_sql_lowering_error)?,
274 MissingRowPolicy::Ignore,
275 )
276 .map_err(QueryError::from_sql_lowering_error)?;
277
278 self.execute_prepared_sql_scalar_aggregate(&command)
282 }
283}