1pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use identity::{EntityName, IndexName};
23pub use index::IndexStore;
24#[cfg(test)]
25pub(crate) use index::hash_value;
26pub use query::{
27 ReadConsistency,
28 builder::field::FieldRef,
29 expr::{FilterExpr, SortExpr},
30 intent::{IntentError, Query, QueryError},
31 plan::{OrderDirection, PlanError},
32 predicate::{
33 CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
34 },
35 session::{
36 delete::SessionDeleteQuery,
37 load::{PagedLoadQuery, SessionLoadQuery},
38 },
39};
40pub use registry::StoreRegistry;
41pub use relation::validate_delete_strong_relations_for_source;
42pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
43
44use crate::{
46 db::{
47 commit::{
48 CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
49 },
50 data::RawDataKey,
51 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
52 query::intent::QueryMode,
53 relation::StrongRelationDeleteValidateFn,
54 },
55 error::{ErrorClass, ErrorOrigin, InternalError},
56 obs::sink::{MetricsSink, with_metrics_sink},
57 traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
58};
59use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
60
61pub struct Db<C: CanisterKind> {
67 store: &'static LocalKey<StoreRegistry>,
68 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
69 _marker: PhantomData<C>,
70}
71
72impl<C: CanisterKind> Db<C> {
73 #[must_use]
74 pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
75 Self::new_with_hooks(store, &[])
76 }
77
78 #[must_use]
79 pub const fn new_with_hooks(
80 store: &'static LocalKey<StoreRegistry>,
81 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
82 ) -> Self {
83 Self {
84 store,
85 entity_runtime_hooks,
86 _marker: PhantomData,
87 }
88 }
89
90 #[must_use]
91 pub(crate) const fn context<E>(&self) -> Context<'_, E>
92 where
93 E: EntityKind<Canister = C> + EntityValue,
94 {
95 Context::new(self)
96 }
97
98 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
103 where
104 E: EntityKind<Canister = C> + EntityValue,
105 {
106 ensure_recovered(self)?;
107
108 Ok(Context::new(self))
109 }
110
111 pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
113 ensure_recovered(self)
114 }
115
116 pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
117 self.store.with(|reg| f(reg))
118 }
119
120 pub fn storage_report(
121 &self,
122 name_to_path: &[(&'static str, &'static str)],
123 ) -> Result<StorageReport, InternalError> {
124 diagnostics::storage_report(self, name_to_path)
125 }
126
127 pub(in crate::db) fn prepare_row_commit_op(
128 &self,
129 op: &CommitRowOp,
130 ) -> Result<PreparedRowCommitOp, InternalError> {
131 let hooks = self
132 .entity_runtime_hooks
133 .iter()
134 .find(|hooks| hooks.entity_path == op.entity_path.as_str())
135 .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
136
137 (hooks.prepare_row_commit)(self, op)
138 }
139
140 pub(crate) fn validate_delete_strong_relations(
142 &self,
143 target_path: &str,
144 deleted_target_keys: &BTreeSet<RawDataKey>,
145 ) -> Result<(), InternalError> {
146 if deleted_target_keys.is_empty() {
147 return Ok(());
148 }
149
150 for hooks in self.entity_runtime_hooks {
151 (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
152 }
153
154 Ok(())
155 }
156}
157
158pub struct EntityRuntimeHooks<C: CanisterKind> {
166 pub(crate) entity_name: &'static str,
167 pub(crate) entity_path: &'static str,
168 pub(in crate::db) prepare_row_commit:
169 fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
170 pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
171}
172
173impl<C: CanisterKind> EntityRuntimeHooks<C> {
174 #[must_use]
175 pub(in crate::db) const fn new(
176 entity_name: &'static str,
177 entity_path: &'static str,
178 prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
179 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
180 ) -> Self {
181 Self {
182 entity_name,
183 entity_path,
184 prepare_row_commit,
185 validate_delete_strong_relations,
186 }
187 }
188
189 #[must_use]
190 pub const fn for_entity<E>(
191 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
192 ) -> Self
193 where
194 E: EntityKind<Canister = C> + EntityValue,
195 {
196 Self::new(
197 <E as EntityIdentity>::ENTITY_NAME,
198 E::PATH,
199 prepare_row_commit_for_entity::<E>,
200 validate_delete_strong_relations,
201 )
202 }
203}
204
205impl<C: CanisterKind> Db<C> {
206 #[must_use]
207 pub(crate) const fn has_runtime_hooks(&self) -> bool {
208 !self.entity_runtime_hooks.is_empty()
209 }
210
211 pub(crate) fn runtime_hook_for_entity_name(
212 &self,
213 entity_name: &str,
214 ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
215 let mut matched = None;
216 for hooks in self.entity_runtime_hooks {
217 if hooks.entity_name != entity_name {
218 continue;
219 }
220
221 if matched.is_some() {
222 return Err(InternalError::new(
223 ErrorClass::InvariantViolation,
224 ErrorOrigin::Store,
225 format!("duplicate runtime hooks for entity name '{entity_name}'"),
226 ));
227 }
228
229 matched = Some(hooks);
230 }
231
232 matched.ok_or_else(|| {
233 InternalError::new(
234 ErrorClass::Unsupported,
235 ErrorOrigin::Store,
236 format!("unsupported entity name in data store: '{entity_name}'"),
237 )
238 })
239 }
240}
241
242impl<C: CanisterKind> Copy for Db<C> {}
243
244impl<C: CanisterKind> Clone for Db<C> {
245 fn clone(&self) -> Self {
246 *self
247 }
248}
249
250pub struct DbSession<C: CanisterKind> {
256 db: Db<C>,
257 debug: bool,
258 metrics: Option<&'static dyn MetricsSink>,
259}
260
261impl<C: CanisterKind> DbSession<C> {
262 #[must_use]
263 pub const fn new(db: Db<C>) -> Self {
264 Self {
265 db,
266 debug: false,
267 metrics: None,
268 }
269 }
270
271 #[must_use]
272 pub const fn debug(mut self) -> Self {
273 self.debug = true;
274 self
275 }
276
277 #[must_use]
278 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
279 self.metrics = Some(sink);
280 self
281 }
282
283 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
284 if let Some(sink) = self.metrics {
285 with_metrics_sink(sink, f)
286 } else {
287 f()
288 }
289 }
290
291 fn execute_save_with<E, T, R>(
293 &self,
294 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
295 map: impl FnOnce(T) -> R,
296 ) -> Result<R, InternalError>
297 where
298 E: EntityKind<Canister = C> + EntityValue,
299 {
300 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
301
302 Ok(map(value))
303 }
304
305 fn execute_save_entity<E>(
307 &self,
308 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
309 ) -> Result<WriteResponse<E>, InternalError>
310 where
311 E: EntityKind<Canister = C> + EntityValue,
312 {
313 self.execute_save_with(op, WriteResponse::new)
314 }
315
316 fn execute_save_batch<E>(
317 &self,
318 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
319 ) -> Result<WriteBatchResponse<E>, InternalError>
320 where
321 E: EntityKind<Canister = C> + EntityValue,
322 {
323 self.execute_save_with(op, WriteBatchResponse::new)
324 }
325
326 fn execute_save_view<E>(
327 &self,
328 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
329 ) -> Result<E::ViewType, InternalError>
330 where
331 E: EntityKind<Canister = C> + EntityValue,
332 {
333 self.execute_save_with(op, std::convert::identity)
334 }
335
336 #[must_use]
341 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
342 where
343 E: EntityKind<Canister = C>,
344 {
345 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
346 }
347
348 #[must_use]
349 pub const fn load_with_consistency<E>(
350 &self,
351 consistency: ReadConsistency,
352 ) -> SessionLoadQuery<'_, C, E>
353 where
354 E: EntityKind<Canister = C>,
355 {
356 SessionLoadQuery::new(self, Query::new(consistency))
357 }
358
359 #[must_use]
360 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
361 where
362 E: EntityKind<Canister = C>,
363 {
364 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
365 }
366
367 #[must_use]
368 pub fn delete_with_consistency<E>(
369 &self,
370 consistency: ReadConsistency,
371 ) -> SessionDeleteQuery<'_, C, E>
372 where
373 E: EntityKind<Canister = C>,
374 {
375 SessionDeleteQuery::new(self, Query::new(consistency).delete())
376 }
377
378 #[must_use]
383 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
384 where
385 E: EntityKind<Canister = C> + EntityValue,
386 {
387 LoadExecutor::new(self.db, self.debug)
388 }
389
390 #[must_use]
391 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
392 where
393 E: EntityKind<Canister = C> + EntityValue,
394 {
395 DeleteExecutor::new(self.db, self.debug)
396 }
397
398 #[must_use]
399 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
400 where
401 E: EntityKind<Canister = C> + EntityValue,
402 {
403 SaveExecutor::new(self.db, self.debug)
404 }
405
406 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
411 where
412 E: EntityKind<Canister = C> + EntityValue,
413 {
414 let plan = query.plan()?;
415
416 let result = match query.mode() {
417 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
418 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
419 };
420
421 result.map_err(QueryError::Execute)
422 }
423
424 pub(crate) fn execute_load_query_paged<E>(
425 &self,
426 query: &Query<E>,
427 cursor_token: Option<&str>,
428 ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
429 where
430 E: EntityKind<Canister = C> + EntityValue,
431 {
432 let plan = query.plan()?;
433 let cursor_bytes = match cursor_token {
434 Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
435 QueryError::from(PlanError::InvalidContinuationCursor { reason })
436 })?),
437 None => None,
438 };
439 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
440
441 let page = self
442 .with_metrics(|| {
443 self.load_executor::<E>()
444 .execute_paged_with_cursor(plan, cursor)
445 })
446 .map_err(QueryError::Execute)?;
447
448 Ok((page.items, page.next_cursor))
449 }
450
451 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
456 where
457 E: EntityKind<Canister = C> + EntityValue,
458 {
459 self.execute_save_entity(|save| save.insert(entity))
460 }
461
462 pub fn insert_many_atomic<E>(
468 &self,
469 entities: impl IntoIterator<Item = E>,
470 ) -> Result<WriteBatchResponse<E>, InternalError>
471 where
472 E: EntityKind<Canister = C> + EntityValue,
473 {
474 self.execute_save_batch(|save| save.insert_many_atomic(entities))
475 }
476
477 pub fn insert_many_non_atomic<E>(
481 &self,
482 entities: impl IntoIterator<Item = E>,
483 ) -> Result<WriteBatchResponse<E>, InternalError>
484 where
485 E: EntityKind<Canister = C> + EntityValue,
486 {
487 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
488 }
489
490 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
491 where
492 E: EntityKind<Canister = C> + EntityValue,
493 {
494 self.execute_save_entity(|save| save.replace(entity))
495 }
496
497 pub fn replace_many_atomic<E>(
503 &self,
504 entities: impl IntoIterator<Item = E>,
505 ) -> Result<WriteBatchResponse<E>, InternalError>
506 where
507 E: EntityKind<Canister = C> + EntityValue,
508 {
509 self.execute_save_batch(|save| save.replace_many_atomic(entities))
510 }
511
512 pub fn replace_many_non_atomic<E>(
516 &self,
517 entities: impl IntoIterator<Item = E>,
518 ) -> Result<WriteBatchResponse<E>, InternalError>
519 where
520 E: EntityKind<Canister = C> + EntityValue,
521 {
522 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
523 }
524
525 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
526 where
527 E: EntityKind<Canister = C> + EntityValue,
528 {
529 self.execute_save_entity(|save| save.update(entity))
530 }
531
532 pub fn update_many_atomic<E>(
538 &self,
539 entities: impl IntoIterator<Item = E>,
540 ) -> Result<WriteBatchResponse<E>, InternalError>
541 where
542 E: EntityKind<Canister = C> + EntityValue,
543 {
544 self.execute_save_batch(|save| save.update_many_atomic(entities))
545 }
546
547 pub fn update_many_non_atomic<E>(
551 &self,
552 entities: impl IntoIterator<Item = E>,
553 ) -> Result<WriteBatchResponse<E>, InternalError>
554 where
555 E: EntityKind<Canister = C> + EntityValue,
556 {
557 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
558 }
559
560 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
561 where
562 E: EntityKind<Canister = C> + EntityValue,
563 {
564 self.execute_save_view::<E>(|save| save.insert_view(view))
565 }
566
567 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
568 where
569 E: EntityKind<Canister = C> + EntityValue,
570 {
571 self.execute_save_view::<E>(|save| save.replace_view(view))
572 }
573
574 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
575 where
576 E: EntityKind<Canister = C> + EntityValue,
577 {
578 self.execute_save_view::<E>(|save| save.update_view(view))
579 }
580
581 #[cfg(test)]
583 #[doc(hidden)]
584 pub fn clear_stores_for_tests(&self) {
585 self.db.with_store_registry(|reg| {
586 for (_, store) in reg.iter() {
587 store.with_data_mut(DataStore::clear);
588 store.with_index_mut(IndexStore::clear);
589 }
590 });
591 }
592}