Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        schema::AcceptedSchemaSnapshot,
33        session::query::QueryPlanCacheAttribution,
34        session::sql::projection::{
35            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
36        },
37        sql::parser::SqlStatement,
38    },
39    traits::{CanisterKind, EntityValue},
40};
41
42pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
43#[cfg(feature = "diagnostics")]
44pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
45#[cfg(feature = "diagnostics")]
46pub use attribution::{
47    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
48    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
49};
50pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
51pub(in crate::db::session::sql) use cache::{
52    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
53};
54pub(in crate::db::session::sql) use compile::{
55    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
56};
57pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
58pub use result::SqlStatementResult;
59
60#[cfg(all(test, not(feature = "diagnostics")))]
61pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
62#[cfg(feature = "diagnostics")]
63pub use crate::db::session::sql::projection::{
64    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
65};
66
67// Keep parsing as a module-owned helper instead of hanging a pure parser off
68// `DbSession` as a fake session method.
69#[cfg(test)]
70pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
71    parse_sql(sql).map_err(QueryError::from_sql_parse_error)
72}
73
74// Measure one SQL compile stage and immediately surface the stage result. The
75// helper keeps attribution capture uniform while avoiding repeated
76// `(cost, result); result?` boilerplate across the compile pipeline.
77fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
78    let (local_instructions, result) = measure_sql_stage(stage);
79    let value = result?;
80
81    Ok((local_instructions, value))
82}
83
84impl<C: CanisterKind> DbSession<C> {
85    // Resolve one SQL SELECT through a caller-selected accepted authority and
86    // accepted schema snapshot. Typed SQL entrypoints use this to avoid passing
87    // generated authority through the runtime cache boundary.
88    fn sql_select_prepared_plan_for_accepted_authority(
89        &self,
90        query: &StructuralQuery,
91        authority: EntityAuthority,
92        accepted_schema: &AcceptedSchemaSnapshot,
93    ) -> Result<
94        (
95            SharedPreparedExecutionPlan,
96            SqlProjectionContract,
97            SqlCacheAttribution,
98        ),
99        QueryError,
100    > {
101        let (prepared_plan, cache_attribution) = self
102            .cached_shared_query_plan_for_accepted_authority(
103                authority.clone(),
104                accepted_schema,
105                query,
106            )?;
107        Ok(Self::sql_select_projection_from_prepared_plan(
108            prepared_plan,
109            authority,
110            cache_attribution,
111        ))
112    }
113
114    // Resolve one typed SQL SELECT through accepted schema authority selected
115    // at the session boundary.
116    fn sql_select_prepared_plan_for_entity<E>(
117        &self,
118        query: &StructuralQuery,
119    ) -> Result<
120        (
121            SharedPreparedExecutionPlan,
122            SqlProjectionContract,
123            SqlCacheAttribution,
124        ),
125        QueryError,
126    >
127    where
128        E: PersistedRow<Canister = C> + EntityValue,
129    {
130        let (accepted_schema, authority) = self
131            .accepted_entity_authority::<E>()
132            .map_err(QueryError::execute)?;
133
134        self.sql_select_prepared_plan_for_accepted_authority(query, authority, &accepted_schema)
135    }
136
137    fn sql_select_projection_from_prepared_plan(
138        prepared_plan: SharedPreparedExecutionPlan,
139        authority: EntityAuthority,
140        cache_attribution: QueryPlanCacheAttribution,
141    ) -> (
142        SharedPreparedExecutionPlan,
143        SqlProjectionContract,
144        SqlCacheAttribution,
145    ) {
146        let projection_spec = prepared_plan
147            .logical_plan()
148            .projection_spec(authority.model());
149        let projection = SqlProjectionContract::new(
150            projection_labels_from_projection_spec(&projection_spec),
151            projection_fixed_scales_from_projection_spec(&projection_spec),
152        );
153
154        (
155            prepared_plan,
156            projection,
157            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
158        )
159    }
160
161    // Keep query/update surface gating owned by one helper so the SQL
162    // compiled-command lane does not duplicate the same statement-family split
163    // just to change the outward error wording.
164    fn ensure_sql_statement_supported_for_surface(
165        statement: &SqlStatement,
166        surface: SqlCompiledCommandSurface,
167    ) -> Result<(), QueryError> {
168        match (surface, statement) {
169            (
170                SqlCompiledCommandSurface::Query,
171                SqlStatement::Select(_)
172                | SqlStatement::Explain(_)
173                | SqlStatement::Describe(_)
174                | SqlStatement::ShowIndexes(_)
175                | SqlStatement::ShowColumns(_)
176                | SqlStatement::ShowEntities(_),
177            )
178            | (
179                SqlCompiledCommandSurface::Update,
180                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
181            ) => Ok(()),
182            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
183                Err(QueryError::unsupported_query(
184                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
185                ))
186            }
187            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
188                Err(QueryError::unsupported_query(
189                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
190                ))
191            }
192            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
193                Err(QueryError::unsupported_query(
194                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
195                ))
196            }
197            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
198                Err(QueryError::unsupported_query(
199                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
200                ))
201            }
202            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
203                Err(QueryError::unsupported_query(
204                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
205                ))
206            }
207            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
208                Err(QueryError::unsupported_query(
209                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
210                ))
211            }
212            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
213                Err(QueryError::unsupported_query(
214                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
215                ))
216            }
217            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
218                Err(QueryError::unsupported_query(
219                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
220                ))
221            }
222            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
223                Err(QueryError::unsupported_query(
224                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
225                ))
226            }
227        }
228    }
229
230    /// Execute one single-entity reduced SQL query or introspection statement.
231    ///
232    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
233    /// returns SQL-shaped statement output instead of typed entities.
234    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
235    where
236        E: PersistedRow<Canister = C> + EntityValue,
237    {
238        let compiled = self.compile_sql_query::<E>(sql)?;
239
240        self.execute_compiled_sql_owned::<E>(compiled)
241    }
242
243    /// Execute one reduced SQL query while reporting the compile/execute split
244    /// at the top-level SQL seam.
245    #[cfg(feature = "diagnostics")]
246    #[doc(hidden)]
247    pub fn execute_sql_query_with_attribution<E>(
248        &self,
249        sql: &str,
250    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
251    where
252        E: PersistedRow<Canister = C> + EntityValue,
253    {
254        // Phase 1: measure the compile side of the new seam, including parse,
255        // surface validation, and semantic command construction.
256        let (compile_local_instructions, compiled) =
257            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
258        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
259
260        // Phase 2: measure the execute side separately so repeat-run cache
261        // experiments can prove which side actually moved.
262        let store_get_calls_before = DataStore::current_get_call_count();
263        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
264        let pure_covering_row_assembly_before =
265            current_pure_covering_row_assembly_local_instructions();
266        let (result, execute_cache_attribution, execute_phase_attribution) =
267            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
268        let store_get_calls =
269            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
270        let pure_covering_decode_local_instructions =
271            current_pure_covering_decode_local_instructions()
272                .saturating_sub(pure_covering_decode_before);
273        let pure_covering_row_assembly_local_instructions =
274            current_pure_covering_row_assembly_local_instructions()
275                .saturating_sub(pure_covering_row_assembly_before);
276        let execute_local_instructions = execute_phase_attribution
277            .planner_local_instructions
278            .saturating_add(execute_phase_attribution.store_local_instructions)
279            .saturating_add(execute_phase_attribution.executor_local_instructions)
280            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
281        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
282        let total_local_instructions =
283            compile_local_instructions.saturating_add(execute_local_instructions);
284        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
285            GroupedExecutionAttribution {
286                stream_local_instructions: execute_phase_attribution
287                    .grouped_stream_local_instructions,
288                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
289                finalize_local_instructions: execute_phase_attribution
290                    .grouped_finalize_local_instructions,
291                count: GroupedCountAttribution::from_executor(
292                    execute_phase_attribution.grouped_count,
293                ),
294            },
295        );
296        let pure_covering = (pure_covering_decode_local_instructions > 0
297            || pure_covering_row_assembly_local_instructions > 0)
298            .then_some(SqlPureCoveringAttribution {
299                decode_local_instructions: pure_covering_decode_local_instructions,
300                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
301            });
302
303        Ok((
304            result,
305            SqlQueryExecutionAttribution {
306                compile_local_instructions,
307                compile: SqlCompileAttribution {
308                    cache_key_local_instructions: compile_phase_attribution.cache_key,
309                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
310                    parse_local_instructions: compile_phase_attribution.parse,
311                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
312                    parse_select_local_instructions: compile_phase_attribution.parse_select,
313                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
314                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
315                    aggregate_lane_check_local_instructions: compile_phase_attribution
316                        .aggregate_lane_check,
317                    prepare_local_instructions: compile_phase_attribution.prepare,
318                    lower_local_instructions: compile_phase_attribution.lower,
319                    bind_local_instructions: compile_phase_attribution.bind,
320                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
321                },
322                plan_lookup_local_instructions: execute_phase_attribution
323                    .planner_local_instructions,
324                execution: SqlExecutionAttribution {
325                    planner_local_instructions: execute_phase_attribution
326                        .planner_local_instructions,
327                    store_local_instructions: execute_phase_attribution.store_local_instructions,
328                    executor_invocation_local_instructions: execute_phase_attribution
329                        .executor_invocation_local_instructions,
330                    executor_local_instructions: execute_phase_attribution
331                        .executor_local_instructions,
332                    response_finalization_local_instructions: execute_phase_attribution
333                        .response_finalization_local_instructions,
334                },
335                grouped,
336                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
337                    execute_phase_attribution.scalar_aggregate_terminal,
338                ),
339                pure_covering,
340                store_get_calls,
341                response_decode_local_instructions: 0,
342                execute_local_instructions,
343                total_local_instructions,
344                cache: SqlQueryCacheAttribution {
345                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
346                    sql_compiled_command_misses: cache_attribution
347                        .sql_compiled_command_cache_misses,
348                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
349                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
350                },
351            },
352        ))
353    }
354
355    /// Execute one single-entity reduced SQL mutation statement.
356    ///
357    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
358    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
359    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
360    where
361        E: PersistedRow<Canister = C> + EntityValue,
362    {
363        let compiled = self.compile_sql_update::<E>(sql)?;
364
365        self.execute_compiled_sql_owned::<E>(compiled)
366    }
367}