1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 session::sql::{
18 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20 computed_projection,
21 projection::{
22 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30 },
31 sql::parser::{
32 SqlAggregateCall, SqlAggregateKind, SqlInsertStatement, SqlOrderDirection,
33 SqlProjection, SqlSelectItem, SqlStatement, SqlTextFunction, SqlUpdateStatement,
34 },
35 },
36 model::{entity::resolve_field_slot, field::FieldKind},
37 traits::{CanisterKind, EntityKind, EntityValue},
38 types::{Timestamp, Ulid},
39 value::Value,
40};
41
42#[cfg(feature = "perf-attribution")]
43pub use lowered::LoweredSqlDispatchExecutorAttribution;
44
/// Outcome of one SQL dispatch attempt made through the generated query surface.
///
/// Bundles the dispatch result with the metadata that was resolved for the
/// statement before (or while) it was executed.
#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
    // Model name of the entity the statement resolved to. The producer in this
    // file passes an empty string when no entity was resolved.
    entity_name: &'static str,
    // Set by the producer in this file only for EXPLAIN routes, where it holds
    // the resolved entity's primary key field name.
    explain_order_field: Option<&'static str>,
    // The dispatch payload, or the error that stopped the attempt.
    result: Result<SqlDispatchResult, QueryError>,
}
59
impl GeneratedSqlDispatchAttempt {
    /// Assembles an attempt from its three parts; no validation is performed.
    const fn new(
        entity_name: &'static str,
        explain_order_field: Option<&'static str>,
        result: Result<SqlDispatchResult, QueryError>,
    ) -> Self {
        Self {
            entity_name,
            explain_order_field,
            result,
        }
    }

