Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::{Decimal, Id},
15    value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20///
21/// DbSession
22///
23/// Session-scoped database handle with policy (debug, metrics) and execution routing.
24///
25
26pub struct DbSession<C: CanisterKind> {
27    db: Db<C>,
28    debug: bool,
29    metrics: Option<&'static dyn MetricsSink>,
30}
31
32impl<C: CanisterKind> DbSession<C> {
33    #[must_use]
34    pub const fn new(db: Db<C>) -> Self {
35        Self {
36            db,
37            debug: false,
38            metrics: None,
39        }
40    }
41
42    #[must_use]
43    pub const fn debug(mut self) -> Self {
44        self.debug = true;
45        self
46    }
47
48    #[must_use]
49    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
50        self.metrics = Some(sink);
51        self
52    }
53
54    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
55        if let Some(sink) = self.metrics {
56            with_metrics_sink(sink, f)
57        } else {
58            f()
59        }
60    }
61
62    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
63    fn execute_save_with<E, T, R>(
64        &self,
65        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
66        map: impl FnOnce(T) -> R,
67    ) -> Result<R, InternalError>
68    where
69        E: EntityKind<Canister = C> + EntityValue,
70    {
71        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
72
73        Ok(map(value))
74    }
75
76    // Shared save-facade wrappers keep response shape explicit at call sites.
77    fn execute_save_entity<E>(
78        &self,
79        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
80    ) -> Result<WriteResponse<E>, InternalError>
81    where
82        E: EntityKind<Canister = C> + EntityValue,
83    {
84        self.execute_save_with(op, WriteResponse::new)
85    }
86
87    fn execute_save_batch<E>(
88        &self,
89        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
90    ) -> Result<WriteBatchResponse<E>, InternalError>
91    where
92        E: EntityKind<Canister = C> + EntityValue,
93    {
94        self.execute_save_with(op, WriteBatchResponse::new)
95    }
96
97    fn execute_save_view<E>(
98        &self,
99        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
100    ) -> Result<E::ViewType, InternalError>
101    where
102        E: EntityKind<Canister = C> + EntityValue,
103    {
104        self.execute_save_with(op, std::convert::identity)
105    }
106
107    // ---------------------------------------------------------------------
108    // Query entry points (public, fluent)
109    // ---------------------------------------------------------------------
110
111    #[must_use]
112    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
113    where
114        E: EntityKind<Canister = C>,
115    {
116        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
117    }
118
119    #[must_use]
120    pub const fn load_with_consistency<E>(
121        &self,
122        consistency: ReadConsistency,
123    ) -> FluentLoadQuery<'_, E>
124    where
125        E: EntityKind<Canister = C>,
126    {
127        FluentLoadQuery::new(self, Query::new(consistency))
128    }
129
130    #[must_use]
131    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
132    where
133        E: EntityKind<Canister = C>,
134    {
135        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
136    }
137
138    #[must_use]
139    pub fn delete_with_consistency<E>(
140        &self,
141        consistency: ReadConsistency,
142    ) -> FluentDeleteQuery<'_, E>
143    where
144        E: EntityKind<Canister = C>,
145    {
146        FluentDeleteQuery::new(self, Query::new(consistency).delete())
147    }
148
149    // ---------------------------------------------------------------------
150    // Low-level executors (crate-internal; execution primitives)
151    // ---------------------------------------------------------------------
152
153    #[must_use]
154    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
155    where
156        E: EntityKind<Canister = C> + EntityValue,
157    {
158        LoadExecutor::new(self.db, self.debug)
159    }
160
161    #[must_use]
162    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
163    where
164        E: EntityKind<Canister = C> + EntityValue,
165    {
166        DeleteExecutor::new(self.db, self.debug)
167    }
168
169    #[must_use]
170    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
171    where
172        E: EntityKind<Canister = C> + EntityValue,
173    {
174        SaveExecutor::new(self.db, self.debug)
175    }
176
177    // ---------------------------------------------------------------------
178    // Query diagnostics / execution (internal routing)
179    // ---------------------------------------------------------------------
180
181    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
182    where
183        E: EntityKind<Canister = C> + EntityValue,
184    {
185        let plan = query.plan()?;
186
187        let result = match query.mode() {
188            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
189            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
190        };
191
192        result.map_err(QueryError::Execute)
193    }
194
195    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
196    where
197        E: EntityKind<Canister = C> + EntityValue,
198    {
199        let plan = query.plan()?;
200
201        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
202            .map_err(QueryError::Execute)
203    }
204
205    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
206    where
207        E: EntityKind<Canister = C> + EntityValue,
208    {
209        let plan = query.plan()?;
210
211        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
212            .map_err(QueryError::Execute)
213    }
214
215    pub(crate) fn execute_load_query_min<E>(
216        &self,
217        query: &Query<E>,
218    ) -> Result<Option<Id<E>>, QueryError>
219    where
220        E: EntityKind<Canister = C> + EntityValue,
221    {
222        let plan = query.plan()?;
223
224        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
225            .map_err(QueryError::Execute)
226    }
227
228    pub(crate) fn execute_load_query_max<E>(
229        &self,
230        query: &Query<E>,
231    ) -> Result<Option<Id<E>>, QueryError>
232    where
233        E: EntityKind<Canister = C> + EntityValue,
234    {
235        let plan = query.plan()?;
236
237        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
238            .map_err(QueryError::Execute)
239    }
240
241    pub(crate) fn execute_load_query_min_by<E>(
242        &self,
243        query: &Query<E>,
244        target_field: &str,
245    ) -> Result<Option<Id<E>>, QueryError>
246    where
247        E: EntityKind<Canister = C> + EntityValue,
248    {
249        let plan = query.plan()?;
250
251        self.with_metrics(|| {
252            self.load_executor::<E>()
253                .aggregate_min_by(plan, target_field)
254        })
255        .map_err(QueryError::Execute)
256    }
257
258    pub(crate) fn execute_load_query_max_by<E>(
259        &self,
260        query: &Query<E>,
261        target_field: &str,
262    ) -> Result<Option<Id<E>>, QueryError>
263    where
264        E: EntityKind<Canister = C> + EntityValue,
265    {
266        let plan = query.plan()?;
267
268        self.with_metrics(|| {
269            self.load_executor::<E>()
270                .aggregate_max_by(plan, target_field)
271        })
272        .map_err(QueryError::Execute)
273    }
274
275    pub(crate) fn execute_load_query_nth_by<E>(
276        &self,
277        query: &Query<E>,
278        target_field: &str,
279        nth: usize,
280    ) -> Result<Option<Id<E>>, QueryError>
281    where
282        E: EntityKind<Canister = C> + EntityValue,
283    {
284        let plan = query.plan()?;
285
286        self.with_metrics(|| {
287            self.load_executor::<E>()
288                .aggregate_nth_by(plan, target_field, nth)
289        })
290        .map_err(QueryError::Execute)
291    }
292
293    pub(crate) fn execute_load_query_sum_by<E>(
294        &self,
295        query: &Query<E>,
296        target_field: &str,
297    ) -> Result<Option<Decimal>, QueryError>
298    where
299        E: EntityKind<Canister = C> + EntityValue,
300    {
301        let plan = query.plan()?;
302
303        self.with_metrics(|| {
304            self.load_executor::<E>()
305                .aggregate_sum_by(plan, target_field)
306        })
307        .map_err(QueryError::Execute)
308    }
309
310    pub(crate) fn execute_load_query_avg_by<E>(
311        &self,
312        query: &Query<E>,
313        target_field: &str,
314    ) -> Result<Option<Decimal>, QueryError>
315    where
316        E: EntityKind<Canister = C> + EntityValue,
317    {
318        let plan = query.plan()?;
319
320        self.with_metrics(|| {
321            self.load_executor::<E>()
322                .aggregate_avg_by(plan, target_field)
323        })
324        .map_err(QueryError::Execute)
325    }
326
327    pub(crate) fn execute_load_query_median_by<E>(
328        &self,
329        query: &Query<E>,
330        target_field: &str,
331    ) -> Result<Option<Id<E>>, QueryError>
332    where
333        E: EntityKind<Canister = C> + EntityValue,
334    {
335        let plan = query.plan()?;
336
337        self.with_metrics(|| {
338            self.load_executor::<E>()
339                .aggregate_median_by(plan, target_field)
340        })
341        .map_err(QueryError::Execute)
342    }
343
344    pub(crate) fn execute_load_query_count_distinct_by<E>(
345        &self,
346        query: &Query<E>,
347        target_field: &str,
348    ) -> Result<u32, QueryError>
349    where
350        E: EntityKind<Canister = C> + EntityValue,
351    {
352        let plan = query.plan()?;
353
354        self.with_metrics(|| {
355            self.load_executor::<E>()
356                .aggregate_count_distinct_by(plan, target_field)
357        })
358        .map_err(QueryError::Execute)
359    }
360
361    pub(crate) fn execute_load_query_min_max_by<E>(
362        &self,
363        query: &Query<E>,
364        target_field: &str,
365    ) -> Result<MinMaxByIds<E>, QueryError>
366    where
367        E: EntityKind<Canister = C> + EntityValue,
368    {
369        let plan = query.plan()?;
370
371        self.with_metrics(|| {
372            self.load_executor::<E>()
373                .aggregate_min_max_by(plan, target_field)
374        })
375        .map_err(QueryError::Execute)
376    }
377
378    pub(crate) fn execute_load_query_values_by<E>(
379        &self,
380        query: &Query<E>,
381        target_field: &str,
382    ) -> Result<Vec<Value>, QueryError>
383    where
384        E: EntityKind<Canister = C> + EntityValue,
385    {
386        let plan = query.plan()?;
387
388        self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
389            .map_err(QueryError::Execute)
390    }
391
392    pub(crate) fn execute_load_query_take<E>(
393        &self,
394        query: &Query<E>,
395        take_count: u32,
396    ) -> Result<Response<E>, QueryError>
397    where
398        E: EntityKind<Canister = C> + EntityValue,
399    {
400        let plan = query.plan()?;
401
402        self.with_metrics(|| self.load_executor::<E>().take(plan, take_count))
403            .map_err(QueryError::Execute)
404    }
405
406    pub(crate) fn execute_load_query_top_k_by<E>(
407        &self,
408        query: &Query<E>,
409        target_field: &str,
410        take_count: u32,
411    ) -> Result<Response<E>, QueryError>
412    where
413        E: EntityKind<Canister = C> + EntityValue,
414    {
415        let plan = query.plan()?;
416
417        self.with_metrics(|| {
418            self.load_executor::<E>()
419                .top_k_by(plan, target_field, take_count)
420        })
421        .map_err(QueryError::Execute)
422    }
423
424    pub(crate) fn execute_load_query_distinct_values_by<E>(
425        &self,
426        query: &Query<E>,
427        target_field: &str,
428    ) -> Result<Vec<Value>, QueryError>
429    where
430        E: EntityKind<Canister = C> + EntityValue,
431    {
432        let plan = query.plan()?;
433
434        self.with_metrics(|| {
435            self.load_executor::<E>()
436                .distinct_values_by(plan, target_field)
437        })
438        .map_err(QueryError::Execute)
439    }
440
441    pub(crate) fn execute_load_query_values_by_with_ids<E>(
442        &self,
443        query: &Query<E>,
444        target_field: &str,
445    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
446    where
447        E: EntityKind<Canister = C> + EntityValue,
448    {
449        let plan = query.plan()?;
450
451        self.with_metrics(|| {
452            self.load_executor::<E>()
453                .values_by_with_ids(plan, target_field)
454        })
455        .map_err(QueryError::Execute)
456    }
457
458    pub(crate) fn execute_load_query_first_value_by<E>(
459        &self,
460        query: &Query<E>,
461        target_field: &str,
462    ) -> Result<Option<Value>, QueryError>
463    where
464        E: EntityKind<Canister = C> + EntityValue,
465    {
466        let plan = query.plan()?;
467
468        self.with_metrics(|| self.load_executor::<E>().first_value_by(plan, target_field))
469            .map_err(QueryError::Execute)
470    }
471
472    pub(crate) fn execute_load_query_last_value_by<E>(
473        &self,
474        query: &Query<E>,
475        target_field: &str,
476    ) -> Result<Option<Value>, QueryError>
477    where
478        E: EntityKind<Canister = C> + EntityValue,
479    {
480        let plan = query.plan()?;
481
482        self.with_metrics(|| self.load_executor::<E>().last_value_by(plan, target_field))
483            .map_err(QueryError::Execute)
484    }
485
486    pub(crate) fn execute_load_query_first<E>(
487        &self,
488        query: &Query<E>,
489    ) -> Result<Option<Id<E>>, QueryError>
490    where
491        E: EntityKind<Canister = C> + EntityValue,
492    {
493        let plan = query.plan()?;
494
495        self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
496            .map_err(QueryError::Execute)
497    }
498
499    pub(crate) fn execute_load_query_last<E>(
500        &self,
501        query: &Query<E>,
502    ) -> Result<Option<Id<E>>, QueryError>
503    where
504        E: EntityKind<Canister = C> + EntityValue,
505    {
506        let plan = query.plan()?;
507
508        self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
509            .map_err(QueryError::Execute)
510    }
511
512    pub(crate) fn execute_load_query_paged_with_trace<E>(
513        &self,
514        query: &Query<E>,
515        cursor_token: Option<&str>,
516    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
517    where
518        E: EntityKind<Canister = C> + EntityValue,
519    {
520        let plan = query.plan()?;
521        let cursor_bytes = match cursor_token {
522            Some(token) => Some(decode_cursor(token).map_err(|reason| {
523                QueryError::from(PlanError::from(
524                    CursorPlanError::InvalidContinuationCursor { reason },
525                ))
526            })?),
527            None => None,
528        };
529        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
530
531        let (page, trace) = self
532            .with_metrics(|| {
533                self.load_executor::<E>()
534                    .execute_paged_with_cursor_traced(plan, cursor)
535            })
536            .map_err(QueryError::Execute)?;
537
538        Ok(PagedLoadExecutionWithTrace::new(
539            page.items,
540            page.next_cursor,
541            trace,
542        ))
543    }
544
545    // ---------------------------------------------------------------------
546    // High-level write API (public, intent-level)
547    // ---------------------------------------------------------------------
548
549    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
550    where
551        E: EntityKind<Canister = C> + EntityValue,
552    {
553        self.execute_save_entity(|save| save.insert(entity))
554    }
555
556    /// Insert a single-entity-type batch atomically in one commit window.
557    ///
558    /// If any item fails pre-commit validation, no row in the batch is persisted.
559    ///
560    /// This API is not a multi-entity transaction surface.
561    pub fn insert_many_atomic<E>(
562        &self,
563        entities: impl IntoIterator<Item = E>,
564    ) -> Result<WriteBatchResponse<E>, InternalError>
565    where
566        E: EntityKind<Canister = C> + EntityValue,
567    {
568        self.execute_save_batch(|save| save.insert_many_atomic(entities))
569    }
570
571    /// Insert a batch with explicitly non-atomic semantics.
572    ///
573    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
574    pub fn insert_many_non_atomic<E>(
575        &self,
576        entities: impl IntoIterator<Item = E>,
577    ) -> Result<WriteBatchResponse<E>, InternalError>
578    where
579        E: EntityKind<Canister = C> + EntityValue,
580    {
581        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
582    }
583
584    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
585    where
586        E: EntityKind<Canister = C> + EntityValue,
587    {
588        self.execute_save_entity(|save| save.replace(entity))
589    }
590
591    /// Replace a single-entity-type batch atomically in one commit window.
592    ///
593    /// If any item fails pre-commit validation, no row in the batch is persisted.
594    ///
595    /// This API is not a multi-entity transaction surface.
596    pub fn replace_many_atomic<E>(
597        &self,
598        entities: impl IntoIterator<Item = E>,
599    ) -> Result<WriteBatchResponse<E>, InternalError>
600    where
601        E: EntityKind<Canister = C> + EntityValue,
602    {
603        self.execute_save_batch(|save| save.replace_many_atomic(entities))
604    }
605
606    /// Replace a batch with explicitly non-atomic semantics.
607    ///
608    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
609    pub fn replace_many_non_atomic<E>(
610        &self,
611        entities: impl IntoIterator<Item = E>,
612    ) -> Result<WriteBatchResponse<E>, InternalError>
613    where
614        E: EntityKind<Canister = C> + EntityValue,
615    {
616        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
617    }
618
619    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
620    where
621        E: EntityKind<Canister = C> + EntityValue,
622    {
623        self.execute_save_entity(|save| save.update(entity))
624    }
625
626    /// Update a single-entity-type batch atomically in one commit window.
627    ///
628    /// If any item fails pre-commit validation, no row in the batch is persisted.
629    ///
630    /// This API is not a multi-entity transaction surface.
631    pub fn update_many_atomic<E>(
632        &self,
633        entities: impl IntoIterator<Item = E>,
634    ) -> Result<WriteBatchResponse<E>, InternalError>
635    where
636        E: EntityKind<Canister = C> + EntityValue,
637    {
638        self.execute_save_batch(|save| save.update_many_atomic(entities))
639    }
640
641    /// Update a batch with explicitly non-atomic semantics.
642    ///
643    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
644    pub fn update_many_non_atomic<E>(
645        &self,
646        entities: impl IntoIterator<Item = E>,
647    ) -> Result<WriteBatchResponse<E>, InternalError>
648    where
649        E: EntityKind<Canister = C> + EntityValue,
650    {
651        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
652    }
653
654    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
655    where
656        E: EntityKind<Canister = C> + EntityValue,
657    {
658        self.execute_save_view::<E>(|save| save.insert_view(view))
659    }
660
661    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
662    where
663        E: EntityKind<Canister = C> + EntityValue,
664    {
665        self.execute_save_view::<E>(|save| save.replace_view(view))
666    }
667
668    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
669    where
670        E: EntityKind<Canister = C> + EntityValue,
671    {
672        self.execute_save_view::<E>(|save| save.update_view(view))
673    }
674
675    /// TEST ONLY: clear all registered data and index stores for this database.
676    #[cfg(test)]
677    #[doc(hidden)]
678    pub fn clear_stores_for_tests(&self) {
679        self.db.with_store_registry(|reg| {
680            for (_, store) in reg.iter() {
681                store.with_data_mut(DataStore::clear);
682                store.with_index_mut(IndexStore::clear);
683            }
684        });
685    }
686}