1mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11 db::{
12 Db, EntityRuntimeHooks,
13 commit::CommitRowOp,
14 data::{DataKey, StorageKey, decode_structural_row_payload},
15 index::{IndexKey, IndexState},
16 registry::StoreHandle,
17 },
18 error::{ErrorClass, InternalError},
19 traits::CanisterKind,
20 types::EntityTag,
21};
22use candid::CandidType;
23use serde::Deserialize;
24use std::collections::{BTreeMap, BTreeSet};
25
26pub use execution_trace::{
27 ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
28};
29
30#[cfg_attr(doc, doc = "StorageReport\n\nLive storage snapshot payload.")]
31#[derive(CandidType, Clone, Debug, Default, Deserialize)]
32pub struct StorageReport {
33 pub(crate) storage_data: Vec<DataStoreSnapshot>,
34 pub(crate) storage_index: Vec<IndexStoreSnapshot>,
35 pub(crate) entity_storage: Vec<EntitySnapshot>,
36 pub(crate) corrupted_keys: u64,
37 pub(crate) corrupted_entries: u64,
38}
39
40#[cfg_attr(
41 doc,
42 doc = "IntegrityTotals\n\nAggregated integrity-scan counters across all stores."
43)]
44#[derive(CandidType, Clone, Debug, Default, Deserialize)]
45pub struct IntegrityTotals {
46 pub(crate) data_rows_scanned: u64,
47 pub(crate) index_entries_scanned: u64,
48 pub(crate) corrupted_data_keys: u64,
49 pub(crate) corrupted_data_rows: u64,
50 pub(crate) corrupted_index_keys: u64,
51 pub(crate) corrupted_index_entries: u64,
52 pub(crate) missing_index_entries: u64,
53 pub(crate) divergent_index_entries: u64,
54 pub(crate) orphan_index_references: u64,
55 pub(crate) misuse_findings: u64,
56}
57
58impl IntegrityTotals {
59 const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
60 self.data_rows_scanned = self
61 .data_rows_scanned
62 .saturating_add(store.data_rows_scanned);
63 self.index_entries_scanned = self
64 .index_entries_scanned
65 .saturating_add(store.index_entries_scanned);
66 self.corrupted_data_keys = self
67 .corrupted_data_keys
68 .saturating_add(store.corrupted_data_keys);
69 self.corrupted_data_rows = self
70 .corrupted_data_rows
71 .saturating_add(store.corrupted_data_rows);
72 self.corrupted_index_keys = self
73 .corrupted_index_keys
74 .saturating_add(store.corrupted_index_keys);
75 self.corrupted_index_entries = self
76 .corrupted_index_entries
77 .saturating_add(store.corrupted_index_entries);
78 self.missing_index_entries = self
79 .missing_index_entries
80 .saturating_add(store.missing_index_entries);
81 self.divergent_index_entries = self
82 .divergent_index_entries
83 .saturating_add(store.divergent_index_entries);
84 self.orphan_index_references = self
85 .orphan_index_references
86 .saturating_add(store.orphan_index_references);
87 self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
88 }
89
90 #[must_use]
92 pub const fn data_rows_scanned(&self) -> u64 {
93 self.data_rows_scanned
94 }
95
96 #[must_use]
98 pub const fn index_entries_scanned(&self) -> u64 {
99 self.index_entries_scanned
100 }
101
102 #[must_use]
104 pub const fn corrupted_data_keys(&self) -> u64 {
105 self.corrupted_data_keys
106 }
107
108 #[must_use]
110 pub const fn corrupted_data_rows(&self) -> u64 {
111 self.corrupted_data_rows
112 }
113
114 #[must_use]
116 pub const fn corrupted_index_keys(&self) -> u64 {
117 self.corrupted_index_keys
118 }
119
120 #[must_use]
122 pub const fn corrupted_index_entries(&self) -> u64 {
123 self.corrupted_index_entries
124 }
125
126 #[must_use]
128 pub const fn missing_index_entries(&self) -> u64 {
129 self.missing_index_entries
130 }
131
132 #[must_use]
134 pub const fn divergent_index_entries(&self) -> u64 {
135 self.divergent_index_entries
136 }
137
138 #[must_use]
140 pub const fn orphan_index_references(&self) -> u64 {
141 self.orphan_index_references
142 }
143
144 #[must_use]
146 pub const fn misuse_findings(&self) -> u64 {
147 self.misuse_findings
148 }
149}
150
151#[cfg_attr(
152 doc,
153 doc = "IntegrityStoreSnapshot\n\nPer-store integrity findings and scan counters."
154)]
155#[derive(CandidType, Clone, Debug, Default, Deserialize)]
156pub struct IntegrityStoreSnapshot {
157 pub(crate) path: String,
158 pub(crate) data_rows_scanned: u64,
159 pub(crate) index_entries_scanned: u64,
160 pub(crate) corrupted_data_keys: u64,
161 pub(crate) corrupted_data_rows: u64,
162 pub(crate) corrupted_index_keys: u64,
163 pub(crate) corrupted_index_entries: u64,
164 pub(crate) missing_index_entries: u64,
165 pub(crate) divergent_index_entries: u64,
166 pub(crate) orphan_index_references: u64,
167 pub(crate) misuse_findings: u64,
168}
169
170impl IntegrityStoreSnapshot {
171 #[must_use]
173 pub fn new(path: String) -> Self {
174 Self {
175 path,
176 ..Self::default()
177 }
178 }
179
180 #[must_use]
182 pub const fn path(&self) -> &str {
183 self.path.as_str()
184 }
185
186 #[must_use]
188 pub const fn data_rows_scanned(&self) -> u64 {
189 self.data_rows_scanned
190 }
191
192 #[must_use]
194 pub const fn index_entries_scanned(&self) -> u64 {
195 self.index_entries_scanned
196 }
197
198 #[must_use]
200 pub const fn corrupted_data_keys(&self) -> u64 {
201 self.corrupted_data_keys
202 }
203
204 #[must_use]
206 pub const fn corrupted_data_rows(&self) -> u64 {
207 self.corrupted_data_rows
208 }
209
210 #[must_use]
212 pub const fn corrupted_index_keys(&self) -> u64 {
213 self.corrupted_index_keys
214 }
215
216 #[must_use]
218 pub const fn corrupted_index_entries(&self) -> u64 {
219 self.corrupted_index_entries
220 }
221
222 #[must_use]
224 pub const fn missing_index_entries(&self) -> u64 {
225 self.missing_index_entries
226 }
227
228 #[must_use]
230 pub const fn divergent_index_entries(&self) -> u64 {
231 self.divergent_index_entries
232 }
233
234 #[must_use]
236 pub const fn orphan_index_references(&self) -> u64 {
237 self.orphan_index_references
238 }
239
240 #[must_use]
242 pub const fn misuse_findings(&self) -> u64 {
243 self.misuse_findings
244 }
245}
246
247#[cfg_attr(
248 doc,
249 doc = "IntegrityReport\n\nFull integrity-scan output across all registered stores."
250)]
251#[derive(CandidType, Clone, Debug, Default, Deserialize)]
252pub struct IntegrityReport {
253 pub(crate) stores: Vec<IntegrityStoreSnapshot>,
254 pub(crate) totals: IntegrityTotals,
255}
256
257impl IntegrityReport {
258 #[must_use]
260 pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
261 Self { stores, totals }
262 }
263
264 #[must_use]
266 pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
267 self.stores.as_slice()
268 }
269
270 #[must_use]
272 pub const fn totals(&self) -> &IntegrityTotals {
273 &self.totals
274 }
275}
276
277impl StorageReport {
278 #[must_use]
280 pub const fn new(
281 storage_data: Vec<DataStoreSnapshot>,
282 storage_index: Vec<IndexStoreSnapshot>,
283 entity_storage: Vec<EntitySnapshot>,
284 corrupted_keys: u64,
285 corrupted_entries: u64,
286 ) -> Self {
287 Self {
288 storage_data,
289 storage_index,
290 entity_storage,
291 corrupted_keys,
292 corrupted_entries,
293 }
294 }
295
296 #[must_use]
298 pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
299 self.storage_data.as_slice()
300 }
301
302 #[must_use]
304 pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
305 self.storage_index.as_slice()
306 }
307
308 #[must_use]
310 pub const fn entity_storage(&self) -> &[EntitySnapshot] {
311 self.entity_storage.as_slice()
312 }
313
314 #[must_use]
316 pub const fn corrupted_keys(&self) -> u64 {
317 self.corrupted_keys
318 }
319
320 #[must_use]
322 pub const fn corrupted_entries(&self) -> u64 {
323 self.corrupted_entries
324 }
325}
326
327#[cfg_attr(doc, doc = "DataStoreSnapshot\n\nData-store snapshot row.")]
328#[derive(CandidType, Clone, Debug, Default, Deserialize)]
329pub struct DataStoreSnapshot {
330 pub(crate) path: String,
331 pub(crate) entries: u64,
332 pub(crate) memory_bytes: u64,
333}
334
335impl DataStoreSnapshot {
336 #[must_use]
338 pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
339 Self {
340 path,
341 entries,
342 memory_bytes,
343 }
344 }
345
346 #[must_use]
348 pub const fn path(&self) -> &str {
349 self.path.as_str()
350 }
351
352 #[must_use]
354 pub const fn entries(&self) -> u64 {
355 self.entries
356 }
357
358 #[must_use]
360 pub const fn memory_bytes(&self) -> u64 {
361 self.memory_bytes
362 }
363}
364
365#[cfg_attr(doc, doc = "IndexStoreSnapshot\n\nIndex-store snapshot row.")]
366#[derive(CandidType, Clone, Debug, Default, Deserialize)]
367pub struct IndexStoreSnapshot {
368 pub(crate) path: String,
369 pub(crate) entries: u64,
370 pub(crate) user_entries: u64,
371 pub(crate) system_entries: u64,
372 pub(crate) memory_bytes: u64,
373 pub(crate) state: IndexState,
374}
375
376impl IndexStoreSnapshot {
377 #[must_use]
379 pub const fn new(
380 path: String,
381 entries: u64,
382 user_entries: u64,
383 system_entries: u64,
384 memory_bytes: u64,
385 state: IndexState,
386 ) -> Self {
387 Self {
388 path,
389 entries,
390 user_entries,
391 system_entries,
392 memory_bytes,
393 state,
394 }
395 }
396
397 #[must_use]
399 pub const fn path(&self) -> &str {
400 self.path.as_str()
401 }
402
403 #[must_use]
405 pub const fn entries(&self) -> u64 {
406 self.entries
407 }
408
409 #[must_use]
411 pub const fn user_entries(&self) -> u64 {
412 self.user_entries
413 }
414
415 #[must_use]
417 pub const fn system_entries(&self) -> u64 {
418 self.system_entries
419 }
420
421 #[must_use]
423 pub const fn memory_bytes(&self) -> u64 {
424 self.memory_bytes
425 }
426
427 #[must_use]
430 pub const fn state(&self) -> IndexState {
431 self.state
432 }
433}
434
435#[cfg_attr(doc, doc = "EntitySnapshot\n\nPer-entity storage snapshot row.")]
436#[derive(CandidType, Clone, Debug, Default, Deserialize)]
437pub struct EntitySnapshot {
438 pub(crate) store: String,
439
440 pub(crate) path: String,
441
442 pub(crate) entries: u64,
443
444 pub(crate) memory_bytes: u64,
445}
446
447impl EntitySnapshot {
448 #[must_use]
450 pub const fn new(store: String, path: String, entries: u64, memory_bytes: u64) -> Self {
451 Self {
452 store,
453 path,
454 entries,
455 memory_bytes,
456 }
457 }
458
459 #[must_use]
461 pub const fn store(&self) -> &str {
462 self.store.as_str()
463 }
464
465 #[must_use]
467 pub const fn path(&self) -> &str {
468 self.path.as_str()
469 }
470
471 #[must_use]
473 pub const fn entries(&self) -> u64 {
474 self.entries
475 }
476
477 #[must_use]
479 pub const fn memory_bytes(&self) -> u64 {
480 self.memory_bytes
481 }
482}
483
484#[cfg_attr(
485 doc,
486 doc = "EntityStats\n\nInternal struct for building per-entity stats before snapshotting."
487)]
488#[derive(Default)]
489struct EntityStats {
490 entries: u64,
491 memory_bytes: u64,
492}
493
494impl EntityStats {
495 const fn update(&mut self, value_len: u64) {
497 self.entries = self.entries.saturating_add(1);
498 self.memory_bytes = self
499 .memory_bytes
500 .saturating_add(DataKey::entry_size_bytes(value_len));
501 }
502}
503
504fn update_default_entity_stats(
508 entity_stats: &mut Vec<(EntityTag, EntityStats)>,
509 entity_tag: EntityTag,
510 value_len: u64,
511) {
512 if let Some((_, stats)) = entity_stats
513 .iter_mut()
514 .find(|(existing_tag, _)| *existing_tag == entity_tag)
515 {
516 stats.update(value_len);
517 return;
518 }
519
520 let mut stats = EntityStats::default();
521 stats.update(value_len);
522 entity_stats.push((entity_tag, stats));
523}
524
525fn storage_report_name_for_hook<'a, C: CanisterKind>(
526 name_map: &BTreeMap<&'static str, &'a str>,
527 hooks: &EntityRuntimeHooks<C>,
528) -> &'a str {
529 name_map
530 .get(hooks.entity_path)
531 .copied()
532 .or_else(|| name_map.get(hooks.model.name()).copied())
533 .unwrap_or(hooks.entity_path)
534}
535
536fn storage_report_default_name_for_entity_tag<C: CanisterKind>(
539 db: &Db<C>,
540 entity_tag: EntityTag,
541) -> String {
542 db.runtime_hook_for_entity_tag(entity_tag).ok().map_or_else(
543 || format!("#{}", entity_tag.value()),
544 |hooks| hooks.entity_path.to_string(),
545 )
546}
547
548#[cfg_attr(
549 doc,
550 doc = "Build one deterministic storage snapshot with default entity-path names.\n\nThis variant is used by generated snapshot endpoints that never pass alias remapping, so it keeps the snapshot root independent from optional alias-resolution machinery."
551)]
552pub(crate) fn storage_report_default<C: CanisterKind>(
553 db: &Db<C>,
554) -> Result<StorageReport, InternalError> {
555 db.ensure_recovered_state()?;
556 let mut data = Vec::new();
557 let mut index = Vec::new();
558 let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
559 let mut corrupted_keys = 0u64;
560 let mut corrupted_entries = 0u64;
561
562 db.with_store_registry(|reg| {
563 let mut stores = reg.iter().collect::<Vec<_>>();
565 stores.sort_by_key(|(path, _)| *path);
566
567 for (path, store_handle) in stores {
568 store_handle.with_data(|store| {
570 data.push(DataStoreSnapshot::new(
571 path.to_string(),
572 store.len(),
573 store.memory_bytes(),
574 ));
575
576 let mut by_entity = Vec::<(EntityTag, EntityStats)>::new();
578
579 for entry in store.iter() {
580 let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
581 corrupted_keys = corrupted_keys.saturating_add(1);
582 continue;
583 };
584
585 let value_len = entry.value().len() as u64;
586
587 update_default_entity_stats(&mut by_entity, dk.entity_tag(), value_len);
588 }
589
590 for (entity_tag, stats) in by_entity {
591 entity_storage.push(EntitySnapshot::new(
592 path.to_string(),
593 storage_report_default_name_for_entity_tag(db, entity_tag),
594 stats.entries,
595 stats.memory_bytes,
596 ));
597 }
598 });
599
600 store_handle.with_index(|store| {
602 let mut user_entries = 0u64;
603 let mut system_entries = 0u64;
604
605 for (key, value) in store.entries() {
606 let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
607 corrupted_entries = corrupted_entries.saturating_add(1);
608 continue;
609 };
610
611 if decoded_key.uses_system_namespace() {
612 system_entries = system_entries.saturating_add(1);
613 } else {
614 user_entries = user_entries.saturating_add(1);
615 }
616
617 if value.validate().is_err() {
618 corrupted_entries = corrupted_entries.saturating_add(1);
619 }
620 }
621
622 index.push(IndexStoreSnapshot::new(
623 path.to_string(),
624 store.len(),
625 user_entries,
626 system_entries,
627 store.memory_bytes(),
628 store.state(),
629 ));
630 });
631 }
632 });
633
634 entity_storage
637 .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
638
639 Ok(StorageReport::new(
640 data,
641 index,
642 entity_storage,
643 corrupted_keys,
644 corrupted_entries,
645 ))
646}
647
648#[cfg_attr(
649 doc,
650 doc = "Build one deterministic storage snapshot with per-entity rollups.\n\nThis path is read-only and fail-closed on decode/validation errors by counting corrupted keys/entries instead of panicking."
651)]
652pub(crate) fn storage_report<C: CanisterKind>(
653 db: &Db<C>,
654 name_to_path: &[(&'static str, &'static str)],
655) -> Result<StorageReport, InternalError> {
656 db.ensure_recovered_state()?;
657 let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
661 let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
662 for hooks in db.entity_runtime_hooks {
663 tag_name_map
664 .entry(hooks.entity_tag)
665 .or_insert_with(|| storage_report_name_for_hook(&name_map, hooks));
666 }
667 let mut data = Vec::new();
668 let mut index = Vec::new();
669 let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
670 let mut corrupted_keys = 0u64;
671 let mut corrupted_entries = 0u64;
672
673 db.with_store_registry(|reg| {
674 let mut stores = reg.iter().collect::<Vec<_>>();
676 stores.sort_by_key(|(path, _)| *path);
677
678 for (path, store_handle) in stores {
679 store_handle.with_data(|store| {
681 data.push(DataStoreSnapshot::new(
682 path.to_string(),
683 store.len(),
684 store.memory_bytes(),
685 ));
686
687 let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
689
690 for entry in store.iter() {
691 let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
692 corrupted_keys = corrupted_keys.saturating_add(1);
693 continue;
694 };
695
696 let value_len = entry.value().len() as u64;
697
698 by_entity
699 .entry(dk.entity_tag())
700 .or_default()
701 .update(value_len);
702 }
703
704 for (entity_tag, stats) in by_entity {
705 let path_name = tag_name_map
706 .get(&entity_tag)
707 .copied()
708 .map(str::to_string)
709 .or_else(|| {
710 db.runtime_hook_for_entity_tag(entity_tag)
711 .ok()
712 .map(|hooks| {
713 storage_report_name_for_hook(&name_map, hooks).to_string()
714 })
715 })
716 .unwrap_or_else(|| format!("#{}", entity_tag.value()));
717 entity_storage.push(EntitySnapshot::new(
718 path.to_string(),
719 path_name,
720 stats.entries,
721 stats.memory_bytes,
722 ));
723 }
724 });
725
726 store_handle.with_index(|store| {
728 let mut user_entries = 0u64;
729 let mut system_entries = 0u64;
730
731 for (key, value) in store.entries() {
732 let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
733 corrupted_entries = corrupted_entries.saturating_add(1);
734 continue;
735 };
736
737 if decoded_key.uses_system_namespace() {
738 system_entries = system_entries.saturating_add(1);
739 } else {
740 user_entries = user_entries.saturating_add(1);
741 }
742
743 if value.validate().is_err() {
744 corrupted_entries = corrupted_entries.saturating_add(1);
745 }
746 }
747
748 index.push(IndexStoreSnapshot::new(
749 path.to_string(),
750 store.len(),
751 user_entries,
752 system_entries,
753 store.memory_bytes(),
754 store.state(),
755 ));
756 });
757 }
758 });
759
760 entity_storage
763 .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
764
765 Ok(StorageReport::new(
766 data,
767 index,
768 entity_storage,
769 corrupted_keys,
770 corrupted_entries,
771 ))
772}
773
774#[cfg_attr(
775 doc,
776 doc = "Build one deterministic integrity scan over all registered stores.\n\nThis scan is read-only and classifies findings as:\n- corruption: malformed persisted bytes, incompatible persisted formats, or inconsistent structural links\n- misuse: unsupported runtime wiring (for example missing entity hooks)"
777)]
778pub(crate) fn integrity_report<C: CanisterKind>(
779 db: &Db<C>,
780) -> Result<IntegrityReport, InternalError> {
781 db.ensure_recovered_state()?;
782
783 integrity_report_after_recovery(db)
784}
785
786#[cfg_attr(
787 doc,
788 doc = "Build one deterministic integrity scan after recovery has already completed.\n\nCallers running inside recovery flow should use this variant to avoid recursive recovery gating."
789)]
790pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
791 db: &Db<C>,
792) -> Result<IntegrityReport, InternalError> {
793 build_integrity_report(db)
794}
795
796fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
797 let mut stores = Vec::new();
798 let mut totals = IntegrityTotals::default();
799 let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
800
801 db.with_store_registry(|reg| {
802 let mut store_entries = reg.iter().collect::<Vec<_>>();
804 store_entries.sort_by_key(|(path, _)| *path);
805
806 for (path, store_handle) in store_entries {
807 let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
808 scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
809 scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
810
811 totals.add_store_snapshot(&snapshot);
812 stores.push(snapshot);
813 }
814
815 Ok::<(), InternalError>(())
816 })?;
817
818 Ok(IntegrityReport::new(stores, totals))
819}
820
821fn collect_global_live_keys_by_entity<C: CanisterKind>(
823 db: &Db<C>,
824) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
825 let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
826
827 db.with_store_registry(|reg| {
828 for (_, store_handle) in reg.iter() {
829 store_handle.with_data(|data_store| {
830 for entry in data_store.iter() {
831 if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
832 keys.entry(data_key.entity_tag())
833 .or_default()
834 .insert(data_key.storage_key());
835 }
836 }
837 });
838 }
839
840 Ok::<(), InternalError>(())
841 })?;
842
843 Ok(keys)
844}
845
846fn scan_store_forward_integrity<C: CanisterKind>(
848 db: &Db<C>,
849 store_handle: StoreHandle,
850 snapshot: &mut IntegrityStoreSnapshot,
851) -> Result<(), InternalError> {
852 store_handle.with_data(|data_store| {
853 for entry in data_store.iter() {
854 snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
855
856 let raw_key = *entry.key();
857
858 let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
859 snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
860 continue;
861 };
862
863 let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
864 Ok(hooks) => hooks,
865 Err(err) => {
866 classify_scan_error(err, snapshot)?;
867 continue;
868 }
869 };
870
871 let marker_row = CommitRowOp::new(
872 hooks.entity_path,
873 raw_key,
874 None,
875 Some(entry.value().as_bytes().to_vec()),
876 crate::db::schema::commit_schema_fingerprint_for_model(
877 hooks.entity_path,
878 hooks.model,
879 ),
880 );
881
882 if let Err(err) = decode_structural_row_payload(&entry.value()) {
885 classify_scan_error(err, snapshot)?;
886 continue;
887 }
888
889 let prepared = match db.prepare_row_commit_op(&marker_row) {
890 Ok(prepared) => prepared,
891 Err(err) => {
892 classify_scan_error(err, snapshot)?;
893 continue;
894 }
895 };
896
897 for index_op in prepared.index_ops {
898 let Some(expected_value) = index_op.value else {
899 continue;
900 };
901
902 let actual = index_op
903 .store
904 .with_borrow(|index_store| index_store.get(&index_op.key));
905 match actual {
906 Some(actual_value) if actual_value == expected_value => {}
907 Some(_) => {
908 snapshot.divergent_index_entries =
909 snapshot.divergent_index_entries.saturating_add(1);
910 }
911 None => {
912 snapshot.missing_index_entries =
913 snapshot.missing_index_entries.saturating_add(1);
914 }
915 }
916 }
917 }
918
919 Ok::<(), InternalError>(())
920 })
921}
922
923fn scan_store_reverse_integrity(
925 store_handle: StoreHandle,
926 live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
927 snapshot: &mut IntegrityStoreSnapshot,
928) {
929 store_handle.with_index(|index_store| {
930 for (raw_index_key, raw_index_entry) in index_store.entries() {
931 snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
932
933 let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
934 snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
935 continue;
936 };
937
938 let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
939
940 let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
941 snapshot.corrupted_index_entries =
942 snapshot.corrupted_index_entries.saturating_add(1);
943 continue;
944 };
945
946 for primary_key in indexed_primary_keys {
947 let exists = live_keys_by_entity
948 .get(&index_entity_tag)
949 .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
950 if !exists {
951 snapshot.orphan_index_references =
952 snapshot.orphan_index_references.saturating_add(1);
953 }
954 }
955 }
956 });
957}
958
959fn classify_scan_error(
961 err: InternalError,
962 snapshot: &mut IntegrityStoreSnapshot,
963) -> Result<(), InternalError> {
964 match err.class() {
965 ErrorClass::Corruption | ErrorClass::IncompatiblePersistedFormat => {
966 snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
967 Ok(())
968 }
969 ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
970 snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
971 Ok(())
972 }
973 ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
974 }
975}
976
977const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
979 index_key.index_id().entity_tag
980}