1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
23 query::plan::VisibleIndexes,
24 schema::{
25 AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot, describe_entity_fields,
26 describe_entity_fields_with_persisted_schema, describe_entity_model,
27 describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
28 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
29 },
30 },
31 error::InternalError,
32 metrics::sink::{MetricsSink, with_metrics_sink},
33 model::entity::EntityModel,
34 traits::{CanisterKind, EntityKind, EntityValue, Path},
35 value::Value,
36};
37use std::thread::LocalKey;
38
39#[cfg(feature = "diagnostics")]
40pub use query::{
41 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
42 QueryExecutionAttribution,
43};
44pub(in crate::db) use response::finalize_structural_grouped_projection_result;
45pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
46#[cfg(feature = "sql")]
47pub use sql::SqlStatementResult;
48#[cfg(all(feature = "sql", feature = "diagnostics"))]
49pub use sql::{
50 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
51 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
52};
53#[cfg(all(feature = "sql", feature = "diagnostics"))]
54pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
55
56pub struct DbSession<C: CanisterKind> {
63 db: Db<C>,
64 debug: bool,
65 metrics: Option<&'static dyn MetricsSink>,
66}
67
68impl<C: CanisterKind> DbSession<C> {
69 #[must_use]
71 pub(crate) const fn new(db: Db<C>) -> Self {
72 Self {
73 db,
74 debug: false,
75 metrics: None,
76 }
77 }
78
79 #[must_use]
81 pub const fn new_with_hooks(
82 store: &'static LocalKey<StoreRegistry>,
83 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
84 ) -> Self {
85 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
86 }
87
88 #[must_use]
90 pub const fn debug(mut self) -> Self {
91 self.debug = true;
92 self
93 }
94
95 #[must_use]
97 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
98 self.metrics = Some(sink);
99 self
100 }
101
102 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
105 where
106 E: EntityKind<Canister = C>,
107 {
108 FluentLoadQuery::new(self, Query::new(consistency))
109 }
110
111 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
115 where
116 E: PersistedRow<Canister = C>,
117 {
118 FluentDeleteQuery::new(self, Query::new(consistency).delete())
119 }
120
121 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
122 if let Some(sink) = self.metrics {
123 with_metrics_sink(sink, f)
124 } else {
125 f()
126 }
127 }
128
129 fn execute_save_with<E, T, R>(
131 &self,
132 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
133 map: impl FnOnce(T) -> R,
134 ) -> Result<R, InternalError>
135 where
136 E: PersistedRow<Canister = C> + EntityValue,
137 {
138 self.ensure_generated_compatible_accepted_schema::<E>()?;
139 self.execute_save_with_checked_accepted_schema(op, map)
140 }
141
142 fn execute_save_with_checked_accepted_schema<E, T, R>(
147 &self,
148 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
149 map: impl FnOnce(T) -> R,
150 ) -> Result<R, InternalError>
151 where
152 E: PersistedRow<Canister = C> + EntityValue,
153 {
154 let value = self.with_metrics(|| op(self.save_executor::<E>()))?;
155
156 Ok(map(value))
157 }
158
159 fn execute_save_entity<E>(
161 &self,
162 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
163 ) -> Result<E, InternalError>
164 where
165 E: PersistedRow<Canister = C> + EntityValue,
166 {
167 self.execute_save_with(op, std::convert::identity)
168 }
169
170 fn execute_save_batch<E>(
171 &self,
172 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
173 ) -> Result<WriteBatchResponse<E>, InternalError>
174 where
175 E: PersistedRow<Canister = C> + EntityValue,
176 {
177 self.execute_save_with(op, WriteBatchResponse::new)
178 }
179
180 #[must_use]
186 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
187 where
188 E: EntityKind<Canister = C>,
189 {
190 self.fluent_load_query(MissingRowPolicy::Ignore)
191 }
192
193 #[must_use]
195 pub const fn load_with_consistency<E>(
196 &self,
197 consistency: MissingRowPolicy,
198 ) -> FluentLoadQuery<'_, E>
199 where
200 E: EntityKind<Canister = C>,
201 {
202 self.fluent_load_query(consistency)
203 }
204
205 #[must_use]
207 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
208 where
209 E: PersistedRow<Canister = C>,
210 {
211 self.fluent_delete_query(MissingRowPolicy::Ignore)
212 }
213
214 #[must_use]
216 pub fn delete_with_consistency<E>(
217 &self,
218 consistency: MissingRowPolicy,
219 ) -> FluentDeleteQuery<'_, E>
220 where
221 E: PersistedRow<Canister = C>,
222 {
223 self.fluent_delete_query(consistency)
224 }
225
226 #[must_use]
230 pub const fn select_one(&self) -> Value {
231 Value::Int(1)
232 }
233
234 #[must_use]
241 pub fn show_indexes<E>(&self) -> Vec<String>
242 where
243 E: EntityKind<Canister = C>,
244 {
245 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
246 }
247
248 #[must_use]
254 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
255 show_indexes_for_model(model)
256 }
257
258 pub(in crate::db) fn show_indexes_for_store_model(
262 &self,
263 store_path: &str,
264 model: &'static EntityModel,
265 ) -> Vec<String> {
266 let runtime_state = self
267 .db
268 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
269 .map(|store| store.index_state());
270
271 show_indexes_for_model_with_runtime_state(model, runtime_state)
272 }
273
274 #[must_use]
280 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
281 where
282 E: EntityKind<Canister = C>,
283 {
284 self.show_columns_for_model(E::MODEL)
285 }
286
287 #[must_use]
289 pub fn show_columns_for_model(
290 &self,
291 model: &'static EntityModel,
292 ) -> Vec<EntityFieldDescription> {
293 describe_entity_fields(model)
294 }
295
296 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
302 where
303 E: EntityKind<Canister = C>,
304 {
305 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
306
307 Ok(describe_entity_fields_with_persisted_schema(
308 E::MODEL,
309 &snapshot,
310 ))
311 }
312
313 #[must_use]
315 pub fn show_entities(&self) -> Vec<String> {
316 self.db.runtime_entity_names()
317 }
318
319 #[must_use]
324 pub fn show_tables(&self) -> Vec<String> {
325 self.show_entities()
326 }
327
328 fn visible_indexes_for_store_model(
331 &self,
332 store_path: &str,
333 model: &'static EntityModel,
334 ) -> Result<VisibleIndexes<'static>, QueryError> {
335 let store = self
338 .db
339 .recovered_store(store_path)
340 .map_err(QueryError::execute)?;
341 let state = store.index_state();
342 if state != IndexState::Ready {
343 return Ok(VisibleIndexes::none());
344 }
345 debug_assert_eq!(state, IndexState::Ready);
346
347 Ok(VisibleIndexes::planner_visible(model.indexes()))
350 }
351
352 #[must_use]
358 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
359 where
360 E: EntityKind<Canister = C>,
361 {
362 self.describe_entity_model(E::MODEL)
363 }
364
365 #[must_use]
367 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
368 describe_entity_model(model)
369 }
370
371 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
377 where
378 E: EntityKind<Canister = C>,
379 {
380 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
381
382 Ok(describe_entity_model_with_persisted_schema(
383 E::MODEL,
384 &snapshot,
385 ))
386 }
387
388 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
392 where
393 E: EntityKind<Canister = C>,
394 {
395 self.ensure_accepted_schema_snapshot_for_authority(EntityAuthority::for_type::<E>())
396 }
397
398 fn ensure_accepted_schema_snapshot_for_authority(
402 &self,
403 authority: EntityAuthority,
404 ) -> Result<AcceptedSchemaSnapshot, InternalError> {
405 let store = self.db.recovered_store(authority.store_path())?;
406
407 store.with_schema_mut(|schema_store| {
408 ensure_accepted_schema_snapshot(
409 schema_store,
410 authority.entity_tag(),
411 authority.entity_path(),
412 authority.model(),
413 )
414 })
415 }
416
417 fn ensure_accepted_schema_snapshot_and_authority(
421 &self,
422 authority: EntityAuthority,
423 ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError> {
424 let accepted_schema = self.ensure_accepted_schema_snapshot_for_authority(authority)?;
425 let accepted_row_layout =
426 AcceptedRowLayoutRuntimeDescriptor::from_accepted_schema(&accepted_schema)?;
427 let authority = authority.with_accepted_row_layout(&accepted_row_layout)?;
428
429 Ok((accepted_schema, authority))
430 }
431
432 fn ensure_generated_compatible_accepted_schema<E>(
437 &self,
438 ) -> Result<AcceptedSchemaSnapshot, InternalError>
439 where
440 E: EntityKind<Canister = C>,
441 {
442 let (accepted_schema, _) =
443 self.ensure_accepted_schema_snapshot_and_authority(EntityAuthority::for_type::<E>())?;
444
445 Ok(accepted_schema)
446 }
447
448 pub fn storage_report(
450 &self,
451 name_to_path: &[(&'static str, &'static str)],
452 ) -> Result<StorageReport, InternalError> {
453 self.db.storage_report(name_to_path)
454 }
455
456 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
458 self.db.storage_report_default()
459 }
460
461 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
463 self.db.integrity_report()
464 }
465
466 #[must_use]
471 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
472 where
473 E: EntityKind<Canister = C> + EntityValue,
474 {
475 LoadExecutor::new(self.db, self.debug)
476 }
477
478 #[must_use]
479 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
480 where
481 E: PersistedRow<Canister = C> + EntityValue,
482 {
483 DeleteExecutor::new(self.db)
484 }
485
486 #[must_use]
487 pub(in crate::db) const fn save_executor<E>(&self) -> SaveExecutor<E>
488 where
489 E: PersistedRow<Canister = C> + EntityValue,
490 {
491 SaveExecutor::new(self.db, self.debug)
492 }
493}