Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::execute_sql_ddl_field_path_index_addition,
33        schema::{AcceptedSchemaSnapshot, SchemaInfo},
34        session::query::QueryPlanCacheAttribution,
35        session::sql::projection::{
36            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
37        },
38        sql::{
39            ddl::{PreparedSqlDdlCommand, prepare_sql_ddl_statement},
40            parser::{SqlDdlStatement, SqlExplainTarget, SqlStatement, parse_sql_with_attribution},
41        },
42    },
43    traits::{CanisterKind, EntityValue, Path},
44};
45
46pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
47pub use crate::db::sql::ddl::{SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport};
48#[cfg(feature = "diagnostics")]
49pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
50#[cfg(feature = "diagnostics")]
51pub use attribution::{
52    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
53    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
54};
55pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
56pub(in crate::db::session::sql) use cache::{
57    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
58};
59pub(in crate::db::session::sql) use compile::{
60    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
61};
62pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
63pub use result::SqlStatementResult;
64
65#[cfg(all(test, not(feature = "diagnostics")))]
66pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
67#[cfg(feature = "diagnostics")]
68pub use crate::db::session::sql::projection::{
69    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
70};
71
// Parsing needs no session state, so it lives here as a free module-owned
// helper rather than pretending to be a `DbSession` method. Parser failures
// are converted into the session-facing `QueryError`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
78
79/// Return the entity identifier targeted by one reduced SQL statement.
80///
81/// `SHOW ENTITIES` intentionally has no entity target; callers that dispatch
82/// across canister-owned entities may route it through any accepted entity.
83#[doc(hidden)]
84pub fn sql_statement_entity_name(sql: &str) -> Result<Option<String>, QueryError> {
85    let (statement, _) =
86        parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
87
88    Ok(sql_statement_entity_name_from_statement(&statement).map(str::to_string))
89}
90
91const fn sql_statement_entity_name_from_statement(statement: &SqlStatement) -> Option<&str> {
92    match statement {
93        SqlStatement::Select(statement) => Some(statement.entity.as_str()),
94        SqlStatement::Delete(statement) => Some(statement.entity.as_str()),
95        SqlStatement::Insert(statement) => Some(statement.entity.as_str()),
96        SqlStatement::Update(statement) => Some(statement.entity.as_str()),
97        SqlStatement::Ddl(SqlDdlStatement::CreateIndex(statement)) => {
98            Some(statement.entity.as_str())
99        }
100        SqlStatement::Explain(statement) => match &statement.statement {
101            SqlExplainTarget::Select(statement) => Some(statement.entity.as_str()),
102            SqlExplainTarget::Delete(statement) => Some(statement.entity.as_str()),
103        },
104        SqlStatement::Describe(statement) => Some(statement.entity.as_str()),
105        SqlStatement::ShowIndexes(statement) => Some(statement.entity.as_str()),
106        SqlStatement::ShowColumns(statement) => Some(statement.entity.as_str()),
107        SqlStatement::ShowEntities(_) => None,
108    }
109}
110
111// Measure one SQL compile stage and immediately surface the stage result. The
112// helper keeps attribution capture uniform while avoiding repeated
113// `(cost, result); result?` boilerplate across the compile pipeline.
114fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
115    let (local_instructions, result) = measure_sql_stage(stage);
116    let value = result?;
117
118    Ok((local_instructions, value))
119}
120
121impl<C: CanisterKind> DbSession<C> {
122    // Resolve one SQL SELECT through a caller-selected accepted authority and
123    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
124    // generated authority through the runtime cache boundary.
125    fn sql_select_prepared_plan_for_accepted_authority(
126        &self,
127        query: &StructuralQuery,
128        authority: EntityAuthority,
129        accepted_schema: &AcceptedSchemaSnapshot,
130    ) -> Result<
131        (
132            SharedPreparedExecutionPlan,
133            SqlProjectionContract,
134            SqlCacheAttribution,
135        ),
136        QueryError,
137    > {
138        let (prepared_plan, cache_attribution) = self
139            .cached_shared_query_plan_for_accepted_authority(
140                authority.clone(),
141                accepted_schema,
142                query,
143            )?;
144        Ok(Self::sql_select_projection_from_prepared_plan(
145            prepared_plan,
146            authority,
147            cache_attribution,
148        ))
149    }
150
151    // Resolve one typed SQL SELECT through accepted schema authority selected
152    // at the session boundary.
153    fn sql_select_prepared_plan_for_entity<E>(
154        &self,
155        query: &StructuralQuery,
156    ) -> Result<
157        (
158            SharedPreparedExecutionPlan,
159            SqlProjectionContract,
160            SqlCacheAttribution,
161        ),
162        QueryError,
163    >
164    where
165        E: PersistedRow<Canister = C> + EntityValue,
166    {
167        let (accepted_schema, authority) = self
168            .accepted_entity_authority::<E>()
169            .map_err(QueryError::execute)?;
170
171        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
172    }
173
174    fn sql_select_projection_from_prepared_plan(
175        prepared_plan: SharedPreparedExecutionPlan,
176        authority: EntityAuthority,
177        cache_attribution: QueryPlanCacheAttribution,
178    ) -> (
179        SharedPreparedExecutionPlan,
180        SqlProjectionContract,
181        SqlCacheAttribution,
182    ) {
183        let projection_spec = prepared_plan
184            .logical_plan()
185            .projection_spec(authority.model());
186        let projection = SqlProjectionContract::new(
187            projection_labels_from_projection_spec(&projection_spec),
188            projection_fixed_scales_from_projection_spec(&projection_spec),
189        );
190
191        (
192            prepared_plan,
193            projection,
194            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
195        )
196    }
197
198    // Keep query/update surface gating owned by one helper so the SQL
199    // compiled-command lane does not duplicate the same statement-family split
200    // just to change the outward error wording.
201    fn ensure_sql_statement_supported_for_surface(
202        statement: &SqlStatement,
203        surface: SqlCompiledCommandSurface,
204    ) -> Result<(), QueryError> {
205        match (surface, statement) {
206            (
207                SqlCompiledCommandSurface::Query,
208                SqlStatement::Select(_)
209                | SqlStatement::Explain(_)
210                | SqlStatement::Describe(_)
211                | SqlStatement::ShowIndexes(_)
212                | SqlStatement::ShowColumns(_)
213                | SqlStatement::ShowEntities(_),
214            )
215            | (
216                SqlCompiledCommandSurface::Update,
217                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
218            ) => Ok(()),
219            (_, SqlStatement::Ddl(_)) => Err(QueryError::unsupported_query(
220                "SQL DDL execution is not supported in this release",
221            )),
222            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
223                Err(QueryError::unsupported_query(
224                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
225                ))
226            }
227            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
228                Err(QueryError::unsupported_query(
229                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
230                ))
231            }
232            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
233                Err(QueryError::unsupported_query(
234                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
235                ))
236            }
237            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
238                Err(QueryError::unsupported_query(
239                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
240                ))
241            }
242            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
243                Err(QueryError::unsupported_query(
244                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
245                ))
246            }
247            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
248                Err(QueryError::unsupported_query(
249                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
250                ))
251            }
252            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
253                Err(QueryError::unsupported_query(
254                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
255                ))
256            }
257            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
258                Err(QueryError::unsupported_query(
259                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
260                ))
261            }
262            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
263                Err(QueryError::unsupported_query(
264                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
265                ))
266            }
267        }
268    }
269
270    /// Execute one single-entity reduced SQL query or introspection statement.
271    ///
272    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
273    /// returns SQL-shaped statement output instead of typed entities.
274    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
275    where
276        E: PersistedRow<Canister = C> + EntityValue,
277    {
278        let compiled = self.compile_sql_query::<E>(sql)?;
279
280        self.execute_compiled_sql_owned::<E>(compiled)
281    }
282
283    /// Execute one reduced SQL query while reporting the compile/execute split
284    /// at the top-level SQL seam.
285    #[cfg(feature = "diagnostics")]
286    #[doc(hidden)]
287    pub fn execute_sql_query_with_attribution<E>(
288        &self,
289        sql: &str,
290    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
291    where
292        E: PersistedRow<Canister = C> + EntityValue,
293    {
294        // Phase 1: measure the compile side of the new seam, including parse,
295        // surface validation, and semantic command construction.
296        let (compile_local_instructions, compiled) =
297            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
298        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
299
300        // Phase 2: measure the execute side separately so repeat-run cache
301        // experiments can prove which side actually moved.
302        let store_get_calls_before = DataStore::current_get_call_count();
303        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
304        let pure_covering_row_assembly_before =
305            current_pure_covering_row_assembly_local_instructions();
306        let (result, execute_cache_attribution, execute_phase_attribution) =
307            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
308        let store_get_calls =
309            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
310        let pure_covering_decode_local_instructions =
311            current_pure_covering_decode_local_instructions()
312                .saturating_sub(pure_covering_decode_before);
313        let pure_covering_row_assembly_local_instructions =
314            current_pure_covering_row_assembly_local_instructions()
315                .saturating_sub(pure_covering_row_assembly_before);
316        let execute_local_instructions = execute_phase_attribution
317            .planner_local_instructions
318            .saturating_add(execute_phase_attribution.store_local_instructions)
319            .saturating_add(execute_phase_attribution.executor_local_instructions)
320            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
321        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
322        let total_local_instructions =
323            compile_local_instructions.saturating_add(execute_local_instructions);
324        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
325            GroupedExecutionAttribution {
326                stream_local_instructions: execute_phase_attribution
327                    .grouped_stream_local_instructions,
328                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
329                finalize_local_instructions: execute_phase_attribution
330                    .grouped_finalize_local_instructions,
331                count: GroupedCountAttribution::from_executor(
332                    execute_phase_attribution.grouped_count,
333                ),
334            },
335        );
336        let pure_covering = (pure_covering_decode_local_instructions > 0
337            || pure_covering_row_assembly_local_instructions > 0)
338            .then_some(SqlPureCoveringAttribution {
339                decode_local_instructions: pure_covering_decode_local_instructions,
340                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
341            });
342
343        Ok((
344            result,
345            SqlQueryExecutionAttribution {
346                compile_local_instructions,
347                compile: SqlCompileAttribution {
348                    cache_key_local_instructions: compile_phase_attribution.cache_key,
349                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
350                    parse_local_instructions: compile_phase_attribution.parse,
351                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
352                    parse_select_local_instructions: compile_phase_attribution.parse_select,
353                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
354                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
355                    aggregate_lane_check_local_instructions: compile_phase_attribution
356                        .aggregate_lane_check,
357                    prepare_local_instructions: compile_phase_attribution.prepare,
358                    lower_local_instructions: compile_phase_attribution.lower,
359                    bind_local_instructions: compile_phase_attribution.bind,
360                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
361                },
362                plan_lookup_local_instructions: execute_phase_attribution
363                    .planner_local_instructions,
364                execution: SqlExecutionAttribution {
365                    planner_local_instructions: execute_phase_attribution
366                        .planner_local_instructions,
367                    store_local_instructions: execute_phase_attribution.store_local_instructions,
368                    executor_invocation_local_instructions: execute_phase_attribution
369                        .executor_invocation_local_instructions,
370                    executor_local_instructions: execute_phase_attribution
371                        .executor_local_instructions,
372                    response_finalization_local_instructions: execute_phase_attribution
373                        .response_finalization_local_instructions,
374                },
375                grouped,
376                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
377                    execute_phase_attribution.scalar_aggregate_terminal,
378                ),
379                pure_covering,
380                store_get_calls,
381                response_decode_local_instructions: 0,
382                execute_local_instructions,
383                total_local_instructions,
384                cache: SqlQueryCacheAttribution {
385                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
386                    sql_compiled_command_misses: cache_attribution
387                        .sql_compiled_command_cache_misses,
388                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
389                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
390                },
391            },
392        ))
393    }
394
395    /// Execute one single-entity reduced SQL mutation statement.
396    ///
397    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
398    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
399    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
400    where
401        E: PersistedRow<Canister = C> + EntityValue,
402    {
403        let compiled = self.compile_sql_update::<E>(sql)?;
404
405        self.execute_compiled_sql_owned::<E>(compiled)
406    }
407
408    /// Prepare one SQL DDL statement against the accepted schema catalog.
409    ///
410    /// This is a non-executing surface: it proves the statement can bind,
411    /// derive an accepted-after snapshot, and pass schema mutation admission,
412    /// then returns a prepared-only report without mutating schema or index
413    /// storage.
414    pub fn prepare_sql_ddl<E>(&self, sql: &str) -> Result<SqlDdlPreparationReport, QueryError>
415    where
416        E: PersistedRow<Canister = C> + EntityValue,
417    {
418        let (_, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
419
420        Ok(prepared.report().clone())
421    }
422
423    fn prepare_sql_ddl_command<E>(
424        &self,
425        sql: &str,
426    ) -> Result<(AcceptedSchemaSnapshot, PreparedSqlDdlCommand), QueryError>
427    where
428        E: PersistedRow<Canister = C> + EntityValue,
429    {
430        let (statement, _) =
431            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)?;
432        let (accepted_schema, _) = self
433            .accepted_entity_authority::<E>()
434            .map_err(QueryError::execute)?;
435        let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
436        let prepared =
437            prepare_sql_ddl_statement(&statement, &accepted_schema, &schema_info, E::Store::PATH)
438                .map_err(|err| {
439                QueryError::unsupported_query(format!(
440                    "SQL DDL preparation failed before execution: {err}"
441                ))
442            })?;
443
444        Ok((accepted_schema, prepared))
445    }
446
447    /// Execute one SQL DDL statement.
448    ///
449    /// The 0.155 execution boundary routes the single supported DDL shape
450    /// through schema-owned physical rebuild and accepted-snapshot publication.
451    pub fn execute_sql_ddl<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
452    where
453        E: PersistedRow<Canister = C> + EntityValue,
454    {
455        let (accepted_before, prepared) = self.prepare_sql_ddl_command::<E>(sql)?;
456        let store = self
457            .db
458            .recovered_store(E::Store::PATH)
459            .map_err(QueryError::execute)?;
460
461        execute_sql_ddl_field_path_index_addition(
462            store,
463            E::ENTITY_TAG,
464            E::PATH,
465            &accepted_before,
466            prepared.derivation(),
467        )
468        .map_err(QueryError::execute)?;
469
470        Ok(SqlStatementResult::Ddl(
471            prepared
472                .report()
473                .clone()
474                .with_execution_status(SqlDdlExecutionStatus::Published),
475        ))
476    }
477}