Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
// Version tag stored in every SQL compiled-command cache key.
// Bump it when SQL cache-key meaning changes in a way that must force
// existing in-heap entries to miss instead of aliasing old semantics.
// This cache deliberately stays on syntax-bound SQL statement identity for the
// front-end prepared/template lane. Grouped semantic canonicalization and
// grouped structural/cache identity do not flow into this key.
const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29    current_pure_covering_decode_local_instructions,
30    current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlInsertStatement, SqlReturningProjection, SqlUpdateStatement};
35use crate::{
36    db::{
37        DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38        commit::CommitSchemaFingerprint,
39        executor::{EntityAuthority, SharedPreparedExecutionPlan},
40        query::intent::StructuralQuery,
41        schema::commit_schema_fingerprint_for_entity,
42        session::query::QueryPlanCacheAttribution,
43        session::sql::projection::{
44            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45        },
46        sql::identifier::identifiers_tail_match,
47        sql::lowering::{
48            LoweredSqlCommand, SqlGlobalAggregateCommandCore, SqlLoweringError,
49            bind_lowered_sql_delete_query_structural, bind_lowered_sql_select_query_structural,
50            compile_sql_global_aggregate_command_core_from_prepared,
51            extract_prepared_sql_insert_statement, extract_prepared_sql_update_statement,
52            lower_prepared_sql_delete_statement, lower_prepared_sql_select_statement,
53            lower_sql_command_from_prepared_statement, prepare_sql_statement,
54        },
55        sql::parser::{SqlStatement, parse_sql_with_attribution},
56    },
57    traits::{CanisterKind, EntityValue},
58    value::OutputValue,
59};
60
61#[cfg(all(test, not(feature = "diagnostics")))]
62pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
63#[cfg(feature = "diagnostics")]
64pub use crate::db::session::sql::projection::{
65    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
66};
67
/// Unified SQL statement payload returned by shared SQL lane execution.
///
/// Each variant is one outward result shape; callers match on the variant to
/// find out which statement family produced the payload.
#[derive(Debug)]
pub enum SqlStatementResult {
    /// Row-count-only result.
    Count {
        row_count: u32,
    },
    /// Projected rows as typed output values with their column labels.
    /// NOTE(review): `fixed_scales` presumably holds one optional scale per
    /// entry in `columns` — confirm against the projection helpers.
    Projection {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<Vec<OutputValue>>,
        row_count: u32,
    },
    /// Projected rows whose cells are already rendered as text.
    ProjectionText {
        columns: Vec<String>,
        rows: Vec<Vec<String>>,
        row_count: u32,
    },
    /// Grouped rows plus an optional cursor string in `next_cursor`.
    Grouped {
        columns: Vec<String>,
        fixed_scales: Vec<Option<u32>>,
        rows: Vec<GroupedRow>,
        row_count: u32,
        next_cursor: Option<String>,
    },
    /// Textual EXPLAIN output.
    Explain(String),
    /// Schema description for one entity.
    Describe(crate::db::EntitySchemaDescription),
    /// Index listing, one string per entry.
    ShowIndexes(Vec<String>),
    /// Field descriptions for one entity.
    ShowColumns(Vec<crate::db::EntityFieldDescription>),
    /// Entity listing, one string per entry.
    ShowEntities(Vec<String>),
}
98
///
/// SqlQueryExecutionAttribution
///
/// SqlQueryExecutionAttribution records the top-level reduced SQL query cost
/// split at the new compile/execute seam.
/// This keeps future cache validation focused on one concrete question:
/// whether repeated queries stop paying compile cost while execute cost stays
/// otherwise comparable.
///
/// The type only exists when the `diagnostics` feature is enabled. All fields
/// are plain `u64` counters, grouped below by the prefix of their name.
///

