Skip to main content

icydb_core/db/diagnostics/
mod.rs

1//! Module: diagnostics
2//! Responsibility: read-only storage footprint and integrity snapshots.
3//! Does not own: recovery, write-path mutation, or query planning semantics.
4//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7
8use crate::{
9    db::{
10        Db,
11        commit::CommitRowOp,
12        data::{DataKey, StorageKey, decode_structural_row_cbor},
13        index::IndexKey,
14        registry::StoreHandle,
15    },
16    error::{ErrorClass, InternalError},
17    traits::CanisterKind,
18    types::EntityTag,
19    value::Value,
20};
21use candid::CandidType;
22use serde::{Deserialize, Serialize};
23use std::collections::{BTreeMap, BTreeSet};
24
25pub use execution_trace::{
26    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
27};
28
///
/// StorageReport
/// Live storage snapshot report
///
/// Bundles store-level data and index snapshots with per-entity rollups and
/// the corruption counters gathered while building them (see `storage_report`).
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct StorageReport {
    /// Data-store snapshots, one pushed per registered store.
    pub(crate) storage_data: Vec<DataStoreSnapshot>,
    /// Index-store snapshots, one pushed per registered store.
    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
    /// Per-entity rollups, one per entity tag found in each data store.
    pub(crate) entity_storage: Vec<EntitySnapshot>,
    /// Number of raw data keys that failed to decode.
    pub(crate) corrupted_keys: u64,
    /// Number of index findings: raw index keys that failed to decode plus
    /// index values that failed validation.
    pub(crate) corrupted_entries: u64,
}
42
43///
44/// IntegrityTotals
45/// Aggregated integrity-scan counters across all stores.
46///
47
48#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
49pub struct IntegrityTotals {
50    pub(crate) data_rows_scanned: u64,
51    pub(crate) index_entries_scanned: u64,
52    pub(crate) corrupted_data_keys: u64,
53    pub(crate) corrupted_data_rows: u64,
54    pub(crate) corrupted_index_keys: u64,
55    pub(crate) corrupted_index_entries: u64,
56    pub(crate) missing_index_entries: u64,
57    pub(crate) divergent_index_entries: u64,
58    pub(crate) orphan_index_references: u64,
59    pub(crate) compatibility_findings: u64,
60    pub(crate) misuse_findings: u64,
61}
62
63impl IntegrityTotals {
64    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
65        self.data_rows_scanned = self
66            .data_rows_scanned
67            .saturating_add(store.data_rows_scanned);
68        self.index_entries_scanned = self
69            .index_entries_scanned
70            .saturating_add(store.index_entries_scanned);
71        self.corrupted_data_keys = self
72            .corrupted_data_keys
73            .saturating_add(store.corrupted_data_keys);
74        self.corrupted_data_rows = self
75            .corrupted_data_rows
76            .saturating_add(store.corrupted_data_rows);
77        self.corrupted_index_keys = self
78            .corrupted_index_keys
79            .saturating_add(store.corrupted_index_keys);
80        self.corrupted_index_entries = self
81            .corrupted_index_entries
82            .saturating_add(store.corrupted_index_entries);
83        self.missing_index_entries = self
84            .missing_index_entries
85            .saturating_add(store.missing_index_entries);
86        self.divergent_index_entries = self
87            .divergent_index_entries
88            .saturating_add(store.divergent_index_entries);
89        self.orphan_index_references = self
90            .orphan_index_references
91            .saturating_add(store.orphan_index_references);
92        self.compatibility_findings = self
93            .compatibility_findings
94            .saturating_add(store.compatibility_findings);
95        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
96    }
97
98    /// Return total number of data rows scanned.
99    #[must_use]
100    pub const fn data_rows_scanned(&self) -> u64 {
101        self.data_rows_scanned
102    }
103
104    /// Return total number of index entries scanned.
105    #[must_use]
106    pub const fn index_entries_scanned(&self) -> u64 {
107        self.index_entries_scanned
108    }
109
110    /// Return total number of corrupted data-key findings.
111    #[must_use]
112    pub const fn corrupted_data_keys(&self) -> u64 {
113        self.corrupted_data_keys
114    }
115
116    /// Return total number of corrupted data-row findings.
117    #[must_use]
118    pub const fn corrupted_data_rows(&self) -> u64 {
119        self.corrupted_data_rows
120    }
121
122    /// Return total number of corrupted index-key findings.
123    #[must_use]
124    pub const fn corrupted_index_keys(&self) -> u64 {
125        self.corrupted_index_keys
126    }
127
128    /// Return total number of corrupted index-entry findings.
129    #[must_use]
130    pub const fn corrupted_index_entries(&self) -> u64 {
131        self.corrupted_index_entries
132    }
133
134    /// Return total number of missing index-entry findings.
135    #[must_use]
136    pub const fn missing_index_entries(&self) -> u64 {
137        self.missing_index_entries
138    }
139
140    /// Return total number of divergent index-entry findings.
141    #[must_use]
142    pub const fn divergent_index_entries(&self) -> u64 {
143        self.divergent_index_entries
144    }
145
146    /// Return total number of orphan index-reference findings.
147    #[must_use]
148    pub const fn orphan_index_references(&self) -> u64 {
149        self.orphan_index_references
150    }
151
152    /// Return total number of compatibility findings.
153    #[must_use]
154    pub const fn compatibility_findings(&self) -> u64 {
155        self.compatibility_findings
156    }
157
158    /// Return total number of misuse findings.
159    #[must_use]
160    pub const fn misuse_findings(&self) -> u64 {
161        self.misuse_findings
162    }
163}
164
165///
166/// IntegrityStoreSnapshot
167/// Per-store integrity findings and scan counters.
168///
169
170#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
171pub struct IntegrityStoreSnapshot {
172    pub(crate) path: String,
173    pub(crate) data_rows_scanned: u64,
174    pub(crate) index_entries_scanned: u64,
175    pub(crate) corrupted_data_keys: u64,
176    pub(crate) corrupted_data_rows: u64,
177    pub(crate) corrupted_index_keys: u64,
178    pub(crate) corrupted_index_entries: u64,
179    pub(crate) missing_index_entries: u64,
180    pub(crate) divergent_index_entries: u64,
181    pub(crate) orphan_index_references: u64,
182    pub(crate) compatibility_findings: u64,
183    pub(crate) misuse_findings: u64,
184}
185
186impl IntegrityStoreSnapshot {
187    /// Construct one empty store-level integrity snapshot.
188    #[must_use]
189    pub fn new(path: String) -> Self {
190        Self {
191            path,
192            ..Self::default()
193        }
194    }
195
196    /// Borrow store path.
197    #[must_use]
198    pub const fn path(&self) -> &str {
199        self.path.as_str()
200    }
201
202    /// Return number of scanned data rows.
203    #[must_use]
204    pub const fn data_rows_scanned(&self) -> u64 {
205        self.data_rows_scanned
206    }
207
208    /// Return number of scanned index entries.
209    #[must_use]
210    pub const fn index_entries_scanned(&self) -> u64 {
211        self.index_entries_scanned
212    }
213
214    /// Return number of corrupted data-key findings.
215    #[must_use]
216    pub const fn corrupted_data_keys(&self) -> u64 {
217        self.corrupted_data_keys
218    }
219
220    /// Return number of corrupted data-row findings.
221    #[must_use]
222    pub const fn corrupted_data_rows(&self) -> u64 {
223        self.corrupted_data_rows
224    }
225
226    /// Return number of corrupted index-key findings.
227    #[must_use]
228    pub const fn corrupted_index_keys(&self) -> u64 {
229        self.corrupted_index_keys
230    }
231
232    /// Return number of corrupted index-entry findings.
233    #[must_use]
234    pub const fn corrupted_index_entries(&self) -> u64 {
235        self.corrupted_index_entries
236    }
237
238    /// Return number of missing index-entry findings.
239    #[must_use]
240    pub const fn missing_index_entries(&self) -> u64 {
241        self.missing_index_entries
242    }
243
244    /// Return number of divergent index-entry findings.
245    #[must_use]
246    pub const fn divergent_index_entries(&self) -> u64 {
247        self.divergent_index_entries
248    }
249
250    /// Return number of orphan index-reference findings.
251    #[must_use]
252    pub const fn orphan_index_references(&self) -> u64 {
253        self.orphan_index_references
254    }
255
256    /// Return number of compatibility findings.
257    #[must_use]
258    pub const fn compatibility_findings(&self) -> u64 {
259        self.compatibility_findings
260    }
261
262    /// Return number of misuse findings.
263    #[must_use]
264    pub const fn misuse_findings(&self) -> u64 {
265        self.misuse_findings
266    }
267}
268
269///
270/// IntegrityReport
271/// Full integrity-scan output across all registered stores.
272///
273
274#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
275pub struct IntegrityReport {
276    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
277    pub(crate) totals: IntegrityTotals,
278}
279
280impl IntegrityReport {
281    /// Construct one integrity report payload.
282    #[must_use]
283    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
284        Self { stores, totals }
285    }
286
287    /// Borrow per-store integrity snapshots.
288    #[must_use]
289    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
290        self.stores.as_slice()
291    }
292
293    /// Borrow aggregated integrity totals.
294    #[must_use]
295    pub const fn totals(&self) -> &IntegrityTotals {
296        &self.totals
297    }
298}
299
300impl StorageReport {
301    /// Construct one storage report payload.
302    #[must_use]
303    pub const fn new(
304        storage_data: Vec<DataStoreSnapshot>,
305        storage_index: Vec<IndexStoreSnapshot>,
306        entity_storage: Vec<EntitySnapshot>,
307        corrupted_keys: u64,
308        corrupted_entries: u64,
309    ) -> Self {
310        Self {
311            storage_data,
312            storage_index,
313            entity_storage,
314            corrupted_keys,
315            corrupted_entries,
316        }
317    }
318
319    /// Borrow data-store snapshots.
320    #[must_use]
321    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
322        self.storage_data.as_slice()
323    }
324
325    /// Borrow index-store snapshots.
326    #[must_use]
327    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
328        self.storage_index.as_slice()
329    }
330
331    /// Borrow entity-level storage snapshots.
332    #[must_use]
333    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
334        self.entity_storage.as_slice()
335    }
336
337    /// Return count of corrupted decoded data keys.
338    #[must_use]
339    pub const fn corrupted_keys(&self) -> u64 {
340        self.corrupted_keys
341    }
342
343    /// Return count of corrupted index entries.
344    #[must_use]
345    pub const fn corrupted_entries(&self) -> u64 {
346        self.corrupted_entries
347    }
348}
349
350///
351/// DataStoreSnapshot
352/// Store-level snapshot metrics.
353///
354
355#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
356pub struct DataStoreSnapshot {
357    pub(crate) path: String,
358    pub(crate) entries: u64,
359    pub(crate) memory_bytes: u64,
360}
361
362impl DataStoreSnapshot {
363    /// Construct one data-store snapshot row.
364    #[must_use]
365    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
366        Self {
367            path,
368            entries,
369            memory_bytes,
370        }
371    }
372
373    /// Borrow store path.
374    #[must_use]
375    pub const fn path(&self) -> &str {
376        self.path.as_str()
377    }
378
379    /// Return row count.
380    #[must_use]
381    pub const fn entries(&self) -> u64 {
382        self.entries
383    }
384
385    /// Return memory usage in bytes.
386    #[must_use]
387    pub const fn memory_bytes(&self) -> u64 {
388        self.memory_bytes
389    }
390}
391
392///
393/// IndexStoreSnapshot
394/// Index-store snapshot metrics
395///
396
397#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
398pub struct IndexStoreSnapshot {
399    pub(crate) path: String,
400    pub(crate) entries: u64,
401    pub(crate) user_entries: u64,
402    pub(crate) system_entries: u64,
403    pub(crate) memory_bytes: u64,
404}
405
406impl IndexStoreSnapshot {
407    /// Construct one index-store snapshot row.
408    #[must_use]
409    pub const fn new(
410        path: String,
411        entries: u64,
412        user_entries: u64,
413        system_entries: u64,
414        memory_bytes: u64,
415    ) -> Self {
416        Self {
417            path,
418            entries,
419            user_entries,
420            system_entries,
421            memory_bytes,
422        }
423    }
424
425    /// Borrow store path.
426    #[must_use]
427    pub const fn path(&self) -> &str {
428        self.path.as_str()
429    }
430
431    /// Return total entry count.
432    #[must_use]
433    pub const fn entries(&self) -> u64 {
434        self.entries
435    }
436
437    /// Return user-namespace entry count.
438    #[must_use]
439    pub const fn user_entries(&self) -> u64 {
440        self.user_entries
441    }
442
443    /// Return system-namespace entry count.
444    #[must_use]
445    pub const fn system_entries(&self) -> u64 {
446        self.system_entries
447    }
448
449    /// Return memory usage in bytes.
450    #[must_use]
451    pub const fn memory_bytes(&self) -> u64 {
452        self.memory_bytes
453    }
454}
455
456///
457/// EntitySnapshot
458/// Per-entity storage breakdown across stores
459///
460
461#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
462pub struct EntitySnapshot {
463    /// Store path (e.g., icydb_schema_tests::schema::TestDataStore)
464    pub(crate) store: String,
465
466    /// Entity path (e.g., icydb_schema_tests::canister::db::Index)
467    pub(crate) path: String,
468
469    /// Number of rows for this entity in the store
470    pub(crate) entries: u64,
471
472    /// Approximate bytes used (key + value)
473    pub(crate) memory_bytes: u64,
474
475    /// Minimum primary key for this entity (entity-local ordering)
476    pub(crate) min_key: Option<Value>,
477
478    /// Maximum primary key for this entity (entity-local ordering)
479    pub(crate) max_key: Option<Value>,
480}
481
482impl EntitySnapshot {
483    /// Construct one entity-storage snapshot row.
484    #[must_use]
485    pub const fn new(
486        store: String,
487        path: String,
488        entries: u64,
489        memory_bytes: u64,
490        min_key: Option<Value>,
491        max_key: Option<Value>,
492    ) -> Self {
493        Self {
494            store,
495            path,
496            entries,
497            memory_bytes,
498            min_key,
499            max_key,
500        }
501    }
502
503    /// Borrow store path.
504    #[must_use]
505    pub const fn store(&self) -> &str {
506        self.store.as_str()
507    }
508
509    /// Borrow entity path.
510    #[must_use]
511    pub const fn path(&self) -> &str {
512        self.path.as_str()
513    }
514
515    /// Return row count.
516    #[must_use]
517    pub const fn entries(&self) -> u64 {
518        self.entries
519    }
520
521    /// Return memory usage in bytes.
522    #[must_use]
523    pub const fn memory_bytes(&self) -> u64 {
524        self.memory_bytes
525    }
526
527    /// Borrow optional minimum primary key.
528    #[must_use]
529    pub const fn min_key(&self) -> Option<&Value> {
530        self.min_key.as_ref()
531    }
532
533    /// Borrow optional maximum primary key.
534    #[must_use]
535    pub const fn max_key(&self) -> Option<&Value> {
536        self.max_key.as_ref()
537    }
538}
539
540///
541/// EntityStats
542/// Internal struct for building per-entity stats before snapshotting.
543///
544
545#[derive(Default)]
546struct EntityStats {
547    entries: u64,
548    memory_bytes: u64,
549    min_key: Option<StorageKey>,
550    max_key: Option<StorageKey>,
551}
552
553impl EntityStats {
554    // Accumulate per-entity counters and keep min/max over entity-local storage keys.
555    fn update(&mut self, dk: &DataKey, value_len: u64) {
556        self.entries = self.entries.saturating_add(1);
557        self.memory_bytes = self
558            .memory_bytes
559            .saturating_add(DataKey::entry_size_bytes(value_len));
560
561        let k = dk.storage_key();
562
563        match &mut self.min_key {
564            Some(min) if k < *min => *min = k,
565            None => self.min_key = Some(k),
566            _ => {}
567        }
568
569        match &mut self.max_key {
570            Some(max) if k > *max => *max = k,
571            None => self.max_key = Some(k),
572            _ => {}
573        }
574    }
575}
576
577/// Build one deterministic storage snapshot with per-entity rollups.
578///
579/// This path is read-only and fail-closed on decode/validation errors by counting
580/// corrupted keys/entries instead of panicking.
581pub(crate) fn storage_report<C: CanisterKind>(
582    db: &Db<C>,
583    name_to_path: &[(&'static str, &'static str)],
584) -> Result<StorageReport, InternalError> {
585    db.ensure_recovered_state()?;
586    // Build name→path map once, reuse across stores.
587    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
588    let runtime_name_to_tag: BTreeMap<&str, EntityTag> =
589        db.runtime_entity_name_tag_pairs().into_iter().collect();
590    // Build one deterministic tag→path alias map to preserve report naming even
591    // after persisted keys move from string names to tag identities.
592    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
593    for (entity_name, entity_tag) in &runtime_name_to_tag {
594        let path_name = name_map.get(entity_name).copied().unwrap_or(*entity_name);
595        tag_name_map.entry(*entity_tag).or_insert(path_name);
596    }
597    let mut data = Vec::new();
598    let mut index = Vec::new();
599    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
600    let mut corrupted_keys = 0u64;
601    let mut corrupted_entries = 0u64;
602
603    db.with_store_registry(|reg| {
604        // Keep diagnostics snapshots deterministic by traversing stores in path order.
605        let mut stores = reg.iter().collect::<Vec<_>>();
606        stores.sort_by_key(|(path, _)| *path);
607
608        for (path, store_handle) in stores {
609            // Phase 1: collect data-store snapshots and per-entity stats.
610            store_handle.with_data(|store| {
611                data.push(DataStoreSnapshot::new(
612                    path.to_string(),
613                    store.len(),
614                    store.memory_bytes(),
615                ));
616
617                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
618                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
619
620                for entry in store.iter() {
621                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
622                        corrupted_keys = corrupted_keys.saturating_add(1);
623                        continue;
624                    };
625
626                    let value_len = entry.value().len() as u64;
627
628                    by_entity
629                        .entry(dk.entity_tag())
630                        .or_default()
631                        .update(&dk, value_len);
632                }
633
634                for (entity_tag, stats) in by_entity {
635                    let path_name = tag_name_map
636                        .get(&entity_tag)
637                        .copied()
638                        .map(str::to_string)
639                        .or_else(|| {
640                            db.runtime_hook_for_entity_tag(entity_tag)
641                                .ok()
642                                .map(|hooks| {
643                                    name_map
644                                        .get(hooks.entity_name)
645                                        .copied()
646                                        .unwrap_or(hooks.entity_name)
647                                        .to_string()
648                                })
649                        })
650                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
651                    entity_storage.push(EntitySnapshot::new(
652                        path.to_string(),
653                        path_name,
654                        stats.entries,
655                        stats.memory_bytes,
656                        stats.min_key.map(|key| key.as_value()),
657                        stats.max_key.map(|key| key.as_value()),
658                    ));
659                }
660            });
661
662            // Phase 2: collect index-store snapshots and integrity counters.
663            store_handle.with_index(|store| {
664                let mut user_entries = 0u64;
665                let mut system_entries = 0u64;
666
667                for (key, value) in store.entries() {
668                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
669                        corrupted_entries = corrupted_entries.saturating_add(1);
670                        continue;
671                    };
672
673                    if decoded_key.uses_system_namespace() {
674                        system_entries = system_entries.saturating_add(1);
675                    } else {
676                        user_entries = user_entries.saturating_add(1);
677                    }
678
679                    if value.validate().is_err() {
680                        corrupted_entries = corrupted_entries.saturating_add(1);
681                    }
682                }
683
684                index.push(IndexStoreSnapshot::new(
685                    path.to_string(),
686                    store.len(),
687                    user_entries,
688                    system_entries,
689                    store.memory_bytes(),
690                ));
691            });
692        }
693    });
694
695    // Phase 3: enforce deterministic entity snapshot emission order.
696    // This remains stable even if outer store traversal internals change.
697    entity_storage
698        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
699
700    Ok(StorageReport::new(
701        data,
702        index,
703        entity_storage,
704        corrupted_keys,
705        corrupted_entries,
706    ))
707}
708
709/// Build one deterministic integrity scan over all registered stores.
710///
711/// This scan is read-only and classifies findings as:
712/// - corruption: malformed persisted bytes or inconsistent structural links
713/// - compatibility: persisted payloads outside decode compatibility windows
714/// - misuse: unsupported runtime wiring (for example missing entity hooks)
715pub(crate) fn integrity_report<C: CanisterKind>(
716    db: &Db<C>,
717) -> Result<IntegrityReport, InternalError> {
718    db.ensure_recovered_state()?;
719
720    integrity_report_after_recovery(db)
721}
722
723/// Build one deterministic integrity scan after recovery has already completed.
724///
725/// Callers running inside recovery flow should use this variant to avoid
726/// recursive recovery gating.
727pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
728    db: &Db<C>,
729) -> Result<IntegrityReport, InternalError> {
730    build_integrity_report(db)
731}
732
733fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
734    let mut stores = Vec::new();
735    let mut totals = IntegrityTotals::default();
736    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
737
738    db.with_store_registry(|reg| {
739        // Keep deterministic output order across registry traversal implementations.
740        let mut store_entries = reg.iter().collect::<Vec<_>>();
741        store_entries.sort_by_key(|(path, _)| *path);
742
743        for (path, store_handle) in store_entries {
744            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
745            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
746            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
747
748            totals.add_store_snapshot(&snapshot);
749            stores.push(snapshot);
750        }
751
752        Ok::<(), InternalError>(())
753    })?;
754
755    Ok(IntegrityReport::new(stores, totals))
756}
757
758// Build one global map of live data keys grouped by entity across all stores.
759fn collect_global_live_keys_by_entity<C: CanisterKind>(
760    db: &Db<C>,
761) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
762    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
763
764    db.with_store_registry(|reg| {
765        for (_, store_handle) in reg.iter() {
766            store_handle.with_data(|data_store| {
767                for entry in data_store.iter() {
768                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
769                        keys.entry(data_key.entity_tag())
770                            .or_default()
771                            .insert(data_key.storage_key());
772                    }
773                }
774            });
775        }
776
777        Ok::<(), InternalError>(())
778    })?;
779
780    Ok(keys)
781}
782
783// Run forward (data -> index) integrity checks for one store.
784fn scan_store_forward_integrity<C: CanisterKind>(
785    db: &Db<C>,
786    store_handle: StoreHandle,
787    snapshot: &mut IntegrityStoreSnapshot,
788) -> Result<(), InternalError> {
789    store_handle.with_data(|data_store| {
790        for entry in data_store.iter() {
791            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
792
793            let raw_key = *entry.key();
794
795            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
796                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
797                continue;
798            };
799
800            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
801                Ok(hooks) => hooks,
802                Err(err) => {
803                    classify_scan_error(err, snapshot)?;
804                    continue;
805                }
806            };
807
808            let marker_row = CommitRowOp::new(
809                hooks.entity_path,
810                raw_key.as_bytes().to_vec(),
811                None,
812                Some(entry.value().as_bytes().to_vec()),
813                (hooks.commit_schema_fingerprint)(),
814            );
815
816            // Validate envelope compatibility before typed preparation so
817            // incompatible persisted formats remain compatibility-classified.
818            if let Err(err) = decode_structural_row_cbor(&entry.value()) {
819                classify_scan_error(err, snapshot)?;
820                continue;
821            }
822
823            let prepared = match db.prepare_row_commit_op(&marker_row) {
824                Ok(prepared) => prepared,
825                Err(err) => {
826                    classify_scan_error(err, snapshot)?;
827                    continue;
828                }
829            };
830
831            for index_op in prepared.index_ops {
832                let Some(expected_value) = index_op.value else {
833                    continue;
834                };
835
836                let actual = index_op
837                    .store
838                    .with_borrow(|index_store| index_store.get(&index_op.key));
839                match actual {
840                    Some(actual_value) if actual_value == expected_value => {}
841                    Some(_) => {
842                        snapshot.divergent_index_entries =
843                            snapshot.divergent_index_entries.saturating_add(1);
844                    }
845                    None => {
846                        snapshot.missing_index_entries =
847                            snapshot.missing_index_entries.saturating_add(1);
848                    }
849                }
850            }
851        }
852
853        Ok::<(), InternalError>(())
854    })
855}
856
857// Run reverse (index -> data) integrity checks for one store.
858fn scan_store_reverse_integrity(
859    store_handle: StoreHandle,
860    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
861    snapshot: &mut IntegrityStoreSnapshot,
862) {
863    store_handle.with_index(|index_store| {
864        for (raw_index_key, raw_index_entry) in index_store.entries() {
865            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
866
867            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
868                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
869                continue;
870            };
871
872            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
873
874            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
875                snapshot.corrupted_index_entries =
876                    snapshot.corrupted_index_entries.saturating_add(1);
877                continue;
878            };
879
880            for primary_key in indexed_primary_keys {
881                let exists = live_keys_by_entity
882                    .get(&index_entity_tag)
883                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
884                if !exists {
885                    snapshot.orphan_index_references =
886                        snapshot.orphan_index_references.saturating_add(1);
887                }
888            }
889        }
890    });
891}
892
893// Map scan-time errors into explicit integrity classification buckets.
894fn classify_scan_error(
895    err: InternalError,
896    snapshot: &mut IntegrityStoreSnapshot,
897) -> Result<(), InternalError> {
898    match err.class() {
899        ErrorClass::Corruption => {
900            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
901            Ok(())
902        }
903        ErrorClass::IncompatiblePersistedFormat => {
904            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
905            Ok(())
906        }
907        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
908            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
909            Ok(())
910        }
911        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
912    }
913}
914
915// Parse the data-entity identity from one decoded index key.
916const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
917    index_key.index_id().entity_tag
918}