1mod query;
7mod response;
8#[cfg(feature = "sql")]
9mod sql;
10#[cfg(all(test, feature = "sql"))]
14mod tests;
15mod write;
16
17#[cfg(any(test, feature = "sql-explain"))]
18use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
19use crate::{
20 db::{
21 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
22 FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
23 StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
24 commit::CommitSchemaFingerprint,
25 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
26 schema::{
27 AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
28 AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
29 accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
30 describe_entity_fields, describe_entity_fields_with_persisted_schema,
31 describe_entity_model, describe_entity_model_with_persisted_schema,
32 ensure_accepted_schema_snapshot, show_indexes_for_model,
33 show_indexes_for_model_with_runtime_state,
34 show_indexes_for_schema_info_with_runtime_state,
35 },
36 },
37 error::InternalError,
38 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
39 model::entity::EntityModel,
40 traits::{CanisterKind, EntityKind, EntityValue, Path},
41 value::Value,
42};
43use std::{
44 cell::{OnceCell, RefCell},
45 collections::HashMap,
46 thread::LocalKey,
47};
48
49#[cfg(feature = "diagnostics")]
50pub use query::{
51 DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
52 GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
53 ScalarAggregateAttribution,
54};
55pub(in crate::db) use response::finalize_scalar_paged_execution;
56pub(in crate::db) use response::finalize_structural_grouped_projection_result;
57#[cfg(feature = "sql")]
58pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
59#[cfg(feature = "sql")]
60pub use sql::{
61 SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
62 SqlDdlPreparationReport, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
63 SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
64 SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyDeletePlan,
65 SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan, SqlSessionCurrentUpdatePlan,
66 SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
67 SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdatePolicyContext,
68 SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateStatementClassification,
69 SqlValidatedDeletePlan, SqlValidatedUpdatePlan, SqlWriteExecutionBounds, SqlWriteOrderProof,
70 SqlWriteReturningBounds, SqlWriteReturningShape, SqlWriteStatementShape, SqlWriteWhereProof,
71 classify_sql_delete_policy, classify_sql_update_policy, sql_statement_dispatch,
72 sql_statement_entity_name, sql_statement_shell_surface, sql_statement_surface,
73};
74#[cfg(all(feature = "sql", feature = "diagnostics"))]
75pub use sql::{
76 SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
77 SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
78 SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
79};
80#[cfg(all(feature = "sql", feature = "diagnostics"))]
81pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
82
83pub struct DbSession<C: CanisterKind> {
90 db: Db<C>,
91 debug: bool,
92 metrics: Option<&'static dyn MetricsSink>,
93}
94
95#[cfg(test)]
96#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
97pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
98 pub schema_info_projections: u64,
99 pub persisted_schema_decodes: u64,
100 pub generated_compatible_row_layout_proofs: u64,
101 pub latest_by_entity_calls: u64,
102 pub visible_index_projections: u64,
103}
104
105#[derive(Clone, Debug)]
106struct AcceptedSchemaQueryCacheEntry {
107 snapshot: AcceptedSchemaSnapshot,
108 identity: AcceptedCatalogIdentity,
109}
110
111pub(in crate::db) type AcceptedSaveContract = (
112 AcceptedRowDecodeContract,
113 AcceptedRowDecodeContract,
114 SchemaInfo,
115 CommitSchemaFingerprint,
116);
117
118#[derive(Clone, Debug)]
119pub(in crate::db) struct AcceptedSchemaCatalogContext {
120 snapshot: AcceptedSchemaSnapshot,
121 identity: AcceptedCatalogIdentity,
122 schema_info: OnceCell<SchemaInfo>,
123}
124
125impl AcceptedSchemaCatalogContext {
126 #[must_use]
127 pub(in crate::db) const fn new(
128 snapshot: AcceptedSchemaSnapshot,
129 identity: AcceptedCatalogIdentity,
130 ) -> Self {
131 Self {
132 snapshot,
133 identity,
134 schema_info: OnceCell::new(),
135 }
136 }
137
138 #[must_use]
139 pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
140 &self.snapshot
141 }
142
143 #[must_use]
144 pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
145 self.identity.accepted_schema_version()
146 }
147
148 #[must_use]
149 pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
150 self.identity.accepted_schema_fingerprint()
151 }
152
153 #[must_use]
154 pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
155 self.identity.fingerprint_method_version()
156 }
157
158 #[must_use]
159 #[cfg(feature = "sql")]
160 pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
161 self.identity
162 }
163
164 fn debug_assert_matches_entity<E>(&self)
165 where
166 E: EntityKind,
167 {
168 debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
169 debug_assert_eq!(self.identity.entity_path(), E::PATH);
170 debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
171 }
172
173 pub(in crate::db) fn accepted_entity_authority_for<E>(
174 &self,
175 ) -> Result<EntityAuthority, InternalError>
176 where
177 E: EntityKind,
178 {
179 let schema_info = self.accepted_schema_info_for::<E>();
180
181 self.accepted_entity_authority_for_schema_info::<E>(schema_info)
182 }
183
184 fn accepted_entity_authority_for_schema_info<E>(
185 &self,
186 schema_info: SchemaInfo,
187 ) -> Result<EntityAuthority, InternalError>
188 where
189 E: EntityKind,
190 {
191 self.debug_assert_matches_entity::<E>();
192 let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
193 let (accepted_row_layout, row_proof) =
194 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
195 &self.snapshot,
196 authority.model(),
197 )?;
198 let row_decode_contract = accepted_row_layout.row_decode_contract();
199
200 Ok(
201 authority.with_accepted_row_decode_contract(
202 row_proof,
203 row_decode_contract,
204 schema_info,
205 ),
206 )
207 }
208
209 #[cfg(feature = "sql")]
210 pub(in crate::db) fn accepted_entity_authority_and_schema_info_for<E>(
211 &self,
212 ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
213 where
214 E: EntityKind,
215 {
216 let schema_info = self.accepted_schema_info_for::<E>();
217 let authority = self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?;
218
219 Ok((authority, schema_info))
220 }
221
222 #[cfg(feature = "sql")]
223 pub(in crate::db) fn accepted_or_provided_entity_authority_for<E>(
224 &self,
225 accepted_authority: Option<&EntityAuthority>,
226 ) -> Result<EntityAuthority, InternalError>
227 where
228 E: EntityKind,
229 {
230 match accepted_authority {
231 Some(authority) => Ok(authority.clone()),
232 None => self.accepted_entity_authority_for::<E>(),
233 }
234 }
235
236 #[cfg(feature = "sql-explain")]
237 pub(in crate::db) fn accepted_or_provided_entity_authority_and_schema_info_for<E>(
238 &self,
239 accepted_authority: Option<&EntityAuthority>,
240 ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
241 where
242 E: EntityKind,
243 {
244 let schema_info = self.accepted_schema_info_for::<E>();
245 let authority = match accepted_authority {
246 Some(authority) => authority.clone(),
247 None => self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?,
248 };
249
250 Ok((authority, schema_info))
251 }
252
253 #[must_use]
254 pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
255 where
256 E: EntityKind,
257 {
258 self.debug_assert_matches_entity::<E>();
259 self.schema_info
260 .get_or_init(|| {
261 SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
262 E::MODEL,
263 &self.snapshot,
264 true,
265 )
266 })
267 .clone()
268 }
269}
270
271pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
272 accepted_schema: &AcceptedSchemaSnapshot,
273 descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
274) -> Result<AcceptedSaveContract, InternalError>
275where
276 E: EntityKind,
277{
278 let row_decode_contract = descriptor.row_decode_contract();
279 let mutation_row_decode_contract = row_decode_contract.clone();
280 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
281 let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
282
283 Ok((
284 row_decode_contract,
285 mutation_row_decode_contract,
286 schema_info,
287 schema_fingerprint,
288 ))
289}
290
291thread_local! {
292 static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
297 RefCell::new(HashMap::default());
298}
299
300impl<C: CanisterKind> DbSession<C> {
301 #[must_use]
303 pub(crate) const fn new(db: Db<C>) -> Self {
304 Self {
305 db,
306 debug: false,
307 metrics: None,
308 }
309 }
310
311 #[must_use]
313 pub const fn new_with_hooks(
314 store: &'static LocalKey<StoreRegistry>,
315 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
316 ) -> Self {
317 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
318 }
319
320 #[cfg(test)]
321 pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
322 crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
323 crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
324 crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
325 crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
326 query::reset_visible_index_projection_count_for_tests();
327 }
328
329 #[cfg(test)]
330 pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
331 -> AcceptedCatalogRuntimeCounterSnapshot {
332 AcceptedCatalogRuntimeCounterSnapshot {
333 schema_info_projections:
334 crate::db::schema::accepted_schema_info_projection_count_for_tests(),
335 persisted_schema_decodes:
336 crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
337 generated_compatible_row_layout_proofs:
338 crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
339 latest_by_entity_calls:
340 crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
341 visible_index_projections: query::visible_index_projection_count_for_tests(),
342 }
343 }
344
345 #[must_use]
347 pub const fn debug(mut self) -> Self {
348 self.debug = true;
349 self
350 }
351
352 #[must_use]
354 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
355 self.metrics = Some(sink);
356 self
357 }
358
359 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
362 where
363 E: EntityKind<Canister = C>,
364 {
365 FluentLoadQuery::new(self, Query::new(consistency))
366 }
367
368 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
372 where
373 E: PersistedRow<Canister = C>,
374 {
375 FluentDeleteQuery::new(self, Query::new(consistency).delete())
376 }
377
378 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
379 if let Some(sink) = self.metrics {
380 with_metrics_sink(sink, f)
381 } else {
382 f()
383 }
384 }
385
386 fn execute_save_with<E, T, R>(
388 &self,
389 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
390 map: impl FnOnce(T) -> R,
391 ) -> Result<R, InternalError>
392 where
393 E: PersistedRow<Canister = C> + EntityValue,
394 {
395 let (contract, schema_info, schema_fingerprint) = match self
396 .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
397 {
398 Ok(authority) => authority,
399 Err(error) => {
400 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
401
402 return Err(error);
403 }
404 };
405 let value = self.with_metrics(|| {
406 op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
407 })?;
408
409 Ok(map(value))
410 }
411
412 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
417 &self,
418 accepted_row_decode_contract: AcceptedRowDecodeContract,
419 accepted_schema_info: SchemaInfo,
420 accepted_schema_fingerprint: CommitSchemaFingerprint,
421 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
422 map: impl FnOnce(T) -> R,
423 ) -> Result<R, InternalError>
424 where
425 E: PersistedRow<Canister = C> + EntityValue,
426 {
427 let value = self.with_metrics(|| {
428 op(self.save_executor::<E>(
429 accepted_row_decode_contract,
430 accepted_schema_info,
431 accepted_schema_fingerprint,
432 ))
433 })?;
434
435 Ok(map(value))
436 }
437
438 fn execute_save_entity<E>(
440 &self,
441 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
442 ) -> Result<E, InternalError>
443 where
444 E: PersistedRow<Canister = C> + EntityValue,
445 {
446 self.execute_save_with(op, std::convert::identity)
447 }
448
449 fn execute_save_batch<E>(
450 &self,
451 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
452 ) -> Result<WriteBatchResponse<E>, InternalError>
453 where
454 E: PersistedRow<Canister = C> + EntityValue,
455 {
456 self.execute_save_with(op, WriteBatchResponse::new)
457 }
458
459 #[must_use]
465 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
466 where
467 E: EntityKind<Canister = C>,
468 {
469 self.fluent_load_query(MissingRowPolicy::Ignore)
470 }
471
472 #[must_use]
474 pub const fn load_with_consistency<E>(
475 &self,
476 consistency: MissingRowPolicy,
477 ) -> FluentLoadQuery<'_, E>
478 where
479 E: EntityKind<Canister = C>,
480 {
481 self.fluent_load_query(consistency)
482 }
483
484 #[must_use]
486 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
487 where
488 E: PersistedRow<Canister = C>,
489 {
490 self.fluent_delete_query(MissingRowPolicy::Ignore)
491 }
492
493 #[must_use]
495 pub fn delete_with_consistency<E>(
496 &self,
497 consistency: MissingRowPolicy,
498 ) -> FluentDeleteQuery<'_, E>
499 where
500 E: PersistedRow<Canister = C>,
501 {
502 self.fluent_delete_query(consistency)
503 }
504
505 #[must_use]
509 pub const fn select_one(&self) -> Value {
510 Value::Int64(1)
511 }
512
513 #[must_use]
520 pub fn show_indexes<E>(&self) -> Vec<String>
521 where
522 E: EntityKind<Canister = C>,
523 {
524 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
525 }
526
527 #[must_use]
533 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
534 show_indexes_for_model(model)
535 }
536
537 pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
542 where
543 E: EntityKind<Canister = C>,
544 {
545 let schema = self.accepted_schema_info_for_entity::<E>()?;
546
547 Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
548 }
549
550 pub(in crate::db) fn show_indexes_for_store_model(
554 &self,
555 store_path: &str,
556 model: &'static EntityModel,
557 ) -> Vec<String> {
558 let runtime_state = self
559 .db
560 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
561 .map(|store| store.index_state());
562
563 show_indexes_for_model_with_runtime_state(model, runtime_state)
564 }
565
566 pub(in crate::db) fn show_indexes_for_store_schema_info(
570 &self,
571 store_path: &str,
572 schema: &SchemaInfo,
573 ) -> Vec<String> {
574 let runtime_state = self
575 .db
576 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
577 .map(|store| store.index_state());
578
579 show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
580 }
581
582 #[must_use]
588 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
589 where
590 E: EntityKind<Canister = C>,
591 {
592 self.show_columns_for_model(E::MODEL)
593 }
594
595 #[must_use]
597 pub fn show_columns_for_model(
598 &self,
599 model: &'static EntityModel,
600 ) -> Vec<EntityFieldDescription> {
601 describe_entity_fields(model)
602 }
603
604 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
610 where
611 E: EntityKind<Canister = C>,
612 {
613 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
614
615 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
616 }
617
618 #[must_use]
625 pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
626 self.try_show_entities().expect("session invariant")
627 }
628
629 pub fn try_show_entities(
631 &self,
632 ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
633 self.db.runtime_entity_catalog()
634 }
635
636 #[must_use]
638 pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
639 self.db.runtime_store_catalog()
640 }
641
642 #[must_use]
644 pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
645 self.db.runtime_memory_catalog()
646 }
647
648 #[cfg(any(test, feature = "sql-explain"))]
651 fn visible_indexes_for_store_accepted_schema(
652 &self,
653 store_path: &str,
654 schema_info: &SchemaInfo,
655 ) -> Result<VisibleIndexes<'static>, QueryError> {
656 let store = self
659 .db
660 .recovered_store(store_path)
661 .map_err(QueryError::execute)?;
662 let state = store.index_state();
663 if state != IndexState::Ready {
664 return Ok(VisibleIndexes::none());
665 }
666 debug_assert_eq!(state, IndexState::Ready);
667
668 let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
671 debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
672 debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
673 debug_assert_eq!(
674 visible_indexes.accepted_expression_index_count(),
675 Some(visible_indexes.accepted_expression_indexes().len()),
676 );
677
678 Ok(visible_indexes)
679 }
680
681 #[must_use]
687 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
688 where
689 E: EntityKind<Canister = C>,
690 {
691 self.describe_entity_model(E::MODEL)
692 }
693
694 #[must_use]
696 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
697 describe_entity_model(model)
698 }
699
700 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
706 where
707 E: EntityKind<Canister = C>,
708 {
709 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
710
711 Ok(describe_entity_model_with_persisted_schema(
712 E::MODEL,
713 &snapshot,
714 ))
715 }
716
717 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
721 where
722 E: EntityKind<Canister = C>,
723 {
724 let store = self.db.recovered_store(E::Store::PATH)?;
725
726 store.with_schema_mut(|schema_store| {
727 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
728 })
729 }
730
731 pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
736 &self,
737 ) -> Result<AcceptedSchemaCatalogContext, InternalError>
738 where
739 E: EntityKind<Canister = C>,
740 {
741 let cache_key = (self.db.cache_scope_id(), E::PATH);
742 if let Some(entry) =
743 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
744 {
745 return Ok(AcceptedSchemaCatalogContext::new(
746 entry.snapshot,
747 entry.identity,
748 ));
749 }
750
751 let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
752 let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
753 let identity = AcceptedCatalogIdentity::new(
754 E::ENTITY_TAG,
755 E::PATH,
756 E::Store::PATH,
757 snapshot.persisted_snapshot().version(),
758 fingerprint,
759 );
760 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
761 cache.borrow_mut().insert(
762 cache_key,
763 AcceptedSchemaQueryCacheEntry {
764 snapshot: snapshot.clone(),
765 identity,
766 },
767 );
768 });
769
770 Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
771 }
772
773 pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
774 &self,
775 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
776 where
777 E: EntityKind<Canister = C>,
778 {
779 let store = self.db.recovered_store(E::Store::PATH)?;
780
781 store.with_schema_mut(|schema_store| {
782 schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
783 })
784 }
785
786 pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
787 &self,
788 selection: &AcceptedCatalogSnapshotSelection,
789 ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
790 where
791 E: EntityKind<Canister = C>,
792 {
793 let cache_key = (self.db.cache_scope_id(), E::PATH);
794 if let Some(entry) =
795 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
796 && entry.identity == selection.identity()
797 {
798 return Ok(Some(AcceptedSchemaCatalogContext::new(
799 entry.snapshot,
800 entry.identity,
801 )));
802 }
803
804 let snapshot = selection.decode_verified()?;
805 if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
806 return Ok(None);
807 }
808 let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
809
810 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
811 cache.borrow_mut().insert(
812 cache_key,
813 AcceptedSchemaQueryCacheEntry {
814 snapshot,
815 identity: selection.identity(),
816 },
817 );
818 });
819
820 Ok(Some(context))
821 }
822
823 #[cfg(feature = "sql")]
824 fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
825 where
826 E: EntityKind<Canister = C>,
827 {
828 let cache_key = (self.db.cache_scope_id(), E::PATH);
829 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
830 cache.borrow_mut().remove(&cache_key);
831 });
832 }
833
834 fn load_accepted_schema_snapshot_for_query<E>(
835 &self,
836 ) -> Result<AcceptedSchemaSnapshot, InternalError>
837 where
838 E: EntityKind<Canister = C>,
839 {
840 let store = self.db.recovered_store(E::Store::PATH)?;
841
842 store.with_schema_mut(|schema_store| {
843 if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
844 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
845 if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
846 &accepted,
847 E::MODEL,
848 )
849 .is_ok()
850 {
851 return Ok(accepted);
852 }
853 }
854
855 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
856 })
857 }
858
859 pub(in crate::db) fn accepted_schema_info_for_entity<E>(
863 &self,
864 ) -> Result<SchemaInfo, InternalError>
865 where
866 E: EntityKind<Canister = C>,
867 {
868 let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
869
870 Ok(catalog.accepted_schema_info_for::<E>())
871 }
872
873 #[cfg(feature = "sql")]
877 pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
878 accepted_schema: &AcceptedSchemaSnapshot,
879 ) -> Result<EntityAuthority, InternalError>
880 where
881 E: EntityKind<Canister = C>,
882 {
883 EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
884 }
885
886 fn ensure_generated_compatible_accepted_save_schema<E>(
891 &self,
892 ) -> Result<
893 (
894 AcceptedRowDecodeContract,
895 SchemaInfo,
896 CommitSchemaFingerprint,
897 ),
898 InternalError,
899 >
900 where
901 E: EntityKind<Canister = C>,
902 {
903 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
904 let (accepted_row_layout, _) =
905 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
906 &accepted_schema,
907 E::MODEL,
908 )?;
909 let (row_decode_contract, _, schema_info, schema_fingerprint) =
910 accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
911
912 Ok((row_decode_contract, schema_info, schema_fingerprint))
913 }
914
915 pub fn storage_report(
917 &self,
918 name_to_path: &[(&'static str, &'static str)],
919 ) -> Result<StorageReport, InternalError> {
920 self.db.storage_report(name_to_path)
921 }
922
923 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
925 self.db.storage_report_default()
926 }
927
928 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
930 self.db.integrity_report()
931 }
932
933 #[must_use]
938 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
939 where
940 E: EntityKind<Canister = C> + EntityValue,
941 {
942 LoadExecutor::new(self.db, self.debug)
943 }
944
945 #[must_use]
946 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
947 where
948 E: PersistedRow<Canister = C> + EntityValue,
949 {
950 DeleteExecutor::new(self.db)
951 }
952
953 #[must_use]
954 pub(in crate::db) const fn save_executor<E>(
955 &self,
956 accepted_row_decode_contract: AcceptedRowDecodeContract,
957 accepted_schema_info: SchemaInfo,
958 accepted_schema_fingerprint: CommitSchemaFingerprint,
959 ) -> SaveExecutor<E>
960 where
961 E: PersistedRow<Canister = C> + EntityValue,
962 {
963 SaveExecutor::new_with_accepted_contract(
964 self.db,
965 self.debug,
966 accepted_row_decode_contract,
967 accepted_schema_info,
968 accepted_schema_fingerprint,
969 )
970 }
971}