1pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
23pub use identity::{EntityName, IndexName};
24pub use index::IndexStore;
25#[cfg(test)]
26pub(crate) use index::hash_value;
27pub use query::{
28 ReadConsistency,
29 builder::field::FieldRef,
30 expr::{FilterExpr, SortExpr},
31 intent::{IntentError, Query, QueryError},
32 plan::{OrderDirection, PlanError},
33 predicate::{
34 CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
35 },
36 session::{
37 delete::SessionDeleteQuery,
38 load::{PagedLoadQuery, SessionLoadQuery},
39 },
40};
41pub use registry::StoreRegistry;
42pub use relation::validate_delete_strong_relations_for_source;
43pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
44
45use crate::{
47 db::{
48 commit::{
49 CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
50 },
51 data::RawDataKey,
52 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
53 query::intent::QueryMode,
54 relation::StrongRelationDeleteValidateFn,
55 },
56 error::InternalError,
57 obs::sink::{MetricsSink, with_metrics_sink},
58 traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
59 types::Id,
60};
61use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
62
63pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
69
70pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
76
77pub struct Db<C: CanisterKind> {
83 store: &'static LocalKey<StoreRegistry>,
84 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85 _marker: PhantomData<C>,
86}
87
88impl<C: CanisterKind> Db<C> {
89 #[must_use]
90 pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
91 Self::new_with_hooks(store, &[])
92 }
93
94 #[must_use]
95 pub const fn new_with_hooks(
96 store: &'static LocalKey<StoreRegistry>,
97 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
98 ) -> Self {
99 Self {
100 store,
101 entity_runtime_hooks,
102 _marker: PhantomData,
103 }
104 }
105
106 #[must_use]
107 pub(crate) const fn context<E>(&self) -> Context<'_, E>
108 where
109 E: EntityKind<Canister = C> + EntityValue,
110 {
111 Context::new(self)
112 }
113
114 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
119 where
120 E: EntityKind<Canister = C> + EntityValue,
121 {
122 ensure_recovered(self)?;
123
124 Ok(Context::new(self))
125 }
126
127 pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
129 ensure_recovered(self)
130 }
131
132 pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
133 self.store.with(|reg| f(reg))
134 }
135
136 pub fn storage_report(
137 &self,
138 name_to_path: &[(&'static str, &'static str)],
139 ) -> Result<StorageReport, InternalError> {
140 diagnostics::storage_report(self, name_to_path)
141 }
142
143 pub(in crate::db) fn prepare_row_commit_op(
144 &self,
145 op: &CommitRowOp,
146 ) -> Result<PreparedRowCommitOp, InternalError> {
147 let hooks = self
148 .entity_runtime_hooks
149 .iter()
150 .find(|hooks| hooks.entity_path == op.entity_path.as_str())
151 .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
152
153 (hooks.prepare_row_commit)(self, op)
154 }
155
156 pub(crate) fn validate_delete_strong_relations(
158 &self,
159 target_path: &str,
160 deleted_target_keys: &BTreeSet<RawDataKey>,
161 ) -> Result<(), InternalError> {
162 if deleted_target_keys.is_empty() {
163 return Ok(());
164 }
165
166 for hooks in self.entity_runtime_hooks {
167 (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
168 }
169
170 Ok(())
171 }
172}
173
174pub struct EntityRuntimeHooks<C: CanisterKind> {
182 pub(crate) entity_name: &'static str,
183 pub(crate) entity_path: &'static str,
184 pub(in crate::db) prepare_row_commit:
185 fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
186 pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
187}
188
189impl<C: CanisterKind> EntityRuntimeHooks<C> {
190 #[must_use]
191 pub(in crate::db) const fn new(
192 entity_name: &'static str,
193 entity_path: &'static str,
194 prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
195 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
196 ) -> Self {
197 Self {
198 entity_name,
199 entity_path,
200 prepare_row_commit,
201 validate_delete_strong_relations,
202 }
203 }
204
205 #[must_use]
206 pub const fn for_entity<E>(
207 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
208 ) -> Self
209 where
210 E: EntityKind<Canister = C> + EntityValue,
211 {
212 Self::new(
213 <E as EntityIdentity>::ENTITY_NAME,
214 E::PATH,
215 prepare_row_commit_for_entity::<E>,
216 validate_delete_strong_relations,
217 )
218 }
219}
220
221impl<C: CanisterKind> Db<C> {
222 #[must_use]
223 pub(crate) const fn has_runtime_hooks(&self) -> bool {
224 !self.entity_runtime_hooks.is_empty()
225 }
226
227 pub(crate) fn runtime_hook_for_entity_name(
228 &self,
229 entity_name: &str,
230 ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
231 let mut matched = None;
232 for hooks in self.entity_runtime_hooks {
233 if hooks.entity_name != entity_name {
234 continue;
235 }
236
237 if matched.is_some() {
238 return Err(InternalError::store_invariant(format!(
239 "duplicate runtime hooks for entity name '{entity_name}'"
240 )));
241 }
242
243 matched = Some(hooks);
244 }
245
246 matched.ok_or_else(|| {
247 InternalError::store_unsupported(format!(
248 "unsupported entity name in data store: '{entity_name}'"
249 ))
250 })
251 }
252}
253
254impl<C: CanisterKind> Copy for Db<C> {}
255
256impl<C: CanisterKind> Clone for Db<C> {
257 fn clone(&self) -> Self {
258 *self
259 }
260}
261
262pub struct DbSession<C: CanisterKind> {
269 db: Db<C>,
270 debug: bool,
271 metrics: Option<&'static dyn MetricsSink>,
272}
273
274impl<C: CanisterKind> DbSession<C> {
275 #[must_use]
276 pub const fn new(db: Db<C>) -> Self {
277 Self {
278 db,
279 debug: false,
280 metrics: None,
281 }
282 }
283
284 #[must_use]
285 pub const fn debug(mut self) -> Self {
286 self.debug = true;
287 self
288 }
289
290 #[must_use]
291 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
292 self.metrics = Some(sink);
293 self
294 }
295
296 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
297 if let Some(sink) = self.metrics {
298 with_metrics_sink(sink, f)
299 } else {
300 f()
301 }
302 }
303
304 fn execute_save_with<E, T, R>(
306 &self,
307 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
308 map: impl FnOnce(T) -> R,
309 ) -> Result<R, InternalError>
310 where
311 E: EntityKind<Canister = C> + EntityValue,
312 {
313 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
314
315 Ok(map(value))
316 }
317
318 fn execute_save_entity<E>(
320 &self,
321 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
322 ) -> Result<WriteResponse<E>, InternalError>
323 where
324 E: EntityKind<Canister = C> + EntityValue,
325 {
326 self.execute_save_with(op, WriteResponse::new)
327 }
328
329 fn execute_save_batch<E>(
330 &self,
331 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
332 ) -> Result<WriteBatchResponse<E>, InternalError>
333 where
334 E: EntityKind<Canister = C> + EntityValue,
335 {
336 self.execute_save_with(op, WriteBatchResponse::new)
337 }
338
339 fn execute_save_view<E>(
340 &self,
341 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
342 ) -> Result<E::ViewType, InternalError>
343 where
344 E: EntityKind<Canister = C> + EntityValue,
345 {
346 self.execute_save_with(op, std::convert::identity)
347 }
348
349 #[must_use]
354 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
355 where
356 E: EntityKind<Canister = C>,
357 {
358 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
359 }
360
361 #[must_use]
362 pub const fn load_with_consistency<E>(
363 &self,
364 consistency: ReadConsistency,
365 ) -> SessionLoadQuery<'_, C, E>
366 where
367 E: EntityKind<Canister = C>,
368 {
369 SessionLoadQuery::new(self, Query::new(consistency))
370 }
371
372 #[must_use]
373 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
374 where
375 E: EntityKind<Canister = C>,
376 {
377 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
378 }
379
380 #[must_use]
381 pub fn delete_with_consistency<E>(
382 &self,
383 consistency: ReadConsistency,
384 ) -> SessionDeleteQuery<'_, C, E>
385 where
386 E: EntityKind<Canister = C>,
387 {
388 SessionDeleteQuery::new(self, Query::new(consistency).delete())
389 }
390
391 #[must_use]
396 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
397 where
398 E: EntityKind<Canister = C> + EntityValue,
399 {
400 LoadExecutor::new(self.db, self.debug)
401 }
402
403 #[must_use]
404 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
405 where
406 E: EntityKind<Canister = C> + EntityValue,
407 {
408 DeleteExecutor::new(self.db, self.debug)
409 }
410
411 #[must_use]
412 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
413 where
414 E: EntityKind<Canister = C> + EntityValue,
415 {
416 SaveExecutor::new(self.db, self.debug)
417 }
418
419 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
424 where
425 E: EntityKind<Canister = C> + EntityValue,
426 {
427 let plan = query.plan()?;
428
429 let result = match query.mode() {
430 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
431 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
432 };
433
434 result.map_err(QueryError::Execute)
435 }
436
437 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
438 where
439 E: EntityKind<Canister = C> + EntityValue,
440 {
441 let plan = query.plan()?;
442
443 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
444 .map_err(QueryError::Execute)
445 }
446
447 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
448 where
449 E: EntityKind<Canister = C> + EntityValue,
450 {
451 let plan = query.plan()?;
452
453 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
454 .map_err(QueryError::Execute)
455 }
456
457 pub(crate) fn execute_load_query_min<E>(
458 &self,
459 query: &Query<E>,
460 ) -> Result<Option<Id<E>>, QueryError>
461 where
462 E: EntityKind<Canister = C> + EntityValue,
463 {
464 let plan = query.plan()?;
465
466 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
467 .map_err(QueryError::Execute)
468 }
469
470 pub(crate) fn execute_load_query_max<E>(
471 &self,
472 query: &Query<E>,
473 ) -> Result<Option<Id<E>>, QueryError>
474 where
475 E: EntityKind<Canister = C> + EntityValue,
476 {
477 let plan = query.plan()?;
478
479 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
480 .map_err(QueryError::Execute)
481 }
482
483 pub(crate) fn execute_load_query_paged_with_trace<E>(
484 &self,
485 query: &Query<E>,
486 cursor_token: Option<&str>,
487 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
488 where
489 E: EntityKind<Canister = C> + EntityValue,
490 {
491 let plan = query.plan()?;
492 let cursor_bytes = match cursor_token {
493 Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
494 QueryError::from(PlanError::from(
495 crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
496 ))
497 })?),
498 None => None,
499 };
500 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
501
502 let (page, trace) = self
503 .with_metrics(|| {
504 self.load_executor::<E>()
505 .execute_paged_with_cursor_traced(plan, cursor)
506 })
507 .map_err(QueryError::Execute)?;
508
509 Ok((page.items, page.next_cursor, trace))
510 }
511
512 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
517 where
518 E: EntityKind<Canister = C> + EntityValue,
519 {
520 self.execute_save_entity(|save| save.insert(entity))
521 }
522
523 pub fn insert_many_atomic<E>(
529 &self,
530 entities: impl IntoIterator<Item = E>,
531 ) -> Result<WriteBatchResponse<E>, InternalError>
532 where
533 E: EntityKind<Canister = C> + EntityValue,
534 {
535 self.execute_save_batch(|save| save.insert_many_atomic(entities))
536 }
537
538 pub fn insert_many_non_atomic<E>(
542 &self,
543 entities: impl IntoIterator<Item = E>,
544 ) -> Result<WriteBatchResponse<E>, InternalError>
545 where
546 E: EntityKind<Canister = C> + EntityValue,
547 {
548 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
549 }
550
551 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
552 where
553 E: EntityKind<Canister = C> + EntityValue,
554 {
555 self.execute_save_entity(|save| save.replace(entity))
556 }
557
558 pub fn replace_many_atomic<E>(
564 &self,
565 entities: impl IntoIterator<Item = E>,
566 ) -> Result<WriteBatchResponse<E>, InternalError>
567 where
568 E: EntityKind<Canister = C> + EntityValue,
569 {
570 self.execute_save_batch(|save| save.replace_many_atomic(entities))
571 }
572
573 pub fn replace_many_non_atomic<E>(
577 &self,
578 entities: impl IntoIterator<Item = E>,
579 ) -> Result<WriteBatchResponse<E>, InternalError>
580 where
581 E: EntityKind<Canister = C> + EntityValue,
582 {
583 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
584 }
585
586 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
587 where
588 E: EntityKind<Canister = C> + EntityValue,
589 {
590 self.execute_save_entity(|save| save.update(entity))
591 }
592
593 pub fn update_many_atomic<E>(
599 &self,
600 entities: impl IntoIterator<Item = E>,
601 ) -> Result<WriteBatchResponse<E>, InternalError>
602 where
603 E: EntityKind<Canister = C> + EntityValue,
604 {
605 self.execute_save_batch(|save| save.update_many_atomic(entities))
606 }
607
608 pub fn update_many_non_atomic<E>(
612 &self,
613 entities: impl IntoIterator<Item = E>,
614 ) -> Result<WriteBatchResponse<E>, InternalError>
615 where
616 E: EntityKind<Canister = C> + EntityValue,
617 {
618 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
619 }
620
621 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
622 where
623 E: EntityKind<Canister = C> + EntityValue,
624 {
625 self.execute_save_view::<E>(|save| save.insert_view(view))
626 }
627
628 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
629 where
630 E: EntityKind<Canister = C> + EntityValue,
631 {
632 self.execute_save_view::<E>(|save| save.replace_view(view))
633 }
634
635 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
636 where
637 E: EntityKind<Canister = C> + EntityValue,
638 {
639 self.execute_save_view::<E>(|save| save.update_view(view))
640 }
641
642 #[cfg(test)]
644 #[doc(hidden)]
645 pub fn clear_stores_for_tests(&self) {
646 self.db.with_store_registry(|reg| {
647 for (_, store) in reg.iter() {
648 store.with_data_mut(DataStore::clear);
649 store.with_index_mut(IndexStore::clear);
650 }
651 });
652 }
653}