Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20    db::{
21        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22        FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24        commit::CommitSchemaFingerprint,
25        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26        schema::{
27            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29            accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30            describe_entity_fields, describe_entity_fields_with_persisted_schema,
31            describe_entity_model, describe_entity_model_with_persisted_schema,
32            ensure_accepted_schema_snapshot, show_indexes_for_model,
33            show_indexes_for_model_with_runtime_state,
34            show_indexes_for_schema_info_with_runtime_state,
35        },
36    },
37    error::InternalError,
38    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39    model::entity::EntityModel,
40    traits::{CanisterKind, EntityKind, EntityValue, Path},
41    value::Value,
42};
43use std::{
44    cell::{OnceCell, RefCell},
45    collections::HashMap,
46    thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51    DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
52    GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
53    ScalarAggregateAttribution,
54};
55pub(in crate::db) use response::finalize_scalar_paged_execution;
56pub(in crate::db) use response::finalize_structural_grouped_projection_result;
57#[cfg(feature = "sql")]
58pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
59#[cfg(feature = "sql")]
60pub use sql::{
61    SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
62    SqlDdlPreparationReport, SqlDeleteExecutionBounds, SqlDeleteExposurePolicy,
63    SqlDeleteOrderPolicy, SqlDeletePolicyContext, SqlDeletePolicyRejection, SqlDeletePolicyReport,
64    SqlDeleteReturningBounds, SqlDeleteReturningPolicy, SqlDeleteStatementClassification,
65    SqlDeleteWherePolicy, SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan,
66    SqlPublicPrimaryKeyDeletePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan,
67    SqlSessionCurrentUpdatePlan, SqlStatementDispatch, SqlStatementResult,
68    SqlStatementShellSurface, SqlStatementSurface, SqlUpdateAssignmentPolicy,
69    SqlUpdateExposurePolicy, SqlUpdateOrderPolicy, SqlUpdatePolicyContext,
70    SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateReturningBounds,
71    SqlUpdateReturningPolicy, SqlUpdateStatementClassification, SqlUpdateWherePolicy,
72    SqlValidatedDeletePlan, SqlValidatedUpdatePlan, classify_sql_delete_policy,
73    classify_sql_update_policy, sql_statement_dispatch, sql_statement_entity_name,
74    sql_statement_shell_surface, sql_statement_surface,
75};
76#[cfg(all(feature = "sql", feature = "diagnostics"))]
77pub use sql::{
78    SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
79    SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
80    SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
81};
82#[cfg(all(feature = "sql", feature = "diagnostics"))]
83pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
84
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///
/// A freshly constructed session has debug disabled and no metrics sink; the
/// `debug` and `metrics_sink` builder methods opt in to each.
///

pub struct DbSession<C: CanisterKind> {
    // Database handle that session operations route through.
    db: Db<C>,
    // Debug-execution flag; `false` until `debug()` is called.
    debug: bool,
    // Optional metrics sink; when present, `with_metrics` runs session work
    // inside `with_metrics_sink` for this sink.
    metrics: Option<&'static dyn MetricsSink>,
}
96
///
/// AcceptedCatalogRuntimeCounterSnapshot
///
/// Test-only copy of the accepted-catalog runtime counters, captured by
/// `DbSession::accepted_catalog_runtime_counter_snapshot_for_tests`.
///

#[cfg(test)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
    // Read from `schema::accepted_schema_info_projection_count_for_tests`.
    pub schema_info_projections: u64,
    // Read from `schema::persisted_schema_snapshot_decode_count_for_tests`.
    pub persisted_schema_decodes: u64,
    // Read from `schema::generated_compatible_row_layout_proof_count_for_tests`.
    pub generated_compatible_row_layout_proofs: u64,
    // Read from `schema::latest_raw_snapshots_by_entity_call_count_for_tests`.
    pub latest_by_entity_calls: u64,
    // Read from `query::visible_index_projection_count_for_tests`.
    pub visible_index_projections: u64,
}
106
///
/// AcceptedSchemaQueryCacheEntry
///
/// Value type of the thread-local `ACCEPTED_SCHEMA_QUERY_CACHES` map: one
/// accepted schema snapshot together with the catalog identity it was cached
/// under.
///

