Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod attribution;
8mod cache;
9mod compile;
10mod compile_cache;
11mod compiled;
12mod execute;
13mod projection;
14mod result;
15
16#[cfg(feature = "diagnostics")]
17use crate::db::DataStore;
18#[cfg(feature = "diagnostics")]
19use crate::db::executor::{
20    current_pure_covering_decode_local_instructions,
21    current_pure_covering_row_assembly_local_instructions,
22};
23#[cfg(test)]
24use crate::db::sql::parser::parse_sql;
25#[cfg(feature = "diagnostics")]
26use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
27use crate::{
28    db::{
29        DbSession, PersistedRow, QueryError,
30        commit::CommitSchemaFingerprint,
31        executor::{EntityAuthority, SharedPreparedExecutionPlan},
32        query::intent::StructuralQuery,
33        session::sql::projection::{
34            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
35        },
36        sql::parser::SqlStatement,
37    },
38    traits::{CanisterKind, EntityValue},
39};
40
41pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
42#[cfg(feature = "diagnostics")]
43pub(in crate::db) use attribution::SqlExecutePhaseAttribution;
44#[cfg(feature = "diagnostics")]
45pub use attribution::{
46    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
47    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
48};
49pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
50pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
51pub(in crate::db::session::sql) use compile::{
52    SqlCompileAttributionBuilder, SqlCompilePhaseAttribution,
53};
54pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
55pub use result::SqlStatementResult;
56
57#[cfg(all(test, not(feature = "diagnostics")))]
58pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
59#[cfg(feature = "diagnostics")]
60pub use crate::db::session::sql::projection::{
61    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
62};
63
64// Keep parsing as a module-owned helper instead of hanging a pure parser off
65// `DbSession` as a fake session method.
66#[cfg(test)]
67pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
68    parse_sql(sql).map_err(QueryError::from_sql_parse_error)
69}
70
71// Measure one SQL compile stage and immediately surface the stage result. The
72// helper keeps attribution capture uniform while avoiding repeated
73// `(cost, result); result?` boilerplate across the compile pipeline.
74fn measured<T>(stage: impl FnOnce() -> Result<T, QueryError>) -> Result<(u64, T), QueryError> {
75    let (local_instructions, result) = measure_sql_stage(stage);
76    let value = result?;
77
78    Ok((local_instructions, value))
79}
80
81impl<C: CanisterKind> DbSession<C> {
82    // Resolve one SQL SELECT entirely through the shared lower query-plan
83    // cache and derive only the outward SQL projection contract locally.
84    fn sql_select_prepared_plan(
85        &self,
86        query: &StructuralQuery,
87        authority: EntityAuthority,
88        cache_schema_fingerprint: CommitSchemaFingerprint,
89    ) -> Result<
90        (
91            SharedPreparedExecutionPlan,
92            SqlProjectionContract,
93            SqlCacheAttribution,
94        ),
95        QueryError,
96    > {
97        let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
98            authority,
99            cache_schema_fingerprint,
100            query,
101        )?;
102        let projection_spec = prepared_plan
103            .logical_plan()
104            .projection_spec(authority.model());
105        let projection = SqlProjectionContract::new(
106            projection_labels_from_projection_spec(&projection_spec),
107            projection_fixed_scales_from_projection_spec(&projection_spec),
108        );
109
110        Ok((
111            prepared_plan,
112            projection,
113            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
114        ))
115    }
116
117    // Keep query/update surface gating owned by one helper so the SQL
118    // compiled-command lane does not duplicate the same statement-family split
119    // just to change the outward error wording.
120    fn ensure_sql_statement_supported_for_surface(
121        statement: &SqlStatement,
122        surface: SqlCompiledCommandSurface,
123    ) -> Result<(), QueryError> {
124        match (surface, statement) {
125            (
126                SqlCompiledCommandSurface::Query,
127                SqlStatement::Select(_)
128                | SqlStatement::Explain(_)
129                | SqlStatement::Describe(_)
130                | SqlStatement::ShowIndexes(_)
131                | SqlStatement::ShowColumns(_)
132                | SqlStatement::ShowEntities(_),
133            )
134            | (
135                SqlCompiledCommandSurface::Update,
136                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
137            ) => Ok(()),
138            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
139                Err(QueryError::unsupported_query(
140                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
141                ))
142            }
143            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
144                Err(QueryError::unsupported_query(
145                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
146                ))
147            }
148            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
149                Err(QueryError::unsupported_query(
150                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
151                ))
152            }
153            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
154                Err(QueryError::unsupported_query(
155                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
156                ))
157            }
158            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
159                Err(QueryError::unsupported_query(
160                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
161                ))
162            }
163            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
164                Err(QueryError::unsupported_query(
165                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
166                ))
167            }
168            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
169                Err(QueryError::unsupported_query(
170                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
171                ))
172            }
173            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
174                Err(QueryError::unsupported_query(
175                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
176                ))
177            }
178            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
179                Err(QueryError::unsupported_query(
180                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
181                ))
182            }
183        }
184    }
185
186    /// Execute one single-entity reduced SQL query or introspection statement.
187    ///
188    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
189    /// returns SQL-shaped statement output instead of typed entities.
190    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
191    where
192        E: PersistedRow<Canister = C> + EntityValue,
193    {
194        let compiled = self.compile_sql_query::<E>(sql)?;
195
196        self.execute_compiled_sql_owned::<E>(compiled)
197    }
198
199    /// Execute one reduced SQL query while reporting the compile/execute split
200    /// at the top-level SQL seam.
201    #[cfg(feature = "diagnostics")]
202    #[doc(hidden)]
203    pub fn execute_sql_query_with_attribution<E>(
204        &self,
205        sql: &str,
206    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
207    where
208        E: PersistedRow<Canister = C> + EntityValue,
209    {
210        // Phase 1: measure the compile side of the new seam, including parse,
211        // surface validation, and semantic command construction.
212        let (compile_local_instructions, compiled) =
213            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
214        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
215
216        // Phase 2: measure the execute side separately so repeat-run cache
217        // experiments can prove which side actually moved.
218        let store_get_calls_before = DataStore::current_get_call_count();
219        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
220        let pure_covering_row_assembly_before =
221            current_pure_covering_row_assembly_local_instructions();
222        let (result, execute_cache_attribution, execute_phase_attribution) =
223            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
224        let store_get_calls =
225            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
226        let pure_covering_decode_local_instructions =
227            current_pure_covering_decode_local_instructions()
228                .saturating_sub(pure_covering_decode_before);
229        let pure_covering_row_assembly_local_instructions =
230            current_pure_covering_row_assembly_local_instructions()
231                .saturating_sub(pure_covering_row_assembly_before);
232        let execute_local_instructions = execute_phase_attribution
233            .planner_local_instructions
234            .saturating_add(execute_phase_attribution.store_local_instructions)
235            .saturating_add(execute_phase_attribution.executor_local_instructions)
236            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
237        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
238        let total_local_instructions =
239            compile_local_instructions.saturating_add(execute_local_instructions);
240        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
241            GroupedExecutionAttribution {
242                stream_local_instructions: execute_phase_attribution
243                    .grouped_stream_local_instructions,
244                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
245                finalize_local_instructions: execute_phase_attribution
246                    .grouped_finalize_local_instructions,
247                count: GroupedCountAttribution::from_executor(
248                    execute_phase_attribution.grouped_count,
249                ),
250            },
251        );
252        let pure_covering = (pure_covering_decode_local_instructions > 0
253            || pure_covering_row_assembly_local_instructions > 0)
254            .then_some(SqlPureCoveringAttribution {
255                decode_local_instructions: pure_covering_decode_local_instructions,
256                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
257            });
258
259        Ok((
260            result,
261            SqlQueryExecutionAttribution {
262                compile_local_instructions,
263                compile: SqlCompileAttribution {
264                    cache_key_local_instructions: compile_phase_attribution.cache_key,
265                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
266                    parse_local_instructions: compile_phase_attribution.parse,
267                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
268                    parse_select_local_instructions: compile_phase_attribution.parse_select,
269                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
270                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
271                    aggregate_lane_check_local_instructions: compile_phase_attribution
272                        .aggregate_lane_check,
273                    prepare_local_instructions: compile_phase_attribution.prepare,
274                    lower_local_instructions: compile_phase_attribution.lower,
275                    bind_local_instructions: compile_phase_attribution.bind,
276                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
277                },
278                plan_lookup_local_instructions: execute_phase_attribution
279                    .planner_local_instructions,
280                execution: SqlExecutionAttribution {
281                    planner_local_instructions: execute_phase_attribution
282                        .planner_local_instructions,
283                    store_local_instructions: execute_phase_attribution.store_local_instructions,
284                    executor_invocation_local_instructions: execute_phase_attribution
285                        .executor_invocation_local_instructions,
286                    executor_local_instructions: execute_phase_attribution
287                        .executor_local_instructions,
288                    response_finalization_local_instructions: execute_phase_attribution
289                        .response_finalization_local_instructions,
290                },
291                grouped,
292                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
293                    execute_phase_attribution.scalar_aggregate_terminal,
294                ),
295                pure_covering,
296                store_get_calls,
297                response_decode_local_instructions: 0,
298                execute_local_instructions,
299                total_local_instructions,
300                cache: SqlQueryCacheAttribution {
301                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
302                    sql_compiled_command_misses: cache_attribution
303                        .sql_compiled_command_cache_misses,
304                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
305                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
306                },
307            },
308        ))
309    }
310
311    /// Execute one single-entity reduced SQL mutation statement.
312    ///
313    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
314    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
315    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
316    where
317        E: PersistedRow<Canister = C> + EntityValue,
318    {
319        let compiled = self.compile_sql_update::<E>(sql)?;
320
321        self.execute_compiled_sql_owned::<E>(compiled)
322    }
323}