1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy, PersistedRow, Query,
20 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 cursor::{decode_optional_cursor_token, decode_optional_grouped_cursor_token},
23 data::DataKey,
24 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25 schema::{describe_entity_model, show_indexes_for_model},
26 },
27 error::InternalError,
28 metrics::sink::{MetricsSink, with_metrics_sink},
29 model::entity::EntityModel,
30 traits::{CanisterKind, EntityKind, EntityValue, Path},
31 value::Value,
32};
33use std::thread::LocalKey;
34
35#[cfg(feature = "sql")]
36pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
37
38fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
41 decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
42}
43
44fn decode_optional_grouped_cursor(
47 cursor_token: Option<&str>,
48) -> Result<Option<crate::db::cursor::GroupedContinuationToken>, QueryError> {
49 decode_optional_grouped_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
50}
51
/// Session handle over a canister database.
///
/// Bundles the underlying `Db<C>` with a debug flag and an optional metrics sink. The methods on
/// this type build queries and executors from these three values.
pub struct DbSession<C: CanisterKind> {
    /// Database handle that queries, reports and executors are built from.
    db: Db<C>,
    /// Debug flag; handed to every executor constructed by this session.
    debug: bool,
    /// Optional sink that `with_metrics` installs around executed work.
    metrics: Option<&'static dyn MetricsSink>,
}
63
64impl<C: CanisterKind> DbSession<C> {
65 #[must_use]
67 pub(crate) const fn new(db: Db<C>) -> Self {
68 Self {
69 db,
70 debug: false,
71 metrics: None,
72 }
73 }
74
75 #[must_use]
77 pub const fn new_with_hooks(
78 store: &'static LocalKey<StoreRegistry>,
79 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
80 ) -> Self {
81 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
82 }
83
84 #[must_use]
86 pub const fn debug(mut self) -> Self {
87 self.debug = true;
88 self
89 }
90
91 #[must_use]
93 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
94 self.metrics = Some(sink);
95 self
96 }
97
98 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
99 if let Some(sink) = self.metrics {
100 with_metrics_sink(sink, f)
101 } else {
102 f()
103 }
104 }
105
106 fn execute_save_with<E, T, R>(
108 &self,
109 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
110 map: impl FnOnce(T) -> R,
111 ) -> Result<R, InternalError>
112 where
113 E: PersistedRow<Canister = C> + EntityValue,
114 {
115 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
116
117 Ok(map(value))
118 }
119
120 fn execute_save_entity<E>(
122 &self,
123 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
124 ) -> Result<E, InternalError>
125 where
126 E: PersistedRow<Canister = C> + EntityValue,
127 {
128 self.execute_save_with(op, std::convert::identity)
129 }
130
131 fn execute_save_batch<E>(
132 &self,
133 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
134 ) -> Result<WriteBatchResponse<E>, InternalError>
135 where
136 E: PersistedRow<Canister = C> + EntityValue,
137 {
138 self.execute_save_with(op, WriteBatchResponse::new)
139 }
140
141 #[must_use]
147 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
148 where
149 E: EntityKind<Canister = C>,
150 {
151 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
152 }
153
154 #[must_use]
156 pub const fn load_with_consistency<E>(
157 &self,
158 consistency: MissingRowPolicy,
159 ) -> FluentLoadQuery<'_, E>
160 where
161 E: EntityKind<Canister = C>,
162 {
163 FluentLoadQuery::new(self, Query::new(consistency))
164 }
165
166 #[must_use]
168 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
169 where
170 E: PersistedRow<Canister = C>,
171 {
172 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
173 }
174
175 #[must_use]
177 pub fn delete_with_consistency<E>(
178 &self,
179 consistency: MissingRowPolicy,
180 ) -> FluentDeleteQuery<'_, E>
181 where
182 E: PersistedRow<Canister = C>,
183 {
184 FluentDeleteQuery::new(self, Query::new(consistency).delete())
185 }
186
    /// Returns the constant `Value::Int(1)`.
    ///
    /// No session state is read. NOTE(review): presumably the counterpart of SQL `SELECT 1`
    /// used as a liveness probe — confirm against callers.
    #[must_use]
    pub const fn select_one(&self) -> Value {
        Value::Int(1)
    }
