Skip to main content

icydb_core/db/
session.rs

1// 3️⃣ Internal imports (implementation wiring)
2#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5    db::{
6        Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7        QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8        executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9        query::{intent::QueryMode, plan::CursorPlanError},
10    },
11    error::InternalError,
12    obs::sink::{MetricsSink, with_metrics_sink},
13    traits::{CanisterKind, EntityKind, EntityValue},
14    types::Id,
15};
16
17///
18/// DbSession
19///
20/// Session-scoped database handle with policy (debug, metrics) and execution routing.
21///
22
23pub struct DbSession<C: CanisterKind> {
24    db: Db<C>,
25    debug: bool,
26    metrics: Option<&'static dyn MetricsSink>,
27}
28
29impl<C: CanisterKind> DbSession<C> {
30    #[must_use]
31    pub const fn new(db: Db<C>) -> Self {
32        Self {
33            db,
34            debug: false,
35            metrics: None,
36        }
37    }
38
39    #[must_use]
40    pub const fn debug(mut self) -> Self {
41        self.debug = true;
42        self
43    }
44
45    #[must_use]
46    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
47        self.metrics = Some(sink);
48        self
49    }
50
51    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
52        if let Some(sink) = self.metrics {
53            with_metrics_sink(sink, f)
54        } else {
55            f()
56        }
57    }
58
59    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
60    fn execute_save_with<E, T, R>(
61        &self,
62        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
63        map: impl FnOnce(T) -> R,
64    ) -> Result<R, InternalError>
65    where
66        E: EntityKind<Canister = C> + EntityValue,
67    {
68        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
69
70        Ok(map(value))
71    }
72
73    // Shared save-facade wrappers keep response shape explicit at call sites.
74    fn execute_save_entity<E>(
75        &self,
76        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
77    ) -> Result<WriteResponse<E>, InternalError>
78    where
79        E: EntityKind<Canister = C> + EntityValue,
80    {
81        self.execute_save_with(op, WriteResponse::new)
82    }
83
84    fn execute_save_batch<E>(
85        &self,
86        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
87    ) -> Result<WriteBatchResponse<E>, InternalError>
88    where
89        E: EntityKind<Canister = C> + EntityValue,
90    {
91        self.execute_save_with(op, WriteBatchResponse::new)
92    }
93
94    fn execute_save_view<E>(
95        &self,
96        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
97    ) -> Result<E::ViewType, InternalError>
98    where
99        E: EntityKind<Canister = C> + EntityValue,
100    {
101        self.execute_save_with(op, std::convert::identity)
102    }
103
104    // ---------------------------------------------------------------------
105    // Query entry points (public, fluent)
106    // ---------------------------------------------------------------------
107
108    #[must_use]
109    pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
110    where
111        E: EntityKind<Canister = C>,
112    {
113        FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
114    }
115
116    #[must_use]
117    pub const fn load_with_consistency<E>(
118        &self,
119        consistency: ReadConsistency,
120    ) -> FluentLoadQuery<'_, E>
121    where
122        E: EntityKind<Canister = C>,
123    {
124        FluentLoadQuery::new(self, Query::new(consistency))
125    }
126
127    #[must_use]
128    pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
129    where
130        E: EntityKind<Canister = C>,
131    {
132        FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
133    }
134
135    #[must_use]
136    pub fn delete_with_consistency<E>(
137        &self,
138        consistency: ReadConsistency,
139    ) -> FluentDeleteQuery<'_, E>
140    where
141        E: EntityKind<Canister = C>,
142    {
143        FluentDeleteQuery::new(self, Query::new(consistency).delete())
144    }
145
146    // ---------------------------------------------------------------------
147    // Low-level executors (crate-internal; execution primitives)
148    // ---------------------------------------------------------------------
149
150    #[must_use]
151    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
152    where
153        E: EntityKind<Canister = C> + EntityValue,
154    {
155        LoadExecutor::new(self.db, self.debug)
156    }
157
158    #[must_use]
159    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
160    where
161        E: EntityKind<Canister = C> + EntityValue,
162    {
163        DeleteExecutor::new(self.db, self.debug)
164    }
165
166    #[must_use]
167    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
168    where
169        E: EntityKind<Canister = C> + EntityValue,
170    {
171        SaveExecutor::new(self.db, self.debug)
172    }
173
174    // ---------------------------------------------------------------------
175    // Query diagnostics / execution (internal routing)
176    // ---------------------------------------------------------------------
177
178    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
179    where
180        E: EntityKind<Canister = C> + EntityValue,
181    {
182        let plan = query.plan()?;
183
184        let result = match query.mode() {
185            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
186            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
187        };
188
189        result.map_err(QueryError::Execute)
190    }
191
192    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
193    where
194        E: EntityKind<Canister = C> + EntityValue,
195    {
196        let plan = query.plan()?;
197
198        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
199            .map_err(QueryError::Execute)
200    }
201
202    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
203    where
204        E: EntityKind<Canister = C> + EntityValue,
205    {
206        let plan = query.plan()?;
207
208        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
209            .map_err(QueryError::Execute)
210    }
211
212    pub(crate) fn execute_load_query_min<E>(
213        &self,
214        query: &Query<E>,
215    ) -> Result<Option<Id<E>>, QueryError>
216    where
217        E: EntityKind<Canister = C> + EntityValue,
218    {
219        let plan = query.plan()?;
220
221        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
222            .map_err(QueryError::Execute)
223    }
224
225    pub(crate) fn execute_load_query_max<E>(
226        &self,
227        query: &Query<E>,
228    ) -> Result<Option<Id<E>>, QueryError>
229    where
230        E: EntityKind<Canister = C> + EntityValue,
231    {
232        let plan = query.plan()?;
233
234        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
235            .map_err(QueryError::Execute)
236    }
237
238    pub(crate) fn execute_load_query_paged_with_trace<E>(
239        &self,
240        query: &Query<E>,
241        cursor_token: Option<&str>,
242    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
243    where
244        E: EntityKind<Canister = C> + EntityValue,
245    {
246        let plan = query.plan()?;
247        let cursor_bytes = match cursor_token {
248            Some(token) => Some(decode_cursor(token).map_err(|reason| {
249                QueryError::from(PlanError::from(
250                    CursorPlanError::InvalidContinuationCursor { reason },
251                ))
252            })?),
253            None => None,
254        };
255        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
256
257        let (page, trace) = self
258            .with_metrics(|| {
259                self.load_executor::<E>()
260                    .execute_paged_with_cursor_traced(plan, cursor)
261            })
262            .map_err(QueryError::Execute)?;
263
264        Ok((page.items, page.next_cursor, trace))
265    }
266
267    // ---------------------------------------------------------------------
268    // High-level write API (public, intent-level)
269    // ---------------------------------------------------------------------
270
271    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
272    where
273        E: EntityKind<Canister = C> + EntityValue,
274    {
275        self.execute_save_entity(|save| save.insert(entity))
276    }
277
278    /// Insert a single-entity-type batch atomically in one commit window.
279    ///
280    /// If any item fails pre-commit validation, no row in the batch is persisted.
281    ///
282    /// This API is not a multi-entity transaction surface.
283    pub fn insert_many_atomic<E>(
284        &self,
285        entities: impl IntoIterator<Item = E>,
286    ) -> Result<WriteBatchResponse<E>, InternalError>
287    where
288        E: EntityKind<Canister = C> + EntityValue,
289    {
290        self.execute_save_batch(|save| save.insert_many_atomic(entities))
291    }
292
293    /// Insert a batch with explicitly non-atomic semantics.
294    ///
295    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
296    pub fn insert_many_non_atomic<E>(
297        &self,
298        entities: impl IntoIterator<Item = E>,
299    ) -> Result<WriteBatchResponse<E>, InternalError>
300    where
301        E: EntityKind<Canister = C> + EntityValue,
302    {
303        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
304    }
305
306    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
307    where
308        E: EntityKind<Canister = C> + EntityValue,
309    {
310        self.execute_save_entity(|save| save.replace(entity))
311    }
312
313    /// Replace a single-entity-type batch atomically in one commit window.
314    ///
315    /// If any item fails pre-commit validation, no row in the batch is persisted.
316    ///
317    /// This API is not a multi-entity transaction surface.
318    pub fn replace_many_atomic<E>(
319        &self,
320        entities: impl IntoIterator<Item = E>,
321    ) -> Result<WriteBatchResponse<E>, InternalError>
322    where
323        E: EntityKind<Canister = C> + EntityValue,
324    {
325        self.execute_save_batch(|save| save.replace_many_atomic(entities))
326    }
327
328    /// Replace a batch with explicitly non-atomic semantics.
329    ///
330    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
331    pub fn replace_many_non_atomic<E>(
332        &self,
333        entities: impl IntoIterator<Item = E>,
334    ) -> Result<WriteBatchResponse<E>, InternalError>
335    where
336        E: EntityKind<Canister = C> + EntityValue,
337    {
338        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
339    }
340
341    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
342    where
343        E: EntityKind<Canister = C> + EntityValue,
344    {
345        self.execute_save_entity(|save| save.update(entity))
346    }
347
348    /// Update a single-entity-type batch atomically in one commit window.
349    ///
350    /// If any item fails pre-commit validation, no row in the batch is persisted.
351    ///
352    /// This API is not a multi-entity transaction surface.
353    pub fn update_many_atomic<E>(
354        &self,
355        entities: impl IntoIterator<Item = E>,
356    ) -> Result<WriteBatchResponse<E>, InternalError>
357    where
358        E: EntityKind<Canister = C> + EntityValue,
359    {
360        self.execute_save_batch(|save| save.update_many_atomic(entities))
361    }
362
363    /// Update a batch with explicitly non-atomic semantics.
364    ///
365    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
366    pub fn update_many_non_atomic<E>(
367        &self,
368        entities: impl IntoIterator<Item = E>,
369    ) -> Result<WriteBatchResponse<E>, InternalError>
370    where
371        E: EntityKind<Canister = C> + EntityValue,
372    {
373        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
374    }
375
376    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
377    where
378        E: EntityKind<Canister = C> + EntityValue,
379    {
380        self.execute_save_view::<E>(|save| save.insert_view(view))
381    }
382
383    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        self.execute_save_view::<E>(|save| save.replace_view(view))
388    }
389
390    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
391    where
392        E: EntityKind<Canister = C> + EntityValue,
393    {
394        self.execute_save_view::<E>(|save| save.update_view(view))
395    }
396
397    /// TEST ONLY: clear all registered data and index stores for this database.
398    #[cfg(test)]
399    #[doc(hidden)]
400    pub fn clear_stores_for_tests(&self) {
401        self.db.with_store_registry(|reg| {
402            for (_, store) in reg.iter() {
403                store.with_data_mut(DataStore::clear);
404                store.with_index_mut(IndexStore::clear);
405            }
406        });
407    }
408}