// icydb_core/db/mod.rs

1mod commit;
2pub mod cursor;
3pub(crate) mod executor;
4pub mod identity;
5pub mod index;
6pub mod query;
7mod relation;
8pub mod response;
9pub mod store;
10
11pub use commit::*;
12pub use relation::{StrongRelationDeleteValidateFn, validate_delete_strong_relations_for_source};
13
14use crate::{
15    db::{
16        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            plan::PlanError,
20        },
21        response::{Response, WriteBatchResponse, WriteResponse},
22        store::{RawDataKey, StoreRegistry},
23    },
24    error::InternalError,
25    obs::sink::{self, MetricsSink},
26    traits::{CanisterKind, EntityKind, EntityValue},
27};
28use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
29
30#[cfg(test)]
31use crate::db::{index::IndexStore, store::DataStore};
32
33///
34/// Db
35///
36/// A handle to the set of stores registered for a specific canister domain.
37///
38pub struct Db<C: CanisterKind> {
39    store: &'static LocalKey<StoreRegistry>,
40    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
41    _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45    #[must_use]
46    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
47        Self::new_with_hooks(store, &[])
48    }
49
50    #[must_use]
51    pub const fn new_with_hooks(
52        store: &'static LocalKey<StoreRegistry>,
53        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
54    ) -> Self {
55        Self {
56            store,
57            entity_runtime_hooks,
58            _marker: PhantomData,
59        }
60    }
61
62    #[must_use]
63    pub(crate) const fn context<E>(&self) -> Context<'_, E>
64    where
65        E: EntityKind<Canister = C> + EntityValue,
66    {
67        Context::new(self)
68    }
69
70    /// Return a recovery-guarded context for read paths.
71    ///
72    /// This enforces startup recovery and a fast persisted-marker check so reads
73    /// do not proceed while an incomplete commit is pending replay.
74    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
75    where
76        E: EntityKind<Canister = C> + EntityValue,
77    {
78        ensure_recovered(self)?;
79
80        Ok(Context::new(self))
81    }
82
83    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
84        self.store.with(|reg| f(reg))
85    }
86
87    pub(crate) fn prepare_row_commit_op(
88        &self,
89        op: &CommitRowOp,
90    ) -> Result<PreparedRowCommitOp, InternalError> {
91        let hooks = self
92            .entity_runtime_hooks
93            .iter()
94            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
95            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
96
97        (hooks.prepare_row_commit)(self, op)
98    }
99
100    // Validate strong relation constraints for delete-selected target keys.
101    pub(crate) fn validate_delete_strong_relations(
102        &self,
103        target_path: &str,
104        deleted_target_keys: &BTreeSet<RawDataKey>,
105    ) -> Result<(), InternalError> {
106        if deleted_target_keys.is_empty() {
107            return Ok(());
108        }
109
110        for hooks in self.entity_runtime_hooks {
111            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
112        }
113
114        Ok(())
115    }
116}
117
118///
119/// EntityRuntimeHooks
120///
121/// Per-entity runtime callbacks used for commit preparation and delete-side
122/// strong relation validation.
123///
124
125pub struct EntityRuntimeHooks<C: CanisterKind> {
126    pub entity_path: &'static str,
127    pub prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
128    pub validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
129}
130
131impl<C: CanisterKind> EntityRuntimeHooks<C> {
132    #[must_use]
133    pub const fn new(
134        entity_path: &'static str,
135        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
136        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
137    ) -> Self {
138        Self {
139            entity_path,
140            prepare_row_commit,
141            validate_delete_strong_relations,
142        }
143    }
144}
145
146impl<C: CanisterKind> Copy for Db<C> {}
147
148impl<C: CanisterKind> Clone for Db<C> {
149    fn clone(&self) -> Self {
150        *self
151    }
152}
153
154///
155/// DbSession
156///
157/// Session-scoped database handle with policy (debug, metrics) and execution routing.
158///
159pub struct DbSession<C: CanisterKind> {
160    db: Db<C>,
161    debug: bool,
162    metrics: Option<&'static dyn MetricsSink>,
163}
164
165impl<C: CanisterKind> DbSession<C> {
166    #[must_use]
167    pub const fn new(db: Db<C>) -> Self {
168        Self {
169            db,
170            debug: false,
171            metrics: None,
172        }
173    }
174
175    #[must_use]
176    pub const fn debug(mut self) -> Self {
177        self.debug = true;
178        self
179    }
180
181    #[must_use]
182    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
183        self.metrics = Some(sink);
184        self
185    }
186
187    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
188        if let Some(sink) = self.metrics {
189            sink::with_metrics_sink(sink, f)
190        } else {
191            f()
192        }
193    }
194
195    // ---------------------------------------------------------------------
196    // Query entry points (public, fluent)
197    // ---------------------------------------------------------------------
198
199    #[must_use]
200    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
201    where
202        E: EntityKind<Canister = C>,
203    {
204        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
205    }
206
207    #[must_use]
208    pub const fn load_with_consistency<E>(
209        &self,
210        consistency: ReadConsistency,
211    ) -> SessionLoadQuery<'_, C, E>
212    where
213        E: EntityKind<Canister = C>,
214    {
215        SessionLoadQuery::new(self, Query::new(consistency))
216    }
217
218    #[must_use]
219    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
224    }
225
226    #[must_use]
227    pub fn delete_with_consistency<E>(
228        &self,
229        consistency: ReadConsistency,
230    ) -> SessionDeleteQuery<'_, C, E>
231    where
232        E: EntityKind<Canister = C>,
233    {
234        SessionDeleteQuery::new(self, Query::new(consistency).delete())
235    }
236
237    // ---------------------------------------------------------------------
238    // Low-level executors (crate-internal; execution primitives)
239    // ---------------------------------------------------------------------
240
241    #[must_use]
242    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
243    where
244        E: EntityKind<Canister = C> + EntityValue,
245    {
246        LoadExecutor::new(self.db, self.debug)
247    }
248
249    #[must_use]
250    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
251    where
252        E: EntityKind<Canister = C> + EntityValue,
253    {
254        DeleteExecutor::new(self.db, self.debug)
255    }
256
257    #[must_use]
258    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
259    where
260        E: EntityKind<Canister = C> + EntityValue,
261    {
262        SaveExecutor::new(self.db, self.debug)
263    }
264
265    // ---------------------------------------------------------------------
266    // Query diagnostics / execution (internal routing)
267    // ---------------------------------------------------------------------
268
269    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
270    where
271        E: EntityKind<Canister = C> + EntityValue,
272    {
273        let plan = query.plan()?;
274
275        let result = match query.mode() {
276            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
277            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
278        };
279
280        result.map_err(QueryError::Execute)
281    }
282
283    pub(crate) fn execute_load_query_paged<E>(
284        &self,
285        query: &Query<E>,
286        cursor_token: Option<&str>,
287    ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
288    where
289        E: EntityKind<Canister = C> + EntityValue,
290    {
291        let plan = query.plan()?;
292        let cursor_bytes = match cursor_token {
293            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
294                QueryError::from(PlanError::InvalidContinuationCursor { reason })
295            })?),
296            None => None,
297        };
298        let boundary = plan.plan_cursor_boundary(cursor_bytes.as_deref())?;
299
300        let page = self
301            .with_metrics(|| self.load_executor::<E>().execute_paged(plan, boundary))
302            .map_err(QueryError::Execute)?;
303
304        Ok((page.items, page.next_cursor))
305    }
306
307    // ---------------------------------------------------------------------
308    // High-level write API (public, intent-level)
309    // ---------------------------------------------------------------------
310
311    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312    where
313        E: EntityKind<Canister = C> + EntityValue,
314    {
315        self.with_metrics(|| self.save_executor::<E>().insert(entity))
316            .map(WriteResponse::new)
317    }
318
319    /// Insert a batch with explicitly non-atomic semantics.
320    ///
321    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
322    pub fn insert_many_non_atomic<E>(
323        &self,
324        entities: impl IntoIterator<Item = E>,
325    ) -> Result<WriteBatchResponse<E>, InternalError>
326    where
327        E: EntityKind<Canister = C> + EntityValue,
328    {
329        let entities =
330            self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
331
332        Ok(WriteBatchResponse::new(entities))
333    }
334
335    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
336    where
337        E: EntityKind<Canister = C> + EntityValue,
338    {
339        self.with_metrics(|| self.save_executor::<E>().replace(entity))
340            .map(WriteResponse::new)
341    }
342
343    /// Replace a batch with explicitly non-atomic semantics.
344    ///
345    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
346    pub fn replace_many_non_atomic<E>(
347        &self,
348        entities: impl IntoIterator<Item = E>,
349    ) -> Result<WriteBatchResponse<E>, InternalError>
350    where
351        E: EntityKind<Canister = C> + EntityValue,
352    {
353        let entities =
354            self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
355
356        Ok(WriteBatchResponse::new(entities))
357    }
358
359    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
360    where
361        E: EntityKind<Canister = C> + EntityValue,
362    {
363        self.with_metrics(|| self.save_executor::<E>().update(entity))
364            .map(WriteResponse::new)
365    }
366
367    /// Update a batch with explicitly non-atomic semantics.
368    ///
369    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
370    pub fn update_many_non_atomic<E>(
371        &self,
372        entities: impl IntoIterator<Item = E>,
373    ) -> Result<WriteBatchResponse<E>, InternalError>
374    where
375        E: EntityKind<Canister = C> + EntityValue,
376    {
377        let entities =
378            self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
379
380        Ok(WriteBatchResponse::new(entities))
381    }
382
383    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
388    }
389
390    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
391    where
392        E: EntityKind<Canister = C> + EntityValue,
393    {
394        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
395    }
396
397    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
398    where
399        E: EntityKind<Canister = C> + EntityValue,
400    {
401        self.with_metrics(|| self.save_executor::<E>().update_view(view))
402    }
403
404    /// TEST ONLY: clear all registered data and index stores for this database.
405    #[cfg(test)]
406    #[doc(hidden)]
407    pub fn clear_stores_for_tests(&self) {
408        self.db.with_store_registry(|reg| {
409            for (_, store) in reg.iter() {
410                store.with_data_mut(DataStore::clear);
411                store.with_index_mut(IndexStore::clear);
412            }
413        });
414    }
415}