Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::{Decimal, Id},
15    value::Value,
16};
17
/// Optional pair of entity ids, used as the success type of
/// `execute_load_query_min_max_by` (presumably `(min, max)` order — confirm
/// against the executor's `aggregate_min_max_by`).
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///

pub struct DbSession<C: CanisterKind> {
    // Database handle handed to every executor this session constructs.
    db: Db<C>,
    // Debug flag forwarded to executors; `new` starts it off, `debug()` turns it on.
    debug: bool,
    // Optional metrics sink; when present, operations run inside `with_metrics_sink`.
    metrics: Option<&'static dyn MetricsSink>,
}
31
32impl<C: CanisterKind> DbSession<C> {
33    #[must_use]
34    pub const fn new(db: Db<C>) -> Self {
35        Self {
36            db,
37            debug: false,
38            metrics: None,
39        }
40    }
41
    /// Builder step: consume the session and return it with the debug flag set.
    ///
    /// The flag is forwarded to every executor the session builds afterwards.
    #[must_use]
    pub const fn debug(mut self) -> Self {
        self.debug = true;
        self
    }
47
    /// Builder step: consume the session and return it with `sink` attached.
    ///
    /// Once a sink is attached, session operations are run through
    /// `with_metrics_sink` with that sink.
    #[must_use]
    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
        self.metrics = Some(sink);
        self
    }
