// icydb_core/db/mod.rs

1mod commit;
2pub mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8pub mod traits;
9pub mod types;
10mod write;
11
12pub(crate) use commit::*;
13pub(crate) use write::WriteUnit;
14
15use crate::{
16    db::{
17        executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor, UpsertExecutor},
18        index::IndexStoreRegistry,
19        query::{
20            Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
21            diagnostics::{
22                QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
23                start_event, trace_access_from_plan,
24            },
25        },
26        response::Response,
27        store::DataStoreRegistry,
28        traits::FromKey,
29    },
30    error::InternalError,
31    obs::sink::{self, MetricsSink},
32    traits::{CanisterKind, EntityKind},
33};
34use std::{marker::PhantomData, thread::LocalKey};
35
36///
37/// Db
38///
39/// A handle to the set of stores registered for a specific canister domain.
40///
41/// - `C` is the [`CanisterKind`] (schema canister marker).
42///
43/// The `Db` acts as the entry point for querying, saving, and deleting entities
44/// within a single canister's store registry.
45///
46/// Schema/model identity must be validated before use. Derive-generated models
47/// uphold identity invariants; manual models should validate once (e.g. via
48/// `SchemaInfo::from_entity_model`) prior to executor use.
49///
50
51pub struct Db<C: CanisterKind> {
52    data: &'static LocalKey<DataStoreRegistry>,
53    index: &'static LocalKey<IndexStoreRegistry>,
54    _marker: PhantomData<C>,
55}
56
57impl<C: CanisterKind> Db<C> {
58    #[must_use]
59    pub const fn new(
60        data: &'static LocalKey<DataStoreRegistry>,
61        index: &'static LocalKey<IndexStoreRegistry>,
62    ) -> Self {
63        Self {
64            data,
65            index,
66            _marker: PhantomData,
67        }
68    }
69
70    #[must_use]
71    pub const fn context<E>(&self) -> Context<'_, E>
72    where
73        E: EntityKind<Canister = C>,
74    {
75        Context::new(self)
76    }
77
78    /// Run a closure with read access to the data store registry.
79    pub fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
80        self.data.with(|reg| f(reg))
81    }
82
83    /// Run a closure with read access to the index store registry.
84    pub fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
85        self.index.with(|reg| f(reg))
86    }
87}
88
89// Manual Copy + Clone implementations.
90// Safe because Db only contains &'static LocalKey<_> handles,
91// duplicating them does not duplicate the contents.
92impl<C: CanisterKind> Copy for Db<C> {}
93
94impl<C: CanisterKind> Clone for Db<C> {
95    fn clone(&self) -> Self {
96        *self
97    }
98}
99
100///
101/// DbSession
102/// Database handle plus a debug flag that controls executor verbosity.
103///
104
105pub struct DbSession<C: CanisterKind> {
106    db: Db<C>,
107    debug: bool,
108    metrics: Option<&'static dyn MetricsSink>,
109}
110
111impl<C: CanisterKind> DbSession<C> {
112    #[must_use]
113    /// Create a new session scoped to the provided database.
114    pub const fn new(db: Db<C>) -> Self {
115        Self {
116            db,
117            debug: false,
118            metrics: None,
119        }
120    }
121
122    #[must_use]
123    /// Enable debug logging for subsequent queries in this session.
124    pub const fn debug(mut self) -> Self {
125        self.debug = true;
126        self
127    }
128
129    #[must_use]
130    /// Override the metrics sink for operations executed through this session.
131    pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
132        self.metrics = Some(sink);
133        self
134    }
135
136    fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
137        if let Some(sink) = self.metrics {
138            sink::with_metrics_sink(sink, f)
139        } else {
140            f()
141        }
142    }
143
144    //
145    // Query entry points
146    //
147
148    ///
149    /// Load Query
150    /// Create a fluent, session-bound load query with default consistency.
151    ///
152    #[must_use]
153    pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
154    where
155        E: EntityKind<Canister = C>,
156    {
157        SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
158    }
159
160    ///
161    /// Load Query With Consistency
162    /// Create a fluent, session-bound load query with explicit consistency.
163    ///
164    #[must_use]
165    pub const fn load_with_consistency<E>(
166        &self,
167        consistency: ReadConsistency,
168    ) -> SessionLoadQuery<'_, C, E>
169    where
170        E: EntityKind<Canister = C>,
171    {
172        SessionLoadQuery::new(self, Query::new(consistency))
173    }
174
175    ///
176    /// Delete Query
177    /// Create a fluent, session-bound delete query with default consistency.
178    ///
179    #[must_use]
180    pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
181    where
182        E: EntityKind<Canister = C>,
183    {
184        SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
185    }
186
187    ///
188    /// Delete Query With Consistency
189    /// Create a fluent, session-bound delete query with explicit consistency.
190    ///
191    #[must_use]
192    pub const fn delete_with_consistency<E>(
193        &self,
194        consistency: ReadConsistency,
195    ) -> SessionDeleteQuery<'_, C, E>
196    where
197        E: EntityKind<Canister = C>,
198    {
199        SessionDeleteQuery::new(self, Query::new(consistency).delete())
200    }
201
202    //
203    // Low-level executors
204    //
205
206    /// Get a [`LoadExecutor`] for executing planned load queries.
207    /// Note: executor methods do not apply the session metrics override.
208    #[must_use]
209    pub const fn load_executor<E>(&self) -> LoadExecutor<E>
210    where
211        E: EntityKind<Canister = C>,
212    {
213        LoadExecutor::new(self.db, self.debug)
214    }
215
216    /// Get a [`SaveExecutor`] for inserting or updating entities.
217    ///
218    /// Normally you will use the higher-level `insert/replace/update` shortcuts instead.
219    /// Note: executor methods do not apply the session metrics override.
220    #[must_use]
221    pub const fn save<E>(&self) -> SaveExecutor<E>
222    where
223        E: EntityKind<Canister = C>,
224    {
225        SaveExecutor::new(self.db, self.debug)
226    }
227
228    /// Get an [`UpsertExecutor`] for inserting or updating by a unique index.
229    /// Note: executor methods do not apply the session metrics override.
230    #[must_use]
231    pub const fn upsert<E>(&self) -> UpsertExecutor<E>
232    where
233        E: EntityKind<Canister = C>,
234        E::PrimaryKey: FromKey,
235    {
236        UpsertExecutor::new(self.db, self.debug)
237    }
238
239    /// Get a [`DeleteExecutor`] for deleting entities by key or query.
240    /// Note: executor methods do not apply the session metrics override.
241    #[must_use]
242    pub const fn delete_executor<E>(&self) -> DeleteExecutor<E>
243    where
244        E: EntityKind<Canister = C>,
245    {
246        DeleteExecutor::new(self.db, self.debug)
247    }
248
249    //
250    // Query diagnostics
251    //
252
253    /// Plan and return diagnostics for a query without executing it.
254    pub fn diagnose_query<E: EntityKind<Canister = C>>(
255        &self,
256        query: &Query<E>,
257    ) -> Result<QueryDiagnostics, QueryError> {
258        let explain = query.explain()?;
259
260        Ok(QueryDiagnostics::from(explain))
261    }
262
263    /// Execute a query intent using session policy and executor routing.
264    pub fn execute_query<E: EntityKind<Canister = C>>(
265        &self,
266        query: &Query<E>,
267    ) -> Result<Response<E>, QueryError> {
268        let plan = query.plan()?;
269        let result = match query.mode() {
270            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
271            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
272        };
273
274        result.map_err(QueryError::Execute)
275    }
276
277    /// Execute a query and return per-execution diagnostics.
278    pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
279        &self,
280        query: &Query<E>,
281    ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
282        let plan = query.plan()?;
283        let fingerprint = plan.fingerprint();
284        let access = Some(trace_access_from_plan(plan.access()));
285        let executor = match query.mode() {
286            QueryMode::Load(_) => QueryTraceExecutorKind::Load,
287            QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
288        };
289        let start = start_event(fingerprint, access, executor);
290
291        let result = match query.mode() {
292            QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
293            QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
294        };
295        match result {
296            Ok(response) => {
297                let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
298                let finish = finish_event(fingerprint, access, executor, rows);
299                let diagnostics = QueryExecutionDiagnostics {
300                    fingerprint,
301                    events: vec![start, finish],
302                };
303
304                Ok((response, diagnostics))
305            }
306
307            Err(err) => Err(QueryError::Execute(err)),
308        }
309    }
310
311    //
312    // High-level save shortcuts
313    //
314
315    /// Insert a new entity, returning the stored value.
316    pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
317    where
318        E: EntityKind<Canister = C>,
319    {
320        self.with_metrics(|| self.save::<E>().insert(entity))
321    }
322
323    /// Insert multiple entities, returning stored values.
324    pub fn insert_many<E>(
325        &self,
326        entities: impl IntoIterator<Item = E>,
327    ) -> Result<Vec<E>, InternalError>
328    where
329        E: EntityKind<Canister = C>,
330    {
331        self.with_metrics(|| self.save::<E>().insert_many(entities))
332    }
333
334    /// Replace an existing entity or insert it if it does not yet exist.
335    pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
336    where
337        E: EntityKind<Canister = C>,
338    {
339        self.with_metrics(|| self.save::<E>().replace(entity))
340    }
341
342    /// Replace multiple entities, inserting if missing.
343    pub fn replace_many<E>(
344        &self,
345        entities: impl IntoIterator<Item = E>,
346    ) -> Result<Vec<E>, InternalError>
347    where
348        E: EntityKind<Canister = C>,
349    {
350        self.with_metrics(|| self.save::<E>().replace_many(entities))
351    }
352
353    /// Partially update an existing entity.
354    pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
355    where
356        E: EntityKind<Canister = C>,
357    {
358        self.with_metrics(|| self.save::<E>().update(entity))
359    }
360
361    /// Partially update multiple existing entities.
362    pub fn update_many<E>(
363        &self,
364        entities: impl IntoIterator<Item = E>,
365    ) -> Result<Vec<E>, InternalError>
366    where
367        E: EntityKind<Canister = C>,
368    {
369        self.with_metrics(|| self.save::<E>().update_many(entities))
370    }
371
372    /// Insert a new view value for an entity.
373    pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
374    where
375        E: EntityKind<Canister = C>,
376    {
377        self.with_metrics(|| self.save::<E>().insert_view(view))
378    }
379
380    /// Replace an existing view or insert it if it does not yet exist.
381    pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
382    where
383        E: EntityKind<Canister = C>,
384    {
385        self.with_metrics(|| self.save::<E>().replace_view(view))
386    }
387
388    /// Partially update an existing view.
389    pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
390    where
391        E: EntityKind<Canister = C>,
392    {
393        self.with_metrics(|| self.save::<E>().update_view(view))
394    }
395}