1mod commit;
2pub mod cursor;
3pub(crate) mod executor;
4pub mod identity;
5pub mod index;
6pub mod query;
7mod relation;
8pub mod response;
9pub mod store;
10
11pub use commit::*;
12pub use relation::{StrongRelationDeleteValidateFn, validate_delete_strong_relations_for_source};
13
14use crate::{
15 db::{
16 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
17 query::{
18 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19 plan::PlanError,
20 },
21 response::{Response, WriteBatchResponse, WriteResponse},
22 store::{RawDataKey, StoreRegistry},
23 },
24 error::InternalError,
25 obs::sink::{self, MetricsSink},
26 traits::{CanisterKind, EntityKind, EntityValue},
27};
28use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
29
30#[cfg(test)]
31use crate::db::{index::IndexStore, store::DataStore};
32
33pub struct Db<C: CanisterKind> {
39 store: &'static LocalKey<StoreRegistry>,
40 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
41 _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45 #[must_use]
46 pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
47 Self::new_with_hooks(store, &[])
48 }
49
50 #[must_use]
51 pub const fn new_with_hooks(
52 store: &'static LocalKey<StoreRegistry>,
53 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
54 ) -> Self {
55 Self {
56 store,
57 entity_runtime_hooks,
58 _marker: PhantomData,
59 }
60 }
61
62 #[must_use]
63 pub(crate) const fn context<E>(&self) -> Context<'_, E>
64 where
65 E: EntityKind<Canister = C> + EntityValue,
66 {
67 Context::new(self)
68 }
69
70 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
75 where
76 E: EntityKind<Canister = C> + EntityValue,
77 {
78 ensure_recovered(self)?;
79
80 Ok(Context::new(self))
81 }
82
83 pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
84 self.store.with(|reg| f(reg))
85 }
86
87 pub(crate) fn prepare_row_commit_op(
88 &self,
89 op: &CommitRowOp,
90 ) -> Result<PreparedRowCommitOp, InternalError> {
91 let hooks = self
92 .entity_runtime_hooks
93 .iter()
94 .find(|hooks| hooks.entity_path == op.entity_path.as_str())
95 .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
96
97 (hooks.prepare_row_commit)(self, op)
98 }
99
100 pub(crate) fn validate_delete_strong_relations(
102 &self,
103 target_path: &str,
104 deleted_target_keys: &BTreeSet<RawDataKey>,
105 ) -> Result<(), InternalError> {
106 if deleted_target_keys.is_empty() {
107 return Ok(());
108 }
109
110 for hooks in self.entity_runtime_hooks {
111 (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
112 }
113
114 Ok(())
115 }
116}
117
118pub struct EntityRuntimeHooks<C: CanisterKind> {
126 pub entity_path: &'static str,
127 pub prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
128 pub validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
129}
130
131impl<C: CanisterKind> EntityRuntimeHooks<C> {
132 #[must_use]
133 pub const fn new(
134 entity_path: &'static str,
135 prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
136 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
137 ) -> Self {
138 Self {
139 entity_path,
140 prepare_row_commit,
141 validate_delete_strong_relations,
142 }
143 }
144}
145
146impl<C: CanisterKind> Copy for Db<C> {}
147
148impl<C: CanisterKind> Clone for Db<C> {
149 fn clone(&self) -> Self {
150 *self
151 }
152}
153
154pub struct DbSession<C: CanisterKind> {
160 db: Db<C>,
161 debug: bool,
162 metrics: Option<&'static dyn MetricsSink>,
163}
164
165impl<C: CanisterKind> DbSession<C> {
166 #[must_use]
167 pub const fn new(db: Db<C>) -> Self {
168 Self {
169 db,
170 debug: false,
171 metrics: None,
172 }
173 }
174
175 #[must_use]
176 pub const fn debug(mut self) -> Self {
177 self.debug = true;
178 self
179 }
180
181 #[must_use]
182 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
183 self.metrics = Some(sink);
184 self
185 }
186
187 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
188 if let Some(sink) = self.metrics {
189 sink::with_metrics_sink(sink, f)
190 } else {
191 f()
192 }
193 }
194
195 #[must_use]
200 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
201 where
202 E: EntityKind<Canister = C>,
203 {
204 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
205 }
206
207 #[must_use]
208 pub const fn load_with_consistency<E>(
209 &self,
210 consistency: ReadConsistency,
211 ) -> SessionLoadQuery<'_, C, E>
212 where
213 E: EntityKind<Canister = C>,
214 {
215 SessionLoadQuery::new(self, Query::new(consistency))
216 }
217
218 #[must_use]
219 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
220 where
221 E: EntityKind<Canister = C>,
222 {
223 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
224 }
225
226 #[must_use]
227 pub fn delete_with_consistency<E>(
228 &self,
229 consistency: ReadConsistency,
230 ) -> SessionDeleteQuery<'_, C, E>
231 where
232 E: EntityKind<Canister = C>,
233 {
234 SessionDeleteQuery::new(self, Query::new(consistency).delete())
235 }
236
237 #[must_use]
242 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
243 where
244 E: EntityKind<Canister = C> + EntityValue,
245 {
246 LoadExecutor::new(self.db, self.debug)
247 }
248
249 #[must_use]
250 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
251 where
252 E: EntityKind<Canister = C> + EntityValue,
253 {
254 DeleteExecutor::new(self.db, self.debug)
255 }
256
257 #[must_use]
258 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
259 where
260 E: EntityKind<Canister = C> + EntityValue,
261 {
262 SaveExecutor::new(self.db, self.debug)
263 }
264
265 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
270 where
271 E: EntityKind<Canister = C> + EntityValue,
272 {
273 let plan = query.plan()?;
274
275 let result = match query.mode() {
276 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
277 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
278 };
279
280 result.map_err(QueryError::Execute)
281 }
282
283 pub(crate) fn execute_load_query_paged<E>(
284 &self,
285 query: &Query<E>,
286 cursor_token: Option<&str>,
287 ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
288 where
289 E: EntityKind<Canister = C> + EntityValue,
290 {
291 let plan = query.plan()?;
292 let cursor_bytes = match cursor_token {
293 Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
294 QueryError::from(PlanError::InvalidContinuationCursor { reason })
295 })?),
296 None => None,
297 };
298 let boundary = plan.plan_cursor_boundary(cursor_bytes.as_deref())?;
299
300 let page = self
301 .with_metrics(|| self.load_executor::<E>().execute_paged(plan, boundary))
302 .map_err(QueryError::Execute)?;
303
304 Ok((page.items, page.next_cursor))
305 }
306
307 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312 where
313 E: EntityKind<Canister = C> + EntityValue,
314 {
315 self.with_metrics(|| self.save_executor::<E>().insert(entity))
316 .map(WriteResponse::new)
317 }
318
319 pub fn insert_many_atomic<E>(
325 &self,
326 entities: impl IntoIterator<Item = E>,
327 ) -> Result<WriteBatchResponse<E>, InternalError>
328 where
329 E: EntityKind<Canister = C> + EntityValue,
330 {
331 let entities =
332 self.with_metrics(|| self.save_executor::<E>().insert_many_atomic(entities))?;
333
334 Ok(WriteBatchResponse::new(entities))
335 }
336
337 pub fn insert_many_non_atomic<E>(
341 &self,
342 entities: impl IntoIterator<Item = E>,
343 ) -> Result<WriteBatchResponse<E>, InternalError>
344 where
345 E: EntityKind<Canister = C> + EntityValue,
346 {
347 let entities =
348 self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
349
350 Ok(WriteBatchResponse::new(entities))
351 }
352
353 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
354 where
355 E: EntityKind<Canister = C> + EntityValue,
356 {
357 self.with_metrics(|| self.save_executor::<E>().replace(entity))
358 .map(WriteResponse::new)
359 }
360
361 pub fn replace_many_atomic<E>(
367 &self,
368 entities: impl IntoIterator<Item = E>,
369 ) -> Result<WriteBatchResponse<E>, InternalError>
370 where
371 E: EntityKind<Canister = C> + EntityValue,
372 {
373 let entities =
374 self.with_metrics(|| self.save_executor::<E>().replace_many_atomic(entities))?;
375
376 Ok(WriteBatchResponse::new(entities))
377 }
378
379 pub fn replace_many_non_atomic<E>(
383 &self,
384 entities: impl IntoIterator<Item = E>,
385 ) -> Result<WriteBatchResponse<E>, InternalError>
386 where
387 E: EntityKind<Canister = C> + EntityValue,
388 {
389 let entities =
390 self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
391
392 Ok(WriteBatchResponse::new(entities))
393 }
394
395 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
396 where
397 E: EntityKind<Canister = C> + EntityValue,
398 {
399 self.with_metrics(|| self.save_executor::<E>().update(entity))
400 .map(WriteResponse::new)
401 }
402
403 pub fn update_many_atomic<E>(
409 &self,
410 entities: impl IntoIterator<Item = E>,
411 ) -> Result<WriteBatchResponse<E>, InternalError>
412 where
413 E: EntityKind<Canister = C> + EntityValue,
414 {
415 let entities =
416 self.with_metrics(|| self.save_executor::<E>().update_many_atomic(entities))?;
417
418 Ok(WriteBatchResponse::new(entities))
419 }
420
421 pub fn update_many_non_atomic<E>(
425 &self,
426 entities: impl IntoIterator<Item = E>,
427 ) -> Result<WriteBatchResponse<E>, InternalError>
428 where
429 E: EntityKind<Canister = C> + EntityValue,
430 {
431 let entities =
432 self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
433
434 Ok(WriteBatchResponse::new(entities))
435 }
436
437 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
438 where
439 E: EntityKind<Canister = C> + EntityValue,
440 {
441 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
442 }
443
444 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
445 where
446 E: EntityKind<Canister = C> + EntityValue,
447 {
448 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
449 }
450
451 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
452 where
453 E: EntityKind<Canister = C> + EntityValue,
454 {
455 self.with_metrics(|| self.save_executor::<E>().update_view(view))
456 }
457
458 #[cfg(test)]
460 #[doc(hidden)]
461 pub fn clear_stores_for_tests(&self) {
462 self.db.with_store_registry(|reg| {
463 for (_, store) in reg.iter() {
464 store.with_data_mut(DataStore::clear);
465 store.with_index_mut(IndexStore::clear);
466 }
467 });
468 }
469}