Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod cache;
8mod compiled;
9mod execute;
10mod projection;
11
12#[cfg(feature = "diagnostics")]
13use candid::CandidType;
14#[cfg(feature = "diagnostics")]
15use serde::Deserialize;
16use std::sync::Arc;
17
18#[cfg(feature = "diagnostics")]
19use crate::db::DataStore;
20#[cfg(feature = "diagnostics")]
21use crate::db::executor::{
22    GroupedCountAttribution, ScalarAggregateTerminalAttribution,
23    current_pure_covering_decode_local_instructions,
24    current_pure_covering_row_assembly_local_instructions,
25};
26#[cfg(test)]
27use crate::db::sql::parser::parse_sql;
28use crate::{
29    db::{
30        DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
31        commit::CommitSchemaFingerprint,
32        executor::{EntityAuthority, SharedPreparedExecutionPlan},
33        query::intent::StructuralQuery,
34        session::sql::projection::{
35            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
36        },
37        sql::lowering::{
38            bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
39            compile_sql_global_aggregate_command_core_from_prepared,
40            extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
41            lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
42            lower_sql_command_from_prepared_statement, prepare_sql_statement,
43        },
44        sql::parser::{SqlStatement, parse_sql_with_attribution},
45    },
46    traits::{CanisterKind, EntityValue},
47    value::OutputValue,
48};
49
50pub(in crate::db::session::sql) use crate::db::diagnostics::measure_local_instruction_delta as measure_sql_stage;
51pub(in crate::db::session::sql) use cache::SqlCompiledCommandSurface;
52pub(in crate::db) use cache::{SqlCacheAttribution, SqlCompiledCommandCacheKey};
53pub(in crate::db) use compiled::{CompiledSqlCommand, SqlProjectionContract};
54
55#[cfg(all(test, not(feature = "diagnostics")))]
56pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
57#[cfg(feature = "diagnostics")]
58pub use crate::db::session::sql::projection::{
59    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
60};
61
/// Unified SQL statement payload returned by shared SQL lane execution.
///
/// Each variant carries the SQL-shaped output for one statement family;
/// callers match on the variant to render or inspect the result.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// Count-only payload: a row count with no row data attached.
    Count {
        row_count: u32,
    },
    /// Projected rows as typed `OutputValue` cells.
    ///
    /// `columns` holds the column labels and `fixed_scales` holds one
    /// optional scale per column (presumably a fixed decimal scale used when
    /// rendering; confirm against the projection module).
    Projection {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<OutputValue>>,
        row_count: u32,
    },
    /// Projected rows whose cells are already carried as `String` text.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows.
    ///
    /// `next_cursor` is optional; presumably it is a continuation token for
    /// fetching further grouped pages (confirm against the grouped executor).
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        next_cursor: Option<String>,
    },
    /// Explain output carried as one text payload.
    Explain(String),
    /// Schema description of one entity.
    Describe(crate::db::EntitySchemaDescription),
    /// Index listing, one string per entry.
    ShowIndexes(Vec<String>),
    /// Field descriptions of one entity.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// Entity listing, one string per entry.
    ShowEntities(Vec<String>),
}
92
93///
94/// SqlQueryExecutionAttribution
95///
96/// SqlQueryExecutionAttribution records the top-level reduced SQL query cost
97/// split at the new compile/execute seam.
98/// This keeps future cache validation focused on one concrete question:
99/// whether repeated queries stop paying compile cost while execute cost stays
100/// otherwise comparable.
101/// Every field is an additive counter where zero means no observed work or no
102/// observed event for that bucket. Future non-additive diagnostics must use an
103/// explicit presence type instead of relying on `Default`.
104///
105
#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Default, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    // Compile side: the measured total for the whole compile call, followed
    // by the per-stage split copied from `SqlCompilePhaseAttribution`.
    pub compile_local_instructions: u64,
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,

    // Execute side: phase counters copied from `SqlExecutePhaseAttribution`.
    // `plan_lookup_local_instructions` and `planner_local_instructions` are
    // both filled from the same planner counter by
    // `execute_sql_query_with_attribution`.
    pub plan_lookup_local_instructions: u64,
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_invocation_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub response_finalization_local_instructions: u64,

    // Grouped execution: flattened from the nested grouped attribution.
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,

    // Scalar aggregate terminal: flattened from the nested scalar aggregate
    // attribution. `scalar_aggregate_sink_mode` is the only non-counter field;
    // it is `None` when the sink mode exposes no label.
    pub scalar_aggregate_base_row_local_instructions: u64,
    pub scalar_aggregate_reducer_fold_local_instructions: u64,
    pub scalar_aggregate_expression_evaluations: u64,
    pub scalar_aggregate_filter_evaluations: u64,
    pub scalar_aggregate_rows_ingested: u64,
    pub scalar_aggregate_terminal_count: u64,
    pub scalar_aggregate_unique_input_expr_count: u64,
    pub scalar_aggregate_unique_filter_expr_count: u64,
    pub scalar_aggregate_sink_mode: Option<String>,

    // Deltas sampled around the execute call (value after minus value before).
    pub pure_covering_decode_local_instructions: u64,
    pub pure_covering_row_assembly_local_instructions: u64,
    pub store_get_calls: u64,

    // Currently always written as zero by `execute_sql_query_with_attribution`.
    pub response_decode_local_instructions: u64,

    // Totals: `execute_local_instructions` is the sum of the planner, store,
    // executor, and response-finalization counters; `total_local_instructions`
    // adds the compile total on top of that.
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,

    // Cache hit/miss counters merged from the compile and execute sides.
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
159
// SqlExecutePhaseAttribution keeps the execute side split into select-plan
// work, physical store/index access, and narrower runtime execution so shell
// tooling can show all three.
//
// The scalar counters are copied one-to-one into the public
// `SqlQueryExecutionAttribution`; the nested `grouped_count` and
// `scalar_aggregate_terminal` values are flattened there field by field.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    // Not part of the public `execute_local_instructions` sum; reported as
    // its own counter.
    pub executor_invocation_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub response_finalization_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count: GroupedCountAttribution,
    pub scalar_aggregate_terminal: ScalarAggregateTerminalAttribution,
}
177
///
/// SqlCompilePhaseAttribution
///
/// SqlCompilePhaseAttribution keeps the SQL-front-end compile miss path split
/// into the concrete stages that still exist after the shared lower-cache
/// collapse.
/// This lets perf audits distinguish cache lookup, parsing, prepared-statement
/// normalization, lowered-command construction, structural binding, and cache
/// insertion cost instead of treating compile as one opaque bucket.
///
/// Every field is a `u64` counter. When diagnostics are enabled, each field
/// `x` is surfaced as `compile_<x>_local_instructions` on
/// `SqlQueryExecutionAttribution`.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    // Cache bookkeeping performed before any compile work.
    pub cache_key: u64,
    pub cache_lookup: u64,
    // Parse cost, followed by its finer-grained parser counters.
    pub parse: u64,
    pub parse_tokenize: u64,
    pub parse_select: u64,
    pub parse_expr: u64,
    pub parse_predicate: u64,
    // Semantic compile stages.
    pub aggregate_lane_check: u64,
    pub prepare: u64,
    pub lower: u64,
    pub bind: u64,
    // Cache bookkeeping performed after a successful compile.
    pub cache_insert: u64,
}
204
205impl SqlCompilePhaseAttribution {
206    #[must_use]
207    const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
208        Self {
209            cache_key,
210            cache_lookup,
211            parse: 0,
212            parse_tokenize: 0,
213            parse_select: 0,
214            parse_expr: 0,
215            parse_predicate: 0,
216            aggregate_lane_check: 0,
217            prepare: 0,
218            lower: 0,
219            bind: 0,
220            cache_insert: 0,
221        }
222    }
223}
224
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    // Derive a phase split from one execute total and one store total.
    //
    // The execute total is reported as the executor invocation counter, and
    // the executor counter is that total minus the store total (saturating at
    // zero). Planner, response finalization, and every grouped counter stay
    // zero, and both nested attributions are set to their `none()` value.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            // Counters derived from the two inputs.
            store_local_instructions,
            executor_invocation_local_instructions: execute_local_instructions,
            executor_local_instructions,
            // Counters this constructor cannot observe.
            planner_local_instructions: 0,
            response_finalization_local_instructions: 0,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
            scalar_aggregate_terminal: ScalarAggregateTerminalAttribution::none(),
        }
    }
}
247
// Test-only parse entry point. It lives at module level rather than on
// `DbSession` because parsing needs no session state; parser failures are
// converted through `QueryError::from_sql_parse_error`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let parsed = parse_sql(sql);

    parsed.map_err(QueryError::from_sql_parse_error)
}
254
255impl<C: CanisterKind> DbSession<C> {
    // Compile one parsed SQL statement into the generic-free session-owned
    // semantic command artifact for one resolved authority.
    //
    // Returns the compiled command followed by four local-instruction
    // counters, in this order:
    //   1. aggregate-lane check
    //   2. prepare
    //   3. lower
    //   4. bind
    // A counter is zero when the statement family does not run that stage or
    // runs it outside a `measure_sql_stage` wrapper.
    //
    // `compiled_cache_key` is only stored in the `Select` command variant.
    // Lowering failures are converted through
    // `QueryError::from_sql_lowering_error`.
    #[expect(clippy::too_many_lines)]
    fn compile_sql_statement_for_authority(
        statement: &SqlStatement,
        authority: EntityAuthority,
        compiled_cache_key: SqlCompiledCommandCacheKey,
    ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
        // Reuse one local preparation closure so the session compile surface
        // reaches the prepared-statement owner directly without another
        // single-purpose module hop.
        // The closure yields `(measured instructions, prepare result)`; the
        // statement is cloned because preparation takes it by value.
        let prepare_statement = || {
            measure_sql_stage(|| {
                prepare_sql_statement(statement.clone(), authority.model().name())
                    .map_err(QueryError::from_sql_lowering_error)
            })
        };

        match statement {
            SqlStatement::Select(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let prepared = prepared?;
                // The shape check itself cannot fail; it is wrapped in `Ok`
                // only so it can flow through `measure_sql_stage`.
                let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
                    measure_sql_stage(|| {
                        Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
                    });
                let requires_aggregate_lane = requires_aggregate_lane?;

                if requires_aggregate_lane {
                    // Global aggregate lane: one measured compile step, no
                    // separate bind stage (bind counter stays zero).
                    let (lower_local_instructions, command) = measure_sql_stage(|| {
                        compile_sql_global_aggregate_command_core_from_prepared(
                            prepared,
                            authority.model(),
                            MissingRowPolicy::Ignore,
                        )
                        .map_err(QueryError::from_sql_lowering_error)
                    });
                    let command = command?;

                    Ok((
                        CompiledSqlCommand::GlobalAggregate {
                            command: Box::new(command),
                        },
                        aggregate_lane_check_local_instructions,
                        prepare_local_instructions,
                        lower_local_instructions,
                        0,
                    ))
                } else {
                    // Regular SELECT: lower, then bind into a structural
                    // query; each step is measured on its own.
                    let (lower_local_instructions, select) = measure_sql_stage(|| {
                        lower_prepared_sql_select_statement(prepared, authority.model())
                            .map_err(QueryError::from_sql_lowering_error)
                    });
                    let select = select?;
                    let (bind_local_instructions, query) = measure_sql_stage(|| {
                        bind_lowered_sql_select_query_structural(
                            authority.model(),
                            select,
                            MissingRowPolicy::Ignore,
                        )
                        .map_err(QueryError::from_sql_lowering_error)
                    });
                    let query = query?;

                    Ok((
                        CompiledSqlCommand::Select {
                            query: Arc::new(query),
                            compiled_cache_key,
                        },
                        aggregate_lane_check_local_instructions,
                        prepare_local_instructions,
                        lower_local_instructions,
                        bind_local_instructions,
                    ))
                }
            }
            SqlStatement::Delete(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let prepared = prepared?;
                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
                    lower_prepared_sql_delete_statement(prepared)
                        .map_err(QueryError::from_sql_lowering_error)
                });
                let delete = lowered?;
                // Capture the RETURNING clause before `into_base_query`
                // consumes the lowered delete.
                let returning = delete.returning().cloned();
                let query = delete.into_base_query();
                // Delete binding is infallible here; the `Ok` wrapper exists
                // only to fit the `measure_sql_stage` shape.
                let (bind_local_instructions, query) = measure_sql_stage(|| {
                    Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
                        authority.model(),
                        query,
                        MissingRowPolicy::Ignore,
                    ))
                });
                let query = query?;

                Ok((
                    CompiledSqlCommand::Delete {
                        query: Arc::new(query),
                        returning,
                    },
                    0,
                    prepare_local_instructions,
                    lower_local_instructions,
                    bind_local_instructions,
                ))
            }
            SqlStatement::Insert(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let prepared = prepared?;
                // Extraction runs outside `measure_sql_stage`, so only the
                // prepare counter is reported for INSERT.
                let statement = extract_prepared_sql_insert_statement(prepared)
                    .map_err(QueryError::from_sql_lowering_error)?;

                Ok((
                    CompiledSqlCommand::Insert(statement),
                    0,
                    prepare_local_instructions,
                    0,
                    0,
                ))
            }
            SqlStatement::Update(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let prepared = prepared?;
                // Extraction runs outside `measure_sql_stage`, so only the
                // prepare counter is reported for UPDATE.
                let statement = extract_prepared_sql_update_statement(prepared)
                    .map_err(QueryError::from_sql_lowering_error)?;

                Ok((
                    CompiledSqlCommand::Update(statement),
                    0,
                    prepare_local_instructions,
                    0,
                    0,
                ))
            }
            SqlStatement::Explain(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let prepared = prepared?;
                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
                    lower_sql_command_from_prepared_statement(prepared, authority.model())
                        .map_err(QueryError::from_sql_lowering_error)
                });
                let lowered = lowered?;

                Ok((
                    CompiledSqlCommand::Explain(Box::new(lowered)),
                    0,
                    prepare_local_instructions,
                    lower_local_instructions,
                    0,
                ))
            }
            // The three entity introspection statements below still run
            // preparation, so a preparation error is surfaced, but the
            // prepared value itself is discarded.
            SqlStatement::Describe(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let _prepared = prepared?;

                Ok((
                    CompiledSqlCommand::DescribeEntity,
                    0,
                    prepare_local_instructions,
                    0,
                    0,
                ))
            }
            SqlStatement::ShowIndexes(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let _prepared = prepared?;

                Ok((
                    CompiledSqlCommand::ShowIndexesEntity,
                    0,
                    prepare_local_instructions,
                    0,
                    0,
                ))
            }
            SqlStatement::ShowColumns(_) => {
                let (prepare_local_instructions, prepared) = prepare_statement();
                let _prepared = prepared?;

                Ok((
                    CompiledSqlCommand::ShowColumnsEntity,
                    0,
                    prepare_local_instructions,
                    0,
                    0,
                ))
            }
            // SHOW ENTITIES skips preparation entirely: every counter is zero.
            SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
        }
    }
