1use crate::{
2 db::{
3 DbSession, EntityFieldDescription, EntityResponse, EntitySchemaDescription,
4 MissingRowPolicy, PagedGroupedExecutionWithTrace, PersistedRow, Query, QueryError,
5 executor::{
6 EntityAuthority, KernelRow, ScalarNumericFieldBoundaryRequest,
7 ScalarProjectionBoundaryRequest, execute_sql_delete_projection_for_canister,
8 execute_sql_projection_rows_for_canister,
9 },
10 query::{
11 builder::aggregate::{AggregateExpr, avg, count, count_by, max_by, min_by, sum},
12 intent::StructuralQuery,
13 plan::{
14 AggregateKind, FieldSlot, QueryMode,
15 expr::{Expr, ProjectionField},
16 resolve_aggregate_target_field_slot,
17 },
18 },
19 sql::lowering::{
20 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlCommand, LoweredSqlLaneKind,
21 LoweredSqlQuery, PreparedSqlStatement as CorePreparedSqlStatement, SqlCommand,
22 SqlGlobalAggregateCommand, SqlGlobalAggregateTerminal,
23 StructuralSqlGlobalAggregateCommand, apply_lowered_select_shape,
24 bind_lowered_sql_command, bind_lowered_sql_delete_query_structural,
25 bind_lowered_sql_explain_global_aggregate_structural, compile_sql_command,
26 compile_sql_global_aggregate_command, lower_sql_command_from_prepared_statement,
27 lowered_sql_command_lane, prepare_sql_statement,
28 render_lowered_sql_explain_plan_or_json,
29 },
30 sql::parser::{SqlExplainMode, SqlExplainTarget, SqlStatement, parse_sql},
31 },
32 model::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue},
34 value::Value,
35};
36
/// Routing classification for one parsed SQL statement.
///
/// Every variant except [`SqlStatementRoute::ShowEntities`] carries the entity
/// name copied from the parsed statement.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum SqlStatementRoute {
    /// A `SELECT` or `DELETE` statement against `entity`.
    Query { entity: String },
    /// An `EXPLAIN` wrapping a `SELECT` or `DELETE` against `entity`.
    Explain { entity: String },
    /// A `DESCRIBE` statement for `entity`.
    Describe { entity: String },
    /// A `SHOW INDEXES` statement for `entity`.
    ShowIndexes { entity: String },
    /// A `SHOW COLUMNS` statement for `entity`.
    ShowColumns { entity: String },
    /// A `SHOW ENTITIES` statement; it names no entity.
    ShowEntities,
}
53
/// Result payload returned by the SQL dispatch entry points, one variant per
/// statement lane.
#[derive(Debug)]
pub enum SqlDispatchResult {
    /// Row-shaped output from a `SELECT` or `DELETE` statement.
    Projection {
        /// Column labels for the values in each row.
        columns: Vec<String>,
        /// Row values, one inner vector per row.
        rows: Vec<Vec<Value>>,
        /// Row count reported by the executor alongside `rows`.
        row_count: u32,
    },
    /// Rendered `EXPLAIN` output.
    Explain(String),
    /// Schema description produced for `DESCRIBE`.
    Describe(EntitySchemaDescription),
    /// Output of `SHOW INDEXES`.
    ShowIndexes(Vec<String>),
    /// Field descriptions produced for `SHOW COLUMNS`.
    ShowColumns(Vec<EntityFieldDescription>),
    /// Output of `SHOW ENTITIES`.
    ShowEntities(Vec<String>),
}
72
/// One parsed SQL statement paired with the route derived from it.
///
/// Built by `DbSession::parse_sql_statement`; both fields stay private so the
/// route always matches the statement it was computed from.
#[derive(Clone, Debug)]
pub struct SqlParsedStatement {
    // Parsed statement as returned by the SQL parser.
    statement: SqlStatement,
    // Routing classification computed from `statement`.
    route: SqlStatementRoute,
}
85
impl SqlParsedStatement {
    /// Borrow the route that was derived when this statement was parsed.
    #[must_use]
    pub const fn route(&self) -> &SqlStatementRoute {
        &self.route
    }
}
93
/// Opaque wrapper around the core prepared SQL statement.
///
/// Built by `DbSession::prepare_sql_dispatch_parsed` and consumed by the
/// `*_prepared` dispatch entry points.
#[derive(Clone, Debug)]
pub struct SqlPreparedStatement {
    // Prepared statement produced by the lowering layer.
    prepared: CorePreparedSqlStatement,
}
106
/// Internal row-shaped payload that is converted into
/// `SqlDispatchResult::Projection`.
#[derive(Debug)]
struct SqlProjectionPayload {
    // Column labels for the values in each row.
    columns: Vec<String>,
    // Row values, one inner vector per row.
    rows: Vec<Vec<Value>>,
    // Row count reported by the executor alongside `rows`.
    row_count: u32,
}
122
123impl SqlProjectionPayload {
124 #[must_use]
125 const fn new(columns: Vec<String>, rows: Vec<Vec<Value>>, row_count: u32) -> Self {
126 Self {
127 columns,
128 rows,
129 row_count,
130 }
131 }
132
133 #[must_use]
134 fn into_dispatch_result(self) -> SqlDispatchResult {
135 SqlDispatchResult::Projection {
136 columns: self.columns,
137 rows: self.rows,
138 row_count: self.row_count,
139 }
140 }
141}
142
143impl SqlStatementRoute {
144 #[must_use]
149 pub const fn entity(&self) -> &str {
150 match self {
151 Self::Query { entity }
152 | Self::Explain { entity }
153 | Self::Describe { entity }
154 | Self::ShowIndexes { entity }
155 | Self::ShowColumns { entity } => entity.as_str(),
156 Self::ShowEntities => "",
157 }
158 }
159
160 #[must_use]
162 pub const fn is_explain(&self) -> bool {
163 matches!(self, Self::Explain { .. })
164 }
165
166 #[must_use]
168 pub const fn is_describe(&self) -> bool {
169 matches!(self, Self::Describe { .. })
170 }
171
172 #[must_use]
174 pub const fn is_show_indexes(&self) -> bool {
175 matches!(self, Self::ShowIndexes { .. })
176 }
177
178 #[must_use]
180 pub const fn is_show_columns(&self) -> bool {
181 matches!(self, Self::ShowColumns { .. })
182 }
183
184 #[must_use]
186 pub const fn is_show_entities(&self) -> bool {
187 matches!(self, Self::ShowEntities)
188 }
189}
190
/// Session-local lane classification shared by typed and lowered SQL
/// commands; used to pick the matching unsupported-lane message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SqlLaneKind {
    /// Executable query command.
    Query,
    /// `EXPLAIN` command (including global-aggregate explain).
    Explain,
    /// `DESCRIBE` command.
    Describe,
    /// `SHOW INDEXES` command.
    ShowIndexes,
    /// `SHOW COLUMNS` command.
    ShowColumns,
    /// `SHOW ENTITIES` command.
    ShowEntities,
}
201
/// API surface named at the start of an unsupported-lane error message.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum SqlSurface {
    /// Messages are phrased for `query_from_sql`.
    QueryFrom,
    /// Messages are phrased for `explain_sql`.
    Explain,
}
208
209const fn sql_command_lane<E: EntityKind>(command: &SqlCommand<E>) -> SqlLaneKind {
211 match command {
212 SqlCommand::Query(_) => SqlLaneKind::Query,
213 SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
214 SqlLaneKind::Explain
215 }
216 SqlCommand::DescribeEntity => SqlLaneKind::Describe,
217 SqlCommand::ShowIndexesEntity => SqlLaneKind::ShowIndexes,
218 SqlCommand::ShowColumnsEntity => SqlLaneKind::ShowColumns,
219 SqlCommand::ShowEntities => SqlLaneKind::ShowEntities,
220 }
221}
222
223const fn session_sql_lane(command: &LoweredSqlCommand) -> SqlLaneKind {
225 match lowered_sql_command_lane(command) {
226 LoweredSqlLaneKind::Query => SqlLaneKind::Query,
227 LoweredSqlLaneKind::Explain => SqlLaneKind::Explain,
228 LoweredSqlLaneKind::Describe => SqlLaneKind::Describe,
229 LoweredSqlLaneKind::ShowIndexes => SqlLaneKind::ShowIndexes,
230 LoweredSqlLaneKind::ShowColumns => SqlLaneKind::ShowColumns,
231 LoweredSqlLaneKind::ShowEntities => SqlLaneKind::ShowEntities,
232 }
233}
234
235const fn unsupported_sql_lane_message(surface: SqlSurface, lane: SqlLaneKind) -> &'static str {
237 match (surface, lane) {
238 (SqlSurface::QueryFrom, SqlLaneKind::Explain) => {
239 "query_from_sql does not accept EXPLAIN statements; use execute_sql_dispatch(...)"
240 }
241 (SqlSurface::QueryFrom, SqlLaneKind::Describe) => {
242 "query_from_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
243 }
244 (SqlSurface::QueryFrom, SqlLaneKind::ShowIndexes) => {
245 "query_from_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
246 }
247 (SqlSurface::QueryFrom, SqlLaneKind::ShowColumns) => {
248 "query_from_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
249 }
250 (SqlSurface::QueryFrom, SqlLaneKind::ShowEntities) => {
251 "query_from_sql does not accept SHOW ENTITIES/SHOW TABLES statements; use execute_sql_dispatch(...)"
252 }
253 (SqlSurface::QueryFrom, SqlLaneKind::Query) => {
254 "query_from_sql requires one executable SELECT or DELETE statement"
255 }
256 (SqlSurface::Explain, SqlLaneKind::Describe) => {
257 "explain_sql does not accept DESCRIBE statements; use execute_sql_dispatch(...)"
258 }
259 (SqlSurface::Explain, SqlLaneKind::ShowIndexes) => {
260 "explain_sql does not accept SHOW INDEXES statements; use execute_sql_dispatch(...)"
261 }
262 (SqlSurface::Explain, SqlLaneKind::ShowColumns) => {
263 "explain_sql does not accept SHOW COLUMNS statements; use execute_sql_dispatch(...)"
264 }
265 (SqlSurface::Explain, SqlLaneKind::ShowEntities) => {
266 "explain_sql does not accept SHOW ENTITIES/SHOW TABLES statements; use execute_sql_dispatch(...)"
267 }
268 (SqlSurface::Explain, SqlLaneKind::Query | SqlLaneKind::Explain) => {
269 "explain_sql requires an EXPLAIN statement"
270 }
271 }
272}
273
274fn compile_sql_command_ignore<E: EntityKind>(sql: &str) -> Result<SqlCommand<E>, QueryError> {
276 compile_sql_command::<E>(sql, MissingRowPolicy::Ignore)
277 .map_err(QueryError::from_sql_lowering_error)
278}
279
280fn lower_prepared_sql_command(
283 prepared: &SqlPreparedStatement,
284 primary_key_field: &str,
285) -> Result<LoweredSqlCommand, QueryError> {
286 lower_sql_command_from_prepared_statement(prepared.prepared.clone(), primary_key_field)
287 .map_err(QueryError::from_sql_lowering_error)
288}
289
290fn bind_prepared_sql_command<E>(
293 prepared: &SqlPreparedStatement,
294 primary_key_field: &str,
295) -> Result<SqlCommand<E>, QueryError>
296where
297 E: EntityKind,
298{
299 let lowered = lower_prepared_sql_command(prepared, primary_key_field)?;
300
301 bind_lowered_sql_command::<E>(lowered, MissingRowPolicy::Ignore)
302 .map_err(QueryError::from_sql_lowering_error)
303}
304
305fn sql_statement_route_from_statement(statement: &SqlStatement) -> SqlStatementRoute {
307 match statement {
308 SqlStatement::Select(select) => SqlStatementRoute::Query {
309 entity: select.entity.clone(),
310 },
311 SqlStatement::Delete(delete) => SqlStatementRoute::Query {
312 entity: delete.entity.clone(),
313 },
314 SqlStatement::Explain(explain) => match &explain.statement {
315 SqlExplainTarget::Select(select) => SqlStatementRoute::Explain {
316 entity: select.entity.clone(),
317 },
318 SqlExplainTarget::Delete(delete) => SqlStatementRoute::Explain {
319 entity: delete.entity.clone(),
320 },
321 },
322 SqlStatement::Describe(describe) => SqlStatementRoute::Describe {
323 entity: describe.entity.clone(),
324 },
325 SqlStatement::ShowIndexes(show_indexes) => SqlStatementRoute::ShowIndexes {
326 entity: show_indexes.entity.clone(),
327 },
328 SqlStatement::ShowColumns(show_columns) => SqlStatementRoute::ShowColumns {
329 entity: show_columns.entity.clone(),
330 },
331 SqlStatement::ShowEntities(_) => SqlStatementRoute::ShowEntities,
332 }
333}
334
/// Resolve the aggregate target `field` to its slot on `model`.
///
/// Thin wrapper over `resolve_aggregate_target_field_slot`, so model-driven
/// SQL paths share one resolution entry point.
fn resolve_sql_aggregate_target_slot_with_model(
    model: &'static EntityModel,
    field: &str,
) -> Result<FieldSlot, QueryError> {
    resolve_aggregate_target_field_slot(model, field)
}
343
/// Resolve the aggregate target `field` to its slot on `E::MODEL`.
fn resolve_sql_aggregate_target_slot<E: EntityKind>(field: &str) -> Result<FieldSlot, QueryError> {
    resolve_sql_aggregate_target_slot_with_model(E::MODEL, field)
}
347
348fn sql_global_aggregate_terminal_to_expr_with_model(
351 model: &'static EntityModel,
352 terminal: &SqlGlobalAggregateTerminal,
353) -> Result<AggregateExpr, QueryError> {
354 match terminal {
355 SqlGlobalAggregateTerminal::CountRows => Ok(count()),
356 SqlGlobalAggregateTerminal::CountField(field) => {
357 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
358
359 Ok(count_by(field.as_str()))
360 }
361 SqlGlobalAggregateTerminal::SumField(field) => {
362 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
363
364 Ok(sum(field.as_str()))
365 }
366 SqlGlobalAggregateTerminal::AvgField(field) => {
367 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
368
369 Ok(avg(field.as_str()))
370 }
371 SqlGlobalAggregateTerminal::MinField(field) => {
372 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
373
374 Ok(min_by(field.as_str()))
375 }
376 SqlGlobalAggregateTerminal::MaxField(field) => {
377 let _ = resolve_sql_aggregate_target_slot_with_model(model, field)?;
378
379 Ok(max_by(field.as_str()))
380 }
381 }
382}
383
/// Convert one SQL global-aggregate terminal into an aggregate expression,
/// validating any target field against `E::MODEL`.
fn sql_global_aggregate_terminal_to_expr<E: EntityKind>(
    terminal: &SqlGlobalAggregateTerminal,
) -> Result<AggregateExpr, QueryError> {
    sql_global_aggregate_terminal_to_expr_with_model(E::MODEL, terminal)
}
389
390fn projection_label_from_aggregate(aggregate: &AggregateExpr) -> String {
392 let kind = match aggregate.kind() {
393 AggregateKind::Count => "COUNT",
394 AggregateKind::Sum => "SUM",
395 AggregateKind::Avg => "AVG",
396 AggregateKind::Exists => "EXISTS",
397 AggregateKind::First => "FIRST",
398 AggregateKind::Last => "LAST",
399 AggregateKind::Min => "MIN",
400 AggregateKind::Max => "MAX",
401 };
402 let distinct = if aggregate.is_distinct() {
403 "DISTINCT "
404 } else {
405 ""
406 };
407
408 if let Some(field) = aggregate.target_field() {
409 return format!("{kind}({distinct}{field})");
410 }
411
412 format!("{kind}({distinct}*)")
413}
414
415fn projection_label_from_expr(expr: &Expr, ordinal: usize) -> String {
417 match expr {
418 Expr::Field(field) => field.as_str().to_string(),
419 Expr::Aggregate(aggregate) => projection_label_from_aggregate(aggregate),
420 Expr::Alias { name, .. } => name.as_str().to_string(),
421 Expr::Literal(_) | Expr::Unary { .. } | Expr::Binary { .. } => {
422 format!("expr_{ordinal}")
423 }
424 }
425}
426
427fn projection_labels_from_query<E: EntityKind>(
429 query: &Query<E>,
430) -> Result<Vec<String>, QueryError> {
431 let projection = query.plan()?.projection_spec();
432 Ok(projection_labels_from_projection_spec(&projection))
433}
434
435fn projection_labels_from_structural_query(
437 query: &StructuralQuery,
438) -> Result<Vec<String>, QueryError> {
439 let projection = query.build_plan()?.projection_spec(query.model());
440 Ok(projection_labels_from_projection_spec(&projection))
441}
442
443fn projection_labels_from_projection_spec(
446 projection: &crate::db::query::plan::expr::ProjectionSpec,
447) -> Vec<String> {
448 let mut labels = Vec::with_capacity(projection.len());
449
450 for (ordinal, field) in projection.fields().enumerate() {
451 match field {
452 ProjectionField::Scalar {
453 expr: _,
454 alias: Some(alias),
455 } => labels.push(alias.as_str().to_string()),
456 ProjectionField::Scalar { expr, alias: None } => {
457 labels.push(projection_label_from_expr(expr, ordinal));
458 }
459 }
460 }
461
462 labels
463}
464
465fn projection_labels_from_entity_model(model: &'static EntityModel) -> Vec<String> {
467 model
468 .fields
469 .iter()
470 .map(|field| field.name.to_string())
471 .collect()
472}
473
474fn sql_projection_rows_from_kernel_rows(rows: Vec<KernelRow>) -> Vec<Vec<Value>> {
477 rows.into_iter()
478 .map(|row| {
479 row.into_slots()
480 .into_iter()
481 .map(|value| value.unwrap_or(Value::Null))
482 .collect()
483 })
484 .collect()
485}
486
487impl<C: CanisterKind> DbSession<C> {
488 fn execute_structural_sql_projection(
492 &self,
493 query: StructuralQuery,
494 authority: EntityAuthority,
495 ) -> Result<SqlProjectionPayload, QueryError> {
496 let columns = projection_labels_from_structural_query(&query)?;
497 let projected = execute_sql_projection_rows_for_canister(
498 &self.db,
499 self.debug,
500 authority,
501 query.build_plan()?,
502 )
503 .map_err(QueryError::execute)?;
504 let (rows, row_count) = projected.into_parts();
505
506 Ok(SqlProjectionPayload::new(columns, rows, row_count))
507 }
508
509 fn execute_typed_sql_projection<E>(
512 &self,
513 query: &Query<E>,
514 ) -> Result<SqlProjectionPayload, QueryError>
515 where
516 E: PersistedRow<Canister = C> + EntityValue,
517 {
518 let columns = projection_labels_from_query(query)?;
519 let projected = execute_sql_projection_rows_for_canister(
520 &self.db,
521 self.debug,
522 EntityAuthority::for_type::<E>(),
523 query.plan()?.into_inner(),
524 )
525 .map_err(QueryError::execute)?;
526 let (rows, row_count) = projected.into_parts();
527
528 Ok(SqlProjectionPayload::new(columns, rows, row_count))
529 }
530
531 fn execute_sql_dispatch_query_from_command<E>(
533 &self,
534 command: SqlCommand<E>,
535 ) -> Result<SqlDispatchResult, QueryError>
536 where
537 E: PersistedRow<Canister = C> + EntityValue,
538 {
539 match command {
540 SqlCommand::Query(query) => {
541 Self::ensure_sql_query_grouping(&query, false)?;
542
543 match query.mode() {
544 QueryMode::Load(_) => self
545 .execute_typed_sql_projection(&query)
546 .map(SqlProjectionPayload::into_dispatch_result),
547 QueryMode::Delete(_) => self.execute_typed_sql_delete(&query),
548 }
549 }
550 SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
551 Err(QueryError::unsupported_query(unsupported_sql_lane_message(
552 SqlSurface::QueryFrom,
553 SqlLaneKind::Explain,
554 )))
555 }
556 SqlCommand::DescribeEntity
557 | SqlCommand::ShowIndexesEntity
558 | SqlCommand::ShowColumnsEntity
559 | SqlCommand::ShowEntities => Err(QueryError::unsupported_query_lane_dispatch()),
560 }
561 }
562
563 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
566 where
567 E: PersistedRow<Canister = C> + EntityValue,
568 {
569 let plan = query.plan()?.into_executable();
570 let deleted = self
571 .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
572 .map_err(QueryError::execute)?;
573 let (rows, row_count) = deleted.into_parts();
574 let rows = sql_projection_rows_from_kernel_rows(rows);
575
576 Ok(SqlProjectionPayload::new(
577 projection_labels_from_entity_model(E::MODEL),
578 rows,
579 row_count,
580 )
581 .into_dispatch_result())
582 }
583
584 fn ensure_sql_query_grouping<E>(query: &Query<E>, grouped: bool) -> Result<(), QueryError>
587 where
588 E: EntityKind,
589 {
590 match (grouped, query.has_grouping()) {
591 (true, true) | (false, false) => Ok(()),
592 (false, true) => Err(QueryError::grouped_requires_execute_grouped()),
593 (true, false) => Err(QueryError::unsupported_query(
594 "execute_sql_grouped requires grouped SQL query intent",
595 )),
596 }
597 }
598
599 fn execute_lowered_sql_dispatch_select_core(
602 &self,
603 select: &LoweredSelectShape,
604 authority: EntityAuthority,
605 ) -> Result<SqlDispatchResult, QueryError> {
606 let structural = apply_lowered_select_shape(
607 StructuralQuery::new(authority.model(), MissingRowPolicy::Ignore),
608 select.clone(),
609 )
610 .map_err(QueryError::from_sql_lowering_error)?;
611
612 self.execute_structural_sql_projection(structural, authority)
613 .map(SqlProjectionPayload::into_dispatch_result)
614 }
615
616 fn execute_lowered_sql_dispatch_query_lane_for_authority(
619 &self,
620 lowered: &LoweredSqlCommand,
621 authority: EntityAuthority,
622 ) -> Result<SqlDispatchResult, QueryError> {
623 match session_sql_lane(lowered) {
624 SqlLaneKind::Query => {
625 self.execute_lowered_sql_dispatch_query_for_authority(lowered, authority)
626 }
627 SqlLaneKind::Explain => self
628 .explain_lowered_sql_dispatch_for_model(lowered, authority.model())
629 .map(SqlDispatchResult::Explain),
630 SqlLaneKind::Describe
631 | SqlLaneKind::ShowIndexes
632 | SqlLaneKind::ShowColumns
633 | SqlLaneKind::ShowEntities => Err(QueryError::invariant(
634 "query-lane SQL lowering admitted unsupported non-query lane",
635 )),
636 }
637 }
638
    /// Render `EXPLAIN` output for a typed command on the dispatch surface.
    ///
    /// Delegates to `explain_sql_from_command`, always reporting the lane as
    /// `SqlLaneKind::Explain`.
    fn execute_sql_dispatch_explain_from_command<E>(
        command: SqlCommand<E>,
    ) -> Result<String, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        Self::explain_sql_from_command::<E>(command, SqlLaneKind::Explain)
    }
