1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 data::DataKey,
23 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 describe_entity_model, show_indexes_for_model,
27 show_indexes_for_model_with_runtime_state,
28 },
29 },
30 error::InternalError,
31 metrics::sink::{MetricsSink, with_metrics_sink},
32 model::entity::EntityModel,
33 traits::{CanisterKind, EntityKind, EntityValue, Path},
34 value::Value,
35};
36use std::thread::LocalKey;
37
38#[cfg(feature = "sql")]
39pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
40
41pub struct DbSession<C: CanisterKind> {
48 db: Db<C>,
49 debug: bool,
50 metrics: Option<&'static dyn MetricsSink>,
51}
52
53impl<C: CanisterKind> DbSession<C> {
54 #[must_use]
56 pub(crate) const fn new(db: Db<C>) -> Self {
57 Self {
58 db,
59 debug: false,
60 metrics: None,
61 }
62 }
63
64 #[must_use]
66 pub const fn new_with_hooks(
67 store: &'static LocalKey<StoreRegistry>,
68 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
69 ) -> Self {
70 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
71 }
72
73 #[must_use]
75 pub const fn debug(mut self) -> Self {
76 self.debug = true;
77 self
78 }
79
80 #[must_use]
82 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
83 self.metrics = Some(sink);
84 self
85 }
86
87 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
88 if let Some(sink) = self.metrics {
89 with_metrics_sink(sink, f)
90 } else {
91 f()
92 }
93 }
94
95 fn execute_save_with<E, T, R>(
97 &self,
98 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
99 map: impl FnOnce(T) -> R,
100 ) -> Result<R, InternalError>
101 where
102 E: PersistedRow<Canister = C> + EntityValue,
103 {
104 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
105
106 Ok(map(value))
107 }
108
109 fn execute_save_entity<E>(
111 &self,
112 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
113 ) -> Result<E, InternalError>
114 where
115 E: PersistedRow<Canister = C> + EntityValue,
116 {
117 self.execute_save_with(op, std::convert::identity)
118 }
119
120 fn execute_save_batch<E>(
121 &self,
122 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
123 ) -> Result<WriteBatchResponse<E>, InternalError>
124 where
125 E: PersistedRow<Canister = C> + EntityValue,
126 {
127 self.execute_save_with(op, WriteBatchResponse::new)
128 }
129
130 #[must_use]
136 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
137 where
138 E: EntityKind<Canister = C>,
139 {
140 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
141 }
142
143 #[must_use]
145 pub const fn load_with_consistency<E>(
146 &self,
147 consistency: MissingRowPolicy,
148 ) -> FluentLoadQuery<'_, E>
149 where
150 E: EntityKind<Canister = C>,
151 {
152 FluentLoadQuery::new(self, Query::new(consistency))
153 }
154
155 #[must_use]
157 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
158 where
159 E: PersistedRow<Canister = C>,
160 {
161 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
162 }
163
164 #[must_use]
166 pub fn delete_with_consistency<E>(
167 &self,
168 consistency: MissingRowPolicy,
169 ) -> FluentDeleteQuery<'_, E>
170 where
171 E: PersistedRow<Canister = C>,
172 {
173 FluentDeleteQuery::new(self, Query::new(consistency).delete())
174 }
175
176 #[must_use]
180 pub const fn select_one(&self) -> Value {
181 Value::Int(1)
182 }
183
184 #[must_use]
191 pub fn show_indexes<E>(&self) -> Vec<String>
192 where
193 E: EntityKind<Canister = C>,
194 {
195 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
196 }
197
198 #[must_use]
204 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
205 show_indexes_for_model(model)
206 }
207
208 pub(in crate::db) fn show_indexes_for_store_model(
212 &self,
213 store_path: &str,
214 model: &'static EntityModel,
215 ) -> Vec<String> {
216 let runtime_state = self.try_index_state_for_store_path(store_path);
217
218 show_indexes_for_model_with_runtime_state(model, runtime_state)
219 }
220
221 #[must_use]
223 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
224 where
225 E: EntityKind<Canister = C>,
226 {
227 self.show_columns_for_model(E::MODEL)
228 }
229
230 #[must_use]
232 pub fn show_columns_for_model(
233 &self,
234 model: &'static EntityModel,
235 ) -> Vec<EntityFieldDescription> {
236 describe_entity_model(model).fields().to_vec()
237 }
238
239 #[must_use]
241 pub fn show_entities(&self) -> Vec<String> {
242 self.db.runtime_entity_names()
243 }
244
245 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
250 self.db
251 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
252 .map(|store| store.index_state())
253 }
254
255 fn visible_indexes_for_store_model(
258 &self,
259 store_path: &str,
260 model: &'static EntityModel,
261 ) -> Result<VisibleIndexes<'static>, QueryError> {
262 let store = self
265 .db
266 .recovered_store(store_path)
267 .map_err(QueryError::execute)?;
268 let state = store.index_state();
269 if state != IndexState::Ready {
270 return Ok(VisibleIndexes::none());
271 }
272 debug_assert_eq!(state, IndexState::Ready);
273
274 Ok(VisibleIndexes::planner_visible(model.indexes()))
277 }
278
279 #[must_use]
284 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
285 where
286 E: EntityKind<Canister = C>,
287 {
288 self.describe_entity_model(E::MODEL)
289 }
290
291 #[must_use]
293 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
294 describe_entity_model(model)
295 }
296
297 pub fn storage_report(
299 &self,
300 name_to_path: &[(&'static str, &'static str)],
301 ) -> Result<StorageReport, InternalError> {
302 self.db.storage_report(name_to_path)
303 }
304
305 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
307 self.db.storage_report_default()
308 }
309
310 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
312 self.db.integrity_report()
313 }
314
315 pub fn execute_migration_plan(
320 &self,
321 plan: &MigrationPlan,
322 max_steps: usize,
323 ) -> Result<MigrationRunOutcome, InternalError> {
324 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
325 }
326
327 #[must_use]
332 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
333 where
334 E: EntityKind<Canister = C> + EntityValue,
335 {
336 LoadExecutor::new(self.db, self.debug)
337 }
338
339 #[must_use]
340 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
341 where
342 E: PersistedRow<Canister = C> + EntityValue,
343 {
344 DeleteExecutor::new(self.db, self.debug)
345 }
346
347 #[must_use]
348 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
349 where
350 E: PersistedRow<Canister = C> + EntityValue,
351 {
352 SaveExecutor::new(self.db, self.debug)
353 }
354}
355
356#[doc(hidden)]
361pub fn debug_remove_entity_row_data_only<C, E>(
362 session: &DbSession<C>,
363 key: &E::Key,
364) -> Result<bool, InternalError>
365where
366 C: CanisterKind,
367 E: PersistedRow<Canister = C> + EntityValue,
368{
369 let store = session.db.recovered_store(E::Store::PATH)?;
372
373 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
376 let raw_key = data_key.to_raw()?;
377 let storage_key = data_key.storage_key();
378
379 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
383 if !removed {
384 return Ok(false);
385 }
386
387 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
388
389 Ok(true)
390}
391
392#[doc(hidden)]
398pub fn debug_mark_store_index_state<C>(
399 session: &DbSession<C>,
400 store_path: &str,
401 state: IndexState,
402) -> Result<(), InternalError>
403where
404 C: CanisterKind,
405{
406 let store = session.db.recovered_store(store_path)?;
409
410 match state {
413 IndexState::Building => store.mark_index_building(),
414 IndexState::Ready => store.mark_index_ready(),
415 IndexState::Dropping => store.mark_index_dropping(),
416 }
417
418 Ok(())
419}