1mod commit;
2pub(crate) mod executor;
3pub mod identity;
4pub mod index;
5pub mod query;
6pub mod response;
7pub mod store;
8pub mod traits;
9pub mod types;
10mod write;
11
12pub(crate) use commit::*;
13pub(crate) use write::WriteUnit;
14
15use crate::{
16 db::{
17 executor::{Context, DeleteExecutor, LoadExecutor, SaveExecutor},
18 index::IndexStoreRegistry,
19 query::{
20 Query, QueryError, QueryMode, ReadConsistency, SessionDeleteQuery, SessionLoadQuery,
21 diagnostics::{
22 QueryDiagnostics, QueryExecutionDiagnostics, QueryTraceExecutorKind, finish_event,
23 start_event, trace_access_from_plan,
24 },
25 },
26 response::Response,
27 store::DataStoreRegistry,
28 },
29 error::InternalError,
30 obs::sink::{self, MetricsSink},
31 traits::{CanisterKind, EntityKind},
32};
33use std::{marker::PhantomData, thread::LocalKey};
34
/// Handle to the storage registries, tagged with a canister kind `C`.
///
/// The handle owns nothing: it stores two `'static` references to thread-local
/// keys plus a zero-sized marker.
pub struct Db<C: CanisterKind> {
    /// Thread-local key for the data store registry.
    data: &'static LocalKey<DataStoreRegistry>,
    /// Thread-local key for the index store registry.
    index: &'static LocalKey<IndexStoreRegistry>,
    /// Ties the handle to canister kind `C` without storing a value of that type.
    _marker: PhantomData<C>,
}
45
46impl<C: CanisterKind> Db<C> {
47 #[must_use]
48 pub const fn new(
49 data: &'static LocalKey<DataStoreRegistry>,
50 index: &'static LocalKey<IndexStoreRegistry>,
51 ) -> Self {
52 Self {
53 data,
54 index,
55 _marker: PhantomData,
56 }
57 }
58
59 #[must_use]
60 pub const fn context<E>(&self) -> Context<'_, E>
61 where
62 E: EntityKind<Canister = C>,
63 {
64 Context::new(self)
65 }
66
67 pub fn with_data<R>(&self, f: impl FnOnce(&DataStoreRegistry) -> R) -> R {
68 self.data.with(|reg| f(reg))
69 }
70
71 pub fn with_index<R>(&self, f: impl FnOnce(&IndexStoreRegistry) -> R) -> R {
72 self.index.with(|reg| f(reg))
73 }
74}
75
// `Copy` and `Clone` are implemented by hand rather than derived: `#[derive]`
// would add `C: Copy` / `C: Clone` bounds, which the fields (two `&'static`
// references and a `PhantomData<C>`) do not need.
impl<C: CanisterKind> Copy for Db<C> {}
impl<C: CanisterKind> Clone for Db<C> {
    fn clone(&self) -> Self {
        // `Db` is `Copy`, so a clone is just a copy of the handle.
        *self
    }
}
82
/// Session wrapper around a [`Db`] handle that carries per-session options.
///
/// Query and save entry points on the session build an executor from `db` and
/// `debug`, and run it under the metrics sink when one is configured.
pub struct DbSession<C: CanisterKind> {
    /// Database handle handed to every executor this session builds.
    db: Db<C>,
    /// Debug flag handed to every executor this session builds; `false` after `new`.
    debug: bool,
    /// Optional metrics sink; when `Some`, operations run inside
    /// `sink::with_metrics_sink`.
    metrics: Option<&'static dyn MetricsSink>,
}
93
94impl<C: CanisterKind> DbSession<C> {
95 #[must_use]
96 pub const fn new(db: Db<C>) -> Self {
97 Self {
98 db,
99 debug: false,
100 metrics: None,
101 }
102 }
103
104 #[must_use]
105 pub const fn debug(mut self) -> Self {
106 self.debug = true;
107 self
108 }
109
110 #[must_use]
111 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
112 self.metrics = Some(sink);
113 self
114 }
115
116 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
117 if let Some(sink) = self.metrics {
118 sink::with_metrics_sink(sink, f)
119 } else {
120 f()
121 }
122 }
123
124 #[must_use]
129 pub const fn load<E>(&self) -> SessionLoadQuery<'_, C, E>
130 where
131 E: EntityKind<Canister = C>,
132 {
133 SessionLoadQuery::new(self, Query::new(ReadConsistency::MissingOk))
134 }
135
136 #[must_use]
137 pub const fn load_with_consistency<E>(
138 &self,
139 consistency: ReadConsistency,
140 ) -> SessionLoadQuery<'_, C, E>
141 where
142 E: EntityKind<Canister = C>,
143 {
144 SessionLoadQuery::new(self, Query::new(consistency))
145 }
146
147 #[must_use]
148 pub const fn delete<E>(&self) -> SessionDeleteQuery<'_, C, E>
149 where
150 E: EntityKind<Canister = C>,
151 {
152 SessionDeleteQuery::new(self, Query::new(ReadConsistency::MissingOk).delete())
153 }
154
155 #[must_use]
156 pub const fn delete_with_consistency<E>(
157 &self,
158 consistency: ReadConsistency,
159 ) -> SessionDeleteQuery<'_, C, E>
160 where
161 E: EntityKind<Canister = C>,
162 {
163 SessionDeleteQuery::new(self, Query::new(consistency).delete())
164 }
165
166 #[must_use]
171 pub(crate) const fn load_executor<E>(&self) -> LoadExecutor<E>
172 where
173 E: EntityKind<Canister = C>,
174 {
175 LoadExecutor::new(self.db, self.debug)
176 }
177
178 #[must_use]
179 pub(crate) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
180 where
181 E: EntityKind<Canister = C>,
182 {
183 DeleteExecutor::new(self.db, self.debug)
184 }
185
186 #[must_use]
187 pub(crate) const fn save_executor<E>(&self) -> SaveExecutor<E>
188 where
189 E: EntityKind<Canister = C>,
190 {
191 SaveExecutor::new(self.db, self.debug)
192 }
193
194 pub fn diagnose_query<E: EntityKind<Canister = C>>(
199 &self,
200 query: &Query<E>,
201 ) -> Result<QueryDiagnostics, QueryError> {
202 let explain = query.explain()?;
203 Ok(QueryDiagnostics::from(explain))
204 }
205
206 pub fn execute_query<E: EntityKind<Canister = C>>(
207 &self,
208 query: &Query<E>,
209 ) -> Result<Response<E>, QueryError> {
210 let plan = query.plan()?;
211 let result = match query.mode() {
212 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
213 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
214 };
215
216 result.map_err(QueryError::Execute)
217 }
218
219 pub fn execute_with_diagnostics<E: EntityKind<Canister = C>>(
220 &self,
221 query: &Query<E>,
222 ) -> Result<(Response<E>, QueryExecutionDiagnostics), QueryError> {
223 let plan = query.plan()?;
224 let fingerprint = plan.fingerprint();
225 let access = Some(trace_access_from_plan(plan.access()));
226 let executor = match query.mode() {
227 QueryMode::Load(_) => QueryTraceExecutorKind::Load,
228 QueryMode::Delete(_) => QueryTraceExecutorKind::Delete,
229 };
230
231 let start = start_event(fingerprint, access, executor);
232 let result = match query.mode() {
233 QueryMode::Load(_) => self.with_metrics(|| self.load_executor::<E>().execute(plan)),
234 QueryMode::Delete(_) => self.with_metrics(|| self.delete_executor::<E>().execute(plan)),
235 };
236
237 match result {
238 Ok(response) => {
239 let rows = u64::try_from(response.0.len()).unwrap_or(u64::MAX);
240 let finish = finish_event(fingerprint, access, executor, rows);
241 Ok((
242 response,
243 QueryExecutionDiagnostics {
244 fingerprint,
245 events: vec![start, finish],
246 },
247 ))
248 }
249 Err(err) => Err(QueryError::Execute(err)),
250 }
251 }
252
253 pub fn insert<E>(&self, entity: E) -> Result<E, InternalError>
258 where
259 E: EntityKind<Canister = C>,
260 {
261 self.with_metrics(|| self.save_executor::<E>().insert(entity))
262 }
263
264 pub fn insert_many<E>(
265 &self,
266 entities: impl IntoIterator<Item = E>,
267 ) -> Result<Vec<E>, InternalError>
268 where
269 E: EntityKind<Canister = C>,
270 {
271 self.with_metrics(|| self.save_executor::<E>().insert_many(entities))
272 }
273
274 pub fn replace<E>(&self, entity: E) -> Result<E, InternalError>
275 where
276 E: EntityKind<Canister = C>,
277 {
278 self.with_metrics(|| self.save_executor::<E>().replace(entity))
279 }
280
281 pub fn replace_many<E>(
282 &self,
283 entities: impl IntoIterator<Item = E>,
284 ) -> Result<Vec<E>, InternalError>
285 where
286 E: EntityKind<Canister = C>,
287 {
288 self.with_metrics(|| self.save_executor::<E>().replace_many(entities))
289 }
290
291 pub fn update<E>(&self, entity: E) -> Result<E, InternalError>
292 where
293 E: EntityKind<Canister = C>,
294 {
295 self.with_metrics(|| self.save_executor::<E>().update(entity))
296 }
297
298 pub fn update_many<E>(
299 &self,
300 entities: impl IntoIterator<Item = E>,
301 ) -> Result<Vec<E>, InternalError>
302 where
303 E: EntityKind<Canister = C>,
304 {
305 self.with_metrics(|| self.save_executor::<E>().update_many(entities))
306 }
307
308 pub fn insert_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
309 where
310 E: EntityKind<Canister = C>,
311 {
312 self.with_metrics(|| self.save_executor::<E>().insert_view(view))
313 }
314
315 pub fn replace_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
316 where
317 E: EntityKind<Canister = C>,
318 {
319 self.with_metrics(|| self.save_executor::<E>().replace_view(view))
320 }
321
322 pub fn update_view<E>(&self, view: E::ViewType) -> Result<E::ViewType, InternalError>
323 where
324 E: EntityKind<Canister = C>,
325 {
326 self.with_metrics(|| self.save_executor::<E>().update_view(view))
327 }
328}