1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8
9pub(crate) use commit::*;
10
11use crate::{
12 db::{
13 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
14 index::IndexStoreRegistry,
15 query::{
16 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
17 },
18 response::{Response, WriteBatchResponse, WriteResponse},
19 store::DataStoreRegistry,
20 },
21 error::InternalError,
22 obs::sink::{self, MetricsSink},
23 traits::{CanisterKind, EntityKind, EntityValue},
24};
25use std::{marker::PhantomData, thread::LocalKey};
26
27#[cfg(test)]
28use crate::db::{index::IndexStore, store::DataStore};
29
30pub struct Db<C: CanisterKind> {
36 data: &'static LocalKey<DataStoreRegistry>,
37 index: &'static LocalKey<IndexStoreRegistry>,
38 _marker: PhantomData<C>,
39}
40
41impl<C: CanisterKind> Db<C> {
42 #[must_use]
43 pub const fn new(
44 data: &'static LocalKey<DataStoreRegistry>,
45 index: &'static LocalKey<IndexStoreRegistry>,
46 ) -> Self {
47 Self {
48 data,
49 index,
50 _marker: PhantomData,
51 }
52 }
53
54 #[must_use]
55 pub(crate) const fn context<E>(&self) -> Context<'_, E>
56 where
57 E: EntityKind<Canister = C> + EntityValue,
58 {
59 Context::new(self)
60 }
61
62 pub(crate) fn recovered_context<E>(&self) -> Result<Context<'_, E>, InternalError>
67 where
68 E: EntityKind<Canister = C> + EntityValue,
69 {
70 ensure_recovered(self)?;
71
72 Ok(Context::new(self))
73 }
74
75 #[cfg(test)]
79 pub fn with_data_store_mut_for_test<R>(
80 &self,
81 path: &'static str,
82 f: impl FnOnce(&mut DataStore) -> R,
83 ) -> Result<R, InternalError> {
84 self.with_data(|reg| reg.with_store_mut(path, f))
85 }
86
87 pub(crate) fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
88 self.data.with(|reg| f(reg))
89 }
90
91 pub(crate) fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
92 self.index.with(|reg| f(reg))
93 }
94}
95
96impl<C: CanisterKind> Copy for Db<C> {}
97
98impl<C: CanisterKind> Clone for Db<C> {
99 fn clone(&self) -> Self {
100 *self
101 }
102}
103
104pub struct DbSession<C: CanisterKind> {
110 db: Db<C>,
111 debug: bool,
112 metrics: Option<&'static dyn MetricsSink>,
113}
114
115impl<C: CanisterKind> DbSession<C> {
116 #[must_use]
117 pub const fn new(db: Db<C>) -> Self {
118 Self {
119 db,
120 debug: false,
121 metrics: None,
122 }
123 }
124
125 #[must_use]
126 pub const fn debug(mut self) -> Self {
127 self.debug = true;
128 self
129 }
130
131 #[must_use]
132 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
133 self.metrics = Some(sink);
134 self
135 }
136
137 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
138 if let Some(sink) = self.metrics {
139 sink::with_metrics_sink(sink, f)
140 } else {
141 f()
142 }
143 }
144
145 #[must_use]
150 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
151 where
152 E: EntityKind<Canister = C>,
153 {
154 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
155 }
156
157 #[must_use]
158 pub const fn load_with_consistency<E>(
159 &self,
160 consistency: ReadConsistency,
161 ) -> SessionLoadQuery<'_, C, E>
162 where
163 E: EntityKind<Canister = C>,
164 {
165 SessionLoadQuery::new(self, Query::new(consistency))
166 }
167
168 #[must_use]
169 pub fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
170 where
171 E: EntityKind<Canister = C>,
172 {
173 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
174 }
175
176 #[must_use]
177 pub fn delete_with_consistency<E>(
178 &self,
179 consistency: ReadConsistency,
180 ) -> SessionDeleteQuery<'_, C, E>
181 where
182 E: EntityKind<Canister = C>,
183 {
184 SessionDeleteQuery::new(self, Query::new(consistency).delete())
185 }
186
187 #[must_use]
192 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
193 where
194 E: EntityKind<Canister = C> + EntityValue,
195 {
196 LoadExecutor::new(self.db, self.debug)
197 }
198
199 #[must_use]
200 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
201 where
202 E: EntityKind<Canister = C> + EntityValue,
203 {
204 DeleteExecutor::new(self.db, self.debug)
205 }
206
207 #[must_use]
208 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
209 where
210 E: EntityKind<Canister = C> + EntityValue,
211 {
212 SaveExecutor::new(self.db, self.debug)
213 }
214
215 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
220 where
221 E: EntityKind<Canister = C> + EntityValue,
222 {
223 let plan = query.plan()?;
224
225 let result = match query.mode() {
226 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
227 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
228 };
229
230 result.map_err(QueryError::Execute)
231 }
232
233 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
238 where
239 E: EntityKind<Canister = C> + EntityValue,
240 {
241 self.with_metrics(|| self.save_executor::<E>().insert(entity))
242 .map(WriteResponse::new)
243 }
244
245 pub fn insert_many_non_atomic<E>(
249 &self,
250 entities: impl IntoIterator<Item = E>,
251 ) -> Result<WriteBatchResponse<E>, InternalError>
252 where
253 E: EntityKind<Canister = C> + EntityValue,
254 {
255 let entities =
256 self.with_metrics(|| self.save_executor::<E>().insert_many_non_atomic(entities))?;
257
258 Ok(WriteBatchResponse::new(entities))
259 }
260
261 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
262 where
263 E: EntityKind<Canister = C> + EntityValue,
264 {
265 self.with_metrics(|| self.save_executor::<E>().replace(entity))
266 .map(WriteResponse::new)
267 }
268
269 pub fn replace_many_non_atomic<E>(
273 &self,
274 entities: impl IntoIterator<Item = E>,
275 ) -> Result<WriteBatchResponse<E>, InternalError>
276 where
277 E: EntityKind<Canister = C> + EntityValue,
278 {
279 let entities =
280 self.with_metrics(|| self.save_executor::<E>().replace_many_non_atomic(entities))?;
281
282 Ok(WriteBatchResponse::new(entities))
283 }
284
285 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
286 where
287 E: EntityKind<Canister = C> + EntityValue,
288 {
289 self.with_metrics(|| self.save_executor::<E>().update(entity))
290 .map(WriteResponse::new)
291 }
292
293 pub fn update_many_non_atomic<E>(
297 &self,
298 entities: impl IntoIterator<Item = E>,
299 ) -> Result<WriteBatchResponse<E>, InternalError>
300 where
301 E: EntityKind<Canister = C> + EntityValue,
302 {
303 let entities =
304 self.with_metrics(|| self.save_executor::<E>().update_many_non_atomic(entities))?;
305
306 Ok(WriteBatchResponse::new(entities))
307 }
308
309 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
310 where
311 E: EntityKind<Canister = C> + EntityValue,
312 {
313 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
314 }
315
316 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
317 where
318 E: EntityKind<Canister = C> + EntityValue,
319 {
320 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
321 }
322
323 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
324 where
325 E: EntityKind<Canister = C> + EntityValue,
326 {
327 self.with_metrics(|| self.save_executor::<E>().update_view(view))
328 }
329
330 #[cfg(test)]
332 #[doc(hidden)]
333 pub fn clear_stores_for_tests(&self) {
334 self.db.with_data(|reg| {
336 for (path, _) in reg.iter() {
337 let _ = reg.with_store_mut(path, DataStore::clear);
338 }
339 });
340
341 self.db.with_index(|reg| {
343 for (path, _) in reg.iter() {
344 let _ = reg.with_store_mut(path, IndexStore::clear);
345 }
346 });
347 }
348}