1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17use crate::{
18 db::{
19 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
20 FluentLoadQuery, IndexState, IntegrityReport, MissingRowPolicy, PersistedRow, Query,
21 QueryError, StorageReport, StoreRegistry, WriteBatchResponse,
22 commit::CommitSchemaFingerprint,
23 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
24 query::plan::VisibleIndexes,
25 schema::{
26 AcceptedRowDecodeContract, AcceptedRowLayoutRuntimeDescriptor, AcceptedSchemaSnapshot,
27 SchemaInfo, accepted_commit_schema_fingerprint, describe_entity_fields,
28 describe_entity_fields_with_persisted_schema, describe_entity_model,
29 describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
30 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
31 show_indexes_for_schema_info_with_runtime_state,
32 },
33 },
34 error::InternalError,
35 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
36 model::entity::EntityModel,
37 traits::{CanisterKind, EntityKind, EntityValue, Path},
38 value::Value,
39};
40use std::thread::LocalKey;
41
42#[cfg(feature = "diagnostics")]
43pub use query::{
44 DirectDataRowAttribution, GroupedCountAttribution, GroupedExecutionAttribution,
45 QueryExecutionAttribution,
46};
47pub(in crate::db) use response::finalize_structural_grouped_projection_result;
48pub(in crate::db) use response::{finalize_scalar_paged_execution, sql_grouped_cursor_from_bytes};
49#[cfg(all(feature = "sql", feature = "diagnostics"))]
50pub use sql::{
51 SqlCompileAttribution, SqlExecutionAttribution, SqlPureCoveringAttribution,
52 SqlQueryCacheAttribution, SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
53};
54#[cfg(feature = "sql")]
55pub use sql::{
56 SqlDdlExecutionStatus, SqlDdlMutationKind, SqlDdlPreparationReport, SqlStatementResult,
57 sql_statement_entity_name,
58};
59#[cfg(all(feature = "sql", feature = "diagnostics"))]
60pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
61
62pub struct DbSession<C: CanisterKind> {
69 db: Db<C>,
70 debug: bool,
71 metrics: Option<&'static dyn MetricsSink>,
72}
73
74impl<C: CanisterKind> DbSession<C> {
75 #[must_use]
77 pub(crate) const fn new(db: Db<C>) -> Self {
78 Self {
79 db,
80 debug: false,
81 metrics: None,
82 }
83 }
84
85 #[must_use]
87 pub const fn new_with_hooks(
88 store: &'static LocalKey<StoreRegistry>,
89 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
90 ) -> Self {
91 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
92 }
93
94 #[must_use]
96 pub const fn debug(mut self) -> Self {
97 self.debug = true;
98 self
99 }
100
101 #[must_use]
103 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
104 self.metrics = Some(sink);
105 self
106 }
107
108 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
111 where
112 E: EntityKind<Canister = C>,
113 {
114 FluentLoadQuery::new(self, Query::new(consistency))
115 }
116
117 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
121 where
122 E: PersistedRow<Canister = C>,
123 {
124 FluentDeleteQuery::new(self, Query::new(consistency).delete())
125 }
126
127 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
128 if let Some(sink) = self.metrics {
129 with_metrics_sink(sink, f)
130 } else {
131 f()
132 }
133 }
134
135 fn execute_save_with<E, T, R>(
137 &self,
138 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
139 map: impl FnOnce(T) -> R,
140 ) -> Result<R, InternalError>
141 where
142 E: PersistedRow<Canister = C> + EntityValue,
143 {
144 let (contract, schema_info, schema_fingerprint) = match self
145 .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
146 {
147 Ok(authority) => authority,
148 Err(error) => {
149 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
150
151 return Err(error);
152 }
153 };
154 let value = self.with_metrics(|| {
155 op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
156 })?;
157
158 Ok(map(value))
159 }
160
161 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
166 &self,
167 accepted_row_decode_contract: AcceptedRowDecodeContract,
168 accepted_schema_info: SchemaInfo,
169 accepted_schema_fingerprint: CommitSchemaFingerprint,
170 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
171 map: impl FnOnce(T) -> R,
172 ) -> Result<R, InternalError>
173 where
174 E: PersistedRow<Canister = C> + EntityValue,
175 {
176 let value = self.with_metrics(|| {
177 op(self.save_executor::<E>(
178 accepted_row_decode_contract,
179 accepted_schema_info,
180 accepted_schema_fingerprint,
181 ))
182 })?;
183
184 Ok(map(value))
185 }
186
187 fn execute_save_entity<E>(
189 &self,
190 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
191 ) -> Result<E, InternalError>
192 where
193 E: PersistedRow<Canister = C> + EntityValue,
194 {
195 self.execute_save_with(op, std::convert::identity)
196 }
197
198 fn execute_save_batch<E>(
199 &self,
200 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
201 ) -> Result<WriteBatchResponse<E>, InternalError>
202 where
203 E: PersistedRow<Canister = C> + EntityValue,
204 {
205 self.execute_save_with(op, WriteBatchResponse::new)
206 }
207
208 #[must_use]
214 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
215 where
216 E: EntityKind<Canister = C>,
217 {
218 self.fluent_load_query(MissingRowPolicy::Ignore)
219 }
220
221 #[must_use]
223 pub const fn load_with_consistency<E>(
224 &self,
225 consistency: MissingRowPolicy,
226 ) -> FluentLoadQuery<'_, E>
227 where
228 E: EntityKind<Canister = C>,
229 {
230 self.fluent_load_query(consistency)
231 }
232
233 #[must_use]
235 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
236 where
237 E: PersistedRow<Canister = C>,
238 {
239 self.fluent_delete_query(MissingRowPolicy::Ignore)
240 }
241
242 #[must_use]
244 pub fn delete_with_consistency<E>(
245 &self,
246 consistency: MissingRowPolicy,
247 ) -> FluentDeleteQuery<'_, E>
248 where
249 E: PersistedRow<Canister = C>,
250 {
251 self.fluent_delete_query(consistency)
252 }
253
254 #[must_use]
258 pub const fn select_one(&self) -> Value {
259 Value::Int(1)
260 }
261
262 #[must_use]
269 pub fn show_indexes<E>(&self) -> Vec<String>
270 where
271 E: EntityKind<Canister = C>,
272 {
273 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
274 }
275
276 #[must_use]
282 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
283 show_indexes_for_model(model)
284 }
285
286 pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
291 where
292 E: EntityKind<Canister = C>,
293 {
294 let schema = self.accepted_schema_info_for_entity::<E>()?;
295
296 Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
297 }
298
299 pub(in crate::db) fn show_indexes_for_store_model(
303 &self,
304 store_path: &str,
305 model: &'static EntityModel,
306 ) -> Vec<String> {
307 let runtime_state = self
308 .db
309 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
310 .map(|store| store.index_state());
311
312 show_indexes_for_model_with_runtime_state(model, runtime_state)
313 }
314
315 pub(in crate::db) fn show_indexes_for_store_schema_info(
319 &self,
320 store_path: &str,
321 schema: &SchemaInfo,
322 ) -> Vec<String> {
323 let runtime_state = self
324 .db
325 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
326 .map(|store| store.index_state());
327
328 show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
329 }
330
331 #[must_use]
337 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
338 where
339 E: EntityKind<Canister = C>,
340 {
341 self.show_columns_for_model(E::MODEL)
342 }
343
344 #[must_use]
346 pub fn show_columns_for_model(
347 &self,
348 model: &'static EntityModel,
349 ) -> Vec<EntityFieldDescription> {
350 describe_entity_fields(model)
351 }
352
353 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
359 where
360 E: EntityKind<Canister = C>,
361 {
362 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
363
364 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
365 }
366
367 #[must_use]
369 pub fn show_entities(&self) -> Vec<String> {
370 self.db.runtime_entity_names()
371 }
372
373 #[must_use]
378 pub fn show_tables(&self) -> Vec<String> {
379 self.show_entities()
380 }
381
382 fn visible_indexes_for_store_accepted_schema(
385 &self,
386 store_path: &str,
387 schema_info: &SchemaInfo,
388 ) -> Result<VisibleIndexes<'static>, QueryError> {
389 let store = self
392 .db
393 .recovered_store(store_path)
394 .map_err(QueryError::execute)?;
395 let state = store.index_state();
396 if state != IndexState::Ready {
397 return Ok(VisibleIndexes::none());
398 }
399 debug_assert_eq!(state, IndexState::Ready);
400
401 let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
404 debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
405 debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
406 debug_assert_eq!(
407 visible_indexes.accepted_expression_index_count(),
408 Some(visible_indexes.accepted_expression_indexes().len()),
409 );
410
411 Ok(visible_indexes)
412 }
413
414 #[must_use]
420 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
421 where
422 E: EntityKind<Canister = C>,
423 {
424 self.describe_entity_model(E::MODEL)
425 }
426
427 #[must_use]
429 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
430 describe_entity_model(model)
431 }
432
433 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
439 where
440 E: EntityKind<Canister = C>,
441 {
442 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
443
444 Ok(describe_entity_model_with_persisted_schema(
445 E::MODEL,
446 &snapshot,
447 ))
448 }
449
450 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
454 where
455 E: EntityKind<Canister = C>,
456 {
457 let store = self.db.recovered_store(E::Store::PATH)?;
458
459 store.with_schema_mut(|schema_store| {
460 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
461 })
462 }
463
464 pub(in crate::db) fn accepted_schema_info_for_entity<E>(
468 &self,
469 ) -> Result<SchemaInfo, InternalError>
470 where
471 E: EntityKind<Canister = C>,
472 {
473 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
474
475 Ok(
476 SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
477 E::MODEL,
478 &accepted_schema,
479 true,
480 ),
481 )
482 }
483
484 pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
488 accepted_schema: &AcceptedSchemaSnapshot,
489 ) -> Result<EntityAuthority, InternalError>
490 where
491 E: EntityKind<Canister = C>,
492 {
493 EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
494 }
495
496 pub(in crate::db) fn accepted_entity_authority<E>(
500 &self,
501 ) -> Result<(AcceptedSchemaSnapshot, EntityAuthority), InternalError>
502 where
503 E: EntityKind<Canister = C>,
504 {
505 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
506 let authority = Self::accepted_entity_authority_for_schema::<E>(&accepted_schema)?;
507
508 Ok((accepted_schema, authority))
509 }
510
511 fn ensure_generated_compatible_accepted_save_schema<E>(
516 &self,
517 ) -> Result<
518 (
519 AcceptedRowDecodeContract,
520 SchemaInfo,
521 CommitSchemaFingerprint,
522 ),
523 InternalError,
524 >
525 where
526 E: EntityKind<Canister = C>,
527 {
528 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
529 let (accepted_row_layout, _) =
530 AcceptedRowLayoutRuntimeDescriptor::from_generated_compatible_schema(
531 &accepted_schema,
532 E::MODEL,
533 )?;
534 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, &accepted_schema);
535 let schema_fingerprint = accepted_commit_schema_fingerprint(&accepted_schema)?;
536
537 Ok((
538 accepted_row_layout.row_decode_contract(),
539 schema_info,
540 schema_fingerprint,
541 ))
542 }
543
544 pub fn storage_report(
546 &self,
547 name_to_path: &[(&'static str, &'static str)],
548 ) -> Result<StorageReport, InternalError> {
549 self.db.storage_report(name_to_path)
550 }
551
552 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
554 self.db.storage_report_default()
555 }
556
557 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
559 self.db.integrity_report()
560 }
561
562 #[must_use]
567 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
568 where
569 E: EntityKind<Canister = C> + EntityValue,
570 {
571 LoadExecutor::new(self.db, self.debug)
572 }
573
574 #[must_use]
575 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
576 where
577 E: PersistedRow<Canister = C> + EntityValue,
578 {
579 DeleteExecutor::new(self.db)
580 }
581
582 #[must_use]
583 pub(in crate::db) const fn save_executor<E>(
584 &self,
585 accepted_row_decode_contract: AcceptedRowDecodeContract,
586 accepted_schema_info: SchemaInfo,
587 accepted_schema_fingerprint: CommitSchemaFingerprint,
588 ) -> SaveExecutor<E>
589 where
590 E: PersistedRow<Canister = C> + EntityValue,
591 {
592 SaveExecutor::new_with_accepted_contract(
593 self.db,
594 self.debug,
595 accepted_row_decode_contract,
596 accepted_schema_info,
597 accepted_schema_fingerprint,
598 )
599 }
600}