1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse,
8 cursor::CursorPlanError,
9 decode_cursor,
10 executor::{DeleteExecutor, ExecutorPlanError, LoadExecutor, SaveExecutor},
11 query::intent::QueryMode,
12 },
13 error::InternalError,
14 obs::sink::{MetricsSink, with_metrics_sink},
15 traits::{CanisterKind, EntityKind, EntityValue},
16 types::{Decimal, Id},
17 value::Value,
18};
19
20fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
22 QueryError::from(err.into_plan_error())
23}
24
25pub struct DbSession<C: CanisterKind> {
32 db: Db<C>,
33 debug: bool,
34 metrics: Option<&'static dyn MetricsSink>,
35}
36
37impl<C: CanisterKind> DbSession<C> {
38 #[must_use]
39 pub const fn new(db: Db<C>) -> Self {
40 Self {
41 db,
42 debug: false,
43 metrics: None,
44 }
45 }
46
47 #[must_use]
48 pub const fn debug(mut self) -> Self {
49 self.debug = true;
50 self
51 }
52
53 #[must_use]
54 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
55 self.metrics = Some(sink);
56 self
57 }
58
59 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
60 if let Some(sink) = self.metrics {
61 with_metrics_sink(sink, f)
62 } else {
63 f()
64 }
65 }
66
67 fn execute_save_with<E, T, R>(
69 &self,
70 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
71 map: impl FnOnce(T) -> R,
72 ) -> Result<R, InternalError>
73 where
74 E: EntityKind<Canister = C> + EntityValue,
75 {
76 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
77
78 Ok(map(value))
79 }
80
81 fn execute_save_entity<E>(
83 &self,
84 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
85 ) -> Result<WriteResponse<E>, InternalError>
86 where
87 E: EntityKind<Canister = C> + EntityValue,
88 {
89 self.execute_save_with(op, WriteResponse::new)
90 }
91
92 fn execute_save_batch<E>(
93 &self,
94 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
95 ) -> Result<WriteBatchResponse<E>, InternalError>
96 where
97 E: EntityKind<Canister = C> + EntityValue,
98 {
99 self.execute_save_with(op, WriteBatchResponse::new)
100 }
101
102 fn execute_save_view<E>(
103 &self,
104 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
105 ) -> Result<E::ViewType, InternalError>
106 where
107 E: EntityKind<Canister = C> + EntityValue,
108 {
109 self.execute_save_with(op, std::convert::identity)
110 }
111
112 #[must_use]
117 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
118 where
119 E: EntityKind<Canister = C>,
120 {
121 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
122 }
123
124 #[must_use]
125 pub const fn load_with_consistency<E>(
126 &self,
127 consistency: ReadConsistency,
128 ) -> FluentLoadQuery<'_, E>
129 where
130 E: EntityKind<Canister = C>,
131 {
132 FluentLoadQuery::new(self, Query::new(consistency))
133 }
134
135 #[must_use]
136 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
137 where
138 E: EntityKind<Canister = C>,
139 {
140 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
141 }
142
143 #[must_use]
144 pub fn delete_with_consistency<E>(
145 &self,
146 consistency: ReadConsistency,
147 ) -> FluentDeleteQuery<'_, E>
148 where
149 E: EntityKind<Canister = C>,
150 {
151 FluentDeleteQuery::new(self, Query::new(consistency).delete())
152 }
153
154 #[must_use]
159 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
160 where
161 E: EntityKind<Canister = C> + EntityValue,
162 {
163 LoadExecutor::new(self.db, self.debug)
164 }
165
166 #[must_use]
167 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
168 where
169 E: EntityKind<Canister = C> + EntityValue,
170 {
171 DeleteExecutor::new(self.db, self.debug)
172 }
173
174 #[must_use]
175 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
176 where
177 E: EntityKind<Canister = C> + EntityValue,
178 {
179 SaveExecutor::new(self.db, self.debug)
180 }
181
182 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
187 where
188 E: EntityKind<Canister = C> + EntityValue,
189 {
190 let plan = query.plan()?;
191
192 let result = match query.mode() {
193 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
194 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
195 };
196
197 result.map_err(QueryError::Execute)
198 }
199
200 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
201 where
202 E: EntityKind<Canister = C> + EntityValue,
203 {
204 let plan = query.plan()?;
205
206 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
207 .map_err(QueryError::Execute)
208 }
209
210 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
211 where
212 E: EntityKind<Canister = C> + EntityValue,
213 {
214 let plan = query.plan()?;
215
216 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
217 .map_err(QueryError::Execute)
218 }
219
220 pub(crate) fn execute_load_query_min<E>(
221 &self,
222 query: &Query<E>,
223 ) -> Result<Option<Id<E>>, QueryError>
224 where
225 E: EntityKind<Canister = C> + EntityValue,
226 {
227 let plan = query.plan()?;
228
229 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
230 .map_err(QueryError::Execute)
231 }
232
233 pub(crate) fn execute_load_query_max<E>(
234 &self,
235 query: &Query<E>,
236 ) -> Result<Option<Id<E>>, QueryError>
237 where
238 E: EntityKind<Canister = C> + EntityValue,
239 {
240 let plan = query.plan()?;
241
242 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
243 .map_err(QueryError::Execute)
244 }
245
246 pub(crate) fn execute_load_query_min_by<E>(
247 &self,
248 query: &Query<E>,
249 target_field: &str,
250 ) -> Result<Option<Id<E>>, QueryError>
251 where
252 E: EntityKind<Canister = C> + EntityValue,
253 {
254 let plan = query.plan()?;
255
256 self.with_metrics(|| {
257 self.load_executor::<E>()
258 .aggregate_min_by(plan, target_field)
259 })
260 .map_err(QueryError::Execute)
261 }
262
263 pub(crate) fn execute_load_query_max_by<E>(
264 &self,
265 query: &Query<E>,
266 target_field: &str,
267 ) -> Result<Option<Id<E>>, QueryError>
268 where
269 E: EntityKind<Canister = C> + EntityValue,
270 {
271 let plan = query.plan()?;
272
273 self.with_metrics(|| {
274 self.load_executor::<E>()
275 .aggregate_max_by(plan, target_field)
276 })
277 .map_err(QueryError::Execute)
278 }
279
280 pub(crate) fn execute_load_query_nth_by<E>(
281 &self,
282 query: &Query<E>,
283 target_field: &str,
284 nth: usize,
285 ) -> Result<Option<Id<E>>, QueryError>
286 where
287 E: EntityKind<Canister = C> + EntityValue,
288 {
289 let plan = query.plan()?;
290
291 self.with_metrics(|| {
292 self.load_executor::<E>()
293 .aggregate_nth_by(plan, target_field, nth)
294 })
295 .map_err(QueryError::Execute)
296 }
297
298 pub(crate) fn execute_load_query_sum_by<E>(
299 &self,
300 query: &Query<E>,
301 target_field: &str,
302 ) -> Result<Option<Decimal>, QueryError>
303 where
304 E: EntityKind<Canister = C> + EntityValue,
305 {
306 let plan = query.plan()?;
307
308 self.with_metrics(|| {
309 self.load_executor::<E>()
310 .aggregate_sum_by(plan, target_field)
311 })
312 .map_err(QueryError::Execute)
313 }
314
315 pub(crate) fn execute_load_query_avg_by<E>(
316 &self,
317 query: &Query<E>,
318 target_field: &str,
319 ) -> Result<Option<Decimal>, QueryError>
320 where
321 E: EntityKind<Canister = C> + EntityValue,
322 {
323 let plan = query.plan()?;
324
325 self.with_metrics(|| {
326 self.load_executor::<E>()
327 .aggregate_avg_by(plan, target_field)
328 })
329 .map_err(QueryError::Execute)
330 }
331
332 pub(crate) fn execute_load_query_median_by<E>(
333 &self,
334 query: &Query<E>,
335 target_field: &str,
336 ) -> Result<Option<Id<E>>, QueryError>
337 where
338 E: EntityKind<Canister = C> + EntityValue,
339 {
340 let plan = query.plan()?;
341
342 self.with_metrics(|| {
343 self.load_executor::<E>()
344 .aggregate_median_by(plan, target_field)
345 })
346 .map_err(QueryError::Execute)
347 }
348
349 pub(crate) fn execute_load_query_count_distinct_by<E>(
350 &self,
351 query: &Query<E>,
352 target_field: &str,
353 ) -> Result<u32, QueryError>
354 where
355 E: EntityKind<Canister = C> + EntityValue,
356 {
357 let plan = query.plan()?;
358
359 self.with_metrics(|| {
360 self.load_executor::<E>()
361 .aggregate_count_distinct_by(plan, target_field)
362 })
363 .map_err(QueryError::Execute)
364 }
365
366 #[expect(clippy::type_complexity)]
367 pub(crate) fn execute_load_query_min_max_by<E>(
368 &self,
369 query: &Query<E>,
370 target_field: &str,
371 ) -> Result<Option<(Id<E>, Id<E>)>, QueryError>
372 where
373 E: EntityKind<Canister = C> + EntityValue,
374 {
375 let plan = query.plan()?;
376
377 self.with_metrics(|| {
378 self.load_executor::<E>()
379 .aggregate_min_max_by(plan, target_field)
380 })
381 .map_err(QueryError::Execute)
382 }
383
384 pub(crate) fn execute_load_query_values_by<E>(
385 &self,
386 query: &Query<E>,
387 target_field: &str,
388 ) -> Result<Vec<Value>, QueryError>
389 where
390 E: EntityKind<Canister = C> + EntityValue,
391 {
392 let plan = query.plan()?;
393
394 self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
395 .map_err(QueryError::Execute)
396 }
397
398 pub(crate) fn execute_load_query_take<E>(
399 &self,
400 query: &Query<E>,
401 take_count: u32,
402 ) -> Result<Response<E>, QueryError>
403 where
404 E: EntityKind<Canister = C> + EntityValue,
405 {
406 let plan = query.plan()?;
407
408 self.with_metrics(|| self.load_executor::<E>().take(plan, take_count))
409 .map_err(QueryError::Execute)
410 }
411
412 pub(crate) fn execute_load_query_top_k_by<E>(
413 &self,
414 query: &Query<E>,
415 target_field: &str,
416 take_count: u32,
417 ) -> Result<Response<E>, QueryError>
418 where
419 E: EntityKind<Canister = C> + EntityValue,
420 {
421 let plan = query.plan()?;
422
423 self.with_metrics(|| {
424 self.load_executor::<E>()
425 .top_k_by(plan, target_field, take_count)
426 })
427 .map_err(QueryError::Execute)
428 }
429
430 pub(crate) fn execute_load_query_bottom_k_by<E>(
431 &self,
432 query: &Query<E>,
433 target_field: &str,
434 take_count: u32,
435 ) -> Result<Response<E>, QueryError>
436 where
437 E: EntityKind<Canister = C> + EntityValue,
438 {
439 let plan = query.plan()?;
440
441 self.with_metrics(|| {
442 self.load_executor::<E>()
443 .bottom_k_by(plan, target_field, take_count)
444 })
445 .map_err(QueryError::Execute)
446 }
447
448 pub(crate) fn execute_load_query_top_k_by_values<E>(
449 &self,
450 query: &Query<E>,
451 target_field: &str,
452 take_count: u32,
453 ) -> Result<Vec<Value>, QueryError>
454 where
455 E: EntityKind<Canister = C> + EntityValue,
456 {
457 let plan = query.plan()?;
458
459 self.with_metrics(|| {
460 self.load_executor::<E>()
461 .top_k_by_values(plan, target_field, take_count)
462 })
463 .map_err(QueryError::Execute)
464 }
465
466 pub(crate) fn execute_load_query_bottom_k_by_values<E>(
467 &self,
468 query: &Query<E>,
469 target_field: &str,
470 take_count: u32,
471 ) -> Result<Vec<Value>, QueryError>
472 where
473 E: EntityKind<Canister = C> + EntityValue,
474 {
475 let plan = query.plan()?;
476
477 self.with_metrics(|| {
478 self.load_executor::<E>()
479 .bottom_k_by_values(plan, target_field, take_count)
480 })
481 .map_err(QueryError::Execute)
482 }
483
484 pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
485 &self,
486 query: &Query<E>,
487 target_field: &str,
488 take_count: u32,
489 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
490 where
491 E: EntityKind<Canister = C> + EntityValue,
492 {
493 let plan = query.plan()?;
494
495 self.with_metrics(|| {
496 self.load_executor::<E>()
497 .top_k_by_with_ids(plan, target_field, take_count)
498 })
499 .map_err(QueryError::Execute)
500 }
501
502 pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
503 &self,
504 query: &Query<E>,
505 target_field: &str,
506 take_count: u32,
507 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
508 where
509 E: EntityKind<Canister = C> + EntityValue,
510 {
511 let plan = query.plan()?;
512
513 self.with_metrics(|| {
514 self.load_executor::<E>()
515 .bottom_k_by_with_ids(plan, target_field, take_count)
516 })
517 .map_err(QueryError::Execute)
518 }
519
520 pub(crate) fn execute_load_query_distinct_values_by<E>(
521 &self,
522 query: &Query<E>,
523 target_field: &str,
524 ) -> Result<Vec<Value>, QueryError>
525 where
526 E: EntityKind<Canister = C> + EntityValue,
527 {
528 let plan = query.plan()?;
529
530 self.with_metrics(|| {
531 self.load_executor::<E>()
532 .distinct_values_by(plan, target_field)
533 })
534 .map_err(QueryError::Execute)
535 }
536
537 pub(crate) fn execute_load_query_values_by_with_ids<E>(
538 &self,
539 query: &Query<E>,
540 target_field: &str,
541 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
542 where
543 E: EntityKind<Canister = C> + EntityValue,
544 {
545 let plan = query.plan()?;
546
547 self.with_metrics(|| {
548 self.load_executor::<E>()
549 .values_by_with_ids(plan, target_field)
550 })
551 .map_err(QueryError::Execute)
552 }
553
554 pub(crate) fn execute_load_query_first_value_by<E>(
555 &self,
556 query: &Query<E>,
557 target_field: &str,
558 ) -> Result<Option<Value>, QueryError>
559 where
560 E: EntityKind<Canister = C> + EntityValue,
561 {
562 let plan = query.plan()?;
563
564 self.with_metrics(|| self.load_executor::<E>().first_value_by(plan, target_field))
565 .map_err(QueryError::Execute)
566 }
567
568 pub(crate) fn execute_load_query_last_value_by<E>(
569 &self,
570 query: &Query<E>,
571 target_field: &str,
572 ) -> Result<Option<Value>, QueryError>
573 where
574 E: EntityKind<Canister = C> + EntityValue,
575 {
576 let plan = query.plan()?;
577
578 self.with_metrics(|| self.load_executor::<E>().last_value_by(plan, target_field))
579 .map_err(QueryError::Execute)
580 }
581
582 pub(crate) fn execute_load_query_first<E>(
583 &self,
584 query: &Query<E>,
585 ) -> Result<Option<Id<E>>, QueryError>
586 where
587 E: EntityKind<Canister = C> + EntityValue,
588 {
589 let plan = query.plan()?;
590
591 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
592 .map_err(QueryError::Execute)
593 }
594
595 pub(crate) fn execute_load_query_last<E>(
596 &self,
597 query: &Query<E>,
598 ) -> Result<Option<Id<E>>, QueryError>
599 where
600 E: EntityKind<Canister = C> + EntityValue,
601 {
602 let plan = query.plan()?;
603
604 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
605 .map_err(QueryError::Execute)
606 }
607
608 pub(crate) fn execute_load_query_paged_with_trace<E>(
609 &self,
610 query: &Query<E>,
611 cursor_token: Option<&str>,
612 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
613 where
614 E: EntityKind<Canister = C> + EntityValue,
615 {
616 let plan = query.plan()?;
617 let cursor_bytes = match cursor_token {
618 Some(token) => Some(decode_cursor(token).map_err(|reason| {
619 QueryError::from(PlanError::from(
620 CursorPlanError::InvalidContinuationCursor { reason },
621 ))
622 })?),
623 None => None,
624 };
625 let cursor = plan
626 .prepare_cursor(cursor_bytes.as_deref())
627 .map_err(map_executor_plan_error)?;
628
629 let (page, trace) = self
630 .with_metrics(|| {
631 self.load_executor::<E>()
632 .execute_paged_with_cursor_traced(plan, cursor)
633 })
634 .map_err(QueryError::Execute)?;
635 let next_cursor = page
636 .next_cursor
637 .map(|token| {
638 token.encode().map_err(|err| {
639 QueryError::Execute(InternalError::serialize_internal(format!(
640 "failed to serialize continuation cursor: {err}"
641 )))
642 })
643 })
644 .transpose()?;
645
646 Ok(PagedLoadExecutionWithTrace::new(
647 page.items,
648 next_cursor,
649 trace,
650 ))
651 }
652
653 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
658 where
659 E: EntityKind<Canister = C> + EntityValue,
660 {
661 self.execute_save_entity(|save| save.insert(entity))
662 }
663
664 pub fn insert_many_atomic<E>(
670 &self,
671 entities: impl IntoIterator<Item = E>,
672 ) -> Result<WriteBatchResponse<E>, InternalError>
673 where
674 E: EntityKind<Canister = C> + EntityValue,
675 {
676 self.execute_save_batch(|save| save.insert_many_atomic(entities))
677 }
678
679 pub fn insert_many_non_atomic<E>(
683 &self,
684 entities: impl IntoIterator<Item = E>,
685 ) -> Result<WriteBatchResponse<E>, InternalError>
686 where
687 E: EntityKind<Canister = C> + EntityValue,
688 {
689 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
690 }
691
692 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
693 where
694 E: EntityKind<Canister = C> + EntityValue,
695 {
696 self.execute_save_entity(|save| save.replace(entity))
697 }
698
699 pub fn replace_many_atomic<E>(
705 &self,
706 entities: impl IntoIterator<Item = E>,
707 ) -> Result<WriteBatchResponse<E>, InternalError>
708 where
709 E: EntityKind<Canister = C> + EntityValue,
710 {
711 self.execute_save_batch(|save| save.replace_many_atomic(entities))
712 }
713
714 pub fn replace_many_non_atomic<E>(
718 &self,
719 entities: impl IntoIterator<Item = E>,
720 ) -> Result<WriteBatchResponse<E>, InternalError>
721 where
722 E: EntityKind<Canister = C> + EntityValue,
723 {
724 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
725 }
726
727 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
728 where
729 E: EntityKind<Canister = C> + EntityValue,
730 {
731 self.execute_save_entity(|save| save.update(entity))
732 }
733
734 pub fn update_many_atomic<E>(
740 &self,
741 entities: impl IntoIterator<Item = E>,
742 ) -> Result<WriteBatchResponse<E>, InternalError>
743 where
744 E: EntityKind<Canister = C> + EntityValue,
745 {
746 self.execute_save_batch(|save| save.update_many_atomic(entities))
747 }
748
749 pub fn update_many_non_atomic<E>(
753 &self,
754 entities: impl IntoIterator<Item = E>,
755 ) -> Result<WriteBatchResponse<E>, InternalError>
756 where
757 E: EntityKind<Canister = C> + EntityValue,
758 {
759 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
760 }
761
762 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
763 where
764 E: EntityKind<Canister = C> + EntityValue,
765 {
766 self.execute_save_view::<E>(|save| save.insert_view(view))
767 }
768
769 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
770 where
771 E: EntityKind<Canister = C> + EntityValue,
772 {
773 self.execute_save_view::<E>(|save| save.replace_view(view))
774 }
775
776 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
777 where
778 E: EntityKind<Canister = C> + EntityValue,
779 {
780 self.execute_save_view::<E>(|save| save.update_view(view))
781 }
782
783 #[cfg(test)]
785 #[doc(hidden)]
786 pub fn clear_stores_for_tests(&self) {
787 self.db.with_store_registry(|reg| {
788 for (_, store) in reg.iter() {
791 store.with_data_mut(DataStore::clear);
792 store.with_index_mut(IndexStore::clear);
793 }
794 });
795 }
796}