Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
52    QueryExecutionAttribution,
53};
54pub(in crate::db) use response::finalize_scalar_paged_execution;
55pub(in crate::db) use response::finalize_structural_grouped_projection_result;
56#[cfg(feature = "sql")]
57pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
58#[cfg(feature = "sql")]
59pub use sql::{
60    SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport,
61    SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentUpdatePlan,
62    SqlStatementResult, SqlStatementSurface, SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy,
63    SqlUpdateOrderPolicy, SqlUpdatePolicyContext, SqlUpdatePolicyRejection, SqlUpdatePolicyReport,
64    SqlUpdateReturningBounds, SqlUpdateReturningPolicy, SqlUpdateStatementClassification,
65    SqlUpdateWherePolicy, SqlValidatedUpdatePlan, classify_sql_update_policy,
66    sql_statement_entity_name, sql_statement_surface,
67};
68#[cfg(all(feature = "sql", feature = "diagnostics"))]
69pub use sql::{
70    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
71    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
72};
73#[cfg(all(feature = "sql", feature = "diagnostics"))]
74pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
75
76///
77/// DbSession
78///
79/// Session-scoped database handle with policy (debug, metrics) and execution routing.
80///
81
82pub struct DbSession<C: CanisterKind> {
83    db: Db<C>,
84    debug: bool,
85    metrics: Option<&'static dyn MetricsSink>,
86}
87
88#[cfg(test)]
89#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
90pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
91    pub schema_info_projections: u64,
92    pub persisted_schema_decodes: u64,
93    pub generated_compatible_row_layout_proofs: u64,
94    pub latest_by_entity_calls: u64,
95    pub visible_index_projections: u64,
96}
97
98#[derive(Clone, Debug)]
99struct AcceptedSchemaQueryCacheEntry {
100    snapshot: AcceptedSchemaSnapshot,
101    identity: AcceptedCatalogIdentity,
102}
103
104pub(in crate::db) type AcceptedSaveContract = (
105    AcceptedRowDecodeContract,
106    AcceptedRowDecodeContract,
107    SchemaInfo,
108    CommitSchemaFingerprint,
109);
110
111#[derive(Clone, Debug)]
112pub(in crate::db) struct AcceptedSchemaCatalogContext {
113    snapshot: AcceptedSchemaSnapshot,
114    identity: AcceptedCatalogIdentity,
115    schema_info: OnceCell<SchemaInfo>,
116}
117
118impl AcceptedSchemaCatalogContext {
119    #[must_use]
120    pub(in crate::db) const fn new(
121        snapshot: AcceptedSchemaSnapshot,
122        identity: AcceptedCatalogIdentity,
123    ) -> Self {
124        Self {
125            snapshot,
126            identity,
127            schema_info: OnceCell::new(),
128        }
129    }
130
131    #[must_use]
132    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
133        &self.snapshot
134    }
135
136    #[must_use]
137    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
138        self.identity.accepted_schema_version()
139    }
140
141    #[must_use]
142    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
143        self.identity.accepted_schema_fingerprint()
144    }
145
146    #[must_use]
147    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
148        self.identity.fingerprint_method_version()
149    }
150
151    #[must_use]
152    #[cfg(feature = "sql")]
153    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
154        self.identity
155    }
156
157    fn debug_assert_matches_entity<E>(&self)
158    where
159        E: EntityKind,
160    {
161        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
162        debug_assert_eq!(self.identity.entity_path(), E::PATH);
163        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
164    }
165
166    pub(in crate::db) fn accepted_entity_authority_for<E>(
167        &self,
168    ) -> Result<EntityAuthority, InternalError>
169    where
170        E: EntityKind,
171    {
172        self.debug_assert_matches_entity::<E>();
173        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
174        let (accepted_row_layout, row_proof) =
175            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
176                &self.snapshot,
177                authority.model(),
178            )?;
179        let row_decode_contract = accepted_row_layout.row_decode_contract();
180
181        Ok(authority.with_accepted_row_decode_contract(
182            row_proof,
183            row_decode_contract,
184            self.accepted_schema_info_for::<E>(),
185        ))
186    }
187
188    #[must_use]
189    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
190    where
191        E: EntityKind,
192    {
193        self.debug_assert_matches_entity::<E>();
194        self.schema_info
195            .get_or_init(|| {
196                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
197                    E::MODEL,
198                    &self.snapshot,
199                    true,
200                )
201            })
202            .clone()
203    }
204}
205
206pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
207    accepted_schema: &AcceptedSchemaSnapshot,
208    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
209) -> Result<AcceptedSaveContract, InternalError>
210where
211    E: EntityKind,
212{
213    let row_decode_contract = descriptor.row_decode_contract();
214    let mutation_row_decode_contract = row_decode_contract.clone();
215    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
216    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
217
218    Ok((
219        row_decode_contract,
220        mutation_row_decode_contract,
221        schema_info,
222        schema_fingerprint,
223    ))
224}
225
226thread_local! {
227    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
228    // but repeated read calls should not reload the stable schema snapshot just
229    // to prove an already-warmed cache key. SQL DDL publication invalidates this
230    // heap cache before the next query observes the new accepted schema.
231    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
232        RefCell::new(HashMap::default());
233}
234
235impl<C: CanisterKind> DbSession<C> {
236    /// Construct one session facade for a database handle.
237    #[must_use]
238    pub(crate) const fn new(db: Db<C>) -> Self {
239        Self {
240            db,
241            debug: false,
242            metrics: None,
243        }
244    }
245
246    /// Construct one session facade from store registry and runtime hooks.
247    #[must_use]
248    pub const fn new_with_hooks(
249        store: &'static LocalKey<StoreRegistry>,
250        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
251    ) -> Self {
252        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
253    }
254
255    #[cfg(test)]
256    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
257        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
258        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
259        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
260        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
261        query::reset_visible_index_projection_count_for_tests();
262    }
263
264    #[cfg(test)]
265    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
266    -> AcceptedCatalogRuntimeCounterSnapshot {
267        AcceptedCatalogRuntimeCounterSnapshot {
268            schema_info_projections:
269                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
270            persisted_schema_decodes:
271                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
272            generated_compatible_row_layout_proofs:
273                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
274            latest_by_entity_calls:
275                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
276            visible_index_projections: query::visible_index_projection_count_for_tests(),
277        }
278    }
279
280    /// Enable debug execution behavior where supported by executors.
281    #[must_use]
282    pub const fn debug(mut self) -> Self {
283        self.debug = true;
284        self
285    }
286
287    /// Attach one metrics sink for all session-executed operations.
288    #[must_use]
289    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
290        self.metrics = Some(sink);
291        self
292    }
293
294    // Shared fluent load wrapper construction keeps the session boundary in
295    // one place when load entry points differ only by missing-row policy.
296    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
297    where
298        E: EntityKind<Canister = C>,
299    {
300        FluentLoadQuery::new(self, Query::new(consistency))
301    }
302
303    // Shared fluent delete wrapper construction keeps the delete-mode handoff
304    // explicit at the session boundary instead of reassembling the same query
305    // shell in each public entry point.
306    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
307    where
308        E: PersistedRow<Canister = C>,
309    {
310        FluentDeleteQuery::new(self, Query::new(consistency).delete())
311    }
312
313    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
314        if let Some(sink) = self.metrics {
315            with_metrics_sink(sink, f)
316        } else {
317            f()
318        }
319    }
320
321    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
322    fn execute_save_with<E, T, R>(
323        &self,
324        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
325        map: impl FnOnce(T) -> R,
326    ) -> Result<R, InternalError>
327    where
328        E: PersistedRow<Canister = C> + EntityValue,
329    {
330        let (contract, schema_info, schema_fingerprint) = match self
331            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
332        {
333            Ok(authority) => authority,
334            Err(error) => {
335                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
336
337                return Err(error);
338            }
339        };
340        let value = self.with_metrics(|| {
341            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
342        })?;
343
344        Ok(map(value))
345    }
346
347    // Execute save work after the caller has already proven that the accepted
348    // row contract is generated-compatible. SQL and structural writes use this
349    // after their pre-staging schema guard so mutation staging and save
350    // execution do not rerun schema-store reconciliation in the same statement.
351    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
352        &self,
353        accepted_row_decode_contract: AcceptedRowDecodeContract,
354        accepted_schema_info: SchemaInfo,
355        accepted_schema_fingerprint: CommitSchemaFingerprint,
356        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
357        map: impl FnOnce(T) -> R,
358    ) -> Result<R, InternalError>
359    where
360        E: PersistedRow<Canister = C> + EntityValue,
361    {
362        let value = self.with_metrics(|| {
363            op(self.save_executor::<E>(
364                accepted_row_decode_contract,
365                accepted_schema_info,
366                accepted_schema_fingerprint,
367            ))
368        })?;
369
370        Ok(map(value))
371    }
372
373    // Shared save-facade wrappers keep response shape explicit at call sites.
374    fn execute_save_entity<E>(
375        &self,
376        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
377    ) -> Result<E, InternalError>
378    where
379        E: PersistedRow<Canister = C> + EntityValue,
380    {
381        self.execute_save_with(op, std::convert::identity)
382    }
383
384    fn execute_save_batch<E>(
385        &self,
386        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
387    ) -> Result<WriteBatchResponse<E>, InternalError>
388    where
389        E: PersistedRow<Canister = C> + EntityValue,
390    {
391        self.execute_save_with(op, WriteBatchResponse::new)
392    }
393
394    // ---------------------------------------------------------------------
395    // Query entry points (public, fluent)
396    // ---------------------------------------------------------------------
397
398    /// Start a fluent load query with default missing-row policy (`Ignore`).
399    #[must_use]
400    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
401    where
402        E: EntityKind<Canister = C>,
403    {
404        self.fluent_load_query(MissingRowPolicy::Ignore)
405    }
406
407    /// Start a fluent load query with explicit missing-row policy.
408    #[must_use]
409    pub const fn load_with_consistency<E>(
410        &self,
411        consistency: MissingRowPolicy,
412    ) -> FluentLoadQuery<'_, E>
413    where
414        E: EntityKind<Canister = C>,
415    {
416        self.fluent_load_query(consistency)
417    }
418
419    /// Start a fluent delete query with default missing-row policy (`Ignore`).
420    #[must_use]
421    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
422    where
423        E: PersistedRow<Canister = C>,
424    {
425        self.fluent_delete_query(MissingRowPolicy::Ignore)
426    }
427
428    /// Start a fluent delete query with explicit missing-row policy.
429    #[must_use]
430    pub fn delete_with_consistency<E>(
431        &self,
432        consistency: MissingRowPolicy,
433    ) -> FluentDeleteQuery<'_, E>
434    where
435        E: PersistedRow<Canister = C>,
436    {
437        self.fluent_delete_query(consistency)
438    }
439
440    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
441    ///
442    /// This terminal bypasses query planning and access routing entirely.
443    #[must_use]
444    pub const fn select_one(&self) -> Value {
445        Value::Int64(1)
446    }
447
448    /// Return one stable, human-readable index listing for the entity schema.
449    ///
450    /// Output format mirrors SQL-style introspection:
451    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
452    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
453    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
454    #[must_use]
455    pub fn show_indexes<E>(&self) -> Vec<String>
456    where
457        E: EntityKind<Canister = C>,
458    {
459        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
460    }
461
462    /// Return one stable, human-readable index listing for one schema model.
463    ///
464    /// This model-only helper is schema-owned and intentionally does not
465    /// attach runtime lifecycle state because it does not carry store
466    /// placement context.
467    #[must_use]
468    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
469        show_indexes_for_model(model)
470    }
471
472    /// Return one stable, human-readable index listing for the accepted schema.
473    ///
474    /// Unlike `show_indexes`, this fallible live-schema helper reflects
475    /// accepted DDL-created indexes as well as compiled schema indexes.
476    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
477    where
478        E: EntityKind<Canister = C>,
479    {
480        let schema = self.accepted_schema_info_for_entity::<E>()?;
481
482        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
483    }
484
485    // Return one stable, human-readable index listing for one resolved
486    // store/model pair, attaching the current runtime lifecycle state when the
487    // registry can resolve the backing store handle.
488    pub(in crate::db) fn show_indexes_for_store_model(
489        &self,
490        store_path: &str,
491        model: &'static EntityModel,
492    ) -> Vec<String> {
493        let runtime_state = self
494            .db
495            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
496            .map(|store| store.index_state());
497
498        show_indexes_for_model_with_runtime_state(model, runtime_state)
499    }
500
501    // Return one stable, human-readable index listing for one resolved
502    // store/accepted-schema pair, attaching the current runtime lifecycle state
503    // when the registry can resolve the backing store handle.
504    pub(in crate::db) fn show_indexes_for_store_schema_info(
505        &self,
506        store_path: &str,
507        schema: &SchemaInfo,
508    ) -> Vec<String> {
509        let runtime_state = self
510            .db
511            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
512            .map(|store| store.index_state());
513
514        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
515    }
516
517    /// Return one stable generated-model list of field descriptors.
518    ///
519    /// This infallible Rust metadata helper intentionally reports the compiled
520    /// schema model. Use `try_show_columns` for the accepted persisted-schema
521    /// view used by SQL and diagnostics surfaces.
522    #[must_use]
523    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
524    where
525        E: EntityKind<Canister = C>,
526    {
527        self.show_columns_for_model(E::MODEL)
528    }
529
530    /// Return one stable generated-model list of field descriptors.
531    #[must_use]
532    pub fn show_columns_for_model(
533        &self,
534        model: &'static EntityModel,
535    ) -> Vec<EntityFieldDescription> {
536        describe_entity_fields(model)
537    }
538
539    /// Return field descriptors using the accepted persisted schema snapshot.
540    ///
541    /// This fallible variant is intended for SQL and diagnostics surfaces that
542    /// can report schema reconciliation failures. The infallible
543    /// `show_columns` helper remains generated-model based.
544    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
545    where
546        E: EntityKind<Canister = C>,
547    {
548        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
549
550        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
551    }
552
553    /// Return one stable list of runtime-registered entity catalog entries.
554    #[must_use]
555    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
556        self.try_show_entities().expect("session invariant")
557    }
558
559    /// Return one stable list of runtime-registered entity catalog entries.
560    pub fn try_show_entities(
561        &self,
562    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
563        self.db.runtime_entity_catalog()
564    }
565
566    /// Return one stable list of runtime-registered stores.
567    #[must_use]
568    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
569        self.db.runtime_store_catalog()
570    }
571
572    /// Return one stable list of runtime-registered stable-memory allocations.
573    #[must_use]
574    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
575        self.db.runtime_memory_catalog()
576    }
577
578    // Resolve the exact secondary-index set that is visible to planner-owned
579    // query planning for one recovered store and accepted schema pair.
580    #[cfg(feature = "sql")]
581    fn visible_indexes_for_store_accepted_schema(
582        &self,
583        store_path: &str,
584        schema_info: &SchemaInfo,
585    ) -> Result<VisibleIndexes<'static>, QueryError> {
586        // Phase 1: resolve the recovered store state once at the session
587        // boundary so query/executor planning does not reopen lifecycle checks.
588        let store = self
589            .db
590            .recovered_store(store_path)
591            .map_err(QueryError::execute)?;
592        let state = store.index_state();
593        if state != IndexState::Ready {
594            return Ok(VisibleIndexes::none());
595        }
596        debug_assert_eq!(state, IndexState::Ready);
597
598        // Phase 2: planner-visible indexes are accepted schema contracts once
599        // the recovered store is query-visible.
600        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
601        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
602        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
603        debug_assert_eq!(
604            visible_indexes.accepted_expression_index_count(),
605            Some(visible_indexes.accepted_expression_indexes().len()),
606        );
607
608        Ok(visible_indexes)
609    }
610
611    /// Return one generated-model schema description for the entity.
612    ///
613    /// This is a typed `DESCRIBE`-style introspection surface consumed by
614    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
615    /// schema view is required.
616    #[must_use]
617    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
618    where
619        E: EntityKind<Canister = C>,
620    {
621        self.describe_entity_model(E::MODEL)
622    }
623
624    /// Return one generated-model schema description for one schema model.
625    #[must_use]
626    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
627        describe_entity_model(model)
628    }
629
630    /// Return a schema description using the accepted persisted schema snapshot.
631    ///
632    /// This is the live-schema counterpart to `describe_entity`. It is fallible
633    /// because loading accepted schema authority can fail if startup
634    /// reconciliation rejects the stored metadata.
635    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
636    where
637        E: EntityKind<Canister = C>,
638    {
639        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
640
641        Ok(describe_entity_model_with_persisted_schema(
642            E::MODEL,
643            &snapshot,
644        ))
645    }
646
647    // Ensure and return the accepted schema snapshot for one generated entity.
648    // This may write the first snapshot for an empty store; otherwise it loads
649    // the latest stored snapshot and applies the current exact-match policy.
650    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
651    where
652        E: EntityKind<Canister = C>,
653    {
654        let store = self.db.recovered_store(E::Store::PATH)?;
655
656        store.with_schema_mut(|schema_store| {
657            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
658        })
659    }
660
661    // Load the current accepted schema snapshot for read/query paths without
662    // rerunning generated proposal reconciliation on every cold query call.
663    // Startup and write paths still own reconciliation; the fallback only keeps
664    // first-use test stores and freshly initialized local stores functional.
665    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
666        &self,
667    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
668    where
669        E: EntityKind<Canister = C>,
670    {
671        let cache_key = (self.db.cache_scope_id(), E::PATH);
672        if let Some(entry) =
673            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
674        {
675            return Ok(AcceptedSchemaCatalogContext::new(
676                entry.snapshot,
677                entry.identity,
678            ));
679        }
680
681        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
682        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
683        let identity = AcceptedCatalogIdentity::new(
684            E::ENTITY_TAG,
685            E::PATH,
686            E::Store::PATH,
687            snapshot.persisted_snapshot().version(),
688            fingerprint,
689        );
690        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
691            cache.borrow_mut().insert(
692                cache_key,
693                AcceptedSchemaQueryCacheEntry {
694                    snapshot: snapshot.clone(),
695                    identity,
696                },
697            );
698        });
699
700        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
701    }
702
703    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
704        &self,
705    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
706    where
707        E: EntityKind<Canister = C>,
708    {
709        let store = self.db.recovered_store(E::Store::PATH)?;
710
711        store.with_schema_mut(|schema_store| {
712            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
713        })
714    }
715
716    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
717        &self,
718        selection: &AcceptedCatalogSnapshotSelection,
719    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
720    where
721        E: EntityKind<Canister = C>,
722    {
723        let snapshot = selection.decode_verified()?;
724        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
725            return Ok(None);
726        }
727        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
728        let cache_key = (self.db.cache_scope_id(), E::PATH);
729
730        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
731            cache.borrow_mut().insert(
732                cache_key,
733                AcceptedSchemaQueryCacheEntry {
734                    snapshot,
735                    identity: selection.identity(),
736                },
737            );
738        });
739
740        Ok(Some(context))
741    }
742
743    #[cfg(feature = "sql")]
744    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
745    where
746        E: EntityKind<Canister = C>,
747    {
748        let cache_key = (self.db.cache_scope_id(), E::PATH);
749        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
750            cache.borrow_mut().remove(&cache_key);
751        });
752    }
753
754    fn load_accepted_schema_snapshot_for_query<E>(
755        &self,
756    ) -> Result<AcceptedSchemaSnapshot, InternalError>
757    where
758        E: EntityKind<Canister = C>,
759    {
760        let store = self.db.recovered_store(E::Store::PATH)?;
761
762        store.with_schema_mut(|schema_store| {
763            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
764                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
765                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
766                    &accepted,
767                    E::MODEL,
768                )
769                .is_ok()
770                {
771                    return Ok(accepted);
772                }
773            }
774
775            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
776        })
777    }
778
779    // Build the accepted schema-info projection for one typed entity. Fluent
780    // terminal adapters use this before constructing slot-bound descriptors so
781    // field slot authority comes from the accepted schema snapshot.
782    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
783        &self,
784    ) -> Result<SchemaInfo, InternalError>
785    where
786        E: EntityKind<Canister = C>,
787    {
788        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
789
790        Ok(catalog.accepted_schema_info_for::<E>())
791    }
792
793    // Derive typed executor authority from an accepted snapshot the caller
794    // already loaded, avoiding a second schema-store pass in SQL write/select
795    // adapters that need both write descriptors and read selector authority.
796    #[cfg(feature = "sql")]
797    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
798        accepted_schema: &AcceptedSchemaSnapshot,
799    ) -> Result<EntityAuthority, InternalError>
800    where
801        E: EntityKind<Canister = C>,
802    {
803        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
804    }
805
806    // Ensure accepted schema metadata is safe for write paths that still encode
807    // rows through generated field contracts. Returning only the snapshot keeps
808    // SQL write type checks unchanged while the schema-runtime contract guard
809    // rejects unsupported layout or payload drift before mutation staging.
810    fn ensure_generated_compatible_accepted_save_schema<E>(
811        &self,
812    ) -> Result<
813        (
814            AcceptedRowDecodeContract,
815            SchemaInfo,
816            CommitSchemaFingerprint,
817        ),
818        InternalError,
819    >
820    where
821        E: EntityKind<Canister = C>,
822    {
823        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
824        let (accepted_row_layout, _) =
825            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
826                &accepted_schema,
827                E::MODEL,
828            )?;
829        let (row_decode_contract, _, schema_info, schema_fingerprint) =
830            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
831
832        Ok((row_decode_contract, schema_info, schema_fingerprint))
833    }
834
835    /// Build one point-in-time storage report for observability endpoints.
836    pub fn storage_report(
837        &self,
838        name_to_path: &[(&'static str, &'static str)],
839    ) -> Result<StorageReport, InternalError> {
840        self.db.storage_report(name_to_path)
841    }
842
843    /// Build one point-in-time storage report using default entity-path labels.
844    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
845        self.db.storage_report_default()
846    }
847
848    /// Build one point-in-time integrity scan report for observability endpoints.
849    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
850        self.db.integrity_report()
851    }
852
853    // ---------------------------------------------------------------------
854    // Low-level executors (crate-internal; execution primitives)
855    // ---------------------------------------------------------------------
856
857    #[must_use]
858    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
859    where
860        E: EntityKind<Canister = C> + EntityValue,
861    {
862        LoadExecutor::new(self.db, self.debug)
863    }
864
865    #[must_use]
866    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
867    where
868        E: PersistedRow<Canister = C> + EntityValue,
869    {
870        DeleteExecutor::new(self.db)
871    }
872
873    #[must_use]
874    pub(in crate::db) const fn save_executor<E>(
875        &self,
876        accepted_row_decode_contract: AcceptedRowDecodeContract,
877        accepted_schema_info: SchemaInfo,
878        accepted_schema_fingerprint: CommitSchemaFingerprint,
879    ) -> SaveExecutor<E>
880    where
881        E: PersistedRow<Canister = C> + EntityValue,
882    {
883        SaveExecutor::new_with_accepted_contract(
884            self.db,
885            self.debug,
886            accepted_row_decode_contract,
887            accepted_schema_info,
888            accepted_schema_fingerprint,
889        )
890    }
891}