Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::{AcceptedSchemaSnapshot, SchemaInfo},
33        schema::{
34            execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35            execute_sql_ddl_field_default_change, execute_sql_ddl_field_nullability_change,
36            execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
37        },
38        session::query::QueryPlanCacheAttribution,
39        session::sql::projection::{
40            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41        },
42        sql::{
43            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
44            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
45        },
46    },
47    traits::{CanisterKind, EntityValue, Path},
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
52#[cfg(feature = "diagnostics")]
53pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
54#[cfg(feature = "diagnostics")]
55pub use attribution::{
56    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
57    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
58};
59pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
60pub(in crate::db::session::sql) use cache::{
61    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
62};
63pub(in crate::db::session::sql) use compile::{
64    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
65};
66pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
67pub use result::SqlStatementResult;
68
69#[cfg(all(test, not(feature = "diagnostics")))]
70pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
71#[cfg(feature = "diagnostics")]
72pub use crate::db::session::sql::projection::{
73    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
74};
75
// Parsing lives here as a free, module-level helper on purpose: it needs no
// session state, so exposing it as a `DbSession` method would only pretend
// that a pure parser is session-owned.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
82
83/// Return the entity identifier targeted by one reduced SQL statement.
84///
85/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
86/// across canister-owned entities may route it through any accepted entity.
87#[doc(hidden)]
88pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
89    let (statement, _) =
90        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
91
92    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
93}
94
95const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
96    match statement {
97        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
98        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
99        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
100        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
101        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
102            Some(statement.entity.as_str())
103        }
104        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
105            Some(entity) => Some(entity.as_str()),
106            None => None,
107        },
108        SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
109            Some(statement.entity.as_str())
110        }
111        SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
112            Some(statement.entity.as_str())
113        }
114        SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
115            Some(statement.entity.as_str())
116        }
117        SqlStatement::Explain(statement) => match &statement.statement {
118            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
119            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
120        },
121        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
122        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
123        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
124        SqlStatement::ShowEntities(_) => None,
125    }
126}
127
128// Measure one SQL compile stage and immediately surface the stage result. The
129// helper keeps attribution capture uniform while avoiding repeated
130// `(cost, result); result?` boilerplate across the compile pipeline.
131fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
132    let (local_instructions, result) = measure_sql_stage(stage);
133    let value = result?;
134
135    Ok((local_instructions, value))
136}
137
138impl<C: CanisterKind> DbSession<C> {
139    // Resolve one SQL SELECT through a caller-selected accepted authority and
140    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
141    // generated authority through the runtime cache boundary.
142    fn sql_select_prepared_plan_for_accepted_authority(
143        &self,
144        query: &StructuralQuery,
145        authority: EntityAuthority,
146        accepted_schema: &AcceptedSchemaSnapshot,
147    ) -> Result<
148        (
149            SharedPreparedExecutionPlan,
150            SqlProjectionContract,
151            SqlCacheAttribution,
152        ),
153        QueryError,
154    > {
155        let (prepared_plan, cache_attribution) = self
156            .cached_shared_query_plan_for_accepted_authority(
157                authority.clone(),
158                accepted_schema,
159                query,
160            )?;
161        Ok(Self::sql_select_projection_from_prepared_plan(
162            prepared_plan,
163            authority,
164            cache_attribution,
165        ))
166    }
167
168    // Resolve one typed SQL SELECT through accepted schema authority selected
169    // at the session boundary.
170    fn sql_select_prepared_plan_for_entity<E>(
171        &self,
172        query: &StructuralQuery,
173    ) -> Result<
174        (
175            SharedPreparedExecutionPlan,
176            SqlProjectionContract,
177            SqlCacheAttribution,
178        ),
179        QueryError,
180    >
181    where
182        E: PersistedRow<Canister = C> + EntityValue,
183    {
184        let (accepted_schema, authority) = self
185            .accepted_entity_authority::<E>()
186            .map_err(QueryError::execute)?;
187
188        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
189    }
190
191    fn sql_select_projection_from_prepared_plan(
192        prepared_plan: SharedPreparedExecutionPlan,
193        authority: EntityAuthority,
194        cache_attribution: QueryPlanCacheAttribution,
195    ) -> (
196        SharedPreparedExecutionPlan,
197        SqlProjectionContract,
198        SqlCacheAttribution,
199    ) {
200        let projection_spec = prepared_plan
201            .logical_plan()
202            .projection_spec(authority.model());
203        let projection = SqlProjectionContract::new(
204            projection_labels_from_projection_spec(&projection_spec),
205            projection_fixed_scales_from_projection_spec(&projection_spec),
206        );
207
208        (
209            prepared_plan,
210            projection,
211            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
212        )
213    }
214
215    // Keep query/update surface gating owned by one helper so the SQL
216    // compiled-command lane does not duplicate the same statement-family split
217    // just to change the outward error wording.
218    fn ensure_sql_statement_supported_for_surface(
219        statement: &SqlStatement,
220        surface: SqlCompiledCommandSurface,
221    ) -> Result<(), QueryError> {
222        match (surface, statement) {
223            (
224                SqlCompiledCommandSurface::Query,
225                SqlStatement::Select(_)
226                | SqlStatement::Explain(_)
227                | SqlStatement::Describe(_)
228                | SqlStatement::ShowIndexes(_)
229                | SqlStatement::ShowColumns(_)
230                | SqlStatement::ShowEntities(_),
231            )
232            | (
233                SqlCompiledCommandSurface::Update,
234                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
235            ) => Ok(()),
236            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
237                "SQL DDL execution is not supported in this release",
238            )),
239            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
240                Err(QueryError::unsupported_query(
241                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
242                ))
243            }
244            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
245                Err(QueryError::unsupported_query(
246                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
247                ))
248            }
249            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
250                Err(QueryError::unsupported_query(
251                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
252                ))
253            }
254            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
255                Err(QueryError::unsupported_query(
256                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
257                ))
258            }
259            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
260                Err(QueryError::unsupported_query(
261                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
262                ))
263            }
264            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
265                Err(QueryError::unsupported_query(
266                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
267                ))
268            }
269            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
270                Err(QueryError::unsupported_query(
271                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
272                ))
273            }
274            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
275                Err(QueryError::unsupported_query(
276                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
277                ))
278            }
279            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
280                Err(QueryError::unsupported_query(
281                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
282                ))
283            }
284        }
285    }
286
287    /// Execute one single-entity reduced SQL query or introspection statement.
288    ///
289    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
290    /// returns SQL-shaped statement output instead of typed entities.
291    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
292    where
293        E: PersistedRow<Canister = C> + EntityValue,
294    {
295        let compiled = self.compile_sql_query::<E>(sql)?;
296
297        self.execute_compiled_sql_owned::<E>(compiled)
298    }
299
300    /// Execute one reduced SQL query while reporting the compile/execute split
301    /// at the top-level SQL seam.
302    #[cfg(feature = "diagnostics")]
303    #[doc(hidden)]
304    pub fn execute_sql_query_with_attribution<E>(
305        &self,
306        sql: &str,
307    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
308    where
309        E: PersistedRow<Canister = C> + EntityValue,
310    {
311        // Phase 1: measure the compile side of the new seam, including parse,
312        // surface validation, and semantic command construction.
313        let (compile_local_instructions, compiled) =
314            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
315        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
316
317        // Phase 2: measure the execute side separately so repeat-run cache
318        // experiments can prove which side actually moved.
319        let store_get_calls_before = DataStore::current_get_call_count();
320        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
321        let pure_covering_row_assembly_before =
322            current_pure_covering_row_assembly_local_instructions();
323        let (result, execute_cache_attribution, execute_phase_attribution) =
324            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
325        let store_get_calls =
326            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
327        let pure_covering_decode_local_instructions =
328            current_pure_covering_decode_local_instructions()
329                .saturating_sub(pure_covering_decode_before);
330        let pure_covering_row_assembly_local_instructions =
331            current_pure_covering_row_assembly_local_instructions()
332                .saturating_sub(pure_covering_row_assembly_before);
333        let execute_local_instructions = execute_phase_attribution
334            .planner_local_instructions
335            .saturating_add(execute_phase_attribution.store_local_instructions)
336            .saturating_add(execute_phase_attribution.executor_local_instructions)
337            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
338        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
339        let total_local_instructions =
340            compile_local_instructions.saturating_add(execute_local_instructions);
341        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
342            GroupedExecutionAttribution {
343                stream_local_instructions: execute_phase_attribution
344                    .grouped_stream_local_instructions,
345                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
346                finalize_local_instructions: execute_phase_attribution
347                    .grouped_finalize_local_instructions,
348                count: GroupedCountAttribution::from_executor(
349                    execute_phase_attribution.grouped_count,
350                ),
351            },
352        );
353        let pure_covering = (pure_covering_decode_local_instructions > 0
354            || pure_covering_row_assembly_local_instructions > 0)
355            .then_some(SqlPureCoveringAttribution {
356                decode_local_instructions: pure_covering_decode_local_instructions,
357                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
358            });
359
360        Ok((
361            result,
362            SqlQueryExecutionAttribution {
363                compile_local_instructions,
364                compile: SqlCompileAttribution {
365                    cache_key_local_instructions: compile_phase_attribution.cache_key,
366                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
367                    parse_local_instructions: compile_phase_attribution.parse,
368                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
369                    parse_select_local_instructions: compile_phase_attribution.parse_select,
370                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
371                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
372                    aggregate_lane_check_local_instructions: compile_phase_attribution
373                        .aggregate_lane_check,
374                    prepare_local_instructions: compile_phase_attribution.prepare,
375                    lower_local_instructions: compile_phase_attribution.lower,
376                    bind_local_instructions: compile_phase_attribution.bind,
377                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
378                },
379                plan_lookup_local_instructions: execute_phase_attribution
380                    .planner_local_instructions,
381                execution: SqlExecutionAttribution {
382                    planner_local_instructions: execute_phase_attribution
383                        .planner_local_instructions,
384                    store_local_instructions: execute_phase_attribution.store_local_instructions,
385                    executor_invocation_local_instructions: execute_phase_attribution
386                        .executor_invocation_local_instructions,
387                    executor_local_instructions: execute_phase_attribution
388                        .executor_local_instructions,
389                    response_finalization_local_instructions: execute_phase_attribution
390                        .response_finalization_local_instructions,
391                },
392                grouped,
393                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
394                    execute_phase_attribution.scalar_aggregate_terminal,
395                ),
396                pure_covering,
397                store_get_calls,
398                response_decode_local_instructions: 0,
399                execute_local_instructions,
400                total_local_instructions,
401                cache: SqlQueryCacheAttribution {
402                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
403                    sql_compiled_command_misses: cache_attribution
404                        .sql_compiled_command_cache_misses,
405                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
406                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
407                },
408            },
409        ))
410    }
411
412    /// Execute one single-entity reduced SQL mutation statement.
413    ///
414    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
415    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
416    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
417    where
418        E: PersistedRow<Canister = C> + EntityValue,
419    {
420        let compiled = self.compile_sql_update::<E>(sql)?;
421
422        self.execute_compiled_sql_owned::<E>(compiled)
423    }
424
425    /// Prepare one SQL DDL statement against the accepted schema catalog.
426    ///
427    /// This is a non-executing surface: it proves the statement can bind,
428    /// derive an accepted-after snapshot, and pass schema mutation admission,
429    /// then returns a prepared-only report without mutating schema or index
430    /// storage.
431    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
432    where
433        E: PersistedRow<Canister = C> + EntityValue,
434    {
435        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
436
437        Ok(prepared.report().clone())
438    }
439
440    fn prepare_sql_ddl_command<E>(
441        &self,
442        sql: &str,
443    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
444    where
445        E: PersistedRow<Canister = C> + EntityValue,
446    {
447        let (statement, _) =
448            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
449        let (accepted_schema, _) = self
450            .accepted_entity_authority::<E>()
451            .map_err(QueryError::execute)?;
452        let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
453            E::MODEL,
454            &accepted_schema,
455            true,
456        );
457        let prepared = match prepare_sql_ddl_statement(
458            &statement,
459            &accepted_schema,
460            &schema_info,
461            E::Store::PATH,
462        ) {
463            Ok(prepared) => prepared,
464            Err(err) => {
465                return Err(QueryError::unsupported_query(format!(
466                    "SQL DDL preparation failed before execution: {err}"
467                )));
468            }
469        };
470
471        Ok((accepted_schema, prepared))
472    }
473
474    /// Execute one SQL DDL statement.
475    ///
476    /// Supported DDL routes through schema-owned physical work and
477    /// accepted-snapshot publication.
478    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
479    where
480        E: PersistedRow<Canister = C> + EntityValue,
481    {
482        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
483        if !prepared.mutates_schema() {
484            return Ok(SqlStatementResult::Ddl(
485                prepared
486                    .report()
487                    .clone()
488                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
489            ));
490        }
491
492        let Some(derivation) = prepared.derivation() else {
493            return Err(QueryError::unsupported_query(
494                "SQL DDL execution could not find a prepared schema derivation".to_string(),
495            ));
496        };
497        let store = self
498            .db
499            .recovered_store(E::Store::PATH)
500            .map_err(QueryError::execute)?;
501
502        let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
503            crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
504                execute_sql_ddl_field_addition(
505                    store,
506                    E::ENTITY_TAG,
507                    E::PATH,
508                    &accepted_before,
509                    derivation,
510                )
511                .map_err(QueryError::execute)?;
512
513                (0, 0)
514            }
515            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
516                execute_sql_ddl_field_default_change(
517                    store,
518                    E::ENTITY_TAG,
519                    E::PATH,
520                    &accepted_before,
521                    derivation,
522                )
523                .map_err(QueryError::execute)?;
524
525                (0, 0)
526            }
527            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
528                let rows_scanned = execute_sql_ddl_field_nullability_change(
529                    store,
530                    E::ENTITY_TAG,
531                    E::PATH,
532                    &accepted_before,
533                    derivation,
534                )
535                .map_err(QueryError::execute)?;
536
537                (rows_scanned, 0)
538            }
539            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
540                if create.candidate_index().key().is_field_path_only() =>
541            {
542                execute_sql_ddl_field_path_index_addition(
543                    store,
544                    E::ENTITY_TAG,
545                    E::PATH,
546                    &accepted_before,
547                    derivation,
548                )
549                .map_err(QueryError::execute)?
550            }
551            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
552                execute_sql_ddl_expression_index_addition(
553                    store,
554                    E::ENTITY_TAG,
555                    E::PATH,
556                    &accepted_before,
557                    derivation,
558                )
559                .map_err(QueryError::execute)?
560            }
561            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
562                execute_sql_ddl_secondary_index_drop(
563                    store,
564                    E::ENTITY_TAG,
565                    E::PATH,
566                    &accepted_before,
567                    derivation,
568                )
569                .map_err(QueryError::execute)?;
570
571                (0, 0)
572            }
573            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
574        };
575
576        Ok(SqlStatementResult::Ddl(
577            prepared
578                .report()
579                .clone()
580                .with_execution_status(SqlDdlExecutionStatus::Published)
581                .with_execution_metrics(rows_scanned, index_keys_written),
582        ))
583    }
584}