1use crate::{
2 db::{
3 DbSession, EntityFieldDescription, EntityResponse, EntitySchemaDescription,
4 MissingRowPolicy, PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
5 executor::{
6 EntityAuthority, KernelRow, ScalarNumericFieldBoundaryRequest,
7 ScalarProjectionBoundaryRequest, execute_sql_delete_projection_for_canister,
8 execute_sql_projection_rows_for_canister,
9 },
10 query::{
11 builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
12 intent::StructuralQuery,
13 plan::{
14 AggregateKind, FieldSlot,
15 expr::{Expr, ProjectionField},
16 resolve_aggregate_target_field_slot,
17 },
18 },
19 sql::lowering::{
20 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlCommand, LoweredSqlLaneKind,
21 LoweredSqlQuery, PreparedSqlStatement as CorePreparedSqlStatement,
22 SqlGlobalAggregateCommandCore, SqlGlobalAggregateTerminal, apply_lowered_select_shape,
23 bind_lowered_sql_delete_query_structural,
24 bind_lowered_sql_explain_global_aggregate_structural, bind_lowered_sql_query,
25 compile_sql_global_aggregate_command, lower_sql_command_from_prepared_statement,
26 lowered_sql_command_lane, prepare_sql_statement,
27 render_lowered_sql_explain_plan_or_json,
28 },
29 sql::parser::{
30 SqlExplainMode, SqlExplainStatement, SqlExplainTarget, SqlStatement, parse_sql,
31 },
32 },
33 model::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue},
35 value::Value,
36};
37
#[cfg_attr(
    doc,
    doc = "SqlStatementRoute\n\nCanonical SQL statement routing metadata derived from reduced SQL parser output.\nCarries surface kind (`Query` / `Explain` / `Describe` / `ShowIndexes` / `ShowColumns` / `ShowEntities`) and canonical parsed entity identifier."
)]
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SqlStatementRoute {
    /// `SELECT` or `DELETE` statement; `entity` is the parsed target entity.
    Query { entity: String },
    /// `EXPLAIN` over a `SELECT` or `DELETE`; `entity` is the explained statement's entity.
    Explain { entity: String },
    /// `DESCRIBE` statement for `entity`.
    Describe { entity: String },
    /// `SHOW INDEXES` statement for `entity`.
    ShowIndexes { entity: String },
    /// `SHOW COLUMNS` statement for `entity`.
    ShowColumns { entity: String },
    /// `SHOW ENTITIES` statement; the only route that carries no entity.
    ShowEntities,
}
51
52#[cfg_attr(
53 doc,
54 doc = "SqlDispatchResult\n\nUnified SQL dispatch payload returned by shared SQL lane execution."
55)]
56#[derive(Debug)]
57pub enum SqlDispatchResult {
58 Projection {
59 columns: Vec<String>,
60 rows: Vec<Vec<Value>>,
61 row_count: u32,
62 },
63 Explain(String),
64 Describe(EntitySchemaDescription),
65 ShowIndexes(Vec<String>),
66 ShowColumns(Vec<EntityFieldDescription>),
67 ShowEntities(Vec<String>),
68}
69
70#[cfg_attr(
71 doc,
72 doc = "SqlParsedStatement\n\nOpaque parsed SQL statement envelope with stable route metadata.\nThis allows callers to parse once and reuse parsed authority across route classification and typed dispatch lowering."
73)]
74#[derive(Clone, Debug)]
75pub struct SqlParsedStatement {
76 statement: SqlStatement,
77 route: SqlStatementRoute,
78}
79
80impl SqlParsedStatement {
81 #[must_use]
83 pub const fn route(&self) -> &SqlStatementRoute {
84 &self.route
85 }
86
87 #[must_use]
94 pub const fn is_delete_like_query_surface(&self) -> bool {
95 matches!(
96 &self.statement,
97 SqlStatement::Delete(_)
98 | SqlStatement::Explain(SqlExplainStatement {
99 statement: SqlExplainTarget::Delete(_),
100 ..
101 })
102 )
103 }
104
105 fn prepare(
107 &self,
108 expected_entity: &'static str,
109 ) -> Result<CorePreparedSqlStatement, QueryError> {
110 prepare_sql_statement(self.statement.clone(), expected_entity)
111 .map_err(QueryError::from_sql_lowering_error)
112 }
113
114 #[inline(never)]
116 pub fn lower_query_lane_for_entity(
117 &self,
118 expected_entity: &'static str,
119 primary_key_field: &str,
120 ) -> Result<LoweredSqlCommand, QueryError> {
121 let lowered = lower_sql_command_from_prepared_statement(
122 self.prepare(expected_entity)?,
123 primary_key_field,
124 )
125 .map_err(QueryError::from_sql_lowering_error)?;
126 let lane = lowered_sql_command_lane(&lowered);
127
128 match lane {
129 LoweredSqlLaneKind::Query | LoweredSqlLaneKind::Explain => Ok(lowered),
130 LoweredSqlLaneKind::Describe
131 | LoweredSqlLaneKind::ShowIndexes
132 | LoweredSqlLaneKind::ShowColumns
133 | LoweredSqlLaneKind::ShowEntities => {
134 Err(QueryError::unsupported_query_lane_dispatch())
135 }
136 }
137 }
138}
139
140#[cfg_attr(
141 doc,
142 doc = "SqlProjectionPayload\n\nGeneric-free row-oriented SQL projection payload carried across the shared SQL dispatch surface.\nKeeps SQL `SELECT` results structural so query-lane dispatch does not rebuild typed response rows before rendering values."
143)]
144#[derive(Debug)]
145struct SqlProjectionPayload {
146 columns: Vec<String>,
147 rows: Vec<Vec<Value>>,
148 row_count: u32,
149}
150
151impl SqlProjectionPayload {
152 #[must_use]
153 const fn new(columns: Vec<String>, rows: Vec<Vec<Value>>, row_count: u32) -> Self {
154 Self {
155 columns,
156 rows,
157 row_count,
158 }
159 }
160
161 #[must_use]
162 fn into_dispatch_result(self) -> SqlDispatchResult {
163 SqlDispatchResult::Projection {
164 columns: self.columns,
165 rows: self.rows,
166 row_count: self.row_count,
167 }
168 }
169}
170
171impl SqlStatementRoute {
172 #[must_use]
177 pub const fn entity(&self) -> &str {
178 match self {
179 Self::Query { entity }
180 | Self::Explain { entity }
181 | Self::Describe { entity }
182 | Self::ShowIndexes { entity }
183 | Self::ShowColumns { entity } => entity.as_str(),
184 Self::ShowEntities => "",
185 }
186 }
187
188 #[must_use]
190 pub const fn is_explain(&self) -> bool {
191 matches!(self, Self::Explain { .. })
192 }
193
194 #[must_use]
196 pub const fn is_describe(&self) -> bool {
197 matches!(self, Self::Describe { .. })
198 }
199
200 #[must_use]
202 pub const fn is_show_indexes(&self) -> bool {
203 matches!(self, Self::ShowIndexes { .. })
204 }
205
206 #[must_use]
208 pub const fn is_show_columns(&self) -> bool {
209 matches!(self, Self::ShowColumns { .. })
210 }
211
212 #[must_use]
214 pub const fn is_show_entities(&self) -> bool {
215 matches!(self, Self::ShowEntities)
216 }
217}
218
// Session-local mirror of `LoweredSqlLaneKind`, used to pick unsupported-lane messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SqlLaneKind {
    // Lane of a lowered query command.
    Query,
    // Lane of a lowered EXPLAIN command.
    Explain,
    // Lane of a lowered DESCRIBE command.
    Describe,
    // Lane of a lowered SHOW INDEXES command.
    ShowIndexes,
    // Lane of a lowered SHOW COLUMNS command.
    ShowColumns,
    // Lane of a lowered SHOW ENTITIES command.
    ShowEntities,
}
229
// Calling surface named in unsupported-lane error messages.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum SqlSurface {
    // Messages are worded for `query_from_sql`.
    QueryFrom,
    // Messages are worded for `explain_sql`.
    Explain,
}
236
237const fn session_sql_lane(command: &LoweredSqlCommand) -> SqlLaneKind {
239 match lowered_sql_command_lane(command) {
240 LoweredSqlLaneKind::Query => SqlLaneKind::Query,
241 LoweredSqlLaneKind::Explain => SqlLaneKind::Explain,
242 LoweredSqlLaneKind::Describe => SqlLaneKind::Describe,
243 LoweredSqlLaneKind::ShowIndexes => SqlLaneKind::ShowIndexes,
244 LoweredSqlLaneKind::ShowColumns => SqlLaneKind::ShowColumns,
245 LoweredSqlLaneKind::ShowEntities => SqlLaneKind::ShowEntities,
246 }
247}
248
249const fn unsupported_sql_lane_message(surface: SqlSurface, lane: SqlLaneKind) -> &'static str {
251 match (surface, lane) {
252 (SqlSurface::QueryFrom, SqlLaneKind::Explain) => {
253 "query_from_sql does not accept EXPLAIN; use execute_sql_dispatch(...)"
254 }
255 (SqlSurface::QueryFrom, SqlLaneKind::Describe) => {
256 "query_from_sql does not accept DESCRIBE; use execute_sql_dispatch(...)"
257 }
258 (SqlSurface::QueryFrom, SqlLaneKind::ShowIndexes) => {
259 "query_from_sql does not accept SHOW INDEXES; use execute_sql_dispatch(...)"
260 }
261 (SqlSurface::QueryFrom, SqlLaneKind::ShowColumns) => {
262 "query_from_sql does not accept SHOW COLUMNS; use execute_sql_dispatch(...)"
263 }
264 (SqlSurface::QueryFrom, SqlLaneKind::ShowEntities) => {
265 "query_from_sql does not accept SHOW ENTITIES; use execute_sql_dispatch(...)"
266 }
267 (SqlSurface::QueryFrom, SqlLaneKind::Query) => {
268 "query_from_sql only accepts SELECT or DELETE"
269 }
270 (SqlSurface::Explain, SqlLaneKind::Describe) => {
271 "explain_sql does not accept DESCRIBE; use execute_sql_dispatch(...)"
272 }
273 (SqlSurface::Explain, SqlLaneKind::ShowIndexes) => {
274 "explain_sql does not accept SHOW INDEXES; use execute_sql_dispatch(...)"
275 }
276 (SqlSurface::Explain, SqlLaneKind::ShowColumns) => {
277 "explain_sql does not accept SHOW COLUMNS; use execute_sql_dispatch(...)"
278 }
279 (SqlSurface::Explain, SqlLaneKind::ShowEntities) => {
280 "explain_sql does not accept SHOW ENTITIES; use execute_sql_dispatch(...)"
281 }
282 (SqlSurface::Explain, SqlLaneKind::Query | SqlLaneKind::Explain) => {
283 "explain_sql requires EXPLAIN"
284 }
285 }
286}
287
288fn sql_statement_route_from_statement(statement: &SqlStatement) -> SqlStatementRoute {
290 match statement {
291 SqlStatement::Select(select) => SqlStatementRoute::Query {
292 entity: select.entity.clone(),
293 },
294 SqlStatement::Delete(delete) => SqlStatementRoute::Query {
295 entity: delete.entity.clone(),
296 },
297 SqlStatement::Explain(explain) => match &explain.statement {
298 SqlExplainTarget::Select(select) => SqlStatementRoute::Explain {
299 entity: select.entity.clone(),
300 },
301 SqlExplainTarget::Delete(delete) => SqlStatementRoute::Explain {
302 entity: delete.entity.clone(),
303 },
304 },
305 SqlStatement::Describe(describe) => SqlStatementRoute::Describe {
306 entity: describe.entity.clone(),
307 },
308 SqlStatement::ShowIndexes(show_indexes) => SqlStatementRoute::ShowIndexes {
309 entity: show_indexes.entity.clone(),
310 },
311 SqlStatement::ShowColumns(show_columns) => SqlStatementRoute::ShowColumns {
312 entity: show_columns.entity.clone(),
313 },
314 SqlStatement::ShowEntities(_) => SqlStatementRoute::ShowEntities,
315 }
316}
317
318fn resolve_sql_aggregate_target_slot_with_model(
321 model: &'static EntityModel,
322 field: &str,
323) -> Result<FieldSlot, QueryError> {
324 resolve_aggregate_target_field_slot(model, field)
325}
326
327fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
328 resolve_sql_aggregate_target_slot_with_model(E::MODEL, field)
329}
330
331fn sql_global_aggregate_terminal_to_expr_with_model(
334 model: &'static EntityModel,
335 terminal: &SqlGlobalAggregateTerminal,
336) -> Result<AggregateExpr, QueryError> {
337 match terminal {
338 SqlGlobalAggregateTerminal::CountRows => Ok(count()),
339 SqlGlobalAggregateTerminal::CountField(field) => {
340 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
341
342 Ok(count_by(field.as_str()))
343 }
344 SqlGlobalAggregateTerminal::SumField(field) => {
345 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
346
347 Ok(sum(field.as_str()))
348 }
349 SqlGlobalAggregateTerminal::AvgField(field) => {
350 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
351
352 Ok(avg(field.as_str()))
353 }
354 SqlGlobalAggregateTerminal::MinField(field) => {
355 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
356
357 Ok(min_by(field.as_str()))
358 }
359 SqlGlobalAggregateTerminal::MaxField(field) => {
360 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
361
362 Ok(max_by(field.as_str()))
363 }
364 }
365}
366
367fn projection_label_from_aggregate(aggregate: &AggregateExpr) -> String {
369 let kind = match aggregate.kind() {
370 AggregateKind::Count => "COUNT",
371 AggregateKind::Sum => "SUM",
372 AggregateKind::Avg => "AVG",
373 AggregateKind::Exists => "EXISTS",
374 AggregateKind::First => "FIRST",
375 AggregateKind::Last => "LAST",
376 AggregateKind::Min => "MIN",
377 AggregateKind::Max => "MAX",
378 };
379 let distinct = if aggregate.is_distinct() {
380 "DISTINCT "
381 } else {
382 ""
383 };
384
385 if let Some(field) = aggregate.target_field() {
386 return format!("{kind}({distinct}{field})");
387 }
388
389 format!("{kind}({distinct}*)")
390}
391
392fn projection_label_from_expr(expr: &Expr, ordinal: usize) -> String {
394 match expr {
395 Expr::Field(field) => field.as_str().to_string(),
396 Expr::Aggregate(aggregate) => projection_label_from_aggregate(aggregate),
397 Expr::Alias { name, .. } => name.as_str().to_string(),
398 Expr::Literal(_) | Expr::Unary { .. } | Expr::Binary { .. } => {
399 format!("expr_{ordinal}")
400 }
401 }
402}
403
404fn projection_labels_from_structural_query(
406 query: &StructuralQuery,
407) -> Result<Vec<String>, QueryError> {
408 let projection = query.build_plan()?.projection_spec(query.model());
409 Ok(projection_labels_from_projection_spec(&projection))
410}
411
412fn projection_labels_from_projection_spec(
415 projection: &crate::db::query::plan::expr::ProjectionSpec,
416) -> Vec<String> {
417 let mut labels = Vec::with_capacity(projection.len());
418
419 for (ordinal, field) in projection.fields().enumerate() {
420 match field {
421 ProjectionField::Scalar {
422 expr: _,
423 alias: Some(alias),
424 } => labels.push(alias.as_str().to_string()),
425 ProjectionField::Scalar { expr, alias: None } => {
426 labels.push(projection_label_from_expr(expr, ordinal));
427 }
428 }
429 }
430
431 labels
432}
433
434fn projection_labels_from_entity_model(model: &'static EntityModel) -> Vec<String> {
436 model
437 .fields
438 .iter()
439 .map(|field| field.name.to_string())
440 .collect()
441}
442
443fn sql_projection_rows_from_kernel_rows(rows: Vec<KernelRow>) -> Vec<Vec<Value>> {
446 rows.into_iter()
447 .map(|row| {
448 row.into_slots()
449 .into_iter()
450 .map(|value| value.unwrap_or(Value::Null))
451 .collect()
452 })
453 .collect()
454}
455
456impl LoweredSqlCommand {
457 #[inline(never)]
460 pub fn explain_for_model(&self, model: &'static EntityModel) -> Result<String, QueryError> {
461 let lane = session_sql_lane(self);
464 if lane != SqlLaneKind::Explain {
465 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
466 SqlSurface::Explain,
467 lane,
468 )));
469 }
470
471 if let Some(rendered) =
474 render_lowered_sql_explain_plan_or_json(self, model, MissingRowPolicy::Ignore)
475 .map_err(QueryError::from_sql_lowering_error)?
476 {
477 return Ok(rendered);
478 }
479
480 if let Some((mode, command)) = bind_lowered_sql_explain_global_aggregate_structural(
483 self,
484 model,
485 MissingRowPolicy::Ignore,
486 ) {
487 return explain_sql_global_aggregate_structural(mode, command);
488 }
489
490 Err(QueryError::unsupported_query(
491 "shared EXPLAIN dispatch could not classify the lowered SQL command shape",
492 ))
493 }
494}
495
496#[inline(never)]
499fn explain_sql_global_aggregate_structural(
500 mode: SqlExplainMode,
501 command: SqlGlobalAggregateCommandCore,
502) -> Result<String, QueryError> {
503 let model = command.query().model();
504
505 match mode {
506 SqlExplainMode::Plan => {
507 let _ = sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
508
509 Ok(command
510 .query()
511 .build_plan()?
512 .explain_with_model(model)
513 .render_text_canonical())
514 }
515 SqlExplainMode::Execution => {
516 let aggregate =
517 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
518 let plan = command.query().explain_aggregate_terminal(aggregate)?;
519
520 Ok(plan.execution_node_descriptor().render_text_tree())
521 }
522 SqlExplainMode::Json => {
523 let _ = sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
524
525 Ok(command
526 .query()
527 .build_plan()?
528 .explain_with_model(model)
529 .render_json_canonical())
530 }
531 }
532}
533
534impl<C: CanisterKind> DbSession<C> {
535 fn bind_sql_query_lane_from_parsed<E>(
538 parsed: &SqlParsedStatement,
539 ) -> Result<(LoweredSqlQuery, Query<E>), QueryError>
540 where
541 E: EntityKind<Canister = C>,
542 {
543 let lowered =
544 parsed.lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
545 let lane = session_sql_lane(&lowered);
546 let Some(query) = lowered.query().cloned() else {
547 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
548 SqlSurface::QueryFrom,
549 lane,
550 )));
551 };
552 let typed = bind_lowered_sql_query::<E>(query.clone(), MissingRowPolicy::Ignore)
553 .map_err(QueryError::from_sql_lowering_error)?;
554
555 Ok((query, typed))
556 }
557
558 fn execute_structural_sql_projection(
562 &self,
563 query: StructuralQuery,
564 authority: EntityAuthority,
565 ) -> Result<SqlProjectionPayload, QueryError> {
566 let columns = projection_labels_from_structural_query(&query)?;
567 let projected = execute_sql_projection_rows_for_canister(
568 &self.db,
569 self.debug,
570 authority,
571 query.build_plan()?,
572 )
573 .map_err(QueryError::execute)?;
574 let (rows, row_count) = projected.into_parts();
575
576 Ok(SqlProjectionPayload::new(columns, rows, row_count))
577 }
578
579 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
583 where
584 E: PersistedRow<Canister = C> + EntityValue,
585 {
586 let plan = query.plan()?.into_executable();
587 let deleted = self
588 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
589 .map_err(QueryError::execute)?;
590 let (rows, row_count) = deleted.into_parts();
591 let rows = sql_projection_rows_from_kernel_rows(rows);
592
593 Ok(SqlProjectionPayload::new(
594 projection_labels_from_entity_model(E::MODEL),
595 rows,
596 row_count,
597 )
598 .into_dispatch_result())
599 }
600
601 fn ensure_sql_query_grouping<E>(query: &Query<E>, grouped: bool) -> Result<(), QueryError>
604 where
605 E: EntityKind,
606 {
607 match (grouped, query.has_grouping()) {
608 (true, true) | (false, false) => Ok(()),
609 (false, true) => Err(QueryError::grouped_requires_execute_grouped()),
610 (true, false) => Err(QueryError::unsupported_query(
611 "execute_sql_grouped requires grouped SQL query intent",
612 )),
613 }
614 }
615
616 #[inline(never)]
619 fn execute_lowered_sql_dispatch_select_core(
620 &self,
621 select: &LoweredSelectShape,
622 authority: EntityAuthority,
623 ) -> Result<SqlDispatchResult, QueryError> {
624 let structural = apply_lowered_select_shape(
625 StructuralQuery::new(authority.model(), MissingRowPolicy::Ignore),
626 select.clone(),
627 )
628 .map_err(QueryError::from_sql_lowering_error)?;
629
630 self.execute_structural_sql_projection(structural, authority)
631 .map(SqlProjectionPayload::into_dispatch_result)
632 }
633
634 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
638 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
639 let route = sql_statement_route_from_statement(&statement);
640
641 Ok(SqlParsedStatement { statement, route })
642 }
643
644 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
649 let parsed = self.parse_sql_statement(sql)?;
650
651 Ok(parsed.route().clone())
652 }
653
654 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
659 where
660 E: EntityKind<Canister = C>,
661 {
662 let parsed = self.parse_sql_statement(sql)?;
663 let (_, query) = Self::bind_sql_query_lane_from_parsed::<E>(&parsed)?;
664
665 Ok(query)
666 }
667
668 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
670 where
671 E: PersistedRow<Canister = C> + EntityValue,
672 {
673 let query = self.query_from_sql::<E>(sql)?;
674 Self::ensure_sql_query_grouping(&query, false)?;
675
676 self.execute_query(&query)
677 }
678
679 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
684 where
685 E: PersistedRow<Canister = C> + EntityValue,
686 {
687 let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
688 .map_err(QueryError::from_sql_lowering_error)?;
689
690 match command.terminal() {
691 SqlGlobalAggregateTerminal::CountRows => self
692 .execute_load_query_with(command.query(), |load, plan| {
693 load.execute_scalar_terminal_request(
694 plan,
695 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
696 )?
697 .into_count()
698 })
699 .map(|count| Value::Uint(u64::from(count))),
700 SqlGlobalAggregateTerminal::CountField(field) => {
701 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
702 self.execute_load_query_with(command.query(), |load, plan| {
703 load.execute_scalar_projection_boundary(
704 plan,
705 target_slot,
706 ScalarProjectionBoundaryRequest::Values,
707 )?
708 .into_values()
709 })
710 .map(|values| {
711 let count = values
712 .into_iter()
713 .filter(|value| !matches!(value, Value::Null))
714 .count();
715 Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
716 })
717 }
718 SqlGlobalAggregateTerminal::SumField(field) => {
719 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
720 self.execute_load_query_with(command.query(), |load, plan| {
721 load.execute_numeric_field_boundary(
722 plan,
723 target_slot,
724 ScalarNumericFieldBoundaryRequest::Sum,
725 )
726 })
727 .map(|value| value.map_or(Value::Null, Value::Decimal))
728 }
729 SqlGlobalAggregateTerminal::AvgField(field) => {
730 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
731 self.execute_load_query_with(command.query(), |load, plan| {
732 load.execute_numeric_field_boundary(
733 plan,
734 target_slot,
735 ScalarNumericFieldBoundaryRequest::Avg,
736 )
737 })
738 .map(|value| value.map_or(Value::Null, Value::Decimal))
739 }
740 SqlGlobalAggregateTerminal::MinField(field) => {
741 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
742 let min_id = self.execute_load_query_with(command.query(), |load, plan| {
743 load.execute_scalar_terminal_request(
744 plan,
745 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
746 kind: AggregateKind::Min,
747 target_field: target_slot,
748 },
749 )?
750 .into_id()
751 })?;
752
753 match min_id {
754 Some(id) => self
755 .load::<E>()
756 .by_id(id)
757 .first_value_by(field)
758 .map(|value| value.unwrap_or(Value::Null)),
759 None => Ok(Value::Null),
760 }
761 }
762 SqlGlobalAggregateTerminal::MaxField(field) => {
763 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
764 let max_id = self.execute_load_query_with(command.query(), |load, plan| {
765 load.execute_scalar_terminal_request(
766 plan,
767 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
768 kind: AggregateKind::Max,
769 target_field: target_slot,
770 },
771 )?
772 .into_id()
773 })?;
774
775 match max_id {
776 Some(id) => self
777 .load::<E>()
778 .by_id(id)
779 .first_value_by(field)
780 .map(|value| value.unwrap_or(Value::Null)),
781 None => Ok(Value::Null),
782 }
783 }
784 }
785 }
786
787 pub fn execute_sql_grouped<E>(
789 &self,
790 sql: &str,
791 cursor_token: Option<&str>,
792 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
793 where
794 E: PersistedRow<Canister = C> + EntityValue,
795 {
796 let query = self.query_from_sql::<E>(sql)?;
797 Self::ensure_sql_query_grouping(&query, true)?;
798
799 self.execute_grouped(&query, cursor_token)
800 }
801
802 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
804 where
805 E: PersistedRow<Canister = C> + EntityValue,
806 {
807 let parsed = self.parse_sql_statement(sql)?;
808
809 self.execute_sql_dispatch_parsed::<E>(&parsed)
810 }
811
812 pub fn execute_sql_dispatch_parsed<E>(
814 &self,
815 parsed: &SqlParsedStatement,
816 ) -> Result<SqlDispatchResult, QueryError>
817 where
818 E: PersistedRow<Canister = C> + EntityValue,
819 {
820 match parsed.route() {
821 SqlStatementRoute::Query { .. } => {
822 let (query, typed_query) = Self::bind_sql_query_lane_from_parsed::<E>(parsed)?;
823
824 Self::ensure_sql_query_grouping(&typed_query, false)?;
825
826 match query {
827 LoweredSqlQuery::Select(select) => self
828 .execute_lowered_sql_dispatch_select_core(
829 &select,
830 EntityAuthority::for_type::<E>(),
831 ),
832 LoweredSqlQuery::Delete(_) => self.execute_typed_sql_delete(&typed_query),
833 }
834 }
835 SqlStatementRoute::Explain { .. } => {
836 let lowered = lower_sql_command_from_prepared_statement(
837 parsed.prepare(E::MODEL.name())?,
838 E::MODEL.primary_key.name,
839 )
840 .map_err(QueryError::from_sql_lowering_error)?;
841
842 lowered
843 .explain_for_model(E::MODEL)
844 .map(SqlDispatchResult::Explain)
845 }
846 SqlStatementRoute::Describe { .. } => {
847 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
848 }
849 SqlStatementRoute::ShowIndexes { .. } => {
850 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
851 }
852 SqlStatementRoute::ShowColumns { .. } => {
853 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
854 }
855 SqlStatementRoute::ShowEntities => {
856 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
857 }
858 }
859 }
860
861 fn execute_lowered_sql_dispatch_delete_core(
864 &self,
865 delete: &LoweredBaseQueryShape,
866 authority: EntityAuthority,
867 ) -> Result<SqlDispatchResult, QueryError> {
868 let structural = bind_lowered_sql_delete_query_structural(
869 authority.model(),
870 delete.clone(),
871 MissingRowPolicy::Ignore,
872 );
873 let deleted = execute_sql_delete_projection_for_canister(
874 &self.db,
875 authority,
876 structural.build_plan()?,
877 )
878 .map_err(QueryError::execute)?;
879 let (rows, row_count) = deleted.into_parts();
880 let rows = sql_projection_rows_from_kernel_rows(rows);
881
882 Ok(SqlProjectionPayload::new(
883 projection_labels_from_entity_model(authority.model()),
884 rows,
885 row_count,
886 )
887 .into_dispatch_result())
888 }
889
890 #[doc(hidden)]
892 pub fn execute_lowered_sql_dispatch_query_for_authority(
893 &self,
894 lowered: &LoweredSqlCommand,
895 authority: EntityAuthority,
896 ) -> Result<SqlDispatchResult, QueryError> {
897 self.execute_lowered_sql_dispatch_query_core(lowered, authority)
898 }
899
900 #[doc(hidden)]
906 pub fn execute_lowered_sql_dispatch_select_for_authority(
907 &self,
908 lowered: &LoweredSqlCommand,
909 authority: EntityAuthority,
910 ) -> Result<SqlDispatchResult, QueryError> {
911 let Some(query) = lowered.query() else {
912 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
913 SqlSurface::QueryFrom,
914 session_sql_lane(lowered),
915 )));
916 };
917
918 match query {
919 LoweredSqlQuery::Select(select) => {
920 self.execute_lowered_sql_dispatch_select_core(select, authority)
921 }
922 LoweredSqlQuery::Delete(_) => Err(QueryError::unsupported_query(
923 "generated SQL query dispatch requires lowered SELECT",
924 )),
925 }
926 }
927
928 fn execute_lowered_sql_dispatch_query_core(
931 &self,
932 lowered: &LoweredSqlCommand,
933 authority: EntityAuthority,
934 ) -> Result<SqlDispatchResult, QueryError> {
935 let Some(query) = lowered.query() else {
936 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
937 SqlSurface::QueryFrom,
938 session_sql_lane(lowered),
939 )));
940 };
941
942 match query {
943 LoweredSqlQuery::Select(select) => {
944 self.execute_lowered_sql_dispatch_select_core(select, authority)
945 }
946 LoweredSqlQuery::Delete(delete) => {
947 self.execute_lowered_sql_dispatch_delete_core(delete, authority)
948 }
949 }
950 }
951}