1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse,
8 cursor::CursorPlanError,
9 decode_cursor,
10 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
11 query::intent::QueryMode,
12 },
13 error::InternalError,
14 obs::sink::{MetricsSink, with_metrics_sink},
15 traits::{CanisterKind, EntityKind, EntityValue},
16 types::{Decimal, Id},
17 value::Value,
18};
19
20type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
21
22pub struct DbSession<C: CanisterKind> {
29 db: Db<C>,
30 debug: bool,
31 metrics: Option<&'static dyn MetricsSink>,
32}
33
34impl<C: CanisterKind> DbSession<C> {
35 #[must_use]
36 pub const fn new(db: Db<C>) -> Self {
37 Self {
38 db,
39 debug: false,
40 metrics: None,
41 }
42 }
43
44 #[must_use]
45 pub const fn debug(mut self) -> Self {
46 self.debug = true;
47 self
48 }
49
50 #[must_use]
51 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
52 self.metrics = Some(sink);
53 self
54 }
55
56 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
57 if let Some(sink) = self.metrics {
58 with_metrics_sink(sink, f)
59 } else {
60 f()
61 }
62 }
63
64 fn execute_save_with<E, T, R>(
66 &self,
67 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
68 map: impl FnOnce(T) -> R,
69 ) -> Result<R, InternalError>
70 where
71 E: EntityKind<Canister = C> + EntityValue,
72 {
73 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
74
75 Ok(map(value))
76 }
77
78 fn execute_save_entity<E>(
80 &self,
81 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
82 ) -> Result<WriteResponse<E>, InternalError>
83 where
84 E: EntityKind<Canister = C> + EntityValue,
85 {
86 self.execute_save_with(op, WriteResponse::new)
87 }
88
89 fn execute_save_batch<E>(
90 &self,
91 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
92 ) -> Result<WriteBatchResponse<E>, InternalError>
93 where
94 E: EntityKind<Canister = C> + EntityValue,
95 {
96 self.execute_save_with(op, WriteBatchResponse::new)
97 }
98
99 fn execute_save_view<E>(
100 &self,
101 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
102 ) -> Result<E::ViewType, InternalError>
103 where
104 E: EntityKind<Canister = C> + EntityValue,
105 {
106 self.execute_save_with(op, std::convert::identity)
107 }
108
109 #[must_use]
114 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
115 where
116 E: EntityKind<Canister = C>,
117 {
118 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
119 }
120
121 #[must_use]
122 pub const fn load_with_consistency<E>(
123 &self,
124 consistency: ReadConsistency,
125 ) -> FluentLoadQuery<'_, E>
126 where
127 E: EntityKind<Canister = C>,
128 {
129 FluentLoadQuery::new(self, Query::new(consistency))
130 }
131
132 #[must_use]
133 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
134 where
135 E: EntityKind<Canister = C>,
136 {
137 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
138 }
139
140 #[must_use]
141 pub fn delete_with_consistency<E>(
142 &self,
143 consistency: ReadConsistency,
144 ) -> FluentDeleteQuery<'_, E>
145 where
146 E: EntityKind<Canister = C>,
147 {
148 FluentDeleteQuery::new(self, Query::new(consistency).delete())
149 }
150
151 #[must_use]
156 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
157 where
158 E: EntityKind<Canister = C> + EntityValue,
159 {
160 LoadExecutor::new(self.db, self.debug)
161 }
162
163 #[must_use]
164 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
165 where
166 E: EntityKind<Canister = C> + EntityValue,
167 {
168 DeleteExecutor::new(self.db, self.debug)
169 }
170
171 #[must_use]
172 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
173 where
174 E: EntityKind<Canister = C> + EntityValue,
175 {
176 SaveExecutor::new(self.db, self.debug)
177 }
178
179 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
184 where
185 E: EntityKind<Canister = C> + EntityValue,
186 {
187 let plan = query.plan()?;
188
189 let result = match query.mode() {
190 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
191 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
192 };
193
194 result.map_err(QueryError::Execute)
195 }
196
197 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
198 where
199 E: EntityKind<Canister = C> + EntityValue,
200 {
201 let plan = query.plan()?;
202
203 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
204 .map_err(QueryError::Execute)
205 }
206
207 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
208 where
209 E: EntityKind<Canister = C> + EntityValue,
210 {
211 let plan = query.plan()?;
212
213 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
214 .map_err(QueryError::Execute)
215 }
216
217 pub(crate) fn execute_load_query_min<E>(
218 &self,
219 query: &Query<E>,
220 ) -> Result<Option<Id<E>>, QueryError>
221 where
222 E: EntityKind<Canister = C> + EntityValue,
223 {
224 let plan = query.plan()?;
225
226 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
227 .map_err(QueryError::Execute)
228 }
229
230 pub(crate) fn execute_load_query_max<E>(
231 &self,
232 query: &Query<E>,
233 ) -> Result<Option<Id<E>>, QueryError>
234 where
235 E: EntityKind<Canister = C> + EntityValue,
236 {
237 let plan = query.plan()?;
238
239 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
240 .map_err(QueryError::Execute)
241 }
242
243 pub(crate) fn execute_load_query_min_by<E>(
244 &self,
245 query: &Query<E>,
246 target_field: &str,
247 ) -> Result<Option<Id<E>>, QueryError>
248 where
249 E: EntityKind<Canister = C> + EntityValue,
250 {
251 let plan = query.plan()?;
252
253 self.with_metrics(|| {
254 self.load_executor::<E>()
255 .aggregate_min_by(plan, target_field)
256 })
257 .map_err(QueryError::Execute)
258 }
259
260 pub(crate) fn execute_load_query_max_by<E>(
261 &self,
262 query: &Query<E>,
263 target_field: &str,
264 ) -> Result<Option<Id<E>>, QueryError>
265 where
266 E: EntityKind<Canister = C> + EntityValue,
267 {
268 let plan = query.plan()?;
269
270 self.with_metrics(|| {
271 self.load_executor::<E>()
272 .aggregate_max_by(plan, target_field)
273 })
274 .map_err(QueryError::Execute)
275 }
276
277 pub(crate) fn execute_load_query_nth_by<E>(
278 &self,
279 query: &Query<E>,
280 target_field: &str,
281 nth: usize,
282 ) -> Result<Option<Id<E>>, QueryError>
283 where
284 E: EntityKind<Canister = C> + EntityValue,
285 {
286 let plan = query.plan()?;
287
288 self.with_metrics(|| {
289 self.load_executor::<E>()
290 .aggregate_nth_by(plan, target_field, nth)
291 })
292 .map_err(QueryError::Execute)
293 }
294
295 pub(crate) fn execute_load_query_sum_by<E>(
296 &self,
297 query: &Query<E>,
298 target_field: &str,
299 ) -> Result<Option<Decimal>, QueryError>
300 where
301 E: EntityKind<Canister = C> + EntityValue,
302 {
303 let plan = query.plan()?;
304
305 self.with_metrics(|| {
306 self.load_executor::<E>()
307 .aggregate_sum_by(plan, target_field)
308 })
309 .map_err(QueryError::Execute)
310 }
311
312 pub(crate) fn execute_load_query_avg_by<E>(
313 &self,
314 query: &Query<E>,
315 target_field: &str,
316 ) -> Result<Option<Decimal>, QueryError>
317 where
318 E: EntityKind<Canister = C> + EntityValue,
319 {
320 let plan = query.plan()?;
321
322 self.with_metrics(|| {
323 self.load_executor::<E>()
324 .aggregate_avg_by(plan, target_field)
325 })
326 .map_err(QueryError::Execute)
327 }
328
329 pub(crate) fn execute_load_query_median_by<E>(
330 &self,
331 query: &Query<E>,
332 target_field: &str,
333 ) -> Result<Option<Id<E>>, QueryError>
334 where
335 E: EntityKind<Canister = C> + EntityValue,
336 {
337 let plan = query.plan()?;
338
339 self.with_metrics(|| {
340 self.load_executor::<E>()
341 .aggregate_median_by(plan, target_field)
342 })
343 .map_err(QueryError::Execute)
344 }
345
346 pub(crate) fn execute_load_query_count_distinct_by<E>(
347 &self,
348 query: &Query<E>,
349 target_field: &str,
350 ) -> Result<u32, QueryError>
351 where
352 E: EntityKind<Canister = C> + EntityValue,
353 {
354 let plan = query.plan()?;
355
356 self.with_metrics(|| {
357 self.load_executor::<E>()
358 .aggregate_count_distinct_by(plan, target_field)
359 })
360 .map_err(QueryError::Execute)
361 }
362
363 pub(crate) fn execute_load_query_min_max_by<E>(
364 &self,
365 query: &Query<E>,
366 target_field: &str,
367 ) -> Result<MinMaxByIds<E>, QueryError>
368 where
369 E: EntityKind<Canister = C> + EntityValue,
370 {
371 let plan = query.plan()?;
372
373 self.with_metrics(|| {
374 self.load_executor::<E>()
375 .aggregate_min_max_by(plan, target_field)
376 })
377 .map_err(QueryError::Execute)
378 }
379
380 pub(crate) fn execute_load_query_values_by<E>(
381 &self,
382 query: &Query<E>,
383 target_field: &str,
384 ) -> Result<Vec<Value>, QueryError>
385 where
386 E: EntityKind<Canister = C> + EntityValue,
387 {
388 let plan = query.plan()?;
389
390 self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
391 .map_err(QueryError::Execute)
392 }
393
394 pub(crate) fn execute_load_query_take<E>(
395 &self,
396 query: &Query<E>,
397 take_count: u32,
398 ) -> Result<Response<E>, QueryError>
399 where
400 E: EntityKind<Canister = C> + EntityValue,
401 {
402 let plan = query.plan()?;
403
404 self.with_metrics(|| self.load_executor::<E>().take(plan, take_count))
405 .map_err(QueryError::Execute)
406 }
407
408 pub(crate) fn execute_load_query_top_k_by<E>(
409 &self,
410 query: &Query<E>,
411 target_field: &str,
412 take_count: u32,
413 ) -> Result<Response<E>, QueryError>
414 where
415 E: EntityKind<Canister = C> + EntityValue,
416 {
417 let plan = query.plan()?;
418
419 self.with_metrics(|| {
420 self.load_executor::<E>()
421 .top_k_by(plan, target_field, take_count)
422 })
423 .map_err(QueryError::Execute)
424 }
425
426 pub(crate) fn execute_load_query_bottom_k_by<E>(
427 &self,
428 query: &Query<E>,
429 target_field: &str,
430 take_count: u32,
431 ) -> Result<Response<E>, QueryError>
432 where
433 E: EntityKind<Canister = C> + EntityValue,
434 {
435 let plan = query.plan()?;
436
437 self.with_metrics(|| {
438 self.load_executor::<E>()
439 .bottom_k_by(plan, target_field, take_count)
440 })
441 .map_err(QueryError::Execute)
442 }
443
444 pub(crate) fn execute_load_query_top_k_by_values<E>(
445 &self,
446 query: &Query<E>,
447 target_field: &str,
448 take_count: u32,
449 ) -> Result<Vec<Value>, QueryError>
450 where
451 E: EntityKind<Canister = C> + EntityValue,
452 {
453 let plan = query.plan()?;
454
455 self.with_metrics(|| {
456 self.load_executor::<E>()
457 .top_k_by_values(plan, target_field, take_count)
458 })
459 .map_err(QueryError::Execute)
460 }
461
462 pub(crate) fn execute_load_query_bottom_k_by_values<E>(
463 &self,
464 query: &Query<E>,
465 target_field: &str,
466 take_count: u32,
467 ) -> Result<Vec<Value>, QueryError>
468 where
469 E: EntityKind<Canister = C> + EntityValue,
470 {
471 let plan = query.plan()?;
472
473 self.with_metrics(|| {
474 self.load_executor::<E>()
475 .bottom_k_by_values(plan, target_field, take_count)
476 })
477 .map_err(QueryError::Execute)
478 }
479
480 pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
481 &self,
482 query: &Query<E>,
483 target_field: &str,
484 take_count: u32,
485 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
486 where
487 E: EntityKind<Canister = C> + EntityValue,
488 {
489 let plan = query.plan()?;
490
491 self.with_metrics(|| {
492 self.load_executor::<E>()
493 .top_k_by_with_ids(plan, target_field, take_count)
494 })
495 .map_err(QueryError::Execute)
496 }
497
498 pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
499 &self,
500 query: &Query<E>,
501 target_field: &str,
502 take_count: u32,
503 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
504 where
505 E: EntityKind<Canister = C> + EntityValue,
506 {
507 let plan = query.plan()?;
508
509 self.with_metrics(|| {
510 self.load_executor::<E>()
511 .bottom_k_by_with_ids(plan, target_field, take_count)
512 })
513 .map_err(QueryError::Execute)
514 }
515
516 pub(crate) fn execute_load_query_distinct_values_by<E>(
517 &self,
518 query: &Query<E>,
519 target_field: &str,
520 ) -> Result<Vec<Value>, QueryError>
521 where
522 E: EntityKind<Canister = C> + EntityValue,
523 {
524 let plan = query.plan()?;
525
526 self.with_metrics(|| {
527 self.load_executor::<E>()
528 .distinct_values_by(plan, target_field)
529 })
530 .map_err(QueryError::Execute)
531 }
532
533 pub(crate) fn execute_load_query_values_by_with_ids<E>(
534 &self,
535 query: &Query<E>,
536 target_field: &str,
537 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
538 where
539 E: EntityKind<Canister = C> + EntityValue,
540 {
541 let plan = query.plan()?;
542
543 self.with_metrics(|| {
544 self.load_executor::<E>()
545 .values_by_with_ids(plan, target_field)
546 })
547 .map_err(QueryError::Execute)
548 }
549
550 pub(crate) fn execute_load_query_first_value_by<E>(
551 &self,
552 query: &Query<E>,
553 target_field: &str,
554 ) -> Result<Option<Value>, QueryError>
555 where
556 E: EntityKind<Canister = C> + EntityValue,
557 {
558 let plan = query.plan()?;
559
560 self.with_metrics(|| self.load_executor::<E>().first_value_by(plan, target_field))
561 .map_err(QueryError::Execute)
562 }
563
564 pub(crate) fn execute_load_query_last_value_by<E>(
565 &self,
566 query: &Query<E>,
567 target_field: &str,
568 ) -> Result<Option<Value>, QueryError>
569 where
570 E: EntityKind<Canister = C> + EntityValue,
571 {
572 let plan = query.plan()?;
573
574 self.with_metrics(|| self.load_executor::<E>().last_value_by(plan, target_field))
575 .map_err(QueryError::Execute)
576 }
577
578 pub(crate) fn execute_load_query_first<E>(
579 &self,
580 query: &Query<E>,
581 ) -> Result<Option<Id<E>>, QueryError>
582 where
583 E: EntityKind<Canister = C> + EntityValue,
584 {
585 let plan = query.plan()?;
586
587 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
588 .map_err(QueryError::Execute)
589 }
590
591 pub(crate) fn execute_load_query_last<E>(
592 &self,
593 query: &Query<E>,
594 ) -> Result<Option<Id<E>>, QueryError>
595 where
596 E: EntityKind<Canister = C> + EntityValue,
597 {
598 let plan = query.plan()?;
599
600 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
601 .map_err(QueryError::Execute)
602 }
603
604 pub(crate) fn execute_load_query_paged_with_trace<E>(
605 &self,
606 query: &Query<E>,
607 cursor_token: Option<&str>,
608 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
609 where
610 E: EntityKind<Canister = C> + EntityValue,
611 {
612 let plan = query.plan()?;
613 let cursor_bytes = match cursor_token {
614 Some(token) => Some(decode_cursor(token).map_err(|reason| {
615 QueryError::from(PlanError::from(
616 CursorPlanError::InvalidContinuationCursor { reason },
617 ))
618 })?),
619 None => None,
620 };
621 let cursor = plan.prepare_cursor(cursor_bytes.as_deref())?;
622
623 let (page, trace) = self
624 .with_metrics(|| {
625 self.load_executor::<E>()
626 .execute_paged_with_cursor_traced(plan, cursor)
627 })
628 .map_err(QueryError::Execute)?;
629 let next_cursor = page
630 .next_cursor
631 .map(|token| {
632 token.encode().map_err(|err| {
633 QueryError::Execute(InternalError::serialize_internal(format!(
634 "failed to serialize continuation cursor: {err}"
635 )))
636 })
637 })
638 .transpose()?;
639
640 Ok(PagedLoadExecutionWithTrace::new(
641 page.items,
642 next_cursor,
643 trace,
644 ))
645 }
646
647 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
652 where
653 E: EntityKind<Canister = C> + EntityValue,
654 {
655 self.execute_save_entity(|save| save.insert(entity))
656 }
657
658 pub fn insert_many_atomic<E>(
664 &self,
665 entities: impl IntoIterator<Item = E>,
666 ) -> Result<WriteBatchResponse<E>, InternalError>
667 where
668 E: EntityKind<Canister = C> + EntityValue,
669 {
670 self.execute_save_batch(|save| save.insert_many_atomic(entities))
671 }
672
673 pub fn insert_many_non_atomic<E>(
677 &self,
678 entities: impl IntoIterator<Item = E>,
679 ) -> Result<WriteBatchResponse<E>, InternalError>
680 where
681 E: EntityKind<Canister = C> + EntityValue,
682 {
683 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
684 }
685
686 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
687 where
688 E: EntityKind<Canister = C> + EntityValue,
689 {
690 self.execute_save_entity(|save| save.replace(entity))
691 }
692
693 pub fn replace_many_atomic<E>(
699 &self,
700 entities: impl IntoIterator<Item = E>,
701 ) -> Result<WriteBatchResponse<E>, InternalError>
702 where
703 E: EntityKind<Canister = C> + EntityValue,
704 {
705 self.execute_save_batch(|save| save.replace_many_atomic(entities))
706 }
707
708 pub fn replace_many_non_atomic<E>(
712 &self,
713 entities: impl IntoIterator<Item = E>,
714 ) -> Result<WriteBatchResponse<E>, InternalError>
715 where
716 E: EntityKind<Canister = C> + EntityValue,
717 {
718 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
719 }
720
721 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
722 where
723 E: EntityKind<Canister = C> + EntityValue,
724 {
725 self.execute_save_entity(|save| save.update(entity))
726 }
727
728 pub fn update_many_atomic<E>(
734 &self,
735 entities: impl IntoIterator<Item = E>,
736 ) -> Result<WriteBatchResponse<E>, InternalError>
737 where
738 E: EntityKind<Canister = C> + EntityValue,
739 {
740 self.execute_save_batch(|save| save.update_many_atomic(entities))
741 }
742
743 pub fn update_many_non_atomic<E>(
747 &self,
748 entities: impl IntoIterator<Item = E>,
749 ) -> Result<WriteBatchResponse<E>, InternalError>
750 where
751 E: EntityKind<Canister = C> + EntityValue,
752 {
753 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
754 }
755
756 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
757 where
758 E: EntityKind<Canister = C> + EntityValue,
759 {
760 self.execute_save_view::<E>(|save| save.insert_view(view))
761 }
762
763 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
764 where
765 E: EntityKind<Canister = C> + EntityValue,
766 {
767 self.execute_save_view::<E>(|save| save.replace_view(view))
768 }
769
770 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
771 where
772 E: EntityKind<Canister = C> + EntityValue,
773 {
774 self.execute_save_view::<E>(|save| save.update_view(view))
775 }
776
/// Test-only helper: walks every store in the registry and calls `clear`
/// on its data store and its index store.
#[cfg(test)]
#[doc(hidden)]
pub fn clear_stores_for_tests(&self) {
    self.db.with_store_registry(|registry| {
        registry.iter().into_iter().for_each(|(_, store)| {
            store.with_data_mut(DataStore::clear);
            store.with_index_mut(IndexStore::clear);
        });
    });
}
790}