1mod commit;
2pub mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8pub mod traits;
9pub mod types;
10mod write;
11
12pub(crate) use commit::*;
13pub(crate) use write::WriteUnit;
14
15use crate::{
16 db::{
17 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor, UpsertExecutor},
18 index::IndexStoreRegistry,
19 query::{
20 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
21 diagnostics::{
22 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
23 start_event, trace_access_from_plan,
24 },
25 },
26 response::Response,
27 store::DataStoreRegistry,
28 traits::FromKey,
29 },
30 error::InternalError,
31 obs::sink::{self, MetricsSink},
32 traits::{CanisterKind, EntityKind},
33};
34use std::{marker::PhantomData, thread::LocalKey};
35
36pub struct Db<C: CanisterKind> {
52 data: &'static LocalKey<DataStoreRegistry>,
53 index: &'static LocalKey<IndexStoreRegistry>,
54 _marker: PhantomData<C>,
55}
56
57impl<C: CanisterKind> Db<C> {
58 #[must_use]
59 pub const fn new(
60 data: &'static LocalKey<DataStoreRegistry>,
61 index: &'static LocalKey<IndexStoreRegistry>,
62 ) -> Self {
63 Self {
64 data,
65 index,
66 _marker: PhantomData,
67 }
68 }
69
70 #[must_use]
71 pub const fn context<E>(&self) -> Context<'_, E>
72 where
73 E: EntityKind<Canister = C>,
74 {
75 Context::new(self)
76 }
77
78 pub fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
80 self.data.with(|reg| f(reg))
81 }
82
83 pub fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
85 self.index.with(|reg| f(reg))
86 }
87}
88
89impl<C: CanisterKind> Copy for Db<C> {}
93
94impl<C: CanisterKind> Clone for Db<C> {
95 fn clone(&self) -> Self {
96 *self
97 }
98}
99
100pub struct DbSession<C: CanisterKind> {
106 db: Db<C>,
107 debug: bool,
108 metrics: Option<&'static dyn MetricsSink>,
109}
110
111impl<C: CanisterKind> DbSession<C> {
112 #[must_use]
113 pub const fn new(db: Db<C>) -> Self {
115 Self {
116 db,
117 debug: false,
118 metrics: None,
119 }
120 }
121
122 #[must_use]
123 pub const fn debug(mut self) -> Self {
125 self.debug = true;
126 self
127 }
128
129 #[must_use]
130 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
132 self.metrics = Some(sink);
133 self
134 }
135
136 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
137 if let Some(sink) = self.metrics {
138 sink::with_metrics_sink(sink, f)
139 } else {
140 f()
141 }
142 }
143
144 #[must_use]
153 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
154 where
155 E: EntityKind<Canister = C>,
156 {
157 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
158 }
159
160 #[must_use]
165 pub const fn load_with_consistency<E>(
166 &self,
167 consistency: ReadConsistency,
168 ) -> SessionLoadQuery<'_, C, E>
169 where
170 E: EntityKind<Canister = C>,
171 {
172 SessionLoadQuery::new(self, Query::new(consistency))
173 }
174
175 #[must_use]
180 pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
181 where
182 E: EntityKind<Canister = C>,
183 {
184 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
185 }
186
187 #[must_use]
192 pub const fn delete_with_consistency<E>(
193 &self,
194 consistency: ReadConsistency,
195 ) -> SessionDeleteQuery<'_, C, E>
196 where
197 E: EntityKind<Canister = C>,
198 {
199 SessionDeleteQuery::new(self, Query::new(consistency).delete())
200 }
201
202 #[must_use]
209 pub const fn load_executor<E>(&self) -> LoadExecutor<E>
210 where
211 E: EntityKind<Canister = C>,
212 {
213 LoadExecutor::new(self.db, self.debug)
214 }
215
216 #[must_use]
221 pub const fn save<E>(&self) -> SaveExecutor<E>
222 where
223 E: EntityKind<Canister = C>,
224 {
225 SaveExecutor::new(self.db, self.debug)
226 }
227
228 #[must_use]
231 pub const fn upsert<E>(&self) -> UpsertExecutor<E>
232 where
233 E: EntityKind<Canister = C>,
234 E::PrimaryKey: FromKey,
235 {
236 UpsertExecutor::new(self.db, self.debug)
237 }
238
239 #[must_use]
242 pub const fn delete_executor<E>(&self) -> DeleteExecutor<E>
243 where
244 E: EntityKind<Canister = C>,
245 {
246 DeleteExecutor::new(self.db, self.debug)
247 }
248
249 pub fn diagnose_query<E: EntityKind<Canister = C>>(
255 &self,
256 query: &Query<E>,
257 ) -> Result<QueryDiagnostics, QueryError> {
258 let explain = query.explain()?;
259
260 Ok(QueryDiagnostics::from(explain))
261 }
262
263 pub fn execute_query<E: EntityKind<Canister = C>>(
265 &self,
266 query: &Query<E>,
267 ) -> Result<Response<E>, QueryError> {
268 let plan = query.plan()?;
269 let result = match query.mode() {
270 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
271 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
272 };
273
274 result.map_err(QueryError::Execute)
275 }
276
277 pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
279 &self,
280 query: &Query<E>,
281 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
282 let plan = query.plan()?;
283 let fingerprint = plan.fingerprint();
284 let access = Some(trace_access_from_plan(plan.access()));
285 let executor = match query.mode() {
286 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
287 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
288 };
289 let start = start_event(fingerprint, access, executor);
290
291 let result = match query.mode() {
292 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
293 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
294 };
295 match result {
296 Ok(response) => {
297 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
298 let finish = finish_event(fingerprint, access, executor, rows);
299 let diagnostics = QueryExecutionDiagnostics {
300 fingerprint,
301 events: vec![start, finish],
302 };
303
304 Ok((response, diagnostics))
305 }
306
307 Err(err) => Err(QueryError::Execute(err)),
308 }
309 }
310
311 pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
317 where
318 E: EntityKind<Canister = C>,
319 {
320 self.with_metrics(|| self.save::<E>().insert(entity))
321 }
322
323 pub fn insert_many<E>(
325 &self,
326 entities: impl IntoIterator<Item = E>,
327 ) -> Result<Vec<E>, InternalError>
328 where
329 E: EntityKind<Canister = C>,
330 {
331 self.with_metrics(|| self.save::<E>().insert_many(entities))
332 }
333
334 pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
336 where
337 E: EntityKind<Canister = C>,
338 {
339 self.with_metrics(|| self.save::<E>().replace(entity))
340 }
341
342 pub fn replace_many<E>(
344 &self,
345 entities: impl IntoIterator<Item = E>,
346 ) -> Result<Vec<E>, InternalError>
347 where
348 E: EntityKind<Canister = C>,
349 {
350 self.with_metrics(|| self.save::<E>().replace_many(entities))
351 }
352
353 pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
355 where
356 E: EntityKind<Canister = C>,
357 {
358 self.with_metrics(|| self.save::<E>().update(entity))
359 }
360
361 pub fn update_many<E>(
363 &self,
364 entities: impl IntoIterator<Item = E>,
365 ) -> Result<Vec<E>, InternalError>
366 where
367 E: EntityKind<Canister = C>,
368 {
369 self.with_metrics(|| self.save::<E>().update_many(entities))
370 }
371
372 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
374 where
375 E: EntityKind<Canister = C>,
376 {
377 self.with_metrics(|| self.save::<E>().insert_view(view))
378 }
379
380 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
382 where
383 E: EntityKind<Canister = C>,
384 {
385 self.with_metrics(|| self.save::<E>().replace_view(view))
386 }
387
388 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
390 where
391 E: EntityKind<Canister = C>,
392 {
393 self.with_metrics(|| self.save::<E>().update_view(view))
394 }
395}