Skip to main content

icydb_core/db/diagnostics/
mod.rs

//! Module: diagnostics
//! Responsibility: read-only storage footprint and integrity snapshots.
//! Does not own: recovery, write-path mutation, or query planning semantics.
//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11    db::{
12        Db, EntityRuntimeHooks,
13        commit::CommitRowOp,
14        data::{DataKey, StorageKey, decode_structural_row_cbor},
15        index::IndexKey,
16        registry::StoreHandle,
17    },
18    error::{ErrorClass, InternalError},
19    traits::{CanisterKind, Repr},
20    types::EntityTag,
21};
22use candid::CandidType;
23use serde::Deserialize;
24use std::collections::{BTreeMap, BTreeSet};
25
26pub use execution_trace::{
27    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
28};
29
30///
31/// StorageReport
32/// Live storage snapshot payload.
33///
34
35#[derive(CandidType, Clone, Debug, Default, Deserialize)]
36pub struct StorageReport {
37    pub(crate) storage_data: Vec<DataStoreSnapshot>,
38    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
39    pub(crate) entity_storage: Vec<EntitySnapshot>,
40    pub(crate) corrupted_keys: u64,
41    pub(crate) corrupted_entries: u64,
42}
43
44///
45/// IntegrityTotals
46/// Aggregated integrity-scan counters across all stores.
47///
48
49#[derive(CandidType, Clone, Debug, Default, Deserialize)]
50pub struct IntegrityTotals {
51    pub(crate) data_rows_scanned: u64,
52    pub(crate) index_entries_scanned: u64,
53    pub(crate) corrupted_data_keys: u64,
54    pub(crate) corrupted_data_rows: u64,
55    pub(crate) corrupted_index_keys: u64,
56    pub(crate) corrupted_index_entries: u64,
57    pub(crate) missing_index_entries: u64,
58    pub(crate) divergent_index_entries: u64,
59    pub(crate) orphan_index_references: u64,
60    pub(crate) compatibility_findings: u64,
61    pub(crate) misuse_findings: u64,
62}
63
64impl IntegrityTotals {
65    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
66        self.data_rows_scanned = self
67            .data_rows_scanned
68            .saturating_add(store.data_rows_scanned);
69        self.index_entries_scanned = self
70            .index_entries_scanned
71            .saturating_add(store.index_entries_scanned);
72        self.corrupted_data_keys = self
73            .corrupted_data_keys
74            .saturating_add(store.corrupted_data_keys);
75        self.corrupted_data_rows = self
76            .corrupted_data_rows
77            .saturating_add(store.corrupted_data_rows);
78        self.corrupted_index_keys = self
79            .corrupted_index_keys
80            .saturating_add(store.corrupted_index_keys);
81        self.corrupted_index_entries = self
82            .corrupted_index_entries
83            .saturating_add(store.corrupted_index_entries);
84        self.missing_index_entries = self
85            .missing_index_entries
86            .saturating_add(store.missing_index_entries);
87        self.divergent_index_entries = self
88            .divergent_index_entries
89            .saturating_add(store.divergent_index_entries);
90        self.orphan_index_references = self
91            .orphan_index_references
92            .saturating_add(store.orphan_index_references);
93        self.compatibility_findings = self
94            .compatibility_findings
95            .saturating_add(store.compatibility_findings);
96        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
97    }
98
99    /// Return total number of data rows scanned.
100    #[must_use]
101    pub const fn data_rows_scanned(&self) -> u64 {
102        self.data_rows_scanned
103    }
104
105    /// Return total number of index entries scanned.
106    #[must_use]
107    pub const fn index_entries_scanned(&self) -> u64 {
108        self.index_entries_scanned
109    }
110
111    /// Return total number of corrupted data-key findings.
112    #[must_use]
113    pub const fn corrupted_data_keys(&self) -> u64 {
114        self.corrupted_data_keys
115    }
116
117    /// Return total number of corrupted data-row findings.
118    #[must_use]
119    pub const fn corrupted_data_rows(&self) -> u64 {
120        self.corrupted_data_rows
121    }
122
123    /// Return total number of corrupted index-key findings.
124    #[must_use]
125    pub const fn corrupted_index_keys(&self) -> u64 {
126        self.corrupted_index_keys
127    }
128
129    /// Return total number of corrupted index-entry findings.
130    #[must_use]
131    pub const fn corrupted_index_entries(&self) -> u64 {
132        self.corrupted_index_entries
133    }
134
135    /// Return total number of missing index-entry findings.
136    #[must_use]
137    pub const fn missing_index_entries(&self) -> u64 {
138        self.missing_index_entries
139    }
140
141    /// Return total number of divergent index-entry findings.
142    #[must_use]
143    pub const fn divergent_index_entries(&self) -> u64 {
144        self.divergent_index_entries
145    }
146
147    /// Return total number of orphan index-reference findings.
148    #[must_use]
149    pub const fn orphan_index_references(&self) -> u64 {
150        self.orphan_index_references
151    }
152
153    /// Return total number of compatibility findings.
154    #[must_use]
155    pub const fn compatibility_findings(&self) -> u64 {
156        self.compatibility_findings
157    }
158
159    /// Return total number of misuse findings.
160    #[must_use]
161    pub const fn misuse_findings(&self) -> u64 {
162        self.misuse_findings
163    }
164}
165
166///
167/// IntegrityStoreSnapshot
168/// Per-store integrity findings and scan counters.
169///
170
171#[derive(CandidType, Clone, Debug, Default, Deserialize)]
172pub struct IntegrityStoreSnapshot {
173    pub(crate) path: String,
174    pub(crate) data_rows_scanned: u64,
175    pub(crate) index_entries_scanned: u64,
176    pub(crate) corrupted_data_keys: u64,
177    pub(crate) corrupted_data_rows: u64,
178    pub(crate) corrupted_index_keys: u64,
179    pub(crate) corrupted_index_entries: u64,
180    pub(crate) missing_index_entries: u64,
181    pub(crate) divergent_index_entries: u64,
182    pub(crate) orphan_index_references: u64,
183    pub(crate) compatibility_findings: u64,
184    pub(crate) misuse_findings: u64,
185}
186
187impl IntegrityStoreSnapshot {
188    /// Construct one empty store-level integrity snapshot.
189    #[must_use]
190    pub fn new(path: String) -> Self {
191        Self {
192            path,
193            ..Self::default()
194        }
195    }
196
197    /// Borrow store path.
198    #[must_use]
199    pub const fn path(&self) -> &str {
200        self.path.as_str()
201    }
202
203    /// Return number of scanned data rows.
204    #[must_use]
205    pub const fn data_rows_scanned(&self) -> u64 {
206        self.data_rows_scanned
207    }
208
209    /// Return number of scanned index entries.
210    #[must_use]
211    pub const fn index_entries_scanned(&self) -> u64 {
212        self.index_entries_scanned
213    }
214
215    /// Return number of corrupted data-key findings.
216    #[must_use]
217    pub const fn corrupted_data_keys(&self) -> u64 {
218        self.corrupted_data_keys
219    }
220
221    /// Return number of corrupted data-row findings.
222    #[must_use]
223    pub const fn corrupted_data_rows(&self) -> u64 {
224        self.corrupted_data_rows
225    }
226
227    /// Return number of corrupted index-key findings.
228    #[must_use]
229    pub const fn corrupted_index_keys(&self) -> u64 {
230        self.corrupted_index_keys
231    }
232
233    /// Return number of corrupted index-entry findings.
234    #[must_use]
235    pub const fn corrupted_index_entries(&self) -> u64 {
236        self.corrupted_index_entries
237    }
238
239    /// Return number of missing index-entry findings.
240    #[must_use]
241    pub const fn missing_index_entries(&self) -> u64 {
242        self.missing_index_entries
243    }
244
245    /// Return number of divergent index-entry findings.
246    #[must_use]
247    pub const fn divergent_index_entries(&self) -> u64 {
248        self.divergent_index_entries
249    }
250
251    /// Return number of orphan index-reference findings.
252    #[must_use]
253    pub const fn orphan_index_references(&self) -> u64 {
254        self.orphan_index_references
255    }
256
257    /// Return number of compatibility findings.
258    #[must_use]
259    pub const fn compatibility_findings(&self) -> u64 {
260        self.compatibility_findings
261    }
262
263    /// Return number of misuse findings.
264    #[must_use]
265    pub const fn misuse_findings(&self) -> u64 {
266        self.misuse_findings
267    }
268}
269
270///
271/// IntegrityReport
272/// Full integrity-scan output across all registered stores.
273///
274
275#[derive(CandidType, Clone, Debug, Default, Deserialize)]
276pub struct IntegrityReport {
277    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
278    pub(crate) totals: IntegrityTotals,
279}
280
281impl IntegrityReport {
282    /// Construct one integrity report payload.
283    #[must_use]
284    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
285        Self { stores, totals }
286    }
287
288    /// Borrow per-store integrity snapshots.
289    #[must_use]
290    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
291        self.stores.as_slice()
292    }
293
294    /// Borrow aggregated integrity totals.
295    #[must_use]
296    pub const fn totals(&self) -> &IntegrityTotals {
297        &self.totals
298    }
299}
300
301impl StorageReport {
302    /// Construct one storage report payload.
303    #[must_use]
304    pub const fn new(
305        storage_data: Vec<DataStoreSnapshot>,
306        storage_index: Vec<IndexStoreSnapshot>,
307        entity_storage: Vec<EntitySnapshot>,
308        corrupted_keys: u64,
309        corrupted_entries: u64,
310    ) -> Self {
311        Self {
312            storage_data,
313            storage_index,
314            entity_storage,
315            corrupted_keys,
316            corrupted_entries,
317        }
318    }
319
320    /// Borrow data-store snapshots.
321    #[must_use]
322    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
323        self.storage_data.as_slice()
324    }
325
326    /// Borrow index-store snapshots.
327    #[must_use]
328    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
329        self.storage_index.as_slice()
330    }
331
332    /// Borrow entity-level storage snapshots.
333    #[must_use]
334    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
335        self.entity_storage.as_slice()
336    }
337
338    /// Return count of corrupted decoded data keys.
339    #[must_use]
340    pub const fn corrupted_keys(&self) -> u64 {
341        self.corrupted_keys
342    }
343
344    /// Return count of corrupted index entries.
345    #[must_use]
346    pub const fn corrupted_entries(&self) -> u64 {
347        self.corrupted_entries
348    }
349}
350
351///
352/// DataStoreSnapshot
353/// Data-store snapshot row.
354///
355
356#[derive(CandidType, Clone, Debug, Default, Deserialize)]
357pub struct DataStoreSnapshot {
358    pub(crate) path: String,
359    pub(crate) entries: u64,
360    pub(crate) memory_bytes: u64,
361}
362
363impl DataStoreSnapshot {
364    /// Construct one data-store snapshot row.
365    #[must_use]
366    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
367        Self {
368            path,
369            entries,
370            memory_bytes,
371        }
372    }
373
374    /// Borrow store path.
375    #[must_use]
376    pub const fn path(&self) -> &str {
377        self.path.as_str()
378    }
379
380    /// Return row count.
381    #[must_use]
382    pub const fn entries(&self) -> u64 {
383        self.entries
384    }
385
386    /// Return memory usage in bytes.
387    #[must_use]
388    pub const fn memory_bytes(&self) -> u64 {
389        self.memory_bytes
390    }
391}
392
393///
394/// IndexStoreSnapshot
395/// Index-store snapshot row.
396///
397
398#[derive(CandidType, Clone, Debug, Default, Deserialize)]
399pub struct IndexStoreSnapshot {
400    pub(crate) path: String,
401    pub(crate) entries: u64,
402    pub(crate) user_entries: u64,
403    pub(crate) system_entries: u64,
404    pub(crate) memory_bytes: u64,
405}
406
407impl IndexStoreSnapshot {
408    /// Construct one index-store snapshot row.
409    #[must_use]
410    pub const fn new(
411        path: String,
412        entries: u64,
413        user_entries: u64,
414        system_entries: u64,
415        memory_bytes: u64,
416    ) -> Self {
417        Self {
418            path,
419            entries,
420            user_entries,
421            system_entries,
422            memory_bytes,
423        }
424    }
425
426    /// Borrow store path.
427    #[must_use]
428    pub const fn path(&self) -> &str {
429        self.path.as_str()
430    }
431
432    /// Return total entry count.
433    #[must_use]
434    pub const fn entries(&self) -> u64 {
435        self.entries
436    }
437
438    /// Return user-namespace entry count.
439    #[must_use]
440    pub const fn user_entries(&self) -> u64 {
441        self.user_entries
442    }
443
444    /// Return system-namespace entry count.
445    #[must_use]
446    pub const fn system_entries(&self) -> u64 {
447        self.system_entries
448    }
449
450    /// Return memory usage in bytes.
451    #[must_use]
452    pub const fn memory_bytes(&self) -> u64 {
453        self.memory_bytes
454    }
455}
456
457///
458/// EntitySnapshot
459/// Per-entity storage snapshot row.
460///
461
462#[derive(CandidType, Clone, Debug, Default, Deserialize)]
463pub struct EntitySnapshot {
464    pub(crate) store: String,
465
466    pub(crate) path: String,
467
468    pub(crate) entries: u64,
469
470    pub(crate) memory_bytes: u64,
471
472    pub(crate) min_key: Option<String>,
473
474    pub(crate) max_key: Option<String>,
475}
476
477impl EntitySnapshot {
478    /// Construct one entity-storage snapshot row.
479    #[must_use]
480    pub fn new(
481        store: String,
482        path: String,
483        entries: u64,
484        memory_bytes: u64,
485        min_key: Option<StorageKey>,
486        max_key: Option<StorageKey>,
487    ) -> Self {
488        Self {
489            store,
490            path,
491            entries,
492            memory_bytes,
493            min_key: min_key.map(Self::storage_key_text),
494            max_key: max_key.map(Self::storage_key_text),
495        }
496    }
497
498    // Keep snapshot key rendering local to the diagnostics contract so the
499    // canister DTO does not retain the full `Value` Candid surface.
500    fn storage_key_text(key: StorageKey) -> String {
501        match key {
502            StorageKey::Account(value) => value.to_string(),
503            StorageKey::Int(value) => value.to_string(),
504            StorageKey::Principal(value) => value.to_string(),
505            StorageKey::Subaccount(value) => value.to_string(),
506            StorageKey::Timestamp(value) => value.repr().to_string(),
507            StorageKey::Uint(value) => value.to_string(),
508            StorageKey::Ulid(value) => value.to_string(),
509            StorageKey::Unit => "()".to_string(),
510        }
511    }
512
513    /// Borrow store path.
514    #[must_use]
515    pub const fn store(&self) -> &str {
516        self.store.as_str()
517    }
518
519    /// Borrow entity path.
520    #[must_use]
521    pub const fn path(&self) -> &str {
522        self.path.as_str()
523    }
524
525    /// Return row count.
526    #[must_use]
527    pub const fn entries(&self) -> u64 {
528        self.entries
529    }
530
531    /// Return memory usage in bytes.
532    #[must_use]
533    pub const fn memory_bytes(&self) -> u64 {
534        self.memory_bytes
535    }
536
537    /// Borrow optional minimum primary key.
538    #[must_use]
539    pub fn min_key(&self) -> Option<&str> {
540        self.min_key.as_deref()
541    }
542
543    /// Borrow optional maximum primary key.
544    #[must_use]
545    pub fn max_key(&self) -> Option<&str> {
546        self.max_key.as_deref()
547    }
548}
549
550///
551/// EntityStats
552/// Internal struct for building per-entity stats before snapshotting.
553///
554
555#[derive(Default)]
556struct EntityStats {
557    entries: u64,
558    memory_bytes: u64,
559    min_key: Option<StorageKey>,
560    max_key: Option<StorageKey>,
561}
562
563impl EntityStats {
564    // Accumulate per-entity counters and keep min/max over entity-local storage keys.
565    fn update(&mut self, dk: &DataKey, value_len: u64) {
566        self.entries = self.entries.saturating_add(1);
567        self.memory_bytes = self
568            .memory_bytes
569            .saturating_add(DataKey::entry_size_bytes(value_len));
570
571        let k = dk.storage_key();
572
573        match &mut self.min_key {
574            Some(min) if k < *min => *min = k,
575            None => self.min_key = Some(k),
576            _ => {}
577        }
578
579        match &mut self.max_key {
580            Some(max) if k > *max => *max = k,
581            None => self.max_key = Some(k),
582            _ => {}
583        }
584    }
585}
586
587fn storage_report_name_for_hook<'a, C: CanisterKind>(
588    name_map: &BTreeMap<&'static str, &'a str>,
589    hooks: &EntityRuntimeHooks<C>,
590) -> &'a str {
591    name_map
592        .get(hooks.entity_path)
593        .copied()
594        .or_else(|| name_map.get(hooks.model.name()).copied())
595        .unwrap_or(hooks.entity_path)
596}
597
598/// Build one deterministic storage snapshot with per-entity rollups.
599///
600/// This path is read-only and fail-closed on decode/validation errors by counting
601/// corrupted keys/entries instead of panicking.
602pub(crate) fn storage_report<C: CanisterKind>(
603    db: &Db<C>,
604    name_to_path: &[(&'static str, &'static str)],
605) -> Result<StorageReport, InternalError> {
606    db.ensure_recovered_state()?;
607    // Build one optional alias map once, then resolve report names from the
608    // runtime hook table so entity tags keep distinct path identity even when
609    // multiple hooks intentionally share the same model name.
610    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
611    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
612    for hooks in db.entity_runtime_hooks {
613        tag_name_map
614            .entry(hooks.entity_tag)
615            .or_insert_with(|| storage_report_name_for_hook(&name_map, hooks));
616    }
617    let mut data = Vec::new();
618    let mut index = Vec::new();
619    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
620    let mut corrupted_keys = 0u64;
621    let mut corrupted_entries = 0u64;
622
623    db.with_store_registry(|reg| {
624        // Keep diagnostics snapshots deterministic by traversing stores in path order.
625        let mut stores = reg.iter().collect::<Vec<_>>();
626        stores.sort_by_key(|(path, _)| *path);
627
628        for (path, store_handle) in stores {
629            // Phase 1: collect data-store snapshots and per-entity stats.
630            store_handle.with_data(|store| {
631                data.push(DataStoreSnapshot::new(
632                    path.to_string(),
633                    store.len(),
634                    store.memory_bytes(),
635                ));
636
637                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
638                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
639
640                for entry in store.iter() {
641                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
642                        corrupted_keys = corrupted_keys.saturating_add(1);
643                        continue;
644                    };
645
646                    let value_len = entry.value().len() as u64;
647
648                    by_entity
649                        .entry(dk.entity_tag())
650                        .or_default()
651                        .update(&dk, value_len);
652                }
653
654                for (entity_tag, stats) in by_entity {
655                    let path_name = tag_name_map
656                        .get(&entity_tag)
657                        .copied()
658                        .map(str::to_string)
659                        .or_else(|| {
660                            db.runtime_hook_for_entity_tag(entity_tag)
661                                .ok()
662                                .map(|hooks| {
663                                    storage_report_name_for_hook(&name_map, hooks).to_string()
664                                })
665                        })
666                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
667                    entity_storage.push(EntitySnapshot::new(
668                        path.to_string(),
669                        path_name,
670                        stats.entries,
671                        stats.memory_bytes,
672                        stats.min_key,
673                        stats.max_key,
674                    ));
675                }
676            });
677
678            // Phase 2: collect index-store snapshots and integrity counters.
679            store_handle.with_index(|store| {
680                let mut user_entries = 0u64;
681                let mut system_entries = 0u64;
682
683                for (key, value) in store.entries() {
684                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
685                        corrupted_entries = corrupted_entries.saturating_add(1);
686                        continue;
687                    };
688
689                    if decoded_key.uses_system_namespace() {
690                        system_entries = system_entries.saturating_add(1);
691                    } else {
692                        user_entries = user_entries.saturating_add(1);
693                    }
694
695                    if value.validate().is_err() {
696                        corrupted_entries = corrupted_entries.saturating_add(1);
697                    }
698                }
699
700                index.push(IndexStoreSnapshot::new(
701                    path.to_string(),
702                    store.len(),
703                    user_entries,
704                    system_entries,
705                    store.memory_bytes(),
706                ));
707            });
708        }
709    });
710
711    // Phase 3: enforce deterministic entity snapshot emission order.
712    // This remains stable even if outer store traversal internals change.
713    entity_storage
714        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
715
716    Ok(StorageReport::new(
717        data,
718        index,
719        entity_storage,
720        corrupted_keys,
721        corrupted_entries,
722    ))
723}
724
725/// Build one deterministic integrity scan over all registered stores.
726///
727/// This scan is read-only and classifies findings as:
728/// - corruption: malformed persisted bytes or inconsistent structural links
729/// - compatibility: persisted payloads outside decode compatibility windows
730/// - misuse: unsupported runtime wiring (for example missing entity hooks)
731pub(crate) fn integrity_report<C: CanisterKind>(
732    db: &Db<C>,
733) -> Result<IntegrityReport, InternalError> {
734    db.ensure_recovered_state()?;
735
736    integrity_report_after_recovery(db)
737}
738
739/// Build one deterministic integrity scan after recovery has already completed.
740///
741/// Callers running inside recovery flow should use this variant to avoid
742/// recursive recovery gating.
743pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
744    db: &Db<C>,
745) -> Result<IntegrityReport, InternalError> {
746    build_integrity_report(db)
747}
748
749fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
750    let mut stores = Vec::new();
751    let mut totals = IntegrityTotals::default();
752    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
753
754    db.with_store_registry(|reg| {
755        // Keep deterministic output order across registry traversal implementations.
756        let mut store_entries = reg.iter().collect::<Vec<_>>();
757        store_entries.sort_by_key(|(path, _)| *path);
758
759        for (path, store_handle) in store_entries {
760            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
761            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
762            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
763
764            totals.add_store_snapshot(&snapshot);
765            stores.push(snapshot);
766        }
767
768        Ok::<(), InternalError>(())
769    })?;
770
771    Ok(IntegrityReport::new(stores, totals))
772}
773
774// Build one global map of live data keys grouped by entity across all stores.
775fn collect_global_live_keys_by_entity<C: CanisterKind>(
776    db: &Db<C>,
777) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
778    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
779
780    db.with_store_registry(|reg| {
781        for (_, store_handle) in reg.iter() {
782            store_handle.with_data(|data_store| {
783                for entry in data_store.iter() {
784                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
785                        keys.entry(data_key.entity_tag())
786                            .or_default()
787                            .insert(data_key.storage_key());
788                    }
789                }
790            });
791        }
792
793        Ok::<(), InternalError>(())
794    })?;
795
796    Ok(keys)
797}
798
799// Run forward (data -> index) integrity checks for one store.
800fn scan_store_forward_integrity<C: CanisterKind>(
801    db: &Db<C>,
802    store_handle: StoreHandle,
803    snapshot: &mut IntegrityStoreSnapshot,
804) -> Result<(), InternalError> {
805    store_handle.with_data(|data_store| {
806        for entry in data_store.iter() {
807            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
808
809            let raw_key = *entry.key();
810
811            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
812                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
813                continue;
814            };
815
816            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
817                Ok(hooks) => hooks,
818                Err(err) => {
819                    classify_scan_error(err, snapshot)?;
820                    continue;
821                }
822            };
823
824            let marker_row = CommitRowOp::new(
825                hooks.entity_path,
826                raw_key.as_bytes().to_vec(),
827                None,
828                Some(entry.value().as_bytes().to_vec()),
829                crate::db::schema::commit_schema_fingerprint_for_model(
830                    hooks.entity_path,
831                    hooks.model,
832                ),
833            );
834
835            // Validate envelope compatibility before typed preparation so
836            // incompatible persisted formats remain compatibility-classified.
837            if let Err(err) = decode_structural_row_cbor(&entry.value()) {
838                classify_scan_error(err, snapshot)?;
839                continue;
840            }
841
842            let prepared = match db.prepare_row_commit_op(&marker_row) {
843                Ok(prepared) => prepared,
844                Err(err) => {
845                    classify_scan_error(err, snapshot)?;
846                    continue;
847                }
848            };
849
850            for index_op in prepared.index_ops {
851                let Some(expected_value) = index_op.value else {
852                    continue;
853                };
854
855                let actual = index_op
856                    .store
857                    .with_borrow(|index_store| index_store.get(&index_op.key));
858                match actual {
859                    Some(actual_value) if actual_value == expected_value => {}
860                    Some(_) => {
861                        snapshot.divergent_index_entries =
862                            snapshot.divergent_index_entries.saturating_add(1);
863                    }
864                    None => {
865                        snapshot.missing_index_entries =
866                            snapshot.missing_index_entries.saturating_add(1);
867                    }
868                }
869            }
870        }
871
872        Ok::<(), InternalError>(())
873    })
874}
875
876// Run reverse (index -> data) integrity checks for one store.
877fn scan_store_reverse_integrity(
878    store_handle: StoreHandle,
879    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
880    snapshot: &mut IntegrityStoreSnapshot,
881) {
882    store_handle.with_index(|index_store| {
883        for (raw_index_key, raw_index_entry) in index_store.entries() {
884            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
885
886            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
887                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
888                continue;
889            };
890
891            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
892
893            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
894                snapshot.corrupted_index_entries =
895                    snapshot.corrupted_index_entries.saturating_add(1);
896                continue;
897            };
898
899            for primary_key in indexed_primary_keys {
900                let exists = live_keys_by_entity
901                    .get(&index_entity_tag)
902                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
903                if !exists {
904                    snapshot.orphan_index_references =
905                        snapshot.orphan_index_references.saturating_add(1);
906                }
907            }
908        }
909    });
910}
911
912// Map scan-time errors into explicit integrity classification buckets.
913fn classify_scan_error(
914    err: InternalError,
915    snapshot: &mut IntegrityStoreSnapshot,
916) -> Result<(), InternalError> {
917    match err.class() {
918        ErrorClass::Corruption => {
919            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
920            Ok(())
921        }
922        ErrorClass::IncompatiblePersistedFormat => {
923            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
924            Ok(())
925        }
926        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
927            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
928            Ok(())
929        }
930        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
931    }
932}
933
934// Parse the data-entity identity from one decoded index key.
935const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
936    index_key.index_id().entity_tag
937}