1pub(crate) mod cursor;
3pub(crate) mod diagnostics;
4pub(crate) mod identity;
5pub(crate) mod query;
6pub(crate) mod registry;
7pub(crate) mod response;
8
9pub(in crate::db) mod codec;
10pub(in crate::db) mod commit;
11pub(in crate::db) mod data;
12pub(in crate::db) mod entity_decode;
13pub(in crate::db) mod executor;
14pub(in crate::db) mod index;
15pub(in crate::db) mod relation;
16
17pub use cursor::encode_cursor;
19pub use data::DataStore;
20pub(crate) use data::StorageKey;
21pub use diagnostics::StorageReport;
22pub use executor::{
23 ExecutionAccessPathVariant, ExecutionFastPath, ExecutionPushdownType, ExecutionTrace,
24};
25pub use identity::{EntityName, IndexName};
26pub use index::IndexStore;
27#[cfg(test)]
28pub(crate) use index::hash_value;
29pub use query::{
30 ReadConsistency,
31 builder::field::FieldRef,
32 expr::{FilterExpr, SortExpr},
33 intent::{IntentError, Query, QueryError},
34 plan::{OrderDirection, PlanError},
35 predicate::{
36 CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature, ValidateError,
37 },
38 session::{
39 delete::SessionDeleteQuery,
40 load::{PagedLoadQuery, SessionLoadQuery},
41 },
42};
43pub use registry::StoreRegistry;
44pub use relation::validate_delete_strong_relations_for_source;
45pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
46
47use crate::{
49 db::{
50 commit::{
51 CommitRowOp, PreparedRowCommitOp, ensure_recovered, prepare_row_commit_for_entity,
52 },
53 data::RawDataKey,
54 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
55 query::intent::QueryMode,
56 relation::StrongRelationDeleteValidateFn,
57 },
58 error::{ErrorClass, ErrorOrigin, InternalError},
59 obs::sink::{MetricsSink, with_metrics_sink},
60 traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
61};
62use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
63
64pub type PagedLoadExecution<E> = (Response<E>, Option<Vec<u8>>);
70
71pub type PagedLoadExecutionWithTrace<E> = (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>);
77
78pub struct Db<C: CanisterKind> {
84 store: &'static LocalKey<StoreRegistry>,
85 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
86 _marker: PhantomData<C>,
87}
88
89impl<C: CanisterKind> Db<C> {
90 #[must_use]
91 pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
92 Self::new_with_hooks(store, &[])
93 }
94
95 #[must_use]
96 pub const fn new_with_hooks(
97 store: &'static LocalKey<StoreRegistry>,
98 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
99 ) -> Self {
100 Self {
101 store,
102 entity_runtime_hooks,
103 _marker: PhantomData,
104 }
105 }
106
107 #[must_use]
108 pub(crate) const fn context<E>(&self) -> Context<'_, E>
109 where
110 E: EntityKind<Canister = C> + EntityValue,
111 {
112 Context::new(self)
113 }
114
115 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
120 where
121 E: EntityKind<Canister = C> + EntityValue,
122 {
123 ensure_recovered(self)?;
124
125 Ok(Context::new(self))
126 }
127
128 pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
130 ensure_recovered(self)
131 }
132
133 pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
134 self.store.with(|reg| f(reg))
135 }
136
137 pub fn storage_report(
138 &self,
139 name_to_path: &[(&'static str, &'static str)],
140 ) -> Result<StorageReport, InternalError> {
141 diagnostics::storage_report(self, name_to_path)
142 }
143
144 pub(in crate::db) fn prepare_row_commit_op(
145 &self,
146 op: &CommitRowOp,
147 ) -> Result<PreparedRowCommitOp, InternalError> {
148 let hooks = self
149 .entity_runtime_hooks
150 .iter()
151 .find(|hooks| hooks.entity_path == op.entity_path.as_str())
152 .ok_or_else(|| InternalError::unsupported_entity_path(op.entity_path.as_str()))?;
153
154 (hooks.prepare_row_commit)(self, op)
155 }
156
157 pub(crate) fn validate_delete_strong_relations(
159 &self,
160 target_path: &str,
161 deleted_target_keys: &BTreeSet<RawDataKey>,
162 ) -> Result<(), InternalError> {
163 if deleted_target_keys.is_empty() {
164 return Ok(());
165 }
166
167 for hooks in self.entity_runtime_hooks {
168 (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
169 }
170
171 Ok(())
172 }
173}
174
175pub struct EntityRuntimeHooks<C: CanisterKind> {
183 pub(crate) entity_name: &'static str,
184 pub(crate) entity_path: &'static str,
185 pub(in crate::db) prepare_row_commit:
186 fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
187 pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
188}
189
190impl<C: CanisterKind> EntityRuntimeHooks<C> {
191 #[must_use]
192 pub(in crate::db) const fn new(
193 entity_name: &'static str,
194 entity_path: &'static str,
195 prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
196 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
197 ) -> Self {
198 Self {
199 entity_name,
200 entity_path,
201 prepare_row_commit,
202 validate_delete_strong_relations,
203 }
204 }
205
206 #[must_use]
207 pub const fn for_entity<E>(
208 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
209 ) -> Self
210 where
211 E: EntityKind<Canister = C> + EntityValue,
212 {
213 Self::new(
214 <E as EntityIdentity>::ENTITY_NAME,
215 E::PATH,
216 prepare_row_commit_for_entity::<E>,
217 validate_delete_strong_relations,
218 )
219 }
220}
221
222impl<C: CanisterKind> Db<C> {
223 #[must_use]
224 pub(crate) const fn has_runtime_hooks(&self) -> bool {
225 !self.entity_runtime_hooks.is_empty()
226 }
227
228 pub(crate) fn runtime_hook_for_entity_name(
229 &self,
230 entity_name: &str,
231 ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
232 let mut matched = None;
233 for hooks in self.entity_runtime_hooks {
234 if hooks.entity_name != entity_name {
235 continue;
236 }
237
238 if matched.is_some() {
239 return Err(InternalError::new(
240 ErrorClass::InvariantViolation,
241 ErrorOrigin::Store,
242 format!("duplicate runtime hooks for entity name '{entity_name}'"),
243 ));
244 }
245
246 matched = Some(hooks);
247 }
248
249 matched.ok_or_else(|| {
250 InternalError::new(
251 ErrorClass::Unsupported,
252 ErrorOrigin::Store,
253 format!("unsupported entity name in data store: '{entity_name}'"),
254 )
255 })
256 }
257}
258
259impl<C: CanisterKind> Copy for Db<C> {}
260
261impl<C: CanisterKind> Clone for Db<C> {
262 fn clone(&self) -> Self {
263 *self
264 }
265}
266
267pub struct DbSession<C: CanisterKind> {
273 db: Db<C>,
274 debug: bool,
275 metrics: Option<&'static dyn MetricsSink>,
276}
277
278impl<C: CanisterKind> DbSession<C> {
279 #[must_use]
280 pub const fn new(db: Db<C>) -> Self {
281 Self {
282 db,
283 debug: false,
284 metrics: None,
285 }
286 }
287
288 #[must_use]
289 pub const fn debug(mut self) -> Self {
290 self.debug = true;
291 self
292 }
293
294 #[must_use]
295 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
296 self.metrics = Some(sink);
297 self
298 }
299
300 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
301 if let Some(sink) = self.metrics {
302 with_metrics_sink(sink, f)
303 } else {
304 f()
305 }
306 }
307
308 fn execute_save_with<E, T, R>(
310 &self,
311 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
312 map: impl FnOnce(T) -> R,
313 ) -> Result<R, InternalError>
314 where
315 E: EntityKind<Canister = C> + EntityValue,
316 {
317 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
318
319 Ok(map(value))
320 }
321
322 fn execute_save_entity<E>(
324 &self,
325 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
326 ) -> Result<WriteResponse<E>, InternalError>
327 where
328 E: EntityKind<Canister = C> + EntityValue,
329 {
330 self.execute_save_with(op, WriteResponse::new)
331 }
332
333 fn execute_save_batch<E>(
334 &self,
335 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
336 ) -> Result<WriteBatchResponse<E>, InternalError>
337 where
338 E: EntityKind<Canister = C> + EntityValue,
339 {
340 self.execute_save_with(op, WriteBatchResponse::new)
341 }
342
343 fn execute_save_view<E>(
344 &self,
345 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
346 ) -> Result<E::ViewType, InternalError>
347 where
348 E: EntityKind<Canister = C> + EntityValue,
349 {
350 self.execute_save_with(op, std::convert::identity)
351 }
352
353 #[must_use]
358 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
359 where
360 E: EntityKind<Canister = C>,
361 {
362 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
363 }
364
365 #[must_use]
366 pub const fn load_with_consistency<E>(
367 &self,
368 consistency: ReadConsistency,
369 ) -> SessionLoadQuery<'_, C, E>
370 where
371 E: EntityKind<Canister = C>,
372 {
373 SessionLoadQuery::new(self, Query::new(consistency))
374 }
375
376 #[must_use]
377 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
378 where
379 E: EntityKind<Canister = C>,
380 {
381 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
382 }
383
384 #[must_use]
385 pub fn delete_with_consistency<E>(
386 &self,
387 consistency: ReadConsistency,
388 ) -> SessionDeleteQuery<'_, C, E>
389 where
390 E: EntityKind<Canister = C>,
391 {
392 SessionDeleteQuery::new(self, Query::new(consistency).delete())
393 }
394
395 #[must_use]
400 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
401 where
402 E: EntityKind<Canister = C> + EntityValue,
403 {
404 LoadExecutor::new(self.db, self.debug)
405 }
406
407 #[must_use]
408 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
409 where
410 E: EntityKind<Canister = C> + EntityValue,
411 {
412 DeleteExecutor::new(self.db, self.debug)
413 }
414
415 #[must_use]
416 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
417 where
418 E: EntityKind<Canister = C> + EntityValue,
419 {
420 SaveExecutor::new(self.db, self.debug)
421 }
422
423 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
428 where
429 E: EntityKind<Canister = C> + EntityValue,
430 {
431 let plan = query.plan()?;
432
433 let result = match query.mode() {
434 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
435 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
436 };
437
438 result.map_err(QueryError::Execute)
439 }
440
441 pub(crate) fn execute_load_query_paged_with_trace<E>(
442 &self,
443 query: &Query<E>,
444 cursor_token: Option<&str>,
445 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
446 where
447 E: EntityKind<Canister = C> + EntityValue,
448 {
449 let plan = query.plan()?;
450 let cursor_bytes = match cursor_token {
451 Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
452 QueryError::from(PlanError::from(
453 crate::db::query::plan::CursorPlanError::InvalidContinuationCursor { reason },
454 ))
455 })?),
456 None => None,
457 };
458 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
459
460 let (page, trace) = self
461 .with_metrics(|| {
462 self.load_executor::<E>()
463 .execute_paged_with_cursor_traced(plan, cursor)
464 })
465 .map_err(QueryError::Execute)?;
466
467 Ok((page.items, page.next_cursor, trace))
468 }
469
470 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
475 where
476 E: EntityKind<Canister = C> + EntityValue,
477 {
478 self.execute_save_entity(|save| save.insert(entity))
479 }
480
481 pub fn insert_many_atomic<E>(
487 &self,
488 entities: impl IntoIterator<Item = E>,
489 ) -> Result<WriteBatchResponse<E>, InternalError>
490 where
491 E: EntityKind<Canister = C> + EntityValue,
492 {
493 self.execute_save_batch(|save| save.insert_many_atomic(entities))
494 }
495
496 pub fn insert_many_non_atomic<E>(
500 &self,
501 entities: impl IntoIterator<Item = E>,
502 ) -> Result<WriteBatchResponse<E>, InternalError>
503 where
504 E: EntityKind<Canister = C> + EntityValue,
505 {
506 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
507 }
508
509 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
510 where
511 E: EntityKind<Canister = C> + EntityValue,
512 {
513 self.execute_save_entity(|save| save.replace(entity))
514 }
515
516 pub fn replace_many_atomic<E>(
522 &self,
523 entities: impl IntoIterator<Item = E>,
524 ) -> Result<WriteBatchResponse<E>, InternalError>
525 where
526 E: EntityKind<Canister = C> + EntityValue,
527 {
528 self.execute_save_batch(|save| save.replace_many_atomic(entities))
529 }
530
531 pub fn replace_many_non_atomic<E>(
535 &self,
536 entities: impl IntoIterator<Item = E>,
537 ) -> Result<WriteBatchResponse<E>, InternalError>
538 where
539 E: EntityKind<Canister = C> + EntityValue,
540 {
541 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
542 }
543
544 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
545 where
546 E: EntityKind<Canister = C> + EntityValue,
547 {
548 self.execute_save_entity(|save| save.update(entity))
549 }
550
551 pub fn update_many_atomic<E>(
557 &self,
558 entities: impl IntoIterator<Item = E>,
559 ) -> Result<WriteBatchResponse<E>, InternalError>
560 where
561 E: EntityKind<Canister = C> + EntityValue,
562 {
563 self.execute_save_batch(|save| save.update_many_atomic(entities))
564 }
565
566 pub fn update_many_non_atomic<E>(
570 &self,
571 entities: impl IntoIterator<Item = E>,
572 ) -> Result<WriteBatchResponse<E>, InternalError>
573 where
574 E: EntityKind<Canister = C> + EntityValue,
575 {
576 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
577 }
578
579 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
580 where
581 E: EntityKind<Canister = C> + EntityValue,
582 {
583 self.execute_save_view::<E>(|save| save.insert_view(view))
584 }
585
586 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
587 where
588 E: EntityKind<Canister = C> + EntityValue,
589 {
590 self.execute_save_view::<E>(|save| save.replace_view(view))
591 }
592
593 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
594 where
595 E: EntityKind<Canister = C> + EntityValue,
596 {
597 self.execute_save_view::<E>(|save| save.update_view(view))
598 }
599
600 #[cfg(test)]
602 #[doc(hidden)]
603 pub fn clear_stores_for_tests(&self) {
604 self.db.with_store_registry(|reg| {
605 for (_, store) in reg.iter() {
606 store.with_data_mut(DataStore::clear);
607 store.with_index_mut(IndexStore::clear);
608 }
609 });
610 }
611}