1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8mod write;
9
10pub(crate) use commit::*;
11pub(crate) use write::WriteUnit;
12
13use crate::{
14 db::{
15 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
16 index::{IndexStore, IndexStoreRegistry},
17 query::{
18 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
19 diagnostics::{
20 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
21 start_event, trace_access_from_plan,
22 },
23 },
24 response::Response,
25 store::{DataStore, DataStoreRegistry},
26 },
27 error::InternalError,
28 obs::sink::{self, MetricsSink},
29 traits::{CanisterKind, EntityKind},
30};
31use std::{marker::PhantomData, thread::LocalKey};
32
33pub struct Db<C: CanisterKind> {
39 data: &'static LocalKey<DataStoreRegistry>,
40 index: &'static LocalKey<IndexStoreRegistry>,
41 _marker: PhantomData<C>,
42}
43
44impl<C: CanisterKind> Db<C> {
45 #[must_use]
46 pub const fn new(
47 data: &'static LocalKey<DataStoreRegistry>,
48 index: &'static LocalKey<IndexStoreRegistry>,
49 ) -> Self {
50 Self {
51 data,
52 index,
53 _marker: PhantomData,
54 }
55 }
56
57 #[must_use]
58 pub(crate) const fn context<E>(&self) -> Context<'_, E>
59 where
60 E: EntityKind<Canister = C>,
61 {
62 Context::new(self)
63 }
64
65 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
67 where
68 E: EntityKind<Canister = C>,
69 {
70 ensure_recovered(self)?;
71 Ok(Context::new(self))
72 }
73
74 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
75 self.data.with(|reg| f(reg))
76 }
77
78 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
79 self.index.with(|reg| f(reg))
80 }
81}
82
83impl<C: CanisterKind> Copy for Db<C> {}
84
85impl<C: CanisterKind> Clone for Db<C> {
86 fn clone(&self) -> Self {
87 *self
88 }
89}
90
91pub struct DbSession<C: CanisterKind> {
97 db: Db<C>,
98 debug: bool,
99 metrics: Option<&'static dyn MetricsSink>,
100}
101
102impl<C: CanisterKind> DbSession<C> {
103 #[must_use]
104 pub const fn new(db: Db<C>) -> Self {
105 Self {
106 db,
107 debug: false,
108 metrics: None,
109 }
110 }
111
112 #[must_use]
113 pub const fn debug(mut self) -> Self {
114 self.debug = true;
115 self
116 }
117
118 #[must_use]
119 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
120 self.metrics = Some(sink);
121 self
122 }
123
124 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
125 if let Some(sink) = self.metrics {
126 sink::with_metrics_sink(sink, f)
127 } else {
128 f()
129 }
130 }
131
132 #[must_use]
137 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
138 where
139 E: EntityKind<Canister = C>,
140 {
141 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
142 }
143
144 #[must_use]
145 pub const fn load_with_consistency<E>(
146 &self,
147 consistency: ReadConsistency,
148 ) -> SessionLoadQuery<'_, C, E>
149 where
150 E: EntityKind<Canister = C>,
151 {
152 SessionLoadQuery::new(self, Query::new(consistency))
153 }
154
155 #[must_use]
156 pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
157 where
158 E: EntityKind<Canister = C>,
159 {
160 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
161 }
162
163 #[must_use]
164 pub const fn delete_with_consistency<E>(
165 &self,
166 consistency: ReadConsistency,
167 ) -> SessionDeleteQuery<'_, C, E>
168 where
169 E: EntityKind<Canister = C>,
170 {
171 SessionDeleteQuery::new(self, Query::new(consistency).delete())
172 }
173
174 #[must_use]
179 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
180 where
181 E: EntityKind<Canister = C>,
182 {
183 LoadExecutor::new(self.db, self.debug)
184 }
185
186 #[must_use]
187 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
188 where
189 E: EntityKind<Canister = C>,
190 {
191 DeleteExecutor::new(self.db, self.debug)
192 }
193
194 #[must_use]
195 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
196 where
197 E: EntityKind<Canister = C>,
198 {
199 SaveExecutor::new(self.db, self.debug)
200 }
201
202 pub fn diagnose_query<E: EntityKind<Canister = C>>(
207 &self,
208 query: &Query<E>,
209 ) -> Result<QueryDiagnostics, QueryError> {
210 let explain = query.explain()?;
211 Ok(QueryDiagnostics::from(explain))
212 }
213
214 pub fn execute_query<E: EntityKind<Canister = C>>(
215 &self,
216 query: &Query<E>,
217 ) -> Result<Response<E>, QueryError> {
218 let plan = query.plan()?;
219 let result = match query.mode() {
220 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
221 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
222 };
223
224 result.map_err(QueryError::Execute)
225 }
226
227 pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
228 &self,
229 query: &Query<E>,
230 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
231 let plan = query.plan()?;
232 let fingerprint = plan.fingerprint();
233 let access = Some(trace_access_from_plan(plan.access()));
234 let executor = match query.mode() {
235 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
236 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
237 };
238
239 let start = start_event(fingerprint, access, executor);
240 let result = match query.mode() {
241 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
242 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
243 };
244
245 match result {
246 Ok(response) => {
247 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
248 let finish = finish_event(fingerprint, access, executor, rows);
249 Ok((
250 response,
251 QueryExecutionDiagnostics {
252 fingerprint,
253 events: vec![start, finish],
254 },
255 ))
256 }
257 Err(err) => Err(QueryError::Execute(err)),
258 }
259 }
260
261 pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
266 where
267 E: EntityKind<Canister = C>,
268 {
269 self.with_metrics(|| self.save_executor::<E>().insert(entity))
270 }
271
272 pub fn insert_many<E>(
273 &self,
274 entities: impl IntoIterator<Item = E>,
275 ) -> Result<Vec<E>, InternalError>
276 where
277 E: EntityKind<Canister = C>,
278 {
279 self.with_metrics(|| self.save_executor::<E>().insert_many(entities))
280 }
281
282 pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
283 where
284 E: EntityKind<Canister = C>,
285 {
286 self.with_metrics(|| self.save_executor::<E>().replace(entity))
287 }
288
289 pub fn replace_many<E>(
290 &self,
291 entities: impl IntoIterator<Item = E>,
292 ) -> Result<Vec<E>, InternalError>
293 where
294 E: EntityKind<Canister = C>,
295 {
296 self.with_metrics(|| self.save_executor::<E>().replace_many(entities))
297 }
298
299 pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
300 where
301 E: EntityKind<Canister = C>,
302 {
303 self.with_metrics(|| self.save_executor::<E>().update(entity))
304 }
305
306 pub fn update_many<E>(
307 &self,
308 entities: impl IntoIterator<Item = E>,
309 ) -> Result<Vec<E>, InternalError>
310 where
311 E: EntityKind<Canister = C>,
312 {
313 self.with_metrics(|| self.save_executor::<E>().update_many(entities))
314 }
315
316 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
317 where
318 E: EntityKind<Canister = C>,
319 {
320 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
321 }
322
323 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
324 where
325 E: EntityKind<Canister = C>,
326 {
327 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
328 }
329
330 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
331 where
332 E: EntityKind<Canister = C>,
333 {
334 self.with_metrics(|| self.save_executor::<E>().update_view(view))
335 }
336
337 #[doc(hidden)]
339 pub fn clear_stores_for_tests(&self) {
340 self.db.with_data(|reg| {
342 for (path, _) in reg.iter() {
343 let _ = reg.with_store_mut(path, DataStore::clear);
344 }
345 });
346
347 self.db.with_index(|reg| {
349 for (path, _) in reg.iter() {
350 let _ = reg.with_store_mut(path, IndexStore::clear);
351 }
352 });
353 }
354}