Skip to main content

icydb_core/db/session/sql/
mod.rs

//! Module: db::session::sql
//! Responsibility: session-owned SQL execution, explain, projection, and
//! surface-classification helpers above lowered SQL commands.
//! Does not own: SQL parsing or structural executor runtime behavior.
//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::{AcceptedSchemaSnapshot, SchemaInfo},
33        schema::{execute_sql_ddl_field_path_index_addition, execute_sql_ddl_secondary_index_drop},
34        session::query::QueryPlanCacheAttribution,
35        session::sql::projection::{
36            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37        },
38        sql::{
39            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
40            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
41        },
42    },
43    traits::{CanisterKind, EntityValue, Path},
44};
45
46pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
47pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
48#[cfg(feature = "diagnostics")]
49pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
50#[cfg(feature = "diagnostics")]
51pub use attribution::{
52    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
56pub(in crate::db::session::sql) use cache::{
57    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
58};
59pub(in crate::db::session::sql) use compile::{
60    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
61};
62pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
63pub use result::SqlStatementResult;
64
65#[cfg(all(test, not(feature = "diagnostics")))]
66pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
67#[cfg(feature = "diagnostics")]
68pub use crate::db::session::sql::projection::{
69    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
70};
71
// Parsing stays a free, module-owned helper: it reads no session state, so
// exposing it as a `DbSession` method would only fake a session dependency.
// Parser failures are converted into `QueryError` on the way out.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let parsed = parse_sql(sql);

    parsed.map_err(QueryError::from_sql_parse_error)
}
78
79/// Return the entity identifier targeted by one reduced SQL statement.
80///
81/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
82/// across canister-owned entities may route it through any accepted entity.
83#[doc(hidden)]
84pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
85    let (statement, _) =
86        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
87
88    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
89}
90
91const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
92    match statement {
93        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
94        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
95        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
96        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
97        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
98            Some(statement.entity.as_str())
99        }
100        SqlStatement::Ddl(SqlDdlStatement::DropIndex(statement)) => match &statement.entity {
101            Some(entity) => Some(entity.as_str()),
102            None => None,
103        },
104        SqlStatement::Explain(statement) => match &statement.statement {
105            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
106            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
107        },
108        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
109        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
110        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
111        SqlStatement::ShowEntities(_) => None,
112    }
113}
114
115// Measure one SQL compile stage and immediately surface the stage result. The
116// helper keeps attribution capture uniform while avoiding repeated
117// `(cost, result); result?` boilerplate across the compile pipeline.
118fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
119    let (local_instructions, result) = measure_sql_stage(stage);
120    let value = result?;
121
122    Ok((local_instructions, value))
123}
124
125impl<C: CanisterKind> DbSession<C> {
126    // Resolve one SQL SELECT through a caller-selected accepted authority and
127    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
128    // generated authority through the runtime cache boundary.
129    fn sql_select_prepared_plan_for_accepted_authority(
130        &self,
131        query: &StructuralQuery,
132        authority: EntityAuthority,
133        accepted_schema: &AcceptedSchemaSnapshot,
134    ) -> Result<
135        (
136            SharedPreparedExecutionPlan,
137            SqlProjectionContract,
138            SqlCacheAttribution,
139        ),
140        QueryError,
141    > {
142        let (prepared_plan, cache_attribution) = self
143            .cached_shared_query_plan_for_accepted_authority(
144                authority.clone(),
145                accepted_schema,
146                query,
147            )?;
148        Ok(Self::sql_select_projection_from_prepared_plan(
149            prepared_plan,
150            authority,
151            cache_attribution,
152        ))
153    }
154
155    // Resolve one typed SQL SELECT through accepted schema authority selected
156    // at the session boundary.
157    fn sql_select_prepared_plan_for_entity<E>(
158        &self,
159        query: &StructuralQuery,
160    ) -> Result<
161        (
162            SharedPreparedExecutionPlan,
163            SqlProjectionContract,
164            SqlCacheAttribution,
165        ),
166        QueryError,
167    >
168    where
169        E: PersistedRow<Canister = C> + EntityValue,
170    {
171        let (accepted_schema, authority) = self
172            .accepted_entity_authority::<E>()
173            .map_err(QueryError::execute)?;
174
175        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
176    }
177
178    fn sql_select_projection_from_prepared_plan(
179        prepared_plan: SharedPreparedExecutionPlan,
180        authority: EntityAuthority,
181        cache_attribution: QueryPlanCacheAttribution,
182    ) -> (
183        SharedPreparedExecutionPlan,
184        SqlProjectionContract,
185        SqlCacheAttribution,
186    ) {
187        let projection_spec = prepared_plan
188            .logical_plan()
189            .projection_spec(authority.model());
190        let projection = SqlProjectionContract::new(
191            projection_labels_from_projection_spec(&projection_spec),
192            projection_fixed_scales_from_projection_spec(&projection_spec),
193        );
194
195        (
196            prepared_plan,
197            projection,
198            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
199        )
200    }
201
202    // Keep query/update surface gating owned by one helper so the SQL
203    // compiled-command lane does not duplicate the same statement-family split
204    // just to change the outward error wording.
205    fn ensure_sql_statement_supported_for_surface(
206        statement: &SqlStatement,
207        surface: SqlCompiledCommandSurface,
208    ) -> Result<(), QueryError> {
209        match (surface, statement) {
210            (
211                SqlCompiledCommandSurface::Query,
212                SqlStatement::Select(_)
213                | SqlStatement::Explain(_)
214                | SqlStatement::Describe(_)
215                | SqlStatement::ShowIndexes(_)
216                | SqlStatement::ShowColumns(_)
217                | SqlStatement::ShowEntities(_),
218            )
219            | (
220                SqlCompiledCommandSurface::Update,
221                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
222            ) => Ok(()),
223            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
224                "SQL DDL execution is not supported in this release",
225            )),
226            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
227                Err(QueryError::unsupported_query(
228                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
229                ))
230            }
231            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
232                Err(QueryError::unsupported_query(
233                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
234                ))
235            }
236            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
237                Err(QueryError::unsupported_query(
238                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
239                ))
240            }
241            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
242                Err(QueryError::unsupported_query(
243                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
244                ))
245            }
246            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
247                Err(QueryError::unsupported_query(
248                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
249                ))
250            }
251            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
252                Err(QueryError::unsupported_query(
253                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
254                ))
255            }
256            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
257                Err(QueryError::unsupported_query(
258                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
259                ))
260            }
261            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
262                Err(QueryError::unsupported_query(
263                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
264                ))
265            }
266            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
267                Err(QueryError::unsupported_query(
268                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
269                ))
270            }
271        }
272    }
273
274    /// Execute one single-entity reduced SQL query or introspection statement.
275    ///
276    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
277    /// returns SQL-shaped statement output instead of typed entities.
278    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
279    where
280        E: PersistedRow<Canister = C> + EntityValue,
281    {
282        let compiled = self.compile_sql_query::<E>(sql)?;
283
284        self.execute_compiled_sql_owned::<E>(compiled)
285    }
286
287    /// Execute one reduced SQL query while reporting the compile/execute split
288    /// at the top-level SQL seam.
289    #[cfg(feature = "diagnostics")]
290    #[doc(hidden)]
291    pub fn execute_sql_query_with_attribution<E>(
292        &self,
293        sql: &str,
294    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
295    where
296        E: PersistedRow<Canister = C> + EntityValue,
297    {
298        // Phase 1: measure the compile side of the new seam, including parse,
299        // surface validation, and semantic command construction.
300        let (compile_local_instructions, compiled) =
301            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
302        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
303
304        // Phase 2: measure the execute side separately so repeat-run cache
305        // experiments can prove which side actually moved.
306        let store_get_calls_before = DataStore::current_get_call_count();
307        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
308        let pure_covering_row_assembly_before =
309            current_pure_covering_row_assembly_local_instructions();
310        let (result, execute_cache_attribution, execute_phase_attribution) =
311            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
312        let store_get_calls =
313            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
314        let pure_covering_decode_local_instructions =
315            current_pure_covering_decode_local_instructions()
316                .saturating_sub(pure_covering_decode_before);
317        let pure_covering_row_assembly_local_instructions =
318            current_pure_covering_row_assembly_local_instructions()
319                .saturating_sub(pure_covering_row_assembly_before);
320        let execute_local_instructions = execute_phase_attribution
321            .planner_local_instructions
322            .saturating_add(execute_phase_attribution.store_local_instructions)
323            .saturating_add(execute_phase_attribution.executor_local_instructions)
324            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
325        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
326        let total_local_instructions =
327            compile_local_instructions.saturating_add(execute_local_instructions);
328        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
329            GroupedExecutionAttribution {
330                stream_local_instructions: execute_phase_attribution
331                    .grouped_stream_local_instructions,
332                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
333                finalize_local_instructions: execute_phase_attribution
334                    .grouped_finalize_local_instructions,
335                count: GroupedCountAttribution::from_executor(
336                    execute_phase_attribution.grouped_count,
337                ),
338            },
339        );
340        let pure_covering = (pure_covering_decode_local_instructions > 0
341            || pure_covering_row_assembly_local_instructions > 0)
342            .then_some(SqlPureCoveringAttribution {
343                decode_local_instructions: pure_covering_decode_local_instructions,
344                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
345            });
346
347        Ok((
348            result,
349            SqlQueryExecutionAttribution {
350                compile_local_instructions,
351                compile: SqlCompileAttribution {
352                    cache_key_local_instructions: compile_phase_attribution.cache_key,
353                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
354                    parse_local_instructions: compile_phase_attribution.parse,
355                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
356                    parse_select_local_instructions: compile_phase_attribution.parse_select,
357                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
358                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
359                    aggregate_lane_check_local_instructions: compile_phase_attribution
360                        .aggregate_lane_check,
361                    prepare_local_instructions: compile_phase_attribution.prepare,
362                    lower_local_instructions: compile_phase_attribution.lower,
363                    bind_local_instructions: compile_phase_attribution.bind,
364                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
365                },
366                plan_lookup_local_instructions: execute_phase_attribution
367                    .planner_local_instructions,
368                execution: SqlExecutionAttribution {
369                    planner_local_instructions: execute_phase_attribution
370                        .planner_local_instructions,
371                    store_local_instructions: execute_phase_attribution.store_local_instructions,
372                    executor_invocation_local_instructions: execute_phase_attribution
373                        .executor_invocation_local_instructions,
374                    executor_local_instructions: execute_phase_attribution
375                        .executor_local_instructions,
376                    response_finalization_local_instructions: execute_phase_attribution
377                        .response_finalization_local_instructions,
378                },
379                grouped,
380                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
381                    execute_phase_attribution.scalar_aggregate_terminal,
382                ),
383                pure_covering,
384                store_get_calls,
385                response_decode_local_instructions: 0,
386                execute_local_instructions,
387                total_local_instructions,
388                cache: SqlQueryCacheAttribution {
389                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
390                    sql_compiled_command_misses: cache_attribution
391                        .sql_compiled_command_cache_misses,
392                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
393                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
394                },
395            },
396        ))
397    }
398
399    /// Execute one single-entity reduced SQL mutation statement.
400    ///
401    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
402    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
403    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
404    where
405        E: PersistedRow<Canister = C> + EntityValue,
406    {
407        let compiled = self.compile_sql_update::<E>(sql)?;
408
409        self.execute_compiled_sql_owned::<E>(compiled)
410    }
411
412    /// Prepare one SQL DDL statement against the accepted schema catalog.
413    ///
414    /// This is a non-executing surface: it proves the statement can bind,
415    /// derive an accepted-after snapshot, and pass schema mutation admission,
416    /// then returns a prepared-only report without mutating schema or index
417    /// storage.
418    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
419    where
420        E: PersistedRow<Canister = C> + EntityValue,
421    {
422        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
423
424        Ok(prepared.report().clone())
425    }
426
427    fn prepare_sql_ddl_command<E>(
428        &self,
429        sql: &str,
430    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
431    where
432        E: PersistedRow<Canister = C> + EntityValue,
433    {
434        let (statement, _) =
435            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
436        let (accepted_schema, _) = self
437            .accepted_entity_authority::<E>()
438            .map_err(QueryError::execute)?;
439        let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
440        let prepared = match prepare_sql_ddl_statement(
441            &statement,
442            &accepted_schema,
443            &schema_info,
444            E::Store::PATH,
445        ) {
446            Ok(prepared) => prepared,
447            Err(err) => {
448                return Err(QueryError::unsupported_query(format!(
449                    "SQL DDL preparation failed before execution: {err}"
450                )));
451            }
452        };
453
454        Ok((accepted_schema, prepared))
455    }
456
457    /// Execute one SQL DDL statement.
458    ///
459    /// Supported DDL routes through schema-owned physical work and
460    /// accepted-snapshot publication.
461    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
462    where
463        E: PersistedRow<Canister = C> + EntityValue,
464    {
465        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
466        if !prepared.mutates_schema() {
467            return Ok(SqlStatementResult::Ddl(
468                prepared
469                    .report()
470                    .clone()
471                    .with_execution_status(SqlDdlExecutionStatus::NoOp),
472            ));
473        }
474
475        let Some(derivation) = prepared.derivation() else {
476            return Err(QueryError::unsupported_query(
477                "SQL DDL execution could not find a prepared schema derivation".to_string(),
478            ));
479        };
480        let store = self
481            .db
482            .recovered_store(E::Store::PATH)
483            .map_err(QueryError::execute)?;
484
485        let (rows_scanned, index_keys_written) = match prepared.bound().statement() {
486            crate::db::sql::ddl::BoundSqlDdlStatement::CreateIndex(_) => {
487                execute_sql_ddl_field_path_index_addition(
488                    store,
489                    E::ENTITY_TAG,
490                    E::PATH,
491                    &accepted_before,
492                    derivation,
493                )
494                .map_err(QueryError::execute)?
495            }
496            crate::db::sql::ddl::BoundSqlDdlStatement::DropIndex(_) => {
497                execute_sql_ddl_secondary_index_drop(
498                    store,
499                    E::ENTITY_TAG,
500                    E::PATH,
501                    &accepted_before,
502                    derivation,
503                )
504                .map_err(QueryError::execute)?;
505
506                (0, 0)
507            }
508            crate::db::sql::ddl::BoundSqlDdlStatement::NoOp(_) => (0, 0),
509        };
510
511        Ok(SqlStatementResult::Ddl(
512            prepared
513                .report()
514                .clone()
515                .with_execution_status(SqlDdlExecutionStatus::Published)
516                .with_execution_metrics(rows_scanned, index_keys_written),
517        ))
518    }
519}