Skip to main content

icydb_core/db/session/sql/dispatch/
mod.rs

1//! Module: db::session::sql::dispatch
2//! Responsibility: session-owned SQL dispatch entrypoints that bind lowered SQL
3//! commands onto structural planning, execution, and outward result shaping.
4//! Does not own: SQL parsing or executor runtime internals.
5//! Boundary: centralizes authority-aware SQL dispatch classification and result packaging.
6
7mod computed;
8mod lowered;
9
10use crate::{
11    db::{
12        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13        data::UpdatePatch,
14        executor::{EntityAuthority, MutationMode},
15        identifiers_tail_match,
16        predicate::{CompareOp, Predicate},
17        query::{intent::StructuralQuery, plan::AccessPlannedQuery},
18        session::sql::{
19            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21            computed_projection,
22            projection::{
23                SqlProjectionPayload, execute_sql_projection_rows_for_canister,
24                execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
25                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
26            },
27        },
28        sql::lowering::{
29            LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
30            bind_lowered_sql_query,
31        },
32        sql::parser::{
33            SqlAggregateCall, SqlAggregateKind, SqlInsertStatement, SqlProjection, SqlSelectItem,
34            SqlStatement, SqlTextFunction, SqlUpdateStatement,
35        },
36    },
37    model::{entity::resolve_field_slot, field::FieldKind},
38    traits::{CanisterKind, EntityKind, EntityValue},
39    types::Timestamp,
40    value::Value,
41};
42
43#[cfg(feature = "perf-attribution")]
44pub use lowered::LoweredSqlDispatchExecutorAttribution;
45
46///
47/// GeneratedSqlDispatchAttempt
48///
49/// Hidden generated-query dispatch envelope used by the facade helper to keep
50/// generated route ownership in core while preserving the public EXPLAIN error
51/// rewrite contract at the outer boundary.
52///
53
54#[doc(hidden)]
55pub struct GeneratedSqlDispatchAttempt {
56    entity_name: &'static str,
57    explain_order_field: Option<&'static str>,
58    result: Result<SqlDispatchResult, QueryError>,
59}
60
61impl GeneratedSqlDispatchAttempt {
62    // Build one generated-query dispatch attempt with optional explain-hint context.
63    const fn new(
64        entity_name: &'static str,
65        explain_order_field: Option<&'static str>,
66        result: Result<SqlDispatchResult, QueryError>,
67    ) -> Self {
68        Self {
69            entity_name,
70            explain_order_field,
71            result,
72        }
73    }
74
75    /// Borrow the resolved entity name for this generated-query attempt.
76    #[must_use]
77    pub const fn entity_name(&self) -> &'static str {
78        self.entity_name
79    }
80
81    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
82    #[must_use]
83    pub const fn explain_order_field(&self) -> Option<&'static str> {
84        self.explain_order_field
85    }
86
87    /// Consume and return the generated-query dispatch result.
88    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
89        self.result
90    }
91}
92
93#[derive(Clone, Copy, Debug, Eq, PartialEq)]
94pub(in crate::db::session::sql) enum SqlGroupingSurface {
95    Scalar,
96    Grouped,
97}
98
99const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
100    match surface {
101        SqlGroupingSurface::Scalar => {
102            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
103        }
104        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
105    }
106}
107
108// Enforce the generated canister query contract that empty SQL is unsupported
109// before any parser/lowering work occurs.
110fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
111    let sql_trimmed = sql.trim();
112    if sql_trimmed.is_empty() {
113        return Err(QueryError::unsupported_query(
114            "query endpoint requires a non-empty SQL string",
115        ));
116    }
117
118    Ok(sql_trimmed)
119}
120
121// Render the generated-surface entity list from the descriptor table instead
122// of assuming every session-visible entity belongs on the public query export.
123fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
124    let mut entities = Vec::with_capacity(authorities.len());
125
126    for authority in authorities {
127        entities.push(authority.model().name().to_string());
128    }
129
130    entities
131}
132
133// Project parsed SELECT items into one stable outward column contract while
134// allowing parser-owned aliases to override only the final session label.
135fn sql_projection_labels_from_select_statement(
136    statement: &SqlStatement,
137) -> Result<Option<Vec<String>>, QueryError> {
138    let SqlStatement::Select(select) = statement else {
139        return Err(QueryError::invariant(
140            "SQL projection labels require SELECT statement shape",
141        ));
142    };
143    let SqlProjection::Items(items) = &select.projection else {
144        return Ok(None);
145    };
146
147    Ok(Some(
148        items
149            .iter()
150            .enumerate()
151            .map(|(index, item)| {
152                select
153                    .projection_alias(index)
154                    .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
155            })
156            .collect(),
157    ))
158}
159
160// Render one grouped SELECT item into the public grouped-column label used by
161// unified dispatch results.
162fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
163    match item {
164        SqlSelectItem::Field(field) => field.clone(),
165        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
166        SqlSelectItem::TextFunction(call) => {
167            format!(
168                "{}({})",
169                grouped_sql_text_function_name(call.function),
170                call.field
171            )
172        }
173    }
174}
175
176// Keep the dedicated SQL aggregate lane on parser-owned outward labels
177// without reopening alias semantics in lowering or runtime strategy state.
178fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
179    let SqlStatement::Select(select) = statement else {
180        return None;
181    };
182
183    select.projection_alias(0).map(str::to_string)
184}
185
186// Render one aggregate call into one canonical SQL-style label.
187fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
188    let kind = match aggregate.kind {
189        SqlAggregateKind::Count => "COUNT",
190        SqlAggregateKind::Sum => "SUM",
191        SqlAggregateKind::Avg => "AVG",
192        SqlAggregateKind::Min => "MIN",
193        SqlAggregateKind::Max => "MAX",
194    };
195
196    match aggregate.field.as_deref() {
197        Some(field) => format!("{kind}({field})"),
198        None => format!("{kind}(*)"),
199    }
200}
201
202// Render one reduced SQL text-function identifier into one stable uppercase
203// SQL label for outward column metadata.
204const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
205    match function {
206        SqlTextFunction::Trim => "TRIM",
207        SqlTextFunction::Ltrim => "LTRIM",
208        SqlTextFunction::Rtrim => "RTRIM",
209        SqlTextFunction::Lower => "LOWER",
210        SqlTextFunction::Upper => "UPPER",
211        SqlTextFunction::Length => "LENGTH",
212        SqlTextFunction::Left => "LEFT",
213        SqlTextFunction::Right => "RIGHT",
214        SqlTextFunction::StartsWith => "STARTS_WITH",
215        SqlTextFunction::EndsWith => "ENDS_WITH",
216        SqlTextFunction::Contains => "CONTAINS",
217        SqlTextFunction::Position => "POSITION",
218        SqlTextFunction::Replace => "REPLACE",
219        SqlTextFunction::Substring => "SUBSTRING",
220    }
221}
222
223// Resolve one generated query route onto the descriptor-owned authority table.
224fn authority_for_generated_sql_route(
225    route: &SqlStatementRoute,
226    authorities: &[EntityAuthority],
227) -> Result<EntityAuthority, QueryError> {
228    let sql_entity = route.entity();
229
230    for authority in authorities {
231        if identifiers_tail_match(sql_entity, authority.model().name()) {
232            return Ok(*authority);
233        }
234    }
235
236    Err(unsupported_generated_sql_entity_error(
237        sql_entity,
238        authorities,
239    ))
240}
241
242// Keep the generated query-surface unsupported-entity contract stable while
243// moving authority lookup out of the build-generated shim.
244fn unsupported_generated_sql_entity_error(
245    entity_name: &str,
246    authorities: &[EntityAuthority],
247) -> QueryError {
248    let mut supported = String::new();
249
250    for (index, authority) in authorities.iter().enumerate() {
251        if index != 0 {
252            supported.push_str(", ");
253        }
254
255        supported.push_str(authority.model().name());
256    }
257
258    QueryError::unsupported_query(format!(
259        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
260    ))
261}
262
263// Keep typed SQL write routes on the same entity-match contract used by
264// lowered query dispatch, without widening write statements into lowering.
265fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
266where
267    E: EntityKind,
268{
269    if identifiers_tail_match(sql_entity, E::MODEL.name()) {
270        return Ok(());
271    }
272
273    Err(QueryError::from_sql_lowering_error(
274        SqlLoweringError::EntityMismatch {
275            sql_entity: sql_entity.to_string(),
276            expected_entity: E::MODEL.name(),
277        },
278    ))
279}
280
281// Normalize one reduced-SQL primary-key literal onto the concrete entity key
282// type accepted by the structural mutation entrypoint.
283fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
284where
285    E: EntityKind,
286{
287    if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
288        return Ok(key);
289    }
290
291    let widened = match value {
292        Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
293        Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
294        _ => {
295            return Err(QueryError::unsupported_query(format!(
296                "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
297            )));
298        }
299    };
300
301    <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
302        QueryError::unsupported_query(format!(
303            "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
304        ))
305    })
306}
307
308// Normalize one reduced-SQL write literal onto the target entity field kind
309// when the parser's numeric literal domain is narrower than the runtime field.
310fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
311where
312    E: EntityKind,
313{
314    let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
315        QueryError::invariant("SQL write field must resolve against the target entity model")
316    })?;
317    let field_kind = E::MODEL.fields()[field_slot].kind();
318
319    let normalized = match (field_kind, value) {
320        (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
321        (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
322            Value::Int(v.cast_signed())
323        }
324        _ => value.clone(),
325    };
326
327    Ok(normalized)
328}
329
330// Mirror the derive-owned system timestamp contract on the structural SQL
331// write lane so schema-derived entities stay writable without exposing those
332// slots as required user-authored SQL columns.
333fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
334where
335    E: EntityKind,
336{
337    if resolve_field_slot(E::MODEL, "created_at").is_some()
338        && resolve_field_slot(E::MODEL, "updated_at").is_some()
339    {
340        return Some(("created_at", "updated_at"));
341    }
342
343    None
344}
345
346// Resolve the effective INSERT column list for one reduced SQL write:
347// explicit column lists pass through, while omitted-column-list INSERT uses
348// canonical model field order only when no hidden timestamp synthesis would be
349// required.
350fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Result<Vec<String>, QueryError>
351where
352    E: EntityKind,
353{
354    if !statement.columns.is_empty() {
355        return Ok(statement.columns.clone());
356    }
357    if sql_write_system_timestamp_fields::<E>().is_some() {
358        return Err(QueryError::unsupported_query(
359            "SQL INSERT without explicit column list is not supported for entities with system timestamp fields in this release",
360        ));
361    }
362
363    Ok(E::MODEL
364        .fields()
365        .iter()
366        .map(|field| field.name().to_string())
367        .collect())
368}
369
370// Validate one INSERT tuple list against the resolved effective column list so
371// every VALUES tuple stays full-width and deterministic.
372fn validate_sql_insert_tuple_lengths(
373    columns: &[String],
374    values: &[Vec<Value>],
375) -> Result<(), QueryError> {
376    for tuple in values {
377        if tuple.len() != columns.len() {
378            return Err(QueryError::from_sql_parse_error(
379                crate::db::sql::parser::SqlParseError::invalid_syntax(
380                    "INSERT column list and VALUES tuple length must match",
381                ),
382            ));
383        }
384    }
385
386    Ok(())
387}
388
389impl<C: CanisterKind> DbSession<C> {
390    // Project one typed SQL write after-image into one outward SQL row using
391    // the persisted model field order.
392    fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
393    where
394        E: PersistedRow<Canister = C> + EntityValue,
395    {
396        let mut row = Vec::with_capacity(E::MODEL.fields().len());
397
398        for index in 0..E::MODEL.fields().len() {
399            let value = entity.get_value_by_index(index).ok_or_else(|| {
400                QueryError::invariant(
401                    "SQL write dispatch projection row must include every declared field",
402                )
403            })?;
404            row.push(value);
405        }
406
407        Ok(row)
408    }
409
410    // Render one or more typed entities returned by SQL write dispatch as one
411    // projection payload so write statements reuse the same outward result
412    // family as row-producing SELECT and DELETE dispatch.
413    fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
414    where
415        E: PersistedRow<Canister = C> + EntityValue,
416    {
417        let columns = projection_labels_from_fields(E::MODEL.fields());
418        let rows = entities
419            .into_iter()
420            .map(Self::sql_write_dispatch_row)
421            .collect::<Result<Vec<_>, _>>()?;
422        let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
423
424        Ok(SqlDispatchResult::Projection {
425            columns,
426            rows,
427            row_count,
428        })
429    }
430
431    // Build the structural insert patch and resolved primary key expected by
432    // the shared structural mutation entrypoint.
433    fn sql_insert_patch_and_key<E>(
434        columns: &[String],
435        values: &[Value],
436    ) -> Result<(E::Key, UpdatePatch), QueryError>
437    where
438        E: PersistedRow<Canister = C> + EntityValue,
439    {
440        // Phase 1: resolve the required primary-key literal from the explicit
441        // INSERT column/value list.
442        let pk_name = E::MODEL.primary_key.name;
443        let Some(pk_index) = columns.iter().position(|field| field == pk_name) else {
444            return Err(QueryError::unsupported_query(format!(
445                "SQL INSERT requires primary key column '{pk_name}' in this release"
446            )));
447        };
448        let pk_value = values.get(pk_index).ok_or_else(|| {
449            QueryError::invariant("INSERT primary key column must align with one VALUES literal")
450        })?;
451        let key = sql_write_key_from_literal::<E>(pk_value, pk_name)?;
452
453        // Phase 2: lower the explicit column/value pairs onto the structural
454        // patch program consumed by the shared save path.
455        let mut patch = UpdatePatch::new();
456        for (field, value) in columns.iter().zip(values.iter()) {
457            let normalized = sql_write_value_for_field::<E>(field, value)?;
458            patch = patch
459                .set_field(E::MODEL, field, normalized)
460                .map_err(QueryError::execute)?;
461        }
462
463        // Phase 3: synthesize the derive-owned system timestamps when the
464        // target entity carries them, matching the typed write surface.
465        if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
466            let now = Value::Timestamp(Timestamp::now());
467            patch = patch
468                .set_field(E::MODEL, created_at, now.clone())
469                .map_err(QueryError::execute)?;
470            patch = patch
471                .set_field(E::MODEL, updated_at, now)
472                .map_err(QueryError::execute)?;
473        }
474
475        Ok((key, patch))
476    }
477
478    // Build the structural update patch and resolved primary key expected by
479    // the shared structural mutation entrypoint.
480    fn sql_update_patch_and_key<E>(
481        statement: &SqlUpdateStatement,
482    ) -> Result<(E::Key, UpdatePatch), QueryError>
483    where
484        E: PersistedRow<Canister = C> + EntityValue,
485    {
486        // Phase 1: require the narrow `WHERE <pk> = literal` update selector
487        // so this first SQL update slice stays on the existing single-row
488        // structural mutation contract.
489        let pk_name = E::MODEL.primary_key.name;
490        let Some(Predicate::Compare(compare)) = &statement.predicate else {
491            return Err(QueryError::unsupported_query(format!(
492                "SQL UPDATE requires WHERE {pk_name} = literal in this release"
493            )));
494        };
495        if compare.field() != pk_name || compare.op() != CompareOp::Eq {
496            return Err(QueryError::unsupported_query(format!(
497                "SQL UPDATE requires WHERE {pk_name} = literal in this release"
498            )));
499        }
500        let key = sql_write_key_from_literal::<E>(compare.value(), pk_name)?;
501
502        // Phase 2: lower the `SET` list onto the structural patch program
503        // while keeping primary-key mutation out of this first SQL update slice.
504        let mut patch = UpdatePatch::new();
505        for assignment in &statement.assignments {
506            if assignment.field == pk_name {
507                return Err(QueryError::unsupported_query(format!(
508                    "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
509                )));
510            }
511            let normalized =
512                sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
513
514            patch = patch
515                .set_field(E::MODEL, assignment.field.as_str(), normalized)
516                .map_err(QueryError::execute)?;
517        }
518
519        // Phase 3: keep structural SQL UPDATE aligned with the derive-owned
520        // auto-updated timestamp contract when the entity carries that field.
521        if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
522            patch = patch
523                .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
524                .map_err(QueryError::execute)?;
525        }
526
527        Ok((key, patch))
528    }
529
530    // Execute one narrow SQL INSERT statement through the existing structural
531    // mutation path and project the returned after-image as one SQL row.
532    fn execute_sql_insert_dispatch<E>(
533        &self,
534        statement: &SqlInsertStatement,
535    ) -> Result<SqlDispatchResult, QueryError>
536    where
537        E: PersistedRow<Canister = C> + EntityValue,
538    {
539        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
540        let columns = sql_insert_columns::<E>(statement)?;
541        validate_sql_insert_tuple_lengths(columns.as_slice(), statement.values.as_slice())?;
542        let mut entities = Vec::with_capacity(statement.values.len());
543
544        for values in &statement.values {
545            let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
546            let entity = self
547                .mutate_structural::<E>(key, patch, MutationMode::Insert)
548                .map_err(QueryError::execute)?;
549            entities.push(entity);
550        }
551
552        Self::sql_write_dispatch_projection(entities)
553    }
554
555    // Execute one narrow SQL UPDATE statement through the existing structural
556    // mutation path and project the returned after-image as one SQL row.
557    fn execute_sql_update_dispatch<E>(
558        &self,
559        statement: &SqlUpdateStatement,
560    ) -> Result<SqlDispatchResult, QueryError>
561    where
562        E: PersistedRow<Canister = C> + EntityValue,
563    {
564        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
565        let (key, patch) = Self::sql_update_patch_and_key::<E>(statement)?;
566        let entity = self
567            .mutate_structural::<E>(key, patch, MutationMode::Update)
568            .map_err(QueryError::execute)?;
569
570        Self::sql_write_dispatch_projection(vec![entity])
571    }
572
573    // Build the shared structural SQL projection execution inputs once so
574    // value-row and rendered-row dispatch surfaces only differ in final packaging.
575    fn prepare_structural_sql_projection_execution(
576        &self,
577        query: StructuralQuery,
578        authority: EntityAuthority,
579    ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
580        // Phase 1: build the structural access plan once and freeze its outward
581        // column contract for all projection materialization surfaces.
582        let (_, plan) =
583            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
584        let projection = plan.projection_spec(authority.model());
585        let columns = projection_labels_from_projection_spec(&projection);
586
587        Ok((columns, plan))
588    }
589
590    // Execute one structural SQL load query and return only row-oriented SQL
591    // projection values, keeping typed projection rows out of the shared SQL
592    // query-lane path.
593    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
594        &self,
595        query: StructuralQuery,
596        authority: EntityAuthority,
597    ) -> Result<SqlProjectionPayload, QueryError> {
598        // Phase 1: build the shared structural plan and outward column contract once.
599        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
600
601        // Phase 2: execute the shared structural load path with the already
602        // derived projection semantics.
603        let projected =
604            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
605                .map_err(QueryError::execute)?;
606        let (rows, row_count) = projected.into_parts();
607
608        Ok(SqlProjectionPayload::new(columns, rows, row_count))
609    }
610
611    // Execute one structural SQL load query and return render-ready text rows
612    // for the dispatch lane when the terminal short path can prove them
613    // directly.
614    fn execute_structural_sql_projection_text(
615        &self,
616        query: StructuralQuery,
617        authority: EntityAuthority,
618    ) -> Result<SqlDispatchResult, QueryError> {
619        // Phase 1: build the shared structural plan and outward column contract once.
620        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
621
622        // Phase 2: execute the shared structural load path with the already
623        // derived projection semantics while preferring rendered SQL rows.
624        let projected =
625            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
626                .map_err(QueryError::execute)?;
627        let (rows, row_count) = projected.into_parts();
628
629        Ok(SqlDispatchResult::ProjectionText {
630            columns,
631            rows,
632            row_count,
633        })
634    }
635
636    // Execute one typed SQL delete query while keeping the row payload on the
637    // typed delete executor boundary that still owns non-runtime-hook delete
638    // commit-window application.
639    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
640    where
641        E: PersistedRow<Canister = C> + EntityValue,
642    {
643        let plan = self
644            .compile_query_with_visible_indexes(query)?
645            .into_prepared_execution_plan();
646        let deleted = self
647            .with_metrics(|| {
648                self.delete_executor::<E>()
649                    .execute_structural_projection(plan)
650            })
651            .map_err(QueryError::execute)?;
652        let (rows, row_count) = deleted.into_parts();
653        let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
654
655        Ok(SqlProjectionPayload::new(
656            projection_labels_from_fields(E::MODEL.fields()),
657            rows,
658            row_count,
659        )
660        .into_dispatch_result())
661    }
662
663    // Lower one parsed SQL query/explain route once for one resolved authority
664    // and preserve grouped-column metadata for grouped SELECT dispatch.
665    fn lowered_sql_query_dispatch_inputs_for_authority(
666        parsed: &SqlParsedStatement,
667        authority: EntityAuthority,
668        unsupported_message: &'static str,
669    ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
670        let lowered = parsed.lower_query_lane_for_entity(
671            authority.model().name(),
672            authority.model().primary_key.name,
673        )?;
674        let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
675            .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
676            .transpose()?;
677        let query = lowered
678            .into_query()
679            .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
680
681        Ok((query, projection_columns.flatten()))
682    }
683
684    // Execute one parsed SQL query route through the shared aggregate,
685    // computed-projection, and lowered query lane so typed and generated
686    // dispatch only differ at the final SELECT/DELETE packaging boundary.
687    fn dispatch_sql_query_route_for_authority(
688        &self,
689        parsed: &SqlParsedStatement,
690        authority: EntityAuthority,
691        unsupported_message: &'static str,
692        dispatch_select: impl FnOnce(
693            &Self,
694            LoweredSelectShape,
695            EntityAuthority,
696            bool,
697            Option<Vec<String>>,
698        ) -> Result<SqlDispatchResult, QueryError>,
699        dispatch_delete: impl FnOnce(
700            &Self,
701            LoweredBaseQueryShape,
702            EntityAuthority,
703        ) -> Result<SqlDispatchResult, QueryError>,
704    ) -> Result<SqlDispatchResult, QueryError> {
705        // Phase 1: keep aggregate and computed projection classification on the
706        // shared parsed route so both dispatch surfaces honor the same lane split.
707        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
708            let command =
709                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
710
711            return self.execute_sql_aggregate_dispatch_for_authority(
712                command,
713                authority,
714                sql_aggregate_dispatch_label_override(&parsed.statement),
715            );
716        }
717
718        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
719            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
720        }
721
722        // Phase 2: lower the remaining query route once, then let the caller
723        // decide only the final outward result packaging.
724        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
725            parsed,
726            authority,
727            unsupported_message,
728        )?;
729        let grouped_surface = query.has_grouping();
730
731        match query {
732            LoweredSqlQuery::Select(select) => {
733                dispatch_select(self, select, authority, grouped_surface, projection_columns)
734            }
735            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
736        }
737    }
738
739    // Execute one parsed SQL EXPLAIN route through the shared computed-
740    // projection and lowered explain lanes so typed and generated dispatch do
741    // not duplicate the same explain classification tree.
742    fn dispatch_sql_explain_route_for_authority(
743        &self,
744        parsed: &SqlParsedStatement,
745        authority: EntityAuthority,
746    ) -> Result<SqlDispatchResult, QueryError> {
747        // Phase 1: keep computed-projection explain ownership on the same
748        // parsed route boundary as the shared query lane.
749        if let Some((mode, plan)) =
750            computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
751        {
752            return self
753                .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
754                .map(SqlDispatchResult::Explain);
755        }
756
757        // Phase 2: lower once for execution/logical explain and preserve the
758        // shared execution-first fallback policy across both callers.
759        let lowered = parsed.lower_query_lane_for_entity(
760            authority.model().name(),
761            authority.model().primary_key.name,
762        )?;
763        if let Some(explain) =
764            self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
765        {
766            return Ok(SqlDispatchResult::Explain(explain));
767        }
768
769        self.explain_lowered_sql_for_authority(&lowered, authority)
770            .map(SqlDispatchResult::Explain)
771    }
772
773    // Validate that one SQL-derived query intent matches the grouped/scalar
774    // execution surface that is about to consume it.
775    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
776        query: &Query<E>,
777        surface: SqlGroupingSurface,
778    ) -> Result<(), QueryError>
779    where
780        E: EntityKind,
781    {
782        match (surface, query.has_grouping()) {
783            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
784            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
785                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
786            ),
787        }
788    }
789
790    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
791    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
792    where
793        E: PersistedRow<Canister = C> + EntityValue,
794    {
795        let parsed = self.parse_sql_statement(sql)?;
796
797        self.execute_sql_dispatch_parsed::<E>(&parsed)
798    }
799
800    /// Execute one parsed reduced SQL statement into one unified SQL payload.
801    pub fn execute_sql_dispatch_parsed<E>(
802        &self,
803        parsed: &SqlParsedStatement,
804    ) -> Result<SqlDispatchResult, QueryError>
805    where
806        E: PersistedRow<Canister = C> + EntityValue,
807    {
808        match parsed.route() {
809            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
810                parsed,
811                EntityAuthority::for_type::<E>(),
812                "execute_sql_dispatch accepts SELECT or DELETE only",
813                |session, select, authority, grouped_surface, projection_columns| {
814                    if grouped_surface {
815                        let columns = projection_columns.ok_or_else(|| {
816                            QueryError::unsupported_query(
817                                "grouped SQL dispatch requires explicit grouped projection items",
818                            )
819                        })?;
820
821                        return session.execute_lowered_sql_grouped_dispatch_select_core(
822                            select, authority, columns,
823                        );
824                    }
825
826                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
827                    if let Some(columns) = projection_columns {
828                        let (_, rows, row_count) = payload.into_parts();
829
830                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
831                            .into_dispatch_result());
832                    }
833
834                    Ok(payload.into_dispatch_result())
835                },
836                |session, delete, _authority| {
837                    let typed_query = bind_lowered_sql_query::<E>(
838                        LoweredSqlQuery::Delete(delete),
839                        MissingRowPolicy::Ignore,
840                    )
841                    .map_err(QueryError::from_sql_lowering_error)?;
842
843                    session.execute_typed_sql_delete(&typed_query)
844                },
845            ),
846            SqlStatementRoute::Insert { .. } => {
847                let SqlStatement::Insert(statement) = &parsed.statement else {
848                    return Err(QueryError::invariant(
849                        "INSERT SQL route must carry parsed INSERT statement",
850                    ));
851                };
852
853                self.execute_sql_insert_dispatch::<E>(statement)
854            }
855            SqlStatementRoute::Update { .. } => {
856                let SqlStatement::Update(statement) = &parsed.statement else {
857                    return Err(QueryError::invariant(
858                        "UPDATE SQL route must carry parsed UPDATE statement",
859                    ));
860                };
861
862                self.execute_sql_update_dispatch::<E>(statement)
863            }
864            SqlStatementRoute::Explain { .. } => self
865                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
866            SqlStatementRoute::Describe { .. } => {
867                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
868            }
869            SqlStatementRoute::ShowIndexes { .. } => {
870                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
871            }
872            SqlStatementRoute::ShowColumns { .. } => {
873                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
874            }
875            SqlStatementRoute::ShowEntities => {
876                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
877            }
878        }
879    }
880
881    /// Execute one parsed reduced SQL statement through the generated canister
882    /// query/explain surface for one already-resolved dynamic authority.
883    ///
884    /// This keeps the canister SQL facade on the same reduced SQL ownership
885    /// boundary as typed dispatch without forcing the outer facade to reopen
886    /// typed-generic routing just to preserve parity for computed projections.
887    #[doc(hidden)]
888    pub fn execute_generated_query_surface_dispatch_for_authority(
889        &self,
890        parsed: &SqlParsedStatement,
891        authority: EntityAuthority,
892    ) -> Result<SqlDispatchResult, QueryError> {
893        match parsed.route() {
894            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
895                parsed,
896                authority,
897                "generated SQL query surface requires query or EXPLAIN statement lanes",
898                |session, select, authority, grouped_surface, projection_columns| {
899                    if grouped_surface {
900                        let columns = projection_columns.ok_or_else(|| {
901                            QueryError::unsupported_query(
902                                "grouped SQL dispatch requires explicit grouped projection items",
903                            )
904                        })?;
905
906                        return session
907                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
908                    }
909
910                    let result =
911                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
912                    if let Some(columns) = projection_columns {
913                        let SqlDispatchResult::ProjectionText {
914                            rows, row_count, ..
915                        } = result
916                        else {
917                            return Err(QueryError::invariant(
918                                "generated scalar SQL dispatch text path must emit projection text rows",
919                            ));
920                        };
921
922                        return Ok(SqlDispatchResult::ProjectionText {
923                            columns,
924                            rows,
925                            row_count,
926                        });
927                    }
928
929                    Ok(result)
930                },
931                |session, delete, authority| {
932                    session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
933                },
934            ),
935            SqlStatementRoute::Explain { .. } => {
936                self.dispatch_sql_explain_route_for_authority(parsed, authority)
937            }
938            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
939            | SqlStatementRoute::Describe { .. }
940            | SqlStatementRoute::ShowIndexes { .. }
941            | SqlStatementRoute::ShowColumns { .. }
942            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
943                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
944            )),
945        }
946    }
947
948    /// Execute one raw SQL string through the generated canister query surface.
949    ///
950    /// This hidden helper keeps parse, route, authority, and metadata/query
951    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
952    /// stays close to a pure descriptor table plus public ABI wrapper.
953    #[doc(hidden)]
954    #[must_use]
955    pub fn execute_generated_query_surface_sql(
956        &self,
957        sql: &str,
958        authorities: &[EntityAuthority],
959    ) -> GeneratedSqlDispatchAttempt {
960        // Phase 1: normalize and parse once so every generated route family
961        // shares the same SQL ownership boundary.
962        let sql_trimmed = match trim_generated_query_sql_input(sql) {
963            Ok(sql_trimmed) => sql_trimmed,
964            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
965        };
966        let parsed = match self.parse_sql_statement(sql_trimmed) {
967            Ok(parsed) => parsed,
968            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
969        };
970
971        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
972        // generated routes against the emitted authority table exactly once.
973        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
974            return GeneratedSqlDispatchAttempt::new(
975                "",
976                None,
977                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
978                    authorities,
979                ))),
980            );
981        }
982        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
983            Ok(authority) => authority,
984            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
985        };
986
987        // Phase 3: dispatch the resolved route through the existing query,
988        // explain, and metadata helpers without rebuilding route ownership in
989        // the generated build output.
990        let entity_name = authority.model().name();
991        let explain_order_field = parsed
992            .route()
993            .is_explain()
994            .then_some(authority.model().primary_key.name);
995        let result = match parsed.route() {
996            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
997                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
998            }
999            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1000                Err(QueryError::unsupported_query(
1001                    "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1002                ))
1003            }
1004            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1005                self.describe_entity_model(authority.model()),
1006            )),
1007            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1008                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1009            )),
1010            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1011                self.show_columns_for_model(authority.model()),
1012            )),
1013            SqlStatementRoute::ShowEntities => unreachable!(
1014                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1015            ),
1016        };
1017
1018        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1019    }
1020}