Skip to main content

icydb_core/db/
mod.rs

1//! Module: db
2//!
3//! Responsibility: root subsystem wiring, façade re-exports, and runtime hook contracts.
4//! Does not own: feature semantics delegated to child modules (`query`, `executor`, etc.).
5//! Boundary: top-level db API and internal orchestration entrypoints.
6
7pub(crate) mod access;
8pub(crate) mod contracts;
9pub(crate) mod cursor;
10pub(crate) mod diagnostics;
11pub(crate) mod identity;
12pub(crate) mod predicate;
13pub(crate) mod query;
14pub(crate) mod registry;
15pub(crate) mod response;
16pub(crate) mod session;
17
18pub(in crate::db) mod codec;
19pub(in crate::db) mod commit;
20pub(in crate::db) mod data;
21pub(in crate::db) mod direction;
22pub(in crate::db) mod executor;
23pub(in crate::db) mod index;
24pub(in crate::db) mod relation;
25
26use crate::{
27    db::{
28        commit::{
29            CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
30            commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
31            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
32        },
33        data::RawDataKey,
34        executor::Context,
35        relation::StrongRelationDeleteValidateFn,
36    },
37    error::InternalError,
38    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
39    value::Value,
40};
41use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
42
43pub use codec::cursor::{decode_cursor, encode_cursor};
44pub use data::DataStore;
45pub(crate) use data::StorageKey;
46pub use diagnostics::StorageReport;
47pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
48pub use identity::{EntityName, IndexName};
49pub use index::IndexStore;
50pub use predicate::MissingRowPolicy;
51pub use predicate::ValidateError;
52pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
53pub use query::{
54    builder::field::FieldRef,
55    expr::{FilterExpr, SortExpr},
56    fluent::{
57        delete::FluentDeleteQuery,
58        load::{FluentLoadQuery, PagedLoadQuery},
59    },
60    intent::{CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryMode},
61    plan::{OrderDirection, PlanError},
62};
63pub use registry::StoreRegistry;
64pub use relation::validate_delete_strong_relations_for_source;
65pub use response::{
66    PagedLoadExecution, PagedLoadExecutionWithTrace, Response, ResponseError, Row,
67    WriteBatchResponse, WriteResponse,
68};
69pub use session::DbSession;
70
71///
72/// GroupedRow
73///
74/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
75/// Group/aggregate vectors preserve query declaration order.
76///
77#[derive(Clone, Debug, Eq, PartialEq)]
78pub struct GroupedRow {
79    group_key: Vec<Value>,
80    aggregate_values: Vec<Value>,
81}
82
83impl GroupedRow {
84    /// Construct one grouped row payload.
85    #[must_use]
86    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
87        Self {
88            group_key,
89            aggregate_values,
90        }
91    }
92
93    /// Borrow grouped key values.
94    #[must_use]
95    pub const fn group_key(&self) -> &[Value] {
96        self.group_key.as_slice()
97    }
98
99    /// Borrow aggregate output values.
100    #[must_use]
101    pub const fn aggregate_values(&self) -> &[Value] {
102        self.aggregate_values.as_slice()
103    }
104}
105
106///
107/// PagedGroupedExecution
108///
109/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
110///
111#[derive(Debug)]
112pub struct PagedGroupedExecution {
113    rows: Vec<GroupedRow>,
114    continuation_cursor: Option<Vec<u8>>,
115}
116
117impl PagedGroupedExecution {
118    /// Construct one grouped paged execution payload.
119    #[must_use]
120    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
121        Self {
122            rows,
123            continuation_cursor,
124        }
125    }
126
127    /// Borrow grouped rows.
128    #[must_use]
129    pub const fn rows(&self) -> &[GroupedRow] {
130        self.rows.as_slice()
131    }
132
133    /// Borrow optional continuation cursor bytes.
134    #[must_use]
135    pub fn continuation_cursor(&self) -> Option<&[u8]> {
136        self.continuation_cursor.as_deref()
137    }
138
139    /// Consume into grouped rows and continuation cursor bytes.
140    #[must_use]
141    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
142        (self.rows, self.continuation_cursor)
143    }
144}
145
146///
147/// PagedGroupedExecutionWithTrace
148///
149/// Cursor-paged grouped execution payload plus optional route/execution trace.
150///
151#[derive(Debug)]
152pub struct PagedGroupedExecutionWithTrace {
153    execution: PagedGroupedExecution,
154    execution_trace: Option<ExecutionTrace>,
155}
156
157impl PagedGroupedExecutionWithTrace {
158    /// Construct one traced grouped paged execution payload.
159    #[must_use]
160    pub const fn new(
161        rows: Vec<GroupedRow>,
162        continuation_cursor: Option<Vec<u8>>,
163        execution_trace: Option<ExecutionTrace>,
164    ) -> Self {
165        Self {
166            execution: PagedGroupedExecution::new(rows, continuation_cursor),
167            execution_trace,
168        }
169    }
170
171    /// Borrow grouped execution payload.
172    #[must_use]
173    pub const fn execution(&self) -> &PagedGroupedExecution {
174        &self.execution
175    }
176
177    /// Borrow grouped rows.
178    #[must_use]
179    pub const fn rows(&self) -> &[GroupedRow] {
180        self.execution.rows()
181    }
182
183    /// Borrow optional continuation cursor bytes.
184    #[must_use]
185    pub fn continuation_cursor(&self) -> Option<&[u8]> {
186        self.execution.continuation_cursor()
187    }
188
189    /// Borrow optional execution trace details.
190    #[must_use]
191    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
192        self.execution_trace.as_ref()
193    }
194
195    /// Consume payload and drop trace details.
196    #[must_use]
197    pub fn into_execution(self) -> PagedGroupedExecution {
198        self.execution
199    }
200
201    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
202    #[must_use]
203    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
204        let (rows, continuation_cursor) = self.execution.into_parts();
205
206        (rows, continuation_cursor, self.execution_trace)
207    }
208}
209
210///
211/// Db
212/// A handle to the set of stores registered for a specific canister domain.
213///
214
215pub struct Db<C: CanisterKind> {
216    store: &'static LocalKey<StoreRegistry>,
217    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
218    _marker: PhantomData<C>,
219}
220
221impl<C: CanisterKind> Db<C> {
222    /// Construct a db handle without per-entity runtime hooks.
223    #[must_use]
224    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
225        Self::new_with_hooks(store, &[])
226    }
227
228    /// Construct a db handle with explicit per-entity runtime hook wiring.
229    #[must_use]
230    pub const fn new_with_hooks(
231        store: &'static LocalKey<StoreRegistry>,
232        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
233    ) -> Self {
234        Self {
235            store,
236            entity_runtime_hooks,
237            _marker: PhantomData,
238        }
239    }
240
241    #[must_use]
242    pub(crate) const fn context<E>(&self) -> Context<'_, E>
243    where
244        E: EntityKind<Canister = C> + EntityValue,
245    {
246        Context::new(self)
247    }
248
249    /// Return a recovery-guarded context for read paths.
250    ///
251    /// This enforces startup recovery and a fast persisted-marker check so reads
252    /// do not proceed while an incomplete commit is pending replay.
253    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
254    where
255        E: EntityKind<Canister = C> + EntityValue,
256    {
257        ensure_recovered(self)?;
258
259        Ok(Context::new(self))
260    }
261
262    /// Ensure startup/in-progress commit recovery has been applied.
263    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
264        ensure_recovered(self)
265    }
266
267    /// Execute one closure against the registered store set.
268    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
269        self.store.with(|reg| f(reg))
270    }
271
272    /// Build one storage diagnostics report for registered stores/entities.
273    pub fn storage_report(
274        &self,
275        name_to_path: &[(&'static str, &'static str)],
276    ) -> Result<StorageReport, InternalError> {
277        diagnostics::storage_report(self, name_to_path)
278    }
279
280    pub(in crate::db) fn prepare_row_commit_op(
281        &self,
282        op: &CommitRowOp,
283    ) -> Result<PreparedRowCommitOp, InternalError> {
284        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
285
286        (hooks.prepare_row_commit)(self, op)
287    }
288
289    pub(in crate::db) fn replay_commit_marker_row_ops(
290        &self,
291        row_ops: &[CommitRowOp],
292    ) -> Result<(), InternalError> {
293        replay_commit_marker_row_ops(self, row_ops)
294    }
295
296    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
297        rebuild_secondary_indexes_from_rows(self)
298    }
299
300    // Validate strong relation constraints for delete-selected target keys.
301    pub(crate) fn validate_delete_strong_relations(
302        &self,
303        target_path: &str,
304        deleted_target_keys: &BTreeSet<RawDataKey>,
305    ) -> Result<(), InternalError> {
306        // Skip hook traversal when no target keys were deleted.
307        if deleted_target_keys.is_empty() {
308            return Ok(());
309        }
310
311        // Delegate delete-side relation validation to each entity runtime hook.
312        for hooks in self.entity_runtime_hooks {
313            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
314        }
315
316        Ok(())
317    }
318}
319
320///
321/// EntityRuntimeHooks
322///
323/// Per-entity runtime callbacks used for commit preparation and delete-side
324/// strong relation validation.
325///
326
327pub struct EntityRuntimeHooks<C: CanisterKind> {
328    pub(crate) entity_name: &'static str,
329    pub(crate) entity_path: &'static str,
330    pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
331    pub(in crate::db) prepare_row_commit:
332        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
333    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
334}
335
336impl<C: CanisterKind> EntityRuntimeHooks<C> {
337    #[must_use]
338    /// Build one runtime hook contract for a concrete runtime entity.
339    pub(in crate::db) const fn new(
340        entity_name: &'static str,
341        entity_path: &'static str,
342        commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
343        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
344        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
345    ) -> Self {
346        Self {
347            entity_name,
348            entity_path,
349            commit_schema_fingerprint,
350            prepare_row_commit,
351            validate_delete_strong_relations,
352        }
353    }
354
355    #[must_use]
356    /// Build runtime hooks from one entity type and delete-validation callback.
357    pub const fn for_entity<E>(
358        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
359    ) -> Self
360    where
361        E: EntityKind<Canister = C> + EntityValue,
362    {
363        Self::new(
364            <E as EntityIdentity>::ENTITY_NAME,
365            E::PATH,
366            commit_schema_fingerprint_for_runtime_entity::<E>,
367            prepare_row_commit_for_entity::<E>,
368            validate_delete_strong_relations,
369        )
370    }
371}
372
373fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
374where
375    E: EntityKind,
376{
377    commit_schema_fingerprint_for_entity::<E>()
378}
379
380impl<C: CanisterKind> Db<C> {
381    #[must_use]
382    /// Return whether this db has any registered runtime hook callbacks.
383    pub(crate) const fn has_runtime_hooks(&self) -> bool {
384        !self.entity_runtime_hooks.is_empty()
385    }
386
387    // Resolve exactly one runtime hook for a persisted entity name.
388    // Duplicate matches are treated as store invariants.
389    pub(crate) fn runtime_hook_for_entity_name(
390        &self,
391        entity_name: &str,
392    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
393        let mut matched = None;
394        for hooks in self.entity_runtime_hooks {
395            if hooks.entity_name != entity_name {
396                continue;
397            }
398
399            if matched.is_some() {
400                return Err(InternalError::store_invariant(format!(
401                    "duplicate runtime hooks for entity name '{entity_name}'"
402                )));
403            }
404
405            matched = Some(hooks);
406        }
407
408        matched.ok_or_else(|| {
409            InternalError::store_unsupported(format!(
410                "unsupported entity name in data store: '{entity_name}'"
411            ))
412        })
413    }
414
415    // Resolve exactly one runtime hook for a persisted entity path.
416    // Duplicate matches are treated as store invariants.
417    pub(crate) fn runtime_hook_for_entity_path(
418        &self,
419        entity_path: &str,
420    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
421        let mut matched = None;
422        for hooks in self.entity_runtime_hooks {
423            if hooks.entity_path != entity_path {
424                continue;
425            }
426
427            if matched.is_some() {
428                return Err(InternalError::store_invariant(format!(
429                    "duplicate runtime hooks for entity path '{entity_path}'"
430                )));
431            }
432
433            matched = Some(hooks);
434        }
435
436        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
437    }
438}
439
440impl<C: CanisterKind> Copy for Db<C> {}
441
442impl<C: CanisterKind> Clone for Db<C> {
443    fn clone(&self) -> Self {
444        *self
445    }
446}