    /// Returns the entity name recorded for this attempt.
    #[must_use]
    pub const fn entity_name(&self) -> &'static str {
        self.entity_name
    }

    /// Returns the order field recorded for EXPLAIN attempts, if any.
    #[must_use]
    pub const fn explain_order_field(&self) -> Option<&'static str> {
        self.explain_order_field
    }

    /// Consumes the attempt and yields the dispatch result it carried.
    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
        self.result
    }
}
91
/// Identifies which SQL execution surface a query is being checked against.
///
/// Used by `ensure_sql_query_grouping` to decide whether a query's grouping
/// state is acceptable for the calling surface.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
    // Surface that accepts only queries without grouping.
    Scalar,
    // Surface that accepts only queries with grouping.
    Grouped,
}
97
98const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
99 match surface {
100 SqlGroupingSurface::Scalar => {
101 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
102 }
103 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
104 }
105}
106
107fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
110 let sql_trimmed = sql.trim();
111 if sql_trimmed.is_empty() {
112 return Err(QueryError::unsupported_query(
113 "query endpoint requires a non-empty SQL string",
114 ));
115 }
116
117 Ok(sql_trimmed)
118}
119
120fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
123 let mut entities = Vec::with_capacity(authorities.len());
124
125 for authority in authorities {
126 entities.push(authority.model().name().to_string());
127 }
128
129 entities
130}
131
132fn sql_projection_labels_from_select_statement(
135 statement: &SqlStatement,
136) -> Result<Option<Vec<String>>, QueryError> {
137 let SqlStatement::Select(select) = statement else {
138 return Err(QueryError::invariant(
139 "SQL projection labels require SELECT statement shape",
140 ));
141 };
142 let SqlProjection::Items(items) = &select.projection else {
143 return Ok(None);
144 };
145
146 Ok(Some(
147 items
148 .iter()
149 .enumerate()
150 .map(|(index, item)| {
151 select
152 .projection_alias(index)
153 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
154 })
155 .collect(),
156 ))
157}
158
159fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
162 match item {
163 SqlSelectItem::Field(field) => field.clone(),
164 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
165 SqlSelectItem::TextFunction(call) => {
166 format!(
167 "{}({})",
168 grouped_sql_text_function_name(call.function),
169 call.field
170 )
171 }
172 }
173}
174
175fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
178 let SqlStatement::Select(select) = statement else {
179 return None;
180 };
181
182 select.projection_alias(0).map(str::to_string)
183}
184
185fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
187 let kind = match aggregate.kind {
188 SqlAggregateKind::Count => "COUNT",
189 SqlAggregateKind::Sum => "SUM",
190 SqlAggregateKind::Avg => "AVG",
191 SqlAggregateKind::Min => "MIN",
192 SqlAggregateKind::Max => "MAX",
193 };
194
195 match aggregate.field.as_deref() {
196 Some(field) => format!("{kind}({field})"),
197 None => format!("{kind}(*)"),
198 }
199}
200
201const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
204 match function {
205 SqlTextFunction::Trim => "TRIM",
206 SqlTextFunction::Ltrim => "LTRIM",
207 SqlTextFunction::Rtrim => "RTRIM",
208 SqlTextFunction::Lower => "LOWER",
209 SqlTextFunction::Upper => "UPPER",
210 SqlTextFunction::Length => "LENGTH",
211 SqlTextFunction::Left => "LEFT",
212 SqlTextFunction::Right => "RIGHT",
213 SqlTextFunction::StartsWith => "STARTS_WITH",
214 SqlTextFunction::EndsWith => "ENDS_WITH",
215 SqlTextFunction::Contains => "CONTAINS",
216 SqlTextFunction::Position => "POSITION",
217 SqlTextFunction::Replace => "REPLACE",
218 SqlTextFunction::Substring => "SUBSTRING",
219 }
220}
221
222fn authority_for_generated_sql_route(
224 route: &SqlStatementRoute,
225 authorities: &[EntityAuthority],
226) -> Result<EntityAuthority, QueryError> {
227 let sql_entity = route.entity();
228
229 for authority in authorities {
230 if identifiers_tail_match(sql_entity, authority.model().name()) {
231 return Ok(*authority);
232 }
233 }
234
235 Err(unsupported_generated_sql_entity_error(
236 sql_entity,
237 authorities,
238 ))
239}
240
241fn unsupported_generated_sql_entity_error(
244 entity_name: &str,
245 authorities: &[EntityAuthority],
246) -> QueryError {
247 let mut supported = String::new();
248
249 for (index, authority) in authorities.iter().enumerate() {
250 if index != 0 {
251 supported.push_str(", ");
252 }
253
254 supported.push_str(authority.model().name());
255 }
256
257 QueryError::unsupported_query(format!(
258 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
259 ))
260}
261
262fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
265where
266 E: EntityKind,
267{
268 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
269 return Ok(());
270 }
271
272 Err(QueryError::from_sql_lowering_error(
273 SqlLoweringError::EntityMismatch {
274 sql_entity: sql_entity.to_string(),
275 expected_entity: E::MODEL.name(),
276 },
277 ))
278}
279
280fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
283where
284 E: EntityKind,
285{
286 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
287 return Ok(key);
288 }
289
290 let widened = match value {
291 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
292 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
293 _ => {
294 return Err(QueryError::unsupported_query(format!(
295 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
296 )));
297 }
298 };
299
300 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
301 QueryError::unsupported_query(format!(
302 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303 ))
304 })
305}
306
307fn sql_write_generated_primary_key_value<E>() -> Option<Value>
311where
312 E: EntityKind,
313{
314 matches!(E::MODEL.primary_key.kind(), FieldKind::Ulid).then(|| Value::Ulid(Ulid::generate()))
315}
316
317fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
320where
321 E: EntityKind,
322{
323 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
324 QueryError::invariant("SQL write field must resolve against the target entity model")
325 })?;
326 let field_kind = E::MODEL.fields()[field_slot].kind();
327
328 let normalized = match (field_kind, value) {
329 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
330 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
331 Value::Int(v.cast_signed())
332 }
333 _ => value.clone(),
334 };
335
336 Ok(normalized)
337}
338
339fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
343where
344 E: EntityKind,
345{
346 if resolve_field_slot(E::MODEL, "created_at").is_some()
347 && resolve_field_slot(E::MODEL, "updated_at").is_some()
348 {
349 return Some(("created_at", "updated_at"));
350 }
351
352 None
353}
354
355fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
360where
361 E: EntityKind,
362{
363 if !statement.columns.is_empty() {
364 return statement.columns.clone();
365 }
366
367 let timestamp_fields = sql_write_system_timestamp_fields::<E>();
368
369 let columns: Vec<String> = E::MODEL
370 .fields()
371 .iter()
372 .filter(|field| {
373 !matches!(
374 timestamp_fields,
375 Some((created_at, updated_at))
376 if field.name() == created_at || field.name() == updated_at
377 )
378 })
379 .map(|field| field.name().to_string())
380 .collect();
381
382 let pk_name = E::MODEL.primary_key.name;
383 if sql_write_generated_primary_key_value::<E>().is_none() {
384 return columns;
385 }
386
387 let generated_key_omitted_columns: Vec<String> = columns
388 .iter()
389 .filter(|field| field.as_str() != pk_name)
390 .cloned()
391 .collect();
392 let first_width = statement.values.first().map(Vec::len);
393
394 if first_width == Some(generated_key_omitted_columns.len()) {
395 return generated_key_omitted_columns;
396 }
397
398 columns
399}
400
401fn validate_sql_insert_tuple_lengths(
404 columns: &[String],
405 values: &[Vec<Value>],
406) -> Result<(), QueryError> {
407 for tuple in values {
408 if tuple.len() != columns.len() {
409 return Err(QueryError::from_sql_parse_error(
410 crate::db::sql::parser::SqlParseError::invalid_syntax(
411 "INSERT column list and VALUES tuple length must match",
412 ),
413 ));
414 }
415 }
416
417 Ok(())
418}
419
420impl<C: CanisterKind> DbSession<C> {
421 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
424 where
425 E: PersistedRow<Canister = C> + EntityValue,
426 {
427 let mut row = Vec::with_capacity(E::MODEL.fields().len());
428
429 for index in 0..E::MODEL.fields().len() {
430 let value = entity.get_value_by_index(index).ok_or_else(|| {
431 QueryError::invariant(
432 "SQL write dispatch projection row must include every declared field",
433 )
434 })?;
435 row.push(value);
436 }
437
438 Ok(row)
439 }
440
441 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
445 where
446 E: PersistedRow<Canister = C> + EntityValue,
447 {
448 let columns = projection_labels_from_fields(E::MODEL.fields());
449 let rows = entities
450 .into_iter()
451 .map(Self::sql_write_dispatch_row)
452 .collect::<Result<Vec<_>, _>>()?;
453 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
454
455 Ok(SqlDispatchResult::Projection {
456 columns,
457 rows,
458 row_count,
459 })
460 }
461
462 fn sql_insert_patch_and_key<E>(
465 columns: &[String],
466 values: &[Value],
467 ) -> Result<(E::Key, UpdatePatch), QueryError>
468 where
469 E: PersistedRow<Canister = C> + EntityValue,
470 {
471 let pk_name = E::MODEL.primary_key.name;
475 let generated_pk = sql_write_generated_primary_key_value::<E>();
476 let (key, generated_pk_value) =
477 if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
478 let pk_value = values.get(pk_index).ok_or_else(|| {
479 QueryError::invariant(
480 "INSERT primary key column must align with one VALUES literal",
481 )
482 })?;
483 (sql_write_key_from_literal::<E>(pk_value, pk_name)?, None)
484 } else if let Some(pk_value) = generated_pk {
485 (
486 sql_write_key_from_literal::<E>(&pk_value, pk_name)?,
487 Some(pk_value),
488 )
489 } else {
490 return Err(QueryError::unsupported_query(format!(
491 "SQL INSERT requires primary key column '{pk_name}' in this release"
492 )));
493 };
494
495 let mut patch = UpdatePatch::new();
498 if let Some(pk_value) = generated_pk_value {
499 patch = patch
500 .set_field(E::MODEL, pk_name, pk_value)
501 .map_err(QueryError::execute)?;
502 }
503 for (field, value) in columns.iter().zip(values.iter()) {
504 let normalized = sql_write_value_for_field::<E>(field, value)?;
505 patch = patch
506 .set_field(E::MODEL, field, normalized)
507 .map_err(QueryError::execute)?;
508 }
509
510 if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
513 let now = Value::Timestamp(Timestamp::now());
514 patch = patch
515 .set_field(E::MODEL, created_at, now.clone())
516 .map_err(QueryError::execute)?;
517 patch = patch
518 .set_field(E::MODEL, updated_at, now)
519 .map_err(QueryError::execute)?;
520 }
521
522 Ok((key, patch))
523 }
524
525 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
528 where
529 E: PersistedRow<Canister = C> + EntityValue,
530 {
531 let pk_name = E::MODEL.primary_key.name;
534 let mut patch = UpdatePatch::new();
535 for assignment in &statement.assignments {
536 if assignment.field == pk_name {
537 return Err(QueryError::unsupported_query(format!(
538 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
539 )));
540 }
541 let normalized =
542 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
543
544 patch = patch
545 .set_field(E::MODEL, assignment.field.as_str(), normalized)
546 .map_err(QueryError::execute)?;
547 }
548
549 if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
552 patch = patch
553 .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
554 .map_err(QueryError::execute)?;
555 }
556
557 Ok(patch)
558 }
559
560 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
562 where
563 E: PersistedRow<Canister = C> + EntityValue,
564 {
565 let Some(predicate) = statement.predicate.clone() else {
569 return Err(QueryError::unsupported_query(
570 "SQL UPDATE requires WHERE predicate in this release",
571 ));
572 };
573 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
574 let pk_name = E::MODEL.primary_key.name;
575 let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
576
577 if statement.order_by.is_empty() {
581 selector = selector.order_by(pk_name);
582 } else {
583 let mut orders_primary_key = false;
584
585 for term in &statement.order_by {
586 if term.field == pk_name {
587 orders_primary_key = true;
588 }
589 selector = match term.direction {
590 SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
591 SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
592 };
593 }
594
595 if !orders_primary_key {
596 selector = selector.order_by(pk_name);
597 }
598 }
599
600 if let Some(limit) = statement.limit {
603 selector = selector.limit(limit);
604 }
605 if let Some(offset) = statement.offset {
606 selector = selector.offset(offset);
607 }
608
609 Ok(selector)
610 }
611
612 fn execute_sql_insert_dispatch<E>(
615 &self,
616 statement: &SqlInsertStatement,
617 ) -> Result<SqlDispatchResult, QueryError>
618 where
619 E: PersistedRow<Canister = C> + EntityValue,
620 {
621 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
622 let columns = sql_insert_columns::<E>(statement);
623 validate_sql_insert_tuple_lengths(columns.as_slice(), statement.values.as_slice())?;
624 let mut entities = Vec::with_capacity(statement.values.len());
625
626 for values in &statement.values {
627 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
628 let entity = self
629 .mutate_structural::<E>(key, patch, MutationMode::Insert)
630 .map_err(QueryError::execute)?;
631 entities.push(entity);
632 }
633
634 Self::sql_write_dispatch_projection(entities)
635 }
636
    /// Executes a parsed UPDATE statement against entity `E` and returns the
    /// updated rows as a projection.
    ///
    /// Steps: verify the statement's entity, build the selector query and the
    /// patch, run the selector, then apply a clone of the patch to each
    /// matched row by key.
    ///
    /// # Errors
    ///
    /// Fails when the statement names a different entity, when the selector
    /// or patch cannot be built, when the selector query fails, or when a
    /// structural mutation fails.
    fn execute_sql_update_dispatch<E>(
        &self,
        statement: &SqlUpdateStatement,
    ) -> Result<SqlDispatchResult, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
        let selector = Self::sql_update_selector_query::<E>(statement)?;
        let patch = Self::sql_update_patch::<E>(statement)?;
        let matched = self.execute_query(&selector)?;
        let mut entities = Vec::with_capacity(matched.len());

        // Rows are mutated one at a time; the first failure returns early.
        // NOTE(review): whether mutations already applied are rolled back is
        // not visible from this function - confirm against `mutate_structural`.
        for entity in matched.entities() {
            let updated = self
                .mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
                .map_err(QueryError::execute)?;
            entities.push(updated);
        }

        Self::sql_write_dispatch_projection(entities)
    }