#[cfg(feature = "diagnostics")]
#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
pub struct SqlQueryExecutionAttribution {
    // Compile-side counters (`compile_*`).
    pub compile_local_instructions: u64,
    pub compile_cache_key_local_instructions: u64,
    pub compile_cache_lookup_local_instructions: u64,
    pub compile_parse_local_instructions: u64,
    pub compile_parse_tokenize_local_instructions: u64,
    pub compile_parse_select_local_instructions: u64,
    pub compile_parse_expr_local_instructions: u64,
    pub compile_parse_predicate_local_instructions: u64,
    pub compile_aggregate_lane_check_local_instructions: u64,
    pub compile_prepare_local_instructions: u64,
    pub compile_lower_local_instructions: u64,
    pub compile_bind_local_instructions: u64,
    pub compile_cache_insert_local_instructions: u64,
    // Execute-side phase counters.
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    // Grouped-execution counters (`grouped_*`).
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count_borrowed_hash_computations: u64,
    pub grouped_count_bucket_candidate_checks: u64,
    pub grouped_count_existing_group_hits: u64,
    pub grouped_count_new_group_inserts: u64,
    pub grouped_count_row_materialization_local_instructions: u64,
    pub grouped_count_group_lookup_local_instructions: u64,
    pub grouped_count_existing_group_update_local_instructions: u64,
    pub grouped_count_new_group_insert_local_instructions: u64,
    // Pure-covering projection counters.
    pub pure_covering_decode_local_instructions: u64,
    pub pure_covering_row_assembly_local_instructions: u64,
    // Store access, response decode, and top-level totals.
    pub store_get_calls: u64,
    pub response_decode_local_instructions: u64,
    pub execute_local_instructions: u64,
    pub total_local_instructions: u64,
    // Cache hit/miss counts for the SQL compiled-command cache and the shared
    // query-plan cache.
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
150
// SqlExecutePhaseAttribution keeps the execute side split into select-plan
// work, physical store/index access, and narrower runtime execution so shell
// tooling can show all three.
// Grouped execution adds its own stream/fold/finalize counters plus one
// `GroupedCountAttribution` payload. Only compiled with the `diagnostics`
// feature.
#[cfg(feature = "diagnostics")]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub(in crate::db) struct SqlExecutePhaseAttribution {
    pub planner_local_instructions: u64,
    pub store_local_instructions: u64,
    pub executor_local_instructions: u64,
    pub grouped_stream_local_instructions: u64,
    pub grouped_fold_local_instructions: u64,
    pub grouped_finalize_local_instructions: u64,
    pub grouped_count: GroupedCountAttribution,
}
165
///
/// SqlCompilePhaseAttribution
///
/// SqlCompilePhaseAttribution keeps the SQL-front-end compile miss path split
/// into the concrete stages that still exist after the shared lower-cache
/// collapse.
/// This lets perf audits distinguish cache lookup, parsing, prepared-statement
/// normalization, lowered-command construction, structural binding, and cache
/// insertion cost instead of treating compile as one opaque bucket.
///
/// Every field is one `u64` counter; `Default` yields the all-zero value.
///

#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCompilePhaseAttribution {
    // Cache-key construction and cache lookup.
    pub cache_key: u64,
    pub cache_lookup: u64,
    // Parsing, with its sub-stage breakdown.
    pub parse: u64,
    pub parse_tokenize: u64,
    pub parse_select: u64,
    pub parse_expr: u64,
    pub parse_predicate: u64,
    // Post-parse compile stages.
    pub aggregate_lane_check: u64,
    pub prepare: u64,
    pub lower: u64,
    pub bind: u64,
    // Cache insertion.
    pub cache_insert: u64,
}
192
193impl SqlCompilePhaseAttribution {
194    #[must_use]
195    const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
196        Self {
197            cache_key,
198            cache_lookup,
199            parse: 0,
200            parse_tokenize: 0,
201            parse_select: 0,
202            parse_expr: 0,
203            parse_predicate: 0,
204            aggregate_lane_check: 0,
205            prepare: 0,
206            lower: 0,
207            bind: 0,
208            cache_insert: 0,
209        }
210    }
211}
212
#[cfg(feature = "diagnostics")]
impl SqlExecutePhaseAttribution {
    // Split one execute total into a store share and an executor remainder.
    // Planner and grouped counters are reported as zero / `none()` on this
    // path.
    #[must_use]
    pub(in crate::db) const fn from_execute_total_and_store_total(
        execute_local_instructions: u64,
        store_local_instructions: u64,
    ) -> Self {
        // Saturate so a store total larger than the execute total yields zero
        // instead of wrapping.
        let executor_local_instructions =
            execute_local_instructions.saturating_sub(store_local_instructions);

        Self {
            planner_local_instructions: 0,
            store_local_instructions,
            executor_local_instructions,
            grouped_stream_local_instructions: 0,
            grouped_fold_local_instructions: 0,
            grouped_finalize_local_instructions: 0,
            grouped_count: GroupedCountAttribution::none(),
        }
    }
}
232
// SqlCacheAttribution keeps the surviving SQL-front-end compile cache separate
// from the shared lower query-plan cache so perf audits can tell which
// boundary actually produced reuse on one query path.
// The SQL compiled-command / prepared-template cache is syntax-bound; the
// shared lower query-plan cache is where canonical semantic identity applies.
// Each field is a hit or miss count for one of those two caches.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct SqlCacheAttribution {
    pub sql_compiled_command_cache_hits: u64,
    pub sql_compiled_command_cache_misses: u64,
    pub shared_query_plan_cache_hits: u64,
    pub shared_query_plan_cache_misses: u64,
}
245
246impl SqlCacheAttribution {
247    #[must_use]
248    const fn none() -> Self {
249        Self {
250            sql_compiled_command_cache_hits: 0,
251            sql_compiled_command_cache_misses: 0,
252            shared_query_plan_cache_hits: 0,
253            shared_query_plan_cache_misses: 0,
254        }
255    }
256
257    #[must_use]
258    const fn sql_compiled_command_cache_hit() -> Self {
259        Self {
260            sql_compiled_command_cache_hits: 1,
261            ..Self::none()
262        }
263    }
264
265    #[must_use]
266    const fn sql_compiled_command_cache_miss() -> Self {
267        Self {
268            sql_compiled_command_cache_misses: 1,
269            ..Self::none()
270        }
271    }
272
273    #[must_use]
274    const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
275        Self {
276            shared_query_plan_cache_hits: attribution.hits,
277            shared_query_plan_cache_misses: attribution.misses,
278            ..Self::none()
279        }
280    }
281
282    #[must_use]
283    const fn merge(self, other: Self) -> Self {
284        Self {
285            sql_compiled_command_cache_hits: self
286                .sql_compiled_command_cache_hits
287                .saturating_add(other.sql_compiled_command_cache_hits),
288            sql_compiled_command_cache_misses: self
289                .sql_compiled_command_cache_misses
290                .saturating_add(other.sql_compiled_command_cache_misses),
291            shared_query_plan_cache_hits: self
292                .shared_query_plan_cache_hits
293                .saturating_add(other.shared_query_plan_cache_hits),
294            shared_query_plan_cache_misses: self
295                .shared_query_plan_cache_misses
296                .saturating_add(other.shared_query_plan_cache_misses),
297        }
298    }
299}
300
// SqlCompiledCommandSurface names which outward SQL entry surface a statement
// arrived on. It is hashed into the compiled-command cache key, so the same
// SQL text on the query surface and on the update surface never shares an
// entry.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
enum SqlCompiledCommandSurface {
    Query,
    Update,
}
306
///
/// SqlCompiledCommandCacheKey
///
/// SqlCompiledCommandCacheKey pins one compiled SQL artifact to the exact
/// session-local semantic boundary that produced it.
/// The key is intentionally conservative: cache method version, surface kind,
/// entity path, schema fingerprint, and raw SQL text must all match before
/// execution can reuse a prior compile result.
///

