1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 schema::{ValidateError, field_type_from_model_kind, literal_matches_type},
18 session::sql::{
19 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21 computed_projection,
22 projection::{
23 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
24 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
25 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26 },
27 },
28 sql::lowering::{
29 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
30 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
31 lower_sql_command_from_prepared_statement, prepare_sql_statement,
32 },
33 sql::parser::{
34 SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
35 SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
36 SqlStatement, SqlTextFunction, SqlUpdateStatement,
37 },
38 },
39 model::{
40 entity::resolve_field_slot,
41 field::{FieldInsertGeneration, FieldKind, FieldModel},
42 },
43 sanitize::{SanitizeWriteContext, SanitizeWriteMode},
44 traits::{CanisterKind, EntityKind, EntityValue},
45 types::{Timestamp, Ulid},
46 value::Value,
47};
48
49#[cfg(feature = "perf-attribution")]
50pub use lowered::LoweredSqlDispatchExecutorAttribution;
51
52#[doc(hidden)]
61pub struct GeneratedSqlDispatchAttempt {
62 entity_name: &'static str,
63 explain_order_field: Option<&'static str>,
64 result: Result<SqlDispatchResult, QueryError>,
65}
66
67impl GeneratedSqlDispatchAttempt {
68 const fn new(
70 entity_name: &'static str,
71 explain_order_field: Option<&'static str>,
72 result: Result<SqlDispatchResult, QueryError>,
73 ) -> Self {
74 Self {
75 entity_name,
76 explain_order_field,
77 result,
78 }
79 }
80
81 #[must_use]
83 pub const fn entity_name(&self) -> &'static str {
84 self.entity_name
85 }
86
87 #[must_use]
89 pub const fn explain_order_field(&self) -> Option<&'static str> {
90 self.explain_order_field
91 }
92
93 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
95 self.result
96 }
97}
98
99#[derive(Clone, Copy, Debug, Eq, PartialEq)]
100pub(in crate::db::session::sql) enum SqlGroupingSurface {
101 Scalar,
102 Grouped,
103}
104
105const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
106 match surface {
107 SqlGroupingSurface::Scalar => {
108 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
109 }
110 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
111 }
112}
113
114fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
117 let sql_trimmed = sql.trim();
118 if sql_trimmed.is_empty() {
119 return Err(QueryError::unsupported_query(
120 "query endpoint requires a non-empty SQL string",
121 ));
122 }
123
124 Ok(sql_trimmed)
125}
126
127fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
130 let mut entities = Vec::with_capacity(authorities.len());
131
132 for authority in authorities {
133 entities.push(authority.model().name().to_string());
134 }
135
136 entities
137}
138
139fn sql_projection_labels_from_select_statement(
142 statement: &SqlStatement,
143) -> Result<Option<Vec<String>>, QueryError> {
144 let SqlStatement::Select(select) = statement else {
145 return Err(QueryError::invariant(
146 "SQL projection labels require SELECT statement shape",
147 ));
148 };
149 let SqlProjection::Items(items) = &select.projection else {
150 return Ok(None);
151 };
152
153 Ok(Some(
154 items
155 .iter()
156 .enumerate()
157 .map(|(index, item)| {
158 select
159 .projection_alias(index)
160 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
161 })
162 .collect(),
163 ))
164}
165
166fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
169 match item {
170 SqlSelectItem::Field(field) => field.clone(),
171 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
172 SqlSelectItem::TextFunction(call) => {
173 format!(
174 "{}({})",
175 grouped_sql_text_function_name(call.function),
176 call.field
177 )
178 }
179 }
180}
181
182fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
185 let SqlStatement::Select(select) = statement else {
186 return None;
187 };
188
189 select.projection_alias(0).map(str::to_string)
190}
191
192fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
194 let kind = match aggregate.kind {
195 SqlAggregateKind::Count => "COUNT",
196 SqlAggregateKind::Sum => "SUM",
197 SqlAggregateKind::Avg => "AVG",
198 SqlAggregateKind::Min => "MIN",
199 SqlAggregateKind::Max => "MAX",
200 };
201
202 match aggregate.field.as_deref() {
203 Some(field) => format!("{kind}({field})"),
204 None => format!("{kind}(*)"),
205 }
206}
207
208const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
211 match function {
212 SqlTextFunction::Trim => "TRIM",
213 SqlTextFunction::Ltrim => "LTRIM",
214 SqlTextFunction::Rtrim => "RTRIM",
215 SqlTextFunction::Lower => "LOWER",
216 SqlTextFunction::Upper => "UPPER",
217 SqlTextFunction::Length => "LENGTH",
218 SqlTextFunction::Left => "LEFT",
219 SqlTextFunction::Right => "RIGHT",
220 SqlTextFunction::StartsWith => "STARTS_WITH",
221 SqlTextFunction::EndsWith => "ENDS_WITH",
222 SqlTextFunction::Contains => "CONTAINS",
223 SqlTextFunction::Position => "POSITION",
224 SqlTextFunction::Replace => "REPLACE",
225 SqlTextFunction::Substring => "SUBSTRING",
226 }
227}
228
229fn authority_for_generated_sql_route(
231 route: &SqlStatementRoute,
232 authorities: &[EntityAuthority],
233) -> Result<EntityAuthority, QueryError> {
234 let sql_entity = route.entity();
235
236 for authority in authorities {
237 if identifiers_tail_match(sql_entity, authority.model().name()) {
238 return Ok(*authority);
239 }
240 }
241
242 Err(unsupported_generated_sql_entity_error(
243 sql_entity,
244 authorities,
245 ))
246}
247
248fn unsupported_generated_sql_entity_error(
251 entity_name: &str,
252 authorities: &[EntityAuthority],
253) -> QueryError {
254 let mut supported = String::new();
255
256 for (index, authority) in authorities.iter().enumerate() {
257 if index != 0 {
258 supported.push_str(", ");
259 }
260
261 supported.push_str(authority.model().name());
262 }
263
264 QueryError::unsupported_query(format!(
265 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
266 ))
267}
268
269fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
272where
273 E: EntityKind,
274{
275 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
276 return Ok(());
277 }
278
279 Err(QueryError::from_sql_lowering_error(
280 SqlLoweringError::EntityMismatch {
281 sql_entity: sql_entity.to_string(),
282 expected_entity: E::MODEL.name(),
283 },
284 ))
285}
286
287fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
290where
291 E: EntityKind,
292{
293 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
294 return Ok(key);
295 }
296
297 let widened = match value {
298 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
299 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
300 _ => {
301 return Err(QueryError::unsupported_query(format!(
302 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303 )));
304 }
305 };
306
307 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
308 QueryError::unsupported_query(format!(
309 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
310 ))
311 })
312}
313
314fn sql_write_generated_field_value(field: &FieldModel) -> Option<Value> {
317 field
318 .insert_generation()
319 .map(|generation| match generation {
320 FieldInsertGeneration::Ulid => Value::Ulid(Ulid::generate()),
321 FieldInsertGeneration::Timestamp => Value::Timestamp(Timestamp::now()),
322 })
323}
324
325fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
328where
329 E: EntityKind,
330{
331 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
332 QueryError::invariant("SQL write field must resolve against the target entity model")
333 })?;
334 let field_kind = E::MODEL.fields()[field_slot].kind();
335
336 let normalized = match (field_kind, value) {
337 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
338 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
339 Value::Int(v.cast_signed())
340 }
341 _ => value.clone(),
342 };
343
344 let field_type = field_type_from_model_kind(&field_kind);
345 if !literal_matches_type(&normalized, &field_type) {
346 return Err(QueryError::unsupported_query(
347 ValidateError::invalid_literal(field_name, "literal type does not match field type")
348 .to_string(),
349 ));
350 }
351
352 Ok(normalized)
353}
354
355fn reject_explicit_sql_write_to_managed_field<E>(
358 field_name: &str,
359 statement_kind: &str,
360) -> Result<(), QueryError>
361where
362 E: EntityKind,
363{
364 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
365 return Ok(());
366 };
367 let field = &E::MODEL.fields()[field_slot];
368
369 if field.write_management().is_some() {
370 return Err(QueryError::unsupported_query(format!(
371 "SQL {statement_kind} does not allow explicit writes to managed field '{field_name}' in this release"
372 )));
373 }
374
375 Ok(())
376}
377
378fn reject_explicit_sql_insert_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
381where
382 E: EntityKind,
383{
384 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
385 return Ok(());
386 };
387 let field = &E::MODEL.fields()[field_slot];
388
389 if field.insert_generation().is_some() {
390 return Err(QueryError::unsupported_query(format!(
391 "SQL INSERT does not allow explicit writes to generated field '{field_name}' in this release"
392 )));
393 }
394
395 Ok(())
396}
397
398fn reject_explicit_sql_update_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
401where
402 E: EntityKind,
403{
404 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
405 return Ok(());
406 };
407 let field = &E::MODEL.fields()[field_slot];
408
409 if field.insert_generation().is_some() {
410 return Err(QueryError::unsupported_query(format!(
411 "SQL UPDATE does not allow explicit writes to generated field '{field_name}' in this release"
412 )));
413 }
414
415 Ok(())
416}
417
418fn sql_insert_field_is_omittable(field: &FieldModel) -> bool {
421 if sql_write_generated_field_value(field).is_some() {
422 return true;
423 }
424
425 field.write_management().is_some()
426}
427
428fn validate_sql_insert_required_fields<E>(columns: &[String]) -> Result<(), QueryError>
431where
432 E: EntityKind,
433{
434 let missing_required_fields = E::MODEL
435 .fields()
436 .iter()
437 .filter(|field| !columns.iter().any(|column| column == field.name()))
438 .filter(|field| !sql_insert_field_is_omittable(field))
439 .map(FieldModel::name)
440 .collect::<Vec<_>>();
441
442 if missing_required_fields.is_empty() {
443 return Ok(());
444 }
445
446 if missing_required_fields.len() == 1
447 && missing_required_fields[0] == E::MODEL.primary_key.name()
448 {
449 return Err(QueryError::unsupported_query(format!(
450 "SQL INSERT requires primary key column '{}' in this release",
451 E::MODEL.primary_key.name()
452 )));
453 }
454
455 Err(QueryError::unsupported_query(format!(
456 "SQL INSERT requires explicit values for non-generated fields {} in this release",
457 missing_required_fields.join(", ")
458 )))
459}
460
461fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
466where
467 E: EntityKind,
468{
469 match source {
470 SqlInsertSource::Values(values) => values.first().map(Vec::len),
471 SqlInsertSource::Select(select) => match &select.projection {
472 SqlProjection::All => Some(
473 E::MODEL
474 .fields()
475 .iter()
476 .filter(|field| field.write_management().is_none())
477 .count(),
478 ),
479 SqlProjection::Items(items) => Some(items.len()),
480 },
481 }
482}
483
484fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
489where
490 E: EntityKind,
491{
492 if !statement.columns.is_empty() {
493 return statement.columns.clone();
494 }
495
496 let columns: Vec<String> = E::MODEL
497 .fields()
498 .iter()
499 .filter(|field| !sql_insert_field_is_omittable(field))
500 .map(|field| field.name().to_string())
501 .collect();
502 let full_columns: Vec<String> = E::MODEL
503 .fields()
504 .iter()
505 .filter(|field| field.write_management().is_none())
506 .map(|field| field.name().to_string())
507 .collect();
508 let first_width = sql_insert_source_width_hint::<E>(&statement.source);
509
510 if first_width == Some(columns.len()) {
511 return columns;
512 }
513
514 full_columns
515}
516
517fn validate_sql_insert_value_tuple_lengths(
520 columns: &[String],
521 values: &[Vec<Value>],
522) -> Result<(), QueryError> {
523 for tuple in values {
524 if tuple.len() != columns.len() {
525 return Err(QueryError::from_sql_parse_error(
526 crate::db::sql::parser::SqlParseError::invalid_syntax(
527 "INSERT column list and VALUES tuple length must match",
528 ),
529 ));
530 }
531 }
532
533 Ok(())
534}
535
536fn validate_sql_insert_selected_rows(
539 columns: &[String],
540 rows: &[Vec<Value>],
541) -> Result<(), QueryError> {
542 for row in rows {
543 if row.len() != columns.len() {
544 return Err(QueryError::unsupported_query(
545 "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
546 ));
547 }
548 }
549
550 Ok(())
551}
552
553impl<C: CanisterKind> DbSession<C> {
554 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
557 where
558 E: PersistedRow<Canister = C> + EntityValue,
559 {
560 let mut row = Vec::with_capacity(E::MODEL.fields().len());
561
562 for index in 0..E::MODEL.fields().len() {
563 let value = entity.get_value_by_index(index).ok_or_else(|| {
564 QueryError::invariant(
565 "SQL write dispatch projection row must include every declared field",
566 )
567 })?;
568 row.push(value);
569 }
570
571 Ok(row)
572 }
573
574 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
578 where
579 E: PersistedRow<Canister = C> + EntityValue,
580 {
581 let columns = projection_labels_from_fields(E::MODEL.fields());
582 let rows = entities
583 .into_iter()
584 .map(Self::sql_write_dispatch_row)
585 .collect::<Result<Vec<_>, _>>()?;
586 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
587
588 Ok(SqlDispatchResult::Projection {
589 columns,
590 rows,
591 row_count,
592 })
593 }
594
595 fn sql_insert_patch_and_key<E>(
598 columns: &[String],
599 values: &[Value],
600 ) -> Result<(E::Key, UpdatePatch), QueryError>
601 where
602 E: PersistedRow<Canister = C> + EntityValue,
603 {
604 let pk_name = E::MODEL.primary_key.name;
608 let generated_fields = E::MODEL
609 .fields()
610 .iter()
611 .filter(|field| !columns.iter().any(|column| column == field.name()))
612 .filter_map(|field| {
613 sql_write_generated_field_value(field).map(|value| (field.name(), value))
614 })
615 .collect::<Vec<_>>();
616 let key = if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
617 let pk_value = values.get(pk_index).ok_or_else(|| {
618 QueryError::invariant(
619 "INSERT primary key column must align with one VALUES literal",
620 )
621 })?;
622 sql_write_key_from_literal::<E>(pk_value, pk_name)?
623 } else if let Some((_, pk_value)) = generated_fields
624 .iter()
625 .find(|(field_name, _)| *field_name == pk_name)
626 {
627 sql_write_key_from_literal::<E>(pk_value, pk_name)?
628 } else {
629 return Err(QueryError::unsupported_query(format!(
630 "SQL INSERT requires primary key column '{pk_name}' in this release"
631 )));
632 };
633
634 let mut patch = UpdatePatch::new();
637 for (field_name, generated_value) in &generated_fields {
638 patch = patch
639 .set_field(E::MODEL, field_name, generated_value.clone())
640 .map_err(QueryError::execute)?;
641 }
642 for (field, value) in columns.iter().zip(values.iter()) {
643 reject_explicit_sql_insert_to_generated_field::<E>(field)?;
644 reject_explicit_sql_write_to_managed_field::<E>(field, "INSERT")?;
645 let normalized = sql_write_value_for_field::<E>(field, value)?;
646 patch = patch
647 .set_field(E::MODEL, field, normalized)
648 .map_err(QueryError::execute)?;
649 }
650
651 Ok((key, patch))
652 }
653
654 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
657 where
658 E: PersistedRow<Canister = C> + EntityValue,
659 {
660 let pk_name = E::MODEL.primary_key.name;
663 let mut patch = UpdatePatch::new();
664 for assignment in &statement.assignments {
665 if assignment.field == pk_name {
666 return Err(QueryError::unsupported_query(format!(
667 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
668 )));
669 }
670 reject_explicit_sql_update_to_generated_field::<E>(assignment.field.as_str())?;
671 reject_explicit_sql_write_to_managed_field::<E>(assignment.field.as_str(), "UPDATE")?;
672 let normalized =
673 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
674
675 patch = patch
676 .set_field(E::MODEL, assignment.field.as_str(), normalized)
677 .map_err(QueryError::execute)?;
678 }
679
680 Ok(patch)
681 }
682
683 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
685 where
686 E: PersistedRow<Canister = C> + EntityValue,
687 {
688 let Some(predicate) = statement.predicate.clone() else {
692 return Err(QueryError::unsupported_query(
693 "SQL UPDATE requires WHERE predicate in this release",
694 ));
695 };
696 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
697 let pk_name = E::MODEL.primary_key.name;
698 let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
699
700 if statement.order_by.is_empty() {
704 selector = selector.order_by(pk_name);
705 } else {
706 let mut orders_primary_key = false;
707
708 for term in &statement.order_by {
709 if term.field == pk_name {
710 orders_primary_key = true;
711 }
712 selector = match term.direction {
713 SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
714 SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
715 };
716 }
717
718 if !orders_primary_key {
719 selector = selector.order_by(pk_name);
720 }
721 }
722
723 if let Some(limit) = statement.limit {
726 selector = selector.limit(limit);
727 }
728 if let Some(offset) = statement.offset {
729 selector = selector.offset(offset);
730 }
731
732 Ok(selector)
733 }
734
735 fn sql_insert_select_source_statement<E>(
739 statement: &SqlInsertStatement,
740 ) -> Result<SqlSelectStatement, QueryError>
741 where
742 E: PersistedRow<Canister = C> + EntityValue,
743 {
744 let SqlInsertSource::Select(select) = statement.source.clone() else {
745 return Err(QueryError::invariant(
746 "INSERT SELECT source validation requires parsed SELECT source",
747 ));
748 };
749 let mut select = *select;
750 ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
751
752 if !select.group_by.is_empty() || !select.having.is_empty() {
753 return Err(QueryError::unsupported_query(
754 "SQL INSERT SELECT requires scalar SELECT source in this release",
755 ));
756 }
757
758 if let SqlProjection::Items(items) = &select.projection {
759 for item in items {
760 if matches!(item, SqlSelectItem::Aggregate(_)) {
761 return Err(QueryError::unsupported_query(
762 "SQL INSERT SELECT does not support aggregate source projection in this release",
763 ));
764 }
765 }
766 }
767
768 let pk_name = E::MODEL.primary_key.name;
769 if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
770 select.order_by.push(SqlOrderTerm {
771 field: pk_name.to_string(),
772 direction: SqlOrderDirection::Asc,
773 });
774 }
775
776 Ok(select)
777 }
778
779 fn execute_sql_insert_select_source_rows<E>(
783 &self,
784 source: &SqlSelectStatement,
785 ) -> Result<Vec<Vec<Value>>, QueryError>
786 where
787 E: PersistedRow<Canister = C> + EntityValue,
788 {
789 if let Some(plan) = computed_projection::computed_sql_projection_plan(
793 &SqlStatement::Select(source.clone()),
794 )? {
795 let result = self.execute_computed_sql_projection_dispatch_for_authority(
796 plan,
797 EntityAuthority::for_type::<E>(),
798 )?;
799
800 return match result {
801 SqlDispatchResult::Projection { rows, .. } => Ok(rows),
802 other => Err(QueryError::invariant(format!(
803 "INSERT SELECT computed source must produce projection rows, found {other:?}",
804 ))),
805 };
806 }
807
808 let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
811 .map_err(QueryError::from_sql_lowering_error)?;
812 let lowered =
813 lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
814 .map_err(QueryError::from_sql_lowering_error)?;
815 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
816 return Err(QueryError::invariant(
817 "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
818 ));
819 };
820
821 let payload =
822 self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
823 let (_, rows, _) = payload.into_parts();
824
825 Ok(rows)
826 }
827
828 fn execute_sql_insert_dispatch<E>(
831 &self,
832 statement: &SqlInsertStatement,
833 ) -> Result<SqlDispatchResult, QueryError>
834 where
835 E: PersistedRow<Canister = C> + EntityValue,
836 {
837 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
838 let columns = sql_insert_columns::<E>(statement);
839 validate_sql_insert_required_fields::<E>(columns.as_slice())?;
840 let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Insert, Timestamp::now());
841 let source_rows = match &statement.source {
842 SqlInsertSource::Values(values) => {
843 validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
844 values.clone()
845 }
846 SqlInsertSource::Select(_) => {
847 let source = Self::sql_insert_select_source_statement::<E>(statement)?;
848 let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
849 validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
850
851 rows
852 }
853 };
854 let mut entities = Vec::with_capacity(source_rows.len());
855
856 for values in &source_rows {
857 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
858 let entity = self
859 .execute_save_entity::<E>(|save| {
860 save.apply_internal_structural_mutation_with_write_context(
861 MutationMode::Insert,
862 key,
863 patch,
864 write_context,
865 )
866 })
867 .map_err(QueryError::execute)?;
868 entities.push(entity);
869 }
870
871 Self::sql_write_dispatch_projection(entities)
872 }
873
874 fn execute_sql_update_dispatch<E>(
878 &self,
879 statement: &SqlUpdateStatement,
880 ) -> Result<SqlDispatchResult, QueryError>
881 where
882 E: PersistedRow<Canister = C> + EntityValue,
883 {
884 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
885 let selector = Self::sql_update_selector_query::<E>(statement)?;
886 let patch = Self::sql_update_patch::<E>(statement)?;
887 let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Update, Timestamp::now());
888 let matched = self.execute_query(&selector)?;
889 let mut entities = Vec::with_capacity(matched.len());
890
891 for entity in matched.entities() {
894 let updated = self
895 .execute_save_entity::<E>(|save| {
896 save.apply_internal_structural_mutation_with_write_context(
897 MutationMode::Update,
898 entity.id().key(),
899 patch.clone(),
900 write_context,
901 )
902 })
903 .map_err(QueryError::execute)?;
904 entities.push(updated);
905 }
906
907 Self::sql_write_dispatch_projection(entities)
908 }
909
910 fn prepare_structural_sql_projection_execution(
913 &self,
914 query: StructuralQuery,
915 authority: EntityAuthority,
916 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
917 let (_, plan) =
920 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
921 let projection = plan.projection_spec(authority.model());
922 let columns = projection_labels_from_projection_spec(&projection);
923
924 Ok((columns, plan))
925 }
926
927 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
931 &self,
932 query: StructuralQuery,
933 authority: EntityAuthority,
934 ) -> Result<SqlProjectionPayload, QueryError> {
935 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
937
938 let projected =
941 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
942 .map_err(QueryError::execute)?;
943 let (rows, row_count) = projected.into_parts();
944
945 Ok(SqlProjectionPayload::new(columns, rows, row_count))
946 }
947
948 fn execute_structural_sql_projection_text(
952 &self,
953 query: StructuralQuery,
954 authority: EntityAuthority,
955 ) -> Result<SqlDispatchResult, QueryError> {
956 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
958
959 let projected =
962 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
963 .map_err(QueryError::execute)?;
964 let (rows, row_count) = projected.into_parts();
965
966 Ok(SqlDispatchResult::ProjectionText {
967 columns,
968 rows,
969 row_count,
970 })
971 }
972
973 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
977 where
978 E: PersistedRow<Canister = C> + EntityValue,
979 {
980 let plan = self
981 .compile_query_with_visible_indexes(query)?
982 .into_prepared_execution_plan();
983 let deleted = self
984 .with_metrics(|| {
985 self.delete_executor::<E>()
986 .execute_structural_projection(plan)
987 })
988 .map_err(QueryError::execute)?;
989 let (rows, row_count) = deleted.into_parts();
990 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
991
992 Ok(SqlProjectionPayload::new(
993 projection_labels_from_fields(E::MODEL.fields()),
994 rows,
995 row_count,
996 )
997 .into_dispatch_result())
998 }
999
1000 fn lowered_sql_query_dispatch_inputs_for_authority(
1003 parsed: &SqlParsedStatement,
1004 authority: EntityAuthority,
1005 unsupported_message: &'static str,
1006 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
1007 let lowered = parsed.lower_query_lane_for_entity(
1008 authority.model().name(),
1009 authority.model().primary_key.name,
1010 )?;
1011 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
1012 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
1013 .transpose()?;
1014 let query = lowered
1015 .into_query()
1016 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
1017
1018 Ok((query, projection_columns.flatten()))
1019 }
1020
1021 fn dispatch_sql_query_route_for_authority(
1025 &self,
1026 parsed: &SqlParsedStatement,
1027 authority: EntityAuthority,
1028 unsupported_message: &'static str,
1029 dispatch_select: impl FnOnce(
1030 &Self,
1031 LoweredSelectShape,
1032 EntityAuthority,
1033 bool,
1034 Option<Vec<String>>,
1035 ) -> Result<SqlDispatchResult, QueryError>,
1036 dispatch_delete: impl FnOnce(
1037 &Self,
1038 LoweredBaseQueryShape,
1039 EntityAuthority,
1040 ) -> Result<SqlDispatchResult, QueryError>,
1041 ) -> Result<SqlDispatchResult, QueryError> {
1042 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
1045 let command =
1046 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
1047
1048 return self.execute_sql_aggregate_dispatch_for_authority(
1049 command,
1050 authority,
1051 sql_aggregate_dispatch_label_override(&parsed.statement),
1052 );
1053 }
1054
1055 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
1056 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
1057 }
1058
1059 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
1062 parsed,
1063 authority,
1064 unsupported_message,
1065 )?;
1066 let grouped_surface = query.has_grouping();
1067
1068 match query {
1069 LoweredSqlQuery::Select(select) => {
1070 dispatch_select(self, select, authority, grouped_surface, projection_columns)
1071 }
1072 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
1073 }
1074 }
1075
1076 fn dispatch_sql_explain_route_for_authority(
1080 &self,
1081 parsed: &SqlParsedStatement,
1082 authority: EntityAuthority,
1083 ) -> Result<SqlDispatchResult, QueryError> {
1084 if let Some((mode, plan)) =
1087 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
1088 {
1089 return self
1090 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
1091 .map(SqlDispatchResult::Explain);
1092 }
1093
1094 let lowered = parsed.lower_query_lane_for_entity(
1097 authority.model().name(),
1098 authority.model().primary_key.name,
1099 )?;
1100 if let Some(explain) =
1101 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1102 {
1103 return Ok(SqlDispatchResult::Explain(explain));
1104 }
1105
1106 self.explain_lowered_sql_for_authority(&lowered, authority)
1107 .map(SqlDispatchResult::Explain)
1108 }
1109
1110 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1113 query: &Query<E>,
1114 surface: SqlGroupingSurface,
1115 ) -> Result<(), QueryError>
1116 where
1117 E: EntityKind,
1118 {
1119 match (surface, query.has_grouping()) {
1120 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1121 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1122 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1123 ),
1124 }
1125 }
1126
1127 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1129 where
1130 E: PersistedRow<Canister = C> + EntityValue,
1131 {
1132 let parsed = self.parse_sql_statement(sql)?;
1133
1134 self.execute_sql_dispatch_parsed::<E>(&parsed)
1135 }
1136
1137 pub fn execute_sql_dispatch_parsed<E>(
1139 &self,
1140 parsed: &SqlParsedStatement,
1141 ) -> Result<SqlDispatchResult, QueryError>
1142 where
1143 E: PersistedRow<Canister = C> + EntityValue,
1144 {
1145 match parsed.route() {
1146 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1147 parsed,
1148 EntityAuthority::for_type::<E>(),
1149 "execute_sql_dispatch accepts SELECT or DELETE only",
1150 |session, select, authority, grouped_surface, projection_columns| {
1151 if grouped_surface {
1152 let columns = projection_columns.ok_or_else(|| {
1153 QueryError::unsupported_query(
1154 "grouped SQL dispatch requires explicit grouped projection items",
1155 )
1156 })?;
1157
1158 return session.execute_lowered_sql_grouped_dispatch_select_core(
1159 select, authority, columns,
1160 );
1161 }
1162
1163 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1164 if let Some(columns) = projection_columns {
1165 let (_, rows, row_count) = payload.into_parts();
1166
1167 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1168 .into_dispatch_result());
1169 }
1170
1171 Ok(payload.into_dispatch_result())
1172 },
1173 |session, delete, _authority| {
1174 let typed_query = bind_lowered_sql_query::<E>(
1175 LoweredSqlQuery::Delete(delete),
1176 MissingRowPolicy::Ignore,
1177 )
1178 .map_err(QueryError::from_sql_lowering_error)?;
1179
1180 session.execute_typed_sql_delete(&typed_query)
1181 },
1182 ),
1183 SqlStatementRoute::Insert { .. } => {
1184 let SqlStatement::Insert(statement) = &parsed.statement else {
1185 return Err(QueryError::invariant(
1186 "INSERT SQL route must carry parsed INSERT statement",
1187 ));
1188 };
1189
1190 self.execute_sql_insert_dispatch::<E>(statement)
1191 }
1192 SqlStatementRoute::Update { .. } => {
1193 let SqlStatement::Update(statement) = &parsed.statement else {
1194 return Err(QueryError::invariant(
1195 "UPDATE SQL route must carry parsed UPDATE statement",
1196 ));
1197 };
1198
1199 self.execute_sql_update_dispatch::<E>(statement)
1200 }
1201 SqlStatementRoute::Explain { .. } => self
1202 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1203 SqlStatementRoute::Describe { .. } => {
1204 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1205 }
1206 SqlStatementRoute::ShowIndexes { .. } => {
1207 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1208 }
1209 SqlStatementRoute::ShowColumns { .. } => {
1210 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1211 }
1212 SqlStatementRoute::ShowEntities => {
1213 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1214 }
1215 }
1216 }
1217
1218 #[doc(hidden)]
1225 pub fn execute_generated_query_surface_dispatch_for_authority(
1226 &self,
1227 parsed: &SqlParsedStatement,
1228 authority: EntityAuthority,
1229 ) -> Result<SqlDispatchResult, QueryError> {
1230 match parsed.route() {
1231 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1232 parsed,
1233 authority,
1234 "generated SQL query surface requires query or EXPLAIN statement lanes",
1235 |session, select, authority, grouped_surface, projection_columns| {
1236 if grouped_surface {
1237 let columns = projection_columns.ok_or_else(|| {
1238 QueryError::unsupported_query(
1239 "grouped SQL dispatch requires explicit grouped projection items",
1240 )
1241 })?;
1242
1243 return session
1244 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1245 }
1246
1247 let result =
1248 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1249 if let Some(columns) = projection_columns {
1250 let SqlDispatchResult::ProjectionText {
1251 rows, row_count, ..
1252 } = result
1253 else {
1254 return Err(QueryError::invariant(
1255 "generated scalar SQL dispatch text path must emit projection text rows",
1256 ));
1257 };
1258
1259 return Ok(SqlDispatchResult::ProjectionText {
1260 columns,
1261 rows,
1262 row_count,
1263 });
1264 }
1265
1266 Ok(result)
1267 },
1268 |session, delete, authority| {
1269 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1270 },
1271 ),
1272 SqlStatementRoute::Explain { .. } => {
1273 self.dispatch_sql_explain_route_for_authority(parsed, authority)
1274 }
1275 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1276 | SqlStatementRoute::Describe { .. }
1277 | SqlStatementRoute::ShowIndexes { .. }
1278 | SqlStatementRoute::ShowColumns { .. }
1279 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1280 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1281 )),
1282 }
1283 }
1284
1285 #[doc(hidden)]
1291 #[must_use]
1292 pub fn execute_generated_query_surface_sql(
1293 &self,
1294 sql: &str,
1295 authorities: &[EntityAuthority],
1296 ) -> GeneratedSqlDispatchAttempt {
1297 let sql_trimmed = match trim_generated_query_sql_input(sql) {
1300 Ok(sql_trimmed) => sql_trimmed,
1301 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1302 };
1303 let parsed = match self.parse_sql_statement(sql_trimmed) {
1304 Ok(parsed) => parsed,
1305 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1306 };
1307
1308 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1311 return GeneratedSqlDispatchAttempt::new(
1312 "",
1313 None,
1314 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1315 authorities,
1316 ))),
1317 );
1318 }
1319 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1320 Ok(authority) => authority,
1321 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1322 };
1323
1324 let entity_name = authority.model().name();
1328 let explain_order_field = parsed
1329 .route()
1330 .is_explain()
1331 .then_some(authority.model().primary_key.name);
1332 let result = match parsed.route() {
1333 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1334 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1335 }
1336 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1337 Err(QueryError::unsupported_query(
1338 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1339 ))
1340 }
1341 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1342 self.describe_entity_model(authority.model()),
1343 )),
1344 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1345 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1346 )),
1347 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1348 self.show_columns_for_model(authority.model()),
1349 )),
1350 SqlStatementRoute::ShowEntities => unreachable!(
1351 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1352 ),
1353 };
1354
1355 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1356 }
1357}