1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(feature = "sql")]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20 db::{
21 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22 FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23 StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24 commit::CommitSchemaFingerprint,
25 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26 schema::{
27 AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28 AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29 accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30 describe_entity_fields, describe_entity_fields_with_persisted_schema,
31 describe_entity_model, describe_entity_model_with_persisted_schema,
32 ensure_accepted_schema_snapshot, show_indexes_for_model,
33 show_indexes_for_model_with_runtime_state,
34 show_indexes_for_schema_info_with_runtime_state,
35 },
36 },
37 error::InternalError,
38 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39 model::entity::EntityModel,
40 traits::{CanisterKind, EntityKind, EntityValue, Path},
41 value::Value,
42};
43use std::{
44 cell::{OnceCell, RefCell},
45 collections::HashMap,
46 thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51 DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
52 GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
53 ScalarAggregateAttribution,
54};
55pub(in crate::db) use response::finalize_scalar_paged_execution;
56pub(in crate::db) use response::finalize_structural_grouped_projection_result;
57#[cfg(feature = "sql")]
58pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
59#[cfg(feature = "sql")]
60pub use sql::{
61 SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
62 SqlDdlPreparationReport, SqlDeleteExecutionBounds, SqlDeleteExposurePolicy,
63 SqlDeleteOrderPolicy, SqlDeletePolicyContext, SqlDeletePolicyRejection, SqlDeletePolicyReport,
64 SqlDeleteReturningBounds, SqlDeleteReturningPolicy, SqlDeleteStatementClassification,
65 SqlDeleteWherePolicy, SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan,
66 SqlPublicPrimaryKeyDeletePlan, SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan,
67 SqlSessionCurrentUpdatePlan, SqlStatementDispatch, SqlStatementResult,
68 SqlStatementShellSurface, SqlStatementSurface, SqlUpdateAssignmentPolicy,
69 SqlUpdateExposurePolicy, SqlUpdateOrderPolicy, SqlUpdatePolicyContext,
70 SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateReturningBounds,
71 SqlUpdateReturningPolicy, SqlUpdateStatementClassification, SqlUpdateWherePolicy,
72 SqlValidatedDeletePlan, SqlValidatedUpdatePlan, classify_sql_delete_policy,
73 classify_sql_update_policy, sql_statement_dispatch, sql_statement_entity_name,
74 sql_statement_shell_surface, sql_statement_surface,
75};
76#[cfg(all(feature = "sql", feature = "diagnostics"))]
77pub use sql::{
78 SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
79 SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
80 SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
81};
82#[cfg(all(feature = "sql", feature = "diagnostics"))]
83pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
84
85pub struct DbSession<C: CanisterKind> {
92 db: Db<C>,
93 debug: bool,
94 metrics: Option<&'static dyn MetricsSink>,
95}
96
97#[cfg(test)]
98#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
99pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
100 pub schema_info_projections: u64,
101 pub persisted_schema_decodes: u64,
102 pub generated_compatible_row_layout_proofs: u64,
103 pub latest_by_entity_calls: u64,
104 pub visible_index_projections: u64,
105}
106
107#[derive(Clone, Debug)]
108struct AcceptedSchemaQueryCacheEntry {
109 snapshot: AcceptedSchemaSnapshot,
110 identity: AcceptedCatalogIdentity,
111}
112
113pub(in crate::db) type AcceptedSaveContract = (
114 AcceptedRowDecodeContract,
115 AcceptedRowDecodeContract,
116 SchemaInfo,
117 CommitSchemaFingerprint,
118);
119
120#[derive(Clone, Debug)]
121pub(in crate::db) struct AcceptedSchemaCatalogContext {
122 snapshot: AcceptedSchemaSnapshot,
123 identity: AcceptedCatalogIdentity,
124 schema_info: OnceCell<SchemaInfo>,
125}
126
127impl AcceptedSchemaCatalogContext {
128 #[must_use]
129 pub(in crate::db) const fn new(
130 snapshot: AcceptedSchemaSnapshot,
131 identity: AcceptedCatalogIdentity,
132 ) -> Self {
133 Self {
134 snapshot,
135 identity,
136 schema_info: OnceCell::new(),
137 }
138 }
139
140 #[must_use]
141 pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
142 &self.snapshot
143 }
144
145 #[must_use]
146 pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
147 self.identity.accepted_schema_version()
148 }
149
150 #[must_use]
151 pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
152 self.identity.accepted_schema_fingerprint()
153 }
154
155 #[must_use]
156 pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
157 self.identity.fingerprint_method_version()
158 }
159
160 #[must_use]
161 #[cfg(feature = "sql")]
162 pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
163 self.identity
164 }
165
166 fn debug_assert_matches_entity<E>(&self)
167 where
168 E: EntityKind,
169 {
170 debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
171 debug_assert_eq!(self.identity.entity_path(), E::PATH);
172 debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
173 }
174
175 pub(in crate::db) fn accepted_entity_authority_for<E>(
176 &self,
177 ) -> Result<EntityAuthority, InternalError>
178 where
179 E: EntityKind,
180 {
181 self.debug_assert_matches_entity::<E>();
182 let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
183 let (accepted_row_layout, row_proof) =
184 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
185 &self.snapshot,
186 authority.model(),
187 )?;
188 let row_decode_contract = accepted_row_layout.row_decode_contract();
189
190 Ok(authority.with_accepted_row_decode_contract(
191 row_proof,
192 row_decode_contract,
193 self.accepted_schema_info_for::<E>(),
194 ))
195 }
196
197 #[must_use]
198 pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
199 where
200 E: EntityKind,
201 {
202 self.debug_assert_matches_entity::<E>();
203 self.schema_info
204 .get_or_init(|| {
205 SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
206 E::MODEL,
207 &self.snapshot,
208 true,
209 )
210 })
211 .clone()
212 }
213}
214
215pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
216 accepted_schema: &AcceptedSchemaSnapshot,
217 descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
218) -> Result<AcceptedSaveContract, InternalError>
219where
220 E: EntityKind,
221{
222 let row_decode_contract = descriptor.row_decode_contract();
223 let mutation_row_decode_contract = row_decode_contract.clone();
224 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
225 let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
226
227 Ok((
228 row_decode_contract,
229 mutation_row_decode_contract,
230 schema_info,
231 schema_fingerprint,
232 ))
233}
234
235thread_local! {
236 static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
241 RefCell::new(HashMap::default());
242}
243
244impl<C: CanisterKind> DbSession<C> {
245 #[must_use]
247 pub(crate) const fn new(db: Db<C>) -> Self {
248 Self {
249 db,
250 debug: false,
251 metrics: None,
252 }
253 }
254
255 #[must_use]
257 pub const fn new_with_hooks(
258 store: &'static LocalKey<StoreRegistry>,
259 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
260 ) -> Self {
261 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
262 }
263
264 #[cfg(test)]
265 pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
266 crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
267 crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
268 crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
269 crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
270 query::reset_visible_index_projection_count_for_tests();
271 }
272
273 #[cfg(test)]
274 pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
275 -> AcceptedCatalogRuntimeCounterSnapshot {
276 AcceptedCatalogRuntimeCounterSnapshot {
277 schema_info_projections:
278 crate::db::schema::accepted_schema_info_projection_count_for_tests(),
279 persisted_schema_decodes:
280 crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
281 generated_compatible_row_layout_proofs:
282 crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
283 latest_by_entity_calls:
284 crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
285 visible_index_projections: query::visible_index_projection_count_for_tests(),
286 }
287 }
288
289 #[must_use]
291 pub const fn debug(mut self) -> Self {
292 self.debug = true;
293 self
294 }
295
296 #[must_use]
298 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
299 self.metrics = Some(sink);
300 self
301 }
302
303 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
306 where
307 E: EntityKind<Canister = C>,
308 {
309 FluentLoadQuery::new(self, Query::new(consistency))
310 }
311
312 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
316 where
317 E: PersistedRow<Canister = C>,
318 {
319 FluentDeleteQuery::new(self, Query::new(consistency).delete())
320 }
321
322 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
323 if let Some(sink) = self.metrics {
324 with_metrics_sink(sink, f)
325 } else {
326 f()
327 }
328 }
329
330 fn execute_save_with<E, T, R>(
332 &self,
333 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
334 map: impl FnOnce(T) -> R,
335 ) -> Result<R, InternalError>
336 where
337 E: PersistedRow<Canister = C> + EntityValue,
338 {
339 let (contract, schema_info, schema_fingerprint) = match self
340 .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
341 {
342 Ok(authority) => authority,
343 Err(error) => {
344 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
345
346 return Err(error);
347 }
348 };
349 let value = self.with_metrics(|| {
350 op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
351 })?;
352
353 Ok(map(value))
354 }
355
356 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
361 &self,
362 accepted_row_decode_contract: AcceptedRowDecodeContract,
363 accepted_schema_info: SchemaInfo,
364 accepted_schema_fingerprint: CommitSchemaFingerprint,
365 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
366 map: impl FnOnce(T) -> R,
367 ) -> Result<R, InternalError>
368 where
369 E: PersistedRow<Canister = C> + EntityValue,
370 {
371 let value = self.with_metrics(|| {
372 op(self.save_executor::<E>(
373 accepted_row_decode_contract,
374 accepted_schema_info,
375 accepted_schema_fingerprint,
376 ))
377 })?;
378
379 Ok(map(value))
380 }
381
382 fn execute_save_entity<E>(
384 &self,
385 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
386 ) -> Result<E, InternalError>
387 where
388 E: PersistedRow<Canister = C> + EntityValue,
389 {
390 self.execute_save_with(op, std::convert::identity)
391 }
392
393 fn execute_save_batch<E>(
394 &self,
395 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
396 ) -> Result<WriteBatchResponse<E>, InternalError>
397 where
398 E: PersistedRow<Canister = C> + EntityValue,
399 {
400 self.execute_save_with(op, WriteBatchResponse::new)
401 }
402
403 #[must_use]
409 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
410 where
411 E: EntityKind<Canister = C>,
412 {
413 self.fluent_load_query(MissingRowPolicy::Ignore)
414 }
415
416 #[must_use]
418 pub const fn load_with_consistency<E>(
419 &self,
420 consistency: MissingRowPolicy,
421 ) -> FluentLoadQuery<'_, E>
422 where
423 E: EntityKind<Canister = C>,
424 {
425 self.fluent_load_query(consistency)
426 }
427
428 #[must_use]
430 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
431 where
432 E: PersistedRow<Canister = C>,
433 {
434 self.fluent_delete_query(MissingRowPolicy::Ignore)
435 }
436
437 #[must_use]
439 pub fn delete_with_consistency<E>(
440 &self,
441 consistency: MissingRowPolicy,
442 ) -> FluentDeleteQuery<'_, E>
443 where
444 E: PersistedRow<Canister = C>,
445 {
446 self.fluent_delete_query(consistency)
447 }
448
449 #[must_use]
453 pub const fn select_one(&self) -> Value {
454 Value::Int64(1)
455 }
456
457 #[must_use]
464 pub fn show_indexes<E>(&self) -> Vec<String>
465 where
466 E: EntityKind<Canister = C>,
467 {
468 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
469 }
470
471 #[must_use]
477 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
478 show_indexes_for_model(model)
479 }
480
481 pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
486 where
487 E: EntityKind<Canister = C>,
488 {
489 let schema = self.accepted_schema_info_for_entity::<E>()?;
490
491 Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
492 }
493
494 pub(in crate::db) fn show_indexes_for_store_model(
498 &self,
499 store_path: &str,
500 model: &'static EntityModel,
501 ) -> Vec<String> {
502 let runtime_state = self
503 .db
504 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
505 .map(|store| store.index_state());
506
507 show_indexes_for_model_with_runtime_state(model, runtime_state)
508 }
509
510 pub(in crate::db) fn show_indexes_for_store_schema_info(
514 &self,
515 store_path: &str,
516 schema: &SchemaInfo,
517 ) -> Vec<String> {
518 let runtime_state = self
519 .db
520 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
521 .map(|store| store.index_state());
522
523 show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
524 }
525
526 #[must_use]
532 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
533 where
534 E: EntityKind<Canister = C>,
535 {
536 self.show_columns_for_model(E::MODEL)
537 }
538
539 #[must_use]
541 pub fn show_columns_for_model(
542 &self,
543 model: &'static EntityModel,
544 ) -> Vec<EntityFieldDescription> {
545 describe_entity_fields(model)
546 }
547
548 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
554 where
555 E: EntityKind<Canister = C>,
556 {
557 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
558
559 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
560 }
561
562 #[must_use]
569 pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
570 self.try_show_entities().expect("session invariant")
571 }
572
573 pub fn try_show_entities(
575 &self,
576 ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
577 self.db.runtime_entity_catalog()
578 }
579
580 #[must_use]
582 pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
583 self.db.runtime_store_catalog()
584 }
585
586 #[must_use]
588 pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
589 self.db.runtime_memory_catalog()
590 }
591
592 #[cfg(feature = "sql")]
595 fn visible_indexes_for_store_accepted_schema(
596 &self,
597 store_path: &str,
598 schema_info: &SchemaInfo,
599 ) -> Result<VisibleIndexes<'static>, QueryError> {
600 let store = self
603 .db
604 .recovered_store(store_path)
605 .map_err(QueryError::execute)?;
606 let state = store.index_state();
607 if state != IndexState::Ready {
608 return Ok(VisibleIndexes::none());
609 }
610 debug_assert_eq!(state, IndexState::Ready);
611
612 let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
615 debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
616 debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
617 debug_assert_eq!(
618 visible_indexes.accepted_expression_index_count(),
619 Some(visible_indexes.accepted_expression_indexes().len()),
620 );
621
622 Ok(visible_indexes)
623 }
624
625 #[must_use]
631 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
632 where
633 E: EntityKind<Canister = C>,
634 {
635 self.describe_entity_model(E::MODEL)
636 }
637
638 #[must_use]
640 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
641 describe_entity_model(model)
642 }
643
644 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
650 where
651 E: EntityKind<Canister = C>,
652 {
653 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
654
655 Ok(describe_entity_model_with_persisted_schema(
656 E::MODEL,
657 &snapshot,
658 ))
659 }
660
661 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
665 where
666 E: EntityKind<Canister = C>,
667 {
668 let store = self.db.recovered_store(E::Store::PATH)?;
669
670 store.with_schema_mut(|schema_store| {
671 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
672 })
673 }
674
675 pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
680 &self,
681 ) -> Result<AcceptedSchemaCatalogContext, InternalError>
682 where
683 E: EntityKind<Canister = C>,
684 {
685 let cache_key = (self.db.cache_scope_id(), E::PATH);
686 if let Some(entry) =
687 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
688 {
689 return Ok(AcceptedSchemaCatalogContext::new(
690 entry.snapshot,
691 entry.identity,
692 ));
693 }
694
695 let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
696 let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
697 let identity = AcceptedCatalogIdentity::new(
698 E::ENTITY_TAG,
699 E::PATH,
700 E::Store::PATH,
701 snapshot.persisted_snapshot().version(),
702 fingerprint,
703 );
704 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
705 cache.borrow_mut().insert(
706 cache_key,
707 AcceptedSchemaQueryCacheEntry {
708 snapshot: snapshot.clone(),
709 identity,
710 },
711 );
712 });
713
714 Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
715 }
716
717 pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
718 &self,
719 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
720 where
721 E: EntityKind<Canister = C>,
722 {
723 let store = self.db.recovered_store(E::Store::PATH)?;
724
725 store.with_schema_mut(|schema_store| {
726 schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
727 })
728 }
729
730 pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
731 &self,
732 selection: &AcceptedCatalogSnapshotSelection,
733 ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
734 where
735 E: EntityKind<Canister = C>,
736 {
737 let cache_key = (self.db.cache_scope_id(), E::PATH);
738 if let Some(entry) =
739 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
740 && entry.identity == selection.identity()
741 {
742 return Ok(Some(AcceptedSchemaCatalogContext::new(
743 entry.snapshot,
744 entry.identity,
745 )));
746 }
747
748 let snapshot = selection.decode_verified()?;
749 if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
750 return Ok(None);
751 }
752 let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
753
754 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
755 cache.borrow_mut().insert(
756 cache_key,
757 AcceptedSchemaQueryCacheEntry {
758 snapshot,
759 identity: selection.identity(),
760 },
761 );
762 });
763
764 Ok(Some(context))
765 }
766
767 #[cfg(feature = "sql")]
768 fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
769 where
770 E: EntityKind<Canister = C>,
771 {
772 let cache_key = (self.db.cache_scope_id(), E::PATH);
773 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
774 cache.borrow_mut().remove(&cache_key);
775 });
776 }
777
778 fn load_accepted_schema_snapshot_for_query<E>(
779 &self,
780 ) -> Result<AcceptedSchemaSnapshot, InternalError>
781 where
782 E: EntityKind<Canister = C>,
783 {
784 let store = self.db.recovered_store(E::Store::PATH)?;
785
786 store.with_schema_mut(|schema_store| {
787 if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
788 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
789 if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
790 &accepted,
791 E::MODEL,
792 )
793 .is_ok()
794 {
795 return Ok(accepted);
796 }
797 }
798
799 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
800 })
801 }
802
803 pub(in crate::db) fn accepted_schema_info_for_entity<E>(
807 &self,
808 ) -> Result<SchemaInfo, InternalError>
809 where
810 E: EntityKind<Canister = C>,
811 {
812 let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
813
814 Ok(catalog.accepted_schema_info_for::<E>())
815 }
816
817 #[cfg(feature = "sql")]
821 pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
822 accepted_schema: &AcceptedSchemaSnapshot,
823 ) -> Result<EntityAuthority, InternalError>
824 where
825 E: EntityKind<Canister = C>,
826 {
827 EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
828 }
829
830 fn ensure_generated_compatible_accepted_save_schema<E>(
835 &self,
836 ) -> Result<
837 (
838 AcceptedRowDecodeContract,
839 SchemaInfo,
840 CommitSchemaFingerprint,
841 ),
842 InternalError,
843 >
844 where
845 E: EntityKind<Canister = C>,
846 {
847 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
848 let (accepted_row_layout, _) =
849 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
850 &accepted_schema,
851 E::MODEL,
852 )?;
853 let (row_decode_contract, _, schema_info, schema_fingerprint) =
854 accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
855
856 Ok((row_decode_contract, schema_info, schema_fingerprint))
857 }
858
859 pub fn storage_report(
861 &self,
862 name_to_path: &[(&'static str, &'static str)],
863 ) -> Result<StorageReport, InternalError> {
864 self.db.storage_report(name_to_path)
865 }
866
867 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
869 self.db.storage_report_default()
870 }
871
872 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
874 self.db.integrity_report()
875 }
876
877 #[must_use]
882 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
883 where
884 E: EntityKind<Canister = C> + EntityValue,
885 {
886 LoadExecutor::new(self.db, self.debug)
887 }
888
889 #[must_use]
890 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
891 where
892 E: PersistedRow<Canister = C> + EntityValue,
893 {
894 DeleteExecutor::new(self.db)
895 }
896
897 #[must_use]
898 pub(in crate::db) const fn save_executor<E>(
899 &self,
900 accepted_row_decode_contract: AcceptedRowDecodeContract,
901 accepted_schema_info: SchemaInfo,
902 accepted_schema_fingerprint: CommitSchemaFingerprint,
903 ) -> SaveExecutor<E>
904 where
905 E: PersistedRow<Canister = C> + EntityValue,
906 {
907 SaveExecutor::new_with_accepted_contract(
908 self.db,
909 self.debug,
910 accepted_row_decode_contract,
911 accepted_schema_info,
912 accepted_schema_fingerprint,
913 )
914 }
915}