53
54    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
55        if let Some(sink) = self.metrics {
56            with_metrics_sink(sink, f)
57        } else {
58            f()
59        }
60    }
61
62    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
63    fn execute_save_with<E, T, R>(
64        &self,
65        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
66        map: impl FnOnce(T) -> R,
67    ) -> Result<R, InternalError>
68    where
69        E: EntityKind<Canister = C> + EntityValue,
70    {
71        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
72
73        Ok(map(value))
74    }
75
76    // Shared save-facade wrappers keep response shape explicit at call sites.
77    fn execute_save_entity<E>(
78        &self,
79        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
80    ) -> Result<WriteResponse<E>, InternalError>
81    where
82        E: EntityKind<Canister = C> + EntityValue,
83    {
84        self.execute_save_with(op, WriteResponse::new)
85    }
86
87    fn execute_save_batch<E>(
88        &self,
89        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
90    ) -> Result<WriteBatchResponse<E>, InternalError>
91    where
92        E: EntityKind<Canister = C> + EntityValue,
93    {
94        self.execute_save_with(op, WriteBatchResponse::new)
95    }
96
97    fn execute_save_view<E>(
98        &self,
99        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
100    ) -> Result<E::ViewType, InternalError>
101    where
102        E: EntityKind<Canister = C> + EntityValue,
103    {
104        self.execute_save_with(op, std::convert::identity)
105    }
106
107    // ---------------------------------------------------------------------
108    // Query entry points (public, fluent)
109    // ---------------------------------------------------------------------
110
111    #[must_use]
112    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
113    where
114        E: EntityKind<Canister = C>,
115    {
116        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
117    }
118
119    #[must_use]
120    pub const fn load_with_consistency<E>(
121        &self,
122        consistency: ReadConsistency,
123    ) -> FluentLoadQuery<'_, E>
124    where
125        E: EntityKind<Canister = C>,
126    {
127        FluentLoadQuery::new(self, Query::new(consistency))
128    }
129
130    #[must_use]
131    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
132    where
133        E: EntityKind<Canister = C>,
134    {
135        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
136    }
137
138    #[must_use]
139    pub fn delete_with_consistency<E>(
140        &self,
141        consistency: ReadConsistency,
142    ) -> FluentDeleteQuery<'_, E>
143    where
144        E: EntityKind<Canister = C>,
145    {
146        FluentDeleteQuery::new(self, Query::new(consistency).delete())
147    }
148
149    // ---------------------------------------------------------------------
150    // Low-level executors (crate-internal; execution primitives)
151    // ---------------------------------------------------------------------
152
153    #[must_use]
154    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
155    where
156        E: EntityKind<Canister = C> + EntityValue,
157    {
158        LoadExecutor::new(self.db, self.debug)
159    }
160
161    #[must_use]
162    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
163    where
164        E: EntityKind<Canister = C> + EntityValue,
165    {
166        DeleteExecutor::new(self.db, self.debug)
167    }
168
169    #[must_use]
170    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
171    where
172        E: EntityKind<Canister = C> + EntityValue,
173    {
174        SaveExecutor::new(self.db, self.debug)
175    }
176
177    // ---------------------------------------------------------------------
178    // Query diagnostics / execution (internal routing)
179    // ---------------------------------------------------------------------
180
181    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
182    where
183        E: EntityKind<Canister = C> + EntityValue,
184    {
185        let plan = query.plan()?;
186
187        let result = match query.mode() {
188            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
189            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
190        };
191
192        result.map_err(QueryError::Execute)
193    }
194
195    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
196    where
197        E: EntityKind<Canister = C> + EntityValue,
198    {
199        let plan = query.plan()?;
200
201        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
202            .map_err(QueryError::Execute)
203    }
204
205    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
206    where
207        E: EntityKind<Canister = C> + EntityValue,
208    {
209        let plan = query.plan()?;
210
211        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
212            .map_err(QueryError::Execute)
213    }
214
215    pub(crate) fn execute_load_query_min<E>(
216        &self,
217        query: &Query<E>,
218    ) -> Result<Option<Id<E>>, QueryError>
219    where
220        E: EntityKind<Canister = C> + EntityValue,
221    {
222        let plan = query.plan()?;
223
224        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
225            .map_err(QueryError::Execute)
226    }
227
228    pub(crate) fn execute_load_query_max<E>(
229        &self,
230        query: &Query<E>,
231    ) -> Result<Option<Id<E>>, QueryError>
232    where
233        E: EntityKind<Canister = C> + EntityValue,
234    {
235        let plan = query.plan()?;
236
237        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
238            .map_err(QueryError::Execute)
239    }
240
241    pub(crate) fn execute_load_query_min_by<E>(
242        &self,
243        query: &Query<E>,
244        target_field: &str,
245    ) -> Result<Option<Id<E>>, QueryError>
246    where
247        E: EntityKind<Canister = C> + EntityValue,
248    {
249        let plan = query.plan()?;
250
251        self.with_metrics(|| {
252            self.load_executor::<E>()
253                .aggregate_min_by(plan, target_field)
254        })
255        .map_err(QueryError::Execute)
256    }
257
258    pub(crate) fn execute_load_query_max_by<E>(
259        &self,
260        query: &Query<E>,
261        target_field: &str,
262    ) -> Result<Option<Id<E>>, QueryError>
263    where
264        E: EntityKind<Canister = C> + EntityValue,
265    {
266        let plan = query.plan()?;
267
268        self.with_metrics(|| {
269            self.load_executor::<E>()
270                .aggregate_max_by(plan, target_field)
271        })
272        .map_err(QueryError::Execute)
273    }
274
275    pub(crate) fn execute_load_query_nth_by<E>(
276        &self,
277        query: &Query<E>,
278        target_field: &str,
279        nth: usize,
280    ) -> Result<Option<Id<E>>, QueryError>
281    where
282        E: EntityKind<Canister = C> + EntityValue,
283    {
284        let plan = query.plan()?;
285
286        self.with_metrics(|| {
287            self.load_executor::<E>()
288                .aggregate_nth_by(plan, target_field, nth)
289        })
290        .map_err(QueryError::Execute)
291    }
292
293    pub(crate) fn execute_load_query_sum_by<E>(
294        &self,
295        query: &Query<E>,
296        target_field: &str,
297    ) -> Result<Option<Decimal>, QueryError>
298    where
299        E: EntityKind<Canister = C> + EntityValue,
300    {
301        let plan = query.plan()?;
302
303        self.with_metrics(|| {
304            self.load_executor::<E>()
305                .aggregate_sum_by(plan, target_field)
306        })
307        .map_err(QueryError::Execute)
308    }
309
310    pub(crate) fn execute_load_query_avg_by<E>(
311        &self,
312        query: &Query<E>,
313        target_field: &str,
314    ) -> Result<Option<Decimal>, QueryError>
315    where
316        E: EntityKind<Canister = C> + EntityValue,
317    {
318        let plan = query.plan()?;
319
320        self.with_metrics(|| {
321            self.load_executor::<E>()
322                .aggregate_avg_by(plan, target_field)
323        })
324        .map_err(QueryError::Execute)
325    }
326
327    pub(crate) fn execute_load_query_median_by<E>(
328        &self,
329        query: &Query<E>,
330        target_field: &str,
331    ) -> Result<Option<Id<E>>, QueryError>
332    where
333        E: EntityKind<Canister = C> + EntityValue,
334    {
335        let plan = query.plan()?;
336
337        self.with_metrics(|| {
338            self.load_executor::<E>()
339                .aggregate_median_by(plan, target_field)
340        })
341        .map_err(QueryError::Execute)
342    }
343
344    pub(crate) fn execute_load_query_count_distinct_by<E>(
345        &self,
346        query: &Query<E>,
347        target_field: &str,
348    ) -> Result<u32, QueryError>
349    where
350        E: EntityKind<Canister = C> + EntityValue,
351    {
352        let plan = query.plan()?;
353
354        self.with_metrics(|| {
355            self.load_executor::<E>()
356                .aggregate_count_distinct_by(plan, target_field)
357        })
358        .map_err(QueryError::Execute)
359    }
360
361    pub(crate) fn execute_load_query_min_max_by<E>(
362        &self,
363        query: &Query<E>,
364        target_field: &str,
365    ) -> Result<MinMaxByIds<E>, QueryError>
366    where
367        E: EntityKind<Canister = C> + EntityValue,
368    {
369        let plan = query.plan()?;
370
371        self.with_metrics(|| {
372            self.load_executor::<E>()
373                .aggregate_min_max_by(plan, target_field)
374        })
375        .map_err(QueryError::Execute)
376    }
377
378    pub(crate) fn execute_load_query_values_by<E>(
379        &self,
380        query: &Query<E>,
381        target_field: &str,
382    ) -> Result<Vec<Value>, QueryError>
383    where
384        E: EntityKind<Canister = C> + EntityValue,
385    {
386        let plan = query.plan()?;
387
388        self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
389            .map_err(QueryError::Execute)
390    }
391
392    pub(crate) fn execute_load_query_first<E>(
393        &self,
394        query: &Query<E>,
395    ) -> Result<Option<Id<E>>, QueryError>
396    where
397        E: EntityKind<Canister = C> + EntityValue,
398    {
399        let plan = query.plan()?;
400
401        self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
402            .map_err(QueryError::Execute)
403    }
404
405    pub(crate) fn execute_load_query_last<E>(
406        &self,
407        query: &Query<E>,
408    ) -> Result<Option<Id<E>>, QueryError>
409    where
410        E: EntityKind<Canister = C> + EntityValue,
411    {
412        let plan = query.plan()?;
413
414        self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
415            .map_err(QueryError::Execute)
416    }
417
418    pub(crate) fn execute_load_query_paged_with_trace<E>(
419        &self,
420        query: &Query<E>,
421        cursor_token: Option<&str>,
422    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
423    where
424        E: EntityKind<Canister = C> + EntityValue,
425    {
426        let plan = query.plan()?;
427        let cursor_bytes = match cursor_token {
428            Some(token) => Some(decode_cursor(token).map_err(|reason| {
429                QueryError::from(PlanError::from(
430                    CursorPlanError::InvalidContinuationCursor { reason },
431                ))
432            })?),
433            None => None,
434        };
435        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
436
437        let (page, trace) = self
438            .with_metrics(|| {
439                self.load_executor::<E>()
440                    .execute_paged_with_cursor_traced(plan, cursor)
441            })
442            .map_err(QueryError::Execute)?;
443
444        Ok(PagedLoadExecutionWithTrace::new(
445            page.items,
446            page.next_cursor,
447            trace,
448        ))
449    }
450
451    // ---------------------------------------------------------------------
452    // High-level write API (public, intent-level)
453    // ---------------------------------------------------------------------
454
455    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
456    where
457        E: EntityKind<Canister = C> + EntityValue,
458    {
459        self.execute_save_entity(|save| save.insert(entity))
460    }
461
462    /// Insert a single-entity-type batch atomically in one commit window.
463    ///
464    /// If any item fails pre-commit validation, no row in the batch is persisted.
465    ///
466    /// This API is not a multi-entity transaction surface.
467    pub fn insert_many_atomic<E>(
468        &self,
469        entities: impl IntoIterator<Item = E>,
470    ) -> Result<WriteBatchResponse<E>, InternalError>
471    where
472        E: EntityKind<Canister = C> + EntityValue,
473    {
474        self.execute_save_batch(|save| save.insert_many_atomic(entities))
475    }
476
477    /// Insert a batch with explicitly non-atomic semantics.
478    ///
479    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
480    pub fn insert_many_non_atomic<E>(
481        &self,
482        entities: impl IntoIterator<Item = E>,
483    ) -> Result<WriteBatchResponse<E>, InternalError>
484    where
485        E: EntityKind<Canister = C> + EntityValue,
486    {
487        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
488    }
489
490    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
491    where
492        E: EntityKind<Canister = C> + EntityValue,
493    {
494        self.execute_save_entity(|save| save.replace(entity))
495    }
496
497    /// Replace a single-entity-type batch atomically in one commit window.
498    ///
499    /// If any item fails pre-commit validation, no row in the batch is persisted.
500    ///
501    /// This API is not a multi-entity transaction surface.
502    pub fn replace_many_atomic<E>(
503        &self,
504        entities: impl IntoIterator<Item = E>,
505    ) -> Result<WriteBatchResponse<E>, InternalError>
506    where
507        E: EntityKind<Canister = C> + EntityValue,
508    {
509        self.execute_save_batch(|save| save.replace_many_atomic(entities))
510    }
511
512    /// Replace a batch with explicitly non-atomic semantics.
513    ///
514    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
515    pub fn replace_many_non_atomic<E>(
516        &self,
517        entities: impl IntoIterator<Item = E>,
518    ) -> Result<WriteBatchResponse<E>, InternalError>
519    where
520        E: EntityKind<Canister = C> + EntityValue,
521    {
522        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
523    }
524
525    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
526    where
527        E: EntityKind<Canister = C> + EntityValue,
528    {
529        self.execute_save_entity(|save| save.update(entity))
530    }
531
532    /// Update a single-entity-type batch atomically in one commit window.
533    ///
534    /// If any item fails pre-commit validation, no row in the batch is persisted.
535    ///
536    /// This API is not a multi-entity transaction surface.
537    pub fn update_many_atomic<E>(
538        &self,
539        entities: impl IntoIterator<Item = E>,
540    ) -> Result<WriteBatchResponse<E>, InternalError>
541    where
542        E: EntityKind<Canister = C> + EntityValue,
543    {
544        self.execute_save_batch(|save| save.update_many_atomic(entities))
545    }
546
547    /// Update a batch with explicitly non-atomic semantics.
548    ///
549    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
550    pub fn update_many_non_atomic<E>(
551        &self,
552        entities: impl IntoIterator<Item = E>,
553    ) -> Result<WriteBatchResponse<E>, InternalError>
554    where
555        E: EntityKind<Canister = C> + EntityValue,
556    {
557        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
558    }
559
560    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
561    where
562        E: EntityKind<Canister = C> + EntityValue,
563    {
564        self.execute_save_view::<E>(|save| save.insert_view(view))
565    }
566
567    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
568    where
569        E: EntityKind<Canister = C> + EntityValue,
570    {
571        self.execute_save_view::<E>(|save| save.replace_view(view))
572    }
573
574    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
575    where
576        E: EntityKind<Canister = C> + EntityValue,
577    {
578        self.execute_save_view::<E>(|save| save.update_view(view))
579    }
580
581    /// TEST ONLY: clear all registered data and index stores for this database.
582    #[cfg(test)]
583    #[doc(hidden)]
584    pub fn clear_stores_for_tests(&self) {
585        self.db.with_store_registry(|reg| {
586            for (_, store) in reg.iter() {
587                store.with_data_mut(DataStore::clear);
588                store.with_index_mut(IndexStore::clear);
589            }
590        });
591    }
592}