Skip to main content

icydb_core/db/
mod.rs

1//! Module: db
2//!
3//! Responsibility: root subsystem wiring, façade re-exports, and runtime hook contracts.
4//! Does not own: feature semantics delegated to child modules (`query`, `executor`, etc.).
5//! Boundary: top-level db API and internal orchestration entrypoints.
6
7pub(crate) mod access;
8pub(crate) mod contracts;
9pub(crate) mod cursor;
10pub(crate) mod diagnostics;
11pub(crate) mod identity;
12pub(crate) mod predicate;
13pub(crate) mod query;
14pub(crate) mod registry;
15pub(crate) mod response;
16pub(crate) mod session;
17
18pub(in crate::db) mod codec;
19pub(in crate::db) mod commit;
20pub(in crate::db) mod data;
21pub(in crate::db) mod direction;
22pub(in crate::db) mod executor;
23pub(in crate::db) mod index;
24pub(in crate::db) mod numeric;
25pub(in crate::db) mod relation;
26
27use crate::{
28    db::{
29        commit::{
30            CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
31            commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
32            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
33        },
34        data::RawDataKey,
35        executor::Context,
36        relation::StrongRelationDeleteValidateFn,
37    },
38    error::InternalError,
39    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
40    value::Value,
41};
42use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
43
44pub use codec::cursor::{decode_cursor, encode_cursor};
45pub use data::DataStore;
46pub(crate) use data::StorageKey;
47pub use diagnostics::StorageReport;
48pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
49pub use identity::{EntityName, IndexName};
50pub use index::IndexStore;
51pub use predicate::MissingRowPolicy;
52pub use predicate::ValidateError;
53pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
54pub use query::{
55    builder::{
56        AggregateExpr, FieldRef, count, count_by, exists, first, last, max, max_by, min, min_by,
57        sum,
58    },
59    expr::{FilterExpr, SortExpr},
60    fluent::{
61        delete::FluentDeleteQuery,
62        load::{FluentLoadQuery, PagedLoadQuery},
63    },
64    intent::{
65        CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryExecuteError,
66        QueryMode,
67    },
68    plan::{OrderDirection, PlanError},
69};
70pub use registry::StoreRegistry;
71pub use relation::validate_delete_strong_relations_for_source;
72pub use response::{
73    PagedLoadExecution, PagedLoadExecutionWithTrace, ProjectedRow, Response, ResponseError, Row,
74    WriteBatchResponse, WriteResponse,
75};
76pub use session::DbSession;
77
78///
79/// GroupedRow
80///
81/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
82/// Group/aggregate vectors preserve query declaration order.
83///
84#[derive(Clone, Debug, Eq, PartialEq)]
85pub struct GroupedRow {
86    group_key: Vec<Value>,
87    aggregate_values: Vec<Value>,
88}
89
90impl GroupedRow {
91    /// Construct one grouped row payload.
92    #[must_use]
93    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
94        Self {
95            group_key,
96            aggregate_values,
97        }
98    }
99
100    /// Borrow grouped key values.
101    #[must_use]
102    pub const fn group_key(&self) -> &[Value] {
103        self.group_key.as_slice()
104    }
105
106    /// Borrow aggregate output values.
107    #[must_use]
108    pub const fn aggregate_values(&self) -> &[Value] {
109        self.aggregate_values.as_slice()
110    }
111}
112
113///
114/// PagedGroupedExecution
115///
116/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
117///
118#[derive(Debug)]
119pub struct PagedGroupedExecution {
120    rows: Vec<GroupedRow>,
121    continuation_cursor: Option<Vec<u8>>,
122}
123
124impl PagedGroupedExecution {
125    /// Construct one grouped paged execution payload.
126    #[must_use]
127    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
128        Self {
129            rows,
130            continuation_cursor,
131        }
132    }
133
134    /// Borrow grouped rows.
135    #[must_use]
136    pub const fn rows(&self) -> &[GroupedRow] {
137        self.rows.as_slice()
138    }
139
140    /// Borrow optional continuation cursor bytes.
141    #[must_use]
142    pub fn continuation_cursor(&self) -> Option<&[u8]> {
143        self.continuation_cursor.as_deref()
144    }
145
146    /// Consume into grouped rows and continuation cursor bytes.
147    #[must_use]
148    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
149        (self.rows, self.continuation_cursor)
150    }
151}
152
153///
154/// PagedGroupedExecutionWithTrace
155///
156/// Cursor-paged grouped execution payload plus optional route/execution trace.
157///
158#[derive(Debug)]
159pub struct PagedGroupedExecutionWithTrace {
160    execution: PagedGroupedExecution,
161    execution_trace: Option<ExecutionTrace>,
162}
163
164impl PagedGroupedExecutionWithTrace {
165    /// Construct one traced grouped paged execution payload.
166    #[must_use]
167    pub const fn new(
168        rows: Vec<GroupedRow>,
169        continuation_cursor: Option<Vec<u8>>,
170        execution_trace: Option<ExecutionTrace>,
171    ) -> Self {
172        Self {
173            execution: PagedGroupedExecution::new(rows, continuation_cursor),
174            execution_trace,
175        }
176    }
177
178    /// Borrow grouped execution payload.
179    #[must_use]
180    pub const fn execution(&self) -> &PagedGroupedExecution {
181        &self.execution
182    }
183
184    /// Borrow grouped rows.
185    #[must_use]
186    pub const fn rows(&self) -> &[GroupedRow] {
187        self.execution.rows()
188    }
189
190    /// Borrow optional continuation cursor bytes.
191    #[must_use]
192    pub fn continuation_cursor(&self) -> Option<&[u8]> {
193        self.execution.continuation_cursor()
194    }
195
196    /// Borrow optional execution trace details.
197    #[must_use]
198    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
199        self.execution_trace.as_ref()
200    }
201
202    /// Consume payload and drop trace details.
203    #[must_use]
204    pub fn into_execution(self) -> PagedGroupedExecution {
205        self.execution
206    }
207
208    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
209    #[must_use]
210    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
211        let (rows, continuation_cursor) = self.execution.into_parts();
212
213        (rows, continuation_cursor, self.execution_trace)
214    }
215}
216
217///
218/// Db
219/// A handle to the set of stores registered for a specific canister domain.
220///
221
222pub struct Db<C: CanisterKind> {
223    store: &'static LocalKey<StoreRegistry>,
224    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
225    _marker: PhantomData<C>,
226}
227
228impl<C: CanisterKind> Db<C> {
229    /// Construct a db handle without per-entity runtime hooks.
230    #[must_use]
231    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
232        Self::new_with_hooks(store, &[])
233    }
234
235    /// Construct a db handle with explicit per-entity runtime hook wiring.
236    #[must_use]
237    pub const fn new_with_hooks(
238        store: &'static LocalKey<StoreRegistry>,
239        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
240    ) -> Self {
241        Self {
242            store,
243            entity_runtime_hooks,
244            _marker: PhantomData,
245        }
246    }
247
248    #[must_use]
249    pub(crate) const fn context<E>(&self) -> Context<'_, E>
250    where
251        E: EntityKind<Canister = C> + EntityValue,
252    {
253        Context::new(self)
254    }
255
256    /// Return a recovery-guarded context for read paths.
257    ///
258    /// This enforces startup recovery and a fast persisted-marker check so reads
259    /// do not proceed while an incomplete commit is pending replay.
260    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
261    where
262        E: EntityKind<Canister = C> + EntityValue,
263    {
264        ensure_recovered(self)?;
265
266        Ok(Context::new(self))
267    }
268
269    /// Ensure startup/in-progress commit recovery has been applied.
270    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
271        ensure_recovered(self)
272    }
273
274    /// Execute one closure against the registered store set.
275    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
276        self.store.with(|reg| f(reg))
277    }
278
279    /// Build one storage diagnostics report for registered stores/entities.
280    pub fn storage_report(
281        &self,
282        name_to_path: &[(&'static str, &'static str)],
283    ) -> Result<StorageReport, InternalError> {
284        diagnostics::storage_report(self, name_to_path)
285    }
286
287    pub(in crate::db) fn prepare_row_commit_op(
288        &self,
289        op: &CommitRowOp,
290    ) -> Result<PreparedRowCommitOp, InternalError> {
291        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
292
293        (hooks.prepare_row_commit)(self, op)
294    }
295
296    pub(in crate::db) fn replay_commit_marker_row_ops(
297        &self,
298        row_ops: &[CommitRowOp],
299    ) -> Result<(), InternalError> {
300        replay_commit_marker_row_ops(self, row_ops)
301    }
302
303    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
304        rebuild_secondary_indexes_from_rows(self)
305    }
306
307    // Validate strong relation constraints for delete-selected target keys.
308    pub(crate) fn validate_delete_strong_relations(
309        &self,
310        target_path: &str,
311        deleted_target_keys: &BTreeSet<RawDataKey>,
312    ) -> Result<(), InternalError> {
313        // Skip hook traversal when no target keys were deleted.
314        if deleted_target_keys.is_empty() {
315            return Ok(());
316        }
317
318        // Delegate delete-side relation validation to each entity runtime hook.
319        for hooks in self.entity_runtime_hooks {
320            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
321        }
322
323        Ok(())
324    }
325}
326
327///
328/// EntityRuntimeHooks
329///
330/// Per-entity runtime callbacks used for commit preparation and delete-side
331/// strong relation validation.
332///
333
334pub struct EntityRuntimeHooks<C: CanisterKind> {
335    pub(crate) entity_name: &'static str,
336    pub(crate) entity_path: &'static str,
337    pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
338    pub(in crate::db) prepare_row_commit:
339        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
340    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
341}
342
343impl<C: CanisterKind> EntityRuntimeHooks<C> {
344    #[must_use]
345    /// Build one runtime hook contract for a concrete runtime entity.
346    pub(in crate::db) const fn new(
347        entity_name: &'static str,
348        entity_path: &'static str,
349        commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
350        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
351        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
352    ) -> Self {
353        Self {
354            entity_name,
355            entity_path,
356            commit_schema_fingerprint,
357            prepare_row_commit,
358            validate_delete_strong_relations,
359        }
360    }
361
362    #[must_use]
363    /// Build runtime hooks from one entity type and delete-validation callback.
364    pub const fn for_entity<E>(
365        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
366    ) -> Self
367    where
368        E: EntityKind<Canister = C> + EntityValue,
369    {
370        Self::new(
371            <E as EntityIdentity>::ENTITY_NAME,
372            E::PATH,
373            commit_schema_fingerprint_for_runtime_entity::<E>,
374            prepare_row_commit_for_entity::<E>,
375            validate_delete_strong_relations,
376        )
377    }
378}
379
380fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
381where
382    E: EntityKind,
383{
384    commit_schema_fingerprint_for_entity::<E>()
385}
386
387impl<C: CanisterKind> Db<C> {
388    #[must_use]
389    /// Return whether this db has any registered runtime hook callbacks.
390    pub(crate) const fn has_runtime_hooks(&self) -> bool {
391        !self.entity_runtime_hooks.is_empty()
392    }
393
394    // Resolve exactly one runtime hook for a persisted entity name.
395    // Duplicate matches are treated as store invariants.
396    pub(crate) fn runtime_hook_for_entity_name(
397        &self,
398        entity_name: &str,
399    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
400        let mut matched = None;
401        for hooks in self.entity_runtime_hooks {
402            if hooks.entity_name != entity_name {
403                continue;
404            }
405
406            if matched.is_some() {
407                return Err(InternalError::store_invariant(format!(
408                    "duplicate runtime hooks for entity name '{entity_name}'"
409                )));
410            }
411
412            matched = Some(hooks);
413        }
414
415        matched.ok_or_else(|| {
416            InternalError::store_unsupported(format!(
417                "unsupported entity name in data store: '{entity_name}'"
418            ))
419        })
420    }
421
422    // Resolve exactly one runtime hook for a persisted entity path.
423    // Duplicate matches are treated as store invariants.
424    pub(crate) fn runtime_hook_for_entity_path(
425        &self,
426        entity_path: &str,
427    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
428        let mut matched = None;
429        for hooks in self.entity_runtime_hooks {
430            if hooks.entity_path != entity_path {
431                continue;
432            }
433
434            if matched.is_some() {
435                return Err(InternalError::store_invariant(format!(
436                    "duplicate runtime hooks for entity path '{entity_path}'"
437                )));
438            }
439
440            matched = Some(hooks);
441        }
442
443        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
444    }
445}
446
447impl<C: CanisterKind> Copy for Db<C> {}
448
449impl<C: CanisterKind> Clone for Db<C> {
450    fn clone(&self) -> Self {
451        *self
452    }
453}