Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::{
22    GroupedCountAttribution as ExecutorGroupedCountAttribution, ScalarAggregateTerminalAttribution,
23    current_pure_covering_decode_local_instructions,
24    current_pure_covering_row_assembly_local_instructions,
25};
26#[cfg(test)]
27use crate::db::sql::parser::parse_sql;
28#[cfg(feature = "diagnostics")]
29use crate::db::{GroupedCountAttribution, GroupedExecutionAttribution};
30use crate::{
31    db::{
32        DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
33        commit::CommitSchemaFingerprint,
34        executor::{EntityAuthority, SharedPreparedExecutionPlan},
35        query::intent::StructuralQuery,
36        session::sql::projection::{
37            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
38        },
39        sql::lowering::{
40            bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
41            compile_sql_global_aggregate_command_core_from_prepared,
42            extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
43            lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
44            lower_sql_command_from_prepared_statement, prepare_sql_statement,
45        },
46        sql::parser::{SqlStatement, parse_sql_with_attribution},
47    },
48    traits::{CanisterKind, EntityValue},
49    value::OutputValue,
50};
51
52pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
53pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
54pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
55pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
56
57#[cfg(all(test, not(feature = "diagnostics")))]
58pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
59#[cfg(feature = "diagnostics")]
60pub use crate::db::session::sql::projection::{
61    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
62};
63
64/// Unified SQL statement payload returned by shared SQL lane execution.
65#[derive(Debug)]
66pub enum SqlStatementResult {
67    Count {
68        row_count: u32,
69    },
70    Projection {
71        columns: Vec<String>,
72        fixed_scales: Vec<Option<u32>>,
73        rows: Vec<Vec<OutputValue>>,
74        row_count: u32,
75    },
76    ProjectionText {
77        columns: Vec<String>,
78        rows: Vec<Vec<String>>,
79        row_count: u32,
80    },
81    Grouped {
82        columns: Vec<String>,
83        fixed_scales: Vec<Option<u32>>,
84        rows: Vec<GroupedRow>,
85        row_count: u32,
86        next_cursor: Option<String>,
87    },
88    Explain(String),
89    Describe(crate::db::EntitySchemaDescription),
90    ShowIndexes(Vec<String>),
91    ShowColumns(Vec<crate::db::EntityFieldDescription>),
92    ShowEntities(Vec<String>),
93}
94
95// SqlCompileAttribution
96//
97// Candid diagnostics payload for SQL front-end compile counters.
98// The short field names are scoped by the `compile` parent field on
99// `SqlQueryExecutionAttribution`.
100#[cfg(feature = "diagnostics")]
101#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
102pub struct SqlCompileAttribution {
103    pub cache_key_local_instructions: u64,
104    pub cache_lookup_local_instructions: u64,
105    pub parse_local_instructions: u64,
106    pub parse_tokenize_local_instructions: u64,
107    pub parse_select_local_instructions: u64,
108    pub parse_expr_local_instructions: u64,
109    pub parse_predicate_local_instructions: u64,
110    pub aggregate_lane_check_local_instructions: u64,
111    pub prepare_local_instructions: u64,
112    pub lower_local_instructions: u64,
113    pub bind_local_instructions: u64,
114    pub cache_insert_local_instructions: u64,
115}
116
117// SqlExecutionAttribution
118//
119// Candid diagnostics payload for the reduced SQL execute phase.
120// Planner, store, executor invocation, executor runtime, and response
121// finalization counters stay together under the `execution` parent field.
122#[cfg(feature = "diagnostics")]
123#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
124pub struct SqlExecutionAttribution {
125    pub planner_local_instructions: u64,
126    pub store_local_instructions: u64,
127    pub executor_invocation_local_instructions: u64,
128    pub executor_local_instructions: u64,
129    pub response_finalization_local_instructions: u64,
130}
131
132// SqlScalarAggregateAttribution
133//
134// Candid diagnostics payload for scalar aggregate terminal execution.
135// The field names drop the old `scalar_aggregate_` prefix because the parent
136// field now owns that context.
137#[cfg(feature = "diagnostics")]
138#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
139pub struct SqlScalarAggregateAttribution {
140    pub base_row_local_instructions: u64,
141    pub reducer_fold_local_instructions: u64,
142    pub expression_evaluations: u64,
143    pub filter_evaluations: u64,
144    pub rows_ingested: u64,
145    pub terminal_count: u64,
146    pub unique_input_expr_count: u64,
147    pub unique_filter_expr_count: u64,
148    pub sink_mode: Option<String>,
149}
150
151#[cfg(feature = "diagnostics")]
152impl SqlScalarAggregateAttribution {
153    fn from_executor(terminal: ScalarAggregateTerminalAttribution) -> Option<Self> {
154        // Treat the nested payload as absent only when the executor reported
155        // no scalar aggregate work at all. This keeps COUNT fast paths compact
156        // while preserving any future counter that becomes nonzero.
157        let has_scalar_aggregate_work = terminal.base_row_local_instructions != 0
158            || terminal.reducer_fold_local_instructions != 0
159            || terminal.expression_evaluations != 0
160            || terminal.filter_evaluations != 0
161            || terminal.rows_ingested != 0
162            || terminal.terminal_count != 0
163            || terminal.unique_input_expr_count != 0
164            || terminal.unique_filter_expr_count != 0
165            || terminal.sink_mode.label().is_some();
166        if !has_scalar_aggregate_work {
167            return None;
168        }
169
170        Some(Self {
171            base_row_local_instructions: terminal.base_row_local_instructions,
172            reducer_fold_local_instructions: terminal.reducer_fold_local_instructions,
173            expression_evaluations: terminal.expression_evaluations,
174            filter_evaluations: terminal.filter_evaluations,
175            rows_ingested: terminal.rows_ingested,
176            terminal_count: terminal.terminal_count,
177            unique_input_expr_count: terminal.unique_input_expr_count,
178            unique_filter_expr_count: terminal.unique_filter_expr_count,
179            sink_mode: terminal.sink_mode.label().map(str::to_string),
180        })
181    }
182}
183
184// SqlPureCoveringAttribution
185//
186// Candid diagnostics payload for pure covering projection counters.
187// The value is optional on the top-level SQL attribution because most query
188// shapes do not enter this projection path.
189#[cfg(feature = "diagnostics")]
190#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
191pub struct SqlPureCoveringAttribution {
192    pub decode_local_instructions: u64,
193    pub row_assembly_local_instructions: u64,
194}
195
196// SqlQueryCacheAttribution
197//
198// Candid diagnostics payload for SQL compiled-command and shared query-plan
199// cache counters observed during one SQL query call.
200#[cfg(feature = "diagnostics")]
201#[derive(CandidType, Clone, Copy, Debug, Default, Deserialize, Eq, PartialEq)]
202pub struct SqlQueryCacheAttribution {
203    pub sql_compiled_command_hits: u64,
204    pub sql_compiled_command_misses: u64,
205    pub shared_query_plan_hits: u64,
206    pub shared_query_plan_misses: u64,
207}
208
209// SqlQueryExecutionAttribution
210//
211// SqlQueryExecutionAttribution records the top-level reduced SQL query cost
212// split at the new compile/execute seam.
213// Every field is an additive counter where zero means no observed work or no
214// observed event for that bucket. Path-specific counters are present only for
215// the execution path that produced them.
216
217#[cfg(feature = "diagnostics")]
218#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
219pub struct SqlQueryExecutionAttribution {
220    pub compile_local_instructions: u64,
221    pub compile: SqlCompileAttribution,
222    pub plan_lookup_local_instructions: u64,
223    pub execution: SqlExecutionAttribution,
224    pub grouped: Option<GroupedExecutionAttribution>,
225    pub scalar_aggregate: Option<SqlScalarAggregateAttribution>,
226    pub pure_covering: Option<SqlPureCoveringAttribution>,
227    pub store_get_calls: u64,
228    pub response_decode_local_instructions: u64,
229    pub execute_local_instructions: u64,
230    pub total_local_instructions: u64,
231    pub cache: SqlQueryCacheAttribution,
232}
233
234// SqlExecutePhaseAttribution keeps the execute side split into select-plan
235// work, physical store/index access, and narrower runtime execution so shell
236// tooling can show all three.
237#[cfg(feature = "diagnostics")]
238#[derive(Clone, Copy, Debug, Eq, PartialEq)]
239pub(in crate::db) struct SqlExecutePhaseAttribution {
240    pub planner_local_instructions: u64,
241    pub store_local_instructions: u64,
242    pub executor_invocation_local_instructions: u64,
243    pub executor_local_instructions: u64,
244    pub response_finalization_local_instructions: u64,
245    pub grouped_stream_local_instructions: u64,
246    pub grouped_fold_local_instructions: u64,
247    pub grouped_finalize_local_instructions: u64,
248    pub grouped_count: ExecutorGroupedCountAttribution,
249    pub scalar_aggregate_terminal: ScalarAggregateTerminalAttribution,
250}
251
252///
253/// SqlCompilePhaseAttribution
254///
255/// SqlCompilePhaseAttribution keeps the SQL-front-end compile miss path split
256/// into the concrete stages that still exist after the shared lower-cache
257/// collapse.
258/// This lets perf audits distinguish cache lookup, parsing, prepared-statement
259/// normalization, lowered-command construction, structural binding, and cache
260/// insertion cost instead of treating compile as one opaque bucket.
261///
262
263#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
264pub(in crate::db) struct SqlCompilePhaseAttribution {
265    pub cache_key: u64,
266    pub cache_lookup: u64,
267    pub parse: u64,
268    pub parse_tokenize: u64,
269    pub parse_select: u64,
270    pub parse_expr: u64,
271    pub parse_predicate: u64,
272    pub aggregate_lane_check: u64,
273    pub prepare: u64,
274    pub lower: u64,
275    pub bind: u64,
276    pub cache_insert: u64,
277}
278
279impl SqlCompilePhaseAttribution {
280    #[must_use]
281    const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
282        Self {
283            cache_key,
284            cache_lookup,
285            parse: 0,
286            parse_tokenize: 0,
287            parse_select: 0,
288            parse_expr: 0,
289            parse_predicate: 0,
290            aggregate_lane_check: 0,
291            prepare: 0,
292            lower: 0,
293            bind: 0,
294            cache_insert: 0,
295        }
296    }
297}
298
299#[cfg(feature = "diagnostics")]
300impl SqlExecutePhaseAttribution {
301    #[must_use]
302    pub(in crate::db) const fn from_execute_total_and_store_total(
303        execute_local_instructions: u64,
304        store_local_instructions: u64,
305    ) -> Self {
306        Self {
307            planner_local_instructions: 0,
308            store_local_instructions,
309            executor_invocation_local_instructions: execute_local_instructions,
310            executor_local_instructions: execute_local_instructions
311                .saturating_sub(store_local_instructions),
312            response_finalization_local_instructions: 0,
313            grouped_stream_local_instructions: 0,
314            grouped_fold_local_instructions: 0,
315            grouped_finalize_local_instructions: 0,
316            grouped_count: ExecutorGroupedCountAttribution::none(),
317            scalar_aggregate_terminal: ScalarAggregateTerminalAttribution::none(),
318        }
319    }
320}
321
322// Keep parsing as a module-owned helper instead of hanging a pure parser off
323// `DbSession` as a fake session method.
324#[cfg(test)]
325pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
326    parse_sql(sql).map_err(QueryError::from_sql_parse_error)
327}
328
329impl<C: CanisterKind> DbSession<C> {
330    // Compile one parsed SQL statement into the generic-free session-owned
331    // semantic command artifact for one resolved authority.
332    #[expect(clippy::too_many_lines)]
333    fn compile_sql_statement_for_authority(
334        statement: &SqlStatement,
335        authority: EntityAuthority,
336        compiled_cache_key: SqlCompiledCommandCacheKey,
337    ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
338        // Reuse one local preparation closure so the session compile surface
339        // reaches the prepared-statement owner directly without another
340        // single-purpose module hop.
341        let prepare_statement = || {
342            measure_sql_stage(|| {
343                prepare_sql_statement(statement.clone(), authority.model().name())
344                    .map_err(QueryError::from_sql_lowering_error)
345            })
346        };
347
348        match statement {
349            SqlStatement::Select(_) => {
350                let (prepare_local_instructions, prepared) = prepare_statement();
351                let prepared = prepared?;
352                let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
353                    measure_sql_stage(|| {
354                        Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
355                    });
356                let requires_aggregate_lane = requires_aggregate_lane?;
357
358                if requires_aggregate_lane {
359                    let (lower_local_instructions, command) = measure_sql_stage(|| {
360                        compile_sql_global_aggregate_command_core_from_prepared(
361                            prepared,
362                            authority.model(),
363                            MissingRowPolicy::Ignore,
364                        )
365                        .map_err(QueryError::from_sql_lowering_error)
366                    });
367                    let command = command?;
368
369                    Ok((
370                        CompiledSqlCommand::GlobalAggregate {
371                            command: Box::new(command),
372                        },
373                        aggregate_lane_check_local_instructions,
374                        prepare_local_instructions,
375                        lower_local_instructions,
376                        0,
377                    ))
378                } else {
379                    let (lower_local_instructions, select) = measure_sql_stage(|| {
380                        lower_prepared_sql_select_statement(prepared, authority.model())
381                            .map_err(QueryError::from_sql_lowering_error)
382                    });
383                    let select = select?;
384                    let (bind_local_instructions, query) = measure_sql_stage(|| {
385                        bind_lowered_sql_select_query_structural(
386                            authority.model(),
387                            select,
388                            MissingRowPolicy::Ignore,
389                        )
390                        .map_err(QueryError::from_sql_lowering_error)
391                    });
392                    let query = query?;
393
394                    Ok((
395                        CompiledSqlCommand::Select {
396                            query: Arc::new(query),
397                            compiled_cache_key,
398                        },
399                        aggregate_lane_check_local_instructions,
400                        prepare_local_instructions,
401                        lower_local_instructions,
402                        bind_local_instructions,
403                    ))
404                }
405            }
406            SqlStatement::Delete(_) => {
407                let (prepare_local_instructions, prepared) = prepare_statement();
408                let prepared = prepared?;
409                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
410                    lower_prepared_sql_delete_statement(prepared)
411                        .map_err(QueryError::from_sql_lowering_error)
412                });
413                let delete = lowered?;
414                let returning = delete.returning().cloned();
415                let query = delete.into_base_query();
416                let (bind_local_instructions, query) = measure_sql_stage(|| {
417                    Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
418                        authority.model(),
419                        query,
420                        MissingRowPolicy::Ignore,
421                    ))
422                });
423                let query = query?;
424
425                Ok((
426                    CompiledSqlCommand::Delete {
427                        query: Arc::new(query),
428                        returning,
429                    },
430                    0,
431                    prepare_local_instructions,
432                    lower_local_instructions,
433                    bind_local_instructions,
434                ))
435            }
436            SqlStatement::Insert(_) => {
437                let (prepare_local_instructions, prepared) = prepare_statement();
438                let prepared = prepared?;
439                let statement = extract_prepared_sql_insert_statement(prepared)
440                    .map_err(QueryError::from_sql_lowering_error)?;
441
442                Ok((
443                    CompiledSqlCommand::Insert(statement),
444                    0,
445                    prepare_local_instructions,
446                    0,
447                    0,
448                ))
449            }
450            SqlStatement::Update(_) => {
451                let (prepare_local_instructions, prepared) = prepare_statement();
452                let prepared = prepared?;
453                let statement = extract_prepared_sql_update_statement(prepared)
454                    .map_err(QueryError::from_sql_lowering_error)?;
455
456                Ok((
457                    CompiledSqlCommand::Update(statement),
458                    0,
459                    prepare_local_instructions,
460                    0,
461                    0,
462                ))
463            }
464            SqlStatement::Explain(_) => {
465                let (prepare_local_instructions, prepared) = prepare_statement();
466                let prepared = prepared?;
467                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
468                    lower_sql_command_from_prepared_statement(prepared, authority.model())
469                        .map_err(QueryError::from_sql_lowering_error)
470                });
471                let lowered = lowered?;
472
473                Ok((
474                    CompiledSqlCommand::Explain(Box::new(lowered)),
475                    0,
476                    prepare_local_instructions,
477                    lower_local_instructions,
478                    0,
479                ))
480            }
481            SqlStatement::Describe(_) => {
482                let (prepare_local_instructions, prepared) = prepare_statement();
483                let _prepared = prepared?;
484
485                Ok((
486                    CompiledSqlCommand::DescribeEntity,
487                    0,
488                    prepare_local_instructions,
489                    0,
490                    0,
491                ))
492            }
493            SqlStatement::ShowIndexes(_) => {
494                let (prepare_local_instructions, prepared) = prepare_statement();
495                let _prepared = prepared?;
496
497                Ok((
498                    CompiledSqlCommand::ShowIndexesEntity,
499                    0,
500                    prepare_local_instructions,
501                    0,
502                    0,
503                ))
504            }
505            SqlStatement::ShowColumns(_) => {
506                let (prepare_local_instructions, prepared) = prepare_statement();
507                let _prepared = prepared?;
508
509                Ok((
510                    CompiledSqlCommand::ShowColumnsEntity,
511                    0,
512                    prepare_local_instructions,
513                    0,
514                    0,
515                ))
516            }
517            SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
518        }
519    }
520
521    // Resolve one SQL SELECT entirely through the shared lower query-plan
522    // cache and derive only the outward SQL projection contract locally.
523    fn sql_select_prepared_plan(
524        &self,
525        query: &StructuralQuery,
526        authority: EntityAuthority,
527        cache_schema_fingerprint: CommitSchemaFingerprint,
528    ) -> Result<
529        (
530            SharedPreparedExecutionPlan,
531            SqlProjectionContract,
532            SqlCacheAttribution,
533        ),
534        QueryError,
535    > {
536        let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
537            authority,
538            cache_schema_fingerprint,
539            query,
540        )?;
541        let projection_spec = prepared_plan
542            .logical_plan()
543            .projection_spec(authority.model());
544        let projection = SqlProjectionContract::new(
545            projection_labels_from_projection_spec(&projection_spec),
546            projection_fixed_scales_from_projection_spec(&projection_spec),
547        );
548
549        Ok((
550            prepared_plan,
551            projection,
552            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
553        ))
554    }
555
556    // Keep query/update surface gating owned by one helper so the SQL
557    // compiled-command lane does not duplicate the same statement-family split
558    // just to change the outward error wording.
559    fn ensure_sql_statement_supported_for_surface(
560        statement: &SqlStatement,
561        surface: SqlCompiledCommandSurface,
562    ) -> Result<(), QueryError> {
563        match (surface, statement) {
564            (
565                SqlCompiledCommandSurface::Query,
566                SqlStatement::Select(_)
567                | SqlStatement::Explain(_)
568                | SqlStatement::Describe(_)
569                | SqlStatement::ShowIndexes(_)
570                | SqlStatement::ShowColumns(_)
571                | SqlStatement::ShowEntities(_),
572            )
573            | (
574                SqlCompiledCommandSurface::Update,
575                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
576            ) => Ok(()),
577            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
578                Err(QueryError::unsupported_query(
579                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
580                ))
581            }
582            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
583                Err(QueryError::unsupported_query(
584                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
585                ))
586            }
587            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
588                Err(QueryError::unsupported_query(
589                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
590                ))
591            }
592            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
593                Err(QueryError::unsupported_query(
594                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
595                ))
596            }
597            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
598                Err(QueryError::unsupported_query(
599                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
600                ))
601            }
602            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
603                Err(QueryError::unsupported_query(
604                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
605                ))
606            }
607            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
608                Err(QueryError::unsupported_query(
609                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
610                ))
611            }
612            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
613                Err(QueryError::unsupported_query(
614                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
615                ))
616            }
617            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
618                Err(QueryError::unsupported_query(
619                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
620                ))
621            }
622        }
623    }
624
625    /// Execute one single-entity reduced SQL query or introspection statement.
626    ///
627    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
628    /// returns SQL-shaped statement output instead of typed entities.
629    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
630    where
631        E: PersistedRow<Canister = C> + EntityValue,
632    {
633        let compiled = self.compile_sql_query::<E>(sql)?;
634
635        self.execute_compiled_sql_owned::<E>(compiled)
636    }
637
638    /// Execute one reduced SQL query while reporting the compile/execute split
639    /// at the top-level SQL seam.
640    #[cfg(feature = "diagnostics")]
641    #[doc(hidden)]
642    pub fn execute_sql_query_with_attribution<E>(
643        &self,
644        sql: &str,
645    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
646    where
647        E: PersistedRow<Canister = C> + EntityValue,
648    {
649        // Phase 1: measure the compile side of the new seam, including parse,
650        // surface validation, and semantic command construction.
651        let (compile_local_instructions, compiled) =
652            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
653        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
654
655        // Phase 2: measure the execute side separately so repeat-run cache
656        // experiments can prove which side actually moved.
657        let store_get_calls_before = DataStore::current_get_call_count();
658        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
659        let pure_covering_row_assembly_before =
660            current_pure_covering_row_assembly_local_instructions();
661        let (result, execute_cache_attribution, execute_phase_attribution) =
662            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
663        let store_get_calls =
664            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
665        let pure_covering_decode_local_instructions =
666            current_pure_covering_decode_local_instructions()
667                .saturating_sub(pure_covering_decode_before);
668        let pure_covering_row_assembly_local_instructions =
669            current_pure_covering_row_assembly_local_instructions()
670                .saturating_sub(pure_covering_row_assembly_before);
671        let execute_local_instructions = execute_phase_attribution
672            .planner_local_instructions
673            .saturating_add(execute_phase_attribution.store_local_instructions)
674            .saturating_add(execute_phase_attribution.executor_local_instructions)
675            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
676        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
677        let total_local_instructions =
678            compile_local_instructions.saturating_add(execute_local_instructions);
679        let grouped = matches!(&result, SqlStatementResult::Grouped { .. }).then_some(
680            GroupedExecutionAttribution {
681                stream_local_instructions: execute_phase_attribution
682                    .grouped_stream_local_instructions,
683                fold_local_instructions: execute_phase_attribution.grouped_fold_local_instructions,
684                finalize_local_instructions: execute_phase_attribution
685                    .grouped_finalize_local_instructions,
686                count: GroupedCountAttribution::from_executor(
687                    execute_phase_attribution.grouped_count,
688                ),
689            },
690        );
691        let pure_covering = (pure_covering_decode_local_instructions > 0
692            || pure_covering_row_assembly_local_instructions > 0)
693            .then_some(SqlPureCoveringAttribution {
694                decode_local_instructions: pure_covering_decode_local_instructions,
695                row_assembly_local_instructions: pure_covering_row_assembly_local_instructions,
696            });
697
698        Ok((
699            result,
700            SqlQueryExecutionAttribution {
701                compile_local_instructions,
702                compile: SqlCompileAttribution {
703                    cache_key_local_instructions: compile_phase_attribution.cache_key,
704                    cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
705                    parse_local_instructions: compile_phase_attribution.parse,
706                    parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
707                    parse_select_local_instructions: compile_phase_attribution.parse_select,
708                    parse_expr_local_instructions: compile_phase_attribution.parse_expr,
709                    parse_predicate_local_instructions: compile_phase_attribution.parse_predicate,
710                    aggregate_lane_check_local_instructions: compile_phase_attribution
711                        .aggregate_lane_check,
712                    prepare_local_instructions: compile_phase_attribution.prepare,
713                    lower_local_instructions: compile_phase_attribution.lower,
714                    bind_local_instructions: compile_phase_attribution.bind,
715                    cache_insert_local_instructions: compile_phase_attribution.cache_insert,
716                },
717                plan_lookup_local_instructions: execute_phase_attribution
718                    .planner_local_instructions,
719                execution: SqlExecutionAttribution {
720                    planner_local_instructions: execute_phase_attribution
721                        .planner_local_instructions,
722                    store_local_instructions: execute_phase_attribution.store_local_instructions,
723                    executor_invocation_local_instructions: execute_phase_attribution
724                        .executor_invocation_local_instructions,
725                    executor_local_instructions: execute_phase_attribution
726                        .executor_local_instructions,
727                    response_finalization_local_instructions: execute_phase_attribution
728                        .response_finalization_local_instructions,
729                },
730                grouped,
731                scalar_aggregate: SqlScalarAggregateAttribution::from_executor(
732                    execute_phase_attribution.scalar_aggregate_terminal,
733                ),
734                pure_covering,
735                store_get_calls,
736                response_decode_local_instructions: 0,
737                execute_local_instructions,
738                total_local_instructions,
739                cache: SqlQueryCacheAttribution {
740                    sql_compiled_command_hits: cache_attribution.sql_compiled_command_cache_hits,
741                    sql_compiled_command_misses: cache_attribution
742                        .sql_compiled_command_cache_misses,
743                    shared_query_plan_hits: cache_attribution.shared_query_plan_cache_hits,
744                    shared_query_plan_misses: cache_attribution.shared_query_plan_cache_misses,
745                },
746            },
747        ))
748    }
749
750    /// Execute one single-entity reduced SQL mutation statement.
751    ///
752    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
753    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
754    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
755    where
756        E: PersistedRow<Canister = C> + EntityValue,
757    {
758        let compiled = self.compile_sql_update::<E>(sql)?;
759
760        self.execute_compiled_sql_owned::<E>(compiled)
761    }
762
763    // Compile one SQL query-surface string into the session-owned generic-free
764    // semantic command artifact before execution.
765    pub(in crate::db) fn compile_sql_query<E>(
766        &self,
767        sql: &str,
768    ) -> Result<CompiledSqlCommand, QueryError>
769    where
770        E: PersistedRow<Canister = C> + EntityValue,
771    {
772        self.compile_sql_query_with_cache_attribution::<E>(sql)
773            .map(|(compiled, _, _)| compiled)
774    }
775
776    fn compile_sql_query_with_cache_attribution<E>(
777        &self,
778        sql: &str,
779    ) -> Result<
780        (
781            CompiledSqlCommand,
782            SqlCacheAttribution,
783            SqlCompilePhaseAttribution,
784        ),
785        QueryError,
786    >
787    where
788        E: PersistedRow<Canister = C> + EntityValue,
789    {
790        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
791    }
792
793    // Compile one SQL update-surface string into the session-owned generic-free
794    // semantic command artifact before execution.
795    pub(in crate::db) fn compile_sql_update<E>(
796        &self,
797        sql: &str,
798    ) -> Result<CompiledSqlCommand, QueryError>
799    where
800        E: PersistedRow<Canister = C> + EntityValue,
801    {
802        self.compile_sql_update_with_cache_attribution::<E>(sql)
803            .map(|(compiled, _, _)| compiled)
804    }
805
806    fn compile_sql_update_with_cache_attribution<E>(
807        &self,
808        sql: &str,
809    ) -> Result<
810        (
811            CompiledSqlCommand,
812            SqlCacheAttribution,
813            SqlCompilePhaseAttribution,
814        ),
815        QueryError,
816    >
817    where
818        E: PersistedRow<Canister = C> + EntityValue,
819    {
820        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
821    }
822
823    // Reuse one internal compile shell for both outward SQL surfaces so query
824    // and update no longer duplicate cache-key construction and surface
825    // validation plumbing before they reach the real compile/cache owner.
826    fn compile_sql_surface_with_cache_attribution<E>(
827        &self,
828        sql: &str,
829        surface: SqlCompiledCommandSurface,
830    ) -> Result<
831        (
832            CompiledSqlCommand,
833            SqlCacheAttribution,
834            SqlCompilePhaseAttribution,
835        ),
836        QueryError,
837    >
838    where
839        E: PersistedRow<Canister = C> + EntityValue,
840    {
841        let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
842            Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
843        });
844        let cache_key = cache_key?;
845
846        self.compile_sql_statement_with_cache::<E, _>(
847            cache_key,
848            cache_key_local_instructions,
849            sql,
850            |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
851        )
852    }
853
854    // Reuse one previously compiled SQL artifact when the session-local cache
855    // can prove the surface, entity contract, and raw SQL text all match.
856    fn compile_sql_statement_with_cache<E, F>(
857        &self,
858        cache_key: SqlCompiledCommandCacheKey,
859        cache_key_local_instructions: u64,
860        sql: &str,
861        ensure_surface_supported: F,
862    ) -> Result<
863        (
864            CompiledSqlCommand,
865            SqlCacheAttribution,
866            SqlCompilePhaseAttribution,
867        ),
868        QueryError,
869    >
870    where
871        E: PersistedRow<Canister = C> + EntityValue,
872        F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
873    {
874        let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
875            let cached =
876                self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
877            Ok::<_, QueryError>(cached)
878        });
879        let cached = cached?;
880        if let Some(compiled) = cached {
881            return Ok((
882                compiled,
883                SqlCacheAttribution::sql_compiled_command_cache_hit(),
884                SqlCompilePhaseAttribution::cache_hit(
885                    cache_key_local_instructions,
886                    cache_lookup_local_instructions,
887                ),
888            ));
889        }
890
891        let (parse_local_instructions, parsed) = measure_sql_stage(|| {
892            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
893        });
894        let (parsed, parse_attribution) = parsed?;
895        let parse_select_local_instructions = parse_local_instructions
896            .saturating_sub(parse_attribution.tokenize)
897            .saturating_sub(parse_attribution.expr)
898            .saturating_sub(parse_attribution.predicate);
899        ensure_surface_supported(&parsed)?;
900        let authority = EntityAuthority::for_type::<E>();
901        let (
902            compiled,
903            aggregate_lane_check_local_instructions,
904            prepare_local_instructions,
905            lower_local_instructions,
906            bind_local_instructions,
907        ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
908
909        let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
910            self.with_sql_compiled_command_cache(|cache| {
911                cache.insert(cache_key, compiled.clone());
912            });
913            Ok::<_, QueryError>(())
914        });
915        cache_insert?;
916
917        Ok((
918            compiled,
919            SqlCacheAttribution::sql_compiled_command_cache_miss(),
920            SqlCompilePhaseAttribution {
921                cache_key: cache_key_local_instructions,
922                cache_lookup: cache_lookup_local_instructions,
923                parse: parse_local_instructions,
924                parse_tokenize: parse_attribution.tokenize,
925                parse_select: parse_select_local_instructions,
926                parse_expr: parse_attribution.expr,
927                parse_predicate: parse_attribution.predicate,
928                aggregate_lane_check: aggregate_lane_check_local_instructions,
929                prepare: prepare_local_instructions,
930                lower: lower_local_instructions,
931                bind: bind_local_instructions,
932                cache_insert: cache_insert_local_instructions,
933            },
934        ))
935    }
936}