1pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
23pub use identity::{EntityName, IndexName};
24pub use index::IndexStore;
25#[cfg(test)]
26pub(crate) use index::hash_value;
27pub use query::{
28 ReadConsistency,
29 builder::field::FieldRef,
30 expr::{FilterExpr, SortExpr},
31 intent::{IntentError, Query, QueryError},
32 plan::{OrderDirection, PlanError},
33 predicate::{
34 CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
35 },
36 session::{
37 delete::SessionDeleteQuery,
38 load::{PagedLoadQuery, SessionLoadQuery},
39 },
40};
41pub use registry::StoreRegistry;
42pub use relation::validate_delete_strong_relations_for_source;
43pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
44
45use crate::{
47 db::{
48 commit::{
49 CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
50 },
51 data::RawDataKey,
52 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
53 query::intent::QueryMode,
54 relation::StrongRelationDeleteValidateFn,
55 },
56 error::InternalError,
57 obs::sink::{MetricsSink, with_metrics_sink},
58 traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
59};
60use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
61
/// Tuple shape for a paged load result: the page's [`Response`] together with
/// an optional byte buffer.
///
/// NOTE(review): the byte buffer is presumably the continuation cursor for the
/// next page (mirrors `PagedLoadExecutionWithTrace`) — confirm against callers.
pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
68
/// Tuple returned by `DbSession::execute_load_query_paged_with_trace`.
///
/// The elements are, in order: the page's items as a [`Response`], the page's
/// `next_cursor` bytes (if any), and the optional [`ExecutionTrace`] produced
/// by the traced load executor.
pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
75
76pub struct Db<C: CanisterKind> {
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 _marker: PhantomData<C>,
85}
86
87impl<C: CanisterKind> Db<C> {
88 #[must_use]
89 pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
90 Self::new_with_hooks(store, &[])
91 }
92
93 #[must_use]
94 pub const fn new_with_hooks(
95 store: &'static LocalKey<StoreRegistry>,
96 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
97 ) -> Self {
98 Self {
99 store,
100 entity_runtime_hooks,
101 _marker: PhantomData,
102 }
103 }
104
105 #[must_use]
106 pub(crate) const fn context<E>(&self) -> Context<'_, E>
107 where
108 E: EntityKind<Canister = C> + EntityValue,
109 {
110 Context::new(self)
111 }
112
113 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
118 where
119 E: EntityKind<Canister = C> + EntityValue,
120 {
121 ensure_recovered(self)?;
122
123 Ok(Context::new(self))
124 }
125
126 pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
128 ensure_recovered(self)
129 }
130
131 pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
132 self.store.with(|reg| f(reg))
133 }
134
135 pub fn storage_report(
136 &self,
137 name_to_path: &[(&'static str, &'static str)],
138 ) -> Result<StorageReport, InternalError> {
139 diagnostics::storage_report(self, name_to_path)
140 }
141
142 pub(in crate::db) fn prepare_row_commit_op(
143 &self,
144 op: &CommitRowOp,
145 ) -> Result<PreparedRowCommitOp, InternalError> {
146 let hooks = self
147 .entity_runtime_hooks
148 .iter()
149 .find(|hooks| hooks.entity_path == op.entity_path.as_str())
150 .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
151
152 (hooks.prepare_row_commit)(self, op)
153 }
154
155 pub(crate) fn validate_delete_strong_relations(
157 &self,
158 target_path: &str,
159 deleted_target_keys: &BTreeSet<RawDataKey>,
160 ) -> Result<(), InternalError> {
161 if deleted_target_keys.is_empty() {
162 return Ok(());
163 }
164
165 for hooks in self.entity_runtime_hooks {
166 (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
167 }
168
169 Ok(())
170 }
171}
172
173pub struct EntityRuntimeHooks<C: CanisterKind> {
181 pub(crate) entity_name: &'static str,
182 pub(crate) entity_path: &'static str,
183 pub(in crate::db) prepare_row_commit:
184 fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
185 pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
186}
187
188impl<C: CanisterKind> EntityRuntimeHooks<C> {
189 #[must_use]
190 pub(in crate::db) const fn new(
191 entity_name: &'static str,
192 entity_path: &'static str,
193 prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
194 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
195 ) -> Self {
196 Self {
197 entity_name,
198 entity_path,
199 prepare_row_commit,
200 validate_delete_strong_relations,
201 }
202 }
203
204 #[must_use]
205 pub const fn for_entity<E>(
206 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
207 ) -> Self
208 where
209 E: EntityKind<Canister = C> + EntityValue,
210 {
211 Self::new(
212 <E as EntityIdentity>::ENTITY_NAME,
213 E::PATH,
214 prepare_row_commit_for_entity::<E>,
215 validate_delete_strong_relations,
216 )
217 }
218}
219
220impl<C: CanisterKind> Db<C> {
221 #[must_use]
222 pub(crate) const fn has_runtime_hooks(&self) -> bool {
223 !self.entity_runtime_hooks.is_empty()
224 }
225
226 pub(crate) fn runtime_hook_for_entity_name(
227 &self,
228 entity_name: &str,
229 ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
230 let mut matched = None;
231 for hooks in self.entity_runtime_hooks {
232 if hooks.entity_name != entity_name {
233 continue;
234 }
235
236 if matched.is_some() {
237 return Err(InternalError::store_invariant(format!(
238 "duplicate runtime hooks for entity name '{entity_name}'"
239 )));
240 }
241
242 matched = Some(hooks);
243 }
244
245 matched.ok_or_else(|| {
246 InternalError::store_unsupported(format!(
247 "unsupported entity name in data store: '{entity_name}'"
248 ))
249 })
250 }
251}
252
253impl<C: CanisterKind> Copy for Db<C> {}
254
255impl<C: CanisterKind> Clone for Db<C> {
256 fn clone(&self) -> Self {
257 *self
258 }
259}
260
261pub struct DbSession<C: CanisterKind> {
267 db: Db<C>,
268 debug: bool,
269 metrics: Option<&'static dyn MetricsSink>,
270}
271
272impl<C: CanisterKind> DbSession<C> {
273 #[must_use]
274 pub const fn new(db: Db<C>) -> Self {
275 Self {
276 db,
277 debug: false,
278 metrics: None,
279 }
280 }
281
282 #[must_use]
283 pub const fn debug(mut self) -> Self {
284 self.debug = true;
285 self
286 }
287
288 #[must_use]
289 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
290 self.metrics = Some(sink);
291 self
292 }
293
294 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
295 if let Some(sink) = self.metrics {
296 with_metrics_sink(sink, f)
297 } else {
298 f()
299 }
300 }
301
302 fn execute_save_with<E, T, R>(
304 &self,
305 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
306 map: impl FnOnce(T) -> R,
307 ) -> Result<R, InternalError>
308 where
309 E: EntityKind<Canister = C> + EntityValue,
310 {
311 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
312
313 Ok(map(value))
314 }
315
316 fn execute_save_entity<E>(
318 &self,
319 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
320 ) -> Result<WriteResponse<E>, InternalError>
321 where
322 E: EntityKind<Canister = C> + EntityValue,
323 {
324 self.execute_save_with(op, WriteResponse::new)
325 }
326
327 fn execute_save_batch<E>(
328 &self,
329 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
330 ) -> Result<WriteBatchResponse<E>, InternalError>
331 where
332 E: EntityKind<Canister = C> + EntityValue,
333 {
334 self.execute_save_with(op, WriteBatchResponse::new)
335 }
336
337 fn execute_save_view<E>(
338 &self,
339 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
340 ) -> Result<E::ViewType, InternalError>
341 where
342 E: EntityKind<Canister = C> + EntityValue,
343 {
344 self.execute_save_with(op, std::convert::identity)
345 }
346
347 #[must_use]
352 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
353 where
354 E: EntityKind<Canister = C>,
355 {
356 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
357 }
358
359 #[must_use]
360 pub const fn load_with_consistency<E>(
361 &self,
362 consistency: ReadConsistency,
363 ) -> SessionLoadQuery<'_, C, E>
364 where
365 E: EntityKind<Canister = C>,
366 {
367 SessionLoadQuery::new(self, Query::new(consistency))
368 }
369
370 #[must_use]
371 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
372 where
373 E: EntityKind<Canister = C>,
374 {
375 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
376 }
377
378 #[must_use]
379 pub fn delete_with_consistency<E>(
380 &self,
381 consistency: ReadConsistency,
382 ) -> SessionDeleteQuery<'_, C, E>
383 where
384 E: EntityKind<Canister = C>,
385 {
386 SessionDeleteQuery::new(self, Query::new(consistency).delete())
387 }
388
389 #[must_use]
394 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
395 where
396 E: EntityKind<Canister = C> + EntityValue,
397 {
398 LoadExecutor::new(self.db, self.debug)
399 }
400
401 #[must_use]
402 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
403 where
404 E: EntityKind<Canister = C> + EntityValue,
405 {
406 DeleteExecutor::new(self.db, self.debug)
407 }
408
409 #[must_use]
410 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
411 where
412 E: EntityKind<Canister = C> + EntityValue,
413 {
414 SaveExecutor::new(self.db, self.debug)
415 }
416
417 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
422 where
423 E: EntityKind<Canister = C> + EntityValue,
424 {
425 let plan = query.plan()?;
426
427 let result = match query.mode() {
428 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
429 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
430 };
431
432 result.map_err(QueryError::Execute)
433 }
434
435 pub(crate) fn execute_load_query_paged_with_trace<E>(
436 &self,
437 query: &Query<E>,
438 cursor_token: Option<&str>,
439 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
440 where
441 E: EntityKind<Canister = C> + EntityValue,
442 {
443 let plan = query.plan()?;
444 let cursor_bytes = match cursor_token {
445 Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
446 QueryError::from(PlanError::from(
447 crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
448 ))
449 })?),
450 None => None,
451 };
452 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
453
454 let (page, trace) = self
455 .with_metrics(|| {
456 self.load_executor::<E>()
457 .execute_paged_with_cursor_traced(plan, cursor)
458 })
459 .map_err(QueryError::Execute)?;
460
461 Ok((page.items, page.next_cursor, trace))
462 }
463
464 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
469 where
470 E: EntityKind<Canister = C> + EntityValue,
471 {
472 self.execute_save_entity(|save| save.insert(entity))
473 }
474
475 pub fn insert_many_atomic<E>(
481 &self,
482 entities: impl IntoIterator<Item = E>,
483 ) -> Result<WriteBatchResponse<E>, InternalError>
484 where
485 E: EntityKind<Canister = C> + EntityValue,
486 {
487 self.execute_save_batch(|save| save.insert_many_atomic(entities))
488 }
489
490 pub fn insert_many_non_atomic<E>(
494 &self,
495 entities: impl IntoIterator<Item = E>,
496 ) -> Result<WriteBatchResponse<E>, InternalError>
497 where
498 E: EntityKind<Canister = C> + EntityValue,
499 {
500 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
501 }
502
503 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
504 where
505 E: EntityKind<Canister = C> + EntityValue,
506 {
507 self.execute_save_entity(|save| save.replace(entity))
508 }
509
510 pub fn replace_many_atomic<E>(
516 &self,
517 entities: impl IntoIterator<Item = E>,
518 ) -> Result<WriteBatchResponse<E>, InternalError>
519 where
520 E: EntityKind<Canister = C> + EntityValue,
521 {
522 self.execute_save_batch(|save| save.replace_many_atomic(entities))
523 }
524
525 pub fn replace_many_non_atomic<E>(
529 &self,
530 entities: impl IntoIterator<Item = E>,
531 ) -> Result<WriteBatchResponse<E>, InternalError>
532 where
533 E: EntityKind<Canister = C> + EntityValue,
534 {
535 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
536 }
537
538 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
539 where
540 E: EntityKind<Canister = C> + EntityValue,
541 {
542 self.execute_save_entity(|save| save.update(entity))
543 }
544
545 pub fn update_many_atomic<E>(
551 &self,
552 entities: impl IntoIterator<Item = E>,
553 ) -> Result<WriteBatchResponse<E>, InternalError>
554 where
555 E: EntityKind<Canister = C> + EntityValue,
556 {
557 self.execute_save_batch(|save| save.update_many_atomic(entities))
558 }
559
560 pub fn update_many_non_atomic<E>(
564 &self,
565 entities: impl IntoIterator<Item = E>,
566 ) -> Result<WriteBatchResponse<E>, InternalError>
567 where
568 E: EntityKind<Canister = C> + EntityValue,
569 {
570 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
571 }
572
573 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
574 where
575 E: EntityKind<Canister = C> + EntityValue,
576 {
577 self.execute_save_view::<E>(|save| save.insert_view(view))
578 }
579
580 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
581 where
582 E: EntityKind<Canister = C> + EntityValue,
583 {
584 self.execute_save_view::<E>(|save| save.replace_view(view))
585 }
586
587 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
588 where
589 E: EntityKind<Canister = C> + EntityValue,
590 {
591 self.execute_save_view::<E>(|save| save.update_view(view))
592 }
593
594 #[cfg(test)]
596 #[doc(hidden)]
597 pub fn clear_stores_for_tests(&self) {
598 self.db.with_store_registry(|reg| {
599 for (_, store) in reg.iter() {
600 store.with_data_mut(DataStore::clear);
601 store.with_index_mut(IndexStore::clear);
602 }
603 });
604 }
605}