// icydb_core/db/mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8
9pub(crate) use commit::*;
10
11use crate::{
12    db::{
13        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
14        index::IndexStoreRegistry,
15        query::{
16            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
17        },
18        response::{Response, WriteBatchResponse, WriteResponse},
19        store::DataStoreRegistry,
20    },
21    error::InternalError,
22    obs::sink::{self, MetricsSink},
23    traits::{CanisterKind, EntityKind, EntityValue},
24};
25use std::{marker::PhantomData, thread::LocalKey};
26
27#[cfg(test)]
28use crate::db::{index::IndexStore, store::DataStore};
29
30///
31/// Db
32///
33/// A handle to the set of stores registered for a specific canister domain.
34///
35pub struct Db<C: CanisterKind> {
36    data: &'static LocalKey<DataStoreRegistry>,
37    index: &'static LocalKey<IndexStoreRegistry>,
38    _marker: PhantomData<C>,
39}
40
41impl<C: CanisterKind> Db<C> {
42    #[must_use]
43    pub const fn new(
44        data: &'static LocalKey<DataStoreRegistry>,
45        index: &'static LocalKey<IndexStoreRegistry>,
46    ) -> Self {
47        Self {
48            data,
49            index,
50            _marker: PhantomData,
51        }
52    }
53
54    #[must_use]
55    pub(crate) const fn context<E>(&self) -> Context<'_, E>
56    where
57        E: EntityKind<Canister = C> + EntityValue,
58    {
59        Context::new(self)
60    }
61
62    /// Return a recovery-guarded context for read paths.
63    ///
64    /// This enforces startup recovery and a fast persisted-marker check so reads
65    /// do not proceed while an incomplete commit is pending replay.
66    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
67    where
68        E: EntityKind<Canister = C> + EntityValue,
69    {
70        ensure_recovered(self)?;
71
72        Ok(Context::new(self))
73    }
74
75    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
76    ///
77    /// This is intended for corruption injection and diagnostic testing only.
78    #[cfg(test)]
79    pub fn with_data_store_mut_for_test<R>(
80        &self,
81        path: &'static str,
82        f: impl FnOnce(&mut DataStore) -> R,
83    ) -> Result<R, InternalError> {
84        self.with_data(|reg| reg.with_store_mut(path, f))
85    }
86
87    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
88        self.data.with(|reg| f(reg))
89    }
90
91    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
92        self.index.with(|reg| f(reg))
93    }
94}
95
96impl<C: CanisterKind> Copy for Db<C> {}
97
98impl<C: CanisterKind> Clone for Db<C> {
99    fn clone(&self) -> Self {
100        *self
101    }
102}
103
104///
105/// DbSession
106///
107/// Session-scoped database handle with policy (debug, metrics) and execution routing.
108///
109pub struct DbSession<C: CanisterKind> {
110    db: Db<C>,
111    debug: bool,
112    metrics: Option<&'static dyn MetricsSink>,
113}
114
115impl<C: CanisterKind> DbSession<C> {
116    #[must_use]
117    pub const fn new(db: Db<C>) -> Self {
118        Self {
119            db,
120            debug: false,
121            metrics: None,
122        }
123    }
124
125    #[must_use]
126    pub const fn debug(mut self) -> Self {
127        self.debug = true;
128        self
129    }
130
131    #[must_use]
132    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
133        self.metrics = Some(sink);
134        self
135    }
136
137    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
138        if let Some(sink) = self.metrics {
139            sink::with_metrics_sink(sink, f)
140        } else {
141            f()
142        }
143    }
144
145    // ---------------------------------------------------------------------
146    // Query entry points (public, fluent)
147    // ---------------------------------------------------------------------
148
149    #[must_use]
150    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
151    where
152        E: EntityKind<Canister = C>,
153    {
154        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
155    }
156
157    #[must_use]
158    pub const fn load_with_consistency<E>(
159        &self,
160        consistency: ReadConsistency,
161    ) -> SessionLoadQuery<'_, C, E>
162    where
163        E: EntityKind<Canister = C>,
164    {
165        SessionLoadQuery::new(self, Query::new(consistency))
166    }
167
168    #[must_use]
169    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
170    where
171        E: EntityKind<Canister = C>,
172    {
173        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
174    }
175
176    #[must_use]
177    pub fn delete_with_consistency<E>(
178        &self,
179        consistency: ReadConsistency,
180    ) -> SessionDeleteQuery<'_, C, E>
181    where
182        E: EntityKind<Canister = C>,
183    {
184        SessionDeleteQuery::new(self, Query::new(consistency).delete())
185    }
186
187    // ---------------------------------------------------------------------
188    // Low-level executors (crate-internal; execution primitives)
189    // ---------------------------------------------------------------------
190
191    #[must_use]
192    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
193    where
194        E: EntityKind<Canister = C> + EntityValue,
195    {
196        LoadExecutor::new(self.db, self.debug)
197    }
198
199    #[must_use]
200    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
201    where
202        E: EntityKind<Canister = C> + EntityValue,
203    {
204        DeleteExecutor::new(self.db, self.debug)
205    }
206
207    #[must_use]
208    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
209    where
210        E: EntityKind<Canister = C> + EntityValue,
211    {
212        SaveExecutor::new(self.db, self.debug)
213    }
214
215    // ---------------------------------------------------------------------
216    // Query diagnostics / execution (internal routing)
217    // ---------------------------------------------------------------------
218
219    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
220    where
221        E: EntityKind<Canister = C> + EntityValue,
222    {
223        let plan = query.plan()?;
224
225        let result = match query.mode() {
226            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
227            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
228        };
229
230        result.map_err(QueryError::Execute)
231    }
232
233    // ---------------------------------------------------------------------
234    // High-level write API (public, intent-level)
235    // ---------------------------------------------------------------------
236
237    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
238    where
239        E: EntityKind<Canister = C> + EntityValue,
240    {
241        self.with_metrics(|| self.save_executor::<E>().insert(entity))
242            .map(WriteResponse::new)
243    }
244
245    /// Insert a batch with explicitly non-atomic semantics.
246    ///
247    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
248    pub fn insert_many_non_atomic<E>(
249        &self,
250        entities: impl IntoIterator<Item = E>,
251    ) -> Result<WriteBatchResponse<E>, InternalError>
252    where
253        E: EntityKind<Canister = C> + EntityValue,
254    {
255        let entities =
256            self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
257
258        Ok(WriteBatchResponse::new(entities))
259    }
260
261    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
262    where
263        E: EntityKind<Canister = C> + EntityValue,
264    {
265        self.with_metrics(|| self.save_executor::<E>().replace(entity))
266            .map(WriteResponse::new)
267    }
268
269    /// Replace a batch with explicitly non-atomic semantics.
270    ///
271    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
272    pub fn replace_many_non_atomic<E>(
273        &self,
274        entities: impl IntoIterator<Item = E>,
275    ) -> Result<WriteBatchResponse<E>, InternalError>
276    where
277        E: EntityKind<Canister = C> + EntityValue,
278    {
279        let entities =
280            self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
281
282        Ok(WriteBatchResponse::new(entities))
283    }
284
285    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
286    where
287        E: EntityKind<Canister = C> + EntityValue,
288    {
289        self.with_metrics(|| self.save_executor::<E>().update(entity))
290            .map(WriteResponse::new)
291    }
292
293    /// Update a batch with explicitly non-atomic semantics.
294    ///
295    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
296    pub fn update_many_non_atomic<E>(
297        &self,
298        entities: impl IntoIterator<Item = E>,
299    ) -> Result<WriteBatchResponse<E>, InternalError>
300    where
301        E: EntityKind<Canister = C> + EntityValue,
302    {
303        let entities =
304            self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
305
306        Ok(WriteBatchResponse::new(entities))
307    }
308
309    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
310    where
311        E: EntityKind<Canister = C> + EntityValue,
312    {
313        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
314    }
315
316    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
317    where
318        E: EntityKind<Canister = C> + EntityValue,
319    {
320        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
321    }
322
323    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
324    where
325        E: EntityKind<Canister = C> + EntityValue,
326    {
327        self.with_metrics(|| self.save_executor::<E>().update_view(view))
328    }
329
330    /// TEST ONLY: clear all registered data and index stores for this database.
331    #[cfg(test)]
332    #[doc(hidden)]
333    pub fn clear_stores_for_tests(&self) {
334        // Data stores.
335        self.db.with_data(|reg| {
336            for (path, _) in reg.iter() {
337                let _ = reg.with_store_mut(path, DataStore::clear);
338            }
339        });
340
341        // Index stores.
342        self.db.with_index(|reg| {
343            for (path, _) in reg.iter() {
344                let _ = reg.with_store_mut(path, IndexStore::clear);
345            }
346        });
347    }
348}