1use crate::{
2 db::{
3 DbSession, EntityFieldDescription, EntityResponse, EntitySchemaDescription,
4 MissingRowPolicy, PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
5 executor::{
6 EntityAuthority, KernelRow, ScalarNumericFieldBoundaryRequest,
7 ScalarProjectionBoundaryRequest, execute_sql_delete_projection_for_canister,
8 execute_sql_projection_rows_for_canister,
9 },
10 query::{
11 builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
12 intent::StructuralQuery,
13 plan::{
14 AggregateKind, FieldSlot,
15 expr::{Expr, ProjectionField},
16 resolve_aggregate_target_field_slot,
17 },
18 },
19 sql::lowering::{
20 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlCommand, LoweredSqlLaneKind,
21 LoweredSqlQuery, PreparedSqlStatement as CorePreparedSqlStatement,
22 SqlGlobalAggregateCommandCore, SqlGlobalAggregateTerminal, apply_lowered_select_shape,
23 bind_lowered_sql_delete_query_structural,
24 bind_lowered_sql_explain_global_aggregate_structural, bind_lowered_sql_query,
25 compile_sql_global_aggregate_command, lower_sql_command_from_prepared_statement,
26 lowered_sql_command_lane, prepare_sql_statement,
27 render_lowered_sql_explain_plan_or_json,
28 },
29 sql::parser::{
30 SqlExplainMode, SqlExplainStatement, SqlExplainTarget, SqlStatement, parse_sql,
31 },
32 },
33 model::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue},
35 value::Value,
36};
37
38#[derive(Clone, Debug, Eq, PartialEq)]
46pub enum SqlStatementRoute {
47 Query { entity: String },
48 Explain { entity: String },
49 Describe { entity: String },
50 ShowIndexes { entity: String },
51 ShowColumns { entity: String },
52 ShowEntities,
53}
54
55#[derive(Debug)]
61pub enum SqlDispatchResult {
62 Projection {
63 columns: Vec<String>,
64 rows: Vec<Vec<Value>>,
65 row_count: u32,
66 },
67 Explain(String),
68 Describe(EntitySchemaDescription),
69 ShowIndexes(Vec<String>),
70 ShowColumns(Vec<EntityFieldDescription>),
71 ShowEntities(Vec<String>),
72}
73
74#[derive(Clone, Debug)]
82pub struct SqlParsedStatement {
83 statement: SqlStatement,
84 route: SqlStatementRoute,
85}
86
87impl SqlParsedStatement {
88 #[must_use]
90 pub const fn route(&self) -> &SqlStatementRoute {
91 &self.route
92 }
93
94 #[must_use]
101 pub const fn is_delete_like_query_surface(&self) -> bool {
102 matches!(
103 &self.statement,
104 SqlStatement::Delete(_)
105 | SqlStatement::Explain(SqlExplainStatement {
106 statement: SqlExplainTarget::Delete(_),
107 ..
108 })
109 )
110 }
111
112 fn prepare(
114 &self,
115 expected_entity: &'static str,
116 ) -> Result<CorePreparedSqlStatement, QueryError> {
117 prepare_sql_statement(self.statement.clone(), expected_entity)
118 .map_err(QueryError::from_sql_lowering_error)
119 }
120
121 #[inline(never)]
123 pub fn lower_query_lane_for_entity(
124 &self,
125 expected_entity: &'static str,
126 primary_key_field: &str,
127 ) -> Result<LoweredSqlCommand, QueryError> {
128 let lowered = lower_sql_command_from_prepared_statement(
129 self.prepare(expected_entity)?,
130 primary_key_field,
131 )
132 .map_err(QueryError::from_sql_lowering_error)?;
133 let lane = lowered_sql_command_lane(&lowered);
134
135 match lane {
136 LoweredSqlLaneKind::Query | LoweredSqlLaneKind::Explain => Ok(lowered),
137 LoweredSqlLaneKind::Describe
138 | LoweredSqlLaneKind::ShowIndexes
139 | LoweredSqlLaneKind::ShowColumns
140 | LoweredSqlLaneKind::ShowEntities => {
141 Err(QueryError::unsupported_query_lane_dispatch())
142 }
143 }
144 }
145}
146
147#[derive(Debug)]
157struct SqlProjectionPayload {
158 columns: Vec<String>,
159 rows: Vec<Vec<Value>>,
160 row_count: u32,
161}
162
163impl SqlProjectionPayload {
164 #[must_use]
165 const fn new(columns: Vec<String>, rows: Vec<Vec<Value>>, row_count: u32) -> Self {
166 Self {
167 columns,
168 rows,
169 row_count,
170 }
171 }
172
173 #[must_use]
174 fn into_dispatch_result(self) -> SqlDispatchResult {
175 SqlDispatchResult::Projection {
176 columns: self.columns,
177 rows: self.rows,
178 row_count: self.row_count,
179 }
180 }
181}
182
183impl SqlStatementRoute {
184 #[must_use]
189 pub const fn entity(&self) -> &str {
190 match self {
191 Self::Query { entity }
192 | Self::Explain { entity }
193 | Self::Describe { entity }
194 | Self::ShowIndexes { entity }
195 | Self::ShowColumns { entity } => entity.as_str(),
196 Self::ShowEntities => "",
197 }
198 }
199
200 #[must_use]
202 pub const fn is_explain(&self) -> bool {
203 matches!(self, Self::Explain { .. })
204 }
205
206 #[must_use]
208 pub const fn is_describe(&self) -> bool {
209 matches!(self, Self::Describe { .. })
210 }
211
212 #[must_use]
214 pub const fn is_show_indexes(&self) -> bool {
215 matches!(self, Self::ShowIndexes { .. })
216 }
217
218 #[must_use]
220 pub const fn is_show_columns(&self) -> bool {
221 matches!(self, Self::ShowColumns { .. })
222 }
223
224 #[must_use]
226 pub const fn is_show_entities(&self) -> bool {
227 matches!(self, Self::ShowEntities)
228 }
229}
230
231#[derive(Clone, Copy, Debug, Eq, PartialEq)]
233enum SqlLaneKind {
234 Query,
235 Explain,
236 Describe,
237 ShowIndexes,
238 ShowColumns,
239 ShowEntities,
240}
241
242#[derive(Clone, Copy, Debug, Eq, PartialEq)]
244enum SqlSurface {
245 QueryFrom,
246 Explain,
247}
248
249const fn session_sql_lane(command: &LoweredSqlCommand) -> SqlLaneKind {
251 match lowered_sql_command_lane(command) {
252 LoweredSqlLaneKind::Query => SqlLaneKind::Query,
253 LoweredSqlLaneKind::Explain => SqlLaneKind::Explain,
254 LoweredSqlLaneKind::Describe => SqlLaneKind::Describe,
255 LoweredSqlLaneKind::ShowIndexes => SqlLaneKind::ShowIndexes,
256 LoweredSqlLaneKind::ShowColumns => SqlLaneKind::ShowColumns,
257 LoweredSqlLaneKind::ShowEntities => SqlLaneKind::ShowEntities,
258 }
259}
260
261const fn unsupported_sql_lane_message(surface: SqlSurface, lane: SqlLaneKind) -> &'static str {
263 match (surface, lane) {
264 (SqlSurface::QueryFrom, SqlLaneKind::Explain) => {
265 "query_from_sql does not accept EXPLAIN statements; use execute_sql_dispatch(...)"
266 }
267 (SqlSurface::QueryFrom, SqlLaneKind::Describe) => {
268 "query_from_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
269 }
270 (SqlSurface::QueryFrom, SqlLaneKind::ShowIndexes) => {
271 "query_from_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
272 }
273 (SqlSurface::QueryFrom, SqlLaneKind::ShowColumns) => {
274 "query_from_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
275 }
276 (SqlSurface::QueryFrom, SqlLaneKind::ShowEntities) => {
277 "query_from_sql does not accept SHOW ENTITIES statements; use execute_sql_dispatch(...)"
278 }
279 (SqlSurface::QueryFrom, SqlLaneKind::Query) => {
280 "query_from_sql requires one executable SELECT or DELETE statement"
281 }
282 (SqlSurface::Explain, SqlLaneKind::Describe) => {
283 "explain_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
284 }
285 (SqlSurface::Explain, SqlLaneKind::ShowIndexes) => {
286 "explain_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
287 }
288 (SqlSurface::Explain, SqlLaneKind::ShowColumns) => {
289 "explain_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
290 }
291 (SqlSurface::Explain, SqlLaneKind::ShowEntities) => {
292 "explain_sql does not accept SHOW ENTITIES statements; use execute_sql_dispatch(...)"
293 }
294 (SqlSurface::Explain, SqlLaneKind::Query | SqlLaneKind::Explain) => {
295 "explain_sql requires an EXPLAIN statement"
296 }
297 }
298}
299
300fn sql_statement_route_from_statement(statement: &SqlStatement) -> SqlStatementRoute {
302 match statement {
303 SqlStatement::Select(select) => SqlStatementRoute::Query {
304 entity: select.entity.clone(),
305 },
306 SqlStatement::Delete(delete) => SqlStatementRoute::Query {
307 entity: delete.entity.clone(),
308 },
309 SqlStatement::Explain(explain) => match &explain.statement {
310 SqlExplainTarget::Select(select) => SqlStatementRoute::Explain {
311 entity: select.entity.clone(),
312 },
313 SqlExplainTarget::Delete(delete) => SqlStatementRoute::Explain {
314 entity: delete.entity.clone(),
315 },
316 },
317 SqlStatement::Describe(describe) => SqlStatementRoute::Describe {
318 entity: describe.entity.clone(),
319 },
320 SqlStatement::ShowIndexes(show_indexes) => SqlStatementRoute::ShowIndexes {
321 entity: show_indexes.entity.clone(),
322 },
323 SqlStatement::ShowColumns(show_columns) => SqlStatementRoute::ShowColumns {
324 entity: show_columns.entity.clone(),
325 },
326 SqlStatement::ShowEntities(_) => SqlStatementRoute::ShowEntities,
327 }
328}
329
330fn resolve_sql_aggregate_target_slot_with_model(
333 model: &'static EntityModel,
334 field: &str,
335) -> Result<FieldSlot, QueryError> {
336 resolve_aggregate_target_field_slot(model, field)
337}
338
339fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
340 resolve_sql_aggregate_target_slot_with_model(E::MODEL, field)
341}
342
343fn sql_global_aggregate_terminal_to_expr_with_model(
346 model: &'static EntityModel,
347 terminal: &SqlGlobalAggregateTerminal,
348) -> Result<AggregateExpr, QueryError> {
349 match terminal {
350 SqlGlobalAggregateTerminal::CountRows => Ok(count()),
351 SqlGlobalAggregateTerminal::CountField(field) => {
352 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
353
354 Ok(count_by(field.as_str()))
355 }
356 SqlGlobalAggregateTerminal::SumField(field) => {
357 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
358
359 Ok(sum(field.as_str()))
360 }
361 SqlGlobalAggregateTerminal::AvgField(field) => {
362 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
363
364 Ok(avg(field.as_str()))
365 }
366 SqlGlobalAggregateTerminal::MinField(field) => {
367 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
368
369 Ok(min_by(field.as_str()))
370 }
371 SqlGlobalAggregateTerminal::MaxField(field) => {
372 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
373
374 Ok(max_by(field.as_str()))
375 }
376 }
377}
378
379fn projection_label_from_aggregate(aggregate: &AggregateExpr) -> String {
381 let kind = match aggregate.kind() {
382 AggregateKind::Count => "COUNT",
383 AggregateKind::Sum => "SUM",
384 AggregateKind::Avg => "AVG",
385 AggregateKind::Exists => "EXISTS",
386 AggregateKind::First => "FIRST",
387 AggregateKind::Last => "LAST",
388 AggregateKind::Min => "MIN",
389 AggregateKind::Max => "MAX",
390 };
391 let distinct = if aggregate.is_distinct() {
392 "DISTINCT "
393 } else {
394 ""
395 };
396
397 if let Some(field) = aggregate.target_field() {
398 return format!("{kind}({distinct}{field})");
399 }
400
401 format!("{kind}({distinct}*)")
402}
403
404fn projection_label_from_expr(expr: &Expr, ordinal: usize) -> String {
406 match expr {
407 Expr::Field(field) => field.as_str().to_string(),
408 Expr::Aggregate(aggregate) => projection_label_from_aggregate(aggregate),
409 Expr::Alias { name, .. } => name.as_str().to_string(),
410 Expr::Literal(_) | Expr::Unary { .. } | Expr::Binary { .. } => {
411 format!("expr_{ordinal}")
412 }
413 }
414}
415
416fn projection_labels_from_structural_query(
418 query: &StructuralQuery,
419) -> Result<Vec<String>, QueryError> {
420 let projection = query.build_plan()?.projection_spec(query.model());
421 Ok(projection_labels_from_projection_spec(&projection))
422}
423
424fn projection_labels_from_projection_spec(
427 projection: &crate::db::query::plan::expr::ProjectionSpec,
428) -> Vec<String> {
429 let mut labels = Vec::with_capacity(projection.len());
430
431 for (ordinal, field) in projection.fields().enumerate() {
432 match field {
433 ProjectionField::Scalar {
434 expr: _,
435 alias: Some(alias),
436 } => labels.push(alias.as_str().to_string()),
437 ProjectionField::Scalar { expr, alias: None } => {
438 labels.push(projection_label_from_expr(expr, ordinal));
439 }
440 }
441 }
442
443 labels
444}
445
446fn projection_labels_from_entity_model(model: &'static EntityModel) -> Vec<String> {
448 model
449 .fields
450 .iter()
451 .map(|field| field.name.to_string())
452 .collect()
453}
454
455fn sql_projection_rows_from_kernel_rows(rows: Vec<KernelRow>) -> Vec<Vec<Value>> {
458 rows.into_iter()
459 .map(|row| {
460 row.into_slots()
461 .into_iter()
462 .map(|value| value.unwrap_or(Value::Null))
463 .collect()
464 })
465 .collect()
466}
467
468impl LoweredSqlCommand {
469 #[inline(never)]
472 pub fn explain_for_model(&self, model: &'static EntityModel) -> Result<String, QueryError> {
473 let lane = session_sql_lane(self);
476 if lane != SqlLaneKind::Explain {
477 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
478 SqlSurface::Explain,
479 lane,
480 )));
481 }
482
483 if let Some(rendered) =
486 render_lowered_sql_explain_plan_or_json(self, model, MissingRowPolicy::Ignore)
487 .map_err(QueryError::from_sql_lowering_error)?
488 {
489 return Ok(rendered);
490 }
491
492 if let Some((mode, command)) = bind_lowered_sql_explain_global_aggregate_structural(
495 self,
496 model,
497 MissingRowPolicy::Ignore,
498 ) {
499 return explain_sql_global_aggregate_structural(mode, command);
500 }
501
502 Err(QueryError::unsupported_query(
503 "shared EXPLAIN dispatch could not classify the lowered SQL command shape",
504 ))
505 }
506}
507
508#[inline(never)]
511fn explain_sql_global_aggregate_structural(
512 mode: SqlExplainMode,
513 command: SqlGlobalAggregateCommandCore,
514) -> Result<String, QueryError> {
515 let model = command.query().model();
516
517 match mode {
518 SqlExplainMode::Plan => {
519 let _ = sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
520
521 Ok(command
522 .query()
523 .build_plan()?
524 .explain_with_model(model)
525 .render_text_canonical())
526 }
527 SqlExplainMode::Execution => {
528 let aggregate =
529 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
530 let plan = command.query().explain_aggregate_terminal(aggregate)?;
531
532 Ok(plan.execution_node_descriptor().render_text_tree())
533 }
534 SqlExplainMode::Json => {
535 let _ = sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
536
537 Ok(command
538 .query()
539 .build_plan()?
540 .explain_with_model(model)
541 .render_json_canonical())
542 }
543 }
544}
545
546impl<C: CanisterKind> DbSession<C> {
547 fn bind_sql_query_lane_from_parsed<E>(
550 parsed: &SqlParsedStatement,
551 ) -> Result<(LoweredSqlQuery, Query<E>), QueryError>
552 where
553 E: EntityKind<Canister = C>,
554 {
555 let lowered =
556 parsed.lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
557 let lane = session_sql_lane(&lowered);
558 let Some(query) = lowered.query().cloned() else {
559 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
560 SqlSurface::QueryFrom,
561 lane,
562 )));
563 };
564 let typed = bind_lowered_sql_query::<E>(query.clone(), MissingRowPolicy::Ignore)
565 .map_err(QueryError::from_sql_lowering_error)?;
566
567 Ok((query, typed))
568 }
569
570 fn execute_structural_sql_projection(
574 &self,
575 query: StructuralQuery,
576 authority: EntityAuthority,
577 ) -> Result<SqlProjectionPayload, QueryError> {
578 let columns = projection_labels_from_structural_query(&query)?;
579 let projected = execute_sql_projection_rows_for_canister(
580 &self.db,
581 self.debug,
582 authority,
583 query.build_plan()?,
584 )
585 .map_err(QueryError::execute)?;
586 let (rows, row_count) = projected.into_parts();
587
588 Ok(SqlProjectionPayload::new(columns, rows, row_count))
589 }
590
591 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
595 where
596 E: PersistedRow<Canister = C> + EntityValue,
597 {
598 let plan = query.plan()?.into_executable();
599 let deleted = self
600 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
601 .map_err(QueryError::execute)?;
602 let (rows, row_count) = deleted.into_parts();
603 let rows = sql_projection_rows_from_kernel_rows(rows);
604
605 Ok(SqlProjectionPayload::new(
606 projection_labels_from_entity_model(E::MODEL),
607 rows,
608 row_count,
609 )
610 .into_dispatch_result())
611 }
612
613 fn ensure_sql_query_grouping<E>(query: &Query<E>, grouped: bool) -> Result<(), QueryError>
616 where
617 E: EntityKind,
618 {
619 match (grouped, query.has_grouping()) {
620 (true, true) | (false, false) => Ok(()),
621 (false, true) => Err(QueryError::grouped_requires_execute_grouped()),
622 (true, false) => Err(QueryError::unsupported_query(
623 "execute_sql_grouped requires grouped SQL query intent",
624 )),
625 }
626 }
627
628 #[inline(never)]
631 fn execute_lowered_sql_dispatch_select_core(
632 &self,
633 select: &LoweredSelectShape,
634 authority: EntityAuthority,
635 ) -> Result<SqlDispatchResult, QueryError> {
636 let structural = apply_lowered_select_shape(
637 StructuralQuery::new(authority.model(), MissingRowPolicy::Ignore),
638 select.clone(),
639 )
640 .map_err(QueryError::from_sql_lowering_error)?;
641
642 self.execute_structural_sql_projection(structural, authority)
643 .map(SqlProjectionPayload::into_dispatch_result)
644 }
645
646 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
650 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
651 let route = sql_statement_route_from_statement(&statement);
652
653 Ok(SqlParsedStatement { statement, route })
654 }
655
656 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
661 let parsed = self.parse_sql_statement(sql)?;
662
663 Ok(parsed.route().clone())
664 }
665
666 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
671 where
672 E: EntityKind<Canister = C>,
673 {
674 let parsed = self.parse_sql_statement(sql)?;
675 let (_, query) = Self::bind_sql_query_lane_from_parsed::<E>(&parsed)?;
676
677 Ok(query)
678 }
679
680 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
682 where
683 E: PersistedRow<Canister = C> + EntityValue,
684 {
685 let query = self.query_from_sql::<E>(sql)?;
686 Self::ensure_sql_query_grouping(&query, false)?;
687
688 self.execute_query(&query)
689 }
690
691 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
696 where
697 E: PersistedRow<Canister = C> + EntityValue,
698 {
699 let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
700 .map_err(QueryError::from_sql_lowering_error)?;
701
702 match command.terminal() {
703 SqlGlobalAggregateTerminal::CountRows => self
704 .execute_load_query_with(command.query(), |load, plan| {
705 load.execute_scalar_terminal_request(
706 plan,
707 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
708 )?
709 .into_count()
710 })
711 .map(|count| Value::Uint(u64::from(count))),
712 SqlGlobalAggregateTerminal::CountField(field) => {
713 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
714 self.execute_load_query_with(command.query(), |load, plan| {
715 load.execute_scalar_projection_boundary(
716 plan,
717 target_slot,
718 ScalarProjectionBoundaryRequest::Values,
719 )?
720 .into_values()
721 })
722 .map(|values| {
723 let count = values
724 .into_iter()
725 .filter(|value| !matches!(value, Value::Null))
726 .count();
727 Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
728 })
729 }
730 SqlGlobalAggregateTerminal::SumField(field) => {
731 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
732 self.execute_load_query_with(command.query(), |load, plan| {
733 load.execute_numeric_field_boundary(
734 plan,
735 target_slot,
736 ScalarNumericFieldBoundaryRequest::Sum,
737 )
738 })
739 .map(|value| value.map_or(Value::Null, Value::Decimal))
740 }
741 SqlGlobalAggregateTerminal::AvgField(field) => {
742 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
743 self.execute_load_query_with(command.query(), |load, plan| {
744 load.execute_numeric_field_boundary(
745 plan,
746 target_slot,
747 ScalarNumericFieldBoundaryRequest::Avg,
748 )
749 })
750 .map(|value| value.map_or(Value::Null, Value::Decimal))
751 }
752 SqlGlobalAggregateTerminal::MinField(field) => {
753 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
754 let min_id = self.execute_load_query_with(command.query(), |load, plan| {
755 load.execute_scalar_terminal_request(
756 plan,
757 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
758 kind: AggregateKind::Min,
759 target_field: target_slot,
760 },
761 )?
762 .into_id()
763 })?;
764
765 match min_id {
766 Some(id) => self
767 .load::<E>()
768 .by_id(id)
769 .first_value_by(field)
770 .map(|value| value.unwrap_or(Value::Null)),
771 None => Ok(Value::Null),
772 }
773 }
774 SqlGlobalAggregateTerminal::MaxField(field) => {
775 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
776 let max_id = self.execute_load_query_with(command.query(), |load, plan| {
777 load.execute_scalar_terminal_request(
778 plan,
779 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
780 kind: AggregateKind::Max,
781 target_field: target_slot,
782 },
783 )?
784 .into_id()
785 })?;
786
787 match max_id {
788 Some(id) => self
789 .load::<E>()
790 .by_id(id)
791 .first_value_by(field)
792 .map(|value| value.unwrap_or(Value::Null)),
793 None => Ok(Value::Null),
794 }
795 }
796 }
797 }
798
799 pub fn execute_sql_grouped<E>(
801 &self,
802 sql: &str,
803 cursor_token: Option<&str>,
804 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
805 where
806 E: PersistedRow<Canister = C> + EntityValue,
807 {
808 let query = self.query_from_sql::<E>(sql)?;
809 Self::ensure_sql_query_grouping(&query, true)?;
810
811 self.execute_grouped(&query, cursor_token)
812 }
813
814 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
816 where
817 E: PersistedRow<Canister = C> + EntityValue,
818 {
819 let parsed = self.parse_sql_statement(sql)?;
820
821 self.execute_sql_dispatch_parsed::<E>(&parsed)
822 }
823
824 pub fn execute_sql_dispatch_parsed<E>(
826 &self,
827 parsed: &SqlParsedStatement,
828 ) -> Result<SqlDispatchResult, QueryError>
829 where
830 E: PersistedRow<Canister = C> + EntityValue,
831 {
832 match parsed.route() {
833 SqlStatementRoute::Query { .. } => {
834 let (query, typed_query) = Self::bind_sql_query_lane_from_parsed::<E>(parsed)?;
835
836 Self::ensure_sql_query_grouping(&typed_query, false)?;
837
838 match query {
839 LoweredSqlQuery::Select(select) => self
840 .execute_lowered_sql_dispatch_select_core(
841 &select,
842 EntityAuthority::for_type::<E>(),
843 ),
844 LoweredSqlQuery::Delete(_) => self.execute_typed_sql_delete(&typed_query),
845 }
846 }
847 SqlStatementRoute::Explain { .. } => {
848 let lowered = lower_sql_command_from_prepared_statement(
849 parsed.prepare(E::MODEL.name())?,
850 E::MODEL.primary_key.name,
851 )
852 .map_err(QueryError::from_sql_lowering_error)?;
853
854 lowered
855 .explain_for_model(E::MODEL)
856 .map(SqlDispatchResult::Explain)
857 }
858 SqlStatementRoute::Describe { .. } => {
859 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
860 }
861 SqlStatementRoute::ShowIndexes { .. } => {
862 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
863 }
864 SqlStatementRoute::ShowColumns { .. } => {
865 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
866 }
867 SqlStatementRoute::ShowEntities => {
868 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
869 }
870 }
871 }
872
873 fn execute_lowered_sql_dispatch_delete_core(
876 &self,
877 delete: &LoweredBaseQueryShape,
878 authority: EntityAuthority,
879 ) -> Result<SqlDispatchResult, QueryError> {
880 let structural = bind_lowered_sql_delete_query_structural(
881 authority.model(),
882 delete.clone(),
883 MissingRowPolicy::Ignore,
884 );
885 let deleted = execute_sql_delete_projection_for_canister(
886 &self.db,
887 authority,
888 structural.build_plan()?,
889 )
890 .map_err(QueryError::execute)?;
891 let (rows, row_count) = deleted.into_parts();
892 let rows = sql_projection_rows_from_kernel_rows(rows);
893
894 Ok(SqlProjectionPayload::new(
895 projection_labels_from_entity_model(authority.model()),
896 rows,
897 row_count,
898 )
899 .into_dispatch_result())
900 }
901
902 #[doc(hidden)]
904 pub fn execute_lowered_sql_dispatch_query_for_authority(
905 &self,
906 lowered: &LoweredSqlCommand,
907 authority: EntityAuthority,
908 ) -> Result<SqlDispatchResult, QueryError> {
909 self.execute_lowered_sql_dispatch_query_core(lowered, authority)
910 }
911
912 #[doc(hidden)]
918 pub fn execute_lowered_sql_dispatch_select_for_authority(
919 &self,
920 lowered: &LoweredSqlCommand,
921 authority: EntityAuthority,
922 ) -> Result<SqlDispatchResult, QueryError> {
923 let Some(query) = lowered.query() else {
924 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
925 SqlSurface::QueryFrom,
926 session_sql_lane(lowered),
927 )));
928 };
929
930 match query {
931 LoweredSqlQuery::Select(select) => {
932 self.execute_lowered_sql_dispatch_select_core(select, authority)
933 }
934 LoweredSqlQuery::Delete(_) => Err(QueryError::unsupported_query(
935 "generated SQL query dispatch requires one lowered SELECT shape",
936 )),
937 }
938 }
939
940 fn execute_lowered_sql_dispatch_query_core(
943 &self,
944 lowered: &LoweredSqlCommand,
945 authority: EntityAuthority,
946 ) -> Result<SqlDispatchResult, QueryError> {
947 let Some(query) = lowered.query() else {
948 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
949 SqlSurface::QueryFrom,
950 session_sql_lane(lowered),
951 )));
952 };
953
954 match query {
955 LoweredSqlQuery::Select(select) => {
956 self.execute_lowered_sql_dispatch_select_core(select, authority)
957 }
958 LoweredSqlQuery::Delete(delete) => {
959 self.execute_lowered_sql_dispatch_delete_core(delete, authority)
960 }
961 }
962 }
963}