Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(any(test, feature = "sql-explain"))]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
52    GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
53    ScalarAggregateAttribution,
54};
55pub(in crate::db) use response::finalize_scalar_paged_execution;
56pub(in crate::db) use response::finalize_structural_grouped_projection_result;
57#[cfg(feature = "sql")]
58pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
59#[cfg(feature = "sql")]
60pub use sql::{
61    SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
62    SqlDdlPreparationReport, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
63    SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
64    SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyDeletePlan,
65    SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan, SqlSessionCurrentUpdatePlan,
66    SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
67    SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdatePolicyContext,
68    SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateStatementClassification,
69    SqlValidatedDeletePlan, SqlValidatedUpdatePlan, SqlWriteExecutionBounds, SqlWriteOrderProof,
70    SqlWriteReturningBounds, SqlWriteReturningShape, SqlWriteStatementShape, SqlWriteWhereProof,
71    classify_sql_delete_policy, classify_sql_update_policy, sql_statement_dispatch,
72    sql_statement_entity_name, sql_statement_shell_surface, sql_statement_surface,
73};
74#[cfg(all(feature = "sql", feature = "diagnostics"))]
75pub use sql::{
76    SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
77    SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
78    SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
79};
80#[cfg(all(feature = "sql", feature = "diagnostics"))]
81pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
82
83///
84/// DbSession
85///
86/// Session-scoped database handle with policy (debug, metrics) and execution routing.
87///
88
89pub struct DbSession<C: CanisterKind> {
90    db: Db<C>,
91    debug: bool,
92    metrics: Option<&'static dyn MetricsSink>,
93}
94
95#[cfg(test)]
96#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
97pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
98    pub schema_info_projections: u64,
99    pub persisted_schema_decodes: u64,
100    pub generated_compatible_row_layout_proofs: u64,
101    pub latest_by_entity_calls: u64,
102    pub visible_index_projections: u64,
103}
104
105#[derive(Clone, Debug)]
106struct AcceptedSchemaQueryCacheEntry {
107    snapshot: AcceptedSchemaSnapshot,
108    identity: AcceptedCatalogIdentity,
109}
110
111pub(in crate::db) type AcceptedSaveContract = (
112    AcceptedRowDecodeContract,
113    AcceptedRowDecodeContract,
114    SchemaInfo,
115    CommitSchemaFingerprint,
116);
117
118#[derive(Clone, Debug)]
119pub(in crate::db) struct AcceptedSchemaCatalogContext {
120    snapshot: AcceptedSchemaSnapshot,
121    identity: AcceptedCatalogIdentity,
122    schema_info: OnceCell<SchemaInfo>,
123}
124
125impl AcceptedSchemaCatalogContext {
126    #[must_use]
127    pub(in crate::db) const fn new(
128        snapshot: AcceptedSchemaSnapshot,
129        identity: AcceptedCatalogIdentity,
130    ) -> Self {
131        Self {
132            snapshot,
133            identity,
134            schema_info: OnceCell::new(),
135        }
136    }
137
138    #[must_use]
139    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
140        &self.snapshot
141    }
142
143    #[must_use]
144    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
145        self.identity.accepted_schema_version()
146    }
147
148    #[must_use]
149    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
150        self.identity.accepted_schema_fingerprint()
151    }
152
153    #[must_use]
154    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
155        self.identity.fingerprint_method_version()
156    }
157
158    #[must_use]
159    #[cfg(feature = "sql")]
160    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
161        self.identity
162    }
163
164    fn debug_assert_matches_entity<E>(&self)
165    where
166        E: EntityKind,
167    {
168        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
169        debug_assert_eq!(self.identity.entity_path(), E::PATH);
170        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
171    }
172
173    pub(in crate::db) fn accepted_entity_authority_for<E>(
174        &self,
175    ) -> Result<EntityAuthority, InternalError>
176    where
177        E: EntityKind,
178    {
179        let schema_info = self.accepted_schema_info_for::<E>();
180
181        self.accepted_entity_authority_for_schema_info::<E>(schema_info)
182    }
183
184    fn accepted_entity_authority_for_schema_info<E>(
185        &self,
186        schema_info: SchemaInfo,
187    ) -> Result<EntityAuthority, InternalError>
188    where
189        E: EntityKind,
190    {
191        self.debug_assert_matches_entity::<E>();
192        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
193        let (accepted_row_layout, row_proof) =
194            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
195                &self.snapshot,
196                authority.model(),
197            )?;
198        let row_decode_contract = accepted_row_layout.row_decode_contract();
199
200        Ok(
201            authority.with_accepted_row_decode_contract(
202                row_proof,
203                row_decode_contract,
204                schema_info,
205            ),
206        )
207    }
208
209    #[cfg(feature = "sql")]
210    pub(in crate::db) fn accepted_entity_authority_and_schema_info_for<E>(
211        &self,
212    ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
213    where
214        E: EntityKind,
215    {
216        let schema_info = self.accepted_schema_info_for::<E>();
217        let authority = self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?;
218
219        Ok((authority, schema_info))
220    }
221
222    #[cfg(feature = "sql")]
223    pub(in crate::db) fn accepted_or_provided_entity_authority_for<E>(
224        &self,
225        accepted_authority: Option<&EntityAuthority>,
226    ) -> Result<EntityAuthority, InternalError>
227    where
228        E: EntityKind,
229    {
230        match accepted_authority {
231            Some(authority) => Ok(authority.clone()),
232            None => self.accepted_entity_authority_for::<E>(),
233        }
234    }
235
236    #[cfg(feature = "sql-explain")]
237    pub(in crate::db) fn accepted_or_provided_entity_authority_and_schema_info_for<E>(
238        &self,
239        accepted_authority: Option<&EntityAuthority>,
240    ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
241    where
242        E: EntityKind,
243    {
244        let schema_info = self.accepted_schema_info_for::<E>();
245        let authority = match accepted_authority {
246            Some(authority) => authority.clone(),
247            None => self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?,
248        };
249
250        Ok((authority, schema_info))
251    }
252
253    #[must_use]
254    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
255    where
256        E: EntityKind,
257    {
258        self.debug_assert_matches_entity::<E>();
259        self.schema_info
260            .get_or_init(|| {
261                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
262                    E::MODEL,
263                    &self.snapshot,
264                    true,
265                )
266            })
267            .clone()
268    }
269}
270
271pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
272    accepted_schema: &AcceptedSchemaSnapshot,
273    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
274) -> Result<AcceptedSaveContract, InternalError>
275where
276    E: EntityKind,
277{
278    let row_decode_contract = descriptor.row_decode_contract();
279    let mutation_row_decode_contract = row_decode_contract.clone();
280    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
281    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
282
283    Ok((
284        row_decode_contract,
285        mutation_row_decode_contract,
286        schema_info,
287        schema_fingerprint,
288    ))
289}
290
291thread_local! {
292    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
293    // but repeated read calls should not reload the stable schema snapshot just
294    // to prove an already-warmed cache key. SQL DDL publication invalidates this
295    // heap cache before the next query observes the new accepted schema.
296    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
297        RefCell::new(HashMap::default());
298}
299
300impl<C: CanisterKind> DbSession<C> {
301    /// Construct one session facade for a database handle.
302    #[must_use]
303    pub(crate) const fn new(db: Db<C>) -> Self {
304        Self {
305            db,
306            debug: false,
307            metrics: None,
308        }
309    }
310
311    /// Construct one session facade from store registry and runtime hooks.
312    #[must_use]
313    pub const fn new_with_hooks(
314        store: &'static LocalKey<StoreRegistry>,
315        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
316    ) -> Self {
317        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
318    }
319
320    #[cfg(test)]
321    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
322        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
323        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
324        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
325        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
326        query::reset_visible_index_projection_count_for_tests();
327    }
328
329    #[cfg(test)]
330    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
331    -> AcceptedCatalogRuntimeCounterSnapshot {
332        AcceptedCatalogRuntimeCounterSnapshot {
333            schema_info_projections:
334                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
335            persisted_schema_decodes:
336                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
337            generated_compatible_row_layout_proofs:
338                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
339            latest_by_entity_calls:
340                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
341            visible_index_projections: query::visible_index_projection_count_for_tests(),
342        }
343    }
344
345    /// Enable debug execution behavior where supported by executors.
346    #[must_use]
347    pub const fn debug(mut self) -> Self {
348        self.debug = true;
349        self
350    }
351
352    /// Attach one metrics sink for all session-executed operations.
353    #[must_use]
354    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
355        self.metrics = Some(sink);
356        self
357    }
358
359    // Shared fluent load wrapper construction keeps the session boundary in
360    // one place when load entry points differ only by missing-row policy.
361    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
362    where
363        E: EntityKind<Canister = C>,
364    {
365        FluentLoadQuery::new(self, Query::new(consistency))
366    }
367
368    // Shared fluent delete wrapper construction keeps the delete-mode handoff
369    // explicit at the session boundary instead of reassembling the same query
370    // shell in each public entry point.
371    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
372    where
373        E: PersistedRow<Canister = C>,
374    {
375        FluentDeleteQuery::new(self, Query::new(consistency).delete())
376    }
377
378    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
379        if let Some(sink) = self.metrics {
380            with_metrics_sink(sink, f)
381        } else {
382            f()
383        }
384    }
385
386    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
387    fn execute_save_with<E, T, R>(
388        &self,
389        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
390        map: impl FnOnce(T) -> R,
391    ) -> Result<R, InternalError>
392    where
393        E: PersistedRow<Canister = C> + EntityValue,
394    {
395        let (contract, schema_info, schema_fingerprint) = match self
396            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
397        {
398            Ok(authority) => authority,
399            Err(error) => {
400                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
401
402                return Err(error);
403            }
404        };
405        let value = self.with_metrics(|| {
406            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
407        })?;
408
409        Ok(map(value))
410    }
411
412    // Execute save work after the caller has already proven that the accepted
413    // row contract is generated-compatible. SQL and structural writes use this
414    // after their pre-staging schema guard so mutation staging and save
415    // execution do not rerun schema-store reconciliation in the same statement.
416    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
417        &self,
418        accepted_row_decode_contract: AcceptedRowDecodeContract,
419        accepted_schema_info: SchemaInfo,
420        accepted_schema_fingerprint: CommitSchemaFingerprint,
421        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
422        map: impl FnOnce(T) -> R,
423    ) -> Result<R, InternalError>
424    where
425        E: PersistedRow<Canister = C> + EntityValue,
426    {
427        let value = self.with_metrics(|| {
428            op(self.save_executor::<E>(
429                accepted_row_decode_contract,
430                accepted_schema_info,
431                accepted_schema_fingerprint,
432            ))
433        })?;
434
435        Ok(map(value))
436    }
437
438    // Shared save-facade wrappers keep response shape explicit at call sites.
439    fn execute_save_entity<E>(
440        &self,
441        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
442    ) -> Result<E, InternalError>
443    where
444        E: PersistedRow<Canister = C> + EntityValue,
445    {
446        self.execute_save_with(op, std::convert::identity)
447    }
448
449    fn execute_save_batch<E>(
450        &self,
451        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
452    ) -> Result<WriteBatchResponse<E>, InternalError>
453    where
454        E: PersistedRow<Canister = C> + EntityValue,
455    {
456        self.execute_save_with(op, WriteBatchResponse::new)
457    }
458
459    // ---------------------------------------------------------------------
460    // Query entry points (public, fluent)
461    // ---------------------------------------------------------------------
462
463    /// Start a fluent load query with default missing-row policy (`Ignore`).
464    #[must_use]
465    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
466    where
467        E: EntityKind<Canister = C>,
468    {
469        self.fluent_load_query(MissingRowPolicy::Ignore)
470    }
471
472    /// Start a fluent load query with explicit missing-row policy.
473    #[must_use]
474    pub const fn load_with_consistency<E>(
475        &self,
476        consistency: MissingRowPolicy,
477    ) -> FluentLoadQuery<'_, E>
478    where
479        E: EntityKind<Canister = C>,
480    {
481        self.fluent_load_query(consistency)
482    }
483
484    /// Start a fluent delete query with default missing-row policy (`Ignore`).
485    #[must_use]
486    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
487    where
488        E: PersistedRow<Canister = C>,
489    {
490        self.fluent_delete_query(MissingRowPolicy::Ignore)
491    }
492
493    /// Start a fluent delete query with explicit missing-row policy.
494    #[must_use]
495    pub fn delete_with_consistency<E>(
496        &self,
497        consistency: MissingRowPolicy,
498    ) -> FluentDeleteQuery<'_, E>
499    where
500        E: PersistedRow<Canister = C>,
501    {
502        self.fluent_delete_query(consistency)
503    }
504
505    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
506    ///
507    /// This terminal bypasses query planning and access routing entirely.
508    #[must_use]
509    pub const fn select_one(&self) -> Value {
510        Value::Int64(1)
511    }
512
513    /// Return one stable, human-readable index listing for the entity schema.
514    ///
515    /// Output format mirrors SQL-style introspection:
516    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
517    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
518    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
519    #[must_use]
520    pub fn show_indexes<E>(&self) -> Vec<String>
521    where
522        E: EntityKind<Canister = C>,
523    {
524        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
525    }
526
527    /// Return one stable, human-readable index listing for one schema model.
528    ///
529    /// This model-only helper is schema-owned and intentionally does not
530    /// attach runtime lifecycle state because it does not carry store
531    /// placement context.
532    #[must_use]
533    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
534        show_indexes_for_model(model)
535    }
536
537    /// Return one stable, human-readable index listing for the accepted schema.
538    ///
539    /// Unlike `show_indexes`, this fallible live-schema helper reflects
540    /// accepted DDL-created indexes as well as compiled schema indexes.
541    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
542    where
543        E: EntityKind<Canister = C>,
544    {
545        let schema = self.accepted_schema_info_for_entity::<E>()?;
546
547        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
548    }
549
550    // Return one stable, human-readable index listing for one resolved
551    // store/model pair, attaching the current runtime lifecycle state when the
552    // registry can resolve the backing store handle.
553    pub(in crate::db) fn show_indexes_for_store_model(
554        &self,
555        store_path: &str,
556        model: &'static EntityModel,
557    ) -> Vec<String> {
558        let runtime_state = self
559            .db
560            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
561            .map(|store| store.index_state());
562
563        show_indexes_for_model_with_runtime_state(model, runtime_state)
564    }
565
566    // Return one stable, human-readable index listing for one resolved
567    // store/accepted-schema pair, attaching the current runtime lifecycle state
568    // when the registry can resolve the backing store handle.
569    pub(in crate::db) fn show_indexes_for_store_schema_info(
570        &self,
571        store_path: &str,
572        schema: &SchemaInfo,
573    ) -> Vec<String> {
574        let runtime_state = self
575            .db
576            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
577            .map(|store| store.index_state());
578
579        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
580    }
581
582    /// Return one stable generated-model list of field descriptors.
583    ///
584    /// This infallible Rust metadata helper intentionally reports the compiled
585    /// schema model. Use `try_show_columns` for the accepted persisted-schema
586    /// view used by SQL and diagnostics surfaces.
587    #[must_use]
588    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
589    where
590        E: EntityKind<Canister = C>,
591    {
592        self.show_columns_for_model(E::MODEL)
593    }
594
595    /// Return one stable generated-model list of field descriptors.
596    #[must_use]
597    pub fn show_columns_for_model(
598        &self,
599        model: &'static EntityModel,
600    ) -> Vec<EntityFieldDescription> {
601        describe_entity_fields(model)
602    }
603
604    /// Return field descriptors using the accepted persisted schema snapshot.
605    ///
606    /// This fallible variant is intended for SQL and diagnostics surfaces that
607    /// can report schema reconciliation failures. The infallible
608    /// `show_columns` helper remains generated-model based.
609    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
610    where
611        E: EntityKind<Canister = C>,
612    {
613        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
614
615        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
616    }
617
618    /// Return one stable list of runtime-registered entity catalog entries.
619    ///
620    /// # Panics
621    ///
622    /// Panics if the runtime session cannot read its registered entity catalog.
623    /// Use `try_show_entities` when the caller can report the internal error.
624    #[must_use]
625    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
626        self.try_show_entities().expect("session invariant")
627    }
628
629    /// Return one stable list of runtime-registered entity catalog entries.
630    pub fn try_show_entities(
631        &self,
632    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
633        self.db.runtime_entity_catalog()
634    }
635
636    /// Return one stable list of runtime-registered stores.
637    #[must_use]
638    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
639        self.db.runtime_store_catalog()
640    }
641
642    /// Return one stable list of runtime-registered stable-memory allocations.
643    #[must_use]
644    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
645        self.db.runtime_memory_catalog()
646    }
647
648    // Resolve the exact secondary-index set that is visible to planner-owned
649    // query planning for one recovered store and accepted schema pair.
650    #[cfg(any(test, feature = "sql-explain"))]
651    fn visible_indexes_for_store_accepted_schema(
652        &self,
653        store_path: &str,
654        schema_info: &SchemaInfo,
655    ) -> Result<VisibleIndexes<'static>, QueryError> {
656        // Phase 1: resolve the recovered store state once at the session
657        // boundary so query/executor planning does not reopen lifecycle checks.
658        let store = self
659            .db
660            .recovered_store(store_path)
661            .map_err(QueryError::execute)?;
662        let state = store.index_state();
663        if state != IndexState::Ready {
664            return Ok(VisibleIndexes::none());
665        }
666        debug_assert_eq!(state, IndexState::Ready);
667
668        // Phase 2: planner-visible indexes are accepted schema contracts once
669        // the recovered store is query-visible.
670        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
671        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
672        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
673        debug_assert_eq!(
674            visible_indexes.accepted_expression_index_count(),
675            Some(visible_indexes.accepted_expression_indexes().len()),
676        );
677
678        Ok(visible_indexes)
679    }
680
681    /// Return one generated-model schema description for the entity.
682    ///
683    /// This is a typed `DESCRIBE`-style introspection surface consumed by
684    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
685    /// schema view is required.
686    #[must_use]
687    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
688    where
689        E: EntityKind<Canister = C>,
690    {
691        self.describe_entity_model(E::MODEL)
692    }
693
694    /// Return one generated-model schema description for one schema model.
695    #[must_use]
696    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
697        describe_entity_model(model)
698    }
699
700    /// Return a schema description using the accepted persisted schema snapshot.
701    ///
702    /// This is the live-schema counterpart to `describe_entity`. It is fallible
703    /// because loading accepted schema authority can fail if startup
704    /// reconciliation rejects the stored metadata.
705    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
706    where
707        E: EntityKind<Canister = C>,
708    {
709        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
710
711        Ok(describe_entity_model_with_persisted_schema(
712            E::MODEL,
713            &snapshot,
714        ))
715    }
716
717    // Ensure and return the accepted schema snapshot for one generated entity.
718    // This may write the first snapshot for an empty store; otherwise it loads
719    // the latest stored snapshot and applies the current exact-match policy.
720    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
721    where
722        E: EntityKind<Canister = C>,
723    {
724        let store = self.db.recovered_store(E::Store::PATH)?;
725
726        store.with_schema_mut(|schema_store| {
727            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
728        })
729    }
730
731    // Load the current accepted schema snapshot for read/query paths without
732    // rerunning generated proposal reconciliation on every cold query call.
733    // Startup and write paths still own reconciliation; the fallback only keeps
734    // first-use test stores and freshly initialized local stores functional.
735    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
736        &self,
737    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
738    where
739        E: EntityKind<Canister = C>,
740    {
741        let cache_key = (self.db.cache_scope_id(), E::PATH);
742        if let Some(entry) =
743            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
744        {
745            return Ok(AcceptedSchemaCatalogContext::new(
746                entry.snapshot,
747                entry.identity,
748            ));
749        }
750
751        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
752        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
753        let identity = AcceptedCatalogIdentity::new(
754            E::ENTITY_TAG,
755            E::PATH,
756            E::Store::PATH,
757            snapshot.persisted_snapshot().version(),
758            fingerprint,
759        );
760        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
761            cache.borrow_mut().insert(
762                cache_key,
763                AcceptedSchemaQueryCacheEntry {
764                    snapshot: snapshot.clone(),
765                    identity,
766                },
767            );
768        });
769
770        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
771    }
772
773    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
774        &self,
775    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
776    where
777        E: EntityKind<Canister = C>,
778    {
779        let store = self.db.recovered_store(E::Store::PATH)?;
780
781        store.with_schema_mut(|schema_store| {
782            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
783        })
784    }
785
786    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
787        &self,
788        selection: &AcceptedCatalogSnapshotSelection,
789    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
790    where
791        E: EntityKind<Canister = C>,
792    {
793        let cache_key = (self.db.cache_scope_id(), E::PATH);
794        if let Some(entry) =
795            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
796            && entry.identity == selection.identity()
797        {
798            return Ok(Some(AcceptedSchemaCatalogContext::new(
799                entry.snapshot,
800                entry.identity,
801            )));
802        }
803
804        let snapshot = selection.decode_verified()?;
805        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
806            return Ok(None);
807        }
808        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
809
810        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
811            cache.borrow_mut().insert(
812                cache_key,
813                AcceptedSchemaQueryCacheEntry {
814                    snapshot,
815                    identity: selection.identity(),
816                },
817            );
818        });
819
820        Ok(Some(context))
821    }
822
823    #[cfg(feature = "sql")]
824    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
825    where
826        E: EntityKind<Canister = C>,
827    {
828        let cache_key = (self.db.cache_scope_id(), E::PATH);
829        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
830            cache.borrow_mut().remove(&cache_key);
831        });
832    }
833
834    fn load_accepted_schema_snapshot_for_query<E>(
835        &self,
836    ) -> Result<AcceptedSchemaSnapshot, InternalError>
837    where
838        E: EntityKind<Canister = C>,
839    {
840        let store = self.db.recovered_store(E::Store::PATH)?;
841
842        store.with_schema_mut(|schema_store| {
843            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
844                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
845                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
846                    &accepted,
847                    E::MODEL,
848                )
849                .is_ok()
850                {
851                    return Ok(accepted);
852                }
853            }
854
855            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
856        })
857    }
858
859    // Build the accepted schema-info projection for one typed entity. Fluent
860    // terminal adapters use this before constructing slot-bound descriptors so
861    // field slot authority comes from the accepted schema snapshot.
862    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
863        &self,
864    ) -> Result<SchemaInfo, InternalError>
865    where
866        E: EntityKind<Canister = C>,
867    {
868        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
869
870        Ok(catalog.accepted_schema_info_for::<E>())
871    }
872
873    // Derive typed executor authority from an accepted snapshot the caller
874    // already loaded, avoiding a second schema-store pass in SQL write/select
875    // adapters that need both write descriptors and read selector authority.
876    #[cfg(feature = "sql")]
877    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
878        accepted_schema: &AcceptedSchemaSnapshot,
879    ) -> Result<EntityAuthority, InternalError>
880    where
881        E: EntityKind<Canister = C>,
882    {
883        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
884    }
885
886    // Ensure accepted schema metadata is safe for write paths that still encode
887    // rows through generated field contracts. Returning only the snapshot keeps
888    // SQL write type checks unchanged while the schema-runtime contract guard
889    // rejects unsupported layout or payload drift before mutation staging.
890    fn ensure_generated_compatible_accepted_save_schema<E>(
891        &self,
892    ) -> Result<
893        (
894            AcceptedRowDecodeContract,
895            SchemaInfo,
896            CommitSchemaFingerprint,
897        ),
898        InternalError,
899    >
900    where
901        E: EntityKind<Canister = C>,
902    {
903        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
904        let (accepted_row_layout, _) =
905            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
906                &accepted_schema,
907                E::MODEL,
908            )?;
909        let (row_decode_contract, _, schema_info, schema_fingerprint) =
910            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
911
912        Ok((row_decode_contract, schema_info, schema_fingerprint))
913    }
914
915    /// Build one point-in-time storage report for observability endpoints.
916    pub fn storage_report(
917        &self,
918        name_to_path: &[(&'static str, &'static str)],
919    ) -> Result<StorageReport, InternalError> {
920        self.db.storage_report(name_to_path)
921    }
922
923    /// Build one point-in-time storage report using default entity-path labels.
924    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
925        self.db.storage_report_default()
926    }
927
928    /// Build one point-in-time integrity scan report for observability endpoints.
929    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
930        self.db.integrity_report()
931    }
932
933    // ---------------------------------------------------------------------
934    // Low-level executors (crate-internal; execution primitives)
935    // ---------------------------------------------------------------------
936
937    #[must_use]
938    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
939    where
940        E: EntityKind<Canister = C> + EntityValue,
941    {
942        LoadExecutor::new(self.db, self.debug)
943    }
944
945    #[must_use]
946    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
947    where
948        E: PersistedRow<Canister = C> + EntityValue,
949    {
950        DeleteExecutor::new(self.db)
951    }
952
953    #[must_use]
954    pub(in crate::db) const fn save_executor<E>(
955        &self,
956        accepted_row_decode_contract: AcceptedRowDecodeContract,
957        accepted_schema_info: SchemaInfo,
958        accepted_schema_fingerprint: CommitSchemaFingerprint,
959    ) -> SaveExecutor<E>
960    where
961        E: PersistedRow<Canister = C> + EntityValue,
962    {
963        SaveExecutor::new_with_accepted_contract(
964            self.db,
965            self.debug,
966            accepted_row_decode_contract,
967            accepted_schema_info,
968            accepted_schema_fingerprint,
969        )
970    }
971}