1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(all(feature = "sql", feature = "perf-attribution"))]
39pub use sql::{LoweredSqlDispatchExecutorAttribution, SqlProjectionTextExecutorAttribution};
40#[cfg(feature = "sql")]
41pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
42#[cfg(all(feature = "sql", feature = "structural-read-metrics"))]
43pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
44
45pub struct DbSession<C: CanisterKind> {
52 db: Db<C>,
53 debug: bool,
54 metrics: Option<&'static dyn MetricsSink>,
55}
56
57impl<C: CanisterKind> DbSession<C> {
58 #[must_use]
60 pub(crate) const fn new(db: Db<C>) -> Self {
61 Self {
62 db,
63 debug: false,
64 metrics: None,
65 }
66 }
67
68 #[must_use]
70 pub const fn new_with_hooks(
71 store: &'static LocalKey<StoreRegistry>,
72 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
73 ) -> Self {
74 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
75 }
76
77 #[must_use]
79 pub const fn debug(mut self) -> Self {
80 self.debug = true;
81 self
82 }
83
84 #[must_use]
86 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
87 self.metrics = Some(sink);
88 self
89 }
90
91 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
92 if let Some(sink) = self.metrics {
93 with_metrics_sink(sink, f)
94 } else {
95 f()
96 }
97 }
98
99 fn execute_save_with<E, T, R>(
101 &self,
102 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
103 map: impl FnOnce(T) -> R,
104 ) -> Result<R, InternalError>
105 where
106 E: PersistedRow<Canister = C> + EntityValue,
107 {
108 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
109
110 Ok(map(value))
111 }
112
113 fn execute_save_entity<E>(
115 &self,
116 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
117 ) -> Result<E, InternalError>
118 where
119 E: PersistedRow<Canister = C> + EntityValue,
120 {
121 self.execute_save_with(op, std::convert::identity)
122 }
123
124 fn execute_save_batch<E>(
125 &self,
126 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
127 ) -> Result<WriteBatchResponse<E>, InternalError>
128 where
129 E: PersistedRow<Canister = C> + EntityValue,
130 {
131 self.execute_save_with(op, WriteBatchResponse::new)
132 }
133
134 #[must_use]
140 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
141 where
142 E: EntityKind<Canister = C>,
143 {
144 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
145 }
146
147 #[must_use]
149 pub const fn load_with_consistency<E>(
150 &self,
151 consistency: MissingRowPolicy,
152 ) -> FluentLoadQuery<'_, E>
153 where
154 E: EntityKind<Canister = C>,
155 {
156 FluentLoadQuery::new(self, Query::new(consistency))
157 }
158
159 #[must_use]
161 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
162 where
163 E: PersistedRow<Canister = C>,
164 {
165 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
166 }
167
168 #[must_use]
170 pub fn delete_with_consistency<E>(
171 &self,
172 consistency: MissingRowPolicy,
173 ) -> FluentDeleteQuery<'_, E>
174 where
175 E: PersistedRow<Canister = C>,
176 {
177 FluentDeleteQuery::new(self, Query::new(consistency).delete())
178 }
179
180 #[must_use]
184 pub const fn select_one(&self) -> Value {
185 Value::Int(1)
186 }
187
188 #[must_use]
195 pub fn show_indexes<E>(&self) -> Vec<String>
196 where
197 E: EntityKind<Canister = C>,
198 {
199 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
200 }
201
202 #[must_use]
208 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
209 show_indexes_for_model(model)
210 }
211
212 pub(in crate::db) fn show_indexes_for_store_model(
216 &self,
217 store_path: &str,
218 model: &'static EntityModel,
219 ) -> Vec<String> {
220 let runtime_state = self.try_index_state_for_store_path(store_path);
221
222 show_indexes_for_model_with_runtime_state(model, runtime_state)
223 }
224
225 #[must_use]
227 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
228 where
229 E: EntityKind<Canister = C>,
230 {
231 self.show_columns_for_model(E::MODEL)
232 }
233
234 #[must_use]
236 pub fn show_columns_for_model(
237 &self,
238 model: &'static EntityModel,
239 ) -> Vec<EntityFieldDescription> {
240 describe_entity_model(model).fields().to_vec()
241 }
242
243 #[must_use]
245 pub fn show_entities(&self) -> Vec<String> {
246 self.db.runtime_entity_names()
247 }
248
249 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
254 self.db
255 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
256 .map(|store| store.index_state())
257 }
258
259 fn visible_indexes_for_store_model(
262 &self,
263 store_path: &str,
264 model: &'static EntityModel,
265 ) -> Result<VisibleIndexes<'static>, QueryError> {
266 let store = self
269 .db
270 .recovered_store(store_path)
271 .map_err(QueryError::execute)?;
272 let state = store.index_state();
273 if state != IndexState::Ready {
274 return Ok(VisibleIndexes::none());
275 }
276 debug_assert_eq!(state, IndexState::Ready);
277
278 Ok(VisibleIndexes::planner_visible(model.indexes()))
281 }
282
283 #[must_use]
288 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
289 where
290 E: EntityKind<Canister = C>,
291 {
292 self.describe_entity_model(E::MODEL)
293 }
294
295 #[must_use]
297 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
298 describe_entity_model(model)
299 }
300
301 pub fn storage_report(
303 &self,
304 name_to_path: &[(&'static str, &'static str)],
305 ) -> Result<StorageReport, InternalError> {
306 self.db.storage_report(name_to_path)
307 }
308
309 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
311 self.db.storage_report_default()
312 }
313
314 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
316 self.db.integrity_report()
317 }
318
319 pub fn execute_migration_plan(
324 &self,
325 plan: &MigrationPlan,
326 max_steps: usize,
327 ) -> Result<MigrationRunOutcome, InternalError> {
328 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
329 }
330
331 #[must_use]
336 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
337 where
338 E: EntityKind<Canister = C> + EntityValue,
339 {
340 LoadExecutor::new(self.db, self.debug)
341 }
342
343 #[must_use]
344 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
345 where
346 E: PersistedRow<Canister = C> + EntityValue,
347 {
348 DeleteExecutor::new(self.db)
349 }
350
351 #[must_use]
352 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
353 where
354 E: PersistedRow<Canister = C> + EntityValue,
355 {
356 SaveExecutor::new(self.db, self.debug)
357 }
358}
359
360#[doc(hidden)]
365pub fn debug_remove_entity_row_data_only<C, E>(
366 session: &DbSession<C>,
367 key: &E::Key,
368) -> Result<bool, InternalError>
369where
370 C: CanisterKind,
371 E: PersistedRow<Canister = C> + EntityValue,
372{
373 let store = session.db.recovered_store(E::Store::PATH)?;
376
377 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
380 let raw_key = data_key.to_raw()?;
381 let storage_key = data_key.storage_key();
382
383 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
387 if !removed {
388 return Ok(false);
389 }
390
391 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
392
393 Ok(true)
394}
395
396#[doc(hidden)]
402pub fn debug_mark_store_index_state<C>(
403 session: &DbSession<C>,
404 store_path: &str,
405 state: IndexState,
406) -> Result<(), InternalError>
407where
408 C: CanisterKind,
409{
410 let store = session.db.recovered_store(store_path)?;
413
414 match state {
417 IndexState::Building => store.mark_index_building(),
418 IndexState::Ready => store.mark_index_ready(),
419 IndexState::Dropping => store.mark_index_dropping(),
420 }
421
422 Ok(())
423}