Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18    db::{
19        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20        FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21        QueryError, StorageReport, StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
22        commit::CommitSchemaFingerprint,
23        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
24        query::plan::VisibleIndexes,
25        schema::{
26            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
27            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
28            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
29            describe_entity_fields, describe_entity_fields_with_persisted_schema,
30            describe_entity_model, describe_entity_model_with_persisted_schema,
31            ensure_accepted_schema_snapshot, show_indexes_for_model,
32            show_indexes_for_model_with_runtime_state,
33            show_indexes_for_schema_info_with_runtime_state,
34        },
35    },
36    error::InternalError,
37    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
38    model::entity::EntityModel,
39    traits::{CanisterKind, EntityKind, EntityValue, Path},
40    value::Value,
41};
42use std::{
43    cell::{OnceCell, RefCell},
44    collections::HashMap,
45    thread::LocalKey,
46};
47
48#[cfg(feature = "diagnostics")]
49pub use query::{
50    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
51    QueryExecutionAttribution,
52};
53pub(in crate::db) use response::finalize_structural_grouped_projection_result;
54pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
55#[cfg(all(feature = "sql", feature = "diagnostics"))]
56pub use sql::{
57    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
58    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
59};
60#[cfg(feature = "sql")]
61pub use sql::{
62    SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport, SqlStatementResult,
63    SqlStatementSurface, sql_statement_entity_name, sql_statement_surface,
64};
65#[cfg(all(feature = "sql", feature = "diagnostics"))]
66pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
67
68///
69/// DbSession
70///
71/// Session-scoped database handle with policy (debug, metrics) and execution routing.
72///
73
74pub struct DbSession<C: CanisterKind> {
75    db: Db<C>,
76    debug: bool,
77    metrics: Option<&'static dyn MetricsSink>,
78}
79
///
/// AcceptedCatalogRuntimeCounterSnapshot
///
/// Test-only copy of the accepted-catalog runtime counters, as read by
/// `DbSession::accepted_catalog_runtime_counter_snapshot_for_tests`.
///

