1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(feature = "diagnostics")]
39pub use query::QueryExecutionAttribution;
40#[cfg(all(feature = "sql", feature = "diagnostics"))]
41pub use sql::SqlQueryExecutionAttribution;
42#[cfg(feature = "sql")]
43pub use sql::SqlStatementResult;
44#[cfg(all(feature = "sql", feature = "diagnostics"))]
45pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
46
47pub struct DbSession<C: CanisterKind> {
54 db: Db<C>,
55 debug: bool,
56 metrics: Option<&'static dyn MetricsSink>,
57}
58
59impl<C: CanisterKind> DbSession<C> {
60 #[must_use]
62 pub(crate) const fn new(db: Db<C>) -> Self {
63 Self {
64 db,
65 debug: false,
66 metrics: None,
67 }
68 }
69
70 #[must_use]
72 pub const fn new_with_hooks(
73 store: &'static LocalKey<StoreRegistry>,
74 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
75 ) -> Self {
76 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
77 }
78
79 #[must_use]
81 pub const fn debug(mut self) -> Self {
82 self.debug = true;
83 self
84 }
85
86 #[must_use]
88 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
89 self.metrics = Some(sink);
90 self
91 }
92
93 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
96 where
97 E: EntityKind<Canister = C>,
98 {
99 FluentLoadQuery::new(self, Query::new(consistency))
100 }
101
102 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
106 where
107 E: PersistedRow<Canister = C>,
108 {
109 FluentDeleteQuery::new(self, Query::new(consistency).delete())
110 }
111
112 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
113 if let Some(sink) = self.metrics {
114 with_metrics_sink(sink, f)
115 } else {
116 f()
117 }
118 }
119
120 fn execute_save_with<E, T, R>(
122 &self,
123 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
124 map: impl FnOnce(T) -> R,
125 ) -> Result<R, InternalError>
126 where
127 E: PersistedRow<Canister = C> + EntityValue,
128 {
129 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
130
131 Ok(map(value))
132 }
133
134 fn execute_save_entity<E>(
136 &self,
137 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
138 ) -> Result<E, InternalError>
139 where
140 E: PersistedRow<Canister = C> + EntityValue,
141 {
142 self.execute_save_with(op, std::convert::identity)
143 }
144
145 fn execute_save_batch<E>(
146 &self,
147 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
148 ) -> Result<WriteBatchResponse<E>, InternalError>
149 where
150 E: PersistedRow<Canister = C> + EntityValue,
151 {
152 self.execute_save_with(op, WriteBatchResponse::new)
153 }
154
155 #[must_use]
161 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
162 where
163 E: EntityKind<Canister = C>,
164 {
165 self.fluent_load_query(MissingRowPolicy::Ignore)
166 }
167
168 #[must_use]
170 pub const fn load_with_consistency<E>(
171 &self,
172 consistency: MissingRowPolicy,
173 ) -> FluentLoadQuery<'_, E>
174 where
175 E: EntityKind<Canister = C>,
176 {
177 self.fluent_load_query(consistency)
178 }
179
180 #[must_use]
182 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
183 where
184 E: PersistedRow<Canister = C>,
185 {
186 self.fluent_delete_query(MissingRowPolicy::Ignore)
187 }
188
189 #[must_use]
191 pub fn delete_with_consistency<E>(
192 &self,
193 consistency: MissingRowPolicy,
194 ) -> FluentDeleteQuery<'_, E>
195 where
196 E: PersistedRow<Canister = C>,
197 {
198 self.fluent_delete_query(consistency)
199 }
200
201 #[must_use]
205 pub const fn select_one(&self) -> Value {
206 Value::Int(1)
207 }
208
209 #[must_use]
216 pub fn show_indexes<E>(&self) -> Vec<String>
217 where
218 E: EntityKind<Canister = C>,
219 {
220 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
221 }
222
223 #[must_use]
229 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
230 show_indexes_for_model(model)
231 }
232
233 pub(in crate::db) fn show_indexes_for_store_model(
237 &self,
238 store_path: &str,
239 model: &'static EntityModel,
240 ) -> Vec<String> {
241 let runtime_state = self.try_index_state_for_store_path(store_path);
242
243 show_indexes_for_model_with_runtime_state(model, runtime_state)
244 }
245
246 #[must_use]
248 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
249 where
250 E: EntityKind<Canister = C>,
251 {
252 self.show_columns_for_model(E::MODEL)
253 }
254
255 #[must_use]
257 pub fn show_columns_for_model(
258 &self,
259 model: &'static EntityModel,
260 ) -> Vec<EntityFieldDescription> {
261 describe_entity_model(model).fields().to_vec()
262 }
263
264 #[must_use]
266 pub fn show_entities(&self) -> Vec<String> {
267 self.db.runtime_entity_names()
268 }
269
270 #[must_use]
275 pub fn show_tables(&self) -> Vec<String> {
276 self.show_entities()
277 }
278
279 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
284 self.db
285 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
286 .map(|store| store.index_state())
287 }
288
289 fn visible_indexes_for_store_model(
292 &self,
293 store_path: &str,
294 model: &'static EntityModel,
295 ) -> Result<VisibleIndexes<'static>, QueryError> {
296 let store = self
299 .db
300 .recovered_store(store_path)
301 .map_err(QueryError::execute)?;
302 let state = store.index_state();
303 if state != IndexState::Ready {
304 return Ok(VisibleIndexes::none());
305 }
306 debug_assert_eq!(state, IndexState::Ready);
307
308 Ok(VisibleIndexes::planner_visible(model.indexes()))
311 }
312
313 #[must_use]
318 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
319 where
320 E: EntityKind<Canister = C>,
321 {
322 self.describe_entity_model(E::MODEL)
323 }
324
325 #[must_use]
327 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
328 describe_entity_model(model)
329 }
330
331 pub fn storage_report(
333 &self,
334 name_to_path: &[(&'static str, &'static str)],
335 ) -> Result<StorageReport, InternalError> {
336 self.db.storage_report(name_to_path)
337 }
338
339 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
341 self.db.storage_report_default()
342 }
343
344 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
346 self.db.integrity_report()
347 }
348
349 pub fn execute_migration_plan(
354 &self,
355 plan: &MigrationPlan,
356 max_steps: usize,
357 ) -> Result<MigrationRunOutcome, InternalError> {
358 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
359 }
360
361 #[must_use]
366 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
367 where
368 E: EntityKind<Canister = C> + EntityValue,
369 {
370 LoadExecutor::new(self.db, self.debug)
371 }
372
373 #[must_use]
374 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
375 where
376 E: PersistedRow<Canister = C> + EntityValue,
377 {
378 DeleteExecutor::new(self.db)
379 }
380
381 #[must_use]
382 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
383 where
384 E: PersistedRow<Canister = C> + EntityValue,
385 {
386 SaveExecutor::new(self.db, self.debug)
387 }
388}
389
390#[doc(hidden)]
395pub fn debug_remove_entity_row_data_only<C, E>(
396 session: &DbSession<C>,
397 key: &E::Key,
398) -> Result<bool, InternalError>
399where
400 C: CanisterKind,
401 E: PersistedRow<Canister = C> + EntityValue,
402{
403 let store = session.db.recovered_store(E::Store::PATH)?;
406
407 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
410 let raw_key = data_key.to_raw()?;
411 let storage_key = data_key.storage_key();
412
413 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
417 if !removed {
418 return Ok(false);
419 }
420
421 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
422
423 Ok(true)
424}
425
426#[doc(hidden)]
432pub fn debug_mark_store_index_state<C>(
433 session: &DbSession<C>,
434 store_path: &str,
435 state: IndexState,
436) -> Result<(), InternalError>
437where
438 C: CanisterKind,
439{
440 let store = session.db.recovered_store(store_path)?;
443
444 match state {
447 IndexState::Building => store.mark_index_building(),
448 IndexState::Ready => store.mark_index_ready(),
449 IndexState::Dropping => store.mark_index_dropping(),
450 }
451
452 Ok(())
453}