Skip to main content

icydb_core/db/session/
mod.rs

//! Module: session
//! Responsibility: user-facing query/write execution facade over db executors.
//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7#[cfg(feature = "sql")]
8mod sql;
9///
10/// TESTS
11///
12#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17    db::{
18        Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19        IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20        PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21        commit::EntityRuntimeHooks,
22        data::DataKey,
23        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24        query::plan::VisibleIndexes,
25        schema::{
26            describe_entity_model, show_indexes_for_model,
27            show_indexes_for_model_with_runtime_state,
28        },
29    },
30    error::InternalError,
31    metrics::sink::{MetricsSink, with_metrics_sink},
32    model::entity::EntityModel,
33    traits::{CanisterKind, EntityKind, EntityValue, Path},
34    value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(all(feature = "sql", feature = "perf-attribution"))]
39pub use sql::{LoweredSqlDispatchExecutorAttribution, SqlProjectionTextExecutorAttribution};
40#[cfg(feature = "sql")]
41pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
42#[cfg(all(feature = "sql", feature = "structural-read-metrics"))]
43pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
44
45///
46/// DbSession
47///
48/// Session-scoped database handle with policy (debug, metrics) and execution routing.
49///
50
51pub struct DbSession<C: CanisterKind> {
52    db: Db<C>,
53    debug: bool,
54    metrics: Option<&'static dyn MetricsSink>,
55}
56
57impl<C: CanisterKind> DbSession<C> {
58    /// Construct one session facade for a database handle.
59    #[must_use]
60    pub(crate) const fn new(db: Db<C>) -> Self {
61        Self {
62            db,
63            debug: false,
64            metrics: None,
65        }
66    }
67
68    /// Construct one session facade from store registry and runtime hooks.
69    #[must_use]
70    pub const fn new_with_hooks(
71        store: &'static LocalKey<StoreRegistry>,
72        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
73    ) -> Self {
74        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
75    }
76
77    /// Enable debug execution behavior where supported by executors.
78    #[must_use]
79    pub const fn debug(mut self) -> Self {
80        self.debug = true;
81        self
82    }
83
84    /// Attach one metrics sink for all session-executed operations.
85    #[must_use]
86    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
87        self.metrics = Some(sink);
88        self
89    }
90
91    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
92        if let Some(sink) = self.metrics {
93            with_metrics_sink(sink, f)
94        } else {
95            f()
96        }
97    }
98
99    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
100    fn execute_save_with<E, T, R>(
101        &self,
102        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
103        map: impl FnOnce(T) -> R,
104    ) -> Result<R, InternalError>
105    where
106        E: PersistedRow<Canister = C> + EntityValue,
107    {
108        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
109
110        Ok(map(value))
111    }
112
113    // Shared save-facade wrappers keep response shape explicit at call sites.
114    fn execute_save_entity<E>(
115        &self,
116        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
117    ) -> Result<E, InternalError>
118    where
119        E: PersistedRow<Canister = C> + EntityValue,
120    {
121        self.execute_save_with(op, std::convert::identity)
122    }
123
124    fn execute_save_batch<E>(
125        &self,
126        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
127    ) -> Result<WriteBatchResponse<E>, InternalError>
128    where
129        E: PersistedRow<Canister = C> + EntityValue,
130    {
131        self.execute_save_with(op, WriteBatchResponse::new)
132    }
133
134    // ---------------------------------------------------------------------
135    // Query entry points (public, fluent)
136    // ---------------------------------------------------------------------
137
138    /// Start a fluent load query with default missing-row policy (`Ignore`).
139    #[must_use]
140    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
141    where
142        E: EntityKind<Canister = C>,
143    {
144        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
145    }
146
147    /// Start a fluent load query with explicit missing-row policy.
148    #[must_use]
149    pub const fn load_with_consistency<E>(
150        &self,
151        consistency: MissingRowPolicy,
152    ) -> FluentLoadQuery<'_, E>
153    where
154        E: EntityKind<Canister = C>,
155    {
156        FluentLoadQuery::new(self, Query::new(consistency))
157    }
158
159    /// Start a fluent delete query with default missing-row policy (`Ignore`).
160    #[must_use]
161    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
162    where
163        E: PersistedRow<Canister = C>,
164    {
165        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
166    }
167
168    /// Start a fluent delete query with explicit missing-row policy.
169    #[must_use]
170    pub fn delete_with_consistency<E>(
171        &self,
172        consistency: MissingRowPolicy,
173    ) -> FluentDeleteQuery<'_, E>
174    where
175        E: PersistedRow<Canister = C>,
176    {
177        FluentDeleteQuery::new(self, Query::new(consistency).delete())
178    }
179
180    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
181    ///
182    /// This terminal bypasses query planning and access routing entirely.
183    #[must_use]
184    pub const fn select_one(&self) -> Value {
185        Value::Int(1)
186    }
187
188    /// Return one stable, human-readable index listing for the entity schema.
189    ///
190    /// Output format mirrors SQL-style introspection:
191    /// - `PRIMARY KEY (field)`
192    /// - `INDEX name (field_a, field_b)`
193    /// - `UNIQUE INDEX name (field_a, field_b)`
194    #[must_use]
195    pub fn show_indexes<E>(&self) -> Vec<String>
196    where
197        E: EntityKind<Canister = C>,
198    {
199        self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
200    }
201
202    /// Return one stable, human-readable index listing for one schema model.
203    ///
204    /// This model-only helper is schema-owned and intentionally does not
205    /// attach runtime lifecycle state because it does not carry store
206    /// placement context.
207    #[must_use]
208    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
209        show_indexes_for_model(model)
210    }
211
212    // Return one stable, human-readable index listing for one resolved
213    // store/model pair, attaching the current runtime lifecycle state when the
214    // registry can resolve the backing store handle.
215    pub(in crate::db) fn show_indexes_for_store_model(
216        &self,
217        store_path: &str,
218        model: &'static EntityModel,
219    ) -> Vec<String> {
220        let runtime_state = self.try_index_state_for_store_path(store_path);
221
222        show_indexes_for_model_with_runtime_state(model, runtime_state)
223    }
224
225    /// Return one stable list of field descriptors for the entity schema.
226    #[must_use]
227    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
228    where
229        E: EntityKind<Canister = C>,
230    {
231        self.show_columns_for_model(E::MODEL)
232    }
233
234    /// Return one stable list of field descriptors for one schema model.
235    #[must_use]
236    pub fn show_columns_for_model(
237        &self,
238        model: &'static EntityModel,
239    ) -> Vec<EntityFieldDescription> {
240        describe_entity_model(model).fields().to_vec()
241    }
242
243    /// Return one stable list of runtime-registered entity names.
244    #[must_use]
245    pub fn show_entities(&self) -> Vec<String> {
246        self.db.runtime_entity_names()
247    }
248
249    // Best-effort runtime state lookup for metadata surfaces.
250    // SHOW INDEXES should stay readable even if one store handle is missing
251    // from the registry, so this helper falls back to the pure schema-owned
252    // listing instead of turning metadata inspection into an execution error.
253    fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
254        self.db
255            .with_store_registry(|registry| registry.try_get_store(store_path).ok())
256            .map(|store| store.index_state())
257    }
258
259    // Resolve the exact secondary-index set that is visible to planner-owned
260    // query planning for one recovered store/model pair.
261    fn visible_indexes_for_store_model(
262        &self,
263        store_path: &str,
264        model: &'static EntityModel,
265    ) -> Result<VisibleIndexes<'static>, QueryError> {
266        // Phase 1: resolve the recovered store state once at the session
267        // boundary so query/executor planning does not reopen lifecycle checks.
268        let store = self
269            .db
270            .recovered_store(store_path)
271            .map_err(QueryError::execute)?;
272        let state = store.index_state();
273        if state != IndexState::Ready {
274            return Ok(VisibleIndexes::none());
275        }
276        debug_assert_eq!(state, IndexState::Ready);
277
278        // Phase 2: planner-visible indexes are exactly the model-owned index
279        // declarations once the recovered store is query-visible.
280        Ok(VisibleIndexes::planner_visible(model.indexes()))
281    }
282
283    /// Return one structured schema description for the entity.
284    ///
285    /// This is a typed `DESCRIBE`-style introspection surface consumed by
286    /// developer tooling and pre-EXPLAIN debugging.
287    #[must_use]
288    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
289    where
290        E: EntityKind<Canister = C>,
291    {
292        self.describe_entity_model(E::MODEL)
293    }
294
295    /// Return one structured schema description for one schema model.
296    #[must_use]
297    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
298        describe_entity_model(model)
299    }
300
301    /// Build one point-in-time storage report for observability endpoints.
302    pub fn storage_report(
303        &self,
304        name_to_path: &[(&'static str, &'static str)],
305    ) -> Result<StorageReport, InternalError> {
306        self.db.storage_report(name_to_path)
307    }
308
309    /// Build one point-in-time storage report using default entity-path labels.
310    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
311        self.db.storage_report_default()
312    }
313
314    /// Build one point-in-time integrity scan report for observability endpoints.
315    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
316        self.db.integrity_report()
317    }
318
319    /// Execute one bounded migration run with durable internal cursor state.
320    ///
321    /// Migration progress is persisted internally so upgrades/restarts can
322    /// resume from the last successful step without external cursor ownership.
323    pub fn execute_migration_plan(
324        &self,
325        plan: &MigrationPlan,
326        max_steps: usize,
327    ) -> Result<MigrationRunOutcome, InternalError> {
328        self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
329    }
330
331    // ---------------------------------------------------------------------
332    // Low-level executors (crate-internal; execution primitives)
333    // ---------------------------------------------------------------------
334
335    #[must_use]
336    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
337    where
338        E: EntityKind<Canister = C> + EntityValue,
339    {
340        LoadExecutor::new(self.db, self.debug)
341    }
342
343    #[must_use]
344    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
345    where
346        E: PersistedRow<Canister = C> + EntityValue,
347    {
348        DeleteExecutor::new(self.db)
349    }
350
351    #[must_use]
352    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
353    where
354        E: PersistedRow<Canister = C> + EntityValue,
355    {
356        SaveExecutor::new(self.db, self.debug)
357    }
358}
359
360/// Remove one entity row from the authoritative data store only.
361///
362/// This hidden helper exists for stale-index test fixtures that need to keep
363/// secondary/index state intact while deleting the base row bytes.
364#[doc(hidden)]
365pub fn debug_remove_entity_row_data_only<C, E>(
366    session: &DbSession<C>,
367    key: &E::Key,
368) -> Result<bool, InternalError>
369where
370    C: CanisterKind,
371    E: PersistedRow<Canister = C> + EntityValue,
372{
373    // Phase 1: resolve the store through the recovered session boundary so
374    // the helper cannot mutate pre-recovery state.
375    let store = session.db.recovered_store(E::Store::PATH)?;
376
377    // Phase 2: remove only the raw row-store entry and compute the canonical
378    // storage key that any surviving secondary memberships still point at.
379    let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
380    let raw_key = data_key.to_raw()?;
381    let storage_key = data_key.storage_key();
382
383    // Phase 3: preserve the secondary entries but mark any surviving raw
384    // memberships as explicitly missing so stale-index fixtures can exercise
385    // impossible-state behavior without lying about row existence.
386    let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
387    if !removed {
388        return Ok(false);
389    }
390
391    store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
392
393    Ok(true)
394}
395
396/// Mark one recovered store index with one explicit lifecycle state.
397///
398/// This hidden helper exists for test fixtures that need to force one index
399/// out of the `Ready` state while keeping all other lifecycle plumbing
400/// unchanged.
401#[doc(hidden)]
402pub fn debug_mark_store_index_state<C>(
403    session: &DbSession<C>,
404    store_path: &str,
405    state: IndexState,
406) -> Result<(), InternalError>
407where
408    C: CanisterKind,
409{
410    // Phase 1: resolve the recovered store so lifecycle mutation cannot
411    // target pre-recovery state.
412    let store = session.db.recovered_store(store_path)?;
413
414    // Phase 2: apply the explicit lifecycle state directly to the index half
415    // of the store pair so tests can observe the `Ready` gate in isolation.
416    match state {
417        IndexState::Building => store.mark_index_building(),
418        IndexState::Ready => store.mark_index_ready(),
419        IndexState::Dropping => store.mark_index_dropping(),
420    }
421
422    Ok(())
423}