1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedGroupedExecutionWithTrace,
7 PagedLoadExecutionWithTrace, PlanError, Query, QueryError, ReadConsistency, Response,
8 WriteBatchResponse, WriteResponse,
9 cursor::CursorPlanError,
10 decode_cursor,
11 executor::{DeleteExecutor, ExecutablePlan, ExecutorPlanError, LoadExecutor, SaveExecutor},
12 query::intent::QueryMode,
13 },
14 error::InternalError,
15 obs::sink::{MetricsSink, with_metrics_sink},
16 traits::{CanisterKind, EntityKind, EntityValue},
17 types::{Decimal, Id},
18 value::Value,
19};
20
21fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
23 QueryError::from(err.into_plan_error())
24}
25
26pub struct DbSession<C: CanisterKind> {
33 db: Db<C>,
34 debug: bool,
35 metrics: Option<&'static dyn MetricsSink>,
36}
37
38impl<C: CanisterKind> DbSession<C> {
39 #[must_use]
40 pub const fn new(db: Db<C>) -> Self {
41 Self {
42 db,
43 debug: false,
44 metrics: None,
45 }
46 }
47
48 #[must_use]
49 pub const fn debug(mut self) -> Self {
50 self.debug = true;
51 self
52 }
53
54 #[must_use]
55 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
56 self.metrics = Some(sink);
57 self
58 }
59
60 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
61 if let Some(sink) = self.metrics {
62 with_metrics_sink(sink, f)
63 } else {
64 f()
65 }
66 }
67
68 fn execute_save_with<E, T, R>(
70 &self,
71 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
72 map: impl FnOnce(T) -> R,
73 ) -> Result<R, InternalError>
74 where
75 E: EntityKind<Canister = C> + EntityValue,
76 {
77 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
78
79 Ok(map(value))
80 }
81
82 fn execute_save_entity<E>(
84 &self,
85 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
86 ) -> Result<WriteResponse<E>, InternalError>
87 where
88 E: EntityKind<Canister = C> + EntityValue,
89 {
90 self.execute_save_with(op, WriteResponse::new)
91 }
92
93 fn execute_save_batch<E>(
94 &self,
95 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
96 ) -> Result<WriteBatchResponse<E>, InternalError>
97 where
98 E: EntityKind<Canister = C> + EntityValue,
99 {
100 self.execute_save_with(op, WriteBatchResponse::new)
101 }
102
103 fn execute_save_view<E>(
104 &self,
105 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
106 ) -> Result<E::ViewType, InternalError>
107 where
108 E: EntityKind<Canister = C> + EntityValue,
109 {
110 self.execute_save_with(op, std::convert::identity)
111 }
112
113 #[must_use]
118 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
119 where
120 E: EntityKind<Canister = C>,
121 {
122 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
123 }
124
125 #[must_use]
126 pub const fn load_with_consistency<E>(
127 &self,
128 consistency: ReadConsistency,
129 ) -> FluentLoadQuery<'_, E>
130 where
131 E: EntityKind<Canister = C>,
132 {
133 FluentLoadQuery::new(self, Query::new(consistency))
134 }
135
136 #[must_use]
137 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
138 where
139 E: EntityKind<Canister = C>,
140 {
141 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
142 }
143
144 #[must_use]
145 pub fn delete_with_consistency<E>(
146 &self,
147 consistency: ReadConsistency,
148 ) -> FluentDeleteQuery<'_, E>
149 where
150 E: EntityKind<Canister = C>,
151 {
152 FluentDeleteQuery::new(self, Query::new(consistency).delete())
153 }
154
155 #[must_use]
160 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
161 where
162 E: EntityKind<Canister = C> + EntityValue,
163 {
164 LoadExecutor::new(self.db, self.debug)
165 }
166
167 #[must_use]
168 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
169 where
170 E: EntityKind<Canister = C> + EntityValue,
171 {
172 DeleteExecutor::new(self.db, self.debug)
173 }
174
175 #[must_use]
176 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
177 where
178 E: EntityKind<Canister = C> + EntityValue,
179 {
180 SaveExecutor::new(self.db, self.debug)
181 }
182
183 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
188 where
189 E: EntityKind<Canister = C> + EntityValue,
190 {
191 let plan = query.plan()?;
192
193 let result = match query.mode() {
194 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
195 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
196 };
197
198 result.map_err(QueryError::Execute)
199 }
200
201 fn execute_load_query_with<E, T>(
204 &self,
205 query: &Query<E>,
206 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
207 ) -> Result<T, QueryError>
208 where
209 E: EntityKind<Canister = C> + EntityValue,
210 {
211 let plan = query.plan()?;
212
213 self.with_metrics(|| op(self.load_executor::<E>(), plan))
214 .map_err(QueryError::Execute)
215 }
216
217 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
218 where
219 E: EntityKind<Canister = C> + EntityValue,
220 {
221 self.execute_load_query_with(query, |load, plan| load.aggregate_count(plan))
222 }
223
224 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
225 where
226 E: EntityKind<Canister = C> + EntityValue,
227 {
228 self.execute_load_query_with(query, |load, plan| load.aggregate_exists(plan))
229 }
230
231 pub(crate) fn execute_load_query_min<E>(
232 &self,
233 query: &Query<E>,
234 ) -> Result<Option<Id<E>>, QueryError>
235 where
236 E: EntityKind<Canister = C> + EntityValue,
237 {
238 self.execute_load_query_with(query, |load, plan| load.aggregate_min(plan))
239 }
240
241 pub(crate) fn execute_load_query_max<E>(
242 &self,
243 query: &Query<E>,
244 ) -> Result<Option<Id<E>>, QueryError>
245 where
246 E: EntityKind<Canister = C> + EntityValue,
247 {
248 self.execute_load_query_with(query, |load, plan| load.aggregate_max(plan))
249 }
250
251 pub(crate) fn execute_load_query_min_by<E>(
252 &self,
253 query: &Query<E>,
254 target_field: &str,
255 ) -> Result<Option<Id<E>>, QueryError>
256 where
257 E: EntityKind<Canister = C> + EntityValue,
258 {
259 self.execute_load_query_with(query, |load, plan| {
260 load.aggregate_min_by(plan, target_field)
261 })
262 }
263
264 pub(crate) fn execute_load_query_max_by<E>(
265 &self,
266 query: &Query<E>,
267 target_field: &str,
268 ) -> Result<Option<Id<E>>, QueryError>
269 where
270 E: EntityKind<Canister = C> + EntityValue,
271 {
272 self.execute_load_query_with(query, |load, plan| {
273 load.aggregate_max_by(plan, target_field)
274 })
275 }
276
277 pub(crate) fn execute_load_query_nth_by<E>(
278 &self,
279 query: &Query<E>,
280 target_field: &str,
281 nth: usize,
282 ) -> Result<Option<Id<E>>, QueryError>
283 where
284 E: EntityKind<Canister = C> + EntityValue,
285 {
286 self.execute_load_query_with(query, |load, plan| {
287 load.aggregate_nth_by(plan, target_field, nth)
288 })
289 }
290
291 pub(crate) fn execute_load_query_sum_by<E>(
292 &self,
293 query: &Query<E>,
294 target_field: &str,
295 ) -> Result<Option<Decimal>, QueryError>
296 where
297 E: EntityKind<Canister = C> + EntityValue,
298 {
299 self.execute_load_query_with(query, |load, plan| {
300 load.aggregate_sum_by(plan, target_field)
301 })
302 }
303
304 pub(crate) fn execute_load_query_avg_by<E>(
305 &self,
306 query: &Query<E>,
307 target_field: &str,
308 ) -> Result<Option<Decimal>, QueryError>
309 where
310 E: EntityKind<Canister = C> + EntityValue,
311 {
312 self.execute_load_query_with(query, |load, plan| {
313 load.aggregate_avg_by(plan, target_field)
314 })
315 }
316
317 pub(crate) fn execute_load_query_median_by<E>(
318 &self,
319 query: &Query<E>,
320 target_field: &str,
321 ) -> Result<Option<Id<E>>, QueryError>
322 where
323 E: EntityKind<Canister = C> + EntityValue,
324 {
325 self.execute_load_query_with(query, |load, plan| {
326 load.aggregate_median_by(plan, target_field)
327 })
328 }
329
330 pub(crate) fn execute_load_query_count_distinct_by<E>(
331 &self,
332 query: &Query<E>,
333 target_field: &str,
334 ) -> Result<u32, QueryError>
335 where
336 E: EntityKind<Canister = C> + EntityValue,
337 {
338 self.execute_load_query_with(query, |load, plan| {
339 load.aggregate_count_distinct_by(plan, target_field)
340 })
341 }
342
343 #[expect(clippy::type_complexity)]
344 pub(crate) fn execute_load_query_min_max_by<E>(
345 &self,
346 query: &Query<E>,
347 target_field: &str,
348 ) -> Result<Option<(Id<E>, Id<E>)>, QueryError>
349 where
350 E: EntityKind<Canister = C> + EntityValue,
351 {
352 self.execute_load_query_with(query, |load, plan| {
353 load.aggregate_min_max_by(plan, target_field)
354 })
355 }
356
357 pub(crate) fn execute_load_query_values_by<E>(
358 &self,
359 query: &Query<E>,
360 target_field: &str,
361 ) -> Result<Vec<Value>, QueryError>
362 where
363 E: EntityKind<Canister = C> + EntityValue,
364 {
365 self.execute_load_query_with(query, |load, plan| load.values_by(plan, target_field))
366 }
367
368 pub(crate) fn execute_load_query_take<E>(
369 &self,
370 query: &Query<E>,
371 take_count: u32,
372 ) -> Result<Response<E>, QueryError>
373 where
374 E: EntityKind<Canister = C> + EntityValue,
375 {
376 self.execute_load_query_with(query, |load, plan| load.take(plan, take_count))
377 }
378
379 pub(crate) fn execute_load_query_top_k_by<E>(
380 &self,
381 query: &Query<E>,
382 target_field: &str,
383 take_count: u32,
384 ) -> Result<Response<E>, QueryError>
385 where
386 E: EntityKind<Canister = C> + EntityValue,
387 {
388 self.execute_load_query_with(query, |load, plan| {
389 load.top_k_by(plan, target_field, take_count)
390 })
391 }
392
393 pub(crate) fn execute_load_query_bottom_k_by<E>(
394 &self,
395 query: &Query<E>,
396 target_field: &str,
397 take_count: u32,
398 ) -> Result<Response<E>, QueryError>
399 where
400 E: EntityKind<Canister = C> + EntityValue,
401 {
402 self.execute_load_query_with(query, |load, plan| {
403 load.bottom_k_by(plan, target_field, take_count)
404 })
405 }
406
407 pub(crate) fn execute_load_query_top_k_by_values<E>(
408 &self,
409 query: &Query<E>,
410 target_field: &str,
411 take_count: u32,
412 ) -> Result<Vec<Value>, QueryError>
413 where
414 E: EntityKind<Canister = C> + EntityValue,
415 {
416 self.execute_load_query_with(query, |load, plan| {
417 load.top_k_by_values(plan, target_field, take_count)
418 })
419 }
420
421 pub(crate) fn execute_load_query_bottom_k_by_values<E>(
422 &self,
423 query: &Query<E>,
424 target_field: &str,
425 take_count: u32,
426 ) -> Result<Vec<Value>, QueryError>
427 where
428 E: EntityKind<Canister = C> + EntityValue,
429 {
430 self.execute_load_query_with(query, |load, plan| {
431 load.bottom_k_by_values(plan, target_field, take_count)
432 })
433 }
434
435 pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
436 &self,
437 query: &Query<E>,
438 target_field: &str,
439 take_count: u32,
440 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
441 where
442 E: EntityKind<Canister = C> + EntityValue,
443 {
444 self.execute_load_query_with(query, |load, plan| {
445 load.top_k_by_with_ids(plan, target_field, take_count)
446 })
447 }
448
449 pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
450 &self,
451 query: &Query<E>,
452 target_field: &str,
453 take_count: u32,
454 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
455 where
456 E: EntityKind<Canister = C> + EntityValue,
457 {
458 self.execute_load_query_with(query, |load, plan| {
459 load.bottom_k_by_with_ids(plan, target_field, take_count)
460 })
461 }
462
463 pub(crate) fn execute_load_query_distinct_values_by<E>(
464 &self,
465 query: &Query<E>,
466 target_field: &str,
467 ) -> Result<Vec<Value>, QueryError>
468 where
469 E: EntityKind<Canister = C> + EntityValue,
470 {
471 self.execute_load_query_with(query, |load, plan| {
472 load.distinct_values_by(plan, target_field)
473 })
474 }
475
476 pub(crate) fn execute_load_query_values_by_with_ids<E>(
477 &self,
478 query: &Query<E>,
479 target_field: &str,
480 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
481 where
482 E: EntityKind<Canister = C> + EntityValue,
483 {
484 self.execute_load_query_with(query, |load, plan| {
485 load.values_by_with_ids(plan, target_field)
486 })
487 }
488
489 pub(crate) fn execute_load_query_first_value_by<E>(
490 &self,
491 query: &Query<E>,
492 target_field: &str,
493 ) -> Result<Option<Value>, QueryError>
494 where
495 E: EntityKind<Canister = C> + EntityValue,
496 {
497 self.execute_load_query_with(query, |load, plan| load.first_value_by(plan, target_field))
498 }
499
500 pub(crate) fn execute_load_query_last_value_by<E>(
501 &self,
502 query: &Query<E>,
503 target_field: &str,
504 ) -> Result<Option<Value>, QueryError>
505 where
506 E: EntityKind<Canister = C> + EntityValue,
507 {
508 self.execute_load_query_with(query, |load, plan| load.last_value_by(plan, target_field))
509 }
510
511 pub(crate) fn execute_load_query_first<E>(
512 &self,
513 query: &Query<E>,
514 ) -> Result<Option<Id<E>>, QueryError>
515 where
516 E: EntityKind<Canister = C> + EntityValue,
517 {
518 self.execute_load_query_with(query, |load, plan| load.aggregate_first(plan))
519 }
520
521 pub(crate) fn execute_load_query_last<E>(
522 &self,
523 query: &Query<E>,
524 ) -> Result<Option<Id<E>>, QueryError>
525 where
526 E: EntityKind<Canister = C> + EntityValue,
527 {
528 self.execute_load_query_with(query, |load, plan| load.aggregate_last(plan))
529 }
530
531 pub(crate) fn execute_load_query_paged_with_trace<E>(
532 &self,
533 query: &Query<E>,
534 cursor_token: Option<&str>,
535 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
536 where
537 E: EntityKind<Canister = C> + EntityValue,
538 {
539 let plan = query.plan()?;
540 if plan.as_inner().grouped_plan().is_some() {
541 return Err(QueryError::Execute(
542 InternalError::query_executor_invariant(
543 "grouped plans require execute_grouped(...)",
544 ),
545 ));
546 }
547 let cursor_bytes = match cursor_token {
548 Some(token) => Some(decode_cursor(token).map_err(|reason| {
549 QueryError::from(PlanError::from(
550 CursorPlanError::InvalidContinuationCursor { reason },
551 ))
552 })?),
553 None => None,
554 };
555 let cursor = plan
556 .prepare_cursor(cursor_bytes.as_deref())
557 .map_err(map_executor_plan_error)?;
558
559 let (page, trace) = self
560 .with_metrics(|| {
561 self.load_executor::<E>()
562 .execute_paged_with_cursor_traced(plan, cursor)
563 })
564 .map_err(QueryError::Execute)?;
565 let next_cursor = page
566 .next_cursor
567 .map(|token| {
568 let Some(token) = token.as_scalar() else {
569 return Err(QueryError::Execute(
570 InternalError::query_executor_invariant(
571 "scalar load pagination emitted grouped continuation token",
572 ),
573 ));
574 };
575
576 token.encode().map_err(|err| {
577 QueryError::Execute(InternalError::serialize_internal(format!(
578 "failed to serialize continuation cursor: {err}"
579 )))
580 })
581 })
582 .transpose()?;
583
584 Ok(PagedLoadExecutionWithTrace::new(
585 page.items,
586 next_cursor,
587 trace,
588 ))
589 }
590
591 pub fn execute_grouped<E>(
596 &self,
597 query: &Query<E>,
598 cursor_token: Option<&str>,
599 ) -> Result<PagedGroupedExecutionWithTrace, QueryError>
600 where
601 E: EntityKind<Canister = C> + EntityValue,
602 {
603 let plan = query.plan()?;
604 if plan.as_inner().grouped_plan().is_none() {
605 return Err(QueryError::Execute(
606 InternalError::query_executor_invariant(
607 "execute_grouped requires grouped logical plans",
608 ),
609 ));
610 }
611 let cursor_bytes = match cursor_token {
612 Some(token) => Some(decode_cursor(token).map_err(|reason| {
613 QueryError::from(PlanError::from(
614 CursorPlanError::InvalidContinuationCursor { reason },
615 ))
616 })?),
617 None => None,
618 };
619 let cursor = plan
620 .prepare_grouped_cursor(cursor_bytes.as_deref())
621 .map_err(map_executor_plan_error)?;
622
623 let (page, trace) = self
624 .with_metrics(|| {
625 self.load_executor::<E>()
626 .execute_grouped_paged_with_cursor_traced(plan, cursor)
627 })
628 .map_err(QueryError::Execute)?;
629 let next_cursor = page
630 .next_cursor
631 .map(|token| {
632 let Some(token) = token.as_grouped() else {
633 return Err(QueryError::Execute(
634 InternalError::query_executor_invariant(
635 "grouped pagination emitted scalar continuation token",
636 ),
637 ));
638 };
639
640 token.encode().map_err(|err| {
641 QueryError::Execute(InternalError::serialize_internal(format!(
642 "failed to serialize grouped continuation cursor: {err}"
643 )))
644 })
645 })
646 .transpose()?;
647
648 Ok(PagedGroupedExecutionWithTrace::new(
649 page.rows,
650 next_cursor,
651 trace,
652 ))
653 }
654
655 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
660 where
661 E: EntityKind<Canister = C> + EntityValue,
662 {
663 self.execute_save_entity(|save| save.insert(entity))
664 }
665
666 pub fn insert_many_atomic<E>(
672 &self,
673 entities: impl IntoIterator<Item = E>,
674 ) -> Result<WriteBatchResponse<E>, InternalError>
675 where
676 E: EntityKind<Canister = C> + EntityValue,
677 {
678 self.execute_save_batch(|save| save.insert_many_atomic(entities))
679 }
680
681 pub fn insert_many_non_atomic<E>(
685 &self,
686 entities: impl IntoIterator<Item = E>,
687 ) -> Result<WriteBatchResponse<E>, InternalError>
688 where
689 E: EntityKind<Canister = C> + EntityValue,
690 {
691 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
692 }
693
694 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
695 where
696 E: EntityKind<Canister = C> + EntityValue,
697 {
698 self.execute_save_entity(|save| save.replace(entity))
699 }
700
701 pub fn replace_many_atomic<E>(
707 &self,
708 entities: impl IntoIterator<Item = E>,
709 ) -> Result<WriteBatchResponse<E>, InternalError>
710 where
711 E: EntityKind<Canister = C> + EntityValue,
712 {
713 self.execute_save_batch(|save| save.replace_many_atomic(entities))
714 }
715
716 pub fn replace_many_non_atomic<E>(
720 &self,
721 entities: impl IntoIterator<Item = E>,
722 ) -> Result<WriteBatchResponse<E>, InternalError>
723 where
724 E: EntityKind<Canister = C> + EntityValue,
725 {
726 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
727 }
728
729 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
730 where
731 E: EntityKind<Canister = C> + EntityValue,
732 {
733 self.execute_save_entity(|save| save.update(entity))
734 }
735
736 pub fn update_many_atomic<E>(
742 &self,
743 entities: impl IntoIterator<Item = E>,
744 ) -> Result<WriteBatchResponse<E>, InternalError>
745 where
746 E: EntityKind<Canister = C> + EntityValue,
747 {
748 self.execute_save_batch(|save| save.update_many_atomic(entities))
749 }
750
751 pub fn update_many_non_atomic<E>(
755 &self,
756 entities: impl IntoIterator<Item = E>,
757 ) -> Result<WriteBatchResponse<E>, InternalError>
758 where
759 E: EntityKind<Canister = C> + EntityValue,
760 {
761 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
762 }
763
764 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
765 where
766 E: EntityKind<Canister = C> + EntityValue,
767 {
768 self.execute_save_view::<E>(|save| save.insert_view(view))
769 }
770
771 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
772 where
773 E: EntityKind<Canister = C> + EntityValue,
774 {
775 self.execute_save_view::<E>(|save| save.replace_view(view))
776 }
777
778 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
779 where
780 E: EntityKind<Canister = C> + EntityValue,
781 {
782 self.execute_save_view::<E>(|save| save.update_view(view))
783 }
784
785 #[cfg(test)]
787 #[doc(hidden)]
788 pub fn clear_stores_for_tests(&self) {
789 self.db.with_store_registry(|reg| {
790 for (_, store) in reg.iter() {
793 store.with_data_mut(DataStore::clear);
794 store.with_index_mut(IndexStore::clear);
795 }
796 });
797 }
798}