1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14 db::{
15 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16 index::IndexStoreRegistry,
17 query::{
18 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19 diagnostics::{
20 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21 start_event, trace_access_from_plan,
22 },
23 },
24 response::{Response, WriteBatchResponse, WriteResponse},
25 store::DataStoreRegistry,
26 },
27 error::InternalError,
28 obs::sink::{self, MetricsSink},
29 traits::{CanisterKind, EntityKind, EntityValue},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33#[cfg(test)]
34use crate::db::{index::IndexStore, store::DataStore};
35
36pub struct Db<C: CanisterKind> {
42 data: &'static LocalKey<DataStoreRegistry>,
43 index: &'static LocalKey<IndexStoreRegistry>,
44 _marker: PhantomData<C>,
45}
46
47impl<C: CanisterKind> Db<C> {
48 #[must_use]
49 pub const fn new(
50 data: &'static LocalKey<DataStoreRegistry>,
51 index: &'static LocalKey<IndexStoreRegistry>,
52 ) -> Self {
53 Self {
54 data,
55 index,
56 _marker: PhantomData,
57 }
58 }
59
60 #[must_use]
61 pub(crate) const fn context<E>(&self) -> Context<'_, E>
62 where
63 E: EntityKind<Canister = C> + EntityValue,
64 {
65 Context::new(self)
66 }
67
68 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
70 where
71 E: EntityKind<Canister = C> + EntityValue,
72 {
73 ensure_recovered(self)?;
74
75 Ok(Context::new(self))
76 }
77
78 #[cfg(test)]
82 pub fn with_data_store_mut_for_test<R>(
83 &self,
84 path: &'static str,
85 f: impl FnOnce(&mut DataStore) -> R,
86 ) -> Result<R, InternalError> {
87 self.with_data(|reg| reg.with_store_mut(path, f))
88 }
89
90 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
91 self.data.with(|reg| f(reg))
92 }
93
94 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
95 self.index.with(|reg| f(reg))
96 }
97}
98
99impl<C: CanisterKind> Copy for Db<C> {}
100
101impl<C: CanisterKind> Clone for Db<C> {
102 fn clone(&self) -> Self {
103 *self
104 }
105}
106
107pub struct DbSession<C: CanisterKind> {
113 db: Db<C>,
114 debug: bool,
115 metrics: Option<&'static dyn MetricsSink>,
116}
117
118impl<C: CanisterKind> DbSession<C> {
119 #[must_use]
120 pub const fn new(db: Db<C>) -> Self {
121 Self {
122 db,
123 debug: false,
124 metrics: None,
125 }
126 }
127
128 #[must_use]
129 pub const fn debug(mut self) -> Self {
130 self.debug = true;
131 self
132 }
133
134 #[must_use]
135 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
136 self.metrics = Some(sink);
137 self
138 }
139
140 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
141 if let Some(sink) = self.metrics {
142 sink::with_metrics_sink(sink, f)
143 } else {
144 f()
145 }
146 }
147
148 #[must_use]
153 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
154 where
155 E: EntityKind<Canister = C>,
156 {
157 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
158 }
159
160 #[must_use]
161 pub const fn load_with_consistency<E>(
162 &self,
163 consistency: ReadConsistency,
164 ) -> SessionLoadQuery<'_, C, E>
165 where
166 E: EntityKind<Canister = C>,
167 {
168 SessionLoadQuery::new(self, Query::new(consistency))
169 }
170
171 #[must_use]
172 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
173 where
174 E: EntityKind<Canister = C>,
175 {
176 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
177 }
178
179 #[must_use]
180 pub fn delete_with_consistency<E>(
181 &self,
182 consistency: ReadConsistency,
183 ) -> SessionDeleteQuery<'_, C, E>
184 where
185 E: EntityKind<Canister = C>,
186 {
187 SessionDeleteQuery::new(self, Query::new(consistency).delete())
188 }
189
190 #[must_use]
195 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
196 where
197 E: EntityKind<Canister = C> + EntityValue,
198 {
199 LoadExecutor::new(self.db, self.debug)
200 }
201
202 #[must_use]
203 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
204 where
205 E: EntityKind<Canister = C> + EntityValue,
206 {
207 DeleteExecutor::new(self.db, self.debug)
208 }
209
210 #[must_use]
211 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
212 where
213 E: EntityKind<Canister = C> + EntityValue,
214 {
215 SaveExecutor::new(self.db, self.debug)
216 }
217
218 pub fn diagnose_query<E>(&self, query: &Query<E>) -> Result<QueryDiagnostics, QueryError>
223 where
224 E: EntityKind<Canister = C>,
225 {
226 let explain = query.explain()?;
227
228 Ok(QueryDiagnostics::from(explain))
229 }
230
231 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
232 where
233 E: EntityKind<Canister = C> + EntityValue,
234 {
235 let plan = query.plan()?;
236
237 let result = match query.mode() {
238 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
239 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
240 };
241
242 result.map_err(QueryError::Execute)
243 }
244
245 pub fn execute_with_diagnostics<E>(
246 &self,
247 query: &Query<E>,
248 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError>
249 where
250 E: EntityKind<Canister = C> + EntityValue,
251 {
252 let plan = query.plan()?;
253 let fingerprint = plan.fingerprint();
254 let access = Some(trace_access_from_plan(plan.access()));
255 let executor = match query.mode() {
256 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
257 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
258 };
259
260 let start = start_event(fingerprint, access, executor);
261 let result = match query.mode() {
262 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
263 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
264 };
265
266 match result {
267 Ok(response) => {
268 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
270 let finish = finish_event(fingerprint, access, executor, rows);
271 Ok((
272 response,
273 QueryExecutionDiagnostics {
274 fingerprint,
275 events: vec![start, finish],
276 },
277 ))
278 }
279 Err(err) => Err(QueryError::Execute(err)),
280 }
281 }
282
283 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
288 where
289 E: EntityKind<Canister = C> + EntityValue,
290 {
291 self.with_metrics(|| self.save_executor::<E>().insert(entity))
292 .map(WriteResponse::new)
293 }
294
295 pub fn insert_many_non_atomic<E>(
299 &self,
300 entities: impl IntoIterator<Item = E>,
301 ) -> Result<WriteBatchResponse<E>, InternalError>
302 where
303 E: EntityKind<Canister = C> + EntityValue,
304 {
305 let entities =
306 self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
307
308 Ok(WriteBatchResponse::new(entities))
309 }
310
311 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312 where
313 E: EntityKind<Canister = C> + EntityValue,
314 {
315 self.with_metrics(|| self.save_executor::<E>().replace(entity))
316 .map(WriteResponse::new)
317 }
318
319 pub fn replace_many_non_atomic<E>(
323 &self,
324 entities: impl IntoIterator<Item = E>,
325 ) -> Result<WriteBatchResponse<E>, InternalError>
326 where
327 E: EntityKind<Canister = C> + EntityValue,
328 {
329 let entities =
330 self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
331
332 Ok(WriteBatchResponse::new(entities))
333 }
334
335 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
336 where
337 E: EntityKind<Canister = C> + EntityValue,
338 {
339 self.with_metrics(|| self.save_executor::<E>().update(entity))
340 .map(WriteResponse::new)
341 }
342
343 pub fn update_many_non_atomic<E>(
347 &self,
348 entities: impl IntoIterator<Item = E>,
349 ) -> Result<WriteBatchResponse<E>, InternalError>
350 where
351 E: EntityKind<Canister = C> + EntityValue,
352 {
353 let entities =
354 self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
355
356 Ok(WriteBatchResponse::new(entities))
357 }
358
359 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
360 where
361 E: EntityKind<Canister = C> + EntityValue,
362 {
363 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
364 }
365
366 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
367 where
368 E: EntityKind<Canister = C> + EntityValue,
369 {
370 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
371 }
372
373 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
374 where
375 E: EntityKind<Canister = C> + EntityValue,
376 {
377 self.with_metrics(|| self.save_executor::<E>().update_view(view))
378 }
379
380 #[cfg(test)]
382 #[doc(hidden)]
383 pub fn clear_stores_for_tests(&self) {
384 self.db.with_data(|reg| {
386 for (path, _) in reg.iter() {
387 let _ = reg.with_store_mut(path, DataStore::clear);
388 }
389 });
390
391 self.db.with_index(|reg| {
393 for (path, _) in reg.iter() {
394 let _ = reg.with_store_mut(path, IndexStore::clear);
395 }
396 });
397 }
398}