1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 session::sql::{
18 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20 computed_projection,
21 projection::{
22 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30 lower_sql_command_from_prepared_statement, prepare_sql_statement,
31 },
32 sql::parser::{
33 SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
34 SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
35 SqlStatement, SqlTextFunction, SqlUpdateStatement,
36 },
37 },
38 model::{
39 entity::resolve_field_slot,
40 field::{FieldInsertGeneration, FieldKind, FieldModel},
41 },
42 sanitize::{SanitizeWriteContext, SanitizeWriteMode},
43 traits::{CanisterKind, EntityKind, EntityValue},
44 types::{Timestamp, Ulid},
45 value::Value,
46};
47
48#[cfg(feature = "perf-attribution")]
49pub use lowered::LoweredSqlDispatchExecutorAttribution;
50
51#[doc(hidden)]
60pub struct GeneratedSqlDispatchAttempt {
61 entity_name: &'static str,
62 explain_order_field: Option<&'static str>,
63 result: Result<SqlDispatchResult, QueryError>,
64}
65
66impl GeneratedSqlDispatchAttempt {
67 const fn new(
69 entity_name: &'static str,
70 explain_order_field: Option<&'static str>,
71 result: Result<SqlDispatchResult, QueryError>,
72 ) -> Self {
73 Self {
74 entity_name,
75 explain_order_field,
76 result,
77 }
78 }
79
80 #[must_use]
82 pub const fn entity_name(&self) -> &'static str {
83 self.entity_name
84 }
85
86 #[must_use]
88 pub const fn explain_order_field(&self) -> Option<&'static str> {
89 self.explain_order_field
90 }
91
92 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
94 self.result
95 }
96}
97
98#[derive(Clone, Copy, Debug, Eq, PartialEq)]
99pub(in crate::db::session::sql) enum SqlGroupingSurface {
100 Scalar,
101 Grouped,
102}
103
104const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
105 match surface {
106 SqlGroupingSurface::Scalar => {
107 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
108 }
109 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
110 }
111}
112
113fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
116 let sql_trimmed = sql.trim();
117 if sql_trimmed.is_empty() {
118 return Err(QueryError::unsupported_query(
119 "query endpoint requires a non-empty SQL string",
120 ));
121 }
122
123 Ok(sql_trimmed)
124}
125
126fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
129 let mut entities = Vec::with_capacity(authorities.len());
130
131 for authority in authorities {
132 entities.push(authority.model().name().to_string());
133 }
134
135 entities
136}
137
138fn sql_projection_labels_from_select_statement(
141 statement: &SqlStatement,
142) -> Result<Option<Vec<String>>, QueryError> {
143 let SqlStatement::Select(select) = statement else {
144 return Err(QueryError::invariant(
145 "SQL projection labels require SELECT statement shape",
146 ));
147 };
148 let SqlProjection::Items(items) = &select.projection else {
149 return Ok(None);
150 };
151
152 Ok(Some(
153 items
154 .iter()
155 .enumerate()
156 .map(|(index, item)| {
157 select
158 .projection_alias(index)
159 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
160 })
161 .collect(),
162 ))
163}
164
165fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
168 match item {
169 SqlSelectItem::Field(field) => field.clone(),
170 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
171 SqlSelectItem::TextFunction(call) => {
172 format!(
173 "{}({})",
174 grouped_sql_text_function_name(call.function),
175 call.field
176 )
177 }
178 }
179}
180
181fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
184 let SqlStatement::Select(select) = statement else {
185 return None;
186 };
187
188 select.projection_alias(0).map(str::to_string)
189}
190
191fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
193 let kind = match aggregate.kind {
194 SqlAggregateKind::Count => "COUNT",
195 SqlAggregateKind::Sum => "SUM",
196 SqlAggregateKind::Avg => "AVG",
197 SqlAggregateKind::Min => "MIN",
198 SqlAggregateKind::Max => "MAX",
199 };
200
201 match aggregate.field.as_deref() {
202 Some(field) => format!("{kind}({field})"),
203 None => format!("{kind}(*)"),
204 }
205}
206
207const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
210 match function {
211 SqlTextFunction::Trim => "TRIM",
212 SqlTextFunction::Ltrim => "LTRIM",
213 SqlTextFunction::Rtrim => "RTRIM",
214 SqlTextFunction::Lower => "LOWER",
215 SqlTextFunction::Upper => "UPPER",
216 SqlTextFunction::Length => "LENGTH",
217 SqlTextFunction::Left => "LEFT",
218 SqlTextFunction::Right => "RIGHT",
219 SqlTextFunction::StartsWith => "STARTS_WITH",
220 SqlTextFunction::EndsWith => "ENDS_WITH",
221 SqlTextFunction::Contains => "CONTAINS",
222 SqlTextFunction::Position => "POSITION",
223 SqlTextFunction::Replace => "REPLACE",
224 SqlTextFunction::Substring => "SUBSTRING",
225 }
226}
227
228fn authority_for_generated_sql_route(
230 route: &SqlStatementRoute,
231 authorities: &[EntityAuthority],
232) -> Result<EntityAuthority, QueryError> {
233 let sql_entity = route.entity();
234
235 for authority in authorities {
236 if identifiers_tail_match(sql_entity, authority.model().name()) {
237 return Ok(*authority);
238 }
239 }
240
241 Err(unsupported_generated_sql_entity_error(
242 sql_entity,
243 authorities,
244 ))
245}
246
247fn unsupported_generated_sql_entity_error(
250 entity_name: &str,
251 authorities: &[EntityAuthority],
252) -> QueryError {
253 let mut supported = String::new();
254
255 for (index, authority) in authorities.iter().enumerate() {
256 if index != 0 {
257 supported.push_str(", ");
258 }
259
260 supported.push_str(authority.model().name());
261 }
262
263 QueryError::unsupported_query(format!(
264 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
265 ))
266}
267
268fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
271where
272 E: EntityKind,
273{
274 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
275 return Ok(());
276 }
277
278 Err(QueryError::from_sql_lowering_error(
279 SqlLoweringError::EntityMismatch {
280 sql_entity: sql_entity.to_string(),
281 expected_entity: E::MODEL.name(),
282 },
283 ))
284}
285
286fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
289where
290 E: EntityKind,
291{
292 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
293 return Ok(key);
294 }
295
296 let widened = match value {
297 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
298 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
299 _ => {
300 return Err(QueryError::unsupported_query(format!(
301 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
302 )));
303 }
304 };
305
306 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
307 QueryError::unsupported_query(format!(
308 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
309 ))
310 })
311}
312
313fn sql_write_generated_field_value(field: &FieldModel) -> Option<Value> {
316 field
317 .insert_generation()
318 .map(|generation| match generation {
319 FieldInsertGeneration::Ulid => Value::Ulid(Ulid::generate()),
320 FieldInsertGeneration::Timestamp => Value::Timestamp(Timestamp::now()),
321 })
322}
323
324fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
327where
328 E: EntityKind,
329{
330 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
331 QueryError::invariant("SQL write field must resolve against the target entity model")
332 })?;
333 let field_kind = E::MODEL.fields()[field_slot].kind();
334
335 let normalized = match (field_kind, value) {
336 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
337 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
338 Value::Int(v.cast_signed())
339 }
340 _ => value.clone(),
341 };
342
343 Ok(normalized)
344}
345
346fn reject_explicit_sql_write_to_managed_field<E>(
349 field_name: &str,
350 statement_kind: &str,
351) -> Result<(), QueryError>
352where
353 E: EntityKind,
354{
355 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
356 return Ok(());
357 };
358 let field = &E::MODEL.fields()[field_slot];
359
360 if field.write_management().is_some() {
361 return Err(QueryError::unsupported_query(format!(
362 "SQL {statement_kind} does not allow explicit writes to managed field '{field_name}' in this release"
363 )));
364 }
365
366 Ok(())
367}
368
369fn sql_insert_field_is_omittable(field: &FieldModel) -> bool {
372 if sql_write_generated_field_value(field).is_some() {
373 return true;
374 }
375
376 field.write_management().is_some()
377}
378
379fn validate_sql_insert_required_fields<E>(columns: &[String]) -> Result<(), QueryError>
382where
383 E: EntityKind,
384{
385 let missing_required_fields = E::MODEL
386 .fields()
387 .iter()
388 .filter(|field| !columns.iter().any(|column| column == field.name()))
389 .filter(|field| !sql_insert_field_is_omittable(field))
390 .map(FieldModel::name)
391 .collect::<Vec<_>>();
392
393 if missing_required_fields.is_empty() {
394 return Ok(());
395 }
396
397 if missing_required_fields.len() == 1
398 && missing_required_fields[0] == E::MODEL.primary_key.name()
399 {
400 return Err(QueryError::unsupported_query(format!(
401 "SQL INSERT requires primary key column '{}' in this release",
402 E::MODEL.primary_key.name()
403 )));
404 }
405
406 Err(QueryError::unsupported_query(format!(
407 "SQL INSERT requires explicit values for non-generated fields {} in this release",
408 missing_required_fields.join(", ")
409 )))
410}
411
412fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
417where
418 E: EntityKind,
419{
420 match source {
421 SqlInsertSource::Values(values) => values.first().map(Vec::len),
422 SqlInsertSource::Select(select) => match &select.projection {
423 SqlProjection::All => Some(
424 E::MODEL
425 .fields()
426 .iter()
427 .filter(|field| field.write_management().is_none())
428 .count(),
429 ),
430 SqlProjection::Items(items) => Some(items.len()),
431 },
432 }
433}
434
435fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
440where
441 E: EntityKind,
442{
443 if !statement.columns.is_empty() {
444 return statement.columns.clone();
445 }
446
447 let columns: Vec<String> = E::MODEL
448 .fields()
449 .iter()
450 .filter(|field| !sql_insert_field_is_omittable(field))
451 .map(|field| field.name().to_string())
452 .collect();
453 let full_columns: Vec<String> = E::MODEL
454 .fields()
455 .iter()
456 .filter(|field| field.write_management().is_none())
457 .map(|field| field.name().to_string())
458 .collect();
459 let first_width = sql_insert_source_width_hint::<E>(&statement.source);
460
461 if first_width == Some(columns.len()) {
462 return columns;
463 }
464
465 full_columns
466}
467
468fn validate_sql_insert_value_tuple_lengths(
471 columns: &[String],
472 values: &[Vec<Value>],
473) -> Result<(), QueryError> {
474 for tuple in values {
475 if tuple.len() != columns.len() {
476 return Err(QueryError::from_sql_parse_error(
477 crate::db::sql::parser::SqlParseError::invalid_syntax(
478 "INSERT column list and VALUES tuple length must match",
479 ),
480 ));
481 }
482 }
483
484 Ok(())
485}
486
487fn validate_sql_insert_selected_rows(
490 columns: &[String],
491 rows: &[Vec<Value>],
492) -> Result<(), QueryError> {
493 for row in rows {
494 if row.len() != columns.len() {
495 return Err(QueryError::unsupported_query(
496 "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
497 ));
498 }
499 }
500
501 Ok(())
502}
503
504impl<C: CanisterKind> DbSession<C> {
505 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
508 where
509 E: PersistedRow<Canister = C> + EntityValue,
510 {
511 let mut row = Vec::with_capacity(E::MODEL.fields().len());
512
513 for index in 0..E::MODEL.fields().len() {
514 let value = entity.get_value_by_index(index).ok_or_else(|| {
515 QueryError::invariant(
516 "SQL write dispatch projection row must include every declared field",
517 )
518 })?;
519 row.push(value);
520 }
521
522 Ok(row)
523 }
524
525 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
529 where
530 E: PersistedRow<Canister = C> + EntityValue,
531 {
532 let columns = projection_labels_from_fields(E::MODEL.fields());
533 let rows = entities
534 .into_iter()
535 .map(Self::sql_write_dispatch_row)
536 .collect::<Result<Vec<_>, _>>()?;
537 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
538
539 Ok(SqlDispatchResult::Projection {
540 columns,
541 rows,
542 row_count,
543 })
544 }
545
546 fn sql_insert_patch_and_key<E>(
549 columns: &[String],
550 values: &[Value],
551 ) -> Result<(E::Key, UpdatePatch), QueryError>
552 where
553 E: PersistedRow<Canister = C> + EntityValue,
554 {
555 let pk_name = E::MODEL.primary_key.name;
559 let generated_fields = E::MODEL
560 .fields()
561 .iter()
562 .filter(|field| !columns.iter().any(|column| column == field.name()))
563 .filter_map(|field| {
564 sql_write_generated_field_value(field).map(|value| (field.name(), value))
565 })
566 .collect::<Vec<_>>();
567 let key = if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
568 let pk_value = values.get(pk_index).ok_or_else(|| {
569 QueryError::invariant(
570 "INSERT primary key column must align with one VALUES literal",
571 )
572 })?;
573 sql_write_key_from_literal::<E>(pk_value, pk_name)?
574 } else if let Some((_, pk_value)) = generated_fields
575 .iter()
576 .find(|(field_name, _)| *field_name == pk_name)
577 {
578 sql_write_key_from_literal::<E>(pk_value, pk_name)?
579 } else {
580 return Err(QueryError::unsupported_query(format!(
581 "SQL INSERT requires primary key column '{pk_name}' in this release"
582 )));
583 };
584
585 let mut patch = UpdatePatch::new();
588 for (field_name, generated_value) in &generated_fields {
589 patch = patch
590 .set_field(E::MODEL, field_name, generated_value.clone())
591 .map_err(QueryError::execute)?;
592 }
593 for (field, value) in columns.iter().zip(values.iter()) {
594 reject_explicit_sql_write_to_managed_field::<E>(field, "INSERT")?;
595 let normalized = sql_write_value_for_field::<E>(field, value)?;
596 patch = patch
597 .set_field(E::MODEL, field, normalized)
598 .map_err(QueryError::execute)?;
599 }
600
601 Ok((key, patch))
602 }
603
604 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
607 where
608 E: PersistedRow<Canister = C> + EntityValue,
609 {
610 let pk_name = E::MODEL.primary_key.name;
613 let mut patch = UpdatePatch::new();
614 for assignment in &statement.assignments {
615 if assignment.field == pk_name {
616 return Err(QueryError::unsupported_query(format!(
617 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
618 )));
619 }
620 reject_explicit_sql_write_to_managed_field::<E>(assignment.field.as_str(), "UPDATE")?;
621 let normalized =
622 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
623
624 patch = patch
625 .set_field(E::MODEL, assignment.field.as_str(), normalized)
626 .map_err(QueryError::execute)?;
627 }
628
629 Ok(patch)
630 }
631
632 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
634 where
635 E: PersistedRow<Canister = C> + EntityValue,
636 {
637 let Some(predicate) = statement.predicate.clone() else {
641 return Err(QueryError::unsupported_query(
642 "SQL UPDATE requires WHERE predicate in this release",
643 ));
644 };
645 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
646 let pk_name = E::MODEL.primary_key.name;
647 let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
648
649 if statement.order_by.is_empty() {
653 selector = selector.order_by(pk_name);
654 } else {
655 let mut orders_primary_key = false;
656
657 for term in &statement.order_by {
658 if term.field == pk_name {
659 orders_primary_key = true;
660 }
661 selector = match term.direction {
662 SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
663 SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
664 };
665 }
666
667 if !orders_primary_key {
668 selector = selector.order_by(pk_name);
669 }
670 }
671
672 if let Some(limit) = statement.limit {
675 selector = selector.limit(limit);
676 }
677 if let Some(offset) = statement.offset {
678 selector = selector.offset(offset);
679 }
680
681 Ok(selector)
682 }
683
684 fn sql_insert_select_source_statement<E>(
688 statement: &SqlInsertStatement,
689 ) -> Result<SqlSelectStatement, QueryError>
690 where
691 E: PersistedRow<Canister = C> + EntityValue,
692 {
693 let SqlInsertSource::Select(select) = statement.source.clone() else {
694 return Err(QueryError::invariant(
695 "INSERT SELECT source validation requires parsed SELECT source",
696 ));
697 };
698 let mut select = *select;
699 ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
700
701 if !select.group_by.is_empty() || !select.having.is_empty() {
702 return Err(QueryError::unsupported_query(
703 "SQL INSERT SELECT requires scalar SELECT source in this release",
704 ));
705 }
706
707 if let SqlProjection::Items(items) = &select.projection {
708 for item in items {
709 if matches!(item, SqlSelectItem::Aggregate(_)) {
710 return Err(QueryError::unsupported_query(
711 "SQL INSERT SELECT does not support aggregate source projection in this release",
712 ));
713 }
714 }
715 }
716
717 let pk_name = E::MODEL.primary_key.name;
718 if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
719 select.order_by.push(SqlOrderTerm {
720 field: pk_name.to_string(),
721 direction: SqlOrderDirection::Asc,
722 });
723 }
724
725 Ok(select)
726 }
727
728 fn execute_sql_insert_select_source_rows<E>(
732 &self,
733 source: &SqlSelectStatement,
734 ) -> Result<Vec<Vec<Value>>, QueryError>
735 where
736 E: PersistedRow<Canister = C> + EntityValue,
737 {
738 if let Some(plan) = computed_projection::computed_sql_projection_plan(
742 &SqlStatement::Select(source.clone()),
743 )? {
744 let result = self.execute_computed_sql_projection_dispatch_for_authority(
745 plan,
746 EntityAuthority::for_type::<E>(),
747 )?;
748
749 return match result {
750 SqlDispatchResult::Projection { rows, .. } => Ok(rows),
751 other => Err(QueryError::invariant(format!(
752 "INSERT SELECT computed source must produce projection rows, found {other:?}",
753 ))),
754 };
755 }
756
757 let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
760 .map_err(QueryError::from_sql_lowering_error)?;
761 let lowered =
762 lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
763 .map_err(QueryError::from_sql_lowering_error)?;
764 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
765 return Err(QueryError::invariant(
766 "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
767 ));
768 };
769
770 let payload =
771 self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
772 let (_, rows, _) = payload.into_parts();
773
774 Ok(rows)
775 }
776
777 fn execute_sql_insert_dispatch<E>(
780 &self,
781 statement: &SqlInsertStatement,
782 ) -> Result<SqlDispatchResult, QueryError>
783 where
784 E: PersistedRow<Canister = C> + EntityValue,
785 {
786 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
787 let columns = sql_insert_columns::<E>(statement);
788 validate_sql_insert_required_fields::<E>(columns.as_slice())?;
789 let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Insert, Timestamp::now());
790 let source_rows = match &statement.source {
791 SqlInsertSource::Values(values) => {
792 validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
793 values.clone()
794 }
795 SqlInsertSource::Select(_) => {
796 let source = Self::sql_insert_select_source_statement::<E>(statement)?;
797 let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
798 validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
799
800 rows
801 }
802 };
803 let mut entities = Vec::with_capacity(source_rows.len());
804
805 for values in &source_rows {
806 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
807 let entity = self
808 .execute_save_entity::<E>(|save| {
809 save.apply_structural_mutation_with_write_context(
810 MutationMode::Insert,
811 key,
812 patch,
813 write_context,
814 )
815 })
816 .map_err(QueryError::execute)?;
817 entities.push(entity);
818 }
819
820 Self::sql_write_dispatch_projection(entities)
821 }
822
823 fn execute_sql_update_dispatch<E>(
827 &self,
828 statement: &SqlUpdateStatement,
829 ) -> Result<SqlDispatchResult, QueryError>
830 where
831 E: PersistedRow<Canister = C> + EntityValue,
832 {
833 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
834 let selector = Self::sql_update_selector_query::<E>(statement)?;
835 let patch = Self::sql_update_patch::<E>(statement)?;
836 let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Update, Timestamp::now());
837 let matched = self.execute_query(&selector)?;
838 let mut entities = Vec::with_capacity(matched.len());
839
840 for entity in matched.entities() {
843 let updated = self
844 .execute_save_entity::<E>(|save| {
845 save.apply_structural_mutation_with_write_context(
846 MutationMode::Update,
847 entity.id().key(),
848 patch.clone(),
849 write_context,
850 )
851 })
852 .map_err(QueryError::execute)?;
853 entities.push(updated);
854 }
855
856 Self::sql_write_dispatch_projection(entities)
857 }
858
859 fn prepare_structural_sql_projection_execution(
862 &self,
863 query: StructuralQuery,
864 authority: EntityAuthority,
865 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
866 let (_, plan) =
869 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
870 let projection = plan.projection_spec(authority.model());
871 let columns = projection_labels_from_projection_spec(&projection);
872
873 Ok((columns, plan))
874 }
875
876 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
880 &self,
881 query: StructuralQuery,
882 authority: EntityAuthority,
883 ) -> Result<SqlProjectionPayload, QueryError> {
884 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
886
887 let projected =
890 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
891 .map_err(QueryError::execute)?;
892 let (rows, row_count) = projected.into_parts();
893
894 Ok(SqlProjectionPayload::new(columns, rows, row_count))
895 }
896
897 fn execute_structural_sql_projection_text(
901 &self,
902 query: StructuralQuery,
903 authority: EntityAuthority,
904 ) -> Result<SqlDispatchResult, QueryError> {
905 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
907
908 let projected =
911 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
912 .map_err(QueryError::execute)?;
913 let (rows, row_count) = projected.into_parts();
914
915 Ok(SqlDispatchResult::ProjectionText {
916 columns,
917 rows,
918 row_count,
919 })
920 }
921
922 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
926 where
927 E: PersistedRow<Canister = C> + EntityValue,
928 {
929 let plan = self
930 .compile_query_with_visible_indexes(query)?
931 .into_prepared_execution_plan();
932 let deleted = self
933 .with_metrics(|| {
934 self.delete_executor::<E>()
935 .execute_structural_projection(plan)
936 })
937 .map_err(QueryError::execute)?;
938 let (rows, row_count) = deleted.into_parts();
939 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
940
941 Ok(SqlProjectionPayload::new(
942 projection_labels_from_fields(E::MODEL.fields()),
943 rows,
944 row_count,
945 )
946 .into_dispatch_result())
947 }
948
949 fn lowered_sql_query_dispatch_inputs_for_authority(
952 parsed: &SqlParsedStatement,
953 authority: EntityAuthority,
954 unsupported_message: &'static str,
955 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
956 let lowered = parsed.lower_query_lane_for_entity(
957 authority.model().name(),
958 authority.model().primary_key.name,
959 )?;
960 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
961 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
962 .transpose()?;
963 let query = lowered
964 .into_query()
965 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
966
967 Ok((query, projection_columns.flatten()))
968 }
969
970 fn dispatch_sql_query_route_for_authority(
974 &self,
975 parsed: &SqlParsedStatement,
976 authority: EntityAuthority,
977 unsupported_message: &'static str,
978 dispatch_select: impl FnOnce(
979 &Self,
980 LoweredSelectShape,
981 EntityAuthority,
982 bool,
983 Option<Vec<String>>,
984 ) -> Result<SqlDispatchResult, QueryError>,
985 dispatch_delete: impl FnOnce(
986 &Self,
987 LoweredBaseQueryShape,
988 EntityAuthority,
989 ) -> Result<SqlDispatchResult, QueryError>,
990 ) -> Result<SqlDispatchResult, QueryError> {
991 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
994 let command =
995 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
996
997 return self.execute_sql_aggregate_dispatch_for_authority(
998 command,
999 authority,
1000 sql_aggregate_dispatch_label_override(&parsed.statement),
1001 );
1002 }
1003
1004 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
1005 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
1006 }
1007
1008 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
1011 parsed,
1012 authority,
1013 unsupported_message,
1014 )?;
1015 let grouped_surface = query.has_grouping();
1016
1017 match query {
1018 LoweredSqlQuery::Select(select) => {
1019 dispatch_select(self, select, authority, grouped_surface, projection_columns)
1020 }
1021 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
1022 }
1023 }
1024
1025 fn dispatch_sql_explain_route_for_authority(
1029 &self,
1030 parsed: &SqlParsedStatement,
1031 authority: EntityAuthority,
1032 ) -> Result<SqlDispatchResult, QueryError> {
1033 if let Some((mode, plan)) =
1036 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
1037 {
1038 return self
1039 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
1040 .map(SqlDispatchResult::Explain);
1041 }
1042
1043 let lowered = parsed.lower_query_lane_for_entity(
1046 authority.model().name(),
1047 authority.model().primary_key.name,
1048 )?;
1049 if let Some(explain) =
1050 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1051 {
1052 return Ok(SqlDispatchResult::Explain(explain));
1053 }
1054
1055 self.explain_lowered_sql_for_authority(&lowered, authority)
1056 .map(SqlDispatchResult::Explain)
1057 }
1058
1059 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1062 query: &Query<E>,
1063 surface: SqlGroupingSurface,
1064 ) -> Result<(), QueryError>
1065 where
1066 E: EntityKind,
1067 {
1068 match (surface, query.has_grouping()) {
1069 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1070 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1071 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1072 ),
1073 }
1074 }
1075
1076 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1078 where
1079 E: PersistedRow<Canister = C> + EntityValue,
1080 {
1081 let parsed = self.parse_sql_statement(sql)?;
1082
1083 self.execute_sql_dispatch_parsed::<E>(&parsed)
1084 }
1085
1086 pub fn execute_sql_dispatch_parsed<E>(
1088 &self,
1089 parsed: &SqlParsedStatement,
1090 ) -> Result<SqlDispatchResult, QueryError>
1091 where
1092 E: PersistedRow<Canister = C> + EntityValue,
1093 {
1094 match parsed.route() {
1095 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1096 parsed,
1097 EntityAuthority::for_type::<E>(),
1098 "execute_sql_dispatch accepts SELECT or DELETE only",
1099 |session, select, authority, grouped_surface, projection_columns| {
1100 if grouped_surface {
1101 let columns = projection_columns.ok_or_else(|| {
1102 QueryError::unsupported_query(
1103 "grouped SQL dispatch requires explicit grouped projection items",
1104 )
1105 })?;
1106
1107 return session.execute_lowered_sql_grouped_dispatch_select_core(
1108 select, authority, columns,
1109 );
1110 }
1111
1112 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1113 if let Some(columns) = projection_columns {
1114 let (_, rows, row_count) = payload.into_parts();
1115
1116 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1117 .into_dispatch_result());
1118 }
1119
1120 Ok(payload.into_dispatch_result())
1121 },
1122 |session, delete, _authority| {
1123 let typed_query = bind_lowered_sql_query::<E>(
1124 LoweredSqlQuery::Delete(delete),
1125 MissingRowPolicy::Ignore,
1126 )
1127 .map_err(QueryError::from_sql_lowering_error)?;
1128
1129 session.execute_typed_sql_delete(&typed_query)
1130 },
1131 ),
1132 SqlStatementRoute::Insert { .. } => {
1133 let SqlStatement::Insert(statement) = &parsed.statement else {
1134 return Err(QueryError::invariant(
1135 "INSERT SQL route must carry parsed INSERT statement",
1136 ));
1137 };
1138
1139 self.execute_sql_insert_dispatch::<E>(statement)
1140 }
1141 SqlStatementRoute::Update { .. } => {
1142 let SqlStatement::Update(statement) = &parsed.statement else {
1143 return Err(QueryError::invariant(
1144 "UPDATE SQL route must carry parsed UPDATE statement",
1145 ));
1146 };
1147
1148 self.execute_sql_update_dispatch::<E>(statement)
1149 }
1150 SqlStatementRoute::Explain { .. } => self
1151 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1152 SqlStatementRoute::Describe { .. } => {
1153 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1154 }
1155 SqlStatementRoute::ShowIndexes { .. } => {
1156 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1157 }
1158 SqlStatementRoute::ShowColumns { .. } => {
1159 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1160 }
1161 SqlStatementRoute::ShowEntities => {
1162 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1163 }
1164 }
1165 }
1166
1167 #[doc(hidden)]
1174 pub fn execute_generated_query_surface_dispatch_for_authority(
1175 &self,
1176 parsed: &SqlParsedStatement,
1177 authority: EntityAuthority,
1178 ) -> Result<SqlDispatchResult, QueryError> {
1179 match parsed.route() {
1180 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1181 parsed,
1182 authority,
1183 "generated SQL query surface requires query or EXPLAIN statement lanes",
1184 |session, select, authority, grouped_surface, projection_columns| {
1185 if grouped_surface {
1186 let columns = projection_columns.ok_or_else(|| {
1187 QueryError::unsupported_query(
1188 "grouped SQL dispatch requires explicit grouped projection items",
1189 )
1190 })?;
1191
1192 return session
1193 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1194 }
1195
1196 let result =
1197 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1198 if let Some(columns) = projection_columns {
1199 let SqlDispatchResult::ProjectionText {
1200 rows, row_count, ..
1201 } = result
1202 else {
1203 return Err(QueryError::invariant(
1204 "generated scalar SQL dispatch text path must emit projection text rows",
1205 ));
1206 };
1207
1208 return Ok(SqlDispatchResult::ProjectionText {
1209 columns,
1210 rows,
1211 row_count,
1212 });
1213 }
1214
1215 Ok(result)
1216 },
1217 |session, delete, authority| {
1218 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1219 },
1220 ),
1221 SqlStatementRoute::Explain { .. } => {
1222 self.dispatch_sql_explain_route_for_authority(parsed, authority)
1223 }
1224 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1225 | SqlStatementRoute::Describe { .. }
1226 | SqlStatementRoute::ShowIndexes { .. }
1227 | SqlStatementRoute::ShowColumns { .. }
1228 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1229 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1230 )),
1231 }
1232 }
1233
1234 #[doc(hidden)]
1240 #[must_use]
1241 pub fn execute_generated_query_surface_sql(
1242 &self,
1243 sql: &str,
1244 authorities: &[EntityAuthority],
1245 ) -> GeneratedSqlDispatchAttempt {
1246 let sql_trimmed = match trim_generated_query_sql_input(sql) {
1249 Ok(sql_trimmed) => sql_trimmed,
1250 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1251 };
1252 let parsed = match self.parse_sql_statement(sql_trimmed) {
1253 Ok(parsed) => parsed,
1254 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1255 };
1256
1257 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1260 return GeneratedSqlDispatchAttempt::new(
1261 "",
1262 None,
1263 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1264 authorities,
1265 ))),
1266 );
1267 }
1268 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1269 Ok(authority) => authority,
1270 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1271 };
1272
1273 let entity_name = authority.model().name();
1277 let explain_order_field = parsed
1278 .route()
1279 .is_explain()
1280 .then_some(authority.model().primary_key.name);
1281 let result = match parsed.route() {
1282 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1283 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1284 }
1285 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1286 Err(QueryError::unsupported_query(
1287 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1288 ))
1289 }
1290 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1291 self.describe_entity_model(authority.model()),
1292 )),
1293 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1294 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1295 )),
1296 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1297 self.show_columns_for_model(authority.model()),
1298 )),
1299 SqlStatementRoute::ShowEntities => unreachable!(
1300 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1301 ),
1302 };
1303
1304 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1305 }
1306}