1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 session::sql::{
18 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20 computed_projection,
21 projection::{
22 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30 },
31 sql::parser::{
32 SqlAggregateCall, SqlAggregateKind, SqlInsertStatement, SqlProjection, SqlSelectItem,
33 SqlStatement, SqlTextFunction, SqlUpdateStatement,
34 },
35 },
36 model::{entity::resolve_field_slot, field::FieldKind},
37 traits::{CanisterKind, EntityKind, EntityValue},
38 types::Timestamp,
39 value::Value,
40};
41
42#[cfg(feature = "perf-attribution")]
43pub use lowered::LoweredSqlDispatchExecutorAttribution;
44
45#[doc(hidden)]
54pub struct GeneratedSqlDispatchAttempt {
55 entity_name: &'static str,
56 explain_order_field: Option<&'static str>,
57 result: Result<SqlDispatchResult, QueryError>,
58}
59
60impl GeneratedSqlDispatchAttempt {
61 const fn new(
63 entity_name: &'static str,
64 explain_order_field: Option<&'static str>,
65 result: Result<SqlDispatchResult, QueryError>,
66 ) -> Self {
67 Self {
68 entity_name,
69 explain_order_field,
70 result,
71 }
72 }
73
74 #[must_use]
76 pub const fn entity_name(&self) -> &'static str {
77 self.entity_name
78 }
79
80 #[must_use]
82 pub const fn explain_order_field(&self) -> Option<&'static str> {
83 self.explain_order_field
84 }
85
86 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
88 self.result
89 }
90}
91
92#[derive(Clone, Copy, Debug, Eq, PartialEq)]
93pub(in crate::db::session::sql) enum SqlGroupingSurface {
94 Scalar,
95 Grouped,
96}
97
98const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
99 match surface {
100 SqlGroupingSurface::Scalar => {
101 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
102 }
103 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
104 }
105}
106
107fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
110 let sql_trimmed = sql.trim();
111 if sql_trimmed.is_empty() {
112 return Err(QueryError::unsupported_query(
113 "query endpoint requires a non-empty SQL string",
114 ));
115 }
116
117 Ok(sql_trimmed)
118}
119
120fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
123 let mut entities = Vec::with_capacity(authorities.len());
124
125 for authority in authorities {
126 entities.push(authority.model().name().to_string());
127 }
128
129 entities
130}
131
132fn sql_projection_labels_from_select_statement(
135 statement: &SqlStatement,
136) -> Result<Option<Vec<String>>, QueryError> {
137 let SqlStatement::Select(select) = statement else {
138 return Err(QueryError::invariant(
139 "SQL projection labels require SELECT statement shape",
140 ));
141 };
142 let SqlProjection::Items(items) = &select.projection else {
143 return Ok(None);
144 };
145
146 Ok(Some(
147 items
148 .iter()
149 .enumerate()
150 .map(|(index, item)| {
151 select
152 .projection_alias(index)
153 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
154 })
155 .collect(),
156 ))
157}
158
159fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
162 match item {
163 SqlSelectItem::Field(field) => field.clone(),
164 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
165 SqlSelectItem::TextFunction(call) => {
166 format!(
167 "{}({})",
168 grouped_sql_text_function_name(call.function),
169 call.field
170 )
171 }
172 }
173}
174
175fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
178 let SqlStatement::Select(select) = statement else {
179 return None;
180 };
181
182 select.projection_alias(0).map(str::to_string)
183}
184
185fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
187 let kind = match aggregate.kind {
188 SqlAggregateKind::Count => "COUNT",
189 SqlAggregateKind::Sum => "SUM",
190 SqlAggregateKind::Avg => "AVG",
191 SqlAggregateKind::Min => "MIN",
192 SqlAggregateKind::Max => "MAX",
193 };
194
195 match aggregate.field.as_deref() {
196 Some(field) => format!("{kind}({field})"),
197 None => format!("{kind}(*)"),
198 }
199}
200
201const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
204 match function {
205 SqlTextFunction::Trim => "TRIM",
206 SqlTextFunction::Ltrim => "LTRIM",
207 SqlTextFunction::Rtrim => "RTRIM",
208 SqlTextFunction::Lower => "LOWER",
209 SqlTextFunction::Upper => "UPPER",
210 SqlTextFunction::Length => "LENGTH",
211 SqlTextFunction::Left => "LEFT",
212 SqlTextFunction::Right => "RIGHT",
213 SqlTextFunction::StartsWith => "STARTS_WITH",
214 SqlTextFunction::EndsWith => "ENDS_WITH",
215 SqlTextFunction::Contains => "CONTAINS",
216 SqlTextFunction::Position => "POSITION",
217 SqlTextFunction::Replace => "REPLACE",
218 SqlTextFunction::Substring => "SUBSTRING",
219 }
220}
221
222fn authority_for_generated_sql_route(
224 route: &SqlStatementRoute,
225 authorities: &[EntityAuthority],
226) -> Result<EntityAuthority, QueryError> {
227 let sql_entity = route.entity();
228
229 for authority in authorities {
230 if identifiers_tail_match(sql_entity, authority.model().name()) {
231 return Ok(*authority);
232 }
233 }
234
235 Err(unsupported_generated_sql_entity_error(
236 sql_entity,
237 authorities,
238 ))
239}
240
241fn unsupported_generated_sql_entity_error(
244 entity_name: &str,
245 authorities: &[EntityAuthority],
246) -> QueryError {
247 let mut supported = String::new();
248
249 for (index, authority) in authorities.iter().enumerate() {
250 if index != 0 {
251 supported.push_str(", ");
252 }
253
254 supported.push_str(authority.model().name());
255 }
256
257 QueryError::unsupported_query(format!(
258 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
259 ))
260}
261
262fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
265where
266 E: EntityKind,
267{
268 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
269 return Ok(());
270 }
271
272 Err(QueryError::from_sql_lowering_error(
273 SqlLoweringError::EntityMismatch {
274 sql_entity: sql_entity.to_string(),
275 expected_entity: E::MODEL.name(),
276 },
277 ))
278}
279
280fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
283where
284 E: EntityKind,
285{
286 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
287 return Ok(key);
288 }
289
290 let widened = match value {
291 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
292 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
293 _ => {
294 return Err(QueryError::unsupported_query(format!(
295 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
296 )));
297 }
298 };
299
300 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
301 QueryError::unsupported_query(format!(
302 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303 ))
304 })
305}
306
307fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
310where
311 E: EntityKind,
312{
313 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
314 QueryError::invariant("SQL write field must resolve against the target entity model")
315 })?;
316 let field_kind = E::MODEL.fields()[field_slot].kind();
317
318 let normalized = match (field_kind, value) {
319 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
320 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
321 Value::Int(v.cast_signed())
322 }
323 _ => value.clone(),
324 };
325
326 Ok(normalized)
327}
328
329fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
333where
334 E: EntityKind,
335{
336 if resolve_field_slot(E::MODEL, "created_at").is_some()
337 && resolve_field_slot(E::MODEL, "updated_at").is_some()
338 {
339 return Some(("created_at", "updated_at"));
340 }
341
342 None
343}
344
345fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
350where
351 E: EntityKind,
352{
353 if !statement.columns.is_empty() {
354 return statement.columns.clone();
355 }
356
357 let timestamp_fields = sql_write_system_timestamp_fields::<E>();
358
359 E::MODEL
360 .fields()
361 .iter()
362 .filter(|field| {
363 !matches!(
364 timestamp_fields,
365 Some((created_at, updated_at))
366 if field.name() == created_at || field.name() == updated_at
367 )
368 })
369 .map(|field| field.name().to_string())
370 .collect()
371}
372
373fn validate_sql_insert_tuple_lengths(
376 columns: &[String],
377 values: &[Vec<Value>],
378) -> Result<(), QueryError> {
379 for tuple in values {
380 if tuple.len() != columns.len() {
381 return Err(QueryError::from_sql_parse_error(
382 crate::db::sql::parser::SqlParseError::invalid_syntax(
383 "INSERT column list and VALUES tuple length must match",
384 ),
385 ));
386 }
387 }
388
389 Ok(())
390}
391
392impl<C: CanisterKind> DbSession<C> {
393 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
396 where
397 E: PersistedRow<Canister = C> + EntityValue,
398 {
399 let mut row = Vec::with_capacity(E::MODEL.fields().len());
400
401 for index in 0..E::MODEL.fields().len() {
402 let value = entity.get_value_by_index(index).ok_or_else(|| {
403 QueryError::invariant(
404 "SQL write dispatch projection row must include every declared field",
405 )
406 })?;
407 row.push(value);
408 }
409
410 Ok(row)
411 }
412
413 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
417 where
418 E: PersistedRow<Canister = C> + EntityValue,
419 {
420 let columns = projection_labels_from_fields(E::MODEL.fields());
421 let rows = entities
422 .into_iter()
423 .map(Self::sql_write_dispatch_row)
424 .collect::<Result<Vec<_>, _>>()?;
425 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
426
427 Ok(SqlDispatchResult::Projection {
428 columns,
429 rows,
430 row_count,
431 })
432 }
433
434 fn sql_insert_patch_and_key<E>(
437 columns: &[String],
438 values: &[Value],
439 ) -> Result<(E::Key, UpdatePatch), QueryError>
440 where
441 E: PersistedRow<Canister = C> + EntityValue,
442 {
443 let pk_name = E::MODEL.primary_key.name;
446 let Some(pk_index) = columns.iter().position(|field| field == pk_name) else {
447 return Err(QueryError::unsupported_query(format!(
448 "SQL INSERT requires primary key column '{pk_name}' in this release"
449 )));
450 };
451 let pk_value = values.get(pk_index).ok_or_else(|| {
452 QueryError::invariant("INSERT primary key column must align with one VALUES literal")
453 })?;
454 let key = sql_write_key_from_literal::<E>(pk_value, pk_name)?;
455
456 let mut patch = UpdatePatch::new();
459 for (field, value) in columns.iter().zip(values.iter()) {
460 let normalized = sql_write_value_for_field::<E>(field, value)?;
461 patch = patch
462 .set_field(E::MODEL, field, normalized)
463 .map_err(QueryError::execute)?;
464 }
465
466 if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
469 let now = Value::Timestamp(Timestamp::now());
470 patch = patch
471 .set_field(E::MODEL, created_at, now.clone())
472 .map_err(QueryError::execute)?;
473 patch = patch
474 .set_field(E::MODEL, updated_at, now)
475 .map_err(QueryError::execute)?;
476 }
477
478 Ok((key, patch))
479 }
480
481 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
484 where
485 E: PersistedRow<Canister = C> + EntityValue,
486 {
487 let pk_name = E::MODEL.primary_key.name;
490 let mut patch = UpdatePatch::new();
491 for assignment in &statement.assignments {
492 if assignment.field == pk_name {
493 return Err(QueryError::unsupported_query(format!(
494 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
495 )));
496 }
497 let normalized =
498 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
499
500 patch = patch
501 .set_field(E::MODEL, assignment.field.as_str(), normalized)
502 .map_err(QueryError::execute)?;
503 }
504
505 if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
508 patch = patch
509 .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
510 .map_err(QueryError::execute)?;
511 }
512
513 Ok(patch)
514 }
515
516 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
518 where
519 E: PersistedRow<Canister = C> + EntityValue,
520 {
521 let Some(predicate) = statement.predicate.clone() else {
525 return Err(QueryError::unsupported_query(
526 "SQL UPDATE requires WHERE predicate in this release",
527 ));
528 };
529 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
530
531 Ok(Query::<E>::new(MissingRowPolicy::Ignore)
534 .filter(predicate)
535 .order_by(E::MODEL.primary_key.name))
536 }
537
538 fn execute_sql_insert_dispatch<E>(
541 &self,
542 statement: &SqlInsertStatement,
543 ) -> Result<SqlDispatchResult, QueryError>
544 where
545 E: PersistedRow<Canister = C> + EntityValue,
546 {
547 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
548 let columns = sql_insert_columns::<E>(statement);
549 validate_sql_insert_tuple_lengths(columns.as_slice(), statement.values.as_slice())?;
550 let mut entities = Vec::with_capacity(statement.values.len());
551
552 for values in &statement.values {
553 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
554 let entity = self
555 .mutate_structural::<E>(key, patch, MutationMode::Insert)
556 .map_err(QueryError::execute)?;
557 entities.push(entity);
558 }
559
560 Self::sql_write_dispatch_projection(entities)
561 }
562
563 fn execute_sql_update_dispatch<E>(
567 &self,
568 statement: &SqlUpdateStatement,
569 ) -> Result<SqlDispatchResult, QueryError>
570 where
571 E: PersistedRow<Canister = C> + EntityValue,
572 {
573 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
574 let selector = Self::sql_update_selector_query::<E>(statement)?;
575 let patch = Self::sql_update_patch::<E>(statement)?;
576 let matched = self.execute_query(&selector)?;
577 let mut entities = Vec::with_capacity(matched.len());
578
579 for entity in matched.entities() {
582 let updated = self
583 .mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
584 .map_err(QueryError::execute)?;
585 entities.push(updated);
586 }
587
588 Self::sql_write_dispatch_projection(entities)
589 }
590
591 fn prepare_structural_sql_projection_execution(
594 &self,
595 query: StructuralQuery,
596 authority: EntityAuthority,
597 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
598 let (_, plan) =
601 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
602 let projection = plan.projection_spec(authority.model());
603 let columns = projection_labels_from_projection_spec(&projection);
604
605 Ok((columns, plan))
606 }
607
608 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
612 &self,
613 query: StructuralQuery,
614 authority: EntityAuthority,
615 ) -> Result<SqlProjectionPayload, QueryError> {
616 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
618
619 let projected =
622 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
623 .map_err(QueryError::execute)?;
624 let (rows, row_count) = projected.into_parts();
625
626 Ok(SqlProjectionPayload::new(columns, rows, row_count))
627 }
628
629 fn execute_structural_sql_projection_text(
633 &self,
634 query: StructuralQuery,
635 authority: EntityAuthority,
636 ) -> Result<SqlDispatchResult, QueryError> {
637 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
639
640 let projected =
643 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
644 .map_err(QueryError::execute)?;
645 let (rows, row_count) = projected.into_parts();
646
647 Ok(SqlDispatchResult::ProjectionText {
648 columns,
649 rows,
650 row_count,
651 })
652 }
653
654 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
658 where
659 E: PersistedRow<Canister = C> + EntityValue,
660 {
661 let plan = self
662 .compile_query_with_visible_indexes(query)?
663 .into_prepared_execution_plan();
664 let deleted = self
665 .with_metrics(|| {
666 self.delete_executor::<E>()
667 .execute_structural_projection(plan)
668 })
669 .map_err(QueryError::execute)?;
670 let (rows, row_count) = deleted.into_parts();
671 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
672
673 Ok(SqlProjectionPayload::new(
674 projection_labels_from_fields(E::MODEL.fields()),
675 rows,
676 row_count,
677 )
678 .into_dispatch_result())
679 }
680
681 fn lowered_sql_query_dispatch_inputs_for_authority(
684 parsed: &SqlParsedStatement,
685 authority: EntityAuthority,
686 unsupported_message: &'static str,
687 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
688 let lowered = parsed.lower_query_lane_for_entity(
689 authority.model().name(),
690 authority.model().primary_key.name,
691 )?;
692 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
693 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
694 .transpose()?;
695 let query = lowered
696 .into_query()
697 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
698
699 Ok((query, projection_columns.flatten()))
700 }
701
702 fn dispatch_sql_query_route_for_authority(
706 &self,
707 parsed: &SqlParsedStatement,
708 authority: EntityAuthority,
709 unsupported_message: &'static str,
710 dispatch_select: impl FnOnce(
711 &Self,
712 LoweredSelectShape,
713 EntityAuthority,
714 bool,
715 Option<Vec<String>>,
716 ) -> Result<SqlDispatchResult, QueryError>,
717 dispatch_delete: impl FnOnce(
718 &Self,
719 LoweredBaseQueryShape,
720 EntityAuthority,
721 ) -> Result<SqlDispatchResult, QueryError>,
722 ) -> Result<SqlDispatchResult, QueryError> {
723 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
726 let command =
727 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
728
729 return self.execute_sql_aggregate_dispatch_for_authority(
730 command,
731 authority,
732 sql_aggregate_dispatch_label_override(&parsed.statement),
733 );
734 }
735
736 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
737 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
738 }
739
740 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
743 parsed,
744 authority,
745 unsupported_message,
746 )?;
747 let grouped_surface = query.has_grouping();
748
749 match query {
750 LoweredSqlQuery::Select(select) => {
751 dispatch_select(self, select, authority, grouped_surface, projection_columns)
752 }
753 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
754 }
755 }
756
757 fn dispatch_sql_explain_route_for_authority(
761 &self,
762 parsed: &SqlParsedStatement,
763 authority: EntityAuthority,
764 ) -> Result<SqlDispatchResult, QueryError> {
765 if let Some((mode, plan)) =
768 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
769 {
770 return self
771 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
772 .map(SqlDispatchResult::Explain);
773 }
774
775 let lowered = parsed.lower_query_lane_for_entity(
778 authority.model().name(),
779 authority.model().primary_key.name,
780 )?;
781 if let Some(explain) =
782 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
783 {
784 return Ok(SqlDispatchResult::Explain(explain));
785 }
786
787 self.explain_lowered_sql_for_authority(&lowered, authority)
788 .map(SqlDispatchResult::Explain)
789 }
790
791 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
794 query: &Query<E>,
795 surface: SqlGroupingSurface,
796 ) -> Result<(), QueryError>
797 where
798 E: EntityKind,
799 {
800 match (surface, query.has_grouping()) {
801 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
802 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
803 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
804 ),
805 }
806 }
807
808 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
810 where
811 E: PersistedRow<Canister = C> + EntityValue,
812 {
813 let parsed = self.parse_sql_statement(sql)?;
814
815 self.execute_sql_dispatch_parsed::<E>(&parsed)
816 }
817
818 pub fn execute_sql_dispatch_parsed<E>(
820 &self,
821 parsed: &SqlParsedStatement,
822 ) -> Result<SqlDispatchResult, QueryError>
823 where
824 E: PersistedRow<Canister = C> + EntityValue,
825 {
826 match parsed.route() {
827 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
828 parsed,
829 EntityAuthority::for_type::<E>(),
830 "execute_sql_dispatch accepts SELECT or DELETE only",
831 |session, select, authority, grouped_surface, projection_columns| {
832 if grouped_surface {
833 let columns = projection_columns.ok_or_else(|| {
834 QueryError::unsupported_query(
835 "grouped SQL dispatch requires explicit grouped projection items",
836 )
837 })?;
838
839 return session.execute_lowered_sql_grouped_dispatch_select_core(
840 select, authority, columns,
841 );
842 }
843
844 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
845 if let Some(columns) = projection_columns {
846 let (_, rows, row_count) = payload.into_parts();
847
848 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
849 .into_dispatch_result());
850 }
851
852 Ok(payload.into_dispatch_result())
853 },
854 |session, delete, _authority| {
855 let typed_query = bind_lowered_sql_query::<E>(
856 LoweredSqlQuery::Delete(delete),
857 MissingRowPolicy::Ignore,
858 )
859 .map_err(QueryError::from_sql_lowering_error)?;
860
861 session.execute_typed_sql_delete(&typed_query)
862 },
863 ),
864 SqlStatementRoute::Insert { .. } => {
865 let SqlStatement::Insert(statement) = &parsed.statement else {
866 return Err(QueryError::invariant(
867 "INSERT SQL route must carry parsed INSERT statement",
868 ));
869 };
870
871 self.execute_sql_insert_dispatch::<E>(statement)
872 }
873 SqlStatementRoute::Update { .. } => {
874 let SqlStatement::Update(statement) = &parsed.statement else {
875 return Err(QueryError::invariant(
876 "UPDATE SQL route must carry parsed UPDATE statement",
877 ));
878 };
879
880 self.execute_sql_update_dispatch::<E>(statement)
881 }
882 SqlStatementRoute::Explain { .. } => self
883 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
884 SqlStatementRoute::Describe { .. } => {
885 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
886 }
887 SqlStatementRoute::ShowIndexes { .. } => {
888 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
889 }
890 SqlStatementRoute::ShowColumns { .. } => {
891 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
892 }
893 SqlStatementRoute::ShowEntities => {
894 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
895 }
896 }
897 }
898
899 #[doc(hidden)]
906 pub fn execute_generated_query_surface_dispatch_for_authority(
907 &self,
908 parsed: &SqlParsedStatement,
909 authority: EntityAuthority,
910 ) -> Result<SqlDispatchResult, QueryError> {
911 match parsed.route() {
912 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
913 parsed,
914 authority,
915 "generated SQL query surface requires query or EXPLAIN statement lanes",
916 |session, select, authority, grouped_surface, projection_columns| {
917 if grouped_surface {
918 let columns = projection_columns.ok_or_else(|| {
919 QueryError::unsupported_query(
920 "grouped SQL dispatch requires explicit grouped projection items",
921 )
922 })?;
923
924 return session
925 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
926 }
927
928 let result =
929 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
930 if let Some(columns) = projection_columns {
931 let SqlDispatchResult::ProjectionText {
932 rows, row_count, ..
933 } = result
934 else {
935 return Err(QueryError::invariant(
936 "generated scalar SQL dispatch text path must emit projection text rows",
937 ));
938 };
939
940 return Ok(SqlDispatchResult::ProjectionText {
941 columns,
942 rows,
943 row_count,
944 });
945 }
946
947 Ok(result)
948 },
949 |session, delete, authority| {
950 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
951 },
952 ),
953 SqlStatementRoute::Explain { .. } => {
954 self.dispatch_sql_explain_route_for_authority(parsed, authority)
955 }
956 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
957 | SqlStatementRoute::Describe { .. }
958 | SqlStatementRoute::ShowIndexes { .. }
959 | SqlStatementRoute::ShowColumns { .. }
960 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
961 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
962 )),
963 }
964 }
965
966 #[doc(hidden)]
972 #[must_use]
973 pub fn execute_generated_query_surface_sql(
974 &self,
975 sql: &str,
976 authorities: &[EntityAuthority],
977 ) -> GeneratedSqlDispatchAttempt {
978 let sql_trimmed = match trim_generated_query_sql_input(sql) {
981 Ok(sql_trimmed) => sql_trimmed,
982 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
983 };
984 let parsed = match self.parse_sql_statement(sql_trimmed) {
985 Ok(parsed) => parsed,
986 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
987 };
988
989 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
992 return GeneratedSqlDispatchAttempt::new(
993 "",
994 None,
995 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
996 authorities,
997 ))),
998 );
999 }
1000 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1001 Ok(authority) => authority,
1002 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1003 };
1004
1005 let entity_name = authority.model().name();
1009 let explain_order_field = parsed
1010 .route()
1011 .is_explain()
1012 .then_some(authority.model().primary_key.name);
1013 let result = match parsed.route() {
1014 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1015 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1016 }
1017 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1018 Err(QueryError::unsupported_query(
1019 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1020 ))
1021 }
1022 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1023 self.describe_entity_model(authority.model()),
1024 )),
1025 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1026 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1027 )),
1028 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1029 self.show_columns_for_model(authority.model()),
1030 )),
1031 SqlStatementRoute::ShowEntities => unreachable!(
1032 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1033 ),
1034 };
1035
1036 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1037 }
1038}