#[derive(Clone, Debug)]
struct AcceptedSchemaQueryCacheEntry {
    // Accepted snapshot handed back to query paths on a cache hit.
    snapshot: AcceptedSchemaSnapshot,
    // Identity compared against a catalog selection to decide whether the
    // cached snapshot is still current.
    identity: AcceptedCatalogIdentity,
}
112
///
/// AcceptedSaveContract
///
/// Tuple returned by `accepted_save_contract_for_descriptor`, in order:
/// 1. the descriptor's row decode contract,
/// 2. a clone of that contract, bound there as the mutation row decode contract,
/// 3. the schema info projected from the accepted snapshot for the entity model,
/// 4. the commit schema fingerprint of the accepted snapshot.
///

pub(in crate::db) type AcceptedSaveContract = (
    AcceptedRowDecodeContract,
    AcceptedRowDecodeContract,
    SchemaInfo,
    CommitSchemaFingerprint,
);
119
///
/// AcceptedSchemaCatalogContext
///
/// One accepted schema snapshot bundled with its catalog identity and a
/// lazily built schema-info projection.
///

#[derive(Clone, Debug)]
pub(in crate::db) struct AcceptedSchemaCatalogContext {
    // Accepted snapshot this context was built from.
    snapshot: AcceptedSchemaSnapshot,
    // Catalog identity supplied alongside the snapshot at construction.
    identity: AcceptedCatalogIdentity,
    // Schema-info projection; empty until `accepted_schema_info_for` first
    // runs, then reused for later calls on the same context.
    schema_info: OnceCell<SchemaInfo>,
}
126
127impl AcceptedSchemaCatalogContext {
128    #[must_use]
129    pub(in crate::db) const fn new(
130        snapshot: AcceptedSchemaSnapshot,
131        identity: AcceptedCatalogIdentity,
132    ) -> Self {
133        Self {
134            snapshot,
135            identity,
136            schema_info: OnceCell::new(),
137        }
138    }
139
140    #[must_use]
141    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
142        &self.snapshot
143    }
144
145    #[must_use]
146    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
147        self.identity.accepted_schema_version()
148    }
149
150    #[must_use]
151    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
152        self.identity.accepted_schema_fingerprint()
153    }
154
155    #[must_use]
156    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
157        self.identity.fingerprint_method_version()
158    }
159
160    #[must_use]
161    #[cfg(feature = "sql")]
162    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
163        self.identity
164    }
165
166    fn debug_assert_matches_entity<E>(&self)
167    where
168        E: EntityKind,
169    {
170        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
171        debug_assert_eq!(self.identity.entity_path(), E::PATH);
172        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
173    }
174
175    pub(in crate::db) fn accepted_entity_authority_for<E>(
176        &self,
177    ) -> Result<EntityAuthority, InternalError>
178    where
179        E: EntityKind,
180    {
181        self.debug_assert_matches_entity::<E>();
182        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
183        let (accepted_row_layout, row_proof) =
184            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
185                &self.snapshot,
186                authority.model(),
187            )?;
188        let row_decode_contract = accepted_row_layout.row_decode_contract();
189
190        Ok(authority.with_accepted_row_decode_contract(
191            row_proof,
192            row_decode_contract,
193            self.accepted_schema_info_for::<E>(),
194        ))
195    }
196
197    #[must_use]
198    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
199    where
200        E: EntityKind,
201    {
202        self.debug_assert_matches_entity::<E>();
203        self.schema_info
204            .get_or_init(|| {
205                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
206                    E::MODEL,
207                    &self.snapshot,
208                    true,
209                )
210            })
211            .clone()
212    }
213}
214
215pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
216    accepted_schema: &AcceptedSchemaSnapshot,
217    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
218) -> Result<AcceptedSaveContract, InternalError>
219where
220    E: EntityKind,
221{
222    let row_decode_contract = descriptor.row_decode_contract();
223    let mutation_row_decode_contract = row_decode_contract.clone();
224    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
225    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
226
227    Ok((
228        row_decode_contract,
229        mutation_row_decode_contract,
230        schema_info,
231        schema_fingerprint,
232    ))
233}
234
thread_local! {
    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
    // but repeated read calls should not reload the stable schema snapshot just
    // to prove an already-warmed cache key. SQL DDL publication invalidates this
    // heap cache before the next query observes the new accepted schema.
    //
    // Keys are `(db.cache_scope_id(), E::PATH)` pairs, so entries are scoped per
    // database cache scope and per entity path.
    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
        RefCell::new(HashMap::default());
}
243
244impl<C: CanisterKind> DbSession<C> {
245    /// Construct one session facade for a database handle.
246    #[must_use]
247    pub(crate) const fn new(db: Db<C>) -> Self {
248        Self {
249            db,
250            debug: false,
251            metrics: None,
252        }
253    }
254
255    /// Construct one session facade from store registry and runtime hooks.
256    #[must_use]
257    pub const fn new_with_hooks(
258        store: &'static LocalKey<StoreRegistry>,
259        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
260    ) -> Self {
261        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
262    }
263
264    #[cfg(test)]
265    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
266        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
267        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
268        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
269        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
270        query::reset_visible_index_projection_count_for_tests();
271    }
272
273    #[cfg(test)]
274    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
275    -> AcceptedCatalogRuntimeCounterSnapshot {
276        AcceptedCatalogRuntimeCounterSnapshot {
277            schema_info_projections:
278                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
279            persisted_schema_decodes:
280                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
281            generated_compatible_row_layout_proofs:
282                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
283            latest_by_entity_calls:
284                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
285            visible_index_projections: query::visible_index_projection_count_for_tests(),
286        }
287    }
288
    /// Enable debug execution behavior where supported by executors.
    ///
    /// Consumes the session and returns it with the debug flag set to `true`,
    /// so it can be chained with other builder-style calls.
    #[must_use]
    pub const fn debug(mut self) -> Self {
        self.debug = true;
        self
    }
