1mod commit;
2pub mod cursor;
3pub(crate) mod executor;
4pub mod identity;
5pub mod index;
6pub mod query;
7pub mod response;
8pub mod store;
9
10pub(crate) use commit::*;
11
12use crate::{
13 db::{
14 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
15 index::IndexStoreRegistry,
16 query::{
17 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
18 plan::PlanError,
19 },
20 response::{Response, WriteBatchResponse, WriteResponse},
21 store::DataStoreRegistry,
22 },
23 error::InternalError,
24 obs::sink::{self, MetricsSink},
25 traits::{CanisterKind, EntityKind, EntityValue},
26};
27use std::{marker::PhantomData, thread::LocalKey};
28
29#[cfg(test)]
30use crate::db::{index::IndexStore, store::DataStore};
31
/// Handle to the data and index store registries of canister `C`.
///
/// Holds only `'static` references to thread-local registries plus a
/// zero-sized marker, so no store state lives in the handle itself.
pub struct Db<C: CanisterKind> {
    /// Thread-local registry of data stores.
    data: &'static LocalKey<DataStoreRegistry>,
    /// Thread-local registry of index stores.
    index: &'static LocalKey<IndexStoreRegistry>,
    /// Ties the handle to canister kind `C` without storing a value of it.
    _marker: PhantomData<C>,
}
42
43impl<C: CanisterKind> Db<C> {
44 #[must_use]
45 pub const fn new(
46 data: &'static LocalKey<DataStoreRegistry>,
47 index: &'static LocalKey<IndexStoreRegistry>,
48 ) -> Self {
49 Self {
50 data,
51 index,
52 _marker: PhantomData,
53 }
54 }
55
56 #[must_use]
57 pub(crate) const fn context<E>(&self) -> Context<'_, E>
58 where
59 E: EntityKind<Canister = C> + EntityValue,
60 {
61 Context::new(self)
62 }
63
64 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
69 where
70 E: EntityKind<Canister = C> + EntityValue,
71 {
72 ensure_recovered(self)?;
73
74 Ok(Context::new(self))
75 }
76
77 #[cfg(test)]
81 pub fn with_data_store_mut_for_test<R>(
82 &self,
83 path: &'static str,
84 f: impl FnOnce(&mut DataStore) -> R,
85 ) -> Result<R, InternalError> {
86 self.with_data(|reg| reg.with_store_mut(path, f))
87 }
88
89 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
90 self.data.with(|reg| f(reg))
91 }
92
93 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
94 self.index.with(|reg| f(reg))
95 }
96}
97
// `Copy` and `Clone` are implemented by hand rather than derived: a derive
// would add `C: Copy` / `C: Clone` bounds, while the fields (two `'static`
// references and a `PhantomData`) are copyable for every `C`.
impl<C: CanisterKind> Copy for Db<C> {}

