Skip to main content

icydb_core/db/
mod.rs

1//! Module: db
2//!
3//! Responsibility: root subsystem wiring, façade re-exports, and runtime hook contracts.
4//! Does not own: feature semantics delegated to child modules (`query`, `executor`, etc.).
5//! Boundary: top-level db API and internal orchestration entrypoints.
6
7pub(crate) mod access;
8pub(crate) mod contracts;
9pub(crate) mod cursor;
10pub(crate) mod diagnostics;
11pub(crate) mod identity;
12pub(crate) mod predicate;
13pub(crate) mod query;
14pub(crate) mod registry;
15pub(crate) mod response;
16pub(crate) mod session;
17
18pub(in crate::db) mod codec;
19pub(in crate::db) mod commit;
20pub(in crate::db) mod data;
21pub(in crate::db) mod direction;
22pub(in crate::db) mod executor;
23pub(in crate::db) mod index;
24pub(in crate::db) mod relation;
25
26use crate::{
27    db::{
28        commit::{
29            CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
30            commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
31            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
32        },
33        data::RawDataKey,
34        executor::Context,
35        relation::StrongRelationDeleteValidateFn,
36    },
37    error::InternalError,
38    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
39    value::Value,
40};
41use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
42
43pub use codec::cursor::{decode_cursor, encode_cursor};
44pub use data::DataStore;
45pub(crate) use data::StorageKey;
46pub use diagnostics::StorageReport;
47pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
48pub use identity::{EntityName, IndexName};
49pub use index::IndexStore;
50pub use predicate::MissingRowPolicy;
51pub use predicate::ValidateError;
52pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
53pub use query::{
54    builder::{
55        AggregateExpr, FieldRef, count, count_by, exists, first, last, max, max_by, min, min_by,
56        sum,
57    },
58    expr::{FilterExpr, SortExpr},
59    fluent::{
60        delete::FluentDeleteQuery,
61        load::{FluentLoadQuery, PagedLoadQuery},
62    },
63    intent::{
64        CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryExecuteError,
65        QueryMode,
66    },
67    plan::{OrderDirection, PlanError},
68};
69pub use registry::StoreRegistry;
70pub use relation::validate_delete_strong_relations_for_source;
71pub use response::{
72    PagedLoadExecution, PagedLoadExecutionWithTrace, ProjectedRow, Response, ResponseError, Row,
73    WriteBatchResponse, WriteResponse,
74};
75pub use session::DbSession;
76
77///
78/// GroupedRow
79///
80/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
81/// Group/aggregate vectors preserve query declaration order.
82///
83#[derive(Clone, Debug, Eq, PartialEq)]
84pub struct GroupedRow {
85    group_key: Vec<Value>,
86    aggregate_values: Vec<Value>,
87}
88
89impl GroupedRow {
90    /// Construct one grouped row payload.
91    #[must_use]
92    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
93        Self {
94            group_key,
95            aggregate_values,
96        }
97    }
98
99    /// Borrow grouped key values.
100    #[must_use]
101    pub const fn group_key(&self) -> &[Value] {
102        self.group_key.as_slice()
103    }
104
105    /// Borrow aggregate output values.
106    #[must_use]
107    pub const fn aggregate_values(&self) -> &[Value] {
108        self.aggregate_values.as_slice()
109    }
110}
111
112///
113/// PagedGroupedExecution
114///
115/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
116///
117#[derive(Debug)]
118pub struct PagedGroupedExecution {
119    rows: Vec<GroupedRow>,
120    continuation_cursor: Option<Vec<u8>>,
121}
122
123impl PagedGroupedExecution {
124    /// Construct one grouped paged execution payload.
125    #[must_use]
126    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
127        Self {
128            rows,
129            continuation_cursor,
130        }
131    }
132
133    /// Borrow grouped rows.
134    #[must_use]
135    pub const fn rows(&self) -> &[GroupedRow] {
136        self.rows.as_slice()
137    }
138
139    /// Borrow optional continuation cursor bytes.
140    #[must_use]
141    pub fn continuation_cursor(&self) -> Option<&[u8]> {
142        self.continuation_cursor.as_deref()
143    }
144
145    /// Consume into grouped rows and continuation cursor bytes.
146    #[must_use]
147    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
148        (self.rows, self.continuation_cursor)
149    }
150}
151
152///
153/// PagedGroupedExecutionWithTrace
154///
155/// Cursor-paged grouped execution payload plus optional route/execution trace.
156///
157#[derive(Debug)]
158pub struct PagedGroupedExecutionWithTrace {
159    execution: PagedGroupedExecution,
160    execution_trace: Option<ExecutionTrace>,
161}
162
163impl PagedGroupedExecutionWithTrace {
164    /// Construct one traced grouped paged execution payload.
165    #[must_use]
166    pub const fn new(
167        rows: Vec<GroupedRow>,
168        continuation_cursor: Option<Vec<u8>>,
169        execution_trace: Option<ExecutionTrace>,
170    ) -> Self {
171        Self {
172            execution: PagedGroupedExecution::new(rows, continuation_cursor),
173            execution_trace,
174        }
175    }
176
177    /// Borrow grouped execution payload.
178    #[must_use]
179    pub const fn execution(&self) -> &PagedGroupedExecution {
180        &self.execution
181    }
182
183    /// Borrow grouped rows.
184    #[must_use]
185    pub const fn rows(&self) -> &[GroupedRow] {
186        self.execution.rows()
187    }
188
189    /// Borrow optional continuation cursor bytes.
190    #[must_use]
191    pub fn continuation_cursor(&self) -> Option<&[u8]> {
192        self.execution.continuation_cursor()
193    }
194
195    /// Borrow optional execution trace details.
196    #[must_use]
197    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
198        self.execution_trace.as_ref()
199    }
200
201    /// Consume payload and drop trace details.
202    #[must_use]
203    pub fn into_execution(self) -> PagedGroupedExecution {
204        self.execution
205    }
206
207    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
208    #[must_use]
209    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
210        let (rows, continuation_cursor) = self.execution.into_parts();
211
212        (rows, continuation_cursor, self.execution_trace)
213    }
214}
215
216///
217/// Db
218/// A handle to the set of stores registered for a specific canister domain.
219///
220
221pub struct Db<C: CanisterKind> {
222    store: &'static LocalKey<StoreRegistry>,
223    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
224    _marker: PhantomData<C>,
225}
226
227impl<C: CanisterKind> Db<C> {
228    /// Construct a db handle without per-entity runtime hooks.
229    #[must_use]
230    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
231        Self::new_with_hooks(store, &[])
232    }
233
234    /// Construct a db handle with explicit per-entity runtime hook wiring.
235    #[must_use]
236    pub const fn new_with_hooks(
237        store: &'static LocalKey<StoreRegistry>,
238        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
239    ) -> Self {
240        Self {
241            store,
242            entity_runtime_hooks,
243            _marker: PhantomData,
244        }
245    }
246
247    #[must_use]
248    pub(crate) const fn context<E>(&self) -> Context<'_, E>
249    where
250        E: EntityKind<Canister = C> + EntityValue,
251    {
252        Context::new(self)
253    }
254
255    /// Return a recovery-guarded context for read paths.
256    ///
257    /// This enforces startup recovery and a fast persisted-marker check so reads
258    /// do not proceed while an incomplete commit is pending replay.
259    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
260    where
261        E: EntityKind<Canister = C> + EntityValue,
262    {
263        ensure_recovered(self)?;
264
265        Ok(Context::new(self))
266    }
267
268    /// Ensure startup/in-progress commit recovery has been applied.
269    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
270        ensure_recovered(self)
271    }
272
273    /// Execute one closure against the registered store set.
274    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
275        self.store.with(|reg| f(reg))
276    }
277
278    /// Build one storage diagnostics report for registered stores/entities.
279    pub fn storage_report(
280        &self,
281        name_to_path: &[(&'static str, &'static str)],
282    ) -> Result<StorageReport, InternalError> {
283        diagnostics::storage_report(self, name_to_path)
284    }
285
286    pub(in crate::db) fn prepare_row_commit_op(
287        &self,
288        op: &CommitRowOp,
289    ) -> Result<PreparedRowCommitOp, InternalError> {
290        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
291
292        (hooks.prepare_row_commit)(self, op)
293    }
294
295    pub(in crate::db) fn replay_commit_marker_row_ops(
296        &self,
297        row_ops: &[CommitRowOp],
298    ) -> Result<(), InternalError> {
299        replay_commit_marker_row_ops(self, row_ops)
300    }
301
302    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
303        rebuild_secondary_indexes_from_rows(self)
304    }
305
306    // Validate strong relation constraints for delete-selected target keys.
307    pub(crate) fn validate_delete_strong_relations(
308        &self,
309        target_path: &str,
310        deleted_target_keys: &BTreeSet<RawDataKey>,
311    ) -> Result<(), InternalError> {
312        // Skip hook traversal when no target keys were deleted.
313        if deleted_target_keys.is_empty() {
314            return Ok(());
315        }
316
317        // Delegate delete-side relation validation to each entity runtime hook.
318        for hooks in self.entity_runtime_hooks {
319            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
320        }
321
322        Ok(())
323    }
324}
325
326///
327/// EntityRuntimeHooks
328///
329/// Per-entity runtime callbacks used for commit preparation and delete-side
330/// strong relation validation.
331///
332
333pub struct EntityRuntimeHooks<C: CanisterKind> {
334    pub(crate) entity_name: &'static str,
335    pub(crate) entity_path: &'static str,
336    pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
337    pub(in crate::db) prepare_row_commit:
338        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
339    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
340}
341
342impl<C: CanisterKind> EntityRuntimeHooks<C> {
343    #[must_use]
344    /// Build one runtime hook contract for a concrete runtime entity.
345    pub(in crate::db) const fn new(
346        entity_name: &'static str,
347        entity_path: &'static str,
348        commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
349        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
350        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
351    ) -> Self {
352        Self {
353            entity_name,
354            entity_path,
355            commit_schema_fingerprint,
356            prepare_row_commit,
357            validate_delete_strong_relations,
358        }
359    }
360
361    #[must_use]
362    /// Build runtime hooks from one entity type and delete-validation callback.
363    pub const fn for_entity<E>(
364        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
365    ) -> Self
366    where
367        E: EntityKind<Canister = C> + EntityValue,
368    {
369        Self::new(
370            <E as EntityIdentity>::ENTITY_NAME,
371            E::PATH,
372            commit_schema_fingerprint_for_runtime_entity::<E>,
373            prepare_row_commit_for_entity::<E>,
374            validate_delete_strong_relations,
375        )
376    }
377}
378
379fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
380where
381    E: EntityKind,
382{
383    commit_schema_fingerprint_for_entity::<E>()
384}
385
386impl<C: CanisterKind> Db<C> {
387    #[must_use]
388    /// Return whether this db has any registered runtime hook callbacks.
389    pub(crate) const fn has_runtime_hooks(&self) -> bool {
390        !self.entity_runtime_hooks.is_empty()
391    }
392
393    // Resolve exactly one runtime hook for a persisted entity name.
394    // Duplicate matches are treated as store invariants.
395    pub(crate) fn runtime_hook_for_entity_name(
396        &self,
397        entity_name: &str,
398    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
399        let mut matched = None;
400        for hooks in self.entity_runtime_hooks {
401            if hooks.entity_name != entity_name {
402                continue;
403            }
404
405            if matched.is_some() {
406                return Err(InternalError::store_invariant(format!(
407                    "duplicate runtime hooks for entity name '{entity_name}'"
408                )));
409            }
410
411            matched = Some(hooks);
412        }
413
414        matched.ok_or_else(|| {
415            InternalError::store_unsupported(format!(
416                "unsupported entity name in data store: '{entity_name}'"
417            ))
418        })
419    }
420
421    // Resolve exactly one runtime hook for a persisted entity path.
422    // Duplicate matches are treated as store invariants.
423    pub(crate) fn runtime_hook_for_entity_path(
424        &self,
425        entity_path: &str,
426    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
427        let mut matched = None;
428        for hooks in self.entity_runtime_hooks {
429            if hooks.entity_path != entity_path {
430                continue;
431            }
432
433            if matched.is_some() {
434                return Err(InternalError::store_invariant(format!(
435                    "duplicate runtime hooks for entity path '{entity_path}'"
436                )));
437            }
438
439            matched = Some(hooks);
440        }
441
442        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
443    }
444}
445
446impl<C: CanisterKind> Copy for Db<C> {}
447
448impl<C: CanisterKind> Clone for Db<C> {
449    fn clone(&self) -> Self {
450        *self
451    }
452}