Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7#[cfg(feature = "sql")]
8mod sql;
9///
10/// TESTS
11///
12#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17    db::{
18        Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19        IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20        PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21        commit::EntityRuntimeHooks,
22        cursor::{decode_optional_cursor_token, decode_optional_grouped_cursor_token},
23        data::DataKey,
24        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25        schema::{
26            describe_entity_model, show_indexes_for_model,
27            show_indexes_for_model_with_runtime_state,
28        },
29    },
30    error::InternalError,
31    metrics::sink::{MetricsSink, with_metrics_sink},
32    model::entity::EntityModel,
33    traits::{CanisterKind, EntityKind, EntityValue, Path},
34    value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(feature = "sql")]
39pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
40
41// Decode one optional external cursor token and map decode failures into the
42// query-plan cursor error boundary.
43fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
44    decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
45}
46
47// Decode one optional grouped continuation token through the existing cursor
48// text boundary while preserving grouped-token ownership for grouped resume.
49fn decode_optional_grouped_cursor(
50    cursor_token: Option<&str>,
51) -> Result<Option<crate::db::cursor::GroupedContinuationToken>, QueryError> {
52    decode_optional_grouped_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
53}
54
55///
56/// DbSession
57///
58/// Session-scoped database handle with policy (debug, metrics) and execution routing.
59///
60
61pub struct DbSession<C: CanisterKind> {
62    db: Db<C>,
63    debug: bool,
64    metrics: Option<&'static dyn MetricsSink>,
65}
66
67impl<C: CanisterKind> DbSession<C> {
68    /// Construct one session facade for a database handle.
69    #[must_use]
70    pub(crate) const fn new(db: Db<C>) -> Self {
71        Self {
72            db,
73            debug: false,
74            metrics: None,
75        }
76    }
77
78    /// Construct one session facade from store registry and runtime hooks.
79    #[must_use]
80    pub const fn new_with_hooks(
81        store: &'static LocalKey<StoreRegistry>,
82        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
83    ) -> Self {
84        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
85    }
86
87    /// Enable debug execution behavior where supported by executors.
88    #[must_use]
89    pub const fn debug(mut self) -> Self {
90        self.debug = true;
91        self
92    }
93
94    /// Attach one metrics sink for all session-executed operations.
95    #[must_use]
96    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
97        self.metrics = Some(sink);
98        self
99    }
100
101    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
102        if let Some(sink) = self.metrics {
103            with_metrics_sink(sink, f)
104        } else {
105            f()
106        }
107    }
108
109    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
110    fn execute_save_with<E, T, R>(
111        &self,
112        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
113        map: impl FnOnce(T) -> R,
114    ) -> Result<R, InternalError>
115    where
116        E: PersistedRow<Canister = C> + EntityValue,
117    {
118        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
119
120        Ok(map(value))
121    }
122
123    // Shared save-facade wrappers keep response shape explicit at call sites.
124    fn execute_save_entity<E>(
125        &self,
126        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
127    ) -> Result<E, InternalError>
128    where
129        E: PersistedRow<Canister = C> + EntityValue,
130    {
131        self.execute_save_with(op, std::convert::identity)
132    }
133
134    fn execute_save_batch<E>(
135        &self,
136        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
137    ) -> Result<WriteBatchResponse<E>, InternalError>
138    where
139        E: PersistedRow<Canister = C> + EntityValue,
140    {
141        self.execute_save_with(op, WriteBatchResponse::new)
142    }
143
144    // ---------------------------------------------------------------------
145    // Query entry points (public, fluent)
146    // ---------------------------------------------------------------------
147
148    /// Start a fluent load query with default missing-row policy (`Ignore`).
149    #[must_use]
150    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
151    where
152        E: EntityKind<Canister = C>,
153    {
154        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
155    }
156
157    /// Start a fluent load query with explicit missing-row policy.
158    #[must_use]
159    pub const fn load_with_consistency<E>(
160        &self,
161        consistency: MissingRowPolicy,
162    ) -> FluentLoadQuery<'_, E>
163    where
164        E: EntityKind<Canister = C>,
165    {
166        FluentLoadQuery::new(self, Query::new(consistency))
167    }
168
169    /// Start a fluent delete query with default missing-row policy (`Ignore`).
170    #[must_use]
171    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
172    where
173        E: PersistedRow<Canister = C>,
174    {
175        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
176    }
177
178    /// Start a fluent delete query with explicit missing-row policy.
179    #[must_use]
180    pub fn delete_with_consistency<E>(
181        &self,
182        consistency: MissingRowPolicy,
183    ) -> FluentDeleteQuery<'_, E>
184    where
185        E: PersistedRow<Canister = C>,
186    {
187        FluentDeleteQuery::new(self, Query::new(consistency).delete())
188    }
189
190    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
191    ///
192    /// This terminal bypasses query planning and access routing entirely.
193    #[must_use]
194    pub const fn select_one(&self) -> Value {
195        Value::Int(1)
196    }
197
198    /// Return one stable, human-readable index listing for the entity schema.
199    ///
200    /// Output format mirrors SQL-style introspection:
201    /// - `PRIMARY KEY (field)`
202    /// - `INDEX name (field_a, field_b)`
203    /// - `UNIQUE INDEX name (field_a, field_b)`
204    #[must_use]
205    pub fn show_indexes<E>(&self) -> Vec<String>
206    where
207        E: EntityKind<Canister = C>,
208    {
209        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
210    }
211
212    /// Return one stable, human-readable index listing for one schema model.
213    ///
214    /// This model-only helper is schema-owned and intentionally does not
215    /// attach runtime lifecycle state because it does not carry store
216    /// placement authority.
217    #[must_use]
218    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
219        show_indexes_for_model(model)
220    }
221
222    // Return one stable, human-readable index listing for one resolved
223    // store/model pair, attaching the current runtime lifecycle state when the
224    // registry can resolve the backing store handle.
225    pub(in crate::db) fn show_indexes_for_store_model(
226        &self,
227        store_path: &str,
228        model: &'static EntityModel,
229    ) -> Vec<String> {
230        let runtime_state = self.try_index_state_for_store_path(store_path);
231
232        show_indexes_for_model_with_runtime_state(model, runtime_state)
233    }
234
235    /// Return one stable list of field descriptors for the entity schema.
236    #[must_use]
237    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
238    where
239        E: EntityKind<Canister = C>,
240    {
241        self.show_columns_for_model(E::MODEL)
242    }
243
244    /// Return one stable list of field descriptors for one schema model.
245    #[must_use]
246    pub fn show_columns_for_model(
247        &self,
248        model: &'static EntityModel,
249    ) -> Vec<EntityFieldDescription> {
250        describe_entity_model(model).fields().to_vec()
251    }
252
253    /// Return one stable list of runtime-registered entity names.
254    #[must_use]
255    pub fn show_entities(&self) -> Vec<String> {
256        self.db.runtime_entity_names()
257    }
258
259    // Best-effort runtime state lookup for metadata surfaces.
260    // SHOW INDEXES should stay readable even if one store handle is missing
261    // from the registry, so this helper falls back to the pure schema-owned
262    // listing instead of turning metadata inspection into an execution error.
263    fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
264        self.db
265            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
266            .map(|store| store.index_state())
267    }
268
269    /// Return one structured schema description for the entity.
270    ///
271    /// This is a typed `DESCRIBE`-style introspection surface consumed by
272    /// developer tooling and pre-EXPLAIN debugging.
273    #[must_use]
274    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
275    where
276        E: EntityKind<Canister = C>,
277    {
278        self.describe_entity_model(E::MODEL)
279    }
280
281    /// Return one structured schema description for one schema model.
282    #[must_use]
283    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
284        describe_entity_model(model)
285    }
286
287    /// Build one point-in-time storage report for observability endpoints.
288    pub fn storage_report(
289        &self,
290        name_to_path: &[(&'static str, &'static str)],
291    ) -> Result<StorageReport, InternalError> {
292        self.db.storage_report(name_to_path)
293    }
294
295    /// Build one point-in-time storage report using default entity-path labels.
296    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
297        self.db.storage_report_default()
298    }
299
300    /// Build one point-in-time integrity scan report for observability endpoints.
301    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
302        self.db.integrity_report()
303    }
304
305    /// Execute one bounded migration run with durable internal cursor state.
306    ///
307    /// Migration progress is persisted internally so upgrades/restarts can
308    /// resume from the last successful step without external cursor ownership.
309    pub fn execute_migration_plan(
310        &self,
311        plan: &MigrationPlan,
312        max_steps: usize,
313    ) -> Result<MigrationRunOutcome, InternalError> {
314        self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
315    }
316
317    // ---------------------------------------------------------------------
318    // Low-level executors (crate-internal; execution primitives)
319    // ---------------------------------------------------------------------
320
321    #[must_use]
322    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
323    where
324        E: EntityKind<Canister = C> + EntityValue,
325    {
326        LoadExecutor::new(self.db, self.debug)
327    }
328
329    #[must_use]
330    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
331    where
332        E: PersistedRow<Canister = C> + EntityValue,
333    {
334        DeleteExecutor::new(self.db, self.debug)
335    }
336
337    #[must_use]
338    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
339    where
340        E: PersistedRow<Canister = C> + EntityValue,
341    {
342        SaveExecutor::new(self.db, self.debug)
343    }
344}
345
346/// Remove one entity row from the authoritative data store only.
347///
348/// This hidden helper exists for stale-index test fixtures that need to keep
349/// secondary/index state intact while deleting the base row bytes.
350#[doc(hidden)]
351pub fn debug_remove_entity_row_data_only<C, E>(
352    session: &DbSession<C>,
353    key: &E::Key,
354) -> Result<bool, InternalError>
355where
356    C: CanisterKind,
357    E: PersistedRow<Canister = C> + EntityValue,
358{
359    // Phase 1: resolve the store through the recovered session boundary so
360    // the helper cannot mutate pre-recovery state.
361    let store = session.db.recovered_store(E::Store::PATH)?;
362
363    // Phase 2: remove only the raw row-store entry and compute the canonical
364    // storage key that any surviving secondary memberships still point at.
365    let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
366    let raw_key = data_key.to_raw()?;
367    let storage_key = data_key.storage_key();
368
369    // Phase 3: preserve the secondary entries but mark any surviving raw
370    // memberships as explicitly missing so storage-owned existence-witness
371    // tests can exercise the stale path without lying about row existence.
372    let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
373    if !removed {
374        return Ok(false);
375    }
376
377    store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
378    store.mark_secondary_existence_witness_authoritative();
379
380    Ok(true)
381}
382
383/// Mark one recovered store index with one explicit lifecycle state.
384///
385/// This hidden helper exists for test fixtures that need to force one index
386/// out of the `Valid` state while keeping all other authority machinery
387/// unchanged.
388#[doc(hidden)]
389pub fn debug_mark_store_index_state<C>(
390    session: &DbSession<C>,
391    store_path: &str,
392    state: IndexState,
393) -> Result<(), InternalError>
394where
395    C: CanisterKind,
396{
397    // Phase 1: resolve the recovered store so lifecycle mutation cannot
398    // target pre-recovery state.
399    let store = session.db.recovered_store(store_path)?;
400
401    // Phase 2: apply the explicit lifecycle state directly to the index half
402    // of the store pair. Covering authority bits remain untouched so tests
403    // can observe the `Valid` gate fail closed in isolation.
404    match state {
405        IndexState::Building => store.mark_index_building(),
406        IndexState::Valid => store.mark_index_valid(),
407        IndexState::Dropping => store.mark_index_dropping(),
408    }
409
410    Ok(())
411}