Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub mod cursor;
3pub(crate) mod executor;
4pub mod identity;
5pub mod index;
6pub mod query;
7pub mod response;
8pub mod store;
9
10pub(crate) use commit::*;
11
12use crate::{
13    db::{
14        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
15        index::IndexStoreRegistry,
16        query::{
17            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
18            plan::PlanError,
19        },
20        response::{Response, WriteBatchResponse, WriteResponse},
21        store::DataStoreRegistry,
22    },
23    error::InternalError,
24    obs::sink::{self, MetricsSink},
25    traits::{CanisterKind, EntityKind, EntityValue},
26};
27use std::{marker::PhantomData, thread::LocalKey};
28
29#[cfg(test)]
30use crate::db::{index::IndexStore, store::DataStore};
31
32///
33/// Db
34///
35/// A handle to the set of stores registered for a specific canister domain.
36///
37pub struct Db<C: CanisterKind> {
38    data: &'static LocalKey<DataStoreRegistry>,
39    index: &'static LocalKey<IndexStoreRegistry>,
40    _marker: PhantomData<C>,
41}
42
43impl<C: CanisterKind> Db<C> {
44    #[must_use]
45    pub const fn new(
46        data: &'static LocalKey<DataStoreRegistry>,
47        index: &'static LocalKey<IndexStoreRegistry>,
48    ) -> Self {
49        Self {
50            data,
51            index,
52            _marker: PhantomData,
53        }
54    }
55
56    #[must_use]
57    pub(crate) const fn context<E>(&self) -> Context<'_, E>
58    where
59        E: EntityKind<Canister = C> + EntityValue,
60    {
61        Context::new(self)
62    }
63
64    /// Return a recovery-guarded context for read paths.
65    ///
66    /// This enforces startup recovery and a fast persisted-marker check so reads
67    /// do not proceed while an incomplete commit is pending replay.
68    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
69    where
70        E: EntityKind<Canister = C> + EntityValue,
71    {
72        ensure_recovered(self)?;
73
74        Ok(Context::new(self))
75    }
76
77    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
78    ///
79    /// This is intended for corruption injection and diagnostic testing only.
80    #[cfg(test)]
81    pub fn with_data_store_mut_for_test<R>(
82        &self,
83        path: &'static str,
84        f: impl FnOnce(&mut DataStore) -> R,
85    ) -> Result<R, InternalError> {
86        self.with_data(|reg| reg.with_store_mut(path, f))
87    }
88
89    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
90        self.data.with(|reg| f(reg))
91    }
92
93    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
94        self.index.with(|reg| f(reg))
95    }
96}
97
98impl<C: CanisterKind> Copy for Db<C> {}
99
100impl<C: CanisterKind> Clone for Db<C> {
101    fn clone(&self) -> Self {
102        *self
103    }
104}
105
106///
107/// DbSession
108///
109/// Session-scoped database handle with policy (debug, metrics) and execution routing.
110///
111pub struct DbSession<C: CanisterKind> {
112    db: Db<C>,
113    debug: bool,
114    metrics: Option<&'static dyn MetricsSink>,
115}
116
117impl<C: CanisterKind> DbSession<C> {
118    #[must_use]
119    pub const fn new(db: Db<C>) -> Self {
120        Self {
121            db,
122            debug: false,
123            metrics: None,
124        }
125    }
126
127    #[must_use]
128    pub const fn debug(mut self) -> Self {
129        self.debug = true;
130        self
131    }
132
133    #[must_use]
134    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
135        self.metrics = Some(sink);
136        self
137    }
138
139    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
140        if let Some(sink) = self.metrics {
141            sink::with_metrics_sink(sink, f)
142        } else {
143            f()
144        }
145    }
146
147    // ---------------------------------------------------------------------
148    // Query entry points (public, fluent)
149    // ---------------------------------------------------------------------
150
151    #[must_use]
152    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
153    where
154        E: EntityKind<Canister = C>,
155    {
156        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
157    }
158
159    #[must_use]
160    pub const fn load_with_consistency<E>(
161        &self,
162        consistency: ReadConsistency,
163    ) -> SessionLoadQuery<'_, C, E>
164    where
165        E: EntityKind<Canister = C>,
166    {
167        SessionLoadQuery::new(self, Query::new(consistency))
168    }
169
170    #[must_use]
171    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
172    where
173        E: EntityKind<Canister = C>,
174    {
175        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
176    }
177
178    #[must_use]
179    pub fn delete_with_consistency<E>(
180        &self,
181        consistency: ReadConsistency,
182    ) -> SessionDeleteQuery<'_, C, E>
183    where
184        E: EntityKind<Canister = C>,
185    {
186        SessionDeleteQuery::new(self, Query::new(consistency).delete())
187    }
188
189    // ---------------------------------------------------------------------
190    // Low-level executors (crate-internal; execution primitives)
191    // ---------------------------------------------------------------------
192
193    #[must_use]
194    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
195    where
196        E: EntityKind<Canister = C> + EntityValue,
197    {
198        LoadExecutor::new(self.db, self.debug)
199    }
200
201    #[must_use]
202    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
203    where
204        E: EntityKind<Canister = C> + EntityValue,
205    {
206        DeleteExecutor::new(self.db, self.debug)
207    }
208
209    #[must_use]
210    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
211    where
212        E: EntityKind<Canister = C> + EntityValue,
213    {
214        SaveExecutor::new(self.db, self.debug)
215    }
216
217    // ---------------------------------------------------------------------
218    // Query diagnostics / execution (internal routing)
219    // ---------------------------------------------------------------------
220
221    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
222    where
223        E: EntityKind<Canister = C> + EntityValue,
224    {
225        let plan = query.plan()?;
226
227        let result = match query.mode() {
228            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
229            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
230        };
231
232        result.map_err(QueryError::Execute)
233    }
234
235    pub(crate) fn execute_load_query_paged<E>(
236        &self,
237        query: &Query<E>,
238        cursor_token: Option<&str>,
239    ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
240    where
241        E: EntityKind<Canister = C> + EntityValue,
242    {
243        let plan = query.plan()?;
244        let cursor_bytes = match cursor_token {
245            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
246                QueryError::from(PlanError::InvalidContinuationCursor { reason })
247            })?),
248            None => None,
249        };
250        let boundary = plan.plan_cursor_boundary(cursor_bytes.as_deref())?;
251
252        let page = self
253            .with_metrics(|| self.load_executor::<E>().execute_paged(plan, boundary))
254            .map_err(QueryError::Execute)?;
255
256        Ok((page.items, page.next_cursor))
257    }
258
259    // ---------------------------------------------------------------------
260    // High-level write API (public, intent-level)
261    // ---------------------------------------------------------------------
262
263    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
264    where
265        E: EntityKind<Canister = C> + EntityValue,
266    {
267        self.with_metrics(|| self.save_executor::<E>().insert(entity))
268            .map(WriteResponse::new)
269    }
270
271    /// Insert a batch with explicitly non-atomic semantics.
272    ///
273    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
274    pub fn insert_many_non_atomic<E>(
275        &self,
276        entities: impl IntoIterator<Item = E>,
277    ) -> Result<WriteBatchResponse<E>, InternalError>
278    where
279        E: EntityKind<Canister = C> + EntityValue,
280    {
281        let entities =
282            self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
283
284        Ok(WriteBatchResponse::new(entities))
285    }
286
287    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
288    where
289        E: EntityKind<Canister = C> + EntityValue,
290    {
291        self.with_metrics(|| self.save_executor::<E>().replace(entity))
292            .map(WriteResponse::new)
293    }
294
295    /// Replace a batch with explicitly non-atomic semantics.
296    ///
297    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
298    pub fn replace_many_non_atomic<E>(
299        &self,
300        entities: impl IntoIterator<Item = E>,
301    ) -> Result<WriteBatchResponse<E>, InternalError>
302    where
303        E: EntityKind<Canister = C> + EntityValue,
304    {
305        let entities =
306            self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
307
308        Ok(WriteBatchResponse::new(entities))
309    }
310
311    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312    where
313        E: EntityKind<Canister = C> + EntityValue,
314    {
315        self.with_metrics(|| self.save_executor::<E>().update(entity))
316            .map(WriteResponse::new)
317    }
318
319    /// Update a batch with explicitly non-atomic semantics.
320    ///
321    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
322    pub fn update_many_non_atomic<E>(
323        &self,
324        entities: impl IntoIterator<Item = E>,
325    ) -> Result<WriteBatchResponse<E>, InternalError>
326    where
327        E: EntityKind<Canister = C> + EntityValue,
328    {
329        let entities =
330            self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
331
332        Ok(WriteBatchResponse::new(entities))
333    }
334
335    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
336    where
337        E: EntityKind<Canister = C> + EntityValue,
338    {
339        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
340    }
341
342    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
343    where
344        E: EntityKind<Canister = C> + EntityValue,
345    {
346        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
347    }
348
349    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
350    where
351        E: EntityKind<Canister = C> + EntityValue,
352    {
353        self.with_metrics(|| self.save_executor::<E>().update_view(view))
354    }
355
356    /// TEST ONLY: clear all registered data and index stores for this database.
357    #[cfg(test)]
358    #[doc(hidden)]
359    pub fn clear_stores_for_tests(&self) {
360        // Data stores.
361        self.db.with_data(|reg| {
362            for (path, _) in reg.iter() {
363                let _ = reg.with_store_mut(path, DataStore::clear);
364            }
365        });
366
367        // Index stores.
368        self.db.with_index(|reg| {
369            for (path, _) in reg.iter() {
370                let _ = reg.with_store_mut(path, IndexStore::clear);
371            }
372        });
373    }
374}