Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod bounded_cache;
7mod query;
8mod response;
9#[cfg(feature = "sql")]
10mod sql;
11///
12/// TESTS
13///
14#[cfg(all(test, feature = "sql"))]
15mod tests;
16mod write;
17
18#[cfg(any(test, feature = "sql-explain"))]
19use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
20use crate::{
21    db::{
22        Db, EntityCatalogCounts, EntityCatalogDescription, EntityFieldDescription,
23        EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
24        IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
25        StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
26        commit::CommitSchemaFingerprint,
27        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
28        schema::{
29            AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
30            AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, PersistedFieldKind,
31            PersistedFieldSnapshot, SchemaInfo, SchemaVersion, accepted_commit_schema_fingerprint,
32            accepted_schema_cache_fingerprint, describe_entity_fields,
33            describe_entity_fields_with_persisted_schema, describe_entity_model,
34            describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
35            show_indexes_for_model, show_indexes_for_model_with_runtime_state,
36            show_indexes_for_schema_info_with_runtime_state,
37        },
38    },
39    error::InternalError,
40    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
41    model::entity::EntityModel,
42    traits::{CanisterKind, EntityKind, EntityValue, Path},
43    value::Value,
44};
45use std::{
46    cell::{OnceCell, RefCell},
47    collections::HashMap,
48    thread::LocalKey,
49};
50
51#[cfg(feature = "diagnostics")]
52pub use query::{
53    DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
54    GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
55    ScalarAggregateAttribution,
56};
57pub(in crate::db) use response::finalize_scalar_paged_execution;
58pub(in crate::db) use response::finalize_structural_grouped_projection_result;
59#[cfg(feature = "sql")]
60pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
61#[cfg(feature = "sql")]
62pub use sql::{
63    SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
64    SqlDdlPreparationReport, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
65    SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
66    SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyDeletePlan,
67    SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan, SqlSessionCurrentUpdatePlan,
68    SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
69    SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdatePolicyContext,
70    SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateStatementClassification,
71    SqlValidatedDeletePlan, SqlValidatedUpdatePlan, SqlWriteExecutionBounds, SqlWriteOrderProof,
72    SqlWriteReturningBounds, SqlWriteReturningShape, SqlWriteStatementShape, SqlWriteWhereProof,
73    classify_sql_delete_policy, classify_sql_update_policy, sql_statement_dispatch,
74    sql_statement_entity_name, sql_statement_shell_surface, sql_statement_surface,
75};
76#[cfg(all(feature = "sql", feature = "diagnostics"))]
77pub use sql::{
78    SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
79    SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
80    SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
81};
82#[cfg(all(feature = "sql", feature = "diagnostics"))]
83pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
84
85///
86/// DbSession
87///
88/// Session-scoped database handle with policy (debug, metrics) and execution routing.
89///
90
91pub struct DbSession<C: CanisterKind> {
92    db: Db<C>,
93    debug: bool,
94    metrics: Option<&'static dyn MetricsSink>,
95}
96
97#[cfg(test)]
98#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
99pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
100    pub schema_info_projections: u64,
101    pub persisted_schema_decodes: u64,
102    pub generated_compatible_row_layout_proofs: u64,
103    pub latest_by_entity_calls: u64,
104    pub visible_index_projections: u64,
105}
106
107#[derive(Clone, Debug)]
108struct AcceptedSchemaQueryCacheEntry {
109    snapshot: AcceptedSchemaSnapshot,
110    identity: AcceptedCatalogIdentity,
111}
112
113type AcceptedSchemaQueryCacheKey = (usize, &'static str);
114
115pub(in crate::db) type AcceptedSaveContract = (
116    AcceptedRowDecodeContract,
117    AcceptedRowDecodeContract,
118    SchemaInfo,
119    CommitSchemaFingerprint,
120);
121
122#[derive(Clone, Debug)]
123pub(in crate::db) struct AcceptedSchemaCatalogContext {
124    snapshot: AcceptedSchemaSnapshot,
125    identity: AcceptedCatalogIdentity,
126    schema_info: OnceCell<SchemaInfo>,
127}
128
129impl AcceptedSchemaCatalogContext {
130    #[must_use]
131    pub(in crate::db) const fn new(
132        snapshot: AcceptedSchemaSnapshot,
133        identity: AcceptedCatalogIdentity,
134    ) -> Self {
135        Self {
136            snapshot,
137            identity,
138            schema_info: OnceCell::new(),
139        }
140    }
141
142    #[must_use]
143    pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
144        &self.snapshot
145    }
146
147    #[must_use]
148    pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
149        self.identity.accepted_schema_version()
150    }
151
152    #[must_use]
153    pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
154        self.identity.accepted_schema_fingerprint()
155    }
156
157    #[must_use]
158    pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
159        self.identity.fingerprint_method_version()
160    }
161
162    #[must_use]
163    #[cfg(feature = "sql")]
164    pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
165        self.identity
166    }
167
168    fn debug_assert_matches_entity<E>(&self)
169    where
170        E: EntityKind,
171    {
172        debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
173        debug_assert_eq!(self.identity.entity_path(), E::PATH);
174        debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
175    }
176
177    pub(in crate::db) fn accepted_entity_authority_for<E>(
178        &self,
179    ) -> Result<EntityAuthority, InternalError>
180    where
181        E: EntityKind,
182    {
183        let schema_info = self.accepted_schema_info_for::<E>();
184
185        self.accepted_entity_authority_for_schema_info::<E>(schema_info)
186    }
187
188    fn accepted_entity_authority_for_schema_info<E>(
189        &self,
190        schema_info: SchemaInfo,
191    ) -> Result<EntityAuthority, InternalError>
192    where
193        E: EntityKind,
194    {
195        self.debug_assert_matches_entity::<E>();
196        let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
197        let (accepted_row_layout, row_proof) =
198            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
199                &self.snapshot,
200                authority.model(),
201            )?;
202        let row_decode_contract = accepted_row_layout.row_decode_contract();
203
204        Ok(
205            authority.with_accepted_row_decode_contract(
206                row_proof,
207                row_decode_contract,
208                schema_info,
209            ),
210        )
211    }
212
213    #[cfg(feature = "sql")]
214    pub(in crate::db) fn accepted_entity_authority_and_schema_info_for<E>(
215        &self,
216    ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
217    where
218        E: EntityKind,
219    {
220        let schema_info = self.accepted_schema_info_for::<E>();
221        let authority = self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?;
222
223        Ok((authority, schema_info))
224    }
225
226    #[cfg(feature = "sql")]
227    pub(in crate::db) fn accepted_or_provided_entity_authority_for<E>(
228        &self,
229        accepted_authority: Option<&EntityAuthority>,
230    ) -> Result<EntityAuthority, InternalError>
231    where
232        E: EntityKind,
233    {
234        match accepted_authority {
235            Some(authority) => Ok(authority.clone()),
236            None => self.accepted_entity_authority_for::<E>(),
237        }
238    }
239
240    #[cfg(feature = "sql-explain")]
241    pub(in crate::db) fn accepted_or_provided_entity_authority_and_schema_info_for<E>(
242        &self,
243        accepted_authority: Option<&EntityAuthority>,
244    ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
245    where
246        E: EntityKind,
247    {
248        let schema_info = self.accepted_schema_info_for::<E>();
249        let authority = match accepted_authority {
250            Some(authority) => authority.clone(),
251            None => self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?,
252        };
253
254        Ok((authority, schema_info))
255    }
256
257    #[must_use]
258    pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
259    where
260        E: EntityKind,
261    {
262        self.debug_assert_matches_entity::<E>();
263        self.schema_info
264            .get_or_init(|| {
265                SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
266                    E::MODEL,
267                    &self.snapshot,
268                    true,
269                )
270            })
271            .clone()
272    }
273}
274
275pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
276    accepted_schema: &AcceptedSchemaSnapshot,
277    descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
278) -> Result<AcceptedSaveContract, InternalError>
279where
280    E: EntityKind,
281{
282    let row_decode_contract = descriptor.row_decode_contract();
283    let mutation_row_decode_contract = row_decode_contract.clone();
284    let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
285    let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
286
287    Ok((
288        row_decode_contract,
289        mutation_row_decode_contract,
290        schema_info,
291        schema_fingerprint,
292    ))
293}
294
295fn relation_field_count(fields: &[PersistedFieldSnapshot]) -> usize {
296    fields
297        .iter()
298        .filter(|field| persisted_kind_is_relation_field(field.kind()))
299        .count()
300}
301
302fn persisted_kind_is_relation_field(kind: &PersistedFieldKind) -> bool {
303    match kind {
304        PersistedFieldKind::Relation { .. } => true,
305        PersistedFieldKind::List(inner) | PersistedFieldKind::Set(inner) => {
306            matches!(inner.as_ref(), PersistedFieldKind::Relation { .. })
307        }
308        _ => false,
309    }
310}
311
312thread_local! {
313    // Query-side SQL/fluent cache setup needs accepted runtime schema authority,
314    // but repeated read calls should not reload the stable schema snapshot just
315    // to prove an already-warmed cache key. SQL DDL publication invalidates this
316    // heap cache before the next query observes the new accepted schema.
317    static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
318        RefCell::new(HashMap::default());
319}
320
321impl<C: CanisterKind> DbSession<C> {
322    /// Construct one session facade for a database handle.
323    #[must_use]
324    pub(crate) const fn new(db: Db<C>) -> Self {
325        Self {
326            db,
327            debug: false,
328            metrics: None,
329        }
330    }
331
332    /// Construct one session facade from store registry and runtime hooks.
333    #[must_use]
334    pub const fn new_with_hooks(
335        store: &'static LocalKey<StoreRegistry>,
336        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
337    ) -> Self {
338        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
339    }
340
341    #[cfg(test)]
342    pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
343        crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
344        crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
345        crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
346        crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
347        query::reset_visible_index_projection_count_for_tests();
348    }
349
350    #[cfg(test)]
351    pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
352    -> AcceptedCatalogRuntimeCounterSnapshot {
353        AcceptedCatalogRuntimeCounterSnapshot {
354            schema_info_projections:
355                crate::db::schema::accepted_schema_info_projection_count_for_tests(),
356            persisted_schema_decodes:
357                crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
358            generated_compatible_row_layout_proofs:
359                crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
360            latest_by_entity_calls:
361                crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
362            visible_index_projections: query::visible_index_projection_count_for_tests(),
363        }
364    }
365
366    /// Enable debug execution behavior where supported by executors.
367    #[must_use]
368    pub const fn debug(mut self) -> Self {
369        self.debug = true;
370        self
371    }
372
373    /// Attach one metrics sink for all session-executed operations.
374    #[must_use]
375    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
376        self.metrics = Some(sink);
377        self
378    }
379
380    // Shared fluent load wrapper construction keeps the session boundary in
381    // one place when load entry points differ only by missing-row policy.
382    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
383    where
384        E: EntityKind<Canister = C>,
385    {
386        FluentLoadQuery::new(self, Query::new(consistency))
387    }
388
389    // Shared fluent delete wrapper construction keeps the delete-mode handoff
390    // explicit at the session boundary instead of reassembling the same query
391    // shell in each public entry point.
392    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
393    where
394        E: PersistedRow<Canister = C>,
395    {
396        FluentDeleteQuery::new(self, Query::new(consistency).delete())
397    }
398
399    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
400        if let Some(sink) = self.metrics {
401            with_metrics_sink(sink, f)
402        } else {
403            f()
404        }
405    }
406
407    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
408    fn execute_save_with<E, T, R>(
409        &self,
410        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
411        map: impl FnOnce(T) -> R,
412    ) -> Result<R, InternalError>
413    where
414        E: PersistedRow<Canister = C> + EntityValue,
415    {
416        let (contract, schema_info, schema_fingerprint) = match self
417            .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
418        {
419            Ok(authority) => authority,
420            Err(error) => {
421                self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
422
423                return Err(error);
424            }
425        };
426        let value = self.with_metrics(|| {
427            op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
428        })?;
429
430        Ok(map(value))
431    }
432
433    // Execute save work after the caller has already proven that the accepted
434    // row contract is generated-compatible. SQL and structural writes use this
435    // after their pre-staging schema guard so mutation staging and save
436    // execution do not rerun schema-store reconciliation in the same statement.
437    fn execute_save_with_checked_accepted_row_contract<E, T, R>(
438        &self,
439        accepted_row_decode_contract: AcceptedRowDecodeContract,
440        accepted_schema_info: SchemaInfo,
441        accepted_schema_fingerprint: CommitSchemaFingerprint,
442        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
443        map: impl FnOnce(T) -> R,
444    ) -> Result<R, InternalError>
445    where
446        E: PersistedRow<Canister = C> + EntityValue,
447    {
448        let value = self.with_metrics(|| {
449            op(self.save_executor::<E>(
450                accepted_row_decode_contract,
451                accepted_schema_info,
452                accepted_schema_fingerprint,
453            ))
454        })?;
455
456        Ok(map(value))
457    }
458
459    // Shared save-facade wrappers keep response shape explicit at call sites.
460    fn execute_save_entity<E>(
461        &self,
462        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
463    ) -> Result<E, InternalError>
464    where
465        E: PersistedRow<Canister = C> + EntityValue,
466    {
467        self.execute_save_with(op, std::convert::identity)
468    }
469
470    fn execute_save_batch<E>(
471        &self,
472        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
473    ) -> Result<WriteBatchResponse<E>, InternalError>
474    where
475        E: PersistedRow<Canister = C> + EntityValue,
476    {
477        self.execute_save_with(op, WriteBatchResponse::new)
478    }
479
480    // ---------------------------------------------------------------------
481    // Query entry points (public, fluent)
482    // ---------------------------------------------------------------------
483
484    /// Start a fluent load query with default missing-row policy (`Ignore`).
485    #[must_use]
486    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
487    where
488        E: EntityKind<Canister = C>,
489    {
490        self.fluent_load_query(MissingRowPolicy::Ignore)
491    }
492
493    /// Start a fluent load query with explicit missing-row policy.
494    #[must_use]
495    pub const fn load_with_consistency<E>(
496        &self,
497        consistency: MissingRowPolicy,
498    ) -> FluentLoadQuery<'_, E>
499    where
500        E: EntityKind<Canister = C>,
501    {
502        self.fluent_load_query(consistency)
503    }
504
505    /// Start a fluent delete query with default missing-row policy (`Ignore`).
506    #[must_use]
507    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
508    where
509        E: PersistedRow<Canister = C>,
510    {
511        self.fluent_delete_query(MissingRowPolicy::Ignore)
512    }
513
514    /// Start a fluent delete query with explicit missing-row policy.
515    #[must_use]
516    pub fn delete_with_consistency<E>(
517        &self,
518        consistency: MissingRowPolicy,
519    ) -> FluentDeleteQuery<'_, E>
520    where
521        E: PersistedRow<Canister = C>,
522    {
523        self.fluent_delete_query(consistency)
524    }
525
526    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
527    ///
528    /// This terminal bypasses query planning and access routing entirely.
529    #[must_use]
530    pub const fn select_one(&self) -> Value {
531        Value::Int64(1)
532    }
533
534    /// Return one stable, human-readable index listing for the entity schema.
535    ///
536    /// Output format mirrors SQL-style introspection:
537    /// - `PRIMARY KEY (field) [state=ready] [origin=generated]`
538    /// - `INDEX name (field_a, field_b) [state=ready] [origin=generated]`
539    /// - `UNIQUE INDEX name (field_a, field_b) [state=ready] [origin=generated]`
540    #[must_use]
541    pub fn show_indexes<E>(&self) -> Vec<String>
542    where
543        E: EntityKind<Canister = C>,
544    {
545        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
546    }
547
548    /// Return one stable, human-readable index listing for one schema model.
549    ///
550    /// This model-only helper is schema-owned and intentionally does not
551    /// attach runtime lifecycle state because it does not carry store
552    /// placement context.
553    #[must_use]
554    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
555        show_indexes_for_model(model)
556    }
557
558    /// Return one stable, human-readable index listing for the accepted schema.
559    ///
560    /// Unlike `show_indexes`, this fallible live-schema helper reflects
561    /// accepted DDL-created indexes as well as compiled schema indexes.
562    pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
563    where
564        E: EntityKind<Canister = C>,
565    {
566        let schema = self.accepted_schema_info_for_entity::<E>()?;
567
568        Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
569    }
570
571    // Return one stable, human-readable index listing for one resolved
572    // store/model pair, attaching the current runtime lifecycle state when the
573    // registry can resolve the backing store handle.
574    pub(in crate::db) fn show_indexes_for_store_model(
575        &self,
576        store_path: &str,
577        model: &'static EntityModel,
578    ) -> Vec<String> {
579        let runtime_state = self
580            .db
581            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
582            .map(|store| store.index_state());
583
584        show_indexes_for_model_with_runtime_state(model, runtime_state)
585    }
586
587    // Return one stable, human-readable index listing for one resolved
588    // store/accepted-schema pair, attaching the current runtime lifecycle state
589    // when the registry can resolve the backing store handle.
590    pub(in crate::db) fn show_indexes_for_store_schema_info(
591        &self,
592        store_path: &str,
593        schema: &SchemaInfo,
594    ) -> Vec<String> {
595        let runtime_state = self
596            .db
597            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
598            .map(|store| store.index_state());
599
600        show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
601    }
602
603    /// Return one stable generated-model list of field descriptors.
604    ///
605    /// This infallible Rust metadata helper intentionally reports the compiled
606    /// schema model. Use `try_show_columns` for the accepted persisted-schema
607    /// view used by SQL and diagnostics surfaces.
608    #[must_use]
609    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
610    where
611        E: EntityKind<Canister = C>,
612    {
613        self.show_columns_for_model(E::MODEL)
614    }
615
616    /// Return one stable generated-model list of field descriptors.
617    #[must_use]
618    pub fn show_columns_for_model(
619        &self,
620        model: &'static EntityModel,
621    ) -> Vec<EntityFieldDescription> {
622        describe_entity_fields(model)
623    }
624
625    /// Return field descriptors using the accepted persisted schema snapshot.
626    ///
627    /// This fallible variant is intended for SQL and diagnostics surfaces that
628    /// can report schema reconciliation failures. The infallible
629    /// `show_columns` helper remains generated-model based.
630    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
631    where
632        E: EntityKind<Canister = C>,
633    {
634        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
635
636        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
637    }
638
639    /// Return one stable list of runtime-registered entity catalog entries.
640    ///
641    /// # Panics
642    ///
643    /// Panics if the runtime session cannot read its registered entity catalog.
644    /// Use `try_show_entities` when the caller can report the internal error.
645    #[must_use]
646    pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
647        self.try_show_entities().expect("session invariant")
648    }
649
650    /// Return one stable list of runtime-registered entity catalog entries.
651    pub fn try_show_entities(&self) -> Result<Vec<EntityCatalogDescription>, InternalError> {
652        let mut entities = Vec::with_capacity(self.db.entity_runtime_hooks.len());
653
654        for hooks in self.db.entity_runtime_hooks {
655            let store = self.db.recovered_store(hooks.store_path)?;
656            let storage = store
657                .storage_capabilities()
658                .storage_mode()
659                .as_str()
660                .to_string();
661            let accepted = self.accepted_schema_catalog_context_for_runtime_hook(hooks, store)?;
662            let snapshot = accepted.snapshot().persisted_snapshot();
663
664            entities.push(EntityCatalogDescription::new(
665                snapshot.entity_name().to_string(),
666                snapshot.entity_path().to_string(),
667                hooks.store_path.to_string(),
668                storage,
669                EntityCatalogCounts::new(
670                    u32::try_from(snapshot.fields().len()).unwrap_or(u32::MAX),
671                    u32::try_from(snapshot.indexes().len()).unwrap_or(u32::MAX),
672                    u32::try_from(relation_field_count(snapshot.fields())).unwrap_or(u32::MAX),
673                    snapshot.version().get(),
674                ),
675            ));
676        }
677
678        Ok(entities)
679    }
680
681    fn accepted_schema_catalog_context_for_runtime_hook(
682        &self,
683        hooks: &EntityRuntimeHooks<C>,
684        store: crate::db::registry::StoreHandle,
685    ) -> Result<AcceptedSchemaCatalogContext, InternalError> {
686        let cache_key = self.accepted_schema_query_cache_key(hooks.entity_path);
687        if let Some(context) =
688            Self::accepted_schema_catalog_context_from_runtime_hook_cache(cache_key, hooks, store)?
689        {
690            return Ok(context);
691        }
692
693        let snapshot = Self::load_accepted_schema_snapshot_for_runtime_hook(hooks, store)?;
694        let identity = AcceptedCatalogIdentity::new(
695            hooks.entity_tag,
696            hooks.entity_path,
697            hooks.store_path,
698            snapshot.persisted_snapshot().version(),
699            accepted_schema_cache_fingerprint(&snapshot)?,
700        );
701        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), identity);
702        Self::insert_accepted_schema_query_cache(cache_key, snapshot, identity);
703
704        Ok(context)
705    }
706
707    fn accepted_schema_catalog_context_from_runtime_hook_cache(
708        cache_key: AcceptedSchemaQueryCacheKey,
709        hooks: &EntityRuntimeHooks<C>,
710        store: crate::db::registry::StoreHandle,
711    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError> {
712        let selection = store.with_schema_mut(|schema_store| {
713            schema_store.latest_catalog_identity(
714                hooks.entity_tag,
715                hooks.entity_path,
716                hooks.store_path,
717            )
718        })?;
719        if let Some(selection) = selection
720            && let Some(context) = Self::accepted_schema_catalog_context_from_query_cache(
721                cache_key,
722                selection.identity(),
723            )
724        {
725            return Ok(Some(context));
726        }
727
728        Ok(None)
729    }
730
731    fn load_accepted_schema_snapshot_for_runtime_hook(
732        hooks: &EntityRuntimeHooks<C>,
733        store: crate::db::registry::StoreHandle,
734    ) -> Result<AcceptedSchemaSnapshot, InternalError> {
735        store.with_schema_mut(|schema_store| {
736            if let Some(snapshot) = schema_store.latest_persisted_snapshot(hooks.entity_tag)? {
737                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
738                if accepted.entity_path() == hooks.entity_path {
739                    return Ok(accepted);
740                }
741            }
742
743            ensure_accepted_schema_snapshot(
744                schema_store,
745                hooks.entity_tag,
746                hooks.entity_path,
747                hooks.model,
748            )
749        })
750    }
751
752    /// Return one stable list of runtime-registered stores.
753    #[must_use]
754    pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
755        self.db.runtime_store_catalog()
756    }
757
758    /// Return one stable list of runtime-registered stable-memory allocations.
759    #[must_use]
760    pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
761        self.db.runtime_memory_catalog()
762    }
763
764    // Resolve the exact secondary-index set that is visible to planner-owned
765    // query planning for one recovered store and accepted schema pair.
766    #[cfg(any(test, feature = "sql-explain"))]
767    fn visible_indexes_for_store_accepted_schema(
768        &self,
769        store_path: &str,
770        schema_info: &SchemaInfo,
771    ) -> Result<VisibleIndexes<'static>, QueryError> {
772        // Phase 1: resolve the recovered store state once at the session
773        // boundary so query/executor planning does not reopen lifecycle checks.
774        let store = self
775            .db
776            .recovered_store(store_path)
777            .map_err(QueryError::execute)?;
778        let state = store.index_state();
779        if state != IndexState::Ready {
780            return Ok(VisibleIndexes::none());
781        }
782        debug_assert_eq!(state, IndexState::Ready);
783
784        // Phase 2: planner-visible indexes are accepted schema contracts once
785        // the recovered store is query-visible.
786        let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
787        debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
788        debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
789        debug_assert_eq!(
790            visible_indexes.accepted_expression_index_count(),
791            Some(visible_indexes.accepted_expression_indexes().len()),
792        );
793
794        Ok(visible_indexes)
795    }
796
797    /// Return one generated-model schema description for the entity.
798    ///
799    /// This is a typed `DESCRIBE`-style introspection surface consumed by
800    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
801    /// schema view is required.
802    #[must_use]
803    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
804    where
805        E: EntityKind<Canister = C>,
806    {
807        self.describe_entity_model(E::MODEL)
808    }
809
810    /// Return one generated-model schema description for one schema model.
811    #[must_use]
812    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
813        describe_entity_model(model)
814    }
815
816    /// Return a schema description using the accepted persisted schema snapshot.
817    ///
818    /// This is the live-schema counterpart to `describe_entity`. It is fallible
819    /// because loading accepted schema authority can fail if startup
820    /// reconciliation rejects the stored metadata.
821    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
822    where
823        E: EntityKind<Canister = C>,
824    {
825        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
826
827        Ok(describe_entity_model_with_persisted_schema(
828            E::MODEL,
829            &snapshot,
830        ))
831    }
832
833    // Ensure and return the accepted schema snapshot for one generated entity.
834    // This may write the first snapshot for an empty store; otherwise it loads
835    // the latest stored snapshot and applies the current exact-match policy.
836    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
837    where
838        E: EntityKind<Canister = C>,
839    {
840        let store = self.db.recovered_store(E::Store::PATH)?;
841
842        store.with_schema_mut(|schema_store| {
843            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
844        })
845    }
846
847    fn accepted_schema_query_cache_key(
848        &self,
849        entity_path: &'static str,
850    ) -> AcceptedSchemaQueryCacheKey {
851        (self.db.cache_scope_id(), entity_path)
852    }
853
854    fn accepted_schema_catalog_context_from_query_cache(
855        cache_key: AcceptedSchemaQueryCacheKey,
856        identity: AcceptedCatalogIdentity,
857    ) -> Option<AcceptedSchemaCatalogContext> {
858        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
859            cache.borrow().get(&cache_key).and_then(|entry| {
860                (entry.identity == identity)
861                    .then(|| AcceptedSchemaCatalogContext::new(entry.snapshot.clone(), identity))
862            })
863        })
864    }
865
866    fn insert_accepted_schema_query_cache(
867        cache_key: AcceptedSchemaQueryCacheKey,
868        snapshot: AcceptedSchemaSnapshot,
869        identity: AcceptedCatalogIdentity,
870    ) {
871        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
872            cache.borrow_mut().insert(
873                cache_key,
874                AcceptedSchemaQueryCacheEntry { snapshot, identity },
875            );
876        });
877    }
878
879    #[cfg(test)]
880    pub(in crate::db) fn clear_accepted_schema_query_cache_for_tests() {
881        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
882            cache.borrow_mut().clear();
883        });
884    }
885
886    // Load the current accepted schema snapshot for read/query paths without
887    // rerunning generated proposal reconciliation on every cold query call.
888    // Startup and write paths still own reconciliation; the fallback only keeps
889    // first-use test stores and freshly initialized local stores functional.
890    pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
891        &self,
892    ) -> Result<AcceptedSchemaCatalogContext, InternalError>
893    where
894        E: EntityKind<Canister = C>,
895    {
896        let cache_key = self.accepted_schema_query_cache_key(E::PATH);
897        if let Some(entry) =
898            ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
899        {
900            return Ok(AcceptedSchemaCatalogContext::new(
901                entry.snapshot,
902                entry.identity,
903            ));
904        }
905
906        let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
907        let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
908        let identity = AcceptedCatalogIdentity::new(
909            E::ENTITY_TAG,
910            E::PATH,
911            E::Store::PATH,
912            snapshot.persisted_snapshot().version(),
913            fingerprint,
914        );
915        Self::insert_accepted_schema_query_cache(cache_key, snapshot.clone(), identity);
916
917        Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
918    }
919
920    pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
921        &self,
922    ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
923    where
924        E: EntityKind<Canister = C>,
925    {
926        let store = self.db.recovered_store(E::Store::PATH)?;
927
928        store.with_schema_mut(|schema_store| {
929            schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
930        })
931    }
932
933    pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
934        &self,
935        selection: &AcceptedCatalogSnapshotSelection,
936    ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
937    where
938        E: EntityKind<Canister = C>,
939    {
940        let cache_key = self.accepted_schema_query_cache_key(E::PATH);
941        if let Some(context) =
942            Self::accepted_schema_catalog_context_from_query_cache(cache_key, selection.identity())
943        {
944            return Ok(Some(context));
945        }
946
947        let snapshot = selection.decode_verified()?;
948        if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
949            return Ok(None);
950        }
951        let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
952
953        Self::insert_accepted_schema_query_cache(cache_key, snapshot, selection.identity());
954
955        Ok(Some(context))
956    }
957
958    #[cfg(feature = "sql")]
959    fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
960    where
961        E: EntityKind<Canister = C>,
962    {
963        let cache_key = self.accepted_schema_query_cache_key(E::PATH);
964        ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
965            cache.borrow_mut().remove(&cache_key);
966        });
967    }
968
969    fn load_accepted_schema_snapshot_for_query<E>(
970        &self,
971    ) -> Result<AcceptedSchemaSnapshot, InternalError>
972    where
973        E: EntityKind<Canister = C>,
974    {
975        let store = self.db.recovered_store(E::Store::PATH)?;
976
977        store.with_schema_mut(|schema_store| {
978            if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
979                let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
980                if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
981                    &accepted,
982                    E::MODEL,
983                )
984                .is_ok()
985                {
986                    return Ok(accepted);
987                }
988            }
989
990            ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
991        })
992    }
993
994    // Build the accepted schema-info projection for one typed entity. Fluent
995    // terminal adapters use this before constructing slot-bound descriptors so
996    // field slot authority comes from the accepted schema snapshot.
997    pub(in crate::db) fn accepted_schema_info_for_entity<E>(
998        &self,
999    ) -> Result<SchemaInfo, InternalError>
1000    where
1001        E: EntityKind<Canister = C>,
1002    {
1003        let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
1004
1005        Ok(catalog.accepted_schema_info_for::<E>())
1006    }
1007
1008    // Derive typed executor authority from an accepted snapshot the caller
1009    // already loaded, avoiding a second schema-store pass in SQL write/select
1010    // adapters that need both write descriptors and read selector authority.
1011    #[cfg(feature = "sql")]
1012    pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
1013        accepted_schema: &AcceptedSchemaSnapshot,
1014    ) -> Result<EntityAuthority, InternalError>
1015    where
1016        E: EntityKind<Canister = C>,
1017    {
1018        EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
1019    }
1020
1021    // Ensure accepted schema metadata is safe for write paths that still encode
1022    // rows through generated field contracts. Returning only the snapshot keeps
1023    // SQL write type checks unchanged while the schema-runtime contract guard
1024    // rejects unsupported layout or payload drift before mutation staging.
1025    fn ensure_generated_compatible_accepted_save_schema<E>(
1026        &self,
1027    ) -> Result<
1028        (
1029            AcceptedRowDecodeContract,
1030            SchemaInfo,
1031            CommitSchemaFingerprint,
1032        ),
1033        InternalError,
1034    >
1035    where
1036        E: EntityKind<Canister = C>,
1037    {
1038        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
1039        let (accepted_row_layout, _) =
1040            AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
1041                &accepted_schema,
1042                E::MODEL,
1043            )?;
1044        let (row_decode_contract, _, schema_info, schema_fingerprint) =
1045            accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
1046
1047        Ok((row_decode_contract, schema_info, schema_fingerprint))
1048    }
1049
1050    /// Build one point-in-time storage report for observability endpoints.
1051    pub fn storage_report(
1052        &self,
1053        name_to_path: &[(&'static str, &'static str)],
1054    ) -> Result<StorageReport, InternalError> {
1055        self.db.storage_report(name_to_path)
1056    }
1057
1058    /// Build one point-in-time storage report using default entity-path labels.
1059    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
1060        self.db.storage_report_default()
1061    }
1062
1063    /// Build one point-in-time integrity scan report for observability endpoints.
1064    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
1065        self.db.integrity_report()
1066    }
1067
1068    // ---------------------------------------------------------------------
1069    // Low-level executors (crate-internal; execution primitives)
1070    // ---------------------------------------------------------------------
1071
1072    #[must_use]
1073    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
1074    where
1075        E: EntityKind<Canister = C> + EntityValue,
1076    {
1077        LoadExecutor::new(self.db, self.debug)
1078    }
1079
1080    #[must_use]
1081    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
1082    where
1083        E: PersistedRow<Canister = C> + EntityValue,
1084    {
1085        DeleteExecutor::new(self.db)
1086    }
1087
1088    #[must_use]
1089    pub(in crate::db) const fn save_executor<E>(
1090        &self,
1091        accepted_row_decode_contract: AcceptedRowDecodeContract,
1092        accepted_schema_info: SchemaInfo,
1093        accepted_schema_fingerprint: CommitSchemaFingerprint,
1094    ) -> SaveExecutor<E>
1095    where
1096        E: PersistedRow<Canister = C> + EntityValue,
1097    {
1098        SaveExecutor::new_with_accepted_contract(
1099            self.db,
1100            self.debug,
1101            accepted_row_decode_contract,
1102            accepted_schema_info,
1103            accepted_schema_fingerprint,
1104        )
1105    }
1106}