664
665 fn prepare_structural_sql_projection_execution(
668 &self,
669 query: StructuralQuery,
670 authority: EntityAuthority,
671 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
672 let (_, plan) =
675 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
676 let projection = plan.projection_spec(authority.model());
677 let columns = projection_labels_from_projection_spec(&projection);
678
679 Ok((columns, plan))
680 }
681
682 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
686 &self,
687 query: StructuralQuery,
688 authority: EntityAuthority,
689 ) -> Result<SqlProjectionPayload, QueryError> {
690 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
692
693 let projected =
696 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
697 .map_err(QueryError::execute)?;
698 let (rows, row_count) = projected.into_parts();
699
700 Ok(SqlProjectionPayload::new(columns, rows, row_count))
701 }
702
703 fn execute_structural_sql_projection_text(
707 &self,
708 query: StructuralQuery,
709 authority: EntityAuthority,
710 ) -> Result<SqlDispatchResult, QueryError> {
711 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
713
714 let projected =
717 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
718 .map_err(QueryError::execute)?;
719 let (rows, row_count) = projected.into_parts();
720
721 Ok(SqlDispatchResult::ProjectionText {
722 columns,
723 rows,
724 row_count,
725 })
726 }
727
728 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
732 where
733 E: PersistedRow<Canister = C> + EntityValue,
734 {
735 let plan = self
736 .compile_query_with_visible_indexes(query)?
737 .into_prepared_execution_plan();
738 let deleted = self
739 .with_metrics(|| {
740 self.delete_executor::<E>()
741 .execute_structural_projection(plan)
742 })
743 .map_err(QueryError::execute)?;
744 let (rows, row_count) = deleted.into_parts();
745 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
746
747 Ok(SqlProjectionPayload::new(
748 projection_labels_from_fields(E::MODEL.fields()),
749 rows,
750 row_count,
751 )
752 .into_dispatch_result())
753 }
754
755 fn lowered_sql_query_dispatch_inputs_for_authority(
758 parsed: &SqlParsedStatement,
759 authority: EntityAuthority,
760 unsupported_message: &'static str,
761 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
762 let lowered = parsed.lower_query_lane_for_entity(
763 authority.model().name(),
764 authority.model().primary_key.name,
765 )?;
766 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
767 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
768 .transpose()?;
769 let query = lowered
770 .into_query()
771 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
772
773 Ok((query, projection_columns.flatten()))
774 }
775
    /// Routes one parsed query-lane statement to the matching execution lane
    /// for `authority`.
    ///
    /// Lanes are tried in this order:
    /// 1. the dedicated aggregate lane, when
    ///    `parsed_requires_dedicated_sql_aggregate_lane` selects it;
    /// 2. the computed projection lane, when a computed projection plan exists
    ///    for the statement;
    /// 3. the lowered SELECT / DELETE lane, delegated to the callbacks.
    ///
    /// `dispatch_select` receives the session, the lowered select shape, the
    /// authority, whether the lowered query has grouping, and the optional
    /// projection labels. `dispatch_delete` receives the session, the lowered
    /// delete shape, and the authority. `unsupported_message` is forwarded to
    /// the lowering helper for its unsupported-statement error.
    ///
    /// # Errors
    ///
    /// Propagates errors from whichever lane handles the statement.
    fn dispatch_sql_query_route_for_authority(
        &self,
        parsed: &SqlParsedStatement,
        authority: EntityAuthority,
        unsupported_message: &'static str,
        dispatch_select: impl FnOnce(
            &Self,
            LoweredSelectShape,
            EntityAuthority,
            bool,
            Option<Vec<String>>,
        ) -> Result<SqlDispatchResult, QueryError>,
        dispatch_delete: impl FnOnce(
            &Self,
            LoweredBaseQueryShape,
            EntityAuthority,
        ) -> Result<SqlDispatchResult, QueryError>,
    ) -> Result<SqlDispatchResult, QueryError> {
        // Lane 1: dedicated aggregate execution; the first projection alias
        // (if any) overrides the aggregate's label.
        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
            let command =
                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;

            return self.execute_sql_aggregate_dispatch_for_authority(
                command,
                authority,
                sql_aggregate_dispatch_label_override(&parsed.statement),
            );
        }

        // Lane 2: computed projections.
        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
        }

        // Lane 3: lowered SELECT / DELETE handled by the caller's callbacks.
        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
            parsed,
            authority,
            unsupported_message,
        )?;
        // Read the grouping flag before `query` is consumed by the match.
        let grouped_surface = query.has_grouping();

        match query {
            LoweredSqlQuery::Select(select) => {
                dispatch_select(self, select, authority, grouped_surface, projection_columns)
            }
            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
        }
    }
