Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::{AcceptedSchemaSnapshot, SchemaInfo},
33        schema::{
34            execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35            execute_sql_ddl_field_default_change, execute_sql_ddl_field_nullability_change,
36            execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
37        },
38        session::query::QueryPlanCacheAttribution,
39        session::sql::projection::{
40            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41        },
42        sql::{
43            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
44            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
45        },
46    },
47    traits::{CanisterKind, EntityValue, Path},
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
52#[cfg(feature = "diagnostics")]
53pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
54#[cfg(feature = "diagnostics")]
55pub use attribution::{
56    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
57    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
58};
59pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
60pub(in crate::db::session::sql) use cache::{
61    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
62};
63pub(in crate::db::session::sql) use compile::{
64    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
65};
66pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
67pub use result::SqlStatementResult;
68
69#[cfg(all(test, not(feature = "diagnostics")))]
70pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
71#[cfg(feature = "diagnostics")]
72pub use crate::db::session::sql::projection::{
73    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
74};
75
76// Keep parsing as a module-owned helper instead of hanging a pure parser off
77// `DbSession` as a fake session method.
78#[cfg(test)]
79pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
80    parse_sql(sql).map_err(QueryError::from_sql_parse_error)
81}
82
83/// Return the entity identifier targeted by one reduced SQL statement.
84///
85/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
86/// across canister-owned entities may route it through any accepted entity.
87#[doc(hidden)]
88pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
89    let (statement, _) =
90        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
91
92    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
93}
94
95const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
96    match statement {
97        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
98        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
99        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
100        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
101        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
102            Some(statement.entity.as_str())
103        }
104        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
105            Some(entity) => Some(entity.as_str()),
106            None => None,
107        },
108        SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
109            Some(statement.entity.as_str())
110        }
111        SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
112            Some(statement.entity.as_str())
113        }
114        SqlStatement::Ddl(SqlDdlStatement::AlterTableDropColumn(statement)) => {
115            Some(statement.entity.as_str())
116        }
117        SqlStatement::Ddl(SqlDdlStatement::AlterTableRenameColumn(statement)) => {
118            Some(statement.entity.as_str())
119        }
120        SqlStatement::Explain(statement) => match &statement.statement {
121            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
122            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
123        },
124        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
125        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
126        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
127        SqlStatement::ShowEntities(_) => None,
128    }
129}
130
131// Measure one SQL compile stage and immediately surface the stage result. The
132// helper keeps attribution capture uniform while avoiding repeated
133// `(cost, result); result?` boilerplate across the compile pipeline.
134fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
135    let (local_instructions, result) = measure_sql_stage(stage);
136    let value = result?;
137
138    Ok((local_instructions, value))
139}
140
141impl<C: CanisterKind> DbSession<C> {
142    // Resolve one SQL SELECT through a caller-selected accepted authority and
143    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
144    // generated authority through the runtime cache boundary.
145    fn sql_select_prepared_plan_for_accepted_authority(
146        &self,
147        query: &StructuralQuery,
148        authority: EntityAuthority,
149        accepted_schema: &AcceptedSchemaSnapshot,
150    ) -> Result<
151        (
152            SharedPreparedExecutionPlan,
153            SqlProjectionContract,
154            SqlCacheAttribution,
155        ),
156        QueryError,
157    > {
158        let (prepared_plan, cache_attribution) = self
159            .cached_shared_query_plan_for_accepted_authority(
160                authority.clone(),
161                accepted_schema,
162                query,
163            )?;
164        Ok(Self::sql_select_projection_from_prepared_plan(
165            prepared_plan,
166            authority,
167            cache_attribution,
168        ))
169    }
170
171    // Resolve one typed SQL SELECT through accepted schema authority selected
172    // at the session boundary.
173    fn sql_select_prepared_plan_for_entity<E>(
174        &self,
175        query: &StructuralQuery,
176    ) -> Result<
177        (
178            SharedPreparedExecutionPlan,
179            SqlProjectionContract,
180            SqlCacheAttribution,
181        ),
182        QueryError,
183    >
184    where
185        E: PersistedRow<Canister = C> + EntityValue,
186    {
187        let (accepted_schema, authority) = self
188            .accepted_entity_authority::<E>()
189            .map_err(QueryError::execute)?;
190
191        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
192    }
193
194    fn sql_select_projection_from_prepared_plan(
195        prepared_plan: SharedPreparedExecutionPlan,
196        authority: EntityAuthority,
197        cache_attribution: QueryPlanCacheAttribution,
198    ) -> (
199        SharedPreparedExecutionPlan,
200        SqlProjectionContract,
201        SqlCacheAttribution,
202    ) {
203        let projection_spec = prepared_plan
204            .logical_plan()
205            .projection_spec(authority.model());
206        let projection = SqlProjectionContract::new(
207            projection_labels_from_projection_spec(&projection_spec),
208            projection_fixed_scales_from_projection_spec(&projection_spec),
209        );
210
211        (
212            prepared_plan,
213            projection,
214            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
215        )
216    }
217
218    // Keep query/update surface gating owned by one helper so the SQL
219    // compiled-command lane does not duplicate the same statement-family split
220    // just to change the outward error wording.
221    fn ensure_sql_statement_supported_for_surface(
222        statement: &SqlStatement,
223        surface: SqlCompiledCommandSurface,
224    ) -> Result<(), QueryError> {
225        match (surface, statement) {
226            (
227                SqlCompiledCommandSurface::Query,
228                SqlStatement::Select(_)
229                | SqlStatement::Explain(_)
230                | SqlStatement::Describe(_)
231                | SqlStatement::ShowIndexes(_)
232                | SqlStatement::ShowColumns(_)
233                | SqlStatement::ShowEntities(_),
234            )
235            | (
236                SqlCompiledCommandSurface::Update,
237                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
238            ) => Ok(()),
239            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
240                "SQL DDL execution is not supported in this release",
241            )),
242            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
243                Err(QueryError::unsupported_query(
244                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
245                ))
246            }
247            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
248                Err(QueryError::unsupported_query(
249                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
250                ))
251            }
252            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
253                Err(QueryError::unsupported_query(
254                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
255                ))
256            }
257            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
258                Err(QueryError::unsupported_query(
259                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
260                ))
261            }
262            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
263                Err(QueryError::unsupported_query(
264                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
265                ))
266            }
267            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
268                Err(QueryError::unsupported_query(
269                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
270                ))
271            }
272            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
273                Err(QueryError::unsupported_query(
274                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
275                ))
276            }
277            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
278                Err(QueryError::unsupported_query(
279                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
280                ))
281            }
282            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
283                Err(QueryError::unsupported_query(
284                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
285                ))
286            }
287        }
288    }
289
290    /// Execute one single-entity reduced SQL query or introspection statement.
291    ///
292    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
293    /// returns SQL-shaped statement output instead of typed entities.
294    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
295    where
296        E: PersistedRow<Canister = C> + EntityValue,
297    {
298        let compiled = self.compile_sql_query::<E>(sql)?;
299
300        self.execute_compiled_sql_owned::<E>(compiled)
301    }
302
303    /// Execute one reduced SQL query while reporting the compile/execute split
304    /// at the top-level SQL seam.
305    #[cfg(feature = "diagnostics")]
306    #[doc(hidden)]
307    pub fn execute_sql_query_with_attribution<E>(
308        &self,
309        sql: &str,
310    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
311    where
312        E: PersistedRow<Canister = C> + EntityValue,
313    {
314        // Phase 1: measure the compile side of the new seam, including parse,
315        // surface validation, and semantic command construction.
316        let (compile_local_instructions, compiled) =
317            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
318        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
319
320        // Phase 2: measure the execute side separately so repeat-run cache
321        // experiments can prove which side actually moved.
322        let store_get_calls_before = DataStore::current_get_call_count();
323        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
324        let pure_covering_row_assembly_before =
325            current_pure_covering_row_assembly_local_instructions();
326        let (result, execute_cache_attribution, execute_phase_attribution) =
327            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
328        let store_get_calls =
329            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
330        let pure_covering_decode_local_instructions =
331            current_pure_covering_decode_local_instructions()
332                .saturating_sub(pure_covering_decode_before);
333        let pure_covering_row_assembly_local_instructions =
334            current_pure_covering_row_assembly_local_instructions()
335                .saturating_sub(pure_covering_row_assembly_before);
336        let execute_local_instructions = execute_phase_attribution
337            .planner_local_instructions
338            .saturating_add(execute_phase_attribution.store_local_instructions)
339            .saturating_add(execute_phase_attribution.executor_local_instructions)
340            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
341        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
342        let total_local_instructions =
343            compile_local_instructions.saturating_add(execute_local_instructions);
344        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
345            GroupedExecutionAttribution {
346                stream_local_instructions: execute_phase_attribution
347                    .grouped_stream_local_instructions,
348                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
349                finalize_local_instructions: execute_phase_attribution
350                    .grouped_finalize_local_instructions,
351                count: GroupedCountAttribution::from_executor(
352                    execute_phase_attribution.grouped_count,
353                ),
354            },
355        );
356        let pure_covering = (pure_covering_decode_local_instructions > 0
357            || pure_covering_row_assembly_local_instructions > 0)
358            .then_some(SqlPureCoveringAttribution {
359                decode_local_instructions: pure_covering_decode_local_instructions,
360                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
361            });
362
363        Ok((
364            result,
365            SqlQueryExecutionAttribution {
366                compile_local_instructions,
367                compile: SqlCompileAttribution {
368                    cache_key_local_instructions: compile_phase_attribution.cache_key,
369                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
370                    parse_local_instructions: compile_phase_attribution.parse,
371                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
372                    parse_select_local_instructions: compile_phase_attribution.parse_select,
373                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
374                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
375                    aggregate_lane_check_local_instructions: compile_phase_attribution
376                        .aggregate_lane_check,
377                    prepare_local_instructions: compile_phase_attribution.prepare,
378                    lower_local_instructions: compile_phase_attribution.lower,
379                    bind_local_instructions: compile_phase_attribution.bind,
380                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
381                },
382                plan_lookup_local_instructions: execute_phase_attribution
383                    .planner_local_instructions,
384                execution: SqlExecutionAttribution {
385                    planner_local_instructions: execute_phase_attribution
386                        .planner_local_instructions,
387                    store_local_instructions: execute_phase_attribution.store_local_instructions,
388                    executor_invocation_local_instructions: execute_phase_attribution
389                        .executor_invocation_local_instructions,
390                    executor_local_instructions: execute_phase_attribution
391                        .executor_local_instructions,
392                    response_finalization_local_instructions: execute_phase_attribution
393                        .response_finalization_local_instructions,
394                },
395                grouped,
396                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
397                    execute_phase_attribution.scalar_aggregate_terminal,
398                ),
399                pure_covering,
400                store_get_calls,
401                response_decode_local_instructions: 0,
402                execute_local_instructions,
403                total_local_instructions,
404                cache: SqlQueryCacheAttribution {
405                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
406                    sql_compiled_command_misses: cache_attribution
407                        .sql_compiled_command_cache_misses,
408                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
409                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
410                },
411            },
412        ))
413    }
414
415    /// Execute one single-entity reduced SQL mutation statement.
416    ///
417    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
418    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
419    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
420    where
421        E: PersistedRow<Canister = C> + EntityValue,
422    {
423        let compiled = self.compile_sql_update::<E>(sql)?;
424
425        self.execute_compiled_sql_owned::<E>(compiled)
426    }
427
428    /// Prepare one SQL DDL statement against the accepted schema catalog.
429    ///
430    /// This is a non-executing surface: it proves the statement can bind,
431    /// derive an accepted-after snapshot, and pass schema mutation admission,
432    /// then returns a prepared-only report without mutating schema or index
433    /// storage.
434    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
435    where
436        E: PersistedRow<Canister = C> + EntityValue,
437    {
438        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
439
440        Ok(prepared.report().clone())
441    }
442
443    fn prepare_sql_ddl_command<E>(
444        &self,
445        sql: &str,
446    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
447    where
448        E: PersistedRow<Canister = C> + EntityValue,
449    {
450        let (statement, _) =
451            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
452        let (accepted_schema, _) = self
453            .accepted_entity_authority::<E>()
454            .map_err(QueryError::execute)?;
455        let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
456            E::MODEL,
457            &accepted_schema,
458            true,
459        );
460        let prepared = match prepare_sql_ddl_statement(
461            &statement,
462            &accepted_schema,
463            &schema_info,
464            E::Store::PATH,
465        ) {
466            Ok(prepared) => prepared,
467            Err(err) => {
468                return Err(QueryError::unsupported_query(format!(
469                    "SQL DDL preparation failed before execution: {err}"
470                )));
471            }
472        };
473
474        Ok((accepted_schema, prepared))
475    }
476
477    /// Execute one SQL DDL statement.
478    ///
479    /// Supported DDL routes through schema-owned physical work and
480    /// accepted-snapshot publication.
481    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
482    where
483        E: PersistedRow<Canister = C> + EntityValue,
484    {
485        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
486        if !prepared.mutates_schema() {
487            return Ok(SqlStatementResult::Ddl(
488                prepared
489                    .report()
490                    .clone()
491                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
492            ));
493        }
494
495        let Some(derivation) = prepared.derivation() else {
496            return Err(QueryError::unsupported_query(
497                "SQL DDL execution could not find a prepared schema derivation".to_string(),
498            ));
499        };
500        let store = self
501            .db
502            .recovered_store(E::Store::PATH)
503            .map_err(QueryError::execute)?;
504
505        let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
506            crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
507                execute_sql_ddl_field_addition(
508                    store,
509                    E::ENTITY_TAG,
510                    E::PATH,
511                    &accepted_before,
512                    derivation,
513                )
514                .map_err(QueryError::execute)?;
515
516                (0, 0)
517            }
518            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
519                execute_sql_ddl_field_default_change(
520                    store,
521                    E::ENTITY_TAG,
522                    E::PATH,
523                    &accepted_before,
524                    derivation,
525                )
526                .map_err(QueryError::execute)?;
527
528                (0, 0)
529            }
530            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnNullability(_) => {
531                let rows_scanned = execute_sql_ddl_field_nullability_change(
532                    store,
533                    E::ENTITY_TAG,
534                    E::PATH,
535                    &accepted_before,
536                    derivation,
537                )
538                .map_err(QueryError::execute)?;
539
540                (rows_scanned, 0)
541            }
542            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
543                if create.candidate_index().key().is_field_path_only() =>
544            {
545                execute_sql_ddl_field_path_index_addition(
546                    store,
547                    E::ENTITY_TAG,
548                    E::PATH,
549                    &accepted_before,
550                    derivation,
551                )
552                .map_err(QueryError::execute)?
553            }
554            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
555                execute_sql_ddl_expression_index_addition(
556                    store,
557                    E::ENTITY_TAG,
558                    E::PATH,
559                    &accepted_before,
560                    derivation,
561                )
562                .map_err(QueryError::execute)?
563            }
564            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
565                execute_sql_ddl_secondary_index_drop(
566                    store,
567                    E::ENTITY_TAG,
568                    E::PATH,
569                    &accepted_before,
570                    derivation,
571                )
572                .map_err(QueryError::execute)?;
573
574                (0, 0)
575            }
576            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
577        };
578
579        Ok(SqlStatementResult::Ddl(
580            prepared
581                .report()
582                .clone()
583                .with_execution_status(SqlDdlExecutionStatus::Published)
584                .with_execution_metrics(rows_scanned, index_keys_written),
585        ))
586    }
587}