295
    /// Attach one metrics sink for all session-executed operations.
    ///
    /// Consumes the session and returns it with `sink` stored, replacing any
    /// sink that was attached before.
    #[must_use]
    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
        self.metrics = Some(sink);
        self
    }
302
303    // Shared fluent load wrapper construction keeps the session boundary in
304    // one place when load entry points differ only by missing-row policy.
305    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
306    where
307        E: EntityKind<Canister = C>,
308    {
309        FluentLoadQuery::new(self, Query::new(consistency))
310    }
311
312    // Shared fluent delete wrapper construction keeps the delete-mode handoff
313    // explicit at the session boundary instead of reassembling the same query
314    // shell in each public entry point.
315    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
316    where
317        E: PersistedRow<Canister = C>,
318    {
319        FluentDeleteQuery::new(self, Query::new(consistency).delete())
320    }
321
322    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
323        if let Some(sink) = self.metrics {
324            with_metrics_sink(sink, f)
325        } else {
326            f()
327        }
328    }
329
330    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
331    fn execute_save_with<E, T, R>(
332        &self,
333        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
334        map: impl FnOnce(T) -> R,
335    ) -> Result<R, InternalError>
336    where
337        E: PersistedRow<Canister = C> + EntityValue,
338    {
339        let (contract, schema_info, schema_fingerprint) = match self
340            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
341        {
342            Ok(authority) => authority,
343            Err(error) => {
344                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
345
346                return Err(error);
347            }
348        };
349        let value = self.with_metrics(|| {
350            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
351        })?;
352
353        Ok(map(value))
354    }
355
356    // Execute save work after the caller has already proven that the accepted
357    // row contract is generated-compatible. SQL and structural writes use this
358    // after their pre-staging schema guard so mutation staging and save
359    // execution do not rerun schema-store reconciliation in the same statement.
360    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
361        &self,
362        accepted_row_decode_contract: AcceptedRowDecodeContract,
363        accepted_schema_info: SchemaInfo,
364        accepted_schema_fingerprint: CommitSchemaFingerprint,
365        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
366        map: impl FnOnce(T) -> R,
367    ) -> Result<R, InternalError>
368    where
369        E: PersistedRow<Canister = C> + EntityValue,
370    {
371        let value = self.with_metrics(|| {
372            op(self.save_executor::<E>(
373                accepted_row_decode_contract,
374                accepted_schema_info,
375                accepted_schema_fingerprint,
376            ))
377        })?;
378
379        Ok(map(value))
380    }
381
382    // Shared save-facade wrappers keep response shape explicit at call sites.
383    fn execute_save_entity<E>(
384        &self,
385        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
386    ) -> Result<E, InternalError>
387    where
388        E: PersistedRow<Canister = C> + EntityValue,
389    {
390        self.execute_save_with(op, std::convert::identity)
391    }
392
    // Save wrapper for batch operations: the saved entities returned by `op`
    // are wrapped in a `WriteBatchResponse` via `WriteBatchResponse::new`.
    fn execute_save_batch<E>(
        &self,
        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
    ) -> Result<WriteBatchResponse<E>, InternalError>
    where
        E: PersistedRow<Canister = C> + EntityValue,
    {
        self.execute_save_with(op, WriteBatchResponse::new)
    }
