Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17// 2️⃣ Public re-exports (Tier-2 API surface)
18pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{
23    ExecutionAccessPathVariant, ExecutionFastPath, ExecutionPushdownType, ExecutionTrace,
24};
25pub use identity::{EntityName, IndexName};
26pub use index::IndexStore;
27#[cfg(test)]
28pub(crate) use index::hash_value;
29pub use query::{
30    ReadConsistency,
31    builder::field::FieldRef,
32    expr::{FilterExpr, SortExpr},
33    intent::{IntentError, Query, QueryError},
34    plan::{OrderDirection, PlanError},
35    predicate::{
36        CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
37    },
38    session::{
39        delete::SessionDeleteQuery,
40        load::{PagedLoadQuery, SessionLoadQuery},
41    },
42};
43pub use registry::StoreRegistry;
44pub use relation::validate_delete_strong_relations_for_source;
45pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
46
47// 3️⃣ Internal imports (implementation wiring)
48use crate::{
49    db::{
50        commit::{
51            CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
52        },
53        data::RawDataKey,
54        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
55        query::intent::QueryMode,
56        relation::StrongRelationDeleteValidateFn,
57    },
58    error::{ErrorClass, ErrorOrigin, InternalError},
59    obs::sink::{MetricsSink, with_metrics_sink},
60    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
61};
62use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
63
64///
65/// PagedLoadExecution
66///
67/// Cursor-paged load response with optional continuation cursor bytes.
68///
69pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
70
71///
72/// PagedLoadExecutionWithTrace
73///
74/// Cursor-paged load response plus optional execution trace details.
75///
76pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
77
78///
79/// Db
80/// A handle to the set of stores registered for a specific canister domain.
81///
82
83pub struct Db<C: CanisterKind> {
84    store: &'static LocalKey<StoreRegistry>,
85    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
86    _marker: PhantomData<C>,
87}
88
89impl<C: CanisterKind> Db<C> {
90    #[must_use]
91    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
92        Self::new_with_hooks(store, &[])
93    }
94
95    #[must_use]
96    pub const fn new_with_hooks(
97        store: &'static LocalKey<StoreRegistry>,
98        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
99    ) -> Self {
100        Self {
101            store,
102            entity_runtime_hooks,
103            _marker: PhantomData,
104        }
105    }
106
107    #[must_use]
108    pub(crate) const fn context<E>(&self) -> Context<'_, E>
109    where
110        E: EntityKind<Canister = C> + EntityValue,
111    {
112        Context::new(self)
113    }
114
115    /// Return a recovery-guarded context for read paths.
116    ///
117    /// This enforces startup recovery and a fast persisted-marker check so reads
118    /// do not proceed while an incomplete commit is pending replay.
119    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
120    where
121        E: EntityKind<Canister = C> + EntityValue,
122    {
123        ensure_recovered(self)?;
124
125        Ok(Context::new(self))
126    }
127
128    /// Ensure startup/in-progress commit recovery has been applied.
129    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
130        ensure_recovered(self)
131    }
132
133    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
134        self.store.with(|reg| f(reg))
135    }
136
137    pub fn storage_report(
138        &self,
139        name_to_path: &[(&'static str, &'static str)],
140    ) -> Result<StorageReport, InternalError> {
141        diagnostics::storage_report(self, name_to_path)
142    }
143
144    pub(in crate::db) fn prepare_row_commit_op(
145        &self,
146        op: &CommitRowOp,
147    ) -> Result<PreparedRowCommitOp, InternalError> {
148        let hooks = self
149            .entity_runtime_hooks
150            .iter()
151            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
152            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
153
154        (hooks.prepare_row_commit)(self, op)
155    }
156
157    // Validate strong relation constraints for delete-selected target keys.
158    pub(crate) fn validate_delete_strong_relations(
159        &self,
160        target_path: &str,
161        deleted_target_keys: &BTreeSet<RawDataKey>,
162    ) -> Result<(), InternalError> {
163        if deleted_target_keys.is_empty() {
164            return Ok(());
165        }
166
167        for hooks in self.entity_runtime_hooks {
168            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
169        }
170
171        Ok(())
172    }
173}
174
175///
176/// EntityRuntimeHooks
177///
178/// Per-entity runtime callbacks used for commit preparation and delete-side
179/// strong relation validation.
180///
181
182pub struct EntityRuntimeHooks<C: CanisterKind> {
183    pub(crate) entity_name: &'static str,
184    pub(crate) entity_path: &'static str,
185    pub(in crate::db) prepare_row_commit:
186        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
187    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
188}
189
190impl<C: CanisterKind> EntityRuntimeHooks<C> {
191    #[must_use]
192    pub(in crate::db) const fn new(
193        entity_name: &'static str,
194        entity_path: &'static str,
195        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
196        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
197    ) -> Self {
198        Self {
199            entity_name,
200            entity_path,
201            prepare_row_commit,
202            validate_delete_strong_relations,
203        }
204    }
205
206    #[must_use]
207    pub const fn for_entity<E>(
208        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
209    ) -> Self
210    where
211        E: EntityKind<Canister = C> + EntityValue,
212    {
213        Self::new(
214            <E as EntityIdentity>::ENTITY_NAME,
215            E::PATH,
216            prepare_row_commit_for_entity::<E>,
217            validate_delete_strong_relations,
218        )
219    }
220}
221
222impl<C: CanisterKind> Db<C> {
223    #[must_use]
224    pub(crate) const fn has_runtime_hooks(&self) -> bool {
225        !self.entity_runtime_hooks.is_empty()
226    }
227
228    pub(crate) fn runtime_hook_for_entity_name(
229        &self,
230        entity_name: &str,
231    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
232        let mut matched = None;
233        for hooks in self.entity_runtime_hooks {
234            if hooks.entity_name != entity_name {
235                continue;
236            }
237
238            if matched.is_some() {
239                return Err(InternalError::new(
240                    ErrorClass::InvariantViolation,
241                    ErrorOrigin::Store,
242                    format!("duplicate runtime hooks for entity name '{entity_name}'"),
243                ));
244            }
245
246            matched = Some(hooks);
247        }
248
249        matched.ok_or_else(|| {
250            InternalError::new(
251                ErrorClass::Unsupported,
252                ErrorOrigin::Store,
253                format!("unsupported entity name in data store: '{entity_name}'"),
254            )
255        })
256    }
257}
258
259impl<C: CanisterKind> Copy for Db<C> {}
260
261impl<C: CanisterKind> Clone for Db<C> {
262    fn clone(&self) -> Self {
263        *self
264    }
265}
266
267///
268/// DbSession
269///
270/// Session-scoped database handle with policy (debug, metrics) and execution routing.
271///
272pub struct DbSession<C: CanisterKind> {
273    db: Db<C>,
274    debug: bool,
275    metrics: Option<&'static dyn MetricsSink>,
276}
277
278impl<C: CanisterKind> DbSession<C> {
279    #[must_use]
280    pub const fn new(db: Db<C>) -> Self {
281        Self {
282            db,
283            debug: false,
284            metrics: None,
285        }
286    }
287
288    #[must_use]
289    pub const fn debug(mut self) -> Self {
290        self.debug = true;
291        self
292    }
293
294    #[must_use]
295    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
296        self.metrics = Some(sink);
297        self
298    }
299
300    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
301        if let Some(sink) = self.metrics {
302            with_metrics_sink(sink, f)
303        } else {
304            f()
305        }
306    }
307
308    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
309    fn execute_save_with<E, T, R>(
310        &self,
311        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
312        map: impl FnOnce(T) -> R,
313    ) -> Result<R, InternalError>
314    where
315        E: EntityKind<Canister = C> + EntityValue,
316    {
317        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
318
319        Ok(map(value))
320    }
321
322    // Shared save-facade wrappers keep response shape explicit at call sites.
323    fn execute_save_entity<E>(
324        &self,
325        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
326    ) -> Result<WriteResponse<E>, InternalError>
327    where
328        E: EntityKind<Canister = C> + EntityValue,
329    {
330        self.execute_save_with(op, WriteResponse::new)
331    }
332
333    fn execute_save_batch<E>(
334        &self,
335        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
336    ) -> Result<WriteBatchResponse<E>, InternalError>
337    where
338        E: EntityKind<Canister = C> + EntityValue,
339    {
340        self.execute_save_with(op, WriteBatchResponse::new)
341    }
342
343    fn execute_save_view<E>(
344        &self,
345        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
346    ) -> Result<E::ViewType, InternalError>
347    where
348        E: EntityKind<Canister = C> + EntityValue,
349    {
350        self.execute_save_with(op, std::convert::identity)
351    }
352
353    // ---------------------------------------------------------------------
354    // Query entry points (public, fluent)
355    // ---------------------------------------------------------------------
356
357    #[must_use]
358    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
359    where
360        E: EntityKind<Canister = C>,
361    {
362        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
363    }
364
365    #[must_use]
366    pub const fn load_with_consistency<E>(
367        &self,
368        consistency: ReadConsistency,
369    ) -> SessionLoadQuery<'_, C, E>
370    where
371        E: EntityKind<Canister = C>,
372    {
373        SessionLoadQuery::new(self, Query::new(consistency))
374    }
375
376    #[must_use]
377    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
378    where
379        E: EntityKind<Canister = C>,
380    {
381        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
382    }
383
384    #[must_use]
385    pub fn delete_with_consistency<E>(
386        &self,
387        consistency: ReadConsistency,
388    ) -> SessionDeleteQuery<'_, C, E>
389    where
390        E: EntityKind<Canister = C>,
391    {
392        SessionDeleteQuery::new(self, Query::new(consistency).delete())
393    }
394
395    // ---------------------------------------------------------------------
396    // Low-level executors (crate-internal; execution primitives)
397    // ---------------------------------------------------------------------
398
399    #[must_use]
400    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
401    where
402        E: EntityKind<Canister = C> + EntityValue,
403    {
404        LoadExecutor::new(self.db, self.debug)
405    }
406
407    #[must_use]
408    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
409    where
410        E: EntityKind<Canister = C> + EntityValue,
411    {
412        DeleteExecutor::new(self.db, self.debug)
413    }
414
415    #[must_use]
416    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
417    where
418        E: EntityKind<Canister = C> + EntityValue,
419    {
420        SaveExecutor::new(self.db, self.debug)
421    }
422
423    // ---------------------------------------------------------------------
424    // Query diagnostics / execution (internal routing)
425    // ---------------------------------------------------------------------
426
427    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
428    where
429        E: EntityKind<Canister = C> + EntityValue,
430    {
431        let plan = query.plan()?;
432
433        let result = match query.mode() {
434            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
435            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
436        };
437
438        result.map_err(QueryError::Execute)
439    }
440
441    pub(crate) fn execute_load_query_paged_with_trace<E>(
442        &self,
443        query: &Query<E>,
444        cursor_token: Option<&str>,
445    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
446    where
447        E: EntityKind<Canister = C> + EntityValue,
448    {
449        let plan = query.plan()?;
450        let cursor_bytes = match cursor_token {
451            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
452                QueryError::from(PlanError::from(
453                    crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
454                ))
455            })?),
456            None => None,
457        };
458        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
459
460        let (page, trace) = self
461            .with_metrics(|| {
462                self.load_executor::<E>()
463                    .execute_paged_with_cursor_traced(plan, cursor)
464            })
465            .map_err(QueryError::Execute)?;
466
467        Ok((page.items, page.next_cursor, trace))
468    }
469
470    // ---------------------------------------------------------------------
471    // High-level write API (public, intent-level)
472    // ---------------------------------------------------------------------
473
474    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
475    where
476        E: EntityKind<Canister = C> + EntityValue,
477    {
478        self.execute_save_entity(|save| save.insert(entity))
479    }
480
481    /// Insert a single-entity-type batch atomically in one commit window.
482    ///
483    /// If any item fails pre-commit validation, no row in the batch is persisted.
484    ///
485    /// This API is not a multi-entity transaction surface.
486    pub fn insert_many_atomic<E>(
487        &self,
488        entities: impl IntoIterator<Item = E>,
489    ) -> Result<WriteBatchResponse<E>, InternalError>
490    where
491        E: EntityKind<Canister = C> + EntityValue,
492    {
493        self.execute_save_batch(|save| save.insert_many_atomic(entities))
494    }
495
496    /// Insert a batch with explicitly non-atomic semantics.
497    ///
498    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
499    pub fn insert_many_non_atomic<E>(
500        &self,
501        entities: impl IntoIterator<Item = E>,
502    ) -> Result<WriteBatchResponse<E>, InternalError>
503    where
504        E: EntityKind<Canister = C> + EntityValue,
505    {
506        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
507    }
508
509    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
510    where
511        E: EntityKind<Canister = C> + EntityValue,
512    {
513        self.execute_save_entity(|save| save.replace(entity))
514    }
515
516    /// Replace a single-entity-type batch atomically in one commit window.
517    ///
518    /// If any item fails pre-commit validation, no row in the batch is persisted.
519    ///
520    /// This API is not a multi-entity transaction surface.
521    pub fn replace_many_atomic<E>(
522        &self,
523        entities: impl IntoIterator<Item = E>,
524    ) -> Result<WriteBatchResponse<E>, InternalError>
525    where
526        E: EntityKind<Canister = C> + EntityValue,
527    {
528        self.execute_save_batch(|save| save.replace_many_atomic(entities))
529    }
530
531    /// Replace a batch with explicitly non-atomic semantics.
532    ///
533    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
534    pub fn replace_many_non_atomic<E>(
535        &self,
536        entities: impl IntoIterator<Item = E>,
537    ) -> Result<WriteBatchResponse<E>, InternalError>
538    where
539        E: EntityKind<Canister = C> + EntityValue,
540    {
541        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
542    }
543
544    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
545    where
546        E: EntityKind<Canister = C> + EntityValue,
547    {
548        self.execute_save_entity(|save| save.update(entity))
549    }
550
551    /// Update a single-entity-type batch atomically in one commit window.
552    ///
553    /// If any item fails pre-commit validation, no row in the batch is persisted.
554    ///
555    /// This API is not a multi-entity transaction surface.
556    pub fn update_many_atomic<E>(
557        &self,
558        entities: impl IntoIterator<Item = E>,
559    ) -> Result<WriteBatchResponse<E>, InternalError>
560    where
561        E: EntityKind<Canister = C> + EntityValue,
562    {
563        self.execute_save_batch(|save| save.update_many_atomic(entities))
564    }
565
566    /// Update a batch with explicitly non-atomic semantics.
567    ///
568    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
569    pub fn update_many_non_atomic<E>(
570        &self,
571        entities: impl IntoIterator<Item = E>,
572    ) -> Result<WriteBatchResponse<E>, InternalError>
573    where
574        E: EntityKind<Canister = C> + EntityValue,
575    {
576        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
577    }
578
579    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
580    where
581        E: EntityKind<Canister = C> + EntityValue,
582    {
583        self.execute_save_view::<E>(|save| save.insert_view(view))
584    }
585
586    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
587    where
588        E: EntityKind<Canister = C> + EntityValue,
589    {
590        self.execute_save_view::<E>(|save| save.replace_view(view))
591    }
592
593    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
594    where
595        E: EntityKind<Canister = C> + EntityValue,
596    {
597        self.execute_save_view::<E>(|save| save.update_view(view))
598    }
599
600    /// TEST ONLY: clear all registered data and index stores for this database.
601    #[cfg(test)]
602    #[doc(hidden)]
603    pub fn clear_stores_for_tests(&self) {
604        self.db.with_store_registry(|reg| {
605            for (_, store) in reg.iter() {
606                store.with_data_mut(DataStore::clear);
607                store.with_index_mut(IndexStore::clear);
608            }
609        });
610    }
611}