// icydb_core/db/mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod access;
3pub(crate) mod contracts;
4pub(crate) mod cursor;
5pub(crate) mod diagnostics;
6pub(crate) mod identity;
7pub(crate) mod predicate;
8pub(crate) mod query;
9pub(crate) mod registry;
10pub(crate) mod response;
11pub(crate) mod session;
12
13pub(in crate::db) mod codec;
14pub(in crate::db) mod commit;
15pub(in crate::db) mod data;
16pub(in crate::db) mod direction;
17pub(in crate::db) mod executor;
18pub(in crate::db) mod index;
19pub(in crate::db) mod relation;
20
21// 2️⃣ Public re-exports (Tier-2 API surface)
22pub use codec::cursor::{decode_cursor, encode_cursor};
23pub use data::DataStore;
24pub(crate) use data::StorageKey;
25pub use diagnostics::StorageReport;
26pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
27pub use identity::{EntityName, IndexName};
28pub use index::IndexStore;
29pub use predicate::MissingRowPolicy;
30pub use predicate::ValidateError;
31pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
32pub use query::{
33    builder::field::FieldRef,
34    expr::{FilterExpr, SortExpr},
35    fluent::{
36        delete::FluentDeleteQuery,
37        load::{FluentLoadQuery, PagedLoadQuery},
38    },
39    intent::{CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryMode},
40    plan::{OrderDirection, PlanError},
41};
42pub use registry::StoreRegistry;
43pub use relation::validate_delete_strong_relations_for_source;
44pub use response::{
45    PagedLoadExecution, PagedLoadExecutionWithTrace, Response, ResponseError, Row,
46    WriteBatchResponse, WriteResponse,
47};
48pub use session::DbSession;
49
50// 3️⃣ Internal imports (implementation wiring)
51use crate::{
52    db::{
53        commit::{
54            CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
55            commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
56            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
57        },
58        data::RawDataKey,
59        executor::Context,
60        relation::StrongRelationDeleteValidateFn,
61    },
62    error::InternalError,
63    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
64    value::Value,
65};
66use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
67
68///
69/// GroupedRow
70///
71/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
72/// Group/aggregate vectors preserve query declaration order.
73///
74#[derive(Clone, Debug, Eq, PartialEq)]
75pub struct GroupedRow {
76    group_key: Vec<Value>,
77    aggregate_values: Vec<Value>,
78}
79
80impl GroupedRow {
81    /// Construct one grouped row payload.
82    #[must_use]
83    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
84        Self {
85            group_key,
86            aggregate_values,
87        }
88    }
89
90    /// Borrow grouped key values.
91    #[must_use]
92    pub const fn group_key(&self) -> &[Value] {
93        self.group_key.as_slice()
94    }
95
96    /// Borrow aggregate output values.
97    #[must_use]
98    pub const fn aggregate_values(&self) -> &[Value] {
99        self.aggregate_values.as_slice()
100    }
101}
102
103///
104/// PagedGroupedExecution
105///
106/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
107///
108#[derive(Debug)]
109pub struct PagedGroupedExecution {
110    rows: Vec<GroupedRow>,
111    continuation_cursor: Option<Vec<u8>>,
112}
113
114impl PagedGroupedExecution {
115    /// Construct one grouped paged execution payload.
116    #[must_use]
117    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
118        Self {
119            rows,
120            continuation_cursor,
121        }
122    }
123
124    /// Borrow grouped rows.
125    #[must_use]
126    pub const fn rows(&self) -> &[GroupedRow] {
127        self.rows.as_slice()
128    }
129
130    /// Borrow optional continuation cursor bytes.
131    #[must_use]
132    pub fn continuation_cursor(&self) -> Option<&[u8]> {
133        self.continuation_cursor.as_deref()
134    }
135
136    /// Consume into grouped rows and continuation cursor bytes.
137    #[must_use]
138    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
139        (self.rows, self.continuation_cursor)
140    }
141}
142
143///
144/// PagedGroupedExecutionWithTrace
145///
146/// Cursor-paged grouped execution payload plus optional route/execution trace.
147///
148#[derive(Debug)]
149pub struct PagedGroupedExecutionWithTrace {
150    execution: PagedGroupedExecution,
151    execution_trace: Option<ExecutionTrace>,
152}
153
154impl PagedGroupedExecutionWithTrace {
155    /// Construct one traced grouped paged execution payload.
156    #[must_use]
157    pub const fn new(
158        rows: Vec<GroupedRow>,
159        continuation_cursor: Option<Vec<u8>>,
160        execution_trace: Option<ExecutionTrace>,
161    ) -> Self {
162        Self {
163            execution: PagedGroupedExecution::new(rows, continuation_cursor),
164            execution_trace,
165        }
166    }
167
168    /// Borrow grouped execution payload.
169    #[must_use]
170    pub const fn execution(&self) -> &PagedGroupedExecution {
171        &self.execution
172    }
173
174    /// Borrow grouped rows.
175    #[must_use]
176    pub const fn rows(&self) -> &[GroupedRow] {
177        self.execution.rows()
178    }
179
180    /// Borrow optional continuation cursor bytes.
181    #[must_use]
182    pub fn continuation_cursor(&self) -> Option<&[u8]> {
183        self.execution.continuation_cursor()
184    }
185
186    /// Borrow optional execution trace details.
187    #[must_use]
188    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
189        self.execution_trace.as_ref()
190    }
191
192    /// Consume payload and drop trace details.
193    #[must_use]
194    pub fn into_execution(self) -> PagedGroupedExecution {
195        self.execution
196    }
197
198    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
199    #[must_use]
200    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
201        let (rows, continuation_cursor) = self.execution.into_parts();
202
203        (rows, continuation_cursor, self.execution_trace)
204    }
205}
206
207///
208/// Db
209/// A handle to the set of stores registered for a specific canister domain.
210///
211
212pub struct Db<C: CanisterKind> {
213    store: &'static LocalKey<StoreRegistry>,
214    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
215    _marker: PhantomData<C>,
216}
217
218impl<C: CanisterKind> Db<C> {
219    #[must_use]
220    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
221        Self::new_with_hooks(store, &[])
222    }
223
224    #[must_use]
225    pub const fn new_with_hooks(
226        store: &'static LocalKey<StoreRegistry>,
227        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
228    ) -> Self {
229        Self {
230            store,
231            entity_runtime_hooks,
232            _marker: PhantomData,
233        }
234    }
235
236    #[must_use]
237    pub(crate) const fn context<E>(&self) -> Context<'_, E>
238    where
239        E: EntityKind<Canister = C> + EntityValue,
240    {
241        Context::new(self)
242    }
243
244    /// Return a recovery-guarded context for read paths.
245    ///
246    /// This enforces startup recovery and a fast persisted-marker check so reads
247    /// do not proceed while an incomplete commit is pending replay.
248    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
249    where
250        E: EntityKind<Canister = C> + EntityValue,
251    {
252        ensure_recovered(self)?;
253
254        Ok(Context::new(self))
255    }
256
257    /// Ensure startup/in-progress commit recovery has been applied.
258    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
259        ensure_recovered(self)
260    }
261
262    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
263        self.store.with(|reg| f(reg))
264    }
265
266    pub fn storage_report(
267        &self,
268        name_to_path: &[(&'static str, &'static str)],
269    ) -> Result<StorageReport, InternalError> {
270        diagnostics::storage_report(self, name_to_path)
271    }
272
273    pub(in crate::db) fn prepare_row_commit_op(
274        &self,
275        op: &CommitRowOp,
276    ) -> Result<PreparedRowCommitOp, InternalError> {
277        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
278
279        (hooks.prepare_row_commit)(self, op)
280    }
281
282    pub(in crate::db) fn replay_commit_marker_row_ops(
283        &self,
284        row_ops: &[CommitRowOp],
285    ) -> Result<(), InternalError> {
286        replay_commit_marker_row_ops(self, row_ops)
287    }
288
289    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
290        rebuild_secondary_indexes_from_rows(self)
291    }
292
293    // Validate strong relation constraints for delete-selected target keys.
294    pub(crate) fn validate_delete_strong_relations(
295        &self,
296        target_path: &str,
297        deleted_target_keys: &BTreeSet<RawDataKey>,
298    ) -> Result<(), InternalError> {
299        if deleted_target_keys.is_empty() {
300            return Ok(());
301        }
302
303        for hooks in self.entity_runtime_hooks {
304            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
305        }
306
307        Ok(())
308    }
309}
310
311///
312/// EntityRuntimeHooks
313///
314/// Per-entity runtime callbacks used for commit preparation and delete-side
315/// strong relation validation.
316///
317
318pub struct EntityRuntimeHooks<C: CanisterKind> {
319    pub(crate) entity_name: &'static str,
320    pub(crate) entity_path: &'static str,
321    pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
322    pub(in crate::db) prepare_row_commit:
323        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
324    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
325}
326
327impl<C: CanisterKind> EntityRuntimeHooks<C> {
328    #[must_use]
329    pub(in crate::db) const fn new(
330        entity_name: &'static str,
331        entity_path: &'static str,
332        commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
333        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
334        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
335    ) -> Self {
336        Self {
337            entity_name,
338            entity_path,
339            commit_schema_fingerprint,
340            prepare_row_commit,
341            validate_delete_strong_relations,
342        }
343    }
344
345    #[must_use]
346    pub const fn for_entity<E>(
347        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
348    ) -> Self
349    where
350        E: EntityKind<Canister = C> + EntityValue,
351    {
352        Self::new(
353            <E as EntityIdentity>::ENTITY_NAME,
354            E::PATH,
355            commit_schema_fingerprint_for_runtime_entity::<E>,
356            prepare_row_commit_for_entity::<E>,
357            validate_delete_strong_relations,
358        )
359    }
360}
361
362fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
363where
364    E: EntityKind,
365{
366    commit_schema_fingerprint_for_entity::<E>()
367}
368
369impl<C: CanisterKind> Db<C> {
370    #[must_use]
371    pub(crate) const fn has_runtime_hooks(&self) -> bool {
372        !self.entity_runtime_hooks.is_empty()
373    }
374
375    // Resolve exactly one runtime hook for a persisted entity name.
376    // Duplicate matches are treated as store invariants.
377    pub(crate) fn runtime_hook_for_entity_name(
378        &self,
379        entity_name: &str,
380    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
381        let mut matched = None;
382        for hooks in self.entity_runtime_hooks {
383            if hooks.entity_name != entity_name {
384                continue;
385            }
386
387            if matched.is_some() {
388                return Err(InternalError::store_invariant(format!(
389                    "duplicate runtime hooks for entity name '{entity_name}'"
390                )));
391            }
392
393            matched = Some(hooks);
394        }
395
396        matched.ok_or_else(|| {
397            InternalError::store_unsupported(format!(
398                "unsupported entity name in data store: '{entity_name}'"
399            ))
400        })
401    }
402
403    // Resolve exactly one runtime hook for a persisted entity path.
404    // Duplicate matches are treated as store invariants.
405    pub(crate) fn runtime_hook_for_entity_path(
406        &self,
407        entity_path: &str,
408    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
409        let mut matched = None;
410        for hooks in self.entity_runtime_hooks {
411            if hooks.entity_path != entity_path {
412                continue;
413            }
414
415            if matched.is_some() {
416                return Err(InternalError::store_invariant(format!(
417                    "duplicate runtime hooks for entity path '{entity_path}'"
418                )));
419            }
420
421            matched = Some(hooks);
422        }
423
424        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
425    }
426}
427
428impl<C: CanisterKind> Copy for Db<C> {}
429
430impl<C: CanisterKind> Clone for Db<C> {
431    fn clone(&self) -> Self {
432        *self
433    }
434}