Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14    db::{
15        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16        index::{IndexStore, IndexStoreRegistry},
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            diagnostics::{
20                QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21                start_event, trace_access_from_plan,
22            },
23        },
24        response::{Response, WriteBatchResponse, WriteResponse},
25        store::{DataStore, DataStoreRegistry},
26    },
27    error::InternalError,
28    obs::sink::{self, MetricsSink},
29    traits::{CanisterKind, EntityKind, EntityValue},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33///
34/// Db
35///
36/// A handle to the set of stores registered for a specific canister domain.
37///
38pub struct Db<C: CanisterKind> {
39    data: &'static LocalKey<DataStoreRegistry>,
40    index: &'static LocalKey<IndexStoreRegistry>,
41    _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45    #[must_use]
46    pub const fn new(
47        data: &'static LocalKey<DataStoreRegistry>,
48        index: &'static LocalKey<IndexStoreRegistry>,
49    ) -> Self {
50        Self {
51            data,
52            index,
53            _marker: PhantomData,
54        }
55    }
56
57    #[must_use]
58    pub(crate) const fn context<E>(&self) -> Context<'_, E>
59    where
60        E: EntityKind<Canister = C> + EntityValue,
61    {
62        Context::new(self)
63    }
64
65    /// Return a recovery-guarded context for read paths.
66    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
67    where
68        E: EntityKind<Canister = C> + EntityValue,
69    {
70        ensure_recovered(self)?;
71
72        Ok(Context::new(self))
73    }
74
75    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
76    ///
77    /// This is intended for corruption injection and diagnostic testing only.
78    #[cfg(test)]
79    pub fn with_data_store_mut_for_test<R>(
80        &self,
81        path: &'static str,
82        f: impl FnOnce(&mut DataStore) -> R,
83    ) -> Result<R, InternalError> {
84        self.with_data(|reg| reg.with_store_mut(path, f))
85    }
86
87    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
88        self.data.with(|reg| f(reg))
89    }
90
91    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
92        self.index.with(|reg| f(reg))
93    }
94}
95
96impl<C: CanisterKind> Copy for Db<C> {}
97
98impl<C: CanisterKind> Clone for Db<C> {
99    fn clone(&self) -> Self {
100        *self
101    }
102}
103
104///
105/// DbSession
106///
107/// Session-scoped database handle with policy (debug, metrics) and execution routing.
108///
109pub struct DbSession<C: CanisterKind> {
110    db: Db<C>,
111    debug: bool,
112    metrics: Option<&'static dyn MetricsSink>,
113}
114
115impl<C: CanisterKind> DbSession<C> {
116    #[must_use]
117    pub const fn new(db: Db<C>) -> Self {
118        Self {
119            db,
120            debug: false,
121            metrics: None,
122        }
123    }
124
125    #[must_use]
126    pub const fn debug(mut self) -> Self {
127        self.debug = true;
128        self
129    }
130
131    #[must_use]
132    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
133        self.metrics = Some(sink);
134        self
135    }
136
137    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
138        if let Some(sink) = self.metrics {
139            sink::with_metrics_sink(sink, f)
140        } else {
141            f()
142        }
143    }
144
145    // ---------------------------------------------------------------------
146    // Query entry points (public, fluent)
147    // ---------------------------------------------------------------------
148
149    #[must_use]
150    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
151    where
152        E: EntityKind<Canister = C>,
153    {
154        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
155    }
156
157    #[must_use]
158    pub const fn load_with_consistency<E>(
159        &self,
160        consistency: ReadConsistency,
161    ) -> SessionLoadQuery<'_, C, E>
162    where
163        E: EntityKind<Canister = C>,
164    {
165        SessionLoadQuery::new(self, Query::new(consistency))
166    }
167
168    #[must_use]
169    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
170    where
171        E: EntityKind<Canister = C>,
172    {
173        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
174    }
175
176    #[must_use]
177    pub fn delete_with_consistency<E>(
178        &self,
179        consistency: ReadConsistency,
180    ) -> SessionDeleteQuery<'_, C, E>
181    where
182        E: EntityKind<Canister = C>,
183    {
184        SessionDeleteQuery::new(self, Query::new(consistency).delete())
185    }
186
187    // ---------------------------------------------------------------------
188    // Low-level executors (crate-internal; execution primitives)
189    // ---------------------------------------------------------------------
190
191    #[must_use]
192    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
193    where
194        E: EntityKind<Canister = C> + EntityValue,
195    {
196        LoadExecutor::new(self.db, self.debug)
197    }
198
199    #[must_use]
200    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
201    where
202        E: EntityKind<Canister = C> + EntityValue,
203    {
204        DeleteExecutor::new(self.db, self.debug)
205    }
206
207    #[must_use]
208    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
209    where
210        E: EntityKind<Canister = C> + EntityValue,
211    {
212        SaveExecutor::new(self.db, self.debug)
213    }
214
215    // ---------------------------------------------------------------------
216    // Query diagnostics / execution (internal routing)
217    // ---------------------------------------------------------------------
218
219    pub fn diagnose_query<E>(&self, query: &Query<E>) -> Result<QueryDiagnostics, QueryError>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        let explain = query.explain()?;
224
225        Ok(QueryDiagnostics::from(explain))
226    }
227
228    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
229    where
230        E: EntityKind<Canister = C> + EntityValue,
231    {
232        let plan = query.plan()?;
233
234        let result = match query.mode() {
235            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
236            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
237        };
238
239        result.map_err(QueryError::Execute)
240    }
241
242    pub fn execute_with_diagnostics<E>(
243        &self,
244        query: &Query<E>,
245    ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError>
246    where
247        E: EntityKind<Canister = C> + EntityValue,
248    {
249        let plan = query.plan()?;
250        let fingerprint = plan.fingerprint();
251        let access = Some(trace_access_from_plan(plan.access()));
252        let executor = match query.mode() {
253            QueryMode::Load(_) => QueryTraceExecutorKind::Load,
254            QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
255        };
256
257        let start = start_event(fingerprint, access, executor);
258        let result = match query.mode() {
259            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
260            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
261        };
262
263        match result {
264            Ok(response) => {
265                // NOTE: Diagnostics are best-effort; overflow saturates for determinism.
266                let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
267                let finish = finish_event(fingerprint, access, executor, rows);
268                Ok((
269                    response,
270                    QueryExecutionDiagnostics {
271                        fingerprint,
272                        events: vec![start, finish],
273                    },
274                ))
275            }
276            Err(err) => Err(QueryError::Execute(err)),
277        }
278    }
279
280    // ---------------------------------------------------------------------
281    // High-level write API (public, intent-level)
282    // ---------------------------------------------------------------------
283
284    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
285    where
286        E: EntityKind<Canister = C> + EntityValue,
287    {
288        self.with_metrics(|| self.save_executor::<E>().insert(entity))
289            .map(WriteResponse::new)
290    }
291
292    pub fn insert_many<E>(
293        &self,
294        entities: impl IntoIterator<Item = E>,
295    ) -> Result<WriteBatchResponse<E>, InternalError>
296    where
297        E: EntityKind<Canister = C> + EntityValue,
298    {
299        let entities = self.with_metrics(|| self.save_executor::<E>().insert_many(entities))?;
300
301        Ok(WriteBatchResponse::new(entities))
302    }
303
304    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
305    where
306        E: EntityKind<Canister = C> + EntityValue,
307    {
308        self.with_metrics(|| self.save_executor::<E>().replace(entity))
309            .map(WriteResponse::new)
310    }
311
312    pub fn replace_many<E>(
313        &self,
314        entities: impl IntoIterator<Item = E>,
315    ) -> Result<WriteBatchResponse<E>, InternalError>
316    where
317        E: EntityKind<Canister = C> + EntityValue,
318    {
319        let entities = self.with_metrics(|| self.save_executor::<E>().replace_many(entities))?;
320
321        Ok(WriteBatchResponse::new(entities))
322    }
323
324    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
325    where
326        E: EntityKind<Canister = C> + EntityValue,
327    {
328        self.with_metrics(|| self.save_executor::<E>().update(entity))
329            .map(WriteResponse::new)
330    }
331
332    pub fn update_many<E>(
333        &self,
334        entities: impl IntoIterator<Item = E>,
335    ) -> Result<WriteBatchResponse<E>, InternalError>
336    where
337        E: EntityKind<Canister = C> + EntityValue,
338    {
339        let entities = self.with_metrics(|| self.save_executor::<E>().update_many(entities))?;
340
341        Ok(WriteBatchResponse::new(entities))
342    }
343
344    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
345    where
346        E: EntityKind<Canister = C> + EntityValue,
347    {
348        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
349    }
350
351    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
352    where
353        E: EntityKind<Canister = C> + EntityValue,
354    {
355        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
356    }
357
358    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
359    where
360        E: EntityKind<Canister = C> + EntityValue,
361    {
362        self.with_metrics(|| self.save_executor::<E>().update_view(view))
363    }
364
365    /// TEST ONLY: clear all registered data and index stores for this database.
366    #[doc(hidden)]
367    pub fn clear_stores_for_tests(&self) {
368        // Data stores.
369        self.db.with_data(|reg| {
370            for (path, _) in reg.iter() {
371                let _ = reg.with_store_mut(path, DataStore::clear);
372            }
373        });
374
375        // Index stores.
376        self.db.with_index(|reg| {
377            for (path, _) in reg.iter() {
378                let _ = reg.with_store_mut(path, IndexStore::clear);
379            }
380        });
381    }
382}