402
403    // ---------------------------------------------------------------------
404    // Query entry points (public, fluent)
405    // ---------------------------------------------------------------------
406
    /// Start a fluent load query with default missing-row policy (`Ignore`).
    ///
    /// Use `load_with_consistency` to choose a different policy.
    #[must_use]
    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
    where
        E: EntityKind<Canister = C>,
    {
        self.fluent_load_query(MissingRowPolicy::Ignore)
    }
415
    /// Start a fluent load query with explicit missing-row policy.
    ///
    /// `consistency` is passed through to the underlying query unchanged.
    #[must_use]
    pub const fn load_with_consistency<E>(
        &self,
        consistency: MissingRowPolicy,
    ) -> FluentLoadQuery<'_, E>
    where
        E: EntityKind<Canister = C>,
    {
        self.fluent_load_query(consistency)
    }
427
    /// Start a fluent delete query with default missing-row policy (`Ignore`).
    ///
    /// Use `delete_with_consistency` to choose a different policy.
    #[must_use]
    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
    where
        E: PersistedRow<Canister = C>,
    {
        self.fluent_delete_query(MissingRowPolicy::Ignore)
    }
436
    /// Start a fluent delete query with explicit missing-row policy.
    ///
    /// `consistency` is passed through to the underlying delete query unchanged.
    #[must_use]
    pub fn delete_with_consistency<E>(
        &self,
        consistency: MissingRowPolicy,
    ) -> FluentDeleteQuery<'_, E>
    where
        E: PersistedRow<Canister = C>,
    {
        self.fluent_delete_query(consistency)
    }
448
    /// Return the constant scalar `Value::Int64(1)`, equivalent to SQL `SELECT 1`.
    ///
    /// This terminal bypasses query planning and access routing entirely: it
    /// does not touch the database handle.
    #[must_use]
    pub const fn select_one(&self) -> Value {
        Value::Int64(1)
    }
456
    /// Return one stable, human-readable index listing for the entity schema.
    ///
    /// The listing is built from the generated model `E::MODEL` for the store
    /// at `E::Store::PATH`; use `try_show_indexes` for the accepted-schema view.
    ///
    /// Output format mirrors SQL-style introspection:
    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
    #[must_use]
    pub fn show_indexes<E>(&self) -> Vec<String>
    where
        E: EntityKind<Canister = C>,
    {
        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
    }
470
    /// Return one stable, human-readable index listing for one schema model.
    ///
    /// This model-only helper is schema-owned and intentionally does not
    /// attach runtime lifecycle state because it does not carry store
    /// placement context. It does not read any session state.
    #[must_use]
    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
        show_indexes_for_model(model)
    }
