1mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11 db::{
12 Db, EntityRuntimeHooks,
13 commit::CommitRowOp,
14 data::{DataKey, StorageKey, decode_structural_row_cbor},
15 index::{IndexKey, IndexState},
16 registry::StoreHandle,
17 },
18 error::{ErrorClass, InternalError},
19 traits::CanisterKind,
20 types::EntityTag,
21};
22use candid::CandidType;
23use serde::Deserialize;
24use std::collections::{BTreeMap, BTreeSet};
25
26pub use execution_trace::{
27 ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
28};
29
30#[cfg_attr(doc, doc = "StorageReport\n\nLive storage snapshot payload.")]
31#[derive(CandidType, Clone, Debug, Default, Deserialize)]
32pub struct StorageReport {
33 pub(crate) storage_data: Vec<DataStoreSnapshot>,
34 pub(crate) storage_index: Vec<IndexStoreSnapshot>,
35 pub(crate) entity_storage: Vec<EntitySnapshot>,
36 pub(crate) corrupted_keys: u64,
37 pub(crate) corrupted_entries: u64,
38}
39
40#[cfg_attr(
41 doc,
42 doc = "IntegrityTotals\n\nAggregated integrity-scan counters across all stores."
43)]
44#[derive(CandidType, Clone, Debug, Default, Deserialize)]
45pub struct IntegrityTotals {
46 pub(crate) data_rows_scanned: u64,
47 pub(crate) index_entries_scanned: u64,
48 pub(crate) corrupted_data_keys: u64,
49 pub(crate) corrupted_data_rows: u64,
50 pub(crate) corrupted_index_keys: u64,
51 pub(crate) corrupted_index_entries: u64,
52 pub(crate) missing_index_entries: u64,
53 pub(crate) divergent_index_entries: u64,
54 pub(crate) orphan_index_references: u64,
55 pub(crate) compatibility_findings: u64,
56 pub(crate) misuse_findings: u64,
57}
58
59impl IntegrityTotals {
60 const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
61 self.data_rows_scanned = self
62 .data_rows_scanned
63 .saturating_add(store.data_rows_scanned);
64 self.index_entries_scanned = self
65 .index_entries_scanned
66 .saturating_add(store.index_entries_scanned);
67 self.corrupted_data_keys = self
68 .corrupted_data_keys
69 .saturating_add(store.corrupted_data_keys);
70 self.corrupted_data_rows = self
71 .corrupted_data_rows
72 .saturating_add(store.corrupted_data_rows);
73 self.corrupted_index_keys = self
74 .corrupted_index_keys
75 .saturating_add(store.corrupted_index_keys);
76 self.corrupted_index_entries = self
77 .corrupted_index_entries
78 .saturating_add(store.corrupted_index_entries);
79 self.missing_index_entries = self
80 .missing_index_entries
81 .saturating_add(store.missing_index_entries);
82 self.divergent_index_entries = self
83 .divergent_index_entries
84 .saturating_add(store.divergent_index_entries);
85 self.orphan_index_references = self
86 .orphan_index_references
87 .saturating_add(store.orphan_index_references);
88 self.compatibility_findings = self
89 .compatibility_findings
90 .saturating_add(store.compatibility_findings);
91 self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
92 }
93
94 #[must_use]
96 pub const fn data_rows_scanned(&self) -> u64 {
97 self.data_rows_scanned
98 }
99
100 #[must_use]
102 pub const fn index_entries_scanned(&self) -> u64 {
103 self.index_entries_scanned
104 }
105
106 #[must_use]
108 pub const fn corrupted_data_keys(&self) -> u64 {
109 self.corrupted_data_keys
110 }
111
112 #[must_use]
114 pub const fn corrupted_data_rows(&self) -> u64 {
115 self.corrupted_data_rows
116 }
117
118 #[must_use]
120 pub const fn corrupted_index_keys(&self) -> u64 {
121 self.corrupted_index_keys
122 }
123
124 #[must_use]
126 pub const fn corrupted_index_entries(&self) -> u64 {
127 self.corrupted_index_entries
128 }
129
130 #[must_use]
132 pub const fn missing_index_entries(&self) -> u64 {
133 self.missing_index_entries
134 }
135
136 #[must_use]
138 pub const fn divergent_index_entries(&self) -> u64 {
139 self.divergent_index_entries
140 }
141
142 #[must_use]
144 pub const fn orphan_index_references(&self) -> u64 {
145 self.orphan_index_references
146 }
147
148 #[must_use]
150 pub const fn compatibility_findings(&self) -> u64 {
151 self.compatibility_findings
152 }
153
154 #[must_use]
156 pub const fn misuse_findings(&self) -> u64 {
157 self.misuse_findings
158 }
159}
160
161#[cfg_attr(
162 doc,
163 doc = "IntegrityStoreSnapshot\n\nPer-store integrity findings and scan counters."
164)]
165#[derive(CandidType, Clone, Debug, Default, Deserialize)]
166pub struct IntegrityStoreSnapshot {
167 pub(crate) path: String,
168 pub(crate) data_rows_scanned: u64,
169 pub(crate) index_entries_scanned: u64,
170 pub(crate) corrupted_data_keys: u64,
171 pub(crate) corrupted_data_rows: u64,
172 pub(crate) corrupted_index_keys: u64,
173 pub(crate) corrupted_index_entries: u64,
174 pub(crate) missing_index_entries: u64,
175 pub(crate) divergent_index_entries: u64,
176 pub(crate) orphan_index_references: u64,
177 pub(crate) compatibility_findings: u64,
178 pub(crate) misuse_findings: u64,
179}
180
181impl IntegrityStoreSnapshot {
182 #[must_use]
184 pub fn new(path: String) -> Self {
185 Self {
186 path,
187 ..Self::default()
188 }
189 }
190
191 #[must_use]
193 pub const fn path(&self) -> &str {
194 self.path.as_str()
195 }
196
197 #[must_use]
199 pub const fn data_rows_scanned(&self) -> u64 {
200 self.data_rows_scanned
201 }
202
203 #[must_use]
205 pub const fn index_entries_scanned(&self) -> u64 {
206 self.index_entries_scanned
207 }
208
209 #[must_use]
211 pub const fn corrupted_data_keys(&self) -> u64 {
212 self.corrupted_data_keys
213 }
214
215 #[must_use]
217 pub const fn corrupted_data_rows(&self) -> u64 {
218 self.corrupted_data_rows
219 }
220
221 #[must_use]
223 pub const fn corrupted_index_keys(&self) -> u64 {
224 self.corrupted_index_keys
225 }
226
227 #[must_use]
229 pub const fn corrupted_index_entries(&self) -> u64 {
230 self.corrupted_index_entries
231 }
232
233 #[must_use]
235 pub const fn missing_index_entries(&self) -> u64 {
236 self.missing_index_entries
237 }
238
239 #[must_use]
241 pub const fn divergent_index_entries(&self) -> u64 {
242 self.divergent_index_entries
243 }
244
245 #[must_use]
247 pub const fn orphan_index_references(&self) -> u64 {
248 self.orphan_index_references
249 }
250
251 #[must_use]
253 pub const fn compatibility_findings(&self) -> u64 {
254 self.compatibility_findings
255 }
256
257 #[must_use]
259 pub const fn misuse_findings(&self) -> u64 {
260 self.misuse_findings
261 }
262}
263
264#[cfg_attr(
265 doc,
266 doc = "IntegrityReport\n\nFull integrity-scan output across all registered stores."
267)]
268#[derive(CandidType, Clone, Debug, Default, Deserialize)]
269pub struct IntegrityReport {
270 pub(crate) stores: Vec<IntegrityStoreSnapshot>,
271 pub(crate) totals: IntegrityTotals,
272}
273
274impl IntegrityReport {
275 #[must_use]
277 pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
278 Self { stores, totals }
279 }
280
281 #[must_use]
283 pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
284 self.stores.as_slice()
285 }
286
287 #[must_use]
289 pub const fn totals(&self) -> &IntegrityTotals {
290 &self.totals
291 }
292}
293
294impl StorageReport {
295 #[must_use]
297 pub const fn new(
298 storage_data: Vec<DataStoreSnapshot>,
299 storage_index: Vec<IndexStoreSnapshot>,
300 entity_storage: Vec<EntitySnapshot>,
301 corrupted_keys: u64,
302 corrupted_entries: u64,
303 ) -> Self {
304 Self {
305 storage_data,
306 storage_index,
307 entity_storage,
308 corrupted_keys,
309 corrupted_entries,
310 }
311 }
312
313 #[must_use]
315 pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
316 self.storage_data.as_slice()
317 }
318
319 #[must_use]
321 pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
322 self.storage_index.as_slice()
323 }
324
325 #[must_use]
327 pub const fn entity_storage(&self) -> &[EntitySnapshot] {
328 self.entity_storage.as_slice()
329 }
330
331 #[must_use]
333 pub const fn corrupted_keys(&self) -> u64 {
334 self.corrupted_keys
335 }
336
337 #[must_use]
339 pub const fn corrupted_entries(&self) -> u64 {
340 self.corrupted_entries
341 }
342}
343
344#[cfg_attr(doc, doc = "DataStoreSnapshot\n\nData-store snapshot row.")]
345#[derive(CandidType, Clone, Debug, Default, Deserialize)]
346pub struct DataStoreSnapshot {
347 pub(crate) path: String,
348 pub(crate) entries: u64,
349 pub(crate) memory_bytes: u64,
350}
351
352impl DataStoreSnapshot {
353 #[must_use]
355 pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
356 Self {
357 path,
358 entries,
359 memory_bytes,
360 }
361 }
362
363 #[must_use]
365 pub const fn path(&self) -> &str {
366 self.path.as_str()
367 }
368
369 #[must_use]
371 pub const fn entries(&self) -> u64 {
372 self.entries
373 }
374
375 #[must_use]
377 pub const fn memory_bytes(&self) -> u64 {
378 self.memory_bytes
379 }
380}
381
382#[cfg_attr(doc, doc = "IndexStoreSnapshot\n\nIndex-store snapshot row.")]
383#[derive(CandidType, Clone, Debug, Default, Deserialize)]
384pub struct IndexStoreSnapshot {
385 pub(crate) path: String,
386 pub(crate) entries: u64,
387 pub(crate) user_entries: u64,
388 pub(crate) system_entries: u64,
389 pub(crate) memory_bytes: u64,
390 pub(crate) state: IndexState,
391}
392
393impl IndexStoreSnapshot {
394 #[must_use]
396 pub const fn new(
397 path: String,
398 entries: u64,
399 user_entries: u64,
400 system_entries: u64,
401 memory_bytes: u64,
402 state: IndexState,
403 ) -> Self {
404 Self {
405 path,
406 entries,
407 user_entries,
408 system_entries,
409 memory_bytes,
410 state,
411 }
412 }
413
414 #[must_use]
416 pub const fn path(&self) -> &str {
417 self.path.as_str()
418 }
419
420 #[must_use]
422 pub const fn entries(&self) -> u64 {
423 self.entries
424 }
425
426 #[must_use]
428 pub const fn user_entries(&self) -> u64 {
429 self.user_entries
430 }
431
432 #[must_use]
434 pub const fn system_entries(&self) -> u64 {
435 self.system_entries
436 }
437
438 #[must_use]
440 pub const fn memory_bytes(&self) -> u64 {
441 self.memory_bytes
442 }
443
444 #[must_use]
447 pub const fn state(&self) -> IndexState {
448 self.state
449 }
450}
451
452#[cfg_attr(doc, doc = "EntitySnapshot\n\nPer-entity storage snapshot row.")]
453#[derive(CandidType, Clone, Debug, Default, Deserialize)]
454pub struct EntitySnapshot {
455 pub(crate) store: String,
456
457 pub(crate) path: String,
458
459 pub(crate) entries: u64,
460
461 pub(crate) memory_bytes: u64,
462}
463
464impl EntitySnapshot {
465 #[must_use]
467 pub const fn new(store: String, path: String, entries: u64, memory_bytes: u64) -> Self {
468 Self {
469 store,
470 path,
471 entries,
472 memory_bytes,
473 }
474 }
475
476 #[must_use]
478 pub const fn store(&self) -> &str {
479 self.store.as_str()
480 }
481
482 #[must_use]
484 pub const fn path(&self) -> &str {
485 self.path.as_str()
486 }
487
488 #[must_use]
490 pub const fn entries(&self) -> u64 {
491 self.entries
492 }
493
494 #[must_use]
496 pub const fn memory_bytes(&self) -> u64 {
497 self.memory_bytes
498 }
499}
500
501#[cfg_attr(
502 doc,
503 doc = "EntityStats\n\nInternal struct for building per-entity stats before snapshotting."
504)]
505#[derive(Default)]
506struct EntityStats {
507 entries: u64,
508 memory_bytes: u64,
509}
510
511impl EntityStats {
512 const fn update(&mut self, value_len: u64) {
514 self.entries = self.entries.saturating_add(1);
515 self.memory_bytes = self
516 .memory_bytes
517 .saturating_add(DataKey::entry_size_bytes(value_len));
518 }
519}
520
521fn update_default_entity_stats(
525 entity_stats: &mut Vec<(EntityTag, EntityStats)>,
526 entity_tag: EntityTag,
527 value_len: u64,
528) {
529 if let Some((_, stats)) = entity_stats
530 .iter_mut()
531 .find(|(existing_tag, _)| *existing_tag == entity_tag)
532 {
533 stats.update(value_len);
534 return;
535 }
536
537 let mut stats = EntityStats::default();
538 stats.update(value_len);
539 entity_stats.push((entity_tag, stats));
540}
541
542fn storage_report_name_for_hook<'a, C: CanisterKind>(
543 name_map: &BTreeMap<&'static str, &'a str>,
544 hooks: &EntityRuntimeHooks<C>,
545) -> &'a str {
546 name_map
547 .get(hooks.entity_path)
548 .copied()
549 .or_else(|| name_map.get(hooks.model.name()).copied())
550 .unwrap_or(hooks.entity_path)
551}
552
553fn storage_report_default_name_for_entity_tag<C: CanisterKind>(
556 db: &Db<C>,
557 entity_tag: EntityTag,
558) -> String {
559 db.runtime_hook_for_entity_tag(entity_tag).ok().map_or_else(
560 || format!("#{}", entity_tag.value()),
561 |hooks| hooks.entity_path.to_string(),
562 )
563}
564
565#[cfg_attr(
566 doc,
567 doc = "Build one deterministic storage snapshot with default entity-path names.\n\nThis variant is used by generated snapshot endpoints that never pass alias remapping, so it keeps the snapshot root independent from optional alias-resolution machinery."
568)]
569pub(crate) fn storage_report_default<C: CanisterKind>(
570 db: &Db<C>,
571) -> Result<StorageReport, InternalError> {
572 db.ensure_recovered_state()?;
573 let mut data = Vec::new();
574 let mut index = Vec::new();
575 let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
576 let mut corrupted_keys = 0u64;
577 let mut corrupted_entries = 0u64;
578
579 db.with_store_registry(|reg| {
580 let mut stores = reg.iter().collect::<Vec<_>>();
582 stores.sort_by_key(|(path, _)| *path);
583
584 for (path, store_handle) in stores {
585 store_handle.with_data(|store| {
587 data.push(DataStoreSnapshot::new(
588 path.to_string(),
589 store.len(),
590 store.memory_bytes(),
591 ));
592
593 let mut by_entity = Vec::<(EntityTag, EntityStats)>::new();
595
596 for entry in store.iter() {
597 let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
598 corrupted_keys = corrupted_keys.saturating_add(1);
599 continue;
600 };
601
602 let value_len = entry.value().len() as u64;
603
604 update_default_entity_stats(&mut by_entity, dk.entity_tag(), value_len);
605 }
606
607 for (entity_tag, stats) in by_entity {
608 entity_storage.push(EntitySnapshot::new(
609 path.to_string(),
610 storage_report_default_name_for_entity_tag(db, entity_tag),
611 stats.entries,
612 stats.memory_bytes,
613 ));
614 }
615 });
616
617 store_handle.with_index(|store| {
619 let mut user_entries = 0u64;
620 let mut system_entries = 0u64;
621
622 for (key, value) in store.entries() {
623 let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
624 corrupted_entries = corrupted_entries.saturating_add(1);
625 continue;
626 };
627
628 if decoded_key.uses_system_namespace() {
629 system_entries = system_entries.saturating_add(1);
630 } else {
631 user_entries = user_entries.saturating_add(1);
632 }
633
634 if value.validate().is_err() {
635 corrupted_entries = corrupted_entries.saturating_add(1);
636 }
637 }
638
639 index.push(IndexStoreSnapshot::new(
640 path.to_string(),
641 store.len(),
642 user_entries,
643 system_entries,
644 store.memory_bytes(),
645 store.state(),
646 ));
647 });
648 }
649 });
650
651 entity_storage
654 .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
655
656 Ok(StorageReport::new(
657 data,
658 index,
659 entity_storage,
660 corrupted_keys,
661 corrupted_entries,
662 ))
663}
664
665#[cfg_attr(
666 doc,
667 doc = "Build one deterministic storage snapshot with per-entity rollups.\n\nThis path is read-only and fail-closed on decode/validation errors by counting corrupted keys/entries instead of panicking."
668)]
669pub(crate) fn storage_report<C: CanisterKind>(
670 db: &Db<C>,
671 name_to_path: &[(&'static str, &'static str)],
672) -> Result<StorageReport, InternalError> {
673 db.ensure_recovered_state()?;
674 let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
678 let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
679 for hooks in db.entity_runtime_hooks {
680 tag_name_map
681 .entry(hooks.entity_tag)
682 .or_insert_with(|| storage_report_name_for_hook(&name_map, hooks));
683 }
684 let mut data = Vec::new();
685 let mut index = Vec::new();
686 let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
687 let mut corrupted_keys = 0u64;
688 let mut corrupted_entries = 0u64;
689
690 db.with_store_registry(|reg| {
691 let mut stores = reg.iter().collect::<Vec<_>>();
693 stores.sort_by_key(|(path, _)| *path);
694
695 for (path, store_handle) in stores {
696 store_handle.with_data(|store| {
698 data.push(DataStoreSnapshot::new(
699 path.to_string(),
700 store.len(),
701 store.memory_bytes(),
702 ));
703
704 let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
706
707 for entry in store.iter() {
708 let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
709 corrupted_keys = corrupted_keys.saturating_add(1);
710 continue;
711 };
712
713 let value_len = entry.value().len() as u64;
714
715 by_entity
716 .entry(dk.entity_tag())
717 .or_default()
718 .update(value_len);
719 }
720
721 for (entity_tag, stats) in by_entity {
722 let path_name = tag_name_map
723 .get(&entity_tag)
724 .copied()
725 .map(str::to_string)
726 .or_else(|| {
727 db.runtime_hook_for_entity_tag(entity_tag)
728 .ok()
729 .map(|hooks| {
730 storage_report_name_for_hook(&name_map, hooks).to_string()
731 })
732 })
733 .unwrap_or_else(|| format!("#{}", entity_tag.value()));
734 entity_storage.push(EntitySnapshot::new(
735 path.to_string(),
736 path_name,
737 stats.entries,
738 stats.memory_bytes,
739 ));
740 }
741 });
742
743 store_handle.with_index(|store| {
745 let mut user_entries = 0u64;
746 let mut system_entries = 0u64;
747
748 for (key, value) in store.entries() {
749 let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
750 corrupted_entries = corrupted_entries.saturating_add(1);
751 continue;
752 };
753
754 if decoded_key.uses_system_namespace() {
755 system_entries = system_entries.saturating_add(1);
756 } else {
757 user_entries = user_entries.saturating_add(1);
758 }
759
760 if value.validate().is_err() {
761 corrupted_entries = corrupted_entries.saturating_add(1);
762 }
763 }
764
765 index.push(IndexStoreSnapshot::new(
766 path.to_string(),
767 store.len(),
768 user_entries,
769 system_entries,
770 store.memory_bytes(),
771 store.state(),
772 ));
773 });
774 }
775 });
776
777 entity_storage
780 .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
781
782 Ok(StorageReport::new(
783 data,
784 index,
785 entity_storage,
786 corrupted_keys,
787 corrupted_entries,
788 ))
789}
790
791#[cfg_attr(
792 doc,
793 doc = "Build one deterministic integrity scan over all registered stores.\n\nThis scan is read-only and classifies findings as:\n- corruption: malformed persisted bytes or inconsistent structural links\n- compatibility: persisted payloads outside decode compatibility windows\n- misuse: unsupported runtime wiring (for example missing entity hooks)"
794)]
795pub(crate) fn integrity_report<C: CanisterKind>(
796 db: &Db<C>,
797) -> Result<IntegrityReport, InternalError> {
798 db.ensure_recovered_state()?;
799
800 integrity_report_after_recovery(db)
801}
802
803#[cfg_attr(
804 doc,
805 doc = "Build one deterministic integrity scan after recovery has already completed.\n\nCallers running inside recovery flow should use this variant to avoid recursive recovery gating."
806)]
807pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
808 db: &Db<C>,
809) -> Result<IntegrityReport, InternalError> {
810 build_integrity_report(db)
811}
812
813fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
814 let mut stores = Vec::new();
815 let mut totals = IntegrityTotals::default();
816 let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
817
818 db.with_store_registry(|reg| {
819 let mut store_entries = reg.iter().collect::<Vec<_>>();
821 store_entries.sort_by_key(|(path, _)| *path);
822
823 for (path, store_handle) in store_entries {
824 let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
825 scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
826 scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
827
828 totals.add_store_snapshot(&snapshot);
829 stores.push(snapshot);
830 }
831
832 Ok::<(), InternalError>(())
833 })?;
834
835 Ok(IntegrityReport::new(stores, totals))
836}
837
838fn collect_global_live_keys_by_entity<C: CanisterKind>(
840 db: &Db<C>,
841) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
842 let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
843
844 db.with_store_registry(|reg| {
845 for (_, store_handle) in reg.iter() {
846 store_handle.with_data(|data_store| {
847 for entry in data_store.iter() {
848 if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
849 keys.entry(data_key.entity_tag())
850 .or_default()
851 .insert(data_key.storage_key());
852 }
853 }
854 });
855 }
856
857 Ok::<(), InternalError>(())
858 })?;
859
860 Ok(keys)
861}
862
863fn scan_store_forward_integrity<C: CanisterKind>(
865 db: &Db<C>,
866 store_handle: StoreHandle,
867 snapshot: &mut IntegrityStoreSnapshot,
868) -> Result<(), InternalError> {
869 store_handle.with_data(|data_store| {
870 for entry in data_store.iter() {
871 snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
872
873 let raw_key = *entry.key();
874
875 let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
876 snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
877 continue;
878 };
879
880 let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
881 Ok(hooks) => hooks,
882 Err(err) => {
883 classify_scan_error(err, snapshot)?;
884 continue;
885 }
886 };
887
888 let marker_row = CommitRowOp::new(
889 hooks.entity_path,
890 raw_key,
891 None,
892 Some(entry.value().as_bytes().to_vec()),
893 crate::db::schema::commit_schema_fingerprint_for_model(
894 hooks.entity_path,
895 hooks.model,
896 ),
897 );
898
899 if let Err(err) = decode_structural_row_cbor(&entry.value()) {
902 classify_scan_error(err, snapshot)?;
903 continue;
904 }
905
906 let prepared = match db.prepare_row_commit_op(&marker_row) {
907 Ok(prepared) => prepared,
908 Err(err) => {
909 classify_scan_error(err, snapshot)?;
910 continue;
911 }
912 };
913
914 for index_op in prepared.index_ops {
915 let Some(expected_value) = index_op.value else {
916 continue;
917 };
918
919 let actual = index_op
920 .store
921 .with_borrow(|index_store| index_store.get(&index_op.key));
922 match actual {
923 Some(actual_value) if actual_value == expected_value => {}
924 Some(_) => {
925 snapshot.divergent_index_entries =
926 snapshot.divergent_index_entries.saturating_add(1);
927 }
928 None => {
929 snapshot.missing_index_entries =
930 snapshot.missing_index_entries.saturating_add(1);
931 }
932 }
933 }
934 }
935
936 Ok::<(), InternalError>(())
937 })
938}
939
940fn scan_store_reverse_integrity(
942 store_handle: StoreHandle,
943 live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
944 snapshot: &mut IntegrityStoreSnapshot,
945) {
946 store_handle.with_index(|index_store| {
947 for (raw_index_key, raw_index_entry) in index_store.entries() {
948 snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
949
950 let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
951 snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
952 continue;
953 };
954
955 let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
956
957 let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
958 snapshot.corrupted_index_entries =
959 snapshot.corrupted_index_entries.saturating_add(1);
960 continue;
961 };
962
963 for primary_key in indexed_primary_keys {
964 let exists = live_keys_by_entity
965 .get(&index_entity_tag)
966 .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
967 if !exists {
968 snapshot.orphan_index_references =
969 snapshot.orphan_index_references.saturating_add(1);
970 }
971 }
972 }
973 });
974}
975
976fn classify_scan_error(
978 err: InternalError,
979 snapshot: &mut IntegrityStoreSnapshot,
980) -> Result<(), InternalError> {
981 match err.class() {
982 ErrorClass::Corruption => {
983 snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
984 Ok(())
985 }
986 ErrorClass::IncompatiblePersistedFormat => {
987 snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
988 Ok(())
989 }
990 ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
991 snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
992 Ok(())
993 }
994 ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
995 }
996}
997
998const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
1000 index_key.index_id().entity_tag
1001}