Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17// 2️⃣ Public re-exports (Tier-2 API surface)
18pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use identity::{EntityName, IndexName};
23pub use index::IndexStore;
24#[cfg(test)]
25pub(crate) use index::hash_value;
26pub use query::{
27    ReadConsistency,
28    builder::field::FieldRef,
29    expr::{FilterExpr, SortExpr},
30    intent::{IntentError, Query, QueryError},
31    plan::{OrderDirection, PlanError},
32    predicate::{
33        CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
34    },
35    session::{
36        delete::SessionDeleteQuery,
37        load::{PagedLoadQuery, SessionLoadQuery},
38    },
39};
40pub use registry::StoreRegistry;
41pub use relation::validate_delete_strong_relations_for_source;
42pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
43
44// 3️⃣ Internal imports (implementation wiring)
45use crate::{
46    db::{
47        commit::{
48            CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
49        },
50        data::RawDataKey,
51        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
52        query::intent::QueryMode,
53        relation::StrongRelationDeleteValidateFn,
54    },
55    error::{ErrorClass, ErrorOrigin, InternalError},
56    obs::sink::{MetricsSink, with_metrics_sink},
57    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
58};
59use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
60
61///
62/// Db
63/// A handle to the set of stores registered for a specific canister domain.
64///
65
66pub struct Db<C: CanisterKind> {
67    store: &'static LocalKey<StoreRegistry>,
68    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
69    _marker: PhantomData<C>,
70}
71
72impl<C: CanisterKind> Db<C> {
73    #[must_use]
74    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
75        Self::new_with_hooks(store, &[])
76    }
77
78    #[must_use]
79    pub const fn new_with_hooks(
80        store: &'static LocalKey<StoreRegistry>,
81        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
82    ) -> Self {
83        Self {
84            store,
85            entity_runtime_hooks,
86            _marker: PhantomData,
87        }
88    }
89
90    #[must_use]
91    pub(crate) const fn context<E>(&self) -> Context<'_, E>
92    where
93        E: EntityKind<Canister = C> + EntityValue,
94    {
95        Context::new(self)
96    }
97
98    /// Return a recovery-guarded context for read paths.
99    ///
100    /// This enforces startup recovery and a fast persisted-marker check so reads
101    /// do not proceed while an incomplete commit is pending replay.
102    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
103    where
104        E: EntityKind<Canister = C> + EntityValue,
105    {
106        ensure_recovered(self)?;
107
108        Ok(Context::new(self))
109    }
110
111    /// Ensure startup/in-progress commit recovery has been applied.
112    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
113        ensure_recovered(self)
114    }
115
116    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
117        self.store.with(|reg| f(reg))
118    }
119
120    pub fn storage_report(
121        &self,
122        name_to_path: &[(&'static str, &'static str)],
123    ) -> Result<StorageReport, InternalError> {
124        diagnostics::storage_report(self, name_to_path)
125    }
126
127    pub(in crate::db) fn prepare_row_commit_op(
128        &self,
129        op: &CommitRowOp,
130    ) -> Result<PreparedRowCommitOp, InternalError> {
131        let hooks = self
132            .entity_runtime_hooks
133            .iter()
134            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
135            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
136
137        (hooks.prepare_row_commit)(self, op)
138    }
139
140    // Validate strong relation constraints for delete-selected target keys.
141    pub(crate) fn validate_delete_strong_relations(
142        &self,
143        target_path: &str,
144        deleted_target_keys: &BTreeSet<RawDataKey>,
145    ) -> Result<(), InternalError> {
146        if deleted_target_keys.is_empty() {
147            return Ok(());
148        }
149
150        for hooks in self.entity_runtime_hooks {
151            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
152        }
153
154        Ok(())
155    }
156}
157
158///
159/// EntityRuntimeHooks
160///
161/// Per-entity runtime callbacks used for commit preparation and delete-side
162/// strong relation validation.
163///
164
165pub struct EntityRuntimeHooks<C: CanisterKind> {
166    pub(crate) entity_name: &'static str,
167    pub(crate) entity_path: &'static str,
168    pub(in crate::db) prepare_row_commit:
169        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
170    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
171}
172
173impl<C: CanisterKind> EntityRuntimeHooks<C> {
174    #[must_use]
175    pub(in crate::db) const fn new(
176        entity_name: &'static str,
177        entity_path: &'static str,
178        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
179        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
180    ) -> Self {
181        Self {
182            entity_name,
183            entity_path,
184            prepare_row_commit,
185            validate_delete_strong_relations,
186        }
187    }
188
189    #[must_use]
190    pub const fn for_entity<E>(
191        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
192    ) -> Self
193    where
194        E: EntityKind<Canister = C> + EntityValue,
195    {
196        Self::new(
197            <E as EntityIdentity>::ENTITY_NAME,
198            E::PATH,
199            prepare_row_commit_for_entity::<E>,
200            validate_delete_strong_relations,
201        )
202    }
203}
204
205impl<C: CanisterKind> Db<C> {
206    #[must_use]
207    pub(crate) const fn has_runtime_hooks(&self) -> bool {
208        !self.entity_runtime_hooks.is_empty()
209    }
210
211    pub(crate) fn runtime_hook_for_entity_name(
212        &self,
213        entity_name: &str,
214    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
215        let mut matched = None;
216        for hooks in self.entity_runtime_hooks {
217            if hooks.entity_name != entity_name {
218                continue;
219            }
220
221            if matched.is_some() {
222                return Err(InternalError::new(
223                    ErrorClass::InvariantViolation,
224                    ErrorOrigin::Store,
225                    format!("duplicate runtime hooks for entity name '{entity_name}'"),
226                ));
227            }
228
229            matched = Some(hooks);
230        }
231
232        matched.ok_or_else(|| {
233            InternalError::new(
234                ErrorClass::Unsupported,
235                ErrorOrigin::Store,
236                format!("unsupported entity name in data store: '{entity_name}'"),
237            )
238        })
239    }
240}
241
242impl<C: CanisterKind> Copy for Db<C> {}
243
244impl<C: CanisterKind> Clone for Db<C> {
245    fn clone(&self) -> Self {
246        *self
247    }
248}
249
250///
251/// DbSession
252///
253/// Session-scoped database handle with policy (debug, metrics) and execution routing.
254///
255pub struct DbSession<C: CanisterKind> {
256    db: Db<C>,
257    debug: bool,
258    metrics: Option<&'static dyn MetricsSink>,
259}
260
261impl<C: CanisterKind> DbSession<C> {
262    #[must_use]
263    pub const fn new(db: Db<C>) -> Self {
264        Self {
265            db,
266            debug: false,
267            metrics: None,
268        }
269    }
270
271    #[must_use]
272    pub const fn debug(mut self) -> Self {
273        self.debug = true;
274        self
275    }
276
277    #[must_use]
278    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
279        self.metrics = Some(sink);
280        self
281    }
282
283    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
284        if let Some(sink) = self.metrics {
285            with_metrics_sink(sink, f)
286        } else {
287            f()
288        }
289    }
290
291    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
292    fn execute_save_with<E, T, R>(
293        &self,
294        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
295        map: impl FnOnce(T) -> R,
296    ) -> Result<R, InternalError>
297    where
298        E: EntityKind<Canister = C> + EntityValue,
299    {
300        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
301
302        Ok(map(value))
303    }
304
305    // Shared save-facade wrappers keep response shape explicit at call sites.
306    fn execute_save_entity<E>(
307        &self,
308        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
309    ) -> Result<WriteResponse<E>, InternalError>
310    where
311        E: EntityKind<Canister = C> + EntityValue,
312    {
313        self.execute_save_with(op, WriteResponse::new)
314    }
315
316    fn execute_save_batch<E>(
317        &self,
318        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
319    ) -> Result<WriteBatchResponse<E>, InternalError>
320    where
321        E: EntityKind<Canister = C> + EntityValue,
322    {
323        self.execute_save_with(op, WriteBatchResponse::new)
324    }
325
326    fn execute_save_view<E>(
327        &self,
328        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
329    ) -> Result<E::ViewType, InternalError>
330    where
331        E: EntityKind<Canister = C> + EntityValue,
332    {
333        self.execute_save_with(op, std::convert::identity)
334    }
335
336    // ---------------------------------------------------------------------
337    // Query entry points (public, fluent)
338    // ---------------------------------------------------------------------
339
340    #[must_use]
341    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
342    where
343        E: EntityKind<Canister = C>,
344    {
345        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
346    }
347
348    #[must_use]
349    pub const fn load_with_consistency<E>(
350        &self,
351        consistency: ReadConsistency,
352    ) -> SessionLoadQuery<'_, C, E>
353    where
354        E: EntityKind<Canister = C>,
355    {
356        SessionLoadQuery::new(self, Query::new(consistency))
357    }
358
359    #[must_use]
360    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
361    where
362        E: EntityKind<Canister = C>,
363    {
364        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
365    }
366
367    #[must_use]
368    pub fn delete_with_consistency<E>(
369        &self,
370        consistency: ReadConsistency,
371    ) -> SessionDeleteQuery<'_, C, E>
372    where
373        E: EntityKind<Canister = C>,
374    {
375        SessionDeleteQuery::new(self, Query::new(consistency).delete())
376    }
377
378    // ---------------------------------------------------------------------
379    // Low-level executors (crate-internal; execution primitives)
380    // ---------------------------------------------------------------------
381
382    #[must_use]
383    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        LoadExecutor::new(self.db, self.debug)
388    }
389
390    #[must_use]
391    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
392    where
393        E: EntityKind<Canister = C> + EntityValue,
394    {
395        DeleteExecutor::new(self.db, self.debug)
396    }
397
398    #[must_use]
399    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
400    where
401        E: EntityKind<Canister = C> + EntityValue,
402    {
403        SaveExecutor::new(self.db, self.debug)
404    }
405
406    // ---------------------------------------------------------------------
407    // Query diagnostics / execution (internal routing)
408    // ---------------------------------------------------------------------
409
410    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
411    where
412        E: EntityKind<Canister = C> + EntityValue,
413    {
414        let plan = query.plan()?;
415
416        let result = match query.mode() {
417            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
418            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
419        };
420
421        result.map_err(QueryError::Execute)
422    }
423
424    pub(crate) fn execute_load_query_paged<E>(
425        &self,
426        query: &Query<E>,
427        cursor_token: Option<&str>,
428    ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
429    where
430        E: EntityKind<Canister = C> + EntityValue,
431    {
432        let plan = query.plan()?;
433        let cursor_bytes = match cursor_token {
434            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
435                QueryError::from(PlanError::InvalidContinuationCursor { reason })
436            })?),
437            None => None,
438        };
439        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
440
441        let page = self
442            .with_metrics(|| {
443                self.load_executor::<E>()
444                    .execute_paged_with_cursor(plan, cursor)
445            })
446            .map_err(QueryError::Execute)?;
447
448        Ok((page.items, page.next_cursor))
449    }
450
451    // ---------------------------------------------------------------------
452    // High-level write API (public, intent-level)
453    // ---------------------------------------------------------------------
454
455    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
456    where
457        E: EntityKind<Canister = C> + EntityValue,
458    {
459        self.execute_save_entity(|save| save.insert(entity))
460    }
461
462    /// Insert a single-entity-type batch atomically in one commit window.
463    ///
464    /// If any item fails pre-commit validation, no row in the batch is persisted.
465    ///
466    /// This API is not a multi-entity transaction surface.
467    pub fn insert_many_atomic<E>(
468        &self,
469        entities: impl IntoIterator<Item = E>,
470    ) -> Result<WriteBatchResponse<E>, InternalError>
471    where
472        E: EntityKind<Canister = C> + EntityValue,
473    {
474        self.execute_save_batch(|save| save.insert_many_atomic(entities))
475    }
476
477    /// Insert a batch with explicitly non-atomic semantics.
478    ///
479    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
480    pub fn insert_many_non_atomic<E>(
481        &self,
482        entities: impl IntoIterator<Item = E>,
483    ) -> Result<WriteBatchResponse<E>, InternalError>
484    where
485        E: EntityKind<Canister = C> + EntityValue,
486    {
487        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
488    }
489
490    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
491    where
492        E: EntityKind<Canister = C> + EntityValue,
493    {
494        self.execute_save_entity(|save| save.replace(entity))
495    }
496
497    /// Replace a single-entity-type batch atomically in one commit window.
498    ///
499    /// If any item fails pre-commit validation, no row in the batch is persisted.
500    ///
501    /// This API is not a multi-entity transaction surface.
502    pub fn replace_many_atomic<E>(
503        &self,
504        entities: impl IntoIterator<Item = E>,
505    ) -> Result<WriteBatchResponse<E>, InternalError>
506    where
507        E: EntityKind<Canister = C> + EntityValue,
508    {
509        self.execute_save_batch(|save| save.replace_many_atomic(entities))
510    }
511
512    /// Replace a batch with explicitly non-atomic semantics.
513    ///
514    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
515    pub fn replace_many_non_atomic<E>(
516        &self,
517        entities: impl IntoIterator<Item = E>,
518    ) -> Result<WriteBatchResponse<E>, InternalError>
519    where
520        E: EntityKind<Canister = C> + EntityValue,
521    {
522        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
523    }
524
525    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
526    where
527        E: EntityKind<Canister = C> + EntityValue,
528    {
529        self.execute_save_entity(|save| save.update(entity))
530    }
531
532    /// Update a single-entity-type batch atomically in one commit window.
533    ///
534    /// If any item fails pre-commit validation, no row in the batch is persisted.
535    ///
536    /// This API is not a multi-entity transaction surface.
537    pub fn update_many_atomic<E>(
538        &self,
539        entities: impl IntoIterator<Item = E>,
540    ) -> Result<WriteBatchResponse<E>, InternalError>
541    where
542        E: EntityKind<Canister = C> + EntityValue,
543    {
544        self.execute_save_batch(|save| save.update_many_atomic(entities))
545    }
546
547    /// Update a batch with explicitly non-atomic semantics.
548    ///
549    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
550    pub fn update_many_non_atomic<E>(
551        &self,
552        entities: impl IntoIterator<Item = E>,
553    ) -> Result<WriteBatchResponse<E>, InternalError>
554    where
555        E: EntityKind<Canister = C> + EntityValue,
556    {
557        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
558    }
559
560    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
561    where
562        E: EntityKind<Canister = C> + EntityValue,
563    {
564        self.execute_save_view::<E>(|save| save.insert_view(view))
565    }
566
567    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
568    where
569        E: EntityKind<Canister = C> + EntityValue,
570    {
571        self.execute_save_view::<E>(|save| save.replace_view(view))
572    }
573
574    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
575    where
576        E: EntityKind<Canister = C> + EntityValue,
577    {
578        self.execute_save_view::<E>(|save| save.update_view(view))
579    }
580
581    /// TEST ONLY: clear all registered data and index stores for this database.
582    #[cfg(test)]
583    #[doc(hidden)]
584    pub fn clear_stores_for_tests(&self) {
585        self.db.with_store_registry(|reg| {
586            for (_, store) in reg.iter() {
587                store.with_data_mut(DataStore::clear);
588                store.with_index_mut(IndexStore::clear);
589            }
590        });
591    }
592}