480
481    /// Return one stable, human-readable index listing for the accepted schema.
482    ///
483    /// Unlike `show_indexes`, this fallible live-schema helper reflects
484    /// accepted DDL-created indexes as well as compiled schema indexes.
485    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
486    where
487        E: EntityKind<Canister = C>,
488    {
489        let schema = self.accepted_schema_info_for_entity::<E>()?;
490
491        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
492    }
493
494    // Return one stable, human-readable index listing for one resolved
495    // store/model pair, attaching the current runtime lifecycle state when the
496    // registry can resolve the backing store handle.
497    pub(in crate::db) fn show_indexes_for_store_model(
498        &self,
499        store_path: &str,
500        model: &'static EntityModel,
501    ) -> Vec<String> {
502        let runtime_state = self
503            .db
504            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
505            .map(|store| store.index_state());
506
507        show_indexes_for_model_with_runtime_state(model, runtime_state)
508    }
509
510    // Return one stable, human-readable index listing for one resolved
511    // store/accepted-schema pair, attaching the current runtime lifecycle state
512    // when the registry can resolve the backing store handle.
513    pub(in crate::db) fn show_indexes_for_store_schema_info(
514        &self,
515        store_path: &str,
516        schema: &SchemaInfo,
517    ) -> Vec<String> {
518        let runtime_state = self
519            .db
520            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
521            .map(|store| store.index_state());
522
523        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
524    }
525
    /// Return one stable generated-model list of field descriptors.
    ///
    /// This infallible Rust metadata helper intentionally reports the compiled
    /// schema model. Use `try_show_columns` for the accepted persisted-schema
    /// view used by SQL and diagnostics surfaces.
    ///
    /// Delegates to `show_columns_for_model` with `E::MODEL`.
    #[must_use]
    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
    where
        E: EntityKind<Canister = C>,
    {
        self.show_columns_for_model(E::MODEL)
    }
538
    /// Return one stable generated-model list of field descriptors.
    ///
    /// The descriptors come from `describe_entity_fields` for the given model;
    /// no session state is read.
    #[must_use]
    pub fn show_columns_for_model(
        &self,
        model: &'static EntityModel,
    ) -> Vec<EntityFieldDescription> {
        describe_entity_fields(model)
    }
547
548    /// Return field descriptors using the accepted persisted schema snapshot.
549    ///
550    /// This fallible variant is intended for SQL and diagnostics surfaces that
551    /// can report schema reconciliation failures. The infallible
552    /// `show_columns` helper remains generated-model based.
553    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
554    where
555        E: EntityKind<Canister = C>,
556    {
557        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
558
559        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
560    }
561
    /// Return one stable list of runtime-registered entity catalog entries.
    ///
    /// Thin panicking wrapper around `try_show_entities`.
    ///
    /// # Panics
    ///
    /// Panics if the runtime session cannot read its registered entity catalog.
    /// Use `try_show_entities` when the caller can report the internal error.
    #[must_use]
    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
        self.try_show_entities().expect("session invariant")
    }
572
    /// Return one stable list of runtime-registered entity catalog entries.
    ///
    /// # Errors
    ///
    /// Returns the internal error reported by the database handle's
    /// `runtime_entity_catalog` call.
    pub fn try_show_entities(
        &self,
    ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
        self.db.runtime_entity_catalog()
    }
579
    /// Return one stable list of runtime-registered stores.
    ///
    /// The list is read from the database handle's `runtime_store_catalog`.
    #[must_use]
    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
        self.db.runtime_store_catalog()
    }
585
    /// Return one stable list of runtime-registered stable-memory allocations.
    ///
    /// The list is read from the database handle's `runtime_memory_catalog`.
    #[must_use]
    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
        self.db.runtime_memory_catalog()
    }
591
592    // Resolve the exact secondary-index set that is visible to planner-owned
593    // query planning for one recovered store and accepted schema pair.
594    #[cfg(feature = "sql")]
595    fn visible_indexes_for_store_accepted_schema(
596        &self,
597        store_path: &str,
598        schema_info: &SchemaInfo,
599    ) -> Result<VisibleIndexes<'static>, QueryError> {
600        // Phase 1: resolve the recovered store state once at the session
601        // boundary so query/executor planning does not reopen lifecycle checks.
602        let store = self
603            .db
604            .recovered_store(store_path)
605            .map_err(QueryError::execute)?;
606        let state = store.index_state();
607        if state != IndexState::Ready {
608            return Ok(VisibleIndexes::none());
609        }
610        debug_assert_eq!(state, IndexState::Ready);
611
612        // Phase 2: planner-visible indexes are accepted schema contracts once
613        // the recovered store is query-visible.
614        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
615        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
616        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
617        debug_assert_eq!(
618            visible_indexes.accepted_expression_index_count(),
619            Some(visible_indexes.accepted_expression_indexes().len()),
620        );
621
622        Ok(visible_indexes)
623    }
624
    /// Return one generated-model schema description for the entity.
    ///
    /// This is a typed `DESCRIBE`-style introspection surface consumed by
    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
    /// schema view is required.
    ///
    /// Delegates to `describe_entity_model` with `E::MODEL`; use
    /// `try_describe_entity` for the accepted persisted-schema view.
    #[must_use]
    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
    where
        E: EntityKind<Canister = C>,
    {
        self.describe_entity_model(E::MODEL)
    }
