Skip to main content

icydb_core/db/session/sql/
mod.rs

1//! Module: db::session::sql
2//! Responsibility: session-owned SQL execution, explain, projection, and
3//! surface-classification helpers above lowered SQL commands.
4//! Does not own: SQL parsing or structural executor runtime behavior.
5//! Boundary: keeps session visibility, authority selection, and SQL surface routing in one subsystem.
6
7mod execute;
8mod projection;
9
10#[cfg(feature = "diagnostics")]
11use candid::CandidType;
12#[cfg(feature = "diagnostics")]
13use serde::Deserialize;
14use std::{cell::RefCell, collections::HashMap, sync::Arc};
15
16// Bump these when SQL cache-key meaning changes in a way that must force
17// existing in-heap entries to miss instead of aliasing old semantics.
18// This cache deliberately stays on syntax-bound SQL statement identity for the
19// front-end prepared/template lane. Grouped semantic canonicalization and
20// grouped structural/cache identity do not flow into this key.
21const SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION: u8 = 1;
22
23#[cfg(feature = "diagnostics")]
24use crate::db::DataStore;
25#[cfg(feature = "diagnostics")]
26use crate::db::executor::GroupedCountAttribution;
27#[cfg(feature = "diagnostics")]
28use crate::db::session::sql::projection::{
29    current_pure_covering_decode_local_instructions,
30    current_pure_covering_row_assembly_local_instructions,
31};
32#[cfg(test)]
33use crate::db::sql::parser::parse_sql;
34use crate::db::sql::parser::{SqlDeleteStatement, SqlInsertStatement, SqlUpdateStatement};
35use crate::{
36    db::{
37        DbSession, GroupedRow, MissingRowPolicy, PersistedRow, QueryError,
38        commit::CommitSchemaFingerprint,
39        executor::{EntityAuthority, SharedPreparedExecutionPlan},
40        query::intent::StructuralQuery,
41        schema::commit_schema_fingerprint_for_entity,
42        session::query::QueryPlanCacheAttribution,
43        session::sql::projection::{
44            projection_fixed_scales_from_projection_spec, projection_labels_from_projection_spec,
45        },
46        sql::identifier::identifiers_tail_match,
47        sql::lowering::{
48            LoweredBaseQueryShape, LoweredSqlCommand, LoweredSqlQuery,
49            SqlGlobalAggregateCommandCore, SqlLoweringError,
50            bind_lowered_sql_select_query_structural,
51            compile_sql_global_aggregate_command_core_from_prepared,
52            is_sql_global_aggregate_statement, lower_sql_command_from_prepared_statement,
53            prepare_sql_statement,
54        },
55        sql::parser::{SqlStatement, parse_sql_with_attribution},
56    },
57    traits::{CanisterKind, EntityValue},
58};
59
60#[cfg(all(test, not(feature = "diagnostics")))]
61pub(crate) use crate::db::session::sql::projection::with_sql_projection_materialization_metrics;
62#[cfg(feature = "diagnostics")]
63pub use crate::db::session::sql::projection::{
64    SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics,
65};
66
67/// Unified SQL statement payload returned by shared SQL lane execution.
68#[derive(Debug)]
69pub enum SqlStatementResult {
70    Count {
71        row_count: u32,
72    },
73    Projection {
74        columns: Vec<String>,
75        fixed_scales: Vec<Option<u32>>,
76        rows: Vec<Vec<crate::value::Value>>,
77        row_count: u32,
78    },
79    ProjectionText {
80        columns: Vec<String>,
81        rows: Vec<Vec<String>>,
82        row_count: u32,
83    },
84    Grouped {
85        columns: Vec<String>,
86        fixed_scales: Vec<Option<u32>>,
87        rows: Vec<GroupedRow>,
88        row_count: u32,
89        next_cursor: Option<String>,
90    },
91    Explain(String),
92    Describe(crate::db::EntitySchemaDescription),
93    ShowIndexes(Vec<String>),
94    ShowColumns(Vec<crate::db::EntityFieldDescription>),
95    ShowEntities(Vec<String>),
96}
97
98///
99/// SqlQueryExecutionAttribution
100///
101/// SqlQueryExecutionAttribution records the top-level reduced SQL query cost
102/// split at the new compile/execute seam.
103/// This keeps future cache validation focused on one concrete question:
104/// whether repeated queries stop paying compile cost while execute cost stays
105/// otherwise comparable.
106///
107
108#[cfg(feature = "diagnostics")]
109#[derive(CandidType, Clone, Debug, Deserialize, Eq, PartialEq)]
110pub struct SqlQueryExecutionAttribution {
111    pub compile_local_instructions: u64,
112    pub compile_cache_key_local_instructions: u64,
113    pub compile_cache_lookup_local_instructions: u64,
114    pub compile_parse_local_instructions: u64,
115    pub compile_parse_tokenize_local_instructions: u64,
116    pub compile_parse_select_local_instructions: u64,
117    pub compile_parse_expr_local_instructions: u64,
118    pub compile_parse_predicate_local_instructions: u64,
119    pub compile_aggregate_lane_check_local_instructions: u64,
120    pub compile_prepare_local_instructions: u64,
121    pub compile_lower_local_instructions: u64,
122    pub compile_bind_local_instructions: u64,
123    pub compile_cache_insert_local_instructions: u64,
124    pub planner_local_instructions: u64,
125    pub store_local_instructions: u64,
126    pub executor_local_instructions: u64,
127    pub grouped_stream_local_instructions: u64,
128    pub grouped_fold_local_instructions: u64,
129    pub grouped_finalize_local_instructions: u64,
130    pub grouped_count_borrowed_hash_computations: u64,
131    pub grouped_count_bucket_candidate_checks: u64,
132    pub grouped_count_existing_group_hits: u64,
133    pub grouped_count_new_group_inserts: u64,
134    pub grouped_count_row_materialization_local_instructions: u64,
135    pub grouped_count_group_lookup_local_instructions: u64,
136    pub grouped_count_existing_group_update_local_instructions: u64,
137    pub grouped_count_new_group_insert_local_instructions: u64,
138    pub pure_covering_decode_local_instructions: u64,
139    pub pure_covering_row_assembly_local_instructions: u64,
140    pub store_get_calls: u64,
141    pub response_decode_local_instructions: u64,
142    pub execute_local_instructions: u64,
143    pub total_local_instructions: u64,
144    pub sql_compiled_command_cache_hits: u64,
145    pub sql_compiled_command_cache_misses: u64,
146    pub shared_query_plan_cache_hits: u64,
147    pub shared_query_plan_cache_misses: u64,
148}
149
150// SqlExecutePhaseAttribution keeps the execute side split into select-plan
151// work, physical store/index access, and narrower runtime execution so shell
152// tooling can show all three.
153#[cfg(feature = "diagnostics")]
154#[derive(Clone, Copy, Debug, Eq, PartialEq)]
155pub(in crate::db) struct SqlExecutePhaseAttribution {
156    pub planner_local_instructions: u64,
157    pub store_local_instructions: u64,
158    pub executor_local_instructions: u64,
159    pub grouped_stream_local_instructions: u64,
160    pub grouped_fold_local_instructions: u64,
161    pub grouped_finalize_local_instructions: u64,
162    pub grouped_count: GroupedCountAttribution,
163}
164
165///
166/// SqlCompilePhaseAttribution
167///
168/// SqlCompilePhaseAttribution keeps the SQL-front-end compile miss path split
169/// into the concrete stages that still exist after the shared lower-cache
170/// collapse.
171/// This lets perf audits distinguish cache lookup, parsing, prepared-statement
172/// normalization, lowered-command construction, structural binding, and cache
173/// insertion cost instead of treating compile as one opaque bucket.
174///
175
176#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
177pub(in crate::db) struct SqlCompilePhaseAttribution {
178    pub cache_key: u64,
179    pub cache_lookup: u64,
180    pub parse: u64,
181    pub parse_tokenize: u64,
182    pub parse_select: u64,
183    pub parse_expr: u64,
184    pub parse_predicate: u64,
185    pub aggregate_lane_check: u64,
186    pub prepare: u64,
187    pub lower: u64,
188    pub bind: u64,
189    pub cache_insert: u64,
190}
191
192impl SqlCompilePhaseAttribution {
193    #[must_use]
194    const fn cache_hit(cache_key: u64, cache_lookup: u64) -> Self {
195        Self {
196            cache_key,
197            cache_lookup,
198            parse: 0,
199            parse_tokenize: 0,
200            parse_select: 0,
201            parse_expr: 0,
202            parse_predicate: 0,
203            aggregate_lane_check: 0,
204            prepare: 0,
205            lower: 0,
206            bind: 0,
207            cache_insert: 0,
208        }
209    }
210}
211
212#[cfg(feature = "diagnostics")]
213impl SqlExecutePhaseAttribution {
214    #[must_use]
215    pub(in crate::db) const fn from_execute_total_and_store_total(
216        execute_local_instructions: u64,
217        store_local_instructions: u64,
218    ) -> Self {
219        Self {
220            planner_local_instructions: 0,
221            store_local_instructions,
222            executor_local_instructions: execute_local_instructions
223                .saturating_sub(store_local_instructions),
224            grouped_stream_local_instructions: 0,
225            grouped_fold_local_instructions: 0,
226            grouped_finalize_local_instructions: 0,
227            grouped_count: GroupedCountAttribution::none(),
228        }
229    }
230}
231
232// SqlCacheAttribution keeps the surviving SQL-front-end compile cache separate
233// from the shared lower query-plan cache so perf audits can tell which
234// boundary actually produced reuse on one query path.
235// The SQL compiled-command / prepared-template cache is syntax-bound; the
236// shared lower query-plan cache is where canonical semantic identity applies.
237#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
238pub(in crate::db) struct SqlCacheAttribution {
239    pub sql_compiled_command_cache_hits: u64,
240    pub sql_compiled_command_cache_misses: u64,
241    pub shared_query_plan_cache_hits: u64,
242    pub shared_query_plan_cache_misses: u64,
243}
244
245impl SqlCacheAttribution {
246    #[must_use]
247    const fn none() -> Self {
248        Self {
249            sql_compiled_command_cache_hits: 0,
250            sql_compiled_command_cache_misses: 0,
251            shared_query_plan_cache_hits: 0,
252            shared_query_plan_cache_misses: 0,
253        }
254    }
255
256    #[must_use]
257    const fn sql_compiled_command_cache_hit() -> Self {
258        Self {
259            sql_compiled_command_cache_hits: 1,
260            ..Self::none()
261        }
262    }
263
264    #[must_use]
265    const fn sql_compiled_command_cache_miss() -> Self {
266        Self {
267            sql_compiled_command_cache_misses: 1,
268            ..Self::none()
269        }
270    }
271
272    #[must_use]
273    const fn from_shared_query_plan_cache(attribution: QueryPlanCacheAttribution) -> Self {
274        Self {
275            shared_query_plan_cache_hits: attribution.hits,
276            shared_query_plan_cache_misses: attribution.misses,
277            ..Self::none()
278        }
279    }
280
281    #[must_use]
282    const fn merge(self, other: Self) -> Self {
283        Self {
284            sql_compiled_command_cache_hits: self
285                .sql_compiled_command_cache_hits
286                .saturating_add(other.sql_compiled_command_cache_hits),
287            sql_compiled_command_cache_misses: self
288                .sql_compiled_command_cache_misses
289                .saturating_add(other.sql_compiled_command_cache_misses),
290            shared_query_plan_cache_hits: self
291                .shared_query_plan_cache_hits
292                .saturating_add(other.shared_query_plan_cache_hits),
293            shared_query_plan_cache_misses: self
294                .shared_query_plan_cache_misses
295                .saturating_add(other.shared_query_plan_cache_misses),
296        }
297    }
298}
299
300#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
301enum SqlCompiledCommandSurface {
302    Query,
303    Update,
304}
305
306///
307/// SqlCompiledCommandCacheKey
308///
309/// SqlCompiledCommandCacheKey pins one compiled SQL artifact to the exact
310/// session-local semantic boundary that produced it.
311/// The key is intentionally conservative: surface kind, entity path, schema
312/// fingerprint, and raw SQL text must all match before execution can reuse a
313/// prior compile result.
314///
315
316#[derive(Clone, Debug, Eq, Hash, PartialEq)]
317pub(in crate::db) struct SqlCompiledCommandCacheKey {
318    cache_method_version: u8,
319    surface: SqlCompiledCommandSurface,
320    entity_path: &'static str,
321    schema_fingerprint: CommitSchemaFingerprint,
322    sql: String,
323}
324
325///
326/// SqlProjectionContract
327///
328/// SqlProjectionContract is the outward SQL projection contract
329/// derived from one shared lower prepared plan.
330/// SQL execution keeps this wrapper so statement shaping stays owner-local
331/// while all prepared-plan reuse lives entirely below the SQL boundary.
332///
333
334#[derive(Clone, Debug)]
335pub(in crate::db) struct SqlProjectionContract {
336    columns: Vec<String>,
337    fixed_scales: Vec<Option<u32>>,
338}
339
340impl SqlProjectionContract {
341    #[must_use]
342    pub(in crate::db) const fn new(columns: Vec<String>, fixed_scales: Vec<Option<u32>>) -> Self {
343        Self {
344            columns,
345            fixed_scales,
346        }
347    }
348
349    #[must_use]
350    pub(in crate::db) fn into_parts(self) -> (Vec<String>, Vec<Option<u32>>) {
351        (self.columns, self.fixed_scales)
352    }
353}
354
355impl SqlCompiledCommandCacheKey {
356    fn for_entity<E>(surface: SqlCompiledCommandSurface, sql: &str) -> Self
357    where
358        E: PersistedRow + EntityValue,
359    {
360        Self {
361            cache_method_version: SQL_COMPILED_COMMAND_CACHE_METHOD_VERSION,
362            surface,
363            entity_path: E::PATH,
364            schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
365            sql: sql.to_string(),
366        }
367    }
368
369    #[must_use]
370    pub(in crate::db) const fn schema_fingerprint(&self) -> CommitSchemaFingerprint {
371        self.schema_fingerprint
372    }
373}
374
375#[cfg(test)]
376impl SqlCompiledCommandCacheKey {
377    pub(in crate::db) fn query_for_entity_with_method_version<E>(
378        sql: &str,
379        cache_method_version: u8,
380    ) -> Self
381    where
382        E: PersistedRow + EntityValue,
383    {
384        Self::for_entity_with_method_version::<E>(
385            SqlCompiledCommandSurface::Query,
386            sql,
387            cache_method_version,
388        )
389    }
390
391    pub(in crate::db) fn update_for_entity_with_method_version<E>(
392        sql: &str,
393        cache_method_version: u8,
394    ) -> Self
395    where
396        E: PersistedRow + EntityValue,
397    {
398        Self::for_entity_with_method_version::<E>(
399            SqlCompiledCommandSurface::Update,
400            sql,
401            cache_method_version,
402        )
403    }
404
405    fn for_entity_with_method_version<E>(
406        surface: SqlCompiledCommandSurface,
407        sql: &str,
408        cache_method_version: u8,
409    ) -> Self
410    where
411        E: PersistedRow + EntityValue,
412    {
413        Self {
414            cache_method_version,
415            surface,
416            entity_path: E::PATH,
417            schema_fingerprint: commit_schema_fingerprint_for_entity::<E>(),
418            sql: sql.to_string(),
419        }
420    }
421}
422
423pub(in crate::db) type SqlCompiledCommandCache =
424    HashMap<SqlCompiledCommandCacheKey, CompiledSqlCommand>;
425
426thread_local! {
427    // Keep SQL-facing caches in canister-lifetime heap state keyed by the
428    // store registry identity so update calls can warm query-facing SQL reuse
429    // without leaking entries across unrelated registries in tests.
430    static SQL_COMPILED_COMMAND_CACHES: RefCell<HashMap<usize, SqlCompiledCommandCache>> =
431        RefCell::new(HashMap::default());
432}
433
434// Keep the compile artifact session-owned and generic-free so the SQL surface
435// can separate semantic compilation from execution without coupling the seam to
436// typed entity binding or executor scratch state.
437#[derive(Clone, Debug)]
438pub(in crate::db) enum CompiledSqlCommand {
439    Select {
440        query: Arc<StructuralQuery>,
441        compiled_cache_key: SqlCompiledCommandCacheKey,
442    },
443    Delete {
444        query: LoweredBaseQueryShape,
445        statement: SqlDeleteStatement,
446    },
447    GlobalAggregate {
448        command: Box<SqlGlobalAggregateCommandCore>,
449    },
450    Explain(LoweredSqlCommand),
451    Insert(SqlInsertStatement),
452    Update(SqlUpdateStatement),
453    DescribeEntity,
454    ShowIndexesEntity,
455    ShowColumnsEntity,
456    ShowEntities,
457}
458
459// Keep parsing as a module-owned helper instead of hanging a pure parser off
460// `DbSession` as a fake session method.
461#[cfg(test)]
462pub(in crate::db) fn parse_sql_statement(sql: &str) -> Result<SqlStatement, QueryError> {
463    parse_sql(sql).map_err(QueryError::from_sql_parse_error)
464}
465
466#[cfg(feature = "diagnostics")]
467#[expect(
468    clippy::missing_const_for_fn,
469    reason = "the wasm32 branch reads the runtime performance counter and cannot be const"
470)]
471fn read_sql_local_instruction_counter() -> u64 {
472    #[cfg(all(feature = "diagnostics", target_arch = "wasm32"))]
473    {
474        canic_cdk::api::performance_counter(1)
475    }
476
477    #[cfg(not(all(feature = "diagnostics", target_arch = "wasm32")))]
478    {
479        0
480    }
481}
482
483pub(in crate::db::session::sql) fn measure_sql_stage<T, E>(
484    run: impl FnOnce() -> Result<T, E>,
485) -> (u64, Result<T, E>) {
486    #[cfg(feature = "diagnostics")]
487    let start = read_sql_local_instruction_counter();
488
489    let result = run();
490
491    #[cfg(feature = "diagnostics")]
492    let delta = read_sql_local_instruction_counter().saturating_sub(start);
493
494    #[cfg(not(feature = "diagnostics"))]
495    let delta = 0;
496
497    (delta, result)
498}
499
500impl<C: CanisterKind> DbSession<C> {
501    fn with_sql_compiled_command_cache<R>(
502        &self,
503        f: impl FnOnce(&mut SqlCompiledCommandCache) -> R,
504    ) -> R {
505        let scope_id = self.db.cache_scope_id();
506
507        SQL_COMPILED_COMMAND_CACHES.with(|caches| {
508            let mut caches = caches.borrow_mut();
509            let cache = caches.entry(scope_id).or_default();
510
511            f(cache)
512        })
513    }
514
515    #[cfg(test)]
516    pub(in crate::db) fn sql_compiled_command_cache_len(&self) -> usize {
517        self.with_sql_compiled_command_cache(|cache| cache.len())
518    }
519
520    #[cfg(test)]
521    pub(in crate::db) fn clear_sql_caches_for_tests(&self) {
522        self.with_sql_compiled_command_cache(SqlCompiledCommandCache::clear);
523    }
524
525    // Compile one parsed SQL statement into the generic-free session-owned
526    // semantic command artifact for one resolved authority.
527    #[expect(clippy::too_many_lines)]
528    fn compile_sql_statement_for_authority(
529        statement: &SqlStatement,
530        authority: EntityAuthority,
531        compiled_cache_key: SqlCompiledCommandCacheKey,
532    ) -> Result<(CompiledSqlCommand, u64, u64, u64, u64), QueryError> {
533        // Reuse one local preparation closure so the session compile surface
534        // reaches the prepared-statement owner directly without another
535        // single-purpose module hop.
536        let prepare_statement = || {
537            measure_sql_stage(|| {
538                prepare_sql_statement(statement.clone(), authority.model().name())
539                    .map_err(QueryError::from_sql_lowering_error)
540            })
541        };
542
543        // Keep metadata-only entity checks local to the compile lane because
544        // they are part of statement admission, not a separate boundary.
545        let validate_metadata_entity = |sql_entity: &str| {
546            if identifiers_tail_match(sql_entity, authority.model().name()) {
547                return Ok(());
548            }
549
550            Err(QueryError::from_sql_lowering_error(
551                SqlLoweringError::EntityMismatch {
552                    sql_entity: sql_entity.to_string(),
553                    expected_entity: authority.model().name(),
554                },
555            ))
556        };
557
558        match statement {
559            SqlStatement::Select(_) => {
560                let (prepare_local_instructions, prepared) = prepare_statement();
561                let prepared = prepared?;
562                let (aggregate_lane_check_local_instructions, requires_aggregate_lane) =
563                    measure_sql_stage(|| {
564                        Ok::<_, QueryError>(is_sql_global_aggregate_statement(prepared.statement()))
565                    });
566                let requires_aggregate_lane = requires_aggregate_lane?;
567
568                if requires_aggregate_lane {
569                    let (lower_local_instructions, command) = measure_sql_stage(|| {
570                        compile_sql_global_aggregate_command_core_from_prepared(
571                            prepared,
572                            authority.model(),
573                            MissingRowPolicy::Ignore,
574                        )
575                        .map_err(QueryError::from_sql_lowering_error)
576                    });
577                    let command = command?;
578
579                    Ok((
580                        CompiledSqlCommand::GlobalAggregate {
581                            command: Box::new(command),
582                        },
583                        aggregate_lane_check_local_instructions,
584                        prepare_local_instructions,
585                        lower_local_instructions,
586                        0,
587                    ))
588                } else {
589                    let (lower_local_instructions, lowered) = measure_sql_stage(|| {
590                        lower_sql_command_from_prepared_statement(prepared, authority.model()).map_err(
591                            |err| match err {
592                                SqlLoweringError::UnexpectedQueryLaneStatement => {
593                                    QueryError::invariant(
594                                        "query-lane SQL lowering reached a non query-compatible statement",
595                                    )
596                                }
597                                other => QueryError::from_sql_lowering_error(other),
598                            },
599                        )
600                    });
601                    let lowered = lowered?;
602                    let Some(LoweredSqlQuery::Select(select)) = lowered.into_query() else {
603                        return Err(QueryError::invariant(
604                            "compiled SQL SELECT lane must lower to lowered SQL SELECT",
605                        ));
606                    };
607                    let (bind_local_instructions, query) = measure_sql_stage(|| {
608                        bind_lowered_sql_select_query_structural(
609                            authority.model(),
610                            select,
611                            MissingRowPolicy::Ignore,
612                        )
613                        .map_err(QueryError::from_sql_lowering_error)
614                    });
615                    let query = query?;
616
617                    Ok((
618                        CompiledSqlCommand::Select {
619                            query: Arc::new(query),
620                            compiled_cache_key,
621                        },
622                        aggregate_lane_check_local_instructions,
623                        prepare_local_instructions,
624                        lower_local_instructions,
625                        bind_local_instructions,
626                    ))
627                }
628            }
629            SqlStatement::Delete(_) => {
630                let (prepare_local_instructions, prepared) = prepare_statement();
631                let prepared = prepared?;
632                let normalized_statement = prepared.clone().into_statement();
633                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
634                    lower_sql_command_from_prepared_statement(prepared, authority.model())
635                        .map_err(QueryError::from_sql_lowering_error)
636                });
637                let lowered = lowered?;
638                let Some(LoweredSqlQuery::Delete(query)) = lowered.into_query() else {
639                    return Err(QueryError::invariant(
640                        "compiled SQL DELETE lane must lower to lowered SQL DELETE",
641                    ));
642                };
643                let SqlStatement::Delete(statement) = normalized_statement else {
644                    return Err(QueryError::invariant(
645                        "prepared SQL DELETE compilation must preserve DELETE statement ownership",
646                    ));
647                };
648
649                Ok((
650                    CompiledSqlCommand::Delete { query, statement },
651                    0,
652                    prepare_local_instructions,
653                    lower_local_instructions,
654                    0,
655                ))
656            }
657            SqlStatement::Insert(_) => {
658                let (prepare_local_instructions, prepared) = prepare_statement();
659                let prepared = prepared?;
660                let SqlStatement::Insert(statement) = prepared.into_statement() else {
661                    return Err(QueryError::invariant(
662                        "prepared SQL INSERT compilation must preserve INSERT statement ownership",
663                    ));
664                };
665
666                Ok((
667                    CompiledSqlCommand::Insert(statement),
668                    0,
669                    prepare_local_instructions,
670                    0,
671                    0,
672                ))
673            }
674            SqlStatement::Update(_) => {
675                let (prepare_local_instructions, prepared) = prepare_statement();
676                let prepared = prepared?;
677                let SqlStatement::Update(statement) = prepared.into_statement() else {
678                    return Err(QueryError::invariant(
679                        "prepared SQL UPDATE compilation must preserve UPDATE statement ownership",
680                    ));
681                };
682
683                Ok((
684                    CompiledSqlCommand::Update(statement),
685                    0,
686                    prepare_local_instructions,
687                    0,
688                    0,
689                ))
690            }
691            SqlStatement::Explain(_) => {
692                let (prepare_local_instructions, prepared) = prepare_statement();
693                let prepared = prepared?;
694                let (lower_local_instructions, lowered) = measure_sql_stage(|| {
695                    lower_sql_command_from_prepared_statement(prepared, authority.model())
696                        .map_err(QueryError::from_sql_lowering_error)
697                });
698                let lowered = lowered?;
699
700                Ok((
701                    CompiledSqlCommand::Explain(lowered),
702                    0,
703                    prepare_local_instructions,
704                    lower_local_instructions,
705                    0,
706                ))
707            }
708            SqlStatement::Describe(_) => {
709                let (prepare_local_instructions, validated) = measure_sql_stage(|| {
710                    let SqlStatement::Describe(statement) = statement else {
711                        return Err(QueryError::invariant(
712                            "compiled SQL DESCRIBE lane must preserve DESCRIBE statement ownership",
713                        ));
714                    };
715
716                    validate_metadata_entity(statement.entity.as_str())
717                });
718                validated?;
719
720                Ok((
721                    CompiledSqlCommand::DescribeEntity,
722                    0,
723                    prepare_local_instructions,
724                    0,
725                    0,
726                ))
727            }
728            SqlStatement::ShowIndexes(entity) => {
729                let (prepare_local_instructions, validated) =
730                    measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
731                validated?;
732
733                Ok((
734                    CompiledSqlCommand::ShowIndexesEntity,
735                    0,
736                    prepare_local_instructions,
737                    0,
738                    0,
739                ))
740            }
741            SqlStatement::ShowColumns(entity) => {
742                let (prepare_local_instructions, validated) =
743                    measure_sql_stage(|| validate_metadata_entity(entity.entity.as_str()));
744                validated?;
745
746                Ok((
747                    CompiledSqlCommand::ShowColumnsEntity,
748                    0,
749                    prepare_local_instructions,
750                    0,
751                    0,
752                ))
753            }
754            SqlStatement::ShowEntities(_) => Ok((CompiledSqlCommand::ShowEntities, 0, 0, 0, 0)),
755        }
756    }
757
758    // Resolve one SQL SELECT entirely through the shared lower query-plan
759    // cache and derive only the outward SQL projection contract locally.
760    fn sql_select_prepared_plan(
761        &self,
762        query: &StructuralQuery,
763        authority: EntityAuthority,
764        cache_schema_fingerprint: CommitSchemaFingerprint,
765    ) -> Result<
766        (
767            SharedPreparedExecutionPlan,
768            SqlProjectionContract,
769            SqlCacheAttribution,
770        ),
771        QueryError,
772    > {
773        let (prepared_plan, cache_attribution) = self.cached_shared_query_plan_for_authority(
774            authority,
775            cache_schema_fingerprint,
776            query,
777        )?;
778        let projection_spec = prepared_plan
779            .logical_plan()
780            .projection_spec(authority.model());
781        let projection = SqlProjectionContract::new(
782            projection_labels_from_projection_spec(&projection_spec),
783            projection_fixed_scales_from_projection_spec(&projection_spec),
784        );
785
786        Ok((
787            prepared_plan,
788            projection,
789            SqlCacheAttribution::from_shared_query_plan_cache(cache_attribution),
790        ))
791    }
792
793    // Keep query/update surface gating owned by one helper so the SQL
794    // compiled-command lane does not duplicate the same statement-family split
795    // just to change the outward error wording.
796    fn ensure_sql_statement_supported_for_surface(
797        statement: &SqlStatement,
798        surface: SqlCompiledCommandSurface,
799    ) -> Result<(), QueryError> {
800        match (surface, statement) {
801            (
802                SqlCompiledCommandSurface::Query,
803                SqlStatement::Select(_)
804                | SqlStatement::Explain(_)
805                | SqlStatement::Describe(_)
806                | SqlStatement::ShowIndexes(_)
807                | SqlStatement::ShowColumns(_)
808                | SqlStatement::ShowEntities(_),
809            )
810            | (
811                SqlCompiledCommandSurface::Update,
812                SqlStatement::Insert(_) | SqlStatement::Update(_) | SqlStatement::Delete(_),
813            ) => Ok(()),
814            (SqlCompiledCommandSurface::Query, SqlStatement::Insert(_)) => {
815                Err(QueryError::unsupported_query(
816                    "execute_sql_query rejects INSERT; use execute_sql_update::<E>()",
817                ))
818            }
819            (SqlCompiledCommandSurface::Query, SqlStatement::Update(_)) => {
820                Err(QueryError::unsupported_query(
821                    "execute_sql_query rejects UPDATE; use execute_sql_update::<E>()",
822                ))
823            }
824            (SqlCompiledCommandSurface::Query, SqlStatement::Delete(_)) => {
825                Err(QueryError::unsupported_query(
826                    "execute_sql_query rejects DELETE; use execute_sql_update::<E>()",
827                ))
828            }
829            (SqlCompiledCommandSurface::Update, SqlStatement::Select(_)) => {
830                Err(QueryError::unsupported_query(
831                    "execute_sql_update rejects SELECT; use execute_sql_query::<E>()",
832                ))
833            }
834            (SqlCompiledCommandSurface::Update, SqlStatement::Explain(_)) => {
835                Err(QueryError::unsupported_query(
836                    "execute_sql_update rejects EXPLAIN; use execute_sql_query::<E>()",
837                ))
838            }
839            (SqlCompiledCommandSurface::Update, SqlStatement::Describe(_)) => {
840                Err(QueryError::unsupported_query(
841                    "execute_sql_update rejects DESCRIBE; use execute_sql_query::<E>()",
842                ))
843            }
844            (SqlCompiledCommandSurface::Update, SqlStatement::ShowIndexes(_)) => {
845                Err(QueryError::unsupported_query(
846                    "execute_sql_update rejects SHOW INDEXES; use execute_sql_query::<E>()",
847                ))
848            }
849            (SqlCompiledCommandSurface::Update, SqlStatement::ShowColumns(_)) => {
850                Err(QueryError::unsupported_query(
851                    "execute_sql_update rejects SHOW COLUMNS; use execute_sql_query::<E>()",
852                ))
853            }
854            (SqlCompiledCommandSurface::Update, SqlStatement::ShowEntities(_)) => {
855                Err(QueryError::unsupported_query(
856                    "execute_sql_update rejects SHOW ENTITIES; use execute_sql_query::<E>()",
857                ))
858            }
859        }
860    }
861
862    /// Execute one single-entity reduced SQL query or introspection statement.
863    ///
864    /// This surface stays hard-bound to `E`, rejects state-changing SQL, and
865    /// returns SQL-shaped statement output instead of typed entities.
866    pub fn execute_sql_query<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
867    where
868        E: PersistedRow<Canister = C> + EntityValue,
869    {
870        let compiled = self.compile_sql_query::<E>(sql)?;
871
872        self.execute_compiled_sql::<E>(&compiled)
873    }
874
875    /// Execute one reduced SQL query while reporting the compile/execute split
876    /// at the top-level SQL seam.
877    #[cfg(feature = "diagnostics")]
878    #[doc(hidden)]
879    pub fn execute_sql_query_with_attribution<E>(
880        &self,
881        sql: &str,
882    ) -> Result<(SqlStatementResult, SqlQueryExecutionAttribution), QueryError>
883    where
884        E: PersistedRow<Canister = C> + EntityValue,
885    {
886        // Phase 1: measure the compile side of the new seam, including parse,
887        // surface validation, and semantic command construction.
888        let (compile_local_instructions, compiled) =
889            measure_sql_stage(|| self.compile_sql_query_with_cache_attribution::<E>(sql));
890        let (compiled, compile_cache_attribution, compile_phase_attribution) = compiled?;
891
892        // Phase 2: measure the execute side separately so repeat-run cache
893        // experiments can prove which side actually moved.
894        let store_get_calls_before = DataStore::current_get_call_count();
895        let pure_covering_decode_before = current_pure_covering_decode_local_instructions();
896        let pure_covering_row_assembly_before =
897            current_pure_covering_row_assembly_local_instructions();
898        let (result, execute_cache_attribution, execute_phase_attribution) =
899            self.execute_compiled_sql_with_phase_attribution::<E>(&compiled)?;
900        let store_get_calls =
901            DataStore::current_get_call_count().saturating_sub(store_get_calls_before);
902        let pure_covering_decode_local_instructions =
903            current_pure_covering_decode_local_instructions()
904                .saturating_sub(pure_covering_decode_before);
905        let pure_covering_row_assembly_local_instructions =
906            current_pure_covering_row_assembly_local_instructions()
907                .saturating_sub(pure_covering_row_assembly_before);
908        let execute_local_instructions = execute_phase_attribution
909            .planner_local_instructions
910            .saturating_add(execute_phase_attribution.store_local_instructions)
911            .saturating_add(execute_phase_attribution.executor_local_instructions);
912        let cache_attribution = compile_cache_attribution.merge(execute_cache_attribution);
913        let total_local_instructions =
914            compile_local_instructions.saturating_add(execute_local_instructions);
915
916        Ok((
917            result,
918            SqlQueryExecutionAttribution {
919                compile_local_instructions,
920                compile_cache_key_local_instructions: compile_phase_attribution.cache_key,
921                compile_cache_lookup_local_instructions: compile_phase_attribution.cache_lookup,
922                compile_parse_local_instructions: compile_phase_attribution.parse,
923                compile_parse_tokenize_local_instructions: compile_phase_attribution.parse_tokenize,
924                compile_parse_select_local_instructions: compile_phase_attribution.parse_select,
925                compile_parse_expr_local_instructions: compile_phase_attribution.parse_expr,
926                compile_parse_predicate_local_instructions: compile_phase_attribution
927                    .parse_predicate,
928                compile_aggregate_lane_check_local_instructions: compile_phase_attribution
929                    .aggregate_lane_check,
930                compile_prepare_local_instructions: compile_phase_attribution.prepare,
931                compile_lower_local_instructions: compile_phase_attribution.lower,
932                compile_bind_local_instructions: compile_phase_attribution.bind,
933                compile_cache_insert_local_instructions: compile_phase_attribution.cache_insert,
934                planner_local_instructions: execute_phase_attribution.planner_local_instructions,
935                store_local_instructions: execute_phase_attribution.store_local_instructions,
936                executor_local_instructions: execute_phase_attribution.executor_local_instructions,
937                grouped_stream_local_instructions: execute_phase_attribution
938                    .grouped_stream_local_instructions,
939                grouped_fold_local_instructions: execute_phase_attribution
940                    .grouped_fold_local_instructions,
941                grouped_finalize_local_instructions: execute_phase_attribution
942                    .grouped_finalize_local_instructions,
943                grouped_count_borrowed_hash_computations: execute_phase_attribution
944                    .grouped_count
945                    .borrowed_hash_computations,
946                grouped_count_bucket_candidate_checks: execute_phase_attribution
947                    .grouped_count
948                    .bucket_candidate_checks,
949                grouped_count_existing_group_hits: execute_phase_attribution
950                    .grouped_count
951                    .existing_group_hits,
952                grouped_count_new_group_inserts: execute_phase_attribution
953                    .grouped_count
954                    .new_group_inserts,
955                grouped_count_row_materialization_local_instructions: execute_phase_attribution
956                    .grouped_count
957                    .row_materialization_local_instructions,
958                grouped_count_group_lookup_local_instructions: execute_phase_attribution
959                    .grouped_count
960                    .group_lookup_local_instructions,
961                grouped_count_existing_group_update_local_instructions: execute_phase_attribution
962                    .grouped_count
963                    .existing_group_update_local_instructions,
964                grouped_count_new_group_insert_local_instructions: execute_phase_attribution
965                    .grouped_count
966                    .new_group_insert_local_instructions,
967                pure_covering_decode_local_instructions,
968                pure_covering_row_assembly_local_instructions,
969                store_get_calls,
970                response_decode_local_instructions: 0,
971                execute_local_instructions,
972                total_local_instructions,
973                sql_compiled_command_cache_hits: cache_attribution.sql_compiled_command_cache_hits,
974                sql_compiled_command_cache_misses: cache_attribution
975                    .sql_compiled_command_cache_misses,
976                shared_query_plan_cache_hits: cache_attribution.shared_query_plan_cache_hits,
977                shared_query_plan_cache_misses: cache_attribution.shared_query_plan_cache_misses,
978            },
979        ))
980    }
981
982    /// Execute one single-entity reduced SQL mutation statement.
983    ///
984    /// This surface stays hard-bound to `E`, rejects read-only SQL, and
985    /// returns SQL-shaped mutation output such as counts or `RETURNING` rows.
986    pub fn execute_sql_update<E>(&self, sql: &str) -> Result<SqlStatementResult, QueryError>
987    where
988        E: PersistedRow<Canister = C> + EntityValue,
989    {
990        let compiled = self.compile_sql_update::<E>(sql)?;
991
992        self.execute_compiled_sql::<E>(&compiled)
993    }
994
995    // Compile one SQL query-surface string into the session-owned generic-free
996    // semantic command artifact before execution.
997    pub(in crate::db) fn compile_sql_query<E>(
998        &self,
999        sql: &str,
1000    ) -> Result<CompiledSqlCommand, QueryError>
1001    where
1002        E: PersistedRow<Canister = C> + EntityValue,
1003    {
1004        self.compile_sql_query_with_cache_attribution::<E>(sql)
1005            .map(|(compiled, _, _)| compiled)
1006    }
1007
1008    fn compile_sql_query_with_cache_attribution<E>(
1009        &self,
1010        sql: &str,
1011    ) -> Result<
1012        (
1013            CompiledSqlCommand,
1014            SqlCacheAttribution,
1015            SqlCompilePhaseAttribution,
1016        ),
1017        QueryError,
1018    >
1019    where
1020        E: PersistedRow<Canister = C> + EntityValue,
1021    {
1022        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Query)
1023    }
1024
1025    // Compile one SQL update-surface string into the session-owned generic-free
1026    // semantic command artifact before execution.
1027    pub(in crate::db) fn compile_sql_update<E>(
1028        &self,
1029        sql: &str,
1030    ) -> Result<CompiledSqlCommand, QueryError>
1031    where
1032        E: PersistedRow<Canister = C> + EntityValue,
1033    {
1034        self.compile_sql_update_with_cache_attribution::<E>(sql)
1035            .map(|(compiled, _, _)| compiled)
1036    }
1037
1038    fn compile_sql_update_with_cache_attribution<E>(
1039        &self,
1040        sql: &str,
1041    ) -> Result<
1042        (
1043            CompiledSqlCommand,
1044            SqlCacheAttribution,
1045            SqlCompilePhaseAttribution,
1046        ),
1047        QueryError,
1048    >
1049    where
1050        E: PersistedRow<Canister = C> + EntityValue,
1051    {
1052        self.compile_sql_surface_with_cache_attribution::<E>(sql, SqlCompiledCommandSurface::Update)
1053    }
1054
1055    // Reuse one internal compile shell for both outward SQL surfaces so query
1056    // and update no longer duplicate cache-key construction and surface
1057    // validation plumbing before they reach the real compile/cache owner.
1058    fn compile_sql_surface_with_cache_attribution<E>(
1059        &self,
1060        sql: &str,
1061        surface: SqlCompiledCommandSurface,
1062    ) -> Result<
1063        (
1064            CompiledSqlCommand,
1065            SqlCacheAttribution,
1066            SqlCompilePhaseAttribution,
1067        ),
1068        QueryError,
1069    >
1070    where
1071        E: PersistedRow<Canister = C> + EntityValue,
1072    {
1073        let (cache_key_local_instructions, cache_key) = measure_sql_stage(|| {
1074            Ok::<_, QueryError>(SqlCompiledCommandCacheKey::for_entity::<E>(surface, sql))
1075        });
1076        let cache_key = cache_key?;
1077
1078        self.compile_sql_statement_with_cache::<E, _>(
1079            cache_key,
1080            cache_key_local_instructions,
1081            sql,
1082            |statement| Self::ensure_sql_statement_supported_for_surface(statement, surface),
1083        )
1084    }
1085
1086    // Reuse one previously compiled SQL artifact when the session-local cache
1087    // can prove the surface, entity contract, and raw SQL text all match.
1088    fn compile_sql_statement_with_cache<E, F>(
1089        &self,
1090        cache_key: SqlCompiledCommandCacheKey,
1091        cache_key_local_instructions: u64,
1092        sql: &str,
1093        ensure_surface_supported: F,
1094    ) -> Result<
1095        (
1096            CompiledSqlCommand,
1097            SqlCacheAttribution,
1098            SqlCompilePhaseAttribution,
1099        ),
1100        QueryError,
1101    >
1102    where
1103        E: PersistedRow<Canister = C> + EntityValue,
1104        F: FnOnce(&SqlStatement) -> Result<(), QueryError>,
1105    {
1106        let (cache_lookup_local_instructions, cached) = measure_sql_stage(|| {
1107            let cached =
1108                self.with_sql_compiled_command_cache(|cache| cache.get(&cache_key).cloned());
1109            Ok::<_, QueryError>(cached)
1110        });
1111        let cached = cached?;
1112        if let Some(compiled) = cached {
1113            return Ok((
1114                compiled,
1115                SqlCacheAttribution::sql_compiled_command_cache_hit(),
1116                SqlCompilePhaseAttribution::cache_hit(
1117                    cache_key_local_instructions,
1118                    cache_lookup_local_instructions,
1119                ),
1120            ));
1121        }
1122
1123        let (parse_local_instructions, parsed) = measure_sql_stage(|| {
1124            parse_sql_with_attribution(sql).map_err(QueryError::from_sql_parse_error)
1125        });
1126        let (parsed, parse_attribution) = parsed?;
1127        let parse_select_local_instructions = parse_local_instructions
1128            .saturating_sub(parse_attribution.tokenize)
1129            .saturating_sub(parse_attribution.expr)
1130            .saturating_sub(parse_attribution.predicate);
1131        ensure_surface_supported(&parsed)?;
1132        let authority = EntityAuthority::for_type::<E>();
1133        let (
1134            compiled,
1135            aggregate_lane_check_local_instructions,
1136            prepare_local_instructions,
1137            lower_local_instructions,
1138            bind_local_instructions,
1139        ) = Self::compile_sql_statement_for_authority(&parsed, authority, cache_key.clone())?;
1140
1141        let (cache_insert_local_instructions, cache_insert) = measure_sql_stage(|| {
1142            self.with_sql_compiled_command_cache(|cache| {
1143                cache.insert(cache_key, compiled.clone());
1144            });
1145            Ok::<_, QueryError>(())
1146        });
1147        cache_insert?;
1148
1149        Ok((
1150            compiled,
1151            SqlCacheAttribution::sql_compiled_command_cache_miss(),
1152            SqlCompilePhaseAttribution {
1153                cache_key: cache_key_local_instructions,
1154                cache_lookup: cache_lookup_local_instructions,
1155                parse: parse_local_instructions,
1156                parse_tokenize: parse_attribution.tokenize,
1157                parse_select: parse_select_local_instructions,
1158                parse_expr: parse_attribution.expr,
1159                parse_predicate: parse_attribution.predicate,
1160                aggregate_lane_check: aggregate_lane_check_local_instructions,
1161                prepare: prepare_local_instructions,
1162                lower: lower_local_instructions,
1163                bind: bind_local_instructions,
1164                cache_insert: cache_insert_local_instructions,
1165            },
1166        ))
1167    }
1168}