1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 predicate::{CompareOp, Predicate},
17 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
18 session::sql::{
19 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21 computed_projection,
22 projection::{
23 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
24 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
25 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26 },
27 },
28 sql::lowering::{
29 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
30 bind_lowered_sql_query,
31 },
32 sql::parser::{
33 SqlAggregateCall, SqlAggregateKind, SqlInsertStatement, SqlProjection, SqlSelectItem,
34 SqlStatement, SqlTextFunction, SqlUpdateStatement,
35 },
36 },
37 model::{entity::resolve_field_slot, field::FieldKind},
38 traits::{CanisterKind, EntityKind, EntityValue},
39 types::Timestamp,
40 value::Value,
41};
42
43#[cfg(feature = "perf-attribution")]
44pub use lowered::LoweredSqlDispatchExecutorAttribution;
45
/// Outcome of one generated query-surface SQL dispatch attempt.
///
/// Carries the dispatch result together with the metadata that was resolved
/// while routing the statement.
#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
    // Name of the resolved entity model; the generated surface passes `""`
    // when it returns before an entity authority has been resolved.
    entity_name: &'static str,
    // Primary-key field name, populated only when the route is an EXPLAIN.
    explain_order_field: Option<&'static str>,
    // Dispatch payload on success, or the error that stopped the attempt.
    result: Result<SqlDispatchResult, QueryError>,
}
60
61impl GeneratedSqlDispatchAttempt {
62 const fn new(
64 entity_name: &'static str,
65 explain_order_field: Option<&'static str>,
66 result: Result<SqlDispatchResult, QueryError>,
67 ) -> Self {
68 Self {
69 entity_name,
70 explain_order_field,
71 result,
72 }
73 }
74
75 #[must_use]
77 pub const fn entity_name(&self) -> &'static str {
78 self.entity_name
79 }
80
81 #[must_use]
83 pub const fn explain_order_field(&self) -> Option<&'static str> {
84 self.explain_order_field
85 }
86
87 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
89 self.result
90 }
91}
92
/// Identifies which SQL execution surface is validating a query's grouping shape.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
    /// Surface that accepts only queries without grouping.
    Scalar,
    /// Surface that accepts only queries with grouping.
    Grouped,
}
98
99const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
100 match surface {
101 SqlGroupingSurface::Scalar => {
102 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
103 }
104 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
105 }
106}
107
108fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
111 let sql_trimmed = sql.trim();
112 if sql_trimmed.is_empty() {
113 return Err(QueryError::unsupported_query(
114 "query endpoint requires a non-empty SQL string",
115 ));
116 }
117
118 Ok(sql_trimmed)
119}
120
121fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
124 let mut entities = Vec::with_capacity(authorities.len());
125
126 for authority in authorities {
127 entities.push(authority.model().name().to_string());
128 }
129
130 entities
131}
132
133fn sql_projection_labels_from_select_statement(
136 statement: &SqlStatement,
137) -> Result<Option<Vec<String>>, QueryError> {
138 let SqlStatement::Select(select) = statement else {
139 return Err(QueryError::invariant(
140 "SQL projection labels require SELECT statement shape",
141 ));
142 };
143 let SqlProjection::Items(items) = &select.projection else {
144 return Ok(None);
145 };
146
147 Ok(Some(
148 items
149 .iter()
150 .enumerate()
151 .map(|(index, item)| {
152 select
153 .projection_alias(index)
154 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
155 })
156 .collect(),
157 ))
158}
159
160fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
163 match item {
164 SqlSelectItem::Field(field) => field.clone(),
165 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
166 SqlSelectItem::TextFunction(call) => {
167 format!(
168 "{}({})",
169 grouped_sql_text_function_name(call.function),
170 call.field
171 )
172 }
173 }
174}
175
176fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
179 let SqlStatement::Select(select) = statement else {
180 return None;
181 };
182
183 select.projection_alias(0).map(str::to_string)
184}
185
186fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
188 let kind = match aggregate.kind {
189 SqlAggregateKind::Count => "COUNT",
190 SqlAggregateKind::Sum => "SUM",
191 SqlAggregateKind::Avg => "AVG",
192 SqlAggregateKind::Min => "MIN",
193 SqlAggregateKind::Max => "MAX",
194 };
195
196 match aggregate.field.as_deref() {
197 Some(field) => format!("{kind}({field})"),
198 None => format!("{kind}(*)"),
199 }
200}
201
202const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
205 match function {
206 SqlTextFunction::Trim => "TRIM",
207 SqlTextFunction::Ltrim => "LTRIM",
208 SqlTextFunction::Rtrim => "RTRIM",
209 SqlTextFunction::Lower => "LOWER",
210 SqlTextFunction::Upper => "UPPER",
211 SqlTextFunction::Length => "LENGTH",
212 SqlTextFunction::Left => "LEFT",
213 SqlTextFunction::Right => "RIGHT",
214 SqlTextFunction::StartsWith => "STARTS_WITH",
215 SqlTextFunction::EndsWith => "ENDS_WITH",
216 SqlTextFunction::Contains => "CONTAINS",
217 SqlTextFunction::Position => "POSITION",
218 SqlTextFunction::Replace => "REPLACE",
219 SqlTextFunction::Substring => "SUBSTRING",
220 }
221}
222
223fn authority_for_generated_sql_route(
225 route: &SqlStatementRoute,
226 authorities: &[EntityAuthority],
227) -> Result<EntityAuthority, QueryError> {
228 let sql_entity = route.entity();
229
230 for authority in authorities {
231 if identifiers_tail_match(sql_entity, authority.model().name()) {
232 return Ok(*authority);
233 }
234 }
235
236 Err(unsupported_generated_sql_entity_error(
237 sql_entity,
238 authorities,
239 ))
240}
241
242fn unsupported_generated_sql_entity_error(
245 entity_name: &str,
246 authorities: &[EntityAuthority],
247) -> QueryError {
248 let mut supported = String::new();
249
250 for (index, authority) in authorities.iter().enumerate() {
251 if index != 0 {
252 supported.push_str(", ");
253 }
254
255 supported.push_str(authority.model().name());
256 }
257
258 QueryError::unsupported_query(format!(
259 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
260 ))
261}
262
263fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
266where
267 E: EntityKind,
268{
269 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
270 return Ok(());
271 }
272
273 Err(QueryError::from_sql_lowering_error(
274 SqlLoweringError::EntityMismatch {
275 sql_entity: sql_entity.to_string(),
276 expected_entity: E::MODEL.name(),
277 },
278 ))
279}
280
281fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
284where
285 E: EntityKind,
286{
287 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
288 return Ok(key);
289 }
290
291 let widened = match value {
292 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
293 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
294 _ => {
295 return Err(QueryError::unsupported_query(format!(
296 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
297 )));
298 }
299 };
300
301 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
302 QueryError::unsupported_query(format!(
303 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
304 ))
305 })
306}
307
308fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
311where
312 E: EntityKind,
313{
314 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
315 QueryError::invariant("SQL write field must resolve against the target entity model")
316 })?;
317 let field_kind = E::MODEL.fields()[field_slot].kind();
318
319 let normalized = match (field_kind, value) {
320 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
321 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
322 Value::Int(v.cast_signed())
323 }
324 _ => value.clone(),
325 };
326
327 Ok(normalized)
328}
329
330fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
334where
335 E: EntityKind,
336{
337 if resolve_field_slot(E::MODEL, "created_at").is_some()
338 && resolve_field_slot(E::MODEL, "updated_at").is_some()
339 {
340 return Some(("created_at", "updated_at"));
341 }
342
343 None
344}
345
346fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Result<Vec<String>, QueryError>
351where
352 E: EntityKind,
353{
354 if !statement.columns.is_empty() {
355 return Ok(statement.columns.clone());
356 }
357 if sql_write_system_timestamp_fields::<E>().is_some() {
358 return Err(QueryError::unsupported_query(
359 "SQL INSERT without explicit column list is not supported for entities with system timestamp fields in this release",
360 ));
361 }
362
363 Ok(E::MODEL
364 .fields()
365 .iter()
366 .map(|field| field.name().to_string())
367 .collect())
368}
369
370fn validate_sql_insert_tuple_lengths(
373 columns: &[String],
374 values: &[Vec<Value>],
375) -> Result<(), QueryError> {
376 for tuple in values {
377 if tuple.len() != columns.len() {
378 return Err(QueryError::from_sql_parse_error(
379 crate::db::sql::parser::SqlParseError::invalid_syntax(
380 "INSERT column list and VALUES tuple length must match",
381 ),
382 ));
383 }
384 }
385
386 Ok(())
387}
388
389impl<C: CanisterKind> DbSession<C> {
390 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
393 where
394 E: PersistedRow<Canister = C> + EntityValue,
395 {
396 let mut row = Vec::with_capacity(E::MODEL.fields().len());
397
398 for index in 0..E::MODEL.fields().len() {
399 let value = entity.get_value_by_index(index).ok_or_else(|| {
400 QueryError::invariant(
401 "SQL write dispatch projection row must include every declared field",
402 )
403 })?;
404 row.push(value);
405 }
406
407 Ok(row)
408 }
409
410 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
414 where
415 E: PersistedRow<Canister = C> + EntityValue,
416 {
417 let columns = projection_labels_from_fields(E::MODEL.fields());
418 let rows = entities
419 .into_iter()
420 .map(Self::sql_write_dispatch_row)
421 .collect::<Result<Vec<_>, _>>()?;
422 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
423
424 Ok(SqlDispatchResult::Projection {
425 columns,
426 rows,
427 row_count,
428 })
429 }
430
431 fn sql_insert_patch_and_key<E>(
434 columns: &[String],
435 values: &[Value],
436 ) -> Result<(E::Key, UpdatePatch), QueryError>
437 where
438 E: PersistedRow<Canister = C> + EntityValue,
439 {
440 let pk_name = E::MODEL.primary_key.name;
443 let Some(pk_index) = columns.iter().position(|field| field == pk_name) else {
444 return Err(QueryError::unsupported_query(format!(
445 "SQL INSERT requires primary key column '{pk_name}' in this release"
446 )));
447 };
448 let pk_value = values.get(pk_index).ok_or_else(|| {
449 QueryError::invariant("INSERT primary key column must align with one VALUES literal")
450 })?;
451 let key = sql_write_key_from_literal::<E>(pk_value, pk_name)?;
452
453 let mut patch = UpdatePatch::new();
456 for (field, value) in columns.iter().zip(values.iter()) {
457 let normalized = sql_write_value_for_field::<E>(field, value)?;
458 patch = patch
459 .set_field(E::MODEL, field, normalized)
460 .map_err(QueryError::execute)?;
461 }
462
463 if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
466 let now = Value::Timestamp(Timestamp::now());
467 patch = patch
468 .set_field(E::MODEL, created_at, now.clone())
469 .map_err(QueryError::execute)?;
470 patch = patch
471 .set_field(E::MODEL, updated_at, now)
472 .map_err(QueryError::execute)?;
473 }
474
475 Ok((key, patch))
476 }
477
478 fn sql_update_patch_and_key<E>(
481 statement: &SqlUpdateStatement,
482 ) -> Result<(E::Key, UpdatePatch), QueryError>
483 where
484 E: PersistedRow<Canister = C> + EntityValue,
485 {
486 let pk_name = E::MODEL.primary_key.name;
490 let Some(Predicate::Compare(compare)) = &statement.predicate else {
491 return Err(QueryError::unsupported_query(format!(
492 "SQL UPDATE requires WHERE {pk_name} = literal in this release"
493 )));
494 };
495 if compare.field() != pk_name || compare.op() != CompareOp::Eq {
496 return Err(QueryError::unsupported_query(format!(
497 "SQL UPDATE requires WHERE {pk_name} = literal in this release"
498 )));
499 }
500 let key = sql_write_key_from_literal::<E>(compare.value(), pk_name)?;
501
502 let mut patch = UpdatePatch::new();
505 for assignment in &statement.assignments {
506 if assignment.field == pk_name {
507 return Err(QueryError::unsupported_query(format!(
508 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
509 )));
510 }
511 let normalized =
512 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
513
514 patch = patch
515 .set_field(E::MODEL, assignment.field.as_str(), normalized)
516 .map_err(QueryError::execute)?;
517 }
518
519 if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
522 patch = patch
523 .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
524 .map_err(QueryError::execute)?;
525 }
526
527 Ok((key, patch))
528 }
529
530 fn execute_sql_insert_dispatch<E>(
533 &self,
534 statement: &SqlInsertStatement,
535 ) -> Result<SqlDispatchResult, QueryError>
536 where
537 E: PersistedRow<Canister = C> + EntityValue,
538 {
539 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
540 let columns = sql_insert_columns::<E>(statement)?;
541 validate_sql_insert_tuple_lengths(columns.as_slice(), statement.values.as_slice())?;
542 let mut entities = Vec::with_capacity(statement.values.len());
543
544 for values in &statement.values {
545 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
546 let entity = self
547 .mutate_structural::<E>(key, patch, MutationMode::Insert)
548 .map_err(QueryError::execute)?;
549 entities.push(entity);
550 }
551
552 Self::sql_write_dispatch_projection(entities)
553 }
554
555 fn execute_sql_update_dispatch<E>(
558 &self,
559 statement: &SqlUpdateStatement,
560 ) -> Result<SqlDispatchResult, QueryError>
561 where
562 E: PersistedRow<Canister = C> + EntityValue,
563 {
564 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
565 let (key, patch) = Self::sql_update_patch_and_key::<E>(statement)?;
566 let entity = self
567 .mutate_structural::<E>(key, patch, MutationMode::Update)
568 .map_err(QueryError::execute)?;
569
570 Self::sql_write_dispatch_projection(vec![entity])
571 }
572
573 fn prepare_structural_sql_projection_execution(
576 &self,
577 query: StructuralQuery,
578 authority: EntityAuthority,
579 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
580 let (_, plan) =
583 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
584 let projection = plan.projection_spec(authority.model());
585 let columns = projection_labels_from_projection_spec(&projection);
586
587 Ok((columns, plan))
588 }
589
590 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
594 &self,
595 query: StructuralQuery,
596 authority: EntityAuthority,
597 ) -> Result<SqlProjectionPayload, QueryError> {
598 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
600
601 let projected =
604 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
605 .map_err(QueryError::execute)?;
606 let (rows, row_count) = projected.into_parts();
607
608 Ok(SqlProjectionPayload::new(columns, rows, row_count))
609 }
610
611 fn execute_structural_sql_projection_text(
615 &self,
616 query: StructuralQuery,
617 authority: EntityAuthority,
618 ) -> Result<SqlDispatchResult, QueryError> {
619 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
621
622 let projected =
625 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
626 .map_err(QueryError::execute)?;
627 let (rows, row_count) = projected.into_parts();
628
629 Ok(SqlDispatchResult::ProjectionText {
630 columns,
631 rows,
632 row_count,
633 })
634 }
635
636 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
640 where
641 E: PersistedRow<Canister = C> + EntityValue,
642 {
643 let plan = self
644 .compile_query_with_visible_indexes(query)?
645 .into_prepared_execution_plan();
646 let deleted = self
647 .with_metrics(|| {
648 self.delete_executor::<E>()
649 .execute_structural_projection(plan)
650 })
651 .map_err(QueryError::execute)?;
652 let (rows, row_count) = deleted.into_parts();
653 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
654
655 Ok(SqlProjectionPayload::new(
656 projection_labels_from_fields(E::MODEL.fields()),
657 rows,
658 row_count,
659 )
660 .into_dispatch_result())
661 }
662
663 fn lowered_sql_query_dispatch_inputs_for_authority(
666 parsed: &SqlParsedStatement,
667 authority: EntityAuthority,
668 unsupported_message: &'static str,
669 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
670 let lowered = parsed.lower_query_lane_for_entity(
671 authority.model().name(),
672 authority.model().primary_key.name,
673 )?;
674 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
675 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
676 .transpose()?;
677 let query = lowered
678 .into_query()
679 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
680
681 Ok((query, projection_columns.flatten()))
682 }
683
    /// Routes one query-lane statement (SELECT or DELETE) for `authority`.
    ///
    /// Lanes are tried in order: the dedicated aggregate lane, the computed
    /// projection lane, and finally the lowered query, which is handed to
    /// `dispatch_select` or `dispatch_delete` depending on its shape.
    ///
    /// `dispatch_select` receives the lowered select, the authority, whether
    /// the query has grouping, and the optional projection labels.
    /// `unsupported_message` is used when lowering yields no query.
    ///
    /// # Errors
    ///
    /// Propagates errors from whichever lane handles the statement.
    fn dispatch_sql_query_route_for_authority(
        &self,
        parsed: &SqlParsedStatement,
        authority: EntityAuthority,
        unsupported_message: &'static str,
        dispatch_select: impl FnOnce(
            &Self,
            LoweredSelectShape,
            EntityAuthority,
            bool,
            Option<Vec<String>>,
        ) -> Result<SqlDispatchResult, QueryError>,
        dispatch_delete: impl FnOnce(
            &Self,
            LoweredBaseQueryShape,
            EntityAuthority,
        ) -> Result<SqlDispatchResult, QueryError>,
    ) -> Result<SqlDispatchResult, QueryError> {
        // Lane 1: statements that require the dedicated aggregate lane bypass
        // lowering entirely; the first projection alias (if any) overrides the label.
        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
            let command =
                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;

            return self.execute_sql_aggregate_dispatch_for_authority(
                command,
                authority,
                sql_aggregate_dispatch_label_override(&parsed.statement),
            );
        }

        // Lane 2: computed projections, when the statement yields such a plan.
        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
        }

        // Lane 3: lower the statement and hand it to the caller-provided callbacks.
        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
            parsed,
            authority,
            unsupported_message,
        )?;
        // Captured before `query` is moved into the match below.
        let grouped_surface = query.has_grouping();

        match query {
            LoweredSqlQuery::Select(select) => {
                dispatch_select(self, select, authority, grouped_surface, projection_columns)
            }
            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
        }
    }
