Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::{AcceptedSchemaSnapshot, SchemaInfo},
33        schema::{
34            execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35            execute_sql_ddl_field_default_change, execute_sql_ddl_field_path_index_addition,
36            execute_sql_ddl_secondary_index_drop,
37        },
38        session::query::QueryPlanCacheAttribution,
39        session::sql::projection::{
40            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
41        },
42        sql::{
43            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
44            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
45        },
46    },
47    traits::{CanisterKind, EntityValue, Path},
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
52#[cfg(feature = "diagnostics")]
53pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
54#[cfg(feature = "diagnostics")]
55pub use attribution::{
56    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
57    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
58};
59pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
60pub(in crate::db::session::sql) use cache::{
61    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
62};
63pub(in crate::db::session::sql) use compile::{
64    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
65};
66pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
67pub use result::SqlStatementResult;
68
69#[cfg(all(test, not(feature = "diagnostics")))]
70pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
71#[cfg(feature = "diagnostics")]
72pub use crate::db::session::sql::projection::{
73    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
74};
75
// Parsing is exposed as a free, module-owned helper: it needs no session
// state, so it is deliberately not modelled as a `DbSession` method.
//
// Test-only. Parse failures are converted with
// `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    match parse_sql(sql) {
        Ok(statement) => Ok(statement),
        Err(parse_error) => Err(QueryError::from_sql_parse_error(parse_error)),
    }
}
82
83/// Return the entity identifier targeted by one reduced SQL statement.
84///
85/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
86/// across canister-owned entities may route it through any accepted entity.
87#[doc(hidden)]
88pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
89    let (statement, _) =
90        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
91
92    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
93}
94
95const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
96    match statement {
97        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
98        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
99        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
100        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
101        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
102            Some(statement.entity.as_str())
103        }
104        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
105            Some(entity) => Some(entity.as_str()),
106            None => None,
107        },
108        SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
109            Some(statement.entity.as_str())
110        }
111        SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
112            Some(statement.entity.as_str())
113        }
114        SqlStatement::Explain(statement) => match &statement.statement {
115            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
116            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
117        },
118        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
119        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
120        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
121        SqlStatement::ShowEntities(_) => None,
122    }
123}
124
125// Measure one SQL compile stage and immediately surface the stage result. The
126// helper keeps attribution capture uniform while avoiding repeated
127// `(cost, result); result?` boilerplate across the compile pipeline.
128fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
129    let (local_instructions, result) = measure_sql_stage(stage);
130    let value = result?;
131
132    Ok((local_instructions, value))
133}
134
135impl<C: CanisterKind> DbSession<C> {
136    // Resolve one SQL SELECT through a caller-selected accepted authority and
137    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
138    // generated authority through the runtime cache boundary.
139    fn sql_select_prepared_plan_for_accepted_authority(
140        &self,
141        query: &StructuralQuery,
142        authority: EntityAuthority,
143        accepted_schema: &AcceptedSchemaSnapshot,
144    ) -> Result<
145        (
146            SharedPreparedExecutionPlan,
147            SqlProjectionContract,
148            SqlCacheAttribution,
149        ),
150        QueryError,
151    > {
152        let (prepared_plan, cache_attribution) = self
153            .cached_shared_query_plan_for_accepted_authority(
154                authority.clone(),
155                accepted_schema,
156                query,
157            )?;
158        Ok(Self::sql_select_projection_from_prepared_plan(
159            prepared_plan,
160            authority,
161            cache_attribution,
162        ))
163    }
164
165    // Resolve one typed SQL SELECT through accepted schema authority selected
166    // at the session boundary.
167    fn sql_select_prepared_plan_for_entity<E>(
168        &self,
169        query: &StructuralQuery,
170    ) -> Result<
171        (
172            SharedPreparedExecutionPlan,
173            SqlProjectionContract,
174            SqlCacheAttribution,
175        ),
176        QueryError,
177    >
178    where
179        E: PersistedRow<Canister = C> + EntityValue,
180    {
181        let (accepted_schema, authority) = self
182            .accepted_entity_authority::<E>()
183            .map_err(QueryError::execute)?;
184
185        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
186    }
187
188    fn sql_select_projection_from_prepared_plan(
189        prepared_plan: SharedPreparedExecutionPlan,
190        authority: EntityAuthority,
191        cache_attribution: QueryPlanCacheAttribution,
192    ) -> (
193        SharedPreparedExecutionPlan,
194        SqlProjectionContract,
195        SqlCacheAttribution,
196    ) {
197        let projection_spec = prepared_plan
198            .logical_plan()
199            .projection_spec(authority.model());
200        let projection = SqlProjectionContract::new(
201            projection_labels_from_projection_spec(&projection_spec),
202            projection_fixed_scales_from_projection_spec(&projection_spec),
203        );
204
205        (
206            prepared_plan,
207            projection,
208            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
209        )
210    }
211
212    // Keep query/update surface gating owned by one helper so the SQL
213    // compiled-command lane does not duplicate the same statement-family split
214    // just to change the outward error wording.
215    fn ensure_sql_statement_supported_for_surface(
216        statement: &SqlStatement,
217        surface: SqlCompiledCommandSurface,
218    ) -> Result<(), QueryError> {
219        match (surface, statement) {
220            (
221                SqlCompiledCommandSurface::Query,
222                SqlStatement::Select(_)
223                | SqlStatement::Explain(_)
224                | SqlStatement::Describe(_)
225                | SqlStatement::ShowIndexes(_)
226                | SqlStatement::ShowColumns(_)
227                | SqlStatement::ShowEntities(_),
228            )
229            | (
230                SqlCompiledCommandSurface::Update,
231                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
232            ) => Ok(()),
233            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
234                "SQL DDL execution is not supported in this release",
235            )),
236            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
237                Err(QueryError::unsupported_query(
238                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
239                ))
240            }
241            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
242                Err(QueryError::unsupported_query(
243                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
244                ))
245            }
246            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
247                Err(QueryError::unsupported_query(
248                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
249                ))
250            }
251            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
252                Err(QueryError::unsupported_query(
253                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
254                ))
255            }
256            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
257                Err(QueryError::unsupported_query(
258                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
259                ))
260            }
261            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
262                Err(QueryError::unsupported_query(
263                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
264                ))
265            }
266            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
267                Err(QueryError::unsupported_query(
268                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
269                ))
270            }
271            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
272                Err(QueryError::unsupported_query(
273                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
274                ))
275            }
276            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
277                Err(QueryError::unsupported_query(
278                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
279                ))
280            }
281        }
282    }
283
284    /// Execute one single-entity reduced SQL query or introspection statement.
285    ///
286    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
287    /// returns SQL-shaped statement output instead of typed entities.
288    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
289    where
290        E: PersistedRow<Canister = C> + EntityValue,
291    {
292        let compiled = self.compile_sql_query::<E>(sql)?;
293
294        self.execute_compiled_sql_owned::<E>(compiled)
295    }
296
    /// Execute one reduced SQL query while reporting the compile/execute split
    /// at the top-level SQL seam.
    ///
    /// Returns the statement result together with a
    /// `SqlQueryExecutionAttribution` describing per-phase local-instruction
    /// costs, store `get` call counts, and cache hit/miss counters for this
    /// call. Only available with the `diagnostics` feature.
    ///
    /// # Errors
    ///
    /// Propagates any `QueryError` from compilation or execution; no
    /// attribution is returned on failure.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    pub fn execute_sql_query_with_attribution<E>(
        &self,
        sql: &str,
    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure the compile side of the new seam, including parse,
        // surface validation, and semantic command construction.
        let (compile_local_instructions, compiled) =
            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
        // The compile result is unwrapped only after measurement, so the
        // closure's cost is captured before any compile error propagates.
        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;

        // Phase 2: measure the execute side separately so repeat-run cache
        // experiments can prove which side actually moved.
        // Snapshot the running counters first; the values reported below are
        // the saturating deltas across the execute call.
        let store_get_calls_before = DataStore::current_get_call_count();
        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
        let pure_covering_row_assembly_before =
            current_pure_covering_row_assembly_local_instructions();
        let (result, execute_cache_attribution, execute_phase_attribution) =
            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
        let store_get_calls =
            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
        let pure_covering_decode_local_instructions =
            current_pure_covering_decode_local_instructions()
                .saturating_sub(pure_covering_decode_before);
        let pure_covering_row_assembly_local_instructions =
            current_pure_covering_row_assembly_local_instructions()
                .saturating_sub(pure_covering_row_assembly_before);
        // Execute total = planner + store + executor + response finalization.
        // NOTE(review): `executor_invocation_local_instructions` is reported
        // below but is not part of this sum — presumably it overlaps with
        // `executor_local_instructions`; confirm.
        let execute_local_instructions = execute_phase_attribution
            .planner_local_instructions
            .saturating_add(execute_phase_attribution.store_local_instructions)
            .saturating_add(execute_phase_attribution.executor_local_instructions)
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        // Combine compile-side and execute-side cache counters into one view.
        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);
        // Grouped attribution is reported only when the statement actually
        // produced a grouped result; otherwise the field is `None`.
        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
            GroupedExecutionAttribution {
                stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
                finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                count: GroupedCountAttribution::from_executor(
                    execute_phase_attribution.grouped_count,
                ),
            },
        );
        // Pure-covering attribution is reported only when either counter moved
        // during this execution.
        let pure_covering = (pure_covering_decode_local_instructions > 0
            || pure_covering_row_assembly_local_instructions > 0)
            .then_some(SqlPureCoveringAttribution {
                decode_local_instructions: pure_covering_decode_local_instructions,
                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
            });

        Ok((
            result,
            SqlQueryExecutionAttribution {
                compile_local_instructions,
                // Per-stage compile breakdown copied field-for-field from the
                // compile phase attribution.
                compile: SqlCompileAttribution {
                    cache_key_local_instructions: compile_phase_attribution.cache_key,
                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
                    parse_local_instructions: compile_phase_attribution.parse,
                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
                    parse_select_local_instructions: compile_phase_attribution.parse_select,
                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
                    aggregate_lane_check_local_instructions: compile_phase_attribution
                        .aggregate_lane_check,
                    prepare_local_instructions: compile_phase_attribution.prepare,
                    lower_local_instructions: compile_phase_attribution.lower,
                    bind_local_instructions: compile_phase_attribution.bind,
                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
                },
                // Same value as `execution.planner_local_instructions` below.
                plan_lookup_local_instructions: execute_phase_attribution
                    .planner_local_instructions,
                execution: SqlExecutionAttribution {
                    planner_local_instructions: execute_phase_attribution
                        .planner_local_instructions,
                    store_local_instructions: execute_phase_attribution.store_local_instructions,
                    executor_invocation_local_instructions: execute_phase_attribution
                        .executor_invocation_local_instructions,
                    executor_local_instructions: execute_phase_attribution
                        .executor_local_instructions,
                    response_finalization_local_instructions: execute_phase_attribution
                        .response_finalization_local_instructions,
                },
                grouped,
                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
                    execute_phase_attribution.scalar_aggregate_terminal,
                ),
                pure_covering,
                store_get_calls,
                // Always zero at this seam.
                // NOTE(review): presumably populated by an outer layer that
                // decodes the response — confirm.
                response_decode_local_instructions: 0,
                execute_local_instructions,
                total_local_instructions,
                cache: SqlQueryCacheAttribution {
                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
                    sql_compiled_command_misses: cache_attribution
                        .sql_compiled_command_cache_misses,
                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
                },
            },
        ))
    }
