1mod bounded_cache;
7mod query;
8mod response;
9#[cfg(feature = "sql")]
10mod sql;
11#[cfg(all(test, feature = "sql"))]
15mod tests;
16mod write;
17
18#[cfg(any(test, feature = "sql-explain"))]
19use crate::db::{IndexState, QueryError, query::plan::VisibleIndexes};
20use crate::{
21 db::{
22 Db, EntityCatalogCounts, EntityCatalogDescription, EntityFieldDescription,
23 EntityRuntimeHooks, EntitySchemaDescription, FluentDeleteQuery, FluentLoadQuery,
24 IntegrityReport, MissingRowPolicy, PersistedRow, Query, StorageReport,
25 StoreCatalogDescription, StoreRegistry, WriteBatchResponse,
26 commit::CommitSchemaFingerprint,
27 executor::{DeleteExecutor, EntityAuthority, LoadExecutor, SaveExecutor},
28 schema::{
29 AcceptedCatalogIdentity, AcceptedCatalogSnapshotSelection, AcceptedRowDecodeContract,
30 AcceptedRowLayoutRuntimeContract, AcceptedSchemaSnapshot, PersistedFieldKind,
31 PersistedFieldSnapshot, SchemaInfo, SchemaVersion, accepted_commit_schema_fingerprint,
32 accepted_schema_cache_fingerprint, describe_entity_fields,
33 describe_entity_fields_with_persisted_schema, describe_entity_model,
34 describe_entity_model_with_persisted_schema, ensure_accepted_schema_snapshot,
35 show_indexes_for_model, show_indexes_for_model_with_runtime_state,
36 show_indexes_for_schema_info_with_runtime_state,
37 },
38 },
39 error::InternalError,
40 metrics::sink::{ExecKind, MetricsSink, record_exec_error_for_path, with_metrics_sink},
41 model::entity::EntityModel,
42 traits::{CanisterKind, EntityKind, EntityValue, Path},
43 value::Value,
44};
45use std::{
46 cell::{OnceCell, RefCell},
47 collections::HashMap,
48 thread::LocalKey,
49};
50
51#[cfg(feature = "diagnostics")]
52pub use query::{
53 DirectDataRowAttribution, FluentTerminalExecutionAttribution, GroupedCountAttribution,
54 GroupedExecutionAttribution, KernelRowAttribution, QueryExecutionAttribution,
55 ScalarAggregateAttribution,
56};
57pub(in crate::db) use response::finalize_scalar_paged_execution;
58pub(in crate::db) use response::finalize_structural_grouped_projection_result;
59#[cfg(feature = "sql")]
60pub(in crate::db) use response::sql_grouped_cursor_from_bytes;
61#[cfg(feature = "sql")]
62pub use sql::{
63 SqlAdminBulkDeletePlan, SqlAdminBulkUpdatePlan, SqlDdlExecutionStatus, SqlDdlMutationKind,
64 SqlDdlPreparationReport, SqlDeleteExposurePolicy, SqlDeletePolicyContext,
65 SqlDeletePolicyRejection, SqlDeletePolicyReport, SqlDeleteStatementClassification,
66 SqlPublicBoundedDeletePlan, SqlPublicBoundedUpdatePlan, SqlPublicPrimaryKeyDeletePlan,
67 SqlPublicPrimaryKeyUpdatePlan, SqlSessionCurrentDeletePlan, SqlSessionCurrentUpdatePlan,
68 SqlStatementDispatch, SqlStatementResult, SqlStatementShellSurface, SqlStatementSurface,
69 SqlUpdateAssignmentPolicy, SqlUpdateExposurePolicy, SqlUpdatePolicyContext,
70 SqlUpdatePolicyRejection, SqlUpdatePolicyReport, SqlUpdateStatementClassification,
71 SqlValidatedDeletePlan, SqlValidatedUpdatePlan, SqlWriteExecutionBounds, SqlWriteOrderProof,
72 SqlWriteReturningBounds, SqlWriteReturningShape, SqlWriteStatementShape, SqlWriteWhereProof,
73 classify_sql_delete_policy, classify_sql_update_policy, sql_statement_dispatch,
74 sql_statement_entity_name, sql_statement_shell_surface, sql_statement_surface,
75};
76#[cfg(all(feature = "sql", feature = "diagnostics"))]
77pub use sql::{
78 SqlCompileAttribution, SqlExecutionAttribution, SqlHybridCoveringAttribution,
79 SqlOutputBlobAttribution, SqlPureCoveringAttribution, SqlQueryCacheAttribution,
80 SqlQueryExecutionAttribution, SqlScalarAggregateAttribution,
81};
82#[cfg(all(feature = "sql", feature = "diagnostics"))]
83pub use sql::{SqlProjectionMaterializationMetrics, with_sql_projection_materialization_metrics};
84
85pub struct DbSession<C: CanisterKind> {
92 db: Db<C>,
93 debug: bool,
94 metrics: Option<&'static dyn MetricsSink>,
95}
96
97#[cfg(test)]
98#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
99pub(in crate::db) struct AcceptedCatalogRuntimeCounterSnapshot {
100 pub schema_info_projections: u64,
101 pub persisted_schema_decodes: u64,
102 pub generated_compatible_row_layout_proofs: u64,
103 pub latest_by_entity_calls: u64,
104 pub visible_index_projections: u64,
105}
106
107#[derive(Clone, Debug)]
108struct AcceptedSchemaQueryCacheEntry {
109 snapshot: AcceptedSchemaSnapshot,
110 identity: AcceptedCatalogIdentity,
111}
112
113type AcceptedSchemaQueryCacheKey = (usize, &'static str);
114
115pub(in crate::db) type AcceptedSaveContract = (
116 AcceptedRowDecodeContract,
117 AcceptedRowDecodeContract,
118 SchemaInfo,
119 CommitSchemaFingerprint,
120);
121
122#[derive(Clone, Debug)]
123pub(in crate::db) struct AcceptedSchemaCatalogContext {
124 snapshot: AcceptedSchemaSnapshot,
125 identity: AcceptedCatalogIdentity,
126 schema_info: OnceCell<SchemaInfo>,
127}
128
129impl AcceptedSchemaCatalogContext {
130 #[must_use]
131 pub(in crate::db) const fn new(
132 snapshot: AcceptedSchemaSnapshot,
133 identity: AcceptedCatalogIdentity,
134 ) -> Self {
135 Self {
136 snapshot,
137 identity,
138 schema_info: OnceCell::new(),
139 }
140 }
141
142 #[must_use]
143 pub(in crate::db) const fn snapshot(&self) -> &AcceptedSchemaSnapshot {
144 &self.snapshot
145 }
146
147 #[must_use]
148 pub(in crate::db) const fn schema_version(&self) -> SchemaVersion {
149 self.identity.accepted_schema_version()
150 }
151
152 #[must_use]
153 pub(in crate::db) const fn fingerprint(&self) -> CommitSchemaFingerprint {
154 self.identity.accepted_schema_fingerprint()
155 }
156
157 #[must_use]
158 pub(in crate::db) const fn fingerprint_method_version(&self) -> u8 {
159 self.identity.fingerprint_method_version()
160 }
161
162 #[must_use]
163 #[cfg(feature = "sql")]
164 pub(in crate::db) const fn identity(&self) -> AcceptedCatalogIdentity {
165 self.identity
166 }
167
168 fn debug_assert_matches_entity<E>(&self)
169 where
170 E: EntityKind,
171 {
172 debug_assert_eq!(self.identity.entity_tag(), E::ENTITY_TAG);
173 debug_assert_eq!(self.identity.entity_path(), E::PATH);
174 debug_assert_eq!(self.identity.store_path(), E::Store::PATH);
175 }
176
177 pub(in crate::db) fn accepted_entity_authority_for<E>(
178 &self,
179 ) -> Result<EntityAuthority, InternalError>
180 where
181 E: EntityKind,
182 {
183 let schema_info = self.accepted_schema_info_for::<E>();
184
185 self.accepted_entity_authority_for_schema_info::<E>(schema_info)
186 }
187
188 fn accepted_entity_authority_for_schema_info<E>(
189 &self,
190 schema_info: SchemaInfo,
191 ) -> Result<EntityAuthority, InternalError>
192 where
193 E: EntityKind,
194 {
195 self.debug_assert_matches_entity::<E>();
196 let authority = EntityAuthority::new(E::MODEL, E::ENTITY_TAG, E::Store::PATH);
197 let (accepted_row_layout, row_proof) =
198 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
199 &self.snapshot,
200 authority.model(),
201 )?;
202 let row_decode_contract = accepted_row_layout.row_decode_contract();
203
204 Ok(
205 authority.with_accepted_row_decode_contract(
206 row_proof,
207 row_decode_contract,
208 schema_info,
209 ),
210 )
211 }
212
213 #[cfg(feature = "sql")]
214 pub(in crate::db) fn accepted_entity_authority_and_schema_info_for<E>(
215 &self,
216 ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
217 where
218 E: EntityKind,
219 {
220 let schema_info = self.accepted_schema_info_for::<E>();
221 let authority = self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?;
222
223 Ok((authority, schema_info))
224 }
225
226 #[cfg(feature = "sql")]
227 pub(in crate::db) fn accepted_or_provided_entity_authority_for<E>(
228 &self,
229 accepted_authority: Option<&EntityAuthority>,
230 ) -> Result<EntityAuthority, InternalError>
231 where
232 E: EntityKind,
233 {
234 match accepted_authority {
235 Some(authority) => Ok(authority.clone()),
236 None => self.accepted_entity_authority_for::<E>(),
237 }
238 }
239
240 #[cfg(feature = "sql-explain")]
241 pub(in crate::db) fn accepted_or_provided_entity_authority_and_schema_info_for<E>(
242 &self,
243 accepted_authority: Option<&EntityAuthority>,
244 ) -> Result<(EntityAuthority, SchemaInfo), InternalError>
245 where
246 E: EntityKind,
247 {
248 let schema_info = self.accepted_schema_info_for::<E>();
249 let authority = match accepted_authority {
250 Some(authority) => authority.clone(),
251 None => self.accepted_entity_authority_for_schema_info::<E>(schema_info.clone())?,
252 };
253
254 Ok((authority, schema_info))
255 }
256
257 #[must_use]
258 pub(in crate::db) fn accepted_schema_info_for<E>(&self) -> SchemaInfo
259 where
260 E: EntityKind,
261 {
262 self.debug_assert_matches_entity::<E>();
263 self.schema_info
264 .get_or_init(|| {
265 SchemaInfo::from_accepted_snapshot_for_model_with_expression_indexes(
266 E::MODEL,
267 &self.snapshot,
268 true,
269 )
270 })
271 .clone()
272 }
273}
274
275pub(in crate::db) fn accepted_save_contract_for_descriptor<E>(
276 accepted_schema: &AcceptedSchemaSnapshot,
277 descriptor: &AcceptedRowLayoutRuntimeContract<'_>,
278) -> Result<AcceptedSaveContract, InternalError>
279where
280 E: EntityKind,
281{
282 let row_decode_contract = descriptor.row_decode_contract();
283 let mutation_row_decode_contract = row_decode_contract.clone();
284 let schema_info = SchemaInfo::from_accepted_snapshot_for_model(E::MODEL, accepted_schema);
285 let schema_fingerprint = accepted_commit_schema_fingerprint(accepted_schema)?;
286
287 Ok((
288 row_decode_contract,
289 mutation_row_decode_contract,
290 schema_info,
291 schema_fingerprint,
292 ))
293}
294
295fn relation_field_count(fields: &[PersistedFieldSnapshot]) -> usize {
296 fields
297 .iter()
298 .filter(|field| persisted_kind_is_relation_field(field.kind()))
299 .count()
300}
301
302fn persisted_kind_is_relation_field(kind: &PersistedFieldKind) -> bool {
303 match kind {
304 PersistedFieldKind::Relation { .. } => true,
305 PersistedFieldKind::List(inner) | PersistedFieldKind::Set(inner) => {
306 matches!(inner.as_ref(), PersistedFieldKind::Relation { .. })
307 }
308 _ => false,
309 }
310}
311
312thread_local! {
313 static ACCEPTED_SCHEMA_QUERY_CACHES: RefCell<HashMap<(usize, &'static str), AcceptedSchemaQueryCacheEntry>> =
318 RefCell::new(HashMap::default());
319}
320
321impl<C: CanisterKind> DbSession<C> {
322 #[must_use]
324 pub(crate) const fn new(db: Db<C>) -> Self {
325 Self {
326 db,
327 debug: false,
328 metrics: None,
329 }
330 }
331
332 #[must_use]
334 pub const fn new_with_hooks(
335 store: &'static LocalKey<StoreRegistry>,
336 entity_runtime_hooks: &'static [EntityRuntimeHooks<C>],
337 ) -> Self {
338 Self::new(Db::new_with_hooks(store, entity_runtime_hooks))
339 }
340
341 #[cfg(test)]
342 pub(in crate::db) fn reset_accepted_catalog_runtime_counters_for_tests() {
343 crate::db::schema::reset_accepted_schema_info_projection_count_for_tests();
344 crate::db::schema::reset_persisted_schema_snapshot_decode_count_for_tests();
345 crate::db::schema::reset_generated_compatible_row_layout_proof_count_for_tests();
346 crate::db::schema::reset_latest_raw_snapshots_by_entity_call_count_for_tests();
347 query::reset_visible_index_projection_count_for_tests();
348 }
349
350 #[cfg(test)]
351 pub(in crate::db) fn accepted_catalog_runtime_counter_snapshot_for_tests()
352 -> AcceptedCatalogRuntimeCounterSnapshot {
353 AcceptedCatalogRuntimeCounterSnapshot {
354 schema_info_projections:
355 crate::db::schema::accepted_schema_info_projection_count_for_tests(),
356 persisted_schema_decodes:
357 crate::db::schema::persisted_schema_snapshot_decode_count_for_tests(),
358 generated_compatible_row_layout_proofs:
359 crate::db::schema::generated_compatible_row_layout_proof_count_for_tests(),
360 latest_by_entity_calls:
361 crate::db::schema::latest_raw_snapshots_by_entity_call_count_for_tests(),
362 visible_index_projections: query::visible_index_projection_count_for_tests(),
363 }
364 }
365
366 #[must_use]
368 pub const fn debug(mut self) -> Self {
369 self.debug = true;
370 self
371 }
372
373 #[must_use]
375 pub const fn metrics_sink(mut self, sink: &'static dyn MetricsSink) -> Self {
376 self.metrics = Some(sink);
377 self
378 }
379
380 const fn fluent_load_query<E>(&self, consistency: MissingRowPolicy) -> FluentLoadQuery<'_, E>
383 where
384 E: EntityKind<Canister = C>,
385 {
386 FluentLoadQuery::new(self, Query::new(consistency))
387 }
388
389 fn fluent_delete_query<E>(&self, consistency: MissingRowPolicy) -> FluentDeleteQuery<'_, E>
393 where
394 E: PersistedRow<Canister = C>,
395 {
396 FluentDeleteQuery::new(self, Query::new(consistency).delete())
397 }
398
399 fn with_metrics<T>(&self, f: impl FnOnce() -> T) -> T {
400 if let Some(sink) = self.metrics {
401 with_metrics_sink(sink, f)
402 } else {
403 f()
404 }
405 }
406
407 fn execute_save_with<E, T, R>(
409 &self,
410 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
411 map: impl FnOnce(T) -> R,
412 ) -> Result<R, InternalError>
413 where
414 E: PersistedRow<Canister = C> + EntityValue,
415 {
416 let (contract, schema_info, schema_fingerprint) = match self
417 .with_metrics(|| self.ensure_generated_compatible_accepted_save_schema::<E>())
418 {
419 Ok(authority) => authority,
420 Err(error) => {
421 self.with_metrics(|| record_exec_error_for_path(ExecKind::Save, E::PATH, &error));
422
423 return Err(error);
424 }
425 };
426 let value = self.with_metrics(|| {
427 op(self.save_executor::<E>(contract, schema_info, schema_fingerprint))
428 })?;
429
430 Ok(map(value))
431 }
432
433 fn execute_save_with_checked_accepted_row_contract<E, T, R>(
438 &self,
439 accepted_row_decode_contract: AcceptedRowDecodeContract,
440 accepted_schema_info: SchemaInfo,
441 accepted_schema_fingerprint: CommitSchemaFingerprint,
442 op: impl FnOnce(SaveExecutor<E>) -> Result<T, InternalError>,
443 map: impl FnOnce(T) -> R,
444 ) -> Result<R, InternalError>
445 where
446 E: PersistedRow<Canister = C> + EntityValue,
447 {
448 let value = self.with_metrics(|| {
449 op(self.save_executor::<E>(
450 accepted_row_decode_contract,
451 accepted_schema_info,
452 accepted_schema_fingerprint,
453 ))
454 })?;
455
456 Ok(map(value))
457 }
458
459 fn execute_save_entity<E>(
461 &self,
462 op: impl FnOnce(SaveExecutor<E>) -> Result<E, InternalError>,
463 ) -> Result<E, InternalError>
464 where
465 E: PersistedRow<Canister = C> + EntityValue,
466 {
467 self.execute_save_with(op, std::convert::identity)
468 }
469
470 fn execute_save_batch<E>(
471 &self,
472 op: impl FnOnce(SaveExecutor<E>) -> Result<Vec<E>, InternalError>,
473 ) -> Result<WriteBatchResponse<E>, InternalError>
474 where
475 E: PersistedRow<Canister = C> + EntityValue,
476 {
477 self.execute_save_with(op, WriteBatchResponse::new)
478 }
479
480 #[must_use]
486 pub const fn load<E>(&self) -> FluentLoadQuery<'_, E>
487 where
488 E: EntityKind<Canister = C>,
489 {
490 self.fluent_load_query(MissingRowPolicy::Ignore)
491 }
492
493 #[must_use]
495 pub const fn load_with_consistency<E>(
496 &self,
497 consistency: MissingRowPolicy,
498 ) -> FluentLoadQuery<'_, E>
499 where
500 E: EntityKind<Canister = C>,
501 {
502 self.fluent_load_query(consistency)
503 }
504
505 #[must_use]
507 pub fn delete<E>(&self) -> FluentDeleteQuery<'_, E>
508 where
509 E: PersistedRow<Canister = C>,
510 {
511 self.fluent_delete_query(MissingRowPolicy::Ignore)
512 }
513
514 #[must_use]
516 pub fn delete_with_consistency<E>(
517 &self,
518 consistency: MissingRowPolicy,
519 ) -> FluentDeleteQuery<'_, E>
520 where
521 E: PersistedRow<Canister = C>,
522 {
523 self.fluent_delete_query(consistency)
524 }
525
526 #[must_use]
530 pub const fn select_one(&self) -> Value {
531 Value::Int64(1)
532 }
533
534 #[must_use]
541 pub fn show_indexes<E>(&self) -> Vec<String>
542 where
543 E: EntityKind<Canister = C>,
544 {
545 self.show_indexes_for_store_model(E::Store::PATH, E::MODEL)
546 }
547
548 #[must_use]
554 pub fn show_indexes_for_model(&self, model: &'static EntityModel) -> Vec<String> {
555 show_indexes_for_model(model)
556 }
557
558 pub fn try_show_indexes<E>(&self) -> Result<Vec<String>, InternalError>
563 where
564 E: EntityKind<Canister = C>,
565 {
566 let schema = self.accepted_schema_info_for_entity::<E>()?;
567
568 Ok(self.show_indexes_for_store_schema_info(E::Store::PATH, &schema))
569 }
570
571 pub(in crate::db) fn show_indexes_for_store_model(
575 &self,
576 store_path: &str,
577 model: &'static EntityModel,
578 ) -> Vec<String> {
579 let runtime_state = self
580 .db
581 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
582 .map(|store| store.index_state());
583
584 show_indexes_for_model_with_runtime_state(model, runtime_state)
585 }
586
587 pub(in crate::db) fn show_indexes_for_store_schema_info(
591 &self,
592 store_path: &str,
593 schema: &SchemaInfo,
594 ) -> Vec<String> {
595 let runtime_state = self
596 .db
597 .with_store_registry(|registry| registry.try_get_store(store_path).ok())
598 .map(|store| store.index_state());
599
600 show_indexes_for_schema_info_with_runtime_state(schema, runtime_state)
601 }
602
603 #[must_use]
609 pub fn show_columns<E>(&self) -> Vec<EntityFieldDescription>
610 where
611 E: EntityKind<Canister = C>,
612 {
613 self.show_columns_for_model(E::MODEL)
614 }
615
616 #[must_use]
618 pub fn show_columns_for_model(
619 &self,
620 model: &'static EntityModel,
621 ) -> Vec<EntityFieldDescription> {
622 describe_entity_fields(model)
623 }
624
625 pub fn try_show_columns<E>(&self) -> Result<Vec<EntityFieldDescription>, InternalError>
631 where
632 E: EntityKind<Canister = C>,
633 {
634 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
635
636 Ok(describe_entity_fields_with_persisted_schema(&snapshot))
637 }
638
639 #[must_use]
646 pub fn show_entities(&self) -> Vec<crate::db::EntityCatalogDescription> {
647 self.try_show_entities().expect("session invariant")
648 }
649
650 pub fn try_show_entities(&self) -> Result<Vec<EntityCatalogDescription>, InternalError> {
652 let mut entities = Vec::with_capacity(self.db.entity_runtime_hooks.len());
653
654 for hooks in self.db.entity_runtime_hooks {
655 let store = self.db.recovered_store(hooks.store_path)?;
656 let storage = store
657 .storage_capabilities()
658 .storage_mode()
659 .as_str()
660 .to_string();
661 let accepted = self.accepted_schema_catalog_context_for_runtime_hook(hooks, store)?;
662 let snapshot = accepted.snapshot().persisted_snapshot();
663
664 entities.push(EntityCatalogDescription::new(
665 snapshot.entity_name().to_string(),
666 snapshot.entity_path().to_string(),
667 hooks.store_path.to_string(),
668 storage,
669 EntityCatalogCounts::new(
670 u32::try_from(snapshot.fields().len()).unwrap_or(u32::MAX),
671 u32::try_from(snapshot.indexes().len()).unwrap_or(u32::MAX),
672 u32::try_from(relation_field_count(snapshot.fields())).unwrap_or(u32::MAX),
673 snapshot.version().get(),
674 ),
675 ));
676 }
677
678 Ok(entities)
679 }
680
681 fn accepted_schema_catalog_context_for_runtime_hook(
682 &self,
683 hooks: &EntityRuntimeHooks<C>,
684 store: crate::db::registry::StoreHandle,
685 ) -> Result<AcceptedSchemaCatalogContext, InternalError> {
686 let cache_key = self.accepted_schema_query_cache_key(hooks.entity_path);
687 if let Some(context) =
688 Self::accepted_schema_catalog_context_from_runtime_hook_cache(cache_key, hooks, store)?
689 {
690 return Ok(context);
691 }
692
693 let snapshot = Self::load_accepted_schema_snapshot_for_runtime_hook(hooks, store)?;
694 let identity = AcceptedCatalogIdentity::new(
695 hooks.entity_tag,
696 hooks.entity_path,
697 hooks.store_path,
698 snapshot.persisted_snapshot().version(),
699 accepted_schema_cache_fingerprint(&snapshot)?,
700 );
701 let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), identity);
702 Self::insert_accepted_schema_query_cache(cache_key, snapshot, identity);
703
704 Ok(context)
705 }
706
707 fn accepted_schema_catalog_context_from_runtime_hook_cache(
708 cache_key: AcceptedSchemaQueryCacheKey,
709 hooks: &EntityRuntimeHooks<C>,
710 store: crate::db::registry::StoreHandle,
711 ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError> {
712 let selection = store.with_schema_mut(|schema_store| {
713 schema_store.latest_catalog_identity(
714 hooks.entity_tag,
715 hooks.entity_path,
716 hooks.store_path,
717 )
718 })?;
719 if let Some(selection) = selection
720 && let Some(context) = Self::accepted_schema_catalog_context_from_query_cache(
721 cache_key,
722 selection.identity(),
723 )
724 {
725 return Ok(Some(context));
726 }
727
728 Ok(None)
729 }
730
731 fn load_accepted_schema_snapshot_for_runtime_hook(
732 hooks: &EntityRuntimeHooks<C>,
733 store: crate::db::registry::StoreHandle,
734 ) -> Result<AcceptedSchemaSnapshot, InternalError> {
735 store.with_schema_mut(|schema_store| {
736 if let Some(snapshot) = schema_store.latest_persisted_snapshot(hooks.entity_tag)? {
737 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
738 if accepted.entity_path() == hooks.entity_path {
739 return Ok(accepted);
740 }
741 }
742
743 ensure_accepted_schema_snapshot(
744 schema_store,
745 hooks.entity_tag,
746 hooks.entity_path,
747 hooks.model,
748 )
749 })
750 }
751
752 #[must_use]
754 pub fn show_stores(&self) -> Vec<StoreCatalogDescription> {
755 self.db.runtime_store_catalog()
756 }
757
758 #[must_use]
760 pub fn show_memory(&self) -> Vec<crate::db::MemoryCatalogDescription> {
761 self.db.runtime_memory_catalog()
762 }
763
764 #[cfg(any(test, feature = "sql-explain"))]
767 fn visible_indexes_for_store_accepted_schema(
768 &self,
769 store_path: &str,
770 schema_info: &SchemaInfo,
771 ) -> Result<VisibleIndexes<'static>, QueryError> {
772 let store = self
775 .db
776 .recovered_store(store_path)
777 .map_err(QueryError::execute)?;
778 let state = store.index_state();
779 if state != IndexState::Ready {
780 return Ok(VisibleIndexes::none());
781 }
782 debug_assert_eq!(state, IndexState::Ready);
783
784 let visible_indexes = VisibleIndexes::accepted_schema_visible(schema_info);
787 debug_assert!(visible_indexes.accepted_field_path_contracts_are_consistent());
788 debug_assert!(visible_indexes.accepted_expression_contracts_are_consistent());
789 debug_assert_eq!(
790 visible_indexes.accepted_expression_index_count(),
791 Some(visible_indexes.accepted_expression_indexes().len()),
792 );
793
794 Ok(visible_indexes)
795 }
796
797 #[must_use]
803 pub fn describe_entity<E>(&self) -> EntitySchemaDescription
804 where
805 E: EntityKind<Canister = C>,
806 {
807 self.describe_entity_model(E::MODEL)
808 }
809
810 #[must_use]
812 pub fn describe_entity_model(&self, model: &'static EntityModel) -> EntitySchemaDescription {
813 describe_entity_model(model)
814 }
815
816 pub fn try_describe_entity<E>(&self) -> Result<EntitySchemaDescription, InternalError>
822 where
823 E: EntityKind<Canister = C>,
824 {
825 let snapshot = self.ensure_accepted_schema_snapshot::<E>()?;
826
827 Ok(describe_entity_model_with_persisted_schema(
828 E::MODEL,
829 &snapshot,
830 ))
831 }
832
833 fn ensure_accepted_schema_snapshot<E>(&self) -> Result<AcceptedSchemaSnapshot, InternalError>
837 where
838 E: EntityKind<Canister = C>,
839 {
840 let store = self.db.recovered_store(E::Store::PATH)?;
841
842 store.with_schema_mut(|schema_store| {
843 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
844 })
845 }
846
847 fn accepted_schema_query_cache_key(
848 &self,
849 entity_path: &'static str,
850 ) -> AcceptedSchemaQueryCacheKey {
851 (self.db.cache_scope_id(), entity_path)
852 }
853
854 fn accepted_schema_catalog_context_from_query_cache(
855 cache_key: AcceptedSchemaQueryCacheKey,
856 identity: AcceptedCatalogIdentity,
857 ) -> Option<AcceptedSchemaCatalogContext> {
858 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
859 cache.borrow().get(&cache_key).and_then(|entry| {
860 (entry.identity == identity)
861 .then(|| AcceptedSchemaCatalogContext::new(entry.snapshot.clone(), identity))
862 })
863 })
864 }
865
866 fn insert_accepted_schema_query_cache(
867 cache_key: AcceptedSchemaQueryCacheKey,
868 snapshot: AcceptedSchemaSnapshot,
869 identity: AcceptedCatalogIdentity,
870 ) {
871 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
872 cache.borrow_mut().insert(
873 cache_key,
874 AcceptedSchemaQueryCacheEntry { snapshot, identity },
875 );
876 });
877 }
878
879 #[cfg(test)]
880 pub(in crate::db) fn clear_accepted_schema_query_cache_for_tests() {
881 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
882 cache.borrow_mut().clear();
883 });
884 }
885
886 pub(in crate::db::session) fn accepted_schema_catalog_context_for_query<E>(
891 &self,
892 ) -> Result<AcceptedSchemaCatalogContext, InternalError>
893 where
894 E: EntityKind<Canister = C>,
895 {
896 let cache_key = self.accepted_schema_query_cache_key(E::PATH);
897 if let Some(entry) =
898 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| cache.borrow().get(&cache_key).cloned())
899 {
900 return Ok(AcceptedSchemaCatalogContext::new(
901 entry.snapshot,
902 entry.identity,
903 ));
904 }
905
906 let snapshot = self.load_accepted_schema_snapshot_for_query::<E>()?;
907 let fingerprint = accepted_schema_cache_fingerprint(&snapshot)?;
908 let identity = AcceptedCatalogIdentity::new(
909 E::ENTITY_TAG,
910 E::PATH,
911 E::Store::PATH,
912 snapshot.persisted_snapshot().version(),
913 fingerprint,
914 );
915 Self::insert_accepted_schema_query_cache(cache_key, snapshot.clone(), identity);
916
917 Ok(AcceptedSchemaCatalogContext::new(snapshot, identity))
918 }
919
920 pub(in crate::db::session) fn accepted_catalog_snapshot_selection_for_query<E>(
921 &self,
922 ) -> Result<Option<AcceptedCatalogSnapshotSelection>, InternalError>
923 where
924 E: EntityKind<Canister = C>,
925 {
926 let store = self.db.recovered_store(E::Store::PATH)?;
927
928 store.with_schema_mut(|schema_store| {
929 schema_store.latest_catalog_identity(E::ENTITY_TAG, E::PATH, E::Store::PATH)
930 })
931 }
932
933 pub(in crate::db::session) fn accepted_schema_catalog_context_from_selection<E>(
934 &self,
935 selection: &AcceptedCatalogSnapshotSelection,
936 ) -> Result<Option<AcceptedSchemaCatalogContext>, InternalError>
937 where
938 E: EntityKind<Canister = C>,
939 {
940 let cache_key = self.accepted_schema_query_cache_key(E::PATH);
941 if let Some(context) =
942 Self::accepted_schema_catalog_context_from_query_cache(cache_key, selection.identity())
943 {
944 return Ok(Some(context));
945 }
946
947 let snapshot = selection.decode_verified()?;
948 if snapshot.persisted_snapshot().fields().len() != E::MODEL.fields().len() {
949 return Ok(None);
950 }
951 let context = AcceptedSchemaCatalogContext::new(snapshot.clone(), selection.identity());
952
953 Self::insert_accepted_schema_query_cache(cache_key, snapshot, selection.identity());
954
955 Ok(Some(context))
956 }
957
958 #[cfg(feature = "sql")]
959 fn invalidate_accepted_schema_query_cache_for_entity<E>(&self)
960 where
961 E: EntityKind<Canister = C>,
962 {
963 let cache_key = self.accepted_schema_query_cache_key(E::PATH);
964 ACCEPTED_SCHEMA_QUERY_CACHES.with(|cache| {
965 cache.borrow_mut().remove(&cache_key);
966 });
967 }
968
969 fn load_accepted_schema_snapshot_for_query<E>(
970 &self,
971 ) -> Result<AcceptedSchemaSnapshot, InternalError>
972 where
973 E: EntityKind<Canister = C>,
974 {
975 let store = self.db.recovered_store(E::Store::PATH)?;
976
977 store.with_schema_mut(|schema_store| {
978 if let Some(snapshot) = schema_store.latest_persisted_snapshot(E::ENTITY_TAG)? {
979 let accepted = AcceptedSchemaSnapshot::try_new(snapshot)?;
980 if AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
981 &accepted,
982 E::MODEL,
983 )
984 .is_ok()
985 {
986 return Ok(accepted);
987 }
988 }
989
990 ensure_accepted_schema_snapshot(schema_store, E::ENTITY_TAG, E::PATH, E::MODEL)
991 })
992 }
993
994 pub(in crate::db) fn accepted_schema_info_for_entity<E>(
998 &self,
999 ) -> Result<SchemaInfo, InternalError>
1000 where
1001 E: EntityKind<Canister = C>,
1002 {
1003 let catalog = self.accepted_schema_catalog_context_for_query::<E>()?;
1004
1005 Ok(catalog.accepted_schema_info_for::<E>())
1006 }
1007
1008 #[cfg(feature = "sql")]
1012 pub(in crate::db) fn accepted_entity_authority_for_schema<E>(
1013 accepted_schema: &AcceptedSchemaSnapshot,
1014 ) -> Result<EntityAuthority, InternalError>
1015 where
1016 E: EntityKind<Canister = C>,
1017 {
1018 EntityAuthority::from_accepted_schema_for_type::<E>(accepted_schema)
1019 }
1020
1021 fn ensure_generated_compatible_accepted_save_schema<E>(
1026 &self,
1027 ) -> Result<
1028 (
1029 AcceptedRowDecodeContract,
1030 SchemaInfo,
1031 CommitSchemaFingerprint,
1032 ),
1033 InternalError,
1034 >
1035 where
1036 E: EntityKind<Canister = C>,
1037 {
1038 let accepted_schema = self.ensure_accepted_schema_snapshot::<E>()?;
1039 let (accepted_row_layout, _) =
1040 AcceptedRowLayoutRuntimeContract::from_generated_compatible_schema(
1041 &accepted_schema,
1042 E::MODEL,
1043 )?;
1044 let (row_decode_contract, _, schema_info, schema_fingerprint) =
1045 accepted_save_contract_for_descriptor::<E>(&accepted_schema, &accepted_row_layout)?;
1046
1047 Ok((row_decode_contract, schema_info, schema_fingerprint))
1048 }
1049
1050 pub fn storage_report(
1052 &self,
1053 name_to_path: &[(&'static str, &'static str)],
1054 ) -> Result<StorageReport, InternalError> {
1055 self.db.storage_report(name_to_path)
1056 }
1057
1058 pub fn storage_report_default(&self) -> Result<StorageReport, InternalError> {
1060 self.db.storage_report_default()
1061 }
1062
1063 pub fn integrity_report(&self) -> Result<IntegrityReport, InternalError> {
1065 self.db.integrity_report()
1066 }
1067
1068 #[must_use]
1073 pub(in crate::db) const fn load_executor<E>(&self) -> LoadExecutor<E>
1074 where
1075 E: EntityKind<Canister = C> + EntityValue,
1076 {
1077 LoadExecutor::new(self.db, self.debug)
1078 }
1079
1080 #[must_use]
1081 pub(in crate::db) const fn delete_executor<E>(&self) -> DeleteExecutor<E>
1082 where
1083 E: PersistedRow<Canister = C> + EntityValue,
1084 {
1085 DeleteExecutor::new(self.db)
1086 }
1087
1088 #[must_use]
1089 pub(in crate::db) const fn save_executor<E>(
1090 &self,
1091 accepted_row_decode_contract: AcceptedRowDecodeContract,
1092 accepted_schema_info: SchemaInfo,
1093 accepted_schema_fingerprint: CommitSchemaFingerprint,
1094 ) -> SaveExecutor<E>
1095 where
1096 E: PersistedRow<Canister = C> + EntityValue,
1097 {
1098 SaveExecutor::new_with_accepted_contract(
1099 self.db,
1100 self.debug,
1101 accepted_row_decode_contract,
1102 accepted_schema_info,
1103 accepted_schema_fingerprint,
1104 )
1105 }
1106}