Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7#[cfg(feature = "sql")]
8mod sql;
9///
10/// TESTS
11///
12#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17    db::{
18        Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19        IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20        PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21        commit::EntityRuntimeHooks,
22        cursor::{decode_optional_cursor_token, decode_optional_grouped_cursor_token},
23        data::DataKey,
24        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25        query::plan::VisibleIndexes,
26        schema::{
27            describe_entity_model, show_indexes_for_model,
28            show_indexes_for_model_with_runtime_state,
29        },
30    },
31    error::InternalError,
32    metrics::sink::{MetricsSink, with_metrics_sink},
33    model::entity::EntityModel,
34    traits::{CanisterKind, EntityKind, EntityValue, Path},
35    value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "sql")]
40pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
41
42// Decode one optional external cursor token and map decode failures into the
43// query-plan cursor error boundary.
44fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
45    decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
46}
47
48// Decode one optional grouped continuation token through the existing cursor
49// text boundary while preserving grouped-token ownership for grouped resume.
50fn decode_optional_grouped_cursor(
51    cursor_token: Option<&str>,
52) -> Result<Option<crate::db::cursor::GroupedContinuationToken>, QueryError> {
53    decode_optional_grouped_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
54}
55
56///
57/// DbSession
58///
59/// Session-scoped database handle with policy (debug, metrics) and execution routing.
60///
61
62pub struct DbSession<C: CanisterKind> {
63    db: Db<C>,
64    debug: bool,
65    metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69    /// Construct one session facade for a database handle.
70    #[must_use]
71    pub(crate) const fn new(db: Db<C>) -> Self {
72        Self {
73            db,
74            debug: false,
75            metrics: None,
76        }
77    }
78
79    /// Construct one session facade from store registry and runtime hooks.
80    #[must_use]
81    pub const fn new_with_hooks(
82        store: &'static LocalKey<StoreRegistry>,
83        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84    ) -> Self {
85        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86    }
87
88    /// Enable debug execution behavior where supported by executors.
89    #[must_use]
90    pub const fn debug(mut self) -> Self {
91        self.debug = true;
92        self
93    }
94
95    /// Attach one metrics sink for all session-executed operations.
96    #[must_use]
97    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98        self.metrics = Some(sink);
99        self
100    }
101
102    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
103        if let Some(sink) = self.metrics {
104            with_metrics_sink(sink, f)
105        } else {
106            f()
107        }
108    }
109
110    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
111    fn execute_save_with<E, T, R>(
112        &self,
113        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
114        map: impl FnOnce(T) -> R,
115    ) -> Result<R, InternalError>
116    where
117        E: PersistedRow<Canister = C> + EntityValue,
118    {
119        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
120
121        Ok(map(value))
122    }
123
124    // Shared save-facade wrappers keep response shape explicit at call sites.
125    fn execute_save_entity<E>(
126        &self,
127        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
128    ) -> Result<E, InternalError>
129    where
130        E: PersistedRow<Canister = C> + EntityValue,
131    {
132        self.execute_save_with(op, std::convert::identity)
133    }
134
135    fn execute_save_batch<E>(
136        &self,
137        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
138    ) -> Result<WriteBatchResponse<E>, InternalError>
139    where
140        E: PersistedRow<Canister = C> + EntityValue,
141    {
142        self.execute_save_with(op, WriteBatchResponse::new)
143    }
144
145    // ---------------------------------------------------------------------
146    // Query entry points (public, fluent)
147    // ---------------------------------------------------------------------
148
149    /// Start a fluent load query with default missing-row policy (`Ignore`).
150    #[must_use]
151    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
152    where
153        E: EntityKind<Canister = C>,
154    {
155        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
156    }
157
158    /// Start a fluent load query with explicit missing-row policy.
159    #[must_use]
160    pub const fn load_with_consistency<E>(
161        &self,
162        consistency: MissingRowPolicy,
163    ) -> FluentLoadQuery<'_, E>
164    where
165        E: EntityKind<Canister = C>,
166    {
167        FluentLoadQuery::new(self, Query::new(consistency))
168    }
169
170    /// Start a fluent delete query with default missing-row policy (`Ignore`).
171    #[must_use]
172    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
173    where
174        E: PersistedRow<Canister = C>,
175    {
176        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
177    }
178
179    /// Start a fluent delete query with explicit missing-row policy.
180    #[must_use]
181    pub fn delete_with_consistency<E>(
182        &self,
183        consistency: MissingRowPolicy,
184    ) -> FluentDeleteQuery<'_, E>
185    where
186        E: PersistedRow<Canister = C>,
187    {
188        FluentDeleteQuery::new(self, Query::new(consistency).delete())
189    }
190
191    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
192    ///
193    /// This terminal bypasses query planning and access routing entirely.
194    #[must_use]
195    pub const fn select_one(&self) -> Value {
196        Value::Int(1)
197    }
198
199    /// Return one stable, human-readable index listing for the entity schema.
200    ///
201    /// Output format mirrors SQL-style introspection:
202    /// - `PRIMARY KEY (field)`
203    /// - `INDEX name (field_a, field_b)`
204    /// - `UNIQUE INDEX name (field_a, field_b)`
205    #[must_use]
206    pub fn show_indexes<E>(&self) -> Vec<String>
207    where
208        E: EntityKind<Canister = C>,
209    {
210        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
211    }
212
213    /// Return one stable, human-readable index listing for one schema model.
214    ///
215    /// This model-only helper is schema-owned and intentionally does not
216    /// attach runtime lifecycle state because it does not carry store
217    /// placement context.
218    #[must_use]
219    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
220        show_indexes_for_model(model)
221    }
222
223    // Return one stable, human-readable index listing for one resolved
224    // store/model pair, attaching the current runtime lifecycle state when the
225    // registry can resolve the backing store handle.
226    pub(in crate::db) fn show_indexes_for_store_model(
227        &self,
228        store_path: &str,
229        model: &'static EntityModel,
230    ) -> Vec<String> {
231        let runtime_state = self.try_index_state_for_store_path(store_path);
232
233        show_indexes_for_model_with_runtime_state(model, runtime_state)
234    }
235
236    /// Return one stable list of field descriptors for the entity schema.
237    #[must_use]
238    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
239    where
240        E: EntityKind<Canister = C>,
241    {
242        self.show_columns_for_model(E::MODEL)
243    }
244
245    /// Return one stable list of field descriptors for one schema model.
246    #[must_use]
247    pub fn show_columns_for_model(
248        &self,
249        model: &'static EntityModel,
250    ) -> Vec<EntityFieldDescription> {
251        describe_entity_model(model).fields().to_vec()
252    }
253
254    /// Return one stable list of runtime-registered entity names.
255    #[must_use]
256    pub fn show_entities(&self) -> Vec<String> {
257        self.db.runtime_entity_names()
258    }
259
260    // Best-effort runtime state lookup for metadata surfaces.
261    // SHOW INDEXES should stay readable even if one store handle is missing
262    // from the registry, so this helper falls back to the pure schema-owned
263    // listing instead of turning metadata inspection into an execution error.
264    fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
265        self.db
266            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
267            .map(|store| store.index_state())
268    }
269
270    // Resolve the exact secondary-index set that is visible to planner-owned
271    // query planning for one recovered store/model pair.
272    fn visible_indexes_for_store_model(
273        &self,
274        store_path: &str,
275        model: &'static EntityModel,
276    ) -> Result<VisibleIndexes<'static>, QueryError> {
277        // Phase 1: resolve the recovered store state once at the session
278        // boundary so query/executor planning does not reopen lifecycle checks.
279        let store = self
280            .db
281            .recovered_store(store_path)
282            .map_err(QueryError::execute)?;
283        let state = store.index_state();
284        if state != IndexState::Ready {
285            return Ok(VisibleIndexes::none());
286        }
287        debug_assert_eq!(state, IndexState::Ready);
288
289        // Phase 2: planner-visible indexes are exactly the model-owned index
290        // declarations once the recovered store is query-visible.
291        Ok(VisibleIndexes::planner_visible(model.indexes()))
292    }
293
294    /// Return one structured schema description for the entity.
295    ///
296    /// This is a typed `DESCRIBE`-style introspection surface consumed by
297    /// developer tooling and pre-EXPLAIN debugging.
298    #[must_use]
299    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
300    where
301        E: EntityKind<Canister = C>,
302    {
303        self.describe_entity_model(E::MODEL)
304    }
305
306    /// Return one structured schema description for one schema model.
307    #[must_use]
308    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
309        describe_entity_model(model)
310    }
311
312    /// Build one point-in-time storage report for observability endpoints.
313    pub fn storage_report(
314        &self,
315        name_to_path: &[(&'static str, &'static str)],
316    ) -> Result<StorageReport, InternalError> {
317        self.db.storage_report(name_to_path)
318    }
319
320    /// Build one point-in-time storage report using default entity-path labels.
321    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
322        self.db.storage_report_default()
323    }
324
325    /// Build one point-in-time integrity scan report for observability endpoints.
326    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
327        self.db.integrity_report()
328    }
329
330    /// Execute one bounded migration run with durable internal cursor state.
331    ///
332    /// Migration progress is persisted internally so upgrades/restarts can
333    /// resume from the last successful step without external cursor ownership.
334    pub fn execute_migration_plan(
335        &self,
336        plan: &MigrationPlan,
337        max_steps: usize,
338    ) -> Result<MigrationRunOutcome, InternalError> {
339        self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
340    }
341
342    // ---------------------------------------------------------------------
343    // Low-level executors (crate-internal; execution primitives)
344    // ---------------------------------------------------------------------
345
346    #[must_use]
347    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        LoadExecutor::new(self.db, self.debug)
352    }
353
354    #[must_use]
355    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
356    where
357        E: PersistedRow<Canister = C> + EntityValue,
358    {
359        DeleteExecutor::new(self.db, self.debug)
360    }
361
362    #[must_use]
363    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
364    where
365        E: PersistedRow<Canister = C> + EntityValue,
366    {
367        SaveExecutor::new(self.db, self.debug)
368    }
369}
370
371/// Remove one entity row from the authoritative data store only.
372///
373/// This hidden helper exists for stale-index test fixtures that need to keep
374/// secondary/index state intact while deleting the base row bytes.
375#[doc(hidden)]
376pub fn debug_remove_entity_row_data_only<C, E>(
377    session: &DbSession<C>,
378    key: &E::Key,
379) -> Result<bool, InternalError>
380where
381    C: CanisterKind,
382    E: PersistedRow<Canister = C> + EntityValue,
383{
384    // Phase 1: resolve the store through the recovered session boundary so
385    // the helper cannot mutate pre-recovery state.
386    let store = session.db.recovered_store(E::Store::PATH)?;
387
388    // Phase 2: remove only the raw row-store entry and compute the canonical
389    // storage key that any surviving secondary memberships still point at.
390    let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
391    let raw_key = data_key.to_raw()?;
392    let storage_key = data_key.storage_key();
393
394    // Phase 3: preserve the secondary entries but mark any surviving raw
395    // memberships as explicitly missing so stale-index fixtures can exercise
396    // impossible-state behavior without lying about row existence.
397    let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
398    if !removed {
399        return Ok(false);
400    }
401
402    store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
403
404    Ok(true)
405}
406
407/// Mark one recovered store index with one explicit lifecycle state.
408///
409/// This hidden helper exists for test fixtures that need to force one index
410/// out of the `Ready` state while keeping all other lifecycle plumbing
411/// unchanged.
412#[doc(hidden)]
413pub fn debug_mark_store_index_state<C>(
414    session: &DbSession<C>,
415    store_path: &str,
416    state: IndexState,
417) -> Result<(), InternalError>
418where
419    C: CanisterKind,
420{
421    // Phase 1: resolve the recovered store so lifecycle mutation cannot
422    // target pre-recovery state.
423    let store = session.db.recovered_store(store_path)?;
424
425    // Phase 2: apply the explicit lifecycle state directly to the index half
426    // of the store pair so tests can observe the `Ready` gate in isolation.
427    match state {
428        IndexState::Building => store.mark_index_building(),
429        IndexState::Ready => store.mark_index_ready(),
430        IndexState::Dropping => store.mark_index_dropping(),
431    }
432
433    Ok(())
434}