1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 cursor::{decode_optional_cursor_token, decode_optional_grouped_cursor_token},
23 data::DataKey,
24 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(feature = "sql")]
39pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
40
41fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
44 decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
45}
46
47fn decode_optional_grouped_cursor(
50 cursor_token: Option<&str>,
51) -> Result<Option<crate::db::cursor::GroupedContinuationToken>, QueryError> {
52 decode_optional_grouped_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
53}
54
55pub struct DbSession<C: CanisterKind> {
62 db: Db<C>,
63 debug: bool,
64 metrics: Option<&'static dyn MetricsSink>,
65}
66
67impl<C: CanisterKind> DbSession<C> {
68 #[must_use]
70 pub(crate) const fn new(db: Db<C>) -> Self {
71 Self {
72 db,
73 debug: false,
74 metrics: None,
75 }
76 }
77
78 #[must_use]
80 pub const fn new_with_hooks(
81 store: &'static LocalKey<StoreRegistry>,
82 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
83 ) -> Self {
84 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
85 }
86
87 #[must_use]
89 pub const fn debug(mut self) -> Self {
90 self.debug = true;
91 self
92 }
93
94 #[must_use]
96 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
97 self.metrics = Some(sink);
98 self
99 }
100
101 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
102 if let Some(sink) = self.metrics {
103 with_metrics_sink(sink, f)
104 } else {
105 f()
106 }
107 }
108
109 fn execute_save_with<E, T, R>(
111 &self,
112 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
113 map: impl FnOnce(T) -> R,
114 ) -> Result<R, InternalError>
115 where
116 E: PersistedRow<Canister = C> + EntityValue,
117 {
118 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
119
120 Ok(map(value))
121 }
122
123 fn execute_save_entity<E>(
125 &self,
126 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
127 ) -> Result<E, InternalError>
128 where
129 E: PersistedRow<Canister = C> + EntityValue,
130 {
131 self.execute_save_with(op, std::convert::identity)
132 }
133
134 fn execute_save_batch<E>(
135 &self,
136 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
137 ) -> Result<WriteBatchResponse<E>, InternalError>
138 where
139 E: PersistedRow<Canister = C> + EntityValue,
140 {
141 self.execute_save_with(op, WriteBatchResponse::new)
142 }
143
144 #[must_use]
150 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
151 where
152 E: EntityKind<Canister = C>,
153 {
154 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
155 }
156
157 #[must_use]
159 pub const fn load_with_consistency<E>(
160 &self,
161 consistency: MissingRowPolicy,
162 ) -> FluentLoadQuery<'_, E>
163 where
164 E: EntityKind<Canister = C>,
165 {
166 FluentLoadQuery::new(self, Query::new(consistency))
167 }
168
169 #[must_use]
171 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
172 where
173 E: PersistedRow<Canister = C>,
174 {
175 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
176 }
177
178 #[must_use]
180 pub fn delete_with_consistency<E>(
181 &self,
182 consistency: MissingRowPolicy,
183 ) -> FluentDeleteQuery<'_, E>
184 where
185 E: PersistedRow<Canister = C>,
186 {
187 FluentDeleteQuery::new(self, Query::new(consistency).delete())
188 }
189
190 #[must_use]
194 pub const fn select_one(&self) -> Value {
195 Value::Int(1)
196 }
197
198 #[must_use]
205 pub fn show_indexes<E>(&self) -> Vec<String>
206 where
207 E: EntityKind<Canister = C>,
208 {
209 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
210 }
211
212 #[must_use]
218 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
219 show_indexes_for_model(model)
220 }
221
222 pub(in crate::db) fn show_indexes_for_store_model(
226 &self,
227 store_path: &str,
228 model: &'static EntityModel,
229 ) -> Vec<String> {
230 let runtime_state = self.try_index_state_for_store_path(store_path);
231
232 show_indexes_for_model_with_runtime_state(model, runtime_state)
233 }
234
235 #[must_use]
237 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
238 where
239 E: EntityKind<Canister = C>,
240 {
241 self.show_columns_for_model(E::MODEL)
242 }
243
244 #[must_use]
246 pub fn show_columns_for_model(
247 &self,
248 model: &'static EntityModel,
249 ) -> Vec<EntityFieldDescription> {
250 describe_entity_model(model).fields().to_vec()
251 }
252
253 #[must_use]
255 pub fn show_entities(&self) -> Vec<String> {
256 self.db.runtime_entity_names()
257 }
258
259 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
264 self.db
265 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
266 .map(|store| store.index_state())
267 }
268
269 #[must_use]
274 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
275 where
276 E: EntityKind<Canister = C>,
277 {
278 self.describe_entity_model(E::MODEL)
279 }
280
281 #[must_use]
283 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
284 describe_entity_model(model)
285 }
286
287 pub fn storage_report(
289 &self,
290 name_to_path: &[(&'static str, &'static str)],
291 ) -> Result<StorageReport, InternalError> {
292 self.db.storage_report(name_to_path)
293 }
294
295 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
297 self.db.storage_report_default()
298 }
299
300 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
302 self.db.integrity_report()
303 }
304
305 pub fn execute_migration_plan(
310 &self,
311 plan: &MigrationPlan,
312 max_steps: usize,
313 ) -> Result<MigrationRunOutcome, InternalError> {
314 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
315 }
316
317 #[must_use]
322 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
323 where
324 E: EntityKind<Canister = C> + EntityValue,
325 {
326 LoadExecutor::new(self.db, self.debug)
327 }
328
329 #[must_use]
330 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
331 where
332 E: PersistedRow<Canister = C> + EntityValue,
333 {
334 DeleteExecutor::new(self.db, self.debug)
335 }
336
337 #[must_use]
338 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
339 where
340 E: PersistedRow<Canister = C> + EntityValue,
341 {
342 SaveExecutor::new(self.db, self.debug)
343 }
344}
345
346#[doc(hidden)]
351pub fn debug_remove_entity_row_data_only<C, E>(
352 session: &DbSession<C>,
353 key: &E::Key,
354) -> Result<bool, InternalError>
355where
356 C: CanisterKind,
357 E: PersistedRow<Canister = C> + EntityValue,
358{
359 let store = session.db.recovered_store(E::Store::PATH)?;
362
363 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
366 let raw_key = data_key.to_raw()?;
367 let storage_key = data_key.storage_key();
368
369 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
373 if !removed {
374 return Ok(false);
375 }
376
377 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
378 store.mark_secondary_existence_witness_authoritative();
379
380 Ok(true)
381}
382
383#[doc(hidden)]
389pub fn debug_mark_store_index_state<C>(
390 session: &DbSession<C>,
391 store_path: &str,
392 state: IndexState,
393) -> Result<(), InternalError>
394where
395 C: CanisterKind,
396{
397 let store = session.db.recovered_store(store_path)?;
400
401 match state {
405 IndexState::Building => store.mark_index_building(),
406 IndexState::Valid => store.mark_index_valid(),
407 IndexState::Dropping => store.mark_index_dropping(),
408 }
409
410 Ok(())
411}