Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14    db::{
15        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16        index::{IndexStore, IndexStoreRegistry},
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            diagnostics::{
20                QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21                start_event, trace_access_from_plan,
22            },
23        },
24        response::Response,
25        store::{DataStore, DataStoreRegistry},
26    },
27    error::InternalError,
28    obs::sink::{self, MetricsSink},
29    traits::{CanisterKind, EntityKind},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33///
34/// Db
35///
36/// A handle to the set of stores registered for a specific canister domain.
37///
38pub struct Db<C: CanisterKind> {
39    data: &'static LocalKey<DataStoreRegistry>,
40    index: &'static LocalKey<IndexStoreRegistry>,
41    _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45    #[must_use]
46    pub const fn new(
47        data: &'static LocalKey<DataStoreRegistry>,
48        index: &'static LocalKey<IndexStoreRegistry>,
49    ) -> Self {
50        Self {
51            data,
52            index,
53            _marker: PhantomData,
54        }
55    }
56
57    #[must_use]
58    pub(crate) const fn context<E>(&self) -> Context<'_, E>
59    where
60        E: EntityKind<Canister = C>,
61    {
62        Context::new(self)
63    }
64
65    /// Return a recovery-guarded context for read paths.
66    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
67    where
68        E: EntityKind<Canister = C>,
69    {
70        ensure_recovered(self)?;
71        Ok(Context::new(self))
72    }
73
74    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
75        self.data.with(|reg| f(reg))
76    }
77
78    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
79        self.index.with(|reg| f(reg))
80    }
81}
82
83impl<C: CanisterKind> Copy for Db<C> {}
84
85impl<C: CanisterKind> Clone for Db<C> {
86    fn clone(&self) -> Self {
87        *self
88    }
89}
90
91///
92/// DbSession
93///
94/// Session-scoped database handle with policy (debug, metrics) and execution routing.
95///
96pub struct DbSession<C: CanisterKind> {
97    db: Db<C>,
98    debug: bool,
99    metrics: Option<&'static dyn MetricsSink>,
100}
101
102impl<C: CanisterKind> DbSession<C> {
    /// Create a session over `db` with debug disabled and no metrics sink.
    #[must_use]
    pub const fn new(db: Db<C>) -> Self {
        Self {
            db,
            debug: false,
            metrics: None,
        }
    }
111
112    #[must_use]
113    pub const fn debug(mut self) -> Self {
114        self.debug = true;
115        self
116    }
117
118    #[must_use]
119    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
120        self.metrics = Some(sink);
121        self
122    }
123
124    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
125        if let Some(sink) = self.metrics {
126            sink::with_metrics_sink(sink, f)
127        } else {
128            f()
129        }
130    }
131
132    // ---------------------------------------------------------------------
133    // Query entry points (public, fluent)
134    // ---------------------------------------------------------------------
135
136    #[must_use]
137    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
138    where
139        E: EntityKind<Canister = C>,
140    {
141        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
142    }
143
144    #[must_use]
145    pub const fn load_with_consistency<E>(
146        &self,
147        consistency: ReadConsistency,
148    ) -> SessionLoadQuery<'_, C, E>
149    where
150        E: EntityKind<Canister = C>,
151    {
152        SessionLoadQuery::new(self, Query::new(consistency))
153    }
154
155    #[must_use]
156    pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
157    where
158        E: EntityKind<Canister = C>,
159    {
160        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
161    }
162
163    #[must_use]
164    pub const fn delete_with_consistency<E>(
165        &self,
166        consistency: ReadConsistency,
167    ) -> SessionDeleteQuery<'_, C, E>
168    where
169        E: EntityKind<Canister = C>,
170    {
171        SessionDeleteQuery::new(self, Query::new(consistency).delete())
172    }
173
174    // ---------------------------------------------------------------------
175    // Low-level executors (crate-internal; execution primitives)
176    // ---------------------------------------------------------------------
177
178    #[must_use]
179    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
180    where
181        E: EntityKind<Canister = C>,
182    {
183        LoadExecutor::new(self.db, self.debug)
184    }
185
186    #[must_use]
187    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
188    where
189        E: EntityKind<Canister = C>,
190    {
191        DeleteExecutor::new(self.db, self.debug)
192    }
193
194    #[must_use]
195    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
196    where
197        E: EntityKind<Canister = C>,
198    {
199        SaveExecutor::new(self.db, self.debug)
200    }
201
202    // ---------------------------------------------------------------------
203    // Query diagnostics / execution (internal routing)
204    // ---------------------------------------------------------------------
205
206    pub fn diagnose_query<E: EntityKind<Canister = C>>(
207        &self,
208        query: &Query<E>,
209    ) -> Result<QueryDiagnostics, QueryError> {
210        let explain = query.explain()?;
211        Ok(QueryDiagnostics::from(explain))
212    }
213
214    pub fn execute_query<E: EntityKind<Canister = C>>(
215        &self,
216        query: &Query<E>,
217    ) -> Result<Response<E>, QueryError> {
218        let plan = query.plan()?;
219        let result = match query.mode() {
220            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
221            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
222        };
223
224        result.map_err(QueryError::Execute)
225    }
226
227    pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
228        &self,
229        query: &Query<E>,
230    ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
231        let plan = query.plan()?;
232        let fingerprint = plan.fingerprint();
233        let access = Some(trace_access_from_plan(plan.access()));
234        let executor = match query.mode() {
235            QueryMode::Load(_) => QueryTraceExecutorKind::Load,
236            QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
237        };
238
239        let start = start_event(fingerprint, access, executor);
240        let result = match query.mode() {
241            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
242            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
243        };
244
245        match result {
246            Ok(response) => {
247                let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
248                let finish = finish_event(fingerprint, access, executor, rows);
249                Ok((
250                    response,
251                    QueryExecutionDiagnostics {
252                        fingerprint,
253                        events: vec![start, finish],
254                    },
255                ))
256            }
257            Err(err) => Err(QueryError::Execute(err)),
258        }
259    }
260
261    // ---------------------------------------------------------------------
262    // High-level write API (public, intent-level)
263    // ---------------------------------------------------------------------
264
265    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
266    where
267        E: EntityKind<Canister = C>,
268    {
269        self.with_metrics(|| self.save_executor::<E>().insert(entity))
270    }
271
272    pub fn insert_many<E>(
273        &self,
274        entities: impl IntoIterator<Item = E>,
275    ) -> Result<Vec<E>, InternalError>
276    where
277        E: EntityKind<Canister = C>,
278    {
279        self.with_metrics(|| self.save_executor::<E>().insert_many(entities))
280    }
281
282    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
283    where
284        E: EntityKind<Canister = C>,
285    {
286        self.with_metrics(|| self.save_executor::<E>().replace(entity))
287    }
288
289    pub fn replace_many<E>(
290        &self,
291        entities: impl IntoIterator<Item = E>,
292    ) -> Result<Vec<E>, InternalError>
293    where
294        E: EntityKind<Canister = C>,
295    {
296        self.with_metrics(|| self.save_executor::<E>().replace_many(entities))
297    }
298
299    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
300    where
301        E: EntityKind<Canister = C>,
302    {
303        self.with_metrics(|| self.save_executor::<E>().update(entity))
304    }
305
306    pub fn update_many<E>(
307        &self,
308        entities: impl IntoIterator<Item = E>,
309    ) -> Result<Vec<E>, InternalError>
310    where
311        E: EntityKind<Canister = C>,
312    {
313        self.with_metrics(|| self.save_executor::<E>().update_many(entities))
314    }
315
316    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
317    where
318        E: EntityKind<Canister = C>,
319    {
320        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
321    }
322
323    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
324    where
325        E: EntityKind<Canister = C>,
326    {
327        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
328    }
329
330    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
331    where
332        E: EntityKind<Canister = C>,
333    {
334        self.with_metrics(|| self.save_executor::<E>().update_view(view))
335    }
336
337    /// TEST ONLY: clear all registered data and index stores for this database.
338    #[doc(hidden)]
339    pub fn clear_stores_for_tests(&self) {
340        // Data stores.
341        self.db.with_data(|reg| {
342            for (path, _) in reg.iter() {
343                let _ = reg.with_store_mut(path, DataStore::clear);
344            }
345        });
346
347        // Index stores.
348        self.db.with_index(|reg| {
349            for (path, _) in reg.iter() {
350                let _ = reg.with_store_mut(path, IndexStore::clear);
351            }
352        });
353    }
354}