Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17// 2️⃣ Public re-exports (Tier-2 API surface)
18pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
23pub use identity::{EntityName, IndexName};
24pub use index::IndexStore;
25#[cfg(test)]
26pub(crate) use index::hash_value;
27pub use query::{
28    ReadConsistency,
29    builder::field::FieldRef,
30    expr::{FilterExpr, SortExpr},
31    intent::{IntentError, Query, QueryError},
32    plan::{OrderDirection, PlanError},
33    predicate::{
34        CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
35    },
36    session::{
37        delete::SessionDeleteQuery,
38        load::{PagedLoadQuery, SessionLoadQuery},
39    },
40};
41pub use registry::StoreRegistry;
42pub use relation::validate_delete_strong_relations_for_source;
43pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
44
45// 3️⃣ Internal imports (implementation wiring)
46use crate::{
47    db::{
48        commit::{
49            CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
50        },
51        data::RawDataKey,
52        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
53        query::intent::QueryMode,
54        relation::StrongRelationDeleteValidateFn,
55    },
56    error::InternalError,
57    obs::sink::{MetricsSink, with_metrics_sink},
58    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
59    types::Id,
60};
61use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
62
63///
64/// PagedLoadExecution
65///
66/// Cursor-paged load response with optional continuation cursor bytes.
67///
68pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
69
70///
71/// PagedLoadExecutionWithTrace
72///
73/// Cursor-paged load response plus optional execution trace details.
74///
75pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
76
77///
78/// Db
79/// A handle to the set of stores registered for a specific canister domain.
80///
81
82pub struct Db<C: CanisterKind> {
83    store: &'static LocalKey<StoreRegistry>,
84    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85    _marker: PhantomData<C>,
86}
87
88impl<C: CanisterKind> Db<C> {
89    #[must_use]
90    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
91        Self::new_with_hooks(store, &[])
92    }
93
94    #[must_use]
95    pub const fn new_with_hooks(
96        store: &'static LocalKey<StoreRegistry>,
97        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
98    ) -> Self {
99        Self {
100            store,
101            entity_runtime_hooks,
102            _marker: PhantomData,
103        }
104    }
105
106    #[must_use]
107    pub(crate) const fn context<E>(&self) -> Context<'_, E>
108    where
109        E: EntityKind<Canister = C> + EntityValue,
110    {
111        Context::new(self)
112    }
113
114    /// Return a recovery-guarded context for read paths.
115    ///
116    /// This enforces startup recovery and a fast persisted-marker check so reads
117    /// do not proceed while an incomplete commit is pending replay.
118    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
119    where
120        E: EntityKind<Canister = C> + EntityValue,
121    {
122        ensure_recovered(self)?;
123
124        Ok(Context::new(self))
125    }
126
127    /// Ensure startup/in-progress commit recovery has been applied.
128    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
129        ensure_recovered(self)
130    }
131
132    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
133        self.store.with(|reg| f(reg))
134    }
135
136    pub fn storage_report(
137        &self,
138        name_to_path: &[(&'static str, &'static str)],
139    ) -> Result<StorageReport, InternalError> {
140        diagnostics::storage_report(self, name_to_path)
141    }
142
143    pub(in crate::db) fn prepare_row_commit_op(
144        &self,
145        op: &CommitRowOp,
146    ) -> Result<PreparedRowCommitOp, InternalError> {
147        let hooks = self
148            .entity_runtime_hooks
149            .iter()
150            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
151            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
152
153        (hooks.prepare_row_commit)(self, op)
154    }
155
156    // Validate strong relation constraints for delete-selected target keys.
157    pub(crate) fn validate_delete_strong_relations(
158        &self,
159        target_path: &str,
160        deleted_target_keys: &BTreeSet<RawDataKey>,
161    ) -> Result<(), InternalError> {
162        if deleted_target_keys.is_empty() {
163            return Ok(());
164        }
165
166        for hooks in self.entity_runtime_hooks {
167            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
168        }
169
170        Ok(())
171    }
172}
173
174///
175/// EntityRuntimeHooks
176///
177/// Per-entity runtime callbacks used for commit preparation and delete-side
178/// strong relation validation.
179///
180
181pub struct EntityRuntimeHooks<C: CanisterKind> {
182    pub(crate) entity_name: &'static str,
183    pub(crate) entity_path: &'static str,
184    pub(in crate::db) prepare_row_commit:
185        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
186    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
187}
188
189impl<C: CanisterKind> EntityRuntimeHooks<C> {
190    #[must_use]
191    pub(in crate::db) const fn new(
192        entity_name: &'static str,
193        entity_path: &'static str,
194        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
195        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
196    ) -> Self {
197        Self {
198            entity_name,
199            entity_path,
200            prepare_row_commit,
201            validate_delete_strong_relations,
202        }
203    }
204
205    #[must_use]
206    pub const fn for_entity<E>(
207        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
208    ) -> Self
209    where
210        E: EntityKind<Canister = C> + EntityValue,
211    {
212        Self::new(
213            <E as EntityIdentity>::ENTITY_NAME,
214            E::PATH,
215            prepare_row_commit_for_entity::<E>,
216            validate_delete_strong_relations,
217        )
218    }
219}
220
221impl<C: CanisterKind> Db<C> {
222    #[must_use]
223    pub(crate) const fn has_runtime_hooks(&self) -> bool {
224        !self.entity_runtime_hooks.is_empty()
225    }
226
227    pub(crate) fn runtime_hook_for_entity_name(
228        &self,
229        entity_name: &str,
230    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
231        let mut matched = None;
232        for hooks in self.entity_runtime_hooks {
233            if hooks.entity_name != entity_name {
234                continue;
235            }
236
237            if matched.is_some() {
238                return Err(InternalError::store_invariant(format!(
239                    "duplicate runtime hooks for entity name '{entity_name}'"
240                )));
241            }
242
243            matched = Some(hooks);
244        }
245
246        matched.ok_or_else(|| {
247            InternalError::store_unsupported(format!(
248                "unsupported entity name in data store: '{entity_name}'"
249            ))
250        })
251    }
252}
253
254impl<C: CanisterKind> Copy for Db<C> {}
255
256impl<C: CanisterKind> Clone for Db<C> {
257    fn clone(&self) -> Self {
258        *self
259    }
260}
261
262///
263/// DbSession
264///
265/// Session-scoped database handle with policy (debug, metrics) and execution routing.
266///
267
268pub struct DbSession<C: CanisterKind> {
269    db: Db<C>,
270    debug: bool,
271    metrics: Option<&'static dyn MetricsSink>,
272}
273
274impl<C: CanisterKind> DbSession<C> {
275    #[must_use]
276    pub const fn new(db: Db<C>) -> Self {
277        Self {
278            db,
279            debug: false,
280            metrics: None,
281        }
282    }
283
284    #[must_use]
285    pub const fn debug(mut self) -> Self {
286        self.debug = true;
287        self
288    }
289
290    #[must_use]
291    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
292        self.metrics = Some(sink);
293        self
294    }
295
296    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
297        if let Some(sink) = self.metrics {
298            with_metrics_sink(sink, f)
299        } else {
300            f()
301        }
302    }
303
304    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
305    fn execute_save_with<E, T, R>(
306        &self,
307        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
308        map: impl FnOnce(T) -> R,
309    ) -> Result<R, InternalError>
310    where
311        E: EntityKind<Canister = C> + EntityValue,
312    {
313        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
314
315        Ok(map(value))
316    }
317
318    // Shared save-facade wrappers keep response shape explicit at call sites.
319    fn execute_save_entity<E>(
320        &self,
321        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
322    ) -> Result<WriteResponse<E>, InternalError>
323    where
324        E: EntityKind<Canister = C> + EntityValue,
325    {
326        self.execute_save_with(op, WriteResponse::new)
327    }
328
329    fn execute_save_batch<E>(
330        &self,
331        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
332    ) -> Result<WriteBatchResponse<E>, InternalError>
333    where
334        E: EntityKind<Canister = C> + EntityValue,
335    {
336        self.execute_save_with(op, WriteBatchResponse::new)
337    }
338
339    fn execute_save_view<E>(
340        &self,
341        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
342    ) -> Result<E::ViewType, InternalError>
343    where
344        E: EntityKind<Canister = C> + EntityValue,
345    {
346        self.execute_save_with(op, std::convert::identity)
347    }
348
349    // ---------------------------------------------------------------------
350    // Query entry points (public, fluent)
351    // ---------------------------------------------------------------------
352
353    #[must_use]
354    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
355    where
356        E: EntityKind<Canister = C>,
357    {
358        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
359    }
360
361    #[must_use]
362    pub const fn load_with_consistency<E>(
363        &self,
364        consistency: ReadConsistency,
365    ) -> SessionLoadQuery<'_, C, E>
366    where
367        E: EntityKind<Canister = C>,
368    {
369        SessionLoadQuery::new(self, Query::new(consistency))
370    }
371
372    #[must_use]
373    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
374    where
375        E: EntityKind<Canister = C>,
376    {
377        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
378    }
379
380    #[must_use]
381    pub fn delete_with_consistency<E>(
382        &self,
383        consistency: ReadConsistency,
384    ) -> SessionDeleteQuery<'_, C, E>
385    where
386        E: EntityKind<Canister = C>,
387    {
388        SessionDeleteQuery::new(self, Query::new(consistency).delete())
389    }
390
391    // ---------------------------------------------------------------------
392    // Low-level executors (crate-internal; execution primitives)
393    // ---------------------------------------------------------------------
394
395    #[must_use]
396    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
397    where
398        E: EntityKind<Canister = C> + EntityValue,
399    {
400        LoadExecutor::new(self.db, self.debug)
401    }
402
403    #[must_use]
404    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
405    where
406        E: EntityKind<Canister = C> + EntityValue,
407    {
408        DeleteExecutor::new(self.db, self.debug)
409    }
410
411    #[must_use]
412    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
413    where
414        E: EntityKind<Canister = C> + EntityValue,
415    {
416        SaveExecutor::new(self.db, self.debug)
417    }
418
419    // ---------------------------------------------------------------------
420    // Query diagnostics / execution (internal routing)
421    // ---------------------------------------------------------------------
422
423    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
424    where
425        E: EntityKind<Canister = C> + EntityValue,
426    {
427        let plan = query.plan()?;
428
429        let result = match query.mode() {
430            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
431            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
432        };
433
434        result.map_err(QueryError::Execute)
435    }
436
437    pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
438    where
439        E: EntityKind<Canister = C> + EntityValue,
440    {
441        let plan = query.plan()?;
442
443        self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
444            .map_err(QueryError::Execute)
445    }
446
447    pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
448    where
449        E: EntityKind<Canister = C> + EntityValue,
450    {
451        let plan = query.plan()?;
452
453        self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
454            .map_err(QueryError::Execute)
455    }
456
457    pub(crate) fn execute_load_query_min<E>(
458        &self,
459        query: &Query<E>,
460    ) -> Result<Option<Id<E>>, QueryError>
461    where
462        E: EntityKind<Canister = C> + EntityValue,
463    {
464        let plan = query.plan()?;
465
466        self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
467            .map_err(QueryError::Execute)
468    }
469
470    pub(crate) fn execute_load_query_max<E>(
471        &self,
472        query: &Query<E>,
473    ) -> Result<Option<Id<E>>, QueryError>
474    where
475        E: EntityKind<Canister = C> + EntityValue,
476    {
477        let plan = query.plan()?;
478
479        self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
480            .map_err(QueryError::Execute)
481    }
482
483    pub(crate) fn execute_load_query_paged_with_trace<E>(
484        &self,
485        query: &Query<E>,
486        cursor_token: Option<&str>,
487    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
488    where
489        E: EntityKind<Canister = C> + EntityValue,
490    {
491        let plan = query.plan()?;
492        let cursor_bytes = match cursor_token {
493            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
494                QueryError::from(PlanError::from(
495                    crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
496                ))
497            })?),
498            None => None,
499        };
500        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
501
502        let (page, trace) = self
503            .with_metrics(|| {
504                self.load_executor::<E>()
505                    .execute_paged_with_cursor_traced(plan, cursor)
506            })
507            .map_err(QueryError::Execute)?;
508
509        Ok((page.items, page.next_cursor, trace))
510    }
511
512    // ---------------------------------------------------------------------
513    // High-level write API (public, intent-level)
514    // ---------------------------------------------------------------------
515
516    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
517    where
518        E: EntityKind<Canister = C> + EntityValue,
519    {
520        self.execute_save_entity(|save| save.insert(entity))
521    }
522
523    /// Insert a single-entity-type batch atomically in one commit window.
524    ///
525    /// If any item fails pre-commit validation, no row in the batch is persisted.
526    ///
527    /// This API is not a multi-entity transaction surface.
528    pub fn insert_many_atomic<E>(
529        &self,
530        entities: impl IntoIterator<Item = E>,
531    ) -> Result<WriteBatchResponse<E>, InternalError>
532    where
533        E: EntityKind<Canister = C> + EntityValue,
534    {
535        self.execute_save_batch(|save| save.insert_many_atomic(entities))
536    }
537
538    /// Insert a batch with explicitly non-atomic semantics.
539    ///
540    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
541    pub fn insert_many_non_atomic<E>(
542        &self,
543        entities: impl IntoIterator<Item = E>,
544    ) -> Result<WriteBatchResponse<E>, InternalError>
545    where
546        E: EntityKind<Canister = C> + EntityValue,
547    {
548        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
549    }
550
551    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
552    where
553        E: EntityKind<Canister = C> + EntityValue,
554    {
555        self.execute_save_entity(|save| save.replace(entity))
556    }
557
558    /// Replace a single-entity-type batch atomically in one commit window.
559    ///
560    /// If any item fails pre-commit validation, no row in the batch is persisted.
561    ///
562    /// This API is not a multi-entity transaction surface.
563    pub fn replace_many_atomic<E>(
564        &self,
565        entities: impl IntoIterator<Item = E>,
566    ) -> Result<WriteBatchResponse<E>, InternalError>
567    where
568        E: EntityKind<Canister = C> + EntityValue,
569    {
570        self.execute_save_batch(|save| save.replace_many_atomic(entities))
571    }
572
573    /// Replace a batch with explicitly non-atomic semantics.
574    ///
575    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
576    pub fn replace_many_non_atomic<E>(
577        &self,
578        entities: impl IntoIterator<Item = E>,
579    ) -> Result<WriteBatchResponse<E>, InternalError>
580    where
581        E: EntityKind<Canister = C> + EntityValue,
582    {
583        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
584    }
585
586    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
587    where
588        E: EntityKind<Canister = C> + EntityValue,
589    {
590        self.execute_save_entity(|save| save.update(entity))
591    }
592
593    /// Update a single-entity-type batch atomically in one commit window.
594    ///
595    /// If any item fails pre-commit validation, no row in the batch is persisted.
596    ///
597    /// This API is not a multi-entity transaction surface.
598    pub fn update_many_atomic<E>(
599        &self,
600        entities: impl IntoIterator<Item = E>,
601    ) -> Result<WriteBatchResponse<E>, InternalError>
602    where
603        E: EntityKind<Canister = C> + EntityValue,
604    {
605        self.execute_save_batch(|save| save.update_many_atomic(entities))
606    }
607
608    /// Update a batch with explicitly non-atomic semantics.
609    ///
610    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
611    pub fn update_many_non_atomic<E>(
612        &self,
613        entities: impl IntoIterator<Item = E>,
614    ) -> Result<WriteBatchResponse<E>, InternalError>
615    where
616        E: EntityKind<Canister = C> + EntityValue,
617    {
618        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
619    }
620
621    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
622    where
623        E: EntityKind<Canister = C> + EntityValue,
624    {
625        self.execute_save_view::<E>(|save| save.insert_view(view))
626    }
627
628    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
629    where
630        E: EntityKind<Canister = C> + EntityValue,
631    {
632        self.execute_save_view::<E>(|save| save.replace_view(view))
633    }
634
635    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
636    where
637        E: EntityKind<Canister = C> + EntityValue,
638    {
639        self.execute_save_view::<E>(|save| save.update_view(view))
640    }
641
642    /// TEST ONLY: clear all registered data and index stores for this database.
643    #[cfg(test)]
644    #[doc(hidden)]
645    pub fn clear_stores_for_tests(&self) {
646        self.db.with_store_registry(|reg| {
647            for (_, store) in reg.iter() {
648                store.with_data_mut(DataStore::clear);
649                store.with_index_mut(IndexStore::clear);
650            }
651        });
652    }
653}