Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::{AcceptedSchemaSnapshot, SchemaInfo},
33        schema::{
34            execute_sql_ddl_expression_index_addition, execute_sql_ddl_field_addition,
35            execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop,
36        },
37        session::query::QueryPlanCacheAttribution,
38        session::sql::projection::{
39            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
40        },
41        sql::{
42            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
43            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
44        },
45    },
46    traits::{CanisterKind, EntityValue, Path},
47};
48
49pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
50pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
51#[cfg(feature = "diagnostics")]
52pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
53#[cfg(feature = "diagnostics")]
54pub use attribution::{
55    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
56    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
57};
58pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
59pub(in crate::db::session::sql) use cache::{
60    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
61};
62pub(in crate::db::session::sql) use compile::{
63    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
64};
65pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
66pub use result::SqlStatementResult;
67
68#[cfg(all(test, not(feature = "diagnostics")))]
69pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
70#[cfg(feature = "diagnostics")]
71pub use crate::db::session::sql::projection::{
72    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
73};
74
// Parsing stays a free, module-owned function: it needs no session state, so
// exposing it as a `DbSession` method would only fake a session dependency.
// Parser failures are translated into `QueryError` at this boundary.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
81
82/// Return the entity identifier targeted by one reduced SQL statement.
83///
84/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
85/// across canister-owned entities may route it through any accepted entity.
86#[doc(hidden)]
87pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
88    let (statement, _) =
89        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
90
91    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
92}
93
94const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
95    match statement {
96        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
97        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
98        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
99        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
100        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
101            Some(statement.entity.as_str())
102        }
103        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
104            Some(entity) => Some(entity.as_str()),
105            None => None,
106        },
107        SqlStatement::Ddl(SqlDdlStatement::AlterTableAddColumn(statement)) => {
108            Some(statement.entity.as_str())
109        }
110        SqlStatement::Ddl(SqlDdlStatement::AlterTableAlterColumn(statement)) => {
111            Some(statement.entity.as_str())
112        }
113        SqlStatement::Explain(statement) => match &statement.statement {
114            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
115            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
116        },
117        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
118        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
119        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
120        SqlStatement::ShowEntities(_) => None,
121    }
122}
123
124// Measure one SQL compile stage and immediately surface the stage result. The
125// helper keeps attribution capture uniform while avoiding repeated
126// `(cost, result); result?` boilerplate across the compile pipeline.
127fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
128    let (local_instructions, result) = measure_sql_stage(stage);
129    let value = result?;
130
131    Ok((local_instructions, value))
132}
133
134impl<C: CanisterKind> DbSession<C> {
135    // Resolve one SQL SELECT through a caller-selected accepted authority and
136    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
137    // generated authority through the runtime cache boundary.
138    fn sql_select_prepared_plan_for_accepted_authority(
139        &self,
140        query: &StructuralQuery,
141        authority: EntityAuthority,
142        accepted_schema: &AcceptedSchemaSnapshot,
143    ) -> Result<
144        (
145            SharedPreparedExecutionPlan,
146            SqlProjectionContract,
147            SqlCacheAttribution,
148        ),
149        QueryError,
150    > {
151        let (prepared_plan, cache_attribution) = self
152            .cached_shared_query_plan_for_accepted_authority(
153                authority.clone(),
154                accepted_schema,
155                query,
156            )?;
157        Ok(Self::sql_select_projection_from_prepared_plan(
158            prepared_plan,
159            authority,
160            cache_attribution,
161        ))
162    }
163
164    // Resolve one typed SQL SELECT through accepted schema authority selected
165    // at the session boundary.
166    fn sql_select_prepared_plan_for_entity<E>(
167        &self,
168        query: &StructuralQuery,
169    ) -> Result<
170        (
171            SharedPreparedExecutionPlan,
172            SqlProjectionContract,
173            SqlCacheAttribution,
174        ),
175        QueryError,
176    >
177    where
178        E: PersistedRow<Canister = C> + EntityValue,
179    {
180        let (accepted_schema, authority) = self
181            .accepted_entity_authority::<E>()
182            .map_err(QueryError::execute)?;
183
184        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
185    }
186
187    fn sql_select_projection_from_prepared_plan(
188        prepared_plan: SharedPreparedExecutionPlan,
189        authority: EntityAuthority,
190        cache_attribution: QueryPlanCacheAttribution,
191    ) -> (
192        SharedPreparedExecutionPlan,
193        SqlProjectionContract,
194        SqlCacheAttribution,
195    ) {
196        let projection_spec = prepared_plan
197            .logical_plan()
198            .projection_spec(authority.model());
199        let projection = SqlProjectionContract::new(
200            projection_labels_from_projection_spec(&projection_spec),
201            projection_fixed_scales_from_projection_spec(&projection_spec),
202        );
203
204        (
205            prepared_plan,
206            projection,
207            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
208        )
209    }
210
211    // Keep query/update surface gating owned by one helper so the SQL
212    // compiled-command lane does not duplicate the same statement-family split
213    // just to change the outward error wording.
214    fn ensure_sql_statement_supported_for_surface(
215        statement: &SqlStatement,
216        surface: SqlCompiledCommandSurface,
217    ) -> Result<(), QueryError> {
218        match (surface, statement) {
219            (
220                SqlCompiledCommandSurface::Query,
221                SqlStatement::Select(_)
222                | SqlStatement::Explain(_)
223                | SqlStatement::Describe(_)
224                | SqlStatement::ShowIndexes(_)
225                | SqlStatement::ShowColumns(_)
226                | SqlStatement::ShowEntities(_),
227            )
228            | (
229                SqlCompiledCommandSurface::Update,
230                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
231            ) => Ok(()),
232            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
233                "SQL DDL execution is not supported in this release",
234            )),
235            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
236                Err(QueryError::unsupported_query(
237                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
238                ))
239            }
240            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
241                Err(QueryError::unsupported_query(
242                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
243                ))
244            }
245            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
246                Err(QueryError::unsupported_query(
247                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
248                ))
249            }
250            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
251                Err(QueryError::unsupported_query(
252                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
253                ))
254            }
255            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
256                Err(QueryError::unsupported_query(
257                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
258                ))
259            }
260            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
261                Err(QueryError::unsupported_query(
262                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
263                ))
264            }
265            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
266                Err(QueryError::unsupported_query(
267                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
268                ))
269            }
270            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
271                Err(QueryError::unsupported_query(
272                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
273                ))
274            }
275            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
276                Err(QueryError::unsupported_query(
277                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
278                ))
279            }
280        }
281    }
282
283    /// Execute one single-entity reduced SQL query or introspection statement.
284    ///
285    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
286    /// returns SQL-shaped statement output instead of typed entities.
287    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
288    where
289        E: PersistedRow<Canister = C> + EntityValue,
290    {
291        let compiled = self.compile_sql_query::<E>(sql)?;
292
293        self.execute_compiled_sql_owned::<E>(compiled)
294    }
295
    /// Execute one reduced SQL query while reporting the compile/execute split
    /// at the top-level SQL seam.
    ///
    /// Only compiled with the `diagnostics` feature. Returns the statement
    /// result together with a `SqlQueryExecutionAttribution` that breaks the
    /// run down into compile-stage costs, execute-phase costs, optional
    /// grouped / pure-covering detail, store `get` call count, and cache
    /// hit/miss counters.
    ///
    /// # Errors
    ///
    /// Returns the `QueryError` from compilation or from execution; no
    /// attribution is produced on failure.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    pub fn execute_sql_query_with_attribution<E>(
        &self,
        sql: &str,
    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure the compile side of the new seam, including parse,
        // surface validation, and semantic command construction.
        // The compile error (if any) is surfaced only after the measurement
        // closure has returned.
        let (compile_local_instructions, compiled) =
            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;

        // Phase 2: measure the execute side separately so repeat-run cache
        // experiments can prove which side actually moved.
        // Sample the running counters before execution so the values reported
        // below are deltas for this call only. Statement order matters here.
        let store_get_calls_before = DataStore::current_get_call_count();
        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
        let pure_covering_row_assembly_before =
            current_pure_covering_row_assembly_local_instructions();
        let (result, execute_cache_attribution, execute_phase_attribution) =
            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
        // Saturating subtraction keeps each delta at zero instead of wrapping
        // if a counter reads lower than its "before" sample.
        let store_get_calls =
            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
        let pure_covering_decode_local_instructions =
            current_pure_covering_decode_local_instructions()
                .saturating_sub(pure_covering_decode_before);
        let pure_covering_row_assembly_local_instructions =
            current_pure_covering_row_assembly_local_instructions()
                .saturating_sub(pure_covering_row_assembly_before);
        // Execute total = planner + store + executor + response finalization.
        // NOTE(review): `executor_invocation_local_instructions` is reported
        // below but not added here — presumably it overlaps one of the summed
        // phases; confirm against the executor attribution contract.
        let execute_local_instructions = execute_phase_attribution
            .planner_local_instructions
            .saturating_add(execute_phase_attribution.store_local_instructions)
            .saturating_add(execute_phase_attribution.executor_local_instructions)
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);
        // Grouped detail is attached only when the statement produced a
        // grouped result.
        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
            GroupedExecutionAttribution {
                stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
                finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                count: GroupedCountAttribution::from_executor(
                    execute_phase_attribution.grouped_count,
                ),
            },
        );
        // Pure-covering detail is attached only when at least one of its two
        // deltas is non-zero for this call.
        let pure_covering = (pure_covering_decode_local_instructions > 0
            || pure_covering_row_assembly_local_instructions > 0)
            .then_some(SqlPureCoveringAttribution {
                decode_local_instructions: pure_covering_decode_local_instructions,
                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
            });

        Ok((
            result,
            SqlQueryExecutionAttribution {
                compile_local_instructions,
                // Per-stage compile costs, copied field-for-field from the
                // compile phase attribution.
                compile: SqlCompileAttribution {
                    cache_key_local_instructions: compile_phase_attribution.cache_key,
                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
                    parse_local_instructions: compile_phase_attribution.parse,
                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
                    parse_select_local_instructions: compile_phase_attribution.parse_select,
                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
                    aggregate_lane_check_local_instructions: compile_phase_attribution
                        .aggregate_lane_check,
                    prepare_local_instructions: compile_phase_attribution.prepare,
                    lower_local_instructions: compile_phase_attribution.lower,
                    bind_local_instructions: compile_phase_attribution.bind,
                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
                },
                // Plan lookup reports the same value as the planner phase.
                plan_lookup_local_instructions: execute_phase_attribution
                    .planner_local_instructions,
                execution: SqlExecutionAttribution {
                    planner_local_instructions: execute_phase_attribution
                        .planner_local_instructions,
                    store_local_instructions: execute_phase_attribution.store_local_instructions,
                    executor_invocation_local_instructions: execute_phase_attribution
                        .executor_invocation_local_instructions,
                    executor_local_instructions: execute_phase_attribution
                        .executor_local_instructions,
                    response_finalization_local_instructions: execute_phase_attribution
                        .response_finalization_local_instructions,
                },
                grouped,
                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
                    execute_phase_attribution.scalar_aggregate_terminal,
                ),
                pure_covering,
                store_get_calls,
                // Response decode is not measured at this seam; always zero.
                response_decode_local_instructions: 0,
                execute_local_instructions,
                total_local_instructions,
                // Hit/miss counters from the merged compile + execute cache
                // attribution.
                cache: SqlQueryCacheAttribution {
                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
                    sql_compiled_command_misses: cache_attribution
                        .sql_compiled_command_cache_misses,
                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
                },
            },
        ))
    }
