Skip to main content

icydb_core/db/diagnostics/
mod.rs

1//! Module: diagnostics
2//! Responsibility: read-only storage footprint and integrity snapshots.
3//! Does not own: recovery, write-path mutation, or query planning semantics.
4//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7
8use crate::{
9    db::{
10        Db,
11        commit::CommitRowOp,
12        data::{DataKey, StorageKey, decode_structural_row_cbor},
13        index::IndexKey,
14        registry::StoreHandle,
15    },
16    error::{ErrorClass, InternalError},
17    traits::{CanisterKind, Repr},
18    types::EntityTag,
19};
20use candid::CandidType;
21use serde::Deserialize;
22use std::collections::{BTreeMap, BTreeSet};
23
24pub use execution_trace::{
25    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
26};
27
28///
29/// StorageReport
30/// Live storage snapshot report
31///
32
33#[derive(CandidType, Clone, Debug, Default, Deserialize)]
34pub struct StorageReport {
35    pub(crate) storage_data: Vec<DataStoreSnapshot>,
36    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
37    pub(crate) entity_storage: Vec<EntitySnapshot>,
38    pub(crate) corrupted_keys: u64,
39    pub(crate) corrupted_entries: u64,
40}
41
42///
43/// IntegrityTotals
44/// Aggregated integrity-scan counters across all stores.
45///
46
47#[derive(CandidType, Clone, Debug, Default, Deserialize)]
48pub struct IntegrityTotals {
49    pub(crate) data_rows_scanned: u64,
50    pub(crate) index_entries_scanned: u64,
51    pub(crate) corrupted_data_keys: u64,
52    pub(crate) corrupted_data_rows: u64,
53    pub(crate) corrupted_index_keys: u64,
54    pub(crate) corrupted_index_entries: u64,
55    pub(crate) missing_index_entries: u64,
56    pub(crate) divergent_index_entries: u64,
57    pub(crate) orphan_index_references: u64,
58    pub(crate) compatibility_findings: u64,
59    pub(crate) misuse_findings: u64,
60}
61
62impl IntegrityTotals {
63    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
64        self.data_rows_scanned = self
65            .data_rows_scanned
66            .saturating_add(store.data_rows_scanned);
67        self.index_entries_scanned = self
68            .index_entries_scanned
69            .saturating_add(store.index_entries_scanned);
70        self.corrupted_data_keys = self
71            .corrupted_data_keys
72            .saturating_add(store.corrupted_data_keys);
73        self.corrupted_data_rows = self
74            .corrupted_data_rows
75            .saturating_add(store.corrupted_data_rows);
76        self.corrupted_index_keys = self
77            .corrupted_index_keys
78            .saturating_add(store.corrupted_index_keys);
79        self.corrupted_index_entries = self
80            .corrupted_index_entries
81            .saturating_add(store.corrupted_index_entries);
82        self.missing_index_entries = self
83            .missing_index_entries
84            .saturating_add(store.missing_index_entries);
85        self.divergent_index_entries = self
86            .divergent_index_entries
87            .saturating_add(store.divergent_index_entries);
88        self.orphan_index_references = self
89            .orphan_index_references
90            .saturating_add(store.orphan_index_references);
91        self.compatibility_findings = self
92            .compatibility_findings
93            .saturating_add(store.compatibility_findings);
94        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
95    }
96
97    /// Return total number of data rows scanned.
98    #[must_use]
99    pub const fn data_rows_scanned(&self) -> u64 {
100        self.data_rows_scanned
101    }
102
103    /// Return total number of index entries scanned.
104    #[must_use]
105    pub const fn index_entries_scanned(&self) -> u64 {
106        self.index_entries_scanned
107    }
108
109    /// Return total number of corrupted data-key findings.
110    #[must_use]
111    pub const fn corrupted_data_keys(&self) -> u64 {
112        self.corrupted_data_keys
113    }
114
115    /// Return total number of corrupted data-row findings.
116    #[must_use]
117    pub const fn corrupted_data_rows(&self) -> u64 {
118        self.corrupted_data_rows
119    }
120
121    /// Return total number of corrupted index-key findings.
122    #[must_use]
123    pub const fn corrupted_index_keys(&self) -> u64 {
124        self.corrupted_index_keys
125    }
126
127    /// Return total number of corrupted index-entry findings.
128    #[must_use]
129    pub const fn corrupted_index_entries(&self) -> u64 {
130        self.corrupted_index_entries
131    }
132
133    /// Return total number of missing index-entry findings.
134    #[must_use]
135    pub const fn missing_index_entries(&self) -> u64 {
136        self.missing_index_entries
137    }
138
139    /// Return total number of divergent index-entry findings.
140    #[must_use]
141    pub const fn divergent_index_entries(&self) -> u64 {
142        self.divergent_index_entries
143    }
144
145    /// Return total number of orphan index-reference findings.
146    #[must_use]
147    pub const fn orphan_index_references(&self) -> u64 {
148        self.orphan_index_references
149    }
150
151    /// Return total number of compatibility findings.
152    #[must_use]
153    pub const fn compatibility_findings(&self) -> u64 {
154        self.compatibility_findings
155    }
156
157    /// Return total number of misuse findings.
158    #[must_use]
159    pub const fn misuse_findings(&self) -> u64 {
160        self.misuse_findings
161    }
162}
163
164///
165/// IntegrityStoreSnapshot
166/// Per-store integrity findings and scan counters.
167///
168
169#[derive(CandidType, Clone, Debug, Default, Deserialize)]
170pub struct IntegrityStoreSnapshot {
171    pub(crate) path: String,
172    pub(crate) data_rows_scanned: u64,
173    pub(crate) index_entries_scanned: u64,
174    pub(crate) corrupted_data_keys: u64,
175    pub(crate) corrupted_data_rows: u64,
176    pub(crate) corrupted_index_keys: u64,
177    pub(crate) corrupted_index_entries: u64,
178    pub(crate) missing_index_entries: u64,
179    pub(crate) divergent_index_entries: u64,
180    pub(crate) orphan_index_references: u64,
181    pub(crate) compatibility_findings: u64,
182    pub(crate) misuse_findings: u64,
183}
184
185impl IntegrityStoreSnapshot {
186    /// Construct one empty store-level integrity snapshot.
187    #[must_use]
188    pub fn new(path: String) -> Self {
189        Self {
190            path,
191            ..Self::default()
192        }
193    }
194
195    /// Borrow store path.
196    #[must_use]
197    pub const fn path(&self) -> &str {
198        self.path.as_str()
199    }
200
201    /// Return number of scanned data rows.
202    #[must_use]
203    pub const fn data_rows_scanned(&self) -> u64 {
204        self.data_rows_scanned
205    }
206
207    /// Return number of scanned index entries.
208    #[must_use]
209    pub const fn index_entries_scanned(&self) -> u64 {
210        self.index_entries_scanned
211    }
212
213    /// Return number of corrupted data-key findings.
214    #[must_use]
215    pub const fn corrupted_data_keys(&self) -> u64 {
216        self.corrupted_data_keys
217    }
218
219    /// Return number of corrupted data-row findings.
220    #[must_use]
221    pub const fn corrupted_data_rows(&self) -> u64 {
222        self.corrupted_data_rows
223    }
224
225    /// Return number of corrupted index-key findings.
226    #[must_use]
227    pub const fn corrupted_index_keys(&self) -> u64 {
228        self.corrupted_index_keys
229    }
230
231    /// Return number of corrupted index-entry findings.
232    #[must_use]
233    pub const fn corrupted_index_entries(&self) -> u64 {
234        self.corrupted_index_entries
235    }
236
237    /// Return number of missing index-entry findings.
238    #[must_use]
239    pub const fn missing_index_entries(&self) -> u64 {
240        self.missing_index_entries
241    }
242
243    /// Return number of divergent index-entry findings.
244    #[must_use]
245    pub const fn divergent_index_entries(&self) -> u64 {
246        self.divergent_index_entries
247    }
248
249    /// Return number of orphan index-reference findings.
250    #[must_use]
251    pub const fn orphan_index_references(&self) -> u64 {
252        self.orphan_index_references
253    }
254
255    /// Return number of compatibility findings.
256    #[must_use]
257    pub const fn compatibility_findings(&self) -> u64 {
258        self.compatibility_findings
259    }
260
261    /// Return number of misuse findings.
262    #[must_use]
263    pub const fn misuse_findings(&self) -> u64 {
264        self.misuse_findings
265    }
266}
267
268///
269/// IntegrityReport
270/// Full integrity-scan output across all registered stores.
271///
272
273#[derive(CandidType, Clone, Debug, Default, Deserialize)]
274pub struct IntegrityReport {
275    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
276    pub(crate) totals: IntegrityTotals,
277}
278
279impl IntegrityReport {
280    /// Construct one integrity report payload.
281    #[must_use]
282    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
283        Self { stores, totals }
284    }
285
286    /// Borrow per-store integrity snapshots.
287    #[must_use]
288    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
289        self.stores.as_slice()
290    }
291
292    /// Borrow aggregated integrity totals.
293    #[must_use]
294    pub const fn totals(&self) -> &IntegrityTotals {
295        &self.totals
296    }
297}
298
299impl StorageReport {
300    /// Construct one storage report payload.
301    #[must_use]
302    pub const fn new(
303        storage_data: Vec<DataStoreSnapshot>,
304        storage_index: Vec<IndexStoreSnapshot>,
305        entity_storage: Vec<EntitySnapshot>,
306        corrupted_keys: u64,
307        corrupted_entries: u64,
308    ) -> Self {
309        Self {
310            storage_data,
311            storage_index,
312            entity_storage,
313            corrupted_keys,
314            corrupted_entries,
315        }
316    }
317
318    /// Borrow data-store snapshots.
319    #[must_use]
320    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
321        self.storage_data.as_slice()
322    }
323
324    /// Borrow index-store snapshots.
325    #[must_use]
326    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
327        self.storage_index.as_slice()
328    }
329
330    /// Borrow entity-level storage snapshots.
331    #[must_use]
332    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
333        self.entity_storage.as_slice()
334    }
335
336    /// Return count of corrupted decoded data keys.
337    #[must_use]
338    pub const fn corrupted_keys(&self) -> u64 {
339        self.corrupted_keys
340    }
341
342    /// Return count of corrupted index entries.
343    #[must_use]
344    pub const fn corrupted_entries(&self) -> u64 {
345        self.corrupted_entries
346    }
347}
348
349///
350/// DataStoreSnapshot
351/// Store-level snapshot metrics.
352///
353
354#[derive(CandidType, Clone, Debug, Default, Deserialize)]
355pub struct DataStoreSnapshot {
356    pub(crate) path: String,
357    pub(crate) entries: u64,
358    pub(crate) memory_bytes: u64,
359}
360
361impl DataStoreSnapshot {
362    /// Construct one data-store snapshot row.
363    #[must_use]
364    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
365        Self {
366            path,
367            entries,
368            memory_bytes,
369        }
370    }
371
372    /// Borrow store path.
373    #[must_use]
374    pub const fn path(&self) -> &str {
375        self.path.as_str()
376    }
377
378    /// Return row count.
379    #[must_use]
380    pub const fn entries(&self) -> u64 {
381        self.entries
382    }
383
384    /// Return memory usage in bytes.
385    #[must_use]
386    pub const fn memory_bytes(&self) -> u64 {
387        self.memory_bytes
388    }
389}
390
391///
392/// IndexStoreSnapshot
393/// Index-store snapshot metrics
394///
395
396#[derive(CandidType, Clone, Debug, Default, Deserialize)]
397pub struct IndexStoreSnapshot {
398    pub(crate) path: String,
399    pub(crate) entries: u64,
400    pub(crate) user_entries: u64,
401    pub(crate) system_entries: u64,
402    pub(crate) memory_bytes: u64,
403}
404
405impl IndexStoreSnapshot {
406    /// Construct one index-store snapshot row.
407    #[must_use]
408    pub const fn new(
409        path: String,
410        entries: u64,
411        user_entries: u64,
412        system_entries: u64,
413        memory_bytes: u64,
414    ) -> Self {
415        Self {
416            path,
417            entries,
418            user_entries,
419            system_entries,
420            memory_bytes,
421        }
422    }
423
424    /// Borrow store path.
425    #[must_use]
426    pub const fn path(&self) -> &str {
427        self.path.as_str()
428    }
429
430    /// Return total entry count.
431    #[must_use]
432    pub const fn entries(&self) -> u64 {
433        self.entries
434    }
435
436    /// Return user-namespace entry count.
437    #[must_use]
438    pub const fn user_entries(&self) -> u64 {
439        self.user_entries
440    }
441
442    /// Return system-namespace entry count.
443    #[must_use]
444    pub const fn system_entries(&self) -> u64 {
445        self.system_entries
446    }
447
448    /// Return memory usage in bytes.
449    #[must_use]
450    pub const fn memory_bytes(&self) -> u64 {
451        self.memory_bytes
452    }
453}
454
455///
456/// EntitySnapshot
457/// Per-entity storage breakdown across stores
458///
459
460#[derive(CandidType, Clone, Debug, Default, Deserialize)]
461pub struct EntitySnapshot {
462    /// Store path (e.g., icydb_schema_tests::schema::TestDataStore)
463    pub(crate) store: String,
464
465    /// Entity path (e.g., icydb_schema_tests::canister::db::Index)
466    pub(crate) path: String,
467
468    /// Number of rows for this entity in the store
469    pub(crate) entries: u64,
470
471    /// Approximate bytes used (key + value)
472    pub(crate) memory_bytes: u64,
473
474    /// Minimum primary key for this entity (entity-local ordering)
475    pub(crate) min_key: Option<String>,
476
477    /// Maximum primary key for this entity (entity-local ordering)
478    pub(crate) max_key: Option<String>,
479}
480
481impl EntitySnapshot {
482    /// Construct one entity-storage snapshot row.
483    #[must_use]
484    pub fn new(
485        store: String,
486        path: String,
487        entries: u64,
488        memory_bytes: u64,
489        min_key: Option<StorageKey>,
490        max_key: Option<StorageKey>,
491    ) -> Self {
492        Self {
493            store,
494            path,
495            entries,
496            memory_bytes,
497            min_key: min_key.map(Self::storage_key_text),
498            max_key: max_key.map(Self::storage_key_text),
499        }
500    }
501
502    // Keep snapshot key rendering local to the diagnostics contract so the
503    // canister DTO does not retain the full `Value` Candid surface.
504    fn storage_key_text(key: StorageKey) -> String {
505        match key {
506            StorageKey::Account(value) => value.to_string(),
507            StorageKey::Int(value) => value.to_string(),
508            StorageKey::Principal(value) => value.to_string(),
509            StorageKey::Subaccount(value) => value.to_string(),
510            StorageKey::Timestamp(value) => value.repr().to_string(),
511            StorageKey::Uint(value) => value.to_string(),
512            StorageKey::Ulid(value) => value.to_string(),
513            StorageKey::Unit => "()".to_string(),
514        }
515    }
516
517    /// Borrow store path.
518    #[must_use]
519    pub const fn store(&self) -> &str {
520        self.store.as_str()
521    }
522
523    /// Borrow entity path.
524    #[must_use]
525    pub const fn path(&self) -> &str {
526        self.path.as_str()
527    }
528
529    /// Return row count.
530    #[must_use]
531    pub const fn entries(&self) -> u64 {
532        self.entries
533    }
534
535    /// Return memory usage in bytes.
536    #[must_use]
537    pub const fn memory_bytes(&self) -> u64 {
538        self.memory_bytes
539    }
540
541    /// Borrow optional minimum primary key.
542    #[must_use]
543    pub fn min_key(&self) -> Option<&str> {
544        self.min_key.as_deref()
545    }
546
547    /// Borrow optional maximum primary key.
548    #[must_use]
549    pub fn max_key(&self) -> Option<&str> {
550        self.max_key.as_deref()
551    }
552}
553
554///
555/// EntityStats
556/// Internal struct for building per-entity stats before snapshotting.
557///
558
559#[derive(Default)]
560struct EntityStats {
561    entries: u64,
562    memory_bytes: u64,
563    min_key: Option<StorageKey>,
564    max_key: Option<StorageKey>,
565}
566
567impl EntityStats {
568    // Accumulate per-entity counters and keep min/max over entity-local storage keys.
569    fn update(&mut self, dk: &DataKey, value_len: u64) {
570        self.entries = self.entries.saturating_add(1);
571        self.memory_bytes = self
572            .memory_bytes
573            .saturating_add(DataKey::entry_size_bytes(value_len));
574
575        let k = dk.storage_key();
576
577        match &mut self.min_key {
578            Some(min) if k < *min => *min = k,
579            None => self.min_key = Some(k),
580            _ => {}
581        }
582
583        match &mut self.max_key {
584            Some(max) if k > *max => *max = k,
585            None => self.max_key = Some(k),
586            _ => {}
587        }
588    }
589}
590
591/// Build one deterministic storage snapshot with per-entity rollups.
592///
593/// This path is read-only and fail-closed on decode/validation errors by counting
594/// corrupted keys/entries instead of panicking.
595pub(crate) fn storage_report<C: CanisterKind>(
596    db: &Db<C>,
597    name_to_path: &[(&'static str, &'static str)],
598) -> Result<StorageReport, InternalError> {
599    db.ensure_recovered_state()?;
600    // Build name→path map once, reuse across stores.
601    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
602    let runtime_name_to_tag: BTreeMap<&str, EntityTag> =
603        db.runtime_entity_name_tag_pairs().into_iter().collect();
604    // Build one deterministic tag→path alias map to preserve report naming even
605    // after persisted keys move from string names to tag identities.
606    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
607    for (entity_name, entity_tag) in &runtime_name_to_tag {
608        let path_name = name_map.get(entity_name).copied().unwrap_or(*entity_name);
609        tag_name_map.entry(*entity_tag).or_insert(path_name);
610    }
611    let mut data = Vec::new();
612    let mut index = Vec::new();
613    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
614    let mut corrupted_keys = 0u64;
615    let mut corrupted_entries = 0u64;
616
617    db.with_store_registry(|reg| {
618        // Keep diagnostics snapshots deterministic by traversing stores in path order.
619        let mut stores = reg.iter().collect::<Vec<_>>();
620        stores.sort_by_key(|(path, _)| *path);
621
622        for (path, store_handle) in stores {
623            // Phase 1: collect data-store snapshots and per-entity stats.
624            store_handle.with_data(|store| {
625                data.push(DataStoreSnapshot::new(
626                    path.to_string(),
627                    store.len(),
628                    store.memory_bytes(),
629                ));
630
631                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
632                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
633
634                for entry in store.iter() {
635                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
636                        corrupted_keys = corrupted_keys.saturating_add(1);
637                        continue;
638                    };
639
640                    let value_len = entry.value().len() as u64;
641
642                    by_entity
643                        .entry(dk.entity_tag())
644                        .or_default()
645                        .update(&dk, value_len);
646                }
647
648                for (entity_tag, stats) in by_entity {
649                    let path_name = tag_name_map
650                        .get(&entity_tag)
651                        .copied()
652                        .map(str::to_string)
653                        .or_else(|| {
654                            db.runtime_hook_for_entity_tag(entity_tag)
655                                .ok()
656                                .map(|hooks| {
657                                    name_map
658                                        .get(hooks.model.name())
659                                        .copied()
660                                        .unwrap_or_else(|| hooks.model.name())
661                                        .to_string()
662                                })
663                        })
664                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
665                    entity_storage.push(EntitySnapshot::new(
666                        path.to_string(),
667                        path_name,
668                        stats.entries,
669                        stats.memory_bytes,
670                        stats.min_key,
671                        stats.max_key,
672                    ));
673                }
674            });
675
676            // Phase 2: collect index-store snapshots and integrity counters.
677            store_handle.with_index(|store| {
678                let mut user_entries = 0u64;
679                let mut system_entries = 0u64;
680
681                for (key, value) in store.entries() {
682                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
683                        corrupted_entries = corrupted_entries.saturating_add(1);
684                        continue;
685                    };
686
687                    if decoded_key.uses_system_namespace() {
688                        system_entries = system_entries.saturating_add(1);
689                    } else {
690                        user_entries = user_entries.saturating_add(1);
691                    }
692
693                    if value.validate().is_err() {
694                        corrupted_entries = corrupted_entries.saturating_add(1);
695                    }
696                }
697
698                index.push(IndexStoreSnapshot::new(
699                    path.to_string(),
700                    store.len(),
701                    user_entries,
702                    system_entries,
703                    store.memory_bytes(),
704                ));
705            });
706        }
707    });
708
709    // Phase 3: enforce deterministic entity snapshot emission order.
710    // This remains stable even if outer store traversal internals change.
711    entity_storage
712        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
713
714    Ok(StorageReport::new(
715        data,
716        index,
717        entity_storage,
718        corrupted_keys,
719        corrupted_entries,
720    ))
721}
722
723/// Build one deterministic integrity scan over all registered stores.
724///
725/// This scan is read-only and classifies findings as:
726/// - corruption: malformed persisted bytes or inconsistent structural links
727/// - compatibility: persisted payloads outside decode compatibility windows
728/// - misuse: unsupported runtime wiring (for example missing entity hooks)
729pub(crate) fn integrity_report<C: CanisterKind>(
730    db: &Db<C>,
731) -> Result<IntegrityReport, InternalError> {
732    db.ensure_recovered_state()?;
733
734    integrity_report_after_recovery(db)
735}
736
737/// Build one deterministic integrity scan after recovery has already completed.
738///
739/// Callers running inside recovery flow should use this variant to avoid
740/// recursive recovery gating.
741pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
742    db: &Db<C>,
743) -> Result<IntegrityReport, InternalError> {
744    build_integrity_report(db)
745}
746
747fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
748    let mut stores = Vec::new();
749    let mut totals = IntegrityTotals::default();
750    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
751
752    db.with_store_registry(|reg| {
753        // Keep deterministic output order across registry traversal implementations.
754        let mut store_entries = reg.iter().collect::<Vec<_>>();
755        store_entries.sort_by_key(|(path, _)| *path);
756
757        for (path, store_handle) in store_entries {
758            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
759            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
760            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
761
762            totals.add_store_snapshot(&snapshot);
763            stores.push(snapshot);
764        }
765
766        Ok::<(), InternalError>(())
767    })?;
768
769    Ok(IntegrityReport::new(stores, totals))
770}
771
772// Build one global map of live data keys grouped by entity across all stores.
773fn collect_global_live_keys_by_entity<C: CanisterKind>(
774    db: &Db<C>,
775) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
776    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
777
778    db.with_store_registry(|reg| {
779        for (_, store_handle) in reg.iter() {
780            store_handle.with_data(|data_store| {
781                for entry in data_store.iter() {
782                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
783                        keys.entry(data_key.entity_tag())
784                            .or_default()
785                            .insert(data_key.storage_key());
786                    }
787                }
788            });
789        }
790
791        Ok::<(), InternalError>(())
792    })?;
793
794    Ok(keys)
795}
796
797// Run forward (data -> index) integrity checks for one store.
798fn scan_store_forward_integrity<C: CanisterKind>(
799    db: &Db<C>,
800    store_handle: StoreHandle,
801    snapshot: &mut IntegrityStoreSnapshot,
802) -> Result<(), InternalError> {
803    store_handle.with_data(|data_store| {
804        for entry in data_store.iter() {
805            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
806
807            let raw_key = *entry.key();
808
809            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
810                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
811                continue;
812            };
813
814            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
815                Ok(hooks) => hooks,
816                Err(err) => {
817                    classify_scan_error(err, snapshot)?;
818                    continue;
819                }
820            };
821
822            let marker_row = CommitRowOp::new(
823                hooks.entity_path,
824                raw_key.as_bytes().to_vec(),
825                None,
826                Some(entry.value().as_bytes().to_vec()),
827                crate::db::schema::commit_schema_fingerprint_for_model(
828                    hooks.entity_path,
829                    hooks.model,
830                ),
831            );
832
833            // Validate envelope compatibility before typed preparation so
834            // incompatible persisted formats remain compatibility-classified.
835            if let Err(err) = decode_structural_row_cbor(&entry.value()) {
836                classify_scan_error(err, snapshot)?;
837                continue;
838            }
839
840            let prepared = match db.prepare_row_commit_op(&marker_row) {
841                Ok(prepared) => prepared,
842                Err(err) => {
843                    classify_scan_error(err, snapshot)?;
844                    continue;
845                }
846            };
847
848            for index_op in prepared.index_ops {
849                let Some(expected_value) = index_op.value else {
850                    continue;
851                };
852
853                let actual = index_op
854                    .store
855                    .with_borrow(|index_store| index_store.get(&index_op.key));
856                match actual {
857                    Some(actual_value) if actual_value == expected_value => {}
858                    Some(_) => {
859                        snapshot.divergent_index_entries =
860                            snapshot.divergent_index_entries.saturating_add(1);
861                    }
862                    None => {
863                        snapshot.missing_index_entries =
864                            snapshot.missing_index_entries.saturating_add(1);
865                    }
866                }
867            }
868        }
869
870        Ok::<(), InternalError>(())
871    })
872}
873
874// Run reverse (index -> data) integrity checks for one store.
875fn scan_store_reverse_integrity(
876    store_handle: StoreHandle,
877    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
878    snapshot: &mut IntegrityStoreSnapshot,
879) {
880    store_handle.with_index(|index_store| {
881        for (raw_index_key, raw_index_entry) in index_store.entries() {
882            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
883
884            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
885                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
886                continue;
887            };
888
889            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
890
891            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
892                snapshot.corrupted_index_entries =
893                    snapshot.corrupted_index_entries.saturating_add(1);
894                continue;
895            };
896
897            for primary_key in indexed_primary_keys {
898                let exists = live_keys_by_entity
899                    .get(&index_entity_tag)
900                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
901                if !exists {
902                    snapshot.orphan_index_references =
903                        snapshot.orphan_index_references.saturating_add(1);
904                }
905            }
906        }
907    });
908}
909
910// Map scan-time errors into explicit integrity classification buckets.
911fn classify_scan_error(
912    err: InternalError,
913    snapshot: &mut IntegrityStoreSnapshot,
914) -> Result<(), InternalError> {
915    match err.class() {
916        ErrorClass::Corruption => {
917            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
918            Ok(())
919        }
920        ErrorClass::IncompatiblePersistedFormat => {
921            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
922            Ok(())
923        }
924        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
925            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
926            Ok(())
927        }
928        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
929    }
930}
931
932// Parse the data-entity identity from one decoded index key.
933const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
934    index_key.index_id().entity_tag
935}