Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::Id,
15};
16
17///
18/// DbSession
19///
20/// Session-scoped database handle with policy (debug, metrics) and execution routing.
21///
22
23pub struct DbSession<C: CanisterKind> {
24    db: Db<C>,
25    debug: bool,
26    metrics: Option<&'static dyn MetricsSink>,
27}
28
29impl<C: CanisterKind> DbSession<C> {
30    #[must_use]
31    pub const fn new(db: Db<C>) -> Self {
32        Self {
33            db,
34            debug: false,
35            metrics: None,
36        }
37    }
38
39    #[must_use]
40    pub const fn debug(mut self) -> Self {
41        self.debug = true;
42        self
43    }
44
45    #[must_use]
46    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
47        self.metrics = Some(sink);
48        self
49    }
50
51    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
52        if let Some(sink) = self.metrics {
53            with_metrics_sink(sink, f)
54        } else {
55            f()
56        }
57    }
58
59    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
60    fn execute_save_with<E, T, R>(
61        &self,
62        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
63        map: impl FnOnce(T) -> R,
64    ) -> Result<R, InternalError>
65    where
66        E: EntityKind<Canister = C> + EntityValue,
67    {
68        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
69
70        Ok(map(value))
71    }
72
73    // Shared save-facade wrappers keep response shape explicit at call sites.
74    fn execute_save_entity<E>(
75        &self,
76        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
77    ) -> Result<WriteResponse<E>, InternalError>
78    where
79        E: EntityKind<Canister = C> + EntityValue,
80    {
81        self.execute_save_with(op, WriteResponse::new)
82    }
83
84    fn execute_save_batch<E>(
85        &self,
86        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
87    ) -> Result<WriteBatchResponse<E>, InternalError>
88    where
89        E: EntityKind<Canister = C> + EntityValue,
90    {
91        self.execute_save_with(op, WriteBatchResponse::new)
92    }
93
94    fn execute_save_view<E>(
95        &self,
96        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
97    ) -> Result<E::ViewType, InternalError>
98    where
99        E: EntityKind<Canister = C> + EntityValue,
100    {
101        self.execute_save_with(op, std::convert::identity)
102    }
103
104    // ---------------------------------------------------------------------
105    // Query entry points (public, fluent)
106    // ---------------------------------------------------------------------
107
108    #[must_use]
109    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
110    where
111        E: EntityKind<Canister = C>,
112    {
113        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
114    }
115
116    #[must_use]
117    pub const fn load_with_consistency<E>(
118        &self,
119        consistency: ReadConsistency,
120    ) -> FluentLoadQuery<'_, E>
121    where
122        E: EntityKind<Canister = C>,
123    {
124        FluentLoadQuery::new(self, Query::new(consistency))
125    }
126
127    #[must_use]
128    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
129    where
130        E: EntityKind<Canister = C>,
131    {
132        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
133    }
134
135    #[must_use]
136    pub fn delete_with_consistency<E>(
137        &self,
138        consistency: ReadConsistency,
139    ) -> FluentDeleteQuery<'_, E>
140    where
141        E: EntityKind<Canister = C>,
142    {
143        FluentDeleteQuery::new(self, Query::new(consistency).delete())
144    }
145
146    // ---------------------------------------------------------------------
147    // Low-level executors (crate-internal; execution primitives)
148    // ---------------------------------------------------------------------
149
150    #[must_use]
151    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
152    where
153        E: EntityKind<Canister = C> + EntityValue,
154    {
155        LoadExecutor::new(self.db, self.debug)
156    }
157
158    #[must_use]
159    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
160    where
161        E: EntityKind<Canister = C> + EntityValue,
162    {
163        DeleteExecutor::new(self.db, self.debug)
164    }
165
166    #[must_use]
167    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
168    where
169        E: EntityKind<Canister = C> + EntityValue,
170    {
171        SaveExecutor::new(self.db, self.debug)
172    }
173
174    // ---------------------------------------------------------------------
175    // Query diagnostics / execution (internal routing)
176    // ---------------------------------------------------------------------
177
178    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
179    where
180        E: EntityKind<Canister = C> + EntityValue,
181    {
182        let plan = query.plan()?;
183
184        let result = match query.mode() {
185            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
186            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
187        };
188
189        result.map_err(QueryError::Execute)
190    }
191
192    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
193    where
194        E: EntityKind<Canister = C> + EntityValue,
195    {
196        let plan = query.plan()?;
197
198        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
199            .map_err(QueryError::Execute)
200    }
201
202    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
203    where
204        E: EntityKind<Canister = C> + EntityValue,
205    {
206        let plan = query.plan()?;
207
208        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
209            .map_err(QueryError::Execute)
210    }
211
212    pub(crate) fn execute_load_query_min<E>(
213        &self,
214        query: &Query<E>,
215    ) -> Result<Option<Id<E>>, QueryError>
216    where
217        E: EntityKind<Canister = C> + EntityValue,
218    {
219        let plan = query.plan()?;
220
221        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
222            .map_err(QueryError::Execute)
223    }
224
225    pub(crate) fn execute_load_query_max<E>(
226        &self,
227        query: &Query<E>,
228    ) -> Result<Option<Id<E>>, QueryError>
229    where
230        E: EntityKind<Canister = C> + EntityValue,
231    {
232        let plan = query.plan()?;
233
234        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
235            .map_err(QueryError::Execute)
236    }
237
238    pub(crate) fn execute_load_query_first<E>(
239        &self,
240        query: &Query<E>,
241    ) -> Result<Option<Id<E>>, QueryError>
242    where
243        E: EntityKind<Canister = C> + EntityValue,
244    {
245        let plan = query.plan()?;
246
247        self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
248            .map_err(QueryError::Execute)
249    }
250
251    pub(crate) fn execute_load_query_last<E>(
252        &self,
253        query: &Query<E>,
254    ) -> Result<Option<Id<E>>, QueryError>
255    where
256        E: EntityKind<Canister = C> + EntityValue,
257    {
258        let plan = query.plan()?;
259
260        self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
261            .map_err(QueryError::Execute)
262    }
263
264    pub(crate) fn execute_load_query_paged_with_trace<E>(
265        &self,
266        query: &Query<E>,
267        cursor_token: Option<&str>,
268    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
269    where
270        E: EntityKind<Canister = C> + EntityValue,
271    {
272        let plan = query.plan()?;
273        let cursor_bytes = match cursor_token {
274            Some(token) => Some(decode_cursor(token).map_err(|reason| {
275                QueryError::from(PlanError::from(
276                    CursorPlanError::InvalidContinuationCursor { reason },
277                ))
278            })?),
279            None => None,
280        };
281        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
282
283        let (page, trace) = self
284            .with_metrics(|| {
285                self.load_executor::<E>()
286                    .execute_paged_with_cursor_traced(plan, cursor)
287            })
288            .map_err(QueryError::Execute)?;
289
290        Ok((page.items, page.next_cursor, trace))
291    }
292
293    // ---------------------------------------------------------------------
294    // High-level write API (public, intent-level)
295    // ---------------------------------------------------------------------
296
297    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
298    where
299        E: EntityKind<Canister = C> + EntityValue,
300    {
301        self.execute_save_entity(|save| save.insert(entity))
302    }
303
304    /// Insert a single-entity-type batch atomically in one commit window.
305    ///
306    /// If any item fails pre-commit validation, no row in the batch is persisted.
307    ///
308    /// This API is not a multi-entity transaction surface.
309    pub fn insert_many_atomic<E>(
310        &self,
311        entities: impl IntoIterator<Item = E>,
312    ) -> Result<WriteBatchResponse<E>, InternalError>
313    where
314        E: EntityKind<Canister = C> + EntityValue,
315    {
316        self.execute_save_batch(|save| save.insert_many_atomic(entities))
317    }
318
319    /// Insert a batch with explicitly non-atomic semantics.
320    ///
321    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
322    pub fn insert_many_non_atomic<E>(
323        &self,
324        entities: impl IntoIterator<Item = E>,
325    ) -> Result<WriteBatchResponse<E>, InternalError>
326    where
327        E: EntityKind<Canister = C> + EntityValue,
328    {
329        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
330    }
331
332    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
333    where
334        E: EntityKind<Canister = C> + EntityValue,
335    {
336        self.execute_save_entity(|save| save.replace(entity))
337    }
338
339    /// Replace a single-entity-type batch atomically in one commit window.
340    ///
341    /// If any item fails pre-commit validation, no row in the batch is persisted.
342    ///
343    /// This API is not a multi-entity transaction surface.
344    pub fn replace_many_atomic<E>(
345        &self,
346        entities: impl IntoIterator<Item = E>,
347    ) -> Result<WriteBatchResponse<E>, InternalError>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        self.execute_save_batch(|save| save.replace_many_atomic(entities))
352    }
353
354    /// Replace a batch with explicitly non-atomic semantics.
355    ///
356    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
357    pub fn replace_many_non_atomic<E>(
358        &self,
359        entities: impl IntoIterator<Item = E>,
360    ) -> Result<WriteBatchResponse<E>, InternalError>
361    where
362        E: EntityKind<Canister = C> + EntityValue,
363    {
364        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
365    }
366
367    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
368    where
369        E: EntityKind<Canister = C> + EntityValue,
370    {
371        self.execute_save_entity(|save| save.update(entity))
372    }
373
374    /// Update a single-entity-type batch atomically in one commit window.
375    ///
376    /// If any item fails pre-commit validation, no row in the batch is persisted.
377    ///
378    /// This API is not a multi-entity transaction surface.
379    pub fn update_many_atomic<E>(
380        &self,
381        entities: impl IntoIterator<Item = E>,
382    ) -> Result<WriteBatchResponse<E>, InternalError>
383    where
384        E: EntityKind<Canister = C> + EntityValue,
385    {
386        self.execute_save_batch(|save| save.update_many_atomic(entities))
387    }
388
389    /// Update a batch with explicitly non-atomic semantics.
390    ///
391    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
392    pub fn update_many_non_atomic<E>(
393        &self,
394        entities: impl IntoIterator<Item = E>,
395    ) -> Result<WriteBatchResponse<E>, InternalError>
396    where
397        E: EntityKind<Canister = C> + EntityValue,
398    {
399        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
400    }
401
402    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
403    where
404        E: EntityKind<Canister = C> + EntityValue,
405    {
406        self.execute_save_view::<E>(|save| save.insert_view(view))
407    }
408
409    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
410    where
411        E: EntityKind<Canister = C> + EntityValue,
412    {
413        self.execute_save_view::<E>(|save| save.replace_view(view))
414    }
415
416    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
417    where
418        E: EntityKind<Canister = C> + EntityValue,
419    {
420        self.execute_save_view::<E>(|save| save.update_view(view))
421    }
422
423    /// TEST ONLY: clear all registered data and index stores for this database.
424    #[cfg(test)]
425    #[doc(hidden)]
426    pub fn clear_stores_for_tests(&self) {
427        self.db.with_store_registry(|reg| {
428            for (_, store) in reg.iter() {
429                store.with_data_mut(DataStore::clear);
430                store.with_index_mut(IndexStore::clear);
431            }
432        });
433    }
434}