1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9 query::{intent::QueryMode, plan::CursorPlanError},
10 },
11 error::InternalError,
12 obs::sink::{MetricsSink, with_metrics_sink},
13 traits::{CanisterKind, EntityKind, EntityValue},
14 types::{Decimal, Id},
15 value::Value,
16};
17
18type MinMaxByIds<E> = Option<(Id<E>, Id<E>)>;
19
20pub struct DbSession<C: CanisterKind> {
27 db: Db<C>,
28 debug: bool,
29 metrics: Option<&'static dyn MetricsSink>,
30}
31
32impl<C: CanisterKind> DbSession<C> {
33 #[must_use]
34 pub const fn new(db: Db<C>) -> Self {
35 Self {
36 db,
37 debug: false,
38 metrics: None,
39 }
40 }
41
42 #[must_use]
43 pub const fn debug(mut self) -> Self {
44 self.debug = true;
45 self
46 }
47
48 #[must_use]
49 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
50 self.metrics = Some(sink);
51 self
52 }
53
54 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
55 if let Some(sink) = self.metrics {
56 with_metrics_sink(sink, f)
57 } else {
58 f()
59 }
60 }
61
62 fn execute_save_with<E, T, R>(
64 &self,
65 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
66 map: impl FnOnce(T) -> R,
67 ) -> Result<R, InternalError>
68 where
69 E: EntityKind<Canister = C> + EntityValue,
70 {
71 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
72
73 Ok(map(value))
74 }
75
76 fn execute_save_entity<E>(
78 &self,
79 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
80 ) -> Result<WriteResponse<E>, InternalError>
81 where
82 E: EntityKind<Canister = C> + EntityValue,
83 {
84 self.execute_save_with(op, WriteResponse::new)
85 }
86
87 fn execute_save_batch<E>(
88 &self,
89 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
90 ) -> Result<WriteBatchResponse<E>, InternalError>
91 where
92 E: EntityKind<Canister = C> + EntityValue,
93 {
94 self.execute_save_with(op, WriteBatchResponse::new)
95 }
96
97 fn execute_save_view<E>(
98 &self,
99 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
100 ) -> Result<E::ViewType, InternalError>
101 where
102 E: EntityKind<Canister = C> + EntityValue,
103 {
104 self.execute_save_with(op, std::convert::identity)
105 }
106
107 #[must_use]
112 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
113 where
114 E: EntityKind<Canister = C>,
115 {
116 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
117 }
118
119 #[must_use]
120 pub const fn load_with_consistency<E>(
121 &self,
122 consistency: ReadConsistency,
123 ) -> FluentLoadQuery<'_, E>
124 where
125 E: EntityKind<Canister = C>,
126 {
127 FluentLoadQuery::new(self, Query::new(consistency))
128 }
129
130 #[must_use]
131 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
132 where
133 E: EntityKind<Canister = C>,
134 {
135 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
136 }
137
138 #[must_use]
139 pub fn delete_with_consistency<E>(
140 &self,
141 consistency: ReadConsistency,
142 ) -> FluentDeleteQuery<'_, E>
143 where
144 E: EntityKind<Canister = C>,
145 {
146 FluentDeleteQuery::new(self, Query::new(consistency).delete())
147 }
148
149 #[must_use]
154 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
155 where
156 E: EntityKind<Canister = C> + EntityValue,
157 {
158 LoadExecutor::new(self.db, self.debug)
159 }
160
161 #[must_use]
162 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
163 where
164 E: EntityKind<Canister = C> + EntityValue,
165 {
166 DeleteExecutor::new(self.db, self.debug)
167 }
168
169 #[must_use]
170 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
171 where
172 E: EntityKind<Canister = C> + EntityValue,
173 {
174 SaveExecutor::new(self.db, self.debug)
175 }
176
177 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
182 where
183 E: EntityKind<Canister = C> + EntityValue,
184 {
185 let plan = query.plan()?;
186
187 let result = match query.mode() {
188 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
189 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
190 };
191
192 result.map_err(QueryError::Execute)
193 }
194
195 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
196 where
197 E: EntityKind<Canister = C> + EntityValue,
198 {
199 let plan = query.plan()?;
200
201 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
202 .map_err(QueryError::Execute)
203 }
204
205 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
206 where
207 E: EntityKind<Canister = C> + EntityValue,
208 {
209 let plan = query.plan()?;
210
211 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
212 .map_err(QueryError::Execute)
213 }
214
215 pub(crate) fn execute_load_query_min<E>(
216 &self,
217 query: &Query<E>,
218 ) -> Result<Option<Id<E>>, QueryError>
219 where
220 E: EntityKind<Canister = C> + EntityValue,
221 {
222 let plan = query.plan()?;
223
224 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
225 .map_err(QueryError::Execute)
226 }
227
228 pub(crate) fn execute_load_query_max<E>(
229 &self,
230 query: &Query<E>,
231 ) -> Result<Option<Id<E>>, QueryError>
232 where
233 E: EntityKind<Canister = C> + EntityValue,
234 {
235 let plan = query.plan()?;
236
237 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
238 .map_err(QueryError::Execute)
239 }
240
241 pub(crate) fn execute_load_query_min_by<E>(
242 &self,
243 query: &Query<E>,
244 target_field: &str,
245 ) -> Result<Option<Id<E>>, QueryError>
246 where
247 E: EntityKind<Canister = C> + EntityValue,
248 {
249 let plan = query.plan()?;
250
251 self.with_metrics(|| {
252 self.load_executor::<E>()
253 .aggregate_min_by(plan, target_field)
254 })
255 .map_err(QueryError::Execute)
256 }
257
258 pub(crate) fn execute_load_query_max_by<E>(
259 &self,
260 query: &Query<E>,
261 target_field: &str,
262 ) -> Result<Option<Id<E>>, QueryError>
263 where
264 E: EntityKind<Canister = C> + EntityValue,
265 {
266 let plan = query.plan()?;
267
268 self.with_metrics(|| {
269 self.load_executor::<E>()
270 .aggregate_max_by(plan, target_field)
271 })
272 .map_err(QueryError::Execute)
273 }
274
275 pub(crate) fn execute_load_query_nth_by<E>(
276 &self,
277 query: &Query<E>,
278 target_field: &str,
279 nth: usize,
280 ) -> Result<Option<Id<E>>, QueryError>
281 where
282 E: EntityKind<Canister = C> + EntityValue,
283 {
284 let plan = query.plan()?;
285
286 self.with_metrics(|| {
287 self.load_executor::<E>()
288 .aggregate_nth_by(plan, target_field, nth)
289 })
290 .map_err(QueryError::Execute)
291 }
292
293 pub(crate) fn execute_load_query_sum_by<E>(
294 &self,
295 query: &Query<E>,
296 target_field: &str,
297 ) -> Result<Option<Decimal>, QueryError>
298 where
299 E: EntityKind<Canister = C> + EntityValue,
300 {
301 let plan = query.plan()?;
302
303 self.with_metrics(|| {
304 self.load_executor::<E>()
305 .aggregate_sum_by(plan, target_field)
306 })
307 .map_err(QueryError::Execute)
308 }
309
310 pub(crate) fn execute_load_query_avg_by<E>(
311 &self,
312 query: &Query<E>,
313 target_field: &str,
314 ) -> Result<Option<Decimal>, QueryError>
315 where
316 E: EntityKind<Canister = C> + EntityValue,
317 {
318 let plan = query.plan()?;
319
320 self.with_metrics(|| {
321 self.load_executor::<E>()
322 .aggregate_avg_by(plan, target_field)
323 })
324 .map_err(QueryError::Execute)
325 }
326
327 pub(crate) fn execute_load_query_median_by<E>(
328 &self,
329 query: &Query<E>,
330 target_field: &str,
331 ) -> Result<Option<Id<E>>, QueryError>
332 where
333 E: EntityKind<Canister = C> + EntityValue,
334 {
335 let plan = query.plan()?;
336
337 self.with_metrics(|| {
338 self.load_executor::<E>()
339 .aggregate_median_by(plan, target_field)
340 })
341 .map_err(QueryError::Execute)
342 }
343
344 pub(crate) fn execute_load_query_count_distinct_by<E>(
345 &self,
346 query: &Query<E>,
347 target_field: &str,
348 ) -> Result<u32, QueryError>
349 where
350 E: EntityKind<Canister = C> + EntityValue,
351 {
352 let plan = query.plan()?;
353
354 self.with_metrics(|| {
355 self.load_executor::<E>()
356 .aggregate_count_distinct_by(plan, target_field)
357 })
358 .map_err(QueryError::Execute)
359 }
360
361 pub(crate) fn execute_load_query_min_max_by<E>(
362 &self,
363 query: &Query<E>,
364 target_field: &str,
365 ) -> Result<MinMaxByIds<E>, QueryError>
366 where
367 E: EntityKind<Canister = C> + EntityValue,
368 {
369 let plan = query.plan()?;
370
371 self.with_metrics(|| {
372 self.load_executor::<E>()
373 .aggregate_min_max_by(plan, target_field)
374 })
375 .map_err(QueryError::Execute)
376 }
377
378 pub(crate) fn execute_load_query_values_by<E>(
379 &self,
380 query: &Query<E>,
381 target_field: &str,
382 ) -> Result<Vec<Value>, QueryError>
383 where
384 E: EntityKind<Canister = C> + EntityValue,
385 {
386 let plan = query.plan()?;
387
388 self.with_metrics(|| self.load_executor::<E>().values_by(plan, target_field))
389 .map_err(QueryError::Execute)
390 }
391
392 pub(crate) fn execute_load_query_first<E>(
393 &self,
394 query: &Query<E>,
395 ) -> Result<Option<Id<E>>, QueryError>
396 where
397 E: EntityKind<Canister = C> + EntityValue,
398 {
399 let plan = query.plan()?;
400
401 self.with_metrics(|| self.load_executor::<E>().aggregate_first(plan))
402 .map_err(QueryError::Execute)
403 }
404
405 pub(crate) fn execute_load_query_last<E>(
406 &self,
407 query: &Query<E>,
408 ) -> Result<Option<Id<E>>, QueryError>
409 where
410 E: EntityKind<Canister = C> + EntityValue,
411 {
412 let plan = query.plan()?;
413
414 self.with_metrics(|| self.load_executor::<E>().aggregate_last(plan))
415 .map_err(QueryError::Execute)
416 }
417
418 pub(crate) fn execute_load_query_paged_with_trace<E>(
419 &self,
420 query: &Query<E>,
421 cursor_token: Option<&str>,
422 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
423 where
424 E: EntityKind<Canister = C> + EntityValue,
425 {
426 let plan = query.plan()?;
427 let cursor_bytes = match cursor_token {
428 Some(token) => Some(decode_cursor(token).map_err(|reason| {
429 QueryError::from(PlanError::from(
430 CursorPlanError::InvalidContinuationCursor { reason },
431 ))
432 })?),
433 None => None,
434 };
435 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
436
437 let (page, trace) = self
438 .with_metrics(|| {
439 self.load_executor::<E>()
440 .execute_paged_with_cursor_traced(plan, cursor)
441 })
442 .map_err(QueryError::Execute)?;
443
444 Ok(PagedLoadExecutionWithTrace::new(
445 page.items,
446 page.next_cursor,
447 trace,
448 ))
449 }
450
451 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
456 where
457 E: EntityKind<Canister = C> + EntityValue,
458 {
459 self.execute_save_entity(|save| save.insert(entity))
460 }
461
462 pub fn insert_many_atomic<E>(
468 &self,
469 entities: impl IntoIterator<Item = E>,
470 ) -> Result<WriteBatchResponse<E>, InternalError>
471 where
472 E: EntityKind<Canister = C> + EntityValue,
473 {
474 self.execute_save_batch(|save| save.insert_many_atomic(entities))
475 }
476
477 pub fn insert_many_non_atomic<E>(
481 &self,
482 entities: impl IntoIterator<Item = E>,
483 ) -> Result<WriteBatchResponse<E>, InternalError>
484 where
485 E: EntityKind<Canister = C> + EntityValue,
486 {
487 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
488 }
489
490 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
491 where
492 E: EntityKind<Canister = C> + EntityValue,
493 {
494 self.execute_save_entity(|save| save.replace(entity))
495 }
496
497 pub fn replace_many_atomic<E>(
503 &self,
504 entities: impl IntoIterator<Item = E>,
505 ) -> Result<WriteBatchResponse<E>, InternalError>
506 where
507 E: EntityKind<Canister = C> + EntityValue,
508 {
509 self.execute_save_batch(|save| save.replace_many_atomic(entities))
510 }
511
512 pub fn replace_many_non_atomic<E>(
516 &self,
517 entities: impl IntoIterator<Item = E>,
518 ) -> Result<WriteBatchResponse<E>, InternalError>
519 where
520 E: EntityKind<Canister = C> + EntityValue,
521 {
522 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
523 }
524
525 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
526 where
527 E: EntityKind<Canister = C> + EntityValue,
528 {
529 self.execute_save_entity(|save| save.update(entity))
530 }
531
532 pub fn update_many_atomic<E>(
538 &self,
539 entities: impl IntoIterator<Item = E>,
540 ) -> Result<WriteBatchResponse<E>, InternalError>
541 where
542 E: EntityKind<Canister = C> + EntityValue,
543 {
544 self.execute_save_batch(|save| save.update_many_atomic(entities))
545 }
546
547 pub fn update_many_non_atomic<E>(
551 &self,
552 entities: impl IntoIterator<Item = E>,
553 ) -> Result<WriteBatchResponse<E>, InternalError>
554 where
555 E: EntityKind<Canister = C> + EntityValue,
556 {
557 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
558 }
559
560 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
561 where
562 E: EntityKind<Canister = C> + EntityValue,
563 {
564 self.execute_save_view::<E>(|save| save.insert_view(view))
565 }
566
567 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
568 where
569 E: EntityKind<Canister = C> + EntityValue,
570 {
571 self.execute_save_view::<E>(|save| save.replace_view(view))
572 }
573
574 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
575 where
576 E: EntityKind<Canister = C> + EntityValue,
577 {
578 self.execute_save_view::<E>(|save| save.update_view(view))
579 }
580
581 #[cfg(test)]
583 #[doc(hidden)]
584 pub fn clear_stores_for_tests(&self) {
585 self.db.with_store_registry(|reg| {
586 for (_, store) in reg.iter() {
587 store.with_data_mut(DataStore::clear);
588 store.with_index_mut(IndexStore::clear);
589 }
590 });
591 }
592}