1mod bounded_cache;
7mod query;
8mod response;
9#[cfg(feature = "sql")]
10mod sql;
11#[cfg(all(test, feature = "sql"))]
15mod tests;
16mod write;
17
18#[cfg(any(test, feature = "sql-explain"))]
19use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
20use crate::{
21 db::{
22 Db, EntityFieldDescription, EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery,
23 FluentLoadQuery, IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
24 StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
25 commit::CommitSchemaFingerprint,
26 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
27 schema::{
28 AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
29 AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, SchemaInfo, SchemaVersion,
30 accepted_commit_schema_fingerprint, accepted_schema_cache_fingerprint,
31 describe_entity_fields, describe_entity_fields_with_persisted_schema,
32 describe_entity_model, describe_entity_model_with_persisted_schema,
33 ensure_accepted_schema_snapshot, show_indexes_for_model,
34 show_indexes_for_model_with_runtime_state,
35 show_indexes_for_schema_info_with_runtime_state,
36 },
37 },
38 error::InternalError,
39 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
40 model::entity::EntityModel,
41 traits::{CanisterKind, EntityKind, EntityValue, Path},
42 value::Value,
43};
44use std::{
45 cell::{OnceCell, RefCell},
46 collections::HashMap,
47 thread::LocalKey,
48};
49
50#[cfg(feature = "diagnostics")]
51pub use query::{
52 DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
53 GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
54 ScalarAggregateAttribution,
55};
56pub(in crate::db) use response::finalize_scalar_paged_execution;
57pub(in crate::db) use response::finalize_structural_grouped_projection_result;
58#[cfg(feature = "sql")]
59pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
60#[cfg(feature = "sql")]
61pub use sql::{
62 SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
63 SqlDdlPreparationReport, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
64 SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
65 SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyDeletePlan,
66 SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan, SqlSessionCurrentUpdatePlan,
67 SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
68 SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdatePolicyContext,
69 SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateStatementClassification,
70 SqlValidatedDeletePlan, SqlValidatedUpdatePlan, SqlWriteExecutionBounds, SqlWriteOrderProof,
71 SqlWriteReturningBounds, SqlWriteReturningShape, SqlWriteStatementShape, SqlWriteWhereProof,
72 classify_sql_delete_policy, classify_sql_update_policy, sql_statement_dispatch,
73 sql_statement_entity_name, sql_statement_shell_surface, sql_statement_surface,
74};
75#[cfg(all(feature = "sql", feature = "diagnostics"))]
76pub use sql::{
77 SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
78 SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
79 SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
80};
81#[cfg(all(feature = "sql", feature = "diagnostics"))]
82pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
83
84pub struct DbSession<C: CanisterKind> {
91 db: Db<C>,
92 debug: bool,
93 metrics: Option<&'static dyn MetricsSink>,
94}
95
/// Test-only copy of the accepted-catalog runtime counters.
///
/// Each field mirrors one `*_count_for_tests` accessor; see
/// `DbSession::accepted_catalog_runtime_counter_snapshot_for_tests`.
#[cfg(test)]
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
    /// Read from `accepted_schema_info_projection_count_for_tests`.
    pub schema_info_projections: u64,
    /// Read from `persisted_schema_snapshot_decode_count_for_tests`.
    pub persisted_schema_decodes: u64,
    /// Read from `generated_compatible_row_layout_proof_count_for_tests`.
    pub generated_compatible_row_layout_proofs: u64,
    /// Read from `latest_raw_snapshots_by_entity_call_count_for_tests`.
    pub latest_by_entity_calls: u64,
    /// Read from `query::visible_index_projection_count_for_tests`.
    pub visible_index_projections: u64,
}
105
106#[derive(Clone, Debug)]
107struct AcceptedSchemaQueryCacheEntry {
108 snapshot: AcceptedSchemaSnapshot,
109 identity: AcceptedCatalogIdentity,
110}
111
112pub(in crate::db) type AcceptedSaveContract = (
113 AcceptedRowDecodeContract,
114 AcceptedRowDecodeContract,
115 SchemaInfo,
116 CommitSchemaFingerprint,
117);
118
119#[derive(Clone, Debug)]
120pub(in crate::db) struct AcceptedSchemaCatalogContext {
121 snapshot: AcceptedSchemaSnapshot,
122 identity: AcceptedCatalogIdentity,
123 schema_info: OnceCell<SchemaInfo>,
124}
125
126impl AcceptedSchemaCatalogContext {
127 #[must_use]
128 pub(in crate::db) const fn new(
129 snapshot: AcceptedSchemaSnapshot,
130 identity: AcceptedCatalogIdentity,
131 ) -> Self {
132 Self {
133 snapshot,
134 identity,
135 schema_info: OnceCell::new(),
136 }
137 }
138
139 #[must_use]
140 pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
141 &self.snapshot
142 }
143
144 #[must_use]
145 pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
146 self.identity.accepted_schema_version()
147 }
148
149 #[must_use]
150 pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
151 self.identity.accepted_schema_fingerprint()
152 }
153
154 #[must_use]
155 pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
156 self.identity.fingerprint_method_version()
157 }
158
/// Returns a copy of the catalog identity held by this context.
///
/// Only compiled with the `sql` feature.
#[must_use]
#[cfg(feature = "sql")]
pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
    let Self { identity, .. } = self;

    *identity
}
164
165 fn debug_assert_matches_entity<E>(&self)
166 where
167 E: EntityKind,
168 {
169 debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
170 debug_assert_eq!(self.identity.entity_path(), E::PATH);
171 debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
172 }
173
174 pub(in crate::db) fn accepted_entity_authority_for<E>(
175 &self,
176 ) -> Result<EntityAuthority, InternalError>
177 where
178 E: EntityKind,
179 {
180 let schema_info = self.accepted_schema_info_for::<E>();
181
182 self.accepted_entity_authority_for_schema_info::<E>(schema_info)
183 }
184
185 fn accepted_entity_authority_for_schema_info<E>(
186 &self,
187 schema_info: SchemaInfo,
188 ) -> Result<EntityAuthority, InternalError>
189 where
190 E: EntityKind,
191 {
192 self.debug_assert_matches_entity::<E>();
193 let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
194 let (accepted_row_layout, row_proof) =
195 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
196 &self.snapshot,
197 authority.model(),
198 )?;
199 let row_decode_contract = accepted_row_layout.row_decode_contract();
200
201 Ok(
202 authority.with_accepted_row_decode_contract(
203 row_proof,
204 row_decode_contract,
205 schema_info,
206 ),
207 )
208 }
209
/// Returns both the entity authority for `E` and the schema info it was
/// built from.
///
/// Only compiled with the `sql` feature.
///
/// # Errors
///
/// Propagates any error from `accepted_entity_authority_for_schema_info`.
#[cfg(feature = "sql")]
pub(in crate::db) fn accepted_entity_authority_and_schema_info_for<E>(
    &self,
) -> Result<(EntityAuthority, SchemaInfo), InternalError>
where
    E: EntityKind,
{
    let schema_info = self.accepted_schema_info_for::<E>();

    // The authority takes its own copy; the original is handed back to the caller.
    self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())
        .map(|authority| (authority, schema_info))
}
222
/// Returns a clone of `accepted_authority` when one is supplied, otherwise
/// builds the authority for `E` from this context.
///
/// Only compiled with the `sql` feature.
///
/// # Errors
///
/// Propagates any error from `accepted_entity_authority_for` on the
/// fallback path; the provided-authority path cannot fail.
#[cfg(feature = "sql")]
pub(in crate::db) fn accepted_or_provided_entity_authority_for<E>(
    &self,
    accepted_authority: Option<&EntityAuthority>,
) -> Result<EntityAuthority, InternalError>
where
    E: EntityKind,
{
    let Some(provided) = accepted_authority else {
        return self.accepted_entity_authority_for::<E>();
    };

    Ok(provided.clone())
}
236
/// Returns the schema info for `E` together with either a clone of
/// `accepted_authority` or, when none is supplied, a freshly built authority.
///
/// The schema info is resolved in both cases. Only compiled with the
/// `sql-explain` feature.
///
/// # Errors
///
/// Propagates any error from `accepted_entity_authority_for_schema_info` on
/// the fallback path.
#[cfg(feature = "sql-explain")]
pub(in crate::db) fn accepted_or_provided_entity_authority_and_schema_info_for<E>(
    &self,
    accepted_authority: Option<&EntityAuthority>,
) -> Result<(EntityAuthority, SchemaInfo), InternalError>
where
    E: EntityKind,
{
    let schema_info = self.accepted_schema_info_for::<E>();
    let authority = if let Some(provided) = accepted_authority {
        provided.clone()
    } else {
        self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?
    };

    Ok((authority, schema_info))
}
253
254 #[must_use]
255 pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
256 where
257 E: EntityKind,
258 {
259 self.debug_assert_matches_entity::<E>();
260 self.schema_info
261 .get_or_init(|| {
262 SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
263 E::MODEL,
264 &self.snapshot,
265 true,
266 )
267 })
268 .clone()
269 }
270}
271
272pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
273 accepted_schema: &AcceptedSchemaSnapshot,
274 descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
275) -> Result<AcceptedSaveContract, InternalError>
276where
277 E: EntityKind,
278{
279 let row_decode_contract = descriptor.row_decode_contract();
280 let mutation_row_decode_contract = row_decode_contract.clone();
281 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
282 let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
283
284 Ok((
285 row_decode_contract,
286 mutation_row_decode_contract,
287 schema_info,
288 schema_fingerprint,
289 ))
290}
291
292thread_local! {
293 static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
298 RefCell::new(HashMap::default());
299}
300
301impl<C: CanisterKind> DbSession<C> {
302 #[must_use]
304 pub(crate) const fn new(db: Db<C>) -> Self {
305 Self {
306 db,
307 debug: false,
308 metrics: None,
309 }
310 }
311
312 #[must_use]
314 pub const fn new_with_hooks(
315 store: &'static LocalKey<StoreRegistry>,
316 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
317 ) -> Self {
318 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
319 }
320
/// Resets every accepted-catalog runtime counter (test builds only).
///
/// Covers the four schema-module counters and the query module's
/// visible-index projection counter.
#[cfg(test)]
pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
    use crate::db::schema;

    schema::reset_accepted_schema_info_projection_count_for_tests();
    schema::reset_persisted_schema_snapshot_decode_count_for_tests();
    schema::reset_generated_compatible_row_layout_proof_count_for_tests();
    schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
    query::reset_visible_index_projection_count_for_tests();
}
329
/// Reads every accepted-catalog runtime counter into one snapshot value
/// (test builds only).
#[cfg(test)]
pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
-> AcceptedCatalogRuntimeCounterSnapshot {
    use crate::db::schema;

    // Counters are read in field order.
    let schema_info_projections = schema::accepted_schema_info_projection_count_for_tests();
    let persisted_schema_decodes = schema::persisted_schema_snapshot_decode_count_for_tests();
    let generated_compatible_row_layout_proofs =
        schema::generated_compatible_row_layout_proof_count_for_tests();
    let latest_by_entity_calls = schema::latest_raw_snapshots_by_entity_call_count_for_tests();
    let visible_index_projections = query::visible_index_projection_count_for_tests();

    AcceptedCatalogRuntimeCounterSnapshot {
        schema_info_projections,
        persisted_schema_decodes,
        generated_compatible_row_layout_proofs,
        latest_by_entity_calls,
        visible_index_projections,
    }
}
345
346 #[must_use]
348 pub const fn debug(mut self) -> Self {
349 self.debug = true;
350 self
351 }
352
353 #[must_use]
355 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
356 self.metrics = Some(sink);
357 self
358 }
359
360 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
363 where
364 E: EntityKind<Canister = C>,
365 {
366 FluentLoadQuery::new(self, Query::new(consistency))
367 }
368
369 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
373 where
374 E: PersistedRow<Canister = C>,
375 {
376 FluentDeleteQuery::new(self, Query::new(consistency).delete())
377 }
378
379 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
380 if let Some(sink) = self.metrics {
381 with_metrics_sink(sink, f)
382 } else {
383 f()
384 }
385 }
386
387 fn execute_save_with<E, T, R>(
389 &self,
390 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
391 map: impl FnOnce(T) -> R,
392 ) -> Result<R, InternalError>
393 where
394 E: PersistedRow<Canister = C> + EntityValue,
395 {
396 let (contract, schema_info, schema_fingerprint) = match self
397 .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
398 {
399 Ok(authority) => authority,
400 Err(error) => {
401 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
402
403 return Err(error);
404 }
405 };
406 let value = self.with_metrics(|| {
407 op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
408 })?;
409
410 Ok(map(value))
411 }
412
413 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
418 &self,
419 accepted_row_decode_contract: AcceptedRowDecodeContract,
420 accepted_schema_info: SchemaInfo,
421 accepted_schema_fingerprint: CommitSchemaFingerprint,
422 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
423 map: impl FnOnce(T) -> R,
424 ) -> Result<R, InternalError>
425 where
426 E: PersistedRow<Canister = C> + EntityValue,
427 {
428 let value = self.with_metrics(|| {
429 op(self.save_executor::<E>(
430 accepted_row_decode_contract,
431 accepted_schema_info,
432 accepted_schema_fingerprint,
433 ))
434 })?;
435
436 Ok(map(value))
437 }
438
439 fn execute_save_entity<E>(
441 &self,
442 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
443 ) -> Result<E, InternalError>
444 where
445 E: PersistedRow<Canister = C> + EntityValue,
446 {
447 self.execute_save_with(op, std::convert::identity)
448 }
449
450 fn execute_save_batch<E>(
451 &self,
452 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
453 ) -> Result<WriteBatchResponse<E>, InternalError>
454 where
455 E: PersistedRow<Canister = C> + EntityValue,
456 {
457 self.execute_save_with(op, WriteBatchResponse::new)
458 }
459
460 #[must_use]
466 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
467 where
468 E: EntityKind<Canister = C>,
469 {
470 self.fluent_load_query(MissingRowPolicy::Ignore)
471 }
472
473 #[must_use]
475 pub const fn load_with_consistency<E>(
476 &self,
477 consistency: MissingRowPolicy,
478 ) -> FluentLoadQuery<'_, E>
479 where
480 E: EntityKind<Canister = C>,
481 {
482 self.fluent_load_query(consistency)
483 }
484
485 #[must_use]
487 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
488 where
489 E: PersistedRow<Canister = C>,
490 {
491 self.fluent_delete_query(MissingRowPolicy::Ignore)
492 }
493
494 #[must_use]
496 pub fn delete_with_consistency<E>(
497 &self,
498 consistency: MissingRowPolicy,
499 ) -> FluentDeleteQuery<'_, E>
500 where
501 E: PersistedRow<Canister = C>,
502 {
503 self.fluent_delete_query(consistency)
504 }
505
506 #[must_use]
510 pub const fn select_one(&self) -> Value {
511 Value::Int64(1)
512 }
513
514 #[must_use]
521 pub fn show_indexes<E>(&self) -> Vec<String>
522 where
523 E: EntityKind<Canister = C>,
524 {
525 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
526 }
527
528 #[must_use]
534 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
535 show_indexes_for_model(model)
536 }
537
538 pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
543 where
544 E: EntityKind<Canister = C>,
545 {
546 let schema = self.accepted_schema_info_for_entity::<E>()?;
547
548 Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
549 }
550
551 pub(in crate::db) fn show_indexes_for_store_model(
555 &self,
556 store_path: &str,
557 model: &'static EntityModel,
558 ) -> Vec<String> {
559 let runtime_state = self
560 .db
561 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
562 .map(|store| store.index_state());
563
564 show_indexes_for_model_with_runtime_state(model, runtime_state)
565 }
566
567 pub(in crate::db) fn show_indexes_for_store_schema_info(
571 &self,
572 store_path: &str,
573 schema: &SchemaInfo,
574 ) -> Vec<String> {
575 let runtime_state = self
576 .db
577 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
578 .map(|store| store.index_state());
579
580 show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
581 }
582
583 #[must_use]
589 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
590 where
591 E: EntityKind<Canister = C>,
592 {
593 self.show_columns_for_model(E::MODEL)
594 }
595
596 #[must_use]
598 pub fn show_columns_for_model(
599 &self,
600 model: &'static EntityModel,
601 ) -> Vec<EntityFieldDescription> {
602 describe_entity_fields(model)
603 }
604
605 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
611 where
612 E: EntityKind<Canister = C>,
613 {
614 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
615
616 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
617 }
618
619 #[must_use]
626 pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
627 self.try_show_entities().expect("session invariant")
628 }
629
630 pub fn try_show_entities(
632 &self,
633 ) -> Result<Vec<crate::db::EntityCatalogDescription>, InternalError> {
634 self.db.runtime_entity_catalog()
635 }
636
637 #[must_use]
639 pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
640 self.db.runtime_store_catalog()
641 }
642
643 #[must_use]
645 pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
646 self.db.runtime_memory_catalog()
647 }
648
/// Resolves the indexes visible for the store at `store_path` under the
/// accepted schema described by `schema_info`.
///
/// Yields `VisibleIndexes::none()` unless the store's index state is
/// `IndexState::Ready`. Only compiled for tests or with the `sql-explain`
/// feature.
///
/// # Errors
///
/// Returns the store-recovery error wrapped by `QueryError::execute`.
#[cfg(any(test, feature = "sql-explain"))]
fn visible_indexes_for_store_accepted_schema(
    &self,
    store_path: &str,
    schema_info: &SchemaInfo,
) -> Result<VisibleIndexes<'static>, QueryError> {
    let store = self
        .db
        .recovered_store(store_path)
        .map_err(QueryError::execute)?;
    let index_state = store.index_state();

    // Any state other than `Ready` exposes no indexes.
    if index_state != IndexState::Ready {
        return Ok(VisibleIndexes::none());
    }
    debug_assert_eq!(index_state, IndexState::Ready);

    let visible = VisibleIndexes::accepted_schema_visible(schema_info);

    // Debug-build consistency checks on the projected contracts.
    debug_assert!(visible.accepted_field_path_contracts_are_consistent());
    debug_assert!(visible.accepted_expression_contracts_are_consistent());
    debug_assert_eq!(
        visible.accepted_expression_index_count(),
        Some(visible.accepted_expression_indexes().len()),
    );

    Ok(visible)
}
681
682 #[must_use]
688 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
689 where
690 E: EntityKind<Canister = C>,
691 {
692 self.describe_entity_model(E::MODEL)
693 }
694
695 #[must_use]
697 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
698 describe_entity_model(model)
699 }
700
701 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
707 where
708 E: EntityKind<Canister = C>,
709 {
710 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
711
712 Ok(describe_entity_model_with_persisted_schema(
713 E::MODEL,
714 &snapshot,
715 ))
716 }
717
718 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
722 where
723 E: EntityKind<Canister = C>,
724 {
725 let store = self.db.recovered_store(E::Store::PATH)?;
726
727 store.with_schema_mut(|schema_store| {
728 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
729 })
730 }
731
732 pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
737 &self,
738 ) -> Result<AcceptedSchemaCatalogContext, InternalError>
739 where
740 E: EntityKind<Canister = C>,
741 {
742 let cache_key = (self.db.cache_scope_id(), E::PATH);
743 if let Some(entry) =
744 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
745 {
746 return Ok(AcceptedSchemaCatalogContext::new(
747 entry.snapshot,
748 entry.identity,
749 ));
750 }
751
752 let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
753 let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
754 let identity = AcceptedCatalogIdentity::new(
755 E::ENTITY_TAG,
756 E::PATH,
757 E::Store::PATH,
758 snapshot.persisted_snapshot().version(),
759 fingerprint,
760 );
761 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
762 cache.borrow_mut().insert(
763 cache_key,
764 AcceptedSchemaQueryCacheEntry {
765 snapshot: snapshot.clone(),
766 identity,
767 },
768 );
769 });
770
771 Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
772 }
773
774 pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
775 &self,
776 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
777 where
778 E: EntityKind<Canister = C>,
779 {
780 let store = self.db.recovered_store(E::Store::PATH)?;
781
782 store.with_schema_mut(|schema_store| {
783 schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
784 })
785 }
786
787 pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
788 &self,
789 selection: &AcceptedCatalogSnapshotSelection,
790 ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
791 where
792 E: EntityKind<Canister = C>,
793 {
794 let cache_key = (self.db.cache_scope_id(), E::PATH);
795 if let Some(entry) =
796 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
797 && entry.identity == selection.identity()
798 {
799 return Ok(Some(AcceptedSchemaCatalogContext::new(
800 entry.snapshot,
801 entry.identity,
802 )));
803 }
804
805 let snapshot = selection.decode_verified()?;
806 if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
807 return Ok(None);
808 }
809 let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
810
811 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
812 cache.borrow_mut().insert(
813 cache_key,
814 AcceptedSchemaQueryCacheEntry {
815 snapshot,
816 identity: selection.identity(),
817 },
818 );
819 });
820
821 Ok(Some(context))
822 }
823
/// Drops the per-thread cached accepted schema entry for `E`, if any.
///
/// The next query-path lookup for `E` then reloads from the store. Only
/// compiled with the `sql` feature.
#[cfg(feature = "sql")]
fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
where
    E: EntityKind<Canister = C>,
{
    let stale_key = (self.db.cache_scope_id(), E::PATH);

    ACCEPTED_SCHEMA_QUERY_CACHES.with(|caches| {
        // The removed entry, when present, is simply discarded.
        caches.borrow_mut().remove(&stale_key);
    });
}
834
835 fn load_accepted_schema_snapshot_for_query<E>(
836 &self,
837 ) -> Result<AcceptedSchemaSnapshot, InternalError>
838 where
839 E: EntityKind<Canister = C>,
840 {
841 let store = self.db.recovered_store(E::Store::PATH)?;
842
843 store.with_schema_mut(|schema_store| {
844 if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
845 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
846 if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
847 &accepted,
848 E::MODEL,
849 )
850 .is_ok()
851 {
852 return Ok(accepted);
853 }
854 }
855
856 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
857 })
858 }
859
860 pub(in crate::db) fn accepted_schema_info_for_entity<E>(
864 &self,
865 ) -> Result<SchemaInfo, InternalError>
866 where
867 E: EntityKind<Canister = C>,
868 {
869 let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
870
871 Ok(catalog.accepted_schema_info_for::<E>())
872 }
873
/// Builds the entity authority for `E` from a given accepted schema snapshot.
///
/// Associated function (no session state is used). Only compiled with the
/// `sql` feature.
///
/// # Errors
///
/// Propagates any error from `EntityAuthority::from_accepted_schema_for_type`.
#[cfg(feature = "sql")]
pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
    accepted_schema: &AcceptedSchemaSnapshot,
) -> Result<EntityAuthority, InternalError>
where
    E: EntityKind<Canister = C>,
{
    EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
}
886
887 fn ensure_generated_compatible_accepted_save_schema<E>(
892 &self,
893 ) -> Result<
894 (
895 AcceptedRowDecodeContract,
896 SchemaInfo,
897 CommitSchemaFingerprint,
898 ),
899 InternalError,
900 >
901 where
902 E: EntityKind<Canister = C>,
903 {
904 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
905 let (accepted_row_layout, _) =
906 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
907 &accepted_schema,
908 E::MODEL,
909 )?;
910 let (row_decode_contract, _, schema_info, schema_fingerprint) =
911 accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
912
913 Ok((row_decode_contract, schema_info, schema_fingerprint))
914 }
915
916 pub fn storage_report(
918 &self,
919 name_to_path: &[(&'static str, &'static str)],
920 ) -> Result<StorageReport, InternalError> {
921 self.db.storage_report(name_to_path)
922 }
923
924 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
926 self.db.storage_report_default()
927 }
928
929 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
931 self.db.integrity_report()
932 }
933
934 #[must_use]
939 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
940 where
941 E: EntityKind<Canister = C> + EntityValue,
942 {
943 LoadExecutor::new(self.db, self.debug)
944 }
945
946 #[must_use]
947 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
948 where
949 E: PersistedRow<Canister = C> + EntityValue,
950 {
951 DeleteExecutor::new(self.db)
952 }
953
954 #[must_use]
955 pub(in crate::db) const fn save_executor<E>(
956 &self,
957 accepted_row_decode_contract: AcceptedRowDecodeContract,
958 accepted_schema_info: SchemaInfo,
959 accepted_schema_fingerprint: CommitSchemaFingerprint,
960 ) -> SaveExecutor<E>
961 where
962 E: PersistedRow<Canister = C> + EntityValue,
963 {
964 SaveExecutor::new_with_accepted_contract(
965 self.db,
966 self.debug,
967 accepted_row_decode_contract,
968 accepted_schema_info,
969 accepted_schema_fingerprint,
970 )
971 }
972}