738
739 fn dispatch_sql_explain_route_for_authority(
743 &self,
744 parsed: &SqlParsedStatement,
745 authority: EntityAuthority,
746 ) -> Result<SqlDispatchResult, QueryError> {
747 if let Some((mode, plan)) =
750 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
751 {
752 return self
753 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
754 .map(SqlDispatchResult::Explain);
755 }
756
757 let lowered = parsed.lower_query_lane_for_entity(
760 authority.model().name(),
761 authority.model().primary_key.name,
762 )?;
763 if let Some(explain) =
764 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
765 {
766 return Ok(SqlDispatchResult::Explain(explain));
767 }
768
769 self.explain_lowered_sql_for_authority(&lowered, authority)
770 .map(SqlDispatchResult::Explain)
771 }
772
773 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
776 query: &Query<E>,
777 surface: SqlGroupingSurface,
778 ) -> Result<(), QueryError>
779 where
780 E: EntityKind,
781 {
782 match (surface, query.has_grouping()) {
783 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
784 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
785 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
786 ),
787 }
788 }
789
790 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
792 where
793 E: PersistedRow<Canister = C> + EntityValue,
794 {
795 let parsed = self.parse_sql_statement(sql)?;
796
797 self.execute_sql_dispatch_parsed::<E>(&parsed)
798 }
799
    /// Dispatches an already-parsed SQL statement against entity `E`.
    ///
    /// Every statement route is handled: query lanes (SELECT/DELETE), INSERT,
    /// UPDATE, EXPLAIN, DESCRIBE, and the SHOW variants.
    ///
    /// # Errors
    ///
    /// Propagates errors from the selected route. Returns an invariant error
    /// when an INSERT or UPDATE route does not carry the matching parsed
    /// statement, and an unsupported-query error when a grouped SELECT has no
    /// explicit projection items.
    pub fn execute_sql_dispatch_parsed<E>(
        &self,
        parsed: &SqlParsedStatement,
    ) -> Result<SqlDispatchResult, QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        match parsed.route() {
            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
                parsed,
                EntityAuthority::for_type::<E>(),
                "execute_sql_dispatch accepts SELECT or DELETE only",
                |session, select, authority, grouped_surface, projection_columns| {
                    // Grouped SELECTs need labels derived from explicit projection items.
                    if grouped_surface {
                        let columns = projection_columns.ok_or_else(|| {
                            QueryError::unsupported_query(
                                "grouped SQL dispatch requires explicit grouped projection items",
                            )
                        })?;

                        return session.execute_lowered_sql_grouped_dispatch_select_core(
                            select, authority, columns,
                        );
                    }

                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
                    // Statement-derived labels replace the payload's own column labels.
                    if let Some(columns) = projection_columns {
                        let (_, rows, row_count) = payload.into_parts();

                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
                            .into_dispatch_result());
                    }

                    Ok(payload.into_dispatch_result())
                },
                |session, delete, _authority| {
                    // DELETE is re-bound to the typed query for `E` before execution.
                    let typed_query = bind_lowered_sql_query::<E>(
                        LoweredSqlQuery::Delete(delete),
                        MissingRowPolicy::Ignore,
                    )
                    .map_err(QueryError::from_sql_lowering_error)?;

                    session.execute_typed_sql_delete(&typed_query)
                },
            ),
            SqlStatementRoute::Insert { .. } => {
                let SqlStatement::Insert(statement) = &parsed.statement else {
                    return Err(QueryError::invariant(
                        "INSERT SQL route must carry parsed INSERT statement",
                    ));
                };

                self.execute_sql_insert_dispatch::<E>(statement)
            }
            SqlStatementRoute::Update { .. } => {
                let SqlStatement::Update(statement) = &parsed.statement else {
                    return Err(QueryError::invariant(
                        "UPDATE SQL route must carry parsed UPDATE statement",
                    ));
                };

                self.execute_sql_update_dispatch::<E>(statement)
            }
            SqlStatementRoute::Explain { .. } => self
                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
            SqlStatementRoute::Describe { .. } => {
                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
            }
            SqlStatementRoute::ShowIndexes { .. } => {
                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
            }
            SqlStatementRoute::ShowColumns { .. } => {
                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
            }
            SqlStatementRoute::ShowEntities => {
                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
            }
        }
    }