648
649 fn explain_sql_from_command<E>(
651 command: SqlCommand<E>,
652 lane: SqlLaneKind,
653 ) -> Result<String, QueryError>
654 where
655 E: PersistedRow<Canister = C> + EntityValue,
656 {
657 match command {
658 SqlCommand::Query(_)
659 | SqlCommand::DescribeEntity
660 | SqlCommand::ShowIndexesEntity
661 | SqlCommand::ShowColumnsEntity
662 | SqlCommand::ShowEntities => Err(QueryError::unsupported_query(
663 unsupported_sql_lane_message(SqlSurface::Explain, lane),
664 )),
665 SqlCommand::Explain { mode, query } => match mode {
666 SqlExplainMode::Plan => Ok(query.explain()?.render_text_canonical()),
667 SqlExplainMode::Execution => query.explain_execution_text(),
668 SqlExplainMode::Json => Ok(query.explain()?.render_json_canonical()),
669 },
670 SqlCommand::ExplainGlobalAggregate { mode, command } => {
671 Self::explain_sql_global_aggregate::<E>(mode, command)
672 }
673 }
674 }
675
676 pub fn parse_sql_statement(&self, sql: &str) -> Result<SqlParsedStatement, QueryError> {
680 let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;
681 let route = sql_statement_route_from_statement(&statement);
682
683 Ok(SqlParsedStatement { statement, route })
684 }
685
686 pub fn sql_statement_route(&self, sql: &str) -> Result<SqlStatementRoute, QueryError> {
691 let parsed = self.parse_sql_statement(sql)?;
692
693 Ok(parsed.route().clone())
694 }
695
696 pub fn prepare_sql_dispatch_parsed(
701 &self,
702 parsed: &SqlParsedStatement,
703 expected_entity: &'static str,
704 ) -> Result<SqlPreparedStatement, QueryError> {
705 let prepared = prepare_sql_statement(parsed.statement.clone(), expected_entity)
706 .map_err(QueryError::from_sql_lowering_error)?;
707
708 Ok(SqlPreparedStatement { prepared })
709 }
710
711 pub fn query_from_sql<E>(&self, sql: &str) -> Result<Query<E>, QueryError>
716 where
717 E: EntityKind<Canister = C>,
718 {
719 let command = compile_sql_command_ignore::<E>(sql)?;
720 let lane = sql_command_lane(&command);
721
722 match command {
723 SqlCommand::Query(query) => Ok(query),
724 SqlCommand::Explain { .. }
725 | SqlCommand::ExplainGlobalAggregate { .. }
726 | SqlCommand::DescribeEntity
727 | SqlCommand::ShowIndexesEntity
728 | SqlCommand::ShowColumnsEntity
729 | SqlCommand::ShowEntities => Err(QueryError::unsupported_query(
730 unsupported_sql_lane_message(SqlSurface::QueryFrom, lane),
731 )),
732 }
733 }
734
735 pub fn execute_sql<E>(&self, sql: &str) -> Result<EntityResponse<E>, QueryError>
737 where
738 E: PersistedRow<Canister = C> + EntityValue,
739 {
740 let query = self.query_from_sql::<E>(sql)?;
741 Self::ensure_sql_query_grouping(&query, false)?;
742
743 self.execute_query(&query)
744 }
745
746 pub fn execute_sql_aggregate<E>(&self, sql: &str) -> Result<Value, QueryError>
751 where
752 E: PersistedRow<Canister = C> + EntityValue,
753 {
754 let command = compile_sql_global_aggregate_command::<E>(sql, MissingRowPolicy::Ignore)
755 .map_err(QueryError::from_sql_lowering_error)?;
756
757 match command.terminal() {
758 SqlGlobalAggregateTerminal::CountRows => self
759 .execute_load_query_with(command.query(), |load, plan| {
760 load.execute_scalar_terminal_request(
761 plan,
762 crate::db::executor::ScalarTerminalBoundaryRequest::Count,
763 )?
764 .into_count()
765 })
766 .map(|count| Value::Uint(u64::from(count))),
767 SqlGlobalAggregateTerminal::CountField(field) => {
768 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
769 self.execute_load_query_with(command.query(), |load, plan| {
770 load.execute_scalar_projection_boundary(
771 plan,
772 target_slot,
773 ScalarProjectionBoundaryRequest::Values,
774 )?
775 .into_values()
776 })
777 .map(|values| {
778 let count = values
779 .into_iter()
780 .filter(|value| !matches!(value, Value::Null))
781 .count();
782 Value::Uint(u64::try_from(count).unwrap_or(u64::MAX))
783 })
784 }
785 SqlGlobalAggregateTerminal::SumField(field) => {
786 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
787 self.execute_load_query_with(command.query(), |load, plan| {
788 load.execute_numeric_field_boundary(
789 plan,
790 target_slot,
791 ScalarNumericFieldBoundaryRequest::Sum,
792 )
793 })
794 .map(|value| value.map_or(Value::Null, Value::Decimal))
795 }
796 SqlGlobalAggregateTerminal::AvgField(field) => {
797 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
798 self.execute_load_query_with(command.query(), |load, plan| {
799 load.execute_numeric_field_boundary(
800 plan,
801 target_slot,
802 ScalarNumericFieldBoundaryRequest::Avg,
803 )
804 })
805 .map(|value| value.map_or(Value::Null, Value::Decimal))
806 }
807 SqlGlobalAggregateTerminal::MinField(field) => {
808 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
809 let min_id = self.execute_load_query_with(command.query(), |load, plan| {
810 load.execute_scalar_terminal_request(
811 plan,
812 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
813 kind: AggregateKind::Min,
814 target_field: target_slot,
815 },
816 )?
817 .into_id()
818 })?;
819
820 match min_id {
821 Some(id) => self
822 .load::<E>()
823 .by_id(id)
824 .first_value_by(field)
825 .map(|value| value.unwrap_or(Value::Null)),
826 None => Ok(Value::Null),
827 }
828 }
829 SqlGlobalAggregateTerminal::MaxField(field) => {
830 let target_slot = resolve_sql_aggregate_target_slot::<E>(field)?;
831 let max_id = self.execute_load_query_with(command.query(), |load, plan| {
832 load.execute_scalar_terminal_request(
833 plan,
834 crate::db::executor::ScalarTerminalBoundaryRequest::IdBySlot {
835 kind: AggregateKind::Max,
836 target_field: target_slot,
837 },
838 )?
839 .into_id()
840 })?;
841
842 match max_id {
843 Some(id) => self
844 .load::<E>()
845 .by_id(id)
846 .first_value_by(field)
847 .map(|value| value.unwrap_or(Value::Null)),
848 None => Ok(Value::Null),
849 }
850 }
851 }
852 }
853
854 pub fn execute_sql_grouped<E>(
856 &self,
857 sql: &str,
858 cursor_token: Option<&str>,
859 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
860 where
861 E: PersistedRow<Canister = C> + EntityValue,
862 {
863 let query = self.query_from_sql::<E>(sql)?;
864 Self::ensure_sql_query_grouping(&query, true)?;
865
866 self.execute_grouped(&query, cursor_token)
867 }
868
869 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
871 where
872 E: PersistedRow<Canister = C> + EntityValue,
873 {
874 let parsed = self.parse_sql_statement(sql)?;
875
876 self.execute_sql_dispatch_parsed::<E>(&parsed)
877 }
878
879 pub fn execute_sql_dispatch_parsed<E>(
881 &self,
882 parsed: &SqlParsedStatement,
883 ) -> Result<SqlDispatchResult, QueryError>
884 where
885 E: PersistedRow<Canister = C> + EntityValue,
886 {
887 let prepared = self.prepare_sql_dispatch_parsed(parsed, E::MODEL.name())?;
888
889 self.execute_sql_dispatch_prepared::<E>(&prepared)
890 }
891
892 pub fn execute_sql_dispatch_prepared<E>(
894 &self,
895 prepared: &SqlPreparedStatement,
896 ) -> Result<SqlDispatchResult, QueryError>
897 where
898 E: PersistedRow<Canister = C> + EntityValue,
899 {
900 let command = bind_prepared_sql_command::<E>(prepared, E::MODEL.primary_key.name)?;
901
902 match command {
903 SqlCommand::Query(_) => self.execute_sql_dispatch_query_from_command::<E>(command),
904 SqlCommand::Explain { .. } | SqlCommand::ExplainGlobalAggregate { .. } => {
905 Self::execute_sql_dispatch_explain_from_command::<E>(command)
906 .map(SqlDispatchResult::Explain)
907 }
908 SqlCommand::DescribeEntity => {
909 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
910 }
911 SqlCommand::ShowIndexesEntity => {
912 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
913 }
914 SqlCommand::ShowColumnsEntity => {
915 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
916 }
917 SqlCommand::ShowEntities => Ok(SqlDispatchResult::ShowEntities(self.show_entities())),
918 }
919 }
920
921 pub fn execute_sql_dispatch_query_lane_prepared<E>(
923 &self,
924 prepared: &SqlPreparedStatement,
925 ) -> Result<SqlDispatchResult, QueryError>
926 where
927 E: PersistedRow<Canister = C> + EntityValue,
928 {
929 let lowered =
930 self.lower_sql_dispatch_query_lane_prepared(prepared, E::MODEL.primary_key.name)?;
931
932 self.execute_lowered_sql_dispatch_query_lane_for_authority(
933 &lowered,
934 EntityAuthority::for_type::<E>(),
935 )
936 }
937
938 pub fn lower_sql_dispatch_query_lane_prepared(
940 &self,
941 prepared: &SqlPreparedStatement,
942 primary_key_field: &str,
943 ) -> Result<LoweredSqlCommand, QueryError> {
944 let lowered = lower_prepared_sql_command(prepared, primary_key_field)?;
945 let lane = lowered_sql_command_lane(&lowered);
946
947 match lane {
948 LoweredSqlLaneKind::Query | LoweredSqlLaneKind::Explain => Ok(lowered),
949 LoweredSqlLaneKind::Describe
950 | LoweredSqlLaneKind::ShowIndexes
951 | LoweredSqlLaneKind::ShowColumns
952 | LoweredSqlLaneKind::ShowEntities => {
953 Err(QueryError::unsupported_query_lane_dispatch())
954 }
955 }
956 }
957
958 fn execute_lowered_sql_dispatch_delete_core(
961 &self,
962 delete: &LoweredBaseQueryShape,
963 authority: EntityAuthority,
964 ) -> Result<SqlDispatchResult, QueryError> {
965 let structural = bind_lowered_sql_delete_query_structural(
966 authority.model(),
967 delete.clone(),
968 MissingRowPolicy::Ignore,
969 );
970 let deleted = execute_sql_delete_projection_for_canister(
971 &self.db,
972 authority,
973 structural.build_plan()?,
974 )
975 .map_err(QueryError::execute)?;
976 let (rows, row_count) = deleted.into_parts();
977 let rows = sql_projection_rows_from_kernel_rows(rows);
978
979 Ok(SqlProjectionPayload::new(
980 projection_labels_from_entity_model(authority.model()),
981 rows,
982 row_count,
983 )
984 .into_dispatch_result())
985 }
986
    /// Execute a lowered `SELECT` or `DELETE` command under `authority`.
    ///
    /// Public but hidden from docs; forwards unchanged to the private core
    /// implementation.
    #[doc(hidden)]
    pub fn execute_lowered_sql_dispatch_query_for_authority(
        &self,
        lowered: &LoweredSqlCommand,
        authority: EntityAuthority,
    ) -> Result<SqlDispatchResult, QueryError> {
        self.execute_lowered_sql_dispatch_query_core(lowered, authority)
    }
996
997 fn execute_lowered_sql_dispatch_query_core(
1000 &self,
1001 lowered: &LoweredSqlCommand,
1002 authority: EntityAuthority,
1003 ) -> Result<SqlDispatchResult, QueryError> {
1004 let Some(query) = lowered.query() else {
1005 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
1006 SqlSurface::QueryFrom,
1007 session_sql_lane(lowered),
1008 )));
1009 };
1010
1011 match query {
1012 LoweredSqlQuery::Select(select) => {
1013 self.execute_lowered_sql_dispatch_select_core(select, authority)
1014 }
1015 LoweredSqlQuery::Delete(delete) => {
1016 self.execute_lowered_sql_dispatch_delete_core(delete, authority)
1017 }
1018 }
1019 }
1020
    /// Render `EXPLAIN` output for a lowered command against `model`.
    ///
    /// Public but hidden from docs; forwards unchanged to the private core
    /// implementation.
    #[doc(hidden)]
    pub fn explain_lowered_sql_dispatch_for_model(
        &self,
        lowered: &LoweredSqlCommand,
        model: &'static EntityModel,
    ) -> Result<String, QueryError> {
        Self::explain_lowered_sql_dispatch_core(lowered, model)
    }
