Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
52    GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
53    ScalarAggregateAttribution,
54};
55pub(in crate::db) use response::finalize_scalar_paged_execution;
56pub(in crate::db) use response::finalize_structural_grouped_projection_result;
57#[cfg(feature = "sql")]
58pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
59#[cfg(feature = "sql")]
60pub use sql::{
61    SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport,
62    SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentUpdatePlan,
63    SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
64    SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdateOrderPolicy,
65    SqlUpdatePolicyContext, SqlUpdatePolicyRejection, SqlUpdatePolicyReport,
66    SqlUpdateReturningBounds, SqlUpdateReturningPolicy, SqlUpdateStatementClassification,
67    SqlUpdateWherePolicy, SqlValidatedUpdatePlan, classify_sql_update_policy,
68    sql_statement_dispatch, sql_statement_entity_name, sql_statement_shell_surface,
69    sql_statement_surface,
70};
71#[cfg(all(feature = "sql", feature = "diagnostics"))]
72pub use sql::{
73    SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
74    SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
75    SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
76};
77#[cfg(all(feature = "sql", feature = "diagnostics"))]
78pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
79
80///
81/// DbSession
82///
83/// Session-scoped database handle with policy (debug, metrics) and execution routing.
84///
85
86pub struct DbSession<C: CanisterKind> {
87    db: Db<C>,
88    debug: bool,
89    metrics: Option<&'static dyn MetricsSink>,
90}
91
92#[cfg(test)]
93#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
94pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
95    pub schema_info_projections: u64,
96    pub persisted_schema_decodes: u64,
97    pub generated_compatible_row_layout_proofs: u64,
98    pub latest_by_entity_calls: u64,
99    pub visible_index_projections: u64,
100}
101
102#[derive(Clone, Debug)]
103struct AcceptedSchemaQueryCacheEntry {
104    snapshot: AcceptedSchemaSnapshot,
105    identity: AcceptedCatalogIdentity,
106}
107
108pub(in crate::db) type AcceptedSaveContract = (
109    AcceptedRowDecodeContract,
110    AcceptedRowDecodeContract,
111    SchemaInfo,
112    CommitSchemaFingerprint,
113);
114
115#[derive(Clone, Debug)]
116pub(in crate::db) struct AcceptedSchemaCatalogContext {
117    snapshot: AcceptedSchemaSnapshot,
118    identity: AcceptedCatalogIdentity,
119    schema_info: OnceCell<SchemaInfo>,
120}
121
122impl AcceptedSchemaCatalogContext {
123    #[must_use]
124    pub(in crate::db) const fn new(
125        snapshot: AcceptedSchemaSnapshot,
126        identity: AcceptedCatalogIdentity,
127    ) -> Self {
128        Self {
129            snapshot,
130            identity,
131            schema_info: OnceCell::new(),
132        }
133    }
134
135    #[must_use]
136    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
137        &self.snapshot
138    }
139
140    #[must_use]
141    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
142        self.identity.accepted_schema_version()
143    }
144
145    #[must_use]
146    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
147        self.identity.accepted_schema_fingerprint()
148    }
149
150    #[must_use]
151    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
152        self.identity.fingerprint_method_version()
153    }
154
155    #[must_use]
156    #[cfg(feature = "sql")]
157    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
158        self.identity
159    }
160
161    fn debug_assert_matches_entity<E>(&self)
162    where
163        E: EntityKind,
164    {
165        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
166        debug_assert_eq!(self.identity.entity_path(), E::PATH);
167        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
168    }
169
170    pub(in crate::db) fn accepted_entity_authority_for<E>(
171        &self,
172    ) -> Result<EntityAuthority, InternalError>
173    where
174        E: EntityKind,
175    {
176        self.debug_assert_matches_entity::<E>();
177        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
178        let (accepted_row_layout, row_proof) =
179            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
180                &self.snapshot,
181                authority.model(),
182            )?;
183        let row_decode_contract = accepted_row_layout.row_decode_contract();
184
185        Ok(authority.with_accepted_row_decode_contract(
186            row_proof,
187            row_decode_contract,
188            self.accepted_schema_info_for::<E>(),
189        ))
190    }
191
192    #[must_use]
193    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
194    where
195        E: EntityKind,
196    {
197        self.debug_assert_matches_entity::<E>();
198        self.schema_info
199            .get_or_init(|| {
200                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
201                    E::MODEL,
202                    &self.snapshot,
203                    true,
204                )
205            })
206            .clone()
207    }
208}
209
210pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
211    accepted_schema: &AcceptedSchemaSnapshot,
212    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
213) -> Result<AcceptedSaveContract, InternalError>
214where
215    E: EntityKind,
216{
217    let row_decode_contract = descriptor.row_decode_contract();
218    let mutation_row_decode_contract = row_decode_contract.clone();
219    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
220    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
221
222    Ok((
223        row_decode_contract,
224        mutation_row_decode_contract,
225        schema_info,
226        schema_fingerprint,
227    ))
228}
229
230thread_local! {
231    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
232    // but repeated read calls should not reload the stable schema snapshot just
233    // to prove an already-warmed cache key. SQL DDL publication invalidates this
234    // heap cache before the next query observes the new accepted schema.
235    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
236        RefCell::new(HashMap::default());
237}
238
239impl<C: CanisterKind> DbSession<C> {
240    /// Construct one session facade for a database handle.
241    #[must_use]
242    pub(crate) const fn new(db: Db<C>) -> Self {
243        Self {
244            db,
245            debug: false,
246            metrics: None,
247        }
248    }
249
250    /// Construct one session facade from store registry and runtime hooks.
251    #[must_use]
252    pub const fn new_with_hooks(
253        store: &'static LocalKey<StoreRegistry>,
254        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
255    ) -> Self {
256        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
257    }
258
259    #[cfg(test)]
260    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
261        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
262        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
263        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
264        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
265        query::reset_visible_index_projection_count_for_tests();
266    }
267
268    #[cfg(test)]
269    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
270    -> AcceptedCatalogRuntimeCounterSnapshot {
271        AcceptedCatalogRuntimeCounterSnapshot {
272            schema_info_projections:
273                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
274            persisted_schema_decodes:
275                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
276            generated_compatible_row_layout_proofs:
277                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
278            latest_by_entity_calls:
279                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
280            visible_index_projections: query::visible_index_projection_count_for_tests(),
281        }
282    }
283
284    /// Enable debug execution behavior where supported by executors.
285    #[must_use]
286    pub const fn debug(mut self) -> Self {
287        self.debug = true;
288        self
289    }
290
291    /// Attach one metrics sink for all session-executed operations.
292    #[must_use]
293    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
294        self.metrics = Some(sink);
295        self
296    }
297
298    // Shared fluent load wrapper construction keeps the session boundary in
299    // one place when load entry points differ only by missing-row policy.
300    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
301    where
302        E: EntityKind<Canister = C>,
303    {
304        FluentLoadQuery::new(self, Query::new(consistency))
305    }
306
307    // Shared fluent delete wrapper construction keeps the delete-mode handoff
308    // explicit at the session boundary instead of reassembling the same query
309    // shell in each public entry point.
310    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
311    where
312        E: PersistedRow<Canister = C>,
313    {
314        FluentDeleteQuery::new(self, Query::new(consistency).delete())
315    }
316
317    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
318        if let Some(sink) = self.metrics {
319            with_metrics_sink(sink, f)
320        } else {
321            f()
322        }
323    }
324
325    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
326    fn execute_save_with<E, T, R>(
327        &self,
328        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
329        map: impl FnOnce(T) -> R,
330    ) -> Result<R, InternalError>
331    where
332        E: PersistedRow<Canister = C> + EntityValue,
333    {
334        let (contract, schema_info, schema_fingerprint) = match self
335            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
336        {
337            Ok(authority) => authority,
338            Err(error) => {
339                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
340
341                return Err(error);
342            }
343        };
344        let value = self.with_metrics(|| {
345            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
346        })?;
347
348        Ok(map(value))
349    }
350
351    // Execute save work after the caller has already proven that the accepted
352    // row contract is generated-compatible. SQL and structural writes use this
353    // after their pre-staging schema guard so mutation staging and save
354    // execution do not rerun schema-store reconciliation in the same statement.
355    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
356        &self,
357        accepted_row_decode_contract: AcceptedRowDecodeContract,
358        accepted_schema_info: SchemaInfo,
359        accepted_schema_fingerprint: CommitSchemaFingerprint,
360        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
361        map: impl FnOnce(T) -> R,
362    ) -> Result<R, InternalError>
363    where
364        E: PersistedRow<Canister = C> + EntityValue,
365    {
366        let value = self.with_metrics(|| {
367            op(self.save_executor::<E>(
368                accepted_row_decode_contract,
369                accepted_schema_info,
370                accepted_schema_fingerprint,
371            ))
372        })?;
373
374        Ok(map(value))
375    }
376
377    // Shared save-facade wrappers keep response shape explicit at call sites.
378    fn execute_save_entity<E>(
379        &self,
380        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
381    ) -> Result<E, InternalError>
382    where
383        E: PersistedRow<Canister = C> + EntityValue,
384    {
385        self.execute_save_with(op, std::convert::identity)
386    }
387
388    fn execute_save_batch<E>(
389        &self,
390        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
391    ) -> Result<WriteBatchResponse<E>, InternalError>
392    where
393        E: PersistedRow<Canister = C> + EntityValue,
394    {
395        self.execute_save_with(op, WriteBatchResponse::new)
396    }
397
398    // ---------------------------------------------------------------------
399    // Query entry points (public, fluent)
400    // ---------------------------------------------------------------------
401
402    /// Start a fluent load query with default missing-row policy (`Ignore`).
403    #[must_use]
404    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
405    where
406        E: EntityKind<Canister = C>,
407    {
408        self.fluent_load_query(MissingRowPolicy::Ignore)
409    }
410
411    /// Start a fluent load query with explicit missing-row policy.
412    #[must_use]
413    pub const fn load_with_consistency<E>(
414        &self,
415        consistency: MissingRowPolicy,
416    ) -> FluentLoadQuery<'_, E>
417    where
418        E: EntityKind<Canister = C>,
419    {
420        self.fluent_load_query(consistency)
421    }
422
423    /// Start a fluent delete query with default missing-row policy (`Ignore`).
424    #[must_use]
425    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
426    where
427        E: PersistedRow<Canister = C>,
428    {
429        self.fluent_delete_query(MissingRowPolicy::Ignore)
430    }
431
432    /// Start a fluent delete query with explicit missing-row policy.
433    #[must_use]
434    pub fn delete_with_consistency<E>(
435        &self,
436        consistency: MissingRowPolicy,
437    ) -> FluentDeleteQuery<'_, E>
438    where
439        E: PersistedRow<Canister = C>,
440    {
441        self.fluent_delete_query(consistency)
442    }
443
444    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
445    ///
446    /// This terminal bypasses query planning and access routing entirely.
447    #[must_use]
448    pub const fn select_one(&self) -> Value {
449        Value::Int64(1)
450    }
451
452    /// Return one stable, human-readable index listing for the entity schema.
453    ///
454    /// Output format mirrors SQL-style introspection:
455    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
456    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
457    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
458    #[must_use]
459    pub fn show_indexes<E>(&self) -> Vec<String>
460    where
461        E: EntityKind<Canister = C>,
462    {
463        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
464    }
465
466    /// Return one stable, human-readable index listing for one schema model.
467    ///
468    /// This model-only helper is schema-owned and intentionally does not
469    /// attach runtime lifecycle state because it does not carry store
470    /// placement context.
471    #[must_use]
472    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
473        show_indexes_for_model(model)
474    }
475
476    /// Return one stable, human-readable index listing for the accepted schema.
477    ///
478    /// Unlike `show_indexes`, this fallible live-schema helper reflects
479    /// accepted DDL-created indexes as well as compiled schema indexes.
480    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
481    where
482        E: EntityKind<Canister = C>,
483    {
484        let schema = self.accepted_schema_info_for_entity::<E>()?;
485
486        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
487    }
488
489    // Return one stable, human-readable index listing for one resolved
490    // store/model pair, attaching the current runtime lifecycle state when the
491    // registry can resolve the backing store handle.
492    pub(in crate::db) fn show_indexes_for_store_model(
493        &self,
494        store_path: &str,
495        model: &'static EntityModel,
496    ) -> Vec<String> {
497        let runtime_state = self
498            .db
499            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
500            .map(|store| store.index_state());
501
502        show_indexes_for_model_with_runtime_state(model, runtime_state)
503    }
504
505    // Return one stable, human-readable index listing for one resolved
506    // store/accepted-schema pair, attaching the current runtime lifecycle state
507    // when the registry can resolve the backing store handle.
508    pub(in crate::db) fn show_indexes_for_store_schema_info(
509        &self,
510        store_path: &str,
511        schema: &SchemaInfo,
512    ) -> Vec<String> {
513        let runtime_state = self
514            .db
515            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
516            .map(|store| store.index_state());
517
518        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
519    }
520
521    /// Return one stable generated-model list of field descriptors.
522    ///
523    /// This infallible Rust metadata helper intentionally reports the compiled
524    /// schema model. Use `try_show_columns` for the accepted persisted-schema
525    /// view used by SQL and diagnostics surfaces.
526    #[must_use]
527    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
528    where
529        E: EntityKind<Canister = C>,
530    {
531        self.show_columns_for_model(E::MODEL)
532    }
533
534    /// Return one stable generated-model list of field descriptors.
535    #[must_use]
536    pub fn show_columns_for_model(
537        &self,
538        model: &'static EntityModel,
539    ) -> Vec<EntityFieldDescription> {
540        describe_entity_fields(model)
541    }
542
543    /// Return field descriptors using the accepted persisted schema snapshot.
544    ///
545    /// This fallible variant is intended for SQL and diagnostics surfaces that
546    /// can report schema reconciliation failures. The infallible
547    /// `show_columns` helper remains generated-model based.
548    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
549    where
550        E: EntityKind<Canister = C>,
551    {
552        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
553
554        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
555    }
556
557    /// Return one stable list of runtime-registered entity catalog entries.
558    ///
559    /// # Panics
560    ///
561    /// Panics if the runtime session cannot read its registered entity catalog.
562    /// Use `try_show_entities` when the caller can report the internal error.
563    #[must_use]
564    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
565        self.try_show_entities().expect("session invariant")
566    }
567
568    /// Return one stable list of runtime-registered entity catalog entries.
569    pub fn try_show_entities(
570        &self,
571    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
572        self.db.runtime_entity_catalog()
573    }
574
575    /// Return one stable list of runtime-registered stores.
576    #[must_use]
577    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
578        self.db.runtime_store_catalog()
579    }
580
581    /// Return one stable list of runtime-registered stable-memory allocations.
582    #[must_use]
583    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
584        self.db.runtime_memory_catalog()
585    }
586
587    // Resolve the exact secondary-index set that is visible to planner-owned
588    // query planning for one recovered store and accepted schema pair.
589    #[cfg(feature = "sql")]
590    fn visible_indexes_for_store_accepted_schema(
591        &self,
592        store_path: &str,
593        schema_info: &SchemaInfo,
594    ) -> Result<VisibleIndexes<'static>, QueryError> {
595        // Phase 1: resolve the recovered store state once at the session
596        // boundary so query/executor planning does not reopen lifecycle checks.
597        let store = self
598            .db
599            .recovered_store(store_path)
600            .map_err(QueryError::execute)?;
601        let state = store.index_state();
602        if state != IndexState::Ready {
603            return Ok(VisibleIndexes::none());
604        }
605        debug_assert_eq!(state, IndexState::Ready);
606
607        // Phase 2: planner-visible indexes are accepted schema contracts once
608        // the recovered store is query-visible.
609        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
610        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
611        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
612        debug_assert_eq!(
613            visible_indexes.accepted_expression_index_count(),
614            Some(visible_indexes.accepted_expression_indexes().len()),
615        );
616
617        Ok(visible_indexes)
618    }
619
620    /// Return one generated-model schema description for the entity.
621    ///
622    /// This is a typed `DESCRIBE`-style introspection surface consumed by
623    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
624    /// schema view is required.
625    #[must_use]
626    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
627    where
628        E: EntityKind<Canister = C>,
629    {
630        self.describe_entity_model(E::MODEL)
631    }
632
633    /// Return one generated-model schema description for one schema model.
634    #[must_use]
635    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
636        describe_entity_model(model)
637    }
638
639    /// Return a schema description using the accepted persisted schema snapshot.
640    ///
641    /// This is the live-schema counterpart to `describe_entity`. It is fallible
642    /// because loading accepted schema authority can fail if startup
643    /// reconciliation rejects the stored metadata.
644    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
645    where
646        E: EntityKind<Canister = C>,
647    {
648        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
649
650        Ok(describe_entity_model_with_persisted_schema(
651            E::MODEL,
652            &snapshot,
653        ))
654    }
655
656    // Ensure and return the accepted schema snapshot for one generated entity.
657    // This may write the first snapshot for an empty store; otherwise it loads
658    // the latest stored snapshot and applies the current exact-match policy.
659    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
660    where
661        E: EntityKind<Canister = C>,
662    {
663        let store = self.db.recovered_store(E::Store::PATH)?;
664
665        store.with_schema_mut(|schema_store| {
666            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
667        })
668    }
669
670    // Load the current accepted schema snapshot for read/query paths without
671    // rerunning generated proposal reconciliation on every cold query call.
672    // Startup and write paths still own reconciliation; the fallback only keeps
673    // first-use test stores and freshly initialized local stores functional.
674    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
675        &self,
676    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
677    where
678        E: EntityKind<Canister = C>,
679    {
680        let cache_key = (self.db.cache_scope_id(), E::PATH);
681        if let Some(entry) =
682            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
683        {
684            return Ok(AcceptedSchemaCatalogContext::new(
685                entry.snapshot,
686                entry.identity,
687            ));
688        }
689
690        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
691        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
692        let identity = AcceptedCatalogIdentity::new(
693            E::ENTITY_TAG,
694            E::PATH,
695            E::Store::PATH,
696            snapshot.persisted_snapshot().version(),
697            fingerprint,
698        );
699        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
700            cache.borrow_mut().insert(
701                cache_key,
702                AcceptedSchemaQueryCacheEntry {
703                    snapshot: snapshot.clone(),
704                    identity,
705                },
706            );
707        });
708
709        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
710    }
711
712    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
713        &self,
714    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
715    where
716        E: EntityKind<Canister = C>,
717    {
718        let store = self.db.recovered_store(E::Store::PATH)?;
719
720        store.with_schema_mut(|schema_store| {
721            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
722        })
723    }
724
725    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
726        &self,
727        selection: &AcceptedCatalogSnapshotSelection,
728    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
729    where
730        E: EntityKind<Canister = C>,
731    {
732        let cache_key = (self.db.cache_scope_id(), E::PATH);
733        if let Some(entry) =
734            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
735            && entry.identity == selection.identity()
736        {
737            return Ok(Some(AcceptedSchemaCatalogContext::new(
738                entry.snapshot,
739                entry.identity,
740            )));
741        }
742
743        let snapshot = selection.decode_verified()?;
744        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
745            return Ok(None);
746        }
747        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
748
749        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
750            cache.borrow_mut().insert(
751                cache_key,
752                AcceptedSchemaQueryCacheEntry {
753                    snapshot,
754                    identity: selection.identity(),
755                },
756            );
757        });
758
759        Ok(Some(context))
760    }
761
762    #[cfg(feature = "sql")]
763    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
764    where
765        E: EntityKind<Canister = C>,
766    {
767        let cache_key = (self.db.cache_scope_id(), E::PATH);
768        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
769            cache.borrow_mut().remove(&cache_key);
770        });
771    }
772
773    fn load_accepted_schema_snapshot_for_query<E>(
774        &self,
775    ) -> Result<AcceptedSchemaSnapshot, InternalError>
776    where
777        E: EntityKind<Canister = C>,
778    {
779        let store = self.db.recovered_store(E::Store::PATH)?;
780
781        store.with_schema_mut(|schema_store| {
782            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
783                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
784                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
785                    &accepted,
786                    E::MODEL,
787                )
788                .is_ok()
789                {
790                    return Ok(accepted);
791                }
792            }
793
794            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
795        })
796    }
797
798    // Build the accepted schema-info projection for one typed entity. Fluent
799    // terminal adapters use this before constructing slot-bound descriptors so
800    // field slot authority comes from the accepted schema snapshot.
801    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
802        &self,
803    ) -> Result<SchemaInfo, InternalError>
804    where
805        E: EntityKind<Canister = C>,
806    {
807        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
808
809        Ok(catalog.accepted_schema_info_for::<E>())
810    }
811
812    // Derive typed executor authority from an accepted snapshot the caller
813    // already loaded, avoiding a second schema-store pass in SQL write/select
814    // adapters that need both write descriptors and read selector authority.
815    #[cfg(feature = "sql")]
816    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
817        accepted_schema: &AcceptedSchemaSnapshot,
818    ) -> Result<EntityAuthority, InternalError>
819    where
820        E: EntityKind<Canister = C>,
821    {
822        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
823    }
824
825    // Ensure accepted schema metadata is safe for write paths that still encode
826    // rows through generated field contracts. Returning only the snapshot keeps
827    // SQL write type checks unchanged while the schema-runtime contract guard
828    // rejects unsupported layout or payload drift before mutation staging.
829    fn ensure_generated_compatible_accepted_save_schema<E>(
830        &self,
831    ) -> Result<
832        (
833            AcceptedRowDecodeContract,
834            SchemaInfo,
835            CommitSchemaFingerprint,
836        ),
837        InternalError,
838    >
839    where
840        E: EntityKind<Canister = C>,
841    {
842        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
843        let (accepted_row_layout, _) =
844            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
845                &accepted_schema,
846                E::MODEL,
847            )?;
848        let (row_decode_contract, _, schema_info, schema_fingerprint) =
849            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
850
851        Ok((row_decode_contract, schema_info, schema_fingerprint))
852    }
853
854    /// Build one point-in-time storage report for observability endpoints.
855    pub fn storage_report(
856        &self,
857        name_to_path: &[(&'static str, &'static str)],
858    ) -> Result<StorageReport, InternalError> {
859        self.db.storage_report(name_to_path)
860    }
861
862    /// Build one point-in-time storage report using default entity-path labels.
863    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
864        self.db.storage_report_default()
865    }
866
867    /// Build one point-in-time integrity scan report for observability endpoints.
868    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
869        self.db.integrity_report()
870    }
871
872    // ---------------------------------------------------------------------
873    // Low-level executors (crate-internal; execution primitives)
874    // ---------------------------------------------------------------------
875
876    #[must_use]
877    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
878    where
879        E: EntityKind<Canister = C> + EntityValue,
880    {
881        LoadExecutor::new(self.db, self.debug)
882    }
883
884    #[must_use]
885    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
886    where
887        E: PersistedRow<Canister = C> + EntityValue,
888    {
889        DeleteExecutor::new(self.db)
890    }
891
892    #[must_use]
893    pub(in crate::db) const fn save_executor<E>(
894        &self,
895        accepted_row_decode_contract: AcceptedRowDecodeContract,
896        accepted_schema_info: SchemaInfo,
897        accepted_schema_fingerprint: CommitSchemaFingerprint,
898    ) -> SaveExecutor<E>
899    where
900        E: PersistedRow<Canister = C> + EntityValue,
901    {
902        SaveExecutor::new_with_accepted_contract(
903            self.db,
904            self.debug,
905            accepted_row_decode_contract,
906            accepted_schema_info,
907            accepted_schema_fingerprint,
908        )
909    }
910}