Skip to main content

icydb_core/db/session/sql/dispatch/
mod.rs

1//! Module: db::session::sql::dispatch
2//! Responsibility: module-local ownership and contracts for db::session::sql::dispatch.
3//! Does not own: cross-module orchestration outside this module.
4//! Boundary: exposes this module API while keeping implementation details internal.
5
6mod computed;
7mod lowered;
8
9use crate::{
10    db::{
11        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
12        executor::{
13            EntityAuthority, execute_sql_projection_rows_for_canister,
14            execute_sql_projection_text_rows_for_canister,
15        },
16        identifiers_tail_match,
17        query::intent::StructuralQuery,
18        session::sql::{
19            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21            computed_projection,
22            projection::{
23                SqlProjectionPayload, projection_labels_from_fields,
24                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25            },
26        },
27        sql::lowering::{
28            LoweredSqlQuery, bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
29        },
30        sql::parser::{
31            SqlAggregateCall, SqlAggregateKind, SqlProjection, SqlSelectItem, SqlStatement,
32            SqlTextFunction,
33        },
34    },
35    traits::{CanisterKind, EntityKind, EntityValue},
36};
37
///
/// GeneratedSqlDispatchAttempt
///
/// Hidden generated-query dispatch envelope used by the facade helper to keep
/// generated route ownership in core while preserving the public EXPLAIN error
/// rewrite contract at the outer boundary.
///
/// The envelope pairs the dispatch outcome with the context the outer facade
/// needs to rewrite EXPLAIN errors: the resolved entity name and an optional
/// deterministic order-field hint.
///

