1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9 query::{intent::QueryMode, plan::CursorPlanError},
10 },
11 error::InternalError,
12 obs::sink::{MetricsSink, with_metrics_sink},
13 traits::{CanisterKind, EntityKind, EntityValue},
14 types::{Decimal, Id},
15 value::Value,
16};
17
/// Success payload of `DbSession::execute_load_query_min_max_by`: an optional pair of entity ids.
///
/// NOTE(review): presumably ordered `(min, max)`, with `None` meaning no row qualified —
/// confirm against `LoadExecutor::aggregate_min_max_by`.
type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20pub struct DbSession<C: CanisterKind> {
27 db: Db<C>,
28 debug: bool,
29 metrics: Option<&'static dyn MetricsSink>,
30}
31
32impl<C: CanisterKind> DbSession<C> {
33 #[must_use]
34 pub const fn new(db: Db<C>) -> Self {
35 Self {
36 db,
37 debug: false,
38 metrics: None,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 #[must_use]
49 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
50 self.metrics = Some(sink);
51 self
52 }
53
54 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
55 if let Some(sink) = self.metrics {
56 with_metrics_sink(sink, f)
57 } else {
58 f()
59 }
60 }
61
62 fn execute_save_with<E, T, R>(
64 &self,
65 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
66 map: impl FnOnce(T) -> R,
67 ) -> Result<R, InternalError>
68 where
69 E: EntityKind<Canister = C> + EntityValue,
70 {
71 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
72
73 Ok(map(value))
74 }
75
76 fn execute_save_entity<E>(
78 &self,
79 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
80 ) -> Result<WriteResponse<E>, InternalError>
81 where
82 E: EntityKind<Canister = C> + EntityValue,
83 {
84 self.execute_save_with(op, WriteResponse::new)
85 }
86
87 fn execute_save_batch<E>(
88 &self,
89 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
90 ) -> Result<WriteBatchResponse<E>, InternalError>
91 where
92 E: EntityKind<Canister = C> + EntityValue,
93 {
94 self.execute_save_with(op, WriteBatchResponse::new)
95 }
96
97 fn execute_save_view<E>(
98 &self,
99 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
100 ) -> Result<E::ViewType, InternalError>
101 where
102 E: EntityKind<Canister = C> + EntityValue,
103 {
104 self.execute_save_with(op, std::convert::identity)
105 }
106
107 #[must_use]
112 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
113 where
114 E: EntityKind<Canister = C>,
115 {
116 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
117 }
118
119 #[must_use]
120 pub const fn load_with_consistency<E>(
121 &self,
122 consistency: ReadConsistency,
123 ) -> FluentLoadQuery<'_, E>
124 where
125 E: EntityKind<Canister = C>,
126 {
127 FluentLoadQuery::new(self, Query::new(consistency))
128 }
129
130 #[must_use]
131 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
132 where
133 E: EntityKind<Canister = C>,
134 {
135 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
136 }
137
138 #[must_use]
139 pub fn delete_with_consistency<E>(
140 &self,
141 consistency: ReadConsistency,
142 ) -> FluentDeleteQuery<'_, E>
143 where
144 E: EntityKind<Canister = C>,
145 {
146 FluentDeleteQuery::new(self, Query::new(consistency).delete())
147 }
148
149 #[must_use]
154 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
155 where
156 E: EntityKind<Canister = C> + EntityValue,
157 {
158 LoadExecutor::new(self.db, self.debug)
159 }
160
161 #[must_use]
162 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
163 where
164 E: EntityKind<Canister = C> + EntityValue,
165 {
166 DeleteExecutor::new(self.db, self.debug)
167 }
168
169 #[must_use]
170 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
171 where
172 E: EntityKind<Canister = C> + EntityValue,
173 {
174 SaveExecutor::new(self.db, self.debug)
175 }
176
177 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
182 where
183 E: EntityKind<Canister = C> + EntityValue,
184 {
185 let plan = query.plan()?;
186
187 let result = match query.mode() {
188 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
189 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
190 };
191
192 result.map_err(QueryError::Execute)
193 }
194
195 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
196 where
197 E: EntityKind<Canister = C> + EntityValue,
198 {
199 let plan = query.plan()?;
200
201 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
202 .map_err(QueryError::Execute)
203 }
204
205 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
206 where
207 E: EntityKind<Canister = C> + EntityValue,
208 {
209 let plan = query.plan()?;
210
211 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
212 .map_err(QueryError::Execute)
213 }
214
215 pub(crate) fn execute_load_query_min<E>(
216 &self,
217 query: &Query<E>,
218 ) -> Result<Option<Id<E>>, QueryError>
219 where
220 E: EntityKind<Canister = C> + EntityValue,
221 {
222 let plan = query.plan()?;
223
224 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
225 .map_err(QueryError::Execute)
226 }
227
228 pub(crate) fn execute_load_query_max<E>(
229 &self,
230 query: &Query<E>,
231 ) -> Result<Option<Id<E>>, QueryError>
232 where
233 E: EntityKind<Canister = C> + EntityValue,
234 {
235 let plan = query.plan()?;
236
237 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
238 .map_err(QueryError::Execute)
239 }
240
241 pub(crate) fn execute_load_query_min_by<E>(
242 &self,
243 query: &Query<E>,
244 target_field: &str,
245 ) -> Result<Option<Id<E>>, QueryError>
246 where
247 E: EntityKind<Canister = C> + EntityValue,
248 {
249 let plan = query.plan()?;
250
251 self.with_metrics(|| {
252 self.load_executor::<E>()
253 .aggregate_min_by(plan, target_field)
254 })
255 .map_err(QueryError::Execute)
256 }
257
258 pub(crate) fn execute_load_query_max_by<E>(
259 &self,
260 query: &Query<E>,
261 target_field: &str,
262 ) -> Result<Option<Id<E>>, QueryError>
263 where
264 E: EntityKind<Canister = C> + EntityValue,
265 {
266 let plan = query.plan()?;
267
268 self.with_metrics(|| {
269 self.load_executor::<E>()
270 .aggregate_max_by(plan, target_field)
271 })
272 .map_err(QueryError::Execute)
273 }
274
275 pub(crate) fn execute_load_query_nth_by<E>(
276 &self,
277 query: &Query<E>,
278 target_field: &str,
279 nth: usize,
280 ) -> Result<Option<Id<E>>, QueryError>
281 where
282 E: EntityKind<Canister = C> + EntityValue,
283 {
284 let plan = query.plan()?;
285
286 self.with_metrics(|| {
287 self.load_executor::<E>()
288 .aggregate_nth_by(plan, target_field, nth)
289 })
290 .map_err(QueryError::Execute)
291 }
292
293 pub(crate) fn execute_load_query_sum_by<E>(
294 &self,
295 query: &Query<E>,
296 target_field: &str,
297 ) -> Result<Option<Decimal>, QueryError>
298 where
299 E: EntityKind<Canister = C> + EntityValue,
300 {
301 let plan = query.plan()?;
302
303 self.with_metrics(|| {
304 self.load_executor::<E>()
305 .aggregate_sum_by(plan, target_field)
306 })
307 .map_err(QueryError::Execute)
308 }
309
310 pub(crate) fn execute_load_query_avg_by<E>(
311 &self,
312 query: &Query<E>,
313 target_field: &str,
314 ) -> Result<Option<Decimal>, QueryError>
315 where
316 E: EntityKind<Canister = C> + EntityValue,
317 {
318 let plan = query.plan()?;
319
320 self.with_metrics(|| {
321 self.load_executor::<E>()
322 .aggregate_avg_by(plan, target_field)
323 })
324 .map_err(QueryError::Execute)
325 }
326
327 pub(crate) fn execute_load_query_median_by<E>(
328 &self,
329 query: &Query<E>,
330 target_field: &str,
331 ) -> Result<Option<Id<E>>, QueryError>
332 where
333 E: EntityKind<Canister = C> + EntityValue,
334 {
335 let plan = query.plan()?;
336
337 self.with_metrics(|| {
338 self.load_executor::<E>()
339 .aggregate_median_by(plan, target_field)
340 })
341 .map_err(QueryError::Execute)
342 }
343
344 pub(crate) fn execute_load_query_count_distinct_by<E>(
345 &self,
346 query: &Query<E>,
347 target_field: &str,
348 ) -> Result<u32, QueryError>
349 where
350 E: EntityKind<Canister = C> + EntityValue,
351 {
352 let plan = query.plan()?;
353
354 self.with_metrics(|| {
355 self.load_executor::<E>()
356 .aggregate_count_distinct_by(plan, target_field)
357 })
358 .map_err(QueryError::Execute)
359 }
360
361 pub(crate) fn execute_load_query_min_max_by<E>(
362 &self,
363 query: &Query<E>,
364 target_field: &str,
365 ) -> Result<MinMaxByIds<E>, QueryError>
366 where
367 E: EntityKind<Canister = C> + EntityValue,
368 {
369 let plan = query.plan()?;
370
371 self.with_metrics(|| {
372 self.load_executor::<E>()
373 .aggregate_min_max_by(plan, target_field)
374 })
375 .map_err(QueryError::Execute)
376 }
377
378 pub(crate) fn execute_load_query_values_by<E>(
379 &self,
380 query: &Query<E>,
381 target_field: &str,
382 ) -> Result<Vec<Value>, QueryError>
383 where
384 E: EntityKind<Canister = C> + EntityValue,
385 {
386 let plan = query.plan()?;
387
388 self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
389 .map_err(QueryError::Execute)
390 }
391
392 pub(crate) fn execute_load_query_take<E>(
393 &self,
394 query: &Query<E>,
395 take_count: u32,
396 ) -> Result<Response<E>, QueryError>
397 where
398 E: EntityKind<Canister = C> + EntityValue,
399 {
400 let plan = query.plan()?;
401
402 self.with_metrics(|| self.load_executor::<E>().take(plan, take_count))
403 .map_err(QueryError::Execute)
404 }
405
406 pub(crate) fn execute_load_query_top_k_by<E>(
407 &self,
408 query: &Query<E>,
409 target_field: &str,
410 take_count: u32,
411 ) -> Result<Response<E>, QueryError>
412 where
413 E: EntityKind<Canister = C> + EntityValue,
414 {
415 let plan = query.plan()?;
416
417 self.with_metrics(|| {
418 self.load_executor::<E>()
419 .top_k_by(plan, target_field, take_count)
420 })
421 .map_err(QueryError::Execute)
422 }
423
424 pub(crate) fn execute_load_query_bottom_k_by<E>(
425 &self,
426 query: &Query<E>,
427 target_field: &str,
428 take_count: u32,
429 ) -> Result<Response<E>, QueryError>
430 where
431 E: EntityKind<Canister = C> + EntityValue,
432 {
433 let plan = query.plan()?;
434
435 self.with_metrics(|| {
436 self.load_executor::<E>()
437 .bottom_k_by(plan, target_field, take_count)
438 })
439 .map_err(QueryError::Execute)
440 }
441
442 pub(crate) fn execute_load_query_top_k_by_values<E>(
443 &self,
444 query: &Query<E>,
445 target_field: &str,
446 take_count: u32,
447 ) -> Result<Vec<Value>, QueryError>
448 where
449 E: EntityKind<Canister = C> + EntityValue,
450 {
451 let plan = query.plan()?;
452
453 self.with_metrics(|| {
454 self.load_executor::<E>()
455 .top_k_by_values(plan, target_field, take_count)
456 })
457 .map_err(QueryError::Execute)
458 }
459
460 pub(crate) fn execute_load_query_bottom_k_by_values<E>(
461 &self,
462 query: &Query<E>,
463 target_field: &str,
464 take_count: u32,
465 ) -> Result<Vec<Value>, QueryError>
466 where
467 E: EntityKind<Canister = C> + EntityValue,
468 {
469 let plan = query.plan()?;
470
471 self.with_metrics(|| {
472 self.load_executor::<E>()
473 .bottom_k_by_values(plan, target_field, take_count)
474 })
475 .map_err(QueryError::Execute)
476 }
477
478 pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
479 &self,
480 query: &Query<E>,
481 target_field: &str,
482 take_count: u32,
483 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
484 where
485 E: EntityKind<Canister = C> + EntityValue,
486 {
487 let plan = query.plan()?;
488
489 self.with_metrics(|| {
490 self.load_executor::<E>()
491 .top_k_by_with_ids(plan, target_field, take_count)
492 })
493 .map_err(QueryError::Execute)
494 }
495
496 pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
497 &self,
498 query: &Query<E>,
499 target_field: &str,
500 take_count: u32,
501 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
502 where
503 E: EntityKind<Canister = C> + EntityValue,
504 {
505 let plan = query.plan()?;
506
507 self.with_metrics(|| {
508 self.load_executor::<E>()
509 .bottom_k_by_with_ids(plan, target_field, take_count)
510 })
511 .map_err(QueryError::Execute)
512 }
513
514 pub(crate) fn execute_load_query_distinct_values_by<E>(
515 &self,
516 query: &Query<E>,
517 target_field: &str,
518 ) -> Result<Vec<Value>, QueryError>
519 where
520 E: EntityKind<Canister = C> + EntityValue,
521 {
522 let plan = query.plan()?;
523
524 self.with_metrics(|| {
525 self.load_executor::<E>()
526 .distinct_values_by(plan, target_field)
527 })
528 .map_err(QueryError::Execute)
529 }
530
531 pub(crate) fn execute_load_query_values_by_with_ids<E>(
532 &self,
533 query: &Query<E>,
534 target_field: &str,
535 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
536 where
537 E: EntityKind<Canister = C> + EntityValue,
538 {
539 let plan = query.plan()?;
540
541 self.with_metrics(|| {
542 self.load_executor::<E>()
543 .values_by_with_ids(plan, target_field)
544 })
545 .map_err(QueryError::Execute)
546 }
547
548 pub(crate) fn execute_load_query_first_value_by<E>(
549 &self,
550 query: &Query<E>,
551 target_field: &str,
552 ) -> Result<Option<Value>, QueryError>
553 where
554 E: EntityKind<Canister = C> + EntityValue,
555 {
556 let plan = query.plan()?;
557
558 self.with_metrics(|| self.load_executor::<E>().first_value_by(plan, target_field))
559 .map_err(QueryError::Execute)
560 }
561
562 pub(crate) fn execute_load_query_last_value_by<E>(
563 &self,
564 query: &Query<E>,
565 target_field: &str,
566 ) -> Result<Option<Value>, QueryError>
567 where
568 E: EntityKind<Canister = C> + EntityValue,
569 {
570 let plan = query.plan()?;
571
572 self.with_metrics(|| self.load_executor::<E>().last_value_by(plan, target_field))
573 .map_err(QueryError::Execute)
574 }
575
576 pub(crate) fn execute_load_query_first<E>(
577 &self,
578 query: &Query<E>,
579 ) -> Result<Option<Id<E>>, QueryError>
580 where
581 E: EntityKind<Canister = C> + EntityValue,
582 {
583 let plan = query.plan()?;
584
585 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
586 .map_err(QueryError::Execute)
587 }
588
589 pub(crate) fn execute_load_query_last<E>(
590 &self,
591 query: &Query<E>,
592 ) -> Result<Option<Id<E>>, QueryError>
593 where
594 E: EntityKind<Canister = C> + EntityValue,
595 {
596 let plan = query.plan()?;
597
598 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
599 .map_err(QueryError::Execute)
600 }
601
602 pub(crate) fn execute_load_query_paged_with_trace<E>(
603 &self,
604 query: &Query<E>,
605 cursor_token: Option<&str>,
606 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
607 where
608 E: EntityKind<Canister = C> + EntityValue,
609 {
610 let plan = query.plan()?;
611 let cursor_bytes = match cursor_token {
612 Some(token) => Some(decode_cursor(token).map_err(|reason| {
613 QueryError::from(PlanError::from(
614 CursorPlanError::InvalidContinuationCursor { reason },
615 ))
616 })?),
617 None => None,
618 };
619 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
620
621 let (page, trace) = self
622 .with_metrics(|| {
623 self.load_executor::<E>()
624 .execute_paged_with_cursor_traced(plan, cursor)
625 })
626 .map_err(QueryError::Execute)?;
627 let next_cursor = page
628 .next_cursor
629 .map(|token| {
630 token.encode().map_err(|err| {
631 QueryError::Execute(InternalError::serialize_internal(format!(
632 "failed to serialize continuation cursor: {err}"
633 )))
634 })
635 })
636 .transpose()?;
637
638 Ok(PagedLoadExecutionWithTrace::new(
639 page.items,
640 next_cursor,
641 trace,
642 ))
643 }
644
645 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
650 where
651 E: EntityKind<Canister = C> + EntityValue,
652 {
653 self.execute_save_entity(|save| save.insert(entity))
654 }
655
656 pub fn insert_many_atomic<E>(
662 &self,
663 entities: impl IntoIterator<Item = E>,
664 ) -> Result<WriteBatchResponse<E>, InternalError>
665 where
666 E: EntityKind<Canister = C> + EntityValue,
667 {
668 self.execute_save_batch(|save| save.insert_many_atomic(entities))
669 }
670
671 pub fn insert_many_non_atomic<E>(
675 &self,
676 entities: impl IntoIterator<Item = E>,
677 ) -> Result<WriteBatchResponse<E>, InternalError>
678 where
679 E: EntityKind<Canister = C> + EntityValue,
680 {
681 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
682 }
683
684 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
685 where
686 E: EntityKind<Canister = C> + EntityValue,
687 {
688 self.execute_save_entity(|save| save.replace(entity))
689 }
690
691 pub fn replace_many_atomic<E>(
697 &self,
698 entities: impl IntoIterator<Item = E>,
699 ) -> Result<WriteBatchResponse<E>, InternalError>
700 where
701 E: EntityKind<Canister = C> + EntityValue,
702 {
703 self.execute_save_batch(|save| save.replace_many_atomic(entities))
704 }
705
706 pub fn replace_many_non_atomic<E>(
710 &self,
711 entities: impl IntoIterator<Item = E>,
712 ) -> Result<WriteBatchResponse<E>, InternalError>
713 where
714 E: EntityKind<Canister = C> + EntityValue,
715 {
716 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
717 }
718
719 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
720 where
721 E: EntityKind<Canister = C> + EntityValue,
722 {
723 self.execute_save_entity(|save| save.update(entity))
724 }
725
726 pub fn update_many_atomic<E>(
732 &self,
733 entities: impl IntoIterator<Item = E>,
734 ) -> Result<WriteBatchResponse<E>, InternalError>
735 where
736 E: EntityKind<Canister = C> + EntityValue,
737 {
738 self.execute_save_batch(|save| save.update_many_atomic(entities))
739 }
740
741 pub fn update_many_non_atomic<E>(
745 &self,
746 entities: impl IntoIterator<Item = E>,
747 ) -> Result<WriteBatchResponse<E>, InternalError>
748 where
749 E: EntityKind<Canister = C> + EntityValue,
750 {
751 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
752 }
753
754 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
755 where
756 E: EntityKind<Canister = C> + EntityValue,
757 {
758 self.execute_save_view::<E>(|save| save.insert_view(view))
759 }
760
761 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
762 where
763 E: EntityKind<Canister = C> + EntityValue,
764 {
765 self.execute_save_view::<E>(|save| save.replace_view(view))
766 }
767
768 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
769 where
770 E: EntityKind<Canister = C> + EntityValue,
771 {
772 self.execute_save_view::<E>(|save| save.update_view(view))
773 }
774
775 #[cfg(test)]
777 #[doc(hidden)]
778 pub fn clear_stores_for_tests(&self) {
779 self.db.with_store_registry(|reg| {
780 for (_, store) in reg.iter() {
783 store.with_data_mut(DataStore::clear);
784 store.with_index_mut(IndexStore::clear);
785 }
786 });
787 }
788}