Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10///
11/// TESTS
12///
13#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18    db::{
19        Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20        FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21        QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22        executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23        query::plan::VisibleIndexes,
24        schema::{
25            AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot, describe_entity_fields,
26            describe_entity_fields_with_persisted_schema, describe_entity_model,
27            describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
28            show_indexes_for_model, show_indexes_for_model_with_runtime_state,
29        },
30    },
31    error::InternalError,
32    metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
33    model::entity::EntityModel,
34    traits::{CanisterKind, EntityKind, EntityValue, Path},
35    value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41    DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42    QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50    SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51    SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56///
57/// DbSession
58///
59/// Session-scoped database handle with policy (debug, metrics) and execution routing.
60///
61
62pub struct DbSession<C: CanisterKind> {
63    db: Db<C>,
64    debug: bool,
65    metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69    /// Construct one session facade for a database handle.
70    #[must_use]
71    pub(crate) const fn new(db: Db<C>) -> Self {
72        Self {
73            db,
74            debug: false,
75            metrics: None,
76        }
77    }
78
79    /// Construct one session facade from store registry and runtime hooks.
80    #[must_use]
81    pub const fn new_with_hooks(
82        store: &'static LocalKey<StoreRegistry>,
83        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84    ) -> Self {
85        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86    }
87
88    /// Enable debug execution behavior where supported by executors.
89    #[must_use]
90    pub const fn debug(mut self) -> Self {
91        self.debug = true;
92        self
93    }
94
95    /// Attach one metrics sink for all session-executed operations.
96    #[must_use]
97    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98        self.metrics = Some(sink);
99        self
100    }
101
102    // Shared fluent load wrapper construction keeps the session boundary in
103    // one place when load entry points differ only by missing-row policy.
104    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105    where
106        E: EntityKind<Canister = C>,
107    {
108        FluentLoadQuery::new(self, Query::new(consistency))
109    }
110
111    // Shared fluent delete wrapper construction keeps the delete-mode handoff
112    // explicit at the session boundary instead of reassembling the same query
113    // shell in each public entry point.
114    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115    where
116        E: PersistedRow<Canister = C>,
117    {
118        FluentDeleteQuery::new(self, Query::new(consistency).delete())
119    }
120
121    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122        if let Some(sink) = self.metrics {
123            with_metrics_sink(sink, f)
124        } else {
125            f()
126        }
127    }
128
129    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
130    fn execute_save_with<E, T, R>(
131        &self,
132        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133        map: impl FnOnce(T) -> R,
134    ) -> Result<R, InternalError>
135    where
136        E: PersistedRow<Canister = C> + EntityValue,
137    {
138        if let Err(error) =
139            self.with_metrics(|| self.ensure_generated_compatible_accepted_schema_snapshot::<E>())
140        {
141            self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
142
143            return Err(error);
144        }
145
146        self.execute_save_with_checked_accepted_schema(op, map)
147    }
148
149    // Execute save work after the caller has already proven that the accepted
150    // schema is generated-compatible. SQL writes use this after their
151    // pre-staging schema guard so mutation staging and save execution do not
152    // run duplicate schema-store reconciliation in the same statement.
153    fn execute_save_with_checked_accepted_schema<E, T, R>(
154        &self,
155        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
156        map: impl FnOnce(T) -> R,
157    ) -> Result<R, InternalError>
158    where
159        E: PersistedRow<Canister = C> + EntityValue,
160    {
161        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
162
163        Ok(map(value))
164    }
165
166    // Shared save-facade wrappers keep response shape explicit at call sites.
167    fn execute_save_entity<E>(
168        &self,
169        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
170    ) -> Result<E, InternalError>
171    where
172        E: PersistedRow<Canister = C> + EntityValue,
173    {
174        self.execute_save_with(op, std::convert::identity)
175    }
176
177    fn execute_save_batch<E>(
178        &self,
179        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
180    ) -> Result<WriteBatchResponse<E>, InternalError>
181    where
182        E: PersistedRow<Canister = C> + EntityValue,
183    {
184        self.execute_save_with(op, WriteBatchResponse::new)
185    }
186
187    // ---------------------------------------------------------------------
188    // Query entry points (public, fluent)
189    // ---------------------------------------------------------------------
190
191    /// Start a fluent load query with default missing-row policy (`Ignore`).
192    #[must_use]
193    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
194    where
195        E: EntityKind<Canister = C>,
196    {
197        self.fluent_load_query(MissingRowPolicy::Ignore)
198    }
199
200    /// Start a fluent load query with explicit missing-row policy.
201    #[must_use]
202    pub const fn load_with_consistency<E>(
203        &self,
204        consistency: MissingRowPolicy,
205    ) -> FluentLoadQuery<'_, E>
206    where
207        E: EntityKind<Canister = C>,
208    {
209        self.fluent_load_query(consistency)
210    }
211
212    /// Start a fluent delete query with default missing-row policy (`Ignore`).
213    #[must_use]
214    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
215    where
216        E: PersistedRow<Canister = C>,
217    {
218        self.fluent_delete_query(MissingRowPolicy::Ignore)
219    }
220
221    /// Start a fluent delete query with explicit missing-row policy.
222    #[must_use]
223    pub fn delete_with_consistency<E>(
224        &self,
225        consistency: MissingRowPolicy,
226    ) -> FluentDeleteQuery<'_, E>
227    where
228        E: PersistedRow<Canister = C>,
229    {
230        self.fluent_delete_query(consistency)
231    }
232
233    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
234    ///
235    /// This terminal bypasses query planning and access routing entirely.
236    #[must_use]
237    pub const fn select_one(&self) -> Value {
238        Value::Int(1)
239    }
240
241    /// Return one stable, human-readable index listing for the entity schema.
242    ///
243    /// Output format mirrors SQL-style introspection:
244    /// - `PRIMARY KEY (field)`
245    /// - `INDEX name (field_a, field_b)`
246    /// - `UNIQUE INDEX name (field_a, field_b)`
247    #[must_use]
248    pub fn show_indexes<E>(&self) -> Vec<String>
249    where
250        E: EntityKind<Canister = C>,
251    {
252        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
253    }
254
255    /// Return one stable, human-readable index listing for one schema model.
256    ///
257    /// This model-only helper is schema-owned and intentionally does not
258    /// attach runtime lifecycle state because it does not carry store
259    /// placement context.
260    #[must_use]
261    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
262        show_indexes_for_model(model)
263    }
264
265    // Return one stable, human-readable index listing for one resolved
266    // store/model pair, attaching the current runtime lifecycle state when the
267    // registry can resolve the backing store handle.
268    pub(in crate::db) fn show_indexes_for_store_model(
269        &self,
270        store_path: &str,
271        model: &'static EntityModel,
272    ) -> Vec<String> {
273        let runtime_state = self
274            .db
275            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
276            .map(|store| store.index_state());
277
278        show_indexes_for_model_with_runtime_state(model, runtime_state)
279    }
280
281    /// Return one stable generated-model list of field descriptors.
282    ///
283    /// This infallible Rust metadata helper intentionally reports the compiled
284    /// schema model. Use `try_show_columns` for the accepted persisted-schema
285    /// view used by SQL and diagnostics surfaces.
286    #[must_use]
287    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
288    where
289        E: EntityKind<Canister = C>,
290    {
291        self.show_columns_for_model(E::MODEL)
292    }
293
294    /// Return one stable generated-model list of field descriptors.
295    #[must_use]
296    pub fn show_columns_for_model(
297        &self,
298        model: &'static EntityModel,
299    ) -> Vec<EntityFieldDescription> {
300        describe_entity_fields(model)
301    }
302
303    /// Return field descriptors using the accepted persisted schema snapshot.
304    ///
305    /// This fallible variant is intended for SQL and diagnostics surfaces that
306    /// can report schema reconciliation failures. The infallible
307    /// `show_columns` helper remains generated-model based.
308    pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
309    where
310        E: EntityKind<Canister = C>,
311    {
312        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
313
314        Ok(describe_entity_fields_with_persisted_schema(&snapshot))
315    }
316
317    /// Return one stable list of runtime-registered entity names.
318    #[must_use]
319    pub fn show_entities(&self) -> Vec<String> {
320        self.db.runtime_entity_names()
321    }
322
323    /// Return one stable list of runtime-registered entity names.
324    ///
325    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
326    /// metadata surface keeps the same alias relationship.
327    #[must_use]
328    pub fn show_tables(&self) -> Vec<String> {
329        self.show_entities()
330    }
331
332    // Resolve the exact secondary-index set that is visible to planner-owned
333    // query planning for one recovered store/model pair.
334    fn visible_indexes_for_store_model(
335        &self,
336        store_path: &str,
337        model: &'static EntityModel,
338    ) -> Result<VisibleIndexes<'static>, QueryError> {
339        // Phase 1: resolve the recovered store state once at the session
340        // boundary so query/executor planning does not reopen lifecycle checks.
341        let store = self
342            .db
343            .recovered_store(store_path)
344            .map_err(QueryError::execute)?;
345        let state = store.index_state();
346        if state != IndexState::Ready {
347            return Ok(VisibleIndexes::none());
348        }
349        debug_assert_eq!(state, IndexState::Ready);
350
351        // Phase 2: planner-visible indexes are exactly the model-owned index
352        // declarations once the recovered store is query-visible.
353        Ok(VisibleIndexes::planner_visible(model.indexes()))
354    }
355
356    /// Return one generated-model schema description for the entity.
357    ///
358    /// This is a typed `DESCRIBE`-style introspection surface consumed by
359    /// developer tooling and pre-EXPLAIN debugging when a non-failing compiled
360    /// schema view is required.
361    #[must_use]
362    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
363    where
364        E: EntityKind<Canister = C>,
365    {
366        self.describe_entity_model(E::MODEL)
367    }
368
369    /// Return one generated-model schema description for one schema model.
370    #[must_use]
371    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
372        describe_entity_model(model)
373    }
374
375    /// Return a schema description using the accepted persisted schema snapshot.
376    ///
377    /// This is the live-schema counterpart to `describe_entity`. It is fallible
378    /// because loading accepted schema authority can fail if startup
379    /// reconciliation rejects the stored metadata.
380    pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
381    where
382        E: EntityKind<Canister = C>,
383    {
384        let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
385
386        Ok(describe_entity_model_with_persisted_schema(
387            E::MODEL,
388            &snapshot,
389        ))
390    }
391
392    // Ensure and return the accepted schema snapshot for one generated entity.
393    // This may write the first snapshot for an empty store; otherwise it loads
394    // the latest stored snapshot and applies the current exact-match policy.
395    fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
396    where
397        E: EntityKind<Canister = C>,
398    {
399        self.ensure_accepted_schema_snapshot_for_authority(EntityAuthority::for_type::<E>())
400    }
401
402    // Ensure and return the accepted schema snapshot from already-resolved
403    // structural entity authority. SQL and fluent shared-plan cache paths use
404    // this shape after lowering has erased the concrete entity type.
405    fn ensure_accepted_schema_snapshot_for_authority(
406        &self,
407        authority: EntityAuthority,
408    ) -> Result<AcceptedSchemaSnapshot, InternalError> {
409        let store = self.db.recovered_store(authority.store_path())?;
410
411        store.with_schema_mut(|schema_store| {
412            ensure_accepted_schema_snapshot(
413                schema_store,
414                authority.entity_tag(),
415                authority.entity_path(),
416                authority.model(),
417            )
418        })
419    }
420
421    // Ensure accepted schema metadata and derive the execution authority that
422    // consumes it. Keeping the pair together prevents session call sites from
423    // mixing a live-schema fingerprint with a generated-only row layout.
424    fn ensure_accepted_schema_snapshot_and_authority(
425        &self,
426        authority: EntityAuthority,
427    ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError> {
428        let accepted_schema = self.ensure_accepted_schema_snapshot_for_authority(authority)?;
429        let accepted_row_layout =
430            AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
431        let row_shape =
432            accepted_row_layout.generated_compatible_row_shape_for_model(authority.model())?;
433        let authority = authority.with_generated_compatible_row_shape(row_shape);
434
435        Ok((accepted_schema, authority))
436    }
437
438    // Ensure accepted schema metadata is safe for write paths that still encode
439    // rows through generated field contracts. Returning only the snapshot keeps
440    // SQL write type checks unchanged while the schema-runtime descriptor guard
441    // rejects unsupported layout or payload drift before mutation staging.
442    fn ensure_generated_compatible_accepted_schema_snapshot<E>(
443        &self,
444    ) -> Result<AcceptedSchemaSnapshot, InternalError>
445    where
446        E: EntityKind<Canister = C>,
447    {
448        let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
449        let accepted_row_layout =
450            AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
451        accepted_row_layout.generated_compatible_row_shape_for_model(E::MODEL)?;
452
453        Ok(accepted_schema)
454    }
455
456    /// Build one point-in-time storage report for observability endpoints.
457    pub fn storage_report(
458        &self,
459        name_to_path: &[(&'static str, &'static str)],
460    ) -> Result<StorageReport, InternalError> {
461        self.db.storage_report(name_to_path)
462    }
463
464    /// Build one point-in-time storage report using default entity-path labels.
465    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
466        self.db.storage_report_default()
467    }
468
469    /// Build one point-in-time integrity scan report for observability endpoints.
470    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
471        self.db.integrity_report()
472    }
473
474    // ---------------------------------------------------------------------
475    // Low-level executors (crate-internal; execution primitives)
476    // ---------------------------------------------------------------------
477
478    #[must_use]
479    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
480    where
481        E: EntityKind<Canister = C> + EntityValue,
482    {
483        LoadExecutor::new(self.db, self.debug)
484    }
485
486    #[must_use]
487    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
488    where
489        E: PersistedRow<Canister = C> + EntityValue,
490    {
491        DeleteExecutor::new(self.db)
492    }
493
494    #[must_use]
495    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
496    where
497        E: PersistedRow<Canister = C> + EntityValue,
498    {
499        SaveExecutor::new(self.db, self.debug)
500    }
501}