880
    /// Dispatches a parsed statement on the generated query surface for an
    /// already-resolved `authority`.
    ///
    /// Only query-lane (SELECT/DELETE) and EXPLAIN routes are executed here.
    /// Scalar SELECT results are produced through the text-row path.
    ///
    /// # Errors
    ///
    /// Returns an unsupported-query error for INSERT, UPDATE, DESCRIBE, and
    /// SHOW routes, and propagates errors from the executed route.
    #[doc(hidden)]
    pub fn execute_generated_query_surface_dispatch_for_authority(
        &self,
        parsed: &SqlParsedStatement,
        authority: EntityAuthority,
    ) -> Result<SqlDispatchResult, QueryError> {
        match parsed.route() {
            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
                parsed,
                authority,
                // NOTE(review): this wording differs from the
                // "SELECT, DELETE, or EXPLAIN" message used below — confirm intent.
                "generated SQL query surface requires query or EXPLAIN statement lanes",
                |session, select, authority, grouped_surface, projection_columns| {
                    // Grouped SELECTs need labels derived from explicit projection items.
                    if grouped_surface {
                        let columns = projection_columns.ok_or_else(|| {
                            QueryError::unsupported_query(
                                "grouped SQL dispatch requires explicit grouped projection items",
                            )
                        })?;

                        return session
                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
                    }

                    let result =
                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
                    // Statement-derived labels replace the columns of the text result.
                    if let Some(columns) = projection_columns {
                        let SqlDispatchResult::ProjectionText {
                            rows, row_count, ..
                        } = result
                        else {
                            return Err(QueryError::invariant(
                                "generated scalar SQL dispatch text path must emit projection text rows",
                            ));
                        };

                        return Ok(SqlDispatchResult::ProjectionText {
                            columns,
                            rows,
                            row_count,
                        });
                    }

                    Ok(result)
                },
                |session, delete, authority| {
                    session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
                },
            ),
            SqlStatementRoute::Explain { .. } => {
                self.dispatch_sql_explain_route_for_authority(parsed, authority)
            }
            // Every remaining route is rejected on this surface.
            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
            | SqlStatementRoute::Describe { .. }
            | SqlStatementRoute::ShowIndexes { .. }
            | SqlStatementRoute::ShowColumns { .. }
            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
            )),
        }
    }
