Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse,
8        cursor::CursorPlanError,
9        decode_cursor,
10        executor::{DeleteExecutor, ExecutablePlan, ExecutorPlanError, LoadExecutor, SaveExecutor},
11        query::intent::QueryMode,
12    },
13    error::InternalError,
14    obs::sink::{MetricsSink, with_metrics_sink},
15    traits::{CanisterKind, EntityKind, EntityValue},
16    types::{Decimal, Id},
17    value::Value,
18};
19
20// Map executor-owned plan-surface failures into query-owned plan errors.
21fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
22    QueryError::from(err.into_plan_error())
23}
24
25///
26/// DbSession
27///
28/// Session-scoped database handle with policy (debug, metrics) and execution routing.
29///
30
31pub struct DbSession<C: CanisterKind> {
32    db: Db<C>,
33    debug: bool,
34    metrics: Option<&'static dyn MetricsSink>,
35}
36
37impl<C: CanisterKind> DbSession<C> {
38    #[must_use]
39    pub const fn new(db: Db<C>) -> Self {
40        Self {
41            db,
42            debug: false,
43            metrics: None,
44        }
45    }
46
47    #[must_use]
48    pub const fn debug(mut self) -> Self {
49        self.debug = true;
50        self
51    }
52
53    #[must_use]
54    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
55        self.metrics = Some(sink);
56        self
57    }
58
59    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
60        if let Some(sink) = self.metrics {
61            with_metrics_sink(sink, f)
62        } else {
63            f()
64        }
65    }
66
67    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
68    fn execute_save_with<E, T, R>(
69        &self,
70        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
71        map: impl FnOnce(T) -> R,
72    ) -> Result<R, InternalError>
73    where
74        E: EntityKind<Canister = C> + EntityValue,
75    {
76        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
77
78        Ok(map(value))
79    }
80
81    // Shared save-facade wrappers keep response shape explicit at call sites.
82    fn execute_save_entity<E>(
83        &self,
84        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
85    ) -> Result<WriteResponse<E>, InternalError>
86    where
87        E: EntityKind<Canister = C> + EntityValue,
88    {
89        self.execute_save_with(op, WriteResponse::new)
90    }
91
92    fn execute_save_batch<E>(
93        &self,
94        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
95    ) -> Result<WriteBatchResponse<E>, InternalError>
96    where
97        E: EntityKind<Canister = C> + EntityValue,
98    {
99        self.execute_save_with(op, WriteBatchResponse::new)
100    }
101
102    fn execute_save_view<E>(
103        &self,
104        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
105    ) -> Result<E::ViewType, InternalError>
106    where
107        E: EntityKind<Canister = C> + EntityValue,
108    {
109        self.execute_save_with(op, std::convert::identity)
110    }
111
112    // ---------------------------------------------------------------------
113    // Query entry points (public, fluent)
114    // ---------------------------------------------------------------------
115
116    #[must_use]
117    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
118    where
119        E: EntityKind<Canister = C>,
120    {
121        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
122    }
123
124    #[must_use]
125    pub const fn load_with_consistency<E>(
126        &self,
127        consistency: ReadConsistency,
128    ) -> FluentLoadQuery<'_, E>
129    where
130        E: EntityKind<Canister = C>,
131    {
132        FluentLoadQuery::new(self, Query::new(consistency))
133    }
134
135    #[must_use]
136    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
137    where
138        E: EntityKind<Canister = C>,
139    {
140        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
141    }
142
143    #[must_use]
144    pub fn delete_with_consistency<E>(
145        &self,
146        consistency: ReadConsistency,
147    ) -> FluentDeleteQuery<'_, E>
148    where
149        E: EntityKind<Canister = C>,
150    {
151        FluentDeleteQuery::new(self, Query::new(consistency).delete())
152    }
153
154    // ---------------------------------------------------------------------
155    // Low-level executors (crate-internal; execution primitives)
156    // ---------------------------------------------------------------------
157
158    #[must_use]
159    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
160    where
161        E: EntityKind<Canister = C> + EntityValue,
162    {
163        LoadExecutor::new(self.db, self.debug)
164    }
165
166    #[must_use]
167    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
168    where
169        E: EntityKind<Canister = C> + EntityValue,
170    {
171        DeleteExecutor::new(self.db, self.debug)
172    }
173
174    #[must_use]
175    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
176    where
177        E: EntityKind<Canister = C> + EntityValue,
178    {
179        SaveExecutor::new(self.db, self.debug)
180    }
181
182    // ---------------------------------------------------------------------
183    // Query diagnostics / execution (internal routing)
184    // ---------------------------------------------------------------------
185
186    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
187    where
188        E: EntityKind<Canister = C> + EntityValue,
189    {
190        let plan = query.plan()?;
191
192        let result = match query.mode() {
193            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
194            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
195        };
196
197        result.map_err(QueryError::Execute)
198    }
199
200    // Shared load-query terminal wrapper: build plan, run under metrics, map
201    // execution errors into query-facing errors.
202    fn execute_load_query_with<E, T>(
203        &self,
204        query: &Query<E>,
205        op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
206    ) -> Result<T, QueryError>
207    where
208        E: EntityKind<Canister = C> + EntityValue,
209    {
210        let plan = query.plan()?;
211
212        self.with_metrics(|| op(self.load_executor::<E>(), plan))
213            .map_err(QueryError::Execute)
214    }
215
216    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
217    where
218        E: EntityKind<Canister = C> + EntityValue,
219    {
220        self.execute_load_query_with(query, |load, plan| load.aggregate_count(plan))
221    }
222
223    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
224    where
225        E: EntityKind<Canister = C> + EntityValue,
226    {
227        self.execute_load_query_with(query, |load, plan| load.aggregate_exists(plan))
228    }
229
230    pub(crate) fn execute_load_query_min<E>(
231        &self,
232        query: &Query<E>,
233    ) -> Result<Option<Id<E>>, QueryError>
234    where
235        E: EntityKind<Canister = C> + EntityValue,
236    {
237        self.execute_load_query_with(query, |load, plan| load.aggregate_min(plan))
238    }
239
240    pub(crate) fn execute_load_query_max<E>(
241        &self,
242        query: &Query<E>,
243    ) -> Result<Option<Id<E>>, QueryError>
244    where
245        E: EntityKind<Canister = C> + EntityValue,
246    {
247        self.execute_load_query_with(query, |load, plan| load.aggregate_max(plan))
248    }
249
250    pub(crate) fn execute_load_query_min_by<E>(
251        &self,
252        query: &Query<E>,
253        target_field: &str,
254    ) -> Result<Option<Id<E>>, QueryError>
255    where
256        E: EntityKind<Canister = C> + EntityValue,
257    {
258        self.execute_load_query_with(query, |load, plan| {
259            load.aggregate_min_by(plan, target_field)
260        })
261    }
262
263    pub(crate) fn execute_load_query_max_by<E>(
264        &self,
265        query: &Query<E>,
266        target_field: &str,
267    ) -> Result<Option<Id<E>>, QueryError>
268    where
269        E: EntityKind<Canister = C> + EntityValue,
270    {
271        self.execute_load_query_with(query, |load, plan| {
272            load.aggregate_max_by(plan, target_field)
273        })
274    }
275
276    pub(crate) fn execute_load_query_nth_by<E>(
277        &self,
278        query: &Query<E>,
279        target_field: &str,
280        nth: usize,
281    ) -> Result<Option<Id<E>>, QueryError>
282    where
283        E: EntityKind<Canister = C> + EntityValue,
284    {
285        self.execute_load_query_with(query, |load, plan| {
286            load.aggregate_nth_by(plan, target_field, nth)
287        })
288    }
289
290    pub(crate) fn execute_load_query_sum_by<E>(
291        &self,
292        query: &Query<E>,
293        target_field: &str,
294    ) -> Result<Option<Decimal>, QueryError>
295    where
296        E: EntityKind<Canister = C> + EntityValue,
297    {
298        self.execute_load_query_with(query, |load, plan| {
299            load.aggregate_sum_by(plan, target_field)
300        })
301    }
302
303    pub(crate) fn execute_load_query_avg_by<E>(
304        &self,
305        query: &Query<E>,
306        target_field: &str,
307    ) -> Result<Option<Decimal>, QueryError>
308    where
309        E: EntityKind<Canister = C> + EntityValue,
310    {
311        self.execute_load_query_with(query, |load, plan| {
312            load.aggregate_avg_by(plan, target_field)
313        })
314    }
315
316    pub(crate) fn execute_load_query_median_by<E>(
317        &self,
318        query: &Query<E>,
319        target_field: &str,
320    ) -> Result<Option<Id<E>>, QueryError>
321    where
322        E: EntityKind<Canister = C> + EntityValue,
323    {
324        self.execute_load_query_with(query, |load, plan| {
325            load.aggregate_median_by(plan, target_field)
326        })
327    }
328
329    pub(crate) fn execute_load_query_count_distinct_by<E>(
330        &self,
331        query: &Query<E>,
332        target_field: &str,
333    ) -> Result<u32, QueryError>
334    where
335        E: EntityKind<Canister = C> + EntityValue,
336    {
337        self.execute_load_query_with(query, |load, plan| {
338            load.aggregate_count_distinct_by(plan, target_field)
339        })
340    }
341
342    #[expect(clippy::type_complexity)]
343    pub(crate) fn execute_load_query_min_max_by<E>(
344        &self,
345        query: &Query<E>,
346        target_field: &str,
347    ) -> Result<Option<(Id<E>, Id<E>)>, QueryError>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        self.execute_load_query_with(query, |load, plan| {
352            load.aggregate_min_max_by(plan, target_field)
353        })
354    }
355
356    pub(crate) fn execute_load_query_values_by<E>(
357        &self,
358        query: &Query<E>,
359        target_field: &str,
360    ) -> Result<Vec<Value>, QueryError>
361    where
362        E: EntityKind<Canister = C> + EntityValue,
363    {
364        self.execute_load_query_with(query, |load, plan| load.values_by(plan, target_field))
365    }
366
367    pub(crate) fn execute_load_query_take<E>(
368        &self,
369        query: &Query<E>,
370        take_count: u32,
371    ) -> Result<Response<E>, QueryError>
372    where
373        E: EntityKind<Canister = C> + EntityValue,
374    {
375        self.execute_load_query_with(query, |load, plan| load.take(plan, take_count))
376    }
377
378    pub(crate) fn execute_load_query_top_k_by<E>(
379        &self,
380        query: &Query<E>,
381        target_field: &str,
382        take_count: u32,
383    ) -> Result<Response<E>, QueryError>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        self.execute_load_query_with(query, |load, plan| {
388            load.top_k_by(plan, target_field, take_count)
389        })
390    }
391
392    pub(crate) fn execute_load_query_bottom_k_by<E>(
393        &self,
394        query: &Query<E>,
395        target_field: &str,
396        take_count: u32,
397    ) -> Result<Response<E>, QueryError>
398    where
399        E: EntityKind<Canister = C> + EntityValue,
400    {
401        self.execute_load_query_with(query, |load, plan| {
402            load.bottom_k_by(plan, target_field, take_count)
403        })
404    }
405
406    pub(crate) fn execute_load_query_top_k_by_values<E>(
407        &self,
408        query: &Query<E>,
409        target_field: &str,
410        take_count: u32,
411    ) -> Result<Vec<Value>, QueryError>
412    where
413        E: EntityKind<Canister = C> + EntityValue,
414    {
415        self.execute_load_query_with(query, |load, plan| {
416            load.top_k_by_values(plan, target_field, take_count)
417        })
418    }
419
420    pub(crate) fn execute_load_query_bottom_k_by_values<E>(
421        &self,
422        query: &Query<E>,
423        target_field: &str,
424        take_count: u32,
425    ) -> Result<Vec<Value>, QueryError>
426    where
427        E: EntityKind<Canister = C> + EntityValue,
428    {
429        self.execute_load_query_with(query, |load, plan| {
430            load.bottom_k_by_values(plan, target_field, take_count)
431        })
432    }
433
434    pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
435        &self,
436        query: &Query<E>,
437        target_field: &str,
438        take_count: u32,
439    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
440    where
441        E: EntityKind<Canister = C> + EntityValue,
442    {
443        self.execute_load_query_with(query, |load, plan| {
444            load.top_k_by_with_ids(plan, target_field, take_count)
445        })
446    }
447
448    pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
449        &self,
450        query: &Query<E>,
451        target_field: &str,
452        take_count: u32,
453    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
454    where
455        E: EntityKind<Canister = C> + EntityValue,
456    {
457        self.execute_load_query_with(query, |load, plan| {
458            load.bottom_k_by_with_ids(plan, target_field, take_count)
459        })
460    }
461
462    pub(crate) fn execute_load_query_distinct_values_by<E>(
463        &self,
464        query: &Query<E>,
465        target_field: &str,
466    ) -> Result<Vec<Value>, QueryError>
467    where
468        E: EntityKind<Canister = C> + EntityValue,
469    {
470        self.execute_load_query_with(query, |load, plan| {
471            load.distinct_values_by(plan, target_field)
472        })
473    }
474
475    pub(crate) fn execute_load_query_values_by_with_ids<E>(
476        &self,
477        query: &Query<E>,
478        target_field: &str,
479    ) -> Result<Vec<(Id<E>, Value)>, QueryError>
480    where
481        E: EntityKind<Canister = C> + EntityValue,
482    {
483        self.execute_load_query_with(query, |load, plan| {
484            load.values_by_with_ids(plan, target_field)
485        })
486    }
487
488    pub(crate) fn execute_load_query_first_value_by<E>(
489        &self,
490        query: &Query<E>,
491        target_field: &str,
492    ) -> Result<Option<Value>, QueryError>
493    where
494        E: EntityKind<Canister = C> + EntityValue,
495    {
496        self.execute_load_query_with(query, |load, plan| load.first_value_by(plan, target_field))
497    }
498
499    pub(crate) fn execute_load_query_last_value_by<E>(
500        &self,
501        query: &Query<E>,
502        target_field: &str,
503    ) -> Result<Option<Value>, QueryError>
504    where
505        E: EntityKind<Canister = C> + EntityValue,
506    {
507        self.execute_load_query_with(query, |load, plan| load.last_value_by(plan, target_field))
508    }
509
510    pub(crate) fn execute_load_query_first<E>(
511        &self,
512        query: &Query<E>,
513    ) -> Result<Option<Id<E>>, QueryError>
514    where
515        E: EntityKind<Canister = C> + EntityValue,
516    {
517        self.execute_load_query_with(query, |load, plan| load.aggregate_first(plan))
518    }
519
520    pub(crate) fn execute_load_query_last<E>(
521        &self,
522        query: &Query<E>,
523    ) -> Result<Option<Id<E>>, QueryError>
524    where
525        E: EntityKind<Canister = C> + EntityValue,
526    {
527        self.execute_load_query_with(query, |load, plan| load.aggregate_last(plan))
528    }
529
530    pub(crate) fn execute_load_query_paged_with_trace<E>(
531        &self,
532        query: &Query<E>,
533        cursor_token: Option<&str>,
534    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
535    where
536        E: EntityKind<Canister = C> + EntityValue,
537    {
538        let plan = query.plan()?;
539        let cursor_bytes = match cursor_token {
540            Some(token) => Some(decode_cursor(token).map_err(|reason| {
541                QueryError::from(PlanError::from(
542                    CursorPlanError::InvalidContinuationCursor { reason },
543                ))
544            })?),
545            None => None,
546        };
547        let cursor = plan
548            .prepare_cursor(cursor_bytes.as_deref())
549            .map_err(map_executor_plan_error)?;
550
551        let (page, trace) = self
552            .with_metrics(|| {
553                self.load_executor::<E>()
554                    .execute_paged_with_cursor_traced(plan, cursor)
555            })
556            .map_err(QueryError::Execute)?;
557        let next_cursor = page
558            .next_cursor
559            .map(|token| {
560                token.encode().map_err(|err| {
561                    QueryError::Execute(InternalError::serialize_internal(format!(
562                        "failed to serialize continuation cursor: {err}"
563                    )))
564                })
565            })
566            .transpose()?;
567
568        Ok(PagedLoadExecutionWithTrace::new(
569            page.items,
570            next_cursor,
571            trace,
572        ))
573    }
574
575    // ---------------------------------------------------------------------
576    // High-level write API (public, intent-level)
577    // ---------------------------------------------------------------------
578
579    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
580    where
581        E: EntityKind<Canister = C> + EntityValue,
582    {
583        self.execute_save_entity(|save| save.insert(entity))
584    }
585
586    /// Insert a single-entity-type batch atomically in one commit window.
587    ///
588    /// If any item fails pre-commit validation, no row in the batch is persisted.
589    ///
590    /// This API is not a multi-entity transaction surface.
591    pub fn insert_many_atomic<E>(
592        &self,
593        entities: impl IntoIterator<Item = E>,
594    ) -> Result<WriteBatchResponse<E>, InternalError>
595    where
596        E: EntityKind<Canister = C> + EntityValue,
597    {
598        self.execute_save_batch(|save| save.insert_many_atomic(entities))
599    }
600
601    /// Insert a batch with explicitly non-atomic semantics.
602    ///
603    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
604    pub fn insert_many_non_atomic<E>(
605        &self,
606        entities: impl IntoIterator<Item = E>,
607    ) -> Result<WriteBatchResponse<E>, InternalError>
608    where
609        E: EntityKind<Canister = C> + EntityValue,
610    {
611        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
612    }
613
614    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
615    where
616        E: EntityKind<Canister = C> + EntityValue,
617    {
618        self.execute_save_entity(|save| save.replace(entity))
619    }
620
621    /// Replace a single-entity-type batch atomically in one commit window.
622    ///
623    /// If any item fails pre-commit validation, no row in the batch is persisted.
624    ///
625    /// This API is not a multi-entity transaction surface.
626    pub fn replace_many_atomic<E>(
627        &self,
628        entities: impl IntoIterator<Item = E>,
629    ) -> Result<WriteBatchResponse<E>, InternalError>
630    where
631        E: EntityKind<Canister = C> + EntityValue,
632    {
633        self.execute_save_batch(|save| save.replace_many_atomic(entities))
634    }
635
636    /// Replace a batch with explicitly non-atomic semantics.
637    ///
638    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
639    pub fn replace_many_non_atomic<E>(
640        &self,
641        entities: impl IntoIterator<Item = E>,
642    ) -> Result<WriteBatchResponse<E>, InternalError>
643    where
644        E: EntityKind<Canister = C> + EntityValue,
645    {
646        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
647    }
648
649    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
650    where
651        E: EntityKind<Canister = C> + EntityValue,
652    {
653        self.execute_save_entity(|save| save.update(entity))
654    }
655
656    /// Update a single-entity-type batch atomically in one commit window.
657    ///
658    /// If any item fails pre-commit validation, no row in the batch is persisted.
659    ///
660    /// This API is not a multi-entity transaction surface.
661    pub fn update_many_atomic<E>(
662        &self,
663        entities: impl IntoIterator<Item = E>,
664    ) -> Result<WriteBatchResponse<E>, InternalError>
665    where
666        E: EntityKind<Canister = C> + EntityValue,
667    {
668        self.execute_save_batch(|save| save.update_many_atomic(entities))
669    }
670
671    /// Update a batch with explicitly non-atomic semantics.
672    ///
673    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
674    pub fn update_many_non_atomic<E>(
675        &self,
676        entities: impl IntoIterator<Item = E>,
677    ) -> Result<WriteBatchResponse<E>, InternalError>
678    where
679        E: EntityKind<Canister = C> + EntityValue,
680    {
681        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
682    }
683
684    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
685    where
686        E: EntityKind<Canister = C> + EntityValue,
687    {
688        self.execute_save_view::<E>(|save| save.insert_view(view))
689    }
690
691    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
692    where
693        E: EntityKind<Canister = C> + EntityValue,
694    {
695        self.execute_save_view::<E>(|save| save.replace_view(view))
696    }
697
698    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
699    where
700        E: EntityKind<Canister = C> + EntityValue,
701    {
702        self.execute_save_view::<E>(|save| save.update_view(view))
703    }
704
705    /// TEST ONLY: clear all registered data and index stores for this database.
706    #[cfg(test)]
707    #[doc(hidden)]
708    pub fn clear_stores_for_tests(&self) {
709        self.db.with_store_registry(|reg| {
710            // Test cleanup only: clearing all stores is set-like and does not
711            // depend on registry iteration order.
712            for (_, store) in reg.iter() {
713                store.with_data_mut(DataStore::clear);
714                store.with_index_mut(IndexStore::clear);
715            }
716        });
717    }
718}