#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub(in crate::db) struct SqlCompiledCommandCacheKey {
    cache_method_version: u8,
    surface: SqlCompiledCommandSurface,
    entity_path: &'static str,
    schema_fingerprint: CommitSchemaFingerprint,
    // Raw SQL text stored verbatim; equality and hashing are the derived
    // field-wise implementations.
    sql: String,
}
325
///
/// SqlProjectionContract
///
/// SqlProjectionContract is the outward SQL projection contract
/// derived from one shared lower prepared plan.
/// SQL execution keeps this wrapper so statement shaping stays owner-local
/// while all prepared-plan reuse lives entirely below the SQL boundary.
///
/// It carries the outward column labels and the per-column optional fixed
/// scales.
///

#[derive(Clone, Debug)]
pub(in crate::db) struct SqlProjectionContract {
    columns: Vec<String>,
    fixed_scales: Vec<Option<u32>>,
}
340
341impl SqlProjectionContract {
342    #[must_use]
343    pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
344        Self {
345            columns,
346            fixed_scales,
347        }
348    }
349
350    #[must_use]
351    pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
352        (self.columns, self.fixed_scales)
353    }
354}
355
356impl SqlCompiledCommandCacheKey {
357    fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
358    where
359        E: PersistedRow + EntityValue,
360    {
361        Self {
362            cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
363            surface,
364            entity_path: E::PATH,
365            schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
366            sql: sql.to_string(),
367        }
368    }
369
370    #[must_use]
371    pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
372        self.schema_fingerprint
373    }
374}
375
376#[cfg(test)]
377impl SqlCompiledCommandCacheKey {
378    pub(in crate::db) fn query_for_entity_with_method_version<E>(
379        sql: &str,
380        cache_method_version: u8,
381    ) -> Self
382    where
383        E: PersistedRow + EntityValue,
384    {
385        Self::for_entity_with_method_version::<E>(
386            SqlCompiledCommandSurface::Query,
387            sql,
388            cache_method_version,
389        )
390    }
391
392    pub(in crate::db) fn update_for_entity_with_method_version<E>(
393        sql: &str,
394        cache_method_version: u8,
395    ) -> Self
396    where
397        E: PersistedRow + EntityValue,
398    {
399        Self::for_entity_with_method_version::<E>(
400            SqlCompiledCommandSurface::Update,
401            sql,
402            cache_method_version,
403        )
404    }
405
406    fn for_entity_with_method_version<E>(
407        surface: SqlCompiledCommandSurface,
408        sql: &str,
409        cache_method_version: u8,
410    ) -> Self
411    where
412        E: PersistedRow + EntityValue,
413    {
414        Self {
415            cache_method_version,
416            surface,
417            entity_path: E::PATH,
418            schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
419            sql: sql.to_string(),
420        }
421    }
422}
423
// One compiled-command cache: full cache key -> compiled command artifact.
pub(in crate::db) type SqlCompiledCommandCache =
    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;