194
195 #[must_use]
202 pub fn show_indexes<E>(&self) -> Vec<String>
203 where
204 E: EntityKind<Canister = C>,
205 {
206 self.show_indexes_for_model(E::MODEL)
207 }
208
    /// Lists the indexes of `model` as strings.
    ///
    /// No session state is passed along; the listing is produced entirely by the schema helper.
    #[must_use]
    pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
        // Resolves to the free function imported from `db::schema`, not to this method.
        show_indexes_for_model(model)
    }
214
215 #[must_use]
217 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
218 where
219 E: EntityKind<Canister = C>,
220 {
221 self.show_columns_for_model(E::MODEL)
222 }
223
224 #[must_use]
226 pub fn show_columns_for_model(
227 &self,
228 model: &'static EntityModel,
229 ) -> Vec<EntityFieldDescription> {
230 describe_entity_model(model).fields().to_vec()
231 }
232
    /// Returns the entity names reported by the database's `runtime_entity_names`.
    #[must_use]
    pub fn show_entities(&self) -> Vec<String> {
        self.db.runtime_entity_names()
    }
238
239 #[must_use]
244 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
245 where
246 E: EntityKind<Canister = C>,
247 {
248 self.describe_entity_model(E::MODEL)
249 }
250
    /// Returns the schema description of `model`.
    ///
    /// No session state is passed along; the description is produced entirely by the schema
    /// helper.
    #[must_use]
    pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
        // Resolves to the free function imported from `db::schema`, not to this method.
        describe_entity_model(model)
    }
256
    /// Builds a storage report by delegating to the database's `storage_report`.
    ///
    /// `name_to_path` is forwarded unchanged. NOTE(review): presumably pairs of
    /// (display name, store path) — confirm against `Db::storage_report`.
    ///
    /// # Errors
    ///
    /// Returns any `InternalError` raised by the database layer.
    pub fn storage_report(
        &self,
        name_to_path: &[(&'static str, &'static str)],
    ) -> Result<StorageReport, InternalError> {
        self.db.storage_report(name_to_path)
    }
264
    /// Builds a storage report by delegating to the database's `storage_report_default`.
    ///
    /// # Errors
    ///
    /// Returns any `InternalError` raised by the database layer.
    pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
        self.db.storage_report_default()
    }
269
    /// Builds an integrity report by delegating to the database's `integrity_report`.
    ///
    /// # Errors
    ///
    /// Returns any `InternalError` raised by the database layer.
    pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
        self.db.integrity_report()
    }
274
275 pub fn execute_migration_plan(
280 &self,
281 plan: &MigrationPlan,
282 max_steps: usize,
283 ) -> Result<MigrationRunOutcome, InternalError> {
284 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
285 }
286
287 #[must_use]
292 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
293 where
294 E: EntityKind<Canister = C> + EntityValue,
295 {
296 LoadExecutor::new(self.db, self.debug)
297 }
298
299 #[must_use]
300 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
301 where
302 E: PersistedRow<Canister = C> + EntityValue,
303 {
304 DeleteExecutor::new(self.db, self.debug)
305 }
306
307 #[must_use]
308 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
309 where
310 E: PersistedRow<Canister = C> + EntityValue,
311 {
312 SaveExecutor::new(self.db, self.debug)
313 }
314}
315
316#[doc(hidden)]
321pub fn debug_remove_entity_row_data_only<C, E>(
322 session: &DbSession<C>,
323 key: &E::Key,
324) -> Result<bool, InternalError>
325where
326 C: CanisterKind,
327 E: PersistedRow<Canister = C> + EntityValue,
328{
329 let store = session.db.recovered_store(E::Store::PATH)?;
332
333 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
336 let raw_key = data_key.to_raw()?;
337 let storage_key = data_key.storage_key();
338
339 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
343 if !removed {
344 return Ok(false);
345 }
346
347 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
348 store.mark_secondary_existence_witness_authoritative();
349
350 Ok(true)
351}