#[cfg(test)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
    /// Value of `schema::accepted_schema_info_projection_count_for_tests()`.
    pub schema_info_projections: u64,
    /// Value of `schema::persisted_schema_snapshot_decode_count_for_tests()`.
    pub persisted_schema_decodes: u64,
    /// Value of `schema::generated_compatible_row_layout_proof_count_for_tests()`.
    pub generated_compatible_row_layout_proofs: u64,
    /// Value of `schema::latest_raw_snapshots_by_entity_call_count_for_tests()`.
    pub latest_by_entity_calls: u64,
    /// Value of `query::visible_index_projection_count_for_tests()`.
    pub visible_index_projections: u64,
}
89
90#[derive(Clone, Debug)]
91struct AcceptedSchemaQueryCacheEntry {
92    snapshot: AcceptedSchemaSnapshot,
93    identity: AcceptedCatalogIdentity,
94}
95
96pub(in crate::db) type AcceptedSaveContract = (
97    AcceptedRowDecodeContract,
98    AcceptedRowDecodeContract,
99    SchemaInfo,
100    CommitSchemaFingerprint,
101);
102
103#[derive(Clone, Debug)]
104pub(in crate::db) struct AcceptedSchemaCatalogContext {
105    snapshot: AcceptedSchemaSnapshot,
106    identity: AcceptedCatalogIdentity,
107    schema_info: OnceCell<SchemaInfo>,
108}
109
110impl AcceptedSchemaCatalogContext {
111    #[must_use]
112    pub(in crate::db) const fn new(
113        snapshot: AcceptedSchemaSnapshot,
114        identity: AcceptedCatalogIdentity,
115    ) -> Self {
116        Self {
117            snapshot,
118            identity,
119            schema_info: OnceCell::new(),
120        }
121    }
122
123    #[must_use]
124    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
125        &self.snapshot
126    }
127
128    #[must_use]
129    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
130        self.identity.accepted_schema_version()
131    }
132
133    #[must_use]
134    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
135        self.identity.accepted_schema_fingerprint()
136    }
137
138    #[must_use]
139    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
140        self.identity.fingerprint_method_version()
141    }
142
143    #[must_use]
144    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
145        self.identity
146    }
147
148    fn debug_assert_matches_entity<E>(&self)
149    where
150        E: EntityKind,
151    {
152        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
153        debug_assert_eq!(self.identity.entity_path(), E::PATH);
154        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
155    }
156
157    pub(in crate::db) fn accepted_entity_authority_for<E>(
158        &self,
159    ) -> Result<EntityAuthority, InternalError>
160    where
161        E: EntityKind,
162    {
163        self.debug_assert_matches_entity::<E>();
164        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
165        let (accepted_row_layout, row_proof) =
166            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
167                &self.snapshot,
168                authority.model(),
169            )?;
170        let row_decode_contract = accepted_row_layout.row_decode_contract();
171
172        Ok(authority.with_accepted_row_decode_contract(
173            row_proof,
174            row_decode_contract,
175            self.accepted_schema_info_for::<E>(),
176        ))
177    }
178
179    #[must_use]
180    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
181    where
182        E: EntityKind,
183    {
184        self.debug_assert_matches_entity::<E>();
185        self.schema_info
186            .get_or_init(|| {
187                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
188                    E::MODEL,
189                    &self.snapshot,
190                    true,
191                )
192            })
193            .clone()
194    }
195}
196
197pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
198    accepted_schema: &AcceptedSchemaSnapshot,
199    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
200) -> Result<AcceptedSaveContract, InternalError>
201where
202    E: EntityKind,
203{
204    let row_decode_contract = descriptor.row_decode_contract();
205    let mutation_row_decode_contract = row_decode_contract.clone();
206    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
207    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
208
209    Ok((
210        row_decode_contract,
211        mutation_row_decode_contract,
212        schema_info,
213        schema_fingerprint,
214    ))
215}
216
217thread_local! {
218    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
219    // but repeated read calls should not reload the stable schema snapshot just
220    // to prove an already-warmed cache key. SQL DDL publication invalidates this
221    // heap cache before the next query observes the new accepted schema.
222    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
223        RefCell::new(HashMap::default());
224}
225
226impl<C: CanisterKind> DbSession<C> {
227    /// Construct one session facade for a database handle.
228    #[must_use]
229    pub(crate) const fn new(db: Db<C>) -> Self {
230        Self {
231            db,
232            debug: false,
233            metrics: None,
234        }
235    }
236
237    /// Construct one session facade from store registry and runtime hooks.
238    #[must_use]
239    pub const fn new_with_hooks(
240        store: &'static LocalKey<StoreRegistry>,
241        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
242    ) -> Self {
243        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
244    }
245
246    #[cfg(test)]
247    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
248        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
249        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
250        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
251        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
252        query::reset_visible_index_projection_count_for_tests();
253    }
254
255    #[cfg(test)]
256    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
257    -> AcceptedCatalogRuntimeCounterSnapshot {
258        AcceptedCatalogRuntimeCounterSnapshot {
259            schema_info_projections:
260                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
261            persisted_schema_decodes:
262                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
263            generated_compatible_row_layout_proofs:
264                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
265            latest_by_entity_calls:
266                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
267            visible_index_projections: query::visible_index_projection_count_for_tests(),
268        }
269    }
270
271    /// Enable debug execution behavior where supported by executors.
272    #[must_use]
273    pub const fn debug(mut self) -> Self {
274        self.debug = true;
275        self
276    }
277
278    /// Attach one metrics sink for all session-executed operations.
279    #[must_use]
280    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
281        self.metrics = Some(sink);
282        self
283    }
284
285    // Shared fluent load wrapper construction keeps the session boundary in
286    // one place when load entry points differ only by missing-row policy.
287    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
288    where
289        E: EntityKind<Canister = C>,
290    {
291        FluentLoadQuery::new(self, Query::new(consistency))
292    }
293
294    // Shared fluent delete wrapper construction keeps the delete-mode handoff
295    // explicit at the session boundary instead of reassembling the same query
296    // shell in each public entry point.
297    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
298    where
299        E: PersistedRow<Canister = C>,
300    {
301        FluentDeleteQuery::new(self, Query::new(consistency).delete())
302    }
303
304    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
305        if let Some(sink) = self.metrics {
306            with_metrics_sink(sink, f)
307        } else {
308            f()
309        }
310    }
311
312    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
313    fn execute_save_with<E, T, R>(
314        &self,
315        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
316        map: impl FnOnce(T) -> R,
317    ) -> Result<R, InternalError>
318    where
319        E: PersistedRow<Canister = C> + EntityValue,
320    {
321        let (contract, schema_info, schema_fingerprint) = match self
322            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
323        {
324            Ok(authority) => authority,
325            Err(error) => {
326                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
327
328                return Err(error);
329            }
330        };
331        let value = self.with_metrics(|| {
332            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
333        })?;
334
335        Ok(map(value))
336    }
337
338    // Execute save work after the caller has already proven that the accepted
339    // row contract is generated-compatible. SQL and structural writes use this
340    // after their pre-staging schema guard so mutation staging and save
341    // execution do not rerun schema-store reconciliation in the same statement.
342    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
343        &self,
344        accepted_row_decode_contract: AcceptedRowDecodeContract,
345        accepted_schema_info: SchemaInfo,
346        accepted_schema_fingerprint: CommitSchemaFingerprint,
347        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
348        map: impl FnOnce(T) -> R,
349    ) -> Result<R, InternalError>
350    where
351        E: PersistedRow<Canister = C> + EntityValue,
352    {
353        let value = self.with_metrics(|| {
354            op(self.save_executor::<E>(
355                accepted_row_decode_contract,
356                accepted_schema_info,
357                accepted_schema_fingerprint,
358            ))
359        })?;
360
361        Ok(map(value))
362    }
363
364    // Shared save-facade wrappers keep response shape explicit at call sites.
365    fn execute_save_entity<E>(
366        &self,
367        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
368    ) -> Result<E, InternalError>
369    where
370        E: PersistedRow<Canister = C> + EntityValue,
371    {
372        self.execute_save_with(op, std::convert::identity)
373    }
374
375    fn execute_save_batch<E>(
376        &self,
377        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
378    ) -> Result<WriteBatchResponse<E>, InternalError>
379    where
380        E: PersistedRow<Canister = C> + EntityValue,
381    {
382        self.execute_save_with(op, WriteBatchResponse::new)
383    }
384
385    // ---------------------------------------------------------------------
386    // Query entry points (public, fluent)
387    // ---------------------------------------------------------------------
388
389    /// Start a fluent load query with default missing-row policy (`Ignore`).
390    #[must_use]
391    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
392    where
393        E: EntityKind<Canister = C>,
394    {
395        self.fluent_load_query(MissingRowPolicy::Ignore)
396    }
397
398    /// Start a fluent load query with explicit missing-row policy.
399    #[must_use]
400    pub const fn load_with_consistency<E>(
401        &self,
402        consistency: MissingRowPolicy,
403    ) -> FluentLoadQuery<'_, E>
404    where
405        E: EntityKind<Canister = C>,
406    {
407        self.fluent_load_query(consistency)
408    }
409
410    /// Start a fluent delete query with default missing-row policy (`Ignore`).
411    #[must_use]
412    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
413    where
414        E: PersistedRow<Canister = C>,
415    {
416        self.fluent_delete_query(MissingRowPolicy::Ignore)
417    }
418
419    /// Start a fluent delete query with explicit missing-row policy.
420    #[must_use]
421    pub fn delete_with_consistency<E>(
422        &self,
423        consistency: MissingRowPolicy,
424    ) -> FluentDeleteQuery<'_, E>
425    where
426        E: PersistedRow<Canister = C>,
427    {
428        self.fluent_delete_query(consistency)
429    }
430
431    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
432    ///
433    /// This terminal bypasses query planning and access routing entirely.
434    #[must_use]
435    pub const fn select_one(&self) -> Value {
436        Value::Int64(1)
437    }
438
439    /// Return one stable, human-readable index listing for the entity schema.
440    ///
441    /// Output format mirrors SQL-style introspection:
442    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
443    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
444    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
445    #[must_use]
446    pub fn show_indexes<E>(&self) -> Vec<String>
447    where
448        E: EntityKind<Canister = C>,
449    {
450        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
451    }
452
453    /// Return one stable, human-readable index listing for one schema model.
454    ///
455    /// This model-only helper is schema-owned and intentionally does not
456    /// attach runtime lifecycle state because it does not carry store
457    /// placement context.
458    #[must_use]
459    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
460        show_indexes_for_model(model)
461    }
462
463    /// Return one stable, human-readable index listing for the accepted schema.
464    ///
465    /// Unlike `show_indexes`, this fallible live-schema helper reflects
466    /// accepted DDL-created indexes as well as compiled schema indexes.
467    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
468    where
469        E: EntityKind<Canister = C>,
470    {
471        let schema = self.accepted_schema_info_for_entity::<E>()?;
472
473        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
474    }
475
476    // Return one stable, human-readable index listing for one resolved
477    // store/model pair, attaching the current runtime lifecycle state when the
478    // registry can resolve the backing store handle.
479    pub(in crate::db) fn show_indexes_for_store_model(
480        &self,
481        store_path: &str,
482        model: &'static EntityModel,
483    ) -> Vec<String> {
484        let runtime_state = self
485            .db
486            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
487            .map(|store| store.index_state());
488
489        show_indexes_for_model_with_runtime_state(model, runtime_state)
490    }
491
492    // Return one stable, human-readable index listing for one resolved
493    // store/accepted-schema pair, attaching the current runtime lifecycle state
494    // when the registry can resolve the backing store handle.
495    pub(in crate::db) fn show_indexes_for_store_schema_info(
496        &self,
497        store_path: &str,
498        schema: &SchemaInfo,
499    ) -> Vec<String> {
500        let runtime_state = self
501            .db
502            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
503            .map(|store| store.index_state());
504
505        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
506    }
507
508    /// Return one stable generated-model list of field descriptors.
509    ///
510    /// This infallible Rust metadata helper intentionally reports the compiled
511    /// schema model. Use `try_show_columns` for the accepted persisted-schema
512    /// view used by SQL and diagnostics surfaces.
513    #[must_use]
514    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
515    where
516        E: EntityKind<Canister = C>,
517    {
518        self.show_columns_for_model(E::MODEL)
519    }
520
521    /// Return one stable generated-model list of field descriptors.
522    #[must_use]
523    pub fn show_columns_for_model(
524        &self,
525        model: &'static EntityModel,
526    ) -> Vec<EntityFieldDescription> {
527        describe_entity_fields(model)
528    }
529
530    /// Return field descriptors using the accepted persisted schema snapshot.
531    ///
532    /// This fallible variant is intended for SQL and diagnostics surfaces that
533    /// can report schema reconciliation failures. The infallible
534    /// `show_columns` helper remains generated-model based.
535    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
536    where
537        E: EntityKind<Canister = C>,
538    {
539        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
540
541        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
542    }
543
544    /// Return one stable list of runtime-registered entity catalog entries.
545    #[must_use]
546    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
547        self.try_show_entities()
548            .expect("SHOW ENTITIES metadata requires accepted schema authority")
549    }
550
551    /// Return one stable list of runtime-registered entity catalog entries.
552    pub fn try_show_entities(
553        &self,
554    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
555        self.db.runtime_entity_catalog()
556    }
557
558    /// Return one stable list of runtime-registered stores.
559    #[must_use]
560    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
561        self.db.runtime_store_catalog()
562    }
563
564    /// Return one stable list of runtime-registered stable-memory allocations.
565    #[must_use]
566    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
567        self.db.runtime_memory_catalog()
568    }
569
570    // Resolve the exact secondary-index set that is visible to planner-owned
571    // query planning for one recovered store and accepted schema pair.
572    fn visible_indexes_for_store_accepted_schema(
573        &self,
574        store_path: &str,
575        schema_info: &SchemaInfo,
576    ) -> Result<VisibleIndexes<'static>, QueryError> {
577        // Phase 1: resolve the recovered store state once at the session
578        // boundary so query/executor planning does not reopen lifecycle checks.
579        let store = self
580            .db
581            .recovered_store(store_path)
582            .map_err(QueryError::execute)?;
583        let state = store.index_state();
584        if state != IndexState::Ready {
585            return Ok(VisibleIndexes::none());
586        }
587        debug_assert_eq!(state, IndexState::Ready);
588
589        // Phase 2: planner-visible indexes are accepted schema contracts once
590        // the recovered store is query-visible.
591        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
592        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
593        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
594        debug_assert_eq!(
595            visible_indexes.accepted_expression_index_count(),
596            Some(visible_indexes.accepted_expression_indexes().len()),
597        );
598
599        Ok(visible_indexes)
600    }
601
602    /// Return one generated-model schema description for the entity.
603    ///
604    /// This is a typed `DESCRIBE`-style introspection surface consumed by
605    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
606    /// schema view is required.
607    #[must_use]
608    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
609    where
610        E: EntityKind<Canister = C>,
611    {
612        self.describe_entity_model(E::MODEL)
613    }
614
615    /// Return one generated-model schema description for one schema model.
616    #[must_use]
617    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
618        describe_entity_model(model)
619    }
620
621    /// Return a schema description using the accepted persisted schema snapshot.
622    ///
623    /// This is the live-schema counterpart to `describe_entity`. It is fallible
624    /// because loading accepted schema authority can fail if startup
625    /// reconciliation rejects the stored metadata.
626    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
627    where
628        E: EntityKind<Canister = C>,
629    {
630        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
631
632        Ok(describe_entity_model_with_persisted_schema(
633            E::MODEL,
634            &snapshot,
635        ))
636    }
637
638    // Ensure and return the accepted schema snapshot for one generated entity.
639    // This may write the first snapshot for an empty store; otherwise it loads
640    // the latest stored snapshot and applies the current exact-match policy.
641    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
642    where
643        E: EntityKind<Canister = C>,
644    {
645        let store = self.db.recovered_store(E::Store::PATH)?;
646
647        store.with_schema_mut(|schema_store| {
648            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
649        })
650    }
651
652    // Load the current accepted schema snapshot for read/query paths without
653    // rerunning generated proposal reconciliation on every cold query call.
654    // Startup and write paths still own reconciliation; the fallback only keeps
655    // first-use test stores and freshly initialized local stores functional.
656    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
657        &self,
658    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
659    where
660        E: EntityKind<Canister = C>,
661    {
662        let cache_key = (self.db.cache_scope_id(), E::PATH);
663        if let Some(entry) =
664            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
665        {
666            return Ok(AcceptedSchemaCatalogContext::new(
667                entry.snapshot,
668                entry.identity,
669            ));
670        }
671
672        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
673        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
674        let identity = AcceptedCatalogIdentity::new(
675            E::ENTITY_TAG,
676            E::PATH,
677            E::Store::PATH,
678            snapshot.persisted_snapshot().version(),
679            fingerprint,
680        );
681        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
682            cache.borrow_mut().insert(
683                cache_key,
684                AcceptedSchemaQueryCacheEntry {
685                    snapshot: snapshot.clone(),
686                    identity,
687                },
688            );
689        });
690
691        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
692    }
693
694    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
695        &self,
696    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
697    where
698        E: EntityKind<Canister = C>,
699    {
700        let store = self.db.recovered_store(E::Store::PATH)?;
701
702        store.with_schema_mut(|schema_store| {
703            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
704        })
705    }
706
707    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
708        &self,
709        selection: &AcceptedCatalogSnapshotSelection,
710    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
711    where
712        E: EntityKind<Canister = C>,
713    {
714        let snapshot = selection.decode_verified()?;
715        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
716            return Ok(None);
717        }
718        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
719        let cache_key = (self.db.cache_scope_id(), E::PATH);
720
721        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
722            cache.borrow_mut().insert(
723                cache_key,
724                AcceptedSchemaQueryCacheEntry {
725                    snapshot,
726                    identity: selection.identity(),
727                },
728            );
729        });
730
731        Ok(Some(context))
732    }
733
734    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
735    where
736        E: EntityKind<Canister = C>,
737    {
738        let cache_key = (self.db.cache_scope_id(), E::PATH);
739        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
740            cache.borrow_mut().remove(&cache_key);
741        });
742    }
743
744    fn load_accepted_schema_snapshot_for_query<E>(
745        &self,
746    ) -> Result<AcceptedSchemaSnapshot, InternalError>
747    where
748        E: EntityKind<Canister = C>,
749    {
750        let store = self.db.recovered_store(E::Store::PATH)?;
751
752        store.with_schema_mut(|schema_store| {
753            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
754                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
755                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
756                    &accepted,
757                    E::MODEL,
758                )
759                .is_ok()
760                {
761                    return Ok(accepted);
762                }
763            }
764
765            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
766        })
767    }
768
769    // Build the accepted schema-info projection for one typed entity. Fluent
770    // terminal adapters use this before constructing slot-bound descriptors so
771    // field slot authority comes from the accepted schema snapshot.
772    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
773        &self,
774    ) -> Result<SchemaInfo, InternalError>
775    where
776        E: EntityKind<Canister = C>,
777    {
778        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
779
780        Ok(catalog.accepted_schema_info_for::<E>())
781    }
782
783    // Derive typed executor authority from an accepted snapshot the caller
784    // already loaded, avoiding a second schema-store pass in SQL write/select
785    // adapters that need both write descriptors and read selector authority.
786    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
787        accepted_schema: &AcceptedSchemaSnapshot,
788    ) -> Result<EntityAuthority, InternalError>
789    where
790        E: EntityKind<Canister = C>,
791    {
792        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
793    }
794
795    // Ensure accepted schema metadata is safe for write paths that still encode
796    // rows through generated field contracts. Returning only the snapshot keeps
797    // SQL write type checks unchanged while the schema-runtime contract guard
798    // rejects unsupported layout or payload drift before mutation staging.
799    fn ensure_generated_compatible_accepted_save_schema<E>(
800        &self,
801    ) -> Result<
802        (
803            AcceptedRowDecodeContract,
804            SchemaInfo,
805            CommitSchemaFingerprint,
806        ),
807        InternalError,
808    >
809    where
810        E: EntityKind<Canister = C>,
811    {
812        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
813        let (accepted_row_layout, _) =
814            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
815                &accepted_schema,
816                E::MODEL,
817            )?;
818        let (row_decode_contract, _, schema_info, schema_fingerprint) =
819            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
820
821        Ok((row_decode_contract, schema_info, schema_fingerprint))
822    }
823
824    /// Build one point-in-time storage report for observability endpoints.
825    pub fn storage_report(
826        &self,
827        name_to_path: &[(&'static str, &'static str)],
828    ) -> Result<StorageReport, InternalError> {
829        self.db.storage_report(name_to_path)
830    }
831
832    /// Build one point-in-time storage report using default entity-path labels.
833    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
834        self.db.storage_report_default()
835    }
836
837    /// Build one point-in-time integrity scan report for observability endpoints.
838    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
839        self.db.integrity_report()
840    }
841
842    // ---------------------------------------------------------------------
843    // Low-level executors (crate-internal; execution primitives)
844    // ---------------------------------------------------------------------
845
846    #[must_use]
847    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
848    where
849        E: EntityKind<Canister = C> + EntityValue,
850    {
851        LoadExecutor::new(self.db, self.debug)
852    }
853
854    #[must_use]
855    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
856    where
857        E: PersistedRow<Canister = C> + EntityValue,
858    {
859        DeleteExecutor::new(self.db)
860    }
861
862    #[must_use]
863    pub(in crate::db) const fn save_executor<E>(
864        &self,
865        accepted_row_decode_contract: AcceptedRowDecodeContract,
866        accepted_schema_info: SchemaInfo,
867        accepted_schema_fingerprint: CommitSchemaFingerprint,
868    ) -> SaveExecutor<E>
869    where
870        E: PersistedRow<Canister = C> + EntityValue,
871    {
872        SaveExecutor::new_with_accepted_contract(
873            self.db,
874            self.debug,
875            accepted_row_decode_contract,
876            accepted_schema_info,
877            accepted_schema_fingerprint,
878        )
879    }
880}