Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod bounded_cache;
7mod query;
8mod response;
9#[cfg(feature = "sql")]
10mod sql;
11///
12/// TESTS
13///
14#[cfg(all(test, feature = "sql"))]
15mod tests;
16mod write;
17
18#[cfg(any(test, feature = "sql-explain"))]
19use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
20use crate::{
21    db::{
22        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
23        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
24        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
25        commit::CommitSchemaFingerprint,
26        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
27        schema::{
28            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
29            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
30            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
31            describe_entity_fields, describe_entity_fields_with_persisted_schema,
32            describe_entity_model, describe_entity_model_with_persisted_schema,
33            ensure_accepted_schema_snapshot, show_indexes_for_model,
34            show_indexes_for_model_with_runtime_state,
35            show_indexes_for_schema_info_with_runtime_state,
36        },
37    },
38    error::InternalError,
39    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
40    model::entity::EntityModel,
41    traits::{CanisterKind, EntityKind, EntityValue, Path},
42    value::Value,
43};
44use std::{
45    cell::{OnceCell, RefCell},
46    collections::HashMap,
47    thread::LocalKey,
48};
49
50#[cfg(feature = "diagnostics")]
51pub use query::{
52    DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
53    GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
54    ScalarAggregateAttribution,
55};
56pub(in crate::db) use response::finalize_scalar_paged_execution;
57pub(in crate::db) use response::finalize_structural_grouped_projection_result;
58#[cfg(feature = "sql")]
59pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
60#[cfg(feature = "sql")]
61pub use sql::{
62    SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
63    SqlDdlPreparationReport, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
64    SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
65    SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyDeletePlan,
66    SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan, SqlSessionCurrentUpdatePlan,
67    SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
68    SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdatePolicyContext,
69    SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateStatementClassification,
70    SqlValidatedDeletePlan, SqlValidatedUpdatePlan, SqlWriteExecutionBounds, SqlWriteOrderProof,
71    SqlWriteReturningBounds, SqlWriteReturningShape, SqlWriteStatementShape, SqlWriteWhereProof,
72    classify_sql_delete_policy, classify_sql_update_policy, sql_statement_dispatch,
73    sql_statement_entity_name, sql_statement_shell_surface, sql_statement_surface,
74};
75#[cfg(all(feature = "sql", feature = "diagnostics"))]
76pub use sql::{
77    SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
78    SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
79    SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
80};
81#[cfg(all(feature = "sql", feature = "diagnostics"))]
82pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
83
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///
/// Built through `new` / `new_with_hooks`; the `debug` and `metrics_sink`
/// builder methods adjust the policy fields before the session is used.
///