637
    /// Return one generated-model schema description for one schema model.
    ///
    /// The description comes from the schema-level `describe_entity_model`
    /// function for the given model; no session state is read.
    #[must_use]
    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
        describe_entity_model(model)
    }
643
644    /// Return a schema description using the accepted persisted schema snapshot.
645    ///
646    /// This is the live-schema counterpart to `describe_entity`. It is fallible
647    /// because loading accepted schema authority can fail if startup
648    /// reconciliation rejects the stored metadata.
649    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
650    where
651        E: EntityKind<Canister = C>,
652    {
653        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
654
655        Ok(describe_entity_model_with_persisted_schema(
656            E::MODEL,
657            &snapshot,
658        ))
659    }
660
661    // Ensure and return the accepted schema snapshot for one generated entity.
662    // This may write the first snapshot for an empty store; otherwise it loads
663    // the latest stored snapshot and applies the current exact-match policy.
664    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
665    where
666        E: EntityKind<Canister = C>,
667    {
668        let store = self.db.recovered_store(E::Store::PATH)?;
669
670        store.with_schema_mut(|schema_store| {
671            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
672        })
673    }
674
675    // Load the current accepted schema snapshot for read/query paths without
676    // rerunning generated proposal reconciliation on every cold query call.
677    // Startup and write paths still own reconciliation; the fallback only keeps
678    // first-use test stores and freshly initialized local stores functional.
679    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
680        &self,
681    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
682    where
683        E: EntityKind<Canister = C>,
684    {
685        let cache_key = (self.db.cache_scope_id(), E::PATH);
686        if let Some(entry) =
687            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
688        {
689            return Ok(AcceptedSchemaCatalogContext::new(
690                entry.snapshot,
691                entry.identity,
692            ));
693        }
694
695        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
696        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
697        let identity = AcceptedCatalogIdentity::new(
698            E::ENTITY_TAG,
699            E::PATH,
700            E::Store::PATH,
701            snapshot.persisted_snapshot().version(),
702            fingerprint,
703        );
704        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
705            cache.borrow_mut().insert(
706                cache_key,
707                AcceptedSchemaQueryCacheEntry {
708                    snapshot: snapshot.clone(),
709                    identity,
710                },
711            );
712        });
713
714        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
715    }
716
717    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
718        &self,
719    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
720    where
721        E: EntityKind<Canister = C>,
722    {
723        let store = self.db.recovered_store(E::Store::PATH)?;
724
725        store.with_schema_mut(|schema_store| {
726            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
727        })
728    }
729
730    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
731        &self,
732        selection: &AcceptedCatalogSnapshotSelection,
733    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
734    where
735        E: EntityKind<Canister = C>,
736    {
737        let cache_key = (self.db.cache_scope_id(), E::PATH);
738        if let Some(entry) =
739            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
740            && entry.identity == selection.identity()
741        {
742            return Ok(Some(AcceptedSchemaCatalogContext::new(
743                entry.snapshot,
744                entry.identity,
745            )));
746        }
747
748        let snapshot = selection.decode_verified()?;
749        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
750            return Ok(None);
751        }
752        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
753
754        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
755            cache.borrow_mut().insert(
756                cache_key,
757                AcceptedSchemaQueryCacheEntry {
758                    snapshot,
759                    identity: selection.identity(),
760                },
761            );
762        });
763
764        Ok(Some(context))
765    }
766
767    #[cfg(feature = "sql")]
768    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
769    where
770        E: EntityKind<Canister = C>,
771    {
772        let cache_key = (self.db.cache_scope_id(), E::PATH);
773        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
774            cache.borrow_mut().remove(&cache_key);
775        });
776    }
777
778    fn load_accepted_schema_snapshot_for_query<E>(
779        &self,
780    ) -> Result<AcceptedSchemaSnapshot, InternalError>
781    where
782        E: EntityKind<Canister = C>,
783    {
784        let store = self.db.recovered_store(E::Store::PATH)?;
785
786        store.with_schema_mut(|schema_store| {
787            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
788                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
789                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
790                    &accepted,
791                    E::MODEL,
792                )
793                .is_ok()
794                {
795                    return Ok(accepted);
796                }
797            }
798
799            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
800        })
801    }
802
803    // Build the accepted schema-info projection for one typed entity. Fluent
804    // terminal adapters use this before constructing slot-bound descriptors so
805    // field slot authority comes from the accepted schema snapshot.
806    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
807        &self,
808    ) -> Result<SchemaInfo, InternalError>
809    where
810        E: EntityKind<Canister = C>,
811    {
812        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
813
814        Ok(catalog.accepted_schema_info_for::<E>())
815    }
816
817    // Derive typed executor authority from an accepted snapshot the caller
818    // already loaded, avoiding a second schema-store pass in SQL write/select
819    // adapters that need both write descriptors and read selector authority.
820    #[cfg(feature = "sql")]
821    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
822        accepted_schema: &AcceptedSchemaSnapshot,
823    ) -> Result<EntityAuthority, InternalError>
824    where
825        E: EntityKind<Canister = C>,
826    {
827        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
828    }
829
830    // Ensure accepted schema metadata is safe for write paths that still encode
831    // rows through generated field contracts. Returning only the snapshot keeps
832    // SQL write type checks unchanged while the schema-runtime contract guard
833    // rejects unsupported layout or payload drift before mutation staging.
834    fn ensure_generated_compatible_accepted_save_schema<E>(
835        &self,
836    ) -> Result<
837        (
838            AcceptedRowDecodeContract,
839            SchemaInfo,
840            CommitSchemaFingerprint,
841        ),
842        InternalError,
843    >
844    where
845        E: EntityKind<Canister = C>,
846    {
847        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
848        let (accepted_row_layout, _) =
849            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
850                &accepted_schema,
851                E::MODEL,
852            )?;
853        let (row_decode_contract, _, schema_info, schema_fingerprint) =
854            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
855
856        Ok((row_decode_contract, schema_info, schema_fingerprint))
857    }
858
859    /// Build one point-in-time storage report for observability endpoints.
860    pub fn storage_report(
861        &self,
862        name_to_path: &[(&'static str, &'static str)],
863    ) -> Result<StorageReport, InternalError> {
864        self.db.storage_report(name_to_path)
865    }
866
867    /// Build one point-in-time storage report using default entity-path labels.
868    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
869        self.db.storage_report_default()
870    }
871
872    /// Build one point-in-time integrity scan report for observability endpoints.
873    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
874        self.db.integrity_report()
875    }
876
877    // ---------------------------------------------------------------------
878    // Low-level executors (crate-internal; execution primitives)
879    // ---------------------------------------------------------------------
880
881    #[must_use]
882    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
883    where
884        E: EntityKind<Canister = C> + EntityValue,
885    {
886        LoadExecutor::new(self.db, self.debug)
887    }
888
889    #[must_use]
890    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
891    where
892        E: PersistedRow<Canister = C> + EntityValue,
893    {
894        DeleteExecutor::new(self.db)
895    }
896
897    #[must_use]
898    pub(in crate::db) const fn save_executor<E>(
899        &self,
900        accepted_row_decode_contract: AcceptedRowDecodeContract,
901        accepted_schema_info: SchemaInfo,
902        accepted_schema_fingerprint: CommitSchemaFingerprint,
903    ) -> SaveExecutor<E>
904    where
905        E: PersistedRow<Canister = C> + EntityValue,
906    {
907        SaveExecutor::new_with_accepted_contract(
908            self.db,
909            self.debug,
910            accepted_row_decode_contract,
911            accepted_schema_info,
912            accepted_schema_fingerprint,
913        )
914    }
915}