#[doc(hidden)]
pub struct GeneratedSqlDispatchAttempt {
    // Name of the entity model the SQL resolved to; the empty string when the
    // attempt finished before any authority was resolved.
    entity_name: &'static str,
    // Suggested order field for EXPLAIN rewrites; `None` when no hint applies.
    explain_order_field: Option<&'static str>,
    // Outcome of the dispatch itself.
    result: Result<SqlDispatchResult, QueryError>,
}
52
53impl GeneratedSqlDispatchAttempt {
54    // Build one generated-query dispatch attempt with optional explain-hint context.
55    const fn new(
56        entity_name: &'static str,
57        explain_order_field: Option<&'static str>,
58        result: Result<SqlDispatchResult, QueryError>,
59    ) -> Self {
60        Self {
61            entity_name,
62            explain_order_field,
63            result,
64        }
65    }
66
67    /// Borrow the resolved entity name for this generated-query attempt.
68    #[must_use]
69    pub const fn entity_name(&self) -> &'static str {
70        self.entity_name
71    }
72
73    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
74    #[must_use]
75    pub const fn explain_order_field(&self) -> Option<&'static str> {
76        self.explain_order_field
77    }
78
79    /// Consume and return the generated-query dispatch result.
80    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
81        self.result
82    }
83}
84
/// Execution surface that is about to consume one SQL-derived query intent.
///
/// Used to check that a query's grouping shape matches the surface: the scalar
/// surface accepts only ungrouped queries and the grouped surface accepts only
/// grouped ones.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db::session::sql) enum SqlGroupingSurface {
    /// Surface that rejects grouped SELECT.
    Scalar,
    /// Surface that requires grouped SQL query intent.
    Grouped,
}
90
91const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
92    match surface {
93        SqlGroupingSurface::Scalar => {
94            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
95        }
96        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
97    }
98}
99
100// Enforce the generated canister query contract that empty SQL is unsupported
101// before any parser/lowering work occurs.
102fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
103    let sql_trimmed = sql.trim();
104    if sql_trimmed.is_empty() {
105        return Err(QueryError::unsupported_query(
106            "query endpoint requires a non-empty SQL string",
107        ));
108    }
109
110    Ok(sql_trimmed)
111}
112
113// Render the generated-surface entity list from the descriptor table instead
114// of assuming every session-visible entity belongs on the public query export.
115fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
116    let mut entities = Vec::with_capacity(authorities.len());
117
118    for authority in authorities {
119        entities.push(authority.model().name().to_string());
120    }
121
122    entities
123}
124
125// Project grouped SELECT item labels into one stable outward column contract.
126fn grouped_sql_projection_labels_from_statement(
127    statement: &SqlStatement,
128) -> Result<Vec<String>, QueryError> {
129    let SqlStatement::Select(select) = statement else {
130        return Err(QueryError::invariant(
131            "grouped SQL projection labels require SELECT statement shape",
132        ));
133    };
134    let SqlProjection::Items(items) = &select.projection else {
135        return Err(QueryError::unsupported_query(
136            "grouped SQL dispatch requires explicit grouped projection items",
137        ));
138    };
139
140    Ok(items
141        .iter()
142        .map(grouped_sql_projection_item_label)
143        .collect())
144}
145
146// Render one grouped SELECT item into the public grouped-column label used by
147// unified dispatch results.
148fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
149    match item {
150        SqlSelectItem::Field(field) => field.clone(),
151        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
152        SqlSelectItem::TextFunction(call) => {
153            format!(
154                "{}({})",
155                grouped_sql_text_function_name(call.function),
156                call.field
157            )
158        }
159    }
160}
161
162// Render one aggregate call into one canonical SQL-style label.
163fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
164    let kind = match aggregate.kind {
165        SqlAggregateKind::Count => "COUNT",
166        SqlAggregateKind::Sum => "SUM",
167        SqlAggregateKind::Avg => "AVG",
168        SqlAggregateKind::Min => "MIN",
169        SqlAggregateKind::Max => "MAX",
170    };
171
172    match aggregate.field.as_deref() {
173        Some(field) => format!("{kind}({field})"),
174        None => format!("{kind}(*)"),
175    }
176}
177
178// Render one reduced SQL text-function identifier into one stable uppercase
179// SQL label for outward column metadata.
180const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
181    match function {
182        SqlTextFunction::Trim => "TRIM",
183        SqlTextFunction::Ltrim => "LTRIM",
184        SqlTextFunction::Rtrim => "RTRIM",
185        SqlTextFunction::Lower => "LOWER",
186        SqlTextFunction::Upper => "UPPER",
187        SqlTextFunction::Length => "LENGTH",
188        SqlTextFunction::Left => "LEFT",
189        SqlTextFunction::Right => "RIGHT",
190        SqlTextFunction::StartsWith => "STARTS_WITH",
191        SqlTextFunction::EndsWith => "ENDS_WITH",
192        SqlTextFunction::Contains => "CONTAINS",
193        SqlTextFunction::Position => "POSITION",
194        SqlTextFunction::Replace => "REPLACE",
195        SqlTextFunction::Substring => "SUBSTRING",
196    }
197}
198
199// Resolve one generated query route onto the descriptor-owned authority table.
200fn authority_for_generated_sql_route(
201    route: &SqlStatementRoute,
202    authorities: &[EntityAuthority],
203) -> Result<EntityAuthority, QueryError> {
204    let sql_entity = route.entity();
205
206    for authority in authorities {
207        if identifiers_tail_match(sql_entity, authority.model().name()) {
208            return Ok(*authority);
209        }
210    }
211
212    Err(unsupported_generated_sql_entity_error(
213        sql_entity,
214        authorities,
215    ))
216}
217
218// Keep the generated query-surface unsupported-entity contract stable while
219// moving authority lookup out of the build-generated shim.
220fn unsupported_generated_sql_entity_error(
221    entity_name: &str,
222    authorities: &[EntityAuthority],
223) -> QueryError {
224    let mut supported = String::new();
225
226    for (index, authority) in authorities.iter().enumerate() {
227        if index != 0 {
228            supported.push_str(", ");
229        }
230
231        supported.push_str(authority.model().name());
232    }
233
234    QueryError::unsupported_query(format!(
235        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
236    ))
237}
238
239impl<C: CanisterKind> DbSession<C> {
240    // Execute one structural SQL load query and return only row-oriented SQL
241    // projection values, keeping typed projection rows out of the shared SQL
242    // query-lane path.
243    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
244        &self,
245        query: StructuralQuery,
246        authority: EntityAuthority,
247    ) -> Result<SqlProjectionPayload, QueryError> {
248        // Phase 1: build the structural access plan once and reuse its
249        // projection contract for both labels and row materialization.
250        let visible_indexes =
251            self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
252        let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
253        let projection = plan.projection_spec(authority.model());
254        let columns = projection_labels_from_projection_spec(&projection);
255
256        // Phase 2: execute the shared structural load path with the already
257        // derived projection semantics.
258        let projected =
259            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
260                .map_err(QueryError::execute)?;
261        let (rows, row_count) = projected.into_parts();
262
263        Ok(SqlProjectionPayload::new(columns, rows, row_count))
264    }
265
266    // Execute one structural SQL load query and return render-ready text rows
267    // for the dispatch lane when the terminal short path can prove them
268    // directly.
269    fn execute_structural_sql_projection_text(
270        &self,
271        query: StructuralQuery,
272        authority: EntityAuthority,
273    ) -> Result<SqlDispatchResult, QueryError> {
274        // Phase 1: build the structural access plan once and reuse its
275        // projection contract for both labels and text-row materialization.
276        let visible_indexes =
277            self.visible_indexes_for_store_model(authority.store_path(), authority.model())?;
278        let plan = query.build_plan_with_visible_indexes(&visible_indexes)?;
279        let projection = plan.projection_spec(authority.model());
280        let columns = projection_labels_from_projection_spec(&projection);
281
282        // Phase 2: execute the shared structural load path with the already
283        // derived projection semantics while preferring rendered SQL rows.
284        let projected =
285            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
286                .map_err(QueryError::execute)?;
287        let (rows, row_count) = projected.into_parts();
288
289        Ok(SqlDispatchResult::ProjectionText {
290            columns,
291            rows,
292            row_count,
293        })
294    }
295
296    // Execute one typed SQL delete query while keeping the row payload on the
297    // typed delete executor boundary that still owns non-runtime-hook delete
298    // commit-window application.
299    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
300    where
301        E: PersistedRow<Canister = C> + EntityValue,
302    {
303        let plan = self
304            .compile_query_with_visible_indexes(query)?
305            .into_executable();
306        let deleted = self
307            .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
308            .map_err(QueryError::execute)?;
309        let (rows, row_count) = deleted.into_parts();
310        let rows = sql_projection_rows_from_kernel_rows(rows);
311
312        Ok(SqlProjectionPayload::new(
313            projection_labels_from_fields(E::MODEL.fields()),
314            rows,
315            row_count,
316        )
317        .into_dispatch_result())
318    }
319
320    // Validate that one SQL-derived query intent matches the grouped/scalar
321    // execution surface that is about to consume it.
322    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
323        query: &Query<E>,
324        surface: SqlGroupingSurface,
325    ) -> Result<(), QueryError>
326    where
327        E: EntityKind,
328    {
329        match (surface, query.has_grouping()) {
330            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
331            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
332                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
333            ),
334        }
335    }
336
337    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
338    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
339    where
340        E: PersistedRow<Canister = C> + EntityValue,
341    {
342        let parsed = self.parse_sql_statement(sql)?;
343
344        self.execute_sql_dispatch_parsed::<E>(&parsed)
345    }
346
347    /// Execute one parsed reduced SQL statement into one unified SQL payload.
348    pub fn execute_sql_dispatch_parsed<E>(
349        &self,
350        parsed: &SqlParsedStatement,
351    ) -> Result<SqlDispatchResult, QueryError>
352    where
353        E: PersistedRow<Canister = C> + EntityValue,
354    {
355        match parsed.route() {
356            SqlStatementRoute::Query { .. } => {
357                if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
358                    let authority = EntityAuthority::for_type::<E>();
359                    let command =
360                        Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
361
362                    return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
363                }
364
365                if let Some(plan) =
366                    computed_projection::computed_sql_projection_plan(&parsed.statement)?
367                {
368                    return self.execute_computed_sql_projection_dispatch::<E>(plan);
369                }
370
371                // Phase 1: keep typed dispatch on the shared lowered query lane
372                // for plain `SELECT`, and only pay typed query binding on the
373                // `DELETE` branch that still owns typed commit semantics.
374                let lowered = parsed
375                    .lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
376                let grouped_columns = lowered
377                    .query()
378                    .filter(|query| query.has_grouping())
379                    .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
380                    .transpose()?;
381
382                // Phase 2: dispatch `SELECT` directly from the lowered shape so
383                // typed SQL projection does not rebuild and discard a typed
384                // `Query<E>` before returning to the structural executor path.
385                match lowered.into_query() {
386                    Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
387                        Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
388                            select,
389                            EntityAuthority::for_type::<E>(),
390                            columns,
391                        ),
392                        None => self.execute_lowered_sql_dispatch_select_core(
393                            select,
394                            EntityAuthority::for_type::<E>(),
395                        ),
396                    },
397                    Some(LoweredSqlQuery::Delete(delete)) => {
398                        let typed_query = bind_lowered_sql_query::<E>(
399                            LoweredSqlQuery::Delete(delete),
400                            MissingRowPolicy::Ignore,
401                        )
402                        .map_err(QueryError::from_sql_lowering_error)?;
403
404                        self.execute_typed_sql_delete(&typed_query)
405                    }
406                    None => Err(QueryError::unsupported_query(
407                        "execute_sql_dispatch accepts SELECT or DELETE only",
408                    )),
409                }
410            }
411            SqlStatementRoute::Explain { .. } => {
412                if let Some((mode, plan)) =
413                    computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
414                {
415                    return self
416                        .explain_computed_sql_projection_dispatch::<E>(mode, plan)
417                        .map(SqlDispatchResult::Explain);
418                }
419
420                let lowered = lower_sql_command_from_prepared_statement(
421                    parsed.prepare(E::MODEL.name())?,
422                    E::MODEL.primary_key.name,
423                )
424                .map_err(QueryError::from_sql_lowering_error)?;
425                if let Some(explain) = self.explain_lowered_sql_execution_for_authority(
426                    &lowered,
427                    EntityAuthority::for_type::<E>(),
428                )? {
429                    return Ok(SqlDispatchResult::Explain(explain));
430                }
431
432                self.explain_lowered_sql_for_authority(&lowered, EntityAuthority::for_type::<E>())
433                    .map(SqlDispatchResult::Explain)
434            }
435            SqlStatementRoute::Describe { .. } => {
436                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
437            }
438            SqlStatementRoute::ShowIndexes { .. } => {
439                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
440            }
441            SqlStatementRoute::ShowColumns { .. } => {
442                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
443            }
444            SqlStatementRoute::ShowEntities => {
445                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
446            }
447        }
448    }
449
450    /// Execute one parsed reduced SQL statement through the generated canister
451    /// query/explain surface for one already-resolved dynamic authority.
452    ///
453    /// This keeps the canister SQL facade on the same reduced SQL ownership
454    /// boundary as typed dispatch without forcing the outer facade to reopen
455    /// typed-generic routing just to preserve parity for computed projections.
456    #[doc(hidden)]
457    pub fn execute_generated_query_surface_dispatch_for_authority(
458        &self,
459        parsed: &SqlParsedStatement,
460        authority: EntityAuthority,
461    ) -> Result<SqlDispatchResult, QueryError> {
462        match parsed.route() {
463            SqlStatementRoute::Query { .. } => {
464                if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
465                    let command =
466                        Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
467
468                    return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
469                }
470
471                if let Some(plan) =
472                    computed_projection::computed_sql_projection_plan(&parsed.statement)?
473                {
474                    return self
475                        .execute_computed_sql_projection_dispatch_for_authority(plan, authority);
476                }
477
478                let lowered = parsed.lower_query_lane_for_entity(
479                    authority.model().name(),
480                    authority.model().primary_key.name,
481                )?;
482                let grouped_columns = lowered
483                    .query()
484                    .filter(|query| query.has_grouping())
485                    .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
486                    .transpose()?;
487
488                match lowered.into_query() {
489                    Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
490                        Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
491                            select, authority, columns,
492                        ),
493                        None => {
494                            self.execute_lowered_sql_dispatch_select_text_core(select, authority)
495                        }
496                    },
497                    Some(LoweredSqlQuery::Delete(delete)) => {
498                        self.execute_lowered_sql_dispatch_delete_core(&delete, authority)
499                    }
500                    None => Err(QueryError::unsupported_query(
501                        "generated SQL query surface requires query or EXPLAIN statement lanes",
502                    )),
503                }
504            }
505            SqlStatementRoute::Explain { .. } => {
506                if let Some((mode, plan)) =
507                    computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
508                {
509                    return self
510                        .explain_computed_sql_projection_dispatch_for_authority(
511                            mode, plan, authority,
512                        )
513                        .map(SqlDispatchResult::Explain);
514                }
515
516                let lowered = parsed.lower_query_lane_for_entity(
517                    authority.model().name(),
518                    authority.model().primary_key.name,
519                )?;
520                if let Some(explain) =
521                    self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
522                {
523                    return Ok(SqlDispatchResult::Explain(explain));
524                }
525
526                self.explain_lowered_sql_for_authority(&lowered, authority)
527                    .map(SqlDispatchResult::Explain)
528            }
529            SqlStatementRoute::Describe { .. }
530            | SqlStatementRoute::ShowIndexes { .. }
531            | SqlStatementRoute::ShowColumns { .. }
532            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
533                "generated SQL query surface requires query or EXPLAIN statement lanes",
534            )),
535        }
536    }
537
538    /// Execute one raw SQL string through the generated canister query surface.
539    ///
540    /// This hidden helper keeps parse, route, authority, and metadata/query
541    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
542    /// stays close to a pure descriptor table plus public ABI wrapper.
543    #[doc(hidden)]
544    #[must_use]
545    pub fn execute_generated_query_surface_sql(
546        &self,
547        sql: &str,
548        authorities: &[EntityAuthority],
549    ) -> GeneratedSqlDispatchAttempt {
550        // Phase 1: normalize and parse once so every generated route family
551        // shares the same SQL ownership boundary.
552        let sql_trimmed = match trim_generated_query_sql_input(sql) {
553            Ok(sql_trimmed) => sql_trimmed,
554            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
555        };
556        let parsed = match self.parse_sql_statement(sql_trimmed) {
557            Ok(parsed) => parsed,
558            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
559        };
560
561        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
562        // generated routes against the emitted authority table exactly once.
563        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
564            return GeneratedSqlDispatchAttempt::new(
565                "",
566                None,
567                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
568                    authorities,
569                ))),
570            );
571        }
572        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
573            Ok(authority) => authority,
574            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
575        };
576
577        // Phase 3: dispatch the resolved route through the existing query,
578        // explain, and metadata helpers without rebuilding route ownership in
579        // the generated build output.
580        let entity_name = authority.model().name();
581        let explain_order_field = parsed
582            .route()
583            .is_explain()
584            .then_some(authority.model().primary_key.name);
585        let result = match parsed.route() {
586            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
587                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
588            }
589            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
590                self.describe_entity_model(authority.model()),
591            )),
592            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
593                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
594            )),
595            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
596                self.show_columns_for_model(authority.model()),
597            )),
598            SqlStatementRoute::ShowEntities => unreachable!(
599                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
600            ),
601        };
602
603        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
604    }
605}