446
447    // Resolve one SQL SELECT entirely through the shared lower query-plan
448    // cache and derive only the outward SQL projection contract locally.
449    fn sql_select_prepared_plan(
450        &self,
451        query: &StructuralQuery,
452        authority: EntityAuthority,
453        cache_schema_fingerprint: CommitSchemaFingerprint,
454    ) -> Result<
455        (
456            SharedPreparedExecutionPlan,
457            SqlProjectionContract,
458            SqlCacheAttribution,
459        ),
460        QueryError,
461    > {
462        let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
463            authority,
464            cache_schema_fingerprint,
465            query,
466        )?;
467        let projection_spec = prepared_plan
468            .logical_plan()
469            .projection_spec(authority.model());
470        let projection = SqlProjectionContract::new(
471            projection_labels_from_projection_spec(&projection_spec),
472            projection_fixed_scales_from_projection_spec(&projection_spec),
473        );
474
475        Ok((
476            prepared_plan,
477            projection,
478            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
479        ))
480    }
481
482    // Keep query/update surface gating owned by one helper so the SQL
483    // compiled-command lane does not duplicate the same statement-family split
484    // just to change the outward error wording.
485    fn ensure_sql_statement_supported_for_surface(
486        statement: &SqlStatement,
487        surface: SqlCompiledCommandSurface,
488    ) -> Result<(), QueryError> {
489        match (surface, statement) {
490            (
491                SqlCompiledCommandSurface::Query,
492                SqlStatement::Select(_)
493                | SqlStatement::Explain(_)
494                | SqlStatement::Describe(_)
495                | SqlStatement::ShowIndexes(_)
496                | SqlStatement::ShowColumns(_)
497                | SqlStatement::ShowEntities(_),
498            )
499            | (
500                SqlCompiledCommandSurface::Update,
501                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
502            ) => Ok(()),
503            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
504                Err(QueryError::unsupported_query(
505                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
506                ))
507            }
508            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
509                Err(QueryError::unsupported_query(
510                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
511                ))
512            }
513            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
514                Err(QueryError::unsupported_query(
515                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
516                ))
517            }
518            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
519                Err(QueryError::unsupported_query(
520                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
521                ))
522            }
523            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
524                Err(QueryError::unsupported_query(
525                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
526                ))
527            }
528            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
529                Err(QueryError::unsupported_query(
530                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
531                ))
532            }
533            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
534                Err(QueryError::unsupported_query(
535                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
536                ))
537            }
538            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
539                Err(QueryError::unsupported_query(
540                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
541                ))
542            }
543            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
544                Err(QueryError::unsupported_query(
545                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
546                ))
547            }
548        }
549    }
550
551    /// Execute one single-entity reduced SQL query or introspection statement.
552    ///
553    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
554    /// returns SQL-shaped statement output instead of typed entities.
555    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
556    where
557        E: PersistedRow<Canister = C> + EntityValue,
558    {
559        let compiled = self.compile_sql_query::<E>(sql)?;
560
561        self.execute_compiled_sql_owned::<E>(compiled)
562    }
563
    /// Execute one reduced SQL query while reporting the compile/execute split
    /// at the top-level SQL seam.
    ///
    /// Returns the statement result together with a populated
    /// `SqlQueryExecutionAttribution`.
    ///
    /// # Errors
    ///
    /// Returns the `QueryError` from either the compile step or the execute
    /// step. No attribution is returned on failure.
    #[cfg(feature = "diagnostics")]
    #[doc(hidden)]
    #[expect(
        clippy::needless_update,
        reason = "diagnostics attribution literals stay default-backed so future counters do not break every initializer"
    )]
    #[expect(
        clippy::too_many_lines,
        reason = "diagnostics attribution explicitly enumerates every counter field at the public diagnostics boundary"
    )]
    pub fn execute_sql_query_with_attribution<E>(
        &self,
        sql: &str,
    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        // Phase 1: measure the compile side of the new seam, including parse,
        // surface validation, and semantic command construction.
        let (compile_local_instructions, compiled) =
            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;

        // Phase 2: measure the execute side separately so repeat-run cache
        // experiments can prove which side actually moved.
        // The store get-call count and the two pure-covering counters are
        // sampled before and after execution and reported as saturating deltas.
        let store_get_calls_before = DataStore::current_get_call_count();
        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
        let pure_covering_row_assembly_before =
            current_pure_covering_row_assembly_local_instructions();
        let (result, execute_cache_attribution, execute_phase_attribution) =
            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
        let store_get_calls =
            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
        let pure_covering_decode_local_instructions =
            current_pure_covering_decode_local_instructions()
                .saturating_sub(pure_covering_decode_before);
        let pure_covering_row_assembly_local_instructions =
            current_pure_covering_row_assembly_local_instructions()
                .saturating_sub(pure_covering_row_assembly_before);
        // The execute total is the sum of the planner, store, executor, and
        // response-finalization counters. The executor invocation counter is
        // reported on its own and is not part of this sum.
        let execute_local_instructions = execute_phase_attribution
            .planner_local_instructions
            .saturating_add(execute_phase_attribution.store_local_instructions)
            .saturating_add(execute_phase_attribution.executor_local_instructions)
            .saturating_add(execute_phase_attribution.response_finalization_local_instructions);
        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
        let total_local_instructions =
            compile_local_instructions.saturating_add(execute_local_instructions);

        Ok((
            result,
            SqlQueryExecutionAttribution {
                // Compile side: measured total plus the per-stage split.
                compile_local_instructions,
                compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
                compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
                compile_parse_local_instructions: compile_phase_attribution.parse,
                compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
                compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
                compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
                compile_parse_predicate_local_instructions: compile_phase_attribution
                    .parse_predicate,
                compile_aggregate_lane_check_local_instructions: compile_phase_attribution
                    .aggregate_lane_check,
                compile_prepare_local_instructions: compile_phase_attribution.prepare,
                compile_lower_local_instructions: compile_phase_attribution.lower,
                compile_bind_local_instructions: compile_phase_attribution.bind,
                compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
                // NOTE(review): plan lookup and planner are both filled from
                // the same planner counter; confirm this aliasing is intended.
                plan_lookup_local_instructions: execute_phase_attribution
                    .planner_local_instructions,
                planner_local_instructions: execute_phase_attribution.planner_local_instructions,
                store_local_instructions: execute_phase_attribution.store_local_instructions,
                executor_invocation_local_instructions: execute_phase_attribution
                    .executor_invocation_local_instructions,
                executor_local_instructions: execute_phase_attribution.executor_local_instructions,
                response_finalization_local_instructions: execute_phase_attribution
                    .response_finalization_local_instructions,
                grouped_stream_local_instructions: execute_phase_attribution
                    .grouped_stream_local_instructions,
                grouped_fold_local_instructions: execute_phase_attribution
                    .grouped_fold_local_instructions,
                grouped_finalize_local_instructions: execute_phase_attribution
                    .grouped_finalize_local_instructions,
                // Grouped-count attribution, flattened field by field.
                grouped_count_borrowed_hash_computations: execute_phase_attribution
                    .grouped_count
                    .borrowed_hash_computations,
                grouped_count_bucket_candidate_checks: execute_phase_attribution
                    .grouped_count
                    .bucket_candidate_checks,
                grouped_count_existing_group_hits: execute_phase_attribution
                    .grouped_count
                    .existing_group_hits,
                grouped_count_new_group_inserts: execute_phase_attribution
                    .grouped_count
                    .new_group_inserts,
                grouped_count_row_materialization_local_instructions: execute_phase_attribution
                    .grouped_count
                    .row_materialization_local_instructions,
                grouped_count_group_lookup_local_instructions: execute_phase_attribution
                    .grouped_count
                    .group_lookup_local_instructions,
                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
                    .grouped_count
                    .existing_group_update_local_instructions,
                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
                    .grouped_count
                    .new_group_insert_local_instructions,
                // Scalar aggregate terminal attribution, flattened field by field.
                scalar_aggregate_base_row_local_instructions: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .base_row_local_instructions,
                scalar_aggregate_reducer_fold_local_instructions: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .reducer_fold_local_instructions,
                scalar_aggregate_expression_evaluations: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .expression_evaluations,
                scalar_aggregate_filter_evaluations: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .filter_evaluations,
                scalar_aggregate_rows_ingested: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .rows_ingested,
                scalar_aggregate_terminal_count: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .terminal_count,
                scalar_aggregate_unique_input_expr_count: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .unique_input_expr_count,
                scalar_aggregate_unique_filter_expr_count: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .unique_filter_expr_count,
                // The sink mode is reported by label; it is `None` when the
                // mode exposes no label.
                scalar_aggregate_sink_mode: execute_phase_attribution
                    .scalar_aggregate_terminal
                    .sink_mode
                    .label()
                    .map(str::to_string),
                pure_covering_decode_local_instructions,
                pure_covering_row_assembly_local_instructions,
                store_get_calls,
                // Response decode is not measured on this path; it is always
                // reported as zero.
                response_decode_local_instructions: 0,
                execute_local_instructions,
                total_local_instructions,
                sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
                sql_compiled_command_cache_misses: cache_attribution
                    .sql_compiled_command_cache_misses,
                shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
                shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
                ..SqlQueryExecutionAttribution::default()
            },
        ))
    }
