Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7#[cfg(feature = "sql")]
8mod sql;
9///
10/// TESTS
11///
12#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17    db::{
18        Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19        IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20        PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21        commit::EntityRuntimeHooks,
22        data::DataKey,
23        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24        query::plan::VisibleIndexes,
25        schema::{
26            describe_entity_model, show_indexes_for_model,
27            show_indexes_for_model_with_runtime_state,
28        },
29    },
30    error::InternalError,
31    metrics::sink::{MetricsSink, with_metrics_sink},
32    model::entity::EntityModel,
33    traits::{CanisterKind, EntityKind, EntityValue, Path},
34    value::Value,
35};
36#[cfg(feature = "sql")]
37use std::cell::OnceCell;
38use std::thread::LocalKey;
39
40#[cfg(all(feature = "sql", feature = "perf-attribution"))]
41pub use sql::SqlQueryExecutionAttribution;
42#[cfg(feature = "sql")]
43pub use sql::SqlStatementResult;
44#[cfg(all(feature = "sql", feature = "structural-read-metrics"))]
45pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
46
47///
48/// DbSession
49///
50/// Session-scoped database handle with policy (debug, metrics) and execution routing.
51///
52
53pub struct DbSession<C: CanisterKind> {
54    db: Db<C>,
55    debug: bool,
56    metrics: Option<&'static dyn MetricsSink>,
57    #[cfg(feature = "sql")]
58    sql_compiled_command_cache: OnceCell<std::cell::RefCell<sql::SqlCompiledCommandCache>>,
59}
60
61impl<C: CanisterKind> DbSession<C> {
62    /// Construct one session facade for a database handle.
63    #[must_use]
64    pub(crate) const fn new(db: Db<C>) -> Self {
65        Self {
66            db,
67            debug: false,
68            metrics: None,
69            #[cfg(feature = "sql")]
70            sql_compiled_command_cache: OnceCell::new(),
71        }
72    }
73
74    /// Construct one session facade from store registry and runtime hooks.
75    #[must_use]
76    pub const fn new_with_hooks(
77        store: &'static LocalKey<StoreRegistry>,
78        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
79    ) -> Self {
80        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
81    }
82
83    /// Enable debug execution behavior where supported by executors.
84    #[must_use]
85    pub const fn debug(mut self) -> Self {
86        self.debug = true;
87        self
88    }
89
90    /// Attach one metrics sink for all session-executed operations.
91    #[must_use]
92    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
93        self.metrics = Some(sink);
94        self
95    }
96
97    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
98        if let Some(sink) = self.metrics {
99            with_metrics_sink(sink, f)
100        } else {
101            f()
102        }
103    }
104
105    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
106    fn execute_save_with<E, T, R>(
107        &self,
108        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
109        map: impl FnOnce(T) -> R,
110    ) -> Result<R, InternalError>
111    where
112        E: PersistedRow<Canister = C> + EntityValue,
113    {
114        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
115
116        Ok(map(value))
117    }
118
119    // Shared save-facade wrappers keep response shape explicit at call sites.
120    fn execute_save_entity<E>(
121        &self,
122        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
123    ) -> Result<E, InternalError>
124    where
125        E: PersistedRow<Canister = C> + EntityValue,
126    {
127        self.execute_save_with(op, std::convert::identity)
128    }
129
130    fn execute_save_batch<E>(
131        &self,
132        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
133    ) -> Result<WriteBatchResponse<E>, InternalError>
134    where
135        E: PersistedRow<Canister = C> + EntityValue,
136    {
137        self.execute_save_with(op, WriteBatchResponse::new)
138    }
139
140    // ---------------------------------------------------------------------
141    // Query entry points (public, fluent)
142    // ---------------------------------------------------------------------
143
144    /// Start a fluent load query with default missing-row policy (`Ignore`).
145    #[must_use]
146    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
147    where
148        E: EntityKind<Canister = C>,
149    {
150        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
151    }
152
153    /// Start a fluent load query with explicit missing-row policy.
154    #[must_use]
155    pub const fn load_with_consistency<E>(
156        &self,
157        consistency: MissingRowPolicy,
158    ) -> FluentLoadQuery<'_, E>
159    where
160        E: EntityKind<Canister = C>,
161    {
162        FluentLoadQuery::new(self, Query::new(consistency))
163    }
164
165    /// Start a fluent delete query with default missing-row policy (`Ignore`).
166    #[must_use]
167    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
168    where
169        E: PersistedRow<Canister = C>,
170    {
171        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
172    }
173
174    /// Start a fluent delete query with explicit missing-row policy.
175    #[must_use]
176    pub fn delete_with_consistency<E>(
177        &self,
178        consistency: MissingRowPolicy,
179    ) -> FluentDeleteQuery<'_, E>
180    where
181        E: PersistedRow<Canister = C>,
182    {
183        FluentDeleteQuery::new(self, Query::new(consistency).delete())
184    }
185
186    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
187    ///
188    /// This terminal bypasses query planning and access routing entirely.
189    #[must_use]
190    pub const fn select_one(&self) -> Value {
191        Value::Int(1)
192    }
193
194    /// Return one stable, human-readable index listing for the entity schema.
195    ///
196    /// Output format mirrors SQL-style introspection:
197    /// - `PRIMARY KEY (field)`
198    /// - `INDEX name (field_a, field_b)`
199    /// - `UNIQUE INDEX name (field_a, field_b)`
200    #[must_use]
201    pub fn show_indexes<E>(&self) -> Vec<String>
202    where
203        E: EntityKind<Canister = C>,
204    {
205        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
206    }
207
208    /// Return one stable, human-readable index listing for one schema model.
209    ///
210    /// This model-only helper is schema-owned and intentionally does not
211    /// attach runtime lifecycle state because it does not carry store
212    /// placement context.
213    #[must_use]
214    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
215        show_indexes_for_model(model)
216    }
217
218    // Return one stable, human-readable index listing for one resolved
219    // store/model pair, attaching the current runtime lifecycle state when the
220    // registry can resolve the backing store handle.
221    pub(in crate::db) fn show_indexes_for_store_model(
222        &self,
223        store_path: &str,
224        model: &'static EntityModel,
225    ) -> Vec<String> {
226        let runtime_state = self.try_index_state_for_store_path(store_path);
227
228        show_indexes_for_model_with_runtime_state(model, runtime_state)
229    }
230
231    /// Return one stable list of field descriptors for the entity schema.
232    #[must_use]
233    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
234    where
235        E: EntityKind<Canister = C>,
236    {
237        self.show_columns_for_model(E::MODEL)
238    }
239
240    /// Return one stable list of field descriptors for one schema model.
241    #[must_use]
242    pub fn show_columns_for_model(
243        &self,
244        model: &'static EntityModel,
245    ) -> Vec<EntityFieldDescription> {
246        describe_entity_model(model).fields().to_vec()
247    }
248
249    /// Return one stable list of runtime-registered entity names.
250    #[must_use]
251    pub fn show_entities(&self) -> Vec<String> {
252        self.db.runtime_entity_names()
253    }
254
255    /// Return one stable list of runtime-registered entity names.
256    ///
257    /// `SHOW TABLES` is only an alias for `SHOW ENTITIES`, so the typed
258    /// metadata surface keeps the same alias relationship.
259    #[must_use]
260    pub fn show_tables(&self) -> Vec<String> {
261        self.show_entities()
262    }
263
264    // Best-effort runtime state lookup for metadata surfaces.
265    // SHOW INDEXES should stay readable even if one store handle is missing
266    // from the registry, so this helper falls back to the pure schema-owned
267    // listing instead of turning metadata inspection into an execution error.
268    fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
269        self.db
270            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
271            .map(|store| store.index_state())
272    }
273
274    // Resolve the exact secondary-index set that is visible to planner-owned
275    // query planning for one recovered store/model pair.
276    fn visible_indexes_for_store_model(
277        &self,
278        store_path: &str,
279        model: &'static EntityModel,
280    ) -> Result<VisibleIndexes<'static>, QueryError> {
281        // Phase 1: resolve the recovered store state once at the session
282        // boundary so query/executor planning does not reopen lifecycle checks.
283        let store = self
284            .db
285            .recovered_store(store_path)
286            .map_err(QueryError::execute)?;
287        let state = store.index_state();
288        if state != IndexState::Ready {
289            return Ok(VisibleIndexes::none());
290        }
291        debug_assert_eq!(state, IndexState::Ready);
292
293        // Phase 2: planner-visible indexes are exactly the model-owned index
294        // declarations once the recovered store is query-visible.
295        Ok(VisibleIndexes::planner_visible(model.indexes()))
296    }
297
298    /// Return one structured schema description for the entity.
299    ///
300    /// This is a typed `DESCRIBE`-style introspection surface consumed by
301    /// developer tooling and pre-EXPLAIN debugging.
302    #[must_use]
303    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
304    where
305        E: EntityKind<Canister = C>,
306    {
307        self.describe_entity_model(E::MODEL)
308    }
309
310    /// Return one structured schema description for one schema model.
311    #[must_use]
312    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
313        describe_entity_model(model)
314    }
315
316    /// Build one point-in-time storage report for observability endpoints.
317    pub fn storage_report(
318        &self,
319        name_to_path: &[(&'static str, &'static str)],
320    ) -> Result<StorageReport, InternalError> {
321        self.db.storage_report(name_to_path)
322    }
323
324    /// Build one point-in-time storage report using default entity-path labels.
325    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
326        self.db.storage_report_default()
327    }
328
329    /// Build one point-in-time integrity scan report for observability endpoints.
330    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
331        self.db.integrity_report()
332    }
333
334    /// Execute one bounded migration run with durable internal cursor state.
335    ///
336    /// Migration progress is persisted internally so upgrades/restarts can
337    /// resume from the last successful step without external cursor ownership.
338    pub fn execute_migration_plan(
339        &self,
340        plan: &MigrationPlan,
341        max_steps: usize,
342    ) -> Result<MigrationRunOutcome, InternalError> {
343        self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
344    }
345
346    // ---------------------------------------------------------------------
347    // Low-level executors (crate-internal; execution primitives)
348    // ---------------------------------------------------------------------
349
350    #[must_use]
351    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
352    where
353        E: EntityKind<Canister = C> + EntityValue,
354    {
355        LoadExecutor::new(self.db, self.debug)
356    }
357
358    #[must_use]
359    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
360    where
361        E: PersistedRow<Canister = C> + EntityValue,
362    {
363        DeleteExecutor::new(self.db)
364    }
365
366    #[must_use]
367    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
368    where
369        E: PersistedRow<Canister = C> + EntityValue,
370    {
371        SaveExecutor::new(self.db, self.debug)
372    }
373}
374
375/// Remove one entity row from the authoritative data store only.
376///
377/// This hidden helper exists for stale-index test fixtures that need to keep
378/// secondary/index state intact while deleting the base row bytes.
379#[doc(hidden)]
380pub fn debug_remove_entity_row_data_only<C, E>(
381    session: &DbSession<C>,
382    key: &E::Key,
383) -> Result<bool, InternalError>
384where
385    C: CanisterKind,
386    E: PersistedRow<Canister = C> + EntityValue,
387{
388    // Phase 1: resolve the store through the recovered session boundary so
389    // the helper cannot mutate pre-recovery state.
390    let store = session.db.recovered_store(E::Store::PATH)?;
391
392    // Phase 2: remove only the raw row-store entry and compute the canonical
393    // storage key that any surviving secondary memberships still point at.
394    let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
395    let raw_key = data_key.to_raw()?;
396    let storage_key = data_key.storage_key();
397
398    // Phase 3: preserve the secondary entries but mark any surviving raw
399    // memberships as explicitly missing so stale-index fixtures can exercise
400    // impossible-state behavior without lying about row existence.
401    let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
402    if !removed {
403        return Ok(false);
404    }
405
406    store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
407
408    Ok(true)
409}
410
411/// Mark one recovered store index with one explicit lifecycle state.
412///
413/// This hidden helper exists for test fixtures that need to force one index
414/// out of the `Ready` state while keeping all other lifecycle plumbing
415/// unchanged.
416#[doc(hidden)]
417pub fn debug_mark_store_index_state<C>(
418    session: &DbSession<C>,
419    store_path: &str,
420    state: IndexState,
421) -> Result<(), InternalError>
422where
423    C: CanisterKind,
424{
425    // Phase 1: resolve the recovered store so lifecycle mutation cannot
426    // target pre-recovery state.
427    let store = session.db.recovered_store(store_path)?;
428
429    // Phase 2: apply the explicit lifecycle state directly to the index half
430    // of the store pair so tests can observe the `Ready` gate in isolation.
431    match state {
432        IndexState::Building => store.mark_index_building(),
433        IndexState::Ready => store.mark_index_ready(),
434        IndexState::Dropping => store.mark_index_dropping(),
435    }
436
437    Ok(())
438}