1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14 db::{
15 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16 index::{IndexStore, IndexStoreRegistry},
17 query::{
18 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19 diagnostics::{
20 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21 start_event, trace_access_from_plan,
22 },
23 },
24 response::{Response, WriteBatchResponse, WriteResponse},
25 store::{DataStore, DataStoreRegistry},
26 },
27 error::InternalError,
28 obs::sink::{self, MetricsSink},
29 traits::{CanisterKind, EntityKind, EntityValue},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33#[derive(Clone, Copy, Debug)]
41pub struct EntityRegistryEntry {
42 pub entity_path: &'static str,
43 pub store_path: &'static str,
44}
45
46pub struct Db<C: CanisterKind> {
52 data: &'static LocalKey<DataStoreRegistry>,
53 index: &'static LocalKey<IndexStoreRegistry>,
54 entities: &'static [EntityRegistryEntry],
55 _marker: PhantomData<C>,
56}
57
58impl<C: CanisterKind> Db<C> {
59 #[must_use]
60 pub const fn new(
61 data: &'static LocalKey<DataStoreRegistry>,
62 index: &'static LocalKey<IndexStoreRegistry>,
63 entities: &'static [EntityRegistryEntry],
64 ) -> Self {
65 Self {
66 data,
67 index,
68 entities,
69 _marker: PhantomData,
70 }
71 }
72
73 #[must_use]
74 pub(crate) const fn context<E>(&self) -> Context<'_, E>
75 where
76 E: EntityKind<Canister = C> + EntityValue,
77 {
78 Context::new(self)
79 }
80
81 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
83 where
84 E: EntityKind<Canister = C> + EntityValue,
85 {
86 ensure_recovered(self)?;
87
88 Ok(Context::new(self))
89 }
90
91 #[cfg(test)]
95 pub fn with_data_store_mut_for_test<R>(
96 &self,
97 path: &'static str,
98 f: impl FnOnce(&mut DataStore) -> R,
99 ) -> Result<R, InternalError> {
100 self.with_data(|reg| reg.with_store_mut(path, f))
101 }
102
103 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
104 self.data.with(|reg| f(reg))
105 }
106
107 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
108 self.index.with(|reg| f(reg))
109 }
110
111 pub(crate) const fn entity_registry(&self) -> &'static [EntityRegistryEntry] {
112 self.entities
113 }
114}
115
116impl<C: CanisterKind> Copy for Db<C> {}
117
118impl<C: CanisterKind> Clone for Db<C> {
119 fn clone(&self) -> Self {
120 *self
121 }
122}
123
124pub struct DbSession<C: CanisterKind> {
130 db: Db<C>,
131 debug: bool,
132 metrics: Option<&'static dyn MetricsSink>,
133}
134
135impl<C: CanisterKind> DbSession<C> {
136 #[must_use]
137 pub const fn new(db: Db<C>) -> Self {
138 Self {
139 db,
140 debug: false,
141 metrics: None,
142 }
143 }
144
145 #[must_use]
146 pub const fn debug(mut self) -> Self {
147 self.debug = true;
148 self
149 }
150
151 #[must_use]
152 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
153 self.metrics = Some(sink);
154 self
155 }
156
157 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
158 if let Some(sink) = self.metrics {
159 sink::with_metrics_sink(sink, f)
160 } else {
161 f()
162 }
163 }
164
165 #[must_use]
170 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
171 where
172 E: EntityKind<Canister = C>,
173 {
174 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
175 }
176
177 #[must_use]
178 pub const fn load_with_consistency<E>(
179 &self,
180 consistency: ReadConsistency,
181 ) -> SessionLoadQuery<'_, C, E>
182 where
183 E: EntityKind<Canister = C>,
184 {
185 SessionLoadQuery::new(self, Query::new(consistency))
186 }
187
188 #[must_use]
189 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
190 where
191 E: EntityKind<Canister = C>,
192 {
193 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
194 }
195
196 #[must_use]
197 pub fn delete_with_consistency<E>(
198 &self,
199 consistency: ReadConsistency,
200 ) -> SessionDeleteQuery<'_, C, E>
201 where
202 E: EntityKind<Canister = C>,
203 {
204 SessionDeleteQuery::new(self, Query::new(consistency).delete())
205 }
206
207 #[must_use]
212 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
213 where
214 E: EntityKind<Canister = C> + EntityValue,
215 {
216 LoadExecutor::new(self.db, self.debug)
217 }
218
219 #[must_use]
220 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
221 where
222 E: EntityKind<Canister = C> + EntityValue,
223 {
224 DeleteExecutor::new(self.db, self.debug)
225 }
226
227 #[must_use]
228 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
229 where
230 E: EntityKind<Canister = C> + EntityValue,
231 {
232 SaveExecutor::new(self.db, self.debug)
233 }
234
235 pub fn diagnose_query<E>(&self, query: &Query<E>) -> Result<QueryDiagnostics, QueryError>
240 where
241 E: EntityKind<Canister = C>,
242 {
243 let explain = query.explain()?;
244
245 Ok(QueryDiagnostics::from(explain))
246 }
247
248 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
249 where
250 E: EntityKind<Canister = C> + EntityValue,
251 {
252 let plan = query.plan()?;
253
254 let result = match query.mode() {
255 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
256 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
257 };
258
259 result.map_err(QueryError::Execute)
260 }
261
262 pub fn execute_with_diagnostics<E>(
263 &self,
264 query: &Query<E>,
265 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError>
266 where
267 E: EntityKind<Canister = C> + EntityValue,
268 {
269 let plan = query.plan()?;
270 let fingerprint = plan.fingerprint();
271 let access = Some(trace_access_from_plan(plan.access()));
272 let executor = match query.mode() {
273 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
274 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
275 };
276
277 let start = start_event(fingerprint, access, executor);
278 let result = match query.mode() {
279 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
280 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
281 };
282
283 match result {
284 Ok(response) => {
285 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
286 let finish = finish_event(fingerprint, access, executor, rows);
287 Ok((
288 response,
289 QueryExecutionDiagnostics {
290 fingerprint,
291 events: vec![start, finish],
292 },
293 ))
294 }
295 Err(err) => Err(QueryError::Execute(err)),
296 }
297 }
298
299 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
304 where
305 E: EntityKind<Canister = C> + EntityValue,
306 {
307 self.with_metrics(|| self.save_executor::<E>().insert(entity))
308 .map(WriteResponse::new)
309 }
310
311 pub fn insert_many<E>(
312 &self,
313 entities: impl IntoIterator<Item = E>,
314 ) -> Result<WriteBatchResponse<E>, InternalError>
315 where
316 E: EntityKind<Canister = C> + EntityValue,
317 {
318 let entities = self.with_metrics(|| self.save_executor::<E>().insert_many(entities))?;
319
320 Ok(WriteBatchResponse::new(entities))
321 }
322
323 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
324 where
325 E: EntityKind<Canister = C> + EntityValue,
326 {
327 self.with_metrics(|| self.save_executor::<E>().replace(entity))
328 .map(WriteResponse::new)
329 }
330
331 pub fn replace_many<E>(
332 &self,
333 entities: impl IntoIterator<Item = E>,
334 ) -> Result<WriteBatchResponse<E>, InternalError>
335 where
336 E: EntityKind<Canister = C> + EntityValue,
337 {
338 let entities = self.with_metrics(|| self.save_executor::<E>().replace_many(entities))?;
339
340 Ok(WriteBatchResponse::new(entities))
341 }
342
343 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
344 where
345 E: EntityKind<Canister = C> + EntityValue,
346 {
347 self.with_metrics(|| self.save_executor::<E>().update(entity))
348 .map(WriteResponse::new)
349 }
350
351 pub fn update_many<E>(
352 &self,
353 entities: impl IntoIterator<Item = E>,
354 ) -> Result<WriteBatchResponse<E>, InternalError>
355 where
356 E: EntityKind<Canister = C> + EntityValue,
357 {
358 let entities = self.with_metrics(|| self.save_executor::<E>().update_many(entities))?;
359
360 Ok(WriteBatchResponse::new(entities))
361 }
362
363 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
364 where
365 E: EntityKind<Canister = C> + EntityValue,
366 {
367 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
368 }
369
370 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
371 where
372 E: EntityKind<Canister = C> + EntityValue,
373 {
374 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
375 }
376
377 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
378 where
379 E: EntityKind<Canister = C> + EntityValue,
380 {
381 self.with_metrics(|| self.save_executor::<E>().update_view(view))
382 }
383
384 #[doc(hidden)]
386 pub fn clear_stores_for_tests(&self) {
387 self.db.with_data(|reg| {
389 for (path, _) in reg.iter() {
390 let _ = reg.with_store_mut(path, DataStore::clear);
391 }
392 });
393
394 self.db.with_index(|reg| {
396 for (path, _) in reg.iter() {
397 let _ = reg.with_store_mut(path, IndexStore::clear);
398 }
399 });
400 }
401}