1mod computed;
8mod lowered;
9
10use crate::{
11 db::{
12 DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13 data::UpdatePatch,
14 executor::{EntityAuthority, MutationMode},
15 identifiers_tail_match,
16 query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17 session::sql::{
18 SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19 aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20 computed_projection,
21 projection::{
22 SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23 execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24 projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25 },
26 },
27 sql::lowering::{
28 LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29 bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30 lower_sql_command_from_prepared_statement, prepare_sql_statement,
31 },
32 sql::parser::{
33 SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
34 SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
35 SqlStatement, SqlTextFunction, SqlUpdateStatement,
36 },
37 },
38 model::{
39 entity::resolve_field_slot,
40 field::{FieldInsertGeneration, FieldKind, FieldModel, FieldWriteManagement},
41 },
42 traits::{CanisterKind, EntityKind, EntityValue},
43 types::{Timestamp, Ulid},
44 value::Value,
45};
46
47#[cfg(feature = "perf-attribution")]
48pub use lowered::LoweredSqlDispatchExecutorAttribution;
49
50#[doc(hidden)]
59pub struct GeneratedSqlDispatchAttempt {
60 entity_name: &'static str,
61 explain_order_field: Option<&'static str>,
62 result: Result<SqlDispatchResult, QueryError>,
63}
64
65impl GeneratedSqlDispatchAttempt {
66 const fn new(
68 entity_name: &'static str,
69 explain_order_field: Option<&'static str>,
70 result: Result<SqlDispatchResult, QueryError>,
71 ) -> Self {
72 Self {
73 entity_name,
74 explain_order_field,
75 result,
76 }
77 }
78
79 #[must_use]
81 pub const fn entity_name(&self) -> &'static str {
82 self.entity_name
83 }
84
85 #[must_use]
87 pub const fn explain_order_field(&self) -> Option<&'static str> {
88 self.explain_order_field
89 }
90
91 pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
93 self.result
94 }
95}
96
97#[derive(Clone, Copy, Debug, Eq, PartialEq)]
98pub(in crate::db::session::sql) enum SqlGroupingSurface {
99 Scalar,
100 Grouped,
101}
102
103const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
104 match surface {
105 SqlGroupingSurface::Scalar => {
106 "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
107 }
108 SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
109 }
110}
111
112fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
115 let sql_trimmed = sql.trim();
116 if sql_trimmed.is_empty() {
117 return Err(QueryError::unsupported_query(
118 "query endpoint requires a non-empty SQL string",
119 ));
120 }
121
122 Ok(sql_trimmed)
123}
124
125fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
128 let mut entities = Vec::with_capacity(authorities.len());
129
130 for authority in authorities {
131 entities.push(authority.model().name().to_string());
132 }
133
134 entities
135}
136
137fn sql_projection_labels_from_select_statement(
140 statement: &SqlStatement,
141) -> Result<Option<Vec<String>>, QueryError> {
142 let SqlStatement::Select(select) = statement else {
143 return Err(QueryError::invariant(
144 "SQL projection labels require SELECT statement shape",
145 ));
146 };
147 let SqlProjection::Items(items) = &select.projection else {
148 return Ok(None);
149 };
150
151 Ok(Some(
152 items
153 .iter()
154 .enumerate()
155 .map(|(index, item)| {
156 select
157 .projection_alias(index)
158 .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
159 })
160 .collect(),
161 ))
162}
163
164fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
167 match item {
168 SqlSelectItem::Field(field) => field.clone(),
169 SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
170 SqlSelectItem::TextFunction(call) => {
171 format!(
172 "{}({})",
173 grouped_sql_text_function_name(call.function),
174 call.field
175 )
176 }
177 }
178}
179
180fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
183 let SqlStatement::Select(select) = statement else {
184 return None;
185 };
186
187 select.projection_alias(0).map(str::to_string)
188}
189
190fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
192 let kind = match aggregate.kind {
193 SqlAggregateKind::Count => "COUNT",
194 SqlAggregateKind::Sum => "SUM",
195 SqlAggregateKind::Avg => "AVG",
196 SqlAggregateKind::Min => "MIN",
197 SqlAggregateKind::Max => "MAX",
198 };
199
200 match aggregate.field.as_deref() {
201 Some(field) => format!("{kind}({field})"),
202 None => format!("{kind}(*)"),
203 }
204}
205
206const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
209 match function {
210 SqlTextFunction::Trim => "TRIM",
211 SqlTextFunction::Ltrim => "LTRIM",
212 SqlTextFunction::Rtrim => "RTRIM",
213 SqlTextFunction::Lower => "LOWER",
214 SqlTextFunction::Upper => "UPPER",
215 SqlTextFunction::Length => "LENGTH",
216 SqlTextFunction::Left => "LEFT",
217 SqlTextFunction::Right => "RIGHT",
218 SqlTextFunction::StartsWith => "STARTS_WITH",
219 SqlTextFunction::EndsWith => "ENDS_WITH",
220 SqlTextFunction::Contains => "CONTAINS",
221 SqlTextFunction::Position => "POSITION",
222 SqlTextFunction::Replace => "REPLACE",
223 SqlTextFunction::Substring => "SUBSTRING",
224 }
225}
226
227fn authority_for_generated_sql_route(
229 route: &SqlStatementRoute,
230 authorities: &[EntityAuthority],
231) -> Result<EntityAuthority, QueryError> {
232 let sql_entity = route.entity();
233
234 for authority in authorities {
235 if identifiers_tail_match(sql_entity, authority.model().name()) {
236 return Ok(*authority);
237 }
238 }
239
240 Err(unsupported_generated_sql_entity_error(
241 sql_entity,
242 authorities,
243 ))
244}
245
246fn unsupported_generated_sql_entity_error(
249 entity_name: &str,
250 authorities: &[EntityAuthority],
251) -> QueryError {
252 let mut supported = String::new();
253
254 for (index, authority) in authorities.iter().enumerate() {
255 if index != 0 {
256 supported.push_str(", ");
257 }
258
259 supported.push_str(authority.model().name());
260 }
261
262 QueryError::unsupported_query(format!(
263 "query endpoint does not support entity '{entity_name}'; supported: {supported}"
264 ))
265}
266
267fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
270where
271 E: EntityKind,
272{
273 if identifiers_tail_match(sql_entity, E::MODEL.name()) {
274 return Ok(());
275 }
276
277 Err(QueryError::from_sql_lowering_error(
278 SqlLoweringError::EntityMismatch {
279 sql_entity: sql_entity.to_string(),
280 expected_entity: E::MODEL.name(),
281 },
282 ))
283}
284
285fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
288where
289 E: EntityKind,
290{
291 if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
292 return Ok(key);
293 }
294
295 let widened = match value {
296 Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
297 Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
298 _ => {
299 return Err(QueryError::unsupported_query(format!(
300 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
301 )));
302 }
303 };
304
305 <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
306 QueryError::unsupported_query(format!(
307 "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
308 ))
309 })
310}
311
312fn sql_write_generated_field_value(field: &FieldModel) -> Option<Value> {
315 field
316 .insert_generation()
317 .map(|FieldInsertGeneration::Ulid| Value::Ulid(Ulid::generate()))
318}
319
320fn sql_write_managed_insert_field_value(field: &FieldModel, now: &Value) -> Option<Value> {
323 match field.write_management() {
324 Some(FieldWriteManagement::CreatedAt | FieldWriteManagement::UpdatedAt) => {
325 Some(now.clone())
326 }
327 None => None,
328 }
329}
330
331fn sql_write_managed_update_field_value(field: &FieldModel, now: &Value) -> Option<Value> {
334 match field.write_management() {
335 Some(FieldWriteManagement::UpdatedAt) => Some(now.clone()),
336 Some(FieldWriteManagement::CreatedAt) | None => None,
337 }
338}
339
340fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
343where
344 E: EntityKind,
345{
346 let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
347 QueryError::invariant("SQL write field must resolve against the target entity model")
348 })?;
349 let field_kind = E::MODEL.fields()[field_slot].kind();
350
351 let normalized = match (field_kind, value) {
352 (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
353 (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
354 Value::Int(v.cast_signed())
355 }
356 _ => value.clone(),
357 };
358
359 Ok(normalized)
360}
361
362fn reject_explicit_sql_write_to_managed_field<E>(
365 field_name: &str,
366 statement_kind: &str,
367) -> Result<(), QueryError>
368where
369 E: EntityKind,
370{
371 let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
372 return Ok(());
373 };
374 let field = &E::MODEL.fields()[field_slot];
375
376 if field.write_management().is_some() {
377 return Err(QueryError::unsupported_query(format!(
378 "SQL {statement_kind} does not allow explicit writes to managed field '{field_name}' in this release"
379 )));
380 }
381
382 Ok(())
383}
384
385fn sql_insert_field_is_omittable(field: &FieldModel) -> bool {
388 if sql_write_generated_field_value(field).is_some() {
389 return true;
390 }
391
392 field.write_management().is_some()
393}
394
395fn validate_sql_insert_required_fields<E>(columns: &[String]) -> Result<(), QueryError>
398where
399 E: EntityKind,
400{
401 let missing_required_fields = E::MODEL
402 .fields()
403 .iter()
404 .filter(|field| !columns.iter().any(|column| column == field.name()))
405 .filter(|field| !sql_insert_field_is_omittable(field))
406 .map(FieldModel::name)
407 .collect::<Vec<_>>();
408
409 if missing_required_fields.is_empty() {
410 return Ok(());
411 }
412
413 if missing_required_fields.len() == 1
414 && missing_required_fields[0] == E::MODEL.primary_key.name()
415 {
416 return Err(QueryError::unsupported_query(format!(
417 "SQL INSERT requires primary key column '{}' in this release",
418 E::MODEL.primary_key.name()
419 )));
420 }
421
422 Err(QueryError::unsupported_query(format!(
423 "SQL INSERT requires explicit values for non-generated fields {} in this release",
424 missing_required_fields.join(", ")
425 )))
426}
427
428fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
433where
434 E: EntityKind,
435{
436 match source {
437 SqlInsertSource::Values(values) => values.first().map(Vec::len),
438 SqlInsertSource::Select(select) => match &select.projection {
439 SqlProjection::All => Some(
440 E::MODEL
441 .fields()
442 .iter()
443 .filter(|field| field.write_management().is_none())
444 .count(),
445 ),
446 SqlProjection::Items(items) => Some(items.len()),
447 },
448 }
449}
450
451fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
456where
457 E: EntityKind,
458{
459 if !statement.columns.is_empty() {
460 return statement.columns.clone();
461 }
462
463 let columns: Vec<String> = E::MODEL
464 .fields()
465 .iter()
466 .filter(|field| !sql_insert_field_is_omittable(field))
467 .map(|field| field.name().to_string())
468 .collect();
469 let full_columns: Vec<String> = E::MODEL
470 .fields()
471 .iter()
472 .filter(|field| field.write_management().is_none())
473 .map(|field| field.name().to_string())
474 .collect();
475 let first_width = sql_insert_source_width_hint::<E>(&statement.source);
476
477 if first_width == Some(columns.len()) {
478 return columns;
479 }
480
481 full_columns
482}
483
484fn validate_sql_insert_value_tuple_lengths(
487 columns: &[String],
488 values: &[Vec<Value>],
489) -> Result<(), QueryError> {
490 for tuple in values {
491 if tuple.len() != columns.len() {
492 return Err(QueryError::from_sql_parse_error(
493 crate::db::sql::parser::SqlParseError::invalid_syntax(
494 "INSERT column list and VALUES tuple length must match",
495 ),
496 ));
497 }
498 }
499
500 Ok(())
501}
502
503fn validate_sql_insert_selected_rows(
506 columns: &[String],
507 rows: &[Vec<Value>],
508) -> Result<(), QueryError> {
509 for row in rows {
510 if row.len() != columns.len() {
511 return Err(QueryError::unsupported_query(
512 "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
513 ));
514 }
515 }
516
517 Ok(())
518}
519
520impl<C: CanisterKind> DbSession<C> {
521 fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
524 where
525 E: PersistedRow<Canister = C> + EntityValue,
526 {
527 let mut row = Vec::with_capacity(E::MODEL.fields().len());
528
529 for index in 0..E::MODEL.fields().len() {
530 let value = entity.get_value_by_index(index).ok_or_else(|| {
531 QueryError::invariant(
532 "SQL write dispatch projection row must include every declared field",
533 )
534 })?;
535 row.push(value);
536 }
537
538 Ok(row)
539 }
540
541 fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
545 where
546 E: PersistedRow<Canister = C> + EntityValue,
547 {
548 let columns = projection_labels_from_fields(E::MODEL.fields());
549 let rows = entities
550 .into_iter()
551 .map(Self::sql_write_dispatch_row)
552 .collect::<Result<Vec<_>, _>>()?;
553 let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
554
555 Ok(SqlDispatchResult::Projection {
556 columns,
557 rows,
558 row_count,
559 })
560 }
561
562 fn sql_insert_patch_and_key<E>(
565 columns: &[String],
566 values: &[Value],
567 ) -> Result<(E::Key, UpdatePatch), QueryError>
568 where
569 E: PersistedRow<Canister = C> + EntityValue,
570 {
571 let pk_name = E::MODEL.primary_key.name;
575 let generated_fields = E::MODEL
576 .fields()
577 .iter()
578 .filter(|field| !columns.iter().any(|column| column == field.name()))
579 .filter_map(|field| {
580 sql_write_generated_field_value(field).map(|value| (field.name(), value))
581 })
582 .collect::<Vec<_>>();
583 let key = if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
584 let pk_value = values.get(pk_index).ok_or_else(|| {
585 QueryError::invariant(
586 "INSERT primary key column must align with one VALUES literal",
587 )
588 })?;
589 sql_write_key_from_literal::<E>(pk_value, pk_name)?
590 } else if let Some((_, pk_value)) = generated_fields
591 .iter()
592 .find(|(field_name, _)| *field_name == pk_name)
593 {
594 sql_write_key_from_literal::<E>(pk_value, pk_name)?
595 } else {
596 return Err(QueryError::unsupported_query(format!(
597 "SQL INSERT requires primary key column '{pk_name}' in this release"
598 )));
599 };
600
601 let mut patch = UpdatePatch::new();
604 for (field_name, generated_value) in &generated_fields {
605 patch = patch
606 .set_field(E::MODEL, field_name, generated_value.clone())
607 .map_err(QueryError::execute)?;
608 }
609 for (field, value) in columns.iter().zip(values.iter()) {
610 reject_explicit_sql_write_to_managed_field::<E>(field, "INSERT")?;
611 let normalized = sql_write_value_for_field::<E>(field, value)?;
612 patch = patch
613 .set_field(E::MODEL, field, normalized)
614 .map_err(QueryError::execute)?;
615 }
616
617 let now = Value::Timestamp(Timestamp::now());
621 for field in E::MODEL.fields() {
622 let Some(value) = sql_write_managed_insert_field_value(field, &now) else {
623 continue;
624 };
625
626 patch = patch
627 .set_field(E::MODEL, field.name(), value)
628 .map_err(QueryError::execute)?;
629 }
630
631 Ok((key, patch))
632 }
633
634 fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
637 where
638 E: PersistedRow<Canister = C> + EntityValue,
639 {
640 let pk_name = E::MODEL.primary_key.name;
643 let mut patch = UpdatePatch::new();
644 for assignment in &statement.assignments {
645 if assignment.field == pk_name {
646 return Err(QueryError::unsupported_query(format!(
647 "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
648 )));
649 }
650 reject_explicit_sql_write_to_managed_field::<E>(assignment.field.as_str(), "UPDATE")?;
651 let normalized =
652 sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
653
654 patch = patch
655 .set_field(E::MODEL, assignment.field.as_str(), normalized)
656 .map_err(QueryError::execute)?;
657 }
658
659 let now = Value::Timestamp(Timestamp::now());
662 for field in E::MODEL.fields() {
663 let Some(value) = sql_write_managed_update_field_value(field, &now) else {
664 continue;
665 };
666
667 patch = patch
668 .set_field(E::MODEL, field.name(), value)
669 .map_err(QueryError::execute)?;
670 }
671
672 Ok(patch)
673 }
674
675 fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
677 where
678 E: PersistedRow<Canister = C> + EntityValue,
679 {
680 let Some(predicate) = statement.predicate.clone() else {
684 return Err(QueryError::unsupported_query(
685 "SQL UPDATE requires WHERE predicate in this release",
686 ));
687 };
688 let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
689 let pk_name = E::MODEL.primary_key.name;
690 let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
691
692 if statement.order_by.is_empty() {
696 selector = selector.order_by(pk_name);
697 } else {
698 let mut orders_primary_key = false;
699
700 for term in &statement.order_by {
701 if term.field == pk_name {
702 orders_primary_key = true;
703 }
704 selector = match term.direction {
705 SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
706 SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
707 };
708 }
709
710 if !orders_primary_key {
711 selector = selector.order_by(pk_name);
712 }
713 }
714
715 if let Some(limit) = statement.limit {
718 selector = selector.limit(limit);
719 }
720 if let Some(offset) = statement.offset {
721 selector = selector.offset(offset);
722 }
723
724 Ok(selector)
725 }
726
727 fn sql_insert_select_source_statement<E>(
731 statement: &SqlInsertStatement,
732 ) -> Result<SqlSelectStatement, QueryError>
733 where
734 E: PersistedRow<Canister = C> + EntityValue,
735 {
736 let SqlInsertSource::Select(select) = statement.source.clone() else {
737 return Err(QueryError::invariant(
738 "INSERT SELECT source validation requires parsed SELECT source",
739 ));
740 };
741 let mut select = *select;
742 ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
743
744 if !select.group_by.is_empty() || !select.having.is_empty() {
745 return Err(QueryError::unsupported_query(
746 "SQL INSERT SELECT requires scalar SELECT source in this release",
747 ));
748 }
749
750 if let SqlProjection::Items(items) = &select.projection {
751 for item in items {
752 if matches!(item, SqlSelectItem::Aggregate(_)) {
753 return Err(QueryError::unsupported_query(
754 "SQL INSERT SELECT does not support aggregate source projection in this release",
755 ));
756 }
757 }
758 }
759
760 let pk_name = E::MODEL.primary_key.name;
761 if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
762 select.order_by.push(SqlOrderTerm {
763 field: pk_name.to_string(),
764 direction: SqlOrderDirection::Asc,
765 });
766 }
767
768 Ok(select)
769 }
770
771 fn execute_sql_insert_select_source_rows<E>(
775 &self,
776 source: &SqlSelectStatement,
777 ) -> Result<Vec<Vec<Value>>, QueryError>
778 where
779 E: PersistedRow<Canister = C> + EntityValue,
780 {
781 if let Some(plan) = computed_projection::computed_sql_projection_plan(
785 &SqlStatement::Select(source.clone()),
786 )? {
787 let result = self.execute_computed_sql_projection_dispatch_for_authority(
788 plan,
789 EntityAuthority::for_type::<E>(),
790 )?;
791
792 return match result {
793 SqlDispatchResult::Projection { rows, .. } => Ok(rows),
794 other => Err(QueryError::invariant(format!(
795 "INSERT SELECT computed source must produce projection rows, found {other:?}",
796 ))),
797 };
798 }
799
800 let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
803 .map_err(QueryError::from_sql_lowering_error)?;
804 let lowered =
805 lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
806 .map_err(QueryError::from_sql_lowering_error)?;
807 let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
808 return Err(QueryError::invariant(
809 "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
810 ));
811 };
812
813 let payload =
814 self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
815 let (_, rows, _) = payload.into_parts();
816
817 Ok(rows)
818 }
819
820 fn execute_sql_insert_dispatch<E>(
823 &self,
824 statement: &SqlInsertStatement,
825 ) -> Result<SqlDispatchResult, QueryError>
826 where
827 E: PersistedRow<Canister = C> + EntityValue,
828 {
829 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
830 let columns = sql_insert_columns::<E>(statement);
831 validate_sql_insert_required_fields::<E>(columns.as_slice())?;
832 let source_rows = match &statement.source {
833 SqlInsertSource::Values(values) => {
834 validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
835 values.clone()
836 }
837 SqlInsertSource::Select(_) => {
838 let source = Self::sql_insert_select_source_statement::<E>(statement)?;
839 let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
840 validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
841
842 rows
843 }
844 };
845 let mut entities = Vec::with_capacity(source_rows.len());
846
847 for values in &source_rows {
848 let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
849 let entity = self
850 .mutate_structural::<E>(key, patch, MutationMode::Insert)
851 .map_err(QueryError::execute)?;
852 entities.push(entity);
853 }
854
855 Self::sql_write_dispatch_projection(entities)
856 }
857
858 fn execute_sql_update_dispatch<E>(
862 &self,
863 statement: &SqlUpdateStatement,
864 ) -> Result<SqlDispatchResult, QueryError>
865 where
866 E: PersistedRow<Canister = C> + EntityValue,
867 {
868 ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
869 let selector = Self::sql_update_selector_query::<E>(statement)?;
870 let patch = Self::sql_update_patch::<E>(statement)?;
871 let matched = self.execute_query(&selector)?;
872 let mut entities = Vec::with_capacity(matched.len());
873
874 for entity in matched.entities() {
877 let updated = self
878 .mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
879 .map_err(QueryError::execute)?;
880 entities.push(updated);
881 }
882
883 Self::sql_write_dispatch_projection(entities)
884 }
885
886 fn prepare_structural_sql_projection_execution(
889 &self,
890 query: StructuralQuery,
891 authority: EntityAuthority,
892 ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
893 let (_, plan) =
896 self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
897 let projection = plan.projection_spec(authority.model());
898 let columns = projection_labels_from_projection_spec(&projection);
899
900 Ok((columns, plan))
901 }
902
903 pub(in crate::db::session::sql) fn execute_structural_sql_projection(
907 &self,
908 query: StructuralQuery,
909 authority: EntityAuthority,
910 ) -> Result<SqlProjectionPayload, QueryError> {
911 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
913
914 let projected =
917 execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
918 .map_err(QueryError::execute)?;
919 let (rows, row_count) = projected.into_parts();
920
921 Ok(SqlProjectionPayload::new(columns, rows, row_count))
922 }
923
924 fn execute_structural_sql_projection_text(
928 &self,
929 query: StructuralQuery,
930 authority: EntityAuthority,
931 ) -> Result<SqlDispatchResult, QueryError> {
932 let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
934
935 let projected =
938 execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
939 .map_err(QueryError::execute)?;
940 let (rows, row_count) = projected.into_parts();
941
942 Ok(SqlDispatchResult::ProjectionText {
943 columns,
944 rows,
945 row_count,
946 })
947 }
948
949 fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
953 where
954 E: PersistedRow<Canister = C> + EntityValue,
955 {
956 let plan = self
957 .compile_query_with_visible_indexes(query)?
958 .into_prepared_execution_plan();
959 let deleted = self
960 .with_metrics(|| {
961 self.delete_executor::<E>()
962 .execute_structural_projection(plan)
963 })
964 .map_err(QueryError::execute)?;
965 let (rows, row_count) = deleted.into_parts();
966 let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
967
968 Ok(SqlProjectionPayload::new(
969 projection_labels_from_fields(E::MODEL.fields()),
970 rows,
971 row_count,
972 )
973 .into_dispatch_result())
974 }
975
976 fn lowered_sql_query_dispatch_inputs_for_authority(
979 parsed: &SqlParsedStatement,
980 authority: EntityAuthority,
981 unsupported_message: &'static str,
982 ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
983 let lowered = parsed.lower_query_lane_for_entity(
984 authority.model().name(),
985 authority.model().primary_key.name,
986 )?;
987 let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
988 .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
989 .transpose()?;
990 let query = lowered
991 .into_query()
992 .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
993
994 Ok((query, projection_columns.flatten()))
995 }
996
997 fn dispatch_sql_query_route_for_authority(
1001 &self,
1002 parsed: &SqlParsedStatement,
1003 authority: EntityAuthority,
1004 unsupported_message: &'static str,
1005 dispatch_select: impl FnOnce(
1006 &Self,
1007 LoweredSelectShape,
1008 EntityAuthority,
1009 bool,
1010 Option<Vec<String>>,
1011 ) -> Result<SqlDispatchResult, QueryError>,
1012 dispatch_delete: impl FnOnce(
1013 &Self,
1014 LoweredBaseQueryShape,
1015 EntityAuthority,
1016 ) -> Result<SqlDispatchResult, QueryError>,
1017 ) -> Result<SqlDispatchResult, QueryError> {
1018 if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
1021 let command =
1022 Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
1023
1024 return self.execute_sql_aggregate_dispatch_for_authority(
1025 command,
1026 authority,
1027 sql_aggregate_dispatch_label_override(&parsed.statement),
1028 );
1029 }
1030
1031 if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
1032 return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
1033 }
1034
1035 let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
1038 parsed,
1039 authority,
1040 unsupported_message,
1041 )?;
1042 let grouped_surface = query.has_grouping();
1043
1044 match query {
1045 LoweredSqlQuery::Select(select) => {
1046 dispatch_select(self, select, authority, grouped_surface, projection_columns)
1047 }
1048 LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
1049 }
1050 }
1051
1052 fn dispatch_sql_explain_route_for_authority(
1056 &self,
1057 parsed: &SqlParsedStatement,
1058 authority: EntityAuthority,
1059 ) -> Result<SqlDispatchResult, QueryError> {
1060 if let Some((mode, plan)) =
1063 computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
1064 {
1065 return self
1066 .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
1067 .map(SqlDispatchResult::Explain);
1068 }
1069
1070 let lowered = parsed.lower_query_lane_for_entity(
1073 authority.model().name(),
1074 authority.model().primary_key.name,
1075 )?;
1076 if let Some(explain) =
1077 self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1078 {
1079 return Ok(SqlDispatchResult::Explain(explain));
1080 }
1081
1082 self.explain_lowered_sql_for_authority(&lowered, authority)
1083 .map(SqlDispatchResult::Explain)
1084 }
1085
1086 pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1089 query: &Query<E>,
1090 surface: SqlGroupingSurface,
1091 ) -> Result<(), QueryError>
1092 where
1093 E: EntityKind,
1094 {
1095 match (surface, query.has_grouping()) {
1096 (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1097 (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1098 QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1099 ),
1100 }
1101 }
1102
1103 pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1105 where
1106 E: PersistedRow<Canister = C> + EntityValue,
1107 {
1108 let parsed = self.parse_sql_statement(sql)?;
1109
1110 self.execute_sql_dispatch_parsed::<E>(&parsed)
1111 }
1112
1113 pub fn execute_sql_dispatch_parsed<E>(
1115 &self,
1116 parsed: &SqlParsedStatement,
1117 ) -> Result<SqlDispatchResult, QueryError>
1118 where
1119 E: PersistedRow<Canister = C> + EntityValue,
1120 {
1121 match parsed.route() {
1122 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1123 parsed,
1124 EntityAuthority::for_type::<E>(),
1125 "execute_sql_dispatch accepts SELECT or DELETE only",
1126 |session, select, authority, grouped_surface, projection_columns| {
1127 if grouped_surface {
1128 let columns = projection_columns.ok_or_else(|| {
1129 QueryError::unsupported_query(
1130 "grouped SQL dispatch requires explicit grouped projection items",
1131 )
1132 })?;
1133
1134 return session.execute_lowered_sql_grouped_dispatch_select_core(
1135 select, authority, columns,
1136 );
1137 }
1138
1139 let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1140 if let Some(columns) = projection_columns {
1141 let (_, rows, row_count) = payload.into_parts();
1142
1143 return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1144 .into_dispatch_result());
1145 }
1146
1147 Ok(payload.into_dispatch_result())
1148 },
1149 |session, delete, _authority| {
1150 let typed_query = bind_lowered_sql_query::<E>(
1151 LoweredSqlQuery::Delete(delete),
1152 MissingRowPolicy::Ignore,
1153 )
1154 .map_err(QueryError::from_sql_lowering_error)?;
1155
1156 session.execute_typed_sql_delete(&typed_query)
1157 },
1158 ),
1159 SqlStatementRoute::Insert { .. } => {
1160 let SqlStatement::Insert(statement) = &parsed.statement else {
1161 return Err(QueryError::invariant(
1162 "INSERT SQL route must carry parsed INSERT statement",
1163 ));
1164 };
1165
1166 self.execute_sql_insert_dispatch::<E>(statement)
1167 }
1168 SqlStatementRoute::Update { .. } => {
1169 let SqlStatement::Update(statement) = &parsed.statement else {
1170 return Err(QueryError::invariant(
1171 "UPDATE SQL route must carry parsed UPDATE statement",
1172 ));
1173 };
1174
1175 self.execute_sql_update_dispatch::<E>(statement)
1176 }
1177 SqlStatementRoute::Explain { .. } => self
1178 .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1179 SqlStatementRoute::Describe { .. } => {
1180 Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1181 }
1182 SqlStatementRoute::ShowIndexes { .. } => {
1183 Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1184 }
1185 SqlStatementRoute::ShowColumns { .. } => {
1186 Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1187 }
1188 SqlStatementRoute::ShowEntities => {
1189 Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1190 }
1191 }
1192 }
1193
1194 #[doc(hidden)]
1201 pub fn execute_generated_query_surface_dispatch_for_authority(
1202 &self,
1203 parsed: &SqlParsedStatement,
1204 authority: EntityAuthority,
1205 ) -> Result<SqlDispatchResult, QueryError> {
1206 match parsed.route() {
1207 SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1208 parsed,
1209 authority,
1210 "generated SQL query surface requires query or EXPLAIN statement lanes",
1211 |session, select, authority, grouped_surface, projection_columns| {
1212 if grouped_surface {
1213 let columns = projection_columns.ok_or_else(|| {
1214 QueryError::unsupported_query(
1215 "grouped SQL dispatch requires explicit grouped projection items",
1216 )
1217 })?;
1218
1219 return session
1220 .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1221 }
1222
1223 let result =
1224 session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1225 if let Some(columns) = projection_columns {
1226 let SqlDispatchResult::ProjectionText {
1227 rows, row_count, ..
1228 } = result
1229 else {
1230 return Err(QueryError::invariant(
1231 "generated scalar SQL dispatch text path must emit projection text rows",
1232 ));
1233 };
1234
1235 return Ok(SqlDispatchResult::ProjectionText {
1236 columns,
1237 rows,
1238 row_count,
1239 });
1240 }
1241
1242 Ok(result)
1243 },
1244 |session, delete, authority| {
1245 session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1246 },
1247 ),
1248 SqlStatementRoute::Explain { .. } => {
1249 self.dispatch_sql_explain_route_for_authority(parsed, authority)
1250 }
1251 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1252 | SqlStatementRoute::Describe { .. }
1253 | SqlStatementRoute::ShowIndexes { .. }
1254 | SqlStatementRoute::ShowColumns { .. }
1255 | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1256 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1257 )),
1258 }
1259 }
1260
1261 #[doc(hidden)]
1267 #[must_use]
1268 pub fn execute_generated_query_surface_sql(
1269 &self,
1270 sql: &str,
1271 authorities: &[EntityAuthority],
1272 ) -> GeneratedSqlDispatchAttempt {
1273 let sql_trimmed = match trim_generated_query_sql_input(sql) {
1276 Ok(sql_trimmed) => sql_trimmed,
1277 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1278 };
1279 let parsed = match self.parse_sql_statement(sql_trimmed) {
1280 Ok(parsed) => parsed,
1281 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1282 };
1283
1284 if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1287 return GeneratedSqlDispatchAttempt::new(
1288 "",
1289 None,
1290 Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1291 authorities,
1292 ))),
1293 );
1294 }
1295 let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1296 Ok(authority) => authority,
1297 Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1298 };
1299
1300 let entity_name = authority.model().name();
1304 let explain_order_field = parsed
1305 .route()
1306 .is_explain()
1307 .then_some(authority.model().primary_key.name);
1308 let result = match parsed.route() {
1309 SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1310 self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1311 }
1312 SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1313 Err(QueryError::unsupported_query(
1314 "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1315 ))
1316 }
1317 SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1318 self.describe_entity_model(authority.model()),
1319 )),
1320 SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1321 self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1322 )),
1323 SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1324 self.show_columns_for_model(authority.model()),
1325 )),
1326 SqlStatementRoute::ShowEntities => unreachable!(
1327 "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1328 ),
1329 };
1330
1331 GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1332 }
1333}