1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9 query::{intent::QueryMode, plan::CursorPlanError},
10 },
11 error::InternalError,
12 obs::sink::{MetricsSink, with_metrics_sink},
13 traits::{CanisterKind, EntityKind, EntityValue},
14 types::{Decimal, Id},
15};
16
17type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
18
19pub struct DbSession<C: CanisterKind> {
26 db: Db<C>,
27 debug: bool,
28 metrics: Option<&'static dyn MetricsSink>,
29}
30
31impl<C: CanisterKind> DbSession<C> {
32 #[must_use]
33 pub const fn new(db: Db<C>) -> Self {
34 Self {
35 db,
36 debug: false,
37 metrics: None,
38 }
39 }
40
41 #[must_use]
42 pub const fn debug(mut self) -> Self {
43 self.debug = true;
44 self
45 }
46
47 #[must_use]
48 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
49 self.metrics = Some(sink);
50 self
51 }
52
53 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
54 if let Some(sink) = self.metrics {
55 with_metrics_sink(sink, f)
56 } else {
57 f()
58 }
59 }
60
61 fn execute_save_with<E, T, R>(
63 &self,
64 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
65 map: impl FnOnce(T) -> R,
66 ) -> Result<R, InternalError>
67 where
68 E: EntityKind<Canister = C> + EntityValue,
69 {
70 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
71
72 Ok(map(value))
73 }
74
75 fn execute_save_entity<E>(
77 &self,
78 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
79 ) -> Result<WriteResponse<E>, InternalError>
80 where
81 E: EntityKind<Canister = C> + EntityValue,
82 {
83 self.execute_save_with(op, WriteResponse::new)
84 }
85
86 fn execute_save_batch<E>(
87 &self,
88 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
89 ) -> Result<WriteBatchResponse<E>, InternalError>
90 where
91 E: EntityKind<Canister = C> + EntityValue,
92 {
93 self.execute_save_with(op, WriteBatchResponse::new)
94 }
95
96 fn execute_save_view<E>(
97 &self,
98 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
99 ) -> Result<E::ViewType, InternalError>
100 where
101 E: EntityKind<Canister = C> + EntityValue,
102 {
103 self.execute_save_with(op, std::convert::identity)
104 }
105
106 #[must_use]
111 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
112 where
113 E: EntityKind<Canister = C>,
114 {
115 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
116 }
117
118 #[must_use]
119 pub const fn load_with_consistency<E>(
120 &self,
121 consistency: ReadConsistency,
122 ) -> FluentLoadQuery<'_, E>
123 where
124 E: EntityKind<Canister = C>,
125 {
126 FluentLoadQuery::new(self, Query::new(consistency))
127 }
128
129 #[must_use]
130 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
131 where
132 E: EntityKind<Canister = C>,
133 {
134 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
135 }
136
137 #[must_use]
138 pub fn delete_with_consistency<E>(
139 &self,
140 consistency: ReadConsistency,
141 ) -> FluentDeleteQuery<'_, E>
142 where
143 E: EntityKind<Canister = C>,
144 {
145 FluentDeleteQuery::new(self, Query::new(consistency).delete())
146 }
147
148 #[must_use]
153 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
154 where
155 E: EntityKind<Canister = C> + EntityValue,
156 {
157 LoadExecutor::new(self.db, self.debug)
158 }
159
160 #[must_use]
161 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
162 where
163 E: EntityKind<Canister = C> + EntityValue,
164 {
165 DeleteExecutor::new(self.db, self.debug)
166 }
167
168 #[must_use]
169 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
170 where
171 E: EntityKind<Canister = C> + EntityValue,
172 {
173 SaveExecutor::new(self.db, self.debug)
174 }
175
176 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
181 where
182 E: EntityKind<Canister = C> + EntityValue,
183 {
184 let plan = query.plan()?;
185
186 let result = match query.mode() {
187 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
188 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
189 };
190
191 result.map_err(QueryError::Execute)
192 }
193
194 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
195 where
196 E: EntityKind<Canister = C> + EntityValue,
197 {
198 let plan = query.plan()?;
199
200 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
201 .map_err(QueryError::Execute)
202 }
203
204 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
205 where
206 E: EntityKind<Canister = C> + EntityValue,
207 {
208 let plan = query.plan()?;
209
210 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
211 .map_err(QueryError::Execute)
212 }
213
214 pub(crate) fn execute_load_query_min<E>(
215 &self,
216 query: &Query<E>,
217 ) -> Result<Option<Id<E>>, QueryError>
218 where
219 E: EntityKind<Canister = C> + EntityValue,
220 {
221 let plan = query.plan()?;
222
223 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
224 .map_err(QueryError::Execute)
225 }
226
227 pub(crate) fn execute_load_query_max<E>(
228 &self,
229 query: &Query<E>,
230 ) -> Result<Option<Id<E>>, QueryError>
231 where
232 E: EntityKind<Canister = C> + EntityValue,
233 {
234 let plan = query.plan()?;
235
236 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
237 .map_err(QueryError::Execute)
238 }
239
240 pub(crate) fn execute_load_query_min_by<E>(
241 &self,
242 query: &Query<E>,
243 target_field: &str,
244 ) -> Result<Option<Id<E>>, QueryError>
245 where
246 E: EntityKind<Canister = C> + EntityValue,
247 {
248 let plan = query.plan()?;
249
250 self.with_metrics(|| {
251 self.load_executor::<E>()
252 .aggregate_min_by(plan, target_field)
253 })
254 .map_err(QueryError::Execute)
255 }
256
257 pub(crate) fn execute_load_query_max_by<E>(
258 &self,
259 query: &Query<E>,
260 target_field: &str,
261 ) -> Result<Option<Id<E>>, QueryError>
262 where
263 E: EntityKind<Canister = C> + EntityValue,
264 {
265 let plan = query.plan()?;
266
267 self.with_metrics(|| {
268 self.load_executor::<E>()
269 .aggregate_max_by(plan, target_field)
270 })
271 .map_err(QueryError::Execute)
272 }
273
274 pub(crate) fn execute_load_query_nth_by<E>(
275 &self,
276 query: &Query<E>,
277 target_field: &str,
278 nth: usize,
279 ) -> Result<Option<Id<E>>, QueryError>
280 where
281 E: EntityKind<Canister = C> + EntityValue,
282 {
283 let plan = query.plan()?;
284
285 self.with_metrics(|| {
286 self.load_executor::<E>()
287 .aggregate_nth_by(plan, target_field, nth)
288 })
289 .map_err(QueryError::Execute)
290 }
291
292 pub(crate) fn execute_load_query_sum_by<E>(
293 &self,
294 query: &Query<E>,
295 target_field: &str,
296 ) -> Result<Option<Decimal>, QueryError>
297 where
298 E: EntityKind<Canister = C> + EntityValue,
299 {
300 let plan = query.plan()?;
301
302 self.with_metrics(|| {
303 self.load_executor::<E>()
304 .aggregate_sum_by(plan, target_field)
305 })
306 .map_err(QueryError::Execute)
307 }
308
309 pub(crate) fn execute_load_query_avg_by<E>(
310 &self,
311 query: &Query<E>,
312 target_field: &str,
313 ) -> Result<Option<Decimal>, QueryError>
314 where
315 E: EntityKind<Canister = C> + EntityValue,
316 {
317 let plan = query.plan()?;
318
319 self.with_metrics(|| {
320 self.load_executor::<E>()
321 .aggregate_avg_by(plan, target_field)
322 })
323 .map_err(QueryError::Execute)
324 }
325
326 pub(crate) fn execute_load_query_median_by<E>(
327 &self,
328 query: &Query<E>,
329 target_field: &str,
330 ) -> Result<Option<Id<E>>, QueryError>
331 where
332 E: EntityKind<Canister = C> + EntityValue,
333 {
334 let plan = query.plan()?;
335
336 self.with_metrics(|| {
337 self.load_executor::<E>()
338 .aggregate_median_by(plan, target_field)
339 })
340 .map_err(QueryError::Execute)
341 }
342
343 pub(crate) fn execute_load_query_count_distinct_by<E>(
344 &self,
345 query: &Query<E>,
346 target_field: &str,
347 ) -> Result<u32, QueryError>
348 where
349 E: EntityKind<Canister = C> + EntityValue,
350 {
351 let plan = query.plan()?;
352
353 self.with_metrics(|| {
354 self.load_executor::<E>()
355 .aggregate_count_distinct_by(plan, target_field)
356 })
357 .map_err(QueryError::Execute)
358 }
359
360 pub(crate) fn execute_load_query_min_max_by<E>(
361 &self,
362 query: &Query<E>,
363 target_field: &str,
364 ) -> Result<MinMaxByIds<E>, QueryError>
365 where
366 E: EntityKind<Canister = C> + EntityValue,
367 {
368 let plan = query.plan()?;
369
370 self.with_metrics(|| {
371 self.load_executor::<E>()
372 .aggregate_min_max_by(plan, target_field)
373 })
374 .map_err(QueryError::Execute)
375 }
376
377 pub(crate) fn execute_load_query_first<E>(
378 &self,
379 query: &Query<E>,
380 ) -> Result<Option<Id<E>>, QueryError>
381 where
382 E: EntityKind<Canister = C> + EntityValue,
383 {
384 let plan = query.plan()?;
385
386 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
387 .map_err(QueryError::Execute)
388 }
389
390 pub(crate) fn execute_load_query_last<E>(
391 &self,
392 query: &Query<E>,
393 ) -> Result<Option<Id<E>>, QueryError>
394 where
395 E: EntityKind<Canister = C> + EntityValue,
396 {
397 let plan = query.plan()?;
398
399 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
400 .map_err(QueryError::Execute)
401 }
402
403 pub(crate) fn execute_load_query_paged_with_trace<E>(
404 &self,
405 query: &Query<E>,
406 cursor_token: Option<&str>,
407 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
408 where
409 E: EntityKind<Canister = C> + EntityValue,
410 {
411 let plan = query.plan()?;
412 let cursor_bytes = match cursor_token {
413 Some(token) => Some(decode_cursor(token).map_err(|reason| {
414 QueryError::from(PlanError::from(
415 CursorPlanError::InvalidContinuationCursor { reason },
416 ))
417 })?),
418 None => None,
419 };
420 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
421
422 let (page, trace) = self
423 .with_metrics(|| {
424 self.load_executor::<E>()
425 .execute_paged_with_cursor_traced(plan, cursor)
426 })
427 .map_err(QueryError::Execute)?;
428
429 Ok((page.items, page.next_cursor, trace))
430 }
431
432 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
437 where
438 E: EntityKind<Canister = C> + EntityValue,
439 {
440 self.execute_save_entity(|save| save.insert(entity))
441 }
442
443 pub fn insert_many_atomic<E>(
449 &self,
450 entities: impl IntoIterator<Item = E>,
451 ) -> Result<WriteBatchResponse<E>, InternalError>
452 where
453 E: EntityKind<Canister = C> + EntityValue,
454 {
455 self.execute_save_batch(|save| save.insert_many_atomic(entities))
456 }
457
458 pub fn insert_many_non_atomic<E>(
462 &self,
463 entities: impl IntoIterator<Item = E>,
464 ) -> Result<WriteBatchResponse<E>, InternalError>
465 where
466 E: EntityKind<Canister = C> + EntityValue,
467 {
468 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
469 }
470
471 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
472 where
473 E: EntityKind<Canister = C> + EntityValue,
474 {
475 self.execute_save_entity(|save| save.replace(entity))
476 }
477
478 pub fn replace_many_atomic<E>(
484 &self,
485 entities: impl IntoIterator<Item = E>,
486 ) -> Result<WriteBatchResponse<E>, InternalError>
487 where
488 E: EntityKind<Canister = C> + EntityValue,
489 {
490 self.execute_save_batch(|save| save.replace_many_atomic(entities))
491 }
492
493 pub fn replace_many_non_atomic<E>(
497 &self,
498 entities: impl IntoIterator<Item = E>,
499 ) -> Result<WriteBatchResponse<E>, InternalError>
500 where
501 E: EntityKind<Canister = C> + EntityValue,
502 {
503 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
504 }
505
506 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
507 where
508 E: EntityKind<Canister = C> + EntityValue,
509 {
510 self.execute_save_entity(|save| save.update(entity))
511 }
512
513 pub fn update_many_atomic<E>(
519 &self,
520 entities: impl IntoIterator<Item = E>,
521 ) -> Result<WriteBatchResponse<E>, InternalError>
522 where
523 E: EntityKind<Canister = C> + EntityValue,
524 {
525 self.execute_save_batch(|save| save.update_many_atomic(entities))
526 }
527
528 pub fn update_many_non_atomic<E>(
532 &self,
533 entities: impl IntoIterator<Item = E>,
534 ) -> Result<WriteBatchResponse<E>, InternalError>
535 where
536 E: EntityKind<Canister = C> + EntityValue,
537 {
538 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
539 }
540
541 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
542 where
543 E: EntityKind<Canister = C> + EntityValue,
544 {
545 self.execute_save_view::<E>(|save| save.insert_view(view))
546 }
547
548 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
549 where
550 E: EntityKind<Canister = C> + EntityValue,
551 {
552 self.execute_save_view::<E>(|save| save.replace_view(view))
553 }
554
555 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
556 where
557 E: EntityKind<Canister = C> + EntityValue,
558 {
559 self.execute_save_view::<E>(|save| save.update_view(view))
560 }
561
562 #[cfg(test)]
564 #[doc(hidden)]
565 pub fn clear_stores_for_tests(&self) {
566 self.db.with_store_registry(|reg| {
567 for (_, store) in reg.iter() {
568 store.with_data_mut(DataStore::clear);
569 store.with_index_mut(IndexStore::clear);
570 }
571 });
572 }
573}