830
831 fn dispatch_sql_explain_route_for_authority(
835 &self,
836 parsed: &SqlParsedStatement,
837 authority: EntityAuthority,
838 ) -> Result<SqlDispatchResult, QueryError> {
839 if let Some((mode, plan)) =
842 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
843 {
844 return self
845 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
846 .map(SqlDispatchResult::Explain);
847 }
848
849 let lowered = parsed.lower_query_lane_for_entity(
852 authority.model().name(),
853 authority.model().primary_key.name,
854 )?;
855 if let Some(explain) =
856 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
857 {
858 return Ok(SqlDispatchResult::Explain(explain));
859 }
860
861 self.explain_lowered_sql_for_authority(&lowered, authority)
862 .map(SqlDispatchResult::Explain)
863 }
864
865 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
868 query: &Query<E>,
869 surface: SqlGroupingSurface,
870 ) -> Result<(), QueryError>
871 where
872 E: EntityKind,
873 {
874 match (surface, query.has_grouping()) {
875 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
876 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
877 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
878 ),
879 }
880 }
881
882 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
884 where
885 E: PersistedRow<Canister = C> + EntityValue,
886 {
887 let parsed = self.parse_sql_statement(sql)?;
888
889 self.execute_sql_dispatch_parsed::<E>(&parsed)
890 }
891
    /// Dispatches an already-parsed SQL statement against entity `E`.
    ///
    /// Every statement route is handled:
    /// * query routes (SELECT / DELETE) go through the shared query-lane
    ///   dispatcher with typed callbacks;
    /// * INSERT and UPDATE run the SQL write paths;
    /// * EXPLAIN goes through the explain dispatcher;
    /// * DESCRIBE and the SHOW routes return the matching introspection result.
    ///
    /// # Errors
    ///
    /// Returns an invariant error when an INSERT or UPDATE route does not
    /// carry the matching parsed statement, and otherwise propagates errors
    /// from the selected lane.
    pub fn execute_sql_dispatch_parsed<E>(
        &self,
        parsed: &SqlParsedStatement,
    ) -> Result<SqlDispatchResult, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        match parsed.route() {
            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
                parsed,
                EntityAuthority::for_type::<E>(),
                "execute_sql_dispatch accepts SELECT or DELETE only",
                // SELECT callback.
                |session, select, authority, grouped_surface, projection_columns| {
                    // Grouped SELECT needs explicit projection labels.
                    if grouped_surface {
                        let columns = projection_columns.ok_or_else(|| {
                            QueryError::unsupported_query(
                                "grouped SQL dispatch requires explicit grouped projection items",
                            )
                        })?;

                        return session.execute_lowered_sql_grouped_dispatch_select_core(
                            select, authority, columns,
                        );
                    }

                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
                    // Labels derived from the statement replace the payload's own columns.
                    if let Some(columns) = projection_columns {
                        let (_, rows, row_count) = payload.into_parts();

                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
                            .into_dispatch_result());
                    }

                    Ok(payload.into_dispatch_result())
                },
                // DELETE callback: bind the lowered delete to `E` and run it typed.
                |session, delete, _authority| {
                    let typed_query = bind_lowered_sql_query::<E>(
                        LoweredSqlQuery::Delete(delete),
                        MissingRowPolicy::Ignore,
                    )
                    .map_err(QueryError::from_sql_lowering_error)?;

                    session.execute_typed_sql_delete(&typed_query)
                },
            ),
            SqlStatementRoute::Insert { .. } => {
                let SqlStatement::Insert(statement) = &parsed.statement else {
                    return Err(QueryError::invariant(
                        "INSERT SQL route must carry parsed INSERT statement",
                    ));
                };

                self.execute_sql_insert_dispatch::<E>(statement)
            }
            SqlStatementRoute::Update { .. } => {
                let SqlStatement::Update(statement) = &parsed.statement else {
                    return Err(QueryError::invariant(
                        "UPDATE SQL route must carry parsed UPDATE statement",
                    ));
                };

                self.execute_sql_update_dispatch::<E>(statement)
            }
            SqlStatementRoute::Explain { .. } => self
                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
            SqlStatementRoute::Describe { .. } => {
                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
            }
            SqlStatementRoute::ShowIndexes { .. } => {
                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
            }
            SqlStatementRoute::ShowColumns { .. } => {
                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
            }
            SqlStatementRoute::ShowEntities => {
                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
            }
        }
    }