407
408    /// Execute one single-entity reduced SQL mutation statement.
409    ///
410    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
411    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
412    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
413    where
414        E: PersistedRow<Canister = C> + EntityValue,
415    {
416        let compiled = self.compile_sql_update::<E>(sql)?;
417
418        self.execute_compiled_sql_owned::<E>(compiled)
419    }
420
421    /// Prepare one SQL DDL statement against the accepted schema catalog.
422    ///
423    /// This is a non-executing surface: it proves the statement can bind,
424    /// derive an accepted-after snapshot, and pass schema mutation admission,
425    /// then returns a prepared-only report without mutating schema or index
426    /// storage.
427    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
428    where
429        E: PersistedRow<Canister = C> + EntityValue,
430    {
431        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
432
433        Ok(prepared.report().clone())
434    }
435
436    fn prepare_sql_ddl_command<E>(
437        &self,
438        sql: &str,
439    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
440    where
441        E: PersistedRow<Canister = C> + EntityValue,
442    {
443        let (statement, _) =
444            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
445        let (accepted_schema, _) = self
446            .accepted_entity_authority::<E>()
447            .map_err(QueryError::execute)?;
448        let schema_info = SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
449            E::MODEL,
450            &accepted_schema,
451            true,
452        );
453        let prepared = match prepare_sql_ddl_statement(
454            &statement,
455            &accepted_schema,
456            &schema_info,
457            E::Store::PATH,
458        ) {
459            Ok(prepared) => prepared,
460            Err(err) => {
461                return Err(QueryError::unsupported_query(format!(
462                    "SQL DDL preparation failed before execution: {err}"
463                )));
464            }
465        };
466
467        Ok((accepted_schema, prepared))
468    }
469
470    /// Execute one SQL DDL statement.
471    ///
472    /// Supported DDL routes through schema-owned physical work and
473    /// accepted-snapshot publication.
474    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
475    where
476        E: PersistedRow<Canister = C> + EntityValue,
477    {
478        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
479        if !prepared.mutates_schema() {
480            return Ok(SqlStatementResult::Ddl(
481                prepared
482                    .report()
483                    .clone()
484                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
485            ));
486        }
487
488        let Some(derivation) = prepared.derivation() else {
489            return Err(QueryError::unsupported_query(
490                "SQL DDL execution could not find a prepared schema derivation".to_string(),
491            ));
492        };
493        let store = self
494            .db
495            .recovered_store(E::Store::PATH)
496            .map_err(QueryError::execute)?;
497
498        let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
499            crate::db::sql::ddl::BoundSqlDdlStatement::AddColumn(_) => {
500                execute_sql_ddl_field_addition(
501                    store,
502                    E::ENTITY_TAG,
503                    E::PATH,
504                    &accepted_before,
505                    derivation,
506                )
507                .map_err(QueryError::execute)?;
508
509                (0, 0)
510            }
511            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(create)
512                if create.candidate_index().key().is_field_path_only() =>
513            {
514                execute_sql_ddl_field_path_index_addition(
515                    store,
516                    E::ENTITY_TAG,
517                    E::PATH,
518                    &accepted_before,
519                    derivation,
520                )
521                .map_err(QueryError::execute)?
522            }
523            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
524                execute_sql_ddl_expression_index_addition(
525                    store,
526                    E::ENTITY_TAG,
527                    E::PATH,
528                    &accepted_before,
529                    derivation,
530                )
531                .map_err(QueryError::execute)?
532            }
533            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
534                execute_sql_ddl_secondary_index_drop(
535                    store,
536                    E::ENTITY_TAG,
537                    E::PATH,
538                    &accepted_before,
539                    derivation,
540                )
541                .map_err(QueryError::execute)?;
542
543                (0, 0)
544            }
545            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
546        };
547
548        Ok(SqlStatementResult::Ddl(
549            prepared
550                .report()
551                .clone()
552                .with_execution_status(SqlDdlExecutionStatus::Published)
553                .with_execution_metrics(rows_scanned, index_keys_written),
554        ))
555    }
556}