Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8
9pub(crate) use commit::*;
10
11use crate::{
12    db::{
13        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
14        index::IndexStoreRegistry,
15        query::{
16            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
17        },
18        response::{Response, WriteBatchResponse, WriteResponse},
19        store::DataStoreRegistry,
20    },
21    error::InternalError,
22    obs::sink::{self, MetricsSink},
23    traits::{CanisterKind, EntityKind, EntityValue},
24};
25use std::{marker::PhantomData, thread::LocalKey};
26
27#[cfg(test)]
28use crate::db::{index::IndexStore, store::DataStore};
29
30///
31/// Db
32///
33/// A handle to the set of stores registered for a specific canister domain.
34///
35pub struct Db<C: CanisterKind> {
36    data: &'static LocalKey<DataStoreRegistry>,
37    index: &'static LocalKey<IndexStoreRegistry>,
38    _marker: PhantomData<C>,
39}
40
41impl<C: CanisterKind> Db<C> {
42    #[must_use]
43    pub const fn new(
44        data: &'static LocalKey<DataStoreRegistry>,
45        index: &'static LocalKey<IndexStoreRegistry>,
46    ) -> Self {
47        Self {
48            data,
49            index,
50            _marker: PhantomData,
51        }
52    }
53
54    #[must_use]
55    pub(crate) const fn context<E>(&self) -> Context<'_, E>
56    where
57        E: EntityKind<Canister = C> + EntityValue,
58    {
59        Context::new(self)
60    }
61
62    /// Return a recovery-guarded context for read paths (startup recovery only).
63    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
64    where
65        E: EntityKind<Canister = C> + EntityValue,
66    {
67        ensure_recovered(self)?;
68
69        Ok(Context::new(self))
70    }
71
72    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
73    ///
74    /// This is intended for corruption injection and diagnostic testing only.
75    #[cfg(test)]
76    pub fn with_data_store_mut_for_test<R>(
77        &self,
78        path: &'static str,
79        f: impl FnOnce(&mut DataStore) -> R,
80    ) -> Result<R, InternalError> {
81        self.with_data(|reg| reg.with_store_mut(path, f))
82    }
83
84    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
85        self.data.with(|reg| f(reg))
86    }
87
88    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
89        self.index.with(|reg| f(reg))
90    }
91}
92
93impl<C: CanisterKind> Copy for Db<C> {}
94
95impl<C: CanisterKind> Clone for Db<C> {
96    fn clone(&self) -> Self {
97        *self
98    }
99}
100
101///
102/// DbSession
103///
104/// Session-scoped database handle with policy (debug, metrics) and execution routing.
105///
106pub struct DbSession<C: CanisterKind> {
107    db: Db<C>,
108    debug: bool,
109    metrics: Option<&'static dyn MetricsSink>,
110}
111
112impl<C: CanisterKind> DbSession<C> {
113    #[must_use]
114    pub const fn new(db: Db<C>) -> Self {
115        Self {
116            db,
117            debug: false,
118            metrics: None,
119        }
120    }
121
122    #[must_use]
123    pub const fn debug(mut self) -> Self {
124        self.debug = true;
125        self
126    }
127
128    #[must_use]
129    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
130        self.metrics = Some(sink);
131        self
132    }
133
134    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
135        if let Some(sink) = self.metrics {
136            sink::with_metrics_sink(sink, f)
137        } else {
138            f()
139        }
140    }
141
142    // ---------------------------------------------------------------------
143    // Query entry points (public, fluent)
144    // ---------------------------------------------------------------------
145
146    #[must_use]
147    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
148    where
149        E: EntityKind<Canister = C>,
150    {
151        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
152    }
153
154    #[must_use]
155    pub const fn load_with_consistency<E>(
156        &self,
157        consistency: ReadConsistency,
158    ) -> SessionLoadQuery<'_, C, E>
159    where
160        E: EntityKind<Canister = C>,
161    {
162        SessionLoadQuery::new(self, Query::new(consistency))
163    }
164
165    #[must_use]
166    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
167    where
168        E: EntityKind<Canister = C>,
169    {
170        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
171    }
172
173    #[must_use]
174    pub fn delete_with_consistency<E>(
175        &self,
176        consistency: ReadConsistency,
177    ) -> SessionDeleteQuery<'_, C, E>
178    where
179        E: EntityKind<Canister = C>,
180    {
181        SessionDeleteQuery::new(self, Query::new(consistency).delete())
182    }
183
184    // ---------------------------------------------------------------------
185    // Low-level executors (crate-internal; execution primitives)
186    // ---------------------------------------------------------------------
187
188    #[must_use]
189    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
190    where
191        E: EntityKind<Canister = C> + EntityValue,
192    {
193        LoadExecutor::new(self.db, self.debug)
194    }
195
196    #[must_use]
197    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
198    where
199        E: EntityKind<Canister = C> + EntityValue,
200    {
201        DeleteExecutor::new(self.db, self.debug)
202    }
203
204    #[must_use]
205    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
206    where
207        E: EntityKind<Canister = C> + EntityValue,
208    {
209        SaveExecutor::new(self.db, self.debug)
210    }
211
212    // ---------------------------------------------------------------------
213    // Query diagnostics / execution (internal routing)
214    // ---------------------------------------------------------------------
215
216    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
217    where
218        E: EntityKind<Canister = C> + EntityValue,
219    {
220        let plan = query.plan()?;
221
222        let result = match query.mode() {
223            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
224            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
225        };
226
227        result.map_err(QueryError::Execute)
228    }
229
230    // ---------------------------------------------------------------------
231    // High-level write API (public, intent-level)
232    // ---------------------------------------------------------------------
233
234    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
235    where
236        E: EntityKind<Canister = C> + EntityValue,
237    {
238        self.with_metrics(|| self.save_executor::<E>().insert(entity))
239            .map(WriteResponse::new)
240    }
241
242    /// Insert a batch with explicitly non-atomic semantics.
243    ///
244    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
245    pub fn insert_many_non_atomic<E>(
246        &self,
247        entities: impl IntoIterator<Item = E>,
248    ) -> Result<WriteBatchResponse<E>, InternalError>
249    where
250        E: EntityKind<Canister = C> + EntityValue,
251    {
252        let entities =
253            self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
254
255        Ok(WriteBatchResponse::new(entities))
256    }
257
258    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
259    where
260        E: EntityKind<Canister = C> + EntityValue,
261    {
262        self.with_metrics(|| self.save_executor::<E>().replace(entity))
263            .map(WriteResponse::new)
264    }
265
266    /// Replace a batch with explicitly non-atomic semantics.
267    ///
268    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
269    pub fn replace_many_non_atomic<E>(
270        &self,
271        entities: impl IntoIterator<Item = E>,
272    ) -> Result<WriteBatchResponse<E>, InternalError>
273    where
274        E: EntityKind<Canister = C> + EntityValue,
275    {
276        let entities =
277            self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
278
279        Ok(WriteBatchResponse::new(entities))
280    }
281
282    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
283    where
284        E: EntityKind<Canister = C> + EntityValue,
285    {
286        self.with_metrics(|| self.save_executor::<E>().update(entity))
287            .map(WriteResponse::new)
288    }
289
290    /// Update a batch with explicitly non-atomic semantics.
291    ///
292    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
293    pub fn update_many_non_atomic<E>(
294        &self,
295        entities: impl IntoIterator<Item = E>,
296    ) -> Result<WriteBatchResponse<E>, InternalError>
297    where
298        E: EntityKind<Canister = C> + EntityValue,
299    {
300        let entities =
301            self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
302
303        Ok(WriteBatchResponse::new(entities))
304    }
305
306    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
307    where
308        E: EntityKind<Canister = C> + EntityValue,
309    {
310        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
311    }
312
313    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
314    where
315        E: EntityKind<Canister = C> + EntityValue,
316    {
317        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
318    }
319
320    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
321    where
322        E: EntityKind<Canister = C> + EntityValue,
323    {
324        self.with_metrics(|| self.save_executor::<E>().update_view(view))
325    }
326
327    /// TEST ONLY: clear all registered data and index stores for this database.
328    #[cfg(test)]
329    #[doc(hidden)]
330    pub fn clear_stores_for_tests(&self) {
331        // Data stores.
332        self.db.with_data(|reg| {
333            for (path, _) in reg.iter() {
334                let _ = reg.with_store_mut(path, DataStore::clear);
335            }
336        });
337
338        // Index stores.
339        self.db.with_index(|reg| {
340            for (path, _) in reg.iter() {
341                let _ = reg.with_store_mut(path, IndexStore::clear);
342            }
343        });
344    }
345}