947
    /// Runs raw SQL text on the generated query surface across a set of
    /// candidate entity authorities.
    ///
    /// The SQL is trimmed and parsed, the target authority is resolved from
    /// the statement's entity name, and the statement is dispatched. Failures
    /// are captured inside the returned attempt rather than returned as `Err`.
    /// Attempts that end before an authority is resolved carry an empty
    /// entity name and no explain order field.
    #[doc(hidden)]
    #[must_use]
    pub fn execute_generated_query_surface_sql(
        &self,
        sql: &str,
        authorities: &[EntityAuthority],
    ) -> GeneratedSqlDispatchAttempt {
        // Reject empty input before parsing.
        let sql_trimmed = match trim_generated_query_sql_input(sql) {
            Ok(sql_trimmed) => sql_trimmed,
            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
        };
        let parsed = match self.parse_sql_statement(sql_trimmed) {
            Ok(parsed) => parsed,
            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
        };

        // SHOW ENTITIES is answered from the authority list itself and must be
        // handled before authority resolution.
        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
            return GeneratedSqlDispatchAttempt::new(
                "",
                None,
                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
                    authorities,
                ))),
            );
        }
        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
            Ok(authority) => authority,
            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
        };

        let entity_name = authority.model().name();
        // The order field is reported only for EXPLAIN routes and is the
        // model's primary-key field name.
        let explain_order_field = parsed
            .route()
            .is_explain()
            .then_some(authority.model().primary_key.name);
        let result = match parsed.route() {
            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
            }
            // Writes are rejected on this surface.
            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
                Err(QueryError::unsupported_query(
                    "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
                ))
            }
            // Introspection routes are answered from the authority's model.
            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
                self.describe_entity_model(authority.model()),
            )),
            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
            )),
            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
                self.show_columns_for_model(authority.model()),
            )),
            // Already returned above, so this arm cannot be reached.
            SqlStatementRoute::ShowEntities => unreachable!(
                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
            ),
        };

        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
    }
1020}