972
    /// Dispatches a parsed statement for the generated query surface against
    /// an already-resolved `authority`.
    ///
    /// Only query routes (SELECT / DELETE) and EXPLAIN are executed here.
    /// Scalar SELECT results are produced as text projection rows.
    ///
    /// # Errors
    ///
    /// Returns an unsupported-query error for INSERT, UPDATE, DESCRIBE, and
    /// SHOW routes, an invariant error when the scalar text path does not
    /// yield text projection rows, and otherwise propagates lane errors.
    #[doc(hidden)]
    pub fn execute_generated_query_surface_dispatch_for_authority(
        &self,
        parsed: &SqlParsedStatement,
        authority: EntityAuthority,
    ) -> Result<SqlDispatchResult, QueryError> {
        match parsed.route() {
            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
                parsed,
                authority,
                "generated SQL query surface requires query or EXPLAIN statement lanes",
                // SELECT callback.
                |session, select, authority, grouped_surface, projection_columns| {
                    // Grouped SELECT needs explicit projection labels.
                    if grouped_surface {
                        let columns = projection_columns.ok_or_else(|| {
                            QueryError::unsupported_query(
                                "grouped SQL dispatch requires explicit grouped projection items",
                            )
                        })?;

                        return session
                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
                    }

                    let result =
                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
                    // Labels derived from the statement replace the result's own columns.
                    if let Some(columns) = projection_columns {
                        let SqlDispatchResult::ProjectionText {
                            rows, row_count, ..
                        } = result
                        else {
                            return Err(QueryError::invariant(
                                "generated scalar SQL dispatch text path must emit projection text rows",
                            ));
                        };

                        return Ok(SqlDispatchResult::ProjectionText {
                            columns,
                            rows,
                            row_count,
                        });
                    }

                    Ok(result)
                },
                // DELETE callback.
                |session, delete, authority| {
                    session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
                },
            ),
            SqlStatementRoute::Explain { .. } => {
                self.dispatch_sql_explain_route_for_authority(parsed, authority)
            }
            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
            | SqlStatementRoute::Describe { .. }
            | SqlStatementRoute::ShowIndexes { .. }
            | SqlStatementRoute::ShowColumns { .. }
            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
            )),
        }
    }
