Skip to main content

icydb_core/db/
mod.rs

1// 1️⃣ Module declarations
2pub(crate) mod access;
3pub(crate) mod contracts;
4pub(crate) mod cursor;
5pub(crate) mod diagnostics;
6pub(crate) mod identity;
7pub(crate) mod predicate;
8pub(crate) mod query;
9pub(crate) mod registry;
10pub(crate) mod response;
11pub(crate) mod session;
12
13pub(in crate::db) mod codec;
14pub(in crate::db) mod commit;
15pub(in crate::db) mod data;
16pub(in crate::db) mod direction;
17pub(in crate::db) mod executor;
18pub(in crate::db) mod index;
19pub(in crate::db) mod relation;
20
21// 2️⃣ Public re-exports (Tier-2 API surface)
22pub use codec::cursor::{decode_cursor, encode_cursor};
23pub use data::DataStore;
24pub(crate) use data::StorageKey;
25pub use diagnostics::StorageReport;
26pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
27pub use identity::{EntityName, IndexName};
28pub use index::IndexStore;
29pub use predicate::MissingRowPolicy;
30pub use predicate::ValidateError;
31pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
32pub use query::{
33    builder::field::FieldRef,
34    expr::{FilterExpr, SortExpr},
35    fluent::{
36        delete::FluentDeleteQuery,
37        load::{FluentLoadQuery, PagedLoadQuery},
38    },
39    intent::{CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryMode},
40    plan::{OrderDirection, PlanError},
41};
42pub use registry::StoreRegistry;
43pub use relation::validate_delete_strong_relations_for_source;
44pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
45pub use session::DbSession;
46
47// 3️⃣ Internal imports (implementation wiring)
48use crate::{
49    db::{
50        commit::{
51            CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
52            commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
53            rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
54        },
55        data::RawDataKey,
56        executor::Context,
57        relation::StrongRelationDeleteValidateFn,
58    },
59    error::InternalError,
60    traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
61    value::Value,
62};
63use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
64
65///
66/// PagedLoadExecution
67///
68/// Cursor-paged load response with optional continuation cursor bytes.
69///
70
71#[derive(Debug)]
72pub struct PagedLoadExecution<E: EntityKind> {
73    response: Response<E>,
74    continuation_cursor: Option<Vec<u8>>,
75}
76
77impl<E: EntityKind> PagedLoadExecution<E> {
78    /// Create a paged load execution payload.
79    #[must_use]
80    pub const fn new(response: Response<E>, continuation_cursor: Option<Vec<u8>>) -> Self {
81        Self {
82            response,
83            continuation_cursor,
84        }
85    }
86
87    /// Borrow the paged response rows.
88    #[must_use]
89    pub const fn response(&self) -> &Response<E> {
90        &self.response
91    }
92
93    /// Borrow the optional continuation cursor bytes.
94    #[must_use]
95    pub fn continuation_cursor(&self) -> Option<&[u8]> {
96        self.continuation_cursor.as_deref()
97    }
98
99    /// Consume this payload and return `(response, continuation_cursor)`.
100    #[must_use]
101    pub fn into_parts(self) -> (Response<E>, Option<Vec<u8>>) {
102        (self.response, self.continuation_cursor)
103    }
104}
105
106impl<E: EntityKind> From<(Response<E>, Option<Vec<u8>>)> for PagedLoadExecution<E> {
107    fn from(value: (Response<E>, Option<Vec<u8>>)) -> Self {
108        let (response, continuation_cursor) = value;
109
110        Self::new(response, continuation_cursor)
111    }
112}
113
114impl<E: EntityKind> From<PagedLoadExecution<E>> for (Response<E>, Option<Vec<u8>>) {
115    fn from(value: PagedLoadExecution<E>) -> Self {
116        value.into_parts()
117    }
118}
119
120///
121/// PagedLoadExecutionWithTrace
122///
123/// Cursor-paged load response plus optional execution trace details.
124///
125
126#[derive(Debug)]
127pub struct PagedLoadExecutionWithTrace<E: EntityKind> {
128    execution: PagedLoadExecution<E>,
129    execution_trace: Option<ExecutionTrace>,
130}
131
132impl<E: EntityKind> PagedLoadExecutionWithTrace<E> {
133    /// Create a traced paged load execution payload.
134    #[must_use]
135    pub const fn new(
136        response: Response<E>,
137        continuation_cursor: Option<Vec<u8>>,
138        execution_trace: Option<ExecutionTrace>,
139    ) -> Self {
140        Self {
141            execution: PagedLoadExecution::new(response, continuation_cursor),
142            execution_trace,
143        }
144    }
145
146    /// Borrow the paged execution payload.
147    #[must_use]
148    pub const fn execution(&self) -> &PagedLoadExecution<E> {
149        &self.execution
150    }
151
152    /// Borrow the paged response rows.
153    #[must_use]
154    pub const fn response(&self) -> &Response<E> {
155        self.execution.response()
156    }
157
158    /// Borrow the optional continuation cursor bytes.
159    #[must_use]
160    pub fn continuation_cursor(&self) -> Option<&[u8]> {
161        self.execution.continuation_cursor()
162    }
163
164    /// Borrow optional execution trace details.
165    #[must_use]
166    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
167        self.execution_trace.as_ref()
168    }
169
170    /// Consume this payload and drop trace details.
171    #[must_use]
172    pub fn into_execution(self) -> PagedLoadExecution<E> {
173        self.execution
174    }
175
176    /// Consume this payload and return `(response, continuation_cursor, trace)`.
177    #[must_use]
178    pub fn into_parts(self) -> (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>) {
179        let (response, continuation_cursor) = self.execution.into_parts();
180
181        (response, continuation_cursor, self.execution_trace)
182    }
183}
184
185impl<E: EntityKind> From<(Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)>
186    for PagedLoadExecutionWithTrace<E>
187{
188    fn from(value: (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)) -> Self {
189        let (response, continuation_cursor, execution_trace) = value;
190
191        Self::new(response, continuation_cursor, execution_trace)
192    }
193}
194
195impl<E: EntityKind> From<PagedLoadExecutionWithTrace<E>>
196    for (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)
197{
198    fn from(value: PagedLoadExecutionWithTrace<E>) -> Self {
199        value.into_parts()
200    }
201}
202
///
/// GroupedRow
///
/// One grouped result row: ordered grouping key values plus ordered aggregate outputs.
/// Group/aggregate vectors preserve query declaration order.
///
/// Rows compare equal only when both vectors are equal (derived `PartialEq`/`Eq`).
///
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GroupedRow {
    // Grouping key values for this row.
    group_key: Vec<Value>,
    // Aggregate outputs computed for this group.
    aggregate_values: Vec<Value>,
}

