Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
52    QueryExecutionAttribution,
53};
54pub(in crate::db) use response::finalize_scalar_paged_execution;
55pub(in crate::db) use response::finalize_structural_grouped_projection_result;
56#[cfg(feature = "sql")]
57pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
58#[cfg(feature = "sql")]
59pub use sql::{
60    SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport,
61    SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentUpdatePlan,
62    SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface, SqlUpdateAssignmentPolicy,
63    SqlUpdateExposurePolicy, SqlUpdateOrderPolicy, SqlUpdatePolicyContext,
64    SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateReturningBounds,
65    SqlUpdateReturningPolicy, SqlUpdateStatementClassification, SqlUpdateWherePolicy,
66    SqlValidatedUpdatePlan, classify_sql_update_policy, sql_statement_entity_name,
67    sql_statement_shell_surface, sql_statement_surface,
68};
69#[cfg(all(feature = "sql", feature = "diagnostics"))]
70pub use sql::{
71    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
72    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
73};
74#[cfg(all(feature = "sql", feature = "diagnostics"))]
75pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
76
77///
78/// DbSession
79///
80/// Session-scoped database handle with policy (debug, metrics) and execution routing.
81///
82
83pub struct DbSession<C: CanisterKind> {
84    db: Db<C>,
85    debug: bool,
86    metrics: Option<&'static dyn MetricsSink>,
87}
88
89#[cfg(test)]
90#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
91pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
92    pub schema_info_projections: u64,
93    pub persisted_schema_decodes: u64,
94    pub generated_compatible_row_layout_proofs: u64,
95    pub latest_by_entity_calls: u64,
96    pub visible_index_projections: u64,
97}
98
99#[derive(Clone, Debug)]
100struct AcceptedSchemaQueryCacheEntry {
101    snapshot: AcceptedSchemaSnapshot,
102    identity: AcceptedCatalogIdentity,
103}
104
105pub(in crate::db) type AcceptedSaveContract = (
106    AcceptedRowDecodeContract,
107    AcceptedRowDecodeContract,
108    SchemaInfo,
109    CommitSchemaFingerprint,
110);
111
112#[derive(Clone, Debug)]
113pub(in crate::db) struct AcceptedSchemaCatalogContext {
114    snapshot: AcceptedSchemaSnapshot,
115    identity: AcceptedCatalogIdentity,
116    schema_info: OnceCell<SchemaInfo>,
117}
118
119impl AcceptedSchemaCatalogContext {
120    #[must_use]
121    pub(in crate::db) const fn new(
122        snapshot: AcceptedSchemaSnapshot,
123        identity: AcceptedCatalogIdentity,
124    ) -> Self {
125        Self {
126            snapshot,
127            identity,
128            schema_info: OnceCell::new(),
129        }
130    }
131
132    #[must_use]
133    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
134        &self.snapshot
135    }
136
137    #[must_use]
138    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
139        self.identity.accepted_schema_version()
140    }
141
142    #[must_use]
143    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
144        self.identity.accepted_schema_fingerprint()
145    }
146
147    #[must_use]
148    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
149        self.identity.fingerprint_method_version()
150    }
151
152    #[must_use]
153    #[cfg(feature = "sql")]
154    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
155        self.identity
156    }
157
158    fn debug_assert_matches_entity<E>(&self)
159    where
160        E: EntityKind,
161    {
162        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
163        debug_assert_eq!(self.identity.entity_path(), E::PATH);
164        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
165    }
166
167    pub(in crate::db) fn accepted_entity_authority_for<E>(
168        &self,
169    ) -> Result<EntityAuthority, InternalError>
170    where
171        E: EntityKind,
172    {
173        self.debug_assert_matches_entity::<E>();
174        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
175        let (accepted_row_layout, row_proof) =
176            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
177                &self.snapshot,
178                authority.model(),
179            )?;
180        let row_decode_contract = accepted_row_layout.row_decode_contract();
181
182        Ok(authority.with_accepted_row_decode_contract(
183            row_proof,
184            row_decode_contract,
185            self.accepted_schema_info_for::<E>(),
186        ))
187    }
188
189    #[must_use]
190    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
191    where
192        E: EntityKind,
193    {
194        self.debug_assert_matches_entity::<E>();
195        self.schema_info
196            .get_or_init(|| {
197                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
198                    E::MODEL,
199                    &self.snapshot,
200                    true,
201                )
202            })
203            .clone()
204    }
205}
206
207pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
208    accepted_schema: &AcceptedSchemaSnapshot,
209    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
210) -> Result<AcceptedSaveContract, InternalError>
211where
212    E: EntityKind,
213{
214    let row_decode_contract = descriptor.row_decode_contract();
215    let mutation_row_decode_contract = row_decode_contract.clone();
216    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
217    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
218
219    Ok((
220        row_decode_contract,
221        mutation_row_decode_contract,
222        schema_info,
223        schema_fingerprint,
224    ))
225}
226
227thread_local! {
228    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
229    // but repeated read calls should not reload the stable schema snapshot just
230    // to prove an already-warmed cache key. SQL DDL publication invalidates this
231    // heap cache before the next query observes the new accepted schema.
232    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
233        RefCell::new(HashMap::default());
234}
235
236impl<C: CanisterKind> DbSession<C> {
237    /// Construct one session facade for a database handle.
238    #[must_use]
239    pub(crate) const fn new(db: Db<C>) -> Self {
240        Self {
241            db,
242            debug: false,
243            metrics: None,
244        }
245    }
246
247    /// Construct one session facade from store registry and runtime hooks.
248    #[must_use]
249    pub const fn new_with_hooks(
250        store: &'static LocalKey<StoreRegistry>,
251        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
252    ) -> Self {
253        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
254    }
255
256    #[cfg(test)]
257    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
258        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
259        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
260        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
261        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
262        query::reset_visible_index_projection_count_for_tests();
263    }
264
265    #[cfg(test)]
266    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
267    -> AcceptedCatalogRuntimeCounterSnapshot {
268        AcceptedCatalogRuntimeCounterSnapshot {
269            schema_info_projections:
270                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
271            persisted_schema_decodes:
272                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
273            generated_compatible_row_layout_proofs:
274                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
275            latest_by_entity_calls:
276                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
277            visible_index_projections: query::visible_index_projection_count_for_tests(),
278        }
279    }
280
281    /// Enable debug execution behavior where supported by executors.
282    #[must_use]
283    pub const fn debug(mut self) -> Self {
284        self.debug = true;
285        self
286    }
287
288    /// Attach one metrics sink for all session-executed operations.
289    #[must_use]
290    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
291        self.metrics = Some(sink);
292        self
293    }
294
295    // Shared fluent load wrapper construction keeps the session boundary in
296    // one place when load entry points differ only by missing-row policy.
297    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
298    where
299        E: EntityKind<Canister = C>,
300    {
301        FluentLoadQuery::new(self, Query::new(consistency))
302    }
303
304    // Shared fluent delete wrapper construction keeps the delete-mode handoff
305    // explicit at the session boundary instead of reassembling the same query
306    // shell in each public entry point.
307    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
308    where
309        E: PersistedRow<Canister = C>,
310    {
311        FluentDeleteQuery::new(self, Query::new(consistency).delete())
312    }
313
314    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
315        if let Some(sink) = self.metrics {
316            with_metrics_sink(sink, f)
317        } else {
318            f()
319        }
320    }
321
322    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
323    fn execute_save_with<E, T, R>(
324        &self,
325        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
326        map: impl FnOnce(T) -> R,
327    ) -> Result<R, InternalError>
328    where
329        E: PersistedRow<Canister = C> + EntityValue,
330    {
331        let (contract, schema_info, schema_fingerprint) = match self
332            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
333        {
334            Ok(authority) => authority,
335            Err(error) => {
336                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
337
338                return Err(error);
339            }
340        };
341        let value = self.with_metrics(|| {
342            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
343        })?;
344
345        Ok(map(value))
346    }
347
348    // Execute save work after the caller has already proven that the accepted
349    // row contract is generated-compatible. SQL and structural writes use this
350    // after their pre-staging schema guard so mutation staging and save
351    // execution do not rerun schema-store reconciliation in the same statement.
352    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
353        &self,
354        accepted_row_decode_contract: AcceptedRowDecodeContract,
355        accepted_schema_info: SchemaInfo,
356        accepted_schema_fingerprint: CommitSchemaFingerprint,
357        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
358        map: impl FnOnce(T) -> R,
359    ) -> Result<R, InternalError>
360    where
361        E: PersistedRow<Canister = C> + EntityValue,
362    {
363        let value = self.with_metrics(|| {
364            op(self.save_executor::<E>(
365                accepted_row_decode_contract,
366                accepted_schema_info,
367                accepted_schema_fingerprint,
368            ))
369        })?;
370
371        Ok(map(value))
372    }
373
374    // Shared save-facade wrappers keep response shape explicit at call sites.
375    fn execute_save_entity<E>(
376        &self,
377        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
378    ) -> Result<E, InternalError>
379    where
380        E: PersistedRow<Canister = C> + EntityValue,
381    {
382        self.execute_save_with(op, std::convert::identity)
383    }
384
385    fn execute_save_batch<E>(
386        &self,
387        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
388    ) -> Result<WriteBatchResponse<E>, InternalError>
389    where
390        E: PersistedRow<Canister = C> + EntityValue,
391    {
392        self.execute_save_with(op, WriteBatchResponse::new)
393    }
394
395    // ---------------------------------------------------------------------
396    // Query entry points (public, fluent)
397    // ---------------------------------------------------------------------
398
399    /// Start a fluent load query with default missing-row policy (`Ignore`).
400    #[must_use]
401    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
402    where
403        E: EntityKind<Canister = C>,
404    {
405        self.fluent_load_query(MissingRowPolicy::Ignore)
406    }
407
408    /// Start a fluent load query with explicit missing-row policy.
409    #[must_use]
410    pub const fn load_with_consistency<E>(
411        &self,
412        consistency: MissingRowPolicy,
413    ) -> FluentLoadQuery<'_, E>
414    where
415        E: EntityKind<Canister = C>,
416    {
417        self.fluent_load_query(consistency)
418    }
419
420    /// Start a fluent delete query with default missing-row policy (`Ignore`).
421    #[must_use]
422    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
423    where
424        E: PersistedRow<Canister = C>,
425    {
426        self.fluent_delete_query(MissingRowPolicy::Ignore)
427    }
428
429    /// Start a fluent delete query with explicit missing-row policy.
430    #[must_use]
431    pub fn delete_with_consistency<E>(
432        &self,
433        consistency: MissingRowPolicy,
434    ) -> FluentDeleteQuery<'_, E>
435    where
436        E: PersistedRow<Canister = C>,
437    {
438        self.fluent_delete_query(consistency)
439    }
440
441    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
442    ///
443    /// This terminal bypasses query planning and access routing entirely.
444    #[must_use]
445    pub const fn select_one(&self) -> Value {
446        Value::Int64(1)
447    }
448
449    /// Return one stable, human-readable index listing for the entity schema.
450    ///
451    /// Output format mirrors SQL-style introspection:
452    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
453    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
454    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
455    #[must_use]
456    pub fn show_indexes<E>(&self) -> Vec<String>
457    where
458        E: EntityKind<Canister = C>,
459    {
460        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
461    }
462
463    /// Return one stable, human-readable index listing for one schema model.
464    ///
465    /// This model-only helper is schema-owned and intentionally does not
466    /// attach runtime lifecycle state because it does not carry store
467    /// placement context.
468    #[must_use]
469    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
470        show_indexes_for_model(model)
471    }
472
473    /// Return one stable, human-readable index listing for the accepted schema.
474    ///
475    /// Unlike `show_indexes`, this fallible live-schema helper reflects
476    /// accepted DDL-created indexes as well as compiled schema indexes.
477    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
478    where
479        E: EntityKind<Canister = C>,
480    {
481        let schema = self.accepted_schema_info_for_entity::<E>()?;
482
483        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
484    }
485
486    // Return one stable, human-readable index listing for one resolved
487    // store/model pair, attaching the current runtime lifecycle state when the
488    // registry can resolve the backing store handle.
489    pub(in crate::db) fn show_indexes_for_store_model(
490        &self,
491        store_path: &str,
492        model: &'static EntityModel,
493    ) -> Vec<String> {
494        let runtime_state = self
495            .db
496            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
497            .map(|store| store.index_state());
498
499        show_indexes_for_model_with_runtime_state(model, runtime_state)
500    }
501
502    // Return one stable, human-readable index listing for one resolved
503    // store/accepted-schema pair, attaching the current runtime lifecycle state
504    // when the registry can resolve the backing store handle.
505    pub(in crate::db) fn show_indexes_for_store_schema_info(
506        &self,
507        store_path: &str,
508        schema: &SchemaInfo,
509    ) -> Vec<String> {
510        let runtime_state = self
511            .db
512            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
513            .map(|store| store.index_state());
514
515        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
516    }
517
518    /// Return one stable generated-model list of field descriptors.
519    ///
520    /// This infallible Rust metadata helper intentionally reports the compiled
521    /// schema model. Use `try_show_columns` for the accepted persisted-schema
522    /// view used by SQL and diagnostics surfaces.
523    #[must_use]
524    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
525    where
526        E: EntityKind<Canister = C>,
527    {
528        self.show_columns_for_model(E::MODEL)
529    }
530
531    /// Return one stable generated-model list of field descriptors.
532    #[must_use]
533    pub fn show_columns_for_model(
534        &self,
535        model: &'static EntityModel,
536    ) -> Vec<EntityFieldDescription> {
537        describe_entity_fields(model)
538    }
539
540    /// Return field descriptors using the accepted persisted schema snapshot.
541    ///
542    /// This fallible variant is intended for SQL and diagnostics surfaces that
543    /// can report schema reconciliation failures. The infallible
544    /// `show_columns` helper remains generated-model based.
545    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
546    where
547        E: EntityKind<Canister = C>,
548    {
549        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
550
551        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
552    }
553
554    /// Return one stable list of runtime-registered entity catalog entries.
555    #[must_use]
556    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
557        self.try_show_entities().expect("session invariant")
558    }
559
560    /// Return one stable list of runtime-registered entity catalog entries.
561    pub fn try_show_entities(
562        &self,
563    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
564        self.db.runtime_entity_catalog()
565    }
566
567    /// Return one stable list of runtime-registered stores.
568    #[must_use]
569    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
570        self.db.runtime_store_catalog()
571    }
572
573    /// Return one stable list of runtime-registered stable-memory allocations.
574    #[must_use]
575    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
576        self.db.runtime_memory_catalog()
577    }
578
579    // Resolve the exact secondary-index set that is visible to planner-owned
580    // query planning for one recovered store and accepted schema pair.
581    #[cfg(feature = "sql")]
582    fn visible_indexes_for_store_accepted_schema(
583        &self,
584        store_path: &str,
585        schema_info: &SchemaInfo,
586    ) -> Result<VisibleIndexes<'static>, QueryError> {
587        // Phase 1: resolve the recovered store state once at the session
588        // boundary so query/executor planning does not reopen lifecycle checks.
589        let store = self
590            .db
591            .recovered_store(store_path)
592            .map_err(QueryError::execute)?;
593        let state = store.index_state();
594        if state != IndexState::Ready {
595            return Ok(VisibleIndexes::none());
596        }
597        debug_assert_eq!(state, IndexState::Ready);
598
599        // Phase 2: planner-visible indexes are accepted schema contracts once
600        // the recovered store is query-visible.
601        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
602        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
603        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
604        debug_assert_eq!(
605            visible_indexes.accepted_expression_index_count(),
606            Some(visible_indexes.accepted_expression_indexes().len()),
607        );
608
609        Ok(visible_indexes)
610    }
611
612    /// Return one generated-model schema description for the entity.
613    ///
614    /// This is a typed `DESCRIBE`-style introspection surface consumed by
615    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
616    /// schema view is required.
617    #[must_use]
618    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
619    where
620        E: EntityKind<Canister = C>,
621    {
622        self.describe_entity_model(E::MODEL)
623    }
624
625    /// Return one generated-model schema description for one schema model.
626    #[must_use]
627    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
628        describe_entity_model(model)
629    }
630
631    /// Return a schema description using the accepted persisted schema snapshot.
632    ///
633    /// This is the live-schema counterpart to `describe_entity`. It is fallible
634    /// because loading accepted schema authority can fail if startup
635    /// reconciliation rejects the stored metadata.
636    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
637    where
638        E: EntityKind<Canister = C>,
639    {
640        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
641
642        Ok(describe_entity_model_with_persisted_schema(
643            E::MODEL,
644            &snapshot,
645        ))
646    }
647
648    // Ensure and return the accepted schema snapshot for one generated entity.
649    // This may write the first snapshot for an empty store; otherwise it loads
650    // the latest stored snapshot and applies the current exact-match policy.
651    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
652    where
653        E: EntityKind<Canister = C>,
654    {
655        let store = self.db.recovered_store(E::Store::PATH)?;
656
657        store.with_schema_mut(|schema_store| {
658            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
659        })
660    }
661
662    // Load the current accepted schema snapshot for read/query paths without
663    // rerunning generated proposal reconciliation on every cold query call.
664    // Startup and write paths still own reconciliation; the fallback only keeps
665    // first-use test stores and freshly initialized local stores functional.
666    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
667        &self,
668    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
669    where
670        E: EntityKind<Canister = C>,
671    {
672        let cache_key = (self.db.cache_scope_id(), E::PATH);
673        if let Some(entry) =
674            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
675        {
676            return Ok(AcceptedSchemaCatalogContext::new(
677                entry.snapshot,
678                entry.identity,
679            ));
680        }
681
682        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
683        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
684        let identity = AcceptedCatalogIdentity::new(
685            E::ENTITY_TAG,
686            E::PATH,
687            E::Store::PATH,
688            snapshot.persisted_snapshot().version(),
689            fingerprint,
690        );
691        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
692            cache.borrow_mut().insert(
693                cache_key,
694                AcceptedSchemaQueryCacheEntry {
695                    snapshot: snapshot.clone(),
696                    identity,
697                },
698            );
699        });
700
701        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
702    }
703
704    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
705        &self,
706    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
707    where
708        E: EntityKind<Canister = C>,
709    {
710        let store = self.db.recovered_store(E::Store::PATH)?;
711
712        store.with_schema_mut(|schema_store| {
713            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
714        })
715    }
716
717    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
718        &self,
719        selection: &AcceptedCatalogSnapshotSelection,
720    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
721    where
722        E: EntityKind<Canister = C>,
723    {
724        let snapshot = selection.decode_verified()?;
725        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
726            return Ok(None);
727        }
728        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
729        let cache_key = (self.db.cache_scope_id(), E::PATH);
730
731        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
732            cache.borrow_mut().insert(
733                cache_key,
734                AcceptedSchemaQueryCacheEntry {
735                    snapshot,
736                    identity: selection.identity(),
737                },
738            );
739        });
740
741        Ok(Some(context))
742    }
743
744    #[cfg(feature = "sql")]
745    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
746    where
747        E: EntityKind<Canister = C>,
748    {
749        let cache_key = (self.db.cache_scope_id(), E::PATH);
750        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
751            cache.borrow_mut().remove(&cache_key);
752        });
753    }
754
755    fn load_accepted_schema_snapshot_for_query<E>(
756        &self,
757    ) -> Result<AcceptedSchemaSnapshot, InternalError>
758    where
759        E: EntityKind<Canister = C>,
760    {
761        let store = self.db.recovered_store(E::Store::PATH)?;
762
763        store.with_schema_mut(|schema_store| {
764            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
765                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
766                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
767                    &accepted,
768                    E::MODEL,
769                )
770                .is_ok()
771                {
772                    return Ok(accepted);
773                }
774            }
775
776            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
777        })
778    }
779
780    // Build the accepted schema-info projection for one typed entity. Fluent
781    // terminal adapters use this before constructing slot-bound descriptors so
782    // field slot authority comes from the accepted schema snapshot.
783    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
784        &self,
785    ) -> Result<SchemaInfo, InternalError>
786    where
787        E: EntityKind<Canister = C>,
788    {
789        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
790
791        Ok(catalog.accepted_schema_info_for::<E>())
792    }
793
794    // Derive typed executor authority from an accepted snapshot the caller
795    // already loaded, avoiding a second schema-store pass in SQL write/select
796    // adapters that need both write descriptors and read selector authority.
797    #[cfg(feature = "sql")]
798    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
799        accepted_schema: &AcceptedSchemaSnapshot,
800    ) -> Result<EntityAuthority, InternalError>
801    where
802        E: EntityKind<Canister = C>,
803    {
804        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
805    }
806
807    // Ensure accepted schema metadata is safe for write paths that still encode
808    // rows through generated field contracts. Returning only the snapshot keeps
809    // SQL write type checks unchanged while the schema-runtime contract guard
810    // rejects unsupported layout or payload drift before mutation staging.
811    fn ensure_generated_compatible_accepted_save_schema<E>(
812        &self,
813    ) -> Result<
814        (
815            AcceptedRowDecodeContract,
816            SchemaInfo,
817            CommitSchemaFingerprint,
818        ),
819        InternalError,
820    >
821    where
822        E: EntityKind<Canister = C>,
823    {
824        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
825        let (accepted_row_layout, _) =
826            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
827                &accepted_schema,
828                E::MODEL,
829            )?;
830        let (row_decode_contract, _, schema_info, schema_fingerprint) =
831            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
832
833        Ok((row_decode_contract, schema_info, schema_fingerprint))
834    }
835
836    /// Build one point-in-time storage report for observability endpoints.
837    pub fn storage_report(
838        &self,
839        name_to_path: &[(&'static str, &'static str)],
840    ) -> Result<StorageReport, InternalError> {
841        self.db.storage_report(name_to_path)
842    }
843
844    /// Build one point-in-time storage report using default entity-path labels.
845    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
846        self.db.storage_report_default()
847    }
848
849    /// Build one point-in-time integrity scan report for observability endpoints.
850    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
851        self.db.integrity_report()
852    }
853
854    // ---------------------------------------------------------------------
855    // Low-level executors (crate-internal; execution primitives)
856    // ---------------------------------------------------------------------
857
858    #[must_use]
859    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
860    where
861        E: EntityKind<Canister = C> + EntityValue,
862    {
863        LoadExecutor::new(self.db, self.debug)
864    }
865
866    #[must_use]
867    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
868    where
869        E: PersistedRow<Canister = C> + EntityValue,
870    {
871        DeleteExecutor::new(self.db)
872    }
873
874    #[must_use]
875    pub(in crate::db) const fn save_executor<E>(
876        &self,
877        accepted_row_decode_contract: AcceptedRowDecodeContract,
878        accepted_schema_info: SchemaInfo,
879        accepted_schema_fingerprint: CommitSchemaFingerprint,
880    ) -> SaveExecutor<E>
881    where
882        E: PersistedRow<Canister = C> + EntityValue,
883    {
884        SaveExecutor::new_with_accepted_contract(
885            self.db,
886            self.debug,
887            accepted_row_decode_contract,
888            accepted_schema_info,
889            accepted_schema_fingerprint,
890        )
891    }
892}