Skip to main content

icydb_core/db/
session.rs

1//! Module: session
2//! Responsibility: user-facing query/write execution facade over db executors.
3//! Does not own: planning semantics, cursor validation rules, or storage mutation protocol.
4//! Boundary: converts fluent/query intent calls into executor operations and response DTOs.
5
6#[cfg(test)]
7use crate::db::{DataStore, IndexStore};
8use crate::{
9    db::{
10        Db, FluentDeleteQuery, FluentLoadQuery, MissingRowPolicy, PagedGroupedExecutionWithTrace,
11        PagedLoadExecutionWithTrace, PlanError, Query, QueryError, Response, WriteBatchResponse,
12        WriteResponse,
13        cursor::CursorPlanError,
14        decode_cursor,
15        executor::{DeleteExecutor, ExecutablePlan, ExecutorPlanError, LoadExecutor, SaveExecutor},
16        query::intent::QueryMode,
17    },
18    error::InternalError,
19    obs::sink::{MetricsSink, with_metrics_sink},
20    traits::{CanisterKind, EntityKind, EntityValue},
21};
22
23// Map executor-owned plan-surface failures into query-owned plan errors.
24fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
25    match err {
26        ExecutorPlanError::Cursor(err) => QueryError::from(PlanError::from(*err)),
27    }
28}
29
30///
31/// DbSession
32///
33/// Session-scoped database handle with policy (debug, metrics) and execution routing.
34///
35
36pub struct DbSession<C: CanisterKind> {
37    db: Db<C>,
38    debug: bool,
39    metrics: Option<&'static dyn MetricsSink>,
40}
41
42impl<C: CanisterKind> DbSession<C> {
43    /// Construct one session facade for a database handle.
44    #[must_use]
45    pub const fn new(db: Db<C>) -> Self {
46        Self {
47            db,
48            debug: false,
49            metrics: None,
50        }
51    }
52
53    /// Enable debug execution behavior where supported by executors.
54    #[must_use]
55    pub const fn debug(mut self) -> Self {
56        self.debug = true;
57        self
58    }
59
60    /// Attach one metrics sink for all session-executed operations.
61    #[must_use]
62    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
63        self.metrics = Some(sink);
64        self
65    }
66
67    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
68        if let Some(sink) = self.metrics {
69            with_metrics_sink(sink, f)
70        } else {
71            f()
72        }
73    }
74
75    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
76    fn execute_save_with<E, T, R>(
77        &self,
78        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
79        map: impl FnOnce(T) -> R,
80    ) -> Result<R, InternalError>
81    where
82        E: EntityKind<Canister = C> + EntityValue,
83    {
84        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
85
86        Ok(map(value))
87    }
88
89    // Shared save-facade wrappers keep response shape explicit at call sites.
90    fn execute_save_entity<E>(
91        &self,
92        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
93    ) -> Result<WriteResponse<E>, InternalError>
94    where
95        E: EntityKind<Canister = C> + EntityValue,
96    {
97        self.execute_save_with(op, WriteResponse::new)
98    }
99
100    fn execute_save_batch<E>(
101        &self,
102        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
103    ) -> Result<WriteBatchResponse<E>, InternalError>
104    where
105        E: EntityKind<Canister = C> + EntityValue,
106    {
107        self.execute_save_with(op, WriteBatchResponse::new)
108    }
109
110    fn execute_save_view<E>(
111        &self,
112        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
113    ) -> Result<E::ViewType, InternalError>
114    where
115        E: EntityKind<Canister = C> + EntityValue,
116    {
117        self.execute_save_with(op, std::convert::identity)
118    }
119
120    // ---------------------------------------------------------------------
121    // Query entry points (public, fluent)
122    // ---------------------------------------------------------------------
123
124    /// Start a fluent load query with default missing-row policy (`Ignore`).
125    #[must_use]
126    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
127    where
128        E: EntityKind<Canister = C>,
129    {
130        FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
131    }
132
133    /// Start a fluent load query with explicit missing-row policy.
134    #[must_use]
135    pub const fn load_with_consistency<E>(
136        &self,
137        consistency: MissingRowPolicy,
138    ) -> FluentLoadQuery<'_, E>
139    where
140        E: EntityKind<Canister = C>,
141    {
142        FluentLoadQuery::new(self, Query::new(consistency))
143    }
144
145    /// Start a fluent delete query with default missing-row policy (`Ignore`).
146    #[must_use]
147    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
148    where
149        E: EntityKind<Canister = C>,
150    {
151        FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
152    }
153
154    /// Start a fluent delete query with explicit missing-row policy.
155    #[must_use]
156    pub fn delete_with_consistency<E>(
157        &self,
158        consistency: MissingRowPolicy,
159    ) -> FluentDeleteQuery<'_, E>
160    where
161        E: EntityKind<Canister = C>,
162    {
163        FluentDeleteQuery::new(self, Query::new(consistency).delete())
164    }
165
166    // ---------------------------------------------------------------------
167    // Low-level executors (crate-internal; execution primitives)
168    // ---------------------------------------------------------------------
169
170    #[must_use]
171    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
172    where
173        E: EntityKind<Canister = C> + EntityValue,
174    {
175        LoadExecutor::new(self.db, self.debug)
176    }
177
178    #[must_use]
179    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
180    where
181        E: EntityKind<Canister = C> + EntityValue,
182    {
183        DeleteExecutor::new(self.db, self.debug)
184    }
185
186    #[must_use]
187    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
188    where
189        E: EntityKind<Canister = C> + EntityValue,
190    {
191        SaveExecutor::new(self.db, self.debug)
192    }
193
194    // ---------------------------------------------------------------------
195    // Query diagnostics / execution (internal routing)
196    // ---------------------------------------------------------------------
197
198    /// Execute one scalar load/delete query and return materialized response rows.
199    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
200    where
201        E: EntityKind<Canister = C> + EntityValue,
202    {
203        let plan = query.plan()?.into_executable();
204
205        let result = match query.mode() {
206            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
207            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
208        };
209
210        result.map_err(QueryError::Execute)
211    }
212
213    // Shared load-query terminal wrapper: build plan, run under metrics, map
214    // execution errors into query-facing errors.
215    pub(crate) fn execute_load_query_with<E, T>(
216        &self,
217        query: &Query<E>,
218        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
219    ) -> Result<T, QueryError>
220    where
221        E: EntityKind<Canister = C> + EntityValue,
222    {
223        let plan = query.plan()?.into_executable();
224
225        self.with_metrics(|| op(self.load_executor::<E>(), plan))
226            .map_err(QueryError::Execute)
227    }
228
229    /// Execute one scalar paged load query and return optional continuation cursor plus trace.
230    pub(crate) fn execute_load_query_paged_with_trace<E>(
231        &self,
232        query: &Query<E>,
233        cursor_token: Option<&str>,
234    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
235    where
236        E: EntityKind<Canister = C> + EntityValue,
237    {
238        // Phase 1: build/validate executable plan and reject grouped plans.
239        let plan = query.plan()?.into_executable();
240        if plan.as_inner().grouped_plan().is_some() {
241            return Err(QueryError::Execute(invariant(
242                "grouped plans require execute_grouped(...)",
243            )));
244        }
245
246        // Phase 2: decode external cursor token and validate it against plan surface.
247        let cursor_bytes = match cursor_token {
248            Some(token) => Some(decode_cursor(token).map_err(|reason| {
249                QueryError::from(PlanError::from(
250                    CursorPlanError::invalid_continuation_cursor(reason),
251                ))
252            })?),
253            None => None,
254        };
255        let cursor = plan
256            .prepare_cursor(cursor_bytes.as_deref())
257            .map_err(map_executor_plan_error)?;
258
259        // Phase 3: execute one traced page and encode outbound continuation token.
260        let (page, trace) = self
261            .with_metrics(|| {
262                self.load_executor::<E>()
263                    .execute_paged_with_cursor_traced(plan, cursor)
264            })
265            .map_err(QueryError::Execute)?;
266        let next_cursor = page
267            .next_cursor
268            .map(|token| {
269                let Some(token) = token.as_scalar() else {
270                    return Err(QueryError::Execute(invariant(
271                        "scalar load pagination emitted grouped continuation token",
272                    )));
273                };
274
275                token.encode().map_err(|err| {
276                    QueryError::Execute(InternalError::serialize_internal(format!(
277                        "failed to serialize continuation cursor: {err}"
278                    )))
279                })
280            })
281            .transpose()?;
282
283        Ok(PagedLoadExecutionWithTrace::new(
284            page.items,
285            next_cursor,
286            trace,
287        ))
288    }
289
290    /// Execute one grouped query page with optional grouped continuation cursor.
291    ///
292    /// This is the explicit grouped execution boundary; scalar load APIs reject
293    /// grouped plans to preserve scalar response contracts.
294    pub fn execute_grouped<E>(
295        &self,
296        query: &Query<E>,
297        cursor_token: Option<&str>,
298    ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
299    where
300        E: EntityKind<Canister = C> + EntityValue,
301    {
302        // Phase 1: build/validate executable plan and require grouped shape.
303        let plan = query.plan()?.into_executable();
304        if plan.as_inner().grouped_plan().is_none() {
305            return Err(QueryError::Execute(invariant(
306                "execute_grouped requires grouped logical plans",
307            )));
308        }
309
310        // Phase 2: decode external grouped cursor token and validate against plan.
311        let cursor_bytes = match cursor_token {
312            Some(token) => Some(decode_cursor(token).map_err(|reason| {
313                QueryError::from(PlanError::from(
314                    CursorPlanError::invalid_continuation_cursor(reason),
315                ))
316            })?),
317            None => None,
318        };
319        let cursor = plan
320            .prepare_grouped_cursor(cursor_bytes.as_deref())
321            .map_err(map_executor_plan_error)?;
322
323        // Phase 3: execute grouped page and encode outbound grouped continuation token.
324        let (page, trace) = self
325            .with_metrics(|| {
326                self.load_executor::<E>()
327                    .execute_grouped_paged_with_cursor_traced(plan, cursor)
328            })
329            .map_err(QueryError::Execute)?;
330        let next_cursor = page
331            .next_cursor
332            .map(|token| {
333                let Some(token) = token.as_grouped() else {
334                    return Err(QueryError::Execute(invariant(
335                        "grouped pagination emitted scalar continuation token",
336                    )));
337                };
338
339                token.encode().map_err(|err| {
340                    QueryError::Execute(InternalError::serialize_internal(format!(
341                        "failed to serialize grouped continuation cursor: {err}"
342                    )))
343                })
344            })
345            .transpose()?;
346
347        Ok(PagedGroupedExecutionWithTrace::new(
348            page.rows,
349            next_cursor,
350            trace,
351        ))
352    }
353
354    // ---------------------------------------------------------------------
355    // High-level write API (public, intent-level)
356    // ---------------------------------------------------------------------
357
358    /// Insert one entity row.
359    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
360    where
361        E: EntityKind<Canister = C> + EntityValue,
362    {
363        self.execute_save_entity(|save| save.insert(entity))
364    }
365
366    /// Insert a single-entity-type batch atomically in one commit window.
367    ///
368    /// If any item fails pre-commit validation, no row in the batch is persisted.
369    ///
370    /// This API is not a multi-entity transaction surface.
371    pub fn insert_many_atomic<E>(
372        &self,
373        entities: impl IntoIterator<Item = E>,
374    ) -> Result<WriteBatchResponse<E>, InternalError>
375    where
376        E: EntityKind<Canister = C> + EntityValue,
377    {
378        self.execute_save_batch(|save| save.insert_many_atomic(entities))
379    }
380
381    /// Insert a batch with explicitly non-atomic semantics.
382    ///
383    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
384    pub fn insert_many_non_atomic<E>(
385        &self,
386        entities: impl IntoIterator<Item = E>,
387    ) -> Result<WriteBatchResponse<E>, InternalError>
388    where
389        E: EntityKind<Canister = C> + EntityValue,
390    {
391        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
392    }
393
394    /// Replace one existing entity row.
395    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
396    where
397        E: EntityKind<Canister = C> + EntityValue,
398    {
399        self.execute_save_entity(|save| save.replace(entity))
400    }
401
402    /// Replace a single-entity-type batch atomically in one commit window.
403    ///
404    /// If any item fails pre-commit validation, no row in the batch is persisted.
405    ///
406    /// This API is not a multi-entity transaction surface.
407    pub fn replace_many_atomic<E>(
408        &self,
409        entities: impl IntoIterator<Item = E>,
410    ) -> Result<WriteBatchResponse<E>, InternalError>
411    where
412        E: EntityKind<Canister = C> + EntityValue,
413    {
414        self.execute_save_batch(|save| save.replace_many_atomic(entities))
415    }
416
417    /// Replace a batch with explicitly non-atomic semantics.
418    ///
419    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
420    pub fn replace_many_non_atomic<E>(
421        &self,
422        entities: impl IntoIterator<Item = E>,
423    ) -> Result<WriteBatchResponse<E>, InternalError>
424    where
425        E: EntityKind<Canister = C> + EntityValue,
426    {
427        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
428    }
429
430    /// Update one existing entity row.
431    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
432    where
433        E: EntityKind<Canister = C> + EntityValue,
434    {
435        self.execute_save_entity(|save| save.update(entity))
436    }
437
438    /// Update a single-entity-type batch atomically in one commit window.
439    ///
440    /// If any item fails pre-commit validation, no row in the batch is persisted.
441    ///
442    /// This API is not a multi-entity transaction surface.
443    pub fn update_many_atomic<E>(
444        &self,
445        entities: impl IntoIterator<Item = E>,
446    ) -> Result<WriteBatchResponse<E>, InternalError>
447    where
448        E: EntityKind<Canister = C> + EntityValue,
449    {
450        self.execute_save_batch(|save| save.update_many_atomic(entities))
451    }
452
453    /// Update a batch with explicitly non-atomic semantics.
454    ///
455    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
456    pub fn update_many_non_atomic<E>(
457        &self,
458        entities: impl IntoIterator<Item = E>,
459    ) -> Result<WriteBatchResponse<E>, InternalError>
460    where
461        E: EntityKind<Canister = C> + EntityValue,
462    {
463        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
464    }
465
466    /// Insert one view value and return the stored view.
467    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
468    where
469        E: EntityKind<Canister = C> + EntityValue,
470    {
471        self.execute_save_view::<E>(|save| save.insert_view(view))
472    }
473
474    /// Replace one view value and return the stored view.
475    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
476    where
477        E: EntityKind<Canister = C> + EntityValue,
478    {
479        self.execute_save_view::<E>(|save| save.replace_view(view))
480    }
481
482    /// Update one view value and return the stored view.
483    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
484    where
485        E: EntityKind<Canister = C> + EntityValue,
486    {
487        self.execute_save_view::<E>(|save| save.update_view(view))
488    }
489
490    /// TEST ONLY: clear all registered data and index stores for this database.
491    #[cfg(test)]
492    #[doc(hidden)]
493    pub fn clear_stores_for_tests(&self) {
494        self.db.with_store_registry(|reg| {
495            // Test cleanup only: clearing all stores is set-like and does not
496            // depend on registry iteration order.
497            for (_, store) in reg.iter() {
498                store.with_data_mut(DataStore::clear);
499                store.with_index_mut(IndexStore::clear);
500            }
501        });
502    }
503}
504
505fn invariant(message: impl Into<String>) -> InternalError {
506    InternalError::query_executor_invariant(message)
507}