thread_local! {
    // Keep SQL-facing caches in canister-lifetime heap state keyed by the
    // store registry identity so update calls can warm query-facing SQL reuse
    // without leaking entries across unrelated registries in tests.
    // The outer map key is a `usize` scope id; each scope owns one
    // `SqlCompiledCommandCache`.
    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
        RefCell::new(HashMap::default());
}
434
// Keep the compile artifact session-owned and generic-free so the SQL surface
// can separate semantic compilation from execution without coupling the seam to
// typed entity binding or executor scratch state.
// The type is `Clone` because it is stored by value in the compiled-command
// cache; the structural queries sit behind `Arc`.
#[derive(Clone, Debug)]
pub(in crate::db) enum CompiledSqlCommand {
    // Bound SELECT query plus the cache key it was compiled under.
    Select {
        query: Arc<StructuralQuery>,
        compiled_cache_key: SqlCompiledCommandCacheKey,
    },
    // Bound DELETE base query plus its optional RETURNING projection.
    Delete {
        query: Arc<StructuralQuery>,
        returning: Option<SqlReturningProjection>,
    },
    // SELECT that was routed to the global aggregate lane.
    GlobalAggregate {
        command: Box<SqlGlobalAggregateCommandCore>,
    },
    // EXPLAIN keeps the lowered command it wraps.
    Explain(Box<LoweredSqlCommand>),
    // INSERT / UPDATE keep their extracted prepared statements.
    Insert(SqlInsertStatement),
    Update(SqlUpdateStatement),
    // Metadata statements carry no payload.
    DescribeEntity,
    ShowIndexesEntity,
    ShowColumnsEntity,
    ShowEntities,
}
459
// Keep parsing as a module-owned helper instead of hanging a pure parser off
// `DbSession` as a fake session method.
// Test-only: parses `sql` and maps a parse failure into `QueryError`.
#[cfg(test)]
pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
    let statement = parse_sql(sql).map_err(QueryError::from_sql_parse_error)?;

    Ok(statement)
}
466
// Read the local instruction counter used for SQL stage attribution.
// On wasm32 this is `performance_counter(1)`; every other target reports `0`.
// The function only exists under the `diagnostics` feature, so the inner cfg
// only needs to split on the target architecture.
#[cfg(feature = "diagnostics")]
#[expect(
    clippy::missing_const_for_fn,
    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
)]
fn read_sql_local_instruction_counter() -> u64 {
    #[cfg(target_arch = "wasm32")]
    {
        canic_cdk::api::performance_counter(1)
    }

    #[cfg(not(target_arch = "wasm32"))]
    {
        0
    }
}
483
484pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
485    run: impl FnOnce() -> Result<T, E>,
486) -> (u64, Result<T, E>) {
487    #[cfg(feature = "diagnostics")]
488    let start = read_sql_local_instruction_counter();
489
490    let result = run();
491
492    #[cfg(feature = "diagnostics")]
493    let delta = read_sql_local_instruction_counter().saturating_sub(start);
494
495    #[cfg(not(feature = "diagnostics"))]
496    let delta = 0;
497
498    (delta, result)
499}
500
501impl<C: CanisterKind> DbSession<C> {
502    fn with_sql_compiled_command_cache<R>(
503        &self,
504        f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
505    ) -> R {
506        let scope_id = self.db.cache_scope_id();
507
508        SQL_COMPILED_COMMAND_CACHES.with(|caches| {
509            let mut caches = caches.borrow_mut();
510            let cache = caches.entry(scope_id).or_default();
511
512            f(cache)
513        })
514    }
515
516    #[cfg(test)]
517    pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
518        self.with_sql_compiled_command_cache(|cache| cache.len())
519    }
520
521    #[cfg(test)]
522    pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
523        self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
524    }
525
526    // Compile one parsed SQL statement into the generic-free session-owned
527    // semantic command artifact for one resolved authority.
528    #[expect(clippy::too_many_lines)]
529    fn compile_sql_statement_for_authority(
530        statement: &SqlStatement,
531        authority: EntityAuthority,
532        compiled_cache_key: SqlCompiledCommandCacheKey,
533    ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
534        // Reuse one local preparation closure so the session compile surface
535        // reaches the prepared-statement owner directly without another
536        // single-purpose module hop.
537        let prepare_statement = || {
538            measure_sql_stage(|| {
539                prepare_sql_statement(statement.clone(), authority.model().name())
540                    .map_err(QueryError::from_sql_lowering_error)
541            })
542        };
543
544        // Keep metadata-only entity checks local to the compile lane because
545        // they are part of statement admission, not a separate boundary.
546        let validate_metadata_entity = |sql_entity: &str| {
547            if identifiers_tail_match(sql_entity, authority.model().name()) {
548                return Ok(());
549            }
550
551            Err(QueryError::from_sql_lowering_error(
552                SqlLoweringError::EntityMismatch {
553                    sql_entity: sql_entity.to_string(),
554                    expected_entity: authority.model().name(),
555                },
556            ))
557        };
558
559        match statement {
560            SqlStatement::Select(_) => {
561                let (prepare_local_instructions, prepared) = prepare_statement();
562                let prepared = prepared?;
563                let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
564                    measure_sql_stage(|| {
565                        Ok::<_, QueryError>(prepared.statement().is_global_aggregate_lane_shape())
566                    });
567                let requires_aggregate_lane = requires_aggregate_lane?;
568
569                if requires_aggregate_lane {
570                    let (lower_local_instructions, command) = measure_sql_stage(|| {
571                        compile_sql_global_aggregate_command_core_from_prepared(
572                            prepared,
573                            authority.model(),
574                            MissingRowPolicy::Ignore,
575                        )
576                        .map_err(QueryError::from_sql_lowering_error)
577                    });
578                    let command = command?;
579
580                    Ok((
581                        CompiledSqlCommand::GlobalAggregate {
582                            command: Box::new(command),
583                        },
584                        aggregate_lane_check_local_instructions,
585                        prepare_local_instructions,
586                        lower_local_instructions,
587                        0,
588                    ))
589                } else {
590                    let (lower_local_instructions, select) = measure_sql_stage(|| {
591                        lower_prepared_sql_select_statement(prepared, authority.model())
592                            .map_err(QueryError::from_sql_lowering_error)
593                    });
594                    let select = select?;
595                    let (bind_local_instructions, query) = measure_sql_stage(|| {
596                        bind_lowered_sql_select_query_structural(
597                            authority.model(),
598                            select,
599                            MissingRowPolicy::Ignore,
600                        )
601                        .map_err(QueryError::from_sql_lowering_error)
602                    });
603                    let query = query?;
604
605                    Ok((
606                        CompiledSqlCommand::Select {
607                            query: Arc::new(query),
608                            compiled_cache_key,
609                        },
610                        aggregate_lane_check_local_instructions,
611                        prepare_local_instructions,
612                        lower_local_instructions,
613                        bind_local_instructions,
614                    ))
615                }
616            }
617            SqlStatement::Delete(_) => {
618                let (prepare_local_instructions, prepared) = prepare_statement();
619                let prepared = prepared?;
620                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
621                    lower_prepared_sql_delete_statement(prepared)
622                        .map_err(QueryError::from_sql_lowering_error)
623                });
624                let delete = lowered?;
625                let returning = delete.returning().cloned();
626                let query = delete.into_base_query();
627                let (bind_local_instructions, query) = measure_sql_stage(|| {
628                    Ok::<_, QueryError>(bind_lowered_sql_delete_query_structural(
629                        authority.model(),
630                        query,
631                        MissingRowPolicy::Ignore,
632                    ))
633                });
634                let query = query?;
635
636                Ok((
637                    CompiledSqlCommand::Delete {
638                        query: Arc::new(query),
639                        returning,
640                    },
641                    0,
642                    prepare_local_instructions,
643                    lower_local_instructions,
644                    bind_local_instructions,
645                ))
646            }
647            SqlStatement::Insert(_) => {
648                let (prepare_local_instructions, prepared) = prepare_statement();
649                let prepared = prepared?;
650                let statement = extract_prepared_sql_insert_statement(prepared)
651                    .map_err(QueryError::from_sql_lowering_error)?;
652
653                Ok((
654                    CompiledSqlCommand::Insert(statement),
655                    0,
656                    prepare_local_instructions,
657                    0,
658                    0,
659                ))
660            }
661            SqlStatement::Update(_) => {
662                let (prepare_local_instructions, prepared) = prepare_statement();
663                let prepared = prepared?;
664                let statement = extract_prepared_sql_update_statement(prepared)
665                    .map_err(QueryError::from_sql_lowering_error)?;
666
667                Ok((
668                    CompiledSqlCommand::Update(statement),
669                    0,
670                    prepare_local_instructions,
671                    0,
672                    0,
673                ))
674            }
675            SqlStatement::Explain(_) => {
676                let (prepare_local_instructions, prepared) = prepare_statement();
677                let prepared = prepared?;
678                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
679                    lower_sql_command_from_prepared_statement(prepared, authority.model())
680                        .map_err(QueryError::from_sql_lowering_error)
681                });
682                let lowered = lowered?;
683
684                Ok((
685                    CompiledSqlCommand::Explain(Box::new(lowered)),
686                    0,
687                    prepare_local_instructions,
688                    lower_local_instructions,
689                    0,
690                ))
691            }
692            SqlStatement::Describe(_) => {
693                let (prepare_local_instructions, validated) = measure_sql_stage(|| {
694                    let SqlStatement::Describe(statement) = statement else {
695                        return Err(QueryError::invariant(
696                            "compiled SQL DESCRIBE lane must preserve DESCRIBE statement ownership",
697                        ));
698                    };
699
700                    validate_metadata_entity(statement.entity.as_str())
701                });
702                validated?;
703
704                Ok((
705                    CompiledSqlCommand::DescribeEntity,
706                    0,
707                    prepare_local_instructions,
708                    0,
709                    0,
710                ))
711            }
712            SqlStatement::ShowIndexes(entity) => {
713                let (prepare_local_instructions, validated) =
714                    measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
715                validated?;
716
717                Ok((
718                    CompiledSqlCommand::ShowIndexesEntity,
719                    0,
720                    prepare_local_instructions,
721                    0,
722                    0,
723                ))
724            }
725            SqlStatement::ShowColumns(entity) => {
726                let (prepare_local_instructions, validated) =
727                    measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
728                validated?;
729
730                Ok((
731                    CompiledSqlCommand::ShowColumnsEntity,
732                    0,
733                    prepare_local_instructions,
734                    0,
735                    0,
736                ))
737            }
738            SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
739        }
740    }
741
742    // Resolve one SQL SELECT entirely through the shared lower query-plan
743    // cache and derive only the outward SQL projection contract locally.
744    fn sql_select_prepared_plan(
745        &self,
746        query: &StructuralQuery,
747        authority: EntityAuthority,
748        cache_schema_fingerprint: CommitSchemaFingerprint,
749    ) -> Result<
750        (
751            SharedPreparedExecutionPlan,
752            SqlProjectionContract,
753            SqlCacheAttribution,
754        ),
755        QueryError,
756    > {
757        let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
758            authority,
759            cache_schema_fingerprint,
760            query,
761        )?;
762        let projection_spec = prepared_plan
763            .logical_plan()
764            .projection_spec(authority.model());
765        let projection = SqlProjectionContract::new(
766            projection_labels_from_projection_spec(&projection_spec),
767            projection_fixed_scales_from_projection_spec(&projection_spec),
768        );
769
770        Ok((
771            prepared_plan,
772            projection,
773            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
774        ))
775    }
776
777    // Keep query/update surface gating owned by one helper so the SQL
778    // compiled-command lane does not duplicate the same statement-family split
779    // just to change the outward error wording.
780    fn ensure_sql_statement_supported_for_surface(
781        statement: &SqlStatement,
782        surface: SqlCompiledCommandSurface,
783    ) -> Result<(), QueryError> {
784        match (surface, statement) {
785            (
786                SqlCompiledCommandSurface::Query,
787                SqlStatement::Select(_)
788                | SqlStatement::Explain(_)
789                | SqlStatement::Describe(_)
790                | SqlStatement::ShowIndexes(_)
791                | SqlStatement::ShowColumns(_)
792                | SqlStatement::ShowEntities(_),
793            )
794            | (
795                SqlCompiledCommandSurface::Update,
796                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
797            ) => Ok(()),
798            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
799                Err(QueryError::unsupported_query(
800                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
801                ))
802            }
803            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
804                Err(QueryError::unsupported_query(
805                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
806                ))
807            }
808            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
809                Err(QueryError::unsupported_query(
810                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
811                ))
812            }
813            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
814                Err(QueryError::unsupported_query(
815                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
816                ))
817            }
818            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
819                Err(QueryError::unsupported_query(
820                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
821                ))
822            }
823            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
824                Err(QueryError::unsupported_query(
825                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
826                ))
827            }
828            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
829                Err(QueryError::unsupported_query(
830                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
831                ))
832            }
833            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
834                Err(QueryError::unsupported_query(
835                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
836                ))
837            }
838            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
839                Err(QueryError::unsupported_query(
840                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
841                ))
842            }
843        }
844    }
845
846    /// Execute one single-entity reduced SQL query or introspection statement.
847    ///
848    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
849    /// returns SQL-shaped statement output instead of typed entities.
850    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
851    where
852        E: PersistedRow<Canister = C> + EntityValue,
853    {
854        let compiled = self.compile_sql_query::<E>(sql)?;
855
856        self.execute_compiled_sql::<E>(&compiled)
857    }
858
859    /// Execute one reduced SQL query while reporting the compile/execute split
860    /// at the top-level SQL seam.
861    #[cfg(feature = "diagnostics")]
862    #[doc(hidden)]
863    pub fn execute_sql_query_with_attribution<E>(
864        &self,
865        sql: &str,
866    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
867    where
868        E: PersistedRow<Canister = C> + EntityValue,
869    {
870        // Phase 1: measure the compile side of the new seam, including parse,
871        // surface validation, and semantic command construction.
872        let (compile_local_instructions, compiled) =
873            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
874        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
875
876        // Phase 2: measure the execute side separately so repeat-run cache
877        // experiments can prove which side actually moved.
878        let store_get_calls_before = DataStore::current_get_call_count();
879        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
880        let pure_covering_row_assembly_before =
881            current_pure_covering_row_assembly_local_instructions();
882        let (result, execute_cache_attribution, execute_phase_attribution) =
883            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
884        let store_get_calls =
885            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
886        let pure_covering_decode_local_instructions =
887            current_pure_covering_decode_local_instructions()
888                .saturating_sub(pure_covering_decode_before);
889        let pure_covering_row_assembly_local_instructions =
890            current_pure_covering_row_assembly_local_instructions()
891                .saturating_sub(pure_covering_row_assembly_before);
892        let execute_local_instructions = execute_phase_attribution
893            .planner_local_instructions
894            .saturating_add(execute_phase_attribution.store_local_instructions)
895            .saturating_add(execute_phase_attribution.executor_local_instructions);
896        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
897        let total_local_instructions =
898            compile_local_instructions.saturating_add(execute_local_instructions);
899
900        Ok((
901            result,
902            SqlQueryExecutionAttribution {
903                compile_local_instructions,
904                compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
905                compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
906                compile_parse_local_instructions: compile_phase_attribution.parse,
907                compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
908                compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
909                compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
910                compile_parse_predicate_local_instructions: compile_phase_attribution
911                    .parse_predicate,
912                compile_aggregate_lane_check_local_instructions: compile_phase_attribution
913                    .aggregate_lane_check,
914                compile_prepare_local_instructions: compile_phase_attribution.prepare,
915                compile_lower_local_instructions: compile_phase_attribution.lower,
916                compile_bind_local_instructions: compile_phase_attribution.bind,
917                compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
918                planner_local_instructions: execute_phase_attribution.planner_local_instructions,
919                store_local_instructions: execute_phase_attribution.store_local_instructions,
920                executor_local_instructions: execute_phase_attribution.executor_local_instructions,
921                grouped_stream_local_instructions: execute_phase_attribution
922                    .grouped_stream_local_instructions,
923                grouped_fold_local_instructions: execute_phase_attribution
924                    .grouped_fold_local_instructions,
925                grouped_finalize_local_instructions: execute_phase_attribution
926                    .grouped_finalize_local_instructions,
927                grouped_count_borrowed_hash_computations: execute_phase_attribution
928                    .grouped_count
929                    .borrowed_hash_computations,
930                grouped_count_bucket_candidate_checks: execute_phase_attribution
931                    .grouped_count
932                    .bucket_candidate_checks,
933                grouped_count_existing_group_hits: execute_phase_attribution
934                    .grouped_count
935                    .existing_group_hits,
936                grouped_count_new_group_inserts: execute_phase_attribution
937                    .grouped_count
938                    .new_group_inserts,
939                grouped_count_row_materialization_local_instructions: execute_phase_attribution
940                    .grouped_count
941                    .row_materialization_local_instructions,
942                grouped_count_group_lookup_local_instructions: execute_phase_attribution
943                    .grouped_count
944                    .group_lookup_local_instructions,
945                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
946                    .grouped_count
947                    .existing_group_update_local_instructions,
948                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
949                    .grouped_count
950                    .new_group_insert_local_instructions,
951                pure_covering_decode_local_instructions,
952                pure_covering_row_assembly_local_instructions,
953                store_get_calls,
954                response_decode_local_instructions: 0,
955                execute_local_instructions,
956                total_local_instructions,
957                sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
958                sql_compiled_command_cache_misses: cache_attribution
959                    .sql_compiled_command_cache_misses,
960                shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
961                shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
962            },
963        ))
964    }
965
966    /// Execute one single-entity reduced SQL mutation statement.
967    ///
968    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
969    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
970    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
971    where
972        E: PersistedRow<Canister = C> + EntityValue,
973    {
974        let compiled = self.compile_sql_update::<E>(sql)?;
975
976        self.execute_compiled_sql::<E>(&compiled)
977    }
978
979    // Compile one SQL query-surface string into the session-owned generic-free
980    // semantic command artifact before execution.
981    pub(in crate::db) fn compile_sql_query<E>(
982        &self,
983        sql: &str,
984    ) -> Result<CompiledSqlCommand, QueryError>
985    where
986        E: PersistedRow<Canister = C> + EntityValue,
987    {
988        self.compile_sql_query_with_cache_attribution::<E>(sql)
989            .map(|(compiled, _, _)| compiled)
990    }
991
992    fn compile_sql_query_with_cache_attribution<E>(
993        &self,
994        sql: &str,
995    ) -> Result<
996        (
997            CompiledSqlCommand,
998            SqlCacheAttribution,
999            SqlCompilePhaseAttribution,
1000        ),
1001        QueryError,
1002    >
1003    where
1004        E: PersistedRow<Canister = C> + EntityValue,
1005    {
1006        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1007    }
1008
1009    // Compile one SQL update-surface string into the session-owned generic-free
1010    // semantic command artifact before execution.
1011    pub(in crate::db) fn compile_sql_update<E>(
1012        &self,
1013        sql: &str,
1014    ) -> Result<CompiledSqlCommand, QueryError>
1015    where
1016        E: PersistedRow<Canister = C> + EntityValue,
1017    {
1018        self.compile_sql_update_with_cache_attribution::<E>(sql)
1019            .map(|(compiled, _, _)| compiled)
1020    }
1021
1022    fn compile_sql_update_with_cache_attribution<E>(
1023        &self,
1024        sql: &str,
1025    ) -> Result<
1026        (
1027            CompiledSqlCommand,
1028            SqlCacheAttribution,
1029            SqlCompilePhaseAttribution,
1030        ),
1031        QueryError,
1032    >
1033    where
1034        E: PersistedRow<Canister = C> + EntityValue,
1035    {
1036        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1037    }
1038
1039    // Reuse one internal compile shell for both outward SQL surfaces so query
1040    // and update no longer duplicate cache-key construction and surface
1041    // validation plumbing before they reach the real compile/cache owner.
1042    fn compile_sql_surface_with_cache_attribution<E>(
1043        &self,
1044        sql: &str,
1045        surface: SqlCompiledCommandSurface,
1046    ) -> Result<
1047        (
1048            CompiledSqlCommand,
1049            SqlCacheAttribution,
1050            SqlCompilePhaseAttribution,
1051        ),
1052        QueryError,
1053    >
1054    where
1055        E: PersistedRow<Canister = C> + EntityValue,
1056    {
1057        let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1058            Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1059        });
1060        let cache_key = cache_key?;
1061
1062        self.compile_sql_statement_with_cache::<E, _>(
1063            cache_key,
1064            cache_key_local_instructions,
1065            sql,
1066            |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1067        )
1068    }
1069
1070    // Reuse one previously compiled SQL artifact when the session-local cache
1071    // can prove the surface, entity contract, and raw SQL text all match.
1072    fn compile_sql_statement_with_cache<E, F>(
1073        &self,
1074        cache_key: SqlCompiledCommandCacheKey,
1075        cache_key_local_instructions: u64,
1076        sql: &str,
1077        ensure_surface_supported: F,
1078    ) -> Result<
1079        (
1080            CompiledSqlCommand,
1081            SqlCacheAttribution,
1082            SqlCompilePhaseAttribution,
1083        ),
1084        QueryError,
1085    >
1086    where
1087        E: PersistedRow<Canister = C> + EntityValue,
1088        F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1089    {
1090        let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1091            let cached =
1092                self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1093            Ok::<_, QueryError>(cached)
1094        });
1095        let cached = cached?;
1096        if let Some(compiled) = cached {
1097            return Ok((
1098                compiled,
1099                SqlCacheAttribution::sql_compiled_command_cache_hit(),
1100                SqlCompilePhaseAttribution::cache_hit(
1101                    cache_key_local_instructions,
1102                    cache_lookup_local_instructions,
1103                ),
1104            ));
1105        }
1106
1107        let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1108            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1109        });
1110        let (parsed, parse_attribution) = parsed?;
1111        let parse_select_local_instructions = parse_local_instructions
1112            .saturating_sub(parse_attribution.tokenize)
1113            .saturating_sub(parse_attribution.expr)
1114            .saturating_sub(parse_attribution.predicate);
1115        ensure_surface_supported(&parsed)?;
1116        let authority = EntityAuthority::for_type::<E>();
1117        let (
1118            compiled,
1119            aggregate_lane_check_local_instructions,
1120            prepare_local_instructions,
1121            lower_local_instructions,
1122            bind_local_instructions,
1123        ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1124
1125        let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1126            self.with_sql_compiled_command_cache(|cache| {
1127                cache.insert(cache_key, compiled.clone());
1128            });
1129            Ok::<_, QueryError>(())
1130        });
1131        cache_insert?;
1132
1133        Ok((
1134            compiled,
1135            SqlCacheAttribution::sql_compiled_command_cache_miss(),
1136            SqlCompilePhaseAttribution {
1137                cache_key: cache_key_local_instructions,
1138                cache_lookup: cache_lookup_local_instructions,
1139                parse: parse_local_instructions,
1140                parse_tokenize: parse_attribution.tokenize,
1141                parse_select: parse_select_local_instructions,
1142                parse_expr: parse_attribution.expr,
1143                parse_predicate: parse_attribution.predicate,
1144                aggregate_lane_check: aggregate_lane_check_local_instructions,
1145                prepare: prepare_local_instructions,
1146                lower: lower_local_instructions,
1147                bind: bind_local_instructions,
1148                cache_insert: cache_insert_local_instructions,
1149            },
1150        ))
1151    }
1152}