1030
1031 fn explain_lowered_sql_dispatch_core(
1034 lowered: &LoweredSqlCommand,
1035 model: &'static EntityModel,
1036 ) -> Result<String, QueryError> {
1037 let lane = session_sql_lane(lowered);
1040 if lane != SqlLaneKind::Explain {
1041 return Err(QueryError::unsupported_query(unsupported_sql_lane_message(
1042 SqlSurface::Explain,
1043 lane,
1044 )));
1045 }
1046
1047 if let Some(rendered) =
1050 render_lowered_sql_explain_plan_or_json(lowered, model, MissingRowPolicy::Ignore)
1051 .map_err(QueryError::from_sql_lowering_error)?
1052 {
1053 return Ok(rendered);
1054 }
1055
1056 if let Some((mode, command)) = bind_lowered_sql_explain_global_aggregate_structural(
1059 lowered,
1060 model,
1061 MissingRowPolicy::Ignore,
1062 ) {
1063 return Self::explain_sql_global_aggregate_structural(mode, command);
1064 }
1065
1066 Err(QueryError::unsupported_query(
1067 "shared EXPLAIN dispatch could not classify the lowered SQL command shape",
1068 ))
1069 }
1070
1071 fn explain_sql_global_aggregate<E>(
1073 mode: SqlExplainMode,
1074 command: SqlGlobalAggregateCommand<E>,
1075 ) -> Result<String, QueryError>
1076 where
1077 E: PersistedRow<Canister = C> + EntityValue,
1078 {
1079 match mode {
1080 SqlExplainMode::Plan => {
1081 let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
1084
1085 Ok(command.query().explain()?.render_text_canonical())
1086 }
1087 SqlExplainMode::Execution => {
1088 let aggregate = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
1089 let plan = Self::explain_load_query_terminal_with(command.query(), aggregate)?;
1090
1091 Ok(plan.execution_node_descriptor().render_text_tree())
1092 }
1093 SqlExplainMode::Json => {
1094 let _ = sql_global_aggregate_terminal_to_expr::<E>(command.terminal())?;
1097
1098 Ok(command.query().explain()?.render_json_canonical())
1099 }
1100 }
1101 }
1102
1103 fn explain_sql_global_aggregate_structural(
1106 mode: SqlExplainMode,
1107 command: StructuralSqlGlobalAggregateCommand,
1108 ) -> Result<String, QueryError> {
1109 let model = command.query().model();
1110
1111 match mode {
1112 SqlExplainMode::Plan => {
1113 let _ =
1114 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
1115
1116 Ok(command
1117 .query()
1118 .build_plan()?
1119 .explain_with_model(model)
1120 .render_text_canonical())
1121 }
1122 SqlExplainMode::Execution => {
1123 let aggregate =
1124 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
1125 let plan = command.query().explain_aggregate_terminal(aggregate)?;
1126
1127 Ok(plan.execution_node_descriptor().render_text_tree())
1128 }
1129 SqlExplainMode::Json => {
1130 let _ =
1131 sql_global_aggregate_terminal_to_expr_with_model(model, command.terminal())?;
1132
1133 Ok(command
1134 .query()
1135 .build_plan()?
1136 .explain_with_model(model)
1137 .render_json_canonical())
1138 }
1139 }
1140 }
1141}