1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(feature = "sql")]
39pub use sql::SqlStatementResult;
40#[cfg(all(feature = "sql", feature = "structural-read-metrics"))]
41pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
42
43pub struct DbSession<C: CanisterKind> {
50 db: Db<C>,
51 debug: bool,
52 metrics: Option<&'static dyn MetricsSink>,
53}
54
55impl<C: CanisterKind> DbSession<C> {
56 #[must_use]
58 pub(crate) const fn new(db: Db<C>) -> Self {
59 Self {
60 db,
61 debug: false,
62 metrics: None,
63 }
64 }
65
66 #[must_use]
68 pub const fn new_with_hooks(
69 store: &'static LocalKey<StoreRegistry>,
70 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
71 ) -> Self {
72 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
73 }
74
75 #[must_use]
77 pub const fn debug(mut self) -> Self {
78 self.debug = true;
79 self
80 }
81
82 #[must_use]
84 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
85 self.metrics = Some(sink);
86 self
87 }
88
89 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
90 if let Some(sink) = self.metrics {
91 with_metrics_sink(sink, f)
92 } else {
93 f()
94 }
95 }
96
97 fn execute_save_with<E, T, R>(
99 &self,
100 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
101 map: impl FnOnce(T) -> R,
102 ) -> Result<R, InternalError>
103 where
104 E: PersistedRow<Canister = C> + EntityValue,
105 {
106 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
107
108 Ok(map(value))
109 }
110
111 fn execute_save_entity<E>(
113 &self,
114 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
115 ) -> Result<E, InternalError>
116 where
117 E: PersistedRow<Canister = C> + EntityValue,
118 {
119 self.execute_save_with(op, std::convert::identity)
120 }
121
122 fn execute_save_batch<E>(
123 &self,
124 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
125 ) -> Result<WriteBatchResponse<E>, InternalError>
126 where
127 E: PersistedRow<Canister = C> + EntityValue,
128 {
129 self.execute_save_with(op, WriteBatchResponse::new)
130 }
131
132 #[must_use]
138 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
139 where
140 E: EntityKind<Canister = C>,
141 {
142 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
143 }
144
145 #[must_use]
147 pub const fn load_with_consistency<E>(
148 &self,
149 consistency: MissingRowPolicy,
150 ) -> FluentLoadQuery<'_, E>
151 where
152 E: EntityKind<Canister = C>,
153 {
154 FluentLoadQuery::new(self, Query::new(consistency))
155 }
156
157 #[must_use]
159 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
160 where
161 E: PersistedRow<Canister = C>,
162 {
163 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
164 }
165
166 #[must_use]
168 pub fn delete_with_consistency<E>(
169 &self,
170 consistency: MissingRowPolicy,
171 ) -> FluentDeleteQuery<'_, E>
172 where
173 E: PersistedRow<Canister = C>,
174 {
175 FluentDeleteQuery::new(self, Query::new(consistency).delete())
176 }
177
178 #[must_use]
182 pub const fn select_one(&self) -> Value {
183 Value::Int(1)
184 }
185
186 #[must_use]
193 pub fn show_indexes<E>(&self) -> Vec<String>
194 where
195 E: EntityKind<Canister = C>,
196 {
197 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
198 }
199
200 #[must_use]
206 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
207 show_indexes_for_model(model)
208 }
209
210 pub(in crate::db) fn show_indexes_for_store_model(
214 &self,
215 store_path: &str,
216 model: &'static EntityModel,
217 ) -> Vec<String> {
218 let runtime_state = self.try_index_state_for_store_path(store_path);
219
220 show_indexes_for_model_with_runtime_state(model, runtime_state)
221 }
222
223 #[must_use]
225 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
226 where
227 E: EntityKind<Canister = C>,
228 {
229 self.show_columns_for_model(E::MODEL)
230 }
231
232 #[must_use]
234 pub fn show_columns_for_model(
235 &self,
236 model: &'static EntityModel,
237 ) -> Vec<EntityFieldDescription> {
238 describe_entity_model(model).fields().to_vec()
239 }
240
241 #[must_use]
243 pub fn show_entities(&self) -> Vec<String> {
244 self.db.runtime_entity_names()
245 }
246
247 #[must_use]
252 pub fn show_tables(&self) -> Vec<String> {
253 self.show_entities()
254 }
255
256 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
261 self.db
262 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
263 .map(|store| store.index_state())
264 }
265
266 fn visible_indexes_for_store_model(
269 &self,
270 store_path: &str,
271 model: &'static EntityModel,
272 ) -> Result<VisibleIndexes<'static>, QueryError> {
273 let store = self
276 .db
277 .recovered_store(store_path)
278 .map_err(QueryError::execute)?;
279 let state = store.index_state();
280 if state != IndexState::Ready {
281 return Ok(VisibleIndexes::none());
282 }
283 debug_assert_eq!(state, IndexState::Ready);
284
285 Ok(VisibleIndexes::planner_visible(model.indexes()))
288 }
289
290 #[must_use]
295 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
296 where
297 E: EntityKind<Canister = C>,
298 {
299 self.describe_entity_model(E::MODEL)
300 }
301
302 #[must_use]
304 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
305 describe_entity_model(model)
306 }
307
308 pub fn storage_report(
310 &self,
311 name_to_path: &[(&'static str, &'static str)],
312 ) -> Result<StorageReport, InternalError> {
313 self.db.storage_report(name_to_path)
314 }
315
316 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
318 self.db.storage_report_default()
319 }
320
321 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
323 self.db.integrity_report()
324 }
325
326 pub fn execute_migration_plan(
331 &self,
332 plan: &MigrationPlan,
333 max_steps: usize,
334 ) -> Result<MigrationRunOutcome, InternalError> {
335 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
336 }
337
338 #[must_use]
343 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
344 where
345 E: EntityKind<Canister = C> + EntityValue,
346 {
347 LoadExecutor::new(self.db, self.debug)
348 }
349
350 #[must_use]
351 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
352 where
353 E: PersistedRow<Canister = C> + EntityValue,
354 {
355 DeleteExecutor::new(self.db)
356 }
357
358 #[must_use]
359 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
360 where
361 E: PersistedRow<Canister = C> + EntityValue,
362 {
363 SaveExecutor::new(self.db, self.debug)
364 }
365}
366
367#[doc(hidden)]
372pub fn debug_remove_entity_row_data_only<C, E>(
373 session: &DbSession<C>,
374 key: &E::Key,
375) -> Result<bool, InternalError>
376where
377 C: CanisterKind,
378 E: PersistedRow<Canister = C> + EntityValue,
379{
380 let store = session.db.recovered_store(E::Store::PATH)?;
383
384 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
387 let raw_key = data_key.to_raw()?;
388 let storage_key = data_key.storage_key();
389
390 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
394 if !removed {
395 return Ok(false);
396 }
397
398 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
399
400 Ok(true)
401}
402
403#[doc(hidden)]
409pub fn debug_mark_store_index_state<C>(
410 session: &DbSession<C>,
411 store_path: &str,
412 state: IndexState,
413) -> Result<(), InternalError>
414where
415 C: CanisterKind,
416{
417 let store = session.db.recovered_store(store_path)?;
420
421 match state {
424 IndexState::Building => store.mark_index_building(),
425 IndexState::Ready => store.mark_index_ready(),
426 IndexState::Dropping => store.mark_index_dropping(),
427 }
428
429 Ok(())
430}