Skip to main content

icydb_core/db/session/
mod.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6mod query;
7#[cfg(feature = "sql")]
8mod sql;
9///
10/// TESTS
11///
12#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17    db::{
18        Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19        IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy, PersistedRow, Query,
20        QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21        commit::EntityRuntimeHooks,
22        cursor::{decode_optional_cursor_token, decode_optional_grouped_cursor_token},
23        data::DataKey,
24        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25        schema::{describe_entity_model, show_indexes_for_model},
26    },
27    error::InternalError,
28    metrics::sink::{MetricsSink, with_metrics_sink},
29    model::entity::EntityModel,
30    traits::{CanisterKind, EntityKind, EntityValue, Path},
31    value::Value,
32};
33use std::thread::LocalKey;
34
35#[cfg(feature = "sql")]
36pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
37
38// Decode one optional external cursor token and map decode failures into the
39// query-plan cursor error boundary.
40fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
41    decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
42}
43
44// Decode one optional grouped continuation token through the existing cursor
45// text boundary while preserving grouped-token ownership for grouped resume.
46fn decode_optional_grouped_cursor(
47    cursor_token: Option<&str>,
48) -> Result<Option<crate::db::cursor::GroupedContinuationToken>, QueryError> {
49    decode_optional_grouped_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
50}
51
52///
53/// DbSession
54///
55/// Session-scoped database handle with policy (debug, metrics) and execution routing.
56///
57
58pub struct DbSession<C: CanisterKind> {
59    db: Db<C>,
60    debug: bool,
61    metrics: Option<&'static dyn MetricsSink>,
62}
63
64impl<C: CanisterKind> DbSession<C> {
65    /// Construct one session facade for a database handle.
66    #[must_use]
67    pub(crate) const fn new(db: Db<C>) -> Self {
68        Self {
69            db,
70            debug: false,
71            metrics: None,
72        }
73    }
74
75    /// Construct one session facade from store registry and runtime hooks.
76    #[must_use]
77    pub const fn new_with_hooks(
78        store: &'static LocalKey<StoreRegistry>,
79        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
80    ) -> Self {
81        Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
82    }
83
84    /// Enable debug execution behavior where supported by executors.
85    #[must_use]
86    pub const fn debug(mut self) -> Self {
87        self.debug = true;
88        self
89    }
90
91    /// Attach one metrics sink for all session-executed operations.
92    #[must_use]
93    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
94        self.metrics = Some(sink);
95        self
96    }
97
98    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
99        if let Some(sink) = self.metrics {
100            with_metrics_sink(sink, f)
101        } else {
102            f()
103        }
104    }
105
106    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
107    fn execute_save_with<E, T, R>(
108        &self,
109        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
110        map: impl FnOnce(T) -> R,
111    ) -> Result<R, InternalError>
112    where
113        E: PersistedRow<Canister = C> + EntityValue,
114    {
115        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
116
117        Ok(map(value))
118    }
119
120    // Shared save-facade wrappers keep response shape explicit at call sites.
121    fn execute_save_entity<E>(
122        &self,
123        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
124    ) -> Result<E, InternalError>
125    where
126        E: PersistedRow<Canister = C> + EntityValue,
127    {
128        self.execute_save_with(op, std::convert::identity)
129    }
130
131    fn execute_save_batch<E>(
132        &self,
133        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
134    ) -> Result<WriteBatchResponse<E>, InternalError>
135    where
136        E: PersistedRow<Canister = C> + EntityValue,
137    {
138        self.execute_save_with(op, WriteBatchResponse::new)
139    }
140
141    // ---------------------------------------------------------------------
142    // Query entry points (public, fluent)
143    // ---------------------------------------------------------------------
144
145    /// Start a fluent load query with default missing-row policy (`Ignore`).
146    #[must_use]
147    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
148    where
149        E: EntityKind<Canister = C>,
150    {
151        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
152    }
153
154    /// Start a fluent load query with explicit missing-row policy.
155    #[must_use]
156    pub const fn load_with_consistency<E>(
157        &self,
158        consistency: MissingRowPolicy,
159    ) -> FluentLoadQuery<'_, E>
160    where
161        E: EntityKind<Canister = C>,
162    {
163        FluentLoadQuery::new(self, Query::new(consistency))
164    }
165
166    /// Start a fluent delete query with default missing-row policy (`Ignore`).
167    #[must_use]
168    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
169    where
170        E: PersistedRow<Canister = C>,
171    {
172        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
173    }
174
175    /// Start a fluent delete query with explicit missing-row policy.
176    #[must_use]
177    pub fn delete_with_consistency<E>(
178        &self,
179        consistency: MissingRowPolicy,
180    ) -> FluentDeleteQuery<'_, E>
181    where
182        E: PersistedRow<Canister = C>,
183    {
184        FluentDeleteQuery::new(self, Query::new(consistency).delete())
185    }
186
187    /// Return one constant scalar row equivalent to SQL `SELECT 1`.
188    ///
189    /// This terminal bypasses query planning and access routing entirely.
190    #[must_use]
191    pub const fn select_one(&self) -> Value {
192        Value::Int(1)
193    }
194
195    /// Return one stable, human-readable index listing for the entity schema.
196    ///
197    /// Output format mirrors SQL-style introspection:
198    /// - `PRIMARY KEY (field)`
199    /// - `INDEX name (field_a, field_b)`
200    /// - `UNIQUE INDEX name (field_a, field_b)`
201    #[must_use]
202    pub fn show_indexes<E>(&self) -> Vec<String>
203    where
204        E: EntityKind<Canister = C>,
205    {
206        self.show_indexes_for_model(E::MODEL)
207    }
208
209    /// Return one stable, human-readable index listing for one schema model.
210    #[must_use]
211    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
212        show_indexes_for_model(model)
213    }
214
215    /// Return one stable list of field descriptors for the entity schema.
216    #[must_use]
217    pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
218    where
219        E: EntityKind<Canister = C>,
220    {
221        self.show_columns_for_model(E::MODEL)
222    }
223
224    /// Return one stable list of field descriptors for one schema model.
225    #[must_use]
226    pub fn show_columns_for_model(
227        &self,
228        model: &'static EntityModel,
229    ) -> Vec<EntityFieldDescription> {
230        describe_entity_model(model).fields().to_vec()
231    }
232
233    /// Return one stable list of runtime-registered entity names.
234    #[must_use]
235    pub fn show_entities(&self) -> Vec<String> {
236        self.db.runtime_entity_names()
237    }
238
239    /// Return one structured schema description for the entity.
240    ///
241    /// This is a typed `DESCRIBE`-style introspection surface consumed by
242    /// developer tooling and pre-EXPLAIN debugging.
243    #[must_use]
244    pub fn describe_entity<E>(&self) -> EntitySchemaDescription
245    where
246        E: EntityKind<Canister = C>,
247    {
248        self.describe_entity_model(E::MODEL)
249    }
250
251    /// Return one structured schema description for one schema model.
252    #[must_use]
253    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
254        describe_entity_model(model)
255    }
256
257    /// Build one point-in-time storage report for observability endpoints.
258    pub fn storage_report(
259        &self,
260        name_to_path: &[(&'static str, &'static str)],
261    ) -> Result<StorageReport, InternalError> {
262        self.db.storage_report(name_to_path)
263    }
264
265    /// Build one point-in-time storage report using default entity-path labels.
266    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
267        self.db.storage_report_default()
268    }
269
270    /// Build one point-in-time integrity scan report for observability endpoints.
271    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
272        self.db.integrity_report()
273    }
274
275    /// Execute one bounded migration run with durable internal cursor state.
276    ///
277    /// Migration progress is persisted internally so upgrades/restarts can
278    /// resume from the last successful step without external cursor ownership.
279    pub fn execute_migration_plan(
280        &self,
281        plan: &MigrationPlan,
282        max_steps: usize,
283    ) -> Result<MigrationRunOutcome, InternalError> {
284        self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
285    }
286
287    // ---------------------------------------------------------------------
288    // Low-level executors (crate-internal; execution primitives)
289    // ---------------------------------------------------------------------
290
291    #[must_use]
292    pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
293    where
294        E: EntityKind<Canister = C> + EntityValue,
295    {
296        LoadExecutor::new(self.db, self.debug)
297    }
298
299    #[must_use]
300    pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
301    where
302        E: PersistedRow<Canister = C> + EntityValue,
303    {
304        DeleteExecutor::new(self.db, self.debug)
305    }
306
307    #[must_use]
308    pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
309    where
310        E: PersistedRow<Canister = C> + EntityValue,
311    {
312        SaveExecutor::new(self.db, self.debug)
313    }
314}
315
316/// Remove one entity row from the authoritative data store only.
317///
318/// This hidden helper exists for stale-index test fixtures that need to keep
319/// secondary/index state intact while deleting the base row bytes.
320#[doc(hidden)]
321pub fn debug_remove_entity_row_data_only<C, E>(
322    session: &DbSession<C>,
323    key: &E::Key,
324) -> Result<bool, InternalError>
325where
326    C: CanisterKind,
327    E: PersistedRow<Canister = C> + EntityValue,
328{
329    // Phase 1: resolve the store through the recovered session boundary so
330    // the helper cannot mutate pre-recovery state.
331    let store = session.db.recovered_store(E::Store::PATH)?;
332
333    // Phase 2: remove only the raw row-store entry and compute the canonical
334    // storage key that any surviving secondary memberships still point at.
335    let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
336    let raw_key = data_key.to_raw()?;
337    let storage_key = data_key.storage_key();
338
339    // Phase 3: preserve the secondary entries but mark any surviving raw
340    // memberships as explicitly missing so storage-owned existence-witness
341    // tests can exercise the stale path without lying about row existence.
342    let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
343    if !removed {
344        return Ok(false);
345    }
346
347    store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
348    store.mark_secondary_existence_witness_authoritative();
349
350    Ok(true)
351}