1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse,
8 cursor::CursorPlanError,
9 decode_cursor,
10 executor::{DeleteExecutor, ExecutablePlan, ExecutorPlanError, LoadExecutor, SaveExecutor},
11 query::intent::QueryMode,
12 },
13 error::InternalError,
14 obs::sink::{MetricsSink, with_metrics_sink},
15 traits::{CanisterKind, EntityKind, EntityValue},
16 types::{Decimal, Id},
17 value::Value,
18};
19
20fn map_executor_plan_error(err: ExecutorPlanError) -> QueryError {
22 QueryError::from(err.into_plan_error())
23}
24
25pub struct DbSession<C: CanisterKind> {
32 db: Db<C>,
33 debug: bool,
34 metrics: Option<&'static dyn MetricsSink>,
35}
36
37impl<C: CanisterKind> DbSession<C> {
38 #[must_use]
39 pub const fn new(db: Db<C>) -> Self {
40 Self {
41 db,
42 debug: false,
43 metrics: None,
44 }
45 }
46
47 #[must_use]
48 pub const fn debug(mut self) -> Self {
49 self.debug = true;
50 self
51 }
52
53 #[must_use]
54 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
55 self.metrics = Some(sink);
56 self
57 }
58
59 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
60 if let Some(sink) = self.metrics {
61 with_metrics_sink(sink, f)
62 } else {
63 f()
64 }
65 }
66
67 fn execute_save_with<E, T, R>(
69 &self,
70 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
71 map: impl FnOnce(T) -> R,
72 ) -> Result<R, InternalError>
73 where
74 E: EntityKind<Canister = C> + EntityValue,
75 {
76 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
77
78 Ok(map(value))
79 }
80
81 fn execute_save_entity<E>(
83 &self,
84 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
85 ) -> Result<WriteResponse<E>, InternalError>
86 where
87 E: EntityKind<Canister = C> + EntityValue,
88 {
89 self.execute_save_with(op, WriteResponse::new)
90 }
91
92 fn execute_save_batch<E>(
93 &self,
94 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
95 ) -> Result<WriteBatchResponse<E>, InternalError>
96 where
97 E: EntityKind<Canister = C> + EntityValue,
98 {
99 self.execute_save_with(op, WriteBatchResponse::new)
100 }
101
102 fn execute_save_view<E>(
103 &self,
104 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
105 ) -> Result<E::ViewType, InternalError>
106 where
107 E: EntityKind<Canister = C> + EntityValue,
108 {
109 self.execute_save_with(op, std::convert::identity)
110 }
111
112 #[must_use]
117 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
118 where
119 E: EntityKind<Canister = C>,
120 {
121 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
122 }
123
124 #[must_use]
125 pub const fn load_with_consistency<E>(
126 &self,
127 consistency: ReadConsistency,
128 ) -> FluentLoadQuery<'_, E>
129 where
130 E: EntityKind<Canister = C>,
131 {
132 FluentLoadQuery::new(self, Query::new(consistency))
133 }
134
135 #[must_use]
136 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
137 where
138 E: EntityKind<Canister = C>,
139 {
140 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
141 }
142
143 #[must_use]
144 pub fn delete_with_consistency<E>(
145 &self,
146 consistency: ReadConsistency,
147 ) -> FluentDeleteQuery<'_, E>
148 where
149 E: EntityKind<Canister = C>,
150 {
151 FluentDeleteQuery::new(self, Query::new(consistency).delete())
152 }
153
154 #[must_use]
159 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
160 where
161 E: EntityKind<Canister = C> + EntityValue,
162 {
163 LoadExecutor::new(self.db, self.debug)
164 }
165
166 #[must_use]
167 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
168 where
169 E: EntityKind<Canister = C> + EntityValue,
170 {
171 DeleteExecutor::new(self.db, self.debug)
172 }
173
174 #[must_use]
175 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
176 where
177 E: EntityKind<Canister = C> + EntityValue,
178 {
179 SaveExecutor::new(self.db, self.debug)
180 }
181
182 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
187 where
188 E: EntityKind<Canister = C> + EntityValue,
189 {
190 let plan = query.plan()?;
191
192 let result = match query.mode() {
193 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
194 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
195 };
196
197 result.map_err(QueryError::Execute)
198 }
199
200 fn execute_load_query_with<E, T>(
203 &self,
204 query: &Query<E>,
205 op: impl FnOnce(LoadExecutor<E>, ExecutablePlan<E>) -> Result<T, InternalError>,
206 ) -> Result<T, QueryError>
207 where
208 E: EntityKind<Canister = C> + EntityValue,
209 {
210 let plan = query.plan()?;
211
212 self.with_metrics(|| op(self.load_executor::<E>(), plan))
213 .map_err(QueryError::Execute)
214 }
215
216 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
217 where
218 E: EntityKind<Canister = C> + EntityValue,
219 {
220 self.execute_load_query_with(query, |load, plan| load.aggregate_count(plan))
221 }
222
223 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
224 where
225 E: EntityKind<Canister = C> + EntityValue,
226 {
227 self.execute_load_query_with(query, |load, plan| load.aggregate_exists(plan))
228 }
229
230 pub(crate) fn execute_load_query_min<E>(
231 &self,
232 query: &Query<E>,
233 ) -> Result<Option<Id<E>>, QueryError>
234 where
235 E: EntityKind<Canister = C> + EntityValue,
236 {
237 self.execute_load_query_with(query, |load, plan| load.aggregate_min(plan))
238 }
239
240 pub(crate) fn execute_load_query_max<E>(
241 &self,
242 query: &Query<E>,
243 ) -> Result<Option<Id<E>>, QueryError>
244 where
245 E: EntityKind<Canister = C> + EntityValue,
246 {
247 self.execute_load_query_with(query, |load, plan| load.aggregate_max(plan))
248 }
249
250 pub(crate) fn execute_load_query_min_by<E>(
251 &self,
252 query: &Query<E>,
253 target_field: &str,
254 ) -> Result<Option<Id<E>>, QueryError>
255 where
256 E: EntityKind<Canister = C> + EntityValue,
257 {
258 self.execute_load_query_with(query, |load, plan| {
259 load.aggregate_min_by(plan, target_field)
260 })
261 }
262
263 pub(crate) fn execute_load_query_max_by<E>(
264 &self,
265 query: &Query<E>,
266 target_field: &str,
267 ) -> Result<Option<Id<E>>, QueryError>
268 where
269 E: EntityKind<Canister = C> + EntityValue,
270 {
271 self.execute_load_query_with(query, |load, plan| {
272 load.aggregate_max_by(plan, target_field)
273 })
274 }
275
276 pub(crate) fn execute_load_query_nth_by<E>(
277 &self,
278 query: &Query<E>,
279 target_field: &str,
280 nth: usize,
281 ) -> Result<Option<Id<E>>, QueryError>
282 where
283 E: EntityKind<Canister = C> + EntityValue,
284 {
285 self.execute_load_query_with(query, |load, plan| {
286 load.aggregate_nth_by(plan, target_field, nth)
287 })
288 }
289
290 pub(crate) fn execute_load_query_sum_by<E>(
291 &self,
292 query: &Query<E>,
293 target_field: &str,
294 ) -> Result<Option<Decimal>, QueryError>
295 where
296 E: EntityKind<Canister = C> + EntityValue,
297 {
298 self.execute_load_query_with(query, |load, plan| {
299 load.aggregate_sum_by(plan, target_field)
300 })
301 }
302
303 pub(crate) fn execute_load_query_avg_by<E>(
304 &self,
305 query: &Query<E>,
306 target_field: &str,
307 ) -> Result<Option<Decimal>, QueryError>
308 where
309 E: EntityKind<Canister = C> + EntityValue,
310 {
311 self.execute_load_query_with(query, |load, plan| {
312 load.aggregate_avg_by(plan, target_field)
313 })
314 }
315
316 pub(crate) fn execute_load_query_median_by<E>(
317 &self,
318 query: &Query<E>,
319 target_field: &str,
320 ) -> Result<Option<Id<E>>, QueryError>
321 where
322 E: EntityKind<Canister = C> + EntityValue,
323 {
324 self.execute_load_query_with(query, |load, plan| {
325 load.aggregate_median_by(plan, target_field)
326 })
327 }
328
329 pub(crate) fn execute_load_query_count_distinct_by<E>(
330 &self,
331 query: &Query<E>,
332 target_field: &str,
333 ) -> Result<u32, QueryError>
334 where
335 E: EntityKind<Canister = C> + EntityValue,
336 {
337 self.execute_load_query_with(query, |load, plan| {
338 load.aggregate_count_distinct_by(plan, target_field)
339 })
340 }
341
342 #[expect(clippy::type_complexity)]
343 pub(crate) fn execute_load_query_min_max_by<E>(
344 &self,
345 query: &Query<E>,
346 target_field: &str,
347 ) -> Result<Option<(Id<E>, Id<E>)>, QueryError>
348 where
349 E: EntityKind<Canister = C> + EntityValue,
350 {
351 self.execute_load_query_with(query, |load, plan| {
352 load.aggregate_min_max_by(plan, target_field)
353 })
354 }
355
356 pub(crate) fn execute_load_query_values_by<E>(
357 &self,
358 query: &Query<E>,
359 target_field: &str,
360 ) -> Result<Vec<Value>, QueryError>
361 where
362 E: EntityKind<Canister = C> + EntityValue,
363 {
364 self.execute_load_query_with(query, |load, plan| load.values_by(plan, target_field))
365 }
366
367 pub(crate) fn execute_load_query_take<E>(
368 &self,
369 query: &Query<E>,
370 take_count: u32,
371 ) -> Result<Response<E>, QueryError>
372 where
373 E: EntityKind<Canister = C> + EntityValue,
374 {
375 self.execute_load_query_with(query, |load, plan| load.take(plan, take_count))
376 }
377
378 pub(crate) fn execute_load_query_top_k_by<E>(
379 &self,
380 query: &Query<E>,
381 target_field: &str,
382 take_count: u32,
383 ) -> Result<Response<E>, QueryError>
384 where
385 E: EntityKind<Canister = C> + EntityValue,
386 {
387 self.execute_load_query_with(query, |load, plan| {
388 load.top_k_by(plan, target_field, take_count)
389 })
390 }
391
392 pub(crate) fn execute_load_query_bottom_k_by<E>(
393 &self,
394 query: &Query<E>,
395 target_field: &str,
396 take_count: u32,
397 ) -> Result<Response<E>, QueryError>
398 where
399 E: EntityKind<Canister = C> + EntityValue,
400 {
401 self.execute_load_query_with(query, |load, plan| {
402 load.bottom_k_by(plan, target_field, take_count)
403 })
404 }
405
406 pub(crate) fn execute_load_query_top_k_by_values<E>(
407 &self,
408 query: &Query<E>,
409 target_field: &str,
410 take_count: u32,
411 ) -> Result<Vec<Value>, QueryError>
412 where
413 E: EntityKind<Canister = C> + EntityValue,
414 {
415 self.execute_load_query_with(query, |load, plan| {
416 load.top_k_by_values(plan, target_field, take_count)
417 })
418 }
419
420 pub(crate) fn execute_load_query_bottom_k_by_values<E>(
421 &self,
422 query: &Query<E>,
423 target_field: &str,
424 take_count: u32,
425 ) -> Result<Vec<Value>, QueryError>
426 where
427 E: EntityKind<Canister = C> + EntityValue,
428 {
429 self.execute_load_query_with(query, |load, plan| {
430 load.bottom_k_by_values(plan, target_field, take_count)
431 })
432 }
433
434 pub(crate) fn execute_load_query_top_k_by_with_ids<E>(
435 &self,
436 query: &Query<E>,
437 target_field: &str,
438 take_count: u32,
439 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
440 where
441 E: EntityKind<Canister = C> + EntityValue,
442 {
443 self.execute_load_query_with(query, |load, plan| {
444 load.top_k_by_with_ids(plan, target_field, take_count)
445 })
446 }
447
448 pub(crate) fn execute_load_query_bottom_k_by_with_ids<E>(
449 &self,
450 query: &Query<E>,
451 target_field: &str,
452 take_count: u32,
453 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
454 where
455 E: EntityKind<Canister = C> + EntityValue,
456 {
457 self.execute_load_query_with(query, |load, plan| {
458 load.bottom_k_by_with_ids(plan, target_field, take_count)
459 })
460 }
461
462 pub(crate) fn execute_load_query_distinct_values_by<E>(
463 &self,
464 query: &Query<E>,
465 target_field: &str,
466 ) -> Result<Vec<Value>, QueryError>
467 where
468 E: EntityKind<Canister = C> + EntityValue,
469 {
470 self.execute_load_query_with(query, |load, plan| {
471 load.distinct_values_by(plan, target_field)
472 })
473 }
474
475 pub(crate) fn execute_load_query_values_by_with_ids<E>(
476 &self,
477 query: &Query<E>,
478 target_field: &str,
479 ) -> Result<Vec<(Id<E>, Value)>, QueryError>
480 where
481 E: EntityKind<Canister = C> + EntityValue,
482 {
483 self.execute_load_query_with(query, |load, plan| {
484 load.values_by_with_ids(plan, target_field)
485 })
486 }
487
488 pub(crate) fn execute_load_query_first_value_by<E>(
489 &self,
490 query: &Query<E>,
491 target_field: &str,
492 ) -> Result<Option<Value>, QueryError>
493 where
494 E: EntityKind<Canister = C> + EntityValue,
495 {
496 self.execute_load_query_with(query, |load, plan| load.first_value_by(plan, target_field))
497 }
498
499 pub(crate) fn execute_load_query_last_value_by<E>(
500 &self,
501 query: &Query<E>,
502 target_field: &str,
503 ) -> Result<Option<Value>, QueryError>
504 where
505 E: EntityKind<Canister = C> + EntityValue,
506 {
507 self.execute_load_query_with(query, |load, plan| load.last_value_by(plan, target_field))
508 }
509
510 pub(crate) fn execute_load_query_first<E>(
511 &self,
512 query: &Query<E>,
513 ) -> Result<Option<Id<E>>, QueryError>
514 where
515 E: EntityKind<Canister = C> + EntityValue,
516 {
517 self.execute_load_query_with(query, |load, plan| load.aggregate_first(plan))
518 }
519
520 pub(crate) fn execute_load_query_last<E>(
521 &self,
522 query: &Query<E>,
523 ) -> Result<Option<Id<E>>, QueryError>
524 where
525 E: EntityKind<Canister = C> + EntityValue,
526 {
527 self.execute_load_query_with(query, |load, plan| load.aggregate_last(plan))
528 }
529
530 pub(crate) fn execute_load_query_paged_with_trace<E>(
531 &self,
532 query: &Query<E>,
533 cursor_token: Option<&str>,
534 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
535 where
536 E: EntityKind<Canister = C> + EntityValue,
537 {
538 let plan = query.plan()?;
539 let cursor_bytes = match cursor_token {
540 Some(token) => Some(decode_cursor(token).map_err(|reason| {
541 QueryError::from(PlanError::from(
542 CursorPlanError::InvalidContinuationCursor { reason },
543 ))
544 })?),
545 None => None,
546 };
547 let cursor = plan
548 .prepare_cursor(cursor_bytes.as_deref())
549 .map_err(map_executor_plan_error)?;
550
551 let (page, trace) = self
552 .with_metrics(|| {
553 self.load_executor::<E>()
554 .execute_paged_with_cursor_traced(plan, cursor)
555 })
556 .map_err(QueryError::Execute)?;
557 let next_cursor = page
558 .next_cursor
559 .map(|token| {
560 token.encode().map_err(|err| {
561 QueryError::Execute(InternalError::serialize_internal(format!(
562 "failed to serialize continuation cursor: {err}"
563 )))
564 })
565 })
566 .transpose()?;
567
568 Ok(PagedLoadExecutionWithTrace::new(
569 page.items,
570 next_cursor,
571 trace,
572 ))
573 }
574
575 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
580 where
581 E: EntityKind<Canister = C> + EntityValue,
582 {
583 self.execute_save_entity(|save| save.insert(entity))
584 }
585
586 pub fn insert_many_atomic<E>(
592 &self,
593 entities: impl IntoIterator<Item = E>,
594 ) -> Result<WriteBatchResponse<E>, InternalError>
595 where
596 E: EntityKind<Canister = C> + EntityValue,
597 {
598 self.execute_save_batch(|save| save.insert_many_atomic(entities))
599 }
600
601 pub fn insert_many_non_atomic<E>(
605 &self,
606 entities: impl IntoIterator<Item = E>,
607 ) -> Result<WriteBatchResponse<E>, InternalError>
608 where
609 E: EntityKind<Canister = C> + EntityValue,
610 {
611 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
612 }
613
614 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
615 where
616 E: EntityKind<Canister = C> + EntityValue,
617 {
618 self.execute_save_entity(|save| save.replace(entity))
619 }
620
621 pub fn replace_many_atomic<E>(
627 &self,
628 entities: impl IntoIterator<Item = E>,
629 ) -> Result<WriteBatchResponse<E>, InternalError>
630 where
631 E: EntityKind<Canister = C> + EntityValue,
632 {
633 self.execute_save_batch(|save| save.replace_many_atomic(entities))
634 }
635
636 pub fn replace_many_non_atomic<E>(
640 &self,
641 entities: impl IntoIterator<Item = E>,
642 ) -> Result<WriteBatchResponse<E>, InternalError>
643 where
644 E: EntityKind<Canister = C> + EntityValue,
645 {
646 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
647 }
648
649 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
650 where
651 E: EntityKind<Canister = C> + EntityValue,
652 {
653 self.execute_save_entity(|save| save.update(entity))
654 }
655
656 pub fn update_many_atomic<E>(
662 &self,
663 entities: impl IntoIterator<Item = E>,
664 ) -> Result<WriteBatchResponse<E>, InternalError>
665 where
666 E: EntityKind<Canister = C> + EntityValue,
667 {
668 self.execute_save_batch(|save| save.update_many_atomic(entities))
669 }
670
671 pub fn update_many_non_atomic<E>(
675 &self,
676 entities: impl IntoIterator<Item = E>,
677 ) -> Result<WriteBatchResponse<E>, InternalError>
678 where
679 E: EntityKind<Canister = C> + EntityValue,
680 {
681 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
682 }
683
684 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
685 where
686 E: EntityKind<Canister = C> + EntityValue,
687 {
688 self.execute_save_view::<E>(|save| save.insert_view(view))
689 }
690
691 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
692 where
693 E: EntityKind<Canister = C> + EntityValue,
694 {
695 self.execute_save_view::<E>(|save| save.replace_view(view))
696 }
697
698 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
699 where
700 E: EntityKind<Canister = C> + EntityValue,
701 {
702 self.execute_save_view::<E>(|save| save.update_view(view))
703 }
704
705 #[cfg(test)]
707 #[doc(hidden)]
708 pub fn clear_stores_for_tests(&self) {
709 self.db.with_store_registry(|reg| {
710 for (_, store) in reg.iter() {
713 store.with_data_mut(DataStore::clear);
714 store.with_index_mut(IndexStore::clear);
715 }
716 });
717 }
718}