1use crate::{
2 db::{
3 DbSession, EntityFieldDescription, EntityResponse, EntitySchemaDescription,
4 MissingRowPolicy, PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
5 executor::{
6 EntityAuthority, KernelRow, ScalarNumericFieldBoundaryRequest,
7 ScalarProjectionBoundaryRequest, execute_sql_delete_projection_for_canister,
8 execute_sql_projection_rows_for_canister,
9 },
10 query::{
11 builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
12 intent::StructuralQuery,
13 plan::{
14 AggregateKind, FieldSlot,
15 expr::{Expr, ProjectionField},
16 resolve_aggregate_target_field_slot,
17 },
18 },
19 sql::lowering::{
20 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlCommand, LoweredSqlLaneKind,
21 LoweredSqlQuery, PreparedSqlStatement as CorePreparedSqlStatement,
22 SqlGlobalAggregateTerminal, StructuralSqlGlobalAggregateCommand,
23 apply_lowered_select_shape, bind_lowered_sql_delete_query_structural,
24 bind_lowered_sql_explain_global_aggregate_structural, bind_lowered_sql_query,
25 compile_sql_global_aggregate_command, lower_sql_command_from_prepared_statement,
26 lowered_sql_command_lane, prepare_sql_statement,
27 render_lowered_sql_explain_plan_or_json,
28 },
29 sql::parser::{
30 SqlExplainMode, SqlExplainStatement, SqlExplainTarget, SqlStatement, parse_sql,
31 },
32 },
33 model::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue},
35 value::Value,
36};
37
38#[derive(Clone, Debug, Eq, PartialEq)]
46pub enum SqlStatementRoute {
47 Query { entity: String },
48 Explain { entity: String },
49 Describe { entity: String },
50 ShowIndexes { entity: String },
51 ShowColumns { entity: String },
52 ShowEntities,
53}
54
55#[derive(Debug)]
61pub enum SqlDispatchResult {
62 Projection {
63 columns: Vec<String>,
64 rows: Vec<Vec<Value>>,
65 row_count: u32,
66 },
67 Explain(String),
68 Describe(EntitySchemaDescription),
69 ShowIndexes(Vec<String>),
70 ShowColumns(Vec<EntityFieldDescription>),
71 ShowEntities(Vec<String>),
72}
73
74#[derive(Clone, Debug)]
82pub struct SqlParsedStatement {
83 statement: SqlStatement,
84 route: SqlStatementRoute,
85}
86
87impl SqlParsedStatement {
88 #[must_use]
90 pub const fn route(&self) -> &SqlStatementRoute {
91 &self.route
92 }
93
94 #[must_use]
101 pub const fn is_delete_like_query_surface(&self) -> bool {
102 matches!(
103 &self.statement,
104 SqlStatement::Delete(_)
105 | SqlStatement::Explain(SqlExplainStatement {
106 statement: SqlExplainTarget::Delete(_),
107 ..
108 })
109 )
110 }
111
112 fn prepare(
114 &self,
115 expected_entity: &'static str,
116 ) -> Result<CorePreparedSqlStatement, QueryError> {
117 prepare_sql_statement(self.statement.clone(), expected_entity)
118 .map_err(QueryError::from_sql_lowering_error)
119 }
120
121 pub fn lower_query_lane_for_entity(
123 &self,
124 expected_entity: &'static str,
125 primary_key_field: &str,
126 ) -> Result<LoweredSqlCommand, QueryError> {
127 let lowered = lower_sql_command_from_prepared_statement(
128 self.prepare(expected_entity)?,
129 primary_key_field,
130 )
131 .map_err(QueryError::from_sql_lowering_error)?;
132 let lane = lowered_sql_command_lane(&lowered);
133
134 match lane {
135 LoweredSqlLaneKind::Query | LoweredSqlLaneKind::Explain => Ok(lowered),
136 LoweredSqlLaneKind::Describe
137 | LoweredSqlLaneKind::ShowIndexes
138 | LoweredSqlLaneKind::ShowColumns
139 | LoweredSqlLaneKind::ShowEntities => {
140 Err(QueryError::unsupported_query_lane_dispatch())
141 }
142 }
143 }
144}
145
146#[derive(Debug)]
156struct SqlProjectionPayload {
157 columns: Vec<String>,
158 rows: Vec<Vec<Value>>,
159 row_count: u32,
160}
161
162impl SqlProjectionPayload {
163 #[must_use]
164 const fn new(columns: Vec<String>, rows: Vec<Vec<Value>>, row_count: u32) -> Self {
165 Self {
166 columns,
167 rows,
168 row_count,
169 }
170 }
171
172 #[must_use]
173 fn into_dispatch_result(self) -> SqlDispatchResult {
174 SqlDispatchResult::Projection {
175 columns: self.columns,
176 rows: self.rows,
177 row_count: self.row_count,
178 }
179 }
180}
181
182impl SqlStatementRoute {
183 #[must_use]
188 pub const fn entity(&self) -> &str {
189 match self {
190 Self::Query { entity }
191 | Self::Explain { entity }
192 | Self::Describe { entity }
193 | Self::ShowIndexes { entity }
194 | Self::ShowColumns { entity } => entity.as_str(),
195 Self::ShowEntities => "",
196 }
197 }
198
199 #[must_use]
201 pub const fn is_explain(&self) -> bool {
202 matches!(self, Self::Explain { .. })
203 }
204
205 #[must_use]
207 pub const fn is_describe(&self) -> bool {
208 matches!(self, Self::Describe { .. })
209 }
210
211 #[must_use]
213 pub const fn is_show_indexes(&self) -> bool {
214 matches!(self, Self::ShowIndexes { .. })
215 }
216
217 #[must_use]
219 pub const fn is_show_columns(&self) -> bool {
220 matches!(self, Self::ShowColumns { .. })
221 }
222
223 #[must_use]
225 pub const fn is_show_entities(&self) -> bool {
226 matches!(self, Self::ShowEntities)
227 }
228}
229
230#[derive(Clone, Copy, Debug, Eq, PartialEq)]
232enum SqlLaneKind {
233 Query,
234 Explain,
235 Describe,
236 ShowIndexes,
237 ShowColumns,
238 ShowEntities,
239}
240
241#[derive(Clone, Copy, Debug, Eq, PartialEq)]
243enum SqlSurface {
244 QueryFrom,
245 Explain,
246}
247
248const fn session_sql_lane(command: &LoweredSqlCommand) -> SqlLaneKind {
250 match lowered_sql_command_lane(command) {
251 LoweredSqlLaneKind::Query => SqlLaneKind::Query,
252 LoweredSqlLaneKind::Explain => SqlLaneKind::Explain,
253 LoweredSqlLaneKind::Describe => SqlLaneKind::Describe,
254 LoweredSqlLaneKind::ShowIndexes => SqlLaneKind::ShowIndexes,
255 LoweredSqlLaneKind::ShowColumns => SqlLaneKind::ShowColumns,
256 LoweredSqlLaneKind::ShowEntities => SqlLaneKind::ShowEntities,
257 }
258}
259
260const fn unsupported_sql_lane_message(surface: SqlSurface, lane: SqlLaneKind) -> &'static str {
262 match (surface, lane) {
263 (SqlSurface::QueryFrom, SqlLaneKind::Explain) => {
264 "query_from_sql does not accept EXPLAIN statements; use execute_sql_dispatch(...)"
265 }
266 (SqlSurface::QueryFrom, SqlLaneKind::Describe) => {
267 "query_from_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
268 }
269 (SqlSurface::QueryFrom, SqlLaneKind::ShowIndexes) => {
270 "query_from_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
271 }
272 (SqlSurface::QueryFrom, SqlLaneKind::ShowColumns) => {
273 "query_from_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
274 }
275 (SqlSurface::QueryFrom, SqlLaneKind::ShowEntities) => {
276 "query_from_sql does not accept SHOW ENTITIES/SHOW TABLES statements; use execute_sql_dispatch(...)"
277 }
278 (SqlSurface::QueryFrom, SqlLaneKind::Query) => {
279 "query_from_sql requires one executable SELECT or DELETE statement"
280 }
281 (SqlSurface::Explain, SqlLaneKind::Describe) => {
282 "explain_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
283 }
284 (SqlSurface::Explain, SqlLaneKind::ShowIndexes) => {
285 "explain_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
286 }
287 (SqlSurface::Explain, SqlLaneKind::ShowColumns) => {
288 "explain_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
289 }
290 (SqlSurface::Explain, SqlLaneKind::ShowEntities) => {
291 "explain_sql does not accept SHOW ENTITIES/SHOW TABLES statements; use execute_sql_dispatch(...)"
292 }
293 (SqlSurface::Explain, SqlLaneKind::Query | SqlLaneKind::Explain) => {
294 "explain_sql requires an EXPLAIN statement"
295 }
296 }
297}
298
299fn sql_statement_route_from_statement(statement: &SqlStatement) -> SqlStatementRoute {
301 match statement {
302 SqlStatement::Select(select) => SqlStatementRoute::Query {
303 entity: select.entity.clone(),
304 },
305 SqlStatement::Delete(delete) => SqlStatementRoute::Query {
306 entity: delete.entity.clone(),
307 },
308 SqlStatement::Explain(explain) => match &explain.statement {
309 SqlExplainTarget::Select(select) => SqlStatementRoute::Explain {
310 entity: select.entity.clone(),
311 },
312 SqlExplainTarget::Delete(delete) => SqlStatementRoute::Explain {
313 entity: delete.entity.clone(),
314 },
315 },
316 SqlStatement::Describe(describe) => SqlStatementRoute::Describe {
317 entity: describe.entity.clone(),
318 },
319 SqlStatement::ShowIndexes(show_indexes) => SqlStatementRoute::ShowIndexes {
320 entity: show_indexes.entity.clone(),
321 },
322 SqlStatement::ShowColumns(show_columns) => SqlStatementRoute::ShowColumns {
323 entity: show_columns.entity.clone(),
324 },
325 SqlStatement::ShowEntities(_) => SqlStatementRoute::ShowEntities,
326 }
327}
328
329fn resolve_sql_aggregate_target_slot_with_model(
332 model: &'static EntityModel,
333 field: &str,
334) -> Result<FieldSlot, QueryError> {
335 resolve_aggregate_target_field_slot(model, field)
336}
337
338fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
339 resolve_sql_aggregate_target_slot_with_model(E::MODEL, field)
340}
341
342fn sql_global_aggregate_terminal_to_expr_with_model(
345 model: &'static EntityModel,
346 terminal: &SqlGlobalAggregateTerminal,
347) -> Result<AggregateExpr, QueryError> {
348 match terminal {
349 SqlGlobalAggregateTerminal::CountRows => Ok(count()),
350 SqlGlobalAggregateTerminal::CountField(field) => {
351 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
352
353 Ok(count_by(field.as_str()))
354 }
355 SqlGlobalAggregateTerminal::SumField(field) => {
356 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
357
358 Ok(sum(field.as_str()))
359 }
360 SqlGlobalAggregateTerminal::AvgField(field) => {
361 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
362
363 Ok(avg(field.as_str()))
364 }
365 SqlGlobalAggregateTerminal::MinField(field) => {
366 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
367
368 Ok(min_by(field.as_str()))
369 }
370 SqlGlobalAggregateTerminal::MaxField(field) => {
371 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
372
373 Ok(max_by(field.as_str()))
374 }
375 }
376}
377
378fn projection_label_from_aggregate(aggregate: &AggregateExpr) -> String {
380 let kind = match aggregate.kind() {
381 AggregateKind::Count => "COUNT",
382 AggregateKind::Sum => "SUM",
383 AggregateKind::Avg => "AVG",
384 AggregateKind::Exists => "EXISTS",
385 AggregateKind::First => "FIRST",
386 AggregateKind::Last => "LAST",
387 AggregateKind::Min => "MIN",
388 AggregateKind::Max => "MAX",
389 };
390 let distinct = if aggregate.is_distinct() {
391 "DISTINCT "
392 } else {
393 ""
394 };
395
396 if let Some(field) = aggregate.target_field() {
397 return format!("{kind}({distinct}{field})");
398 }
399
400 format!("{kind}({distinct}*)")
401}
402
403fn projection_label_from_expr(expr: &Expr, ordinal: usize) -> String {
405 match expr {
406 Expr::Field(field) => field.as_str().to_string(),
407 Expr::Aggregate(aggregate) => projection_label_from_aggregate(aggregate),
408 Expr::Alias { name, .. } => name.as_str().to_string(),
409 Expr::Literal(_) | Expr::Unary { .. } | Expr::Binary { .. } => {
410 format!("expr_{ordinal}")
411 }
412 }
413}
414
415fn projection_labels_from_structural_query(
417 query: &StructuralQuery,
418) -> Result<Vec<String>, QueryError> {
419 let projection = query.build_plan()?.projection_spec(query.model());
420 Ok(projection_labels_from_projection_spec(&projection))
421}
422
423fn projection_labels_from_projection_spec(
426 projection: &crate::db::query::plan::expr::ProjectionSpec,
427) -> Vec<String> {
428 let mut labels = Vec::with_capacity(projection.len());
429
430 for (ordinal, field) in projection.fields().enumerate() {
431 match field {
432 ProjectionField::Scalar {
433 expr: _,
434 alias: Some(alias),
435 } => labels.push(alias.as_str().to_string()),
436 ProjectionField::Scalar { expr, alias: None } => {
437 labels.push(projection_label_from_expr(expr, ordinal));
438 }
439 }
440 }
441
442 labels
443}
444
445fn projection_labels_from_entity_model(model: &'static EntityModel) -> Vec<String> {
447 model
448 .fields
449 .iter()
450 .map(|field| field.name.to_string())
451 .collect()
452}
453
454fn sql_projection_rows_from_kernel_rows(rows: Vec<KernelRow>) -> Vec<Vec<Value>> {
457 rows.into_iter()
458 .map(|row| {
459 row.into_slots()
460 .into_iter()
461 .map(|value| value.unwrap_or(Value::Null))
462 .collect()
463 })
464 .collect()
465}
466
467impl LoweredSqlCommand {
468 #[inline(never)]
471 pub fn explain_for_model(&self, model: &'static EntityModel) -> Result<String, QueryError> {
472 let lane = session_sql_lane(self);
475 if lane != SqlLaneKind::Explain {
476 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
477 SqlSurface::Explain,
478 lane,
479 )));
480 }
481
482 if let Some(rendered) =
485 render_lowered_sql_explain_plan_or_json(self, model, MissingRowPolicy::Ignore)
486 .map_err(QueryError::from_sql_lowering_error)?
487 {
488 return Ok(rendered);
489 }
490
491 if let Some((mode, command)) = bind_lowered_sql_explain_global_aggregate_structural(
494 self,
495 model,
496 MissingRowPolicy::Ignore,
497 ) {
498 return explain_sql_global_aggregate_structural(mode, command);
499 }
500
501 Err(QueryError::unsupported_query(
502 "shared EXPLAIN dispatch could not classify the lowered SQL command shape",
503 ))
504 }
505}
506
507#[inline(never)]
510fn explain_sql_global_aggregate_structural(
511 mode: SqlExplainMode,
512 command: StructuralSqlGlobalAggregateCommand,
513) -> Result<String, QueryError> {
514 let model = command.query().model();
515
516 match mode {
517 SqlExplainMode::Plan => {
518 let _ = sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
519
520 Ok(command
521 .query()
522 .build_plan()?
523 .explain_with_model(model)
524 .render_text_canonical())
525 }
526 SqlExplainMode::Execution => {
527 let aggregate =
528 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
529 let plan = command.query().explain_aggregate_terminal(aggregate)?;
530
531 Ok(plan.execution_node_descriptor().render_text_tree())
532 }
533 SqlExplainMode::Json => {
534 let _ = sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
535
536 Ok(command
537 .query()
538 .build_plan()?
539 .explain_with_model(model)
540 .render_json_canonical())
541 }
542 }
543}
544
545impl<C: CanisterKind> DbSession<C> {
546 fn bind_sql_query_lane_from_parsed<E>(
549 parsed: &SqlParsedStatement,
550 ) -> Result<(LoweredSqlQuery, Query<E>), QueryError>
551 where
552 E: EntityKind<Canister = C>,
553 {
554 let lowered =
555 parsed.lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
556 let lane = session_sql_lane(&lowered);
557 let Some(query) = lowered.query().cloned() else {
558 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
559 SqlSurface::QueryFrom,
560 lane,
561 )));
562 };
563 let typed = bind_lowered_sql_query::<E>(query.clone(), MissingRowPolicy::Ignore)
564 .map_err(QueryError::from_sql_lowering_error)?;
565
566 Ok((query, typed))
567 }
568
569 fn execute_structural_sql_projection(
573 &self,
574 query: StructuralQuery,
575 authority: EntityAuthority,
576 ) -> Result<SqlProjectionPayload, QueryError> {
577 let columns = projection_labels_from_structural_query(&query)?;
578 let projected = execute_sql_projection_rows_for_canister(
579 &self.db,
580 self.debug,
581 authority,
582 query.build_plan()?,
583 )
584 .map_err(QueryError::execute)?;
585 let (rows, row_count) = projected.into_parts();
586
587 Ok(SqlProjectionPayload::new(columns, rows, row_count))
588 }
589
590 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
594 where
595 E: PersistedRow<Canister = C> + EntityValue,
596 {
597 let plan = query.plan()?.into_executable();
598 let deleted = self
599 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
600 .map_err(QueryError::execute)?;
601 let (rows, row_count) = deleted.into_parts();
602 let rows = sql_projection_rows_from_kernel_rows(rows);
603
604 Ok(SqlProjectionPayload::new(
605 projection_labels_from_entity_model(E::MODEL),
606 rows,
607 row_count,
608 )
609 .into_dispatch_result())
610 }
611
612 fn ensure_sql_query_grouping<E>(query: &Query<E>, grouped: bool) -> Result<(), QueryError>
615 where
616 E: EntityKind,
617 {
618 match (grouped, query.has_grouping()) {
619 (true, true) | (false, false) => Ok(()),
620 (false, true) => Err(QueryError::grouped_requires_execute_grouped()),
621 (true, false) => Err(QueryError::unsupported_query(
622 "execute_sql_grouped requires grouped SQL query intent",
623 )),
624 }
625 }
626
627 fn execute_lowered_sql_dispatch_select_core(
630 &self,
631 select: &LoweredSelectShape,
632 authority: EntityAuthority,
633 ) -> Result<SqlDispatchResult, QueryError> {
634 let structural = apply_lowered_select_shape(
635 StructuralQuery::new(authority.model(), MissingRowPolicy::Ignore),
636 select.clone(),
637 )
638 .map_err(QueryError::from_sql_lowering_error)?;
639
640 self.execute_structural_sql_projection(structural, authority)
641 .map(SqlProjectionPayload::into_dispatch_result)
642 }
643
644 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
648 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
649 let route = sql_statement_route_from_statement(&statement);
650
651 Ok(SqlParsedStatement { statement, route })
652 }
653
654 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
659 let parsed = self.parse_sql_statement(sql)?;
660
661 Ok(parsed.route().clone())
662 }
663
664 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
669 where
670 E: EntityKind<Canister = C>,
671 {
672 let parsed = self.parse_sql_statement(sql)?;
673 let (_, query) = Self::bind_sql_query_lane_from_parsed::<E>(&parsed)?;
674
675 Ok(query)
676 }
677
678 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
680 where
681 E: PersistedRow<Canister = C> + EntityValue,
682 {
683 let query = self.query_from_sql::<E>(sql)?;
684 Self::ensure_sql_query_grouping(&query, false)?;
685
686 self.execute_query(&query)
687 }
688
689 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
694 where
695 E: PersistedRow<Canister = C> + EntityValue,
696 {
697 let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
698 .map_err(QueryError::from_sql_lowering_error)?;
699
700 match command.terminal() {
701 SqlGlobalAggregateTerminal::CountRows => self
702 .execute_load_query_with(command.query(), |load, plan| {
703 load.execute_scalar_terminal_request(
704 plan,
705 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
706 )?
707 .into_count()
708 })
709 .map(|count| Value::Uint(u64::from(count))),
710 SqlGlobalAggregateTerminal::CountField(field) => {
711 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
712 self.execute_load_query_with(command.query(), |load, plan| {
713 load.execute_scalar_projection_boundary(
714 plan,
715 target_slot,
716 ScalarProjectionBoundaryRequest::Values,
717 )?
718 .into_values()
719 })
720 .map(|values| {
721 let count = values
722 .into_iter()
723 .filter(|value| !matches!(value, Value::Null))
724 .count();
725 Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
726 })
727 }
728 SqlGlobalAggregateTerminal::SumField(field) => {
729 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
730 self.execute_load_query_with(command.query(), |load, plan| {
731 load.execute_numeric_field_boundary(
732 plan,
733 target_slot,
734 ScalarNumericFieldBoundaryRequest::Sum,
735 )
736 })
737 .map(|value| value.map_or(Value::Null, Value::Decimal))
738 }
739 SqlGlobalAggregateTerminal::AvgField(field) => {
740 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
741 self.execute_load_query_with(command.query(), |load, plan| {
742 load.execute_numeric_field_boundary(
743 plan,
744 target_slot,
745 ScalarNumericFieldBoundaryRequest::Avg,
746 )
747 })
748 .map(|value| value.map_or(Value::Null, Value::Decimal))
749 }
750 SqlGlobalAggregateTerminal::MinField(field) => {
751 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
752 let min_id = self.execute_load_query_with(command.query(), |load, plan| {
753 load.execute_scalar_terminal_request(
754 plan,
755 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
756 kind: AggregateKind::Min,
757 target_field: target_slot,
758 },
759 )?
760 .into_id()
761 })?;
762
763 match min_id {
764 Some(id) => self
765 .load::<E>()
766 .by_id(id)
767 .first_value_by(field)
768 .map(|value| value.unwrap_or(Value::Null)),
769 None => Ok(Value::Null),
770 }
771 }
772 SqlGlobalAggregateTerminal::MaxField(field) => {
773 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
774 let max_id = self.execute_load_query_with(command.query(), |load, plan| {
775 load.execute_scalar_terminal_request(
776 plan,
777 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
778 kind: AggregateKind::Max,
779 target_field: target_slot,
780 },
781 )?
782 .into_id()
783 })?;
784
785 match max_id {
786 Some(id) => self
787 .load::<E>()
788 .by_id(id)
789 .first_value_by(field)
790 .map(|value| value.unwrap_or(Value::Null)),
791 None => Ok(Value::Null),
792 }
793 }
794 }
795 }
796
797 pub fn execute_sql_grouped<E>(
799 &self,
800 sql: &str,
801 cursor_token: Option<&str>,
802 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
803 where
804 E: PersistedRow<Canister = C> + EntityValue,
805 {
806 let query = self.query_from_sql::<E>(sql)?;
807 Self::ensure_sql_query_grouping(&query, true)?;
808
809 self.execute_grouped(&query, cursor_token)
810 }
811
812 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
814 where
815 E: PersistedRow<Canister = C> + EntityValue,
816 {
817 let parsed = self.parse_sql_statement(sql)?;
818
819 self.execute_sql_dispatch_parsed::<E>(&parsed)
820 }
821
822 pub fn execute_sql_dispatch_parsed<E>(
824 &self,
825 parsed: &SqlParsedStatement,
826 ) -> Result<SqlDispatchResult, QueryError>
827 where
828 E: PersistedRow<Canister = C> + EntityValue,
829 {
830 match parsed.route() {
831 SqlStatementRoute::Query { .. } => {
832 let (query, typed_query) = Self::bind_sql_query_lane_from_parsed::<E>(parsed)?;
833
834 Self::ensure_sql_query_grouping(&typed_query, false)?;
835
836 match query {
837 LoweredSqlQuery::Select(select) => self
838 .execute_lowered_sql_dispatch_select_core(
839 &select,
840 EntityAuthority::for_type::<E>(),
841 ),
842 LoweredSqlQuery::Delete(_) => self.execute_typed_sql_delete(&typed_query),
843 }
844 }
845 SqlStatementRoute::Explain { .. } => {
846 let lowered = lower_sql_command_from_prepared_statement(
847 parsed.prepare(E::MODEL.name())?,
848 E::MODEL.primary_key.name,
849 )
850 .map_err(QueryError::from_sql_lowering_error)?;
851
852 lowered
853 .explain_for_model(E::MODEL)
854 .map(SqlDispatchResult::Explain)
855 }
856 SqlStatementRoute::Describe { .. } => {
857 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
858 }
859 SqlStatementRoute::ShowIndexes { .. } => {
860 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
861 }
862 SqlStatementRoute::ShowColumns { .. } => {
863 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
864 }
865 SqlStatementRoute::ShowEntities => {
866 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
867 }
868 }
869 }
870
871 fn execute_lowered_sql_dispatch_delete_core(
874 &self,
875 delete: &LoweredBaseQueryShape,
876 authority: EntityAuthority,
877 ) -> Result<SqlDispatchResult, QueryError> {
878 let structural = bind_lowered_sql_delete_query_structural(
879 authority.model(),
880 delete.clone(),
881 MissingRowPolicy::Ignore,
882 );
883 let deleted = execute_sql_delete_projection_for_canister(
884 &self.db,
885 authority,
886 structural.build_plan()?,
887 )
888 .map_err(QueryError::execute)?;
889 let (rows, row_count) = deleted.into_parts();
890 let rows = sql_projection_rows_from_kernel_rows(rows);
891
892 Ok(SqlProjectionPayload::new(
893 projection_labels_from_entity_model(authority.model()),
894 rows,
895 row_count,
896 )
897 .into_dispatch_result())
898 }
899
900 #[doc(hidden)]
902 pub fn execute_lowered_sql_dispatch_query_for_authority(
903 &self,
904 lowered: &LoweredSqlCommand,
905 authority: EntityAuthority,
906 ) -> Result<SqlDispatchResult, QueryError> {
907 self.execute_lowered_sql_dispatch_query_core(lowered, authority)
908 }
909
910 #[doc(hidden)]
916 pub fn execute_lowered_sql_dispatch_select_for_authority(
917 &self,
918 lowered: &LoweredSqlCommand,
919 authority: EntityAuthority,
920 ) -> Result<SqlDispatchResult, QueryError> {
921 let Some(query) = lowered.query() else {
922 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
923 SqlSurface::QueryFrom,
924 session_sql_lane(lowered),
925 )));
926 };
927
928 match query {
929 LoweredSqlQuery::Select(select) => {
930 self.execute_lowered_sql_dispatch_select_core(select, authority)
931 }
932 LoweredSqlQuery::Delete(_) => Err(QueryError::unsupported_query(
933 "generated SQL query dispatch requires one lowered SELECT shape",
934 )),
935 }
936 }
937
938 fn execute_lowered_sql_dispatch_query_core(
941 &self,
942 lowered: &LoweredSqlCommand,
943 authority: EntityAuthority,
944 ) -> Result<SqlDispatchResult, QueryError> {
945 let Some(query) = lowered.query() else {
946 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
947 SqlSurface::QueryFrom,
948 session_sql_lane(lowered),
949 )));
950 };
951
952 match query {
953 LoweredSqlQuery::Select(select) => {
954 self.execute_lowered_sql_dispatch_select_core(select, authority)
955 }
956 LoweredSqlQuery::Delete(delete) => {
957 self.execute_lowered_sql_dispatch_delete_core(delete, authority)
958 }
959 }
960 }
961}