Skip to main content

icydb_core/db/session/sql/dispatch/
mod.rs

1//! Module: db::session::sql::dispatch
2//! Responsibility: session-owned SQL dispatch entrypoints that bind lowered SQL
3//! commands onto structural planning, execution, and outward result shaping.
4//! Does not own: SQL parsing or executor runtime internals.
5//! Boundary: centralizes authority-aware SQL dispatch classification and result packaging.
6
7mod computed;
8mod lowered;
9
10use crate::{
11    db::{
12        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13        data::UpdatePatch,
14        executor::{EntityAuthority, MutationMode},
15        identifiers_tail_match,
16        query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17        schema::{ValidateError, field_type_from_model_kind, literal_matches_type},
18        session::sql::{
19            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21            computed_projection,
22            projection::{
23                SqlProjectionPayload, execute_sql_projection_rows_for_canister,
24                execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
25                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26            },
27        },
28        sql::lowering::{
29            LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
30            bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
31            lower_sql_command_from_prepared_statement, prepare_sql_statement,
32        },
33        sql::parser::{
34            SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
35            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlReturningProjection, SqlSelectItem,
36            SqlSelectStatement, SqlStatement, SqlTextFunction, SqlUpdateStatement,
37        },
38    },
39    model::{
40        entity::resolve_field_slot,
41        field::{FieldInsertGeneration, FieldKind, FieldModel},
42    },
43    sanitize::{SanitizeWriteContext, SanitizeWriteMode},
44    traits::{CanisterKind, EntityKind, EntityValue},
45    types::{Timestamp, Ulid},
46    value::Value,
47};
48
49#[cfg(feature = "perf-attribution")]
50pub use lowered::LoweredSqlDispatchExecutorAttribution;
51
52///
53/// GeneratedSqlDispatchAttempt
54///
55/// Hidden generated-query dispatch envelope used by the facade helper to keep
56/// generated route ownership in core while preserving the public EXPLAIN error
57/// rewrite contract at the outer boundary.
58///
59
60#[doc(hidden)]
61pub struct GeneratedSqlDispatchAttempt {
62    entity_name: &'static str,
63    explain_order_field: Option<&'static str>,
64    result: Result<SqlDispatchResult, QueryError>,
65}
66
67impl GeneratedSqlDispatchAttempt {
68    // Build one generated-query dispatch attempt with optional explain-hint context.
69    const fn new(
70        entity_name: &'static str,
71        explain_order_field: Option<&'static str>,
72        result: Result<SqlDispatchResult, QueryError>,
73    ) -> Self {
74        Self {
75            entity_name,
76            explain_order_field,
77            result,
78        }
79    }
80
81    /// Borrow the resolved entity name for this generated-query attempt.
82    #[must_use]
83    pub const fn entity_name(&self) -> &'static str {
84        self.entity_name
85    }
86
87    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
88    #[must_use]
89    pub const fn explain_order_field(&self) -> Option<&'static str> {
90        self.explain_order_field
91    }
92
93    /// Consume and return the generated-query dispatch result.
94    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
95        self.result
96    }
97}
98
99#[derive(Clone, Copy, Debug, Eq, PartialEq)]
100pub(in crate::db::session::sql) enum SqlGroupingSurface {
101    Scalar,
102    Grouped,
103}
104
105const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
106    match surface {
107        SqlGroupingSurface::Scalar => {
108            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
109        }
110        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
111    }
112}
113
114// Enforce the generated canister query contract that empty SQL is unsupported
115// before any parser/lowering work occurs.
116fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
117    let sql_trimmed = sql.trim();
118    if sql_trimmed.is_empty() {
119        return Err(QueryError::unsupported_query(
120            "query endpoint requires a non-empty SQL string",
121        ));
122    }
123
124    Ok(sql_trimmed)
125}
126
127// Render the generated-surface entity list from the descriptor table instead
128// of assuming every session-visible entity belongs on the public query export.
129fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
130    let mut entities = Vec::with_capacity(authorities.len());
131
132    for authority in authorities {
133        entities.push(authority.model().name().to_string());
134    }
135
136    entities
137}
138
139// Project parsed SELECT items into one stable outward column contract while
140// allowing parser-owned aliases to override only the final session label.
141fn sql_projection_labels_from_select_statement(
142    statement: &SqlStatement,
143) -> Result<Option<Vec<String>>, QueryError> {
144    let SqlStatement::Select(select) = statement else {
145        return Err(QueryError::invariant(
146            "SQL projection labels require SELECT statement shape",
147        ));
148    };
149    let SqlProjection::Items(items) = &select.projection else {
150        return Ok(None);
151    };
152
153    Ok(Some(
154        items
155            .iter()
156            .enumerate()
157            .map(|(index, item)| {
158                select
159                    .projection_alias(index)
160                    .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
161            })
162            .collect(),
163    ))
164}
165
166// Render one grouped SELECT item into the public grouped-column label used by
167// unified dispatch results.
168fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
169    match item {
170        SqlSelectItem::Field(field) => field.clone(),
171        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
172        SqlSelectItem::TextFunction(call) => {
173            format!(
174                "{}({})",
175                grouped_sql_text_function_name(call.function),
176                call.field
177            )
178        }
179    }
180}
181
182// Keep the dedicated SQL aggregate lane on parser-owned outward labels
183// without reopening alias semantics in lowering or runtime strategy state.
184fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
185    let SqlStatement::Select(select) = statement else {
186        return None;
187    };
188
189    select.projection_alias(0).map(str::to_string)
190}
191
192// Render one aggregate call into one canonical SQL-style label.
193fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
194    let kind = match aggregate.kind {
195        SqlAggregateKind::Count => "COUNT",
196        SqlAggregateKind::Sum => "SUM",
197        SqlAggregateKind::Avg => "AVG",
198        SqlAggregateKind::Min => "MIN",
199        SqlAggregateKind::Max => "MAX",
200    };
201
202    match aggregate.field.as_deref() {
203        Some(field) => format!("{kind}({field})"),
204        None => format!("{kind}(*)"),
205    }
206}
207
208// Render one reduced SQL text-function identifier into one stable uppercase
209// SQL label for outward column metadata.
210const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
211    match function {
212        SqlTextFunction::Trim => "TRIM",
213        SqlTextFunction::Ltrim => "LTRIM",
214        SqlTextFunction::Rtrim => "RTRIM",
215        SqlTextFunction::Lower => "LOWER",
216        SqlTextFunction::Upper => "UPPER",
217        SqlTextFunction::Length => "LENGTH",
218        SqlTextFunction::Left => "LEFT",
219        SqlTextFunction::Right => "RIGHT",
220        SqlTextFunction::StartsWith => "STARTS_WITH",
221        SqlTextFunction::EndsWith => "ENDS_WITH",
222        SqlTextFunction::Contains => "CONTAINS",
223        SqlTextFunction::Position => "POSITION",
224        SqlTextFunction::Replace => "REPLACE",
225        SqlTextFunction::Substring => "SUBSTRING",
226    }
227}
228
229// Resolve one generated query route onto the descriptor-owned authority table.
230fn authority_for_generated_sql_route(
231    route: &SqlStatementRoute,
232    authorities: &[EntityAuthority],
233) -> Result<EntityAuthority, QueryError> {
234    let sql_entity = route.entity();
235
236    for authority in authorities {
237        if identifiers_tail_match(sql_entity, authority.model().name()) {
238            return Ok(*authority);
239        }
240    }
241
242    Err(unsupported_generated_sql_entity_error(
243        sql_entity,
244        authorities,
245    ))
246}
247
248// Keep the generated query-surface unsupported-entity contract stable while
249// moving authority lookup out of the build-generated shim.
250fn unsupported_generated_sql_entity_error(
251    entity_name: &str,
252    authorities: &[EntityAuthority],
253) -> QueryError {
254    let mut supported = String::new();
255
256    for (index, authority) in authorities.iter().enumerate() {
257        if index != 0 {
258            supported.push_str(", ");
259        }
260
261        supported.push_str(authority.model().name());
262    }
263
264    QueryError::unsupported_query(format!(
265        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
266    ))
267}
268
269// Keep typed SQL write routes on the same entity-match contract used by
270// lowered query dispatch, without widening write statements into lowering.
271fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
272where
273    E: EntityKind,
274{
275    if identifiers_tail_match(sql_entity, E::MODEL.name()) {
276        return Ok(());
277    }
278
279    Err(QueryError::from_sql_lowering_error(
280        SqlLoweringError::EntityMismatch {
281            sql_entity: sql_entity.to_string(),
282            expected_entity: E::MODEL.name(),
283        },
284    ))
285}
286
287// Normalize one reduced-SQL primary-key literal onto the concrete entity key
288// type accepted by the structural mutation entrypoint.
289fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
290where
291    E: EntityKind,
292{
293    if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
294        return Ok(key);
295    }
296
297    let widened = match value {
298        Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
299        Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
300        _ => {
301            return Err(QueryError::unsupported_query(format!(
302                "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303            )));
304        }
305    };
306
307    <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
308        QueryError::unsupported_query(format!(
309            "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
310        ))
311    })
312}
313
314// Synthesize one generated SQL insert literal from the schema-owned runtime
315// field contract instead of hard-coding generation at the SQL boundary.
316fn sql_write_generated_field_value(field: &FieldModel) -> Option<Value> {
317    field
318        .insert_generation()
319        .map(|generation| match generation {
320            FieldInsertGeneration::Ulid => Value::Ulid(Ulid::generate()),
321            FieldInsertGeneration::Timestamp => Value::Timestamp(Timestamp::now()),
322        })
323}
324
325// Normalize one reduced-SQL write literal onto the target entity field kind
326// when the parser's numeric literal domain is narrower than the runtime field.
327fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
328where
329    E: EntityKind,
330{
331    let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
332        QueryError::invariant("SQL write field must resolve against the target entity model")
333    })?;
334    let field_kind = E::MODEL.fields()[field_slot].kind();
335
336    let normalized = match (field_kind, value) {
337        (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
338        (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
339            Value::Int(v.cast_signed())
340        }
341        _ => value.clone(),
342    };
343
344    let field_type = field_type_from_model_kind(&field_kind);
345    if !literal_matches_type(&normalized, &field_type) {
346        return Err(QueryError::unsupported_query(
347            ValidateError::invalid_literal(field_name, "literal type does not match field type")
348                .to_string(),
349        ));
350    }
351
352    Ok(normalized)
353}
354
355// Reject explicit user-authored writes to schema-managed fields so reduced SQL
356// does not silently accept values that the write boundary will overwrite.
357fn reject_explicit_sql_write_to_managed_field<E>(
358    field_name: &str,
359    statement_kind: &str,
360) -> Result<(), QueryError>
361where
362    E: EntityKind,
363{
364    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
365        return Ok(());
366    };
367    let field = &E::MODEL.fields()[field_slot];
368
369    if field.write_management().is_some() {
370        return Err(QueryError::unsupported_query(format!(
371            "SQL {statement_kind} does not allow explicit writes to managed field '{field_name}' in this release"
372        )));
373    }
374
375    Ok(())
376}
377
378// Reject explicit user-authored INSERT values for schema-generated fields so
379// reduced SQL keeps generated insert ownership on the server side.
380fn reject_explicit_sql_insert_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
381where
382    E: EntityKind,
383{
384    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
385        return Ok(());
386    };
387    let field = &E::MODEL.fields()[field_slot];
388
389    if field.insert_generation().is_some() {
390        return Err(QueryError::unsupported_query(format!(
391            "SQL INSERT does not allow explicit writes to generated field '{field_name}' in this release"
392        )));
393    }
394
395    Ok(())
396}
397
398// Reject explicit user-authored UPDATE assignments to insert-generated fields
399// so system-owned generation remains immutable after creation.
400fn reject_explicit_sql_update_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
401where
402    E: EntityKind,
403{
404    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
405        return Ok(());
406    };
407    let field = &E::MODEL.fields()[field_slot];
408
409    if field.insert_generation().is_some() {
410        return Err(QueryError::unsupported_query(format!(
411            "SQL UPDATE does not allow explicit writes to generated field '{field_name}' in this release"
412        )));
413    }
414
415    Ok(())
416}
417
418// Determine whether one field may be omitted from reduced SQL INSERT because
419// the write lane owns its value synthesis contract.
420fn sql_insert_field_is_omittable(field: &FieldModel) -> bool {
421    if sql_write_generated_field_value(field).is_some() {
422        return true;
423    }
424
425    field.write_management().is_some()
426}
427
428// Reject explicit INSERT column lists that omit non-generated user fields so
429// reduced SQL does not silently consume typed-Rust defaults.
430fn validate_sql_insert_required_fields<E>(columns: &[String]) -> Result<(), QueryError>
431where
432    E: EntityKind,
433{
434    let missing_required_fields = E::MODEL
435        .fields()
436        .iter()
437        .filter(|field| !columns.iter().any(|column| column == field.name()))
438        .filter(|field| !sql_insert_field_is_omittable(field))
439        .map(FieldModel::name)
440        .collect::<Vec<_>>();
441
442    if missing_required_fields.is_empty() {
443        return Ok(());
444    }
445
446    if missing_required_fields.len() == 1
447        && missing_required_fields[0] == E::MODEL.primary_key.name()
448    {
449        return Err(QueryError::unsupported_query(format!(
450            "SQL INSERT requires primary key column '{}' in this release",
451            E::MODEL.primary_key.name()
452        )));
453    }
454
455    Err(QueryError::unsupported_query(format!(
456        "SQL INSERT requires explicit values for non-generated fields {} in this release",
457        missing_required_fields.join(", ")
458    )))
459}
460
461// Resolve the effective INSERT column list for one reduced SQL write:
462// explicit column lists pass through, while omitted-column-list INSERT uses
463// canonical user-authored model field order and leaves hidden timestamp
464// synthesis on the existing write path.
465fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
466where
467    E: EntityKind,
468{
469    match source {
470        SqlInsertSource::Values(values) => values.first().map(Vec::len),
471        SqlInsertSource::Select(select) => match &select.projection {
472            SqlProjection::All => Some(
473                E::MODEL
474                    .fields()
475                    .iter()
476                    .filter(|field| field.write_management().is_none())
477                    .count(),
478            ),
479            SqlProjection::Items(items) => Some(items.len()),
480        },
481    }
482}
483
484// Resolve the effective INSERT column list for one reduced SQL write:
485// explicit column lists pass through, while omitted-column-list INSERT uses
486// canonical user-authored model field order and leaves hidden timestamp
487// synthesis on the existing write path.
488fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
489where
490    E: EntityKind,
491{
492    if !statement.columns.is_empty() {
493        return statement.columns.clone();
494    }
495
496    let columns: Vec<String> = E::MODEL
497        .fields()
498        .iter()
499        .filter(|field| !sql_insert_field_is_omittable(field))
500        .map(|field| field.name().to_string())
501        .collect();
502    let full_columns: Vec<String> = E::MODEL
503        .fields()
504        .iter()
505        .filter(|field| field.write_management().is_none())
506        .map(|field| field.name().to_string())
507        .collect();
508    let first_width = sql_insert_source_width_hint::<E>(&statement.source);
509
510    if first_width == Some(columns.len()) {
511        return columns;
512    }
513
514    full_columns
515}
516
517// Validate one INSERT tuple list against the resolved effective column list so
518// every VALUES tuple stays full-width and deterministic.
519fn validate_sql_insert_value_tuple_lengths(
520    columns: &[String],
521    values: &[Vec<Value>],
522) -> Result<(), QueryError> {
523    for tuple in values {
524        if tuple.len() != columns.len() {
525            return Err(QueryError::from_sql_parse_error(
526                crate::db::sql::parser::SqlParseError::invalid_syntax(
527                    "INSERT column list and VALUES tuple length must match",
528                ),
529            ));
530        }
531    }
532
533    Ok(())
534}
535
536// Validate one projected `INSERT ... SELECT` row set against the resolved
537// effective column list so replayed structural inserts stay deterministic.
538fn validate_sql_insert_selected_rows(
539    columns: &[String],
540    rows: &[Vec<Value>],
541) -> Result<(), QueryError> {
542    for row in rows {
543        if row.len() != columns.len() {
544            return Err(QueryError::unsupported_query(
545                "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
546            ));
547        }
548    }
549
550    Ok(())
551}
552
553impl<C: CanisterKind> DbSession<C> {
554    // Project one typed SQL write after-image into one outward SQL row using
555    // the persisted model field order.
556    fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
557    where
558        E: PersistedRow<Canister = C> + EntityValue,
559    {
560        let mut row = Vec::with_capacity(E::MODEL.fields().len());
561
562        for index in 0..E::MODEL.fields().len() {
563            let value = entity.get_value_by_index(index).ok_or_else(|| {
564                QueryError::invariant(
565                    "SQL write dispatch projection row must include every declared field",
566                )
567            })?;
568            row.push(value);
569        }
570
571        Ok(row)
572    }
573
574    // Render one or more typed entities returned by SQL write dispatch as one
575    // projection payload so write statements reuse the same outward result
576    // family as row-producing SELECT and DELETE dispatch.
577    fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
578    where
579        E: PersistedRow<Canister = C> + EntityValue,
580    {
581        let columns = projection_labels_from_fields(E::MODEL.fields());
582        let rows = entities
583            .into_iter()
584            .map(Self::sql_write_dispatch_row)
585            .collect::<Result<Vec<_>, _>>()?;
586        let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
587
588        Ok(SqlDispatchResult::Projection {
589            columns,
590            rows,
591            row_count,
592        })
593    }
594
595    // Package one write after-image batch according to the traditional SQL
596    // mutation result contract: count-only when no `RETURNING` clause is
597    // present, row payload only when the authored SQL explicitly asks for it.
598    fn sql_write_dispatch_result<E>(
599        entities: Vec<E>,
600        returning: Option<&SqlReturningProjection>,
601    ) -> Result<SqlDispatchResult, QueryError>
602    where
603        E: PersistedRow<Canister = C> + EntityValue,
604    {
605        let row_count = u32::try_from(entities.len()).unwrap_or(u32::MAX);
606
607        match returning {
608            None => Ok(SqlDispatchResult::Count { row_count }),
609            Some(returning) => {
610                let SqlDispatchResult::Projection {
611                    columns,
612                    rows,
613                    row_count,
614                } = Self::sql_write_dispatch_projection(entities)?
615                else {
616                    return Err(QueryError::invariant(
617                        "SQL write projection helper must emit value-row projection payload",
618                    ));
619                };
620
621                Self::sql_returning_dispatch_projection(columns, rows, row_count, returning)
622            }
623        }
624    }
625
626    // Narrow one row-producing write payload down to the admitted `RETURNING`
627    // projection shape so write dispatch can keep explicit field-list
628    // ownership without reopening the full scalar SELECT projection surface.
629    fn sql_returning_dispatch_projection(
630        columns: Vec<String>,
631        rows: Vec<Vec<Value>>,
632        row_count: u32,
633        returning: &SqlReturningProjection,
634    ) -> Result<SqlDispatchResult, QueryError> {
635        match returning {
636            SqlReturningProjection::All => Ok(SqlDispatchResult::Projection {
637                columns,
638                rows,
639                row_count,
640            }),
641            SqlReturningProjection::Fields(fields) => {
642                let mut indices = Vec::with_capacity(fields.len());
643
644                for field in fields {
645                    let index = columns
646                        .iter()
647                        .position(|column| column == field)
648                        .ok_or_else(|| {
649                            QueryError::unsupported_query(format!(
650                                "SQL RETURNING field '{field}' does not exist on the target entity"
651                            ))
652                        })?;
653                    indices.push(index);
654                }
655
656                let mut projected_rows = Vec::with_capacity(rows.len());
657                for row in rows {
658                    let mut projected = Vec::with_capacity(indices.len());
659                    for index in &indices {
660                        let value = row.get(*index).ok_or_else(|| {
661                            QueryError::invariant(
662                                "SQL RETURNING projection row must align with declared columns",
663                            )
664                        })?;
665                        projected.push(value.clone());
666                    }
667                    projected_rows.push(projected);
668                }
669
670                Ok(SqlDispatchResult::Projection {
671                    columns: fields.clone(),
672                    rows: projected_rows,
673                    row_count,
674                })
675            }
676        }
677    }
678
679    // Build the structural insert patch and resolved primary key expected by
680    // the shared structural mutation entrypoint.
681    fn sql_insert_patch_and_key<E>(
682        columns: &[String],
683        values: &[Value],
684    ) -> Result<(E::Key, UpdatePatch), QueryError>
685    where
686        E: PersistedRow<Canister = C> + EntityValue,
687    {
688        // Phase 1: resolve the required primary-key literal from the explicit
689        // INSERT column/value list, or synthesize one schema-generated value
690        // when the target field contract admits omission on insert.
691        let pk_name = E::MODEL.primary_key.name;
692        let generated_fields = E::MODEL
693            .fields()
694            .iter()
695            .filter(|field| !columns.iter().any(|column| column == field.name()))
696            .filter_map(|field| {
697                sql_write_generated_field_value(field).map(|value| (field.name(), value))
698            })
699            .collect::<Vec<_>>();
700        let key = if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
701            let pk_value = values.get(pk_index).ok_or_else(|| {
702                QueryError::invariant(
703                    "INSERT primary key column must align with one VALUES literal",
704                )
705            })?;
706            sql_write_key_from_literal::<E>(pk_value, pk_name)?
707        } else if let Some((_, pk_value)) = generated_fields
708            .iter()
709            .find(|(field_name, _)| *field_name == pk_name)
710        {
711            sql_write_key_from_literal::<E>(pk_value, pk_name)?
712        } else {
713            return Err(QueryError::unsupported_query(format!(
714                "SQL INSERT requires primary key column '{pk_name}' in this release"
715            )));
716        };
717
718        // Phase 2: lower the explicit column/value pairs onto the structural
719        // patch program consumed by the shared save path.
720        let mut patch = UpdatePatch::new();
721        for (field_name, generated_value) in &generated_fields {
722            patch = patch
723                .set_field(E::MODEL, field_name, generated_value.clone())
724                .map_err(QueryError::execute)?;
725        }
726        for (field, value) in columns.iter().zip(values.iter()) {
727            reject_explicit_sql_insert_to_generated_field::<E>(field)?;
728            reject_explicit_sql_write_to_managed_field::<E>(field, "INSERT")?;
729            let normalized = sql_write_value_for_field::<E>(field, value)?;
730            patch = patch
731                .set_field(E::MODEL, field, normalized)
732                .map_err(QueryError::execute)?;
733        }
734
735        Ok((key, patch))
736    }
737
738    // Build the structural update patch shared by every row selected by one
739    // reduced SQL UPDATE statement.
740    fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
741    where
742        E: PersistedRow<Canister = C> + EntityValue,
743    {
744        // Phase 1: lower the `SET` list onto the structural patch program
745        // while keeping primary-key mutation out of the reduced SQL write lane.
746        let pk_name = E::MODEL.primary_key.name;
747        let mut patch = UpdatePatch::new();
748        for assignment in &statement.assignments {
749            if assignment.field == pk_name {
750                return Err(QueryError::unsupported_query(format!(
751                    "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
752                )));
753            }
754            reject_explicit_sql_update_to_generated_field::<E>(assignment.field.as_str())?;
755            reject_explicit_sql_write_to_managed_field::<E>(assignment.field.as_str(), "UPDATE")?;
756            let normalized =
757                sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
758
759            patch = patch
760                .set_field(E::MODEL, assignment.field.as_str(), normalized)
761                .map_err(QueryError::execute)?;
762        }
763
764        Ok(patch)
765    }
766
767    // Resolve one deterministic typed selector query for reduced SQL UPDATE.
768    fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
769    where
770        E: PersistedRow<Canister = C> + EntityValue,
771    {
772        // Phase 1: keep the widened SQL UPDATE lane explicit about requiring
773        // one admitted reduced predicate instead of opening bare full-table
774        // updates implicitly.
775        let Some(predicate) = statement.predicate.clone() else {
776            return Err(QueryError::unsupported_query(
777                "SQL UPDATE requires WHERE predicate in this release",
778            ));
779        };
780        let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
781        let pk_name = E::MODEL.primary_key.name;
782        let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
783
784        // Phase 2: honor one explicit ordered update window when present, and
785        // otherwise keep the write target set deterministic on primary-key
786        // order exactly as the earlier predicate-only update lane did.
787        if statement.order_by.is_empty() {
788            selector = selector.order_by(pk_name);
789        } else {
790            let mut orders_primary_key = false;
791
792            for term in &statement.order_by {
793                if term.field == pk_name {
794                    orders_primary_key = true;
795                }
796                selector = match term.direction {
797                    SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
798                    SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
799                };
800            }
801
802            if !orders_primary_key {
803                selector = selector.order_by(pk_name);
804            }
805        }
806
807        // Phase 3: apply the bounded update window on top of the deterministic
808        // selector order before mutation replay begins.
809        if let Some(limit) = statement.limit {
810            selector = selector.limit(limit);
811        }
812        if let Some(offset) = statement.offset {
813            selector = selector.offset(offset);
814        }
815
816        Ok(selector)
817    }
818
819    // Validate and normalize the admitted `INSERT ... SELECT` source shape
820    // without widening the write lane into grouped, aggregate, or computed
821    // projection ownership.
822    fn sql_insert_select_source_statement<E>(
823        statement: &SqlInsertStatement,
824    ) -> Result<SqlSelectStatement, QueryError>
825    where
826        E: PersistedRow<Canister = C> + EntityValue,
827    {
828        let SqlInsertSource::Select(select) = statement.source.clone() else {
829            return Err(QueryError::invariant(
830                "INSERT SELECT source validation requires parsed SELECT source",
831            ));
832        };
833        let mut select = *select;
834        ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
835
836        if !select.group_by.is_empty() || !select.having.is_empty() {
837            return Err(QueryError::unsupported_query(
838                "SQL INSERT SELECT requires scalar SELECT source in this release",
839            ));
840        }
841
842        if let SqlProjection::Items(items) = &select.projection {
843            for item in items {
844                if matches!(item, SqlSelectItem::Aggregate(_)) {
845                    return Err(QueryError::unsupported_query(
846                        "SQL INSERT SELECT does not support aggregate source projection in this release",
847                    ));
848                }
849            }
850        }
851
852        let pk_name = E::MODEL.primary_key.name;
853        if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
854            select.order_by.push(SqlOrderTerm {
855                field: pk_name.to_string(),
856                direction: SqlOrderDirection::Asc,
857            });
858        }
859
860        Ok(select)
861    }
862
863    // Execute one admitted `INSERT ... SELECT` source query through the
864    // existing scalar SQL projection lane and return the projected value rows
865    // that will later feed the ordinary structural insert replay.
866    fn execute_sql_insert_select_source_rows<E>(
867        &self,
868        source: &SqlSelectStatement,
869    ) -> Result<Vec<Vec<Value>>, QueryError>
870    where
871        E: PersistedRow<Canister = C> + EntityValue,
872    {
873        // Phase 1: reuse the already-shipped scalar computed-projection lane
874        // when the source SELECT widens beyond plain fields but still fits the
875        // admitted session-owned text projection contract.
876        if let Some(plan) = computed_projection::computed_sql_projection_plan(
877            &SqlStatement::Select(source.clone()),
878        )? {
879            let result = self.execute_computed_sql_projection_dispatch_for_authority(
880                plan,
881                EntityAuthority::for_type::<E>(),
882            )?;
883
884            return match result {
885                SqlDispatchResult::Projection { rows, .. } => Ok(rows),
886                other => Err(QueryError::invariant(format!(
887                    "INSERT SELECT computed source must produce projection rows, found {other:?}",
888                ))),
889            };
890        }
891
892        // Phase 2: keep the ordinary field-only source path on the shared
893        // scalar SQL projection lane.
894        let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
895            .map_err(QueryError::from_sql_lowering_error)?;
896        let lowered =
897            lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
898                .map_err(QueryError::from_sql_lowering_error)?;
899        let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
900            return Err(QueryError::invariant(
901                "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
902            ));
903        };
904
905        let payload =
906            self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
907        let (_, rows, _) = payload.into_parts();
908
909        Ok(rows)
910    }
911
912    // Execute one narrow SQL INSERT statement through the existing structural
913    // mutation path and project the returned after-image as one SQL row.
914    fn execute_sql_insert_dispatch<E>(
915        &self,
916        statement: &SqlInsertStatement,
917    ) -> Result<SqlDispatchResult, QueryError>
918    where
919        E: PersistedRow<Canister = C> + EntityValue,
920    {
921        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
922        let columns = sql_insert_columns::<E>(statement);
923        validate_sql_insert_required_fields::<E>(columns.as_slice())?;
924        let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Insert, Timestamp::now());
925        let source_rows = match &statement.source {
926            SqlInsertSource::Values(values) => {
927                validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
928                values.clone()
929            }
930            SqlInsertSource::Select(_) => {
931                let source = Self::sql_insert_select_source_statement::<E>(statement)?;
932                let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
933                validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
934
935                rows
936            }
937        };
938        let mut entities = Vec::with_capacity(source_rows.len());
939
940        for values in &source_rows {
941            let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
942            let entity = self
943                .execute_save_entity::<E>(|save| {
944                    save.apply_internal_structural_mutation_with_write_context(
945                        MutationMode::Insert,
946                        key,
947                        patch,
948                        write_context,
949                    )
950                })
951                .map_err(QueryError::execute)?;
952            entities.push(entity);
953        }
954
955        Self::sql_write_dispatch_result(entities, statement.returning.as_ref())
956    }
957
958    // Execute one reduced SQL UPDATE statement by selecting deterministic
959    // target rows first and then replaying one shared structural patch onto
960    // each matched primary key.
961    fn execute_sql_update_dispatch<E>(
962        &self,
963        statement: &SqlUpdateStatement,
964    ) -> Result<SqlDispatchResult, QueryError>
965    where
966        E: PersistedRow<Canister = C> + EntityValue,
967    {
968        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
969        let selector = Self::sql_update_selector_query::<E>(statement)?;
970        let patch = Self::sql_update_patch::<E>(statement)?;
971        let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Update, Timestamp::now());
972        let matched = self.execute_query(&selector)?;
973        let mut entities = Vec::with_capacity(matched.len());
974
975        // Phase 1: apply the already-normalized structural patch to every
976        // matched row in deterministic primary-key order.
977        for entity in matched.entities() {
978            let updated = self
979                .execute_save_entity::<E>(|save| {
980                    save.apply_internal_structural_mutation_with_write_context(
981                        MutationMode::Update,
982                        entity.id().key(),
983                        patch.clone(),
984                        write_context,
985                    )
986                })
987                .map_err(QueryError::execute)?;
988            entities.push(updated);
989        }
990
991        Self::sql_write_dispatch_result(entities, statement.returning.as_ref())
992    }
993
994    // Build the shared structural SQL projection execution inputs once so
995    // value-row and rendered-row dispatch surfaces only differ in final packaging.
996    fn prepare_structural_sql_projection_execution(
997        &self,
998        query: StructuralQuery,
999        authority: EntityAuthority,
1000    ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
1001        // Phase 1: build the structural access plan once and freeze its outward
1002        // column contract for all projection materialization surfaces.
1003        let (_, plan) =
1004            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
1005        let projection = plan.projection_spec(authority.model());
1006        let columns = projection_labels_from_projection_spec(&projection);
1007
1008        Ok((columns, plan))
1009    }
1010
1011    // Execute one structural SQL load query and return only row-oriented SQL
1012    // projection values, keeping typed projection rows out of the shared SQL
1013    // query-lane path.
1014    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
1015        &self,
1016        query: StructuralQuery,
1017        authority: EntityAuthority,
1018    ) -> Result<SqlProjectionPayload, QueryError> {
1019        // Phase 1: build the shared structural plan and outward column contract once.
1020        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
1021
1022        // Phase 2: execute the shared structural load path with the already
1023        // derived projection semantics.
1024        let projected =
1025            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
1026                .map_err(QueryError::execute)?;
1027        let (rows, row_count) = projected.into_parts();
1028
1029        Ok(SqlProjectionPayload::new(columns, rows, row_count))
1030    }
1031
1032    // Execute one structural SQL load query and return render-ready text rows
1033    // for the dispatch lane when the terminal short path can prove them
1034    // directly.
1035    fn execute_structural_sql_projection_text(
1036        &self,
1037        query: StructuralQuery,
1038        authority: EntityAuthority,
1039    ) -> Result<SqlDispatchResult, QueryError> {
1040        // Phase 1: build the shared structural plan and outward column contract once.
1041        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
1042
1043        // Phase 2: execute the shared structural load path with the already
1044        // derived projection semantics while preferring rendered SQL rows.
1045        let projected =
1046            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
1047                .map_err(QueryError::execute)?;
1048        let (rows, row_count) = projected.into_parts();
1049
1050        Ok(SqlDispatchResult::ProjectionText {
1051            columns,
1052            rows,
1053            row_count,
1054        })
1055    }
1056
1057    // Execute one typed SQL delete query while keeping the row payload on the
1058    // typed delete executor boundary that still owns non-runtime-hook delete
1059    // commit-window application.
1060    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
1061    where
1062        E: PersistedRow<Canister = C> + EntityValue,
1063    {
1064        let plan = self
1065            .compile_query_with_visible_indexes(query)?
1066            .into_prepared_execution_plan();
1067        let deleted = self
1068            .with_metrics(|| {
1069                self.delete_executor::<E>()
1070                    .execute_structural_projection(plan)
1071            })
1072            .map_err(QueryError::execute)?;
1073        let (rows, row_count) = deleted.into_parts();
1074        let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
1075
1076        Ok(SqlProjectionPayload::new(
1077            projection_labels_from_fields(E::MODEL.fields()),
1078            rows,
1079            row_count,
1080        )
1081        .into_dispatch_result())
1082    }
1083
1084    // Execute one typed SQL delete query and return only the affected-row
1085    // count so bare dispatch DELETE matches traditional SQL result semantics.
1086    fn execute_typed_sql_delete_count<E>(
1087        &self,
1088        query: &Query<E>,
1089    ) -> Result<SqlDispatchResult, QueryError>
1090    where
1091        E: PersistedRow<Canister = C> + EntityValue,
1092    {
1093        let row_count = self.execute_delete_count(query)?;
1094
1095        Ok(SqlDispatchResult::Count { row_count })
1096    }
1097
1098    // Execute one typed SQL delete query and project only the explicit
1099    // `RETURNING` payload requested by the authored SQL surface.
1100    fn execute_typed_sql_delete_returning<E>(
1101        &self,
1102        query: &Query<E>,
1103        returning: &SqlReturningProjection,
1104    ) -> Result<SqlDispatchResult, QueryError>
1105    where
1106        E: PersistedRow<Canister = C> + EntityValue,
1107    {
1108        let SqlDispatchResult::Projection {
1109            columns,
1110            rows,
1111            row_count,
1112        } = self.execute_typed_sql_delete(query)?
1113        else {
1114            return Err(QueryError::invariant(
1115                "typed SQL delete projection path must emit value-row projection payload",
1116            ));
1117        };
1118
1119        Self::sql_returning_dispatch_projection(columns, rows, row_count, returning)
1120    }
1121
1122    // Lower one parsed SQL query/explain route once for one resolved authority
1123    // and preserve grouped-column metadata for grouped SELECT dispatch.
1124    fn lowered_sql_query_dispatch_inputs_for_authority(
1125        parsed: &SqlParsedStatement,
1126        authority: EntityAuthority,
1127        unsupported_message: &'static str,
1128    ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
1129        let lowered = parsed.lower_query_lane_for_entity(
1130            authority.model().name(),
1131            authority.model().primary_key.name,
1132        )?;
1133        let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
1134            .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
1135            .transpose()?;
1136        let query = lowered
1137            .into_query()
1138            .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
1139
1140        Ok((query, projection_columns.flatten()))
1141    }
1142
1143    // Execute one parsed SQL query route through the shared aggregate,
1144    // computed-projection, and lowered query lane so typed and generated
1145    // dispatch only differ at the final SELECT/DELETE packaging boundary.
1146    fn dispatch_sql_query_route_for_authority(
1147        &self,
1148        parsed: &SqlParsedStatement,
1149        authority: EntityAuthority,
1150        unsupported_message: &'static str,
1151        dispatch_select: impl FnOnce(
1152            &Self,
1153            LoweredSelectShape,
1154            EntityAuthority,
1155            bool,
1156            Option<Vec<String>>,
1157        ) -> Result<SqlDispatchResult, QueryError>,
1158        dispatch_delete: impl FnOnce(
1159            &Self,
1160            LoweredBaseQueryShape,
1161            EntityAuthority,
1162        ) -> Result<SqlDispatchResult, QueryError>,
1163    ) -> Result<SqlDispatchResult, QueryError> {
1164        // Phase 1: keep aggregate and computed projection classification on the
1165        // shared parsed route so both dispatch surfaces honor the same lane split.
1166        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
1167            let command =
1168                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
1169
1170            return self.execute_sql_aggregate_dispatch_for_authority(
1171                command,
1172                authority,
1173                sql_aggregate_dispatch_label_override(&parsed.statement),
1174            );
1175        }
1176
1177        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
1178            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
1179        }
1180
1181        // Phase 2: lower the remaining query route once, then let the caller
1182        // decide only the final outward result packaging.
1183        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
1184            parsed,
1185            authority,
1186            unsupported_message,
1187        )?;
1188        let grouped_surface = query.has_grouping();
1189
1190        match query {
1191            LoweredSqlQuery::Select(select) => {
1192                dispatch_select(self, select, authority, grouped_surface, projection_columns)
1193            }
1194            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
1195        }
1196    }
1197
1198    // Execute one parsed SQL EXPLAIN route through the shared computed-
1199    // projection and lowered explain lanes so typed and generated dispatch do
1200    // not duplicate the same explain classification tree.
1201    fn dispatch_sql_explain_route_for_authority(
1202        &self,
1203        parsed: &SqlParsedStatement,
1204        authority: EntityAuthority,
1205    ) -> Result<SqlDispatchResult, QueryError> {
1206        // Phase 1: keep computed-projection explain ownership on the same
1207        // parsed route boundary as the shared query lane.
1208        if let Some((mode, plan)) =
1209            computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
1210        {
1211            return self
1212                .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
1213                .map(SqlDispatchResult::Explain);
1214        }
1215
1216        // Phase 2: lower once for execution/logical explain and preserve the
1217        // shared execution-first fallback policy across both callers.
1218        let lowered = parsed.lower_query_lane_for_entity(
1219            authority.model().name(),
1220            authority.model().primary_key.name,
1221        )?;
1222        if let Some(explain) =
1223            self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1224        {
1225            return Ok(SqlDispatchResult::Explain(explain));
1226        }
1227
1228        self.explain_lowered_sql_for_authority(&lowered, authority)
1229            .map(SqlDispatchResult::Explain)
1230    }
1231
1232    // Validate that one SQL-derived query intent matches the grouped/scalar
1233    // execution surface that is about to consume it.
1234    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1235        query: &Query<E>,
1236        surface: SqlGroupingSurface,
1237    ) -> Result<(), QueryError>
1238    where
1239        E: EntityKind,
1240    {
1241        match (surface, query.has_grouping()) {
1242            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1243            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1244                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1245            ),
1246        }
1247    }
1248
1249    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
1250    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1251    where
1252        E: PersistedRow<Canister = C> + EntityValue,
1253    {
1254        let parsed = self.parse_sql_statement(sql)?;
1255
1256        self.execute_sql_dispatch_parsed::<E>(&parsed)
1257    }
1258
1259    /// Execute one parsed reduced SQL statement into one unified SQL payload.
1260    pub fn execute_sql_dispatch_parsed<E>(
1261        &self,
1262        parsed: &SqlParsedStatement,
1263    ) -> Result<SqlDispatchResult, QueryError>
1264    where
1265        E: PersistedRow<Canister = C> + EntityValue,
1266    {
1267        match parsed.route() {
1268            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1269                parsed,
1270                EntityAuthority::for_type::<E>(),
1271                "execute_sql_dispatch accepts SELECT or DELETE only",
1272                |session, select, authority, grouped_surface, projection_columns| {
1273                    if grouped_surface {
1274                        let columns = projection_columns.ok_or_else(|| {
1275                            QueryError::unsupported_query(
1276                                "grouped SQL dispatch requires explicit grouped projection items",
1277                            )
1278                        })?;
1279
1280                        return session.execute_lowered_sql_grouped_dispatch_select_core(
1281                            select, authority, columns,
1282                        );
1283                    }
1284
1285                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1286                    if let Some(columns) = projection_columns {
1287                        let (_, rows, row_count) = payload.into_parts();
1288
1289                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1290                            .into_dispatch_result());
1291                    }
1292
1293                    Ok(payload.into_dispatch_result())
1294                },
1295                |session, delete, _authority| {
1296                    let SqlStatement::Delete(statement) = &parsed.statement else {
1297                        return Err(QueryError::invariant(
1298                            "DELETE SQL route must carry parsed DELETE statement",
1299                        ));
1300                    };
1301                    let typed_query = bind_lowered_sql_query::<E>(
1302                        LoweredSqlQuery::Delete(delete),
1303                        MissingRowPolicy::Ignore,
1304                    )
1305                    .map_err(QueryError::from_sql_lowering_error)?;
1306
1307                    match &statement.returning {
1308                        Some(returning) => {
1309                            session.execute_typed_sql_delete_returning(&typed_query, returning)
1310                        }
1311                        None => session.execute_typed_sql_delete_count(&typed_query),
1312                    }
1313                },
1314            ),
1315            SqlStatementRoute::Insert { .. } => {
1316                let SqlStatement::Insert(statement) = &parsed.statement else {
1317                    return Err(QueryError::invariant(
1318                        "INSERT SQL route must carry parsed INSERT statement",
1319                    ));
1320                };
1321
1322                self.execute_sql_insert_dispatch::<E>(statement)
1323            }
1324            SqlStatementRoute::Update { .. } => {
1325                let SqlStatement::Update(statement) = &parsed.statement else {
1326                    return Err(QueryError::invariant(
1327                        "UPDATE SQL route must carry parsed UPDATE statement",
1328                    ));
1329                };
1330
1331                self.execute_sql_update_dispatch::<E>(statement)
1332            }
1333            SqlStatementRoute::Explain { .. } => self
1334                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1335            SqlStatementRoute::Describe { .. } => {
1336                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1337            }
1338            SqlStatementRoute::ShowIndexes { .. } => {
1339                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1340            }
1341            SqlStatementRoute::ShowColumns { .. } => {
1342                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1343            }
1344            SqlStatementRoute::ShowEntities => {
1345                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1346            }
1347        }
1348    }
1349
1350    /// Execute one parsed reduced SQL statement through the generated canister
1351    /// query/explain surface for one already-resolved dynamic authority.
1352    ///
1353    /// This keeps the canister SQL facade on the same reduced SQL ownership
1354    /// boundary as typed dispatch without forcing the outer facade to reopen
1355    /// typed-generic routing just to preserve parity for computed projections.
1356    #[doc(hidden)]
1357    pub fn execute_generated_query_surface_dispatch_for_authority(
1358        &self,
1359        parsed: &SqlParsedStatement,
1360        authority: EntityAuthority,
1361    ) -> Result<SqlDispatchResult, QueryError> {
1362        match parsed.route() {
1363            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1364                parsed,
1365                authority,
1366                "generated SQL query surface requires query or EXPLAIN statement lanes",
1367                |session, select, authority, grouped_surface, projection_columns| {
1368                    if grouped_surface {
1369                        let columns = projection_columns.ok_or_else(|| {
1370                            QueryError::unsupported_query(
1371                                "grouped SQL dispatch requires explicit grouped projection items",
1372                            )
1373                        })?;
1374
1375                        return session
1376                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1377                    }
1378
1379                    let result =
1380                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1381                    if let Some(columns) = projection_columns {
1382                        let SqlDispatchResult::ProjectionText {
1383                            rows, row_count, ..
1384                        } = result
1385                        else {
1386                            return Err(QueryError::invariant(
1387                                "generated scalar SQL dispatch text path must emit projection text rows",
1388                            ));
1389                        };
1390
1391                        return Ok(SqlDispatchResult::ProjectionText {
1392                            columns,
1393                            rows,
1394                            row_count,
1395                        });
1396                    }
1397
1398                    Ok(result)
1399                },
1400                |session, delete, authority| {
1401                    let SqlStatement::Delete(statement) = &parsed.statement else {
1402                        return Err(QueryError::invariant(
1403                            "DELETE SQL route must carry parsed DELETE statement",
1404                        ));
1405                    };
1406
1407                    match &statement.returning {
1408                        Some(returning) => {
1409                            let SqlDispatchResult::Projection {
1410                                columns,
1411                                rows,
1412                                row_count,
1413                            } = session.execute_lowered_sql_dispatch_delete_core(&delete, authority)?
1414                            else {
1415                                return Err(QueryError::invariant(
1416                                    "generated SQL delete projection path must emit value-row projection payload",
1417                                ));
1418                            };
1419
1420                            Self::sql_returning_dispatch_projection(
1421                                columns, rows, row_count, returning,
1422                            )
1423                        }
1424                        None => session
1425                            .execute_lowered_sql_delete_count_core(&delete, authority),
1426                    }
1427                },
1428            ),
1429            SqlStatementRoute::Explain { .. } => {
1430                self.dispatch_sql_explain_route_for_authority(parsed, authority)
1431            }
1432            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1433            | SqlStatementRoute::Describe { .. }
1434            | SqlStatementRoute::ShowIndexes { .. }
1435            | SqlStatementRoute::ShowColumns { .. }
1436            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1437                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1438            )),
1439        }
1440    }
1441
1442    /// Execute one raw SQL string through the generated canister query surface.
1443    ///
1444    /// This hidden helper keeps parse, route, authority, and metadata/query
1445    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
1446    /// stays close to a pure descriptor table plus public ABI wrapper.
1447    #[doc(hidden)]
1448    #[must_use]
1449    pub fn execute_generated_query_surface_sql(
1450        &self,
1451        sql: &str,
1452        authorities: &[EntityAuthority],
1453    ) -> GeneratedSqlDispatchAttempt {
1454        // Phase 1: normalize and parse once so every generated route family
1455        // shares the same SQL ownership boundary.
1456        let sql_trimmed = match trim_generated_query_sql_input(sql) {
1457            Ok(sql_trimmed) => sql_trimmed,
1458            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1459        };
1460        let parsed = match self.parse_sql_statement(sql_trimmed) {
1461            Ok(parsed) => parsed,
1462            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1463        };
1464
1465        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
1466        // generated routes against the emitted authority table exactly once.
1467        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1468            return GeneratedSqlDispatchAttempt::new(
1469                "",
1470                None,
1471                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1472                    authorities,
1473                ))),
1474            );
1475        }
1476        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1477            Ok(authority) => authority,
1478            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1479        };
1480
1481        // Phase 3: dispatch the resolved route through the existing query,
1482        // explain, and metadata helpers without rebuilding route ownership in
1483        // the generated build output.
1484        let entity_name = authority.model().name();
1485        let explain_order_field = parsed
1486            .route()
1487            .is_explain()
1488            .then_some(authority.model().primary_key.name);
1489        let result = match parsed.route() {
1490            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1491                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1492            }
1493            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1494                Err(QueryError::unsupported_query(
1495                    "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1496                ))
1497            }
1498            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1499                self.describe_entity_model(authority.model()),
1500            )),
1501            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1502                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1503            )),
1504            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1505                self.show_columns_for_model(authority.model()),
1506            )),
1507            SqlStatementRoute::ShowEntities => unreachable!(
1508                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1509            ),
1510        };
1511
1512        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1513    }
1514}