1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36#[cfg(feature = "sql")]
37use std::cell::OnceCell;
38use std::thread::LocalKey;
39
40#[cfg(all(feature = "sql", feature = "perf-attribution"))]
41pub use sql::SqlQueryExecutionAttribution;
42#[cfg(feature = "sql")]
43pub use sql::SqlStatementResult;
44#[cfg(all(feature = "sql", feature = "structural-read-metrics"))]
45pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
46
47pub struct DbSession<C: CanisterKind> {
54 db: Db<C>,
55 debug: bool,
56 metrics: Option<&'static dyn MetricsSink>,
57 #[cfg(feature = "sql")]
58 sql_compiled_command_cache: OnceCell<std::cell::RefCell<sql::SqlCompiledCommandCache>>,
59}
60
61impl<C: CanisterKind> DbSession<C> {
62 #[must_use]
64 pub(crate) const fn new(db: Db<C>) -> Self {
65 Self {
66 db,
67 debug: false,
68 metrics: None,
69 #[cfg(feature = "sql")]
70 sql_compiled_command_cache: OnceCell::new(),
71 }
72 }
73
74 #[must_use]
76 pub const fn new_with_hooks(
77 store: &'static LocalKey<StoreRegistry>,
78 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
79 ) -> Self {
80 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
81 }
82
83 #[must_use]
85 pub const fn debug(mut self) -> Self {
86 self.debug = true;
87 self
88 }
89
90 #[must_use]
92 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
93 self.metrics = Some(sink);
94 self
95 }
96
97 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
98 if let Some(sink) = self.metrics {
99 with_metrics_sink(sink, f)
100 } else {
101 f()
102 }
103 }
104
105 fn execute_save_with<E, T, R>(
107 &self,
108 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
109 map: impl FnOnce(T) -> R,
110 ) -> Result<R, InternalError>
111 where
112 E: PersistedRow<Canister = C> + EntityValue,
113 {
114 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
115
116 Ok(map(value))
117 }
118
119 fn execute_save_entity<E>(
121 &self,
122 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
123 ) -> Result<E, InternalError>
124 where
125 E: PersistedRow<Canister = C> + EntityValue,
126 {
127 self.execute_save_with(op, std::convert::identity)
128 }
129
130 fn execute_save_batch<E>(
131 &self,
132 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
133 ) -> Result<WriteBatchResponse<E>, InternalError>
134 where
135 E: PersistedRow<Canister = C> + EntityValue,
136 {
137 self.execute_save_with(op, WriteBatchResponse::new)
138 }
139
140 #[must_use]
146 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
147 where
148 E: EntityKind<Canister = C>,
149 {
150 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
151 }
152
153 #[must_use]
155 pub const fn load_with_consistency<E>(
156 &self,
157 consistency: MissingRowPolicy,
158 ) -> FluentLoadQuery<'_, E>
159 where
160 E: EntityKind<Canister = C>,
161 {
162 FluentLoadQuery::new(self, Query::new(consistency))
163 }
164
165 #[must_use]
167 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
168 where
169 E: PersistedRow<Canister = C>,
170 {
171 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
172 }
173
174 #[must_use]
176 pub fn delete_with_consistency<E>(
177 &self,
178 consistency: MissingRowPolicy,
179 ) -> FluentDeleteQuery<'_, E>
180 where
181 E: PersistedRow<Canister = C>,
182 {
183 FluentDeleteQuery::new(self, Query::new(consistency).delete())
184 }
185
186 #[must_use]
190 pub const fn select_one(&self) -> Value {
191 Value::Int(1)
192 }
193
194 #[must_use]
201 pub fn show_indexes<E>(&self) -> Vec<String>
202 where
203 E: EntityKind<Canister = C>,
204 {
205 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
206 }
207
208 #[must_use]
214 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
215 show_indexes_for_model(model)
216 }
217
218 pub(in crate::db) fn show_indexes_for_store_model(
222 &self,
223 store_path: &str,
224 model: &'static EntityModel,
225 ) -> Vec<String> {
226 let runtime_state = self.try_index_state_for_store_path(store_path);
227
228 show_indexes_for_model_with_runtime_state(model, runtime_state)
229 }
230
231 #[must_use]
233 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
234 where
235 E: EntityKind<Canister = C>,
236 {
237 self.show_columns_for_model(E::MODEL)
238 }
239
240 #[must_use]
242 pub fn show_columns_for_model(
243 &self,
244 model: &'static EntityModel,
245 ) -> Vec<EntityFieldDescription> {
246 describe_entity_model(model).fields().to_vec()
247 }
248
249 #[must_use]
251 pub fn show_entities(&self) -> Vec<String> {
252 self.db.runtime_entity_names()
253 }
254
255 #[must_use]
260 pub fn show_tables(&self) -> Vec<String> {
261 self.show_entities()
262 }
263
264 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
269 self.db
270 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
271 .map(|store| store.index_state())
272 }
273
274 fn visible_indexes_for_store_model(
277 &self,
278 store_path: &str,
279 model: &'static EntityModel,
280 ) -> Result<VisibleIndexes<'static>, QueryError> {
281 let store = self
284 .db
285 .recovered_store(store_path)
286 .map_err(QueryError::execute)?;
287 let state = store.index_state();
288 if state != IndexState::Ready {
289 return Ok(VisibleIndexes::none());
290 }
291 debug_assert_eq!(state, IndexState::Ready);
292
293 Ok(VisibleIndexes::planner_visible(model.indexes()))
296 }
297
298 #[must_use]
303 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
304 where
305 E: EntityKind<Canister = C>,
306 {
307 self.describe_entity_model(E::MODEL)
308 }
309
310 #[must_use]
312 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
313 describe_entity_model(model)
314 }
315
316 pub fn storage_report(
318 &self,
319 name_to_path: &[(&'static str, &'static str)],
320 ) -> Result<StorageReport, InternalError> {
321 self.db.storage_report(name_to_path)
322 }
323
324 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
326 self.db.storage_report_default()
327 }
328
329 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
331 self.db.integrity_report()
332 }
333
334 pub fn execute_migration_plan(
339 &self,
340 plan: &MigrationPlan,
341 max_steps: usize,
342 ) -> Result<MigrationRunOutcome, InternalError> {
343 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
344 }
345
346 #[must_use]
351 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
352 where
353 E: EntityKind<Canister = C> + EntityValue,
354 {
355 LoadExecutor::new(self.db, self.debug)
356 }
357
358 #[must_use]
359 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
360 where
361 E: PersistedRow<Canister = C> + EntityValue,
362 {
363 DeleteExecutor::new(self.db)
364 }
365
366 #[must_use]
367 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
368 where
369 E: PersistedRow<Canister = C> + EntityValue,
370 {
371 SaveExecutor::new(self.db, self.debug)
372 }
373}
374
375#[doc(hidden)]
380pub fn debug_remove_entity_row_data_only<C, E>(
381 session: &DbSession<C>,
382 key: &E::Key,
383) -> Result<bool, InternalError>
384where
385 C: CanisterKind,
386 E: PersistedRow<Canister = C> + EntityValue,
387{
388 let store = session.db.recovered_store(E::Store::PATH)?;
391
392 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
395 let raw_key = data_key.to_raw()?;
396 let storage_key = data_key.storage_key();
397
398 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
402 if !removed {
403 return Ok(false);
404 }
405
406 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
407
408 Ok(true)
409}
410
411#[doc(hidden)]
417pub fn debug_mark_store_index_state<C>(
418 session: &DbSession<C>,
419 store_path: &str,
420 state: IndexState,
421) -> Result<(), InternalError>
422where
423 C: CanisterKind,
424{
425 let store = session.db.recovered_store(store_path)?;
428
429 match state {
432 IndexState::Building => store.mark_index_building(),
433 IndexState::Ready => store.mark_index_ready(),
434 IndexState::Dropping => store.mark_index_dropping(),
435 }
436
437 Ok(())
438}