1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9 query::{intent::QueryMode, plan::CursorPlanError},
10 },
11 error::InternalError,
12 obs::sink::{MetricsSink, with_metrics_sink},
13 traits::{CanisterKind, EntityKind, EntityValue},
14 types::Id,
15};
16
17pub struct DbSession<C: CanisterKind> {
24 db: Db<C>,
25 debug: bool,
26 metrics: Option<&'static dyn MetricsSink>,
27}
28
29impl<C: CanisterKind> DbSession<C> {
30 #[must_use]
31 pub const fn new(db: Db<C>) -> Self {
32 Self {
33 db,
34 debug: false,
35 metrics: None,
36 }
37 }
38
39 #[must_use]
40 pub const fn debug(mut self) -> Self {
41 self.debug = true;
42 self
43 }
44
45 #[must_use]
46 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
47 self.metrics = Some(sink);
48 self
49 }
50
51 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
52 if let Some(sink) = self.metrics {
53 with_metrics_sink(sink, f)
54 } else {
55 f()
56 }
57 }
58
59 fn execute_save_with<E, T, R>(
61 &self,
62 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
63 map: impl FnOnce(T) -> R,
64 ) -> Result<R, InternalError>
65 where
66 E: EntityKind<Canister = C> + EntityValue,
67 {
68 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
69
70 Ok(map(value))
71 }
72
73 fn execute_save_entity<E>(
75 &self,
76 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
77 ) -> Result<WriteResponse<E>, InternalError>
78 where
79 E: EntityKind<Canister = C> + EntityValue,
80 {
81 self.execute_save_with(op, WriteResponse::new)
82 }
83
84 fn execute_save_batch<E>(
85 &self,
86 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
87 ) -> Result<WriteBatchResponse<E>, InternalError>
88 where
89 E: EntityKind<Canister = C> + EntityValue,
90 {
91 self.execute_save_with(op, WriteBatchResponse::new)
92 }
93
94 fn execute_save_view<E>(
95 &self,
96 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
97 ) -> Result<E::ViewType, InternalError>
98 where
99 E: EntityKind<Canister = C> + EntityValue,
100 {
101 self.execute_save_with(op, std::convert::identity)
102 }
103
104 #[must_use]
109 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
110 where
111 E: EntityKind<Canister = C>,
112 {
113 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
114 }
115
116 #[must_use]
117 pub const fn load_with_consistency<E>(
118 &self,
119 consistency: ReadConsistency,
120 ) -> FluentLoadQuery<'_, E>
121 where
122 E: EntityKind<Canister = C>,
123 {
124 FluentLoadQuery::new(self, Query::new(consistency))
125 }
126
127 #[must_use]
128 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
129 where
130 E: EntityKind<Canister = C>,
131 {
132 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
133 }
134
135 #[must_use]
136 pub fn delete_with_consistency<E>(
137 &self,
138 consistency: ReadConsistency,
139 ) -> FluentDeleteQuery<'_, E>
140 where
141 E: EntityKind<Canister = C>,
142 {
143 FluentDeleteQuery::new(self, Query::new(consistency).delete())
144 }
145
146 #[must_use]
151 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
152 where
153 E: EntityKind<Canister = C> + EntityValue,
154 {
155 LoadExecutor::new(self.db, self.debug)
156 }
157
158 #[must_use]
159 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
160 where
161 E: EntityKind<Canister = C> + EntityValue,
162 {
163 DeleteExecutor::new(self.db, self.debug)
164 }
165
166 #[must_use]
167 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
168 where
169 E: EntityKind<Canister = C> + EntityValue,
170 {
171 SaveExecutor::new(self.db, self.debug)
172 }
173
174 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
179 where
180 E: EntityKind<Canister = C> + EntityValue,
181 {
182 let plan = query.plan()?;
183
184 let result = match query.mode() {
185 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
186 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
187 };
188
189 result.map_err(QueryError::Execute)
190 }
191
192 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
193 where
194 E: EntityKind<Canister = C> + EntityValue,
195 {
196 let plan = query.plan()?;
197
198 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
199 .map_err(QueryError::Execute)
200 }
201
202 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
203 where
204 E: EntityKind<Canister = C> + EntityValue,
205 {
206 let plan = query.plan()?;
207
208 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
209 .map_err(QueryError::Execute)
210 }
211
212 pub(crate) fn execute_load_query_min<E>(
213 &self,
214 query: &Query<E>,
215 ) -> Result<Option<Id<E>>, QueryError>
216 where
217 E: EntityKind<Canister = C> + EntityValue,
218 {
219 let plan = query.plan()?;
220
221 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
222 .map_err(QueryError::Execute)
223 }
224
225 pub(crate) fn execute_load_query_max<E>(
226 &self,
227 query: &Query<E>,
228 ) -> Result<Option<Id<E>>, QueryError>
229 where
230 E: EntityKind<Canister = C> + EntityValue,
231 {
232 let plan = query.plan()?;
233
234 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
235 .map_err(QueryError::Execute)
236 }
237
238 pub(crate) fn execute_load_query_first<E>(
239 &self,
240 query: &Query<E>,
241 ) -> Result<Option<Id<E>>, QueryError>
242 where
243 E: EntityKind<Canister = C> + EntityValue,
244 {
245 let plan = query.plan()?;
246
247 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
248 .map_err(QueryError::Execute)
249 }
250
251 pub(crate) fn execute_load_query_last<E>(
252 &self,
253 query: &Query<E>,
254 ) -> Result<Option<Id<E>>, QueryError>
255 where
256 E: EntityKind<Canister = C> + EntityValue,
257 {
258 let plan = query.plan()?;
259
260 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
261 .map_err(QueryError::Execute)
262 }
263
264 pub(crate) fn execute_load_query_paged_with_trace<E>(
265 &self,
266 query: &Query<E>,
267 cursor_token: Option<&str>,
268 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
269 where
270 E: EntityKind<Canister = C> + EntityValue,
271 {
272 let plan = query.plan()?;
273 let cursor_bytes = match cursor_token {
274 Some(token) => Some(decode_cursor(token).map_err(|reason| {
275 QueryError::from(PlanError::from(
276 CursorPlanError::InvalidContinuationCursor { reason },
277 ))
278 })?),
279 None => None,
280 };
281 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
282
283 let (page, trace) = self
284 .with_metrics(|| {
285 self.load_executor::<E>()
286 .execute_paged_with_cursor_traced(plan, cursor)
287 })
288 .map_err(QueryError::Execute)?;
289
290 Ok((page.items, page.next_cursor, trace))
291 }
292
293 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
298 where
299 E: EntityKind<Canister = C> + EntityValue,
300 {
301 self.execute_save_entity(|save| save.insert(entity))
302 }
303
304 pub fn insert_many_atomic<E>(
310 &self,
311 entities: impl IntoIterator<Item = E>,
312 ) -> Result<WriteBatchResponse<E>, InternalError>
313 where
314 E: EntityKind<Canister = C> + EntityValue,
315 {
316 self.execute_save_batch(|save| save.insert_many_atomic(entities))
317 }
318
319 pub fn insert_many_non_atomic<E>(
323 &self,
324 entities: impl IntoIterator<Item = E>,
325 ) -> Result<WriteBatchResponse<E>, InternalError>
326 where
327 E: EntityKind<Canister = C> + EntityValue,
328 {
329 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
330 }
331
332 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
333 where
334 E: EntityKind<Canister = C> + EntityValue,
335 {
336 self.execute_save_entity(|save| save.replace(entity))
337 }
338
339 pub fn replace_many_atomic<E>(
345 &self,
346 entities: impl IntoIterator<Item = E>,
347 ) -> Result<WriteBatchResponse<E>, InternalError>
348 where
349 E: EntityKind<Canister = C> + EntityValue,
350 {
351 self.execute_save_batch(|save| save.replace_many_atomic(entities))
352 }
353
354 pub fn replace_many_non_atomic<E>(
358 &self,
359 entities: impl IntoIterator<Item = E>,
360 ) -> Result<WriteBatchResponse<E>, InternalError>
361 where
362 E: EntityKind<Canister = C> + EntityValue,
363 {
364 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
365 }
366
367 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
368 where
369 E: EntityKind<Canister = C> + EntityValue,
370 {
371 self.execute_save_entity(|save| save.update(entity))
372 }
373
374 pub fn update_many_atomic<E>(
380 &self,
381 entities: impl IntoIterator<Item = E>,
382 ) -> Result<WriteBatchResponse<E>, InternalError>
383 where
384 E: EntityKind<Canister = C> + EntityValue,
385 {
386 self.execute_save_batch(|save| save.update_many_atomic(entities))
387 }
388
389 pub fn update_many_non_atomic<E>(
393 &self,
394 entities: impl IntoIterator<Item = E>,
395 ) -> Result<WriteBatchResponse<E>, InternalError>
396 where
397 E: EntityKind<Canister = C> + EntityValue,
398 {
399 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
400 }
401
402 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
403 where
404 E: EntityKind<Canister = C> + EntityValue,
405 {
406 self.execute_save_view::<E>(|save| save.insert_view(view))
407 }
408
409 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
410 where
411 E: EntityKind<Canister = C> + EntityValue,
412 {
413 self.execute_save_view::<E>(|save| save.replace_view(view))
414 }
415
416 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
417 where
418 E: EntityKind<Canister = C> + EntityValue,
419 {
420 self.execute_save_view::<E>(|save| save.update_view(view))
421 }
422
423 #[cfg(test)]
425 #[doc(hidden)]
426 pub fn clear_stores_for_tests(&self) {
427 self.db.with_store_registry(|reg| {
428 for (_, store) in reg.iter() {
429 store.with_data_mut(DataStore::clear);
430 store.with_index_mut(IndexStore::clear);
431 }
432 });
433 }
434}