1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 schema::{ValidateError, field_type_from_model_kind, literal_matches_type},
18 session::sql::{
19 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21 computed_projection,
22 projection::{
23 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
24 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
25 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26 },
27 },
28 sql::lowering::{
29 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
30 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
31 lower_sql_command_from_prepared_statement, prepare_sql_statement,
32 },
33 sql::parser::{
34 SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
35 SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlReturningProjection, SqlSelectItem,
36 SqlSelectStatement, SqlStatement, SqlTextFunction, SqlUpdateStatement,
37 },
38 },
39 model::{
40 entity::resolve_field_slot,
41 field::{FieldInsertGeneration, FieldKind, FieldModel},
42 },
43 sanitize::{SanitizeWriteContext, SanitizeWriteMode},
44 traits::{CanisterKind, EntityKind, EntityValue},
45 types::{Timestamp, Ulid},
46 value::Value,
47};
48
49#[cfg(feature = "perf-attribution")]
50pub use lowered::LoweredSqlDispatchExecutorAttribution;
51
/// Outcome of one generated SQL dispatch attempt, stored together with the
/// entity metadata that was recorded when the attempt was built.
#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
    /// Name of the entity this attempt was recorded against.
    entity_name: &'static str,
    /// Optional field name returned by `explain_order_field()`.
    // NOTE(review): presumably the ordering field surfaced for EXPLAIN output — confirm against callers.
    explain_order_field: Option<&'static str>,
    /// Dispatch payload on success, or the query error that ended the attempt.
    result: Result<SqlDispatchResult, QueryError>,
}
66
67impl GeneratedSqlDispatchAttempt {
68 const fn new(
70 entity_name: &'static str,
71 explain_order_field: Option<&'static str>,
72 result: Result<SqlDispatchResult, QueryError>,
73 ) -> Self {
74 Self {
75 entity_name,
76 explain_order_field,
77 result,
78 }
79 }
80
81 #[must_use]
83 pub const fn entity_name(&self) -> &'static str {
84 self.entity_name
85 }
86
87 #[must_use]
89 pub const fn explain_order_field(&self) -> Option<&'static str> {
90 self.explain_order_field
91 }
92
93 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
95 self.result
96 }
97}
98
/// Identifies which SQL execution surface a query is being validated for.
///
/// Used to pick the matching rejection message when the query's grouping
/// shape does not fit the surface.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
    /// Surface that only accepts queries without grouping.
    Scalar,
    /// Surface that only accepts queries with grouping.
    Grouped,
}
104
105const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
106 match surface {
107 SqlGroupingSurface::Scalar => {
108 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
109 }
110 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
111 }
112}
113
114fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
117 let sql_trimmed = sql.trim();
118 if sql_trimmed.is_empty() {
119 return Err(QueryError::unsupported_query(
120 "query endpoint requires a non-empty SQL string",
121 ));
122 }
123
124 Ok(sql_trimmed)
125}
126
127fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
130 let mut entities = Vec::with_capacity(authorities.len());
131
132 for authority in authorities {
133 entities.push(authority.model().name().to_string());
134 }
135
136 entities
137}
138
139fn sql_projection_labels_from_select_statement(
142 statement: &SqlStatement,
143) -> Result<Option<Vec<String>>, QueryError> {
144 let SqlStatement::Select(select) = statement else {
145 return Err(QueryError::invariant(
146 "SQL projection labels require SELECT statement shape",
147 ));
148 };
149 let SqlProjection::Items(items) = &select.projection else {
150 return Ok(None);
151 };
152
153 Ok(Some(
154 items
155 .iter()
156 .enumerate()
157 .map(|(index, item)| {
158 select
159 .projection_alias(index)
160 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
161 })
162 .collect(),
163 ))
164}
165
166fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
169 match item {
170 SqlSelectItem::Field(field) => field.clone(),
171 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
172 SqlSelectItem::TextFunction(call) => {
173 format!(
174 "{}({})",
175 grouped_sql_text_function_name(call.function),
176 call.field
177 )
178 }
179 }
180}
181
182fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
185 let SqlStatement::Select(select) = statement else {
186 return None;
187 };
188
189 select.projection_alias(0).map(str::to_string)
190}
191
192fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
194 let kind = match aggregate.kind {
195 SqlAggregateKind::Count => "COUNT",
196 SqlAggregateKind::Sum => "SUM",
197 SqlAggregateKind::Avg => "AVG",
198 SqlAggregateKind::Min => "MIN",
199 SqlAggregateKind::Max => "MAX",
200 };
201
202 match aggregate.field.as_deref() {
203 Some(field) => format!("{kind}({field})"),
204 None => format!("{kind}(*)"),
205 }
206}
207
208const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
211 match function {
212 SqlTextFunction::Trim => "TRIM",
213 SqlTextFunction::Ltrim => "LTRIM",
214 SqlTextFunction::Rtrim => "RTRIM",
215 SqlTextFunction::Lower => "LOWER",
216 SqlTextFunction::Upper => "UPPER",
217 SqlTextFunction::Length => "LENGTH",
218 SqlTextFunction::Left => "LEFT",
219 SqlTextFunction::Right => "RIGHT",
220 SqlTextFunction::StartsWith => "STARTS_WITH",
221 SqlTextFunction::EndsWith => "ENDS_WITH",
222 SqlTextFunction::Contains => "CONTAINS",
223 SqlTextFunction::Position => "POSITION",
224 SqlTextFunction::Replace => "REPLACE",
225 SqlTextFunction::Substring => "SUBSTRING",
226 }
227}
228
229fn authority_for_generated_sql_route(
231 route: &SqlStatementRoute,
232 authorities: &[EntityAuthority],
233) -> Result<EntityAuthority, QueryError> {
234 let sql_entity = route.entity();
235
236 for authority in authorities {
237 if identifiers_tail_match(sql_entity, authority.model().name()) {
238 return Ok(*authority);
239 }
240 }
241
242 Err(unsupported_generated_sql_entity_error(
243 sql_entity,
244 authorities,
245 ))
246}
247
248fn unsupported_generated_sql_entity_error(
251 entity_name: &str,
252 authorities: &[EntityAuthority],
253) -> QueryError {
254 let mut supported = String::new();
255
256 for (index, authority) in authorities.iter().enumerate() {
257 if index != 0 {
258 supported.push_str(", ");
259 }
260
261 supported.push_str(authority.model().name());
262 }
263
264 QueryError::unsupported_query(format!(
265 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
266 ))
267}
268
269fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
272where
273 E: EntityKind,
274{
275 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
276 return Ok(());
277 }
278
279 Err(QueryError::from_sql_lowering_error(
280 SqlLoweringError::EntityMismatch {
281 sql_entity: sql_entity.to_string(),
282 expected_entity: E::MODEL.name(),
283 },
284 ))
285}
286
287fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
290where
291 E: EntityKind,
292{
293 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
294 return Ok(key);
295 }
296
297 let widened = match value {
298 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
299 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
300 _ => {
301 return Err(QueryError::unsupported_query(format!(
302 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303 )));
304 }
305 };
306
307 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
308 QueryError::unsupported_query(format!(
309 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
310 ))
311 })
312}
313
314fn sql_write_generated_field_value(field: &FieldModel) -> Option<Value> {
317 field
318 .insert_generation()
319 .map(|generation| match generation {
320 FieldInsertGeneration::Ulid => Value::Ulid(Ulid::generate()),
321 FieldInsertGeneration::Timestamp => Value::Timestamp(Timestamp::now()),
322 })
323}
324
325fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
328where
329 E: EntityKind,
330{
331 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
332 QueryError::invariant("SQL write field must resolve against the target entity model")
333 })?;
334 let field_kind = E::MODEL.fields()[field_slot].kind();
335
336 let normalized = match (field_kind, value) {
337 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
338 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
339 Value::Int(v.cast_signed())
340 }
341 _ => value.clone(),
342 };
343
344 let field_type = field_type_from_model_kind(&field_kind);
345 if !literal_matches_type(&normalized, &field_type) {
346 return Err(QueryError::unsupported_query(
347 ValidateError::invalid_literal(field_name, "literal type does not match field type")
348 .to_string(),
349 ));
350 }
351
352 Ok(normalized)
353}
354
355fn reject_explicit_sql_write_to_managed_field<E>(
358 field_name: &str,
359 statement_kind: &str,
360) -> Result<(), QueryError>
361where
362 E: EntityKind,
363{
364 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
365 return Ok(());
366 };
367 let field = &E::MODEL.fields()[field_slot];
368
369 if field.write_management().is_some() {
370 return Err(QueryError::unsupported_query(format!(
371 "SQL {statement_kind} does not allow explicit writes to managed field '{field_name}' in this release"
372 )));
373 }
374
375 Ok(())
376}
377
378fn reject_explicit_sql_insert_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
381where
382 E: EntityKind,
383{
384 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
385 return Ok(());
386 };
387 let field = &E::MODEL.fields()[field_slot];
388
389 if field.insert_generation().is_some() {
390 return Err(QueryError::unsupported_query(format!(
391 "SQL INSERT does not allow explicit writes to generated field '{field_name}' in this release"
392 )));
393 }
394
395 Ok(())
396}
397
398fn reject_explicit_sql_update_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
401where
402 E: EntityKind,
403{
404 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
405 return Ok(());
406 };
407 let field = &E::MODEL.fields()[field_slot];
408
409 if field.insert_generation().is_some() {
410 return Err(QueryError::unsupported_query(format!(
411 "SQL UPDATE does not allow explicit writes to generated field '{field_name}' in this release"
412 )));
413 }
414
415 Ok(())
416}
417
418fn sql_insert_field_is_omittable(field: &FieldModel) -> bool {
421 if sql_write_generated_field_value(field).is_some() {
422 return true;
423 }
424
425 field.write_management().is_some()
426}
427
428fn validate_sql_insert_required_fields<E>(columns: &[String]) -> Result<(), QueryError>
431where
432 E: EntityKind,
433{
434 let missing_required_fields = E::MODEL
435 .fields()
436 .iter()
437 .filter(|field| !columns.iter().any(|column| column == field.name()))
438 .filter(|field| !sql_insert_field_is_omittable(field))
439 .map(FieldModel::name)
440 .collect::<Vec<_>>();
441
442 if missing_required_fields.is_empty() {
443 return Ok(());
444 }
445
446 if missing_required_fields.len() == 1
447 && missing_required_fields[0] == E::MODEL.primary_key.name()
448 {
449 return Err(QueryError::unsupported_query(format!(
450 "SQL INSERT requires primary key column '{}' in this release",
451 E::MODEL.primary_key.name()
452 )));
453 }
454
455 Err(QueryError::unsupported_query(format!(
456 "SQL INSERT requires explicit values for non-generated fields {} in this release",
457 missing_required_fields.join(", ")
458 )))
459}
460
461fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
466where
467 E: EntityKind,
468{
469 match source {
470 SqlInsertSource::Values(values) => values.first().map(Vec::len),
471 SqlInsertSource::Select(select) => match &select.projection {
472 SqlProjection::All => Some(
473 E::MODEL
474 .fields()
475 .iter()
476 .filter(|field| field.write_management().is_none())
477 .count(),
478 ),
479 SqlProjection::Items(items) => Some(items.len()),
480 },
481 }
482}
483
484fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
489where
490 E: EntityKind,
491{
492 if !statement.columns.is_empty() {
493 return statement.columns.clone();
494 }
495
496 let columns: Vec<String> = E::MODEL
497 .fields()
498 .iter()
499 .filter(|field| !sql_insert_field_is_omittable(field))
500 .map(|field| field.name().to_string())
501 .collect();
502 let full_columns: Vec<String> = E::MODEL
503 .fields()
504 .iter()
505 .filter(|field| field.write_management().is_none())
506 .map(|field| field.name().to_string())
507 .collect();
508 let first_width = sql_insert_source_width_hint::<E>(&statement.source);
509
510 if first_width == Some(columns.len()) {
511 return columns;
512 }
513
514 full_columns
515}
516
517fn validate_sql_insert_value_tuple_lengths(
520 columns: &[String],
521 values: &[Vec<Value>],
522) -> Result<(), QueryError> {
523 for tuple in values {
524 if tuple.len() != columns.len() {
525 return Err(QueryError::from_sql_parse_error(
526 crate::db::sql::parser::SqlParseError::invalid_syntax(
527 "INSERT column list and VALUES tuple length must match",
528 ),
529 ));
530 }
531 }
532
533 Ok(())
534}
535
536fn validate_sql_insert_selected_rows(
539 columns: &[String],
540 rows: &[Vec<Value>],
541) -> Result<(), QueryError> {
542 for row in rows {
543 if row.len() != columns.len() {
544 return Err(QueryError::unsupported_query(
545 "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
546 ));
547 }
548 }
549
550 Ok(())
551}
552
553impl<C: CanisterKind> DbSession<C> {
554 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
557 where
558 E: PersistedRow<Canister = C> + EntityValue,
559 {
560 let mut row = Vec::with_capacity(E::MODEL.fields().len());
561
562 for index in 0..E::MODEL.fields().len() {
563 let value = entity.get_value_by_index(index).ok_or_else(|| {
564 QueryError::invariant(
565 "SQL write dispatch projection row must include every declared field",
566 )
567 })?;
568 row.push(value);
569 }
570
571 Ok(row)
572 }
573
574 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
578 where
579 E: PersistedRow<Canister = C> + EntityValue,
580 {
581 let columns = projection_labels_from_fields(E::MODEL.fields());
582 let rows = entities
583 .into_iter()
584 .map(Self::sql_write_dispatch_row)
585 .collect::<Result<Vec<_>, _>>()?;
586 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
587
588 Ok(SqlDispatchResult::Projection {
589 columns,
590 rows,
591 row_count,
592 })
593 }
594
595 fn sql_write_dispatch_result<E>(
599 entities: Vec<E>,
600 returning: Option<&SqlReturningProjection>,
601 ) -> Result<SqlDispatchResult, QueryError>
602 where
603 E: PersistedRow<Canister = C> + EntityValue,
604 {
605 let row_count = u32::try_from(entities.len()).unwrap_or(u32::MAX);
606
607 match returning {
608 None => Ok(SqlDispatchResult::Count { row_count }),
609 Some(returning) => {
610 let SqlDispatchResult::Projection {
611 columns,
612 rows,
613 row_count,
614 } = Self::sql_write_dispatch_projection(entities)?
615 else {
616 return Err(QueryError::invariant(
617 "SQL write projection helper must emit value-row projection payload",
618 ));
619 };
620
621 Self::sql_returning_dispatch_projection(columns, rows, row_count, returning)
622 }
623 }
624 }
625
626 fn sql_returning_dispatch_projection(
630 columns: Vec<String>,
631 rows: Vec<Vec<Value>>,
632 row_count: u32,
633 returning: &SqlReturningProjection,
634 ) -> Result<SqlDispatchResult, QueryError> {
635 match returning {
636 SqlReturningProjection::All => Ok(SqlDispatchResult::Projection {
637 columns,
638 rows,
639 row_count,
640 }),
641 SqlReturningProjection::Fields(fields) => {
642 let mut indices = Vec::with_capacity(fields.len());
643
644 for field in fields {
645 let index = columns
646 .iter()
647 .position(|column| column == field)
648 .ok_or_else(|| {
649 QueryError::unsupported_query(format!(
650 "SQL RETURNING field '{field}' does not exist on the target entity"
651 ))
652 })?;
653 indices.push(index);
654 }
655
656 let mut projected_rows = Vec::with_capacity(rows.len());
657 for row in rows {
658 let mut projected = Vec::with_capacity(indices.len());
659 for index in &indices {
660 let value = row.get(*index).ok_or_else(|| {
661 QueryError::invariant(
662 "SQL RETURNING projection row must align with declared columns",
663 )
664 })?;
665 projected.push(value.clone());
666 }
667 projected_rows.push(projected);
668 }
669
670 Ok(SqlDispatchResult::Projection {
671 columns: fields.clone(),
672 rows: projected_rows,
673 row_count,
674 })
675 }
676 }
677 }
678
679 fn sql_insert_patch_and_key<E>(
682 columns: &[String],
683 values: &[Value],
684 ) -> Result<(E::Key, UpdatePatch), QueryError>
685 where
686 E: PersistedRow<Canister = C> + EntityValue,
687 {
688 let pk_name = E::MODEL.primary_key.name;
692 let generated_fields = E::MODEL
693 .fields()
694 .iter()
695 .filter(|field| !columns.iter().any(|column| column == field.name()))
696 .filter_map(|field| {
697 sql_write_generated_field_value(field).map(|value| (field.name(), value))
698 })
699 .collect::<Vec<_>>();
700 let key = if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
701 let pk_value = values.get(pk_index).ok_or_else(|| {
702 QueryError::invariant(
703 "INSERT primary key column must align with one VALUES literal",
704 )
705 })?;
706 sql_write_key_from_literal::<E>(pk_value, pk_name)?
707 } else if let Some((_, pk_value)) = generated_fields
708 .iter()
709 .find(|(field_name, _)| *field_name == pk_name)
710 {
711 sql_write_key_from_literal::<E>(pk_value, pk_name)?
712 } else {
713 return Err(QueryError::unsupported_query(format!(
714 "SQL INSERT requires primary key column '{pk_name}' in this release"
715 )));
716 };
717
718 let mut patch = UpdatePatch::new();
721 for (field_name, generated_value) in &generated_fields {
722 patch = patch
723 .set_field(E::MODEL, field_name, generated_value.clone())
724 .map_err(QueryError::execute)?;
725 }
726 for (field, value) in columns.iter().zip(values.iter()) {
727 reject_explicit_sql_insert_to_generated_field::<E>(field)?;
728 reject_explicit_sql_write_to_managed_field::<E>(field, "INSERT")?;
729 let normalized = sql_write_value_for_field::<E>(field, value)?;
730 patch = patch
731 .set_field(E::MODEL, field, normalized)
732 .map_err(QueryError::execute)?;
733 }
734
735 Ok((key, patch))
736 }
737
738 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
741 where
742 E: PersistedRow<Canister = C> + EntityValue,
743 {
744 let pk_name = E::MODEL.primary_key.name;
747 let mut patch = UpdatePatch::new();
748 for assignment in &statement.assignments {
749 if assignment.field == pk_name {
750 return Err(QueryError::unsupported_query(format!(
751 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
752 )));
753 }
754 reject_explicit_sql_update_to_generated_field::<E>(assignment.field.as_str())?;
755 reject_explicit_sql_write_to_managed_field::<E>(assignment.field.as_str(), "UPDATE")?;
756 let normalized =
757 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
758
759 patch = patch
760 .set_field(E::MODEL, assignment.field.as_str(), normalized)
761 .map_err(QueryError::execute)?;
762 }
763
764 Ok(patch)
765 }
766
767 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
769 where
770 E: PersistedRow<Canister = C> + EntityValue,
771 {
772 let Some(predicate) = statement.predicate.clone() else {
776 return Err(QueryError::unsupported_query(
777 "SQL UPDATE requires WHERE predicate in this release",
778 ));
779 };
780 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
781 let pk_name = E::MODEL.primary_key.name;
782 let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
783
784 if statement.order_by.is_empty() {
788 selector = selector.order_by(pk_name);
789 } else {
790 let mut orders_primary_key = false;
791
792 for term in &statement.order_by {
793 if term.field == pk_name {
794 orders_primary_key = true;
795 }
796 selector = match term.direction {
797 SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
798 SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
799 };
800 }
801
802 if !orders_primary_key {
803 selector = selector.order_by(pk_name);
804 }
805 }
806
807 if let Some(limit) = statement.limit {
810 selector = selector.limit(limit);
811 }
812 if let Some(offset) = statement.offset {
813 selector = selector.offset(offset);
814 }
815
816 Ok(selector)
817 }
818
819 fn sql_insert_select_source_statement<E>(
823 statement: &SqlInsertStatement,
824 ) -> Result<SqlSelectStatement, QueryError>
825 where
826 E: PersistedRow<Canister = C> + EntityValue,
827 {
828 let SqlInsertSource::Select(select) = statement.source.clone() else {
829 return Err(QueryError::invariant(
830 "INSERT SELECT source validation requires parsed SELECT source",
831 ));
832 };
833 let mut select = *select;
834 ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
835
836 if !select.group_by.is_empty() || !select.having.is_empty() {
837 return Err(QueryError::unsupported_query(
838 "SQL INSERT SELECT requires scalar SELECT source in this release",
839 ));
840 }
841
842 if let SqlProjection::Items(items) = &select.projection {
843 for item in items {
844 if matches!(item, SqlSelectItem::Aggregate(_)) {
845 return Err(QueryError::unsupported_query(
846 "SQL INSERT SELECT does not support aggregate source projection in this release",
847 ));
848 }
849 }
850 }
851
852 let pk_name = E::MODEL.primary_key.name;
853 if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
854 select.order_by.push(SqlOrderTerm {
855 field: pk_name.to_string(),
856 direction: SqlOrderDirection::Asc,
857 });
858 }
859
860 Ok(select)
861 }
862
863 fn execute_sql_insert_select_source_rows<E>(
867 &self,
868 source: &SqlSelectStatement,
869 ) -> Result<Vec<Vec<Value>>, QueryError>
870 where
871 E: PersistedRow<Canister = C> + EntityValue,
872 {
873 if let Some(plan) = computed_projection::computed_sql_projection_plan(
877 &SqlStatement::Select(source.clone()),
878 )? {
879 let result = self.execute_computed_sql_projection_dispatch_for_authority(
880 plan,
881 EntityAuthority::for_type::<E>(),
882 )?;
883
884 return match result {
885 SqlDispatchResult::Projection { rows, .. } => Ok(rows),
886 other => Err(QueryError::invariant(format!(
887 "INSERT SELECT computed source must produce projection rows, found {other:?}",
888 ))),
889 };
890 }
891
892 let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
895 .map_err(QueryError::from_sql_lowering_error)?;
896 let lowered =
897 lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
898 .map_err(QueryError::from_sql_lowering_error)?;
899 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
900 return Err(QueryError::invariant(
901 "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
902 ));
903 };
904
905 let payload =
906 self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
907 let (_, rows, _) = payload.into_parts();
908
909 Ok(rows)
910 }
911
912 fn execute_sql_insert_dispatch<E>(
915 &self,
916 statement: &SqlInsertStatement,
917 ) -> Result<SqlDispatchResult, QueryError>
918 where
919 E: PersistedRow<Canister = C> + EntityValue,
920 {
921 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
922 let columns = sql_insert_columns::<E>(statement);
923 validate_sql_insert_required_fields::<E>(columns.as_slice())?;
924 let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Insert, Timestamp::now());
925 let source_rows = match &statement.source {
926 SqlInsertSource::Values(values) => {
927 validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
928 values.clone()
929 }
930 SqlInsertSource::Select(_) => {
931 let source = Self::sql_insert_select_source_statement::<E>(statement)?;
932 let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
933 validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
934
935 rows
936 }
937 };
938 let mut entities = Vec::with_capacity(source_rows.len());
939
940 for values in &source_rows {
941 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
942 let entity = self
943 .execute_save_entity::<E>(|save| {
944 save.apply_internal_structural_mutation_with_write_context(
945 MutationMode::Insert,
946 key,
947 patch,
948 write_context,
949 )
950 })
951 .map_err(QueryError::execute)?;
952 entities.push(entity);
953 }
954
955 Self::sql_write_dispatch_result(entities, statement.returning.as_ref())
956 }
957
958 fn execute_sql_update_dispatch<E>(
962 &self,
963 statement: &SqlUpdateStatement,
964 ) -> Result<SqlDispatchResult, QueryError>
965 where
966 E: PersistedRow<Canister = C> + EntityValue,
967 {
968 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
969 let selector = Self::sql_update_selector_query::<E>(statement)?;
970 let patch = Self::sql_update_patch::<E>(statement)?;
971 let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Update, Timestamp::now());
972 let matched = self.execute_query(&selector)?;
973 let mut entities = Vec::with_capacity(matched.len());
974
975 for entity in matched.entities() {
978 let updated = self
979 .execute_save_entity::<E>(|save| {
980 save.apply_internal_structural_mutation_with_write_context(
981 MutationMode::Update,
982 entity.id().key(),
983 patch.clone(),
984 write_context,
985 )
986 })
987 .map_err(QueryError::execute)?;
988 entities.push(updated);
989 }
990
991 Self::sql_write_dispatch_result(entities, statement.returning.as_ref())
992 }
993
994 fn prepare_structural_sql_projection_execution(
997 &self,
998 query: StructuralQuery,
999 authority: EntityAuthority,
1000 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
1001 let (_, plan) =
1004 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
1005 let projection = plan.projection_spec(authority.model());
1006 let columns = projection_labels_from_projection_spec(&projection);
1007
1008 Ok((columns, plan))
1009 }
1010
1011 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
1015 &self,
1016 query: StructuralQuery,
1017 authority: EntityAuthority,
1018 ) -> Result<SqlProjectionPayload, QueryError> {
1019 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
1021
1022 let projected =
1025 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
1026 .map_err(QueryError::execute)?;
1027 let (rows, row_count) = projected.into_parts();
1028
1029 Ok(SqlProjectionPayload::new(columns, rows, row_count))
1030 }
1031
1032 fn execute_structural_sql_projection_text(
1036 &self,
1037 query: StructuralQuery,
1038 authority: EntityAuthority,
1039 ) -> Result<SqlDispatchResult, QueryError> {
1040 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
1042
1043 let projected =
1046 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
1047 .map_err(QueryError::execute)?;
1048 let (rows, row_count) = projected.into_parts();
1049
1050 Ok(SqlDispatchResult::ProjectionText {
1051 columns,
1052 rows,
1053 row_count,
1054 })
1055 }
1056
1057 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
1061 where
1062 E: PersistedRow<Canister = C> + EntityValue,
1063 {
1064 let plan = self
1065 .compile_query_with_visible_indexes(query)?
1066 .into_prepared_execution_plan();
1067 let deleted = self
1068 .with_metrics(|| {
1069 self.delete_executor::<E>()
1070 .execute_structural_projection(plan)
1071 })
1072 .map_err(QueryError::execute)?;
1073 let (rows, row_count) = deleted.into_parts();
1074 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
1075
1076 Ok(SqlProjectionPayload::new(
1077 projection_labels_from_fields(E::MODEL.fields()),
1078 rows,
1079 row_count,
1080 )
1081 .into_dispatch_result())
1082 }
1083
1084 fn execute_typed_sql_delete_count<E>(
1087 &self,
1088 query: &Query<E>,
1089 ) -> Result<SqlDispatchResult, QueryError>
1090 where
1091 E: PersistedRow<Canister = C> + EntityValue,
1092 {
1093 let row_count = self.execute_delete_count(query)?;
1094
1095 Ok(SqlDispatchResult::Count { row_count })
1096 }
1097
1098 fn execute_typed_sql_delete_returning<E>(
1101 &self,
1102 query: &Query<E>,
1103 returning: &SqlReturningProjection,
1104 ) -> Result<SqlDispatchResult, QueryError>
1105 where
1106 E: PersistedRow<Canister = C> + EntityValue,
1107 {
1108 let SqlDispatchResult::Projection {
1109 columns,
1110 rows,
1111 row_count,
1112 } = self.execute_typed_sql_delete(query)?
1113 else {
1114 return Err(QueryError::invariant(
1115 "typed SQL delete projection path must emit value-row projection payload",
1116 ));
1117 };
1118
1119 Self::sql_returning_dispatch_projection(columns, rows, row_count, returning)
1120 }
1121
1122 fn lowered_sql_query_dispatch_inputs_for_authority(
1125 parsed: &SqlParsedStatement,
1126 authority: EntityAuthority,
1127 unsupported_message: &'static str,
1128 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
1129 let lowered = parsed.lower_query_lane_for_entity(
1130 authority.model().name(),
1131 authority.model().primary_key.name,
1132 )?;
1133 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
1134 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
1135 .transpose()?;
1136 let query = lowered
1137 .into_query()
1138 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
1139
1140 Ok((query, projection_columns.flatten()))
1141 }
1142
1143 fn dispatch_sql_query_route_for_authority(
1147 &self,
1148 parsed: &SqlParsedStatement,
1149 authority: EntityAuthority,
1150 unsupported_message: &'static str,
1151 dispatch_select: impl FnOnce(
1152 &Self,
1153 LoweredSelectShape,
1154 EntityAuthority,
1155 bool,
1156 Option<Vec<String>>,
1157 ) -> Result<SqlDispatchResult, QueryError>,
1158 dispatch_delete: impl FnOnce(
1159 &Self,
1160 LoweredBaseQueryShape,
1161 EntityAuthority,
1162 ) -> Result<SqlDispatchResult, QueryError>,
1163 ) -> Result<SqlDispatchResult, QueryError> {
1164 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
1167 let command =
1168 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
1169
1170 return self.execute_sql_aggregate_dispatch_for_authority(
1171 command,
1172 authority,
1173 sql_aggregate_dispatch_label_override(&parsed.statement),
1174 );
1175 }
1176
1177 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
1178 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
1179 }
1180
1181 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
1184 parsed,
1185 authority,
1186 unsupported_message,
1187 )?;
1188 let grouped_surface = query.has_grouping();
1189
1190 match query {
1191 LoweredSqlQuery::Select(select) => {
1192 dispatch_select(self, select, authority, grouped_surface, projection_columns)
1193 }
1194 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
1195 }
1196 }
1197
1198 fn dispatch_sql_explain_route_for_authority(
1202 &self,
1203 parsed: &SqlParsedStatement,
1204 authority: EntityAuthority,
1205 ) -> Result<SqlDispatchResult, QueryError> {
1206 if let Some((mode, plan)) =
1209 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
1210 {
1211 return self
1212 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
1213 .map(SqlDispatchResult::Explain);
1214 }
1215
1216 let lowered = parsed.lower_query_lane_for_entity(
1219 authority.model().name(),
1220 authority.model().primary_key.name,
1221 )?;
1222 if let Some(explain) =
1223 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1224 {
1225 return Ok(SqlDispatchResult::Explain(explain));
1226 }
1227
1228 self.explain_lowered_sql_for_authority(&lowered, authority)
1229 .map(SqlDispatchResult::Explain)
1230 }
1231
1232 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1235 query: &Query<E>,
1236 surface: SqlGroupingSurface,
1237 ) -> Result<(), QueryError>
1238 where
1239 E: EntityKind,
1240 {
1241 match (surface, query.has_grouping()) {
1242 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1243 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1244 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1245 ),
1246 }
1247 }
1248
1249 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1251 where
1252 E: PersistedRow<Canister = C> + EntityValue,
1253 {
1254 let parsed = self.parse_sql_statement(sql)?;
1255
1256 self.execute_sql_dispatch_parsed::<E>(&parsed)
1257 }
1258
1259 pub fn execute_sql_dispatch_parsed<E>(
1261 &self,
1262 parsed: &SqlParsedStatement,
1263 ) -> Result<SqlDispatchResult, QueryError>
1264 where
1265 E: PersistedRow<Canister = C> + EntityValue,
1266 {
1267 match parsed.route() {
1268 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1269 parsed,
1270 EntityAuthority::for_type::<E>(),
1271 "execute_sql_dispatch accepts SELECT or DELETE only",
1272 |session, select, authority, grouped_surface, projection_columns| {
1273 if grouped_surface {
1274 let columns = projection_columns.ok_or_else(|| {
1275 QueryError::unsupported_query(
1276 "grouped SQL dispatch requires explicit grouped projection items",
1277 )
1278 })?;
1279
1280 return session.execute_lowered_sql_grouped_dispatch_select_core(
1281 select, authority, columns,
1282 );
1283 }
1284
1285 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1286 if let Some(columns) = projection_columns {
1287 let (_, rows, row_count) = payload.into_parts();
1288
1289 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1290 .into_dispatch_result());
1291 }
1292
1293 Ok(payload.into_dispatch_result())
1294 },
1295 |session, delete, _authority| {
1296 let SqlStatement::Delete(statement) = &parsed.statement else {
1297 return Err(QueryError::invariant(
1298 "DELETE SQL route must carry parsed DELETE statement",
1299 ));
1300 };
1301 let typed_query = bind_lowered_sql_query::<E>(
1302 LoweredSqlQuery::Delete(delete),
1303 MissingRowPolicy::Ignore,
1304 )
1305 .map_err(QueryError::from_sql_lowering_error)?;
1306
1307 match &statement.returning {
1308 Some(returning) => {
1309 session.execute_typed_sql_delete_returning(&typed_query, returning)
1310 }
1311 None => session.execute_typed_sql_delete_count(&typed_query),
1312 }
1313 },
1314 ),
1315 SqlStatementRoute::Insert { .. } => {
1316 let SqlStatement::Insert(statement) = &parsed.statement else {
1317 return Err(QueryError::invariant(
1318 "INSERT SQL route must carry parsed INSERT statement",
1319 ));
1320 };
1321
1322 self.execute_sql_insert_dispatch::<E>(statement)
1323 }
1324 SqlStatementRoute::Update { .. } => {
1325 let SqlStatement::Update(statement) = &parsed.statement else {
1326 return Err(QueryError::invariant(
1327 "UPDATE SQL route must carry parsed UPDATE statement",
1328 ));
1329 };
1330
1331 self.execute_sql_update_dispatch::<E>(statement)
1332 }
1333 SqlStatementRoute::Explain { .. } => self
1334 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1335 SqlStatementRoute::Describe { .. } => {
1336 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1337 }
1338 SqlStatementRoute::ShowIndexes { .. } => {
1339 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1340 }
1341 SqlStatementRoute::ShowColumns { .. } => {
1342 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1343 }
1344 SqlStatementRoute::ShowEntities => {
1345 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1346 }
1347 }
1348 }
1349
1350 #[doc(hidden)]
1357 pub fn execute_generated_query_surface_dispatch_for_authority(
1358 &self,
1359 parsed: &SqlParsedStatement,
1360 authority: EntityAuthority,
1361 ) -> Result<SqlDispatchResult, QueryError> {
1362 match parsed.route() {
1363 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1364 parsed,
1365 authority,
1366 "generated SQL query surface requires query or EXPLAIN statement lanes",
1367 |session, select, authority, grouped_surface, projection_columns| {
1368 if grouped_surface {
1369 let columns = projection_columns.ok_or_else(|| {
1370 QueryError::unsupported_query(
1371 "grouped SQL dispatch requires explicit grouped projection items",
1372 )
1373 })?;
1374
1375 return session
1376 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1377 }
1378
1379 let result =
1380 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1381 if let Some(columns) = projection_columns {
1382 let SqlDispatchResult::ProjectionText {
1383 rows, row_count, ..
1384 } = result
1385 else {
1386 return Err(QueryError::invariant(
1387 "generated scalar SQL dispatch text path must emit projection text rows",
1388 ));
1389 };
1390
1391 return Ok(SqlDispatchResult::ProjectionText {
1392 columns,
1393 rows,
1394 row_count,
1395 });
1396 }
1397
1398 Ok(result)
1399 },
1400 |session, delete, authority| {
1401 let SqlStatement::Delete(statement) = &parsed.statement else {
1402 return Err(QueryError::invariant(
1403 "DELETE SQL route must carry parsed DELETE statement",
1404 ));
1405 };
1406
1407 match &statement.returning {
1408 Some(returning) => {
1409 let SqlDispatchResult::Projection {
1410 columns,
1411 rows,
1412 row_count,
1413 } = session.execute_lowered_sql_dispatch_delete_core(&delete, authority)?
1414 else {
1415 return Err(QueryError::invariant(
1416 "generated SQL delete projection path must emit value-row projection payload",
1417 ));
1418 };
1419
1420 Self::sql_returning_dispatch_projection(
1421 columns, rows, row_count, returning,
1422 )
1423 }
1424 None => session
1425 .execute_lowered_sql_delete_count_core(&delete, authority),
1426 }
1427 },
1428 ),
1429 SqlStatementRoute::Explain { .. } => {
1430 self.dispatch_sql_explain_route_for_authority(parsed, authority)
1431 }
1432 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1433 | SqlStatementRoute::Describe { .. }
1434 | SqlStatementRoute::ShowIndexes { .. }
1435 | SqlStatementRoute::ShowColumns { .. }
1436 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1437 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1438 )),
1439 }
1440 }
1441
1442 #[doc(hidden)]
1448 #[must_use]
1449 pub fn execute_generated_query_surface_sql(
1450 &self,
1451 sql: &str,
1452 authorities: &[EntityAuthority],
1453 ) -> GeneratedSqlDispatchAttempt {
1454 let sql_trimmed = match trim_generated_query_sql_input(sql) {
1457 Ok(sql_trimmed) => sql_trimmed,
1458 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1459 };
1460 let parsed = match self.parse_sql_statement(sql_trimmed) {
1461 Ok(parsed) => parsed,
1462 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1463 };
1464
1465 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1468 return GeneratedSqlDispatchAttempt::new(
1469 "",
1470 None,
1471 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1472 authorities,
1473 ))),
1474 );
1475 }
1476 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1477 Ok(authority) => authority,
1478 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1479 };
1480
1481 let entity_name = authority.model().name();
1485 let explain_order_field = parsed
1486 .route()
1487 .is_explain()
1488 .then_some(authority.model().primary_key.name);
1489 let result = match parsed.route() {
1490 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1491 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1492 }
1493 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1494 Err(QueryError::unsupported_query(
1495 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1496 ))
1497 }
1498 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1499 self.describe_entity_model(authority.model()),
1500 )),
1501 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1502 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1503 )),
1504 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1505 self.show_columns_for_model(authority.model()),
1506 )),
1507 SqlStatementRoute::ShowEntities => unreachable!(
1508 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1509 ),
1510 };
1511
1512 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1513 }
1514}