Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17// 2️⃣ Public re-exports (Tier-2 API surface)
18pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
23pub use identity::{EntityName, IndexName};
24pub use index::IndexStore;
25#[cfg(test)]
26pub(crate) use index::hash_value;
27pub use query::{
28    ReadConsistency,
29    builder::field::FieldRef,
30    expr::{FilterExpr, SortExpr},
31    intent::{IntentError, Query, QueryError},
32    plan::{OrderDirection, PlanError},
33    predicate::{
34        CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
35    },
36    session::{
37        delete::SessionDeleteQuery,
38        load::{PagedLoadQuery, SessionLoadQuery},
39    },
40};
41pub use registry::StoreRegistry;
42pub use relation::validate_delete_strong_relations_for_source;
43pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
44
45// 3️⃣ Internal imports (implementation wiring)
46use crate::{
47    db::{
48        commit::{
49            CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
50        },
51        data::RawDataKey,
52        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
53        query::intent::QueryMode,
54        relation::StrongRelationDeleteValidateFn,
55    },
56    error::InternalError,
57    obs::sink::{MetricsSink, with_metrics_sink},
58    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
59};
60use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
61
62///
63/// PagedLoadExecution
64///
65/// Cursor-paged load response with optional continuation cursor bytes.
66///
67pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
68
69///
70/// PagedLoadExecutionWithTrace
71///
72/// Cursor-paged load response plus optional execution trace details.
73///
74pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
75
76///
77/// Db
78/// A handle to the set of stores registered for a specific canister domain.
79///
80
81pub struct Db<C: CanisterKind> {
82    store: &'static LocalKey<StoreRegistry>,
83    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84    _marker: PhantomData<C>,
85}
86
87impl<C: CanisterKind> Db<C> {
88    #[must_use]
89    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
90        Self::new_with_hooks(store, &[])
91    }
92
93    #[must_use]
94    pub const fn new_with_hooks(
95        store: &'static LocalKey<StoreRegistry>,
96        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
97    ) -> Self {
98        Self {
99            store,
100            entity_runtime_hooks,
101            _marker: PhantomData,
102        }
103    }
104
105    #[must_use]
106    pub(crate) const fn context<E>(&self) -> Context<'_, E>
107    where
108        E: EntityKind<Canister = C> + EntityValue,
109    {
110        Context::new(self)
111    }
112
113    /// Return a recovery-guarded context for read paths.
114    ///
115    /// This enforces startup recovery and a fast persisted-marker check so reads
116    /// do not proceed while an incomplete commit is pending replay.
117    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
118    where
119        E: EntityKind<Canister = C> + EntityValue,
120    {
121        ensure_recovered(self)?;
122
123        Ok(Context::new(self))
124    }
125
126    /// Ensure startup/in-progress commit recovery has been applied.
127    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
128        ensure_recovered(self)
129    }
130
131    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
132        self.store.with(|reg| f(reg))
133    }
134
135    pub fn storage_report(
136        &self,
137        name_to_path: &[(&'static str, &'static str)],
138    ) -> Result<StorageReport, InternalError> {
139        diagnostics::storage_report(self, name_to_path)
140    }
141
142    pub(in crate::db) fn prepare_row_commit_op(
143        &self,
144        op: &CommitRowOp,
145    ) -> Result<PreparedRowCommitOp, InternalError> {
146        let hooks = self
147            .entity_runtime_hooks
148            .iter()
149            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
150            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
151
152        (hooks.prepare_row_commit)(self, op)
153    }
154
155    // Validate strong relation constraints for delete-selected target keys.
156    pub(crate) fn validate_delete_strong_relations(
157        &self,
158        target_path: &str,
159        deleted_target_keys: &BTreeSet<RawDataKey>,
160    ) -> Result<(), InternalError> {
161        if deleted_target_keys.is_empty() {
162            return Ok(());
163        }
164
165        for hooks in self.entity_runtime_hooks {
166            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
167        }
168
169        Ok(())
170    }
171}
172
173///
174/// EntityRuntimeHooks
175///
176/// Per-entity runtime callbacks used for commit preparation and delete-side
177/// strong relation validation.
178///
179
180pub struct EntityRuntimeHooks<C: CanisterKind> {
181    pub(crate) entity_name: &'static str,
182    pub(crate) entity_path: &'static str,
183    pub(in crate::db) prepare_row_commit:
184        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
185    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
186}
187
188impl<C: CanisterKind> EntityRuntimeHooks<C> {
189    #[must_use]
190    pub(in crate::db) const fn new(
191        entity_name: &'static str,
192        entity_path: &'static str,
193        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
194        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
195    ) -> Self {
196        Self {
197            entity_name,
198            entity_path,
199            prepare_row_commit,
200            validate_delete_strong_relations,
201        }
202    }
203
204    #[must_use]
205    pub const fn for_entity<E>(
206        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
207    ) -> Self
208    where
209        E: EntityKind<Canister = C> + EntityValue,
210    {
211        Self::new(
212            <E as EntityIdentity>::ENTITY_NAME,
213            E::PATH,
214            prepare_row_commit_for_entity::<E>,
215            validate_delete_strong_relations,
216        )
217    }
218}
219
220impl<C: CanisterKind> Db<C> {
221    #[must_use]
222    pub(crate) const fn has_runtime_hooks(&self) -> bool {
223        !self.entity_runtime_hooks.is_empty()
224    }
225
226    pub(crate) fn runtime_hook_for_entity_name(
227        &self,
228        entity_name: &str,
229    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
230        let mut matched = None;
231        for hooks in self.entity_runtime_hooks {
232            if hooks.entity_name != entity_name {
233                continue;
234            }
235
236            if matched.is_some() {
237                return Err(InternalError::store_invariant(format!(
238                    "duplicate runtime hooks for entity name '{entity_name}'"
239                )));
240            }
241
242            matched = Some(hooks);
243        }
244
245        matched.ok_or_else(|| {
246            InternalError::store_unsupported(format!(
247                "unsupported entity name in data store: '{entity_name}'"
248            ))
249        })
250    }
251}
252
253impl<C: CanisterKind> Copy for Db<C> {}
254
255impl<C: CanisterKind> Clone for Db<C> {
256    fn clone(&self) -> Self {
257        *self
258    }
259}
260
261///
262/// DbSession
263///
264/// Session-scoped database handle with policy (debug, metrics) and execution routing.
265///
266pub struct DbSession<C: CanisterKind> {
267    db: Db<C>,
268    debug: bool,
269    metrics: Option<&'static dyn MetricsSink>,
270}
271
272impl<C: CanisterKind> DbSession<C> {
273    #[must_use]
274    pub const fn new(db: Db<C>) -> Self {
275        Self {
276            db,
277            debug: false,
278            metrics: None,
279        }
280    }
281
282    #[must_use]
283    pub const fn debug(mut self) -> Self {
284        self.debug = true;
285        self
286    }
287
288    #[must_use]
289    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
290        self.metrics = Some(sink);
291        self
292    }
293
294    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
295        if let Some(sink) = self.metrics {
296            with_metrics_sink(sink, f)
297        } else {
298            f()
299        }
300    }
301
302    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
303    fn execute_save_with<E, T, R>(
304        &self,
305        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
306        map: impl FnOnce(T) -> R,
307    ) -> Result<R, InternalError>
308    where
309        E: EntityKind<Canister = C> + EntityValue,
310    {
311        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
312
313        Ok(map(value))
314    }
315
316    // Shared save-facade wrappers keep response shape explicit at call sites.
317    fn execute_save_entity<E>(
318        &self,
319        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
320    ) -> Result<WriteResponse<E>, InternalError>
321    where
322        E: EntityKind<Canister = C> + EntityValue,
323    {
324        self.execute_save_with(op, WriteResponse::new)
325    }
326
327    fn execute_save_batch<E>(
328        &self,
329        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
330    ) -> Result<WriteBatchResponse<E>, InternalError>
331    where
332        E: EntityKind<Canister = C> + EntityValue,
333    {
334        self.execute_save_with(op, WriteBatchResponse::new)
335    }
336
337    fn execute_save_view<E>(
338        &self,
339        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
340    ) -> Result<E::ViewType, InternalError>
341    where
342        E: EntityKind<Canister = C> + EntityValue,
343    {
344        self.execute_save_with(op, std::convert::identity)
345    }
346
347    // ---------------------------------------------------------------------
348    // Query entry points (public, fluent)
349    // ---------------------------------------------------------------------
350
351    #[must_use]
352    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
353    where
354        E: EntityKind<Canister = C>,
355    {
356        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
357    }
358
359    #[must_use]
360    pub const fn load_with_consistency<E>(
361        &self,
362        consistency: ReadConsistency,
363    ) -> SessionLoadQuery<'_, C, E>
364    where
365        E: EntityKind<Canister = C>,
366    {
367        SessionLoadQuery::new(self, Query::new(consistency))
368    }
369
370    #[must_use]
371    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
372    where
373        E: EntityKind<Canister = C>,
374    {
375        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
376    }
377
378    #[must_use]
379    pub fn delete_with_consistency<E>(
380        &self,
381        consistency: ReadConsistency,
382    ) -> SessionDeleteQuery<'_, C, E>
383    where
384        E: EntityKind<Canister = C>,
385    {
386        SessionDeleteQuery::new(self, Query::new(consistency).delete())
387    }
388
389    // ---------------------------------------------------------------------
390    // Low-level executors (crate-internal; execution primitives)
391    // ---------------------------------------------------------------------
392
393    #[must_use]
394    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
395    where
396        E: EntityKind<Canister = C> + EntityValue,
397    {
398        LoadExecutor::new(self.db, self.debug)
399    }
400
401    #[must_use]
402    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
403    where
404        E: EntityKind<Canister = C> + EntityValue,
405    {
406        DeleteExecutor::new(self.db, self.debug)
407    }
408
409    #[must_use]
410    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
411    where
412        E: EntityKind<Canister = C> + EntityValue,
413    {
414        SaveExecutor::new(self.db, self.debug)
415    }
416
417    // ---------------------------------------------------------------------
418    // Query diagnostics / execution (internal routing)
419    // ---------------------------------------------------------------------
420
421    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
422    where
423        E: EntityKind<Canister = C> + EntityValue,
424    {
425        let plan = query.plan()?;
426
427        let result = match query.mode() {
428            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
429            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
430        };
431
432        result.map_err(QueryError::Execute)
433    }
434
435    pub(crate) fn execute_load_query_paged_with_trace<E>(
436        &self,
437        query: &Query<E>,
438        cursor_token: Option<&str>,
439    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
440    where
441        E: EntityKind<Canister = C> + EntityValue,
442    {
443        let plan = query.plan()?;
444        let cursor_bytes = match cursor_token {
445            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
446                QueryError::from(PlanError::from(
447                    crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
448                ))
449            })?),
450            None => None,
451        };
452        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
453
454        let (page, trace) = self
455            .with_metrics(|| {
456                self.load_executor::<E>()
457                    .execute_paged_with_cursor_traced(plan, cursor)
458            })
459            .map_err(QueryError::Execute)?;
460
461        Ok((page.items, page.next_cursor, trace))
462    }
463
464    // ---------------------------------------------------------------------
465    // High-level write API (public, intent-level)
466    // ---------------------------------------------------------------------
467
468    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
469    where
470        E: EntityKind<Canister = C> + EntityValue,
471    {
472        self.execute_save_entity(|save| save.insert(entity))
473    }
474
475    /// Insert a single-entity-type batch atomically in one commit window.
476    ///
477    /// If any item fails pre-commit validation, no row in the batch is persisted.
478    ///
479    /// This API is not a multi-entity transaction surface.
480    pub fn insert_many_atomic<E>(
481        &self,
482        entities: impl IntoIterator<Item = E>,
483    ) -> Result<WriteBatchResponse<E>, InternalError>
484    where
485        E: EntityKind<Canister = C> + EntityValue,
486    {
487        self.execute_save_batch(|save| save.insert_many_atomic(entities))
488    }
489
490    /// Insert a batch with explicitly non-atomic semantics.
491    ///
492    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
493    pub fn insert_many_non_atomic<E>(
494        &self,
495        entities: impl IntoIterator<Item = E>,
496    ) -> Result<WriteBatchResponse<E>, InternalError>
497    where
498        E: EntityKind<Canister = C> + EntityValue,
499    {
500        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
501    }
502
503    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
504    where
505        E: EntityKind<Canister = C> + EntityValue,
506    {
507        self.execute_save_entity(|save| save.replace(entity))
508    }
509
510    /// Replace a single-entity-type batch atomically in one commit window.
511    ///
512    /// If any item fails pre-commit validation, no row in the batch is persisted.
513    ///
514    /// This API is not a multi-entity transaction surface.
515    pub fn replace_many_atomic<E>(
516        &self,
517        entities: impl IntoIterator<Item = E>,
518    ) -> Result<WriteBatchResponse<E>, InternalError>
519    where
520        E: EntityKind<Canister = C> + EntityValue,
521    {
522        self.execute_save_batch(|save| save.replace_many_atomic(entities))
523    }
524
525    /// Replace a batch with explicitly non-atomic semantics.
526    ///
527    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
528    pub fn replace_many_non_atomic<E>(
529        &self,
530        entities: impl IntoIterator<Item = E>,
531    ) -> Result<WriteBatchResponse<E>, InternalError>
532    where
533        E: EntityKind<Canister = C> + EntityValue,
534    {
535        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
536    }
537
538    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
539    where
540        E: EntityKind<Canister = C> + EntityValue,
541    {
542        self.execute_save_entity(|save| save.update(entity))
543    }
544
545    /// Update a single-entity-type batch atomically in one commit window.
546    ///
547    /// If any item fails pre-commit validation, no row in the batch is persisted.
548    ///
549    /// This API is not a multi-entity transaction surface.
550    pub fn update_many_atomic<E>(
551        &self,
552        entities: impl IntoIterator<Item = E>,
553    ) -> Result<WriteBatchResponse<E>, InternalError>
554    where
555        E: EntityKind<Canister = C> + EntityValue,
556    {
557        self.execute_save_batch(|save| save.update_many_atomic(entities))
558    }
559
560    /// Update a batch with explicitly non-atomic semantics.
561    ///
562    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
563    pub fn update_many_non_atomic<E>(
564        &self,
565        entities: impl IntoIterator<Item = E>,
566    ) -> Result<WriteBatchResponse<E>, InternalError>
567    where
568        E: EntityKind<Canister = C> + EntityValue,
569    {
570        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
571    }
572
573    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
574    where
575        E: EntityKind<Canister = C> + EntityValue,
576    {
577        self.execute_save_view::<E>(|save| save.insert_view(view))
578    }
579
580    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
581    where
582        E: EntityKind<Canister = C> + EntityValue,
583    {
584        self.execute_save_view::<E>(|save| save.replace_view(view))
585    }
586
587    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
588    where
589        E: EntityKind<Canister = C> + EntityValue,
590    {
591        self.execute_save_view::<E>(|save| save.update_view(view))
592    }
593
594    /// TEST ONLY: clear all registered data and index stores for this database.
595    #[cfg(test)]
596    #[doc(hidden)]
597    pub fn clear_stores_for_tests(&self) {
598        self.db.with_store_registry(|reg| {
599            for (_, store) in reg.iter() {
600                store.with_data_mut(DataStore::clear);
601                store.with_index_mut(IndexStore::clear);
602            }
603        });
604    }
605}