Skip to main content

icydb_core/db/
mod.rs

1mod commit;
2pub mod cursor;
3pub(crate) mod executor;
4pub mod identity;
5pub mod index;
6pub mod query;
7mod relation;
8pub mod response;
9pub mod store;
10
11pub use commit::*;
12pub use relation::{StrongRelationDeleteValidateFn, validate_delete_strong_relations_for_source};
13
14use crate::{
15    db::{
16        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
17        query::{
18            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19            plan::PlanError,
20        },
21        response::{Response, WriteBatchResponse, WriteResponse},
22        store::{RawDataKey, StoreRegistry},
23    },
24    error::InternalError,
25    obs::sink::{self, MetricsSink},
26    traits::{CanisterKind, EntityKind, EntityValue},
27};
28use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
29
30#[cfg(test)]
31use crate::db::{index::IndexStore, store::DataStore};
32
33///
34/// Db
35///
36/// A handle to the set of stores registered for a specific canister domain.
37///
38pub struct Db<C: CanisterKind> {
39    store: &'static LocalKey<StoreRegistry>,
40    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
41    _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45    #[must_use]
46    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
47        Self::new_with_hooks(store, &[])
48    }
49
50    #[must_use]
51    pub const fn new_with_hooks(
52        store: &'static LocalKey<StoreRegistry>,
53        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
54    ) -> Self {
55        Self {
56            store,
57            entity_runtime_hooks,
58            _marker: PhantomData,
59        }
60    }
61
62    #[must_use]
63    pub(crate) const fn context<E>(&self) -> Context<'_, E>
64    where
65        E: EntityKind<Canister = C> + EntityValue,
66    {
67        Context::new(self)
68    }
69
70    /// Return a recovery-guarded context for read paths.
71    ///
72    /// This enforces startup recovery and a fast persisted-marker check so reads
73    /// do not proceed while an incomplete commit is pending replay.
74    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
75    where
76        E: EntityKind<Canister = C> + EntityValue,
77    {
78        ensure_recovered(self)?;
79
80        Ok(Context::new(self))
81    }
82
83    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
84        self.store.with(|reg| f(reg))
85    }
86
87    pub(crate) fn prepare_row_commit_op(
88        &self,
89        op: &CommitRowOp,
90    ) -> Result<PreparedRowCommitOp, InternalError> {
91        let hooks = self
92            .entity_runtime_hooks
93            .iter()
94            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
95            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
96
97        (hooks.prepare_row_commit)(self, op)
98    }
99
100    // Validate strong relation constraints for delete-selected target keys.
101    pub(crate) fn validate_delete_strong_relations(
102        &self,
103        target_path: &str,
104        deleted_target_keys: &BTreeSet<RawDataKey>,
105    ) -> Result<(), InternalError> {
106        if deleted_target_keys.is_empty() {
107            return Ok(());
108        }
109
110        for hooks in self.entity_runtime_hooks {
111            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
112        }
113
114        Ok(())
115    }
116}
117
118///
119/// EntityRuntimeHooks
120///
121/// Per-entity runtime callbacks used for commit preparation and delete-side
122/// strong relation validation.
123///
124
125pub struct EntityRuntimeHooks<C: CanisterKind> {
126    pub entity_path: &'static str,
127    pub prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
128    pub validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
129}
130
131impl<C: CanisterKind> EntityRuntimeHooks<C> {
132    #[must_use]
133    pub const fn new(
134        entity_path: &'static str,
135        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
136        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
137    ) -> Self {
138        Self {
139            entity_path,
140            prepare_row_commit,
141            validate_delete_strong_relations,
142        }
143    }
144}
145
146impl<C: CanisterKind> Copy for Db<C> {}
147
148impl<C: CanisterKind> Clone for Db<C> {
149    fn clone(&self) -> Self {
150        *self
151    }
152}
153
154///
155/// DbSession
156///
157/// Session-scoped database handle with policy (debug, metrics) and execution routing.
158///
159pub struct DbSession<C: CanisterKind> {
160    db: Db<C>,
161    debug: bool,
162    metrics: Option<&'static dyn MetricsSink>,
163}
164
165impl<C: CanisterKind> DbSession<C> {
166    #[must_use]
167    pub const fn new(db: Db<C>) -> Self {
168        Self {
169            db,
170            debug: false,
171            metrics: None,
172        }
173    }
174
175    #[must_use]
176    pub const fn debug(mut self) -> Self {
177        self.debug = true;
178        self
179    }
180
181    #[must_use]
182    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
183        self.metrics = Some(sink);
184        self
185    }
186
187    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
188        if let Some(sink) = self.metrics {
189            sink::with_metrics_sink(sink, f)
190        } else {
191            f()
192        }
193    }
194
195    // ---------------------------------------------------------------------
196    // Query entry points (public, fluent)
197    // ---------------------------------------------------------------------
198
199    #[must_use]
200    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
201    where
202        E: EntityKind<Canister = C>,
203    {
204        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
205    }
206
207    #[must_use]
208    pub const fn load_with_consistency<E>(
209        &self,
210        consistency: ReadConsistency,
211    ) -> SessionLoadQuery<'_, C, E>
212    where
213        E: EntityKind<Canister = C>,
214    {
215        SessionLoadQuery::new(self, Query::new(consistency))
216    }
217
218    #[must_use]
219    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
220    where
221        E: EntityKind<Canister = C>,
222    {
223        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
224    }
225
226    #[must_use]
227    pub fn delete_with_consistency<E>(
228        &self,
229        consistency: ReadConsistency,
230    ) -> SessionDeleteQuery<'_, C, E>
231    where
232        E: EntityKind<Canister = C>,
233    {
234        SessionDeleteQuery::new(self, Query::new(consistency).delete())
235    }
236
237    // ---------------------------------------------------------------------
238    // Low-level executors (crate-internal; execution primitives)
239    // ---------------------------------------------------------------------
240
241    #[must_use]
242    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
243    where
244        E: EntityKind<Canister = C> + EntityValue,
245    {
246        LoadExecutor::new(self.db, self.debug)
247    }
248
249    #[must_use]
250    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
251    where
252        E: EntityKind<Canister = C> + EntityValue,
253    {
254        DeleteExecutor::new(self.db, self.debug)
255    }
256
257    #[must_use]
258    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
259    where
260        E: EntityKind<Canister = C> + EntityValue,
261    {
262        SaveExecutor::new(self.db, self.debug)
263    }
264
265    // ---------------------------------------------------------------------
266    // Query diagnostics / execution (internal routing)
267    // ---------------------------------------------------------------------
268
269    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
270    where
271        E: EntityKind<Canister = C> + EntityValue,
272    {
273        let plan = query.plan()?;
274
275        let result = match query.mode() {
276            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
277            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
278        };
279
280        result.map_err(QueryError::Execute)
281    }
282
283    pub(crate) fn execute_load_query_paged<E>(
284        &self,
285        query: &Query<E>,
286        cursor_token: Option<&str>,
287    ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
288    where
289        E: EntityKind<Canister = C> + EntityValue,
290    {
291        let plan = query.plan()?;
292        let cursor_bytes = match cursor_token {
293            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
294                QueryError::from(PlanError::InvalidContinuationCursor { reason })
295            })?),
296            None => None,
297        };
298        let boundary = plan.plan_cursor_boundary(cursor_bytes.as_deref())?;
299
300        let page = self
301            .with_metrics(|| self.load_executor::<E>().execute_paged(plan, boundary))
302            .map_err(QueryError::Execute)?;
303
304        Ok((page.items, page.next_cursor))
305    }
306
307    // ---------------------------------------------------------------------
308    // High-level write API (public, intent-level)
309    // ---------------------------------------------------------------------
310
311    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312    where
313        E: EntityKind<Canister = C> + EntityValue,
314    {
315        self.with_metrics(|| self.save_executor::<E>().insert(entity))
316            .map(WriteResponse::new)
317    }
318
319    /// Insert a single-entity-type batch atomically in one commit window.
320    ///
321    /// If any item fails pre-commit validation, no row in the batch is persisted.
322    ///
323    /// This API is not a multi-entity transaction surface.
324    pub fn insert_many_atomic<E>(
325        &self,
326        entities: impl IntoIterator<Item = E>,
327    ) -> Result<WriteBatchResponse<E>, InternalError>
328    where
329        E: EntityKind<Canister = C> + EntityValue,
330    {
331        let entities =
332            self.with_metrics(|| self.save_executor::<E>().insert_many_atomic(entities))?;
333
334        Ok(WriteBatchResponse::new(entities))
335    }
336
337    /// Insert a batch with explicitly non-atomic semantics.
338    ///
339    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
340    pub fn insert_many_non_atomic<E>(
341        &self,
342        entities: impl IntoIterator<Item = E>,
343    ) -> Result<WriteBatchResponse<E>, InternalError>
344    where
345        E: EntityKind<Canister = C> + EntityValue,
346    {
347        let entities =
348            self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
349
350        Ok(WriteBatchResponse::new(entities))
351    }
352
353    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
354    where
355        E: EntityKind<Canister = C> + EntityValue,
356    {
357        self.with_metrics(|| self.save_executor::<E>().replace(entity))
358            .map(WriteResponse::new)
359    }
360
361    /// Replace a single-entity-type batch atomically in one commit window.
362    ///
363    /// If any item fails pre-commit validation, no row in the batch is persisted.
364    ///
365    /// This API is not a multi-entity transaction surface.
366    pub fn replace_many_atomic<E>(
367        &self,
368        entities: impl IntoIterator<Item = E>,
369    ) -> Result<WriteBatchResponse<E>, InternalError>
370    where
371        E: EntityKind<Canister = C> + EntityValue,
372    {
373        let entities =
374            self.with_metrics(|| self.save_executor::<E>().replace_many_atomic(entities))?;
375
376        Ok(WriteBatchResponse::new(entities))
377    }
378
379    /// Replace a batch with explicitly non-atomic semantics.
380    ///
381    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
382    pub fn replace_many_non_atomic<E>(
383        &self,
384        entities: impl IntoIterator<Item = E>,
385    ) -> Result<WriteBatchResponse<E>, InternalError>
386    where
387        E: EntityKind<Canister = C> + EntityValue,
388    {
389        let entities =
390            self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
391
392        Ok(WriteBatchResponse::new(entities))
393    }
394
395    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
396    where
397        E: EntityKind<Canister = C> + EntityValue,
398    {
399        self.with_metrics(|| self.save_executor::<E>().update(entity))
400            .map(WriteResponse::new)
401    }
402
403    /// Update a single-entity-type batch atomically in one commit window.
404    ///
405    /// If any item fails pre-commit validation, no row in the batch is persisted.
406    ///
407    /// This API is not a multi-entity transaction surface.
408    pub fn update_many_atomic<E>(
409        &self,
410        entities: impl IntoIterator<Item = E>,
411    ) -> Result<WriteBatchResponse<E>, InternalError>
412    where
413        E: EntityKind<Canister = C> + EntityValue,
414    {
415        let entities =
416            self.with_metrics(|| self.save_executor::<E>().update_many_atomic(entities))?;
417
418        Ok(WriteBatchResponse::new(entities))
419    }
420
421    /// Update a batch with explicitly non-atomic semantics.
422    ///
423    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
424    pub fn update_many_non_atomic<E>(
425        &self,
426        entities: impl IntoIterator<Item = E>,
427    ) -> Result<WriteBatchResponse<E>, InternalError>
428    where
429        E: EntityKind<Canister = C> + EntityValue,
430    {
431        let entities =
432            self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
433
434        Ok(WriteBatchResponse::new(entities))
435    }
436
437    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
438    where
439        E: EntityKind<Canister = C> + EntityValue,
440    {
441        self.with_metrics(|| self.save_executor::<E>().insert_view(view))
442    }
443
444    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
445    where
446        E: EntityKind<Canister = C> + EntityValue,
447    {
448        self.with_metrics(|| self.save_executor::<E>().replace_view(view))
449    }
450
451    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
452    where
453        E: EntityKind<Canister = C> + EntityValue,
454    {
455        self.with_metrics(|| self.save_executor::<E>().update_view(view))
456    }
457
458    /// TEST ONLY: clear all registered data and index stores for this database.
459    #[cfg(test)]
460    #[doc(hidden)]
461    pub fn clear_stores_for_tests(&self) {
462        self.db.with_store_registry(|reg| {
463            for (_, store) in reg.iter() {
464                store.with_data_mut(DataStore::clear);
465                store.with_index_mut(IndexStore::clear);
466            }
467        });
468    }
469}