1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14 db::{
15 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16 index::{IndexStore, IndexStoreRegistry},
17 query::{
18 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19 diagnostics::{
20 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21 start_event, trace_access_from_plan,
22 },
23 },
24 response::Response,
25 store::{DataStore, DataStoreRegistry},
26 },
27 error::InternalError,
28 obs::sink::{self, MetricsSink},
29 traits::{CanisterKind, EntityKind},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33#[derive(Clone, Copy, Debug)]
41pub struct EntityRegistryEntry {
42 pub entity_path: &'static str,
43 pub store_path: &'static str,
44}
45
46pub struct Db<C: CanisterKind> {
52 data: &'static LocalKey<DataStoreRegistry>,
53 index: &'static LocalKey<IndexStoreRegistry>,
54 entities: &'static [EntityRegistryEntry],
55 _marker: PhantomData<C>,
56}
57
58impl<C: CanisterKind> Db<C> {
59 #[must_use]
60 pub const fn new(
61 data: &'static LocalKey<DataStoreRegistry>,
62 index: &'static LocalKey<IndexStoreRegistry>,
63 entities: &'static [EntityRegistryEntry],
64 ) -> Self {
65 Self {
66 data,
67 index,
68 entities,
69 _marker: PhantomData,
70 }
71 }
72
73 #[must_use]
74 pub(crate) const fn context<E>(&self) -> Context<'_, E>
75 where
76 E: EntityKind<Canister = C>,
77 {
78 Context::new(self)
79 }
80
81 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
83 where
84 E: EntityKind<Canister = C>,
85 {
86 ensure_recovered(self)?;
87 Ok(Context::new(self))
88 }
89
90 #[cfg(test)]
94 pub fn with_data_store_mut_for_test<R>(
95 &self,
96 path: &'static str,
97 f: impl FnOnce(&mut DataStore) -> R,
98 ) -> Result<R, InternalError> {
99 self.with_data(|reg| reg.with_store_mut(path, f))
100 }
101
102 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
103 self.data.with(|reg| f(reg))
104 }
105
106 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
107 self.index.with(|reg| f(reg))
108 }
109
110 pub(crate) const fn entity_registry(&self) -> &'static [EntityRegistryEntry] {
111 self.entities
112 }
113}
114
115impl<C: CanisterKind> Copy for Db<C> {}
116
117impl<C: CanisterKind> Clone for Db<C> {
118 fn clone(&self) -> Self {
119 *self
120 }
121}
122
123pub struct DbSession<C: CanisterKind> {
129 db: Db<C>,
130 debug: bool,
131 metrics: Option<&'static dyn MetricsSink>,
132}
133
134impl<C: CanisterKind> DbSession<C> {
135 #[must_use]
136 pub const fn new(db: Db<C>) -> Self {
137 Self {
138 db,
139 debug: false,
140 metrics: None,
141 }
142 }
143
144 #[must_use]
145 pub const fn debug(mut self) -> Self {
146 self.debug = true;
147 self
148 }
149
150 #[must_use]
151 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
152 self.metrics = Some(sink);
153 self
154 }
155
156 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
157 if let Some(sink) = self.metrics {
158 sink::with_metrics_sink(sink, f)
159 } else {
160 f()
161 }
162 }
163
164 #[must_use]
169 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
170 where
171 E: EntityKind<Canister = C>,
172 {
173 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
174 }
175
176 #[must_use]
177 pub const fn load_with_consistency<E>(
178 &self,
179 consistency: ReadConsistency,
180 ) -> SessionLoadQuery<'_, C, E>
181 where
182 E: EntityKind<Canister = C>,
183 {
184 SessionLoadQuery::new(self, Query::new(consistency))
185 }
186
187 #[must_use]
188 pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
189 where
190 E: EntityKind<Canister = C>,
191 {
192 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
193 }
194
195 #[must_use]
196 pub const fn delete_with_consistency<E>(
197 &self,
198 consistency: ReadConsistency,
199 ) -> SessionDeleteQuery<'_, C, E>
200 where
201 E: EntityKind<Canister = C>,
202 {
203 SessionDeleteQuery::new(self, Query::new(consistency).delete())
204 }
205
206 #[must_use]
211 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
212 where
213 E: EntityKind<Canister = C>,
214 {
215 LoadExecutor::new(self.db, self.debug)
216 }
217
218 #[must_use]
219 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
220 where
221 E: EntityKind<Canister = C>,
222 {
223 DeleteExecutor::new(self.db, self.debug)
224 }
225
226 #[must_use]
227 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
228 where
229 E: EntityKind<Canister = C>,
230 {
231 SaveExecutor::new(self.db, self.debug)
232 }
233
234 pub fn diagnose_query<E: EntityKind<Canister = C>>(
239 &self,
240 query: &Query<E>,
241 ) -> Result<QueryDiagnostics, QueryError> {
242 let explain = query.explain()?;
243 Ok(QueryDiagnostics::from(explain))
244 }
245
246 pub fn execute_query<E: EntityKind<Canister = C>>(
247 &self,
248 query: &Query<E>,
249 ) -> Result<Response<E>, QueryError> {
250 let plan = query.plan()?;
251 let result = match query.mode() {
252 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
253 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
254 };
255
256 result.map_err(QueryError::Execute)
257 }
258
259 pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
260 &self,
261 query: &Query<E>,
262 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
263 let plan = query.plan()?;
264 let fingerprint = plan.fingerprint();
265 let access = Some(trace_access_from_plan(plan.access()));
266 let executor = match query.mode() {
267 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
268 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
269 };
270
271 let start = start_event(fingerprint, access, executor);
272 let result = match query.mode() {
273 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
274 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
275 };
276
277 match result {
278 Ok(response) => {
279 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
280 let finish = finish_event(fingerprint, access, executor, rows);
281 Ok((
282 response,
283 QueryExecutionDiagnostics {
284 fingerprint,
285 events: vec![start, finish],
286 },
287 ))
288 }
289 Err(err) => Err(QueryError::Execute(err)),
290 }
291 }
292
293 pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
298 where
299 E: EntityKind<Canister = C>,
300 {
301 self.with_metrics(|| self.save_executor::<E>().insert(entity))
302 }
303
304 pub fn insert_many<E>(
305 &self,
306 entities: impl IntoIterator<Item = E>,
307 ) -> Result<Vec<E>, InternalError>
308 where
309 E: EntityKind<Canister = C>,
310 {
311 self.with_metrics(|| self.save_executor::<E>().insert_many(entities))
312 }
313
314 pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
315 where
316 E: EntityKind<Canister = C>,
317 {
318 self.with_metrics(|| self.save_executor::<E>().replace(entity))
319 }
320
321 pub fn replace_many<E>(
322 &self,
323 entities: impl IntoIterator<Item = E>,
324 ) -> Result<Vec<E>, InternalError>
325 where
326 E: EntityKind<Canister = C>,
327 {
328 self.with_metrics(|| self.save_executor::<E>().replace_many(entities))
329 }
330
331 pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
332 where
333 E: EntityKind<Canister = C>,
334 {
335 self.with_metrics(|| self.save_executor::<E>().update(entity))
336 }
337
338 pub fn update_many<E>(
339 &self,
340 entities: impl IntoIterator<Item = E>,
341 ) -> Result<Vec<E>, InternalError>
342 where
343 E: EntityKind<Canister = C>,
344 {
345 self.with_metrics(|| self.save_executor::<E>().update_many(entities))
346 }
347
348 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
349 where
350 E: EntityKind<Canister = C>,
351 {
352 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
353 }
354
355 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
356 where
357 E: EntityKind<Canister = C>,
358 {
359 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
360 }
361
362 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
363 where
364 E: EntityKind<Canister = C>,
365 {
366 self.with_metrics(|| self.save_executor::<E>().update_view(view))
367 }
368
369 #[doc(hidden)]
371 pub fn clear_stores_for_tests(&self) {
372 self.db.with_data(|reg| {
374 for (path, _) in reg.iter() {
375 let _ = reg.with_store_mut(path, DataStore::clear);
376 }
377 });
378
379 self.db.with_index(|reg| {
381 for (path, _) in reg.iter() {
382 let _ = reg.with_store_mut(path, IndexStore::clear);
383 }
384 });
385 }
386}