Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
52    QueryExecutionAttribution,
53};
54pub(in crate::db) use response::finalize_scalar_paged_execution;
55pub(in crate::db) use response::finalize_structural_grouped_projection_result;
56#[cfg(feature = "sql")]
57pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
58#[cfg(feature = "sql")]
59pub use sql::{
60    SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport,
61    SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentUpdatePlan,
62    SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
63    SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdateOrderPolicy,
64    SqlUpdatePolicyContext, SqlUpdatePolicyRejection, SqlUpdatePolicyReport,
65    SqlUpdateReturningBounds, SqlUpdateReturningPolicy, SqlUpdateStatementClassification,
66    SqlUpdateWherePolicy, SqlValidatedUpdatePlan, classify_sql_update_policy,
67    sql_statement_dispatch, sql_statement_entity_name, sql_statement_shell_surface,
68    sql_statement_surface,
69};
70#[cfg(all(feature = "sql", feature = "diagnostics"))]
71pub use sql::{
72    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
73    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
74};
75#[cfg(all(feature = "sql", feature = "diagnostics"))]
76pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
77
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///
/// Built through `new` / `new_with_hooks`; the `debug` and `metrics_sink`
/// builder methods adjust the policy fields before the session is used.
///

pub struct DbSession<C: CanisterKind> {
    // Database handle passed to the load/save/delete executors and used for
    // store lookups, catalogs, and reports.
    db: Db<C>,
    // Debug flag forwarded to the load and save executors; starts as `false`.
    debug: bool,
    // Optional metrics sink; when set, session work runs inside
    // `with_metrics_sink` for this sink.
    metrics: Option<&'static dyn MetricsSink>,
}
89
/// Test-only capture of the accepted-catalog runtime counters.
///
/// Each field holds the value returned by the matching `*_count_for_tests`
/// accessor at the moment the snapshot was taken.
#[cfg(test)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
    /// From `crate::db::schema::accepted_schema_info_projection_count_for_tests`.
    pub schema_info_projections: u64,
    /// From `crate::db::schema::persisted_schema_snapshot_decode_count_for_tests`.
    pub persisted_schema_decodes: u64,
    /// From `crate::db::schema::generated_compatible_row_layout_proof_count_for_tests`.
    pub generated_compatible_row_layout_proofs: u64,
    /// From `crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests`.
    pub latest_by_entity_calls: u64,
    /// From `query::visible_index_projection_count_for_tests`.
    pub visible_index_projections: u64,
}
99
// One value stored in the thread-local accepted-schema query cache: the
// accepted snapshot plus the catalog identity recorded alongside it.
#[derive(Clone, Debug)]
struct AcceptedSchemaQueryCacheEntry {
    // Accepted schema snapshot handed back to query callers on a cache hit.
    snapshot: AcceptedSchemaSnapshot,
    // Catalog identity stored together with `snapshot`.
    identity: AcceptedCatalogIdentity,
}
105
/// Tuple produced by `accepted_save_contract_for_descriptor`.
///
/// Positions, in order:
/// 0. the descriptor's row decode contract,
/// 1. a clone of that contract, returned as the mutation row decode contract,
/// 2. the schema info built from the accepted snapshot,
/// 3. the commit schema fingerprint of the accepted snapshot.
pub(in crate::db) type AcceptedSaveContract = (
    AcceptedRowDecodeContract,
    AcceptedRowDecodeContract,
    SchemaInfo,
    CommitSchemaFingerprint,
);
112
113#[derive(Clone, Debug)]
114pub(in crate::db) struct AcceptedSchemaCatalogContext {
115    snapshot: AcceptedSchemaSnapshot,
116    identity: AcceptedCatalogIdentity,
117    schema_info: OnceCell<SchemaInfo>,
118}
119
120impl AcceptedSchemaCatalogContext {
121    #[must_use]
122    pub(in crate::db) const fn new(
123        snapshot: AcceptedSchemaSnapshot,
124        identity: AcceptedCatalogIdentity,
125    ) -> Self {
126        Self {
127            snapshot,
128            identity,
129            schema_info: OnceCell::new(),
130        }
131    }
132
133    #[must_use]
134    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
135        &self.snapshot
136    }
137
138    #[must_use]
139    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
140        self.identity.accepted_schema_version()
141    }
142
143    #[must_use]
144    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
145        self.identity.accepted_schema_fingerprint()
146    }
147
148    #[must_use]
149    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
150        self.identity.fingerprint_method_version()
151    }
152
153    #[must_use]
154    #[cfg(feature = "sql")]
155    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
156        self.identity
157    }
158
159    fn debug_assert_matches_entity<E>(&self)
160    where
161        E: EntityKind,
162    {
163        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
164        debug_assert_eq!(self.identity.entity_path(), E::PATH);
165        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
166    }
167
168    pub(in crate::db) fn accepted_entity_authority_for<E>(
169        &self,
170    ) -> Result<EntityAuthority, InternalError>
171    where
172        E: EntityKind,
173    {
174        self.debug_assert_matches_entity::<E>();
175        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
176        let (accepted_row_layout, row_proof) =
177            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
178                &self.snapshot,
179                authority.model(),
180            )?;
181        let row_decode_contract = accepted_row_layout.row_decode_contract();
182
183        Ok(authority.with_accepted_row_decode_contract(
184            row_proof,
185            row_decode_contract,
186            self.accepted_schema_info_for::<E>(),
187        ))
188    }
189
190    #[must_use]
191    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
192    where
193        E: EntityKind,
194    {
195        self.debug_assert_matches_entity::<E>();
196        self.schema_info
197            .get_or_init(|| {
198                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
199                    E::MODEL,
200                    &self.snapshot,
201                    true,
202                )
203            })
204            .clone()
205    }
206}
207
208pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
209    accepted_schema: &AcceptedSchemaSnapshot,
210    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
211) -> Result<AcceptedSaveContract, InternalError>
212where
213    E: EntityKind,
214{
215    let row_decode_contract = descriptor.row_decode_contract();
216    let mutation_row_decode_contract = row_decode_contract.clone();
217    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
218    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
219
220    Ok((
221        row_decode_contract,
222        mutation_row_decode_contract,
223        schema_info,
224        schema_fingerprint,
225    ))
226}
227
228thread_local! {
229    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
230    // but repeated read calls should not reload the stable schema snapshot just
231    // to prove an already-warmed cache key. SQL DDL publication invalidates this
232    // heap cache before the next query observes the new accepted schema.
233    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
234        RefCell::new(HashMap::default());
235}
236
237impl<C: CanisterKind> DbSession<C> {
238    /// Construct one session facade for a database handle.
239    #[must_use]
240    pub(crate) const fn new(db: Db<C>) -> Self {
241        Self {
242            db,
243            debug: false,
244            metrics: None,
245        }
246    }
247
248    /// Construct one session facade from store registry and runtime hooks.
249    #[must_use]
250    pub const fn new_with_hooks(
251        store: &'static LocalKey<StoreRegistry>,
252        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
253    ) -> Self {
254        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
255    }
256
257    #[cfg(test)]
258    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
259        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
260        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
261        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
262        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
263        query::reset_visible_index_projection_count_for_tests();
264    }
265
266    #[cfg(test)]
267    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
268    -> AcceptedCatalogRuntimeCounterSnapshot {
269        AcceptedCatalogRuntimeCounterSnapshot {
270            schema_info_projections:
271                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
272            persisted_schema_decodes:
273                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
274            generated_compatible_row_layout_proofs:
275                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
276            latest_by_entity_calls:
277                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
278            visible_index_projections: query::visible_index_projection_count_for_tests(),
279        }
280    }
281
282    /// Enable debug execution behavior where supported by executors.
283    #[must_use]
284    pub const fn debug(mut self) -> Self {
285        self.debug = true;
286        self
287    }
288
289    /// Attach one metrics sink for all session-executed operations.
290    #[must_use]
291    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
292        self.metrics = Some(sink);
293        self
294    }
295
296    // Shared fluent load wrapper construction keeps the session boundary in
297    // one place when load entry points differ only by missing-row policy.
298    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
299    where
300        E: EntityKind<Canister = C>,
301    {
302        FluentLoadQuery::new(self, Query::new(consistency))
303    }
304
305    // Shared fluent delete wrapper construction keeps the delete-mode handoff
306    // explicit at the session boundary instead of reassembling the same query
307    // shell in each public entry point.
308    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
309    where
310        E: PersistedRow<Canister = C>,
311    {
312        FluentDeleteQuery::new(self, Query::new(consistency).delete())
313    }
314
315    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
316        if let Some(sink) = self.metrics {
317            with_metrics_sink(sink, f)
318        } else {
319            f()
320        }
321    }
322
323    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
324    fn execute_save_with<E, T, R>(
325        &self,
326        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
327        map: impl FnOnce(T) -> R,
328    ) -> Result<R, InternalError>
329    where
330        E: PersistedRow<Canister = C> + EntityValue,
331    {
332        let (contract, schema_info, schema_fingerprint) = match self
333            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
334        {
335            Ok(authority) => authority,
336            Err(error) => {
337                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
338
339                return Err(error);
340            }
341        };
342        let value = self.with_metrics(|| {
343            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
344        })?;
345
346        Ok(map(value))
347    }
348
349    // Execute save work after the caller has already proven that the accepted
350    // row contract is generated-compatible. SQL and structural writes use this
351    // after their pre-staging schema guard so mutation staging and save
352    // execution do not rerun schema-store reconciliation in the same statement.
353    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
354        &self,
355        accepted_row_decode_contract: AcceptedRowDecodeContract,
356        accepted_schema_info: SchemaInfo,
357        accepted_schema_fingerprint: CommitSchemaFingerprint,
358        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
359        map: impl FnOnce(T) -> R,
360    ) -> Result<R, InternalError>
361    where
362        E: PersistedRow<Canister = C> + EntityValue,
363    {
364        let value = self.with_metrics(|| {
365            op(self.save_executor::<E>(
366                accepted_row_decode_contract,
367                accepted_schema_info,
368                accepted_schema_fingerprint,
369            ))
370        })?;
371
372        Ok(map(value))
373    }
374
375    // Shared save-facade wrappers keep response shape explicit at call sites.
376    fn execute_save_entity<E>(
377        &self,
378        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
379    ) -> Result<E, InternalError>
380    where
381        E: PersistedRow<Canister = C> + EntityValue,
382    {
383        self.execute_save_with(op, std::convert::identity)
384    }
385
386    fn execute_save_batch<E>(
387        &self,
388        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
389    ) -> Result<WriteBatchResponse<E>, InternalError>
390    where
391        E: PersistedRow<Canister = C> + EntityValue,
392    {
393        self.execute_save_with(op, WriteBatchResponse::new)
394    }
395
396    // ---------------------------------------------------------------------
397    // Query entry points (public, fluent)
398    // ---------------------------------------------------------------------
399
400    /// Start a fluent load query with default missing-row policy (`Ignore`).
401    #[must_use]
402    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
403    where
404        E: EntityKind<Canister = C>,
405    {
406        self.fluent_load_query(MissingRowPolicy::Ignore)
407    }
408
409    /// Start a fluent load query with explicit missing-row policy.
410    #[must_use]
411    pub const fn load_with_consistency<E>(
412        &self,
413        consistency: MissingRowPolicy,
414    ) -> FluentLoadQuery<'_, E>
415    where
416        E: EntityKind<Canister = C>,
417    {
418        self.fluent_load_query(consistency)
419    }
420
421    /// Start a fluent delete query with default missing-row policy (`Ignore`).
422    #[must_use]
423    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
424    where
425        E: PersistedRow<Canister = C>,
426    {
427        self.fluent_delete_query(MissingRowPolicy::Ignore)
428    }
429
430    /// Start a fluent delete query with explicit missing-row policy.
431    #[must_use]
432    pub fn delete_with_consistency<E>(
433        &self,
434        consistency: MissingRowPolicy,
435    ) -> FluentDeleteQuery<'_, E>
436    where
437        E: PersistedRow<Canister = C>,
438    {
439        self.fluent_delete_query(consistency)
440    }
441
442    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
443    ///
444    /// This terminal bypasses query planning and access routing entirely.
445    #[must_use]
446    pub const fn select_one(&self) -> Value {
447        Value::Int64(1)
448    }
449
450    /// Return one stable, human-readable index listing for the entity schema.
451    ///
452    /// Output format mirrors SQL-style introspection:
453    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
454    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
455    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
456    #[must_use]
457    pub fn show_indexes<E>(&self) -> Vec<String>
458    where
459        E: EntityKind<Canister = C>,
460    {
461        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
462    }
463
464    /// Return one stable, human-readable index listing for one schema model.
465    ///
466    /// This model-only helper is schema-owned and intentionally does not
467    /// attach runtime lifecycle state because it does not carry store
468    /// placement context.
469    #[must_use]
470    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
471        show_indexes_for_model(model)
472    }
473
474    /// Return one stable, human-readable index listing for the accepted schema.
475    ///
476    /// Unlike `show_indexes`, this fallible live-schema helper reflects
477    /// accepted DDL-created indexes as well as compiled schema indexes.
478    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
479    where
480        E: EntityKind<Canister = C>,
481    {
482        let schema = self.accepted_schema_info_for_entity::<E>()?;
483
484        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
485    }
486
487    // Return one stable, human-readable index listing for one resolved
488    // store/model pair, attaching the current runtime lifecycle state when the
489    // registry can resolve the backing store handle.
490    pub(in crate::db) fn show_indexes_for_store_model(
491        &self,
492        store_path: &str,
493        model: &'static EntityModel,
494    ) -> Vec<String> {
495        let runtime_state = self
496            .db
497            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
498            .map(|store| store.index_state());
499
500        show_indexes_for_model_with_runtime_state(model, runtime_state)
501    }
502
503    // Return one stable, human-readable index listing for one resolved
504    // store/accepted-schema pair, attaching the current runtime lifecycle state
505    // when the registry can resolve the backing store handle.
506    pub(in crate::db) fn show_indexes_for_store_schema_info(
507        &self,
508        store_path: &str,
509        schema: &SchemaInfo,
510    ) -> Vec<String> {
511        let runtime_state = self
512            .db
513            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
514            .map(|store| store.index_state());
515
516        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
517    }
518
519    /// Return one stable generated-model list of field descriptors.
520    ///
521    /// This infallible Rust metadata helper intentionally reports the compiled
522    /// schema model. Use `try_show_columns` for the accepted persisted-schema
523    /// view used by SQL and diagnostics surfaces.
524    #[must_use]
525    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
526    where
527        E: EntityKind<Canister = C>,
528    {
529        self.show_columns_for_model(E::MODEL)
530    }
531
532    /// Return one stable generated-model list of field descriptors.
533    #[must_use]
534    pub fn show_columns_for_model(
535        &self,
536        model: &'static EntityModel,
537    ) -> Vec<EntityFieldDescription> {
538        describe_entity_fields(model)
539    }
540
541    /// Return field descriptors using the accepted persisted schema snapshot.
542    ///
543    /// This fallible variant is intended for SQL and diagnostics surfaces that
544    /// can report schema reconciliation failures. The infallible
545    /// `show_columns` helper remains generated-model based.
546    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
547    where
548        E: EntityKind<Canister = C>,
549    {
550        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
551
552        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
553    }
554
555    /// Return one stable list of runtime-registered entity catalog entries.
556    #[must_use]
557    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
558        self.try_show_entities().expect("session invariant")
559    }
560
561    /// Return one stable list of runtime-registered entity catalog entries.
562    pub fn try_show_entities(
563        &self,
564    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
565        self.db.runtime_entity_catalog()
566    }
567
568    /// Return one stable list of runtime-registered stores.
569    #[must_use]
570    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
571        self.db.runtime_store_catalog()
572    }
573
574    /// Return one stable list of runtime-registered stable-memory allocations.
575    #[must_use]
576    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
577        self.db.runtime_memory_catalog()
578    }
579
580    // Resolve the exact secondary-index set that is visible to planner-owned
581    // query planning for one recovered store and accepted schema pair.
582    #[cfg(feature = "sql")]
583    fn visible_indexes_for_store_accepted_schema(
584        &self,
585        store_path: &str,
586        schema_info: &SchemaInfo,
587    ) -> Result<VisibleIndexes<'static>, QueryError> {
588        // Phase 1: resolve the recovered store state once at the session
589        // boundary so query/executor planning does not reopen lifecycle checks.
590        let store = self
591            .db
592            .recovered_store(store_path)
593            .map_err(QueryError::execute)?;
594        let state = store.index_state();
595        if state != IndexState::Ready {
596            return Ok(VisibleIndexes::none());
597        }
598        debug_assert_eq!(state, IndexState::Ready);
599
600        // Phase 2: planner-visible indexes are accepted schema contracts once
601        // the recovered store is query-visible.
602        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
603        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
604        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
605        debug_assert_eq!(
606            visible_indexes.accepted_expression_index_count(),
607            Some(visible_indexes.accepted_expression_indexes().len()),
608        );
609
610        Ok(visible_indexes)
611    }
612
613    /// Return one generated-model schema description for the entity.
614    ///
615    /// This is a typed `DESCRIBE`-style introspection surface consumed by
616    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
617    /// schema view is required.
618    #[must_use]
619    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
620    where
621        E: EntityKind<Canister = C>,
622    {
623        self.describe_entity_model(E::MODEL)
624    }
625
626    /// Return one generated-model schema description for one schema model.
627    #[must_use]
628    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
629        describe_entity_model(model)
630    }
631
632    /// Return a schema description using the accepted persisted schema snapshot.
633    ///
634    /// This is the live-schema counterpart to `describe_entity`. It is fallible
635    /// because loading accepted schema authority can fail if startup
636    /// reconciliation rejects the stored metadata.
637    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
638    where
639        E: EntityKind<Canister = C>,
640    {
641        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
642
643        Ok(describe_entity_model_with_persisted_schema(
644            E::MODEL,
645            &snapshot,
646        ))
647    }
648
649    // Ensure and return the accepted schema snapshot for one generated entity.
650    // This may write the first snapshot for an empty store; otherwise it loads
651    // the latest stored snapshot and applies the current exact-match policy.
652    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
653    where
654        E: EntityKind<Canister = C>,
655    {
656        let store = self.db.recovered_store(E::Store::PATH)?;
657
658        store.with_schema_mut(|schema_store| {
659            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
660        })
661    }
662
663    // Load the current accepted schema snapshot for read/query paths without
664    // rerunning generated proposal reconciliation on every cold query call.
665    // Startup and write paths still own reconciliation; the fallback only keeps
666    // first-use test stores and freshly initialized local stores functional.
667    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
668        &self,
669    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
670    where
671        E: EntityKind<Canister = C>,
672    {
673        let cache_key = (self.db.cache_scope_id(), E::PATH);
674        if let Some(entry) =
675            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
676        {
677            return Ok(AcceptedSchemaCatalogContext::new(
678                entry.snapshot,
679                entry.identity,
680            ));
681        }
682
683        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
684        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
685        let identity = AcceptedCatalogIdentity::new(
686            E::ENTITY_TAG,
687            E::PATH,
688            E::Store::PATH,
689            snapshot.persisted_snapshot().version(),
690            fingerprint,
691        );
692        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
693            cache.borrow_mut().insert(
694                cache_key,
695                AcceptedSchemaQueryCacheEntry {
696                    snapshot: snapshot.clone(),
697                    identity,
698                },
699            );
700        });
701
702        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
703    }
704
705    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
706        &self,
707    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
708    where
709        E: EntityKind<Canister = C>,
710    {
711        let store = self.db.recovered_store(E::Store::PATH)?;
712
713        store.with_schema_mut(|schema_store| {
714            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
715        })
716    }
717
718    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
719        &self,
720        selection: &AcceptedCatalogSnapshotSelection,
721    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
722    where
723        E: EntityKind<Canister = C>,
724    {
725        let snapshot = selection.decode_verified()?;
726        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
727            return Ok(None);
728        }
729        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
730        let cache_key = (self.db.cache_scope_id(), E::PATH);
731
732        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
733            cache.borrow_mut().insert(
734                cache_key,
735                AcceptedSchemaQueryCacheEntry {
736                    snapshot,
737                    identity: selection.identity(),
738                },
739            );
740        });
741
742        Ok(Some(context))
743    }
744
745    #[cfg(feature = "sql")]
746    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
747    where
748        E: EntityKind<Canister = C>,
749    {
750        let cache_key = (self.db.cache_scope_id(), E::PATH);
751        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
752            cache.borrow_mut().remove(&cache_key);
753        });
754    }
755
756    fn load_accepted_schema_snapshot_for_query<E>(
757        &self,
758    ) -> Result<AcceptedSchemaSnapshot, InternalError>
759    where
760        E: EntityKind<Canister = C>,
761    {
762        let store = self.db.recovered_store(E::Store::PATH)?;
763
764        store.with_schema_mut(|schema_store| {
765            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
766                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
767                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
768                    &accepted,
769                    E::MODEL,
770                )
771                .is_ok()
772                {
773                    return Ok(accepted);
774                }
775            }
776
777            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
778        })
779    }
780
781    // Build the accepted schema-info projection for one typed entity. Fluent
782    // terminal adapters use this before constructing slot-bound descriptors so
783    // field slot authority comes from the accepted schema snapshot.
784    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
785        &self,
786    ) -> Result<SchemaInfo, InternalError>
787    where
788        E: EntityKind<Canister = C>,
789    {
790        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
791
792        Ok(catalog.accepted_schema_info_for::<E>())
793    }
794
795    // Derive typed executor authority from an accepted snapshot the caller
796    // already loaded, avoiding a second schema-store pass in SQL write/select
797    // adapters that need both write descriptors and read selector authority.
798    #[cfg(feature = "sql")]
799    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
800        accepted_schema: &AcceptedSchemaSnapshot,
801    ) -> Result<EntityAuthority, InternalError>
802    where
803        E: EntityKind<Canister = C>,
804    {
805        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
806    }
807
808    // Ensure accepted schema metadata is safe for write paths that still encode
809    // rows through generated field contracts. Returning only the snapshot keeps
810    // SQL write type checks unchanged while the schema-runtime contract guard
811    // rejects unsupported layout or payload drift before mutation staging.
812    fn ensure_generated_compatible_accepted_save_schema<E>(
813        &self,
814    ) -> Result<
815        (
816            AcceptedRowDecodeContract,
817            SchemaInfo,
818            CommitSchemaFingerprint,
819        ),
820        InternalError,
821    >
822    where
823        E: EntityKind<Canister = C>,
824    {
825        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
826        let (accepted_row_layout, _) =
827            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
828                &accepted_schema,
829                E::MODEL,
830            )?;
831        let (row_decode_contract, _, schema_info, schema_fingerprint) =
832            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
833
834        Ok((row_decode_contract, schema_info, schema_fingerprint))
835    }
836
837    /// Build one point-in-time storage report for observability endpoints.
838    pub fn storage_report(
839        &self,
840        name_to_path: &[(&'static str, &'static str)],
841    ) -> Result<StorageReport, InternalError> {
842        self.db.storage_report(name_to_path)
843    }
844
845    /// Build one point-in-time storage report using default entity-path labels.
846    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
847        self.db.storage_report_default()
848    }
849
850    /// Build one point-in-time integrity scan report for observability endpoints.
851    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
852        self.db.integrity_report()
853    }
854
855    // ---------------------------------------------------------------------
856    // Low-level executors (crate-internal; execution primitives)
857    // ---------------------------------------------------------------------
858
859    #[must_use]
860    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
861    where
862        E: EntityKind<Canister = C> + EntityValue,
863    {
864        LoadExecutor::new(self.db, self.debug)
865    }
866
867    #[must_use]
868    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
869    where
870        E: PersistedRow<Canister = C> + EntityValue,
871    {
872        DeleteExecutor::new(self.db)
873    }
874
875    #[must_use]
876    pub(in crate::db) const fn save_executor<E>(
877        &self,
878        accepted_row_decode_contract: AcceptedRowDecodeContract,
879        accepted_schema_info: SchemaInfo,
880        accepted_schema_fingerprint: CommitSchemaFingerprint,
881    ) -> SaveExecutor<E>
882    where
883        E: PersistedRow<Canister = C> + EntityValue,
884    {
885        SaveExecutor::new_with_accepted_contract(
886            self.db,
887            self.debug,
888            accepted_row_decode_contract,
889            accepted_schema_info,
890            accepted_schema_fingerprint,
891        )
892    }
893}