1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14 db::{
15 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16 index::{IndexStore, IndexStoreRegistry},
17 query::{
18 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19 diagnostics::{
20 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21 start_event, trace_access_from_plan,
22 },
23 },
24 response::{Response, WriteBatchResponse, WriteResponse},
25 store::{DataStore, DataStoreRegistry},
26 },
27 error::InternalError,
28 obs::sink::{self, MetricsSink},
29 traits::{CanisterKind, EntityKind, EntityValue},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33pub struct Db<C: CanisterKind> {
39 data: &'static LocalKey<DataStoreRegistry>,
40 index: &'static LocalKey<IndexStoreRegistry>,
41 _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45 #[must_use]
46 pub const fn new(
47 data: &'static LocalKey<DataStoreRegistry>,
48 index: &'static LocalKey<IndexStoreRegistry>,
49 ) -> Self {
50 Self {
51 data,
52 index,
53 _marker: PhantomData,
54 }
55 }
56
57 #[must_use]
58 pub(crate) const fn context<E>(&self) -> Context<'_, E>
59 where
60 E: EntityKind<Canister = C> + EntityValue,
61 {
62 Context::new(self)
63 }
64
65 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
67 where
68 E: EntityKind<Canister = C> + EntityValue,
69 {
70 ensure_recovered(self)?;
71
72 Ok(Context::new(self))
73 }
74
75 #[cfg(test)]
79 pub fn with_data_store_mut_for_test<R>(
80 &self,
81 path: &'static str,
82 f: impl FnOnce(&mut DataStore) -> R,
83 ) -> Result<R, InternalError> {
84 self.with_data(|reg| reg.with_store_mut(path, f))
85 }
86
87 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
88 self.data.with(|reg| f(reg))
89 }
90
91 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
92 self.index.with(|reg| f(reg))
93 }
94}
95
96impl<C: CanisterKind> Copy for Db<C> {}
97
98impl<C: CanisterKind> Clone for Db<C> {
99 fn clone(&self) -> Self {
100 *self
101 }
102}
103
104pub struct DbSession<C: CanisterKind> {
110 db: Db<C>,
111 debug: bool,
112 metrics: Option<&'static dyn MetricsSink>,
113}
114
115impl<C: CanisterKind> DbSession<C> {
116 #[must_use]
117 pub const fn new(db: Db<C>) -> Self {
118 Self {
119 db,
120 debug: false,
121 metrics: None,
122 }
123 }
124
125 #[must_use]
126 pub const fn debug(mut self) -> Self {
127 self.debug = true;
128 self
129 }
130
131 #[must_use]
132 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
133 self.metrics = Some(sink);
134 self
135 }
136
137 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
138 if let Some(sink) = self.metrics {
139 sink::with_metrics_sink(sink, f)
140 } else {
141 f()
142 }
143 }
144
145 #[must_use]
150 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
151 where
152 E: EntityKind<Canister = C>,
153 {
154 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
155 }
156
157 #[must_use]
158 pub const fn load_with_consistency<E>(
159 &self,
160 consistency: ReadConsistency,
161 ) -> SessionLoadQuery<'_, C, E>
162 where
163 E: EntityKind<Canister = C>,
164 {
165 SessionLoadQuery::new(self, Query::new(consistency))
166 }
167
168 #[must_use]
169 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
170 where
171 E: EntityKind<Canister = C>,
172 {
173 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
174 }
175
176 #[must_use]
177 pub fn delete_with_consistency<E>(
178 &self,
179 consistency: ReadConsistency,
180 ) -> SessionDeleteQuery<'_, C, E>
181 where
182 E: EntityKind<Canister = C>,
183 {
184 SessionDeleteQuery::new(self, Query::new(consistency).delete())
185 }
186
187 #[must_use]
192 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
193 where
194 E: EntityKind<Canister = C> + EntityValue,
195 {
196 LoadExecutor::new(self.db, self.debug)
197 }
198
199 #[must_use]
200 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
201 where
202 E: EntityKind<Canister = C> + EntityValue,
203 {
204 DeleteExecutor::new(self.db, self.debug)
205 }
206
207 #[must_use]
208 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
209 where
210 E: EntityKind<Canister = C> + EntityValue,
211 {
212 SaveExecutor::new(self.db, self.debug)
213 }
214
215 pub fn diagnose_query<E>(&self, query: &Query<E>) -> Result<QueryDiagnostics, QueryError>
220 where
221 E: EntityKind<Canister = C>,
222 {
223 let explain = query.explain()?;
224
225 Ok(QueryDiagnostics::from(explain))
226 }
227
228 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
229 where
230 E: EntityKind<Canister = C> + EntityValue,
231 {
232 let plan = query.plan()?;
233
234 let result = match query.mode() {
235 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
236 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
237 };
238
239 result.map_err(QueryError::Execute)
240 }
241
242 pub fn execute_with_diagnostics<E>(
243 &self,
244 query: &Query<E>,
245 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError>
246 where
247 E: EntityKind<Canister = C> + EntityValue,
248 {
249 let plan = query.plan()?;
250 let fingerprint = plan.fingerprint();
251 let access = Some(trace_access_from_plan(plan.access()));
252 let executor = match query.mode() {
253 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
254 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
255 };
256
257 let start = start_event(fingerprint, access, executor);
258 let result = match query.mode() {
259 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
260 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
261 };
262
263 match result {
264 Ok(response) => {
265 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
267 let finish = finish_event(fingerprint, access, executor, rows);
268 Ok((
269 response,
270 QueryExecutionDiagnostics {
271 fingerprint,
272 events: vec![start, finish],
273 },
274 ))
275 }
276 Err(err) => Err(QueryError::Execute(err)),
277 }
278 }
279
280 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
285 where
286 E: EntityKind<Canister = C> + EntityValue,
287 {
288 self.with_metrics(|| self.save_executor::<E>().insert(entity))
289 .map(WriteResponse::new)
290 }
291
292 pub fn insert_many<E>(
293 &self,
294 entities: impl IntoIterator<Item = E>,
295 ) -> Result<WriteBatchResponse<E>, InternalError>
296 where
297 E: EntityKind<Canister = C> + EntityValue,
298 {
299 let entities = self.with_metrics(|| self.save_executor::<E>().insert_many(entities))?;
300
301 Ok(WriteBatchResponse::new(entities))
302 }
303
304 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
305 where
306 E: EntityKind<Canister = C> + EntityValue,
307 {
308 self.with_metrics(|| self.save_executor::<E>().replace(entity))
309 .map(WriteResponse::new)
310 }
311
312 pub fn replace_many<E>(
313 &self,
314 entities: impl IntoIterator<Item = E>,
315 ) -> Result<WriteBatchResponse<E>, InternalError>
316 where
317 E: EntityKind<Canister = C> + EntityValue,
318 {
319 let entities = self.with_metrics(|| self.save_executor::<E>().replace_many(entities))?;
320
321 Ok(WriteBatchResponse::new(entities))
322 }
323
324 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
325 where
326 E: EntityKind<Canister = C> + EntityValue,
327 {
328 self.with_metrics(|| self.save_executor::<E>().update(entity))
329 .map(WriteResponse::new)
330 }
331
332 pub fn update_many<E>(
333 &self,
334 entities: impl IntoIterator<Item = E>,
335 ) -> Result<WriteBatchResponse<E>, InternalError>
336 where
337 E: EntityKind<Canister = C> + EntityValue,
338 {
339 let entities = self.with_metrics(|| self.save_executor::<E>().update_many(entities))?;
340
341 Ok(WriteBatchResponse::new(entities))
342 }
343
344 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
345 where
346 E: EntityKind<Canister = C> + EntityValue,
347 {
348 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
349 }
350
351 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
352 where
353 E: EntityKind<Canister = C> + EntityValue,
354 {
355 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
356 }
357
358 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
359 where
360 E: EntityKind<Canister = C> + EntityValue,
361 {
362 self.with_metrics(|| self.save_executor::<E>().update_view(view))
363 }
364
365 #[doc(hidden)]
367 pub fn clear_stores_for_tests(&self) {
368 self.db.with_data(|reg| {
370 for (path, _) in reg.iter() {
371 let _ = reg.with_store_mut(path, DataStore::clear);
372 }
373 });
374
375 self.db.with_index(|reg| {
377 for (path, _) in reg.iter() {
378 let _ = reg.with_store_mut(path, IndexStore::clear);
379 }
380 });
381 }
382}