1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 session::sql::{
18 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20 computed_projection,
21 projection::{
22 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30 lower_sql_command_from_prepared_statement, prepare_sql_statement,
31 },
32 sql::parser::{
33 SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
34 SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
35 SqlStatement, SqlTextFunction, SqlUpdateStatement,
36 },
37 },
38 model::{entity::resolve_field_slot, field::FieldKind},
39 traits::{CanisterKind, EntityKind, EntityValue},
40 types::{Timestamp, Ulid},
41 value::Value,
42};
43
44#[cfg(feature = "perf-attribution")]
45pub use lowered::LoweredSqlDispatchExecutorAttribution;
46
47#[doc(hidden)]
56pub struct GeneratedSqlDispatchAttempt {
57 entity_name: &'static str,
58 explain_order_field: Option<&'static str>,
59 result: Result<SqlDispatchResult, QueryError>,
60}
61
62impl GeneratedSqlDispatchAttempt {
63 const fn new(
65 entity_name: &'static str,
66 explain_order_field: Option<&'static str>,
67 result: Result<SqlDispatchResult, QueryError>,
68 ) -> Self {
69 Self {
70 entity_name,
71 explain_order_field,
72 result,
73 }
74 }
75
76 #[must_use]
78 pub const fn entity_name(&self) -> &'static str {
79 self.entity_name
80 }
81
82 #[must_use]
84 pub const fn explain_order_field(&self) -> Option<&'static str> {
85 self.explain_order_field
86 }
87
88 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
90 self.result
91 }
92}
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq)]
95pub(in crate::db::session::sql) enum SqlGroupingSurface {
96 Scalar,
97 Grouped,
98}
99
100const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
101 match surface {
102 SqlGroupingSurface::Scalar => {
103 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
104 }
105 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
106 }
107}
108
109fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
112 let sql_trimmed = sql.trim();
113 if sql_trimmed.is_empty() {
114 return Err(QueryError::unsupported_query(
115 "query endpoint requires a non-empty SQL string",
116 ));
117 }
118
119 Ok(sql_trimmed)
120}
121
122fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
125 let mut entities = Vec::with_capacity(authorities.len());
126
127 for authority in authorities {
128 entities.push(authority.model().name().to_string());
129 }
130
131 entities
132}
133
134fn sql_projection_labels_from_select_statement(
137 statement: &SqlStatement,
138) -> Result<Option<Vec<String>>, QueryError> {
139 let SqlStatement::Select(select) = statement else {
140 return Err(QueryError::invariant(
141 "SQL projection labels require SELECT statement shape",
142 ));
143 };
144 let SqlProjection::Items(items) = &select.projection else {
145 return Ok(None);
146 };
147
148 Ok(Some(
149 items
150 .iter()
151 .enumerate()
152 .map(|(index, item)| {
153 select
154 .projection_alias(index)
155 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
156 })
157 .collect(),
158 ))
159}
160
161fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
164 match item {
165 SqlSelectItem::Field(field) => field.clone(),
166 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
167 SqlSelectItem::TextFunction(call) => {
168 format!(
169 "{}({})",
170 grouped_sql_text_function_name(call.function),
171 call.field
172 )
173 }
174 }
175}
176
177fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
180 let SqlStatement::Select(select) = statement else {
181 return None;
182 };
183
184 select.projection_alias(0).map(str::to_string)
185}
186
187fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
189 let kind = match aggregate.kind {
190 SqlAggregateKind::Count => "COUNT",
191 SqlAggregateKind::Sum => "SUM",
192 SqlAggregateKind::Avg => "AVG",
193 SqlAggregateKind::Min => "MIN",
194 SqlAggregateKind::Max => "MAX",
195 };
196
197 match aggregate.field.as_deref() {
198 Some(field) => format!("{kind}({field})"),
199 None => format!("{kind}(*)"),
200 }
201}
202
203const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
206 match function {
207 SqlTextFunction::Trim => "TRIM",
208 SqlTextFunction::Ltrim => "LTRIM",
209 SqlTextFunction::Rtrim => "RTRIM",
210 SqlTextFunction::Lower => "LOWER",
211 SqlTextFunction::Upper => "UPPER",
212 SqlTextFunction::Length => "LENGTH",
213 SqlTextFunction::Left => "LEFT",
214 SqlTextFunction::Right => "RIGHT",
215 SqlTextFunction::StartsWith => "STARTS_WITH",
216 SqlTextFunction::EndsWith => "ENDS_WITH",
217 SqlTextFunction::Contains => "CONTAINS",
218 SqlTextFunction::Position => "POSITION",
219 SqlTextFunction::Replace => "REPLACE",
220 SqlTextFunction::Substring => "SUBSTRING",
221 }
222}
223
224fn authority_for_generated_sql_route(
226 route: &SqlStatementRoute,
227 authorities: &[EntityAuthority],
228) -> Result<EntityAuthority, QueryError> {
229 let sql_entity = route.entity();
230
231 for authority in authorities {
232 if identifiers_tail_match(sql_entity, authority.model().name()) {
233 return Ok(*authority);
234 }
235 }
236
237 Err(unsupported_generated_sql_entity_error(
238 sql_entity,
239 authorities,
240 ))
241}
242
243fn unsupported_generated_sql_entity_error(
246 entity_name: &str,
247 authorities: &[EntityAuthority],
248) -> QueryError {
249 let mut supported = String::new();
250
251 for (index, authority) in authorities.iter().enumerate() {
252 if index != 0 {
253 supported.push_str(", ");
254 }
255
256 supported.push_str(authority.model().name());
257 }
258
259 QueryError::unsupported_query(format!(
260 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
261 ))
262}
263
264fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
267where
268 E: EntityKind,
269{
270 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
271 return Ok(());
272 }
273
274 Err(QueryError::from_sql_lowering_error(
275 SqlLoweringError::EntityMismatch {
276 sql_entity: sql_entity.to_string(),
277 expected_entity: E::MODEL.name(),
278 },
279 ))
280}
281
282fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
285where
286 E: EntityKind,
287{
288 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
289 return Ok(key);
290 }
291
292 let widened = match value {
293 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
294 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
295 _ => {
296 return Err(QueryError::unsupported_query(format!(
297 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
298 )));
299 }
300 };
301
302 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
303 QueryError::unsupported_query(format!(
304 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
305 ))
306 })
307}
308
309fn sql_write_generated_primary_key_value<E>() -> Option<Value>
313where
314 E: EntityKind,
315{
316 matches!(E::MODEL.primary_key.kind(), FieldKind::Ulid).then(|| Value::Ulid(Ulid::generate()))
317}
318
319fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
322where
323 E: EntityKind,
324{
325 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
326 QueryError::invariant("SQL write field must resolve against the target entity model")
327 })?;
328 let field_kind = E::MODEL.fields()[field_slot].kind();
329
330 let normalized = match (field_kind, value) {
331 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
332 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
333 Value::Int(v.cast_signed())
334 }
335 _ => value.clone(),
336 };
337
338 Ok(normalized)
339}
340
341fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
345where
346 E: EntityKind,
347{
348 if resolve_field_slot(E::MODEL, "created_at").is_some()
349 && resolve_field_slot(E::MODEL, "updated_at").is_some()
350 {
351 return Some(("created_at", "updated_at"));
352 }
353
354 None
355}
356
357fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
362where
363 E: EntityKind,
364{
365 match source {
366 SqlInsertSource::Values(values) => values.first().map(Vec::len),
367 SqlInsertSource::Select(select) => match &select.projection {
368 SqlProjection::All => Some(
369 E::MODEL
370 .fields()
371 .iter()
372 .filter(|field| {
373 !matches!(
374 sql_write_system_timestamp_fields::<E>(),
375 Some((created_at, updated_at))
376 if field.name() == created_at || field.name() == updated_at
377 )
378 })
379 .count(),
380 ),
381 SqlProjection::Items(items) => Some(items.len()),
382 },
383 }
384}
385
386fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
391where
392 E: EntityKind,
393{
394 if !statement.columns.is_empty() {
395 return statement.columns.clone();
396 }
397
398 let timestamp_fields = sql_write_system_timestamp_fields::<E>();
399
400 let columns: Vec<String> = E::MODEL
401 .fields()
402 .iter()
403 .filter(|field| {
404 !matches!(
405 timestamp_fields,
406 Some((created_at, updated_at))
407 if field.name() == created_at || field.name() == updated_at
408 )
409 })
410 .map(|field| field.name().to_string())
411 .collect();
412
413 let pk_name = E::MODEL.primary_key.name;
414 if sql_write_generated_primary_key_value::<E>().is_none() {
415 return columns;
416 }
417
418 let generated_key_omitted_columns: Vec<String> = columns
419 .iter()
420 .filter(|field| field.as_str() != pk_name)
421 .cloned()
422 .collect();
423 let first_width = sql_insert_source_width_hint::<E>(&statement.source);
424
425 if first_width == Some(generated_key_omitted_columns.len()) {
426 return generated_key_omitted_columns;
427 }
428
429 columns
430}
431
432fn validate_sql_insert_value_tuple_lengths(
435 columns: &[String],
436 values: &[Vec<Value>],
437) -> Result<(), QueryError> {
438 for tuple in values {
439 if tuple.len() != columns.len() {
440 return Err(QueryError::from_sql_parse_error(
441 crate::db::sql::parser::SqlParseError::invalid_syntax(
442 "INSERT column list and VALUES tuple length must match",
443 ),
444 ));
445 }
446 }
447
448 Ok(())
449}
450
451fn validate_sql_insert_selected_rows(
454 columns: &[String],
455 rows: &[Vec<Value>],
456) -> Result<(), QueryError> {
457 for row in rows {
458 if row.len() != columns.len() {
459 return Err(QueryError::unsupported_query(
460 "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
461 ));
462 }
463 }
464
465 Ok(())
466}
467
468impl<C: CanisterKind> DbSession<C> {
469 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
472 where
473 E: PersistedRow<Canister = C> + EntityValue,
474 {
475 let mut row = Vec::with_capacity(E::MODEL.fields().len());
476
477 for index in 0..E::MODEL.fields().len() {
478 let value = entity.get_value_by_index(index).ok_or_else(|| {
479 QueryError::invariant(
480 "SQL write dispatch projection row must include every declared field",
481 )
482 })?;
483 row.push(value);
484 }
485
486 Ok(row)
487 }
488
489 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
493 where
494 E: PersistedRow<Canister = C> + EntityValue,
495 {
496 let columns = projection_labels_from_fields(E::MODEL.fields());
497 let rows = entities
498 .into_iter()
499 .map(Self::sql_write_dispatch_row)
500 .collect::<Result<Vec<_>, _>>()?;
501 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
502
503 Ok(SqlDispatchResult::Projection {
504 columns,
505 rows,
506 row_count,
507 })
508 }
509
510 fn sql_insert_patch_and_key<E>(
513 columns: &[String],
514 values: &[Value],
515 ) -> Result<(E::Key, UpdatePatch), QueryError>
516 where
517 E: PersistedRow<Canister = C> + EntityValue,
518 {
519 let pk_name = E::MODEL.primary_key.name;
523 let generated_pk = sql_write_generated_primary_key_value::<E>();
524 let (key, generated_pk_value) =
525 if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
526 let pk_value = values.get(pk_index).ok_or_else(|| {
527 QueryError::invariant(
528 "INSERT primary key column must align with one VALUES literal",
529 )
530 })?;
531 (sql_write_key_from_literal::<E>(pk_value, pk_name)?, None)
532 } else if let Some(pk_value) = generated_pk {
533 (
534 sql_write_key_from_literal::<E>(&pk_value, pk_name)?,
535 Some(pk_value),
536 )
537 } else {
538 return Err(QueryError::unsupported_query(format!(
539 "SQL INSERT requires primary key column '{pk_name}' in this release"
540 )));
541 };
542
543 let mut patch = UpdatePatch::new();
546 if let Some(pk_value) = generated_pk_value {
547 patch = patch
548 .set_field(E::MODEL, pk_name, pk_value)
549 .map_err(QueryError::execute)?;
550 }
551 for (field, value) in columns.iter().zip(values.iter()) {
552 let normalized = sql_write_value_for_field::<E>(field, value)?;
553 patch = patch
554 .set_field(E::MODEL, field, normalized)
555 .map_err(QueryError::execute)?;
556 }
557
558 if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
561 let now = Value::Timestamp(Timestamp::now());
562 patch = patch
563 .set_field(E::MODEL, created_at, now.clone())
564 .map_err(QueryError::execute)?;
565 patch = patch
566 .set_field(E::MODEL, updated_at, now)
567 .map_err(QueryError::execute)?;
568 }
569
570 Ok((key, patch))
571 }
572
573 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
576 where
577 E: PersistedRow<Canister = C> + EntityValue,
578 {
579 let pk_name = E::MODEL.primary_key.name;
582 let mut patch = UpdatePatch::new();
583 for assignment in &statement.assignments {
584 if assignment.field == pk_name {
585 return Err(QueryError::unsupported_query(format!(
586 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
587 )));
588 }
589 let normalized =
590 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
591
592 patch = patch
593 .set_field(E::MODEL, assignment.field.as_str(), normalized)
594 .map_err(QueryError::execute)?;
595 }
596
597 if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
600 patch = patch
601 .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
602 .map_err(QueryError::execute)?;
603 }
604
605 Ok(patch)
606 }
607
608 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
610 where
611 E: PersistedRow<Canister = C> + EntityValue,
612 {
613 let Some(predicate) = statement.predicate.clone() else {
617 return Err(QueryError::unsupported_query(
618 "SQL UPDATE requires WHERE predicate in this release",
619 ));
620 };
621 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
622 let pk_name = E::MODEL.primary_key.name;
623 let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
624
625 if statement.order_by.is_empty() {
629 selector = selector.order_by(pk_name);
630 } else {
631 let mut orders_primary_key = false;
632
633 for term in &statement.order_by {
634 if term.field == pk_name {
635 orders_primary_key = true;
636 }
637 selector = match term.direction {
638 SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
639 SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
640 };
641 }
642
643 if !orders_primary_key {
644 selector = selector.order_by(pk_name);
645 }
646 }
647
648 if let Some(limit) = statement.limit {
651 selector = selector.limit(limit);
652 }
653 if let Some(offset) = statement.offset {
654 selector = selector.offset(offset);
655 }
656
657 Ok(selector)
658 }
659
660 fn sql_insert_select_source_statement<E>(
664 statement: &SqlInsertStatement,
665 ) -> Result<SqlSelectStatement, QueryError>
666 where
667 E: PersistedRow<Canister = C> + EntityValue,
668 {
669 let SqlInsertSource::Select(select) = statement.source.clone() else {
670 return Err(QueryError::invariant(
671 "INSERT SELECT source validation requires parsed SELECT source",
672 ));
673 };
674 let mut select = *select;
675 ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
676
677 if !select.group_by.is_empty() || !select.having.is_empty() {
678 return Err(QueryError::unsupported_query(
679 "SQL INSERT SELECT requires scalar SELECT source in this release",
680 ));
681 }
682
683 if let SqlProjection::Items(items) = &select.projection {
684 for item in items {
685 if matches!(item, SqlSelectItem::Aggregate(_)) {
686 return Err(QueryError::unsupported_query(
687 "SQL INSERT SELECT does not support aggregate source projection in this release",
688 ));
689 }
690 }
691 }
692
693 let pk_name = E::MODEL.primary_key.name;
694 if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
695 select.order_by.push(SqlOrderTerm {
696 field: pk_name.to_string(),
697 direction: SqlOrderDirection::Asc,
698 });
699 }
700
701 Ok(select)
702 }
703
704 fn execute_sql_insert_select_source_rows<E>(
708 &self,
709 source: &SqlSelectStatement,
710 ) -> Result<Vec<Vec<Value>>, QueryError>
711 where
712 E: PersistedRow<Canister = C> + EntityValue,
713 {
714 if let Some(plan) = computed_projection::computed_sql_projection_plan(
718 &SqlStatement::Select(source.clone()),
719 )? {
720 let result = self.execute_computed_sql_projection_dispatch_for_authority(
721 plan,
722 EntityAuthority::for_type::<E>(),
723 )?;
724
725 return match result {
726 SqlDispatchResult::Projection { rows, .. } => Ok(rows),
727 other => Err(QueryError::invariant(format!(
728 "INSERT SELECT computed source must produce projection rows, found {other:?}",
729 ))),
730 };
731 }
732
733 let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
736 .map_err(QueryError::from_sql_lowering_error)?;
737 let lowered =
738 lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
739 .map_err(QueryError::from_sql_lowering_error)?;
740 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
741 return Err(QueryError::invariant(
742 "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
743 ));
744 };
745
746 let payload =
747 self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
748 let (_, rows, _) = payload.into_parts();
749
750 Ok(rows)
751 }
752
753 fn execute_sql_insert_dispatch<E>(
756 &self,
757 statement: &SqlInsertStatement,
758 ) -> Result<SqlDispatchResult, QueryError>
759 where
760 E: PersistedRow<Canister = C> + EntityValue,
761 {
762 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
763 let columns = sql_insert_columns::<E>(statement);
764 let source_rows = match &statement.source {
765 SqlInsertSource::Values(values) => {
766 validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
767 values.clone()
768 }
769 SqlInsertSource::Select(_) => {
770 let source = Self::sql_insert_select_source_statement::<E>(statement)?;
771 let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
772 validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
773
774 rows
775 }
776 };
777 let mut entities = Vec::with_capacity(source_rows.len());
778
779 for values in &source_rows {
780 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
781 let entity = self
782 .mutate_structural::<E>(key, patch, MutationMode::Insert)
783 .map_err(QueryError::execute)?;
784 entities.push(entity);
785 }
786
787 Self::sql_write_dispatch_projection(entities)
788 }
789
790 fn execute_sql_update_dispatch<E>(
794 &self,
795 statement: &SqlUpdateStatement,
796 ) -> Result<SqlDispatchResult, QueryError>
797 where
798 E: PersistedRow<Canister = C> + EntityValue,
799 {
800 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
801 let selector = Self::sql_update_selector_query::<E>(statement)?;
802 let patch = Self::sql_update_patch::<E>(statement)?;
803 let matched = self.execute_query(&selector)?;
804 let mut entities = Vec::with_capacity(matched.len());
805
806 for entity in matched.entities() {
809 let updated = self
810 .mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
811 .map_err(QueryError::execute)?;
812 entities.push(updated);
813 }
814
815 Self::sql_write_dispatch_projection(entities)
816 }
817
818 fn prepare_structural_sql_projection_execution(
821 &self,
822 query: StructuralQuery,
823 authority: EntityAuthority,
824 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
825 let (_, plan) =
828 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
829 let projection = plan.projection_spec(authority.model());
830 let columns = projection_labels_from_projection_spec(&projection);
831
832 Ok((columns, plan))
833 }
834
835 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
839 &self,
840 query: StructuralQuery,
841 authority: EntityAuthority,
842 ) -> Result<SqlProjectionPayload, QueryError> {
843 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
845
846 let projected =
849 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
850 .map_err(QueryError::execute)?;
851 let (rows, row_count) = projected.into_parts();
852
853 Ok(SqlProjectionPayload::new(columns, rows, row_count))
854 }
855
856 fn execute_structural_sql_projection_text(
860 &self,
861 query: StructuralQuery,
862 authority: EntityAuthority,
863 ) -> Result<SqlDispatchResult, QueryError> {
864 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
866
867 let projected =
870 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
871 .map_err(QueryError::execute)?;
872 let (rows, row_count) = projected.into_parts();
873
874 Ok(SqlDispatchResult::ProjectionText {
875 columns,
876 rows,
877 row_count,
878 })
879 }
880
881 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
885 where
886 E: PersistedRow<Canister = C> + EntityValue,
887 {
888 let plan = self
889 .compile_query_with_visible_indexes(query)?
890 .into_prepared_execution_plan();
891 let deleted = self
892 .with_metrics(|| {
893 self.delete_executor::<E>()
894 .execute_structural_projection(plan)
895 })
896 .map_err(QueryError::execute)?;
897 let (rows, row_count) = deleted.into_parts();
898 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
899
900 Ok(SqlProjectionPayload::new(
901 projection_labels_from_fields(E::MODEL.fields()),
902 rows,
903 row_count,
904 )
905 .into_dispatch_result())
906 }
907
908 fn lowered_sql_query_dispatch_inputs_for_authority(
911 parsed: &SqlParsedStatement,
912 authority: EntityAuthority,
913 unsupported_message: &'static str,
914 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
915 let lowered = parsed.lower_query_lane_for_entity(
916 authority.model().name(),
917 authority.model().primary_key.name,
918 )?;
919 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
920 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
921 .transpose()?;
922 let query = lowered
923 .into_query()
924 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
925
926 Ok((query, projection_columns.flatten()))
927 }
928
929 fn dispatch_sql_query_route_for_authority(
933 &self,
934 parsed: &SqlParsedStatement,
935 authority: EntityAuthority,
936 unsupported_message: &'static str,
937 dispatch_select: impl FnOnce(
938 &Self,
939 LoweredSelectShape,
940 EntityAuthority,
941 bool,
942 Option<Vec<String>>,
943 ) -> Result<SqlDispatchResult, QueryError>,
944 dispatch_delete: impl FnOnce(
945 &Self,
946 LoweredBaseQueryShape,
947 EntityAuthority,
948 ) -> Result<SqlDispatchResult, QueryError>,
949 ) -> Result<SqlDispatchResult, QueryError> {
950 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
953 let command =
954 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
955
956 return self.execute_sql_aggregate_dispatch_for_authority(
957 command,
958 authority,
959 sql_aggregate_dispatch_label_override(&parsed.statement),
960 );
961 }
962
963 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
964 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
965 }
966
967 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
970 parsed,
971 authority,
972 unsupported_message,
973 )?;
974 let grouped_surface = query.has_grouping();
975
976 match query {
977 LoweredSqlQuery::Select(select) => {
978 dispatch_select(self, select, authority, grouped_surface, projection_columns)
979 }
980 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
981 }
982 }
983
984 fn dispatch_sql_explain_route_for_authority(
988 &self,
989 parsed: &SqlParsedStatement,
990 authority: EntityAuthority,
991 ) -> Result<SqlDispatchResult, QueryError> {
992 if let Some((mode, plan)) =
995 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
996 {
997 return self
998 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
999 .map(SqlDispatchResult::Explain);
1000 }
1001
1002 let lowered = parsed.lower_query_lane_for_entity(
1005 authority.model().name(),
1006 authority.model().primary_key.name,
1007 )?;
1008 if let Some(explain) =
1009 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1010 {
1011 return Ok(SqlDispatchResult::Explain(explain));
1012 }
1013
1014 self.explain_lowered_sql_for_authority(&lowered, authority)
1015 .map(SqlDispatchResult::Explain)
1016 }
1017
1018 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1021 query: &Query<E>,
1022 surface: SqlGroupingSurface,
1023 ) -> Result<(), QueryError>
1024 where
1025 E: EntityKind,
1026 {
1027 match (surface, query.has_grouping()) {
1028 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1029 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1030 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1031 ),
1032 }
1033 }
1034
1035 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1037 where
1038 E: PersistedRow<Canister = C> + EntityValue,
1039 {
1040 let parsed = self.parse_sql_statement(sql)?;
1041
1042 self.execute_sql_dispatch_parsed::<E>(&parsed)
1043 }
1044
1045 pub fn execute_sql_dispatch_parsed<E>(
1047 &self,
1048 parsed: &SqlParsedStatement,
1049 ) -> Result<SqlDispatchResult, QueryError>
1050 where
1051 E: PersistedRow<Canister = C> + EntityValue,
1052 {
1053 match parsed.route() {
1054 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1055 parsed,
1056 EntityAuthority::for_type::<E>(),
1057 "execute_sql_dispatch accepts SELECT or DELETE only",
1058 |session, select, authority, grouped_surface, projection_columns| {
1059 if grouped_surface {
1060 let columns = projection_columns.ok_or_else(|| {
1061 QueryError::unsupported_query(
1062 "grouped SQL dispatch requires explicit grouped projection items",
1063 )
1064 })?;
1065
1066 return session.execute_lowered_sql_grouped_dispatch_select_core(
1067 select, authority, columns,
1068 );
1069 }
1070
1071 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1072 if let Some(columns) = projection_columns {
1073 let (_, rows, row_count) = payload.into_parts();
1074
1075 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1076 .into_dispatch_result());
1077 }
1078
1079 Ok(payload.into_dispatch_result())
1080 },
1081 |session, delete, _authority| {
1082 let typed_query = bind_lowered_sql_query::<E>(
1083 LoweredSqlQuery::Delete(delete),
1084 MissingRowPolicy::Ignore,
1085 )
1086 .map_err(QueryError::from_sql_lowering_error)?;
1087
1088 session.execute_typed_sql_delete(&typed_query)
1089 },
1090 ),
1091 SqlStatementRoute::Insert { .. } => {
1092 let SqlStatement::Insert(statement) = &parsed.statement else {
1093 return Err(QueryError::invariant(
1094 "INSERT SQL route must carry parsed INSERT statement",
1095 ));
1096 };
1097
1098 self.execute_sql_insert_dispatch::<E>(statement)
1099 }
1100 SqlStatementRoute::Update { .. } => {
1101 let SqlStatement::Update(statement) = &parsed.statement else {
1102 return Err(QueryError::invariant(
1103 "UPDATE SQL route must carry parsed UPDATE statement",
1104 ));
1105 };
1106
1107 self.execute_sql_update_dispatch::<E>(statement)
1108 }
1109 SqlStatementRoute::Explain { .. } => self
1110 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1111 SqlStatementRoute::Describe { .. } => {
1112 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1113 }
1114 SqlStatementRoute::ShowIndexes { .. } => {
1115 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1116 }
1117 SqlStatementRoute::ShowColumns { .. } => {
1118 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1119 }
1120 SqlStatementRoute::ShowEntities => {
1121 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1122 }
1123 }
1124 }
1125
1126 #[doc(hidden)]
1133 pub fn execute_generated_query_surface_dispatch_for_authority(
1134 &self,
1135 parsed: &SqlParsedStatement,
1136 authority: EntityAuthority,
1137 ) -> Result<SqlDispatchResult, QueryError> {
1138 match parsed.route() {
1139 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1140 parsed,
1141 authority,
1142 "generated SQL query surface requires query or EXPLAIN statement lanes",
1143 |session, select, authority, grouped_surface, projection_columns| {
1144 if grouped_surface {
1145 let columns = projection_columns.ok_or_else(|| {
1146 QueryError::unsupported_query(
1147 "grouped SQL dispatch requires explicit grouped projection items",
1148 )
1149 })?;
1150
1151 return session
1152 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1153 }
1154
1155 let result =
1156 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1157 if let Some(columns) = projection_columns {
1158 let SqlDispatchResult::ProjectionText {
1159 rows, row_count, ..
1160 } = result
1161 else {
1162 return Err(QueryError::invariant(
1163 "generated scalar SQL dispatch text path must emit projection text rows",
1164 ));
1165 };
1166
1167 return Ok(SqlDispatchResult::ProjectionText {
1168 columns,
1169 rows,
1170 row_count,
1171 });
1172 }
1173
1174 Ok(result)
1175 },
1176 |session, delete, authority| {
1177 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1178 },
1179 ),
1180 SqlStatementRoute::Explain { .. } => {
1181 self.dispatch_sql_explain_route_for_authority(parsed, authority)
1182 }
1183 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1184 | SqlStatementRoute::Describe { .. }
1185 | SqlStatementRoute::ShowIndexes { .. }
1186 | SqlStatementRoute::ShowColumns { .. }
1187 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1188 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1189 )),
1190 }
1191 }
1192
1193 #[doc(hidden)]
1199 #[must_use]
1200 pub fn execute_generated_query_surface_sql(
1201 &self,
1202 sql: &str,
1203 authorities: &[EntityAuthority],
1204 ) -> GeneratedSqlDispatchAttempt {
1205 let sql_trimmed = match trim_generated_query_sql_input(sql) {
1208 Ok(sql_trimmed) => sql_trimmed,
1209 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1210 };
1211 let parsed = match self.parse_sql_statement(sql_trimmed) {
1212 Ok(parsed) => parsed,
1213 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1214 };
1215
1216 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1219 return GeneratedSqlDispatchAttempt::new(
1220 "",
1221 None,
1222 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1223 authorities,
1224 ))),
1225 );
1226 }
1227 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1228 Ok(authority) => authority,
1229 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1230 };
1231
1232 let entity_name = authority.model().name();
1236 let explain_order_field = parsed
1237 .route()
1238 .is_explain()
1239 .then_some(authority.model().primary_key.name);
1240 let result = match parsed.route() {
1241 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1242 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1243 }
1244 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1245 Err(QueryError::unsupported_query(
1246 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1247 ))
1248 }
1249 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1250 self.describe_entity_model(authority.model()),
1251 )),
1252 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1253 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1254 )),
1255 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1256 self.show_columns_for_model(authority.model()),
1257 )),
1258 SqlStatementRoute::ShowEntities => unreachable!(
1259 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1260 ),
1261 };
1262
1263 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1264 }
1265}