1use crate::{
2 db::{
3 DbSession, EntityFieldDescription, EntityResponse, EntitySchemaDescription,
4 MissingRowPolicy, PagedGroupedExecutionWithTrace, Query, QueryError,
5 executor::{
6 EntityAuthority, ScalarNumericFieldBoundaryRequest, ScalarProjectionBoundaryRequest,
7 execute_sql_projection_rows_for_canister,
8 },
9 query::{
10 builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
11 intent::{IntentError, StructuralQuery},
12 plan::{
13 AggregateKind, FieldSlot, QueryMode,
14 expr::{Expr, ProjectionField},
15 },
16 },
17 sql::lowering::{
18 LoweredSqlCommand, LoweredSqlLaneKind, LoweredSqlQuery,
19 PreparedSqlStatement as CorePreparedSqlStatement, SqlCommand,
20 SqlGlobalAggregateCommand, SqlGlobalAggregateTerminal, SqlLoweringError,
21 StructuralSqlGlobalAggregateCommand, bind_lowered_sql_command,
22 bind_lowered_sql_explain_global_aggregate_structural, bind_lowered_sql_query,
23 bind_lowered_sql_query_structural, compile_sql_command,
24 compile_sql_global_aggregate_command, lower_sql_command_from_prepared_statement,
25 lowered_sql_command_lane, prepare_sql_statement,
26 render_lowered_sql_explain_plan_or_json,
27 },
28 sql::parser::{SqlExplainMode, SqlExplainTarget, SqlStatement, parse_sql},
29 },
30 error::{ErrorClass, ErrorOrigin, InternalError},
31 model::EntityModel,
32 traits::{CanisterKind, EntityKind, EntityValue},
33 value::Value,
34};
35
/// Routing classification for one parsed SQL statement.
///
/// Every variant except [`SqlStatementRoute::ShowEntities`] carries the name of
/// the entity the statement refers to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SqlStatementRoute {
    /// A `SELECT` or `DELETE` statement against `entity`.
    Query { entity: String },
    /// An `EXPLAIN` statement wrapping a `SELECT` or `DELETE` against `entity`.
    Explain { entity: String },
    /// A `DESCRIBE` statement for `entity`.
    Describe { entity: String },
    /// A `SHOW INDEXES` statement for `entity`.
    ShowIndexes { entity: String },
    /// A `SHOW COLUMNS` statement for `entity`.
    ShowColumns { entity: String },
    /// A `SHOW ENTITIES` / `SHOW TABLES` statement; it names no entity.
    ShowEntities,
}
52
53#[derive(Debug)]
59pub enum SqlDispatchResult {
60 Projection {
61 columns: Vec<String>,
62 rows: Vec<Vec<Value>>,
63 row_count: u32,
64 },
65 Explain(String),
66 Describe(EntitySchemaDescription),
67 ShowIndexes(Vec<String>),
68 ShowColumns(Vec<EntityFieldDescription>),
69 ShowEntities(Vec<String>),
70}
71
72#[derive(Clone, Debug)]
80pub struct SqlParsedStatement {
81 statement: SqlStatement,
82 route: SqlStatementRoute,
83}
84
85impl SqlParsedStatement {
86 #[must_use]
88 pub const fn route(&self) -> &SqlStatementRoute {
89 &self.route
90 }
91}
92
93#[derive(Clone, Debug)]
102pub struct SqlPreparedStatement {
103 prepared: CorePreparedSqlStatement,
104}
105
106#[derive(Debug)]
116struct SqlProjectionPayload {
117 columns: Vec<String>,
118 rows: Vec<Vec<Value>>,
119 row_count: u32,
120}
121
122impl SqlProjectionPayload {
123 #[must_use]
124 const fn new(columns: Vec<String>, rows: Vec<Vec<Value>>, row_count: u32) -> Self {
125 Self {
126 columns,
127 rows,
128 row_count,
129 }
130 }
131
132 #[must_use]
133 fn into_dispatch_result(self) -> SqlDispatchResult {
134 SqlDispatchResult::Projection {
135 columns: self.columns,
136 rows: self.rows,
137 row_count: self.row_count,
138 }
139 }
140}
141
142impl SqlStatementRoute {
143 #[must_use]
148 pub const fn entity(&self) -> &str {
149 match self {
150 Self::Query { entity }
151 | Self::Explain { entity }
152 | Self::Describe { entity }
153 | Self::ShowIndexes { entity }
154 | Self::ShowColumns { entity } => entity.as_str(),
155 Self::ShowEntities => "",
156 }
157 }
158
159 #[must_use]
161 pub const fn is_explain(&self) -> bool {
162 matches!(self, Self::Explain { .. })
163 }
164
165 #[must_use]
167 pub const fn is_describe(&self) -> bool {
168 matches!(self, Self::Describe { .. })
169 }
170
171 #[must_use]
173 pub const fn is_show_indexes(&self) -> bool {
174 matches!(self, Self::ShowIndexes { .. })
175 }
176
177 #[must_use]
179 pub const fn is_show_columns(&self) -> bool {
180 matches!(self, Self::ShowColumns { .. })
181 }
182
183 #[must_use]
185 pub const fn is_show_entities(&self) -> bool {
186 matches!(self, Self::ShowEntities)
187 }
188}
189
/// Session-local classification of which SQL lane a command belongs to.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SqlLaneKind {
    /// Executable query statements.
    Query,
    /// `EXPLAIN` statements.
    Explain,
    /// `DESCRIBE` statements.
    Describe,
    /// `SHOW INDEXES` statements.
    ShowIndexes,
    /// `SHOW COLUMNS` statements.
    ShowColumns,
    /// `SHOW ENTITIES` statements.
    ShowEntities,
}
200
/// Public SQL entry surface used to pick the wording of lane-mismatch errors.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SqlSurface {
    /// The `query_from_sql` surface.
    QueryFrom,
    /// The `explain_sql` surface.
    Explain,
}
207
208const fn sql_command_lane<E: EntityKind>(command: &SqlCommand<E>) -> SqlLaneKind {
210 match command {
211 SqlCommand::Query(_) => SqlLaneKind::Query,
212 SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
213 SqlLaneKind::Explain
214 }
215 SqlCommand::DescribeEntity => SqlLaneKind::Describe,
216 SqlCommand::ShowIndexesEntity => SqlLaneKind::ShowIndexes,
217 SqlCommand::ShowColumnsEntity => SqlLaneKind::ShowColumns,
218 SqlCommand::ShowEntities => SqlLaneKind::ShowEntities,
219 }
220}
221
222const fn session_sql_lane(command: &LoweredSqlCommand) -> SqlLaneKind {
224 match lowered_sql_command_lane(command) {
225 LoweredSqlLaneKind::Query => SqlLaneKind::Query,
226 LoweredSqlLaneKind::Explain => SqlLaneKind::Explain,
227 LoweredSqlLaneKind::Describe => SqlLaneKind::Describe,
228 LoweredSqlLaneKind::ShowIndexes => SqlLaneKind::ShowIndexes,
229 LoweredSqlLaneKind::ShowColumns => SqlLaneKind::ShowColumns,
230 LoweredSqlLaneKind::ShowEntities => SqlLaneKind::ShowEntities,
231 }
232}
233
234const fn unsupported_sql_lane_message(surface: SqlSurface, lane: SqlLaneKind) -> &'static str {
236 match (surface, lane) {
237 (SqlSurface::QueryFrom, SqlLaneKind::Explain) => {
238 "query_from_sql does not accept EXPLAIN statements; use execute_sql_dispatch(...)"
239 }
240 (SqlSurface::QueryFrom, SqlLaneKind::Describe) => {
241 "query_from_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
242 }
243 (SqlSurface::QueryFrom, SqlLaneKind::ShowIndexes) => {
244 "query_from_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
245 }
246 (SqlSurface::QueryFrom, SqlLaneKind::ShowColumns) => {
247 "query_from_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
248 }
249 (SqlSurface::QueryFrom, SqlLaneKind::ShowEntities) => {
250 "query_from_sql does not accept SHOW ENTITIES/SHOW TABLES statements; use execute_sql_dispatch(...)"
251 }
252 (SqlSurface::QueryFrom, SqlLaneKind::Query) => {
253 "query_from_sql requires one executable SELECT or DELETE statement"
254 }
255 (SqlSurface::Explain, SqlLaneKind::Describe) => {
256 "explain_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
257 }
258 (SqlSurface::Explain, SqlLaneKind::ShowIndexes) => {
259 "explain_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
260 }
261 (SqlSurface::Explain, SqlLaneKind::ShowColumns) => {
262 "explain_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
263 }
264 (SqlSurface::Explain, SqlLaneKind::ShowEntities) => {
265 "explain_sql does not accept SHOW ENTITIES/SHOW TABLES statements; use execute_sql_dispatch(...)"
266 }
267 (SqlSurface::Explain, SqlLaneKind::Query | SqlLaneKind::Explain) => {
268 "explain_sql requires an EXPLAIN statement"
269 }
270 }
271}
272
273fn unsupported_sql_lane_error(surface: SqlSurface, lane: SqlLaneKind) -> QueryError {
275 QueryError::execute(InternalError::classified(
276 ErrorClass::Unsupported,
277 ErrorOrigin::Query,
278 unsupported_sql_lane_message(surface, lane),
279 ))
280}
281
282fn compile_sql_command_ignore<E: EntityKind>(sql: &str) -> Result<SqlCommand<E>, QueryError> {
284 compile_sql_command::<E>(sql, MissingRowPolicy::Ignore).map_err(map_sql_lowering_error)
285}
286
287fn map_sql_lowering_error(err: SqlLoweringError) -> QueryError {
289 match err {
290 SqlLoweringError::Query(err) => err,
291 SqlLoweringError::Parse(crate::db::sql::parser::SqlParseError::UnsupportedFeature {
292 feature,
293 }) => QueryError::execute(InternalError::query_unsupported_sql_feature(feature)),
294 other => QueryError::execute(InternalError::classified(
295 ErrorClass::Unsupported,
296 ErrorOrigin::Query,
297 format!("SQL query is not executable in this release: {other}"),
298 )),
299 }
300}
301
302fn map_sql_parse_error(err: crate::db::sql::parser::SqlParseError) -> QueryError {
305 map_sql_lowering_error(SqlLoweringError::Parse(err))
306}
307
308fn sql_statement_route_from_statement(statement: &SqlStatement) -> SqlStatementRoute {
310 match statement {
311 SqlStatement::Select(select) => SqlStatementRoute::Query {
312 entity: select.entity.clone(),
313 },
314 SqlStatement::Delete(delete) => SqlStatementRoute::Query {
315 entity: delete.entity.clone(),
316 },
317 SqlStatement::Explain(explain) => match &explain.statement {
318 SqlExplainTarget::Select(select) => SqlStatementRoute::Explain {
319 entity: select.entity.clone(),
320 },
321 SqlExplainTarget::Delete(delete) => SqlStatementRoute::Explain {
322 entity: delete.entity.clone(),
323 },
324 },
325 SqlStatement::Describe(describe) => SqlStatementRoute::Describe {
326 entity: describe.entity.clone(),
327 },
328 SqlStatement::ShowIndexes(show_indexes) => SqlStatementRoute::ShowIndexes {
329 entity: show_indexes.entity.clone(),
330 },
331 SqlStatement::ShowColumns(show_columns) => SqlStatementRoute::ShowColumns {
332 entity: show_columns.entity.clone(),
333 },
334 SqlStatement::ShowEntities(_) => SqlStatementRoute::ShowEntities,
335 }
336}
337
338fn resolve_sql_aggregate_target_slot_with_model(
341 model: &'static EntityModel,
342 field: &str,
343) -> Result<FieldSlot, QueryError> {
344 FieldSlot::resolve(model, field).ok_or_else(|| {
345 QueryError::execute(crate::db::error::executor_unsupported(format!(
346 "unknown aggregate target field: {field}",
347 )))
348 })
349}
350
351fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
352 resolve_sql_aggregate_target_slot_with_model(E::MODEL, field)
353}
354
355fn sql_global_aggregate_terminal_to_expr_with_model(
358 model: &'static EntityModel,
359 terminal: &SqlGlobalAggregateTerminal,
360) -> Result<AggregateExpr, QueryError> {
361 match terminal {
362 SqlGlobalAggregateTerminal::CountRows => Ok(count()),
363 SqlGlobalAggregateTerminal::CountField(field) => {
364 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
365
366 Ok(count_by(field.as_str()))
367 }
368 SqlGlobalAggregateTerminal::SumField(field) => {
369 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
370
371 Ok(sum(field.as_str()))
372 }
373 SqlGlobalAggregateTerminal::AvgField(field) => {
374 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
375
376 Ok(avg(field.as_str()))
377 }
378 SqlGlobalAggregateTerminal::MinField(field) => {
379 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
380
381 Ok(min_by(field.as_str()))
382 }
383 SqlGlobalAggregateTerminal::MaxField(field) => {
384 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
385
386 Ok(max_by(field.as_str()))
387 }
388 }
389}
390
391fn sql_global_aggregate_terminal_to_expr<E: EntityKind>(
392 terminal: &SqlGlobalAggregateTerminal,
393) -> Result<AggregateExpr, QueryError> {
394 sql_global_aggregate_terminal_to_expr_with_model(E::MODEL, terminal)
395}
396
397fn projection_label_from_aggregate(aggregate: &AggregateExpr) -> String {
399 let kind = match aggregate.kind() {
400 AggregateKind::Count => "COUNT",
401 AggregateKind::Sum => "SUM",
402 AggregateKind::Avg => "AVG",
403 AggregateKind::Exists => "EXISTS",
404 AggregateKind::First => "FIRST",
405 AggregateKind::Last => "LAST",
406 AggregateKind::Min => "MIN",
407 AggregateKind::Max => "MAX",
408 };
409 let distinct = if aggregate.is_distinct() {
410 "DISTINCT "
411 } else {
412 ""
413 };
414
415 if let Some(field) = aggregate.target_field() {
416 return format!("{kind}({distinct}{field})");
417 }
418
419 format!("{kind}({distinct}*)")
420}
421
422fn projection_label_from_expr(expr: &Expr, ordinal: usize) -> String {
424 match expr {
425 Expr::Field(field) => field.as_str().to_string(),
426 Expr::Aggregate(aggregate) => projection_label_from_aggregate(aggregate),
427 Expr::Alias { name, .. } => name.as_str().to_string(),
428 Expr::Literal(_) | Expr::Unary { .. } | Expr::Binary { .. } => {
429 format!("expr_{ordinal}")
430 }
431 }
432}
433
434fn projection_labels_from_query<E: EntityKind>(
436 query: &Query<E>,
437) -> Result<Vec<String>, QueryError> {
438 let projection = query.plan()?.projection_spec();
439 Ok(projection_labels_from_projection_spec(&projection))
440}
441
442fn projection_labels_from_structural_query(
444 query: &StructuralQuery,
445) -> Result<Vec<String>, QueryError> {
446 let projection = query.build_plan()?.projection_spec(query.model());
447 Ok(projection_labels_from_projection_spec(&projection))
448}
449
450fn projection_labels_from_projection_spec(
453 projection: &crate::db::query::plan::expr::ProjectionSpec,
454) -> Vec<String> {
455 let mut labels = Vec::with_capacity(projection.len());
456
457 for (ordinal, field) in projection.fields().enumerate() {
458 match field {
459 ProjectionField::Scalar {
460 expr: _,
461 alias: Some(alias),
462 } => labels.push(alias.as_str().to_string()),
463 ProjectionField::Scalar { expr, alias: None } => {
464 labels.push(projection_label_from_expr(expr, ordinal));
465 }
466 }
467 }
468
469 labels
470}
471
472fn projection_labels_from_entity_model<E: EntityKind>() -> Vec<String> {
474 E::MODEL
475 .fields
476 .iter()
477 .map(|field| field.name.to_string())
478 .collect()
479}
480
481fn projection_payload_from_entity_response<E>(
484 response: crate::db::EntityResponse<E>,
485) -> SqlProjectionPayload
486where
487 E: EntityKind + EntityValue,
488{
489 let row_count = response.count();
490 let rows = response
491 .rows()
492 .into_iter()
493 .map(|row| {
494 let (_, entity) = row.into_parts();
495
496 (0..E::MODEL.fields.len())
497 .map(|index| entity.get_value_by_index(index).unwrap_or(Value::Null))
498 .collect::<Vec<_>>()
499 })
500 .collect::<Vec<_>>();
501
502 SqlProjectionPayload::new(projection_labels_from_entity_model::<E>(), rows, row_count)
503}
504
505impl<C: CanisterKind> DbSession<C> {
506 fn execute_structural_sql_projection(
510 &self,
511 query: StructuralQuery,
512 authority: EntityAuthority,
513 ) -> Result<SqlProjectionPayload, QueryError> {
514 let columns = projection_labels_from_structural_query(&query)?;
515 let projected = execute_sql_projection_rows_for_canister(
516 &self.db,
517 self.debug,
518 authority,
519 query.build_plan()?,
520 )
521 .map_err(QueryError::execute)?;
522 let (rows, row_count) = projected.into_parts();
523
524 Ok(SqlProjectionPayload::new(columns, rows, row_count))
525 }
526
527 fn execute_typed_sql_projection<E>(
530 &self,
531 query: &Query<E>,
532 ) -> Result<SqlProjectionPayload, QueryError>
533 where
534 E: EntityKind<Canister = C> + EntityValue,
535 {
536 let columns = projection_labels_from_query(query)?;
537 let projected = execute_sql_projection_rows_for_canister(
538 &self.db,
539 self.debug,
540 EntityAuthority::for_type::<E>(),
541 query.plan()?.into_inner(),
542 )
543 .map_err(QueryError::execute)?;
544 let (rows, row_count) = projected.into_parts();
545
546 Ok(SqlProjectionPayload::new(columns, rows, row_count))
547 }
548
549 fn execute_sql_dispatch_query_from_command<E>(
551 &self,
552 command: SqlCommand<E>,
553 ) -> Result<SqlDispatchResult, QueryError>
554 where
555 E: EntityKind<Canister = C> + EntityValue,
556 {
557 match command {
558 SqlCommand::Query(query) => {
559 if query.has_grouping() {
560 return Err(QueryError::Intent(
561 IntentError::GroupedRequiresExecuteGrouped,
562 ));
563 }
564
565 match query.mode() {
566 QueryMode::Load(_) => self
567 .execute_typed_sql_projection(&query)
568 .map(SqlProjectionPayload::into_dispatch_result),
569 QueryMode::Delete(_) => self.execute_typed_sql_delete(&query),
570 }
571 }
572 SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => Err(
573 unsupported_sql_lane_error(SqlSurface::QueryFrom, SqlLaneKind::Explain),
574 ),
575 SqlCommand::DescribeEntity
576 | SqlCommand::ShowIndexesEntity
577 | SqlCommand::ShowColumnsEntity
578 | SqlCommand::ShowEntities => Err(QueryError::execute(InternalError::classified(
579 ErrorClass::Unsupported,
580 ErrorOrigin::Query,
581 "query-lane SQL dispatch only accepts SELECT, DELETE, and EXPLAIN statements",
582 ))),
583 }
584 }
585
586 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
588 where
589 E: EntityKind<Canister = C> + EntityValue,
590 {
591 let deleted = self.execute_query(query)?;
592
593 Ok(projection_payload_from_entity_response(deleted).into_dispatch_result())
594 }
595
596 fn execute_sql_dispatch_explain_from_command<E>(
598 command: SqlCommand<E>,
599 ) -> Result<String, QueryError>
600 where
601 E: EntityKind<Canister = C> + EntityValue,
602 {
603 Self::explain_sql_from_command::<E>(command, SqlLaneKind::Explain)
604 }
605
606 fn explain_sql_from_command<E>(
608 command: SqlCommand<E>,
609 lane: SqlLaneKind,
610 ) -> Result<String, QueryError>
611 where
612 E: EntityKind<Canister = C> + EntityValue,
613 {
614 match command {
615 SqlCommand::Query(_)
616 | SqlCommand::DescribeEntity
617 | SqlCommand::ShowIndexesEntity
618 | SqlCommand::ShowColumnsEntity
619 | SqlCommand::ShowEntities => {
620 Err(unsupported_sql_lane_error(SqlSurface::Explain, lane))
621 }
622 SqlCommand::Explain { mode, query } => match mode {
623 SqlExplainMode::Plan => Ok(query.explain()?.render_text_canonical()),
624 SqlExplainMode::Execution => query.explain_execution_text(),
625 SqlExplainMode::Json => Ok(query.explain()?.render_json_canonical()),
626 },
627 SqlCommand::ExplainGlobalAggregate { mode, command } => {
628 Self::explain_sql_global_aggregate::<E>(mode, command)
629 }
630 }
631 }
632
633 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
637 let statement = parse_sql(sql).map_err(map_sql_parse_error)?;
638 let route = sql_statement_route_from_statement(&statement);
639
640 Ok(SqlParsedStatement { statement, route })
641 }
642
643 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
648 let parsed = self.parse_sql_statement(sql)?;
649
650 Ok(parsed.route().clone())
651 }
652
653 pub fn prepare_sql_dispatch_parsed(
658 &self,
659 parsed: &SqlParsedStatement,
660 expected_entity: &'static str,
661 ) -> Result<SqlPreparedStatement, QueryError> {
662 let prepared = prepare_sql_statement(parsed.statement.clone(), expected_entity)
663 .map_err(map_sql_lowering_error)?;
664
665 Ok(SqlPreparedStatement { prepared })
666 }
667
668 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
673 where
674 E: EntityKind<Canister = C>,
675 {
676 let command = compile_sql_command_ignore::<E>(sql)?;
677 let lane = sql_command_lane(&command);
678
679 match command {
680 SqlCommand::Query(query) => Ok(query),
681 SqlCommand::Explain { .. }
682 | SqlCommand::ExplainGlobalAggregate { .. }
683 | SqlCommand::DescribeEntity
684 | SqlCommand::ShowIndexesEntity
685 | SqlCommand::ShowColumnsEntity
686 | SqlCommand::ShowEntities => {
687 Err(unsupported_sql_lane_error(SqlSurface::QueryFrom, lane))
688 }
689 }
690 }
691
692 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
694 where
695 E: EntityKind<Canister = C> + EntityValue,
696 {
697 let query = self.query_from_sql::<E>(sql)?;
698 if query.has_grouping() {
699 return Err(QueryError::Intent(
700 IntentError::GroupedRequiresExecuteGrouped,
701 ));
702 }
703
704 self.execute_query(&query)
705 }
706
707 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
712 where
713 E: EntityKind<Canister = C> + EntityValue,
714 {
715 let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
716 .map_err(map_sql_lowering_error)?;
717
718 match command.terminal() {
719 SqlGlobalAggregateTerminal::CountRows => self
720 .execute_load_query_with(command.query(), |load, plan| {
721 load.execute_scalar_terminal_request(
722 plan,
723 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
724 )?
725 .into_count()
726 })
727 .map(|count| Value::Uint(u64::from(count))),
728 SqlGlobalAggregateTerminal::CountField(field) => {
729 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
730 self.execute_load_query_with(command.query(), |load, plan| {
731 load.execute_scalar_projection_boundary(
732 plan,
733 target_slot,
734 ScalarProjectionBoundaryRequest::Values,
735 )?
736 .into_values()
737 })
738 .map(|values| {
739 let count = values
740 .into_iter()
741 .filter(|value| !matches!(value, Value::Null))
742 .count();
743 Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
744 })
745 }
746 SqlGlobalAggregateTerminal::SumField(field) => {
747 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
748 self.execute_load_query_with(command.query(), |load, plan| {
749 load.execute_numeric_field_boundary(
750 plan,
751 target_slot,
752 ScalarNumericFieldBoundaryRequest::Sum,
753 )
754 })
755 .map(|value| value.map_or(Value::Null, Value::Decimal))
756 }
757 SqlGlobalAggregateTerminal::AvgField(field) => {
758 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
759 self.execute_load_query_with(command.query(), |load, plan| {
760 load.execute_numeric_field_boundary(
761 plan,
762 target_slot,
763 ScalarNumericFieldBoundaryRequest::Avg,
764 )
765 })
766 .map(|value| value.map_or(Value::Null, Value::Decimal))
767 }
768 SqlGlobalAggregateTerminal::MinField(field) => {
769 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
770 let min_id = self.execute_load_query_with(command.query(), |load, plan| {
771 load.execute_scalar_terminal_request(
772 plan,
773 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
774 kind: AggregateKind::Min,
775 target_field: target_slot,
776 },
777 )?
778 .into_id()
779 })?;
780
781 match min_id {
782 Some(id) => self
783 .load::<E>()
784 .by_id(id)
785 .first_value_by(field)
786 .map(|value| value.unwrap_or(Value::Null)),
787 None => Ok(Value::Null),
788 }
789 }
790 SqlGlobalAggregateTerminal::MaxField(field) => {
791 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
792 let max_id = self.execute_load_query_with(command.query(), |load, plan| {
793 load.execute_scalar_terminal_request(
794 plan,
795 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
796 kind: AggregateKind::Max,
797 target_field: target_slot,
798 },
799 )?
800 .into_id()
801 })?;
802
803 match max_id {
804 Some(id) => self
805 .load::<E>()
806 .by_id(id)
807 .first_value_by(field)
808 .map(|value| value.unwrap_or(Value::Null)),
809 None => Ok(Value::Null),
810 }
811 }
812 }
813 }
814
815 pub fn execute_sql_grouped<E>(
817 &self,
818 sql: &str,
819 cursor_token: Option<&str>,
820 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
821 where
822 E: EntityKind<Canister = C> + EntityValue,
823 {
824 let query = self.query_from_sql::<E>(sql)?;
825 if !query.has_grouping() {
826 return Err(QueryError::execute(InternalError::classified(
827 ErrorClass::Unsupported,
828 ErrorOrigin::Query,
829 "execute_sql_grouped requires grouped SQL query intent",
830 )));
831 }
832
833 self.execute_grouped(&query, cursor_token)
834 }
835
836 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
838 where
839 E: EntityKind<Canister = C> + EntityValue,
840 {
841 let parsed = self.parse_sql_statement(sql)?;
842
843 self.execute_sql_dispatch_parsed::<E>(&parsed)
844 }
845
846 pub fn execute_sql_dispatch_parsed<E>(
848 &self,
849 parsed: &SqlParsedStatement,
850 ) -> Result<SqlDispatchResult, QueryError>
851 where
852 E: EntityKind<Canister = C> + EntityValue,
853 {
854 let prepared = self.prepare_sql_dispatch_parsed(parsed, E::MODEL.entity_name())?;
855
856 self.execute_sql_dispatch_prepared::<E>(&prepared)
857 }
858
859 pub fn execute_sql_dispatch_prepared<E>(
861 &self,
862 prepared: &SqlPreparedStatement,
863 ) -> Result<SqlDispatchResult, QueryError>
864 where
865 E: EntityKind<Canister = C> + EntityValue,
866 {
867 let lowered = lower_sql_command_from_prepared_statement(
868 prepared.prepared.clone(),
869 E::MODEL.primary_key.name,
870 )
871 .map_err(map_sql_lowering_error)?;
872 let command = bind_lowered_sql_command::<E>(lowered, MissingRowPolicy::Ignore)
873 .map_err(map_sql_lowering_error)?;
874
875 match command {
876 SqlCommand::Query(_) => self.execute_sql_dispatch_query_from_command::<E>(command),
877 SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
878 Self::execute_sql_dispatch_explain_from_command::<E>(command)
879 .map(SqlDispatchResult::Explain)
880 }
881 SqlCommand::DescribeEntity => {
882 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
883 }
884 SqlCommand::ShowIndexesEntity => {
885 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
886 }
887 SqlCommand::ShowColumnsEntity => {
888 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
889 }
890 SqlCommand::ShowEntities => Ok(SqlDispatchResult::ShowEntities(self.show_entities())),
891 }
892 }
893
894 pub fn execute_sql_dispatch_query_lane_prepared<E>(
896 &self,
897 prepared: &SqlPreparedStatement,
898 ) -> Result<SqlDispatchResult, QueryError>
899 where
900 E: EntityKind<Canister = C> + EntityValue,
901 {
902 let lowered =
903 self.lower_sql_dispatch_query_lane_prepared(prepared, E::MODEL.primary_key.name)?;
904 let lane = session_sql_lane(&lowered);
905
906 match lane {
907 SqlLaneKind::Query => self.execute_lowered_sql_dispatch_query::<E>(&lowered),
908 SqlLaneKind::Explain => self
909 .explain_lowered_sql_dispatch::<E>(&lowered)
910 .map(SqlDispatchResult::Explain),
911 SqlLaneKind::Describe
912 | SqlLaneKind::ShowIndexes
913 | SqlLaneKind::ShowColumns
914 | SqlLaneKind::ShowEntities => Err(QueryError::execute(InternalError::classified(
915 ErrorClass::Unsupported,
916 ErrorOrigin::Query,
917 "query-lane SQL dispatch only accepts SELECT, DELETE, and EXPLAIN statements",
918 ))),
919 }
920 }
921
922 pub fn lower_sql_dispatch_query_lane_prepared(
924 &self,
925 prepared: &SqlPreparedStatement,
926 primary_key_field: &str,
927 ) -> Result<LoweredSqlCommand, QueryError> {
928 let lowered =
929 lower_sql_command_from_prepared_statement(prepared.prepared.clone(), primary_key_field)
930 .map_err(map_sql_lowering_error)?;
931 let lane = lowered_sql_command_lane(&lowered);
932
933 match lane {
934 LoweredSqlLaneKind::Query | LoweredSqlLaneKind::Explain => Ok(lowered),
935 LoweredSqlLaneKind::Describe
936 | LoweredSqlLaneKind::ShowIndexes
937 | LoweredSqlLaneKind::ShowColumns
938 | LoweredSqlLaneKind::ShowEntities => {
939 Err(QueryError::execute(InternalError::classified(
940 ErrorClass::Unsupported,
941 ErrorOrigin::Query,
942 "query-lane SQL dispatch only accepts SELECT, DELETE, and EXPLAIN statements",
943 )))
944 }
945 }
946 }
947
948 pub fn execute_lowered_sql_dispatch_query<E>(
950 &self,
951 lowered: &LoweredSqlCommand,
952 ) -> Result<SqlDispatchResult, QueryError>
953 where
954 E: EntityKind<Canister = C> + EntityValue,
955 {
956 self.execute_lowered_sql_dispatch_query_core(
957 lowered,
958 E::MODEL,
959 EntityAuthority::for_type::<E>(),
960 Self::execute_lowered_sql_dispatch_delete::<E>,
961 )
962 }
963
964 fn execute_lowered_sql_dispatch_delete<E>(
966 &self,
967 lowered: &LoweredSqlCommand,
968 ) -> Result<SqlDispatchResult, QueryError>
969 where
970 E: EntityKind<Canister = C> + EntityValue,
971 {
972 let Some(query) = lowered.query().cloned() else {
973 return Err(QueryError::execute(InternalError::classified(
974 ErrorClass::Unsupported,
975 ErrorOrigin::Query,
976 "lowered SQL delete dispatch requires one lowered delete query command",
977 )));
978 };
979 let LoweredSqlQuery::Delete(_) = query else {
980 return Err(QueryError::execute(InternalError::classified(
981 ErrorClass::Unsupported,
982 ErrorOrigin::Query,
983 "lowered SQL delete dispatch requires one lowered DELETE command",
984 )));
985 };
986
987 let query = bind_lowered_sql_query::<E>(query, MissingRowPolicy::Ignore)
988 .map_err(map_sql_lowering_error)?;
989
990 self.execute_typed_sql_delete(&query)
991 }
992
993 fn execute_lowered_sql_dispatch_query_core(
996 &self,
997 lowered: &LoweredSqlCommand,
998 model: &'static EntityModel,
999 authority: EntityAuthority,
1000 execute_delete: fn(&Self, &LoweredSqlCommand) -> Result<SqlDispatchResult, QueryError>,
1001 ) -> Result<SqlDispatchResult, QueryError> {
1002 let lane = session_sql_lane(lowered);
1003 if lane != SqlLaneKind::Query {
1004 return Err(unsupported_sql_lane_error(SqlSurface::QueryFrom, lane));
1005 }
1006
1007 let Some(query) = lowered.query().cloned() else {
1008 return Err(QueryError::execute(InternalError::classified(
1009 ErrorClass::Unsupported,
1010 ErrorOrigin::Query,
1011 "lowered SQL query dispatch requires one lowered query command",
1012 )));
1013 };
1014
1015 match query {
1016 LoweredSqlQuery::Select(_) => {
1017 let structural =
1018 bind_lowered_sql_query_structural(model, query, MissingRowPolicy::Ignore)
1019 .map_err(map_sql_lowering_error)?;
1020
1021 self.execute_structural_sql_projection(structural, authority)
1022 .map(SqlProjectionPayload::into_dispatch_result)
1023 }
1024 LoweredSqlQuery::Delete(_) => execute_delete(self, lowered),
1025 }
1026 }
1027
1028 pub fn explain_lowered_sql_dispatch<E>(
1030 &self,
1031 lowered: &LoweredSqlCommand,
1032 ) -> Result<String, QueryError>
1033 where
1034 E: EntityKind<Canister = C> + EntityValue,
1035 {
1036 Self::explain_lowered_sql_dispatch_core(lowered, E::MODEL)
1037 }
1038
1039 fn explain_lowered_sql_dispatch_core(
1042 lowered: &LoweredSqlCommand,
1043 model: &'static EntityModel,
1044 ) -> Result<String, QueryError> {
1045 let lane = session_sql_lane(lowered);
1048 if lane != SqlLaneKind::Explain {
1049 return Err(unsupported_sql_lane_error(SqlSurface::Explain, lane));
1050 }
1051
1052 if let Some(rendered) =
1055 render_lowered_sql_explain_plan_or_json(lowered, model, MissingRowPolicy::Ignore)
1056 .map_err(map_sql_lowering_error)?
1057 {
1058 return Ok(rendered);
1059 }
1060
1061 if let Some((mode, command)) = bind_lowered_sql_explain_global_aggregate_structural(
1064 lowered,
1065 model,
1066 MissingRowPolicy::Ignore,
1067 ) {
1068 return Self::explain_sql_global_aggregate_structural(mode, command);
1069 }
1070
1071 Err(QueryError::execute(InternalError::classified(
1072 ErrorClass::Unsupported,
1073 ErrorOrigin::Query,
1074 "shared EXPLAIN dispatch could not classify the lowered SQL command shape",
1075 )))
1076 }
1077
1078 fn explain_sql_global_aggregate<E>(
1080 mode: SqlExplainMode,
1081 command: SqlGlobalAggregateCommand<E>,
1082 ) -> Result<String, QueryError>
1083 where
1084 E: EntityKind<Canister = C> + EntityValue,
1085 {
1086 match mode {
1087 SqlExplainMode::Plan => {
1088 let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
1091
1092 Ok(command.query().explain()?.render_text_canonical())
1093 }
1094 SqlExplainMode::Execution => {
1095 let aggregate = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
1096 let plan = Self::explain_load_query_terminal_with(command.query(), aggregate)?;
1097
1098 Ok(plan.execution_node_descriptor().render_text_tree())
1099 }
1100 SqlExplainMode::Json => {
1101 let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
1104
1105 Ok(command.query().explain()?.render_json_canonical())
1106 }
1107 }
1108 }
1109
1110 fn explain_sql_global_aggregate_structural(
1113 mode: SqlExplainMode,
1114 command: StructuralSqlGlobalAggregateCommand,
1115 ) -> Result<String, QueryError> {
1116 let model = command.query().model();
1117
1118 match mode {
1119 SqlExplainMode::Plan => {
1120 let _ =
1121 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
1122
1123 Ok(command
1124 .query()
1125 .build_plan()?
1126 .explain_with_model(model)
1127 .render_text_canonical())
1128 }
1129 SqlExplainMode::Execution => {
1130 let aggregate =
1131 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
1132 let plan = command.query().explain_aggregate_terminal(aggregate)?;
1133
1134 Ok(plan.execution_node_descriptor().render_text_tree())
1135 }
1136 SqlExplainMode::Json => {
1137 let _ =
1138 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
1139
1140 Ok(command
1141 .query()
1142 .build_plan()?
1143 .explain_with_model(model)
1144 .render_json_canonical())
1145 }
1146 }
1147 }
1148}