impl<C: CanisterKind> Clone for Db<C> {
    /// Returns a bitwise copy of the handle (`Db` is `Copy`).
    fn clone(&self) -> Self {
        *self
    }
}
105
/// Session wrapper around a [`Db`] handle that carries per-session options
/// (debug flag, optional metrics sink) into the executors it creates.
pub struct DbSession<C: CanisterKind> {
    /// Underlying database handle.
    db: Db<C>,
    /// Debug flag forwarded to every executor built by this session.
    debug: bool,
    /// Optional sink installed around each operation via `with_metrics_sink`.
    metrics: Option<&'static dyn MetricsSink>,
}
116
117impl<C: CanisterKind> DbSession<C> {
118 #[must_use]
119 pub const fn new(db: Db<C>) -> Self {
120 Self {
121 db,
122 debug: false,
123 metrics: None,
124 }
125 }
126
127 #[must_use]
128 pub const fn debug(mut self) -> Self {
129 self.debug = true;
130 self
131 }
132
133 #[must_use]
134 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
135 self.metrics = Some(sink);
136 self
137 }
138
139 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
140 if let Some(sink) = self.metrics {
141 sink::with_metrics_sink(sink, f)
142 } else {
143 f()
144 }
145 }
146
147 #[must_use]
152 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
153 where
154 E: EntityKind<Canister = C>,
155 {
156 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
157 }
158
159 #[must_use]
160 pub const fn load_with_consistency<E>(
161 &self,
162 consistency: ReadConsistency,
163 ) -> SessionLoadQuery<'_, C, E>
164 where
165 E: EntityKind<Canister = C>,
166 {
167 SessionLoadQuery::new(self, Query::new(consistency))
168 }
169
170 #[must_use]
171 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
172 where
173 E: EntityKind<Canister = C>,
174 {
175 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
176 }
177
178 #[must_use]
179 pub fn delete_with_consistency<E>(
180 &self,
181 consistency: ReadConsistency,
182 ) -> SessionDeleteQuery<'_, C, E>
183 where
184 E: EntityKind<Canister = C>,
185 {
186 SessionDeleteQuery::new(self, Query::new(consistency).delete())
187 }
188
189 #[must_use]
194 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
195 where
196 E: EntityKind<Canister = C> + EntityValue,
197 {
198 LoadExecutor::new(self.db, self.debug)
199 }
200
201 #[must_use]
202 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
203 where
204 E: EntityKind<Canister = C> + EntityValue,
205 {
206 DeleteExecutor::new(self.db, self.debug)
207 }
208
209 #[must_use]
210 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
211 where
212 E: EntityKind<Canister = C> + EntityValue,
213 {
214 SaveExecutor::new(self.db, self.debug)
215 }
216
217 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
222 where
223 E: EntityKind<Canister = C> + EntityValue,
224 {
225 let plan = query.plan()?;
226
227 let result = match query.mode() {
228 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
229 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
230 };
231
232 result.map_err(QueryError::Execute)
233 }
234
235 pub(crate) fn execute_load_query_paged<E>(
236 &self,
237 query: &Query<E>,
238 cursor_token: Option<&str>,
239 ) -> Result<(Response<E>, Option<Vec<u8>>), QueryError>
240 where
241 E: EntityKind<Canister = C> + EntityValue,
242 {
243 let plan = query.plan()?;
244 let cursor_bytes = match cursor_token {
245 Some(token) => Some(cursor::decode_cursor(token).map_err(|reason| {
246 QueryError::from(PlanError::InvalidContinuationCursor { reason })
247 })?),
248 None => None,
249 };
250 let boundary = plan.plan_cursor_boundary(cursor_bytes.as_deref())?;
251
252 let page = self
253 .with_metrics(|| self.load_executor::<E>().execute_paged(plan, boundary))
254 .map_err(QueryError::Execute)?;
255
256 Ok((page.items, page.next_cursor))
257 }
258
259 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
264 where
265 E: EntityKind<Canister = C> + EntityValue,
266 {
267 self.with_metrics(|| self.save_executor::<E>().insert(entity))
268 .map(WriteResponse::new)
269 }
270
271 pub fn insert_many_non_atomic<E>(
275 &self,
276 entities: impl IntoIterator<Item = E>,
277 ) -> Result<WriteBatchResponse<E>, InternalError>
278 where
279 E: EntityKind<Canister = C> + EntityValue,
280 {
281 let entities =
282 self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
283
284 Ok(WriteBatchResponse::new(entities))
285 }
286
287 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
288 where
289 E: EntityKind<Canister = C> + EntityValue,
290 {
291 self.with_metrics(|| self.save_executor::<E>().replace(entity))
292 .map(WriteResponse::new)
293 }
294
295 pub fn replace_many_non_atomic<E>(
299 &self,
300 entities: impl IntoIterator<Item = E>,
301 ) -> Result<WriteBatchResponse<E>, InternalError>
302 where
303 E: EntityKind<Canister = C> + EntityValue,
304 {
305 let entities =
306 self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
307
308 Ok(WriteBatchResponse::new(entities))
309 }
310
311 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
312 where
313 E: EntityKind<Canister = C> + EntityValue,
314 {
315 self.with_metrics(|| self.save_executor::<E>().update(entity))
316 .map(WriteResponse::new)
317 }
318
319 pub fn update_many_non_atomic<E>(
323 &self,
324 entities: impl IntoIterator<Item = E>,
325 ) -> Result<WriteBatchResponse<E>, InternalError>
326 where
327 E: EntityKind<Canister = C> + EntityValue,
328 {
329 let entities =
330 self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
331
332 Ok(WriteBatchResponse::new(entities))
333 }
334
335 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
336 where
337 E: EntityKind<Canister = C> + EntityValue,
338 {
339 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
340 }
341
342 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
343 where
344 E: EntityKind<Canister = C> + EntityValue,
345 {
346 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
347 }
348
349 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
350 where
351 E: EntityKind<Canister = C> + EntityValue,
352 {
353 self.with_metrics(|| self.save_executor::<E>().update_view(view))
354 }
355
356 #[cfg(test)]
358 #[doc(hidden)]
359 pub fn clear_stores_for_tests(&self) {
360 self.db.with_data(|reg| {
362 for (path, _) in reg.iter() {
363 let _ = reg.with_store_mut(path, DataStore::clear);
364 }
365 });
366
367 self.db.with_index(|reg| {
369 for (path, _) in reg.iter() {
370 let _ = reg.with_store_mut(path, IndexStore::clear);
371 }
372 });
373 }
374}