// icydb_core/db/session/sql/dispatch/mod.rs

//! Module: db::session::sql::dispatch
//! Responsibility: module-local ownership and contracts for db::session::sql::dispatch.
//! Does not own: cross-module orchestration outside this module.
//! Boundary: exposes this module API while keeping implementation details internal.
5
6mod computed;
7mod lowered;
8
9use crate::{
10    db::{
11        DbSession, MissingRowPolicy, PersistedRow, Query, QueryError,
12        executor::{
13            EntityAuthority, execute_sql_projection_rows_for_canister,
14            execute_sql_projection_text_rows_for_canister,
15        },
16        identifiers_tail_match,
17        query::intent::StructuralQuery,
18        session::sql::{
19            SqlDispatchResult, SqlParsedStatement, SqlStatementRoute,
20            aggregate::parsed_requires_dedicated_sql_aggregate_lane,
21            computed_projection,
22            projection::{
23                SqlProjectionPayload, projection_labels_from_fields,
24                projection_labels_from_projection_spec, sql_projection_rows_from_kernel_rows,
25            },
26        },
27        sql::lowering::{
28            LoweredSqlQuery, bind_lowered_sql_query, lower_sql_command_from_prepared_statement,
29        },
30        sql::parser::{
31            SqlAggregateCall, SqlAggregateKind, SqlProjection, SqlSelectItem, SqlStatement,
32            SqlTextFunction,
33        },
34    },
35    traits::{CanisterKind, EntityKind, EntityValue},
36};
37
38#[cfg(feature = "perf-attribution")]
39pub use lowered::LoweredSqlDispatchExecutorAttribution;
40
41///
42/// GeneratedSqlDispatchAttempt
43///
44/// Hidden generated-query dispatch envelope used by the facade helper to keep
45/// generated route ownership in core while preserving the public EXPLAIN error
46/// rewrite contract at the outer boundary.
47///
48
49#[doc(hidden)]
50pub struct GeneratedSqlDispatchAttempt {
51    entity_name: &'static str,
52    explain_order_field: Option<&'static str>,
53    result: Result<SqlDispatchResult, QueryError>,
54}
55
56impl GeneratedSqlDispatchAttempt {
57    // Build one generated-query dispatch attempt with optional explain-hint context.
58    const fn new(
59        entity_name: &'static str,
60        explain_order_field: Option<&'static str>,
61        result: Result<SqlDispatchResult, QueryError>,
62    ) -> Self {
63        Self {
64            entity_name,
65            explain_order_field,
66            result,
67        }
68    }
69
70    /// Borrow the resolved entity name for this generated-query attempt.
71    #[must_use]
72    pub const fn entity_name(&self) -> &'static str {
73        self.entity_name
74    }
75
76    /// Borrow the suggested deterministic order field for EXPLAIN rewrites.
77    #[must_use]
78    pub const fn explain_order_field(&self) -> Option<&'static str> {
79        self.explain_order_field
80    }
81
82    /// Consume and return the generated-query dispatch result.
83    pub fn into_result(self) -> Result<SqlDispatchResult, QueryError> {
84        self.result
85    }
86}
87
88#[derive(Clone, Copy, Debug, Eq, PartialEq)]
89pub(in crate::db::session::sql) enum SqlGroupingSurface {
90    Scalar,
91    Grouped,
92}
93
94const fn unsupported_sql_grouping_message(surface: SqlGroupingSurface) -> &'static str {
95    match surface {
96        SqlGroupingSurface::Scalar => {
97            "execute_sql rejects grouped SELECT; use execute_sql_grouped(...)"
98        }
99        SqlGroupingSurface::Grouped => "execute_sql_grouped requires grouped SQL query intent",
100    }
101}
102
103// Enforce the generated canister query contract that empty SQL is unsupported
104// before any parser/lowering work occurs.
105fn trim_generated_query_sql_input(sql: &str) -> Result<&str, QueryError> {
106    let sql_trimmed = sql.trim();
107    if sql_trimmed.is_empty() {
108        return Err(QueryError::unsupported_query(
109            "query endpoint requires a non-empty SQL string",
110        ));
111    }
112
113    Ok(sql_trimmed)
114}
115
116// Render the generated-surface entity list from the descriptor table instead
117// of assuming every session-visible entity belongs on the public query export.
118fn generated_sql_entities(authorities: &[EntityAuthority]) -> Vec<String> {
119    let mut entities = Vec::with_capacity(authorities.len());
120
121    for authority in authorities {
122        entities.push(authority.model().name().to_string());
123    }
124
125    entities
126}
127
128// Project grouped SELECT item labels into one stable outward column contract.
129fn grouped_sql_projection_labels_from_statement(
130    statement: &SqlStatement,
131) -> Result<Vec<String>, QueryError> {
132    let SqlStatement::Select(select) = statement else {
133        return Err(QueryError::invariant(
134            "grouped SQL projection labels require SELECT statement shape",
135        ));
136    };
137    let SqlProjection::Items(items) = &select.projection else {
138        return Err(QueryError::unsupported_query(
139            "grouped SQL dispatch requires explicit grouped projection items",
140        ));
141    };
142
143    Ok(items
144        .iter()
145        .map(grouped_sql_projection_item_label)
146        .collect())
147}
148
149// Render one grouped SELECT item into the public grouped-column label used by
150// unified dispatch results.
151fn grouped_sql_projection_item_label(item: &SqlSelectItem) -> String {
152    match item {
153        SqlSelectItem::Field(field) => field.clone(),
154        SqlSelectItem::Aggregate(aggregate) => grouped_sql_aggregate_call_label(aggregate),
155        SqlSelectItem::TextFunction(call) => {
156            format!(
157                "{}({})",
158                grouped_sql_text_function_name(call.function),
159                call.field
160            )
161        }
162    }
163}
164
165// Render one aggregate call into one canonical SQL-style label.
166fn grouped_sql_aggregate_call_label(aggregate: &SqlAggregateCall) -> String {
167    let kind = match aggregate.kind {
168        SqlAggregateKind::Count => "COUNT",
169        SqlAggregateKind::Sum => "SUM",
170        SqlAggregateKind::Avg => "AVG",
171        SqlAggregateKind::Min => "MIN",
172        SqlAggregateKind::Max => "MAX",
173    };
174
175    match aggregate.field.as_deref() {
176        Some(field) => format!("{kind}({field})"),
177        None => format!("{kind}(*)"),
178    }
179}
180
181// Render one reduced SQL text-function identifier into one stable uppercase
182// SQL label for outward column metadata.
183const fn grouped_sql_text_function_name(function: SqlTextFunction) -> &'static str {
184    match function {
185        SqlTextFunction::Trim => "TRIM",
186        SqlTextFunction::Ltrim => "LTRIM",
187        SqlTextFunction::Rtrim => "RTRIM",
188        SqlTextFunction::Lower => "LOWER",
189        SqlTextFunction::Upper => "UPPER",
190        SqlTextFunction::Length => "LENGTH",
191        SqlTextFunction::Left => "LEFT",
192        SqlTextFunction::Right => "RIGHT",
193        SqlTextFunction::StartsWith => "STARTS_WITH",
194        SqlTextFunction::EndsWith => "ENDS_WITH",
195        SqlTextFunction::Contains => "CONTAINS",
196        SqlTextFunction::Position => "POSITION",
197        SqlTextFunction::Replace => "REPLACE",
198        SqlTextFunction::Substring => "SUBSTRING",
199    }
200}
201
202// Resolve one generated query route onto the descriptor-owned authority table.
203fn authority_for_generated_sql_route(
204    route: &SqlStatementRoute,
205    authorities: &[EntityAuthority],
206) -> Result<EntityAuthority, QueryError> {
207    let sql_entity = route.entity();
208
209    for authority in authorities {
210        if identifiers_tail_match(sql_entity, authority.model().name()) {
211            return Ok(*authority);
212        }
213    }
214
215    Err(unsupported_generated_sql_entity_error(
216        sql_entity,
217        authorities,
218    ))
219}
220
221// Keep the generated query-surface unsupported-entity contract stable while
222// moving authority lookup out of the build-generated shim.
223fn unsupported_generated_sql_entity_error(
224    entity_name: &str,
225    authorities: &[EntityAuthority],
226) -> QueryError {
227    let mut supported = String::new();
228
229    for (index, authority) in authorities.iter().enumerate() {
230        if index != 0 {
231            supported.push_str(", ");
232        }
233
234        supported.push_str(authority.model().name());
235    }
236
237    QueryError::unsupported_query(format!(
238        "query endpoint does not support entity '{entity_name}'; supported: {supported}"
239    ))
240}
241
242impl<C: CanisterKind> DbSession<C> {
243    // Execute one structural SQL load query and return only row-oriented SQL
244    // projection values, keeping typed projection rows out of the shared SQL
245    // query-lane path.
246    pub(in crate::db::session::sql) fn execute_structural_sql_projection(
247        &self,
248        query: StructuralQuery,
249        authority: EntityAuthority,
250    ) -> Result<SqlProjectionPayload, QueryError> {
251        // Phase 1: build the structural access plan once and reuse its
252        // projection contract for both labels and row materialization.
253        let (_, plan) =
254            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
255        let projection = plan.projection_spec(authority.model());
256        let columns = projection_labels_from_projection_spec(&projection);
257
258        // Phase 2: execute the shared structural load path with the already
259        // derived projection semantics.
260        let projected =
261            execute_sql_projection_rows_for_canister(&self.db, self.debug, authority, plan)
262                .map_err(QueryError::execute)?;
263        let (rows, row_count) = projected.into_parts();
264
265        Ok(SqlProjectionPayload::new(columns, rows, row_count))
266    }
267
268    // Execute one structural SQL load query and return render-ready text rows
269    // for the dispatch lane when the terminal short path can prove them
270    // directly.
271    fn execute_structural_sql_projection_text(
272        &self,
273        query: StructuralQuery,
274        authority: EntityAuthority,
275    ) -> Result<SqlDispatchResult, QueryError> {
276        // Phase 1: build the structural access plan once and reuse its
277        // projection contract for both labels and text-row materialization.
278        let (_, plan) =
279            self.build_structural_plan_with_visible_indexes_for_authority(query, authority)?;
280        let projection = plan.projection_spec(authority.model());
281        let columns = projection_labels_from_projection_spec(&projection);
282
283        // Phase 2: execute the shared structural load path with the already
284        // derived projection semantics while preferring rendered SQL rows.
285        let projected =
286            execute_sql_projection_text_rows_for_canister(&self.db, self.debug, authority, plan)
287                .map_err(QueryError::execute)?;
288        let (rows, row_count) = projected.into_parts();
289
290        Ok(SqlDispatchResult::ProjectionText {
291            columns,
292            rows,
293            row_count,
294        })
295    }
296
297    // Execute one typed SQL delete query while keeping the row payload on the
298    // typed delete executor boundary that still owns non-runtime-hook delete
299    // commit-window application.
300    fn execute_typed_sql_delete<E>(&self, query: &Query<E>) -> Result<SqlDispatchResult, QueryError>
301    where
302        E: PersistedRow<Canister = C> + EntityValue,
303    {
304        let plan = self
305            .compile_query_with_visible_indexes(query)?
306            .into_executable();
307        let deleted = self
308            .with_metrics(|| self.delete_executor::<E>().execute_sql_projection(plan))
309            .map_err(QueryError::execute)?;
310        let (rows, row_count) = deleted.into_parts();
311        let rows = sql_projection_rows_from_kernel_rows(rows).map_err(QueryError::execute)?;
312
313        Ok(SqlProjectionPayload::new(
314            projection_labels_from_fields(E::MODEL.fields()),
315            rows,
316            row_count,
317        )
318        .into_dispatch_result())
319    }
320
321    // Validate that one SQL-derived query intent matches the grouped/scalar
322    // execution surface that is about to consume it.
323    pub(in crate::db::session::sql) fn ensure_sql_query_grouping<E>(
324        query: &Query<E>,
325        surface: SqlGroupingSurface,
326    ) -> Result<(), QueryError>
327    where
328        E: EntityKind,
329    {
330        match (surface, query.has_grouping()) {
331            (SqlGroupingSurface::Scalar, false) | (SqlGroupingSurface::Grouped, true) => Ok(()),
332            (SqlGroupingSurface::Scalar, true) | (SqlGroupingSurface::Grouped, false) => Err(
333                QueryError::unsupported_query(unsupported_sql_grouping_message(surface)),
334            ),
335        }
336    }
337
338    /// Execute one reduced SQL statement into one unified SQL dispatch payload.
339    pub fn execute_sql_dispatch<E>(&self, sql: &str) -> Result<SqlDispatchResult, QueryError>
340    where
341        E: PersistedRow<Canister = C> + EntityValue,
342    {
343        let parsed = self.parse_sql_statement(sql)?;
344
345        self.execute_sql_dispatch_parsed::<E>(&parsed)
346    }
347
348    /// Execute one parsed reduced SQL statement into one unified SQL payload.
349    pub fn execute_sql_dispatch_parsed<E>(
350        &self,
351        parsed: &SqlParsedStatement,
352    ) -> Result<SqlDispatchResult, QueryError>
353    where
354        E: PersistedRow<Canister = C> + EntityValue,
355    {
356        match parsed.route() {
357            SqlStatementRoute::Query { .. } => {
358                if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
359                    let authority = EntityAuthority::for_type::<E>();
360                    let command =
361                        Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
362
363                    return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
364                }
365
366                if let Some(plan) =
367                    computed_projection::computed_sql_projection_plan(&parsed.statement)?
368                {
369                    return self.execute_computed_sql_projection_dispatch::<E>(plan);
370                }
371
372                // Phase 1: keep typed dispatch on the shared lowered query lane
373                // for plain `SELECT`, and only pay typed query binding on the
374                // `DELETE` branch that still owns typed commit semantics.
375                let lowered = parsed
376                    .lower_query_lane_for_entity(E::MODEL.name(), E::MODEL.primary_key.name)?;
377                let grouped_columns = lowered
378                    .query()
379                    .filter(|query| query.has_grouping())
380                    .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
381                    .transpose()?;
382
383                // Phase 2: dispatch `SELECT` directly from the lowered shape so
384                // typed SQL projection does not rebuild and discard a typed
385                // `Query<E>` before returning to the structural executor path.
386                match lowered.into_query() {
387                    Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
388                        Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
389                            select,
390                            EntityAuthority::for_type::<E>(),
391                            columns,
392                        ),
393                        None => self
394                            .execute_lowered_sql_projection_core(
395                                select,
396                                EntityAuthority::for_type::<E>(),
397                            )
398                            .map(SqlProjectionPayload::into_dispatch_result),
399                    },
400                    Some(LoweredSqlQuery::Delete(delete)) => {
401                        let typed_query = bind_lowered_sql_query::<E>(
402                            LoweredSqlQuery::Delete(delete),
403                            MissingRowPolicy::Ignore,
404                        )
405                        .map_err(QueryError::from_sql_lowering_error)?;
406
407                        self.execute_typed_sql_delete(&typed_query)
408                    }
409                    None => Err(QueryError::unsupported_query(
410                        "execute_sql_dispatch accepts SELECT or DELETE only",
411                    )),
412                }
413            }
414            SqlStatementRoute::Explain { .. } => {
415                if let Some((mode, plan)) =
416                    computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
417                {
418                    return self
419                        .explain_computed_sql_projection_dispatch::<E>(mode, plan)
420                        .map(SqlDispatchResult::Explain);
421                }
422
423                let lowered = lower_sql_command_from_prepared_statement(
424                    parsed.prepare(E::MODEL.name())?,
425                    E::MODEL.primary_key.name,
426                )
427                .map_err(QueryError::from_sql_lowering_error)?;
428                if let Some(explain) = self.explain_lowered_sql_execution_for_authority(
429                    &lowered,
430                    EntityAuthority::for_type::<E>(),
431                )? {
432                    return Ok(SqlDispatchResult::Explain(explain));
433                }
434
435                self.explain_lowered_sql_for_authority(&lowered, EntityAuthority::for_type::<E>())
436                    .map(SqlDispatchResult::Explain)
437            }
438            SqlStatementRoute::Describe { .. } => {
439                Ok(SqlDispatchResult::Describe(self.describe_entity::<E>()))
440            }
441            SqlStatementRoute::ShowIndexes { .. } => {
442                Ok(SqlDispatchResult::ShowIndexes(self.show_indexes::<E>()))
443            }
444            SqlStatementRoute::ShowColumns { .. } => {
445                Ok(SqlDispatchResult::ShowColumns(self.show_columns::<E>()))
446            }
447            SqlStatementRoute::ShowEntities => {
448                Ok(SqlDispatchResult::ShowEntities(self.show_entities()))
449            }
450        }
451    }
452
453    /// Execute one parsed reduced SQL statement through the generated canister
454    /// query/explain surface for one already-resolved dynamic authority.
455    ///
456    /// This keeps the canister SQL facade on the same reduced SQL ownership
457    /// boundary as typed dispatch without forcing the outer facade to reopen
458    /// typed-generic routing just to preserve parity for computed projections.
459    #[doc(hidden)]
460    pub fn execute_generated_query_surface_dispatch_for_authority(
461        &self,
462        parsed: &SqlParsedStatement,
463        authority: EntityAuthority,
464    ) -> Result<SqlDispatchResult, QueryError> {
465        match parsed.route() {
466            SqlStatementRoute::Query { .. } => {
467                if parsed_requires_dedicated_sql_aggregate_lane(parsed) {
468                    let command =
469                        Self::compile_sql_aggregate_command_core_for_authority(parsed, authority)?;
470
471                    return self.execute_sql_aggregate_dispatch_for_authority(command, authority);
472                }
473
474                if let Some(plan) =
475                    computed_projection::computed_sql_projection_plan(&parsed.statement)?
476                {
477                    return self
478                        .execute_computed_sql_projection_dispatch_for_authority(plan, authority);
479                }
480
481                let lowered = parsed.lower_query_lane_for_entity(
482                    authority.model().name(),
483                    authority.model().primary_key.name,
484                )?;
485                let grouped_columns = lowered
486                    .query()
487                    .filter(|query| query.has_grouping())
488                    .map(|_| grouped_sql_projection_labels_from_statement(&parsed.statement))
489                    .transpose()?;
490
491                match lowered.into_query() {
492                    Some(LoweredSqlQuery::Select(select)) => match grouped_columns {
493                        Some(columns) => self.execute_lowered_sql_grouped_dispatch_select_core(
494                            select, authority, columns,
495                        ),
496                        None => {
497                            self.execute_lowered_sql_dispatch_select_text_core(select, authority)
498                        }
499                    },
500                    Some(LoweredSqlQuery::Delete(delete)) => {
501                        self.execute_lowered_sql_dispatch_delete_core(&delete, authority)
502                    }
503                    None => Err(QueryError::unsupported_query(
504                        "generated SQL query surface requires query or EXPLAIN statement lanes",
505                    )),
506                }
507            }
508            SqlStatementRoute::Explain { .. } => {
509                if let Some((mode, plan)) =
510                    computed_projection::computed_sql_projection_explain_plan(&parsed.statement)?
511                {
512                    return self
513                        .explain_computed_sql_projection_dispatch_for_authority(
514                            mode, plan, authority,
515                        )
516                        .map(SqlDispatchResult::Explain);
517                }
518
519                let lowered = parsed.lower_query_lane_for_entity(
520                    authority.model().name(),
521                    authority.model().primary_key.name,
522                )?;
523                if let Some(explain) =
524                    self.explain_lowered_sql_execution_for_authority(&lowered, authority)?
525                {
526                    return Ok(SqlDispatchResult::Explain(explain));
527                }
528
529                self.explain_lowered_sql_for_authority(&lowered, authority)
530                    .map(SqlDispatchResult::Explain)
531            }
532            SqlStatementRoute::Describe { .. }
533            | SqlStatementRoute::ShowIndexes { .. }
534            | SqlStatementRoute::ShowColumns { .. }
535            | SqlStatementRoute::ShowEntities => Err(QueryError::unsupported_query(
536                "generated SQL query surface requires query or EXPLAIN statement lanes",
537            )),
538        }
539    }
540
541    /// Execute one raw SQL string through the generated canister query surface.
542    ///
543    /// This hidden helper keeps parse, route, authority, and metadata/query
544    /// dispatch ownership in core so the build-generated `sql_dispatch` shim
545    /// stays close to a pure descriptor table plus public ABI wrapper.
546    #[doc(hidden)]
547    #[must_use]
548    pub fn execute_generated_query_surface_sql(
549        &self,
550        sql: &str,
551        authorities: &[EntityAuthority],
552    ) -> GeneratedSqlDispatchAttempt {
553        // Phase 1: normalize and parse once so every generated route family
554        // shares the same SQL ownership boundary.
555        let sql_trimmed = match trim_generated_query_sql_input(sql) {
556            Ok(sql_trimmed) => sql_trimmed,
557            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
558        };
559        let parsed = match self.parse_sql_statement(sql_trimmed) {
560            Ok(parsed) => parsed,
561            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
562        };
563
564        // Phase 2: keep SHOW ENTITIES descriptor-owned and resolve all other
565        // generated routes against the emitted authority table exactly once.
566        if matches!(parsed.route(), SqlStatementRoute::ShowEntities) {
567            return GeneratedSqlDispatchAttempt::new(
568                "",
569                None,
570                Ok(SqlDispatchResult::ShowEntities(generated_sql_entities(
571                    authorities,
572                ))),
573            );
574        }
575        let authority = match authority_for_generated_sql_route(parsed.route(), authorities) {
576            Ok(authority) => authority,
577            Err(err) => return GeneratedSqlDispatchAttempt::new("", None, Err(err)),
578        };
579
580        // Phase 3: dispatch the resolved route through the existing query,
581        // explain, and metadata helpers without rebuilding route ownership in
582        // the generated build output.
583        let entity_name = authority.model().name();
584        let explain_order_field = parsed
585            .route()
586            .is_explain()
587            .then_some(authority.model().primary_key.name);
588        let result = match parsed.route() {
589            SqlStatementRoute::Query { .. } | SqlStatementRoute::Explain { .. } => {
590                self.execute_generated_query_surface_dispatch_for_authority(&parsed, authority)
591            }
592            SqlStatementRoute::Describe { .. } => Ok(SqlDispatchResult::Describe(
593                self.describe_entity_model(authority.model()),
594            )),
595            SqlStatementRoute::ShowIndexes { .. } => Ok(SqlDispatchResult::ShowIndexes(
596                self.show_indexes_for_store_model(authority.store_path(), authority.model()),
597            )),
598            SqlStatementRoute::ShowColumns { .. } => Ok(SqlDispatchResult::ShowColumns(
599                self.show_columns_for_model(authority.model()),
600            )),
601            SqlStatementRoute::ShowEntities => unreachable!(
602                "SHOW ENTITIES is handled before authority resolution for generated query dispatch"
603            ),
604        };
605
606        GeneratedSqlDispatchAttempt::new(entity_name, explain_order_field, result)
607    }
608}