Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::{Decimal, Id},
15};
16
17type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
18
19///
20/// DbSession
21///
22/// Session-scoped database handle with policy (debug, metrics) and execution routing.
23///
24
25pub struct DbSession<C: CanisterKind> {
26    db: Db<C>,
27    debug: bool,
28    metrics: Option<&'static dyn MetricsSink>,
29}
30
31impl<C: CanisterKind> DbSession<C> {
32    #[must_use]
33    pub const fn new(db: Db<C>) -> Self {
34        Self {
35            db,
36            debug: false,
37            metrics: None,
38        }
39    }
40
41    #[must_use]
42    pub const fn debug(mut self) -> Self {
43        self.debug = true;
44        self
45    }
46
47    #[must_use]
48    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
49        self.metrics = Some(sink);
50        self
51    }
52
53    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
54        if let Some(sink) = self.metrics {
55            with_metrics_sink(sink, f)
56        } else {
57            f()
58        }
59    }
60
61    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
62    fn execute_save_with<E, T, R>(
63        &self,
64        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
65        map: impl FnOnce(T) -> R,
66    ) -> Result<R, InternalError>
67    where
68        E: EntityKind<Canister = C> + EntityValue,
69    {
70        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
71
72        Ok(map(value))
73    }
74
75    // Shared save-facade wrappers keep response shape explicit at call sites.
76    fn execute_save_entity<E>(
77        &self,
78        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
79    ) -> Result<WriteResponse<E>, InternalError>
80    where
81        E: EntityKind<Canister = C> + EntityValue,
82    {
83        self.execute_save_with(op, WriteResponse::new)
84    }
85
86    fn execute_save_batch<E>(
87        &self,
88        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
89    ) -> Result<WriteBatchResponse<E>, InternalError>
90    where
91        E: EntityKind<Canister = C> + EntityValue,
92    {
93        self.execute_save_with(op, WriteBatchResponse::new)
94    }
95
96    fn execute_save_view<E>(
97        &self,
98        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
99    ) -> Result<E::ViewType, InternalError>
100    where
101        E: EntityKind<Canister = C> + EntityValue,
102    {
103        self.execute_save_with(op, std::convert::identity)
104    }
105
106    // ---------------------------------------------------------------------
107    // Query entry points (public, fluent)
108    // ---------------------------------------------------------------------
109
110    #[must_use]
111    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
112    where
113        E: EntityKind<Canister = C>,
114    {
115        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
116    }
117
118    #[must_use]
119    pub const fn load_with_consistency<E>(
120        &self,
121        consistency: ReadConsistency,
122    ) -> FluentLoadQuery<'_, E>
123    where
124        E: EntityKind<Canister = C>,
125    {
126        FluentLoadQuery::new(self, Query::new(consistency))
127    }
128
129    #[must_use]
130    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
131    where
132        E: EntityKind<Canister = C>,
133    {
134        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
135    }
136
137    #[must_use]
138    pub fn delete_with_consistency<E>(
139        &self,
140        consistency: ReadConsistency,
141    ) -> FluentDeleteQuery<'_, E>
142    where
143        E: EntityKind<Canister = C>,
144    {
145        FluentDeleteQuery::new(self, Query::new(consistency).delete())
146    }
147
148    // ---------------------------------------------------------------------
149    // Low-level executors (crate-internal; execution primitives)
150    // ---------------------------------------------------------------------
151
152    #[must_use]
153    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
154    where
155        E: EntityKind<Canister = C> + EntityValue,
156    {
157        LoadExecutor::new(self.db, self.debug)
158    }
159
160    #[must_use]
161    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
162    where
163        E: EntityKind<Canister = C> + EntityValue,
164    {
165        DeleteExecutor::new(self.db, self.debug)
166    }
167
168    #[must_use]
169    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
170    where
171        E: EntityKind<Canister = C> + EntityValue,
172    {
173        SaveExecutor::new(self.db, self.debug)
174    }
175
176    // ---------------------------------------------------------------------
177    // Query diagnostics / execution (internal routing)
178    // ---------------------------------------------------------------------
179
180    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
181    where
182        E: EntityKind<Canister = C> + EntityValue,
183    {
184        let plan = query.plan()?;
185
186        let result = match query.mode() {
187            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
188            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
189        };
190
191        result.map_err(QueryError::Execute)
192    }
193
194    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
195    where
196        E: EntityKind<Canister = C> + EntityValue,
197    {
198        let plan = query.plan()?;
199
200        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
201            .map_err(QueryError::Execute)
202    }
203
204    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
205    where
206        E: EntityKind<Canister = C> + EntityValue,
207    {
208        let plan = query.plan()?;
209
210        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
211            .map_err(QueryError::Execute)
212    }
213
214    pub(crate) fn execute_load_query_min<E>(
215        &self,
216        query: &Query<E>,
217    ) -> Result<Option<Id<E>>, QueryError>
218    where
219        E: EntityKind<Canister = C> + EntityValue,
220    {
221        let plan = query.plan()?;
222
223        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
224            .map_err(QueryError::Execute)
225    }
226
227    pub(crate) fn execute_load_query_max<E>(
228        &self,
229        query: &Query<E>,
230    ) -> Result<Option<Id<E>>, QueryError>
231    where
232        E: EntityKind<Canister = C> + EntityValue,
233    {
234        let plan = query.plan()?;
235
236        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
237            .map_err(QueryError::Execute)
238    }
239
240    pub(crate) fn execute_load_query_min_by<E>(
241        &self,
242        query: &Query<E>,
243        target_field: &str,
244    ) -> Result<Option<Id<E>>, QueryError>
245    where
246        E: EntityKind<Canister = C> + EntityValue,
247    {
248        let plan = query.plan()?;
249
250        self.with_metrics(|| {
251            self.load_executor::<E>()
252                .aggregate_min_by(plan, target_field)
253        })
254        .map_err(QueryError::Execute)
255    }
256
257    pub(crate) fn execute_load_query_max_by<E>(
258        &self,
259        query: &Query<E>,
260        target_field: &str,
261    ) -> Result<Option<Id<E>>, QueryError>
262    where
263        E: EntityKind<Canister = C> + EntityValue,
264    {
265        let plan = query.plan()?;
266
267        self.with_metrics(|| {
268            self.load_executor::<E>()
269                .aggregate_max_by(plan, target_field)
270        })
271        .map_err(QueryError::Execute)
272    }
273
274    pub(crate) fn execute_load_query_nth_by<E>(
275        &self,
276        query: &Query<E>,
277        target_field: &str,
278        nth: usize,
279    ) -> Result<Option<Id<E>>, QueryError>
280    where
281        E: EntityKind<Canister = C> + EntityValue,
282    {
283        let plan = query.plan()?;
284
285        self.with_metrics(|| {
286            self.load_executor::<E>()
287                .aggregate_nth_by(plan, target_field, nth)
288        })
289        .map_err(QueryError::Execute)
290    }
291
292    pub(crate) fn execute_load_query_sum_by<E>(
293        &self,
294        query: &Query<E>,
295        target_field: &str,
296    ) -> Result<Option<Decimal>, QueryError>
297    where
298        E: EntityKind<Canister = C> + EntityValue,
299    {
300        let plan = query.plan()?;
301
302        self.with_metrics(|| {
303            self.load_executor::<E>()
304                .aggregate_sum_by(plan, target_field)
305        })
306        .map_err(QueryError::Execute)
307    }
308
309    pub(crate) fn execute_load_query_avg_by<E>(
310        &self,
311        query: &Query<E>,
312        target_field: &str,
313    ) -> Result<Option<Decimal>, QueryError>
314    where
315        E: EntityKind<Canister = C> + EntityValue,
316    {
317        let plan = query.plan()?;
318
319        self.with_metrics(|| {
320            self.load_executor::<E>()
321                .aggregate_avg_by(plan, target_field)
322        })
323        .map_err(QueryError::Execute)
324    }
325
326    pub(crate) fn execute_load_query_median_by<E>(
327        &self,
328        query: &Query<E>,
329        target_field: &str,
330    ) -> Result<Option<Id<E>>, QueryError>
331    where
332        E: EntityKind<Canister = C> + EntityValue,
333    {
334        let plan = query.plan()?;
335
336        self.with_metrics(|| {
337            self.load_executor::<E>()
338                .aggregate_median_by(plan, target_field)
339        })
340        .map_err(QueryError::Execute)
341    }
342
343    pub(crate) fn execute_load_query_count_distinct_by<E>(
344        &self,
345        query: &Query<E>,
346        target_field: &str,
347    ) -> Result<u32, QueryError>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        let plan = query.plan()?;
352
353        self.with_metrics(|| {
354            self.load_executor::<E>()
355                .aggregate_count_distinct_by(plan, target_field)
356        })
357        .map_err(QueryError::Execute)
358    }
359
360    pub(crate) fn execute_load_query_min_max_by<E>(
361        &self,
362        query: &Query<E>,
363        target_field: &str,
364    ) -> Result<MinMaxByIds<E>, QueryError>
365    where
366        E: EntityKind<Canister = C> + EntityValue,
367    {
368        let plan = query.plan()?;
369
370        self.with_metrics(|| {
371            self.load_executor::<E>()
372                .aggregate_min_max_by(plan, target_field)
373        })
374        .map_err(QueryError::Execute)
375    }
376
377    pub(crate) fn execute_load_query_first<E>(
378        &self,
379        query: &Query<E>,
380    ) -> Result<Option<Id<E>>, QueryError>
381    where
382        E: EntityKind<Canister = C> + EntityValue,
383    {
384        let plan = query.plan()?;
385
386        self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
387            .map_err(QueryError::Execute)
388    }
389
390    pub(crate) fn execute_load_query_last<E>(
391        &self,
392        query: &Query<E>,
393    ) -> Result<Option<Id<E>>, QueryError>
394    where
395        E: EntityKind<Canister = C> + EntityValue,
396    {
397        let plan = query.plan()?;
398
399        self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
400            .map_err(QueryError::Execute)
401    }
402
403    pub(crate) fn execute_load_query_paged_with_trace<E>(
404        &self,
405        query: &Query<E>,
406        cursor_token: Option<&str>,
407    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
408    where
409        E: EntityKind<Canister = C> + EntityValue,
410    {
411        let plan = query.plan()?;
412        let cursor_bytes = match cursor_token {
413            Some(token) => Some(decode_cursor(token).map_err(|reason| {
414                QueryError::from(PlanError::from(
415                    CursorPlanError::InvalidContinuationCursor { reason },
416                ))
417            })?),
418            None => None,
419        };
420        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
421
422        let (page, trace) = self
423            .with_metrics(|| {
424                self.load_executor::<E>()
425                    .execute_paged_with_cursor_traced(plan, cursor)
426            })
427            .map_err(QueryError::Execute)?;
428
429        Ok((page.items, page.next_cursor, trace))
430    }
431
432    // ---------------------------------------------------------------------
433    // High-level write API (public, intent-level)
434    // ---------------------------------------------------------------------
435
436    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
437    where
438        E: EntityKind<Canister = C> + EntityValue,
439    {
440        self.execute_save_entity(|save| save.insert(entity))
441    }
442
443    /// Insert a single-entity-type batch atomically in one commit window.
444    ///
445    /// If any item fails pre-commit validation, no row in the batch is persisted.
446    ///
447    /// This API is not a multi-entity transaction surface.
448    pub fn insert_many_atomic<E>(
449        &self,
450        entities: impl IntoIterator<Item = E>,
451    ) -> Result<WriteBatchResponse<E>, InternalError>
452    where
453        E: EntityKind<Canister = C> + EntityValue,
454    {
455        self.execute_save_batch(|save| save.insert_many_atomic(entities))
456    }
457
458    /// Insert a batch with explicitly non-atomic semantics.
459    ///
460    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
461    pub fn insert_many_non_atomic<E>(
462        &self,
463        entities: impl IntoIterator<Item = E>,
464    ) -> Result<WriteBatchResponse<E>, InternalError>
465    where
466        E: EntityKind<Canister = C> + EntityValue,
467    {
468        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
469    }
470
471    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
472    where
473        E: EntityKind<Canister = C> + EntityValue,
474    {
475        self.execute_save_entity(|save| save.replace(entity))
476    }
477
478    /// Replace a single-entity-type batch atomically in one commit window.
479    ///
480    /// If any item fails pre-commit validation, no row in the batch is persisted.
481    ///
482    /// This API is not a multi-entity transaction surface.
483    pub fn replace_many_atomic<E>(
484        &self,
485        entities: impl IntoIterator<Item = E>,
486    ) -> Result<WriteBatchResponse<E>, InternalError>
487    where
488        E: EntityKind<Canister = C> + EntityValue,
489    {
490        self.execute_save_batch(|save| save.replace_many_atomic(entities))
491    }
492
493    /// Replace a batch with explicitly non-atomic semantics.
494    ///
495    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
496    pub fn replace_many_non_atomic<E>(
497        &self,
498        entities: impl IntoIterator<Item = E>,
499    ) -> Result<WriteBatchResponse<E>, InternalError>
500    where
501        E: EntityKind<Canister = C> + EntityValue,
502    {
503        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
504    }
505
506    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
507    where
508        E: EntityKind<Canister = C> + EntityValue,
509    {
510        self.execute_save_entity(|save| save.update(entity))
511    }
512
513    /// Update a single-entity-type batch atomically in one commit window.
514    ///
515    /// If any item fails pre-commit validation, no row in the batch is persisted.
516    ///
517    /// This API is not a multi-entity transaction surface.
518    pub fn update_many_atomic<E>(
519        &self,
520        entities: impl IntoIterator<Item = E>,
521    ) -> Result<WriteBatchResponse<E>, InternalError>
522    where
523        E: EntityKind<Canister = C> + EntityValue,
524    {
525        self.execute_save_batch(|save| save.update_many_atomic(entities))
526    }
527
528    /// Update a batch with explicitly non-atomic semantics.
529    ///
530    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
531    pub fn update_many_non_atomic<E>(
532        &self,
533        entities: impl IntoIterator<Item = E>,
534    ) -> Result<WriteBatchResponse<E>, InternalError>
535    where
536        E: EntityKind<Canister = C> + EntityValue,
537    {
538        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
539    }
540
541    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
542    where
543        E: EntityKind<Canister = C> + EntityValue,
544    {
545        self.execute_save_view::<E>(|save| save.insert_view(view))
546    }
547
548    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
549    where
550        E: EntityKind<Canister = C> + EntityValue,
551    {
552        self.execute_save_view::<E>(|save| save.replace_view(view))
553    }
554
555    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
556    where
557        E: EntityKind<Canister = C> + EntityValue,
558    {
559        self.execute_save_view::<E>(|save| save.update_view(view))
560    }
561
562    /// TEST ONLY: clear all registered data and index stores for this database.
563    #[cfg(test)]
564    #[doc(hidden)]
565    pub fn clear_stores_for_tests(&self) {
566        self.db.with_store_registry(|reg| {
567            for (_, store) in reg.iter() {
568                store.with_data_mut(DataStore::clear);
569                store.with_index_mut(IndexStore::clear);
570            }
571        });
572    }
573}