impl GroupedRow {
    /// Construct one grouped row payload.
    ///
    /// Both vectors are stored as given; no reordering or validation happens here.
    #[must_use]
    pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
        Self {
            group_key,
            aggregate_values,
        }
    }

    /// Borrow grouped key values.
    #[must_use]
    pub const fn group_key(&self) -> &[Value] {
        self.group_key.as_slice()
    }

    /// Borrow aggregate output values.
    #[must_use]
    pub const fn aggregate_values(&self) -> &[Value] {
        self.aggregate_values.as_slice()
    }
}
237
238///
239/// PagedGroupedExecution
240///
241/// Cursor-paged grouped execution payload with optional grouped continuation cursor bytes.
242///
243#[derive(Debug)]
244pub struct PagedGroupedExecution {
245    rows: Vec<GroupedRow>,
246    continuation_cursor: Option<Vec<u8>>,
247}
248
249impl PagedGroupedExecution {
250    /// Construct one grouped paged execution payload.
251    #[must_use]
252    pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
253        Self {
254            rows,
255            continuation_cursor,
256        }
257    }
258
259    /// Borrow grouped rows.
260    #[must_use]
261    pub const fn rows(&self) -> &[GroupedRow] {
262        self.rows.as_slice()
263    }
264
265    /// Borrow optional continuation cursor bytes.
266    #[must_use]
267    pub fn continuation_cursor(&self) -> Option<&[u8]> {
268        self.continuation_cursor.as_deref()
269    }
270
271    /// Consume into grouped rows and continuation cursor bytes.
272    #[must_use]
273    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
274        (self.rows, self.continuation_cursor)
275    }
276}
277
278///
279/// PagedGroupedExecutionWithTrace
280///
281/// Cursor-paged grouped execution payload plus optional route/execution trace.
282///
283#[derive(Debug)]
284pub struct PagedGroupedExecutionWithTrace {
285    execution: PagedGroupedExecution,
286    execution_trace: Option<ExecutionTrace>,
287}
288
289impl PagedGroupedExecutionWithTrace {
290    /// Construct one traced grouped paged execution payload.
291    #[must_use]
292    pub const fn new(
293        rows: Vec<GroupedRow>,
294        continuation_cursor: Option<Vec<u8>>,
295        execution_trace: Option<ExecutionTrace>,
296    ) -> Self {
297        Self {
298            execution: PagedGroupedExecution::new(rows, continuation_cursor),
299            execution_trace,
300        }
301    }
302
303    /// Borrow grouped execution payload.
304    #[must_use]
305    pub const fn execution(&self) -> &PagedGroupedExecution {
306        &self.execution
307    }
308
309    /// Borrow grouped rows.
310    #[must_use]
311    pub const fn rows(&self) -> &[GroupedRow] {
312        self.execution.rows()
313    }
314
315    /// Borrow optional continuation cursor bytes.
316    #[must_use]
317    pub fn continuation_cursor(&self) -> Option<&[u8]> {
318        self.execution.continuation_cursor()
319    }
320
321    /// Borrow optional execution trace details.
322    #[must_use]
323    pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
324        self.execution_trace.as_ref()
325    }
326
327    /// Consume payload and drop trace details.
328    #[must_use]
329    pub fn into_execution(self) -> PagedGroupedExecution {
330        self.execution
331    }
332
333    /// Consume into grouped rows, continuation cursor bytes, and optional trace.
334    #[must_use]
335    pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
336        let (rows, continuation_cursor) = self.execution.into_parts();
337
338        (rows, continuation_cursor, self.execution_trace)
339    }
340}
341
342///
343/// Db
344/// A handle to the set of stores registered for a specific canister domain.
345///
346
347pub struct Db<C: CanisterKind> {
348    store: &'static LocalKey<StoreRegistry>,
349    entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
350    _marker: PhantomData<C>,
351}
352
353impl<C: CanisterKind> Db<C> {
354    #[must_use]
355    pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
356        Self::new_with_hooks(store, &[])
357    }
358
359    #[must_use]
360    pub const fn new_with_hooks(
361        store: &'static LocalKey<StoreRegistry>,
362        entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
363    ) -> Self {
364        Self {
365            store,
366            entity_runtime_hooks,
367            _marker: PhantomData,
368        }
369    }
370
371    #[must_use]
372    pub(crate) const fn context<E>(&self) -> Context<'_, E>
373    where
374        E: EntityKind<Canister = C> + EntityValue,
375    {
376        Context::new(self)
377    }
378
379    /// Return a recovery-guarded context for read paths.
380    ///
381    /// This enforces startup recovery and a fast persisted-marker check so reads
382    /// do not proceed while an incomplete commit is pending replay.
383    pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
384    where
385        E: EntityKind<Canister = C> + EntityValue,
386    {
387        ensure_recovered(self)?;
388
389        Ok(Context::new(self))
390    }
391
392    /// Ensure startup/in-progress commit recovery has been applied.
393    pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
394        ensure_recovered(self)
395    }
396
397    pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
398        self.store.with(|reg| f(reg))
399    }
400
401    pub fn storage_report(
402        &self,
403        name_to_path: &[(&'static str, &'static str)],
404    ) -> Result<StorageReport, InternalError> {
405        diagnostics::storage_report(self, name_to_path)
406    }
407
408    pub(in crate::db) fn prepare_row_commit_op(
409        &self,
410        op: &CommitRowOp,
411    ) -> Result<PreparedRowCommitOp, InternalError> {
412        let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
413
414        (hooks.prepare_row_commit)(self, op)
415    }
416
417    pub(in crate::db) fn replay_commit_marker_row_ops(
418        &self,
419        row_ops: &[CommitRowOp],
420    ) -> Result<(), InternalError> {
421        replay_commit_marker_row_ops(self, row_ops)
422    }
423
424    pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
425        rebuild_secondary_indexes_from_rows(self)
426    }
427
428    // Validate strong relation constraints for delete-selected target keys.
429    pub(crate) fn validate_delete_strong_relations(
430        &self,
431        target_path: &str,
432        deleted_target_keys: &BTreeSet<RawDataKey>,
433    ) -> Result<(), InternalError> {
434        if deleted_target_keys.is_empty() {
435            return Ok(());
436        }
437
438        for hooks in self.entity_runtime_hooks {
439            (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
440        }
441
442        Ok(())
443    }
444}
445
446///
447/// EntityRuntimeHooks
448///
449/// Per-entity runtime callbacks used for commit preparation and delete-side
450/// strong relation validation.
451///
452
453pub struct EntityRuntimeHooks<C: CanisterKind> {
454    pub(crate) entity_name: &'static str,
455    pub(crate) entity_path: &'static str,
456    pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
457    pub(in crate::db) prepare_row_commit:
458        fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
459    pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
460}
461
462impl<C: CanisterKind> EntityRuntimeHooks<C> {
463    #[must_use]
464    pub(in crate::db) const fn new(
465        entity_name: &'static str,
466        entity_path: &'static str,
467        commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
468        prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
469        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
470    ) -> Self {
471        Self {
472            entity_name,
473            entity_path,
474            commit_schema_fingerprint,
475            prepare_row_commit,
476            validate_delete_strong_relations,
477        }
478    }
479
480    #[must_use]
481    pub const fn for_entity<E>(
482        validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
483    ) -> Self
484    where
485        E: EntityKind<Canister = C> + EntityValue,
486    {
487        Self::new(
488            <E as EntityIdentity>::ENTITY_NAME,
489            E::PATH,
490            commit_schema_fingerprint_for_runtime_entity::<E>,
491            prepare_row_commit_for_entity::<E>,
492            validate_delete_strong_relations,
493        )
494    }
495}
496
497fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
498where
499    E: EntityKind,
500{
501    commit_schema_fingerprint_for_entity::<E>()
502}
503
504impl<C: CanisterKind> Db<C> {
505    #[must_use]
506    pub(crate) const fn has_runtime_hooks(&self) -> bool {
507        !self.entity_runtime_hooks.is_empty()
508    }
509
510    // Resolve exactly one runtime hook for a persisted entity name.
511    // Duplicate matches are treated as store invariants.
512    pub(crate) fn runtime_hook_for_entity_name(
513        &self,
514        entity_name: &str,
515    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
516        let mut matched = None;
517        for hooks in self.entity_runtime_hooks {
518            if hooks.entity_name != entity_name {
519                continue;
520            }
521
522            if matched.is_some() {
523                return Err(InternalError::store_invariant(format!(
524                    "duplicate runtime hooks for entity name '{entity_name}'"
525                )));
526            }
527
528            matched = Some(hooks);
529        }
530
531        matched.ok_or_else(|| {
532            InternalError::store_unsupported(format!(
533                "unsupported entity name in data store: '{entity_name}'"
534            ))
535        })
536    }
537
538    // Resolve exactly one runtime hook for a persisted entity path.
539    // Duplicate matches are treated as store invariants.
540    pub(crate) fn runtime_hook_for_entity_path(
541        &self,
542        entity_path: &str,
543    ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
544        let mut matched = None;
545        for hooks in self.entity_runtime_hooks {
546            if hooks.entity_path != entity_path {
547                continue;
548            }
549
550            if matched.is_some() {
551                return Err(InternalError::store_invariant(format!(
552                    "duplicate runtime hooks for entity path '{entity_path}'"
553                )));
554            }
555
556            matched = Some(hooks);
557        }
558
559        matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
560    }
561}
562
// `Copy` and `Clone` are written by hand instead of derived. A derive would add
// `C: Copy` / `C: Clone` bounds, while the fields themselves (two `&'static`
// references and a `PhantomData<C>`) are copyable for any `C`.
impl<C: CanisterKind> Copy for Db<C> {}

impl<C: CanisterKind> Clone for Db<C> {
    // Canonical `Clone` for a `Copy` type: a plain bitwise copy of the handle.
    fn clone(&self) -> Self {
        *self
    }
}