icydb_core/db/session/sql/
aggregate.rs1use crate::{
7 db::{
8 DbSession, MissingRowPolicy, PersistedRow, QueryError,
9 executor::{ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest},
10 query::plan::AggregateKind,
11 session::sql::surface::sql_statement_route_from_statement,
12 session::sql::{SqlParsedStatement, SqlStatementRoute},
13 sql::lowering::{
14 TypedSqlGlobalAggregateTerminal, compile_sql_global_aggregate_command_from_prepared,
15 is_sql_global_aggregate_statement, prepare_sql_statement,
16 },
17 sql::parser::{SqlStatement, parse_sql},
18 },
19 traits::{CanisterKind, EntityValue},
20 value::Value,
21};
22
23#[derive(Clone, Copy, Debug, Eq, PartialEq)]
24pub(in crate::db::session::sql) enum SqlAggregateSurface {
25 QueryFrom,
26 ExecuteSql,
27 ExecuteSqlGrouped,
28 ExecuteSqlDispatch,
29 GeneratedQuerySurface,
30}
31
32pub(in crate::db::session::sql) fn parsed_requires_dedicated_sql_aggregate_lane(
33 parsed: &SqlParsedStatement,
34) -> bool {
35 is_sql_global_aggregate_statement(&parsed.statement)
36}
37
38pub(in crate::db::session::sql) const fn unsupported_sql_aggregate_lane_message(
39 surface: SqlAggregateSurface,
40) -> &'static str {
41 match surface {
42 SqlAggregateSurface::QueryFrom => {
43 "query_from_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
44 }
45 SqlAggregateSurface::ExecuteSql => {
46 "execute_sql rejects global aggregate SELECT; use execute_sql_aggregate(...)"
47 }
48 SqlAggregateSurface::ExecuteSqlGrouped => {
49 "execute_sql_grouped rejects global aggregate SELECT; use execute_sql_aggregate(...)"
50 }
51 SqlAggregateSurface::ExecuteSqlDispatch => {
52 "execute_sql_dispatch rejects global aggregate SELECT; use execute_sql_aggregate(...)"
53 }
54 SqlAggregateSurface::GeneratedQuerySurface => {
55 "generated SQL query surface rejects global aggregate SELECT; use execute_sql_aggregate(...)"
56 }
57 }
58}
59
60const fn unsupported_sql_aggregate_surface_lane_message(route: &SqlStatementRoute) -> &'static str {
61 match route {
62 SqlStatementRoute::Query { .. } => {
63 "execute_sql_aggregate requires constrained global aggregate SELECT"
64 }
65 SqlStatementRoute::Explain { .. } => {
66 "execute_sql_aggregate rejects EXPLAIN; use execute_sql_dispatch"
67 }
68 SqlStatementRoute::Describe { .. } => {
69 "execute_sql_aggregate rejects DESCRIBE; use execute_sql_dispatch"
70 }
71 SqlStatementRoute::ShowIndexes { .. } => {
72 "execute_sql_aggregate rejects SHOW INDEXES; use execute_sql_dispatch"
73 }
74 SqlStatementRoute::ShowColumns { .. } => {
75 "execute_sql_aggregate rejects SHOW COLUMNS; use execute_sql_dispatch"
76 }
77 SqlStatementRoute::ShowEntities => {
78 "execute_sql_aggregate rejects SHOW ENTITIES; use execute_sql_dispatch"
79 }
80 }
81}
82
83const fn unsupported_sql_aggregate_grouped_message() -> &'static str {
84 "execute_sql_aggregate rejects grouped SELECT; use execute_sql_grouped(...)"
85}
86
87impl<C: CanisterKind> DbSession<C> {
88 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
93 where
94 E: PersistedRow<Canister = C> + EntityValue,
95 {
96 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
99
100 match &statement {
103 SqlStatement::Select(_) if is_sql_global_aggregate_statement(&statement) => {}
104 SqlStatement::Select(statement) if !statement.group_by.is_empty() => {
105 return Err(QueryError::unsupported_query(
106 unsupported_sql_aggregate_grouped_message(),
107 ));
108 }
109 SqlStatement::Delete(_) => {
110 return Err(QueryError::unsupported_query(
111 "execute_sql_aggregate rejects DELETE; use execute_sql_dispatch",
112 ));
113 }
114 _ => {
115 let route = sql_statement_route_from_statement(&statement);
116
117 return Err(QueryError::unsupported_query(
118 unsupported_sql_aggregate_surface_lane_message(&route),
119 ));
120 }
121 }
122
123 let command = compile_sql_global_aggregate_command_from_prepared::<E>(
127 prepare_sql_statement(statement, E::MODEL.name())
128 .map_err(QueryError::from_sql_lowering_error)?,
129 MissingRowPolicy::Ignore,
130 )
131 .map_err(QueryError::from_sql_lowering_error)?;
132
133 match command.terminal() {
136 TypedSqlGlobalAggregateTerminal::CountRows => self
137 .execute_load_query_with(command.query(), |load, plan| {
138 load.execute_scalar_terminal_request(
139 plan,
140 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
141 )?
142 .into_count()
143 })
144 .map(|count| Value::Uint(u64::from(count))),
145 TypedSqlGlobalAggregateTerminal::CountField(target_slot) => self
146 .execute_load_query_with(command.query(), |load, plan| {
147 load.execute_scalar_projection_boundary(
148 plan,
149 target_slot.clone(),
150 ScalarProjectionBoundaryRequest::CountNonNull,
151 )?
152 .into_count()
153 })
154 .map(|count| Value::Uint(u64::from(count))),
155 TypedSqlGlobalAggregateTerminal::SumField(target_slot) => self
156 .execute_load_query_with(command.query(), |load, plan| {
157 load.execute_numeric_field_boundary(
158 plan,
159 target_slot.clone(),
160 ScalarNumericFieldBoundaryRequest::Sum,
161 )
162 })
163 .map(|value| value.map_or(Value::Null, Value::Decimal)),
164 TypedSqlGlobalAggregateTerminal::AvgField(target_slot) => self
165 .execute_load_query_with(command.query(), |load, plan| {
166 load.execute_numeric_field_boundary(
167 plan,
168 target_slot.clone(),
169 ScalarNumericFieldBoundaryRequest::Avg,
170 )
171 })
172 .map(|value| value.map_or(Value::Null, Value::Decimal)),
173 TypedSqlGlobalAggregateTerminal::MinField(target_slot) => self
174 .execute_load_query_with(command.query(), |load, plan| {
175 load.execute_scalar_extrema_value_boundary(
176 plan,
177 target_slot.clone(),
178 AggregateKind::Min,
179 )
180 })
181 .map(|value| value.unwrap_or(Value::Null)),
182 TypedSqlGlobalAggregateTerminal::MaxField(target_slot) => self
183 .execute_load_query_with(command.query(), |load, plan| {
184 load.execute_scalar_extrema_value_boundary(
185 plan,
186 target_slot.clone(),
187 AggregateKind::Max,
188 )
189 })
190 .map(|value| value.unwrap_or(Value::Null)),
191 }
192 }
193}