1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(all(feature = "sql", feature = "perf-attribution"))]
39pub use sql::{LoweredSqlDispatchExecutorAttribution, SqlProjectionTextExecutorAttribution};
40#[cfg(feature = "sql")]
41pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
42
43pub struct DbSession<C: CanisterKind> {
50 db: Db<C>,
51 debug: bool,
52 metrics: Option<&'static dyn MetricsSink>,
53}
54
55impl<C: CanisterKind> DbSession<C> {
56 #[must_use]
58 pub(crate) const fn new(db: Db<C>) -> Self {
59 Self {
60 db,
61 debug: false,
62 metrics: None,
63 }
64 }
65
66 #[must_use]
68 pub const fn new_with_hooks(
69 store: &'static LocalKey<StoreRegistry>,
70 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
71 ) -> Self {
72 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
73 }
74
75 #[must_use]
77 pub const fn debug(mut self) -> Self {
78 self.debug = true;
79 self
80 }
81
82 #[must_use]
84 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
85 self.metrics = Some(sink);
86 self
87 }
88
89 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
90 if let Some(sink) = self.metrics {
91 with_metrics_sink(sink, f)
92 } else {
93 f()
94 }
95 }
96
97 fn execute_save_with<E, T, R>(
99 &self,
100 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
101 map: impl FnOnce(T) -> R,
102 ) -> Result<R, InternalError>
103 where
104 E: PersistedRow<Canister = C> + EntityValue,
105 {
106 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
107
108 Ok(map(value))
109 }
110
111 fn execute_save_entity<E>(
113 &self,
114 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
115 ) -> Result<E, InternalError>
116 where
117 E: PersistedRow<Canister = C> + EntityValue,
118 {
119 self.execute_save_with(op, std::convert::identity)
120 }
121
122 fn execute_save_batch<E>(
123 &self,
124 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
125 ) -> Result<WriteBatchResponse<E>, InternalError>
126 where
127 E: PersistedRow<Canister = C> + EntityValue,
128 {
129 self.execute_save_with(op, WriteBatchResponse::new)
130 }
131
132 #[must_use]
138 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
139 where
140 E: EntityKind<Canister = C>,
141 {
142 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
143 }
144
145 #[must_use]
147 pub const fn load_with_consistency<E>(
148 &self,
149 consistency: MissingRowPolicy,
150 ) -> FluentLoadQuery<'_, E>
151 where
152 E: EntityKind<Canister = C>,
153 {
154 FluentLoadQuery::new(self, Query::new(consistency))
155 }
156
157 #[must_use]
159 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
160 where
161 E: PersistedRow<Canister = C>,
162 {
163 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
164 }
165
166 #[must_use]
168 pub fn delete_with_consistency<E>(
169 &self,
170 consistency: MissingRowPolicy,
171 ) -> FluentDeleteQuery<'_, E>
172 where
173 E: PersistedRow<Canister = C>,
174 {
175 FluentDeleteQuery::new(self, Query::new(consistency).delete())
176 }
177
178 #[must_use]
182 pub const fn select_one(&self) -> Value {
183 Value::Int(1)
184 }
185
186 #[must_use]
193 pub fn show_indexes<E>(&self) -> Vec<String>
194 where
195 E: EntityKind<Canister = C>,
196 {
197 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
198 }
199
200 #[must_use]
206 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
207 show_indexes_for_model(model)
208 }
209
210 pub(in crate::db) fn show_indexes_for_store_model(
214 &self,
215 store_path: &str,
216 model: &'static EntityModel,
217 ) -> Vec<String> {
218 let runtime_state = self.try_index_state_for_store_path(store_path);
219
220 show_indexes_for_model_with_runtime_state(model, runtime_state)
221 }
222
223 #[must_use]
225 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
226 where
227 E: EntityKind<Canister = C>,
228 {
229 self.show_columns_for_model(E::MODEL)
230 }
231
232 #[must_use]
234 pub fn show_columns_for_model(
235 &self,
236 model: &'static EntityModel,
237 ) -> Vec<EntityFieldDescription> {
238 describe_entity_model(model).fields().to_vec()
239 }
240
241 #[must_use]
243 pub fn show_entities(&self) -> Vec<String> {
244 self.db.runtime_entity_names()
245 }
246
247 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
252 self.db
253 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
254 .map(|store| store.index_state())
255 }
256
257 fn visible_indexes_for_store_model(
260 &self,
261 store_path: &str,
262 model: &'static EntityModel,
263 ) -> Result<VisibleIndexes<'static>, QueryError> {
264 let store = self
267 .db
268 .recovered_store(store_path)
269 .map_err(QueryError::execute)?;
270 let state = store.index_state();
271 if state != IndexState::Ready {
272 return Ok(VisibleIndexes::none());
273 }
274 debug_assert_eq!(state, IndexState::Ready);
275
276 Ok(VisibleIndexes::planner_visible(model.indexes()))
279 }
280
281 #[must_use]
286 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
287 where
288 E: EntityKind<Canister = C>,
289 {
290 self.describe_entity_model(E::MODEL)
291 }
292
293 #[must_use]
295 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
296 describe_entity_model(model)
297 }
298
299 pub fn storage_report(
301 &self,
302 name_to_path: &[(&'static str, &'static str)],
303 ) -> Result<StorageReport, InternalError> {
304 self.db.storage_report(name_to_path)
305 }
306
307 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
309 self.db.storage_report_default()
310 }
311
312 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
314 self.db.integrity_report()
315 }
316
317 pub fn execute_migration_plan(
322 &self,
323 plan: &MigrationPlan,
324 max_steps: usize,
325 ) -> Result<MigrationRunOutcome, InternalError> {
326 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
327 }
328
329 #[must_use]
334 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
335 where
336 E: EntityKind<Canister = C> + EntityValue,
337 {
338 LoadExecutor::new(self.db, self.debug)
339 }
340
341 #[must_use]
342 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
343 where
344 E: PersistedRow<Canister = C> + EntityValue,
345 {
346 DeleteExecutor::new(self.db)
347 }
348
349 #[must_use]
350 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
351 where
352 E: PersistedRow<Canister = C> + EntityValue,
353 {
354 SaveExecutor::new(self.db, self.debug)
355 }
356}
357
358#[doc(hidden)]
363pub fn debug_remove_entity_row_data_only<C, E>(
364 session: &DbSession<C>,
365 key: &E::Key,
366) -> Result<bool, InternalError>
367where
368 C: CanisterKind,
369 E: PersistedRow<Canister = C> + EntityValue,
370{
371 let store = session.db.recovered_store(E::Store::PATH)?;
374
375 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
378 let raw_key = data_key.to_raw()?;
379 let storage_key = data_key.storage_key();
380
381 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
385 if !removed {
386 return Ok(false);
387 }
388
389 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
390
391 Ok(true)
392}
393
394#[doc(hidden)]
400pub fn debug_mark_store_index_state<C>(
401 session: &DbSession<C>,
402 store_path: &str,
403 state: IndexState,
404) -> Result<(), InternalError>
405where
406 C: CanisterKind,
407{
408 let store = session.db.recovered_store(store_path)?;
411
412 match state {
415 IndexState::Building => store.mark_index_building(),
416 IndexState::Ready => store.mark_index_ready(),
417 IndexState::Dropping => store.mark_index_dropping(),
418 }
419
420 Ok(())
421}