1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8
9pub(crate) use commit::*;
10
11use crate::{
12 db::{
13 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
14 index::IndexStoreRegistry,
15 query::{
16 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
17 },
18 response::{Response, WriteBatchResponse, WriteResponse},
19 store::DataStoreRegistry,
20 },
21 error::InternalError,
22 obs::sink::{self, MetricsSink},
23 traits::{CanisterKind, EntityKind, EntityValue},
24};
25use std::{marker::PhantomData, thread::LocalKey};
26
27#[cfg(test)]
28use crate::db::{index::IndexStore, store::DataStore};
29
30pub struct Db<C: CanisterKind> {
36 data: &'static LocalKey<DataStoreRegistry>,
37 index: &'static LocalKey<IndexStoreRegistry>,
38 _marker: PhantomData<C>,
39}
40
41impl<C: CanisterKind> Db<C> {
42 #[must_use]
43 pub const fn new(
44 data: &'static LocalKey<DataStoreRegistry>,
45 index: &'static LocalKey<IndexStoreRegistry>,
46 ) -> Self {
47 Self {
48 data,
49 index,
50 _marker: PhantomData,
51 }
52 }
53
54 #[must_use]
55 pub(crate) const fn context<E>(&self) -> Context<'_, E>
56 where
57 E: EntityKind<Canister = C> + EntityValue,
58 {
59 Context::new(self)
60 }
61
62 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
64 where
65 E: EntityKind<Canister = C> + EntityValue,
66 {
67 ensure_recovered(self)?;
68
69 Ok(Context::new(self))
70 }
71
72 #[cfg(test)]
76 pub fn with_data_store_mut_for_test<R>(
77 &self,
78 path: &'static str,
79 f: impl FnOnce(&mut DataStore) -> R,
80 ) -> Result<R, InternalError> {
81 self.with_data(|reg| reg.with_store_mut(path, f))
82 }
83
84 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
85 self.data.with(|reg| f(reg))
86 }
87
88 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
89 self.index.with(|reg| f(reg))
90 }
91}
92
93impl<C: CanisterKind> Copy for Db<C> {}
94
95impl<C: CanisterKind> Clone for Db<C> {
96 fn clone(&self) -> Self {
97 *self
98 }
99}
100
101pub struct DbSession<C: CanisterKind> {
107 db: Db<C>,
108 debug: bool,
109 metrics: Option<&'static dyn MetricsSink>,
110}
111
112impl<C: CanisterKind> DbSession<C> {
113 #[must_use]
114 pub const fn new(db: Db<C>) -> Self {
115 Self {
116 db,
117 debug: false,
118 metrics: None,
119 }
120 }
121
122 #[must_use]
123 pub const fn debug(mut self) -> Self {
124 self.debug = true;
125 self
126 }
127
128 #[must_use]
129 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
130 self.metrics = Some(sink);
131 self
132 }
133
134 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
135 if let Some(sink) = self.metrics {
136 sink::with_metrics_sink(sink, f)
137 } else {
138 f()
139 }
140 }
141
142 #[must_use]
147 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
148 where
149 E: EntityKind<Canister = C>,
150 {
151 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
152 }
153
154 #[must_use]
155 pub const fn load_with_consistency<E>(
156 &self,
157 consistency: ReadConsistency,
158 ) -> SessionLoadQuery<'_, C, E>
159 where
160 E: EntityKind<Canister = C>,
161 {
162 SessionLoadQuery::new(self, Query::new(consistency))
163 }
164
165 #[must_use]
166 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
167 where
168 E: EntityKind<Canister = C>,
169 {
170 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
171 }
172
173 #[must_use]
174 pub fn delete_with_consistency<E>(
175 &self,
176 consistency: ReadConsistency,
177 ) -> SessionDeleteQuery<'_, C, E>
178 where
179 E: EntityKind<Canister = C>,
180 {
181 SessionDeleteQuery::new(self, Query::new(consistency).delete())
182 }
183
184 #[must_use]
189 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
190 where
191 E: EntityKind<Canister = C> + EntityValue,
192 {
193 LoadExecutor::new(self.db, self.debug)
194 }
195
196 #[must_use]
197 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
198 where
199 E: EntityKind<Canister = C> + EntityValue,
200 {
201 DeleteExecutor::new(self.db, self.debug)
202 }
203
204 #[must_use]
205 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
206 where
207 E: EntityKind<Canister = C> + EntityValue,
208 {
209 SaveExecutor::new(self.db, self.debug)
210 }
211
212 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
217 where
218 E: EntityKind<Canister = C> + EntityValue,
219 {
220 let plan = query.plan()?;
221
222 let result = match query.mode() {
223 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
224 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
225 };
226
227 result.map_err(QueryError::Execute)
228 }
229
230 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
235 where
236 E: EntityKind<Canister = C> + EntityValue,
237 {
238 self.with_metrics(|| self.save_executor::<E>().insert(entity))
239 .map(WriteResponse::new)
240 }
241
242 pub fn insert_many_non_atomic<E>(
246 &self,
247 entities: impl IntoIterator<Item = E>,
248 ) -> Result<WriteBatchResponse<E>, InternalError>
249 where
250 E: EntityKind<Canister = C> + EntityValue,
251 {
252 let entities =
253 self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
254
255 Ok(WriteBatchResponse::new(entities))
256 }
257
258 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
259 where
260 E: EntityKind<Canister = C> + EntityValue,
261 {
262 self.with_metrics(|| self.save_executor::<E>().replace(entity))
263 .map(WriteResponse::new)
264 }
265
266 pub fn replace_many_non_atomic<E>(
270 &self,
271 entities: impl IntoIterator<Item = E>,
272 ) -> Result<WriteBatchResponse<E>, InternalError>
273 where
274 E: EntityKind<Canister = C> + EntityValue,
275 {
276 let entities =
277 self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
278
279 Ok(WriteBatchResponse::new(entities))
280 }
281
282 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
283 where
284 E: EntityKind<Canister = C> + EntityValue,
285 {
286 self.with_metrics(|| self.save_executor::<E>().update(entity))
287 .map(WriteResponse::new)
288 }
289
290 pub fn update_many_non_atomic<E>(
294 &self,
295 entities: impl IntoIterator<Item = E>,
296 ) -> Result<WriteBatchResponse<E>, InternalError>
297 where
298 E: EntityKind<Canister = C> + EntityValue,
299 {
300 let entities =
301 self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
302
303 Ok(WriteBatchResponse::new(entities))
304 }
305
306 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
307 where
308 E: EntityKind<Canister = C> + EntityValue,
309 {
310 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
311 }
312
313 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
314 where
315 E: EntityKind<Canister = C> + EntityValue,
316 {
317 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
318 }
319
320 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
321 where
322 E: EntityKind<Canister = C> + EntityValue,
323 {
324 self.with_metrics(|| self.save_executor::<E>().update_view(view))
325 }
326
327 #[cfg(test)]
329 #[doc(hidden)]
330 pub fn clear_stores_for_tests(&self) {
331 self.db.with_data(|reg| {
333 for (path, _) in reg.iter() {
334 let _ = reg.with_store_mut(path, DataStore::clear);
335 }
336 });
337
338 self.db.with_index(|reg| {
340 for (path, _) in reg.iter() {
341 let _ = reg.with_store_mut(path, IndexStore::clear);
342 }
343 });
344 }
345}