pub struct DbSession<C: CanisterKind> {
    // Underlying database handle every session operation is routed through.
    db: Db<C>,
    // Debug-execution flag; `false` on construction, set by `debug()`.
    debug: bool,
    // Optional metrics sink; when present, `with_metrics` runs work under it.
    metrics: Option<&'static dyn MetricsSink>,
}
95
// Test-only point-in-time copy of the accepted-catalog runtime counters.
// Filled by `DbSession::accepted_catalog_runtime_counter_snapshot_for_tests`
// from the matching `*_count_for_tests` readers.
#[cfg(test)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
    // Read from `accepted_schema_info_projection_count_for_tests`.
    pub schema_info_projections: u64,
    // Read from `persisted_schema_snapshot_decode_count_for_tests`.
    pub persisted_schema_decodes: u64,
    // Read from `generated_compatible_row_layout_proof_count_for_tests`.
    pub generated_compatible_row_layout_proofs: u64,
    // Read from `latest_raw_snapshots_by_entity_call_count_for_tests`.
    pub latest_by_entity_calls: u64,
    // Read from `query::visible_index_projection_count_for_tests`.
    pub visible_index_projections: u64,
}
105
// One value stored in `ACCEPTED_SCHEMA_QUERY_CACHES`: the accepted snapshot
// together with the catalog identity it was cached under.
#[derive(Clone, Debug)]
struct AcceptedSchemaQueryCacheEntry {
    // Accepted schema snapshot handed back to query paths on a cache hit.
    snapshot: AcceptedSchemaSnapshot,
    // Catalog identity compared against a fresh selection before reuse.
    identity: AcceptedCatalogIdentity,
}
111
// Tuple produced by `accepted_save_contract_for_descriptor`, in order:
// the row decode contract, a clone of it used as the mutation row decode
// contract, the accepted schema info, and the commit schema fingerprint.
pub(in crate::db) type AcceptedSaveContract = (
    AcceptedRowDecodeContract,
    AcceptedRowDecodeContract,
    SchemaInfo,
    CommitSchemaFingerprint,
);
118
// Accepted schema snapshot plus its catalog identity, with a lazily filled
// schema-info projection shared by the accessor methods on this type.
#[derive(Clone, Debug)]
pub(in crate::db) struct AcceptedSchemaCatalogContext {
    // Accepted schema snapshot this context answers questions about.
    snapshot: AcceptedSchemaSnapshot,
    // Catalog identity supplying version, fingerprint, and entity/store paths.
    identity: AcceptedCatalogIdentity,
    // Empty after `new`; initialized on first `accepted_schema_info_for` call.
    schema_info: OnceCell<SchemaInfo>,
}
125
126impl AcceptedSchemaCatalogContext {
127    #[must_use]
128    pub(in crate::db) const fn new(
129        snapshot: AcceptedSchemaSnapshot,
130        identity: AcceptedCatalogIdentity,
131    ) -> Self {
132        Self {
133            snapshot,
134            identity,
135            schema_info: OnceCell::new(),
136        }
137    }
138
139    #[must_use]
140    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
141        &self.snapshot
142    }
143
144    #[must_use]
145    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
146        self.identity.accepted_schema_version()
147    }
148
149    #[must_use]
150    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
151        self.identity.accepted_schema_fingerprint()
152    }
153
154    #[must_use]
155    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
156        self.identity.fingerprint_method_version()
157    }
158
159    #[must_use]
160    #[cfg(feature = "sql")]
161    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
162        self.identity
163    }
164
165    fn debug_assert_matches_entity<E>(&self)
166    where
167        E: EntityKind,
168    {
169        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
170        debug_assert_eq!(self.identity.entity_path(), E::PATH);
171        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
172    }
173
174    pub(in crate::db) fn accepted_entity_authority_for<E>(
175        &self,
176    ) -> Result<EntityAuthority, InternalError>
177    where
178        E: EntityKind,
179    {
180        let schema_info = self.accepted_schema_info_for::<E>();
181
182        self.accepted_entity_authority_for_schema_info::<E>(schema_info)
183    }
184
185    fn accepted_entity_authority_for_schema_info<E>(
186        &self,
187        schema_info: SchemaInfo,
188    ) -> Result<EntityAuthority, InternalError>
189    where
190        E: EntityKind,
191    {
192        self.debug_assert_matches_entity::<E>();
193        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
194        let (accepted_row_layout, row_proof) =
195            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
196                &self.snapshot,
197                authority.model(),
198            )?;
199        let row_decode_contract = accepted_row_layout.row_decode_contract();
200
201        Ok(
202            authority.with_accepted_row_decode_contract(
203                row_proof,
204                row_decode_contract,
205                schema_info,
206            ),
207        )
208    }
209
210    #[cfg(feature = "sql")]
211    pub(in crate::db) fn accepted_entity_authority_and_schema_info_for<E>(
212        &self,
213    ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
214    where
215        E: EntityKind,
216    {
217        let schema_info = self.accepted_schema_info_for::<E>();
218        let authority = self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?;
219
220        Ok((authority, schema_info))
221    }
222
223    #[cfg(feature = "sql")]
224    pub(in crate::db) fn accepted_or_provided_entity_authority_for<E>(
225        &self,
226        accepted_authority: Option<&EntityAuthority>,
227    ) -> Result<EntityAuthority, InternalError>
228    where
229        E: EntityKind,
230    {
231        match accepted_authority {
232            Some(authority) => Ok(authority.clone()),
233            None => self.accepted_entity_authority_for::<E>(),
234        }
235    }
236
237    #[cfg(feature = "sql-explain")]
238    pub(in crate::db) fn accepted_or_provided_entity_authority_and_schema_info_for<E>(
239        &self,
240        accepted_authority: Option<&EntityAuthority>,
241    ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
242    where
243        E: EntityKind,
244    {
245        let schema_info = self.accepted_schema_info_for::<E>();
246        let authority = match accepted_authority {
247            Some(authority) => authority.clone(),
248            None => self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?,
249        };
250
251        Ok((authority, schema_info))
252    }
253
254    #[must_use]
255    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
256    where
257        E: EntityKind,
258    {
259        self.debug_assert_matches_entity::<E>();
260        self.schema_info
261            .get_or_init(|| {
262                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
263                    E::MODEL,
264                    &self.snapshot,
265                    true,
266                )
267            })
268            .clone()
269    }
270}
271
272pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
273    accepted_schema: &AcceptedSchemaSnapshot,
274    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
275) -> Result<AcceptedSaveContract, InternalError>
276where
277    E: EntityKind,
278{
279    let row_decode_contract = descriptor.row_decode_contract();
280    let mutation_row_decode_contract = row_decode_contract.clone();
281    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
282    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
283
284    Ok((
285        row_decode_contract,
286        mutation_row_decode_contract,
287        schema_info,
288        schema_fingerprint,
289    ))
290}
291
292thread_local! {
293    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
294    // but repeated read calls should not reload the stable schema snapshot just
295    // to prove an already-warmed cache key. SQL DDL publication invalidates this
296    // heap cache before the next query observes the new accepted schema.
297    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
298        RefCell::new(HashMap::default());
299}
300
301impl<C: CanisterKind> DbSession<C> {
302    /// Construct one session facade for a database handle.
303    #[must_use]
304    pub(crate) const fn new(db: Db<C>) -> Self {
305        Self {
306            db,
307            debug: false,
308            metrics: None,
309        }
310    }
311
312    /// Construct one session facade from store registry and runtime hooks.
313    #[must_use]
314    pub const fn new_with_hooks(
315        store: &'static LocalKey<StoreRegistry>,
316        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
317    ) -> Self {
318        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
319    }
320
321    #[cfg(test)]
322    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
323        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
324        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
325        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
326        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
327        query::reset_visible_index_projection_count_for_tests();
328    }
329
330    #[cfg(test)]
331    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
332    -> AcceptedCatalogRuntimeCounterSnapshot {
333        AcceptedCatalogRuntimeCounterSnapshot {
334            schema_info_projections:
335                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
336            persisted_schema_decodes:
337                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
338            generated_compatible_row_layout_proofs:
339                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
340            latest_by_entity_calls:
341                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
342            visible_index_projections: query::visible_index_projection_count_for_tests(),
343        }
344    }
345
346    /// Enable debug execution behavior where supported by executors.
347    #[must_use]
348    pub const fn debug(mut self) -> Self {
349        self.debug = true;
350        self
351    }
352
353    /// Attach one metrics sink for all session-executed operations.
354    #[must_use]
355    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
356        self.metrics = Some(sink);
357        self
358    }
359
360    // Shared fluent load wrapper construction keeps the session boundary in
361    // one place when load entry points differ only by missing-row policy.
362    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
363    where
364        E: EntityKind<Canister = C>,
365    {
366        FluentLoadQuery::new(self, Query::new(consistency))
367    }
368
369    // Shared fluent delete wrapper construction keeps the delete-mode handoff
370    // explicit at the session boundary instead of reassembling the same query
371    // shell in each public entry point.
372    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
373    where
374        E: PersistedRow<Canister = C>,
375    {
376        FluentDeleteQuery::new(self, Query::new(consistency).delete())
377    }
378
379    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
380        if let Some(sink) = self.metrics {
381            with_metrics_sink(sink, f)
382        } else {
383            f()
384        }
385    }
386
387    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
388    fn execute_save_with<E, T, R>(
389        &self,
390        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
391        map: impl FnOnce(T) -> R,
392    ) -> Result<R, InternalError>
393    where
394        E: PersistedRow<Canister = C> + EntityValue,
395    {
396        let (contract, schema_info, schema_fingerprint) = match self
397            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
398        {
399            Ok(authority) => authority,
400            Err(error) => {
401                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
402
403                return Err(error);
404            }
405        };
406        let value = self.with_metrics(|| {
407            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
408        })?;
409
410        Ok(map(value))
411    }
412
413    // Execute save work after the caller has already proven that the accepted
414    // row contract is generated-compatible. SQL and structural writes use this
415    // after their pre-staging schema guard so mutation staging and save
416    // execution do not rerun schema-store reconciliation in the same statement.
417    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
418        &self,
419        accepted_row_decode_contract: AcceptedRowDecodeContract,
420        accepted_schema_info: SchemaInfo,
421        accepted_schema_fingerprint: CommitSchemaFingerprint,
422        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
423        map: impl FnOnce(T) -> R,
424    ) -> Result<R, InternalError>
425    where
426        E: PersistedRow<Canister = C> + EntityValue,
427    {
428        let value = self.with_metrics(|| {
429            op(self.save_executor::<E>(
430                accepted_row_decode_contract,
431                accepted_schema_info,
432                accepted_schema_fingerprint,
433            ))
434        })?;
435
436        Ok(map(value))
437    }
438
439    // Shared save-facade wrappers keep response shape explicit at call sites.
440    fn execute_save_entity<E>(
441        &self,
442        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
443    ) -> Result<E, InternalError>
444    where
445        E: PersistedRow<Canister = C> + EntityValue,
446    {
447        self.execute_save_with(op, std::convert::identity)
448    }
449
450    fn execute_save_batch<E>(
451        &self,
452        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
453    ) -> Result<WriteBatchResponse<E>, InternalError>
454    where
455        E: PersistedRow<Canister = C> + EntityValue,
456    {
457        self.execute_save_with(op, WriteBatchResponse::new)
458    }
459
460    // ---------------------------------------------------------------------
461    // Query entry points (public, fluent)
462    // ---------------------------------------------------------------------
463
464    /// Start a fluent load query with default missing-row policy (`Ignore`).
465    #[must_use]
466    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
467    where
468        E: EntityKind<Canister = C>,
469    {
470        self.fluent_load_query(MissingRowPolicy::Ignore)
471    }
472
473    /// Start a fluent load query with explicit missing-row policy.
474    #[must_use]
475    pub const fn load_with_consistency<E>(
476        &self,
477        consistency: MissingRowPolicy,
478    ) -> FluentLoadQuery<'_, E>
479    where
480        E: EntityKind<Canister = C>,
481    {
482        self.fluent_load_query(consistency)
483    }
484
485    /// Start a fluent delete query with default missing-row policy (`Ignore`).
486    #[must_use]
487    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
488    where
489        E: PersistedRow<Canister = C>,
490    {
491        self.fluent_delete_query(MissingRowPolicy::Ignore)
492    }
493
494    /// Start a fluent delete query with explicit missing-row policy.
495    #[must_use]
496    pub fn delete_with_consistency<E>(
497        &self,
498        consistency: MissingRowPolicy,
499    ) -> FluentDeleteQuery<'_, E>
500    where
501        E: PersistedRow<Canister = C>,
502    {
503        self.fluent_delete_query(consistency)
504    }
505
506    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
507    ///
508    /// This terminal bypasses query planning and access routing entirely.
509    #[must_use]
510    pub const fn select_one(&self) -> Value {
511        Value::Int64(1)
512    }
513
514    /// Return one stable, human-readable index listing for the entity schema.
515    ///
516    /// Output format mirrors SQL-style introspection:
517    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
518    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
519    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
520    #[must_use]
521    pub fn show_indexes<E>(&self) -> Vec<String>
522    where
523        E: EntityKind<Canister = C>,
524    {
525        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
526    }
527
528    /// Return one stable, human-readable index listing for one schema model.
529    ///
530    /// This model-only helper is schema-owned and intentionally does not
531    /// attach runtime lifecycle state because it does not carry store
532    /// placement context.
533    #[must_use]
534    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
535        show_indexes_for_model(model)
536    }
537
538    /// Return one stable, human-readable index listing for the accepted schema.
539    ///
540    /// Unlike `show_indexes`, this fallible live-schema helper reflects
541    /// accepted DDL-created indexes as well as compiled schema indexes.
542    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
543    where
544        E: EntityKind<Canister = C>,
545    {
546        let schema = self.accepted_schema_info_for_entity::<E>()?;
547
548        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
549    }
550
551    // Return one stable, human-readable index listing for one resolved
552    // store/model pair, attaching the current runtime lifecycle state when the
553    // registry can resolve the backing store handle.
554    pub(in crate::db) fn show_indexes_for_store_model(
555        &self,
556        store_path: &str,
557        model: &'static EntityModel,
558    ) -> Vec<String> {
559        let runtime_state = self
560            .db
561            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
562            .map(|store| store.index_state());
563
564        show_indexes_for_model_with_runtime_state(model, runtime_state)
565    }
566
567    // Return one stable, human-readable index listing for one resolved
568    // store/accepted-schema pair, attaching the current runtime lifecycle state
569    // when the registry can resolve the backing store handle.
570    pub(in crate::db) fn show_indexes_for_store_schema_info(
571        &self,
572        store_path: &str,
573        schema: &SchemaInfo,
574    ) -> Vec<String> {
575        let runtime_state = self
576            .db
577            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
578            .map(|store| store.index_state());
579
580        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
581    }
582
583    /// Return one stable generated-model list of field descriptors.
584    ///
585    /// This infallible Rust metadata helper intentionally reports the compiled
586    /// schema model. Use `try_show_columns` for the accepted persisted-schema
587    /// view used by SQL and diagnostics surfaces.
588    #[must_use]
589    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
590    where
591        E: EntityKind<Canister = C>,
592    {
593        self.show_columns_for_model(E::MODEL)
594    }
595
596    /// Return one stable generated-model list of field descriptors.
597    #[must_use]
598    pub fn show_columns_for_model(
599        &self,
600        model: &'static EntityModel,
601    ) -> Vec<EntityFieldDescription> {
602        describe_entity_fields(model)
603    }
604
605    /// Return field descriptors using the accepted persisted schema snapshot.
606    ///
607    /// This fallible variant is intended for SQL and diagnostics surfaces that
608    /// can report schema reconciliation failures. The infallible
609    /// `show_columns` helper remains generated-model based.
610    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
611    where
612        E: EntityKind<Canister = C>,
613    {
614        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
615
616        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
617    }
618
619    /// Return one stable list of runtime-registered entity catalog entries.
620    ///
621    /// # Panics
622    ///
623    /// Panics if the runtime session cannot read its registered entity catalog.
624    /// Use `try_show_entities` when the caller can report the internal error.
625    #[must_use]
626    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
627        self.try_show_entities().expect("session invariant")
628    }
629
630    /// Return one stable list of runtime-registered entity catalog entries.
631    pub fn try_show_entities(
632        &self,
633    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
634        self.db.runtime_entity_catalog()
635    }
636
637    /// Return one stable list of runtime-registered stores.
638    #[must_use]
639    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
640        self.db.runtime_store_catalog()
641    }
642
643    /// Return one stable list of runtime-registered stable-memory allocations.
644    #[must_use]
645    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
646        self.db.runtime_memory_catalog()
647    }
648
649    // Resolve the exact secondary-index set that is visible to planner-owned
650    // query planning for one recovered store and accepted schema pair.
651    #[cfg(any(test, feature = "sql-explain"))]
652    fn visible_indexes_for_store_accepted_schema(
653        &self,
654        store_path: &str,
655        schema_info: &SchemaInfo,
656    ) -> Result<VisibleIndexes<'static>, QueryError> {
657        // Phase 1: resolve the recovered store state once at the session
658        // boundary so query/executor planning does not reopen lifecycle checks.
659        let store = self
660            .db
661            .recovered_store(store_path)
662            .map_err(QueryError::execute)?;
663        let state = store.index_state();
664        if state != IndexState::Ready {
665            return Ok(VisibleIndexes::none());
666        }
667        debug_assert_eq!(state, IndexState::Ready);
668
669        // Phase 2: planner-visible indexes are accepted schema contracts once
670        // the recovered store is query-visible.
671        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
672        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
673        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
674        debug_assert_eq!(
675            visible_indexes.accepted_expression_index_count(),
676            Some(visible_indexes.accepted_expression_indexes().len()),
677        );
678
679        Ok(visible_indexes)
680    }
681
682    /// Return one generated-model schema description for the entity.
683    ///
684    /// This is a typed `DESCRIBE`-style introspection surface consumed by
685    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
686    /// schema view is required.
687    #[must_use]
688    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
689    where
690        E: EntityKind<Canister = C>,
691    {
692        self.describe_entity_model(E::MODEL)
693    }
694
695    /// Return one generated-model schema description for one schema model.
696    #[must_use]
697    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
698        describe_entity_model(model)
699    }
700
701    /// Return a schema description using the accepted persisted schema snapshot.
702    ///
703    /// This is the live-schema counterpart to `describe_entity`. It is fallible
704    /// because loading accepted schema authority can fail if startup
705    /// reconciliation rejects the stored metadata.
706    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
707    where
708        E: EntityKind<Canister = C>,
709    {
710        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
711
712        Ok(describe_entity_model_with_persisted_schema(
713            E::MODEL,
714            &snapshot,
715        ))
716    }
717
718    // Ensure and return the accepted schema snapshot for one generated entity.
719    // This may write the first snapshot for an empty store; otherwise it loads
720    // the latest stored snapshot and applies the current exact-match policy.
721    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
722    where
723        E: EntityKind<Canister = C>,
724    {
725        let store = self.db.recovered_store(E::Store::PATH)?;
726
727        store.with_schema_mut(|schema_store| {
728            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
729        })
730    }
731
732    // Load the current accepted schema snapshot for read/query paths without
733    // rerunning generated proposal reconciliation on every cold query call.
734    // Startup and write paths still own reconciliation; the fallback only keeps
735    // first-use test stores and freshly initialized local stores functional.
736    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
737        &self,
738    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
739    where
740        E: EntityKind<Canister = C>,
741    {
742        let cache_key = (self.db.cache_scope_id(), E::PATH);
743        if let Some(entry) =
744            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
745        {
746            return Ok(AcceptedSchemaCatalogContext::new(
747                entry.snapshot,
748                entry.identity,
749            ));
750        }
751
752        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
753        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
754        let identity = AcceptedCatalogIdentity::new(
755            E::ENTITY_TAG,
756            E::PATH,
757            E::Store::PATH,
758            snapshot.persisted_snapshot().version(),
759            fingerprint,
760        );
761        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
762            cache.borrow_mut().insert(
763                cache_key,
764                AcceptedSchemaQueryCacheEntry {
765                    snapshot: snapshot.clone(),
766                    identity,
767                },
768            );
769        });
770
771        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
772    }
773
774    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
775        &self,
776    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
777    where
778        E: EntityKind<Canister = C>,
779    {
780        let store = self.db.recovered_store(E::Store::PATH)?;
781
782        store.with_schema_mut(|schema_store| {
783            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
784        })
785    }
786
787    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
788        &self,
789        selection: &AcceptedCatalogSnapshotSelection,
790    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
791    where
792        E: EntityKind<Canister = C>,
793    {
794        let cache_key = (self.db.cache_scope_id(), E::PATH);
795        if let Some(entry) =
796            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
797            && entry.identity == selection.identity()
798        {
799            return Ok(Some(AcceptedSchemaCatalogContext::new(
800                entry.snapshot,
801                entry.identity,
802            )));
803        }
804
805        let snapshot = selection.decode_verified()?;
806        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
807            return Ok(None);
808        }
809        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
810
811        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
812            cache.borrow_mut().insert(
813                cache_key,
814                AcceptedSchemaQueryCacheEntry {
815                    snapshot,
816                    identity: selection.identity(),
817                },
818            );
819        });
820
821        Ok(Some(context))
822    }
823
824    #[cfg(feature = "sql")]
825    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
826    where
827        E: EntityKind<Canister = C>,
828    {
829        let cache_key = (self.db.cache_scope_id(), E::PATH);
830        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
831            cache.borrow_mut().remove(&cache_key);
832        });
833    }
834
835    fn load_accepted_schema_snapshot_for_query<E>(
836        &self,
837    ) -> Result<AcceptedSchemaSnapshot, InternalError>
838    where
839        E: EntityKind<Canister = C>,
840    {
841        let store = self.db.recovered_store(E::Store::PATH)?;
842
843        store.with_schema_mut(|schema_store| {
844            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
845                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
846                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
847                    &accepted,
848                    E::MODEL,
849                )
850                .is_ok()
851                {
852                    return Ok(accepted);
853                }
854            }
855
856            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
857        })
858    }
859
860    // Build the accepted schema-info projection for one typed entity. Fluent
861    // terminal adapters use this before constructing slot-bound descriptors so
862    // field slot authority comes from the accepted schema snapshot.
863    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
864        &self,
865    ) -> Result<SchemaInfo, InternalError>
866    where
867        E: EntityKind<Canister = C>,
868    {
869        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
870
871        Ok(catalog.accepted_schema_info_for::<E>())
872    }
873
874    // Derive typed executor authority from an accepted snapshot the caller
875    // already loaded, avoiding a second schema-store pass in SQL write/select
876    // adapters that need both write descriptors and read selector authority.
877    #[cfg(feature = "sql")]
878    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
879        accepted_schema: &AcceptedSchemaSnapshot,
880    ) -> Result<EntityAuthority, InternalError>
881    where
882        E: EntityKind<Canister = C>,
883    {
884        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
885    }
886
887    // Ensure accepted schema metadata is safe for write paths that still encode
888    // rows through generated field contracts. Returning only the snapshot keeps
889    // SQL write type checks unchanged while the schema-runtime contract guard
890    // rejects unsupported layout or payload drift before mutation staging.
891    fn ensure_generated_compatible_accepted_save_schema<E>(
892        &self,
893    ) -> Result<
894        (
895            AcceptedRowDecodeContract,
896            SchemaInfo,
897            CommitSchemaFingerprint,
898        ),
899        InternalError,
900    >
901    where
902        E: EntityKind<Canister = C>,
903    {
904        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
905        let (accepted_row_layout, _) =
906            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
907                &accepted_schema,
908                E::MODEL,
909            )?;
910        let (row_decode_contract, _, schema_info, schema_fingerprint) =
911            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
912
913        Ok((row_decode_contract, schema_info, schema_fingerprint))
914    }
915
916    /// Build one point-in-time storage report for observability endpoints.
917    pub fn storage_report(
918        &self,
919        name_to_path: &[(&'static str, &'static str)],
920    ) -> Result<StorageReport, InternalError> {
921        self.db.storage_report(name_to_path)
922    }
923
924    /// Build one point-in-time storage report using default entity-path labels.
925    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
926        self.db.storage_report_default()
927    }
928
929    /// Build one point-in-time integrity scan report for observability endpoints.
930    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
931        self.db.integrity_report()
932    }
933
934    // ---------------------------------------------------------------------
935    // Low-level executors (crate-internal; execution primitives)
936    // ---------------------------------------------------------------------
937
938    #[must_use]
939    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
940    where
941        E: EntityKind<Canister = C> + EntityValue,
942    {
943        LoadExecutor::new(self.db, self.debug)
944    }
945
946    #[must_use]
947    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
948    where
949        E: PersistedRow<Canister = C> + EntityValue,
950    {
951        DeleteExecutor::new(self.db)
952    }
953
954    #[must_use]
955    pub(in crate::db) const fn save_executor<E>(
956        &self,
957        accepted_row_decode_contract: AcceptedRowDecodeContract,
958        accepted_schema_info: SchemaInfo,
959        accepted_schema_fingerprint: CommitSchemaFingerprint,
960    ) -> SaveExecutor<E>
961    where
962        E: PersistedRow<Canister = C> + EntityValue,
963    {
964        SaveExecutor::new_with_accepted_contract(
965            self.db,
966            self.debug,
967            accepted_row_decode_contract,
968            accepted_schema_info,
969            accepted_schema_fingerprint,
970        )
971    }
972}