Skip to main content

icydb_core/db/session/sql/dispatch/
mod.rs

1//! Module: db::session::sql::dispatch
2//! Responsibility: session-owned SQL dispatch entrypoints that bind lowered SQL
3//! commands onto structural planning, execution, and outward result shaping.
4//! Does not own: SQL parsing or executor runtime internals.
5//! Boundary: centralizes authority-aware SQL dispatch classification and result packaging.
6
7mod computed;
8mod lowered;
9
10use crate::{
11    db::{
12        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
13        data::UpdatePatch,
14        executor::{EntityAuthority, MutationMode},
15        identifiers_tail_match,
16        query::{intent::StructuralQuery, plan::AccessPlannedQuery},
17        session::sql::{
18            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
19            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
20            computed_projection,
21            projection::{
22                SqlProjectionPayload, execute_sql_projection_rows_for_canister,
23                execute_sql_projection_text_rows_for_canister, projection_labels_from_fields,
24                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25            },
26        },
27        sql::lowering::{
28            LoweredBaseQueryShape, LoweredSelectShape, LoweredSqlQuery, SqlLoweringError,
29            bind_lowered_sql_query, canonicalize_sql_predicate_for_model,
30            lower_sql_command_from_prepared_statement, prepare_sql_statement,
31        },
32        sql::parser::{
33            SqlAggregateCall, SqlAggregateKind, SqlInsertSource, SqlInsertStatement,
34            SqlOrderDirection, SqlOrderTerm, SqlProjection, SqlSelectItem, SqlSelectStatement,
35            SqlStatement, SqlTextFunction, SqlUpdateStatement,
36        },
37    },
38    model::{entity::resolve_field_slot, field::FieldKind},
39    traits::{CanisterKind, EntityKind, EntityValue},
40    types::{Timestamp, Ulid},
41    value::Value,
42};
43
44#[cfg(feature = "perf-attribution")]
45pub use lowered::LoweredSqlDispatchExecutorAttribution;
46
47///
48/// GeneratedSqlDispatchAttempt
49///
50/// Hidden generated-query dispatch envelope used by the facade helper to keep
51/// generated route ownership in core while preserving the public EXPLAIN error
52/// rewrite contract at the outer boundary.
53///
54
55#[doc(hidden)]
56pub struct GeneratedSqlDispatchAttempt {
57    entity_name: &'static str,
58    explain_order_field: Option<&'static str>,
59    result: Result<SqlDispatchResult, QueryError>,
60}
61
62impl GeneratedSqlDispatchAttempt {
63    // Build one generated-query dispatch attempt with optional explain-hint context.
64    const fn new(
65        entity_name: &'static str,
66        explain_order_field: Option<&'static str>,
67        result: Result<SqlDispatchResult, QueryError>,
68    ) -> Self {
69        Self {
70            entity_name,
71            explain_order_field,
72            result,
73        }
74    }
75
76    /// Borrow the resolved entity name for this generated-query attempt.
77    #[must_use]
78    pub const fn entity_name(&self) -> &'static str {
79        self.entity_name
80    }
81
82    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
83    #[must_use]
84    pub const fn explain_order_field(&self) -> Option<&'static str> {
85        self.explain_order_field
86    }
87
88    /// Consume and return the generated-query dispatch result.
89    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
90        self.result
91    }
92}
93
94#[derive(Clone, Copy, Debug, Eq, PartialEq)]
95pub(in crate::db::session::sql) enum SqlGroupingSurface {
96    Scalar,
97    Grouped,
98}
99
100const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
101    match surface {
102        SqlGroupingSurface::Scalar => {
103            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
104        }
105        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
106    }
107}
108
109// Enforce the generated canister query contract that empty SQL is unsupported
110// before any parser/lowering work occurs.
111fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
112    let sql_trimmed = sql.trim();
113    if sql_trimmed.is_empty() {
114        return Err(QueryError::unsupported_query(
115            "query endpoint requires a non-empty SQL string",
116        ));
117    }
118
119    Ok(sql_trimmed)
120}
121
122// Render the generated-surface entity list from the descriptor table instead
123// of assuming every session-visible entity belongs on the public query export.
124fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
125    let mut entities = Vec::with_capacity(authorities.len());
126
127    for authority in authorities {
128        entities.push(authority.model().name().to_string());
129    }
130
131    entities
132}
133
134// Project parsed SELECT items into one stable outward column contract while
135// allowing parser-owned aliases to override only the final session label.
136fn sql_projection_labels_from_select_statement(
137    statement: &SqlStatement,
138) -> Result<Option<Vec<String>>, QueryError> {
139    let SqlStatement::Select(select) = statement else {
140        return Err(QueryError::invariant(
141            "SQL projection labels require SELECT statement shape",
142        ));
143    };
144    let SqlProjection::Items(items) = &select.projection else {
145        return Ok(None);
146    };
147
148    Ok(Some(
149        items
150            .iter()
151            .enumerate()
152            .map(|(index, item)| {
153                select
154                    .projection_alias(index)
155                    .map_or_else(|| grouped_sql_projection_item_label(item), str::to_string)
156            })
157            .collect(),
158    ))
159}
160
161// Render one grouped SELECT item into the public grouped-column label used by
162// unified dispatch results.
163fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
164    match item {
165        SqlSelectItem::Field(field) => field.clone(),
166        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
167        SqlSelectItem::TextFunction(call) => {
168            format!(
169                "{}({})",
170                grouped_sql_text_function_name(call.function),
171                call.field
172            )
173        }
174    }
175}
176
177// Keep the dedicated SQL aggregate lane on parser-owned outward labels
178// without reopening alias semantics in lowering or runtime strategy state.
179fn sql_aggregate_dispatch_label_override(statement: &SqlStatement) -> Option<String> {
180    let SqlStatement::Select(select) = statement else {
181        return None;
182    };
183
184    select.projection_alias(0).map(str::to_string)
185}
186
187// Render one aggregate call into one canonical SQL-style label.
188fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
189    let kind = match aggregate.kind {
190        SqlAggregateKind::Count => "COUNT",
191        SqlAggregateKind::Sum => "SUM",
192        SqlAggregateKind::Avg => "AVG",
193        SqlAggregateKind::Min => "MIN",
194        SqlAggregateKind::Max => "MAX",
195    };
196
197    match aggregate.field.as_deref() {
198        Some(field) => format!("{kind}({field})"),
199        None => format!("{kind}(*)"),
200    }
201}
202
203// Render one reduced SQL text-function identifier into one stable uppercase
204// SQL label for outward column metadata.
205const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
206    match function {
207        SqlTextFunction::Trim => "TRIM",
208        SqlTextFunction::Ltrim => "LTRIM",
209        SqlTextFunction::Rtrim => "RTRIM",
210        SqlTextFunction::Lower => "LOWER",
211        SqlTextFunction::Upper => "UPPER",
212        SqlTextFunction::Length => "LENGTH",
213        SqlTextFunction::Left => "LEFT",
214        SqlTextFunction::Right => "RIGHT",
215        SqlTextFunction::StartsWith => "STARTS_WITH",
216        SqlTextFunction::EndsWith => "ENDS_WITH",
217        SqlTextFunction::Contains => "CONTAINS",
218        SqlTextFunction::Position => "POSITION",
219        SqlTextFunction::Replace => "REPLACE",
220        SqlTextFunction::Substring => "SUBSTRING",
221    }
222}
223
224// Resolve one generated query route onto the descriptor-owned authority table.
225fn authority_for_generated_sql_route(
226    route: &SqlStatementRoute,
227    authorities: &[EntityAuthority],
228) -> Result<EntityAuthority, QueryError> {
229    let sql_entity = route.entity();
230
231    for authority in authorities {
232        if identifiers_tail_match(sql_entity, authority.model().name()) {
233            return Ok(*authority);
234        }
235    }
236
237    Err(unsupported_generated_sql_entity_error(
238        sql_entity,
239        authorities,
240    ))
241}
242
243// Keep the generated query-surface unsupported-entity contract stable while
244// moving authority lookup out of the build-generated shim.
245fn unsupported_generated_sql_entity_error(
246    entity_name: &str,
247    authorities: &[EntityAuthority],
248) -> QueryError {
249    let mut supported = String::new();
250
251    for (index, authority) in authorities.iter().enumerate() {
252        if index != 0 {
253            supported.push_str(", ");
254        }
255
256        supported.push_str(authority.model().name());
257    }
258
259    QueryError::unsupported_query(format!(
260        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
261    ))
262}
263
264// Keep typed SQL write routes on the same entity-match contract used by
265// lowered query dispatch, without widening write statements into lowering.
266fn ensure_sql_write_entity_matches<E>(sql_entity: &str) -> Result<(), QueryError>
267where
268    E: EntityKind,
269{
270    if identifiers_tail_match(sql_entity, E::MODEL.name()) {
271        return Ok(());
272    }
273
274    Err(QueryError::from_sql_lowering_error(
275        SqlLoweringError::EntityMismatch {
276            sql_entity: sql_entity.to_string(),
277            expected_entity: E::MODEL.name(),
278        },
279    ))
280}
281
282// Normalize one reduced-SQL primary-key literal onto the concrete entity key
283// type accepted by the structural mutation entrypoint.
284fn sql_write_key_from_literal<E>(value: &Value, pk_name: &str) -> Result<E::Key, QueryError>
285where
286    E: EntityKind,
287{
288    if let Some(key) = <E::Key as crate::traits::FieldValue>::from_value(value) {
289        return Ok(key);
290    }
291
292    let widened = match value {
293        Value::Int(v) if *v >= 0 => Value::Uint(v.cast_unsigned()),
294        Value::Uint(v) if i64::try_from(*v).is_ok() => Value::Int(v.cast_signed()),
295        _ => {
296            return Err(QueryError::unsupported_query(format!(
297                "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
298            )));
299        }
300    };
301
302    <E::Key as crate::traits::FieldValue>::from_value(&widened).ok_or_else(|| {
303        QueryError::unsupported_query(format!(
304            "SQL write primary key literal for '{pk_name}' is not compatible with entity key type"
305        ))
306    })
307}
308
309// Synthesize one generated SQL primary-key literal when the narrowed write
310// lane owns that generation contract directly instead of requiring one
311// user-authored SQL column.
312fn sql_write_generated_primary_key_value<E>() -> Option<Value>
313where
314    E: EntityKind,
315{
316    matches!(E::MODEL.primary_key.kind(), FieldKind::Ulid).then(|| Value::Ulid(Ulid::generate()))
317}
318
319// Normalize one reduced-SQL write literal onto the target entity field kind
320// when the parser's numeric literal domain is narrower than the runtime field.
321fn sql_write_value_for_field<E>(field_name: &str, value: &Value) -> Result<Value, QueryError>
322where
323    E: EntityKind,
324{
325    let field_slot = resolve_field_slot(E::MODEL, field_name).ok_or_else(|| {
326        QueryError::invariant("SQL write field must resolve against the target entity model")
327    })?;
328    let field_kind = E::MODEL.fields()[field_slot].kind();
329
330    let normalized = match (field_kind, value) {
331        (FieldKind::Uint, Value::Int(v)) if *v >= 0 => Value::Uint(v.cast_unsigned()),
332        (FieldKind::Int, Value::Uint(v)) if i64::try_from(*v).is_ok() => {
333            Value::Int(v.cast_signed())
334        }
335        _ => value.clone(),
336    };
337
338    Ok(normalized)
339}
340
341// Mirror the derive-owned system timestamp contract on the structural SQL
342// write lane so schema-derived entities stay writable without exposing those
343// slots as required user-authored SQL columns.
344fn sql_write_system_timestamp_fields<E>() -> Option<(&'static str, &'static str)>
345where
346    E: EntityKind,
347{
348    if resolve_field_slot(E::MODEL, "created_at").is_some()
349        && resolve_field_slot(E::MODEL, "updated_at").is_some()
350    {
351        return Some(("created_at", "updated_at"));
352    }
353
354    None
355}
356
357// Resolve the effective INSERT column list for one reduced SQL write:
358// explicit column lists pass through, while omitted-column-list INSERT uses
359// canonical user-authored model field order and leaves hidden timestamp
360// synthesis on the existing write path.
361fn sql_insert_source_width_hint<E>(source: &SqlInsertSource) -> Option<usize>
362where
363    E: EntityKind,
364{
365    match source {
366        SqlInsertSource::Values(values) => values.first().map(Vec::len),
367        SqlInsertSource::Select(select) => match &select.projection {
368            SqlProjection::All => Some(
369                E::MODEL
370                    .fields()
371                    .iter()
372                    .filter(|field| {
373                        !matches!(
374                            sql_write_system_timestamp_fields::<E>(),
375                            Some((created_at, updated_at))
376                                if field.name() == created_at || field.name() == updated_at
377                        )
378                    })
379                    .count(),
380            ),
381            SqlProjection::Items(items) => Some(items.len()),
382        },
383    }
384}
385
386// Resolve the effective INSERT column list for one reduced SQL write:
387// explicit column lists pass through, while omitted-column-list INSERT uses
388// canonical user-authored model field order and leaves hidden timestamp
389// synthesis on the existing write path.
390fn sql_insert_columns<E>(statement: &SqlInsertStatement) -> Vec<String>
391where
392    E: EntityKind,
393{
394    if !statement.columns.is_empty() {
395        return statement.columns.clone();
396    }
397
398    let timestamp_fields = sql_write_system_timestamp_fields::<E>();
399
400    let columns: Vec<String> = E::MODEL
401        .fields()
402        .iter()
403        .filter(|field| {
404            !matches!(
405                timestamp_fields,
406                Some((created_at, updated_at))
407                    if field.name() == created_at || field.name() == updated_at
408            )
409        })
410        .map(|field| field.name().to_string())
411        .collect();
412
413    let pk_name = E::MODEL.primary_key.name;
414    if sql_write_generated_primary_key_value::<E>().is_none() {
415        return columns;
416    }
417
418    let generated_key_omitted_columns: Vec<String> = columns
419        .iter()
420        .filter(|field| field.as_str() != pk_name)
421        .cloned()
422        .collect();
423    let first_width = sql_insert_source_width_hint::<E>(&statement.source);
424
425    if first_width == Some(generated_key_omitted_columns.len()) {
426        return generated_key_omitted_columns;
427    }
428
429    columns
430}
431
432// Validate one INSERT tuple list against the resolved effective column list so
433// every VALUES tuple stays full-width and deterministic.
434fn validate_sql_insert_value_tuple_lengths(
435    columns: &[String],
436    values: &[Vec<Value>],
437) -> Result<(), QueryError> {
438    for tuple in values {
439        if tuple.len() != columns.len() {
440            return Err(QueryError::from_sql_parse_error(
441                crate::db::sql::parser::SqlParseError::invalid_syntax(
442                    "INSERT column list and VALUES tuple length must match",
443                ),
444            ));
445        }
446    }
447
448    Ok(())
449}
450
451// Validate one projected `INSERT ... SELECT` row set against the resolved
452// effective column list so replayed structural inserts stay deterministic.
453fn validate_sql_insert_selected_rows(
454    columns: &[String],
455    rows: &[Vec<Value>],
456) -> Result<(), QueryError> {
457    for row in rows {
458        if row.len() != columns.len() {
459            return Err(QueryError::unsupported_query(
460                "SQL INSERT SELECT projection width must match the target INSERT column list in this release",
461            ));
462        }
463    }
464
465    Ok(())
466}
467
468impl<C: CanisterKind> DbSession<C> {
469    // Project one typed SQL write after-image into one outward SQL row using
470    // the persisted model field order.
471    fn sql_write_dispatch_row<E>(entity: E) -> Result<Vec<Value>, QueryError>
472    where
473        E: PersistedRow<Canister = C> + EntityValue,
474    {
475        let mut row = Vec::with_capacity(E::MODEL.fields().len());
476
477        for index in 0..E::MODEL.fields().len() {
478            let value = entity.get_value_by_index(index).ok_or_else(|| {
479                QueryError::invariant(
480                    "SQL write dispatch projection row must include every declared field",
481                )
482            })?;
483            row.push(value);
484        }
485
486        Ok(row)
487    }
488
489    // Render one or more typed entities returned by SQL write dispatch as one
490    // projection payload so write statements reuse the same outward result
491    // family as row-producing SELECT and DELETE dispatch.
492    fn sql_write_dispatch_projection<E>(entities: Vec<E>) -> Result<SqlDispatchResult, QueryError>
493    where
494        E: PersistedRow<Canister = C> + EntityValue,
495    {
496        let columns = projection_labels_from_fields(E::MODEL.fields());
497        let rows = entities
498            .into_iter()
499            .map(Self::sql_write_dispatch_row)
500            .collect::<Result<Vec<_>, _>>()?;
501        let row_count = u32::try_from(rows.len()).unwrap_or(u32::MAX);
502
503        Ok(SqlDispatchResult::Projection {
504            columns,
505            rows,
506            row_count,
507        })
508    }
509
510    // Build the structural insert patch and resolved primary key expected by
511    // the shared structural mutation entrypoint.
512    fn sql_insert_patch_and_key<E>(
513        columns: &[String],
514        values: &[Value],
515    ) -> Result<(E::Key, UpdatePatch), QueryError>
516    where
517        E: PersistedRow<Canister = C> + EntityValue,
518    {
519        // Phase 1: resolve the required primary-key literal from the explicit
520        // INSERT column/value list, or synthesize one generated key when the
521        // narrowed SQL write lane owns that contract for this entity.
522        let pk_name = E::MODEL.primary_key.name;
523        let generated_pk = sql_write_generated_primary_key_value::<E>();
524        let (key, generated_pk_value) =
525            if let Some(pk_index) = columns.iter().position(|field| field == pk_name) {
526                let pk_value = values.get(pk_index).ok_or_else(|| {
527                    QueryError::invariant(
528                        "INSERT primary key column must align with one VALUES literal",
529                    )
530                })?;
531                (sql_write_key_from_literal::<E>(pk_value, pk_name)?, None)
532            } else if let Some(pk_value) = generated_pk {
533                (
534                    sql_write_key_from_literal::<E>(&pk_value, pk_name)?,
535                    Some(pk_value),
536                )
537            } else {
538                return Err(QueryError::unsupported_query(format!(
539                    "SQL INSERT requires primary key column '{pk_name}' in this release"
540                )));
541            };
542
543        // Phase 2: lower the explicit column/value pairs onto the structural
544        // patch program consumed by the shared save path.
545        let mut patch = UpdatePatch::new();
546        if let Some(pk_value) = generated_pk_value {
547            patch = patch
548                .set_field(E::MODEL, pk_name, pk_value)
549                .map_err(QueryError::execute)?;
550        }
551        for (field, value) in columns.iter().zip(values.iter()) {
552            let normalized = sql_write_value_for_field::<E>(field, value)?;
553            patch = patch
554                .set_field(E::MODEL, field, normalized)
555                .map_err(QueryError::execute)?;
556        }
557
558        // Phase 3: synthesize the derive-owned system timestamps when the
559        // target entity carries them, matching the typed write surface.
560        if let Some((created_at, updated_at)) = sql_write_system_timestamp_fields::<E>() {
561            let now = Value::Timestamp(Timestamp::now());
562            patch = patch
563                .set_field(E::MODEL, created_at, now.clone())
564                .map_err(QueryError::execute)?;
565            patch = patch
566                .set_field(E::MODEL, updated_at, now)
567                .map_err(QueryError::execute)?;
568        }
569
570        Ok((key, patch))
571    }
572
573    // Build the structural update patch shared by every row selected by one
574    // reduced SQL UPDATE statement.
575    fn sql_update_patch<E>(statement: &SqlUpdateStatement) -> Result<UpdatePatch, QueryError>
576    where
577        E: PersistedRow<Canister = C> + EntityValue,
578    {
579        // Phase 1: lower the `SET` list onto the structural patch program
580        // while keeping primary-key mutation out of the reduced SQL write lane.
581        let pk_name = E::MODEL.primary_key.name;
582        let mut patch = UpdatePatch::new();
583        for assignment in &statement.assignments {
584            if assignment.field == pk_name {
585                return Err(QueryError::unsupported_query(format!(
586                    "SQL UPDATE does not allow primary key mutation for '{pk_name}' in this release"
587                )));
588            }
589            let normalized =
590                sql_write_value_for_field::<E>(assignment.field.as_str(), &assignment.value)?;
591
592            patch = patch
593                .set_field(E::MODEL, assignment.field.as_str(), normalized)
594                .map_err(QueryError::execute)?;
595        }
596
597        // Phase 2: keep structural SQL UPDATE aligned with the derive-owned
598        // auto-updated timestamp contract when the entity carries that field.
599        if let Some((_, updated_at)) = sql_write_system_timestamp_fields::<E>() {
600            patch = patch
601                .set_field(E::MODEL, updated_at, Value::Timestamp(Timestamp::now()))
602                .map_err(QueryError::execute)?;
603        }
604
605        Ok(patch)
606    }
607
608    // Resolve one deterministic typed selector query for reduced SQL UPDATE.
609    fn sql_update_selector_query<E>(statement: &SqlUpdateStatement) -> Result<Query<E>, QueryError>
610    where
611        E: PersistedRow<Canister = C> + EntityValue,
612    {
613        // Phase 1: keep the widened SQL UPDATE lane explicit about requiring
614        // one admitted reduced predicate instead of opening bare full-table
615        // updates implicitly.
616        let Some(predicate) = statement.predicate.clone() else {
617            return Err(QueryError::unsupported_query(
618                "SQL UPDATE requires WHERE predicate in this release",
619            ));
620        };
621        let predicate = canonicalize_sql_predicate_for_model(E::MODEL, predicate);
622        let pk_name = E::MODEL.primary_key.name;
623        let mut selector = Query::<E>::new(MissingRowPolicy::Ignore).filter(predicate);
624
625        // Phase 2: honor one explicit ordered update window when present, and
626        // otherwise keep the write target set deterministic on primary-key
627        // order exactly as the earlier predicate-only update lane did.
628        if statement.order_by.is_empty() {
629            selector = selector.order_by(pk_name);
630        } else {
631            let mut orders_primary_key = false;
632
633            for term in &statement.order_by {
634                if term.field == pk_name {
635                    orders_primary_key = true;
636                }
637                selector = match term.direction {
638                    SqlOrderDirection::Asc => selector.order_by(term.field.as_str()),
639                    SqlOrderDirection::Desc => selector.order_by_desc(term.field.as_str()),
640                };
641            }
642
643            if !orders_primary_key {
644                selector = selector.order_by(pk_name);
645            }
646        }
647
648        // Phase 3: apply the bounded update window on top of the deterministic
649        // selector order before mutation replay begins.
650        if let Some(limit) = statement.limit {
651            selector = selector.limit(limit);
652        }
653        if let Some(offset) = statement.offset {
654            selector = selector.offset(offset);
655        }
656
657        Ok(selector)
658    }
659
660    // Validate and normalize the admitted `INSERT ... SELECT` source shape
661    // without widening the write lane into grouped, aggregate, or computed
662    // projection ownership.
663    fn sql_insert_select_source_statement<E>(
664        statement: &SqlInsertStatement,
665    ) -> Result<SqlSelectStatement, QueryError>
666    where
667        E: PersistedRow<Canister = C> + EntityValue,
668    {
669        let SqlInsertSource::Select(select) = statement.source.clone() else {
670            return Err(QueryError::invariant(
671                "INSERT SELECT source validation requires parsed SELECT source",
672            ));
673        };
674        let mut select = *select;
675        ensure_sql_write_entity_matches::<E>(select.entity.as_str())?;
676
677        if !select.group_by.is_empty() || !select.having.is_empty() {
678            return Err(QueryError::unsupported_query(
679                "SQL INSERT SELECT requires scalar SELECT source in this release",
680            ));
681        }
682
683        if let SqlProjection::Items(items) = &select.projection {
684            for item in items {
685                if matches!(item, SqlSelectItem::Aggregate(_)) {
686                    return Err(QueryError::unsupported_query(
687                        "SQL INSERT SELECT does not support aggregate source projection in this release",
688                    ));
689                }
690            }
691        }
692
693        let pk_name = E::MODEL.primary_key.name;
694        if select.order_by.is_empty() || !select.order_by.iter().any(|term| term.field == pk_name) {
695            select.order_by.push(SqlOrderTerm {
696                field: pk_name.to_string(),
697                direction: SqlOrderDirection::Asc,
698            });
699        }
700
701        Ok(select)
702    }
703
704    // Execute one admitted `INSERT ... SELECT` source query through the
705    // existing scalar SQL projection lane and return the projected value rows
706    // that will later feed the ordinary structural insert replay.
707    fn execute_sql_insert_select_source_rows<E>(
708        &self,
709        source: &SqlSelectStatement,
710    ) -> Result<Vec<Vec<Value>>, QueryError>
711    where
712        E: PersistedRow<Canister = C> + EntityValue,
713    {
714        // Phase 1: reuse the already-shipped scalar computed-projection lane
715        // when the source SELECT widens beyond plain fields but still fits the
716        // admitted session-owned text projection contract.
717        if let Some(plan) = computed_projection::computed_sql_projection_plan(
718            &SqlStatement::Select(source.clone()),
719        )? {
720            let result = self.execute_computed_sql_projection_dispatch_for_authority(
721                plan,
722                EntityAuthority::for_type::<E>(),
723            )?;
724
725            return match result {
726                SqlDispatchResult::Projection { rows, .. } => Ok(rows),
727                other => Err(QueryError::invariant(format!(
728                    "INSERT SELECT computed source must produce projection rows, found {other:?}",
729                ))),
730            };
731        }
732
733        // Phase 2: keep the ordinary field-only source path on the shared
734        // scalar SQL projection lane.
735        let prepared = prepare_sql_statement(SqlStatement::Select(source.clone()), E::MODEL.name())
736            .map_err(QueryError::from_sql_lowering_error)?;
737        let lowered =
738            lower_sql_command_from_prepared_statement(prepared, E::MODEL.primary_key.name)
739                .map_err(QueryError::from_sql_lowering_error)?;
740        let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
741            return Err(QueryError::invariant(
742                "INSERT SELECT source lowering must stay on the scalar SELECT query lane",
743            ));
744        };
745
746        let payload =
747            self.execute_lowered_sql_projection_core(select, EntityAuthority::for_type::<E>())?;
748        let (_, rows, _) = payload.into_parts();
749
750        Ok(rows)
751    }
752
753    // Execute one narrow SQL INSERT statement through the existing structural
754    // mutation path and project the returned after-image as one SQL row.
755    fn execute_sql_insert_dispatch<E>(
756        &self,
757        statement: &SqlInsertStatement,
758    ) -> Result<SqlDispatchResult, QueryError>
759    where
760        E: PersistedRow<Canister = C> + EntityValue,
761    {
762        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
763        let columns = sql_insert_columns::<E>(statement);
764        let source_rows = match &statement.source {
765            SqlInsertSource::Values(values) => {
766                validate_sql_insert_value_tuple_lengths(columns.as_slice(), values.as_slice())?;
767                values.clone()
768            }
769            SqlInsertSource::Select(_) => {
770                let source = Self::sql_insert_select_source_statement::<E>(statement)?;
771                let rows = self.execute_sql_insert_select_source_rows::<E>(&source)?;
772                validate_sql_insert_selected_rows(columns.as_slice(), rows.as_slice())?;
773
774                rows
775            }
776        };
777        let mut entities = Vec::with_capacity(source_rows.len());
778
779        for values in &source_rows {
780            let (key, patch) = Self::sql_insert_patch_and_key::<E>(columns.as_slice(), values)?;
781            let entity = self
782                .mutate_structural::<E>(key, patch, MutationMode::Insert)
783                .map_err(QueryError::execute)?;
784            entities.push(entity);
785        }
786
787        Self::sql_write_dispatch_projection(entities)
788    }
789
790    // Execute one reduced SQL UPDATE statement by selecting deterministic
791    // target rows first and then replaying one shared structural patch onto
792    // each matched primary key.
793    fn execute_sql_update_dispatch<E>(
794        &self,
795        statement: &SqlUpdateStatement,
796    ) -> Result<SqlDispatchResult, QueryError>
797    where
798        E: PersistedRow<Canister = C> + EntityValue,
799    {
800        ensure_sql_write_entity_matches::<E>(statement.entity.as_str())?;
801        let selector = Self::sql_update_selector_query::<E>(statement)?;
802        let patch = Self::sql_update_patch::<E>(statement)?;
803        let matched = self.execute_query(&selector)?;
804        let mut entities = Vec::with_capacity(matched.len());
805
806        // Phase 1: apply the already-normalized structural patch to every
807        // matched row in deterministic primary-key order.
808        for entity in matched.entities() {
809            let updated = self
810                .mutate_structural::<E>(entity.id().key(), patch.clone(), MutationMode::Update)
811                .map_err(QueryError::execute)?;
812            entities.push(updated);
813        }
814
815        Self::sql_write_dispatch_projection(entities)
816    }
817
818    // Build the shared structural SQL projection execution inputs once so
819    // value-row and rendered-row dispatch surfaces only differ in final packaging.
820    fn prepare_structural_sql_projection_execution(
821        &self,
822        query: StructuralQuery,
823        authority: EntityAuthority,
824    ) -> Result<(Vec<String>, AccessPlannedQuery), QueryError> {
825        // Phase 1: build the structural access plan once and freeze its outward
826        // column contract for all projection materialization surfaces.
827        let (_, plan) =
828            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
829        let projection = plan.projection_spec(authority.model());
830        let columns = projection_labels_from_projection_spec(&projection);
831
832        Ok((columns, plan))
833    }
834
835    // Execute one structural SQL load query and return only row-oriented SQL
836    // projection values, keeping typed projection rows out of the shared SQL
837    // query-lane path.
838    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
839        &self,
840        query: StructuralQuery,
841        authority: EntityAuthority,
842    ) -> Result<SqlProjectionPayload, QueryError> {
843        // Phase 1: build the shared structural plan and outward column contract once.
844        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
845
846        // Phase 2: execute the shared structural load path with the already
847        // derived projection semantics.
848        let projected =
849            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
850                .map_err(QueryError::execute)?;
851        let (rows, row_count) = projected.into_parts();
852
853        Ok(SqlProjectionPayload::new(columns, rows, row_count))
854    }
855
856    // Execute one structural SQL load query and return render-ready text rows
857    // for the dispatch lane when the terminal short path can prove them
858    // directly.
859    fn execute_structural_sql_projection_text(
860        &self,
861        query: StructuralQuery,
862        authority: EntityAuthority,
863    ) -> Result<SqlDispatchResult, QueryError> {
864        // Phase 1: build the shared structural plan and outward column contract once.
865        let (columns, plan) = self.prepare_structural_sql_projection_execution(query, authority)?;
866
867        // Phase 2: execute the shared structural load path with the already
868        // derived projection semantics while preferring rendered SQL rows.
869        let projected =
870            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
871                .map_err(QueryError::execute)?;
872        let (rows, row_count) = projected.into_parts();
873
874        Ok(SqlDispatchResult::ProjectionText {
875            columns,
876            rows,
877            row_count,
878        })
879    }
880
881    // Execute one typed SQL delete query while keeping the row payload on the
882    // typed delete executor boundary that still owns non-runtime-hook delete
883    // commit-window application.
884    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
885    where
886        E: PersistedRow<Canister = C> + EntityValue,
887    {
888        let plan = self
889            .compile_query_with_visible_indexes(query)?
890            .into_prepared_execution_plan();
891        let deleted = self
892            .with_metrics(|| {
893                self.delete_executor::<E>()
894                    .execute_structural_projection(plan)
895            })
896            .map_err(QueryError::execute)?;
897        let (rows, row_count) = deleted.into_parts();
898        let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
899
900        Ok(SqlProjectionPayload::new(
901            projection_labels_from_fields(E::MODEL.fields()),
902            rows,
903            row_count,
904        )
905        .into_dispatch_result())
906    }
907
908    // Lower one parsed SQL query/explain route once for one resolved authority
909    // and preserve grouped-column metadata for grouped SELECT dispatch.
910    fn lowered_sql_query_dispatch_inputs_for_authority(
911        parsed: &SqlParsedStatement,
912        authority: EntityAuthority,
913        unsupported_message: &'static str,
914    ) -> Result<(LoweredSqlQuery, Option<Vec<String>>), QueryError> {
915        let lowered = parsed.lower_query_lane_for_entity(
916            authority.model().name(),
917            authority.model().primary_key.name,
918        )?;
919        let projection_columns = matches!(lowered.query(), Some(LoweredSqlQuery::Select(_)))
920            .then(|| sql_projection_labels_from_select_statement(&parsed.statement))
921            .transpose()?;
922        let query = lowered
923            .into_query()
924            .ok_or_else(|| QueryError::unsupported_query(unsupported_message))?;
925
926        Ok((query, projection_columns.flatten()))
927    }
928
929    // Execute one parsed SQL query route through the shared aggregate,
930    // computed-projection, and lowered query lane so typed and generated
931    // dispatch only differ at the final SELECT/DELETE packaging boundary.
932    fn dispatch_sql_query_route_for_authority(
933        &self,
934        parsed: &SqlParsedStatement,
935        authority: EntityAuthority,
936        unsupported_message: &'static str,
937        dispatch_select: impl FnOnce(
938            &Self,
939            LoweredSelectShape,
940            EntityAuthority,
941            bool,
942            Option<Vec<String>>,
943        ) -> Result<SqlDispatchResult, QueryError>,
944        dispatch_delete: impl FnOnce(
945            &Self,
946            LoweredBaseQueryShape,
947            EntityAuthority,
948        ) -> Result<SqlDispatchResult, QueryError>,
949    ) -> Result<SqlDispatchResult, QueryError> {
950        // Phase 1: keep aggregate and computed projection classification on the
951        // shared parsed route so both dispatch surfaces honor the same lane split.
952        if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
953            let command =
954                Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
955
956            return self.execute_sql_aggregate_dispatch_for_authority(
957                command,
958                authority,
959                sql_aggregate_dispatch_label_override(&parsed.statement),
960            );
961        }
962
963        if let Some(plan) = computed_projection::computed_sql_projection_plan(&parsed.statement)? {
964            return self.execute_computed_sql_projection_dispatch_for_authority(plan, authority);
965        }
966
967        // Phase 2: lower the remaining query route once, then let the caller
968        // decide only the final outward result packaging.
969        let (query, projection_columns) = Self::lowered_sql_query_dispatch_inputs_for_authority(
970            parsed,
971            authority,
972            unsupported_message,
973        )?;
974        let grouped_surface = query.has_grouping();
975
976        match query {
977            LoweredSqlQuery::Select(select) => {
978                dispatch_select(self, select, authority, grouped_surface, projection_columns)
979            }
980            LoweredSqlQuery::Delete(delete) => dispatch_delete(self, delete, authority),
981        }
982    }
983
984    // Execute one parsed SQL EXPLAIN route through the shared computed-
985    // projection and lowered explain lanes so typed and generated dispatch do
986    // not duplicate the same explain classification tree.
987    fn dispatch_sql_explain_route_for_authority(
988        &self,
989        parsed: &SqlParsedStatement,
990        authority: EntityAuthority,
991    ) -> Result<SqlDispatchResult, QueryError> {
992        // Phase 1: keep computed-projection explain ownership on the same
993        // parsed route boundary as the shared query lane.
994        if let Some((mode, plan)) =
995            computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
996        {
997            return self
998                .explain_computed_sql_projection_dispatch_for_authority(mode, plan, authority)
999                .map(SqlDispatchResult::Explain);
1000        }
1001
1002        // Phase 2: lower once for execution/logical explain and preserve the
1003        // shared execution-first fallback policy across both callers.
1004        let lowered = parsed.lower_query_lane_for_entity(
1005            authority.model().name(),
1006            authority.model().primary_key.name,
1007        )?;
1008        if let Some(explain) =
1009            self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
1010        {
1011            return Ok(SqlDispatchResult::Explain(explain));
1012        }
1013
1014        self.explain_lowered_sql_for_authority(&lowered, authority)
1015            .map(SqlDispatchResult::Explain)
1016    }
1017
1018    // Validate that one SQL-derived query intent matches the grouped/scalar
1019    // execution surface that is about to consume it.
1020    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
1021        query: &Query<E>,
1022        surface: SqlGroupingSurface,
1023    ) -> Result<(), QueryError>
1024    where
1025        E: EntityKind,
1026    {
1027        match (surface, query.has_grouping()) {
1028            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
1029            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
1030                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
1031            ),
1032        }
1033    }
1034
1035    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
1036    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
1037    where
1038        E: PersistedRow<Canister = C> + EntityValue,
1039    {
1040        let parsed = self.parse_sql_statement(sql)?;
1041
1042        self.execute_sql_dispatch_parsed::<E>(&parsed)
1043    }
1044
1045    /// Execute one parsed reduced SQL statement into one unified SQL payload.
1046    pub fn execute_sql_dispatch_parsed<E>(
1047        &self,
1048        parsed: &SqlParsedStatement,
1049    ) -> Result<SqlDispatchResult, QueryError>
1050    where
1051        E: PersistedRow<Canister = C> + EntityValue,
1052    {
1053        match parsed.route() {
1054            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1055                parsed,
1056                EntityAuthority::for_type::<E>(),
1057                "execute_sql_dispatch accepts SELECT or DELETE only",
1058                |session, select, authority, grouped_surface, projection_columns| {
1059                    if grouped_surface {
1060                        let columns = projection_columns.ok_or_else(|| {
1061                            QueryError::unsupported_query(
1062                                "grouped SQL dispatch requires explicit grouped projection items",
1063                            )
1064                        })?;
1065
1066                        return session.execute_lowered_sql_grouped_dispatch_select_core(
1067                            select, authority, columns,
1068                        );
1069                    }
1070
1071                    let payload = session.execute_lowered_sql_projection_core(select, authority)?;
1072                    if let Some(columns) = projection_columns {
1073                        let (_, rows, row_count) = payload.into_parts();
1074
1075                        return Ok(SqlProjectionPayload::new(columns, rows, row_count)
1076                            .into_dispatch_result());
1077                    }
1078
1079                    Ok(payload.into_dispatch_result())
1080                },
1081                |session, delete, _authority| {
1082                    let typed_query = bind_lowered_sql_query::<E>(
1083                        LoweredSqlQuery::Delete(delete),
1084                        MissingRowPolicy::Ignore,
1085                    )
1086                    .map_err(QueryError::from_sql_lowering_error)?;
1087
1088                    session.execute_typed_sql_delete(&typed_query)
1089                },
1090            ),
1091            SqlStatementRoute::Insert { .. } => {
1092                let SqlStatement::Insert(statement) = &parsed.statement else {
1093                    return Err(QueryError::invariant(
1094                        "INSERT SQL route must carry parsed INSERT statement",
1095                    ));
1096                };
1097
1098                self.execute_sql_insert_dispatch::<E>(statement)
1099            }
1100            SqlStatementRoute::Update { .. } => {
1101                let SqlStatement::Update(statement) = &parsed.statement else {
1102                    return Err(QueryError::invariant(
1103                        "UPDATE SQL route must carry parsed UPDATE statement",
1104                    ));
1105                };
1106
1107                self.execute_sql_update_dispatch::<E>(statement)
1108            }
1109            SqlStatementRoute::Explain { .. } => self
1110                .dispatch_sql_explain_route_for_authority(parsed, EntityAuthority::for_type::<E>()),
1111            SqlStatementRoute::Describe { .. } => {
1112                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
1113            }
1114            SqlStatementRoute::ShowIndexes { .. } => {
1115                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
1116            }
1117            SqlStatementRoute::ShowColumns { .. } => {
1118                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
1119            }
1120            SqlStatementRoute::ShowEntities => {
1121                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
1122            }
1123        }
1124    }
1125
1126    /// Execute one parsed reduced SQL statement through the generated canister
1127    /// query/explain surface for one already-resolved dynamic authority.
1128    ///
1129    /// This keeps the canister SQL facade on the same reduced SQL ownership
1130    /// boundary as typed dispatch without forcing the outer facade to reopen
1131    /// typed-generic routing just to preserve parity for computed projections.
1132    #[doc(hidden)]
1133    pub fn execute_generated_query_surface_dispatch_for_authority(
1134        &self,
1135        parsed: &SqlParsedStatement,
1136        authority: EntityAuthority,
1137    ) -> Result<SqlDispatchResult, QueryError> {
1138        match parsed.route() {
1139            SqlStatementRoute::Query { .. } => self.dispatch_sql_query_route_for_authority(
1140                parsed,
1141                authority,
1142                "generated SQL query surface requires query or EXPLAIN statement lanes",
1143                |session, select, authority, grouped_surface, projection_columns| {
1144                    if grouped_surface {
1145                        let columns = projection_columns.ok_or_else(|| {
1146                            QueryError::unsupported_query(
1147                                "grouped SQL dispatch requires explicit grouped projection items",
1148                            )
1149                        })?;
1150
1151                        return session
1152                            .execute_lowered_sql_grouped_dispatch_select_core(select, authority, columns);
1153                    }
1154
1155                    let result =
1156                        session.execute_lowered_sql_dispatch_select_text_core(select, authority)?;
1157                    if let Some(columns) = projection_columns {
1158                        let SqlDispatchResult::ProjectionText {
1159                            rows, row_count, ..
1160                        } = result
1161                        else {
1162                            return Err(QueryError::invariant(
1163                                "generated scalar SQL dispatch text path must emit projection text rows",
1164                            ));
1165                        };
1166
1167                        return Ok(SqlDispatchResult::ProjectionText {
1168                            columns,
1169                            rows,
1170                            row_count,
1171                        });
1172                    }
1173
1174                    Ok(result)
1175                },
1176                |session, delete, authority| {
1177                    session.execute_lowered_sql_dispatch_delete_core(&delete, authority)
1178                },
1179            ),
1180            SqlStatementRoute::Explain { .. } => {
1181                self.dispatch_sql_explain_route_for_authority(parsed, authority)
1182            }
1183            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. }
1184            | SqlStatementRoute::Describe { .. }
1185            | SqlStatementRoute::ShowIndexes { .. }
1186            | SqlStatementRoute::ShowColumns { .. }
1187            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
1188                "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1189            )),
1190        }
1191    }
1192
1193    /// Execute one raw SQL string through the generated canister query surface.
1194    ///
1195    /// This hidden helper keeps parse, route, authority, and metadata/query
1196    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
1197    /// stays close to a pure descriptor table plus public ABI wrapper.
1198    #[doc(hidden)]
1199    #[must_use]
1200    pub fn execute_generated_query_surface_sql(
1201        &self,
1202        sql: &str,
1203        authorities: &[EntityAuthority],
1204    ) -> GeneratedSqlDispatchAttempt {
1205        // Phase 1: normalize and parse once so every generated route family
1206        // shares the same SQL ownership boundary.
1207        let sql_trimmed = match trim_generated_query_sql_input(sql) {
1208            Ok(sql_trimmed) => sql_trimmed,
1209            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1210        };
1211        let parsed = match self.parse_sql_statement(sql_trimmed) {
1212            Ok(parsed) => parsed,
1213            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1214        };
1215
1216        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
1217        // generated routes against the emitted authority table exactly once.
1218        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
1219            return GeneratedSqlDispatchAttempt::new(
1220                "",
1221                None,
1222                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
1223                    authorities,
1224                ))),
1225            );
1226        }
1227        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
1228            Ok(authority) => authority,
1229            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
1230        };
1231
1232        // Phase 3: dispatch the resolved route through the existing query,
1233        // explain, and metadata helpers without rebuilding route ownership in
1234        // the generated build output.
1235        let entity_name = authority.model().name();
1236        let explain_order_field = parsed
1237            .route()
1238            .is_explain()
1239            .then_some(authority.model().primary_key.name);
1240        let result = match parsed.route() {
1241            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
1242                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
1243            }
1244            SqlStatementRoute::Insert { .. } | SqlStatementRoute::Update { .. } => {
1245                Err(QueryError::unsupported_query(
1246                    "generated SQL query surface requires SELECT, DELETE, or EXPLAIN statement lanes",
1247                ))
1248            }
1249            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
1250                self.describe_entity_model(authority.model()),
1251            )),
1252            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
1253                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
1254            )),
1255            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
1256                self.show_columns_for_model(authority.model()),
1257            )),
1258            SqlStatementRoute::ShowEntities => unreachable!(
1259                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
1260            ),
1261        };
1262
1263        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
1264    }
1265}