Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
52    KernelRowAttribution, QueryExecutionAttribution,
53};
54pub(in crate::db) use response::finalize_scalar_paged_execution;
55pub(in crate::db) use response::finalize_structural_grouped_projection_result;
56#[cfg(feature = "sql")]
57pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
58#[cfg(feature = "sql")]
59pub use sql::{
60    SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport,
61    SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentUpdatePlan,
62    SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
63    SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdateOrderPolicy,
64    SqlUpdatePolicyContext, SqlUpdatePolicyRejection, SqlUpdatePolicyReport,
65    SqlUpdateReturningBounds, SqlUpdateReturningPolicy, SqlUpdateStatementClassification,
66    SqlUpdateWherePolicy, SqlValidatedUpdatePlan, classify_sql_update_policy,
67    sql_statement_dispatch, sql_statement_entity_name, sql_statement_shell_surface,
68    sql_statement_surface,
69};
70#[cfg(all(feature = "sql", feature = "diagnostics"))]
71pub use sql::{
72    SqlCompileAttribution, SqlExecutionAttribution, SqlOutputBlobAttribution,
73    SqlPureCoveringAttribution, SqlQueryCacheAttribution, SqlQueryExecutionAttribution,
74    SqlScalarAggregateAttribution,
75};
76#[cfg(all(feature = "sql", feature = "diagnostics"))]
77pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
78
79///
80/// DbSession
81///
82/// Session-scoped database handle with policy (debug, metrics) and execution routing.
83///
84
85pub struct DbSession<C: CanisterKind> {
86    db: Db<C>,
87    debug: bool,
88    metrics: Option<&'static dyn MetricsSink>,
89}
90
91#[cfg(test)]
92#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
93pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
94    pub schema_info_projections: u64,
95    pub persisted_schema_decodes: u64,
96    pub generated_compatible_row_layout_proofs: u64,
97    pub latest_by_entity_calls: u64,
98    pub visible_index_projections: u64,
99}
100
101#[derive(Clone, Debug)]
102struct AcceptedSchemaQueryCacheEntry {
103    snapshot: AcceptedSchemaSnapshot,
104    identity: AcceptedCatalogIdentity,
105}
106
107pub(in crate::db) type AcceptedSaveContract = (
108    AcceptedRowDecodeContract,
109    AcceptedRowDecodeContract,
110    SchemaInfo,
111    CommitSchemaFingerprint,
112);
113
114#[derive(Clone, Debug)]
115pub(in crate::db) struct AcceptedSchemaCatalogContext {
116    snapshot: AcceptedSchemaSnapshot,
117    identity: AcceptedCatalogIdentity,
118    schema_info: OnceCell<SchemaInfo>,
119}
120
121impl AcceptedSchemaCatalogContext {
122    #[must_use]
123    pub(in crate::db) const fn new(
124        snapshot: AcceptedSchemaSnapshot,
125        identity: AcceptedCatalogIdentity,
126    ) -> Self {
127        Self {
128            snapshot,
129            identity,
130            schema_info: OnceCell::new(),
131        }
132    }
133
134    #[must_use]
135    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
136        &self.snapshot
137    }
138
139    #[must_use]
140    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
141        self.identity.accepted_schema_version()
142    }
143
144    #[must_use]
145    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
146        self.identity.accepted_schema_fingerprint()
147    }
148
149    #[must_use]
150    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
151        self.identity.fingerprint_method_version()
152    }
153
154    #[must_use]
155    #[cfg(feature = "sql")]
156    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
157        self.identity
158    }
159
160    fn debug_assert_matches_entity<E>(&self)
161    where
162        E: EntityKind,
163    {
164        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
165        debug_assert_eq!(self.identity.entity_path(), E::PATH);
166        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
167    }
168
169    pub(in crate::db) fn accepted_entity_authority_for<E>(
170        &self,
171    ) -> Result<EntityAuthority, InternalError>
172    where
173        E: EntityKind,
174    {
175        self.debug_assert_matches_entity::<E>();
176        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
177        let (accepted_row_layout, row_proof) =
178            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
179                &self.snapshot,
180                authority.model(),
181            )?;
182        let row_decode_contract = accepted_row_layout.row_decode_contract();
183
184        Ok(authority.with_accepted_row_decode_contract(
185            row_proof,
186            row_decode_contract,
187            self.accepted_schema_info_for::<E>(),
188        ))
189    }
190
191    #[must_use]
192    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
193    where
194        E: EntityKind,
195    {
196        self.debug_assert_matches_entity::<E>();
197        self.schema_info
198            .get_or_init(|| {
199                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
200                    E::MODEL,
201                    &self.snapshot,
202                    true,
203                )
204            })
205            .clone()
206    }
207}
208
209pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
210    accepted_schema: &AcceptedSchemaSnapshot,
211    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
212) -> Result<AcceptedSaveContract, InternalError>
213where
214    E: EntityKind,
215{
216    let row_decode_contract = descriptor.row_decode_contract();
217    let mutation_row_decode_contract = row_decode_contract.clone();
218    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
219    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
220
221    Ok((
222        row_decode_contract,
223        mutation_row_decode_contract,
224        schema_info,
225        schema_fingerprint,
226    ))
227}
228
229thread_local! {
230    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
231    // but repeated read calls should not reload the stable schema snapshot just
232    // to prove an already-warmed cache key. SQL DDL publication invalidates this
233    // heap cache before the next query observes the new accepted schema.
234    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
235        RefCell::new(HashMap::default());
236}
237
238impl<C: CanisterKind> DbSession<C> {
239    /// Construct one session facade for a database handle.
240    #[must_use]
241    pub(crate) const fn new(db: Db<C>) -> Self {
242        Self {
243            db,
244            debug: false,
245            metrics: None,
246        }
247    }
248
249    /// Construct one session facade from store registry and runtime hooks.
250    #[must_use]
251    pub const fn new_with_hooks(
252        store: &'static LocalKey<StoreRegistry>,
253        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
254    ) -> Self {
255        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
256    }
257
258    #[cfg(test)]
259    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
260        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
261        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
262        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
263        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
264        query::reset_visible_index_projection_count_for_tests();
265    }
266
267    #[cfg(test)]
268    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
269    -> AcceptedCatalogRuntimeCounterSnapshot {
270        AcceptedCatalogRuntimeCounterSnapshot {
271            schema_info_projections:
272                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
273            persisted_schema_decodes:
274                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
275            generated_compatible_row_layout_proofs:
276                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
277            latest_by_entity_calls:
278                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
279            visible_index_projections: query::visible_index_projection_count_for_tests(),
280        }
281    }
282
283    /// Enable debug execution behavior where supported by executors.
284    #[must_use]
285    pub const fn debug(mut self) -> Self {
286        self.debug = true;
287        self
288    }
289
290    /// Attach one metrics sink for all session-executed operations.
291    #[must_use]
292    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
293        self.metrics = Some(sink);
294        self
295    }
296
297    // Shared fluent load wrapper construction keeps the session boundary in
298    // one place when load entry points differ only by missing-row policy.
299    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
300    where
301        E: EntityKind<Canister = C>,
302    {
303        FluentLoadQuery::new(self, Query::new(consistency))
304    }
305
306    // Shared fluent delete wrapper construction keeps the delete-mode handoff
307    // explicit at the session boundary instead of reassembling the same query
308    // shell in each public entry point.
309    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
310    where
311        E: PersistedRow<Canister = C>,
312    {
313        FluentDeleteQuery::new(self, Query::new(consistency).delete())
314    }
315
316    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
317        if let Some(sink) = self.metrics {
318            with_metrics_sink(sink, f)
319        } else {
320            f()
321        }
322    }
323
324    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
325    fn execute_save_with<E, T, R>(
326        &self,
327        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
328        map: impl FnOnce(T) -> R,
329    ) -> Result<R, InternalError>
330    where
331        E: PersistedRow<Canister = C> + EntityValue,
332    {
333        let (contract, schema_info, schema_fingerprint) = match self
334            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
335        {
336            Ok(authority) => authority,
337            Err(error) => {
338                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
339
340                return Err(error);
341            }
342        };
343        let value = self.with_metrics(|| {
344            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
345        })?;
346
347        Ok(map(value))
348    }
349
350    // Execute save work after the caller has already proven that the accepted
351    // row contract is generated-compatible. SQL and structural writes use this
352    // after their pre-staging schema guard so mutation staging and save
353    // execution do not rerun schema-store reconciliation in the same statement.
354    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
355        &self,
356        accepted_row_decode_contract: AcceptedRowDecodeContract,
357        accepted_schema_info: SchemaInfo,
358        accepted_schema_fingerprint: CommitSchemaFingerprint,
359        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
360        map: impl FnOnce(T) -> R,
361    ) -> Result<R, InternalError>
362    where
363        E: PersistedRow<Canister = C> + EntityValue,
364    {
365        let value = self.with_metrics(|| {
366            op(self.save_executor::<E>(
367                accepted_row_decode_contract,
368                accepted_schema_info,
369                accepted_schema_fingerprint,
370            ))
371        })?;
372
373        Ok(map(value))
374    }
375
376    // Shared save-facade wrappers keep response shape explicit at call sites.
377    fn execute_save_entity<E>(
378        &self,
379        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
380    ) -> Result<E, InternalError>
381    where
382        E: PersistedRow<Canister = C> + EntityValue,
383    {
384        self.execute_save_with(op, std::convert::identity)
385    }
386
387    fn execute_save_batch<E>(
388        &self,
389        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
390    ) -> Result<WriteBatchResponse<E>, InternalError>
391    where
392        E: PersistedRow<Canister = C> + EntityValue,
393    {
394        self.execute_save_with(op, WriteBatchResponse::new)
395    }
396
397    // ---------------------------------------------------------------------
398    // Query entry points (public, fluent)
399    // ---------------------------------------------------------------------
400
401    /// Start a fluent load query with default missing-row policy (`Ignore`).
402    #[must_use]
403    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
404    where
405        E: EntityKind<Canister = C>,
406    {
407        self.fluent_load_query(MissingRowPolicy::Ignore)
408    }
409
410    /// Start a fluent load query with explicit missing-row policy.
411    #[must_use]
412    pub const fn load_with_consistency<E>(
413        &self,
414        consistency: MissingRowPolicy,
415    ) -> FluentLoadQuery<'_, E>
416    where
417        E: EntityKind<Canister = C>,
418    {
419        self.fluent_load_query(consistency)
420    }
421
422    /// Start a fluent delete query with default missing-row policy (`Ignore`).
423    #[must_use]
424    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
425    where
426        E: PersistedRow<Canister = C>,
427    {
428        self.fluent_delete_query(MissingRowPolicy::Ignore)
429    }
430
431    /// Start a fluent delete query with explicit missing-row policy.
432    #[must_use]
433    pub fn delete_with_consistency<E>(
434        &self,
435        consistency: MissingRowPolicy,
436    ) -> FluentDeleteQuery<'_, E>
437    where
438        E: PersistedRow<Canister = C>,
439    {
440        self.fluent_delete_query(consistency)
441    }
442
443    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
444    ///
445    /// This terminal bypasses query planning and access routing entirely.
446    #[must_use]
447    pub const fn select_one(&self) -> Value {
448        Value::Int64(1)
449    }
450
451    /// Return one stable, human-readable index listing for the entity schema.
452    ///
453    /// Output format mirrors SQL-style introspection:
454    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
455    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
456    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
457    #[must_use]
458    pub fn show_indexes<E>(&self) -> Vec<String>
459    where
460        E: EntityKind<Canister = C>,
461    {
462        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
463    }
464
465    /// Return one stable, human-readable index listing for one schema model.
466    ///
467    /// This model-only helper is schema-owned and intentionally does not
468    /// attach runtime lifecycle state because it does not carry store
469    /// placement context.
470    #[must_use]
471    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
472        show_indexes_for_model(model)
473    }
474
475    /// Return one stable, human-readable index listing for the accepted schema.
476    ///
477    /// Unlike `show_indexes`, this fallible live-schema helper reflects
478    /// accepted DDL-created indexes as well as compiled schema indexes.
479    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
480    where
481        E: EntityKind<Canister = C>,
482    {
483        let schema = self.accepted_schema_info_for_entity::<E>()?;
484
485        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
486    }
487
488    // Return one stable, human-readable index listing for one resolved
489    // store/model pair, attaching the current runtime lifecycle state when the
490    // registry can resolve the backing store handle.
491    pub(in crate::db) fn show_indexes_for_store_model(
492        &self,
493        store_path: &str,
494        model: &'static EntityModel,
495    ) -> Vec<String> {
496        let runtime_state = self
497            .db
498            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
499            .map(|store| store.index_state());
500
501        show_indexes_for_model_with_runtime_state(model, runtime_state)
502    }
503
504    // Return one stable, human-readable index listing for one resolved
505    // store/accepted-schema pair, attaching the current runtime lifecycle state
506    // when the registry can resolve the backing store handle.
507    pub(in crate::db) fn show_indexes_for_store_schema_info(
508        &self,
509        store_path: &str,
510        schema: &SchemaInfo,
511    ) -> Vec<String> {
512        let runtime_state = self
513            .db
514            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
515            .map(|store| store.index_state());
516
517        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
518    }
519
520    /// Return one stable generated-model list of field descriptors.
521    ///
522    /// This infallible Rust metadata helper intentionally reports the compiled
523    /// schema model. Use `try_show_columns` for the accepted persisted-schema
524    /// view used by SQL and diagnostics surfaces.
525    #[must_use]
526    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
527    where
528        E: EntityKind<Canister = C>,
529    {
530        self.show_columns_for_model(E::MODEL)
531    }
532
533    /// Return one stable generated-model list of field descriptors.
534    #[must_use]
535    pub fn show_columns_for_model(
536        &self,
537        model: &'static EntityModel,
538    ) -> Vec<EntityFieldDescription> {
539        describe_entity_fields(model)
540    }
541
542    /// Return field descriptors using the accepted persisted schema snapshot.
543    ///
544    /// This fallible variant is intended for SQL and diagnostics surfaces that
545    /// can report schema reconciliation failures. The infallible
546    /// `show_columns` helper remains generated-model based.
547    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
548    where
549        E: EntityKind<Canister = C>,
550    {
551        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
552
553        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
554    }
555
556    /// Return one stable list of runtime-registered entity catalog entries.
557    ///
558    /// # Panics
559    ///
560    /// Panics if the runtime session cannot read its registered entity catalog.
561    /// Use `try_show_entities` when the caller can report the internal error.
562    #[must_use]
563    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
564        self.try_show_entities().expect("session invariant")
565    }
566
567    /// Return one stable list of runtime-registered entity catalog entries.
568    pub fn try_show_entities(
569        &self,
570    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
571        self.db.runtime_entity_catalog()
572    }
573
574    /// Return one stable list of runtime-registered stores.
575    #[must_use]
576    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
577        self.db.runtime_store_catalog()
578    }
579
580    /// Return one stable list of runtime-registered stable-memory allocations.
581    #[must_use]
582    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
583        self.db.runtime_memory_catalog()
584    }
585
586    // Resolve the exact secondary-index set that is visible to planner-owned
587    // query planning for one recovered store and accepted schema pair.
588    #[cfg(feature = "sql")]
589    fn visible_indexes_for_store_accepted_schema(
590        &self,
591        store_path: &str,
592        schema_info: &SchemaInfo,
593    ) -> Result<VisibleIndexes<'static>, QueryError> {
594        // Phase 1: resolve the recovered store state once at the session
595        // boundary so query/executor planning does not reopen lifecycle checks.
596        let store = self
597            .db
598            .recovered_store(store_path)
599            .map_err(QueryError::execute)?;
600        let state = store.index_state();
601        if state != IndexState::Ready {
602            return Ok(VisibleIndexes::none());
603        }
604        debug_assert_eq!(state, IndexState::Ready);
605
606        // Phase 2: planner-visible indexes are accepted schema contracts once
607        // the recovered store is query-visible.
608        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
609        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
610        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
611        debug_assert_eq!(
612            visible_indexes.accepted_expression_index_count(),
613            Some(visible_indexes.accepted_expression_indexes().len()),
614        );
615
616        Ok(visible_indexes)
617    }
618
619    /// Return one generated-model schema description for the entity.
620    ///
621    /// This is a typed `DESCRIBE`-style introspection surface consumed by
622    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
623    /// schema view is required.
624    #[must_use]
625    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
626    where
627        E: EntityKind<Canister = C>,
628    {
629        self.describe_entity_model(E::MODEL)
630    }
631
632    /// Return one generated-model schema description for one schema model.
633    #[must_use]
634    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
635        describe_entity_model(model)
636    }
637
638    /// Return a schema description using the accepted persisted schema snapshot.
639    ///
640    /// This is the live-schema counterpart to `describe_entity`. It is fallible
641    /// because loading accepted schema authority can fail if startup
642    /// reconciliation rejects the stored metadata.
643    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
644    where
645        E: EntityKind<Canister = C>,
646    {
647        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
648
649        Ok(describe_entity_model_with_persisted_schema(
650            E::MODEL,
651            &snapshot,
652        ))
653    }
654
655    // Ensure and return the accepted schema snapshot for one generated entity.
656    // This may write the first snapshot for an empty store; otherwise it loads
657    // the latest stored snapshot and applies the current exact-match policy.
658    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
659    where
660        E: EntityKind<Canister = C>,
661    {
662        let store = self.db.recovered_store(E::Store::PATH)?;
663
664        store.with_schema_mut(|schema_store| {
665            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
666        })
667    }
668
669    // Load the current accepted schema snapshot for read/query paths without
670    // rerunning generated proposal reconciliation on every cold query call.
671    // Startup and write paths still own reconciliation; the fallback only keeps
672    // first-use test stores and freshly initialized local stores functional.
673    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
674        &self,
675    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
676    where
677        E: EntityKind<Canister = C>,
678    {
679        let cache_key = (self.db.cache_scope_id(), E::PATH);
680        if let Some(entry) =
681            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
682        {
683            return Ok(AcceptedSchemaCatalogContext::new(
684                entry.snapshot,
685                entry.identity,
686            ));
687        }
688
689        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
690        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
691        let identity = AcceptedCatalogIdentity::new(
692            E::ENTITY_TAG,
693            E::PATH,
694            E::Store::PATH,
695            snapshot.persisted_snapshot().version(),
696            fingerprint,
697        );
698        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
699            cache.borrow_mut().insert(
700                cache_key,
701                AcceptedSchemaQueryCacheEntry {
702                    snapshot: snapshot.clone(),
703                    identity,
704                },
705            );
706        });
707
708        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
709    }
710
711    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
712        &self,
713    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
714    where
715        E: EntityKind<Canister = C>,
716    {
717        let store = self.db.recovered_store(E::Store::PATH)?;
718
719        store.with_schema_mut(|schema_store| {
720            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
721        })
722    }
723
724    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
725        &self,
726        selection: &AcceptedCatalogSnapshotSelection,
727    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
728    where
729        E: EntityKind<Canister = C>,
730    {
731        let snapshot = selection.decode_verified()?;
732        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
733            return Ok(None);
734        }
735        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
736        let cache_key = (self.db.cache_scope_id(), E::PATH);
737
738        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
739            cache.borrow_mut().insert(
740                cache_key,
741                AcceptedSchemaQueryCacheEntry {
742                    snapshot,
743                    identity: selection.identity(),
744                },
745            );
746        });
747
748        Ok(Some(context))
749    }
750
751    #[cfg(feature = "sql")]
752    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
753    where
754        E: EntityKind<Canister = C>,
755    {
756        let cache_key = (self.db.cache_scope_id(), E::PATH);
757        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
758            cache.borrow_mut().remove(&cache_key);
759        });
760    }
761
762    fn load_accepted_schema_snapshot_for_query<E>(
763        &self,
764    ) -> Result<AcceptedSchemaSnapshot, InternalError>
765    where
766        E: EntityKind<Canister = C>,
767    {
768        let store = self.db.recovered_store(E::Store::PATH)?;
769
770        store.with_schema_mut(|schema_store| {
771            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
772                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
773                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
774                    &accepted,
775                    E::MODEL,
776                )
777                .is_ok()
778                {
779                    return Ok(accepted);
780                }
781            }
782
783            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
784        })
785    }
786
787    // Build the accepted schema-info projection for one typed entity. Fluent
788    // terminal adapters use this before constructing slot-bound descriptors so
789    // field slot authority comes from the accepted schema snapshot.
790    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
791        &self,
792    ) -> Result<SchemaInfo, InternalError>
793    where
794        E: EntityKind<Canister = C>,
795    {
796        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
797
798        Ok(catalog.accepted_schema_info_for::<E>())
799    }
800
801    // Derive typed executor authority from an accepted snapshot the caller
802    // already loaded, avoiding a second schema-store pass in SQL write/select
803    // adapters that need both write descriptors and read selector authority.
804    #[cfg(feature = "sql")]
805    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
806        accepted_schema: &AcceptedSchemaSnapshot,
807    ) -> Result<EntityAuthority, InternalError>
808    where
809        E: EntityKind<Canister = C>,
810    {
811        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
812    }
813
814    // Ensure accepted schema metadata is safe for write paths that still encode
815    // rows through generated field contracts. Returning only the snapshot keeps
816    // SQL write type checks unchanged while the schema-runtime contract guard
817    // rejects unsupported layout or payload drift before mutation staging.
818    fn ensure_generated_compatible_accepted_save_schema<E>(
819        &self,
820    ) -> Result<
821        (
822            AcceptedRowDecodeContract,
823            SchemaInfo,
824            CommitSchemaFingerprint,
825        ),
826        InternalError,
827    >
828    where
829        E: EntityKind<Canister = C>,
830    {
831        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
832        let (accepted_row_layout, _) =
833            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
834                &accepted_schema,
835                E::MODEL,
836            )?;
837        let (row_decode_contract, _, schema_info, schema_fingerprint) =
838            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
839
840        Ok((row_decode_contract, schema_info, schema_fingerprint))
841    }
842
843    /// Build one point-in-time storage report for observability endpoints.
844    pub fn storage_report(
845        &self,
846        name_to_path: &[(&'static str, &'static str)],
847    ) -> Result<StorageReport, InternalError> {
848        self.db.storage_report(name_to_path)
849    }
850
851    /// Build one point-in-time storage report using default entity-path labels.
852    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
853        self.db.storage_report_default()
854    }
855
856    /// Build one point-in-time integrity scan report for observability endpoints.
857    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
858        self.db.integrity_report()
859    }
860
861    // ---------------------------------------------------------------------
862    // Low-level executors (crate-internal; execution primitives)
863    // ---------------------------------------------------------------------
864
865    #[must_use]
866    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
867    where
868        E: EntityKind<Canister = C> + EntityValue,
869    {
870        LoadExecutor::new(self.db, self.debug)
871    }
872
873    #[must_use]
874    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
875    where
876        E: PersistedRow<Canister = C> + EntityValue,
877    {
878        DeleteExecutor::new(self.db)
879    }
880
881    #[must_use]
882    pub(in crate::db) const fn save_executor<E>(
883        &self,
884        accepted_row_decode_contract: AcceptedRowDecodeContract,
885        accepted_schema_info: SchemaInfo,
886        accepted_schema_fingerprint: CommitSchemaFingerprint,
887    ) -> SaveExecutor<E>
888    where
889        E: PersistedRow<Canister = C> + EntityValue,
890    {
891        SaveExecutor::new_with_accepted_contract(
892            self.db,
893            self.debug,
894            accepted_row_decode_contract,
895            accepted_schema_info,
896            accepted_schema_fingerprint,
897        )
898    }
899}