1pub(crate) mod access;
3pub(crate) mod contracts;
4pub(crate) mod cursor;
5pub(crate) mod diagnostics;
6pub(crate) mod identity;
7pub(crate) mod predicate;
8pub(crate) mod query;
9pub(crate) mod registry;
10pub(crate) mod response;
11pub(crate) mod session;
12
13pub(in crate::db) mod codec;
14pub(in crate::db) mod commit;
15pub(in crate::db) mod data;
16pub(in crate::db) mod direction;
17pub(in crate::db) mod executor;
18pub(in crate::db) mod index;
19pub(in crate::db) mod relation;
20
21pub use codec::cursor::{decode_cursor, encode_cursor};
23pub use data::DataStore;
24pub(crate) use data::StorageKey;
25pub use diagnostics::StorageReport;
26pub use executor::{ExecutionAccessPathVariant, ExecutionOptimization, ExecutionTrace};
27pub use identity::{EntityName, IndexName};
28pub use index::IndexStore;
29pub use predicate::MissingRowPolicy;
30pub use predicate::ValidateError;
31pub use predicate::{CoercionId, CompareOp, ComparePredicate, Predicate, UnsupportedQueryFeature};
32pub use query::{
33 builder::field::FieldRef,
34 expr::{FilterExpr, SortExpr},
35 fluent::{
36 delete::FluentDeleteQuery,
37 load::{FluentLoadQuery, PagedLoadQuery},
38 },
39 intent::{CompiledQuery, DeleteSpec, IntentError, LoadSpec, Query, QueryError, QueryMode},
40 plan::{OrderDirection, PlanError},
41};
42pub use registry::StoreRegistry;
43pub use relation::validate_delete_strong_relations_for_source;
44pub use response::{Response, ResponseError, Row, WriteBatchResponse, WriteResponse};
45pub use session::DbSession;
46
47use crate::{
49 db::{
50 commit::{
51 CommitRowOp, CommitSchemaFingerprint, PreparedRowCommitOp,
52 commit_schema_fingerprint_for_entity, ensure_recovered, prepare_row_commit_for_entity,
53 rebuild_secondary_indexes_from_rows, replay_commit_marker_row_ops,
54 },
55 data::RawDataKey,
56 executor::Context,
57 relation::StrongRelationDeleteValidateFn,
58 },
59 error::InternalError,
60 traits::{CanisterKind, EntityIdentity, EntityKind, EntityValue},
61 value::Value,
62};
63use std::{collections::BTreeSet, marker::PhantomData, thread::LocalKey};
64
65#[derive(Debug)]
72pub struct PagedLoadExecution<E: EntityKind> {
73 response: Response<E>,
74 continuation_cursor: Option<Vec<u8>>,
75}
76
77impl<E: EntityKind> PagedLoadExecution<E> {
78 #[must_use]
80 pub const fn new(response: Response<E>, continuation_cursor: Option<Vec<u8>>) -> Self {
81 Self {
82 response,
83 continuation_cursor,
84 }
85 }
86
87 #[must_use]
89 pub const fn response(&self) -> &Response<E> {
90 &self.response
91 }
92
93 #[must_use]
95 pub fn continuation_cursor(&self) -> Option<&[u8]> {
96 self.continuation_cursor.as_deref()
97 }
98
99 #[must_use]
101 pub fn into_parts(self) -> (Response<E>, Option<Vec<u8>>) {
102 (self.response, self.continuation_cursor)
103 }
104}
105
106impl<E: EntityKind> From<(Response<E>, Option<Vec<u8>>)> for PagedLoadExecution<E> {
107 fn from(value: (Response<E>, Option<Vec<u8>>)) -> Self {
108 let (response, continuation_cursor) = value;
109
110 Self::new(response, continuation_cursor)
111 }
112}
113
114impl<E: EntityKind> From<PagedLoadExecution<E>> for (Response<E>, Option<Vec<u8>>) {
115 fn from(value: PagedLoadExecution<E>) -> Self {
116 value.into_parts()
117 }
118}
119
120#[derive(Debug)]
127pub struct PagedLoadExecutionWithTrace<E: EntityKind> {
128 execution: PagedLoadExecution<E>,
129 execution_trace: Option<ExecutionTrace>,
130}
131
132impl<E: EntityKind> PagedLoadExecutionWithTrace<E> {
133 #[must_use]
135 pub const fn new(
136 response: Response<E>,
137 continuation_cursor: Option<Vec<u8>>,
138 execution_trace: Option<ExecutionTrace>,
139 ) -> Self {
140 Self {
141 execution: PagedLoadExecution::new(response, continuation_cursor),
142 execution_trace,
143 }
144 }
145
146 #[must_use]
148 pub const fn execution(&self) -> &PagedLoadExecution<E> {
149 &self.execution
150 }
151
152 #[must_use]
154 pub const fn response(&self) -> &Response<E> {
155 self.execution.response()
156 }
157
158 #[must_use]
160 pub fn continuation_cursor(&self) -> Option<&[u8]> {
161 self.execution.continuation_cursor()
162 }
163
164 #[must_use]
166 pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
167 self.execution_trace.as_ref()
168 }
169
170 #[must_use]
172 pub fn into_execution(self) -> PagedLoadExecution<E> {
173 self.execution
174 }
175
176 #[must_use]
178 pub fn into_parts(self) -> (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>) {
179 let (response, continuation_cursor) = self.execution.into_parts();
180
181 (response, continuation_cursor, self.execution_trace)
182 }
183}
184
185impl<E: EntityKind> From<(Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)>
186 for PagedLoadExecutionWithTrace<E>
187{
188 fn from(value: (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)) -> Self {
189 let (response, continuation_cursor, execution_trace) = value;
190
191 Self::new(response, continuation_cursor, execution_trace)
192 }
193}
194
195impl<E: EntityKind> From<PagedLoadExecutionWithTrace<E>>
196 for (Response<E>, Option<Vec<u8>>, Option<ExecutionTrace>)
197{
198 fn from(value: PagedLoadExecutionWithTrace<E>) -> Self {
199 value.into_parts()
200 }
201}
202
203#[derive(Clone, Debug, Eq, PartialEq)]
210pub struct GroupedRow {
211 group_key: Vec<Value>,
212 aggregate_values: Vec<Value>,
213}
214
215impl GroupedRow {
216 #[must_use]
218 pub const fn new(group_key: Vec<Value>, aggregate_values: Vec<Value>) -> Self {
219 Self {
220 group_key,
221 aggregate_values,
222 }
223 }
224
225 #[must_use]
227 pub const fn group_key(&self) -> &[Value] {
228 self.group_key.as_slice()
229 }
230
231 #[must_use]
233 pub const fn aggregate_values(&self) -> &[Value] {
234 self.aggregate_values.as_slice()
235 }
236}
237
238#[derive(Debug)]
244pub struct PagedGroupedExecution {
245 rows: Vec<GroupedRow>,
246 continuation_cursor: Option<Vec<u8>>,
247}
248
249impl PagedGroupedExecution {
250 #[must_use]
252 pub const fn new(rows: Vec<GroupedRow>, continuation_cursor: Option<Vec<u8>>) -> Self {
253 Self {
254 rows,
255 continuation_cursor,
256 }
257 }
258
259 #[must_use]
261 pub const fn rows(&self) -> &[GroupedRow] {
262 self.rows.as_slice()
263 }
264
265 #[must_use]
267 pub fn continuation_cursor(&self) -> Option<&[u8]> {
268 self.continuation_cursor.as_deref()
269 }
270
271 #[must_use]
273 pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>) {
274 (self.rows, self.continuation_cursor)
275 }
276}
277
278#[derive(Debug)]
284pub struct PagedGroupedExecutionWithTrace {
285 execution: PagedGroupedExecution,
286 execution_trace: Option<ExecutionTrace>,
287}
288
289impl PagedGroupedExecutionWithTrace {
290 #[must_use]
292 pub const fn new(
293 rows: Vec<GroupedRow>,
294 continuation_cursor: Option<Vec<u8>>,
295 execution_trace: Option<ExecutionTrace>,
296 ) -> Self {
297 Self {
298 execution: PagedGroupedExecution::new(rows, continuation_cursor),
299 execution_trace,
300 }
301 }
302
303 #[must_use]
305 pub const fn execution(&self) -> &PagedGroupedExecution {
306 &self.execution
307 }
308
309 #[must_use]
311 pub const fn rows(&self) -> &[GroupedRow] {
312 self.execution.rows()
313 }
314
315 #[must_use]
317 pub fn continuation_cursor(&self) -> Option<&[u8]> {
318 self.execution.continuation_cursor()
319 }
320
321 #[must_use]
323 pub const fn execution_trace(&self) -> Option<&ExecutionTrace> {
324 self.execution_trace.as_ref()
325 }
326
327 #[must_use]
329 pub fn into_execution(self) -> PagedGroupedExecution {
330 self.execution
331 }
332
333 #[must_use]
335 pub fn into_parts(self) -> (Vec<GroupedRow>, Option<Vec<u8>>, Option<ExecutionTrace>) {
336 let (rows, continuation_cursor) = self.execution.into_parts();
337
338 (rows, continuation_cursor, self.execution_trace)
339 }
340}
341
342pub struct Db<C: CanisterKind> {
348 store: &'static LocalKey<StoreRegistry>,
349 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
350 _marker: PhantomData<C>,
351}
352
353impl<C: CanisterKind> Db<C> {
354 #[must_use]
355 pub const fn new(store: &'static LocalKey<StoreRegistry>) -> Self {
356 Self::new_with_hooks(store, &[])
357 }
358
359 #[must_use]
360 pub const fn new_with_hooks(
361 store: &'static LocalKey<StoreRegistry>,
362 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
363 ) -> Self {
364 Self {
365 store,
366 entity_runtime_hooks,
367 _marker: PhantomData,
368 }
369 }
370
371 #[must_use]
372 pub(crate) const fn context<E>(&self) -> Context<'_, E>
373 where
374 E: EntityKind<Canister = C> + EntityValue,
375 {
376 Context::new(self)
377 }
378
379 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
384 where
385 E: EntityKind<Canister = C> + EntityValue,
386 {
387 ensure_recovered(self)?;
388
389 Ok(Context::new(self))
390 }
391
392 pub(crate) fn ensure_recovered_state(&self) -> Result<(), InternalError> {
394 ensure_recovered(self)
395 }
396
397 pub(crate) fn with_store_registry<R>(&self, f: impl FnOnce(&StoreRegistry) -> R) -> R {
398 self.store.with(|reg| f(reg))
399 }
400
401 pub fn storage_report(
402 &self,
403 name_to_path: &[(&'static str, &'static str)],
404 ) -> Result<StorageReport, InternalError> {
405 diagnostics::storage_report(self, name_to_path)
406 }
407
408 pub(in crate::db) fn prepare_row_commit_op(
409 &self,
410 op: &CommitRowOp,
411 ) -> Result<PreparedRowCommitOp, InternalError> {
412 let hooks = self.runtime_hook_for_entity_path(op.entity_path.as_str())?;
413
414 (hooks.prepare_row_commit)(self, op)
415 }
416
417 pub(in crate::db) fn replay_commit_marker_row_ops(
418 &self,
419 row_ops: &[CommitRowOp],
420 ) -> Result<(), InternalError> {
421 replay_commit_marker_row_ops(self, row_ops)
422 }
423
424 pub(in crate::db) fn rebuild_secondary_indexes_from_rows(&self) -> Result<(), InternalError> {
425 rebuild_secondary_indexes_from_rows(self)
426 }
427
428 pub(crate) fn validate_delete_strong_relations(
430 &self,
431 target_path: &str,
432 deleted_target_keys: &BTreeSet<RawDataKey>,
433 ) -> Result<(), InternalError> {
434 if deleted_target_keys.is_empty() {
435 return Ok(());
436 }
437
438 for hooks in self.entity_runtime_hooks {
439 (hooks.validate_delete_strong_relations)(self, target_path, deleted_target_keys)?;
440 }
441
442 Ok(())
443 }
444}
445
446pub struct EntityRuntimeHooks<C: CanisterKind> {
454 pub(crate) entity_name: &'static str,
455 pub(crate) entity_path: &'static str,
456 pub(in crate::db) commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
457 pub(in crate::db) prepare_row_commit:
458 fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
459 pub(crate) validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
460}
461
462impl<C: CanisterKind> EntityRuntimeHooks<C> {
463 #[must_use]
464 pub(in crate::db) const fn new(
465 entity_name: &'static str,
466 entity_path: &'static str,
467 commit_schema_fingerprint: fn() -> CommitSchemaFingerprint,
468 prepare_row_commit: fn(&Db<C>, &CommitRowOp) -> Result<PreparedRowCommitOp, InternalError>,
469 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
470 ) -> Self {
471 Self {
472 entity_name,
473 entity_path,
474 commit_schema_fingerprint,
475 prepare_row_commit,
476 validate_delete_strong_relations,
477 }
478 }
479
480 #[must_use]
481 pub const fn for_entity<E>(
482 validate_delete_strong_relations: StrongRelationDeleteValidateFn<C>,
483 ) -> Self
484 where
485 E: EntityKind<Canister = C> + EntityValue,
486 {
487 Self::new(
488 <E as EntityIdentity>::ENTITY_NAME,
489 E::PATH,
490 commit_schema_fingerprint_for_runtime_entity::<E>,
491 prepare_row_commit_for_entity::<E>,
492 validate_delete_strong_relations,
493 )
494 }
495}
496
497fn commit_schema_fingerprint_for_runtime_entity<E>() -> CommitSchemaFingerprint
498where
499 E: EntityKind,
500{
501 commit_schema_fingerprint_for_entity::<E>()
502}
503
504impl<C: CanisterKind> Db<C> {
505 #[must_use]
506 pub(crate) const fn has_runtime_hooks(&self) -> bool {
507 !self.entity_runtime_hooks.is_empty()
508 }
509
510 pub(crate) fn runtime_hook_for_entity_name(
513 &self,
514 entity_name: &str,
515 ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
516 let mut matched = None;
517 for hooks in self.entity_runtime_hooks {
518 if hooks.entity_name != entity_name {
519 continue;
520 }
521
522 if matched.is_some() {
523 return Err(InternalError::store_invariant(format!(
524 "duplicate runtime hooks for entity name '{entity_name}'"
525 )));
526 }
527
528 matched = Some(hooks);
529 }
530
531 matched.ok_or_else(|| {
532 InternalError::store_unsupported(format!(
533 "unsupported entity name in data store: '{entity_name}'"
534 ))
535 })
536 }
537
538 pub(crate) fn runtime_hook_for_entity_path(
541 &self,
542 entity_path: &str,
543 ) -> Result<&EntityRuntimeHooks<C>, InternalError> {
544 let mut matched = None;
545 for hooks in self.entity_runtime_hooks {
546 if hooks.entity_path != entity_path {
547 continue;
548 }
549
550 if matched.is_some() {
551 return Err(InternalError::store_invariant(format!(
552 "duplicate runtime hooks for entity path '{entity_path}'"
553 )));
554 }
555
556 matched = Some(hooks);
557 }
558
559 matched.ok_or_else(|| InternalError::unsupported_entity_path(entity_path))
560 }
561}
562
563impl<C: CanisterKind> Copy for Db<C> {}
564
565impl<C: CanisterKind> Clone for Db<C> {
566 fn clone(&self) -> Self {
567 *self
568 }
569}