Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
52    QueryExecutionAttribution,
53};
54pub(in crate::db) use response::finalize_scalar_paged_execution;
55pub(in crate::db) use response::finalize_structural_grouped_projection_result;
56#[cfg(feature = "sql")]
57pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
58#[cfg(all(feature = "sql", feature = "diagnostics"))]
59pub use sql::{
60    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
61    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
62};
63#[cfg(feature = "sql")]
64pub use sql::{
65    SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport, SqlStatementResult,
66    SqlStatementSurface, sql_statement_entity_name, sql_statement_surface,
67};
68#[cfg(all(feature = "sql", feature = "diagnostics"))]
69pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
70
71///
72/// DbSession
73///
74/// Session-scoped database handle with policy (debug, metrics) and execution routing.
75///
76
77pub struct DbSession<C: CanisterKind> {
78    db: Db<C>,
79    debug: bool,
80    metrics: Option<&'static dyn MetricsSink>,
81}
82
83#[cfg(test)]
84#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
85pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
86    pub schema_info_projections: u64,
87    pub persisted_schema_decodes: u64,
88    pub generated_compatible_row_layout_proofs: u64,
89    pub latest_by_entity_calls: u64,
90    pub visible_index_projections: u64,
91}
92
93#[derive(Clone, Debug)]
94struct AcceptedSchemaQueryCacheEntry {
95    snapshot: AcceptedSchemaSnapshot,
96    identity: AcceptedCatalogIdentity,
97}
98
99pub(in crate::db) type AcceptedSaveContract = (
100    AcceptedRowDecodeContract,
101    AcceptedRowDecodeContract,
102    SchemaInfo,
103    CommitSchemaFingerprint,
104);
105
106#[derive(Clone, Debug)]
107pub(in crate::db) struct AcceptedSchemaCatalogContext {
108    snapshot: AcceptedSchemaSnapshot,
109    identity: AcceptedCatalogIdentity,
110    schema_info: OnceCell<SchemaInfo>,
111}
112
113impl AcceptedSchemaCatalogContext {
114    #[must_use]
115    pub(in crate::db) const fn new(
116        snapshot: AcceptedSchemaSnapshot,
117        identity: AcceptedCatalogIdentity,
118    ) -> Self {
119        Self {
120            snapshot,
121            identity,
122            schema_info: OnceCell::new(),
123        }
124    }
125
126    #[must_use]
127    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
128        &self.snapshot
129    }
130
131    #[must_use]
132    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
133        self.identity.accepted_schema_version()
134    }
135
136    #[must_use]
137    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
138        self.identity.accepted_schema_fingerprint()
139    }
140
141    #[must_use]
142    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
143        self.identity.fingerprint_method_version()
144    }
145
146    #[must_use]
147    #[cfg(feature = "sql")]
148    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
149        self.identity
150    }
151
152    fn debug_assert_matches_entity<E>(&self)
153    where
154        E: EntityKind,
155    {
156        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
157        debug_assert_eq!(self.identity.entity_path(), E::PATH);
158        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
159    }
160
161    pub(in crate::db) fn accepted_entity_authority_for<E>(
162        &self,
163    ) -> Result<EntityAuthority, InternalError>
164    where
165        E: EntityKind,
166    {
167        self.debug_assert_matches_entity::<E>();
168        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
169        let (accepted_row_layout, row_proof) =
170            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
171                &self.snapshot,
172                authority.model(),
173            )?;
174        let row_decode_contract = accepted_row_layout.row_decode_contract();
175
176        Ok(authority.with_accepted_row_decode_contract(
177            row_proof,
178            row_decode_contract,
179            self.accepted_schema_info_for::<E>(),
180        ))
181    }
182
183    #[must_use]
184    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
185    where
186        E: EntityKind,
187    {
188        self.debug_assert_matches_entity::<E>();
189        self.schema_info
190            .get_or_init(|| {
191                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
192                    E::MODEL,
193                    &self.snapshot,
194                    true,
195                )
196            })
197            .clone()
198    }
199}
200
201pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
202    accepted_schema: &AcceptedSchemaSnapshot,
203    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
204) -> Result<AcceptedSaveContract, InternalError>
205where
206    E: EntityKind,
207{
208    let row_decode_contract = descriptor.row_decode_contract();
209    let mutation_row_decode_contract = row_decode_contract.clone();
210    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
211    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
212
213    Ok((
214        row_decode_contract,
215        mutation_row_decode_contract,
216        schema_info,
217        schema_fingerprint,
218    ))
219}
220
221thread_local! {
222    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
223    // but repeated read calls should not reload the stable schema snapshot just
224    // to prove an already-warmed cache key. SQL DDL publication invalidates this
225    // heap cache before the next query observes the new accepted schema.
226    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
227        RefCell::new(HashMap::default());
228}
229
230impl<C: CanisterKind> DbSession<C> {
231    /// Construct one session facade for a database handle.
232    #[must_use]
233    pub(crate) const fn new(db: Db<C>) -> Self {
234        Self {
235            db,
236            debug: false,
237            metrics: None,
238        }
239    }
240
241    /// Construct one session facade from store registry and runtime hooks.
242    #[must_use]
243    pub const fn new_with_hooks(
244        store: &'static LocalKey<StoreRegistry>,
245        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
246    ) -> Self {
247        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
248    }
249
250    #[cfg(test)]
251    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
252        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
253        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
254        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
255        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
256        query::reset_visible_index_projection_count_for_tests();
257    }
258
259    #[cfg(test)]
260    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
261    -> AcceptedCatalogRuntimeCounterSnapshot {
262        AcceptedCatalogRuntimeCounterSnapshot {
263            schema_info_projections:
264                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
265            persisted_schema_decodes:
266                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
267            generated_compatible_row_layout_proofs:
268                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
269            latest_by_entity_calls:
270                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
271            visible_index_projections: query::visible_index_projection_count_for_tests(),
272        }
273    }
274
275    /// Enable debug execution behavior where supported by executors.
276    #[must_use]
277    pub const fn debug(mut self) -> Self {
278        self.debug = true;
279        self
280    }
281
282    /// Attach one metrics sink for all session-executed operations.
283    #[must_use]
284    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
285        self.metrics = Some(sink);
286        self
287    }
288
289    // Shared fluent load wrapper construction keeps the session boundary in
290    // one place when load entry points differ only by missing-row policy.
291    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
292    where
293        E: EntityKind<Canister = C>,
294    {
295        FluentLoadQuery::new(self, Query::new(consistency))
296    }
297
298    // Shared fluent delete wrapper construction keeps the delete-mode handoff
299    // explicit at the session boundary instead of reassembling the same query
300    // shell in each public entry point.
301    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
302    where
303        E: PersistedRow<Canister = C>,
304    {
305        FluentDeleteQuery::new(self, Query::new(consistency).delete())
306    }
307
308    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
309        if let Some(sink) = self.metrics {
310            with_metrics_sink(sink, f)
311        } else {
312            f()
313        }
314    }
315
316    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
317    fn execute_save_with<E, T, R>(
318        &self,
319        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
320        map: impl FnOnce(T) -> R,
321    ) -> Result<R, InternalError>
322    where
323        E: PersistedRow<Canister = C> + EntityValue,
324    {
325        let (contract, schema_info, schema_fingerprint) = match self
326            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
327        {
328            Ok(authority) => authority,
329            Err(error) => {
330                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
331
332                return Err(error);
333            }
334        };
335        let value = self.with_metrics(|| {
336            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
337        })?;
338
339        Ok(map(value))
340    }
341
342    // Execute save work after the caller has already proven that the accepted
343    // row contract is generated-compatible. SQL and structural writes use this
344    // after their pre-staging schema guard so mutation staging and save
345    // execution do not rerun schema-store reconciliation in the same statement.
346    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
347        &self,
348        accepted_row_decode_contract: AcceptedRowDecodeContract,
349        accepted_schema_info: SchemaInfo,
350        accepted_schema_fingerprint: CommitSchemaFingerprint,
351        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
352        map: impl FnOnce(T) -> R,
353    ) -> Result<R, InternalError>
354    where
355        E: PersistedRow<Canister = C> + EntityValue,
356    {
357        let value = self.with_metrics(|| {
358            op(self.save_executor::<E>(
359                accepted_row_decode_contract,
360                accepted_schema_info,
361                accepted_schema_fingerprint,
362            ))
363        })?;
364
365        Ok(map(value))
366    }
367
368    // Shared save-facade wrappers keep response shape explicit at call sites.
369    fn execute_save_entity<E>(
370        &self,
371        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
372    ) -> Result<E, InternalError>
373    where
374        E: PersistedRow<Canister = C> + EntityValue,
375    {
376        self.execute_save_with(op, std::convert::identity)
377    }
378
379    fn execute_save_batch<E>(
380        &self,
381        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
382    ) -> Result<WriteBatchResponse<E>, InternalError>
383    where
384        E: PersistedRow<Canister = C> + EntityValue,
385    {
386        self.execute_save_with(op, WriteBatchResponse::new)
387    }
388
389    // ---------------------------------------------------------------------
390    // Query entry points (public, fluent)
391    // ---------------------------------------------------------------------
392
393    /// Start a fluent load query with default missing-row policy (`Ignore`).
394    #[must_use]
395    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
396    where
397        E: EntityKind<Canister = C>,
398    {
399        self.fluent_load_query(MissingRowPolicy::Ignore)
400    }
401
402    /// Start a fluent load query with explicit missing-row policy.
403    #[must_use]
404    pub const fn load_with_consistency<E>(
405        &self,
406        consistency: MissingRowPolicy,
407    ) -> FluentLoadQuery<'_, E>
408    where
409        E: EntityKind<Canister = C>,
410    {
411        self.fluent_load_query(consistency)
412    }
413
414    /// Start a fluent delete query with default missing-row policy (`Ignore`).
415    #[must_use]
416    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
417    where
418        E: PersistedRow<Canister = C>,
419    {
420        self.fluent_delete_query(MissingRowPolicy::Ignore)
421    }
422
423    /// Start a fluent delete query with explicit missing-row policy.
424    #[must_use]
425    pub fn delete_with_consistency<E>(
426        &self,
427        consistency: MissingRowPolicy,
428    ) -> FluentDeleteQuery<'_, E>
429    where
430        E: PersistedRow<Canister = C>,
431    {
432        self.fluent_delete_query(consistency)
433    }
434
435    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
436    ///
437    /// This terminal bypasses query planning and access routing entirely.
438    #[must_use]
439    pub const fn select_one(&self) -> Value {
440        Value::Int64(1)
441    }
442
443    /// Return one stable, human-readable index listing for the entity schema.
444    ///
445    /// Output format mirrors SQL-style introspection:
446    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
447    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
448    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
449    #[must_use]
450    pub fn show_indexes<E>(&self) -> Vec<String>
451    where
452        E: EntityKind<Canister = C>,
453    {
454        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
455    }
456
457    /// Return one stable, human-readable index listing for one schema model.
458    ///
459    /// This model-only helper is schema-owned and intentionally does not
460    /// attach runtime lifecycle state because it does not carry store
461    /// placement context.
462    #[must_use]
463    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
464        show_indexes_for_model(model)
465    }
466
467    /// Return one stable, human-readable index listing for the accepted schema.
468    ///
469    /// Unlike `show_indexes`, this fallible live-schema helper reflects
470    /// accepted DDL-created indexes as well as compiled schema indexes.
471    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
472    where
473        E: EntityKind<Canister = C>,
474    {
475        let schema = self.accepted_schema_info_for_entity::<E>()?;
476
477        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
478    }
479
480    // Return one stable, human-readable index listing for one resolved
481    // store/model pair, attaching the current runtime lifecycle state when the
482    // registry can resolve the backing store handle.
483    pub(in crate::db) fn show_indexes_for_store_model(
484        &self,
485        store_path: &str,
486        model: &'static EntityModel,
487    ) -> Vec<String> {
488        let runtime_state = self
489            .db
490            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
491            .map(|store| store.index_state());
492
493        show_indexes_for_model_with_runtime_state(model, runtime_state)
494    }
495
496    // Return one stable, human-readable index listing for one resolved
497    // store/accepted-schema pair, attaching the current runtime lifecycle state
498    // when the registry can resolve the backing store handle.
499    pub(in crate::db) fn show_indexes_for_store_schema_info(
500        &self,
501        store_path: &str,
502        schema: &SchemaInfo,
503    ) -> Vec<String> {
504        let runtime_state = self
505            .db
506            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
507            .map(|store| store.index_state());
508
509        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
510    }
511
512    /// Return one stable generated-model list of field descriptors.
513    ///
514    /// This infallible Rust metadata helper intentionally reports the compiled
515    /// schema model. Use `try_show_columns` for the accepted persisted-schema
516    /// view used by SQL and diagnostics surfaces.
517    #[must_use]
518    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
519    where
520        E: EntityKind<Canister = C>,
521    {
522        self.show_columns_for_model(E::MODEL)
523    }
524
525    /// Return one stable generated-model list of field descriptors.
526    #[must_use]
527    pub fn show_columns_for_model(
528        &self,
529        model: &'static EntityModel,
530    ) -> Vec<EntityFieldDescription> {
531        describe_entity_fields(model)
532    }
533
534    /// Return field descriptors using the accepted persisted schema snapshot.
535    ///
536    /// This fallible variant is intended for SQL and diagnostics surfaces that
537    /// can report schema reconciliation failures. The infallible
538    /// `show_columns` helper remains generated-model based.
539    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
540    where
541        E: EntityKind<Canister = C>,
542    {
543        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
544
545        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
546    }
547
548    /// Return one stable list of runtime-registered entity catalog entries.
549    #[must_use]
550    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
551        self.try_show_entities()
552            .expect("SHOW ENTITIES metadata requires accepted schema authority")
553    }
554
555    /// Return one stable list of runtime-registered entity catalog entries.
556    pub fn try_show_entities(
557        &self,
558    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
559        self.db.runtime_entity_catalog()
560    }
561
562    /// Return one stable list of runtime-registered stores.
563    #[must_use]
564    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
565        self.db.runtime_store_catalog()
566    }
567
568    /// Return one stable list of runtime-registered stable-memory allocations.
569    #[must_use]
570    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
571        self.db.runtime_memory_catalog()
572    }
573
574    // Resolve the exact secondary-index set that is visible to planner-owned
575    // query planning for one recovered store and accepted schema pair.
576    #[cfg(feature = "sql")]
577    fn visible_indexes_for_store_accepted_schema(
578        &self,
579        store_path: &str,
580        schema_info: &SchemaInfo,
581    ) -> Result<VisibleIndexes<'static>, QueryError> {
582        // Phase 1: resolve the recovered store state once at the session
583        // boundary so query/executor planning does not reopen lifecycle checks.
584        let store = self
585            .db
586            .recovered_store(store_path)
587            .map_err(QueryError::execute)?;
588        let state = store.index_state();
589        if state != IndexState::Ready {
590            return Ok(VisibleIndexes::none());
591        }
592        debug_assert_eq!(state, IndexState::Ready);
593
594        // Phase 2: planner-visible indexes are accepted schema contracts once
595        // the recovered store is query-visible.
596        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
597        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
598        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
599        debug_assert_eq!(
600            visible_indexes.accepted_expression_index_count(),
601            Some(visible_indexes.accepted_expression_indexes().len()),
602        );
603
604        Ok(visible_indexes)
605    }
606
607    /// Return one generated-model schema description for the entity.
608    ///
609    /// This is a typed `DESCRIBE`-style introspection surface consumed by
610    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
611    /// schema view is required.
612    #[must_use]
613    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
614    where
615        E: EntityKind<Canister = C>,
616    {
617        self.describe_entity_model(E::MODEL)
618    }
619
620    /// Return one generated-model schema description for one schema model.
621    #[must_use]
622    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
623        describe_entity_model(model)
624    }
625
626    /// Return a schema description using the accepted persisted schema snapshot.
627    ///
628    /// This is the live-schema counterpart to `describe_entity`. It is fallible
629    /// because loading accepted schema authority can fail if startup
630    /// reconciliation rejects the stored metadata.
631    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
632    where
633        E: EntityKind<Canister = C>,
634    {
635        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
636
637        Ok(describe_entity_model_with_persisted_schema(
638            E::MODEL,
639            &snapshot,
640        ))
641    }
642
643    // Ensure and return the accepted schema snapshot for one generated entity.
644    // This may write the first snapshot for an empty store; otherwise it loads
645    // the latest stored snapshot and applies the current exact-match policy.
646    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
647    where
648        E: EntityKind<Canister = C>,
649    {
650        let store = self.db.recovered_store(E::Store::PATH)?;
651
652        store.with_schema_mut(|schema_store| {
653            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
654        })
655    }
656
657    // Load the current accepted schema snapshot for read/query paths without
658    // rerunning generated proposal reconciliation on every cold query call.
659    // Startup and write paths still own reconciliation; the fallback only keeps
660    // first-use test stores and freshly initialized local stores functional.
661    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
662        &self,
663    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
664    where
665        E: EntityKind<Canister = C>,
666    {
667        let cache_key = (self.db.cache_scope_id(), E::PATH);
668        if let Some(entry) =
669            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
670        {
671            return Ok(AcceptedSchemaCatalogContext::new(
672                entry.snapshot,
673                entry.identity,
674            ));
675        }
676
677        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
678        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
679        let identity = AcceptedCatalogIdentity::new(
680            E::ENTITY_TAG,
681            E::PATH,
682            E::Store::PATH,
683            snapshot.persisted_snapshot().version(),
684            fingerprint,
685        );
686        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
687            cache.borrow_mut().insert(
688                cache_key,
689                AcceptedSchemaQueryCacheEntry {
690                    snapshot: snapshot.clone(),
691                    identity,
692                },
693            );
694        });
695
696        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
697    }
698
699    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
700        &self,
701    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
702    where
703        E: EntityKind<Canister = C>,
704    {
705        let store = self.db.recovered_store(E::Store::PATH)?;
706
707        store.with_schema_mut(|schema_store| {
708            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
709        })
710    }
711
712    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
713        &self,
714        selection: &AcceptedCatalogSnapshotSelection,
715    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
716    where
717        E: EntityKind<Canister = C>,
718    {
719        let snapshot = selection.decode_verified()?;
720        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
721            return Ok(None);
722        }
723        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
724        let cache_key = (self.db.cache_scope_id(), E::PATH);
725
726        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
727            cache.borrow_mut().insert(
728                cache_key,
729                AcceptedSchemaQueryCacheEntry {
730                    snapshot,
731                    identity: selection.identity(),
732                },
733            );
734        });
735
736        Ok(Some(context))
737    }
738
739    #[cfg(feature = "sql")]
740    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
741    where
742        E: EntityKind<Canister = C>,
743    {
744        let cache_key = (self.db.cache_scope_id(), E::PATH);
745        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
746            cache.borrow_mut().remove(&cache_key);
747        });
748    }
749
750    fn load_accepted_schema_snapshot_for_query<E>(
751        &self,
752    ) -> Result<AcceptedSchemaSnapshot, InternalError>
753    where
754        E: EntityKind<Canister = C>,
755    {
756        let store = self.db.recovered_store(E::Store::PATH)?;
757
758        store.with_schema_mut(|schema_store| {
759            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
760                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
761                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
762                    &accepted,
763                    E::MODEL,
764                )
765                .is_ok()
766                {
767                    return Ok(accepted);
768                }
769            }
770
771            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
772        })
773    }
774
775    // Build the accepted schema-info projection for one typed entity. Fluent
776    // terminal adapters use this before constructing slot-bound descriptors so
777    // field slot authority comes from the accepted schema snapshot.
778    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
779        &self,
780    ) -> Result<SchemaInfo, InternalError>
781    where
782        E: EntityKind<Canister = C>,
783    {
784        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
785
786        Ok(catalog.accepted_schema_info_for::<E>())
787    }
788
789    // Derive typed executor authority from an accepted snapshot the caller
790    // already loaded, avoiding a second schema-store pass in SQL write/select
791    // adapters that need both write descriptors and read selector authority.
792    #[cfg(feature = "sql")]
793    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
794        accepted_schema: &AcceptedSchemaSnapshot,
795    ) -> Result<EntityAuthority, InternalError>
796    where
797        E: EntityKind<Canister = C>,
798    {
799        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
800    }
801
802    // Ensure accepted schema metadata is safe for write paths that still encode
803    // rows through generated field contracts. Returning only the snapshot keeps
804    // SQL write type checks unchanged while the schema-runtime contract guard
805    // rejects unsupported layout or payload drift before mutation staging.
806    fn ensure_generated_compatible_accepted_save_schema<E>(
807        &self,
808    ) -> Result<
809        (
810            AcceptedRowDecodeContract,
811            SchemaInfo,
812            CommitSchemaFingerprint,
813        ),
814        InternalError,
815    >
816    where
817        E: EntityKind<Canister = C>,
818    {
819        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
820        let (accepted_row_layout, _) =
821            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
822                &accepted_schema,
823                E::MODEL,
824            )?;
825        let (row_decode_contract, _, schema_info, schema_fingerprint) =
826            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
827
828        Ok((row_decode_contract, schema_info, schema_fingerprint))
829    }
830
831    /// Build one point-in-time storage report for observability endpoints.
832    pub fn storage_report(
833        &self,
834        name_to_path: &[(&'static str, &'static str)],
835    ) -> Result<StorageReport, InternalError> {
836        self.db.storage_report(name_to_path)
837    }
838
839    /// Build one point-in-time storage report using default entity-path labels.
840    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
841        self.db.storage_report_default()
842    }
843
844    /// Build one point-in-time integrity scan report for observability endpoints.
845    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
846        self.db.integrity_report()
847    }
848
849    // ---------------------------------------------------------------------
850    // Low-level executors (crate-internal; execution primitives)
851    // ---------------------------------------------------------------------
852
853    #[must_use]
854    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
855    where
856        E: EntityKind<Canister = C> + EntityValue,
857    {
858        LoadExecutor::new(self.db, self.debug)
859    }
860
861    #[must_use]
862    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
863    where
864        E: PersistedRow<Canister = C> + EntityValue,
865    {
866        DeleteExecutor::new(self.db)
867    }
868
869    #[must_use]
870    pub(in crate::db) const fn save_executor<E>(
871        &self,
872        accepted_row_decode_contract: AcceptedRowDecodeContract,
873        accepted_schema_info: SchemaInfo,
874        accepted_schema_fingerprint: CommitSchemaFingerprint,
875    ) -> SaveExecutor<E>
876    where
877        E: PersistedRow<Canister = C> + EntityValue,
878    {
879        SaveExecutor::new_with_accepted_contract(
880            self.db,
881            self.debug,
882            accepted_row_decode_contract,
883            accepted_schema_info,
884            accepted_schema_fingerprint,
885        )
886    }
887}