Skip to main content

icydb_core/db/session/sql/dispatch/
mod.rs

1//! Module: db::session::sql::dispatch
2//! Responsibility: session-owned SQL dispatch entrypoints that bind lowered SQL
3//! commands onto structural planning, execution, and outward result shaping.
4//! Does not own: SQL parsing or executor runtime internals.
5//! Boundary: centralizes authority-aware SQL dispatch classification and result packaging.
6
7mod computed;
8mod lowered;
9
10use crate::{
11    db::{
12        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13        data::UpdatePatch,
14        executor::{EntityAuthority, MutationMode},
15        identifiers_tail_match,
16        query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17        schema::{ValidateError, field_type_from_model_kind, literal_matches_type},
18        session::sql::{
19            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21            computed_projection,
22            projection::{
23                SqlProjectionPayload, execute_sql_projection_rows_for_canister,
24                execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
25                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26            },
27        },
28        sql::lowering::{
29            LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
30            bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
31            lower_sql_command_from_prepared_statement, prepare_sql_statement,
32        },
33        sql::parser::{
34            SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
35            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
36            SqlStatement, SqlTextFunction, SqlUpdateStatement,
37        },
38    },
39    model::{
40        entity::resolve_field_slot,
41        field::{FieldInsertGeneration, FieldKind, FieldModel},
42    },
43    sanitize::{SanitizeWriteContext, SanitizeWriteMode},
44    traits::{CanisterKind, EntityKind, EntityValue},
45    types::{Timestamp, Ulid},
46    value::Value,
47};
48
49#[cfg(feature = "perf-attribution")]
50pub use lowered::LoweredSqlDispatchExecutorAttribution;
51
52///
53/// GeneratedSqlDispatchAttempt
54///
55/// Hidden generated-query dispatch envelope used by the facade helper to keep
56/// generated route ownership in core while preserving the public EXPLAIN error
57/// rewrite contract at the outer boundary.
58///
59
60#[doc(hidden)]
61pub struct GeneratedSqlDispatchAttempt {
62    entity_name: &'static str,
63    explain_order_field: Option<&'static str>,
64    result: Result<SqlDispatchResult, QueryError>,
65}
66
67impl GeneratedSqlDispatchAttempt {
68    // Build one generated-query dispatch attempt with optional explain-hint context.
69    const fn new(
70        entity_name: &'static str,
71        explain_order_field: Option<&'static str>,
72        result: Result<SqlDispatchResult, QueryError>,
73    ) -> Self {
74        Self {
75            entity_name,
76            explain_order_field,
77            result,
78        }
79    }
80
81    /// Borrow the resolved entity name for this generated-query attempt.
82    #[must_use]
83    pub const fn entity_name(&self) -> &'static str {
84        self.entity_name
85    }
86
87    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
88    #[must_use]
89    pub const fn explain_order_field(&self) -> Option<&'static str> {
90        self.explain_order_field
91    }
92
93    /// Consume and return the generated-query dispatch result.
94    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
95        self.result
96    }
97}
98
99#[derive(Clone, Copy, Debug, Eq, PartialEq)]
100pub(in crate::db::session::sql) enum SqlGroupingSurface {
101    Scalar,
102    Grouped,
103}
104
105const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
106    match surface {
107        SqlGroupingSurface::Scalar => {
108            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
109        }
110        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
111    }
112}
113
114// Enforce the generated canister query contract that empty SQL is unsupported
115// before any parser/lowering work occurs.
116fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
117    let sql_trimmed = sql.trim();
118    if sql_trimmed.is_empty() {
119        return Err(QueryError::unsupported_query(
120            "query endpoint requires a non-empty SQL string",
121        ));
122    }
123
124    Ok(sql_trimmed)
125}
126
127// Render the generated-surface entity list from the descriptor table instead
128// of assuming every session-visible entity belongs on the public query export.
129fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
130    let mut entities = Vec::with_capacity(authorities.len());
131
132    for authority in authorities {
133        entities.push(authority.model().name().to_string());
134    }
135
136    entities
137}
138
139// Project parsed SELECT items into one stable outward column contract while
140// allowing parser-owned aliases to override only the final session label.
141fn sql_projection_labels_from_select_statement(
142    statement: &SqlStatement,
143) -> Result<Option<Vec<String>>, QueryError> {
144    let SqlStatement::Select(select) = statement else {
145        return Err(QueryError::invariant(
146            "SQL projection labels require SELECT statement shape",
147        ));
148    };
149    let SqlProjection::Items(items) = &select.projection else {
150        return Ok(None);
151    };
152
153    Ok(Some(
154        items
155            .iter()
156            .enumerate()
157            .map(|(index, item)| {
158                select
159                    .projection_alias(index)
160                    .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
161            })
162            .collect(),
163    ))
164}
165
166// Render one grouped SELECT item into the public grouped-column label used by
167// unified dispatch results.
168fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
169    match item {
170        SqlSelectItem::Field(field) => field.clone(),
171        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
172        SqlSelectItem::TextFunction(call) => {
173            format!(
174                "{}({})",
175                grouped_sql_text_function_name(call.function),
176                call.field
177            )
178        }
179    }
180}
181
182// Keep the dedicated SQL aggregate lane on parser-owned outward labels
183// without reopening alias semantics in lowering or runtime strategy state.
184fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
185    let SqlStatement::Select(select) = statement else {
186        return None;
187    };
188
189    select.projection_alias(0).map(str::to_string)
190}
191
192// Render one aggregate call into one canonical SQL-style label.
193fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
194    let kind = match aggregate.kind {
195        SqlAggregateKind::Count => "COUNT",
196        SqlAggregateKind::Sum => "SUM",
197        SqlAggregateKind::Avg => "AVG",
198        SqlAggregateKind::Min => "MIN",
199        SqlAggregateKind::Max => "MAX",
200    };
201
202    match aggregate.field.as_deref() {
203        Some(field) => format!("{kind}({field})"),
204        None => format!("{kind}(*)"),
205    }
206}
207
208// Render one reduced SQL text-function identifier into one stable uppercase
209// SQL label for outward column metadata.
210const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
211    match function {
212        SqlTextFunction::Trim => "TRIM",
213        SqlTextFunction::Ltrim => "LTRIM",
214        SqlTextFunction::Rtrim => "RTRIM",
215        SqlTextFunction::Lower => "LOWER",
216        SqlTextFunction::Upper => "UPPER",
217        SqlTextFunction::Length => "LENGTH",
218        SqlTextFunction::Left => "LEFT",
219        SqlTextFunction::Right => "RIGHT",
220        SqlTextFunction::StartsWith => "STARTS_WITH",
221        SqlTextFunction::EndsWith => "ENDS_WITH",
222        SqlTextFunction::Contains => "CONTAINS",
223        SqlTextFunction::Position => "POSITION",
224        SqlTextFunction::Replace => "REPLACE",
225        SqlTextFunction::Substring => "SUBSTRING",
226    }
227}
228
229// Resolve one generated query route onto the descriptor-owned authority table.
230fn authority_for_generated_sql_route(
231    route: &SqlStatementRoute,
232    authorities: &[EntityAuthority],
233) -> Result<EntityAuthority, QueryError> {
234    let sql_entity = route.entity();
235
236    for authority in authorities {
237        if identifiers_tail_match(sql_entity, authority.model().name()) {
238            return Ok(*authority);
239        }
240    }
241
242    Err(unsupported_generated_sql_entity_error(
243        sql_entity,
244        authorities,
245    ))
246}
247
248// Keep the generated query-surface unsupported-entity contract stable while
249// moving authority lookup out of the build-generated shim.
250fn unsupported_generated_sql_entity_error(
251    entity_name: &str,
252    authorities: &[EntityAuthority],
253) -> QueryError {
254    let mut supported = String::new();
255
256    for (index, authority) in authorities.iter().enumerate() {
257        if index != 0 {
258            supported.push_str(", ");
259        }
260
261        supported.push_str(authority.model().name());
262    }
263
264    QueryError::unsupported_query(format!(
265        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
266    ))
267}
268
269// Keep typed SQL write routes on the same entity-match contract used by
270// lowered query dispatch, without widening write statements into lowering.
271fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
272where
273    E: EntityKind,
274{
275    if identifiers_tail_match(sql_entity, E::MODEL.name()) {
276        return Ok(());
277    }
278
279    Err(QueryError::from_sql_lowering_error(
280        SqlLoweringError::EntityMismatch {
281            sql_entity: sql_entity.to_string(),
282            expected_entity: E::MODEL.name(),
283        },
284    ))
285}
286
287// Normalize one reduced-SQL primary-key literal onto the concrete entity key
288// type accepted by the structural mutation entrypoint.
289fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
290where
291    E: EntityKind,
292{
293    if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
294        return Ok(key);
295    }
296
297    let widened = match value {
298        Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
299        Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
300        _ => {
301            return Err(QueryError::unsupported_query(format!(
302                "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303            )));
304        }
305    };
306
307    <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
308        QueryError::unsupported_query(format!(
309            "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
310        ))
311    })
312}
313
314// Synthesize one generated SQL insert literal from the schema-owned runtime
315// field contract instead of hard-coding generation at the SQL boundary.
316fn sql_write_generated_field_value(field: &FieldModel) -> Option<Value> {
317    field
318        .insert_generation()
319        .map(|generation| match generation {
320            FieldInsertGeneration::Ulid => Value::Ulid(Ulid::generate()),
321            FieldInsertGeneration::Timestamp => Value::Timestamp(Timestamp::now()),
322        })
323}
324
325// Normalize one reduced-SQL write literal onto the target entity field kind
326// when the parser's numeric literal domain is narrower than the runtime field.
327fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
328where
329    E: EntityKind,
330{
331    let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
332        QueryError::invariant("SQL write field must resolve against the target entity model")
333    })?;
334    let field_kind = E::MODEL.fields()[field_slot].kind();
335
336    let normalized = match (field_kind, value) {
337        (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
338        (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
339            Value::Int(v.cast_signed())
340        }
341        _ => value.clone(),
342    };
343
344    let field_type = field_type_from_model_kind(&field_kind);
345    if !literal_matches_type(&normalized, &field_type) {
346        return Err(QueryError::unsupported_query(
347            ValidateError::invalid_literal(field_name, "literal type does not match field type")
348                .to_string(),
349        ));
350    }
351
352    Ok(normalized)
353}
354
355// Reject explicit user-authored writes to schema-managed fields so reduced SQL
356// does not silently accept values that the write boundary will overwrite.
357fn reject_explicit_sql_write_to_managed_field<E>(
358    field_name: &str,
359    statement_kind: &str,
360) -> Result<(), QueryError>
361where
362    E: EntityKind,
363{
364    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
365        return Ok(());
366    };
367    let field = &E::MODEL.fields()[field_slot];
368
369    if field.write_management().is_some() {
370        return Err(QueryError::unsupported_query(format!(
371            "SQL {statement_kind} does not allow explicit writes to managed field '{field_name}' in this release"
372        )));
373    }
374
375    Ok(())
376}
377
378// Reject explicit user-authored INSERT values for schema-generated fields so
379// reduced SQL keeps generated insert ownership on the server side.
380fn reject_explicit_sql_insert_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
381where
382    E: EntityKind,
383{
384    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
385        return Ok(());
386    };
387    let field = &E::MODEL.fields()[field_slot];
388
389    if field.insert_generation().is_some() {
390        return Err(QueryError::unsupported_query(format!(
391            "SQL INSERT does not allow explicit writes to generated field '{field_name}' in this release"
392        )));
393    }
394
395    Ok(())
396}
397
398// Reject explicit user-authored UPDATE assignments to insert-generated fields
399// so system-owned generation remains immutable after creation.
400fn reject_explicit_sql_update_to_generated_field<E>(field_name: &str) -> Result<(), QueryError>
401where
402    E: EntityKind,
403{
404    let Some(field_slot) = resolve_field_slot(E::MODEL, field_name) else {
405        return Ok(());
406    };
407    let field = &E::MODEL.fields()[field_slot];
408
409    if field.insert_generation().is_some() {
410        return Err(QueryError::unsupported_query(format!(
411            "SQL UPDATE does not allow explicit writes to generated field '{field_name}' in this release"
412        )));
413    }
414
415    Ok(())
416}
417
418// Determine whether one field may be omitted from reduced SQL INSERT because
419// the write lane owns its value synthesis contract.
420fn sql_insert_field_is_omittable(field: &FieldModel) -> bool {
421    if sql_write_generated_field_value(field).is_some() {
422        return true;
423    }
424
425    field.write_management().is_some()
426}
427
428// Reject explicit INSERT column lists that omit non-generated user fields so
429// reduced SQL does not silently consume typed-Rust defaults.
430fn validate_sql_insert_required_fields<E>(columns: &[String]) -> Result<(), QueryError>
431where
432    E: EntityKind,
433{
434    let missing_required_fields = E::MODEL
435        .fields()
436        .iter()
437        .filter(|field| !columns.iter().any(|column| column == field.name()))
438        .filter(|field| !sql_insert_field_is_omittable(field))
439        .map(FieldModel::name)
440        .collect::<Vec<_>>();
441
442    if missing_required_fields.is_empty() {
443        return Ok(());
444    }
445
446    if missing_required_fields.len() == 1
447        && missing_required_fields[0] == E::MODEL.primary_key.name()
448    {
449        return Err(QueryError::unsupported_query(format!(
450            "SQL INSERT requires primary key column '{}' in this release",
451            E::MODEL.primary_key.name()
452        )));
453    }
454
455    Err(QueryError::unsupported_query(format!(
456        "SQL INSERT requires explicit values for non-generated fields {} in this release",
457        missing_required_fields.join(", ")
458    )))
459}
460
461// Resolve the effective INSERT column list for one reduced SQL write:
462// explicit column lists pass through, while omitted-column-list INSERT uses
463// canonical user-authored model field order and leaves hidden timestamp
464// synthesis on the existing write path.
465fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
466where
467    E: EntityKind,
468{
469    match source {
470        SqlInsertSource::Values(values) => values.first().map(Vec::len),
471        SqlInsertSource::Select(select) => match &select.projection {
472            SqlProjection::All => Some(
473                E::MODEL
474                    .fields()
475                    .iter()
476                    .filter(|field| field.write_management().is_none())
477                    .count(),
478            ),
479            SqlProjection::Items(items) => Some(items.len()),
480        },
481    }
482}
483
484// Resolve the effective INSERT column list for one reduced SQL write:
485// explicit column lists pass through, while omitted-column-list INSERT uses
486// canonical user-authored model field order and leaves hidden timestamp
487// synthesis on the existing write path.
488fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
489where
490    E: EntityKind,
491{
492    if !statement.columns.is_empty() {
493        return statement.columns.clone();
494    }
495
496    let columns: Vec<String> = E::MODEL
497        .fields()
498        .iter()
499        .filter(|field| !sql_insert_field_is_omittable(field))
500        .map(|field| field.name().to_string())
501        .collect();
502    let full_columns: Vec<String> = E::MODEL
503        .fields()
504        .iter()
505        .filter(|field| field.write_management().is_none())
506        .map(|field| field.name().to_string())
507        .collect();
508    let first_width = sql_insert_source_width_hint::<E>(&statement.source);
509
510    if first_width == Some(columns.len()) {
511        return columns;
512    }
513
514    full_columns
515}
516
517// Validate one INSERT tuple list against the resolved effective column list so
518// every VALUES tuple stays full-width and deterministic.
519fn validate_sql_insert_value_tuple_lengths(
520    columns: &[String],
521    values: &[Vec<Value>],
522) -> Result<(), QueryError> {
523    for tuple in values {
524        if tuple.len() != columns.len() {
525            return Err(QueryError::from_sql_parse_error(
526                crate::db::sql::parser::SqlParseError::invalid_syntax(
527                    "INSERT column list and VALUES tuple length must match",
528                ),
529            ));
530        }
531    }
532
533    Ok(())
534}
535
536// Validate one projected `INSERT ... SELECT` row set against the resolved
537// effective column list so replayed structural inserts stay deterministic.
538fn validate_sql_insert_selected_rows(
539    columns: &[String],
540    rows: &[Vec<Value>],
541) -> Result<(), QueryError> {
542    for row in rows {
543        if row.len() != columns.len() {
544            return Err(QueryError::unsupported_query(
545                "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
546            ));
547        }
548    }
549
550    Ok(())
551}
552
553impl<C: CanisterKind> DbSession<C> {
554    // Project one typed SQL write after-image into one outward SQL row using
555    // the persisted model field order.
556    fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
557    where
558        E: PersistedRow<Canister = C> + EntityValue,
559    {
560        let mut row = Vec::with_capacity(E::MODEL.fields().len());
561
562        for index in 0..E::MODEL.fields().len() {
563            let value = entity.get_value_by_index(index).ok_or_else(|| {
564                QueryError::invariant(
565                    "SQL write dispatch projection row must include every declared field",
566                )
567            })?;
568            row.push(value);
569        }
570
571        Ok(row)
572    }
573
574    // Render one or more typed entities returned by SQL write dispatch as one
575    // projection payload so write statements reuse the same outward result
576    // family as row-producing SELECT and DELETE dispatch.
577    fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
578    where
579        E: PersistedRow<Canister = C> + EntityValue,
580    {
581        let columns = projection_labels_from_fields(E::MODEL.fields());
582        let rows = entities
583            .into_iter()
584            .map(Self::sql_write_dispatch_row)
585            .collect::<Result<Vec<_>, _>>()?;
586        let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
587
588        Ok(SqlDispatchResult::Projection {
589            columns,
590            rows,
591            row_count,
592        })
593    }
594
595    // Build the structural insert patch and resolved primary key expected by
596    // the shared structural mutation entrypoint.
597    fn sql_insert_patch_and_key<E>(
598        columns: &[String],
599        values: &[Value],
600    ) -> Result<(E::Key, UpdatePatch), QueryError>
601    where
602        E: PersistedRow<Canister = C> + EntityValue,
603    {
604        // Phase 1: resolve the required primary-key literal from the explicit
605        // INSERT column/value list, or synthesize one schema-generated value
606        // when the target field contract admits omission on insert.
607        let pk_name = E::MODEL.primary_key.name;
608        let generated_fields = E::MODEL
609            .fields()
610            .iter()
611            .filter(|field| !columns.iter().any(|column| column == field.name()))
612            .filter_map(|field| {
613                sql_write_generated_field_value(field).map(|value| (field.name(), value))
614            })
615            .collect::<Vec<_>>();
616        let key = if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
617            let pk_value = values.get(pk_index).ok_or_else(|| {
618                QueryError::invariant(
619                    "INSERT primary key column must align with one VALUES literal",
620                )
621            })?;
622            sql_write_key_from_literal::<E>(pk_value, pk_name)?
623        } else if let Some((_, pk_value)) = generated_fields
624            .iter()
625            .find(|(field_name, _)| *field_name == pk_name)
626        {
627            sql_write_key_from_literal::<E>(pk_value, pk_name)?
628        } else {
629            return Err(QueryError::unsupported_query(format!(
630                "SQL INSERT requires primary key column '{pk_name}' in this release"
631            )));
632        };
633
634        // Phase 2: lower the explicit column/value pairs onto the structural
635        // patch program consumed by the shared save path.
636        let mut patch = UpdatePatch::new();
637        for (field_name, generated_value) in &generated_fields {
638            patch = patch
639                .set_field(E::MODEL, field_name, generated_value.clone())
640                .map_err(QueryError::execute)?;
641        }
642        for (field, value) in columns.iter().zip(values.iter()) {
643            reject_explicit_sql_insert_to_generated_field::<E>(field)?;
644            reject_explicit_sql_write_to_managed_field::<E>(field, "INSERT")?;
645            let normalized = sql_write_value_for_field::<E>(field, value)?;
646            patch = patch
647                .set_field(E::MODEL, field, normalized)
648                .map_err(QueryError::execute)?;
649        }
650
651        Ok((key, patch))
652    }
653
654    // Build the structural update patch shared by every row selected by one
655    // reduced SQL UPDATE statement.
656    fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
657    where
658        E: PersistedRow<Canister = C> + EntityValue,
659    {
660        // Phase 1: lower the `SET` list onto the structural patch program
661        // while keeping primary-key mutation out of the reduced SQL write lane.
662        let pk_name = E::MODEL.primary_key.name;
663        let mut patch = UpdatePatch::new();
664        for assignment in &statement.assignments {
665            if assignment.field == pk_name {
666                return Err(QueryError::unsupported_query(format!(
667                    "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
668                )));
669            }
670            reject_explicit_sql_update_to_generated_field::<E>(assignment.field.as_str())?;
671            reject_explicit_sql_write_to_managed_field::<E>(assignment.field.as_str(), "UPDATE")?;
672            let normalized =
673                sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
674
675            patch = patch
676                .set_field(E::MODEL, assignment.field.as_str(), normalized)
677                .map_err(QueryError::execute)?;
678        }
679
680        Ok(patch)
681    }
682
683    // Resolve one deterministic typed selector query for reduced SQL UPDATE.
684    fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
685    where
686        E: PersistedRow<Canister = C> + EntityValue,
687    {
688        // Phase 1: keep the widened SQL UPDATE lane explicit about requiring
689        // one admitted reduced predicate instead of opening bare full-table
690        // updates implicitly.
691        let Some(predicate) = statement.predicate.clone() else {
692            return Err(QueryError::unsupported_query(
693                "SQL UPDATE requires WHERE predicate in this release",
694            ));
695        };
696        let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
697        let pk_name = E::MODEL.primary_key.name;
698        let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
699
700        // Phase 2: honor one explicit ordered update window when present, and
701        // otherwise keep the write target set deterministic on primary-key
702        // order exactly as the earlier predicate-only update lane did.
703        if statement.order_by.is_empty() {
704            selector = selector.order_by(pk_name);
705        } else {
706            let mut orders_primary_key = false;
707
708            for term in &statement.order_by {
709                if term.field == pk_name {
710                    orders_primary_key = true;
711                }
712                selector = match term.direction {
713                    SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
714                    SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
715                };
716            }
717
718            if !orders_primary_key {
719                selector = selector.order_by(pk_name);
720            }
721        }
722
723        // Phase 3: apply the bounded update window on top of the deterministic
724        // selector order before mutation replay begins.
725        if let Some(limit) = statement.limit {
726            selector = selector.limit(limit);
727        }
728        if let Some(offset) = statement.offset {
729            selector = selector.offset(offset);
730        }
731
732        Ok(selector)
733    }
734
735    // Validate and normalize the admitted `INSERT ... SELECT` source shape
736    // without widening the write lane into grouped, aggregate, or computed
737    // projection ownership.
738    fn sql_insert_select_source_statement<E>(
739        statement: &SqlInsertStatement,
740    ) -> Result<SqlSelectStatement, QueryError>
741    where
742        E: PersistedRow<Canister = C> + EntityValue,
743    {
744        let SqlInsertSource::Select(select) = statement.source.clone() else {
745            return Err(QueryError::invariant(
746                "INSERT SELECT source validation requires parsed SELECT source",
747            ));
748        };
749        let mut select = *select;
750        ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
751
752        if !select.group_by.is_empty() || !select.having.is_empty() {
753            return Err(QueryError::unsupported_query(
754                "SQL INSERT SELECT requires scalar SELECT source in this release",
755            ));
756        }
757
758        if let SqlProjection::Items(items) = &select.projection {
759            for item in items {
760                if matches!(item, SqlSelectItem::Aggregate(_)) {
761                    return Err(QueryError::unsupported_query(
762                        "SQL INSERT SELECT does not support aggregate source projection in this release",
763                    ));
764                }
765            }
766        }
767
768        let pk_name = E::MODEL.primary_key.name;
769        if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
770            select.order_by.push(SqlOrderTerm {
771                field: pk_name.to_string(),
772                direction: SqlOrderDirection::Asc,
773            });
774        }
775
776        Ok(select)
777    }
778
779    // Execute one admitted `INSERT ... SELECT` source query through the
780    // existing scalar SQL projection lane and return the projected value rows
781    // that will later feed the ordinary structural insert replay.
782    fn execute_sql_insert_select_source_rows<E>(
783        &self,
784        source: &SqlSelectStatement,
785    ) -> Result<Vec<Vec<Value>>, QueryError>
786    where
787        E: PersistedRow<Canister = C> + EntityValue,
788    {
789        // Phase 1: reuse the already-shipped scalar computed-projection lane
790        // when the source SELECT widens beyond plain fields but still fits the
791        // admitted session-owned text projection contract.
792        if let Some(plan) = computed_projection::computed_sql_projection_plan(
793            &SqlStatement::Select(source.clone()),
794        )? {
795            let result = self.execute_computed_sql_projection_dispatch_for_authority(
796                plan,
797                EntityAuthority::for_type::<E>(),
798            )?;
799
800            return match result {
801                SqlDispatchResult::Projection { rows, .. } => Ok(rows),
802                other => Err(QueryError::invariant(format!(
803                    "INSERT SELECT computed source must produce projection rows, found {other:?}",
804                ))),
805            };
806        }
807
808        // Phase 2: keep the ordinary field-only source path on the shared
809        // scalar SQL projection lane.
810        let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
811            .map_err(QueryError::from_sql_lowering_error)?;
812        let lowered =
813            lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
814                .map_err(QueryError::from_sql_lowering_error)?;
815        let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
816            return Err(QueryError::invariant(
817                "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
818            ));
819        };
820
821        let payload =
822            self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
823        let (_, rows, _) = payload.into_parts();
824
825        Ok(rows)
826    }
827
828    // Execute one narrow SQL INSERT statement through the existing structural
829    // mutation path and project the returned after-image as one SQL row.
830    fn execute_sql_insert_dispatch<E>(
831        &self,
832        statement: &SqlInsertStatement,
833    ) -> Result<SqlDispatchResult, QueryError>
834    where
835        E: PersistedRow<Canister = C> + EntityValue,
836    {
837        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
838        let columns = sql_insert_columns::<E>(statement);
839        validate_sql_insert_required_fields::<E>(columns.as_slice())?;
840        let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Insert, Timestamp::now());
841        let source_rows = match &statement.source {
842            SqlInsertSource::Values(values) => {
843                validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
844                values.clone()
845            }
846            SqlInsertSource::Select(_) => {
847                let source = Self::sql_insert_select_source_statement::<E>(statement)?;
848                let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
849                validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
850
851                rows
852            }
853        };
854        let mut entities = Vec::with_capacity(source_rows.len());
855
856        for values in &source_rows {
857            let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
858            let entity = self
859                .execute_save_entity::<E>(|save| {
860                    save.apply_internal_structural_mutation_with_write_context(
861                        MutationMode::Insert,
862                        key,
863                        patch,
864                        write_context,
865                    )
866                })
867                .map_err(QueryError::execute)?;
868            entities.push(entity);
869        }
870
871        Self::sql_write_dispatch_projection(entities)
872    }
873
874    // Execute one reduced SQL UPDATE statement by selecting deterministic
875    // target rows first and then replaying one shared structural patch onto
876    // each matched primary key.
877    fn execute_sql_update_dispatch<E>(
878        &self,
879        statement: &SqlUpdateStatement,
880    ) -> Result<SqlDispatchResult, QueryError>
881    where
882        E: PersistedRow<Canister = C> + EntityValue,
883    {
884        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
885        let selector = Self::sql_update_selector_query::<E>(statement)?;
886        let patch = Self::sql_update_patch::<E>(statement)?;
887        let write_context = SanitizeWriteContext::new(SanitizeWriteMode::Update, Timestamp::now());
888        let matched = self.execute_query(&selector)?;
889        let mut entities = Vec::with_capacity(matched.len());
890
891        // Phase 1: apply the already-normalized structural patch to every
892        // matched row in deterministic primary-key order.
893        for entity in matched.entities() {
894            let updated = self
895                .execute_save_entity::<E>(|save| {
896                    save.apply_internal_structural_mutation_with_write_context(
897                        MutationMode::Update,
898                        entity.id().key(),
899                        patch.clone(),
900                        write_context,
901                    )
902                })
903                .map_err(QueryError::execute)?;
904            entities.push(updated);
905        }
906
907        Self::sql_write_dispatch_projection(entities)
908    }
909
910    // Build the shared structural SQL projection execution inputs once so
911    // value-row and rendered-row dispatch surfaces only differ in final packaging.
912    fn prepare_structural_sql_projection_execution(
913        &self,
914        query: StructuralQuery,
915        authority: EntityAuthority,
916    ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
917        // Phase 1: build the structural access plan once and freeze its outward
918        // column contract for all projection materialization surfaces.
919        let (_, plan) =
920            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
921        let projection = plan.projection_spec(authority.model());
922        let columns = projection_labels_from_projection_spec(&projection);
923
924        Ok((columns, plan))
925    }
926
927    // Execute one structural SQL load query and return only row-oriented SQL
928    // projection values, keeping typed projection rows out of the shared SQL
929    // query-lane path.
930    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
931        &self,
932        query: StructuralQuery,
933        authority: EntityAuthority,
934    ) -> Result<SqlProjectionPayload, QueryError> {
935        // Phase 1: build the shared structural plan and outward column contract once.
936        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
937
938        // Phase 2: execute the shared structural load path with the already
939        // derived projection semantics.
940        let projected =
941            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
942                .map_err(QueryError::execute)?;
943        let (rows, row_count) = projected.into_parts();
944
945        Ok(SqlProjectionPayload::new(columns, rows, row_count))
946    }
947
948    // Execute one structural SQL load query and return render-ready text rows
949    // for the dispatch lane when the terminal short path can prove them
950    // directly.
951    fn execute_structural_sql_projection_text(
952        &self,
953        query: StructuralQuery,
954        authority: EntityAuthority,
955    ) -> Result<SqlDispatchResult, QueryError> {
956        // Phase 1: build the shared structural plan and outward column contract once.
957        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
958
959        // Phase 2: execute the shared structural load path with the already
960        // derived projection semantics while preferring rendered SQL rows.
961        let projected =
962            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
963                .map_err(QueryError::execute)?;
964        let (rows, row_count) = projected.into_parts();
965
966        Ok(SqlDispatchResult::ProjectionText {
967            columns,
968            rows,
969            row_count,
970        })
971    }
972
973    // Execute one typed SQL delete query while keeping the row payload on the
974    // typed delete executor boundary that still owns non-runtime-hook delete
975    // commit-window application.
976    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
977    where
978        E: PersistedRow<Canister = C> + EntityValue,
979    {
980        let plan = self
981            .compile_query_with_visible_indexes(query)?
982            .into_prepared_execution_plan();
983        let deleted = self
984            .with_metrics(|| {
985                self.delete_executor::<E>()
986                    .execute_structural_projection(plan)
987            })
988            .map_err(QueryError::execute)?;
989        let (rows, row_count) = deleted.into_parts();
990        let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
991
992        Ok(SqlProjectionPayload::new(
993            projection_labels_from_fields(E::MODEL.fields()),
994            rows,
995            row_count,
996        )
997        .into_dispatch_result())
998    }
999
1000    // Lower one parsed SQL query/explain route once for one resolved authority
1001    // and preserve grouped-column metadata for grouped SELECT dispatch.
1002    fn lowered_sql_query_dispatch_inputs_for_authority(
1003        parsed: &SqlParsedStatement,
1004        authority: EntityAuthority,
1005        unsupported_message: &'static str,
1006    ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
1007        let lowered = parsed.lower_query_lane_for_entity(
1008            authority.model().name(),
1009            authority.model().primary_key.name,
1010        )?;
1011        let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
1012            .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
1013            .transpose()?;
1014        let query = lowered
1015            .into_query()
1016            .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
1017
1018        Ok((query, projection_columns.flatten()))
1019    }
1020
1021    // Execute one parsed SQL query route through the shared aggregate,
1022    // computed-projection, and lowered query lane so typed and generated
1023    // dispatch only differ at the final SELECT/DELETE packaging boundary.
1024    fn dispatch_sql_query_route_for_authority(
1025        &self,
1026        parsed: &SqlParsedStatement,
1027        authority: EntityAuthority,
1028        unsupported_message: &'static str,
1029        dispatch_select: impl FnOnce(
1030            &Self,
1031            LoweredSelectShape,
1032            EntityAuthority,
1033            bool,
1034            Option<Vec<String>>,
1035        ) -> Result<SqlDispatchResult, QueryError>,
1036        dispatch_delete: impl FnOnce(
1037            &Self,
1038            LoweredBaseQueryShape,
1039            EntityAuthority,
1040        ) -> Result<SqlDispatchResult, QueryError>,
1041    ) -> Result<SqlDispatchResult, QueryError> {
1042        // Phase 1: keep aggregate and computed projection classification on the
1043        // shared parsed route so both dispatch surfaces honor the same lane split.
1044        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
1045            let command =
1046                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
1047
1048            return self.execute_sql_aggregate_dispatch_for_authority(
1049                command,
1050                authority,
1051                sql_aggregate_dispatch_label_override(&parsed.statement),
1052            );
1053        }
1054
1055        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
1056            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
1057        }
1058
1059        // Phase 2: lower the remaining query route once, then let the caller
1060        // decide only the final outward result packaging.
1061        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
1062            parsed,
1063            authority,
1064            unsupported_message,
1065        )?;
1066        let grouped_surface = query.has_grouping();
1067
1068        match query {
1069            LoweredSqlQuery::Select(select) => {
1070                dispatch_select(self, select, authority, grouped_surface, projection_columns)
1071            }
1072            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
1073        }
1074    }
1075
1076    // Execute one parsed SQL EXPLAIN route through the shared computed-
1077    // projection and lowered explain lanes so typed and generated dispatch do
1078    // not duplicate the same explain classification tree.
1079    fn dispatch_sql_explain_route_for_authority(
1080        &self,
1081        parsed: &SqlParsedStatement,
1082        authority: EntityAuthority,
1083    ) -> Result<SqlDispatchResult, QueryError> {
1084        // Phase 1: keep computed-projection explain ownership on the same
1085        // parsed route boundary as the shared query lane.
1086        if let Some((mode, plan)) =
1087            computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
1088        {
1089            return self
1090                .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
1091                .map(SqlDispatchResult::Explain);
1092        }
1093
1094        // Phase 2: lower once for execution/logical explain and preserve the
1095        // shared execution-first fallback policy across both callers.
1096        let lowered = parsed.lower_query_lane_for_entity(
1097            authority.model().name(),
1098            authority.model().primary_key.name,
1099        )?;
1100        if let Some(explain) =
1101            self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1102        {
1103            return Ok(SqlDispatchResult::Explain(explain));
1104        }
1105
1106        self.explain_lowered_sql_for_authority(&lowered, authority)
1107            .map(SqlDispatchResult::Explain)
1108    }
1109
1110    // Validate that one SQL-derived query intent matches the grouped/scalar
1111    // execution surface that is about to consume it.
1112    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1113        query: &Query<E>,
1114        surface: SqlGroupingSurface,
1115    ) -> Result<(), QueryError>
1116    where
1117        E: EntityKind,
1118    {
1119        match (surface, query.has_grouping()) {
1120            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1121            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1122                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1123            ),
1124        }
1125    }
1126
1127    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
1128    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1129    where
1130        E: PersistedRow<Canister = C> + EntityValue,
1131    {
1132        let parsed = self.parse_sql_statement(sql)?;
1133
1134        self.execute_sql_dispatch_parsed::<E>(&parsed)
1135    }
1136
1137    /// Execute one parsed reduced SQL statement into one unified SQL payload.
1138    pub fn execute_sql_dispatch_parsed<E>(
1139        &self,
1140        parsed: &SqlParsedStatement,
1141    ) -> Result<SqlDispatchResult, QueryError>
1142    where
1143        E: PersistedRow<Canister = C> + EntityValue,
1144    {
1145        match parsed.route() {
1146            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1147                parsed,
1148                EntityAuthority::for_type::<E>(),
1149                "execute_sql_dispatch accepts SELECT or DELETE only",
1150                |session, select, authority, grouped_surface, projection_columns| {
1151                    if grouped_surface {
1152                        let columns = projection_columns.ok_or_else(|| {
1153                            QueryError::unsupported_query(
1154                                "grouped SQL dispatch requires explicit grouped projection items",
1155                            )
1156                        })?;
1157
1158                        return session.execute_lowered_sql_grouped_dispatch_select_core(
1159                            select, authority, columns,
1160                        );
1161                    }
1162
1163                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1164                    if let Some(columns) = projection_columns {
1165                        let (_, rows, row_count) = payload.into_parts();
1166
1167                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1168                            .into_dispatch_result());
1169                    }
1170
1171                    Ok(payload.into_dispatch_result())
1172                },
1173                |session, delete, _authority| {
1174                    let typed_query = bind_lowered_sql_query::<E>(
1175                        LoweredSqlQuery::Delete(delete),
1176                        MissingRowPolicy::Ignore,
1177                    )
1178                    .map_err(QueryError::from_sql_lowering_error)?;
1179
1180                    session.execute_typed_sql_delete(&typed_query)
1181                },
1182            ),
1183            SqlStatementRoute::Insert { .. } => {
1184                let SqlStatement::Insert(statement) = &parsed.statement else {
1185                    return Err(QueryError::invariant(
1186                        "INSERT SQL route must carry parsed INSERT statement",
1187                    ));
1188                };
1189
1190                self.execute_sql_insert_dispatch::<E>(statement)
1191            }
1192            SqlStatementRoute::Update { .. } => {
1193                let SqlStatement::Update(statement) = &parsed.statement else {
1194                    return Err(QueryError::invariant(
1195                        "UPDATE SQL route must carry parsed UPDATE statement",
1196                    ));
1197                };
1198
1199                self.execute_sql_update_dispatch::<E>(statement)
1200            }
1201            SqlStatementRoute::Explain { .. } => self
1202                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1203            SqlStatementRoute::Describe { .. } => {
1204                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1205            }
1206            SqlStatementRoute::ShowIndexes { .. } => {
1207                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1208            }
1209            SqlStatementRoute::ShowColumns { .. } => {
1210                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1211            }
1212            SqlStatementRoute::ShowEntities => {
1213                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1214            }
1215        }
1216    }
1217
1218    /// Execute one parsed reduced SQL statement through the generated canister
1219    /// query/explain surface for one already-resolved dynamic authority.
1220    ///
1221    /// This keeps the canister SQL facade on the same reduced SQL ownership
1222    /// boundary as typed dispatch without forcing the outer facade to reopen
1223    /// typed-generic routing just to preserve parity for computed projections.
1224    #[doc(hidden)]
1225    pub fn execute_generated_query_surface_dispatch_for_authority(
1226        &self,
1227        parsed: &SqlParsedStatement,
1228        authority: EntityAuthority,
1229    ) -> Result<SqlDispatchResult, QueryError> {
1230        match parsed.route() {
1231            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1232                parsed,
1233                authority,
1234                "generated SQL query surface requires query or EXPLAIN statement lanes",
1235                |session, select, authority, grouped_surface, projection_columns| {
1236                    if grouped_surface {
1237                        let columns = projection_columns.ok_or_else(|| {
1238                            QueryError::unsupported_query(
1239                                "grouped SQL dispatch requires explicit grouped projection items",
1240                            )
1241                        })?;
1242
1243                        return session
1244                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1245                    }
1246
1247                    let result =
1248                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1249                    if let Some(columns) = projection_columns {
1250                        let SqlDispatchResult::ProjectionText {
1251                            rows, row_count, ..
1252                        } = result
1253                        else {
1254                            return Err(QueryError::invariant(
1255                                "generated scalar SQL dispatch text path must emit projection text rows",
1256                            ));
1257                        };
1258
1259                        return Ok(SqlDispatchResult::ProjectionText {
1260                            columns,
1261                            rows,
1262                            row_count,
1263                        });
1264                    }
1265
1266                    Ok(result)
1267                },
1268                |session, delete, authority| {
1269                    session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1270                },
1271            ),
1272            SqlStatementRoute::Explain { .. } => {
1273                self.dispatch_sql_explain_route_for_authority(parsed, authority)
1274            }
1275            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1276            | SqlStatementRoute::Describe { .. }
1277            | SqlStatementRoute::ShowIndexes { .. }
1278            | SqlStatementRoute::ShowColumns { .. }
1279            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1280                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1281            )),
1282        }
1283    }
1284
1285    /// Execute one raw SQL string through the generated canister query surface.
1286    ///
1287    /// This hidden helper keeps parse, route, authority, and metadata/query
1288    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
1289    /// stays close to a pure descriptor table plus public ABI wrapper.
1290    #[doc(hidden)]
1291    #[must_use]
1292    pub fn execute_generated_query_surface_sql(
1293        &self,
1294        sql: &str,
1295        authorities: &[EntityAuthority],
1296    ) -> GeneratedSqlDispatchAttempt {
1297        // Phase 1: normalize and parse once so every generated route family
1298        // shares the same SQL ownership boundary.
1299        let sql_trimmed = match trim_generated_query_sql_input(sql) {
1300            Ok(sql_trimmed) => sql_trimmed,
1301            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1302        };
1303        let parsed = match self.parse_sql_statement(sql_trimmed) {
1304            Ok(parsed) => parsed,
1305            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1306        };
1307
1308        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
1309        // generated routes against the emitted authority table exactly once.
1310        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1311            return GeneratedSqlDispatchAttempt::new(
1312                "",
1313                None,
1314                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1315                    authorities,
1316                ))),
1317            );
1318        }
1319        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1320            Ok(authority) => authority,
1321            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1322        };
1323
1324        // Phase 3: dispatch the resolved route through the existing query,
1325        // explain, and metadata helpers without rebuilding route ownership in
1326        // the generated build output.
1327        let entity_name = authority.model().name();
1328        let explain_order_field = parsed
1329            .route()
1330            .is_explain()
1331            .then_some(authority.model().primary_key.name);
1332        let result = match parsed.route() {
1333            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1334                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1335            }
1336            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1337                Err(QueryError::unsupported_query(
1338                    "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1339                ))
1340            }
1341            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1342                self.describe_entity_model(authority.model()),
1343            )),
1344            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1345                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1346            )),
1347            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1348                self.show_columns_for_model(authority.model()),
1349            )),
1350            SqlStatementRoute::ShowEntities => unreachable!(
1351                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1352            ),
1353        };
1354
1355        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1356    }
1357}