Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14    db::{
15        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16        index::{IndexStore, IndexStoreRegistry},
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            diagnostics::{
20                QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21                start_event, trace_access_from_plan,
22            },
23        },
24        response::Response,
25        store::{DataStore, DataStoreRegistry},
26    },
27    error::InternalError,
28    obs::sink::{self, MetricsSink},
29    traits::{CanisterKind, EntityKind},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
///
/// EntityRegistryEntry
///
/// Minimal entity metadata for save-time reference existence checks.
/// Captures the entity path and its data store path.
///
/// The type is plain `'static` data, so it is cheap to copy and can be
/// compared or hashed by value.
///
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct EntityRegistryEntry {
    /// Path identifying the entity.
    pub entity_path: &'static str,
    /// Path of the data store associated with the entity.
    pub store_path: &'static str,
}
45
46///
47/// Db
48///
49/// A handle to the set of stores registered for a specific canister domain.
50///
51pub struct Db<C: CanisterKind> {
52    data: &'static LocalKey<DataStoreRegistry>,
53    index: &'static LocalKey<IndexStoreRegistry>,
54    entities: &'static [EntityRegistryEntry],
55    _marker: PhantomData<C>,
56}
57
58impl<C: CanisterKind> Db<C> {
59    #[must_use]
60    pub const fn new(
61        data: &'static LocalKey<DataStoreRegistry>,
62        index: &'static LocalKey<IndexStoreRegistry>,
63        entities: &'static [EntityRegistryEntry],
64    ) -> Self {
65        Self {
66            data,
67            index,
68            entities,
69            _marker: PhantomData,
70        }
71    }
72
73    #[must_use]
74    pub(crate) const fn context<E>(&self) -> Context<'_, E>
75    where
76        E: EntityKind<Canister = C>,
77    {
78        Context::new(self)
79    }
80
81    /// Return a recovery-guarded context for read paths.
82    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
83    where
84        E: EntityKind<Canister = C>,
85    {
86        ensure_recovered(self)?;
87        Ok(Context::new(self))
88    }
89
90    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
91    ///
92    /// This is intended for corruption injection and diagnostic testing only.
93    #[cfg(test)]
94    pub fn with_data_store_mut_for_test<R>(
95        &self,
96        path: &'static str,
97        f: impl FnOnce(&mut DataStore) -> R,
98    ) -> Result<R, InternalError> {
99        self.with_data(|reg| reg.with_store_mut(path, f))
100    }
101
102    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
103        self.data.with(|reg| f(reg))
104    }
105
106    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
107        self.index.with(|reg| f(reg))
108    }
109
110    pub(crate) const fn entity_registry(&self) -> &'static [EntityRegistryEntry] {
111        self.entities
112    }
113}
114
115impl<C: CanisterKind> Copy for Db<C> {}
116
117impl<C: CanisterKind> Clone for Db<C> {
118    fn clone(&self) -> Self {
119        *self
120    }
121}
122
123///
124/// DbSession
125///
126/// Session-scoped database handle with policy (debug, metrics) and execution routing.
127///
128pub struct DbSession<C: CanisterKind> {
129    db: Db<C>,
130    debug: bool,
131    metrics: Option<&'static dyn MetricsSink>,
132}
133
134impl<C: CanisterKind> DbSession<C> {
135    #[must_use]
136    pub const fn new(db: Db<C>) -> Self {
137        Self {
138            db,
139            debug: false,
140            metrics: None,
141        }
142    }
143
144    #[must_use]
145    pub const fn debug(mut self) -> Self {
146        self.debug = true;
147        self
148    }
149
150    #[must_use]
151    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
152        self.metrics = Some(sink);
153        self
154    }
155
156    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
157        if let Some(sink) = self.metrics {
158            sink::with_metrics_sink(sink, f)
159        } else {
160            f()
161        }
162    }
163
164    // ---------------------------------------------------------------------
165    // Query entry points (public, fluent)
166    // ---------------------------------------------------------------------
167
168    #[must_use]
169    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
170    where
171        E: EntityKind<Canister = C>,
172    {
173        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
174    }
175
176    #[must_use]
177    pub const fn load_with_consistency<E>(
178        &self,
179        consistency: ReadConsistency,
180    ) -> SessionLoadQuery<'_, C, E>
181    where
182        E: EntityKind<Canister = C>,
183    {
184        SessionLoadQuery::new(self, Query::new(consistency))
185    }
186
187    #[must_use]
188    pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
189    where
190        E: EntityKind<Canister = C>,
191    {
192        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
193    }
194
195    #[must_use]
196    pub const fn delete_with_consistency<E>(
197        &self,
198        consistency: ReadConsistency,
199    ) -> SessionDeleteQuery<'_, C, E>
200    where
201        E: EntityKind<Canister = C>,
202    {
203        SessionDeleteQuery::new(self, Query::new(consistency).delete())
204    }
205
206    // ---------------------------------------------------------------------
207    // Low-level executors (crate-internal; execution primitives)
208    // ---------------------------------------------------------------------
209
210    #[must_use]
211    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
212    where
213        E: EntityKind<Canister = C>,
214    {
215        LoadExecutor::new(self.db, self.debug)
216    }
217
218    #[must_use]
219    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        DeleteExecutor::new(self.db, self.debug)
224    }
225
226    #[must_use]
227    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
228    where
229        E: EntityKind<Canister = C>,
230    {
231        SaveExecutor::new(self.db, self.debug)
232    }
233
234    // ---------------------------------------------------------------------
235    // Query diagnostics / execution (internal routing)
236    // ---------------------------------------------------------------------
237
238    pub fn diagnose_query<E: EntityKind<Canister = C>>(
239        &self,
240        query: &Query<E>,
241    ) -> Result<QueryDiagnostics, QueryError> {
242        let explain = query.explain()?;
243        Ok(QueryDiagnostics::from(explain))
244    }
245
246    pub fn execute_query<E: EntityKind<Canister = C>>(
247        &self,
248        query: &Query<E>,
249    ) -> Result<Response<E>, QueryError> {
250        let plan = query.plan()?;
251        let result = match query.mode() {
252            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
253            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
254        };
255
256        result.map_err(QueryError::Execute)
257    }
258
259    pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
260        &self,
261        query: &Query<E>,
262    ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
263        let plan = query.plan()?;
264        let fingerprint = plan.fingerprint();
265        let access = Some(trace_access_from_plan(plan.access()));
266        let executor = match query.mode() {
267            QueryMode::Load(_) => QueryTraceExecutorKind::Load,
268            QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
269        };
270
271        let start = start_event(fingerprint, access, executor);
272        let result = match query.mode() {
273            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
274            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
275        };
276
277        match result {
278            Ok(response) => {
279                let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
280                let finish = finish_event(fingerprint, access, executor, rows);
281                Ok((
282                    response,
283                    QueryExecutionDiagnostics {
284                        fingerprint,
285                        events: vec![start, finish],
286                    },
287                ))
288            }
289            Err(err) => Err(QueryError::Execute(err)),
290        }
291    }
292
293    // ---------------------------------------------------------------------
294    // High-level write API (public, intent-level)
295    // ---------------------------------------------------------------------
296
297    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
298    where
299        E: EntityKind<Canister = C>,
300    {
301        self.with_metrics(|| self.save_executor::<E>().insert(entity))
302    }
303
304    pub fn insert_many<E>(
305        &self,
306        entities: impl IntoIterator<Item = E>,
307    ) -> Result<Vec<E>, InternalError>
308    where
309        E: EntityKind<Canister = C>,
310    {
311        self.with_metrics(|| self.save_executor::<E>().insert_many(entities))
312    }
313
314    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
315    where
316        E: EntityKind<Canister = C>,
317    {
318        self.with_metrics(|| self.save_executor::<E>().replace(entity))
319    }
320
321    pub fn replace_many<E>(
322        &self,
323        entities: impl IntoIterator<Item = E>,
324    ) -> Result<Vec<E>, InternalError>
325    where
326        E: EntityKind<Canister = C>,
327    {
328        self.with_metrics(|| self.save_executor::<E>().replace_many(entities))
329    }
330
331    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
332    where
333        E: EntityKind<Canister = C>,
334    {
335        self.with_metrics(|| self.save_executor::<E>().update(entity))
336    }
337
338    pub fn update_many<E>(
339        &self,
340        entities: impl IntoIterator<Item = E>,
341    ) -> Result<Vec<E>, InternalError>
342    where
343        E: EntityKind<Canister = C>,
344    {
345        self.with_metrics(|| self.save_executor::<E>().update_many(entities))
346    }
347
348    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
349    where
350        E: EntityKind<Canister = C>,
351    {
352        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
353    }
354
355    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
356    where
357        E: EntityKind<Canister = C>,
358    {
359        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
360    }
361
362    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
363    where
364        E: EntityKind<Canister = C>,
365    {
366        self.with_metrics(|| self.save_executor::<E>().update_view(view))
367    }
368
369    /// TEST ONLY: clear all registered data and index stores for this database.
370    #[doc(hidden)]
371    pub fn clear_stores_for_tests(&self) {
372        // Data stores.
373        self.db.with_data(|reg| {
374            for (path, _) in reg.iter() {
375                let _ = reg.with_store_mut(path, DataStore::clear);
376            }
377        });
378
379        // Index stores.
380        self.db.with_index(|reg| {
381            for (path, _) in reg.iter() {
382                let _ = reg.with_store_mut(path, IndexStore::clear);
383            }
384        });
385    }
386}