Skip to main content

icydb_core/db/session/sql/dispatch/
mod.rs

1//! Module: db::session::sql::dispatch
2//! Responsibility: session-owned SQL dispatch entrypoints that bind lowered SQL
3//! commands onto structural planning, execution, and outward result shaping.
4//! Does not own: SQL parsing or executor runtime internals.
5//! Boundary: centralizes authority-aware SQL dispatch classification and result packaging.
6
7mod computed;
8mod lowered;
9
10use crate::{
11    db::{
12        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13        data::UpdatePatch,
14        executor::{EntityAuthority, MutationMode},
15        identifiers_tail_match,
16        query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17        session::sql::{
18            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20            computed_projection,
21            projection::{
22                SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23                execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25            },
26        },
27        sql::lowering::{
28            LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29            bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30        },
31        sql::parser::{
32            SqlAggregateCall, SqlAggregateKind, SqlInsertStatement, SqlOrderDirection,
33            SqlProjection, SqlSelectItem, SqlStatement, SqlTextFunction, SqlUpdateStatement,
34        },
35    },
36    model::{entity::resolve_field_slot, field::FieldKind},
37    traits::{CanisterKind, EntityKind, EntityValue},
38    types::{Timestamp, Ulid},
39    value::Value,
40};
41
42#[cfg(feature = "perf-attribution")]
43pub use lowered::LoweredSqlDispatchExecutorAttribution;
44
45///
46/// GeneratedSqlDispatchAttempt
47///
48/// Hidden generated-query dispatch envelope used by the facade helper to keep
49/// generated route ownership in core while preserving the public EXPLAIN error
50/// rewrite contract at the outer boundary.
51///
52
53#[doc(hidden)]
54pub struct GeneratedSqlDispatchAttempt {
55    entity_name: &'static str,
56    explain_order_field: Option<&'static str>,
57    result: Result<SqlDispatchResult, QueryError>,
58}
59
60impl GeneratedSqlDispatchAttempt {
61    // Build one generated-query dispatch attempt with optional explain-hint context.
62    const fn new(
63        entity_name: &'static str,
64        explain_order_field: Option<&'static str>,
65        result: Result<SqlDispatchResult, QueryError>,
66    ) -> Self {
67        Self {
68            entity_name,
69            explain_order_field,
70            result,
71        }
72    }
73
74    /// Borrow the resolved entity name for this generated-query attempt.
75    #[must_use]
76    pub const fn entity_name(&self) -> &'static str {
77        self.entity_name
78    }
79
80    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
81    #[must_use]
82    pub const fn explain_order_field(&self) -> Option<&'static str> {
83        self.explain_order_field
84    }
85
86    /// Consume and return the generated-query dispatch result.
87    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
88        self.result
89    }
90}
91
92#[derive(Clone, Copy, Debug, Eq, PartialEq)]
93pub(in crate::db::session::sql) enum SqlGroupingSurface {
94    Scalar,
95    Grouped,
96}
97
98const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
99    match surface {
100        SqlGroupingSurface::Scalar => {
101            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
102        }
103        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
104    }
105}
106
107// Enforce the generated canister query contract that empty SQL is unsupported
108// before any parser/lowering work occurs.
109fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
110    let sql_trimmed = sql.trim();
111    if sql_trimmed.is_empty() {
112        return Err(QueryError::unsupported_query(
113            "query endpoint requires a non-empty SQL string",
114        ));
115    }
116
117    Ok(sql_trimmed)
118}
119
120// Render the generated-surface entity list from the descriptor table instead
121// of assuming every session-visible entity belongs on the public query export.
122fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
123    let mut entities = Vec::with_capacity(authorities.len());
124
125    for authority in authorities {
126        entities.push(authority.model().name().to_string());
127    }
128
129    entities
130}
131
132// Project parsed SELECT items into one stable outward column contract while
133// allowing parser-owned aliases to override only the final session label.
134fn sql_projection_labels_from_select_statement(
135    statement: &SqlStatement,
136) -> Result<Option<Vec<String>>, QueryError> {
137    let SqlStatement::Select(select) = statement else {
138        return Err(QueryError::invariant(
139            "SQL projection labels require SELECT statement shape",
140        ));
141    };
142    let SqlProjection::Items(items) = &select.projection else {
143        return Ok(None);
144    };
145
146    Ok(Some(
147        items
148            .iter()
149            .enumerate()
150            .map(|(index, item)| {
151                select
152                    .projection_alias(index)
153                    .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
154            })
155            .collect(),
156    ))
157}
158
159// Render one grouped SELECT item into the public grouped-column label used by
160// unified dispatch results.
161fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
162    match item {
163        SqlSelectItem::Field(field) => field.clone(),
164        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
165        SqlSelectItem::TextFunction(call) => {
166            format!(
167                "{}({})",
168                grouped_sql_text_function_name(call.function),
169                call.field
170            )
171        }
172    }
173}
174
175// Keep the dedicated SQL aggregate lane on parser-owned outward labels
176// without reopening alias semantics in lowering or runtime strategy state.
177fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
178    let SqlStatement::Select(select) = statement else {
179        return None;
180    };
181
182    select.projection_alias(0).map(str::to_string)
183}
184
185// Render one aggregate call into one canonical SQL-style label.
186fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
187    let kind = match aggregate.kind {
188        SqlAggregateKind::Count => "COUNT",
189        SqlAggregateKind::Sum => "SUM",
190        SqlAggregateKind::Avg => "AVG",
191        SqlAggregateKind::Min => "MIN",
192        SqlAggregateKind::Max => "MAX",
193    };
194
195    match aggregate.field.as_deref() {
196        Some(field) => format!("{kind}({field})"),
197        None => format!("{kind}(*)"),
198    }
199}
200
201// Render one reduced SQL text-function identifier into one stable uppercase
202// SQL label for outward column metadata.
203const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
204    match function {
205        SqlTextFunction::Trim => "TRIM",
206        SqlTextFunction::Ltrim => "LTRIM",
207        SqlTextFunction::Rtrim => "RTRIM",
208        SqlTextFunction::Lower => "LOWER",
209        SqlTextFunction::Upper => "UPPER",
210        SqlTextFunction::Length => "LENGTH",
211        SqlTextFunction::Left => "LEFT",
212        SqlTextFunction::Right => "RIGHT",
213        SqlTextFunction::StartsWith => "STARTS_WITH",
214        SqlTextFunction::EndsWith => "ENDS_WITH",
215        SqlTextFunction::Contains => "CONTAINS",
216        SqlTextFunction::Position => "POSITION",
217        SqlTextFunction::Replace => "REPLACE",
218        SqlTextFunction::Substring => "SUBSTRING",
219    }
220}
221
222// Resolve one generated query route onto the descriptor-owned authority table.
223fn authority_for_generated_sql_route(
224    route: &SqlStatementRoute,
225    authorities: &[EntityAuthority],
226) -> Result<EntityAuthority, QueryError> {
227    let sql_entity = route.entity();
228
229    for authority in authorities {
230        if identifiers_tail_match(sql_entity, authority.model().name()) {
231            return Ok(*authority);
232        }
233    }
234
235    Err(unsupported_generated_sql_entity_error(
236        sql_entity,
237        authorities,
238    ))
239}
240
241// Keep the generated query-surface unsupported-entity contract stable while
242// moving authority lookup out of the build-generated shim.
243fn unsupported_generated_sql_entity_error(
244    entity_name: &str,
245    authorities: &[EntityAuthority],
246) -> QueryError {
247    let mut supported = String::new();
248
249    for (index, authority) in authorities.iter().enumerate() {
250        if index != 0 {
251            supported.push_str(", ");
252        }
253
254        supported.push_str(authority.model().name());
255    }
256
257    QueryError::unsupported_query(format!(
258        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
259    ))
260}
261
262// Keep typed SQL write routes on the same entity-match contract used by
263// lowered query dispatch, without widening write statements into lowering.
264fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
265where
266    E: EntityKind,
267{
268    if identifiers_tail_match(sql_entity, E::MODEL.name()) {
269        return Ok(());
270    }
271
272    Err(QueryError::from_sql_lowering_error(
273        SqlLoweringError::EntityMismatch {
274            sql_entity: sql_entity.to_string(),
275            expected_entity: E::MODEL.name(),
276        },
277    ))
278}
279
280// Normalize one reduced-SQL primary-key literal onto the concrete entity key
281// type accepted by the structural mutation entrypoint.
282fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
283where
284    E: EntityKind,
285{
286    if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
287        return Ok(key);
288    }
289
290    let widened = match value {
291        Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
292        Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
293        _ => {
294            return Err(QueryError::unsupported_query(format!(
295                "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
296            )));
297        }
298    };
299
300    <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
301        QueryError::unsupported_query(format!(
302            "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
303        ))
304    })
305}
306
307// Synthesize one generated SQL primary-key literal when the narrowed write
308// lane owns that generation contract directly instead of requiring one
309// user-authored SQL column.
310fn sql_write_generated_primary_key_value<E>() -> Option<Value>
311where
312    E: EntityKind,
313{
314    matches!(E::MODEL.primary_key.kind(), FieldKind::Ulid).then(|| Value::Ulid(Ulid::generate()))
315}
316
317// Normalize one reduced-SQL write literal onto the target entity field kind
318// when the parser's numeric literal domain is narrower than the runtime field.
319fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
320where
321    E: EntityKind,
322{
323    let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
324        QueryError::invariant("SQL write field must resolve against the target entity model")
325    })?;
326    let field_kind = E::MODEL.fields()[field_slot].kind();
327
328    let normalized = match (field_kind, value) {
329        (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
330        (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
331            Value::Int(v.cast_signed())
332        }
333        _ => value.clone(),
334    };
335
336    Ok(normalized)
337}
338
339// Mirror the derive-owned system timestamp contract on the structural SQL
340// write lane so schema-derived entities stay writable without exposing those
341// slots as required user-authored SQL columns.
342fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
343where
344    E: EntityKind,
345{
346    if resolve_field_slot(E::MODEL, "created_at").is_some()
347        && resolve_field_slot(E::MODEL, "updated_at").is_some()
348    {
349        return Some(("created_at", "updated_at"));
350    }
351
352    None
353}
354
355// Resolve the effective INSERT column list for one reduced SQL write:
356// explicit column lists pass through, while omitted-column-list INSERT uses
357// canonical user-authored model field order and leaves hidden timestamp
358// synthesis on the existing write path.
359fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
360where
361    E: EntityKind,
362{
363    if !statement.columns.is_empty() {
364        return statement.columns.clone();
365    }
366
367    let timestamp_fields = sql_write_system_timestamp_fields::<E>();
368
369    let columns: Vec<String> = E::MODEL
370        .fields()
371        .iter()
372        .filter(|field| {
373            !matches!(
374                timestamp_fields,
375                Some((created_at, updated_at))
376                    if field.name() == created_at || field.name() == updated_at
377            )
378        })
379        .map(|field| field.name().to_string())
380        .collect();
381
382    let pk_name = E::MODEL.primary_key.name;
383    if sql_write_generated_primary_key_value::<E>().is_none() {
384        return columns;
385    }
386
387    let generated_key_omitted_columns: Vec<String> = columns
388        .iter()
389        .filter(|field| field.as_str() != pk_name)
390        .cloned()
391        .collect();
392    let first_width = statement.values.first().map(Vec::len);
393
394    if first_width == Some(generated_key_omitted_columns.len()) {
395        return generated_key_omitted_columns;
396    }
397
398    columns
399}
400
401// Validate one INSERT tuple list against the resolved effective column list so
402// every VALUES tuple stays full-width and deterministic.
403fn validate_sql_insert_tuple_lengths(
404    columns: &[String],
405    values: &[Vec<Value>],
406) -> Result<(), QueryError> {
407    for tuple in values {
408        if tuple.len() != columns.len() {
409            return Err(QueryError::from_sql_parse_error(
410                crate::db::sql::parser::SqlParseError::invalid_syntax(
411                    "INSERT column list and VALUES tuple length must match",
412                ),
413            ));
414        }
415    }
416
417    Ok(())
418}
419
420impl<C: CanisterKind> DbSession<C> {
421    // Project one typed SQL write after-image into one outward SQL row using
422    // the persisted model field order.
423    fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
424    where
425        E: PersistedRow<Canister = C> + EntityValue,
426    {
427        let mut row = Vec::with_capacity(E::MODEL.fields().len());
428
429        for index in 0..E::MODEL.fields().len() {
430            let value = entity.get_value_by_index(index).ok_or_else(|| {
431                QueryError::invariant(
432                    "SQL write dispatch projection row must include every declared field",
433                )
434            })?;
435            row.push(value);
436        }
437
438        Ok(row)
439    }
440
441    // Render one or more typed entities returned by SQL write dispatch as one
442    // projection payload so write statements reuse the same outward result
443    // family as row-producing SELECT and DELETE dispatch.
444    fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
445    where
446        E: PersistedRow<Canister = C> + EntityValue,
447    {
448        let columns = projection_labels_from_fields(E::MODEL.fields());
449        let rows = entities
450            .into_iter()
451            .map(Self::sql_write_dispatch_row)
452            .collect::<Result<Vec<_>, _>>()?;
453        let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
454
455        Ok(SqlDispatchResult::Projection {
456            columns,
457            rows,
458            row_count,
459        })
460    }
461
462    // Build the structural insert patch and resolved primary key expected by
463    // the shared structural mutation entrypoint.
464    fn sql_insert_patch_and_key<E>(
465        columns: &[String],
466        values: &[Value],
467    ) -> Result<(E::Key, UpdatePatch), QueryError>
468    where
469        E: PersistedRow<Canister = C> + EntityValue,
470    {
471        // Phase 1: resolve the required primary-key literal from the explicit
472        // INSERT column/value list, or synthesize one generated key when the
473        // narrowed SQL write lane owns that contract for this entity.
474        let pk_name = E::MODEL.primary_key.name;
475        let generated_pk = sql_write_generated_primary_key_value::<E>();
476        let (key, generated_pk_value) =
477            if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
478                let pk_value = values.get(pk_index).ok_or_else(|| {
479                    QueryError::invariant(
480                        "INSERT primary key column must align with one VALUES literal",
481                    )
482                })?;
483                (sql_write_key_from_literal::<E>(pk_value, pk_name)?, None)
484            } else if let Some(pk_value) = generated_pk {
485                (
486                    sql_write_key_from_literal::<E>(&pk_value, pk_name)?,
487                    Some(pk_value),
488                )
489            } else {
490                return Err(QueryError::unsupported_query(format!(
491                    "SQL INSERT requires primary key column '{pk_name}' in this release"
492                )));
493            };
494
495        // Phase 2: lower the explicit column/value pairs onto the structural
496        // patch program consumed by the shared save path.
497        let mut patch = UpdatePatch::new();
498        if let Some(pk_value) = generated_pk_value {
499            patch = patch
500                .set_field(E::MODEL, pk_name, pk_value)
501                .map_err(QueryError::execute)?;
502        }
503        for (field, value) in columns.iter().zip(values.iter()) {
504            let normalized = sql_write_value_for_field::<E>(field, value)?;
505            patch = patch
506                .set_field(E::MODEL, field, normalized)
507                .map_err(QueryError::execute)?;
508        }
509
510        // Phase 3: synthesize the derive-owned system timestamps when the
511        // target entity carries them, matching the typed write surface.
512        if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
513            let now = Value::Timestamp(Timestamp::now());
514            patch = patch
515                .set_field(E::MODEL, created_at, now.clone())
516                .map_err(QueryError::execute)?;
517            patch = patch
518                .set_field(E::MODEL, updated_at, now)
519                .map_err(QueryError::execute)?;
520        }
521
522        Ok((key, patch))
523    }
524
525    // Build the structural update patch shared by every row selected by one
526    // reduced SQL UPDATE statement.
527    fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
528    where
529        E: PersistedRow<Canister = C> + EntityValue,
530    {
531        // Phase 1: lower the `SET` list onto the structural patch program
532        // while keeping primary-key mutation out of the reduced SQL write lane.
533        let pk_name = E::MODEL.primary_key.name;
534        let mut patch = UpdatePatch::new();
535        for assignment in &statement.assignments {
536            if assignment.field == pk_name {
537                return Err(QueryError::unsupported_query(format!(
538                    "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
539                )));
540            }
541            let normalized =
542                sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
543
544            patch = patch
545                .set_field(E::MODEL, assignment.field.as_str(), normalized)
546                .map_err(QueryError::execute)?;
547        }
548
549        // Phase 2: keep structural SQL UPDATE aligned with the derive-owned
550        // auto-updated timestamp contract when the entity carries that field.
551        if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
552            patch = patch
553                .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
554                .map_err(QueryError::execute)?;
555        }
556
557        Ok(patch)
558    }
559
560    // Resolve one deterministic typed selector query for reduced SQL UPDATE.
561    fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
562    where
563        E: PersistedRow<Canister = C> + EntityValue,
564    {
565        // Phase 1: keep the widened SQL UPDATE lane explicit about requiring
566        // one admitted reduced predicate instead of opening bare full-table
567        // updates implicitly.
568        let Some(predicate) = statement.predicate.clone() else {
569            return Err(QueryError::unsupported_query(
570                "SQL UPDATE requires WHERE predicate in this release",
571            ));
572        };
573        let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
574        let pk_name = E::MODEL.primary_key.name;
575        let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
576
577        // Phase 2: honor one explicit ordered update window when present, and
578        // otherwise keep the write target set deterministic on primary-key
579        // order exactly as the earlier predicate-only update lane did.
580        if statement.order_by.is_empty() {
581            selector = selector.order_by(pk_name);
582        } else {
583            let mut orders_primary_key = false;
584
585            for term in &statement.order_by {
586                if term.field == pk_name {
587                    orders_primary_key = true;
588                }
589                selector = match term.direction {
590                    SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
591                    SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
592                };
593            }
594
595            if !orders_primary_key {
596                selector = selector.order_by(pk_name);
597            }
598        }
599
600        // Phase 3: apply the bounded update window on top of the deterministic
601        // selector order before mutation replay begins.
602        if let Some(limit) = statement.limit {
603            selector = selector.limit(limit);
604        }
605        if let Some(offset) = statement.offset {
606            selector = selector.offset(offset);
607        }
608
609        Ok(selector)
610    }
611
612    // Execute one narrow SQL INSERT statement through the existing structural
613    // mutation path and project the returned after-image as one SQL row.
614    fn execute_sql_insert_dispatch<E>(
615        &self,
616        statement: &SqlInsertStatement,
617    ) -> Result<SqlDispatchResult, QueryError>
618    where
619        E: PersistedRow<Canister = C> + EntityValue,
620    {
621        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
622        let columns = sql_insert_columns::<E>(statement);
623        validate_sql_insert_tuple_lengths(columns.as_slice(), statement.values.as_slice())?;
624        let mut entities = Vec::with_capacity(statement.values.len());
625
626        for values in &statement.values {
627            let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
628            let entity = self
629                .mutate_structural::<E>(key, patch, MutationMode::Insert)
630                .map_err(QueryError::execute)?;
631            entities.push(entity);
632        }
633
634        Self::sql_write_dispatch_projection(entities)
635    }
636
637    // Execute one reduced SQL UPDATE statement by selecting deterministic
638    // target rows first and then replaying one shared structural patch onto
639    // each matched primary key.
640    fn execute_sql_update_dispatch<E>(
641        &self,
642        statement: &SqlUpdateStatement,
643    ) -> Result<SqlDispatchResult, QueryError>
644    where
645        E: PersistedRow<Canister = C> + EntityValue,
646    {
647        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
648        let selector = Self::sql_update_selector_query::<E>(statement)?;
649        let patch = Self::sql_update_patch::<E>(statement)?;
650        let matched = self.execute_query(&selector)?;
651        let mut entities = Vec::with_capacity(matched.len());
652
653        // Phase 1: apply the already-normalized structural patch to every
654        // matched row in deterministic primary-key order.
655        for entity in matched.entities() {
656            let updated = self
657                .mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
658                .map_err(QueryError::execute)?;
659            entities.push(updated);
660        }
661
662        Self::sql_write_dispatch_projection(entities)
663    }
664
665    // Build the shared structural SQL projection execution inputs once so
666    // value-row and rendered-row dispatch surfaces only differ in final packaging.
667    fn prepare_structural_sql_projection_execution(
668        &self,
669        query: StructuralQuery,
670        authority: EntityAuthority,
671    ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
672        // Phase 1: build the structural access plan once and freeze its outward
673        // column contract for all projection materialization surfaces.
674        let (_, plan) =
675            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
676        let projection = plan.projection_spec(authority.model());
677        let columns = projection_labels_from_projection_spec(&projection);
678
679        Ok((columns, plan))
680    }
681
682    // Execute one structural SQL load query and return only row-oriented SQL
683    // projection values, keeping typed projection rows out of the shared SQL
684    // query-lane path.
685    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
686        &self,
687        query: StructuralQuery,
688        authority: EntityAuthority,
689    ) -> Result<SqlProjectionPayload, QueryError> {
690        // Phase 1: build the shared structural plan and outward column contract once.
691        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
692
693        // Phase 2: execute the shared structural load path with the already
694        // derived projection semantics.
695        let projected =
696            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
697                .map_err(QueryError::execute)?;
698        let (rows, row_count) = projected.into_parts();
699
700        Ok(SqlProjectionPayload::new(columns, rows, row_count))
701    }
702
703    // Execute one structural SQL load query and return render-ready text rows
704    // for the dispatch lane when the terminal short path can prove them
705    // directly.
706    fn execute_structural_sql_projection_text(
707        &self,
708        query: StructuralQuery,
709        authority: EntityAuthority,
710    ) -> Result<SqlDispatchResult, QueryError> {
711        // Phase 1: build the shared structural plan and outward column contract once.
712        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
713
714        // Phase 2: execute the shared structural load path with the already
715        // derived projection semantics while preferring rendered SQL rows.
716        let projected =
717            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
718                .map_err(QueryError::execute)?;
719        let (rows, row_count) = projected.into_parts();
720
721        Ok(SqlDispatchResult::ProjectionText {
722            columns,
723            rows,
724            row_count,
725        })
726    }
727
728    // Execute one typed SQL delete query while keeping the row payload on the
729    // typed delete executor boundary that still owns non-runtime-hook delete
730    // commit-window application.
731    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
732    where
733        E: PersistedRow<Canister = C> + EntityValue,
734    {
735        let plan = self
736            .compile_query_with_visible_indexes(query)?
737            .into_prepared_execution_plan();
738        let deleted = self
739            .with_metrics(|| {
740                self.delete_executor::<E>()
741                    .execute_structural_projection(plan)
742            })
743            .map_err(QueryError::execute)?;
744        let (rows, row_count) = deleted.into_parts();
745        let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
746
747        Ok(SqlProjectionPayload::new(
748            projection_labels_from_fields(E::MODEL.fields()),
749            rows,
750            row_count,
751        )
752        .into_dispatch_result())
753    }
754
755    // Lower one parsed SQL query/explain route once for one resolved authority
756    // and preserve grouped-column metadata for grouped SELECT dispatch.
757    fn lowered_sql_query_dispatch_inputs_for_authority(
758        parsed: &SqlParsedStatement,
759        authority: EntityAuthority,
760        unsupported_message: &'static str,
761    ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
762        let lowered = parsed.lower_query_lane_for_entity(
763            authority.model().name(),
764            authority.model().primary_key.name,
765        )?;
766        let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
767            .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
768            .transpose()?;
769        let query = lowered
770            .into_query()
771            .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
772
773        Ok((query, projection_columns.flatten()))
774    }
775
776    // Execute one parsed SQL query route through the shared aggregate,
777    // computed-projection, and lowered query lane so typed and generated
778    // dispatch only differ at the final SELECT/DELETE packaging boundary.
779    fn dispatch_sql_query_route_for_authority(
780        &self,
781        parsed: &SqlParsedStatement,
782        authority: EntityAuthority,
783        unsupported_message: &'static str,
784        dispatch_select: impl FnOnce(
785            &Self,
786            LoweredSelectShape,
787            EntityAuthority,
788            bool,
789            Option<Vec<String>>,
790        ) -> Result<SqlDispatchResult, QueryError>,
791        dispatch_delete: impl FnOnce(
792            &Self,
793            LoweredBaseQueryShape,
794            EntityAuthority,
795        ) -> Result<SqlDispatchResult, QueryError>,
796    ) -> Result<SqlDispatchResult, QueryError> {
797        // Phase 1: keep aggregate and computed projection classification on the
798        // shared parsed route so both dispatch surfaces honor the same lane split.
799        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
800            let command =
801                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
802
803            return self.execute_sql_aggregate_dispatch_for_authority(
804                command,
805                authority,
806                sql_aggregate_dispatch_label_override(&parsed.statement),
807            );
808        }
809
810        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
811            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
812        }
813
814        // Phase 2: lower the remaining query route once, then let the caller
815        // decide only the final outward result packaging.
816        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
817            parsed,
818            authority,
819            unsupported_message,
820        )?;
821        let grouped_surface = query.has_grouping();
822
823        match query {
824            LoweredSqlQuery::Select(select) => {
825                dispatch_select(self, select, authority, grouped_surface, projection_columns)
826            }
827            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
828        }
829    }
830
831    // Execute one parsed SQL EXPLAIN route through the shared computed-
832    // projection and lowered explain lanes so typed and generated dispatch do
833    // not duplicate the same explain classification tree.
834    fn dispatch_sql_explain_route_for_authority(
835        &self,
836        parsed: &SqlParsedStatement,
837        authority: EntityAuthority,
838    ) -> Result<SqlDispatchResult, QueryError> {
839        // Phase 1: keep computed-projection explain ownership on the same
840        // parsed route boundary as the shared query lane.
841        if let Some((mode, plan)) =
842            computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
843        {
844            return self
845                .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
846                .map(SqlDispatchResult::Explain);
847        }
848
849        // Phase 2: lower once for execution/logical explain and preserve the
850        // shared execution-first fallback policy across both callers.
851        let lowered = parsed.lower_query_lane_for_entity(
852            authority.model().name(),
853            authority.model().primary_key.name,
854        )?;
855        if let Some(explain) =
856            self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
857        {
858            return Ok(SqlDispatchResult::Explain(explain));
859        }
860
861        self.explain_lowered_sql_for_authority(&lowered, authority)
862            .map(SqlDispatchResult::Explain)
863    }
864
865    // Validate that one SQL-derived query intent matches the grouped/scalar
866    // execution surface that is about to consume it.
867    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
868        query: &Query<E>,
869        surface: SqlGroupingSurface,
870    ) -> Result<(), QueryError>
871    where
872        E: EntityKind,
873    {
874        match (surface, query.has_grouping()) {
875            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
876            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
877                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
878            ),
879        }
880    }
881
882    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
883    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
884    where
885        E: PersistedRow<Canister = C> + EntityValue,
886    {
887        let parsed = self.parse_sql_statement(sql)?;
888
889        self.execute_sql_dispatch_parsed::<E>(&parsed)
890    }
891
892    /// Execute one parsed reduced SQL statement into one unified SQL payload.
893    pub fn execute_sql_dispatch_parsed<E>(
894        &self,
895        parsed: &SqlParsedStatement,
896    ) -> Result<SqlDispatchResult, QueryError>
897    where
898        E: PersistedRow<Canister = C> + EntityValue,
899    {
900        match parsed.route() {
901            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
902                parsed,
903                EntityAuthority::for_type::<E>(),
904                "execute_sql_dispatch accepts SELECT or DELETE only",
905                |session, select, authority, grouped_surface, projection_columns| {
906                    if grouped_surface {
907                        let columns = projection_columns.ok_or_else(|| {
908                            QueryError::unsupported_query(
909                                "grouped SQL dispatch requires explicit grouped projection items",
910                            )
911                        })?;
912
913                        return session.execute_lowered_sql_grouped_dispatch_select_core(
914                            select, authority, columns,
915                        );
916                    }
917
918                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
919                    if let Some(columns) = projection_columns {
920                        let (_, rows, row_count) = payload.into_parts();
921
922                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
923                            .into_dispatch_result());
924                    }
925
926                    Ok(payload.into_dispatch_result())
927                },
928                |session, delete, _authority| {
929                    let typed_query = bind_lowered_sql_query::<E>(
930                        LoweredSqlQuery::Delete(delete),
931                        MissingRowPolicy::Ignore,
932                    )
933                    .map_err(QueryError::from_sql_lowering_error)?;
934
935                    session.execute_typed_sql_delete(&typed_query)
936                },
937            ),
938            SqlStatementRoute::Insert { .. } => {
939                let SqlStatement::Insert(statement) = &parsed.statement else {
940                    return Err(QueryError::invariant(
941                        "INSERT SQL route must carry parsed INSERT statement",
942                    ));
943                };
944
945                self.execute_sql_insert_dispatch::<E>(statement)
946            }
947            SqlStatementRoute::Update { .. } => {
948                let SqlStatement::Update(statement) = &parsed.statement else {
949                    return Err(QueryError::invariant(
950                        "UPDATE SQL route must carry parsed UPDATE statement",
951                    ));
952                };
953
954                self.execute_sql_update_dispatch::<E>(statement)
955            }
956            SqlStatementRoute::Explain { .. } => self
957                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
958            SqlStatementRoute::Describe { .. } => {
959                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
960            }
961            SqlStatementRoute::ShowIndexes { .. } => {
962                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
963            }
964            SqlStatementRoute::ShowColumns { .. } => {
965                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
966            }
967            SqlStatementRoute::ShowEntities => {
968                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
969            }
970        }
971    }
972
973    /// Execute one parsed reduced SQL statement through the generated canister
974    /// query/explain surface for one already-resolved dynamic authority.
975    ///
976    /// This keeps the canister SQL facade on the same reduced SQL ownership
977    /// boundary as typed dispatch without forcing the outer facade to reopen
978    /// typed-generic routing just to preserve parity for computed projections.
979    #[doc(hidden)]
980    pub fn execute_generated_query_surface_dispatch_for_authority(
981        &self,
982        parsed: &SqlParsedStatement,
983        authority: EntityAuthority,
984    ) -> Result<SqlDispatchResult, QueryError> {
985        match parsed.route() {
986            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
987                parsed,
988                authority,
989                "generated SQL query surface requires query or EXPLAIN statement lanes",
990                |session, select, authority, grouped_surface, projection_columns| {
991                    if grouped_surface {
992                        let columns = projection_columns.ok_or_else(|| {
993                            QueryError::unsupported_query(
994                                "grouped SQL dispatch requires explicit grouped projection items",
995                            )
996                        })?;
997
998                        return session
999                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1000                    }
1001
1002                    let result =
1003                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1004                    if let Some(columns) = projection_columns {
1005                        let SqlDispatchResult::ProjectionText {
1006                            rows, row_count, ..
1007                        } = result
1008                        else {
1009                            return Err(QueryError::invariant(
1010                                "generated scalar SQL dispatch text path must emit projection text rows",
1011                            ));
1012                        };
1013
1014                        return Ok(SqlDispatchResult::ProjectionText {
1015                            columns,
1016                            rows,
1017                            row_count,
1018                        });
1019                    }
1020
1021                    Ok(result)
1022                },
1023                |session, delete, authority| {
1024                    session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1025                },
1026            ),
1027            SqlStatementRoute::Explain { .. } => {
1028                self.dispatch_sql_explain_route_for_authority(parsed, authority)
1029            }
1030            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1031            | SqlStatementRoute::Describe { .. }
1032            | SqlStatementRoute::ShowIndexes { .. }
1033            | SqlStatementRoute::ShowColumns { .. }
1034            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1035                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1036            )),
1037        }
1038    }
1039
1040    /// Execute one raw SQL string through the generated canister query surface.
1041    ///
1042    /// This hidden helper keeps parse, route, authority, and metadata/query
1043    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
1044    /// stays close to a pure descriptor table plus public ABI wrapper.
1045    #[doc(hidden)]
1046    #[must_use]
1047    pub fn execute_generated_query_surface_sql(
1048        &self,
1049        sql: &str,
1050        authorities: &[EntityAuthority],
1051    ) -> GeneratedSqlDispatchAttempt {
1052        // Phase 1: normalize and parse once so every generated route family
1053        // shares the same SQL ownership boundary.
1054        let sql_trimmed = match trim_generated_query_sql_input(sql) {
1055            Ok(sql_trimmed) => sql_trimmed,
1056            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1057        };
1058        let parsed = match self.parse_sql_statement(sql_trimmed) {
1059            Ok(parsed) => parsed,
1060            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1061        };
1062
1063        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
1064        // generated routes against the emitted authority table exactly once.
1065        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1066            return GeneratedSqlDispatchAttempt::new(
1067                "",
1068                None,
1069                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1070                    authorities,
1071                ))),
1072            );
1073        }
1074        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1075            Ok(authority) => authority,
1076            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1077        };
1078
1079        // Phase 3: dispatch the resolved route through the existing query,
1080        // explain, and metadata helpers without rebuilding route ownership in
1081        // the generated build output.
1082        let entity_name = authority.model().name();
1083        let explain_order_field = parsed
1084            .route()
1085            .is_explain()
1086            .then_some(authority.model().primary_key.name);
1087        let result = match parsed.route() {
1088            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1089                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1090            }
1091            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1092                Err(QueryError::unsupported_query(
1093                    "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1094                ))
1095            }
1096            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1097                self.describe_entity_model(authority.model()),
1098            )),
1099            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1100                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1101            )),
1102            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1103                self.show_columns_for_model(authority.model()),
1104            )),
1105            SqlStatementRoute::ShowEntities => unreachable!(
1106                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1107            ),
1108        };
1109
1110        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1111    }
1112}