1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::{cell::OnceCell, thread::LocalKey};
37
38#[cfg(feature = "perf-attribution")]
39pub use query::QueryExecutionAttribution;
40#[cfg(all(feature = "sql", feature = "perf-attribution"))]
41pub use sql::SqlQueryExecutionAttribution;
42#[cfg(feature = "sql")]
43pub use sql::SqlStatementResult;
44#[cfg(all(feature = "sql", feature = "structural-read-metrics"))]
45pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
46
47pub struct DbSession<C: CanisterKind> {
54 db: Db<C>,
55 debug: bool,
56 metrics: Option<&'static dyn MetricsSink>,
57 query_plan_cache: OnceCell<std::cell::RefCell<query::QueryPlanCache>>,
58 #[cfg(feature = "sql")]
59 sql_compiled_command_cache: OnceCell<std::cell::RefCell<sql::SqlCompiledCommandCache>>,
60 #[cfg(feature = "sql")]
61 sql_select_plan_cache: OnceCell<std::cell::RefCell<sql::SqlSelectPlanCache>>,
62}
63
64impl<C: CanisterKind> DbSession<C> {
65 #[must_use]
67 pub(crate) const fn new(db: Db<C>) -> Self {
68 Self {
69 db,
70 debug: false,
71 metrics: None,
72 query_plan_cache: OnceCell::new(),
73 #[cfg(feature = "sql")]
74 sql_compiled_command_cache: OnceCell::new(),
75 #[cfg(feature = "sql")]
76 sql_select_plan_cache: OnceCell::new(),
77 }
78 }
79
80 #[must_use]
82 pub const fn new_with_hooks(
83 store: &'static LocalKey<StoreRegistry>,
84 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
85 ) -> Self {
86 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
87 }
88
89 #[must_use]
91 pub const fn debug(mut self) -> Self {
92 self.debug = true;
93 self
94 }
95
96 #[must_use]
98 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
99 self.metrics = Some(sink);
100 self
101 }
102
103 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
104 if let Some(sink) = self.metrics {
105 with_metrics_sink(sink, f)
106 } else {
107 f()
108 }
109 }
110
111 fn execute_save_with<E, T, R>(
113 &self,
114 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
115 map: impl FnOnce(T) -> R,
116 ) -> Result<R, InternalError>
117 where
118 E: PersistedRow<Canister = C> + EntityValue,
119 {
120 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
121
122 Ok(map(value))
123 }
124
125 fn execute_save_entity<E>(
127 &self,
128 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
129 ) -> Result<E, InternalError>
130 where
131 E: PersistedRow<Canister = C> + EntityValue,
132 {
133 self.execute_save_with(op, std::convert::identity)
134 }
135
136 fn execute_save_batch<E>(
137 &self,
138 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
139 ) -> Result<WriteBatchResponse<E>, InternalError>
140 where
141 E: PersistedRow<Canister = C> + EntityValue,
142 {
143 self.execute_save_with(op, WriteBatchResponse::new)
144 }
145
146 #[must_use]
152 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
153 where
154 E: EntityKind<Canister = C>,
155 {
156 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
157 }
158
159 #[must_use]
161 pub const fn load_with_consistency<E>(
162 &self,
163 consistency: MissingRowPolicy,
164 ) -> FluentLoadQuery<'_, E>
165 where
166 E: EntityKind<Canister = C>,
167 {
168 FluentLoadQuery::new(self, Query::new(consistency))
169 }
170
171 #[must_use]
173 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
174 where
175 E: PersistedRow<Canister = C>,
176 {
177 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
178 }
179
180 #[must_use]
182 pub fn delete_with_consistency<E>(
183 &self,
184 consistency: MissingRowPolicy,
185 ) -> FluentDeleteQuery<'_, E>
186 where
187 E: PersistedRow<Canister = C>,
188 {
189 FluentDeleteQuery::new(self, Query::new(consistency).delete())
190 }
191
192 #[must_use]
196 pub const fn select_one(&self) -> Value {
197 Value::Int(1)
198 }
199
200 #[must_use]
207 pub fn show_indexes<E>(&self) -> Vec<String>
208 where
209 E: EntityKind<Canister = C>,
210 {
211 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
212 }
213
214 #[must_use]
220 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
221 show_indexes_for_model(model)
222 }
223
224 pub(in crate::db) fn show_indexes_for_store_model(
228 &self,
229 store_path: &str,
230 model: &'static EntityModel,
231 ) -> Vec<String> {
232 let runtime_state = self.try_index_state_for_store_path(store_path);
233
234 show_indexes_for_model_with_runtime_state(model, runtime_state)
235 }
236
237 #[must_use]
239 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
240 where
241 E: EntityKind<Canister = C>,
242 {
243 self.show_columns_for_model(E::MODEL)
244 }
245
246 #[must_use]
248 pub fn show_columns_for_model(
249 &self,
250 model: &'static EntityModel,
251 ) -> Vec<EntityFieldDescription> {
252 describe_entity_model(model).fields().to_vec()
253 }
254
255 #[must_use]
257 pub fn show_entities(&self) -> Vec<String> {
258 self.db.runtime_entity_names()
259 }
260
261 #[must_use]
266 pub fn show_tables(&self) -> Vec<String> {
267 self.show_entities()
268 }
269
270 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
275 self.db
276 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
277 .map(|store| store.index_state())
278 }
279
280 fn visible_indexes_for_store_model(
283 &self,
284 store_path: &str,
285 model: &'static EntityModel,
286 ) -> Result<VisibleIndexes<'static>, QueryError> {
287 let store = self
290 .db
291 .recovered_store(store_path)
292 .map_err(QueryError::execute)?;
293 let state = store.index_state();
294 if state != IndexState::Ready {
295 return Ok(VisibleIndexes::none());
296 }
297 debug_assert_eq!(state, IndexState::Ready);
298
299 Ok(VisibleIndexes::planner_visible(model.indexes()))
302 }
303
304 #[must_use]
309 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
310 where
311 E: EntityKind<Canister = C>,
312 {
313 self.describe_entity_model(E::MODEL)
314 }
315
316 #[must_use]
318 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
319 describe_entity_model(model)
320 }
321
322 pub fn storage_report(
324 &self,
325 name_to_path: &[(&'static str, &'static str)],
326 ) -> Result<StorageReport, InternalError> {
327 self.db.storage_report(name_to_path)
328 }
329
330 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
332 self.db.storage_report_default()
333 }
334
335 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
337 self.db.integrity_report()
338 }
339
340 pub fn execute_migration_plan(
345 &self,
346 plan: &MigrationPlan,
347 max_steps: usize,
348 ) -> Result<MigrationRunOutcome, InternalError> {
349 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
350 }
351
352 #[must_use]
357 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
358 where
359 E: EntityKind<Canister = C> + EntityValue,
360 {
361 LoadExecutor::new(self.db, self.debug)
362 }
363
364 #[must_use]
365 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
366 where
367 E: PersistedRow<Canister = C> + EntityValue,
368 {
369 DeleteExecutor::new(self.db)
370 }
371
372 #[must_use]
373 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
374 where
375 E: PersistedRow<Canister = C> + EntityValue,
376 {
377 SaveExecutor::new(self.db, self.debug)
378 }
379}
380
381#[doc(hidden)]
386pub fn debug_remove_entity_row_data_only<C, E>(
387 session: &DbSession<C>,
388 key: &E::Key,
389) -> Result<bool, InternalError>
390where
391 C: CanisterKind,
392 E: PersistedRow<Canister = C> + EntityValue,
393{
394 let store = session.db.recovered_store(E::Store::PATH)?;
397
398 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
401 let raw_key = data_key.to_raw()?;
402 let storage_key = data_key.storage_key();
403
404 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
408 if !removed {
409 return Ok(false);
410 }
411
412 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
413
414 Ok(true)
415}
416
417#[doc(hidden)]
423pub fn debug_mark_store_index_state<C>(
424 session: &DbSession<C>,
425 store_path: &str,
426 state: IndexState,
427) -> Result<(), InternalError>
428where
429 C: CanisterKind,
430{
431 let store = session.db.recovered_store(store_path)?;
434
435 match state {
438 IndexState::Building => store.mark_index_building(),
439 IndexState::Ready => store.mark_index_ready(),
440 IndexState::Dropping => store.mark_index_dropping(),
441 }
442
443 Ok(())
444}