Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::{Decimal, Id},
15    value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
///
/// DbSession
///
/// Session-scoped database handle with policy (debug, metrics) and execution routing.
///
/// A session is built with `new` and optionally customised through the
/// consuming `debug` and `metrics_sink` builder methods.
///

pub struct DbSession<C: CanisterKind> {
    // Database handle handed to every executor this session constructs.
    db: Db<C>,
    // Debug flag forwarded to executors; starts as `false` in `new`.
    debug: bool,
    // Optional metrics sink; when present, operations run inside `with_metrics_sink`.
    metrics: Option<&'static dyn MetricsSink>,
}
31
32impl<C: CanisterKind> DbSession<C> {
    /// Create a session over `db` with debug disabled and no metrics sink attached.
    #[must_use]
    pub const fn new(db: Db<C>) -> Self {
        Self {
            db,
            debug: false,
            metrics: None,
        }
    }
41
    /// Consume the session and return it with the debug flag switched on.
    ///
    /// The flag is forwarded to every executor this session builds.
    #[must_use]
    pub const fn debug(mut self) -> Self {
        self.debug = true;
        self
    }
47
    /// Consume the session and return it with `sink` installed as its metrics sink.
    ///
    /// Operations routed through this session are then wrapped in `with_metrics_sink`.
    #[must_use]
    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
        self.metrics = Some(sink);
        self
    }
53
54    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
55        if let Some(sink) = self.metrics {
56            with_metrics_sink(sink, f)
57        } else {
58            f()
59        }
60    }
61
62    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
63    fn execute_save_with<E, T, R>(
64        &self,
65        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
66        map: impl FnOnce(T) -> R,
67    ) -> Result<R, InternalError>
68    where
69        E: EntityKind<Canister = C> + EntityValue,
70    {
71        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
72
73        Ok(map(value))
74    }
75
76    // Shared save-facade wrappers keep response shape explicit at call sites.
77    fn execute_save_entity<E>(
78        &self,
79        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
80    ) -> Result<WriteResponse<E>, InternalError>
81    where
82        E: EntityKind<Canister = C> + EntityValue,
83    {
84        self.execute_save_with(op, WriteResponse::new)
85    }
86
87    fn execute_save_batch<E>(
88        &self,
89        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
90    ) -> Result<WriteBatchResponse<E>, InternalError>
91    where
92        E: EntityKind<Canister = C> + EntityValue,
93    {
94        self.execute_save_with(op, WriteBatchResponse::new)
95    }
96
97    fn execute_save_view<E>(
98        &self,
99        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
100    ) -> Result<E::ViewType, InternalError>
101    where
102        E: EntityKind<Canister = C> + EntityValue,
103    {
104        self.execute_save_with(op, std::convert::identity)
105    }
106
107    // ---------------------------------------------------------------------
108    // Query entry points (public, fluent)
109    // ---------------------------------------------------------------------
110
111    #[must_use]
112    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
113    where
114        E: EntityKind<Canister = C>,
115    {
116        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
117    }
118
    /// Start a fluent load query for `E` with an explicit read `consistency`.
    ///
    /// The returned builder borrows this session and wraps a fresh `Query`
    /// created with the given consistency.
    #[must_use]
    pub const fn load_with_consistency<E>(
        &self,
        consistency: ReadConsistency,
    ) -> FluentLoadQuery<'_, E>
    where
        E: EntityKind<Canister = C>,
    {
        FluentLoadQuery::new(self, Query::new(consistency))
    }
129
130    #[must_use]
131    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
132    where
133        E: EntityKind<Canister = C>,
134    {
135        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
136    }
137
138    #[must_use]
139    pub fn delete_with_consistency<E>(
140        &self,
141        consistency: ReadConsistency,
142    ) -> FluentDeleteQuery<'_, E>
143    where
144        E: EntityKind<Canister = C>,
145    {
146        FluentDeleteQuery::new(self, Query::new(consistency).delete())
147    }
148
149    // ---------------------------------------------------------------------
150    // Low-level executors (crate-internal; execution primitives)
151    // ---------------------------------------------------------------------
152
    /// Build a `LoadExecutor` for `E` from this session's database handle and debug flag.
    #[must_use]
    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
    where
        E: EntityKind<Canister = C> + EntityValue,
    {
        LoadExecutor::new(self.db, self.debug)
    }
160
    /// Build a `DeleteExecutor` for `E` from this session's database handle and debug flag.
    #[must_use]
    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
    where
        E: EntityKind<Canister = C> + EntityValue,
    {
        DeleteExecutor::new(self.db, self.debug)
    }
168
    /// Build a `SaveExecutor` for `E` from this session's database handle and debug flag.
    #[must_use]
    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
    where
        E: EntityKind<Canister = C> + EntityValue,
    {
        SaveExecutor::new(self.db, self.debug)
    }
