Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod access;
3pub(crate) mod contracts;
4pub(crate) mod cursor;
5pub(crate) mod diagnostics;
6pub(crate) mod identity;
7pub(crate) mod predicate;
8pub(crate) mod query;
9pub(crate) mod registry;
10pub(crate) mod response;
11pub(crate) mod session;
12
13pub(in crate::db) mod codec;
14pub(in crate::db) mod commit;
15pub(in crate::db) mod data;
16pub(in crate::db) mod direction;
17pub(in crate::db) mod executor;
18pub(in crate::db) mod index;
19pub(in crate::db) mod relation;
20
21// 2️⃣ Public re-exports (Tier-2 API surface)
22pub use codec::cursor::{decode_cursor, encode_cursor};
23pub use data::DataStore;
24pub(crate) use data::StorageKey;
25pub use diagnostics::StorageReport;
26pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
27pub use identity::{EntityName, IndexName};
28pub use index::IndexStore;
29pub use predicate::MissingRowPolicy;
30pub use predicate::ValidateError;
31pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
32pub use query::{
33    builder::field::FieldRef,
34    expr::{FilterExpr, SortExpr},
35    fluent::{
36        delete::FluentDeleteQuery,
37        load::{FluentLoadQuery, PagedLoadQuery},
38    },
39    intent::{CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryMode},
40    plan::{OrderDirection, PlanError},
41};
42pub use registry::StoreRegistry;
43pub use relation::validate_delete_strong_relations_for_source;
44pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
45pub use session::DbSession;
46
47// 3️⃣ Internal imports (implementation wiring)
48use crate::{
49    db::{
50        commit::{
51            CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
52            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
53        },
54        data::RawDataKey,
55        executor::Context,
56        relation::StrongRelationDeleteValidateFn,
57    },
58    error::InternalError,
59    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
60    value::Value,
61};
62use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
63
64///
65/// PagedLoadExecution
66///
67/// Cursor-paged load response with optional continuation cursor bytes.
68///
69
70#[derive(Debug)]
71pub struct PagedLoadExecution<E: EntityKind> {
72    response: Response<E>,
73    continuation_cursor: Option<Vec<u8>>,
74}
75
76impl<E: EntityKind> PagedLoadExecution<E> {
77    /// Create a paged load execution payload.
78    #[must_use]
79    pub const fn new(response: Response<E>, continuation_cursor: Option<Vec<u8>>) -> Self {
80        Self {
81            response,
82            continuation_cursor,
83        }
84    }
85
86    /// Borrow the paged response rows.
87    #[must_use]
88    pub const fn response(&self) -> &Response<E> {
89        &self.response
90    }
91
92    /// Borrow the optional continuation cursor bytes.
93    #[must_use]
94    pub fn continuation_cursor(&self) -> Option<&[u8]> {
95        self.continuation_cursor.as_deref()
96    }
97
98    /// Consume this payload and return `(response, continuation_cursor)`.
99    #[must_use]
100    pub fn into_parts(self) -> (Response<E>, Option<Vec<u8>>) {
101        (self.response, self.continuation_cursor)
102    }
103}
104
105impl<E: EntityKind> From<(Response<E>, Option<Vec<u8>>)> for PagedLoadExecution<E> {
106    fn from(value: (Response<E>, Option<Vec<u8>>)) -> Self {
107        let (response, continuation_cursor) = value;
108
109        Self::new(response, continuation_cursor)
110    }
111}
112
113impl<E: EntityKind> From<PagedLoadExecution<E>> for (Response<E>, Option<Vec<u8>>) {
114    fn from(value: PagedLoadExecution<E>) -> Self {
115        value.into_parts()
116    }
117}
118
119///
120/// PagedLoadExecutionWithTrace
121///
122/// Cursor-paged load response plus optional execution trace details.
123///
124
125#[derive(Debug)]
126pub struct PagedLoadExecutionWithTrace<E: EntityKind> {
127    execution: PagedLoadExecution<E>,
128    execution_trace: Option<ExecutionTrace>,
129}
130
131impl<E: EntityKind> PagedLoadExecutionWithTrace<E> {
132    /// Create a traced paged load execution payload.
133    #[must_use]
134    pub const fn new(
135        response: Response<E>,
136        continuation_cursor: Option<Vec<u8>>,
137        execution_trace: Option<ExecutionTrace>,
138    ) -> Self {
139        Self {
140            execution: PagedLoadExecution::new(response, continuation_cursor),
141            execution_trace,
142        }
143    }
144
145    /// Borrow the paged execution payload.
146    #[must_use]
147    pub const fn execution(&self) -> &PagedLoadExecution<E> {
148        &self.execution
149    }
150
151    /// Borrow the paged response rows.
152    #[must_use]
153    pub const fn response(&self) -> &Response<E> {
154        self.execution.response()
155    }
156
157    /// Borrow the optional continuation cursor bytes.
158    #[must_use]
159    pub fn continuation_cursor(&self) -> Option<&[u8]> {
160        self.execution.continuation_cursor()
161    }
162
163    /// Borrow optional execution trace details.
164    #[must_use]
165    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
166        self.execution_trace.as_ref()
167    }
168
169    /// Consume this payload and drop trace details.
170    #[must_use]
171    pub fn into_execution(self) -> PagedLoadExecution<E> {
172        self.execution
173    }
174
175    /// Consume this payload and return `(response, continuation_cursor, trace)`.
176    #[must_use]
177    pub fn into_parts(self) -> (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>) {
178        let (response, continuation_cursor) = self.execution.into_parts();
179
180        (response, continuation_cursor, self.execution_trace)
181    }
182}
183
184impl<E: EntityKind> From<(Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)>
185    for PagedLoadExecutionWithTrace<E>
186{
187    fn from(value: (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)) -> Self {
188        let (response, continuation_cursor, execution_trace) = value;
189
190        Self::new(response, continuation_cursor, execution_trace)
191    }
192}
193
194impl<E: EntityKind> From<PagedLoadExecutionWithTrace<E>>
195    for (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)
196{
197    fn from(value: PagedLoadExecutionWithTrace<E>) -> Self {
198        value.into_parts()
199    }
200}
201
202///
203/// GroupedRow
204///
205/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
206/// Group/aggregate vectors preserve query declaration order.
207///
208#[derive(Clone, Debug, Eq, PartialEq)]
209pub struct GroupedRow {
210    group_key: Vec<Value>,
211    aggregate_values: Vec<Value>,
212}
213
214impl GroupedRow {
215    /// Construct one grouped row payload.
216    #[must_use]
217    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
218        Self {
219            group_key,
220            aggregate_values,
221        }
222    }
223
224    /// Borrow grouped key values.
225    #[must_use]
226    pub const fn group_key(&self) -> &[Value] {
227        self.group_key.as_slice()
228    }
229
230    /// Borrow aggregate output values.
231    #[must_use]
232    pub const fn aggregate_values(&self) -> &[Value] {
233        self.aggregate_values.as_slice()
234    }
235}
236
237///
238/// PagedGroupedExecution
239///
240/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
241///
242#[derive(Debug)]
243pub struct PagedGroupedExecution {
244    rows: Vec<GroupedRow>,
245    continuation_cursor: Option<Vec<u8>>,
246}
247
248impl PagedGroupedExecution {
249    /// Construct one grouped paged execution payload.
250    #[must_use]
251    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
252        Self {
253            rows,
254            continuation_cursor,
255        }
256    }
257
258    /// Borrow grouped rows.
259    #[must_use]
260    pub const fn rows(&self) -> &[GroupedRow] {
261        self.rows.as_slice()
262    }
263
264    /// Borrow optional continuation cursor bytes.
265    #[must_use]
266    pub fn continuation_cursor(&self) -> Option<&[u8]> {
267        self.continuation_cursor.as_deref()
268    }
269
270    /// Consume into grouped rows and continuation cursor bytes.
271    #[must_use]
272    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
273        (self.rows, self.continuation_cursor)
274    }
275}
276
277///
278/// PagedGroupedExecutionWithTrace
279///
280/// Cursor-paged grouped execution payload plus optional route/execution trace.
281///
282#[derive(Debug)]
283pub struct PagedGroupedExecutionWithTrace {
284    execution: PagedGroupedExecution,
285    execution_trace: Option<ExecutionTrace>,
286}
287
288impl PagedGroupedExecutionWithTrace {
289    /// Construct one traced grouped paged execution payload.
290    #[must_use]
291    pub const fn new(
292        rows: Vec<GroupedRow>,
293        continuation_cursor: Option<Vec<u8>>,
294        execution_trace: Option<ExecutionTrace>,
295    ) -> Self {
296        Self {
297            execution: PagedGroupedExecution::new(rows, continuation_cursor),
298            execution_trace,
299        }
300    }
301
302    /// Borrow grouped execution payload.
303    #[must_use]
304    pub const fn execution(&self) -> &PagedGroupedExecution {
305        &self.execution
306    }
307
308    /// Borrow grouped rows.
309    #[must_use]
310    pub const fn rows(&self) -> &[GroupedRow] {
311        self.execution.rows()
312    }
313
314    /// Borrow optional continuation cursor bytes.
315    #[must_use]
316    pub fn continuation_cursor(&self) -> Option<&[u8]> {
317        self.execution.continuation_cursor()
318    }
319
320    /// Borrow optional execution trace details.
321    #[must_use]
322    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
323        self.execution_trace.as_ref()
324    }
325
326    /// Consume payload and drop trace details.
327    #[must_use]
328    pub fn into_execution(self) -> PagedGroupedExecution {
329        self.execution
330    }
331
332    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
333    #[must_use]
334    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
335        let (rows, continuation_cursor) = self.execution.into_parts();
336
337        (rows, continuation_cursor, self.execution_trace)
338    }
339}
340
341///
342/// Db
343/// A handle to the set of stores registered for a specific canister domain.
344///
345
346pub struct Db<C: CanisterKind> {
347    store: &'static LocalKey<StoreRegistry>,
348    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
349    _marker: PhantomData<C>,
350}
351
352impl<C: CanisterKind> Db<C> {
353    #[must_use]
354    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
355        Self::new_with_hooks(store, &[])
356    }
357
358    #[must_use]
359    pub const fn new_with_hooks(
360        store: &'static LocalKey<StoreRegistry>,
361        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
362    ) -> Self {
363        Self {
364            store,
365            entity_runtime_hooks,
366            _marker: PhantomData,
367        }
368    }
369
370    #[must_use]
371    pub(crate) const fn context<E>(&self) -> Context<'_, E>
372    where
373        E: EntityKind<Canister = C> + EntityValue,
374    {
375        Context::new(self)
376    }
377
378    /// Return a recovery-guarded context for read paths.
379    ///
380    /// This enforces startup recovery and a fast persisted-marker check so reads
381    /// do not proceed while an incomplete commit is pending replay.
382    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
383    where
384        E: EntityKind<Canister = C> + EntityValue,
385    {
386        ensure_recovered(self)?;
387
388        Ok(Context::new(self))
389    }
390
391    /// Ensure startup/in-progress commit recovery has been applied.
392    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
393        ensure_recovered(self)
394    }
395
396    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
397        self.store.with(|reg| f(reg))
398    }
399
400    pub fn storage_report(
401        &self,
402        name_to_path: &[(&'static str, &'static str)],
403    ) -> Result<StorageReport, InternalError> {
404        diagnostics::storage_report(self, name_to_path)
405    }
406
407    pub(in crate::db) fn prepare_row_commit_op(
408        &self,
409        op: &CommitRowOp,
410    ) -> Result<PreparedRowCommitOp, InternalError> {
411        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
412
413        (hooks.prepare_row_commit)(self, op)
414    }
415
416    pub(in crate::db) fn replay_commit_marker_row_ops(
417        &self,
418        row_ops: &[CommitRowOp],
419    ) -> Result<(), InternalError> {
420        replay_commit_marker_row_ops(self, row_ops)
421    }
422
423    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
424        rebuild_secondary_indexes_from_rows(self)
425    }
426
427    // Validate strong relation constraints for delete-selected target keys.
428    pub(crate) fn validate_delete_strong_relations(
429        &self,
430        target_path: &str,
431        deleted_target_keys: &BTreeSet<RawDataKey>,
432    ) -> Result<(), InternalError> {
433        if deleted_target_keys.is_empty() {
434            return Ok(());
435        }
436
437        for hooks in self.entity_runtime_hooks {
438            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
439        }
440
441        Ok(())
442    }
443}
444
445///
446/// EntityRuntimeHooks
447///
448/// Per-entity runtime callbacks used for commit preparation and delete-side
449/// strong relation validation.
450///
451
452pub struct EntityRuntimeHooks<C: CanisterKind> {
453    pub(crate) entity_name: &'static str,
454    pub(crate) entity_path: &'static str,
455    pub(in crate::db) prepare_row_commit:
456        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
457    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
458}
459
460impl<C: CanisterKind> EntityRuntimeHooks<C> {
461    #[must_use]
462    pub(in crate::db) const fn new(
463        entity_name: &'static str,
464        entity_path: &'static str,
465        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
466        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
467    ) -> Self {
468        Self {
469            entity_name,
470            entity_path,
471            prepare_row_commit,
472            validate_delete_strong_relations,
473        }
474    }
475
476    #[must_use]
477    pub const fn for_entity<E>(
478        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
479    ) -> Self
480    where
481        E: EntityKind<Canister = C> + EntityValue,
482    {
483        Self::new(
484            <E as EntityIdentity>::ENTITY_NAME,
485            E::PATH,
486            prepare_row_commit_for_entity::<E>,
487            validate_delete_strong_relations,
488        )
489    }
490}
491
492impl<C: CanisterKind> Db<C> {
493    #[must_use]
494    pub(crate) const fn has_runtime_hooks(&self) -> bool {
495        !self.entity_runtime_hooks.is_empty()
496    }
497
498    // Resolve exactly one runtime hook for a persisted entity name.
499    // Duplicate matches are treated as store invariants.
500    pub(crate) fn runtime_hook_for_entity_name(
501        &self,
502        entity_name: &str,
503    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
504        let mut matched = None;
505        for hooks in self.entity_runtime_hooks {
506            if hooks.entity_name != entity_name {
507                continue;
508            }
509
510            if matched.is_some() {
511                return Err(InternalError::store_invariant(format!(
512                    "duplicate runtime hooks for entity name '{entity_name}'"
513                )));
514            }
515
516            matched = Some(hooks);
517        }
518
519        matched.ok_or_else(|| {
520            InternalError::store_unsupported(format!(
521                "unsupported entity name in data store: '{entity_name}'"
522            ))
523        })
524    }
525
526    // Resolve exactly one runtime hook for a persisted entity path.
527    // Duplicate matches are treated as store invariants.
528    pub(crate) fn runtime_hook_for_entity_path(
529        &self,
530        entity_path: &str,
531    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
532        let mut matched = None;
533        for hooks in self.entity_runtime_hooks {
534            if hooks.entity_path != entity_path {
535                continue;
536            }
537
538            if matched.is_some() {
539                return Err(InternalError::store_invariant(format!(
540                    "duplicate runtime hooks for entity path '{entity_path}'"
541                )));
542            }
543
544            matched = Some(hooks);
545        }
546
547        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
548    }
549}
550
551impl<C: CanisterKind> Copy for Db<C> {}
552
553impl<C: CanisterKind> Clone for Db<C> {
554    fn clone(&self) -> Self {
555        *self
556    }
557}