Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14    db::{
15        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16        index::{IndexStore, IndexStoreRegistry},
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            diagnostics::{
20                QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21                start_event, trace_access_from_plan,
22            },
23        },
24        response::Response,
25        store::{DataStore, DataStoreRegistry},
26    },
27    error::InternalError,
28    obs::sink::{self, MetricsSink},
29    traits::{CanisterKind, EntityKind, EntityValue},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
///
/// EntityRegistryEntry
///
/// Lightweight, copyable record describing one registered entity.
/// It pairs the entity's path with the path of the data store that holds it,
/// which is the metadata needed for save-time reference existence checks.
///

#[derive(Clone, Copy, Debug)]
pub struct EntityRegistryEntry {
    /// Path identifying the entity.
    pub entity_path: &'static str,
    /// Path identifying the data store for this entity.
    pub store_path: &'static str,
}
45
46///
47/// Db
48///
49/// A handle to the set of stores registered for a specific canister domain.
50///
51pub struct Db<C: CanisterKind> {
52    data: &'static LocalKey<DataStoreRegistry>,
53    index: &'static LocalKey<IndexStoreRegistry>,
54    entities: &'static [EntityRegistryEntry],
55    _marker: PhantomData<C>,
56}
57
58impl<C: CanisterKind> Db<C> {
59    #[must_use]
60    pub const fn new(
61        data: &'static LocalKey<DataStoreRegistry>,
62        index: &'static LocalKey<IndexStoreRegistry>,
63        entities: &'static [EntityRegistryEntry],
64    ) -> Self {
65        Self {
66            data,
67            index,
68            entities,
69            _marker: PhantomData,
70        }
71    }
72
73    #[must_use]
74    pub(crate) const fn context<E>(&self) -> Context<'_, E>
75    where
76        E: EntityKind<Canister = C> + EntityValue,
77    {
78        Context::new(self)
79    }
80
81    /// Return a recovery-guarded context for read paths.
82    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
83    where
84        E: EntityKind<Canister = C> + EntityValue,
85    {
86        ensure_recovered(self)?;
87
88        Ok(Context::new(self))
89    }
90
91    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
92    ///
93    /// This is intended for corruption injection and diagnostic testing only.
94    #[cfg(test)]
95    pub fn with_data_store_mut_for_test<R>(
96        &self,
97        path: &'static str,
98        f: impl FnOnce(&mut DataStore) -> R,
99    ) -> Result<R, InternalError> {
100        self.with_data(|reg| reg.with_store_mut(path, f))
101    }
102
103    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
104        self.data.with(|reg| f(reg))
105    }
106
107    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
108        self.index.with(|reg| f(reg))
109    }
110
111    pub(crate) const fn entity_registry(&self) -> &'static [EntityRegistryEntry] {
112        self.entities
113    }
114}
115
116impl<C: CanisterKind> Copy for Db<C> {}
117
118impl<C: CanisterKind> Clone for Db<C> {
119    fn clone(&self) -> Self {
120        *self
121    }
122}
123
124///
125/// DbSession
126///
127/// Session-scoped database handle with policy (debug, metrics) and execution routing.
128///
129pub struct DbSession<C: CanisterKind> {
130    db: Db<C>,
131    debug: bool,
132    metrics: Option<&'static dyn MetricsSink>,
133}
134
135impl<C: CanisterKind> DbSession<C> {
136    #[must_use]
137    pub const fn new(db: Db<C>) -> Self {
138        Self {
139            db,
140            debug: false,
141            metrics: None,
142        }
143    }
144
145    #[must_use]
146    pub const fn debug(mut self) -> Self {
147        self.debug = true;
148        self
149    }
150
151    #[must_use]
152    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
153        self.metrics = Some(sink);
154        self
155    }
156
157    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
158        if let Some(sink) = self.metrics {
159            sink::with_metrics_sink(sink, f)
160        } else {
161            f()
162        }
163    }
164
165    // ---------------------------------------------------------------------
166    // Query entry points (public, fluent)
167    // ---------------------------------------------------------------------
168
169    #[must_use]
170    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
171    where
172        E: EntityKind<Canister = C>,
173    {
174        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
175    }
176
177    #[must_use]
178    pub const fn load_with_consistency<E>(
179        &self,
180        consistency: ReadConsistency,
181    ) -> SessionLoadQuery<'_, C, E>
182    where
183        E: EntityKind<Canister = C>,
184    {
185        SessionLoadQuery::new(self, Query::new(consistency))
186    }
187
188    #[must_use]
189    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
190    where
191        E: EntityKind<Canister = C>,
192    {
193        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
194    }
195
196    #[must_use]
197    pub fn delete_with_consistency<E>(
198        &self,
199        consistency: ReadConsistency,
200    ) -> SessionDeleteQuery<'_, C, E>
201    where
202        E: EntityKind<Canister = C>,
203    {
204        SessionDeleteQuery::new(self, Query::new(consistency).delete())
205    }
206
207    // ---------------------------------------------------------------------
208    // Low-level executors (crate-internal; execution primitives)
209    // ---------------------------------------------------------------------
210
211    #[must_use]
212    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
213    where
214        E: EntityKind<Canister = C> + EntityValue,
215    {
216        LoadExecutor::new(self.db, self.debug)
217    }
218
219    #[must_use]
220    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
221    where
222        E: EntityKind<Canister = C> + EntityValue,
223    {
224        DeleteExecutor::new(self.db, self.debug)
225    }
226
227    #[must_use]
228    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
229    where
230        E: EntityKind<Canister = C> + EntityValue,
231    {
232        SaveExecutor::new(self.db, self.debug)
233    }
234
235    // ---------------------------------------------------------------------
236    // Query diagnostics / execution (internal routing)
237    // ---------------------------------------------------------------------
238
239    pub fn diagnose_query<E>(&self, query: &Query<E>) -> Result<QueryDiagnostics, QueryError>
240    where
241        E: EntityKind<Canister = C>,
242    {
243        let explain = query.explain()?;
244
245        Ok(QueryDiagnostics::from(explain))
246    }
247
248    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
249    where
250        E: EntityKind<Canister = C> + EntityValue,
251    {
252        let plan = query.plan()?;
253
254        let result = match query.mode() {
255            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
256            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
257        };
258
259        result.map_err(QueryError::Execute)
260    }
261
262    pub fn execute_with_diagnostics<E>(
263        &self,
264        query: &Query<E>,
265    ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError>
266    where
267        E: EntityKind<Canister = C> + EntityValue,
268    {
269        let plan = query.plan()?;
270        let fingerprint = plan.fingerprint();
271        let access = Some(trace_access_from_plan(plan.access()));
272        let executor = match query.mode() {
273            QueryMode::Load(_) => QueryTraceExecutorKind::Load,
274            QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
275        };
276
277        let start = start_event(fingerprint, access, executor);
278        let result = match query.mode() {
279            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
280            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
281        };
282
283        match result {
284            Ok(response) => {
285                let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
286                let finish = finish_event(fingerprint, access, executor, rows);
287                Ok((
288                    response,
289                    QueryExecutionDiagnostics {
290                        fingerprint,
291                        events: vec![start, finish],
292                    },
293                ))
294            }
295            Err(err) => Err(QueryError::Execute(err)),
296        }
297    }
298
299    // ---------------------------------------------------------------------
300    // High-level write API (public, intent-level)
301    // ---------------------------------------------------------------------
302
303    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
304    where
305        E: EntityKind<Canister = C> + EntityValue,
306    {
307        self.with_metrics(|| self.save_executor::<E>().insert(entity))
308    }
309
310    pub fn insert_many<E>(
311        &self,
312        entities: impl IntoIterator<Item = E>,
313    ) -> Result<Vec<E>, InternalError>
314    where
315        E: EntityKind<Canister = C> + EntityValue,
316    {
317        self.with_metrics(|| self.save_executor::<E>().insert_many(entities))
318    }
319
320    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
321    where
322        E: EntityKind<Canister = C> + EntityValue,
323    {
324        self.with_metrics(|| self.save_executor::<E>().replace(entity))
325    }
326
327    pub fn replace_many<E>(
328        &self,
329        entities: impl IntoIterator<Item = E>,
330    ) -> Result<Vec<E>, InternalError>
331    where
332        E: EntityKind<Canister = C> + EntityValue,
333    {
334        self.with_metrics(|| self.save_executor::<E>().replace_many(entities))
335    }
336
337    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
338    where
339        E: EntityKind<Canister = C> + EntityValue,
340    {
341        self.with_metrics(|| self.save_executor::<E>().update(entity))
342    }
343
344    pub fn update_many<E>(
345        &self,
346        entities: impl IntoIterator<Item = E>,
347    ) -> Result<Vec<E>, InternalError>
348    where
349        E: EntityKind<Canister = C> + EntityValue,
350    {
351        self.with_metrics(|| self.save_executor::<E>().update_many(entities))
352    }
353
354    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
355    where
356        E: EntityKind<Canister = C> + EntityValue,
357    {
358        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
359    }
360
361    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
362    where
363        E: EntityKind<Canister = C> + EntityValue,
364    {
365        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
366    }
367
368    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
369    where
370        E: EntityKind<Canister = C> + EntityValue,
371    {
372        self.with_metrics(|| self.save_executor::<E>().update_view(view))
373    }
374
375    /// TEST ONLY: clear all registered data and index stores for this database.
376    #[doc(hidden)]
377    pub fn clear_stores_for_tests(&self) {
378        // Data stores.
379        self.db.with_data(|reg| {
380            for (path, _) in reg.iter() {
381                let _ = reg.with_store_mut(path, DataStore::clear);
382            }
383        });
384
385        // Index stores.
386        self.db.with_index(|reg| {
387            for (path, _) in reg.iter() {
388                let _ = reg.with_store_mut(path, IndexStore::clear);
389            }
390        });
391    }
392}