Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        executor::{EntityAuthority, SharedPreparedExecutionPlan},
31        query::intent::StructuralQuery,
32        session::sql::projection::{
33            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
34        },
35        sql::parser::SqlStatement,
36    },
37    traits::{CanisterKind, EntityValue},
38};
39
40pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
41#[cfg(feature = "diagnostics")]
42pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
43#[cfg(feature = "diagnostics")]
44pub use attribution::{
45    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
46    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
47};
48pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
49pub(in crate::db::session::sql) use cache::{
50    SqlCompiledCommandSurface, sql_compiled_command_cache_miss_reason,
51};
52pub(in crate::db::session::sql) use compile::{
53    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
54};
55pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
56pub use result::SqlStatementResult;
57
58#[cfg(all(test, not(feature = "diagnostics")))]
59pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
60#[cfg(feature = "diagnostics")]
61pub use crate::db::session::sql::projection::{
62    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
63};
64
// Parsing needs no session state, so it lives here as a module-level helper
// rather than as a `DbSession` method that would only pretend to depend on
// the session. Compiled for tests only; a parser failure is converted into a
// `QueryError` through `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
71
72// Measure one SQL compile stage and immediately surface the stage result. The
73// helper keeps attribution capture uniform while avoiding repeated
74// `(cost, result); result?` boilerplate across the compile pipeline.
75fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
76    let (local_instructions, result) = measure_sql_stage(stage);
77    let value = result?;
78
79    Ok((local_instructions, value))
80}
81
82impl<C: CanisterKind> DbSession<C> {
83    // Resolve one SQL SELECT entirely through the shared lower query-plan
84    // cache and derive only the outward SQL projection contract locally.
85    fn sql_select_prepared_plan(
86        &self,
87        query: &StructuralQuery,
88        authority: EntityAuthority,
89    ) -> Result<
90        (
91            SharedPreparedExecutionPlan,
92            SqlProjectionContract,
93            SqlCacheAttribution,
94        ),
95        QueryError,
96    > {
97        let (prepared_plan, cache_attribution) =
98            self.cached_shared_query_plan_for_authority(authority, query)?;
99        let projection_spec = prepared_plan
100            .logical_plan()
101            .projection_spec(authority.model());
102        let projection = SqlProjectionContract::new(
103            projection_labels_from_projection_spec(&projection_spec),
104            projection_fixed_scales_from_projection_spec(&projection_spec),
105        );
106
107        Ok((
108            prepared_plan,
109            projection,
110            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
111        ))
112    }
113
114    // Keep query/update surface gating owned by one helper so the SQL
115    // compiled-command lane does not duplicate the same statement-family split
116    // just to change the outward error wording.
117    fn ensure_sql_statement_supported_for_surface(
118        statement: &SqlStatement,
119        surface: SqlCompiledCommandSurface,
120    ) -> Result<(), QueryError> {
121        match (surface, statement) {
122            (
123                SqlCompiledCommandSurface::Query,
124                SqlStatement::Select(_)
125                | SqlStatement::Explain(_)
126                | SqlStatement::Describe(_)
127                | SqlStatement::ShowIndexes(_)
128                | SqlStatement::ShowColumns(_)
129                | SqlStatement::ShowEntities(_),
130            )
131            | (
132                SqlCompiledCommandSurface::Update,
133                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
134            ) => Ok(()),
135            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
136                Err(QueryError::unsupported_query(
137                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
138                ))
139            }
140            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
141                Err(QueryError::unsupported_query(
142                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
143                ))
144            }
145            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
146                Err(QueryError::unsupported_query(
147                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
148                ))
149            }
150            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
151                Err(QueryError::unsupported_query(
152                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
153                ))
154            }
155            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
156                Err(QueryError::unsupported_query(
157                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
158                ))
159            }
160            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
161                Err(QueryError::unsupported_query(
162                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
163                ))
164            }
165            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
166                Err(QueryError::unsupported_query(
167                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
168                ))
169            }
170            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
171                Err(QueryError::unsupported_query(
172                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
173                ))
174            }
175            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
176                Err(QueryError::unsupported_query(
177                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
178                ))
179            }
180        }
181    }
182
183    /// Execute one single-entity reduced SQL query or introspection statement.
184    ///
185    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
186    /// returns SQL-shaped statement output instead of typed entities.
187    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
188    where
189        E: PersistedRow<Canister = C> + EntityValue,
190    {
191        let compiled = self.compile_sql_query::<E>(sql)?;
192
193        self.execute_compiled_sql_owned::<E>(compiled)
194    }
195
196    /// Execute one reduced SQL query while reporting the compile/execute split
197    /// at the top-level SQL seam.
198    #[cfg(feature = "diagnostics")]
199    #[doc(hidden)]
200    pub fn execute_sql_query_with_attribution<E>(
201        &self,
202        sql: &str,
203    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
204    where
205        E: PersistedRow<Canister = C> + EntityValue,
206    {
207        // Phase 1: measure the compile side of the new seam, including parse,
208        // surface validation, and semantic command construction.
209        let (compile_local_instructions, compiled) =
210            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
211        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
212
213        // Phase 2: measure the execute side separately so repeat-run cache
214        // experiments can prove which side actually moved.
215        let store_get_calls_before = DataStore::current_get_call_count();
216        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
217        let pure_covering_row_assembly_before =
218            current_pure_covering_row_assembly_local_instructions();
219        let (result, execute_cache_attribution, execute_phase_attribution) =
220            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
221        let store_get_calls =
222            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
223        let pure_covering_decode_local_instructions =
224            current_pure_covering_decode_local_instructions()
225                .saturating_sub(pure_covering_decode_before);
226        let pure_covering_row_assembly_local_instructions =
227            current_pure_covering_row_assembly_local_instructions()
228                .saturating_sub(pure_covering_row_assembly_before);
229        let execute_local_instructions = execute_phase_attribution
230            .planner_local_instructions
231            .saturating_add(execute_phase_attribution.store_local_instructions)
232            .saturating_add(execute_phase_attribution.executor_local_instructions)
233            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
234        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
235        let total_local_instructions =
236            compile_local_instructions.saturating_add(execute_local_instructions);
237        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
238            GroupedExecutionAttribution {
239                stream_local_instructions: execute_phase_attribution
240                    .grouped_stream_local_instructions,
241                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
242                finalize_local_instructions: execute_phase_attribution
243                    .grouped_finalize_local_instructions,
244                count: GroupedCountAttribution::from_executor(
245                    execute_phase_attribution.grouped_count,
246                ),
247            },
248        );
249        let pure_covering = (pure_covering_decode_local_instructions > 0
250            || pure_covering_row_assembly_local_instructions > 0)
251            .then_some(SqlPureCoveringAttribution {
252                decode_local_instructions: pure_covering_decode_local_instructions,
253                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
254            });
255
256        Ok((
257            result,
258            SqlQueryExecutionAttribution {
259                compile_local_instructions,
260                compile: SqlCompileAttribution {
261                    cache_key_local_instructions: compile_phase_attribution.cache_key,
262                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
263                    parse_local_instructions: compile_phase_attribution.parse,
264                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
265                    parse_select_local_instructions: compile_phase_attribution.parse_select,
266                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
267                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
268                    aggregate_lane_check_local_instructions: compile_phase_attribution
269                        .aggregate_lane_check,
270                    prepare_local_instructions: compile_phase_attribution.prepare,
271                    lower_local_instructions: compile_phase_attribution.lower,
272                    bind_local_instructions: compile_phase_attribution.bind,
273                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
274                },
275                plan_lookup_local_instructions: execute_phase_attribution
276                    .planner_local_instructions,
277                execution: SqlExecutionAttribution {
278                    planner_local_instructions: execute_phase_attribution
279                        .planner_local_instructions,
280                    store_local_instructions: execute_phase_attribution.store_local_instructions,
281                    executor_invocation_local_instructions: execute_phase_attribution
282                        .executor_invocation_local_instructions,
283                    executor_local_instructions: execute_phase_attribution
284                        .executor_local_instructions,
285                    response_finalization_local_instructions: execute_phase_attribution
286                        .response_finalization_local_instructions,
287                },
288                grouped,
289                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
290                    execute_phase_attribution.scalar_aggregate_terminal,
291                ),
292                pure_covering,
293                store_get_calls,
294                response_decode_local_instructions: 0,
295                execute_local_instructions,
296                total_local_instructions,
297                cache: SqlQueryCacheAttribution {
298                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
299                    sql_compiled_command_misses: cache_attribution
300                        .sql_compiled_command_cache_misses,
301                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
302                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
303                },
304            },
305        ))
306    }
307
308    /// Execute one single-entity reduced SQL mutation statement.
309    ///
310    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
311    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
312    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
313    where
314        E: PersistedRow<Canister = C> + EntityValue,
315    {
316        let compiled = self.compile_sql_update::<E>(sql)?;
317
318        self.execute_compiled_sql_owned::<E>(compiled)
319    }
320}