Skip to main content

icydb_core/db/
mod.rs

1//! Module: db
2//!
3//! Responsibility: root subsystem wiring, façade re-exports, and runtime hook contracts.
4//! Does not own: feature semantics delegated to child modules (`query`, `executor`, etc.).
5//! Boundary: top-level db API and internal orchestration entrypoints.
6
7pub(crate) mod access;
8pub(crate) mod contracts;
9pub(crate) mod cursor;
10pub(crate) mod diagnostics;
11pub(crate) mod identity;
12pub(crate) mod predicate;
13pub(crate) mod query;
14pub(crate) mod registry;
15pub(crate) mod response;
16pub(crate) mod session;
17
18pub(in crate::db) mod codec;
19pub(in crate::db) mod commit;
20pub(in crate::db) mod data;
21pub(in crate::db) mod direction;
22pub(in crate::db) mod executor;
23pub(in crate::db) mod index;
24pub(in crate::db) mod relation;
25
26use crate::{
27    db::{
28        commit::{
29            CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
30            commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
31            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
32        },
33        data::RawDataKey,
34        executor::Context,
35        relation::StrongRelationDeleteValidateFn,
36    },
37    error::InternalError,
38    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
39    value::Value,
40};
41use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
42
43pub use codec::cursor::{decode_cursor, encode_cursor};
44pub use data::DataStore;
45pub(crate) use data::StorageKey;
46pub use diagnostics::StorageReport;
47pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
48pub use identity::{EntityName, IndexName};
49pub use index::IndexStore;
50pub use predicate::MissingRowPolicy;
51pub use predicate::ValidateError;
52pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
53pub use query::{
54    builder::{
55        AggregateExpr, FieldRef, count, count_by, exists, first, last, max, max_by, min, min_by,
56        sum,
57    },
58    expr::{FilterExpr, SortExpr},
59    fluent::{
60        delete::FluentDeleteQuery,
61        load::{FluentLoadQuery, PagedLoadQuery},
62    },
63    intent::{CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryMode},
64    plan::{OrderDirection, PlanError},
65};
66pub use registry::StoreRegistry;
67pub use relation::validate_delete_strong_relations_for_source;
68pub use response::{
69    PagedLoadExecution, PagedLoadExecutionWithTrace, Response, ResponseError, Row,
70    WriteBatchResponse, WriteResponse,
71};
72pub use session::DbSession;
73
74///
75/// GroupedRow
76///
77/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
78/// Group/aggregate vectors preserve query declaration order.
79///
80#[derive(Clone, Debug, Eq, PartialEq)]
81pub struct GroupedRow {
82    group_key: Vec<Value>,
83    aggregate_values: Vec<Value>,
84}
85
86impl GroupedRow {
87    /// Construct one grouped row payload.
88    #[must_use]
89    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
90        Self {
91            group_key,
92            aggregate_values,
93        }
94    }
95
96    /// Borrow grouped key values.
97    #[must_use]
98    pub const fn group_key(&self) -> &[Value] {
99        self.group_key.as_slice()
100    }
101
102    /// Borrow aggregate output values.
103    #[must_use]
104    pub const fn aggregate_values(&self) -> &[Value] {
105        self.aggregate_values.as_slice()
106    }
107}
108
109///
110/// PagedGroupedExecution
111///
112/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
113///
114#[derive(Debug)]
115pub struct PagedGroupedExecution {
116    rows: Vec<GroupedRow>,
117    continuation_cursor: Option<Vec<u8>>,
118}
119
120impl PagedGroupedExecution {
121    /// Construct one grouped paged execution payload.
122    #[must_use]
123    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
124        Self {
125            rows,
126            continuation_cursor,
127        }
128    }
129
130    /// Borrow grouped rows.
131    #[must_use]
132    pub const fn rows(&self) -> &[GroupedRow] {
133        self.rows.as_slice()
134    }
135
136    /// Borrow optional continuation cursor bytes.
137    #[must_use]
138    pub fn continuation_cursor(&self) -> Option<&[u8]> {
139        self.continuation_cursor.as_deref()
140    }
141
142    /// Consume into grouped rows and continuation cursor bytes.
143    #[must_use]
144    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
145        (self.rows, self.continuation_cursor)
146    }
147}
148
149///
150/// PagedGroupedExecutionWithTrace
151///
152/// Cursor-paged grouped execution payload plus optional route/execution trace.
153///
154#[derive(Debug)]
155pub struct PagedGroupedExecutionWithTrace {
156    execution: PagedGroupedExecution,
157    execution_trace: Option<ExecutionTrace>,
158}
159
160impl PagedGroupedExecutionWithTrace {
161    /// Construct one traced grouped paged execution payload.
162    #[must_use]
163    pub const fn new(
164        rows: Vec<GroupedRow>,
165        continuation_cursor: Option<Vec<u8>>,
166        execution_trace: Option<ExecutionTrace>,
167    ) -> Self {
168        Self {
169            execution: PagedGroupedExecution::new(rows, continuation_cursor),
170            execution_trace,
171        }
172    }
173
174    /// Borrow grouped execution payload.
175    #[must_use]
176    pub const fn execution(&self) -> &PagedGroupedExecution {
177        &self.execution
178    }
179
180    /// Borrow grouped rows.
181    #[must_use]
182    pub const fn rows(&self) -> &[GroupedRow] {
183        self.execution.rows()
184    }
185
186    /// Borrow optional continuation cursor bytes.
187    #[must_use]
188    pub fn continuation_cursor(&self) -> Option<&[u8]> {
189        self.execution.continuation_cursor()
190    }
191
192    /// Borrow optional execution trace details.
193    #[must_use]
194    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
195        self.execution_trace.as_ref()
196    }
197
198    /// Consume payload and drop trace details.
199    #[must_use]
200    pub fn into_execution(self) -> PagedGroupedExecution {
201        self.execution
202    }
203
204    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
205    #[must_use]
206    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
207        let (rows, continuation_cursor) = self.execution.into_parts();
208
209        (rows, continuation_cursor, self.execution_trace)
210    }
211}
212
213///
214/// Db
215/// A handle to the set of stores registered for a specific canister domain.
216///
217
218pub struct Db<C: CanisterKind> {
219    store: &'static LocalKey<StoreRegistry>,
220    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
221    _marker: PhantomData<C>,
222}
223
224impl<C: CanisterKind> Db<C> {
225    /// Construct a db handle without per-entity runtime hooks.
226    #[must_use]
227    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
228        Self::new_with_hooks(store, &[])
229    }
230
231    /// Construct a db handle with explicit per-entity runtime hook wiring.
232    #[must_use]
233    pub const fn new_with_hooks(
234        store: &'static LocalKey<StoreRegistry>,
235        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
236    ) -> Self {
237        Self {
238            store,
239            entity_runtime_hooks,
240            _marker: PhantomData,
241        }
242    }
243
244    #[must_use]
245    pub(crate) const fn context<E>(&self) -> Context<'_, E>
246    where
247        E: EntityKind<Canister = C> + EntityValue,
248    {
249        Context::new(self)
250    }
251
252    /// Return a recovery-guarded context for read paths.
253    ///
254    /// This enforces startup recovery and a fast persisted-marker check so reads
255    /// do not proceed while an incomplete commit is pending replay.
256    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
257    where
258        E: EntityKind<Canister = C> + EntityValue,
259    {
260        ensure_recovered(self)?;
261
262        Ok(Context::new(self))
263    }
264
265    /// Ensure startup/in-progress commit recovery has been applied.
266    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
267        ensure_recovered(self)
268    }
269
270    /// Execute one closure against the registered store set.
271    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
272        self.store.with(|reg| f(reg))
273    }
274
275    /// Build one storage diagnostics report for registered stores/entities.
276    pub fn storage_report(
277        &self,
278        name_to_path: &[(&'static str, &'static str)],
279    ) -> Result<StorageReport, InternalError> {
280        diagnostics::storage_report(self, name_to_path)
281    }
282
283    pub(in crate::db) fn prepare_row_commit_op(
284        &self,
285        op: &CommitRowOp,
286    ) -> Result<PreparedRowCommitOp, InternalError> {
287        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
288
289        (hooks.prepare_row_commit)(self, op)
290    }
291
292    pub(in crate::db) fn replay_commit_marker_row_ops(
293        &self,
294        row_ops: &[CommitRowOp],
295    ) -> Result<(), InternalError> {
296        replay_commit_marker_row_ops(self, row_ops)
297    }
298
299    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
300        rebuild_secondary_indexes_from_rows(self)
301    }
302
303    // Validate strong relation constraints for delete-selected target keys.
304    pub(crate) fn validate_delete_strong_relations(
305        &self,
306        target_path: &str,
307        deleted_target_keys: &BTreeSet<RawDataKey>,
308    ) -> Result<(), InternalError> {
309        // Skip hook traversal when no target keys were deleted.
310        if deleted_target_keys.is_empty() {
311            return Ok(());
312        }
313
314        // Delegate delete-side relation validation to each entity runtime hook.
315        for hooks in self.entity_runtime_hooks {
316            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
317        }
318
319        Ok(())
320    }
321}
322
323///
324/// EntityRuntimeHooks
325///
326/// Per-entity runtime callbacks used for commit preparation and delete-side
327/// strong relation validation.
328///
329
330pub struct EntityRuntimeHooks<C: CanisterKind> {
331    pub(crate) entity_name: &'static str,
332    pub(crate) entity_path: &'static str,
333    pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
334    pub(in crate::db) prepare_row_commit:
335        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
336    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
337}
338
339impl<C: CanisterKind> EntityRuntimeHooks<C> {
340    #[must_use]
341    /// Build one runtime hook contract for a concrete runtime entity.
342    pub(in crate::db) const fn new(
343        entity_name: &'static str,
344        entity_path: &'static str,
345        commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
346        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
347        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
348    ) -> Self {
349        Self {
350            entity_name,
351            entity_path,
352            commit_schema_fingerprint,
353            prepare_row_commit,
354            validate_delete_strong_relations,
355        }
356    }
357
358    #[must_use]
359    /// Build runtime hooks from one entity type and delete-validation callback.
360    pub const fn for_entity<E>(
361        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
362    ) -> Self
363    where
364        E: EntityKind<Canister = C> + EntityValue,
365    {
366        Self::new(
367            <E as EntityIdentity>::ENTITY_NAME,
368            E::PATH,
369            commit_schema_fingerprint_for_runtime_entity::<E>,
370            prepare_row_commit_for_entity::<E>,
371            validate_delete_strong_relations,
372        )
373    }
374}
375
376fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
377where
378    E: EntityKind,
379{
380    commit_schema_fingerprint_for_entity::<E>()
381}
382
383impl<C: CanisterKind> Db<C> {
384    #[must_use]
385    /// Return whether this db has any registered runtime hook callbacks.
386    pub(crate) const fn has_runtime_hooks(&self) -> bool {
387        !self.entity_runtime_hooks.is_empty()
388    }
389
390    // Resolve exactly one runtime hook for a persisted entity name.
391    // Duplicate matches are treated as store invariants.
392    pub(crate) fn runtime_hook_for_entity_name(
393        &self,
394        entity_name: &str,
395    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
396        let mut matched = None;
397        for hooks in self.entity_runtime_hooks {
398            if hooks.entity_name != entity_name {
399                continue;
400            }
401
402            if matched.is_some() {
403                return Err(InternalError::store_invariant(format!(
404                    "duplicate runtime hooks for entity name '{entity_name}'"
405                )));
406            }
407
408            matched = Some(hooks);
409        }
410
411        matched.ok_or_else(|| {
412            InternalError::store_unsupported(format!(
413                "unsupported entity name in data store: '{entity_name}'"
414            ))
415        })
416    }
417
418    // Resolve exactly one runtime hook for a persisted entity path.
419    // Duplicate matches are treated as store invariants.
420    pub(crate) fn runtime_hook_for_entity_path(
421        &self,
422        entity_path: &str,
423    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
424        let mut matched = None;
425        for hooks in self.entity_runtime_hooks {
426            if hooks.entity_path != entity_path {
427                continue;
428            }
429
430            if matched.is_some() {
431                return Err(InternalError::store_invariant(format!(
432                    "duplicate runtime hooks for entity path '{entity_path}'"
433                )));
434            }
435
436            matched = Some(hooks);
437        }
438
439        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
440    }
441}
442
443impl<C: CanisterKind> Copy for Db<C> {}
444
445impl<C: CanisterKind> Clone for Db<C> {
446    fn clone(&self) -> Self {
447        *self
448    }
449}