176
177    // ---------------------------------------------------------------------
178    // Query diagnostics / execution (internal routing)
179    // ---------------------------------------------------------------------
180
181    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
182    where
183        E: EntityKind<Canister = C> + EntityValue,
184    {
185        let plan = query.plan()?;
186
187        let result = match query.mode() {
188            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
189            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
190        };
191
192        result.map_err(QueryError::Execute)
193    }
194
195    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
196    where
197        E: EntityKind<Canister = C> + EntityValue,
198    {
199        let plan = query.plan()?;
200
201        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
202            .map_err(QueryError::Execute)
203    }
204
205    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
206    where
207        E: EntityKind<Canister = C> + EntityValue,
208    {
209        let plan = query.plan()?;
210
211        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
212            .map_err(QueryError::Execute)
213    }
214
215    pub(crate) fn execute_load_query_min<E>(
216        &self,
217        query: &Query<E>,
218    ) -> Result<Option<Id<E>>, QueryError>
219    where
220        E: EntityKind<Canister = C> + EntityValue,
221    {
222        let plan = query.plan()?;
223
224        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
225            .map_err(QueryError::Execute)
226    }
227
228    pub(crate) fn execute_load_query_max<E>(
229        &self,
230        query: &Query<E>,
231    ) -> Result<Option<Id<E>>, QueryError>
232    where
233        E: EntityKind<Canister = C> + EntityValue,
234    {
235        let plan = query.plan()?;
236
237        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
238            .map_err(QueryError::Execute)
239    }
240
241    pub(crate) fn execute_load_query_min_by<E>(
242        &self,
243        query: &Query<E>,
244        target_field: &str,
245    ) -> Result<Option<Id<E>>, QueryError>
246    where
247        E: EntityKind<Canister = C> + EntityValue,
248    {
249        let plan = query.plan()?;
250
251        self.with_metrics(|| {
252            self.load_executor::<E>()
253                .aggregate_min_by(plan, target_field)
254        })
255        .map_err(QueryError::Execute)
256    }
257
258    pub(crate) fn execute_load_query_max_by<E>(
259        &self,
260        query: &Query<E>,
261        target_field: &str,
262    ) -> Result<Option<Id<E>>, QueryError>
263    where
264        E: EntityKind<Canister = C> + EntityValue,
265    {
266        let plan = query.plan()?;
267
268        self.with_metrics(|| {
269            self.load_executor::<E>()
270                .aggregate_max_by(plan, target_field)
271        })
272        .map_err(QueryError::Execute)
273    }
274
275    pub(crate) fn execute_load_query_nth_by<E>(
276        &self,
277        query: &Query<E>,
278        target_field: &str,
279        nth: usize,
280    ) -> Result<Option<Id<E>>, QueryError>
281    where
282        E: EntityKind<Canister = C> + EntityValue,
283    {
284        let plan = query.plan()?;
285
286        self.with_metrics(|| {
287            self.load_executor::<E>()
288                .aggregate_nth_by(plan, target_field, nth)
289        })
290        .map_err(QueryError::Execute)
291    }
292
293    pub(crate) fn execute_load_query_sum_by<E>(
294        &self,
295        query: &Query<E>,
296        target_field: &str,
297    ) -> Result<Option<Decimal>, QueryError>
298    where
299        E: EntityKind<Canister = C> + EntityValue,
300    {
301        let plan = query.plan()?;
302
303        self.with_metrics(|| {
304            self.load_executor::<E>()
305                .aggregate_sum_by(plan, target_field)
306        })
307        .map_err(QueryError::Execute)
308    }
309
310    pub(crate) fn execute_load_query_avg_by<E>(
311        &self,
312        query: &Query<E>,
313        target_field: &str,
314    ) -> Result<Option<Decimal>, QueryError>
315    where
316        E: EntityKind<Canister = C> + EntityValue,
317    {
318        let plan = query.plan()?;
319
320        self.with_metrics(|| {
321            self.load_executor::<E>()
322                .aggregate_avg_by(plan, target_field)
323        })
324        .map_err(QueryError::Execute)
325    }
326
327    pub(crate) fn execute_load_query_median_by<E>(
328        &self,
329        query: &Query<E>,
330        target_field: &str,
331    ) -> Result<Option<Id<E>>, QueryError>
332    where
333        E: EntityKind<Canister = C> + EntityValue,
334    {
335        let plan = query.plan()?;
336
337        self.with_metrics(|| {
338            self.load_executor::<E>()
339                .aggregate_median_by(plan, target_field)
340        })
341        .map_err(QueryError::Execute)
342    }
343
344    pub(crate) fn execute_load_query_count_distinct_by<E>(
345        &self,
346        query: &Query<E>,
347        target_field: &str,
348    ) -> Result<u32, QueryError>
349    where
350        E: EntityKind<Canister = C> + EntityValue,
351    {
352        let plan = query.plan()?;
353
354        self.with_metrics(|| {
355            self.load_executor::<E>()
356                .aggregate_count_distinct_by(plan, target_field)
357        })
358        .map_err(QueryError::Execute)
359    }
360
361    pub(crate) fn execute_load_query_min_max_by<E>(
362        &self,
363        query: &Query<E>,
364        target_field: &str,
365    ) -> Result<MinMaxByIds<E>, QueryError>
366    where
367        E: EntityKind<Canister = C> + EntityValue,
368    {
369        let plan = query.plan()?;
370
371        self.with_metrics(|| {
372            self.load_executor::<E>()
373                .aggregate_min_max_by(plan, target_field)
374        })
375        .map_err(QueryError::Execute)
376    }
377
378    pub(crate) fn execute_load_query_values_by<E>(
379        &self,
380        query: &Query<E>,
381        target_field: &str,
382    ) -> Result<Vec<Value>, QueryError>
383    where
384        E: EntityKind<Canister = C> + EntityValue,
385    {
386        let plan = query.plan()?;
387
388        self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
389            .map_err(QueryError::Execute)
390    }
391
392    pub(crate) fn execute_load_query_distinct_values_by<E>(
393        &self,
394        query: &Query<E>,
395        target_field: &str,
396    ) -> Result<Vec<Value>, QueryError>
397    where
398        E: EntityKind<Canister = C> + EntityValue,
399    {
400        let plan = query.plan()?;
401
402        self.with_metrics(|| {
403            self.load_executor::<E>()
404                .distinct_values_by(plan, target_field)
405        })
406        .map_err(QueryError::Execute)
407    }
408
409    pub(crate) fn execute_load_query_values_by_with_ids<E>(
410        &self,
411        query: &Query<E>,
412        target_field: &str,
413    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
414    where
415        E: EntityKind<Canister = C> + EntityValue,
416    {
417        let plan = query.plan()?;
418
419        self.with_metrics(|| {
420            self.load_executor::<E>()
421                .values_by_with_ids(plan, target_field)
422        })
423        .map_err(QueryError::Execute)
424    }
425
426    pub(crate) fn execute_load_query_first_value_by<E>(
427        &self,
428        query: &Query<E>,
429        target_field: &str,
430    ) -> Result<Option<Value>, QueryError>
431    where
432        E: EntityKind<Canister = C> + EntityValue,
433    {
434        let plan = query.plan()?;
435
436        self.with_metrics(|| self.load_executor::<E>().first_value_by(plan, target_field))
437            .map_err(QueryError::Execute)
438    }
439
440    pub(crate) fn execute_load_query_last_value_by<E>(
441        &self,
442        query: &Query<E>,
443        target_field: &str,
444    ) -> Result<Option<Value>, QueryError>
445    where
446        E: EntityKind<Canister = C> + EntityValue,
447    {
448        let plan = query.plan()?;
449
450        self.with_metrics(|| self.load_executor::<E>().last_value_by(plan, target_field))
451            .map_err(QueryError::Execute)
452    }
453
454    pub(crate) fn execute_load_query_first<E>(
455        &self,
456        query: &Query<E>,
457    ) -> Result<Option<Id<E>>, QueryError>
458    where
459        E: EntityKind<Canister = C> + EntityValue,
460    {
461        let plan = query.plan()?;
462
463        self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
464            .map_err(QueryError::Execute)
465    }
466
467    pub(crate) fn execute_load_query_last<E>(
468        &self,
469        query: &Query<E>,
470    ) -> Result<Option<Id<E>>, QueryError>
471    where
472        E: EntityKind<Canister = C> + EntityValue,
473    {
474        let plan = query.plan()?;
475
476        self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
477            .map_err(QueryError::Execute)
478    }
479
480    pub(crate) fn execute_load_query_paged_with_trace<E>(
481        &self,
482        query: &Query<E>,
483        cursor_token: Option<&str>,
484    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
485    where
486        E: EntityKind<Canister = C> + EntityValue,
487    {
488        let plan = query.plan()?;
489        let cursor_bytes = match cursor_token {
490            Some(token) => Some(decode_cursor(token).map_err(|reason| {
491                QueryError::from(PlanError::from(
492                    CursorPlanError::InvalidContinuationCursor { reason },
493                ))
494            })?),
495            None => None,
496        };
497        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
498
499        let (page, trace) = self
500            .with_metrics(|| {
501                self.load_executor::<E>()
502                    .execute_paged_with_cursor_traced(plan, cursor)
503            })
504            .map_err(QueryError::Execute)?;
505
506        Ok(PagedLoadExecutionWithTrace::new(
507            page.items,
508            page.next_cursor,
509            trace,
510        ))
511    }
512
513    // ---------------------------------------------------------------------
514    // High-level write API (public, intent-level)
515    // ---------------------------------------------------------------------
516
517    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
518    where
519        E: EntityKind<Canister = C> + EntityValue,
520    {
521        self.execute_save_entity(|save| save.insert(entity))
522    }
523
524    /// Insert a single-entity-type batch atomically in one commit window.
525    ///
526    /// If any item fails pre-commit validation, no row in the batch is persisted.
527    ///
528    /// This API is not a multi-entity transaction surface.
529    pub fn insert_many_atomic<E>(
530        &self,
531        entities: impl IntoIterator<Item = E>,
532    ) -> Result<WriteBatchResponse<E>, InternalError>
533    where
534        E: EntityKind<Canister = C> + EntityValue,
535    {
536        self.execute_save_batch(|save| save.insert_many_atomic(entities))
537    }
538
539    /// Insert a batch with explicitly non-atomic semantics.
540    ///
541    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
542    pub fn insert_many_non_atomic<E>(
543        &self,
544        entities: impl IntoIterator<Item = E>,
545    ) -> Result<WriteBatchResponse<E>, InternalError>
546    where
547        E: EntityKind<Canister = C> + EntityValue,
548    {
549        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
550    }
551
552    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
553    where
554        E: EntityKind<Canister = C> + EntityValue,
555    {
556        self.execute_save_entity(|save| save.replace(entity))
557    }
558
559    /// Replace a single-entity-type batch atomically in one commit window.
560    ///
561    /// If any item fails pre-commit validation, no row in the batch is persisted.
562    ///
563    /// This API is not a multi-entity transaction surface.
564    pub fn replace_many_atomic<E>(
565        &self,
566        entities: impl IntoIterator<Item = E>,
567    ) -> Result<WriteBatchResponse<E>, InternalError>
568    where
569        E: EntityKind<Canister = C> + EntityValue,
570    {
571        self.execute_save_batch(|save| save.replace_many_atomic(entities))
572    }
573
574    /// Replace a batch with explicitly non-atomic semantics.
575    ///
576    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
577    pub fn replace_many_non_atomic<E>(
578        &self,
579        entities: impl IntoIterator<Item = E>,
580    ) -> Result<WriteBatchResponse<E>, InternalError>
581    where
582        E: EntityKind<Canister = C> + EntityValue,
583    {
584        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
585    }
586
587    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
588    where
589        E: EntityKind<Canister = C> + EntityValue,
590    {
591        self.execute_save_entity(|save| save.update(entity))
592    }
593
594    /// Update a single-entity-type batch atomically in one commit window.
595    ///
596    /// If any item fails pre-commit validation, no row in the batch is persisted.
597    ///
598    /// This API is not a multi-entity transaction surface.
599    pub fn update_many_atomic<E>(
600        &self,
601        entities: impl IntoIterator<Item = E>,
602    ) -> Result<WriteBatchResponse<E>, InternalError>
603    where
604        E: EntityKind<Canister = C> + EntityValue,
605    {
606        self.execute_save_batch(|save| save.update_many_atomic(entities))
607    }
608
609    /// Update a batch with explicitly non-atomic semantics.
610    ///
611    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
612    pub fn update_many_non_atomic<E>(
613        &self,
614        entities: impl IntoIterator<Item = E>,
615    ) -> Result<WriteBatchResponse<E>, InternalError>
616    where
617        E: EntityKind<Canister = C> + EntityValue,
618    {
619        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
620    }
621
622    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
623    where
624        E: EntityKind<Canister = C> + EntityValue,
625    {
626        self.execute_save_view::<E>(|save| save.insert_view(view))
627    }
628
629    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
630    where
631        E: EntityKind<Canister = C> + EntityValue,
632    {
633        self.execute_save_view::<E>(|save| save.replace_view(view))
634    }
635
636    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
637    where
638        E: EntityKind<Canister = C> + EntityValue,
639    {
640        self.execute_save_view::<E>(|save| save.update_view(view))
641    }
642
643    /// TEST ONLY: clear all registered data and index stores for this database.
644    #[cfg(test)]
645    #[doc(hidden)]
646    pub fn clear_stores_for_tests(&self) {
647        self.db.with_store_registry(|reg| {
648            for (_, store) in reg.iter() {
649                store.with_data_mut(DataStore::clear);
650                store.with_index_mut(IndexStore::clear);
651            }
652        });
653    }
654}