Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14    db::{
15        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16        index::IndexStoreRegistry,
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            diagnostics::{
20                QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21                start_event, trace_access_from_plan,
22            },
23        },
24        response::{Response, WriteBatchResponse, WriteResponse},
25        store::DataStoreRegistry,
26    },
27    error::InternalError,
28    obs::sink::{self, MetricsSink},
29    traits::{CanisterKind, EntityKind, EntityValue},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33#[cfg(test)]
34use crate::db::{index::IndexStore, store::DataStore};
35
36///
37/// Db
38///
39/// A handle to the set of stores registered for a specific canister domain.
40///
41pub struct Db<C: CanisterKind> {
42    data: &'static LocalKey<DataStoreRegistry>,
43    index: &'static LocalKey<IndexStoreRegistry>,
44    _marker: PhantomData<C>,
45}
46
47impl<C: CanisterKind> Db<C> {
48    #[must_use]
49    pub const fn new(
50        data: &'static LocalKey<DataStoreRegistry>,
51        index: &'static LocalKey<IndexStoreRegistry>,
52    ) -> Self {
53        Self {
54            data,
55            index,
56            _marker: PhantomData,
57        }
58    }
59
60    #[must_use]
61    pub(crate) const fn context<E>(&self) -> Context<'_, E>
62    where
63        E: EntityKind<Canister = C> + EntityValue,
64    {
65        Context::new(self)
66    }
67
68    /// Return a recovery-guarded context for read paths (startup recovery only).
69    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
70    where
71        E: EntityKind<Canister = C> + EntityValue,
72    {
73        ensure_recovered(self)?;
74
75        Ok(Context::new(self))
76    }
77
78    /// TEST ONLY: Mutate a data store directly, bypassing atomicity and executors.
79    ///
80    /// This is intended for corruption injection and diagnostic testing only.
81    #[cfg(test)]
82    pub fn with_data_store_mut_for_test<R>(
83        &self,
84        path: &'static str,
85        f: impl FnOnce(&mut DataStore) -> R,
86    ) -> Result<R, InternalError> {
87        self.with_data(|reg| reg.with_store_mut(path, f))
88    }
89
90    pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
91        self.data.with(|reg| f(reg))
92    }
93
94    pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
95        self.index.with(|reg| f(reg))
96    }
97}
98
99impl<C: CanisterKind> Copy for Db<C> {}
100
101impl<C: CanisterKind> Clone for Db<C> {
102    fn clone(&self) -> Self {
103        *self
104    }
105}
106
107///
108/// DbSession
109///
110/// Session-scoped database handle with policy (debug, metrics) and execution routing.
111///
112pub struct DbSession<C: CanisterKind> {
113    db: Db<C>,
114    debug: bool,
115    metrics: Option<&'static dyn MetricsSink>,
116}
117
118impl<C: CanisterKind> DbSession<C> {
119    #[must_use]
120    pub const fn new(db: Db<C>) -> Self {
121        Self {
122            db,
123            debug: false,
124            metrics: None,
125        }
126    }
127
128    #[must_use]
129    pub const fn debug(mut self) -> Self {
130        self.debug = true;
131        self
132    }
133
134    #[must_use]
135    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
136        self.metrics = Some(sink);
137        self
138    }
139
140    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
141        if let Some(sink) = self.metrics {
142            sink::with_metrics_sink(sink, f)
143        } else {
144            f()
145        }
146    }
147
148    // ---------------------------------------------------------------------
149    // Query entry points (public, fluent)
150    // ---------------------------------------------------------------------
151
152    #[must_use]
153    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
154    where
155        E: EntityKind<Canister = C>,
156    {
157        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
158    }
159
160    #[must_use]
161    pub const fn load_with_consistency<E>(
162        &self,
163        consistency: ReadConsistency,
164    ) -> SessionLoadQuery<'_, C, E>
165    where
166        E: EntityKind<Canister = C>,
167    {
168        SessionLoadQuery::new(self, Query::new(consistency))
169    }
170
171    #[must_use]
172    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
173    where
174        E: EntityKind<Canister = C>,
175    {
176        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
177    }
178
179    #[must_use]
180    pub fn delete_with_consistency<E>(
181        &self,
182        consistency: ReadConsistency,
183    ) -> SessionDeleteQuery<'_, C, E>
184    where
185        E: EntityKind<Canister = C>,
186    {
187        SessionDeleteQuery::new(self, Query::new(consistency).delete())
188    }
189
190    // ---------------------------------------------------------------------
191    // Low-level executors (crate-internal; execution primitives)
192    // ---------------------------------------------------------------------
193
194    #[must_use]
195    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
196    where
197        E: EntityKind<Canister = C> + EntityValue,
198    {
199        LoadExecutor::new(self.db, self.debug)
200    }
201
202    #[must_use]
203    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
204    where
205        E: EntityKind<Canister = C> + EntityValue,
206    {
207        DeleteExecutor::new(self.db, self.debug)
208    }
209
210    #[must_use]
211    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
212    where
213        E: EntityKind<Canister = C> + EntityValue,
214    {
215        SaveExecutor::new(self.db, self.debug)
216    }
217
218    // ---------------------------------------------------------------------
219    // Query diagnostics / execution (internal routing)
220    // ---------------------------------------------------------------------
221
222    pub fn diagnose_query<E>(&self, query: &Query<E>) -> Result<QueryDiagnostics, QueryError>
223    where
224        E: EntityKind<Canister = C>,
225    {
226        let explain = query.explain()?;
227
228        Ok(QueryDiagnostics::from(explain))
229    }
230
231    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
232    where
233        E: EntityKind<Canister = C> + EntityValue,
234    {
235        let plan = query.plan()?;
236
237        let result = match query.mode() {
238            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
239            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
240        };
241
242        result.map_err(QueryError::Execute)
243    }
244
245    pub fn execute_with_diagnostics<E>(
246        &self,
247        query: &Query<E>,
248    ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError>
249    where
250        E: EntityKind<Canister = C> + EntityValue,
251    {
252        let plan = query.plan()?;
253        let fingerprint = plan.fingerprint();
254        let access = Some(trace_access_from_plan(plan.access()));
255        let executor = match query.mode() {
256            QueryMode::Load(_) => QueryTraceExecutorKind::Load,
257            QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
258        };
259
260        let start = start_event(fingerprint, access, executor);
261        let result = match query.mode() {
262            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
263            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
264        };
265
266        match result {
267            Ok(response) => {
268                // NOTE: Diagnostics are best-effort; overflow saturates for determinism.
269                let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
270                let finish = finish_event(fingerprint, access, executor, rows);
271                Ok((
272                    response,
273                    QueryExecutionDiagnostics {
274                        fingerprint,
275                        events: vec![start, finish],
276                    },
277                ))
278            }
279            Err(err) => Err(QueryError::Execute(err)),
280        }
281    }
282
283    // ---------------------------------------------------------------------
284    // High-level write API (public, intent-level)
285    // ---------------------------------------------------------------------
286
287    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
288    where
289        E: EntityKind<Canister = C> + EntityValue,
290    {
291        self.with_metrics(|| self.save_executor::<E>().insert(entity))
292            .map(WriteResponse::new)
293    }
294
295    /// Insert a batch with explicitly non-atomic semantics.
296    ///
297    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
298    pub fn insert_many_non_atomic<E>(
299        &self,
300        entities: impl IntoIterator<Item = E>,
301    ) -> Result<WriteBatchResponse<E>, InternalError>
302    where
303        E: EntityKind<Canister = C> + EntityValue,
304    {
305        let entities =
306            self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
307
308        Ok(WriteBatchResponse::new(entities))
309    }
310
311    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312    where
313        E: EntityKind<Canister = C> + EntityValue,
314    {
315        self.with_metrics(|| self.save_executor::<E>().replace(entity))
316            .map(WriteResponse::new)
317    }
318
319    /// Replace a batch with explicitly non-atomic semantics.
320    ///
321    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
322    pub fn replace_many_non_atomic<E>(
323        &self,
324        entities: impl IntoIterator<Item = E>,
325    ) -> Result<WriteBatchResponse<E>, InternalError>
326    where
327        E: EntityKind<Canister = C> + EntityValue,
328    {
329        let entities =
330            self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
331
332        Ok(WriteBatchResponse::new(entities))
333    }
334
335    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
336    where
337        E: EntityKind<Canister = C> + EntityValue,
338    {
339        self.with_metrics(|| self.save_executor::<E>().update(entity))
340            .map(WriteResponse::new)
341    }
342
343    /// Update a batch with explicitly non-atomic semantics.
344    ///
345    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
346    pub fn update_many_non_atomic<E>(
347        &self,
348        entities: impl IntoIterator<Item = E>,
349    ) -> Result<WriteBatchResponse<E>, InternalError>
350    where
351        E: EntityKind<Canister = C> + EntityValue,
352    {
353        let entities =
354            self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
355
356        Ok(WriteBatchResponse::new(entities))
357    }
358
359    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
360    where
361        E: EntityKind<Canister = C> + EntityValue,
362    {
363        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
364    }
365
366    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
367    where
368        E: EntityKind<Canister = C> + EntityValue,
369    {
370        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
371    }
372
373    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
374    where
375        E: EntityKind<Canister = C> + EntityValue,
376    {
377        self.with_metrics(|| self.save_executor::<E>().update_view(view))
378    }
379
380    /// TEST ONLY: clear all registered data and index stores for this database.
381    #[cfg(test)]
382    #[doc(hidden)]
383    pub fn clear_stores_for_tests(&self) {
384        // Data stores.
385        self.db.with_data(|reg| {
386            for (path, _) in reg.iter() {
387                let _ = reg.with_store_mut(path, DataStore::clear);
388            }
389        });
390
391        // Index stores.
392        self.db.with_index(|reg| {
393            for (path, _) in reg.iter() {
394                let _ = reg.with_store_mut(path, IndexStore::clear);
395            }
396        });
397    }
398}