1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23 query::plan::VisibleIndexes,
24 schema::{
25 AcceptedSchemaSnapshot, describe_entity_fields,
26 describe_entity_fields_with_persisted_schema, describe_entity_model,
27 describe_entity_model_with_persisted_schema, ensure_initial_schema_snapshot,
28 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
29 },
30 },
31 error::InternalError,
32 metrics::sink::{MetricsSink, with_metrics_sink},
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35 value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42 QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56pub struct DbSession<C: CanisterKind> {
63 db: Db<C>,
64 debug: bool,
65 metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69 #[must_use]
71 pub(crate) const fn new(db: Db<C>) -> Self {
72 Self {
73 db,
74 debug: false,
75 metrics: None,
76 }
77 }
78
79 #[must_use]
81 pub const fn new_with_hooks(
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 ) -> Self {
85 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86 }
87
88 #[must_use]
90 pub const fn debug(mut self) -> Self {
91 self.debug = true;
92 self
93 }
94
95 #[must_use]
97 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98 self.metrics = Some(sink);
99 self
100 }
101
102 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105 where
106 E: EntityKind<Canister = C>,
107 {
108 FluentLoadQuery::new(self, Query::new(consistency))
109 }
110
111 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115 where
116 E: PersistedRow<Canister = C>,
117 {
118 FluentDeleteQuery::new(self, Query::new(consistency).delete())
119 }
120
121 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122 if let Some(sink) = self.metrics {
123 with_metrics_sink(sink, f)
124 } else {
125 f()
126 }
127 }
128
129 fn execute_save_with<E, T, R>(
131 &self,
132 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133 map: impl FnOnce(T) -> R,
134 ) -> Result<R, InternalError>
135 where
136 E: PersistedRow<Canister = C> + EntityValue,
137 {
138 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
139
140 Ok(map(value))
141 }
142
143 fn execute_save_entity<E>(
145 &self,
146 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
147 ) -> Result<E, InternalError>
148 where
149 E: PersistedRow<Canister = C> + EntityValue,
150 {
151 self.execute_save_with(op, std::convert::identity)
152 }
153
154 fn execute_save_batch<E>(
155 &self,
156 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
157 ) -> Result<WriteBatchResponse<E>, InternalError>
158 where
159 E: PersistedRow<Canister = C> + EntityValue,
160 {
161 self.execute_save_with(op, WriteBatchResponse::new)
162 }
163
164 #[must_use]
170 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
171 where
172 E: EntityKind<Canister = C>,
173 {
174 self.fluent_load_query(MissingRowPolicy::Ignore)
175 }
176
177 #[must_use]
179 pub const fn load_with_consistency<E>(
180 &self,
181 consistency: MissingRowPolicy,
182 ) -> FluentLoadQuery<'_, E>
183 where
184 E: EntityKind<Canister = C>,
185 {
186 self.fluent_load_query(consistency)
187 }
188
189 #[must_use]
191 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
192 where
193 E: PersistedRow<Canister = C>,
194 {
195 self.fluent_delete_query(MissingRowPolicy::Ignore)
196 }
197
198 #[must_use]
200 pub fn delete_with_consistency<E>(
201 &self,
202 consistency: MissingRowPolicy,
203 ) -> FluentDeleteQuery<'_, E>
204 where
205 E: PersistedRow<Canister = C>,
206 {
207 self.fluent_delete_query(consistency)
208 }
209
210 #[must_use]
214 pub const fn select_one(&self) -> Value {
215 Value::Int(1)
216 }
217
218 #[must_use]
225 pub fn show_indexes<E>(&self) -> Vec<String>
226 where
227 E: EntityKind<Canister = C>,
228 {
229 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
230 }
231
232 #[must_use]
238 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
239 show_indexes_for_model(model)
240 }
241
242 pub(in crate::db) fn show_indexes_for_store_model(
246 &self,
247 store_path: &str,
248 model: &'static EntityModel,
249 ) -> Vec<String> {
250 let runtime_state = self
251 .db
252 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
253 .map(|store| store.index_state());
254
255 show_indexes_for_model_with_runtime_state(model, runtime_state)
256 }
257
258 #[must_use]
264 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
265 where
266 E: EntityKind<Canister = C>,
267 {
268 self.show_columns_for_model(E::MODEL)
269 }
270
271 #[must_use]
273 pub fn show_columns_for_model(
274 &self,
275 model: &'static EntityModel,
276 ) -> Vec<EntityFieldDescription> {
277 describe_entity_fields(model)
278 }
279
280 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
286 where
287 E: EntityKind<Canister = C>,
288 {
289 let snapshot = self.ensure_accepted_initial_schema_snapshot::<E>()?;
290
291 Ok(describe_entity_fields_with_persisted_schema(
292 E::MODEL,
293 &snapshot,
294 ))
295 }
296
297 #[must_use]
299 pub fn show_entities(&self) -> Vec<String> {
300 self.db.runtime_entity_names()
301 }
302
303 #[must_use]
308 pub fn show_tables(&self) -> Vec<String> {
309 self.show_entities()
310 }
311
312 fn visible_indexes_for_store_model(
315 &self,
316 store_path: &str,
317 model: &'static EntityModel,
318 ) -> Result<VisibleIndexes<'static>, QueryError> {
319 let store = self
322 .db
323 .recovered_store(store_path)
324 .map_err(QueryError::execute)?;
325 let state = store.index_state();
326 if state != IndexState::Ready {
327 return Ok(VisibleIndexes::none());
328 }
329 debug_assert_eq!(state, IndexState::Ready);
330
331 Ok(VisibleIndexes::planner_visible(model.indexes()))
334 }
335
336 #[must_use]
342 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
343 where
344 E: EntityKind<Canister = C>,
345 {
346 self.describe_entity_model(E::MODEL)
347 }
348
349 #[must_use]
351 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
352 describe_entity_model(model)
353 }
354
355 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
361 where
362 E: EntityKind<Canister = C>,
363 {
364 let snapshot = self.ensure_accepted_initial_schema_snapshot::<E>()?;
365
366 Ok(describe_entity_model_with_persisted_schema(
367 E::MODEL,
368 &snapshot,
369 ))
370 }
371
372 fn ensure_accepted_initial_schema_snapshot<E>(
377 &self,
378 ) -> Result<AcceptedSchemaSnapshot, InternalError>
379 where
380 E: EntityKind<Canister = C>,
381 {
382 self.ensure_accepted_initial_schema_snapshot_for_authority(EntityAuthority::for_type::<E>())
383 }
384
385 fn ensure_accepted_initial_schema_snapshot_for_authority(
390 &self,
391 authority: EntityAuthority,
392 ) -> Result<AcceptedSchemaSnapshot, InternalError> {
393 let store = self.db.recovered_store(authority.store_path())?;
394
395 store.with_schema_mut(|schema_store| {
396 ensure_initial_schema_snapshot(
397 schema_store,
398 authority.entity_tag(),
399 authority.entity_path(),
400 authority.model(),
401 )
402 })
403 }
404
405 pub fn storage_report(
407 &self,
408 name_to_path: &[(&'static str, &'static str)],
409 ) -> Result<StorageReport, InternalError> {
410 self.db.storage_report(name_to_path)
411 }
412
413 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
415 self.db.storage_report_default()
416 }
417
418 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
420 self.db.integrity_report()
421 }
422
423 #[must_use]
428 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
429 where
430 E: EntityKind<Canister = C> + EntityValue,
431 {
432 LoadExecutor::new(self.db, self.debug)
433 }
434
435 #[must_use]
436 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
437 where
438 E: PersistedRow<Canister = C> + EntityValue,
439 {
440 DeleteExecutor::new(self.db)
441 }
442
443 #[must_use]
444 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
445 where
446 E: PersistedRow<Canister = C> + EntityValue,
447 {
448 SaveExecutor::new(self.db, self.debug)
449 }
450}