715
716    /// Execute one single-entity reduced SQL mutation statement.
717    ///
718    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
719    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
720    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
721    where
722        E: PersistedRow<Canister = C> + EntityValue,
723    {
724        let compiled = self.compile_sql_update::<E>(sql)?;
725
726        self.execute_compiled_sql_owned::<E>(compiled)
727    }
728
729    // Compile one SQL query-surface string into the session-owned generic-free
730    // semantic command artifact before execution.
731    pub(in crate::db) fn compile_sql_query<E>(
732        &self,
733        sql: &str,
734    ) -> Result<CompiledSqlCommand, QueryError>
735    where
736        E: PersistedRow<Canister = C> + EntityValue,
737    {
738        self.compile_sql_query_with_cache_attribution::<E>(sql)
739            .map(|(compiled, _, _)| compiled)
740    }
741
742    fn compile_sql_query_with_cache_attribution<E>(
743        &self,
744        sql: &str,
745    ) -> Result<
746        (
747            CompiledSqlCommand,
748            SqlCacheAttribution,
749            SqlCompilePhaseAttribution,
750        ),
751        QueryError,
752    >
753    where
754        E: PersistedRow<Canister = C> + EntityValue,
755    {
756        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
757    }
758
759    // Compile one SQL update-surface string into the session-owned generic-free
760    // semantic command artifact before execution.
761    pub(in crate::db) fn compile_sql_update<E>(
762        &self,
763        sql: &str,
764    ) -> Result<CompiledSqlCommand, QueryError>
765    where
766        E: PersistedRow<Canister = C> + EntityValue,
767    {
768        self.compile_sql_update_with_cache_attribution::<E>(sql)
769            .map(|(compiled, _, _)| compiled)
770    }
771
772    fn compile_sql_update_with_cache_attribution<E>(
773        &self,
774        sql: &str,
775    ) -> Result<
776        (
777            CompiledSqlCommand,
778            SqlCacheAttribution,
779            SqlCompilePhaseAttribution,
780        ),
781        QueryError,
782    >
783    where
784        E: PersistedRow<Canister = C> + EntityValue,
785    {
786        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
787    }
788
789    // Reuse one internal compile shell for both outward SQL surfaces so query
790    // and update no longer duplicate cache-key construction and surface
791    // validation plumbing before they reach the real compile/cache owner.
792    fn compile_sql_surface_with_cache_attribution<E>(
793        &self,
794        sql: &str,
795        surface: SqlCompiledCommandSurface,
796    ) -> Result<
797        (
798            CompiledSqlCommand,
799            SqlCacheAttribution,
800            SqlCompilePhaseAttribution,
801        ),
802        QueryError,
803    >
804    where
805        E: PersistedRow<Canister = C> + EntityValue,
806    {
807        let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
808            Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
809        });
810        let cache_key = cache_key?;
811
812        self.compile_sql_statement_with_cache::<E, _>(
813            cache_key,
814            cache_key_local_instructions,
815            sql,
816            |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
817        )
818    }
819
820    // Reuse one previously compiled SQL artifact when the session-local cache
821    // can prove the surface, entity contract, and raw SQL text all match.
822    fn compile_sql_statement_with_cache<E, F>(
823        &self,
824        cache_key: SqlCompiledCommandCacheKey,
825        cache_key_local_instructions: u64,
826        sql: &str,
827        ensure_surface_supported: F,
828    ) -> Result<
829        (
830            CompiledSqlCommand,
831            SqlCacheAttribution,
832            SqlCompilePhaseAttribution,
833        ),
834        QueryError,
835    >
836    where
837        E: PersistedRow<Canister = C> + EntityValue,
838        F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
839    {
840        let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
841            let cached =
842                self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
843            Ok::<_, QueryError>(cached)
844        });
845        let cached = cached?;
846        if let Some(compiled) = cached {
847            return Ok((
848                compiled,
849                SqlCacheAttribution::sql_compiled_command_cache_hit(),
850                SqlCompilePhaseAttribution::cache_hit(
851                    cache_key_local_instructions,
852                    cache_lookup_local_instructions,
853                ),
854            ));
855        }
856
857        let (parse_local_instructions, parsed) = measure_sql_stage(|| {
858            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
859        });
860        let (parsed, parse_attribution) = parsed?;
861        let parse_select_local_instructions = parse_local_instructions
862            .saturating_sub(parse_attribution.tokenize)
863            .saturating_sub(parse_attribution.expr)
864            .saturating_sub(parse_attribution.predicate);
865        ensure_surface_supported(&parsed)?;
866        let authority = EntityAuthority::for_type::<E>();
867        let (
868            compiled,
869            aggregate_lane_check_local_instructions,
870            prepare_local_instructions,
871            lower_local_instructions,
872            bind_local_instructions,
873        ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
874
875        let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
876            self.with_sql_compiled_command_cache(|cache| {
877                cache.insert(cache_key, compiled.clone());
878            });
879            Ok::<_, QueryError>(())
880        });
881        cache_insert?;
882
883        Ok((
884            compiled,
885            SqlCacheAttribution::sql_compiled_command_cache_miss(),
886            SqlCompilePhaseAttribution {
887                cache_key: cache_key_local_instructions,
888                cache_lookup: cache_lookup_local_instructions,
889                parse: parse_local_instructions,
890                parse_tokenize: parse_attribution.tokenize,
891                parse_select: parse_select_local_instructions,
892                parse_expr: parse_attribution.expr,
893                parse_predicate: parse_attribution.predicate,
894                aggregate_lane_check: aggregate_lane_check_local_instructions,
895                prepare: prepare_local_instructions,
896                lower: lower_local_instructions,
897                bind: bind_local_instructions,
898                cache_insert: cache_insert_local_instructions,
899            },
900        ))
901    }
902}