1039
1040 #[doc(hidden)]
1046 #[must_use]
1047 pub fn execute_generated_query_surface_sql(
1048 &self,
1049 sql: &str,
1050 authorities: &[EntityAuthority],
1051 ) -> GeneratedSqlDispatchAttempt {
1052 let sql_trimmed = match trim_generated_query_sql_input(sql) {
1055 Ok(sql_trimmed) => sql_trimmed,
1056 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1057 };
1058 let parsed = match self.parse_sql_statement(sql_trimmed) {
1059 Ok(parsed) => parsed,
1060 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1061 };
1062
1063 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1066 return GeneratedSqlDispatchAttempt::new(
1067 "",
1068 None,
1069 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1070 authorities,
1071 ))),
1072 );
1073 }
1074 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1075 Ok(authority) => authority,
1076 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1077 };
1078
1079 let entity_name = authority.model().name();
1083 let explain_order_field = parsed
1084 .route()
1085 .is_explain()
1086 .then_some(authority.model().primary_key.name);
1087 let result = match parsed.route() {
1088 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1089 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1090 }
1091 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1092 Err(QueryError::unsupported_query(
1093 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1094 ))
1095 }
1096 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1097 self.describe_entity_model(authority.model()),
1098 )),
1099 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1100 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1101 )),
1102 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1103 self.show_columns_for_model(authority.model()),
1104 )),
1105 SqlStatementRoute::ShowEntities => unreachable!(
1106 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1107 ),
1108 };
1109
1110 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1111 }
1112}