Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17// 2️⃣ Public re-exports (Tier-2 API surface)
18pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{
23    ExecutionAccessPathVariant, ExecutionFastPath, ExecutionPushdownType, ExecutionTrace,
24};
25pub use identity::{EntityName, IndexName};
26pub use index::IndexStore;
27#[cfg(test)]
28pub(crate) use index::hash_value;
29pub use query::{
30    ReadConsistency,
31    builder::field::FieldRef,
32    expr::{FilterExpr, SortExpr},
33    intent::{IntentError, Query, QueryError},
34    plan::{OrderDirection, PlanError},
35    predicate::{
36        CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
37    },
38    session::{
39        delete::SessionDeleteQuery,
40        load::{PagedLoadQuery, SessionLoadQuery},
41    },
42};
43pub use registry::StoreRegistry;
44pub use relation::validate_delete_strong_relations_for_source;
45pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
46
47// 3️⃣ Internal imports (implementation wiring)
48use crate::{
49    db::{
50        commit::{
51            CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
52        },
53        data::RawDataKey,
54        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
55        query::intent::QueryMode,
56        relation::StrongRelationDeleteValidateFn,
57    },
58    error::InternalError,
59    obs::sink::{MetricsSink, with_metrics_sink},
60    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
61};
62use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
63
64///
65/// PagedLoadExecution
66///
67/// Cursor-paged load response with optional continuation cursor bytes.
68///
69pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
70
71///
72/// PagedLoadExecutionWithTrace
73///
74/// Cursor-paged load response plus optional execution trace details.
75///
76pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
77
78///
79/// Db
80/// A handle to the set of stores registered for a specific canister domain.
81///
82
83pub struct Db<C: CanisterKind> {
84    store: &'static LocalKey<StoreRegistry>,
85    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
86    _marker: PhantomData<C>,
87}
88
89impl<C: CanisterKind> Db<C> {
90    #[must_use]
91    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
92        Self::new_with_hooks(store, &[])
93    }
94
95    #[must_use]
96    pub const fn new_with_hooks(
97        store: &'static LocalKey<StoreRegistry>,
98        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
99    ) -> Self {
100        Self {
101            store,
102            entity_runtime_hooks,
103            _marker: PhantomData,
104        }
105    }
106
107    #[must_use]
108    pub(crate) const fn context<E>(&self) -> Context<'_, E>
109    where
110        E: EntityKind<Canister = C> + EntityValue,
111    {
112        Context::new(self)
113    }
114
115    /// Return a recovery-guarded context for read paths.
116    ///
117    /// This enforces startup recovery and a fast persisted-marker check so reads
118    /// do not proceed while an incomplete commit is pending replay.
119    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
120    where
121        E: EntityKind<Canister = C> + EntityValue,
122    {
123        ensure_recovered(self)?;
124
125        Ok(Context::new(self))
126    }
127
128    /// Ensure startup/in-progress commit recovery has been applied.
129    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
130        ensure_recovered(self)
131    }
132
133    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
134        self.store.with(|reg| f(reg))
135    }
136
137    pub fn storage_report(
138        &self,
139        name_to_path: &[(&'static str, &'static str)],
140    ) -> Result<StorageReport, InternalError> {
141        diagnostics::storage_report(self, name_to_path)
142    }
143
144    pub(in crate::db) fn prepare_row_commit_op(
145        &self,
146        op: &CommitRowOp,
147    ) -> Result<PreparedRowCommitOp, InternalError> {
148        let hooks = self
149            .entity_runtime_hooks
150            .iter()
151            .find(|hooks| hooks.entity_path == op.entity_path.as_str())
152            .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
153
154        (hooks.prepare_row_commit)(self, op)
155    }
156
157    // Validate strong relation constraints for delete-selected target keys.
158    pub(crate) fn validate_delete_strong_relations(
159        &self,
160        target_path: &str,
161        deleted_target_keys: &BTreeSet<RawDataKey>,
162    ) -> Result<(), InternalError> {
163        if deleted_target_keys.is_empty() {
164            return Ok(());
165        }
166
167        for hooks in self.entity_runtime_hooks {
168            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
169        }
170
171        Ok(())
172    }
173}
174
175///
176/// EntityRuntimeHooks
177///
178/// Per-entity runtime callbacks used for commit preparation and delete-side
179/// strong relation validation.
180///
181
182pub struct EntityRuntimeHooks<C: CanisterKind> {
183    pub(crate) entity_name: &'static str,
184    pub(crate) entity_path: &'static str,
185    pub(in crate::db) prepare_row_commit:
186        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
187    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
188}
189
190impl<C: CanisterKind> EntityRuntimeHooks<C> {
191    #[must_use]
192    pub(in crate::db) const fn new(
193        entity_name: &'static str,
194        entity_path: &'static str,
195        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
196        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
197    ) -> Self {
198        Self {
199            entity_name,
200            entity_path,
201            prepare_row_commit,
202            validate_delete_strong_relations,
203        }
204    }
205
206    #[must_use]
207    pub const fn for_entity<E>(
208        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
209    ) -> Self
210    where
211        E: EntityKind<Canister = C> + EntityValue,
212    {
213        Self::new(
214            <E as EntityIdentity>::ENTITY_NAME,
215            E::PATH,
216            prepare_row_commit_for_entity::<E>,
217            validate_delete_strong_relations,
218        )
219    }
220}
221
222impl<C: CanisterKind> Db<C> {
223    #[must_use]
224    pub(crate) const fn has_runtime_hooks(&self) -> bool {
225        !self.entity_runtime_hooks.is_empty()
226    }
227
228    pub(crate) fn runtime_hook_for_entity_name(
229        &self,
230        entity_name: &str,
231    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
232        let mut matched = None;
233        for hooks in self.entity_runtime_hooks {
234            if hooks.entity_name != entity_name {
235                continue;
236            }
237
238            if matched.is_some() {
239                return Err(InternalError::store_invariant(format!(
240                    "duplicate runtime hooks for entity name '{entity_name}'"
241                )));
242            }
243
244            matched = Some(hooks);
245        }
246
247        matched.ok_or_else(|| {
248            InternalError::store_unsupported(format!(
249                "unsupported entity name in data store: '{entity_name}'"
250            ))
251        })
252    }
253}
254
255impl<C: CanisterKind> Copy for Db<C> {}
256
257impl<C: CanisterKind> Clone for Db<C> {
258    fn clone(&self) -> Self {
259        *self
260    }
261}
262
263///
264/// DbSession
265///
266/// Session-scoped database handle with policy (debug, metrics) and execution routing.
267///
268pub struct DbSession<C: CanisterKind> {
269    db: Db<C>,
270    debug: bool,
271    metrics: Option<&'static dyn MetricsSink>,
272}
273
274impl<C: CanisterKind> DbSession<C> {
275    #[must_use]
276    pub const fn new(db: Db<C>) -> Self {
277        Self {
278            db,
279            debug: false,
280            metrics: None,
281        }
282    }
283
284    #[must_use]
285    pub const fn debug(mut self) -> Self {
286        self.debug = true;
287        self
288    }
289
290    #[must_use]
291    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
292        self.metrics = Some(sink);
293        self
294    }
295
296    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
297        if let Some(sink) = self.metrics {
298            with_metrics_sink(sink, f)
299        } else {
300            f()
301        }
302    }
303
304    // Shared save-facade wrapper keeps metrics wiring and response shaping uniform.
305    fn execute_save_with<E, T, R>(
306        &self,
307        op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
308        map: impl FnOnce(T) -> R,
309    ) -> Result<R, InternalError>
310    where
311        E: EntityKind<Canister = C> + EntityValue,
312    {
313        let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
314
315        Ok(map(value))
316    }
317
318    // Shared save-facade wrappers keep response shape explicit at call sites.
319    fn execute_save_entity<E>(
320        &self,
321        op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
322    ) -> Result<WriteResponse<E>, InternalError>
323    where
324        E: EntityKind<Canister = C> + EntityValue,
325    {
326        self.execute_save_with(op, WriteResponse::new)
327    }
328
329    fn execute_save_batch<E>(
330        &self,
331        op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
332    ) -> Result<WriteBatchResponse<E>, InternalError>
333    where
334        E: EntityKind<Canister = C> + EntityValue,
335    {
336        self.execute_save_with(op, WriteBatchResponse::new)
337    }
338
339    fn execute_save_view<E>(
340        &self,
341        op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
342    ) -> Result<E::ViewType, InternalError>
343    where
344        E: EntityKind<Canister = C> + EntityValue,
345    {
346        self.execute_save_with(op, std::convert::identity)
347    }
348
349    // ---------------------------------------------------------------------
350    // Query entry points (public, fluent)
351    // ---------------------------------------------------------------------
352
353    #[must_use]
354    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
355    where
356        E: EntityKind<Canister = C>,
357    {
358        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
359    }
360
361    #[must_use]
362    pub const fn load_with_consistency<E>(
363        &self,
364        consistency: ReadConsistency,
365    ) -> SessionLoadQuery<'_, C, E>
366    where
367        E: EntityKind<Canister = C>,
368    {
369        SessionLoadQuery::new(self, Query::new(consistency))
370    }
371
372    #[must_use]
373    pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
374    where
375        E: EntityKind<Canister = C>,
376    {
377        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
378    }
379
380    #[must_use]
381    pub fn delete_with_consistency<E>(
382        &self,
383        consistency: ReadConsistency,
384    ) -> SessionDeleteQuery<'_, C, E>
385    where
386        E: EntityKind<Canister = C>,
387    {
388        SessionDeleteQuery::new(self, Query::new(consistency).delete())
389    }
390
391    // ---------------------------------------------------------------------
392    // Low-level executors (crate-internal; execution primitives)
393    // ---------------------------------------------------------------------
394
395    #[must_use]
396    pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
397    where
398        E: EntityKind<Canister = C> + EntityValue,
399    {
400        LoadExecutor::new(self.db, self.debug)
401    }
402
403    #[must_use]
404    pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
405    where
406        E: EntityKind<Canister = C> + EntityValue,
407    {
408        DeleteExecutor::new(self.db, self.debug)
409    }
410
411    #[must_use]
412    pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
413    where
414        E: EntityKind<Canister = C> + EntityValue,
415    {
416        SaveExecutor::new(self.db, self.debug)
417    }
418
419    // ---------------------------------------------------------------------
420    // Query diagnostics / execution (internal routing)
421    // ---------------------------------------------------------------------
422
423    pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
424    where
425        E: EntityKind<Canister = C> + EntityValue,
426    {
427        let plan = query.plan()?;
428
429        let result = match query.mode() {
430            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
431            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
432        };
433
434        result.map_err(QueryError::Execute)
435    }
436
437    pub(crate) fn execute_load_query_paged_with_trace<E>(
438        &self,
439        query: &Query<E>,
440        cursor_token: Option<&str>,
441    ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
442    where
443        E: EntityKind<Canister = C> + EntityValue,
444    {
445        let plan = query.plan()?;
446        let cursor_bytes = match cursor_token {
447            Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
448                QueryError::from(PlanError::from(
449                    crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
450                ))
451            })?),
452            None => None,
453        };
454        let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
455
456        let (page, trace) = self
457            .with_metrics(|| {
458                self.load_executor::<E>()
459                    .execute_paged_with_cursor_traced(plan, cursor)
460            })
461            .map_err(QueryError::Execute)?;
462
463        Ok((page.items, page.next_cursor, trace))
464    }
465
466    // ---------------------------------------------------------------------
467    // High-level write API (public, intent-level)
468    // ---------------------------------------------------------------------
469
470    pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
471    where
472        E: EntityKind<Canister = C> + EntityValue,
473    {
474        self.execute_save_entity(|save| save.insert(entity))
475    }
476
477    /// Insert a single-entity-type batch atomically in one commit window.
478    ///
479    /// If any item fails pre-commit validation, no row in the batch is persisted.
480    ///
481    /// This API is not a multi-entity transaction surface.
482    pub fn insert_many_atomic<E>(
483        &self,
484        entities: impl IntoIterator<Item = E>,
485    ) -> Result<WriteBatchResponse<E>, InternalError>
486    where
487        E: EntityKind<Canister = C> + EntityValue,
488    {
489        self.execute_save_batch(|save| save.insert_many_atomic(entities))
490    }
491
492    /// Insert a batch with explicitly non-atomic semantics.
493    ///
494    /// WARNING: fail-fast and non-atomic. Earlier inserts may commit before an error.
495    pub fn insert_many_non_atomic<E>(
496        &self,
497        entities: impl IntoIterator<Item = E>,
498    ) -> Result<WriteBatchResponse<E>, InternalError>
499    where
500        E: EntityKind<Canister = C> + EntityValue,
501    {
502        self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
503    }
504
505    pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
506    where
507        E: EntityKind<Canister = C> + EntityValue,
508    {
509        self.execute_save_entity(|save| save.replace(entity))
510    }
511
512    /// Replace a single-entity-type batch atomically in one commit window.
513    ///
514    /// If any item fails pre-commit validation, no row in the batch is persisted.
515    ///
516    /// This API is not a multi-entity transaction surface.
517    pub fn replace_many_atomic<E>(
518        &self,
519        entities: impl IntoIterator<Item = E>,
520    ) -> Result<WriteBatchResponse<E>, InternalError>
521    where
522        E: EntityKind<Canister = C> + EntityValue,
523    {
524        self.execute_save_batch(|save| save.replace_many_atomic(entities))
525    }
526
527    /// Replace a batch with explicitly non-atomic semantics.
528    ///
529    /// WARNING: fail-fast and non-atomic. Earlier replaces may commit before an error.
530    pub fn replace_many_non_atomic<E>(
531        &self,
532        entities: impl IntoIterator<Item = E>,
533    ) -> Result<WriteBatchResponse<E>, InternalError>
534    where
535        E: EntityKind<Canister = C> + EntityValue,
536    {
537        self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
538    }
539
540    pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
541    where
542        E: EntityKind<Canister = C> + EntityValue,
543    {
544        self.execute_save_entity(|save| save.update(entity))
545    }
546
547    /// Update a single-entity-type batch atomically in one commit window.
548    ///
549    /// If any item fails pre-commit validation, no row in the batch is persisted.
550    ///
551    /// This API is not a multi-entity transaction surface.
552    pub fn update_many_atomic<E>(
553        &self,
554        entities: impl IntoIterator<Item = E>,
555    ) -> Result<WriteBatchResponse<E>, InternalError>
556    where
557        E: EntityKind<Canister = C> + EntityValue,
558    {
559        self.execute_save_batch(|save| save.update_many_atomic(entities))
560    }
561
562    /// Update a batch with explicitly non-atomic semantics.
563    ///
564    /// WARNING: fail-fast and non-atomic. Earlier updates may commit before an error.
565    pub fn update_many_non_atomic<E>(
566        &self,
567        entities: impl IntoIterator<Item = E>,
568    ) -> Result<WriteBatchResponse<E>, InternalError>
569    where
570        E: EntityKind<Canister = C> + EntityValue,
571    {
572        self.execute_save_batch(|save| save.update_many_non_atomic(entities))
573    }
574
575    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
576    where
577        E: EntityKind<Canister = C> + EntityValue,
578    {
579        self.execute_save_view::<E>(|save| save.insert_view(view))
580    }
581
582    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
583    where
584        E: EntityKind<Canister = C> + EntityValue,
585    {
586        self.execute_save_view::<E>(|save| save.replace_view(view))
587    }
588
589    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
590    where
591        E: EntityKind<Canister = C> + EntityValue,
592    {
593        self.execute_save_view::<E>(|save| save.update_view(view))
594    }
595
596    /// TEST ONLY: clear all registered data and index stores for this database.
597    #[cfg(test)]
598    #[doc(hidden)]
599    pub fn clear_stores_for_tests(&self) {
600        self.db.with_store_registry(|reg| {
601            for (_, store) in reg.iter() {
602                store.with_data_mut(DataStore::clear);
603                store.with_index_mut(IndexStore::clear);
604            }
605        });
606    }
607}