1#[cfg(test)]
3use crate::db::{DataStore, IndexStore};
4use crate::{
5 db::{
6 Db, FluentDeleteQuery, FluentLoadQuery, PagedLoadExecutionWithTrace, PlanError, Query,
7 QueryError, ReadConsistency, Response, WriteBatchResponse, WriteResponse, decode_cursor,
8 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
9 query::{intent::QueryMode, plan::CursorPlanError},
10 },
11 error::InternalError,
12 obs::sink::{MetricsSink, with_metrics_sink},
13 traits::{CanisterKind, EntityKind, EntityValue},
14 types::Id,
15};
16
17pub struct DbSession<C: CanisterKind> {
24 db: Db<C>,
25 debug: bool,
26 metrics: Option<&'static dyn MetricsSink>,
27}
28
29impl<C: CanisterKind> DbSession<C> {
30 #[must_use]
31 pub const fn new(db: Db<C>) -> Self {
32 Self {
33 db,
34 debug: false,
35 metrics: None,
36 }
37 }
38
39 #[must_use]
40 pub const fn debug(mut self) -> Self {
41 self.debug = true;
42 self
43 }
44
45 #[must_use]
46 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
47 self.metrics = Some(sink);
48 self
49 }
50
51 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
52 if let Some(sink) = self.metrics {
53 with_metrics_sink(sink, f)
54 } else {
55 f()
56 }
57 }
58
59 fn execute_save_with<E, T, R>(
61 &self,
62 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
63 map: impl FnOnce(T) -> R,
64 ) -> Result<R, InternalError>
65 where
66 E: EntityKind<Canister = C> + EntityValue,
67 {
68 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
69
70 Ok(map(value))
71 }
72
73 fn execute_save_entity<E>(
75 &self,
76 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
77 ) -> Result<WriteResponse<E>, InternalError>
78 where
79 E: EntityKind<Canister = C> + EntityValue,
80 {
81 self.execute_save_with(op, WriteResponse::new)
82 }
83
84 fn execute_save_batch<E>(
85 &self,
86 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
87 ) -> Result<WriteBatchResponse<E>, InternalError>
88 where
89 E: EntityKind<Canister = C> + EntityValue,
90 {
91 self.execute_save_with(op, WriteBatchResponse::new)
92 }
93
94 fn execute_save_view<E>(
95 &self,
96 op: impl FnOnce(SaveExecutor<E>) -> Result<E::ViewType, InternalError>,
97 ) -> Result<E::ViewType, InternalError>
98 where
99 E: EntityKind<Canister = C> + EntityValue,
100 {
101 self.execute_save_with(op, std::convert::identity)
102 }
103
104 #[must_use]
109 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
110 where
111 E: EntityKind<Canister = C>,
112 {
113 FluentLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
114 }
115
116 #[must_use]
117 pub const fn load_with_consistency<E>(
118 &self,
119 consistency: ReadConsistency,
120 ) -> FluentLoadQuery<'_, E>
121 where
122 E: EntityKind<Canister = C>,
123 {
124 FluentLoadQuery::new(self, Query::new(consistency))
125 }
126
127 #[must_use]
128 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
129 where
130 E: EntityKind<Canister = C>,
131 {
132 FluentDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
133 }
134
135 #[must_use]
136 pub fn delete_with_consistency<E>(
137 &self,
138 consistency: ReadConsistency,
139 ) -> FluentDeleteQuery<'_, E>
140 where
141 E: EntityKind<Canister = C>,
142 {
143 FluentDeleteQuery::new(self, Query::new(consistency).delete())
144 }
145
146 #[must_use]
151 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
152 where
153 E: EntityKind<Canister = C> + EntityValue,
154 {
155 LoadExecutor::new(self.db, self.debug)
156 }
157
158 #[must_use]
159 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
160 where
161 E: EntityKind<Canister = C> + EntityValue,
162 {
163 DeleteExecutor::new(self.db, self.debug)
164 }
165
166 #[must_use]
167 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
168 where
169 E: EntityKind<Canister = C> + EntityValue,
170 {
171 SaveExecutor::new(self.db, self.debug)
172 }
173
174 pub fn execute_query<E>(&self, query: &Query<E>) -> Result<Response<E>, QueryError>
179 where
180 E: EntityKind<Canister = C> + EntityValue,
181 {
182 let plan = query.plan()?;
183
184 let result = match query.mode() {
185 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
186 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
187 };
188
189 result.map_err(QueryError::Execute)
190 }
191
192 pub(crate) fn execute_load_query_count<E>(&self, query: &Query<E>) -> Result<u32, QueryError>
193 where
194 E: EntityKind<Canister = C> + EntityValue,
195 {
196 let plan = query.plan()?;
197
198 self.with_metrics(|| self.load_executor::<E>().aggregate_count(plan))
199 .map_err(QueryError::Execute)
200 }
201
202 pub(crate) fn execute_load_query_exists<E>(&self, query: &Query<E>) -> Result<bool, QueryError>
203 where
204 E: EntityKind<Canister = C> + EntityValue,
205 {
206 let plan = query.plan()?;
207
208 self.with_metrics(|| self.load_executor::<E>().aggregate_exists(plan))
209 .map_err(QueryError::Execute)
210 }
211
212 pub(crate) fn execute_load_query_min<E>(
213 &self,
214 query: &Query<E>,
215 ) -> Result<Option<Id<E>>, QueryError>
216 where
217 E: EntityKind<Canister = C> + EntityValue,
218 {
219 let plan = query.plan()?;
220
221 self.with_metrics(|| self.load_executor::<E>().aggregate_min(plan))
222 .map_err(QueryError::Execute)
223 }
224
225 pub(crate) fn execute_load_query_max<E>(
226 &self,
227 query: &Query<E>,
228 ) -> Result<Option<Id<E>>, QueryError>
229 where
230 E: EntityKind<Canister = C> + EntityValue,
231 {
232 let plan = query.plan()?;
233
234 self.with_metrics(|| self.load_executor::<E>().aggregate_max(plan))
235 .map_err(QueryError::Execute)
236 }
237
238 pub(crate) fn execute_load_query_paged_with_trace<E>(
239 &self,
240 query: &Query<E>,
241 cursor_token: Option<&str>,
242 ) -> Result<PagedLoadExecutionWithTrace<E>, QueryError>
243 where
244 E: EntityKind<Canister = C> + EntityValue,
245 {
246 let plan = query.plan()?;
247 let cursor_bytes = match cursor_token {
248 Some(token) => Some(decode_cursor(token).map_err(|reason| {
249 QueryError::from(PlanError::from(
250 CursorPlanError::InvalidContinuationCursor { reason },
251 ))
252 })?),
253 None => None,
254 };
255 let cursor = plan.plan_cursor(cursor_bytes.as_deref())?;
256
257 let (page, trace) = self
258 .with_metrics(|| {
259 self.load_executor::<E>()
260 .execute_paged_with_cursor_traced(plan, cursor)
261 })
262 .map_err(QueryError::Execute)?;
263
264 Ok((page.items, page.next_cursor, trace))
265 }
266
267 pub fn insert<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
272 where
273 E: EntityKind<Canister = C> + EntityValue,
274 {
275 self.execute_save_entity(|save| save.insert(entity))
276 }
277
278 pub fn insert_many_atomic<E>(
284 &self,
285 entities: impl IntoIterator<Item = E>,
286 ) -> Result<WriteBatchResponse<E>, InternalError>
287 where
288 E: EntityKind<Canister = C> + EntityValue,
289 {
290 self.execute_save_batch(|save| save.insert_many_atomic(entities))
291 }
292
293 pub fn insert_many_non_atomic<E>(
297 &self,
298 entities: impl IntoIterator<Item = E>,
299 ) -> Result<WriteBatchResponse<E>, InternalError>
300 where
301 E: EntityKind<Canister = C> + EntityValue,
302 {
303 self.execute_save_batch(|save| save.insert_many_non_atomic(entities))
304 }
305
306 pub fn replace<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
307 where
308 E: EntityKind<Canister = C> + EntityValue,
309 {
310 self.execute_save_entity(|save| save.replace(entity))
311 }
312
313 pub fn replace_many_atomic<E>(
319 &self,
320 entities: impl IntoIterator<Item = E>,
321 ) -> Result<WriteBatchResponse<E>, InternalError>
322 where
323 E: EntityKind<Canister = C> + EntityValue,
324 {
325 self.execute_save_batch(|save| save.replace_many_atomic(entities))
326 }
327
328 pub fn replace_many_non_atomic<E>(
332 &self,
333 entities: impl IntoIterator<Item = E>,
334 ) -> Result<WriteBatchResponse<E>, InternalError>
335 where
336 E: EntityKind<Canister = C> + EntityValue,
337 {
338 self.execute_save_batch(|save| save.replace_many_non_atomic(entities))
339 }
340
341 pub fn update<E>(&self, entity: E) -> Result<WriteResponse<E>, InternalError>
342 where
343 E: EntityKind<Canister = C> + EntityValue,
344 {
345 self.execute_save_entity(|save| save.update(entity))
346 }
347
348 pub fn update_many_atomic<E>(
354 &self,
355 entities: impl IntoIterator<Item = E>,
356 ) -> Result<WriteBatchResponse<E>, InternalError>
357 where
358 E: EntityKind<Canister = C> + EntityValue,
359 {
360 self.execute_save_batch(|save| save.update_many_atomic(entities))
361 }
362
363 pub fn update_many_non_atomic<E>(
367 &self,
368 entities: impl IntoIterator<Item = E>,
369 ) -> Result<WriteBatchResponse<E>, InternalError>
370 where
371 E: EntityKind<Canister = C> + EntityValue,
372 {
373 self.execute_save_batch(|save| save.update_many_non_atomic(entities))
374 }
375
376 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
377 where
378 E: EntityKind<Canister = C> + EntityValue,
379 {
380 self.execute_save_view::<E>(|save| save.insert_view(view))
381 }
382
383 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
384 where
385 E: EntityKind<Canister = C> + EntityValue,
386 {
387 self.execute_save_view::<E>(|save| save.replace_view(view))
388 }
389
390 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
391 where
392 E: EntityKind<Canister = C> + EntityValue,
393 {
394 self.execute_save_view::<E>(|save| save.update_view(view))
395 }
396
397 #[cfg(test)]
399 #[doc(hidden)]
400 pub fn clear_stores_for_tests(&self) {
401 self.db.with_store_registry(|reg| {
402 for (_, store) in reg.iter() {
403 store.with_data_mut(DataStore::clear);
404 store.with_index_mut(IndexStore::clear);
405 }
406 });
407 }
408}