Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7#[cfg(feature = "sql")]
8mod sql;
9///
10/// TESTS
11///
12#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17    db::{
18        Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19        IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20        PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21        commit::EntityRuntimeHooks,
22        data::DataKey,
23        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24        query::plan::VisibleIndexes,
25        schema::{
26            describe_entity_model, show_indexes_for_model,
27            show_indexes_for_model_with_runtime_state,
28        },
29    },
30    error::InternalError,
31    metrics::sink::{MetricsSink, with_metrics_sink},
32    model::entity::EntityModel,
33    traits::{CanisterKind, EntityKind, EntityValue, Path},
34    value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(feature = "diagnostics")]
39pub use query::QueryExecutionAttribution;
40#[cfg(all(feature = "sql", feature = "diagnostics"))]
41pub use sql::SqlQueryExecutionAttribution;
42#[cfg(feature = "sql")]
43pub use sql::SqlStatementResult;
44#[cfg(all(feature = "sql", feature = "diagnostics"))]
45pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
46
47///
48/// DbSession
49///
50/// Session-scoped database handle with policy (debug, metrics) and execution routing.
51///
52
53pub struct DbSession<C: CanisterKind> {
54    db: Db<C>,
55    debug: bool,
56    metrics: Option<&'static dyn MetricsSink>,
57}
58
59impl<C: CanisterKind> DbSession<C> {
60    /// Construct one session facade for a database handle.
61    #[must_use]
62    pub(crate) const fn new(db: Db<C>) -> Self {
63        Self {
64            db,
65            debug: false,
66            metrics: None,
67        }
68    }
69
70    /// Construct one session facade from store registry and runtime hooks.
71    #[must_use]
72    pub const fn new_with_hooks(
73        store: &'static LocalKey<StoreRegistry>,
74        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
75    ) -> Self {
76        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
77    }
78
79    /// Enable debug execution behavior where supported by executors.
80    #[must_use]
81    pub const fn debug(mut self) -> Self {
82        self.debug = true;
83        self
84    }
85
86    /// Attach one metrics sink for all session-executed operations.
87    #[must_use]
88    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
89        self.metrics = Some(sink);
90        self
91    }
92
93    // Shared fluent load wrapper construction keeps the session boundary in
94    // one place when load entry points differ only by missing-row policy.
95    const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
96    where
97        E: EntityKind<Canister = C>,
98    {
99        FluentLoadQuery::new(self, Query::new(consistency))
100    }
101
102    // Shared fluent delete wrapper construction keeps the delete-mode handoff
103    // explicit at the session boundary instead of reassembling the same query
104    // shell in each public entry point.
105    fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
106    where
107        E: PersistedRow<Canister = C>,
108    {
109        FluentDeleteQuery::new(self, Query::new(consistency).delete())
110    }
111
112    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
113        if let Some(sink) = self.metrics {
114            with_metrics_sink(sink, f)
115        } else {
116            f()
117        }
118    }
119
120    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
121    fn execute_save_with<E, T, R>(
122        &self,
123        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
124        map: impl FnOnce(T) -> R,
125    ) -> Result<R, InternalError>
126    where
127        E: PersistedRow<Canister = C> + EntityValue,
128    {
129        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
130
131        Ok(map(value))
132    }
133
134    // Shared save-facade wrappers keep response shape explicit at call sites.
135    fn execute_save_entity<E>(
136        &self,
137        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
138    ) -> Result<E, InternalError>
139    where
140        E: PersistedRow<Canister = C> + EntityValue,
141    {
142        self.execute_save_with(op, std::convert::identity)
143    }
144
145    fn execute_save_batch<E>(
146        &self,
147        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
148    ) -> Result<WriteBatchResponse<E>, InternalError>
149    where
150        E: PersistedRow<Canister = C> + EntityValue,
151    {
152        self.execute_save_with(op, WriteBatchResponse::new)
153    }
154
155    // ---------------------------------------------------------------------
156    // Query entry points (public, fluent)
157    // ---------------------------------------------------------------------
158
159    /// Start a fluent load query with default missing-row policy (`Ignore`).
160    #[must_use]
161    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
162    where
163        E: EntityKind<Canister = C>,
164    {
165        self.fluent_load_query(MissingRowPolicy::Ignore)
166    }
167
168    /// Start a fluent load query with explicit missing-row policy.
169    #[must_use]
170    pub const fn load_with_consistency<E>(
171        &self,
172        consistency: MissingRowPolicy,
173    ) -> FluentLoadQuery<'_, E>
174    where
175        E: EntityKind<Canister = C>,
176    {
177        self.fluent_load_query(consistency)
178    }
179
180    /// Start a fluent delete query with default missing-row policy (`Ignore`).
181    #[must_use]
182    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
183    where
184        E: PersistedRow<Canister = C>,
185    {
186        self.fluent_delete_query(MissingRowPolicy::Ignore)
187    }
188
189    /// Start a fluent delete query with explicit missing-row policy.
190    #[must_use]
191    pub fn delete_with_consistency<E>(
192        &self,
193        consistency: MissingRowPolicy,
194    ) -> FluentDeleteQuery<'_, E>
195    where
196        E: PersistedRow<Canister = C>,
197    {
198        self.fluent_delete_query(consistency)
199    }
200
201    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
202    ///
203    /// This terminal bypasses query planning and access routing entirely.
204    #[must_use]
205    pub const fn select_one(&self) -> Value {
206        Value::Int(1)
207    }
208
209    /// Return one stable, human-readable index listing for the entity schema.
210    ///
211    /// Output format mirrors SQL-style introspection:
212    /// - `PRIMARY KEY (field)`
213    /// - `INDEX name (field_a, field_b)`
214    /// - `UNIQUE INDEX name (field_a, field_b)`
215    #[must_use]
216    pub fn show_indexes<E>(&self) -> Vec<String>
217    where
218        E: EntityKind<Canister = C>,
219    {
220        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
221    }
222
223    /// Return one stable, human-readable index listing for one schema model.
224    ///
225    /// This model-only helper is schema-owned and intentionally does not
226    /// attach runtime lifecycle state because it does not carry store
227    /// placement context.
228    #[must_use]
229    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
230        show_indexes_for_model(model)
231    }
232
233    // Return one stable, human-readable index listing for one resolved
234    // store/model pair, attaching the current runtime lifecycle state when the
235    // registry can resolve the backing store handle.
236    pub(in crate::db) fn show_indexes_for_store_model(
237        &self,
238        store_path: &str,
239        model: &'static EntityModel,
240    ) -> Vec<String> {
241        let runtime_state = self
242            .db
243            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
244            .map(|store| store.index_state());
245
246        show_indexes_for_model_with_runtime_state(model, runtime_state)
247    }
248
249    /// Return one stable list of field descriptors for the entity schema.
250    #[must_use]
251    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
252    where
253        E: EntityKind<Canister = C>,
254    {
255        self.show_columns_for_model(E::MODEL)
256    }
257
258    /// Return one stable list of field descriptors for one schema model.
259    #[must_use]
260    pub fn show_columns_for_model(
261        &self,
262        model: &'static EntityModel,
263    ) -> Vec<EntityFieldDescription> {
264        describe_entity_model(model).fields().to_vec()
265    }
266
267    /// Return one stable list of runtime-registered entity names.
268    #[must_use]
269    pub fn show_entities(&self) -> Vec<String> {
270        self.db.runtime_entity_names()
271    }
272
273    /// Return one stable list of runtime-registered entity names.
274    ///
275    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
276    /// metadata surface keeps the same alias relationship.
277    #[must_use]
278    pub fn show_tables(&self) -> Vec<String> {
279        self.show_entities()
280    }
281
282    // Resolve the exact secondary-index set that is visible to planner-owned
283    // query planning for one recovered store/model pair.
284    fn visible_indexes_for_store_model(
285        &self,
286        store_path: &str,
287        model: &'static EntityModel,
288    ) -> Result<VisibleIndexes<'static>, QueryError> {
289        // Phase 1: resolve the recovered store state once at the session
290        // boundary so query/executor planning does not reopen lifecycle checks.
291        let store = self
292            .db
293            .recovered_store(store_path)
294            .map_err(QueryError::execute)?;
295        let state = store.index_state();
296        if state != IndexState::Ready {
297            return Ok(VisibleIndexes::none());
298        }
299        debug_assert_eq!(state, IndexState::Ready);
300
301        // Phase 2: planner-visible indexes are exactly the model-owned index
302        // declarations once the recovered store is query-visible.
303        Ok(VisibleIndexes::planner_visible(model.indexes()))
304    }
305
306    /// Return one structured schema description for the entity.
307    ///
308    /// This is a typed `DESCRIBE`-style introspection surface consumed by
309    /// developer tooling and pre-EXPLAIN debugging.
310    #[must_use]
311    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
312    where
313        E: EntityKind<Canister = C>,
314    {
315        self.describe_entity_model(E::MODEL)
316    }
317
318    /// Return one structured schema description for one schema model.
319    #[must_use]
320    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
321        describe_entity_model(model)
322    }
323
324    /// Build one point-in-time storage report for observability endpoints.
325    pub fn storage_report(
326        &self,
327        name_to_path: &[(&'static str, &'static str)],
328    ) -> Result<StorageReport, InternalError> {
329        self.db.storage_report(name_to_path)
330    }
331
332    /// Build one point-in-time storage report using default entity-path labels.
333    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
334        self.db.storage_report_default()
335    }
336
337    /// Build one point-in-time integrity scan report for observability endpoints.
338    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
339        self.db.integrity_report()
340    }
341
342    /// Execute one bounded migration run with durable internal cursor state.
343    ///
344    /// Migration progress is persisted internally so upgrades/restarts can
345    /// resume from the last successful step without external cursor ownership.
346    pub fn execute_migration_plan(
347        &self,
348        plan: &MigrationPlan,
349        max_steps: usize,
350    ) -> Result<MigrationRunOutcome, InternalError> {
351        self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
352    }
353
354    // ---------------------------------------------------------------------
355    // Low-level executors (crate-internal; execution primitives)
356    // ---------------------------------------------------------------------
357
358    #[must_use]
359    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
360    where
361        E: EntityKind<Canister = C> + EntityValue,
362    {
363        LoadExecutor::new(self.db, self.debug)
364    }
365
366    #[must_use]
367    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
368    where
369        E: PersistedRow<Canister = C> + EntityValue,
370    {
371        DeleteExecutor::new(self.db)
372    }
373
374    #[must_use]
375    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
376    where
377        E: PersistedRow<Canister = C> + EntityValue,
378    {
379        SaveExecutor::new(self.db, self.debug)
380    }
381}
382
383/// Remove one entity row from the authoritative data store only.
384///
385/// This hidden helper exists for stale-index test fixtures that need to keep
386/// secondary/index state intact while deleting the base row bytes.
387#[doc(hidden)]
388pub fn debug_remove_entity_row_data_only<C, E>(
389    session: &DbSession<C>,
390    key: &E::Key,
391) -> Result<bool, InternalError>
392where
393    C: CanisterKind,
394    E: PersistedRow<Canister = C> + EntityValue,
395{
396    // Phase 1: resolve the store through the recovered session boundary so
397    // the helper cannot mutate pre-recovery state.
398    let store = session.db.recovered_store(E::Store::PATH)?;
399
400    // Phase 2: remove only the raw row-store entry and compute the canonical
401    // storage key that any surviving secondary memberships still point at.
402    let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
403    let raw_key = data_key.to_raw()?;
404    let storage_key = data_key.storage_key();
405
406    // Phase 3: preserve the secondary entries but mark any surviving raw
407    // memberships as explicitly missing so stale-index fixtures can exercise
408    // impossible-state behavior without lying about row existence.
409    let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
410    if !removed {
411        return Ok(false);
412    }
413
414    store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
415
416    Ok(true)
417}
418
419/// Mark one recovered store index with one explicit lifecycle state.
420///
421/// This hidden helper exists for test fixtures that need to force one index
422/// out of the `Ready` state while keeping all other lifecycle plumbing
423/// unchanged.
424#[doc(hidden)]
425pub fn debug_mark_store_index_state<C>(
426    session: &DbSession<C>,
427    store_path: &str,
428    state: IndexState,
429) -> Result<(), InternalError>
430where
431    C: CanisterKind,
432{
433    // Phase 1: resolve the recovered store so lifecycle mutation cannot
434    // target pre-recovery state.
435    let store = session.db.recovered_store(store_path)?;
436
437    // Phase 2: apply the explicit lifecycle state directly to the index half
438    // of the store pair so tests can observe the `Ready` gate in isolation.
439    match state {
440        IndexState::Building => store.mark_index_building(),
441        IndexState::Ready => store.mark_index_ready(),
442        IndexState::Dropping => store.mark_index_dropping(),
443    }
444
445    Ok(())
446}