1mod query;
7#[cfg(feature = "sql")]
8mod sql;
9#[cfg(all(test, feature = "sql"))]
13mod tests;
14mod write;
15
16use crate::{
17 db::{
18 Db, EntityFieldDescription, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
19 IndexState, IntegrityReport, MigrationPlan, MigrationRunOutcome, MissingRowPolicy,
20 PersistedRow, Query, QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
21 commit::EntityRuntimeHooks,
22 cursor::{decode_optional_cursor_token, decode_optional_grouped_cursor_token},
23 data::DataKey,
24 executor::{DeleteExecutor, LoadExecutor, SaveExecutor},
25 query::plan::VisibleIndexes,
26 schema::{
27 describe_entity_model, show_indexes_for_model,
28 show_indexes_for_model_with_runtime_state,
29 },
30 },
31 error::InternalError,
32 metrics::sink::{MetricsSink, with_metrics_sink},
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35 value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "sql")]
40pub use sql::{SqlDispatchResult, SqlParsedStatement, SqlStatementRoute};
41
42fn decode_optional_cursor_bytes(cursor_token: Option<&str>) -> Result<Option<Vec<u8>>, QueryError> {
45 decode_optional_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
46}
47
48fn decode_optional_grouped_cursor(
51 cursor_token: Option<&str>,
52) -> Result<Option<crate::db::cursor::GroupedContinuationToken>, QueryError> {
53 decode_optional_grouped_cursor_token(cursor_token).map_err(QueryError::from_cursor_plan_error)
54}
55
56pub struct DbSession<C: CanisterKind> {
63 db: Db<C>,
64 debug: bool,
65 metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69 #[must_use]
71 pub(crate) const fn new(db: Db<C>) -> Self {
72 Self {
73 db,
74 debug: false,
75 metrics: None,
76 }
77 }
78
79 #[must_use]
81 pub const fn new_with_hooks(
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 ) -> Self {
85 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86 }
87
88 #[must_use]
90 pub const fn debug(mut self) -> Self {
91 self.debug = true;
92 self
93 }
94
95 #[must_use]
97 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98 self.metrics = Some(sink);
99 self
100 }
101
102 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
103 if let Some(sink) = self.metrics {
104 with_metrics_sink(sink, f)
105 } else {
106 f()
107 }
108 }
109
110 fn execute_save_with<E, T, R>(
112 &self,
113 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
114 map: impl FnOnce(T) -> R,
115 ) -> Result<R, InternalError>
116 where
117 E: PersistedRow<Canister = C> + EntityValue,
118 {
119 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
120
121 Ok(map(value))
122 }
123
124 fn execute_save_entity<E>(
126 &self,
127 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
128 ) -> Result<E, InternalError>
129 where
130 E: PersistedRow<Canister = C> + EntityValue,
131 {
132 self.execute_save_with(op, std::convert::identity)
133 }
134
135 fn execute_save_batch<E>(
136 &self,
137 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
138 ) -> Result<WriteBatchResponse<E>, InternalError>
139 where
140 E: PersistedRow<Canister = C> + EntityValue,
141 {
142 self.execute_save_with(op, WriteBatchResponse::new)
143 }
144
145 #[must_use]
151 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
152 where
153 E: EntityKind<Canister = C>,
154 {
155 FluentLoadQuery::new(self, Query::new(MissingRowPolicy::Ignore))
156 }
157
158 #[must_use]
160 pub const fn load_with_consistency<E>(
161 &self,
162 consistency: MissingRowPolicy,
163 ) -> FluentLoadQuery<'_, E>
164 where
165 E: EntityKind<Canister = C>,
166 {
167 FluentLoadQuery::new(self, Query::new(consistency))
168 }
169
170 #[must_use]
172 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
173 where
174 E: PersistedRow<Canister = C>,
175 {
176 FluentDeleteQuery::new(self, Query::new(MissingRowPolicy::Ignore).delete())
177 }
178
179 #[must_use]
181 pub fn delete_with_consistency<E>(
182 &self,
183 consistency: MissingRowPolicy,
184 ) -> FluentDeleteQuery<'_, E>
185 where
186 E: PersistedRow<Canister = C>,
187 {
188 FluentDeleteQuery::new(self, Query::new(consistency).delete())
189 }
190
191 #[must_use]
195 pub const fn select_one(&self) -> Value {
196 Value::Int(1)
197 }
198
199 #[must_use]
206 pub fn show_indexes<E>(&self) -> Vec<String>
207 where
208 E: EntityKind<Canister = C>,
209 {
210 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
211 }
212
213 #[must_use]
219 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
220 show_indexes_for_model(model)
221 }
222
223 pub(in crate::db) fn show_indexes_for_store_model(
227 &self,
228 store_path: &str,
229 model: &'static EntityModel,
230 ) -> Vec<String> {
231 let runtime_state = self.try_index_state_for_store_path(store_path);
232
233 show_indexes_for_model_with_runtime_state(model, runtime_state)
234 }
235
236 #[must_use]
238 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
239 where
240 E: EntityKind<Canister = C>,
241 {
242 self.show_columns_for_model(E::MODEL)
243 }
244
245 #[must_use]
247 pub fn show_columns_for_model(
248 &self,
249 model: &'static EntityModel,
250 ) -> Vec<EntityFieldDescription> {
251 describe_entity_model(model).fields().to_vec()
252 }
253
254 #[must_use]
256 pub fn show_entities(&self) -> Vec<String> {
257 self.db.runtime_entity_names()
258 }
259
260 fn try_index_state_for_store_path(&self, store_path: &str) -> Option<IndexState> {
265 self.db
266 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
267 .map(|store| store.index_state())
268 }
269
270 fn visible_indexes_for_store_model(
273 &self,
274 store_path: &str,
275 model: &'static EntityModel,
276 ) -> Result<VisibleIndexes<'static>, QueryError> {
277 let store = self
280 .db
281 .recovered_store(store_path)
282 .map_err(QueryError::execute)?;
283 let state = store.index_state();
284 if state != IndexState::Ready {
285 return Ok(VisibleIndexes::none());
286 }
287 debug_assert_eq!(state, IndexState::Ready);
288
289 Ok(VisibleIndexes::planner_visible(model.indexes()))
292 }
293
294 #[must_use]
299 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
300 where
301 E: EntityKind<Canister = C>,
302 {
303 self.describe_entity_model(E::MODEL)
304 }
305
306 #[must_use]
308 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
309 describe_entity_model(model)
310 }
311
312 pub fn storage_report(
314 &self,
315 name_to_path: &[(&'static str, &'static str)],
316 ) -> Result<StorageReport, InternalError> {
317 self.db.storage_report(name_to_path)
318 }
319
320 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
322 self.db.storage_report_default()
323 }
324
325 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
327 self.db.integrity_report()
328 }
329
330 pub fn execute_migration_plan(
335 &self,
336 plan: &MigrationPlan,
337 max_steps: usize,
338 ) -> Result<MigrationRunOutcome, InternalError> {
339 self.with_metrics(|| self.db.execute_migration_plan(plan, max_steps))
340 }
341
342 #[must_use]
347 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
348 where
349 E: EntityKind<Canister = C> + EntityValue,
350 {
351 LoadExecutor::new(self.db, self.debug)
352 }
353
354 #[must_use]
355 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
356 where
357 E: PersistedRow<Canister = C> + EntityValue,
358 {
359 DeleteExecutor::new(self.db, self.debug)
360 }
361
362 #[must_use]
363 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
364 where
365 E: PersistedRow<Canister = C> + EntityValue,
366 {
367 SaveExecutor::new(self.db, self.debug)
368 }
369}
370
371#[doc(hidden)]
376pub fn debug_remove_entity_row_data_only<C, E>(
377 session: &DbSession<C>,
378 key: &E::Key,
379) -> Result<bool, InternalError>
380where
381 C: CanisterKind,
382 E: PersistedRow<Canister = C> + EntityValue,
383{
384 let store = session.db.recovered_store(E::Store::PATH)?;
387
388 let data_key = DataKey::try_from_field_value(E::ENTITY_TAG, key)?;
391 let raw_key = data_key.to_raw()?;
392 let storage_key = data_key.storage_key();
393
394 let removed = store.with_data_mut(|data| data.remove(&raw_key).is_some());
398 if !removed {
399 return Ok(false);
400 }
401
402 store.with_index_mut(|index| index.mark_memberships_missing_for_storage_key(storage_key))?;
403
404 Ok(true)
405}
406
407#[doc(hidden)]
413pub fn debug_mark_store_index_state<C>(
414 session: &DbSession<C>,
415 store_path: &str,
416 state: IndexState,
417) -> Result<(), InternalError>
418where
419 C: CanisterKind,
420{
421 let store = session.db.recovered_store(store_path)?;
424
425 match state {
428 IndexState::Building => store.mark_index_building(),
429 IndexState::Ready => store.mark_index_ready(),
430 IndexState::Dropping => store.mark_index_dropping(),
431 }
432
433 Ok(())
434}