1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9 query::{intent::QueryMode, plan::CursorPlanError},
10 },
11 error::InternalError,
12 obs::sink::{MetricsSink, with_metrics_sink},
13 traits::{CanisterKind, EntityKind, EntityValue},
14 types::{Decimal, Id},
15 value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20pub struct DbSession<C: CanisterKind> {
27 db: Db<C>,
28 debug: bool,
29 metrics: Option<&'static dyn MetricsSink>,
30}
31
32impl<C: CanisterKind> DbSession<C> {
33 #[must_use]
34 pub const fn new(db: Db<C>) -> Self {
35 Self {
36 db,
37 debug: false,
38 metrics: None,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 #[must_use]
49 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
50 self.metrics = Some(sink);
51 self
52 }
53
54 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
55 if let Some(sink) = self.metrics {
56 with_metrics_sink(sink, f)
57 } else {
58 f()
59 }
60 }
61
62 fn execute_save_with<E, T, R>(
64 &self,
65 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
66 map: impl FnOnce(T) -> R,
67 ) -> Result<R, InternalError>
68 where
69 E: EntityKind<Canister = C> + EntityValue,
70 {
71 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
72
73 Ok(map(value))
74 }
75
76 fn execute_save_entity<E>(
78 &self,
79 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
80 ) -> Result<WriteResponse<E>, InternalError>
81 where
82 E: EntityKind<Canister = C> + EntityValue,
83 {
84 self.execute_save_with(op, WriteResponse::new)
85 }
86
87 fn execute_save_batch<E>(
88 &self,
89 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
90 ) -> Result<WriteBatchResponse<E>, InternalError>
91 where
92 E: EntityKind<Canister = C> + EntityValue,
93 {
94 self.execute_save_with(op, WriteBatchResponse::new)
95 }
96
97 fn execute_save_view<E>(
98 &self,
99 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
100 ) -> Result<E::ViewType, InternalError>
101 where
102 E: EntityKind<Canister = C> + EntityValue,
103 {
104 self.execute_save_with(op, std::convert::identity)
105 }
106
107 #[must_use]
112 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
113 where
114 E: EntityKind<Canister = C>,
115 {
116 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
117 }
118
119 #[must_use]
120 pub const fn load_with_consistency<E>(
121 &self,
122 consistency: ReadConsistency,
123 ) -> FluentLoadQuery<'_, E>
124 where
125 E: EntityKind<Canister = C>,
126 {
127 FluentLoadQuery::new(self, Query::new(consistency))
128 }
129
130 #[must_use]
131 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
132 where
133 E: EntityKind<Canister = C>,
134 {
135 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
136 }
137
138 #[must_use]
139 pub fn delete_with_consistency<E>(
140 &self,
141 consistency: ReadConsistency,
142 ) -> FluentDeleteQuery<'_, E>
143 where
144 E: EntityKind<Canister = C>,
145 {
146 FluentDeleteQuery::new(self, Query::new(consistency).delete())
147 }
148
149 #[must_use]
154 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
155 where
156 E: EntityKind<Canister = C> + EntityValue,
157 {
158 LoadExecutor::new(self.db, self.debug)
159 }
160
161 #[must_use]
162 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
163 where
164 E: EntityKind<Canister = C> + EntityValue,
165 {
166 DeleteExecutor::new(self.db, self.debug)
167 }
168
169 #[must_use]
170 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
171 where
172 E: EntityKind<Canister = C> + EntityValue,
173 {
174 SaveExecutor::new(self.db, self.debug)
175 }
176
177 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
182 where
183 E: EntityKind<Canister = C> + EntityValue,
184 {
185 let plan = query.plan()?;
186
187 let result = match query.mode() {
188 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
189 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
190 };
191
192 result.map_err(QueryError::Execute)
193 }
194
195 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
196 where
197 E: EntityKind<Canister = C> + EntityValue,
198 {
199 let plan = query.plan()?;
200
201 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
202 .map_err(QueryError::Execute)
203 }
204
205 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
206 where
207 E: EntityKind<Canister = C> + EntityValue,
208 {
209 let plan = query.plan()?;
210
211 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
212 .map_err(QueryError::Execute)
213 }
214
215 pub(crate) fn execute_load_query_min<E>(
216 &self,
217 query: &Query<E>,
218 ) -> Result<Option<Id<E>>, QueryError>
219 where
220 E: EntityKind<Canister = C> + EntityValue,
221 {
222 let plan = query.plan()?;
223
224 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
225 .map_err(QueryError::Execute)
226 }
227
228 pub(crate) fn execute_load_query_max<E>(
229 &self,
230 query: &Query<E>,
231 ) -> Result<Option<Id<E>>, QueryError>
232 where
233 E: EntityKind<Canister = C> + EntityValue,
234 {
235 let plan = query.plan()?;
236
237 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
238 .map_err(QueryError::Execute)
239 }
240
241 pub(crate) fn execute_load_query_min_by<E>(
242 &self,
243 query: &Query<E>,
244 target_field: &str,
245 ) -> Result<Option<Id<E>>, QueryError>
246 where
247 E: EntityKind<Canister = C> + EntityValue,
248 {
249 let plan = query.plan()?;
250
251 self.with_metrics(|| {
252 self.load_executor::<E>()
253 .aggregate_min_by(plan, target_field)
254 })
255 .map_err(QueryError::Execute)
256 }
257
258 pub(crate) fn execute_load_query_max_by<E>(
259 &self,
260 query: &Query<E>,
261 target_field: &str,
262 ) -> Result<Option<Id<E>>, QueryError>
263 where
264 E: EntityKind<Canister = C> + EntityValue,
265 {
266 let plan = query.plan()?;
267
268 self.with_metrics(|| {
269 self.load_executor::<E>()
270 .aggregate_max_by(plan, target_field)
271 })
272 .map_err(QueryError::Execute)
273 }
274
275 pub(crate) fn execute_load_query_nth_by<E>(
276 &self,
277 query: &Query<E>,
278 target_field: &str,
279 nth: usize,
280 ) -> Result<Option<Id<E>>, QueryError>
281 where
282 E: EntityKind<Canister = C> + EntityValue,
283 {
284 let plan = query.plan()?;
285
286 self.with_metrics(|| {
287 self.load_executor::<E>()
288 .aggregate_nth_by(plan, target_field, nth)
289 })
290 .map_err(QueryError::Execute)
291 }
292
293 pub(crate) fn execute_load_query_sum_by<E>(
294 &self,
295 query: &Query<E>,
296 target_field: &str,
297 ) -> Result<Option<Decimal>, QueryError>
298 where
299 E: EntityKind<Canister = C> + EntityValue,
300 {
301 let plan = query.plan()?;
302
303 self.with_metrics(|| {
304 self.load_executor::<E>()
305 .aggregate_sum_by(plan, target_field)
306 })
307 .map_err(QueryError::Execute)
308 }
309
310 pub(crate) fn execute_load_query_avg_by<E>(
311 &self,
312 query: &Query<E>,
313 target_field: &str,
314 ) -> Result<Option<Decimal>, QueryError>
315 where
316 E: EntityKind<Canister = C> + EntityValue,
317 {
318 let plan = query.plan()?;
319
320 self.with_metrics(|| {
321 self.load_executor::<E>()
322 .aggregate_avg_by(plan, target_field)
323 })
324 .map_err(QueryError::Execute)
325 }
326
327 pub(crate) fn execute_load_query_median_by<E>(
328 &self,
329 query: &Query<E>,
330 target_field: &str,
331 ) -> Result<Option<Id<E>>, QueryError>
332 where
333 E: EntityKind<Canister = C> + EntityValue,
334 {
335 let plan = query.plan()?;
336
337 self.with_metrics(|| {
338 self.load_executor::<E>()
339 .aggregate_median_by(plan, target_field)
340 })
341 .map_err(QueryError::Execute)
342 }
343
344 pub(crate) fn execute_load_query_count_distinct_by<E>(
345 &self,
346 query: &Query<E>,
347 target_field: &str,
348 ) -> Result<u32, QueryError>
349 where
350 E: EntityKind<Canister = C> + EntityValue,
351 {
352 let plan = query.plan()?;
353
354 self.with_metrics(|| {
355 self.load_executor::<E>()
356 .aggregate_count_distinct_by(plan, target_field)
357 })
358 .map_err(QueryError::Execute)
359 }
360
361 pub(crate) fn execute_load_query_min_max_by<E>(
362 &self,
363 query: &Query<E>,
364 target_field: &str,
365 ) -> Result<MinMaxByIds<E>, QueryError>
366 where
367 E: EntityKind<Canister = C> + EntityValue,
368 {
369 let plan = query.plan()?;
370
371 self.with_metrics(|| {
372 self.load_executor::<E>()
373 .aggregate_min_max_by(plan, target_field)
374 })
375 .map_err(QueryError::Execute)
376 }
377
378 pub(crate) fn execute_load_query_values_by<E>(
379 &self,
380 query: &Query<E>,
381 target_field: &str,
382 ) -> Result<Vec<Value>, QueryError>
383 where
384 E: EntityKind<Canister = C> + EntityValue,
385 {
386 let plan = query.plan()?;
387
388 self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
389 .map_err(QueryError::Execute)
390 }
391
392 pub(crate) fn execute_load_query_take<E>(
393 &self,
394 query: &Query<E>,
395 take_count: u32,
396 ) -> Result<Response<E>, QueryError>
397 where
398 E: EntityKind<Canister = C> + EntityValue,
399 {
400 let plan = query.plan()?;
401
402 self.with_metrics(|| self.load_executor::<E>().take(plan, take_count))
403 .map_err(QueryError::Execute)
404 }
405
406 pub(crate) fn execute_load_query_top_k_by<E>(
407 &self,
408 query: &Query<E>,
409 target_field: &str,
410 take_count: u32,
411 ) -> Result<Response<E>, QueryError>
412 where
413 E: EntityKind<Canister = C> + EntityValue,
414 {
415 let plan = query.plan()?;
416
417 self.with_metrics(|| {
418 self.load_executor::<E>()
419 .top_k_by(plan, target_field, take_count)
420 })
421 .map_err(QueryError::Execute)
422 }
423
424 pub(crate) fn execute_load_query_distinct_values_by<E>(
425 &self,
426 query: &Query<E>,
427 target_field: &str,
428 ) -> Result<Vec<Value>, QueryError>
429 where
430 E: EntityKind<Canister = C> + EntityValue,
431 {
432 let plan = query.plan()?;
433
434 self.with_metrics(|| {
435 self.load_executor::<E>()
436 .distinct_values_by(plan, target_field)
437 })
438 .map_err(QueryError::Execute)
439 }
440
441 pub(crate) fn execute_load_query_values_by_with_ids<E>(
442 &self,
443 query: &Query<E>,
444 target_field: &str,
445 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
446 where
447 E: EntityKind<Canister = C> + EntityValue,
448 {
449 let plan = query.plan()?;
450
451 self.with_metrics(|| {
452 self.load_executor::<E>()
453 .values_by_with_ids(plan, target_field)
454 })
455 .map_err(QueryError::Execute)
456 }
457
458 pub(crate) fn execute_load_query_first_value_by<E>(
459 &self,
460 query: &Query<E>,
461 target_field: &str,
462 ) -> Result<Option<Value>, QueryError>
463 where
464 E: EntityKind<Canister = C> + EntityValue,
465 {
466 let plan = query.plan()?;
467
468 self.with_metrics(|| self.load_executor::<E>().first_value_by(plan, target_field))
469 .map_err(QueryError::Execute)
470 }
471
472 pub(crate) fn execute_load_query_last_value_by<E>(
473 &self,
474 query: &Query<E>,
475 target_field: &str,
476 ) -> Result<Option<Value>, QueryError>
477 where
478 E: EntityKind<Canister = C> + EntityValue,
479 {
480 let plan = query.plan()?;
481
482 self.with_metrics(|| self.load_executor::<E>().last_value_by(plan, target_field))
483 .map_err(QueryError::Execute)
484 }
485
486 pub(crate) fn execute_load_query_first<E>(
487 &self,
488 query: &Query<E>,
489 ) -> Result<Option<Id<E>>, QueryError>
490 where
491 E: EntityKind<Canister = C> + EntityValue,
492 {
493 let plan = query.plan()?;
494
495 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
496 .map_err(QueryError::Execute)
497 }
498
499 pub(crate) fn execute_load_query_last<E>(
500 &self,
501 query: &Query<E>,
502 ) -> Result<Option<Id<E>>, QueryError>
503 where
504 E: EntityKind<Canister = C> + EntityValue,
505 {
506 let plan = query.plan()?;
507
508 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
509 .map_err(QueryError::Execute)
510 }
511
512 pub(crate) fn execute_load_query_paged_with_trace<E>(
513 &self,
514 query: &Query<E>,
515 cursor_token: Option<&str>,
516 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
517 where
518 E: EntityKind<Canister = C> + EntityValue,
519 {
520 let plan = query.plan()?;
521 let cursor_bytes = match cursor_token {
522 Some(token) => Some(decode_cursor(token).map_err(|reason| {
523 QueryError::from(PlanError::from(
524 CursorPlanError::InvalidContinuationCursor { reason },
525 ))
526 })?),
527 None => None,
528 };
529 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
530
531 let (page, trace) = self
532 .with_metrics(|| {
533 self.load_executor::<E>()
534 .execute_paged_with_cursor_traced(plan, cursor)
535 })
536 .map_err(QueryError::Execute)?;
537
538 Ok(PagedLoadExecutionWithTrace::new(
539 page.items,
540 page.next_cursor,
541 trace,
542 ))
543 }
544
545 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
550 where
551 E: EntityKind<Canister = C> + EntityValue,
552 {
553 self.execute_save_entity(|save| save.insert(entity))
554 }
555
556 pub fn insert_many_atomic<E>(
562 &self,
563 entities: impl IntoIterator<Item = E>,
564 ) -> Result<WriteBatchResponse<E>, InternalError>
565 where
566 E: EntityKind<Canister = C> + EntityValue,
567 {
568 self.execute_save_batch(|save| save.insert_many_atomic(entities))
569 }
570
571 pub fn insert_many_non_atomic<E>(
575 &self,
576 entities: impl IntoIterator<Item = E>,
577 ) -> Result<WriteBatchResponse<E>, InternalError>
578 where
579 E: EntityKind<Canister = C> + EntityValue,
580 {
581 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
582 }
583
584 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
585 where
586 E: EntityKind<Canister = C> + EntityValue,
587 {
588 self.execute_save_entity(|save| save.replace(entity))
589 }
590
591 pub fn replace_many_atomic<E>(
597 &self,
598 entities: impl IntoIterator<Item = E>,
599 ) -> Result<WriteBatchResponse<E>, InternalError>
600 where
601 E: EntityKind<Canister = C> + EntityValue,
602 {
603 self.execute_save_batch(|save| save.replace_many_atomic(entities))
604 }
605
606 pub fn replace_many_non_atomic<E>(
610 &self,
611 entities: impl IntoIterator<Item = E>,
612 ) -> Result<WriteBatchResponse<E>, InternalError>
613 where
614 E: EntityKind<Canister = C> + EntityValue,
615 {
616 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
617 }
618
619 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
620 where
621 E: EntityKind<Canister = C> + EntityValue,
622 {
623 self.execute_save_entity(|save| save.update(entity))
624 }
625
626 pub fn update_many_atomic<E>(
632 &self,
633 entities: impl IntoIterator<Item = E>,
634 ) -> Result<WriteBatchResponse<E>, InternalError>
635 where
636 E: EntityKind<Canister = C> + EntityValue,
637 {
638 self.execute_save_batch(|save| save.update_many_atomic(entities))
639 }
640
641 pub fn update_many_non_atomic<E>(
645 &self,
646 entities: impl IntoIterator<Item = E>,
647 ) -> Result<WriteBatchResponse<E>, InternalError>
648 where
649 E: EntityKind<Canister = C> + EntityValue,
650 {
651 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
652 }
653
654 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
655 where
656 E: EntityKind<Canister = C> + EntityValue,
657 {
658 self.execute_save_view::<E>(|save| save.insert_view(view))
659 }
660
661 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
662 where
663 E: EntityKind<Canister = C> + EntityValue,
664 {
665 self.execute_save_view::<E>(|save| save.replace_view(view))
666 }
667
668 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
669 where
670 E: EntityKind<Canister = C> + EntityValue,
671 {
672 self.execute_save_view::<E>(|save| save.update_view(view))
673 }
674
675 #[cfg(test)]
677 #[doc(hidden)]
678 pub fn clear_stores_for_tests(&self) {
679 self.db.with_store_registry(|reg| {
680 for (_, store) in reg.iter() {
681 store.with_data_mut(DataStore::clear);
682 store.with_index_mut(IndexStore::clear);
683 }
684 });
685 }
686}