Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::{Decimal, Id},
15};
16
17type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
18
19///
20/// DbSession
21///
22/// Session-scoped database handle with policy (debug, metrics) and execution routing.
23///
24
25pub struct DbSession<C: CanisterKind> {
26    db: Db<C>,
27    debug: bool,
28    metrics: Option<&'static dyn MetricsSink>,
29}
30
31impl<C: CanisterKind> DbSession<C> {
32    #[must_use]
33    pub const fn new(db: Db<C>) -> Self {
34        Self {
35            db,
36            debug: false,
37            metrics: None,
38        }
39    }
40
41    #[must_use]
42    pub const fn debug(mut self) -> Self {
43        self.debug = true;
44        self
45    }
46
47    #[must_use]
48    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
49        self.metrics = Some(sink);
50        self
51    }
52
53    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
54        if let Some(sink) = self.metrics {
55            with_metrics_sink(sink, f)
56        } else {
57            f()
58        }
59    }
60
61    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
62    fn execute_save_with<E, T, R>(
63        &self,
64        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
65        map: impl FnOnce(T) -> R,
66    ) -> Result<R, InternalError>
67    where
68        E: EntityKind<Canister = C> + EntityValue,
69    {
70        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
71
72        Ok(map(value))
73    }
74
75    // Shared save-facade wrappers keep response shape explicit at call sites.
76    fn execute_save_entity<E>(
77        &self,
78        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
79    ) -> Result<WriteResponse<E>, InternalError>
80    where
81        E: EntityKind<Canister = C> + EntityValue,
82    {
83        self.execute_save_with(op, WriteResponse::new)
84    }
85
86    fn execute_save_batch<E>(
87        &self,
88        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
89    ) -> Result<WriteBatchResponse<E>, InternalError>
90    where
91        E: EntityKind<Canister = C> + EntityValue,
92    {
93        self.execute_save_with(op, WriteBatchResponse::new)
94    }
95
96    fn execute_save_view<E>(
97        &self,
98        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
99    ) -> Result<E::ViewType, InternalError>
100    where
101        E: EntityKind<Canister = C> + EntityValue,
102    {
103        self.execute_save_with(op, std::convert::identity)
104    }
105
106    // ---------------------------------------------------------------------
107    // Query entry points (public, fluent)
108    // ---------------------------------------------------------------------
109
110    #[must_use]
111    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
112    where
113        E: EntityKind<Canister = C>,
114    {
115        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
116    }
117
118    #[must_use]
119    pub const fn load_with_consistency<E>(
120        &self,
121        consistency: ReadConsistency,
122    ) -> FluentLoadQuery<'_, E>
123    where
124        E: EntityKind<Canister = C>,
125    {
126        FluentLoadQuery::new(self, Query::new(consistency))
127    }
128
129    #[must_use]
130    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
131    where
132        E: EntityKind<Canister = C>,
133    {
134        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
135    }
136
137    #[must_use]
138    pub fn delete_with_consistency<E>(
139        &self,
140        consistency: ReadConsistency,
141    ) -> FluentDeleteQuery<'_, E>
142    where
143        E: EntityKind<Canister = C>,
144    {
145        FluentDeleteQuery::new(self, Query::new(consistency).delete())
146    }
147
148    // ---------------------------------------------------------------------
149    // Low-level executors (crate-internal; execution primitives)
150    // ---------------------------------------------------------------------
151
152    #[must_use]
153    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
154    where
155        E: EntityKind<Canister = C> + EntityValue,
156    {
157        LoadExecutor::new(self.db, self.debug)
158    }
159
160    #[must_use]
161    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
162    where
163        E: EntityKind<Canister = C> + EntityValue,
164    {
165        DeleteExecutor::new(self.db, self.debug)
166    }
167
168    #[must_use]
169    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
170    where
171        E: EntityKind<Canister = C> + EntityValue,
172    {
173        SaveExecutor::new(self.db, self.debug)
174    }
175
176    // ---------------------------------------------------------------------
177    // Query diagnostics / execution (internal routing)
178    // ---------------------------------------------------------------------
179
180    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
181    where
182        E: EntityKind<Canister = C> + EntityValue,
183    {
184        let plan = query.plan()?;
185
186        let result = match query.mode() {
187            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
188            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
189        };
190
191        result.map_err(QueryError::Execute)
192    }
193
194    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
195    where
196        E: EntityKind<Canister = C> + EntityValue,
197    {
198        let plan = query.plan()?;
199
200        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
201            .map_err(QueryError::Execute)
202    }
203
204    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
205    where
206        E: EntityKind<Canister = C> + EntityValue,
207    {
208        let plan = query.plan()?;
209
210        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
211            .map_err(QueryError::Execute)
212    }
213
214    pub(crate) fn execute_load_query_min<E>(
215        &self,
216        query: &Query<E>,
217    ) -> Result<Option<Id<E>>, QueryError>
218    where
219        E: EntityKind<Canister = C> + EntityValue,
220    {
221        let plan = query.plan()?;
222
223        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
224            .map_err(QueryError::Execute)
225    }
226
227    pub(crate) fn execute_load_query_max<E>(
228        &self,
229        query: &Query<E>,
230    ) -> Result<Option<Id<E>>, QueryError>
231    where
232        E: EntityKind<Canister = C> + EntityValue,
233    {
234        let plan = query.plan()?;
235
236        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
237            .map_err(QueryError::Execute)
238    }
239
240    pub(crate) fn execute_load_query_min_by<E>(
241        &self,
242        query: &Query<E>,
243        target_field: &str,
244    ) -> Result<Option<Id<E>>, QueryError>
245    where
246        E: EntityKind<Canister = C> + EntityValue,
247    {
248        let plan = query.plan()?;
249
250        self.with_metrics(|| {
251            self.load_executor::<E>()
252                .aggregate_min_by(plan, target_field)
253        })
254        .map_err(QueryError::Execute)
255    }
256
257    pub(crate) fn execute_load_query_max_by<E>(
258        &self,
259        query: &Query<E>,
260        target_field: &str,
261    ) -> Result<Option<Id<E>>, QueryError>
262    where
263        E: EntityKind<Canister = C> + EntityValue,
264    {
265        let plan = query.plan()?;
266
267        self.with_metrics(|| {
268            self.load_executor::<E>()
269                .aggregate_max_by(plan, target_field)
270        })
271        .map_err(QueryError::Execute)
272    }
273
274    pub(crate) fn execute_load_query_nth_by<E>(
275        &self,
276        query: &Query<E>,
277        target_field: &str,
278        nth: usize,
279    ) -> Result<Option<Id<E>>, QueryError>
280    where
281        E: EntityKind<Canister = C> + EntityValue,
282    {
283        let plan = query.plan()?;
284
285        self.with_metrics(|| {
286            self.load_executor::<E>()
287                .aggregate_nth_by(plan, target_field, nth)
288        })
289        .map_err(QueryError::Execute)
290    }
291
292    pub(crate) fn execute_load_query_sum_by<E>(
293        &self,
294        query: &Query<E>,
295        target_field: &str,
296    ) -> Result<Option<Decimal>, QueryError>
297    where
298        E: EntityKind<Canister = C> + EntityValue,
299    {
300        let plan = query.plan()?;
301
302        self.with_metrics(|| {
303            self.load_executor::<E>()
304                .aggregate_sum_by(plan, target_field)
305        })
306        .map_err(QueryError::Execute)
307    }
308
309    pub(crate) fn execute_load_query_avg_by<E>(
310        &self,
311        query: &Query<E>,
312        target_field: &str,
313    ) -> Result<Option<Decimal>, QueryError>
314    where
315        E: EntityKind<Canister = C> + EntityValue,
316    {
317        let plan = query.plan()?;
318
319        self.with_metrics(|| {
320            self.load_executor::<E>()
321                .aggregate_avg_by(plan, target_field)
322        })
323        .map_err(QueryError::Execute)
324    }
325
326    pub(crate) fn execute_load_query_median_by<E>(
327        &self,
328        query: &Query<E>,
329        target_field: &str,
330    ) -> Result<Option<Id<E>>, QueryError>
331    where
332        E: EntityKind<Canister = C> + EntityValue,
333    {
334        let plan = query.plan()?;
335
336        self.with_metrics(|| {
337            self.load_executor::<E>()
338                .aggregate_median_by(plan, target_field)
339        })
340        .map_err(QueryError::Execute)
341    }
342
343    pub(crate) fn execute_load_query_count_distinct_by<E>(
344        &self,
345        query: &Query<E>,
346        target_field: &str,
347    ) -> Result<u32, QueryError>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        let plan = query.plan()?;
352
353        self.with_metrics(|| {
354            self.load_executor::<E>()
355                .aggregate_count_distinct_by(plan, target_field)
356        })
357        .map_err(QueryError::Execute)
358    }
359
360    pub(crate) fn execute_load_query_min_max_by<E>(
361        &self,
362        query: &Query<E>,
363        target_field: &str,
364    ) -> Result<MinMaxByIds<E>, QueryError>
365    where
366        E: EntityKind<Canister = C> + EntityValue,
367    {
368        let plan = query.plan()?;
369
370        self.with_metrics(|| {
371            self.load_executor::<E>()
372                .aggregate_min_max_by(plan, target_field)
373        })
374        .map_err(QueryError::Execute)
375    }
376
377    pub(crate) fn execute_load_query_first<E>(
378        &self,
379        query: &Query<E>,
380    ) -> Result<Option<Id<E>>, QueryError>
381    where
382        E: EntityKind<Canister = C> + EntityValue,
383    {
384        let plan = query.plan()?;
385
386        self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
387            .map_err(QueryError::Execute)
388    }
389
390    pub(crate) fn execute_load_query_last<E>(
391        &self,
392        query: &Query<E>,
393    ) -> Result<Option<Id<E>>, QueryError>
394    where
395        E: EntityKind<Canister = C> + EntityValue,
396    {
397        let plan = query.plan()?;
398
399        self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
400            .map_err(QueryError::Execute)
401    }
402
403    pub(crate) fn execute_load_query_paged_with_trace<E>(
404        &self,
405        query: &Query<E>,
406        cursor_token: Option<&str>,
407    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
408    where
409        E: EntityKind<Canister = C> + EntityValue,
410    {
411        let plan = query.plan()?;
412        let cursor_bytes = match cursor_token {
413            Some(token) => Some(decode_cursor(token).map_err(|reason| {
414                QueryError::from(PlanError::from(
415                    CursorPlanError::InvalidContinuationCursor { reason },
416                ))
417            })?),
418            None => None,
419        };
420        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
421
422        let (page, trace) = self
423            .with_metrics(|| {
424                self.load_executor::<E>()
425                    .execute_paged_with_cursor_traced(plan, cursor)
426            })
427            .map_err(QueryError::Execute)?;
428
429        Ok(PagedLoadExecutionWithTrace::new(
430            page.items,
431            page.next_cursor,
432            trace,
433        ))
434    }
435
436    // ---------------------------------------------------------------------
437    // High-level write API (public, intent-level)
438    // ---------------------------------------------------------------------
439
440    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
441    where
442        E: EntityKind<Canister = C> + EntityValue,
443    {
444        self.execute_save_entity(|save| save.insert(entity))
445    }
446
447    /// Insert a single-entity-type batch atomically in one commit window.
448    ///
449    /// If any item fails pre-commit validation, no row in the batch is persisted.
450    ///
451    /// This API is not a multi-entity transaction surface.
452    pub fn insert_many_atomic<E>(
453        &self,
454        entities: impl IntoIterator<Item = E>,
455    ) -> Result<WriteBatchResponse<E>, InternalError>
456    where
457        E: EntityKind<Canister = C> + EntityValue,
458    {
459        self.execute_save_batch(|save| save.insert_many_atomic(entities))
460    }
461
462    /// Insert a batch with explicitly non-atomic semantics.
463    ///
464    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
465    pub fn insert_many_non_atomic<E>(
466        &self,
467        entities: impl IntoIterator<Item = E>,
468    ) -> Result<WriteBatchResponse<E>, InternalError>
469    where
470        E: EntityKind<Canister = C> + EntityValue,
471    {
472        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
473    }
474
475    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
476    where
477        E: EntityKind<Canister = C> + EntityValue,
478    {
479        self.execute_save_entity(|save| save.replace(entity))
480    }
481
482    /// Replace a single-entity-type batch atomically in one commit window.
483    ///
484    /// If any item fails pre-commit validation, no row in the batch is persisted.
485    ///
486    /// This API is not a multi-entity transaction surface.
487    pub fn replace_many_atomic<E>(
488        &self,
489        entities: impl IntoIterator<Item = E>,
490    ) -> Result<WriteBatchResponse<E>, InternalError>
491    where
492        E: EntityKind<Canister = C> + EntityValue,
493    {
494        self.execute_save_batch(|save| save.replace_many_atomic(entities))
495    }
496
497    /// Replace a batch with explicitly non-atomic semantics.
498    ///
499    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
500    pub fn replace_many_non_atomic<E>(
501        &self,
502        entities: impl IntoIterator<Item = E>,
503    ) -> Result<WriteBatchResponse<E>, InternalError>
504    where
505        E: EntityKind<Canister = C> + EntityValue,
506    {
507        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
508    }
509
510    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
511    where
512        E: EntityKind<Canister = C> + EntityValue,
513    {
514        self.execute_save_entity(|save| save.update(entity))
515    }
516
517    /// Update a single-entity-type batch atomically in one commit window.
518    ///
519    /// If any item fails pre-commit validation, no row in the batch is persisted.
520    ///
521    /// This API is not a multi-entity transaction surface.
522    pub fn update_many_atomic<E>(
523        &self,
524        entities: impl IntoIterator<Item = E>,
525    ) -> Result<WriteBatchResponse<E>, InternalError>
526    where
527        E: EntityKind<Canister = C> + EntityValue,
528    {
529        self.execute_save_batch(|save| save.update_many_atomic(entities))
530    }
531
532    /// Update a batch with explicitly non-atomic semantics.
533    ///
534    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
535    pub fn update_many_non_atomic<E>(
536        &self,
537        entities: impl IntoIterator<Item = E>,
538    ) -> Result<WriteBatchResponse<E>, InternalError>
539    where
540        E: EntityKind<Canister = C> + EntityValue,
541    {
542        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
543    }
544
545    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
546    where
547        E: EntityKind<Canister = C> + EntityValue,
548    {
549        self.execute_save_view::<E>(|save| save.insert_view(view))
550    }
551
552    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
553    where
554        E: EntityKind<Canister = C> + EntityValue,
555    {
556        self.execute_save_view::<E>(|save| save.replace_view(view))
557    }
558
559    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
560    where
561        E: EntityKind<Canister = C> + EntityValue,
562    {
563        self.execute_save_view::<E>(|save| save.update_view(view))
564    }
565
566    /// TEST ONLY: clear all registered data and index stores for this database.
567    #[cfg(test)]
568    #[doc(hidden)]
569    pub fn clear_stores_for_tests(&self) {
570        self.db.with_store_registry(|reg| {
571            for (_, store) in reg.iter() {
572                store.with_data_mut(DataStore::clear);
573                store.with_index_mut(IndexStore::clear);
574            }
575        });
576    }
577}