408
409    /// Execute one single-entity reduced SQL mutation statement.
410    ///
411    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
412    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
413    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
414    where
415        E: PersistedRow<Canister = C> + EntityValue,
416    {
417        let compiled = self.compile_sql_update::<E>(sql)?;
418
419        self.execute_compiled_sql_owned::<E>(compiled)
420    }
421
422    /// Prepare one SQL DDL statement against the accepted schema catalog.
423    ///
424    /// This is a non-executing surface: it proves the statement can bind,
425    /// derive an accepted-after snapshot, and pass schema mutation admission,
426    /// then returns a prepared-only report without mutating schema or index
427    /// storage.
428    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
429    where
430        E: PersistedRow<Canister = C> + EntityValue,
431    {
432        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
433
434        Ok(prepared.report().clone())
435    }
436
437    fn prepare_sql_ddl_command<E>(
438        &self,
439        sql: &str,
440    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
441    where
442        E: PersistedRow<Canister = C> + EntityValue,
443    {
444        let (statement, _) =
445            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
446        let (accepted_schema, _) = self
447            .accepted_entity_authority::<E>()
448            .map_err(QueryError::execute)?;
449        let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
450            E::MODEL,
451            &accepted_schema,
452            true,
453        );
454        let prepared = match prepare_sql_ddl_statement(
455            &statement,
456            &accepted_schema,
457            &schema_info,
458            E::Store::PATH,
459        ) {
460            Ok(prepared) => prepared,
461            Err(err) => {
462                return Err(QueryError::unsupported_query(format!(
463                    "SQL DDL preparation failed before execution: {err}"
464                )));
465            }
466        };
467
468        Ok((accepted_schema, prepared))
469    }
470
471    /// Execute one SQL DDL statement.
472    ///
473    /// Supported DDL routes through schema-owned physical work and
474    /// accepted-snapshot publication.
475    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
476    where
477        E: PersistedRow<Canister = C> + EntityValue,
478    {
479        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
480        if !prepared.mutates_schema() {
481            return Ok(SqlStatementResult::Ddl(
482                prepared
483                    .report()
484                    .clone()
485                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
486            ));
487        }
488
489        let Some(derivation) = prepared.derivation() else {
490            return Err(QueryError::unsupported_query(
491                "SQL DDL execution could not find a prepared schema derivation".to_string(),
492            ));
493        };
494        let store = self
495            .db
496            .recovered_store(E::Store::PATH)
497            .map_err(QueryError::execute)?;
498
499        let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
500            crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
501                execute_sql_ddl_field_addition(
502                    store,
503                    E::ENTITY_TAG,
504                    E::PATH,
505                    &accepted_before,
506                    derivation,
507                )
508                .map_err(QueryError::execute)?;
509
510                (0, 0)
511            }
512            crate::db::sql::ddl::BoundSqlDdlStatement::AlterColumnDefault(_) => {
513                execute_sql_ddl_field_default_change(
514                    store,
515                    E::ENTITY_TAG,
516                    E::PATH,
517                    &accepted_before,
518                    derivation,
519                )
520                .map_err(QueryError::execute)?;
521
522                (0, 0)
523            }
524            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
525                if create.candidate_index().key().is_field_path_only() =>
526            {
527                execute_sql_ddl_field_path_index_addition(
528                    store,
529                    E::ENTITY_TAG,
530                    E::PATH,
531                    &accepted_before,
532                    derivation,
533                )
534                .map_err(QueryError::execute)?
535            }
536            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
537                execute_sql_ddl_expression_index_addition(
538                    store,
539                    E::ENTITY_TAG,
540                    E::PATH,
541                    &accepted_before,
542                    derivation,
543                )
544                .map_err(QueryError::execute)?
545            }
546            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
547                execute_sql_ddl_secondary_index_drop(
548                    store,
549                    E::ENTITY_TAG,
550                    E::PATH,
551                    &accepted_before,
552                    derivation,
553                )
554                .map_err(QueryError::execute)?;
555
556                (0, 0)
557            }
558            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
559        };
560
561        Ok(SqlStatementResult::Ddl(
562            prepared
563                .report()
564                .clone()
565                .with_execution_status(SqlDdlExecutionStatus::Published)
566                .with_execution_metrics(rows_scanned, index_keys_written),
567        ))
568    }
569}