Skip to main content

icydb_core/db/diagnostics/
mod.rs

1//! Module: diagnostics
2//! Responsibility: read-only storage footprint and integrity snapshots.
3//! Does not own: recovery, write-path mutation, or query planning semantics.
4//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11    db::{
12        Db, EntityName,
13        codec::deserialize_row,
14        commit::CommitRowOp,
15        data::{DataKey, StorageKey},
16        index::IndexKey,
17        registry::StoreHandle,
18    },
19    error::{ErrorClass, InternalError},
20    traits::CanisterKind,
21    value::Value,
22};
23use candid::CandidType;
24use serde::{Deserialize, Serialize};
25use serde_cbor::Value as CborValue;
26use std::collections::{BTreeMap, BTreeSet};
27
28pub(crate) use execution_trace::ExecutionOptimizationCounter;
29pub(crate) use execution_trace::record_execution_optimization_hit_for_tests;
30#[cfg(test)]
31pub(crate) use execution_trace::take_execution_optimization_hits_for_tests;
32pub use execution_trace::{
33    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
34};
35
///
/// StorageReport
/// Live storage snapshot report: per-store data and index metrics, per-entity
/// rollups, and corruption counters, as assembled by `storage_report`.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct StorageReport {
    /// One snapshot per data store visited by the scan.
    pub(crate) storage_data: Vec<DataStoreSnapshot>,
    /// One snapshot per index store visited by the scan.
    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
    /// Per-entity rollups, sorted by `(store, path)` before the report is returned.
    pub(crate) entity_storage: Vec<EntitySnapshot>,
    /// Number of raw data keys that failed `DataKey` decoding.
    pub(crate) corrupted_keys: u64,
    /// Number of index findings: entries whose key failed `IndexKey` decoding
    /// plus entries whose value failed validation.
    pub(crate) corrupted_entries: u64,
}
49
///
/// IntegrityTotals
/// Aggregated integrity-scan counters across all stores.
/// Each field is the saturating sum of the same-named field on every
/// `IntegrityStoreSnapshot` folded in via `add_store_snapshot`.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct IntegrityTotals {
    /// Data rows visited by the forward (data -> index) scan.
    pub(crate) data_rows_scanned: u64,
    /// Index entries visited by the reverse (index -> data) scan.
    pub(crate) index_entries_scanned: u64,
    /// Raw data keys that failed `DataKey` decoding.
    pub(crate) corrupted_data_keys: u64,
    /// Row-level scan errors classified as corruption.
    pub(crate) corrupted_data_rows: u64,
    /// Index keys that failed decoding or whose owning entity could not be resolved.
    pub(crate) corrupted_index_keys: u64,
    /// Index entries whose stored primary keys could not be decoded.
    pub(crate) corrupted_index_entries: u64,
    /// Expected index entries that were absent from the index store.
    pub(crate) missing_index_entries: u64,
    /// Index entries that were present but differed from the expected value.
    pub(crate) divergent_index_entries: u64,
    /// Indexed primary keys that have no live data row.
    pub(crate) orphan_index_references: u64,
    /// Scan errors classified as incompatible persisted format.
    pub(crate) compatibility_findings: u64,
    /// Scan errors classified as unsupported, not-found, or conflict.
    pub(crate) misuse_findings: u64,
}
69
70impl IntegrityTotals {
71    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
72        self.data_rows_scanned = self
73            .data_rows_scanned
74            .saturating_add(store.data_rows_scanned);
75        self.index_entries_scanned = self
76            .index_entries_scanned
77            .saturating_add(store.index_entries_scanned);
78        self.corrupted_data_keys = self
79            .corrupted_data_keys
80            .saturating_add(store.corrupted_data_keys);
81        self.corrupted_data_rows = self
82            .corrupted_data_rows
83            .saturating_add(store.corrupted_data_rows);
84        self.corrupted_index_keys = self
85            .corrupted_index_keys
86            .saturating_add(store.corrupted_index_keys);
87        self.corrupted_index_entries = self
88            .corrupted_index_entries
89            .saturating_add(store.corrupted_index_entries);
90        self.missing_index_entries = self
91            .missing_index_entries
92            .saturating_add(store.missing_index_entries);
93        self.divergent_index_entries = self
94            .divergent_index_entries
95            .saturating_add(store.divergent_index_entries);
96        self.orphan_index_references = self
97            .orphan_index_references
98            .saturating_add(store.orphan_index_references);
99        self.compatibility_findings = self
100            .compatibility_findings
101            .saturating_add(store.compatibility_findings);
102        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
103    }
104
105    /// Return total number of data rows scanned.
106    #[must_use]
107    pub const fn data_rows_scanned(&self) -> u64 {
108        self.data_rows_scanned
109    }
110
111    /// Return total number of index entries scanned.
112    #[must_use]
113    pub const fn index_entries_scanned(&self) -> u64 {
114        self.index_entries_scanned
115    }
116
117    /// Return total number of corrupted data-key findings.
118    #[must_use]
119    pub const fn corrupted_data_keys(&self) -> u64 {
120        self.corrupted_data_keys
121    }
122
123    /// Return total number of corrupted data-row findings.
124    #[must_use]
125    pub const fn corrupted_data_rows(&self) -> u64 {
126        self.corrupted_data_rows
127    }
128
129    /// Return total number of corrupted index-key findings.
130    #[must_use]
131    pub const fn corrupted_index_keys(&self) -> u64 {
132        self.corrupted_index_keys
133    }
134
135    /// Return total number of corrupted index-entry findings.
136    #[must_use]
137    pub const fn corrupted_index_entries(&self) -> u64 {
138        self.corrupted_index_entries
139    }
140
141    /// Return total number of missing index-entry findings.
142    #[must_use]
143    pub const fn missing_index_entries(&self) -> u64 {
144        self.missing_index_entries
145    }
146
147    /// Return total number of divergent index-entry findings.
148    #[must_use]
149    pub const fn divergent_index_entries(&self) -> u64 {
150        self.divergent_index_entries
151    }
152
153    /// Return total number of orphan index-reference findings.
154    #[must_use]
155    pub const fn orphan_index_references(&self) -> u64 {
156        self.orphan_index_references
157    }
158
159    /// Return total number of compatibility findings.
160    #[must_use]
161    pub const fn compatibility_findings(&self) -> u64 {
162        self.compatibility_findings
163    }
164
165    /// Return total number of misuse findings.
166    #[must_use]
167    pub const fn misuse_findings(&self) -> u64 {
168        self.misuse_findings
169    }
170}
171
///
/// IntegrityStoreSnapshot
/// Per-store integrity findings and scan counters.
/// Populated by the forward (data -> index) and reverse (index -> data) scans.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct IntegrityStoreSnapshot {
    /// Store path as reported by the store registry.
    pub(crate) path: String,
    /// Data rows visited by the forward scan (counted before any decoding).
    pub(crate) data_rows_scanned: u64,
    /// Index entries visited by the reverse scan (counted before any decoding).
    pub(crate) index_entries_scanned: u64,
    /// Raw data keys that failed `DataKey` decoding.
    pub(crate) corrupted_data_keys: u64,
    /// Row-level scan errors classified as `ErrorClass::Corruption`.
    pub(crate) corrupted_data_rows: u64,
    /// Index keys that failed `IndexKey` decoding, or whose index id did not
    /// yield a valid entity name.
    pub(crate) corrupted_index_keys: u64,
    /// Index entries whose stored primary keys could not be decoded.
    pub(crate) corrupted_index_entries: u64,
    /// Expected index entries that were absent from the index store.
    pub(crate) missing_index_entries: u64,
    /// Index entries that were present but not equal to the expected value.
    pub(crate) divergent_index_entries: u64,
    /// Indexed primary keys with no matching live data key for the entity.
    pub(crate) orphan_index_references: u64,
    /// Scan errors classified as `ErrorClass::IncompatiblePersistedFormat`.
    pub(crate) compatibility_findings: u64,
    /// Scan errors classified as `Unsupported`, `NotFound`, or `Conflict`.
    pub(crate) misuse_findings: u64,
}
192
impl IntegrityStoreSnapshot {
    /// Construct one empty store-level integrity snapshot.
    ///
    /// Every counter starts at its `Default` value (zero); only `path` is set.
    #[must_use]
    pub fn new(path: String) -> Self {
        Self {
            path,
            ..Self::default()
        }
    }

    /// Borrow store path.
    #[must_use]
    pub const fn path(&self) -> &str {
        self.path.as_str()
    }

    /// Return number of scanned data rows.
    #[must_use]
    pub const fn data_rows_scanned(&self) -> u64 {
        self.data_rows_scanned
    }

    /// Return number of scanned index entries.
    #[must_use]
    pub const fn index_entries_scanned(&self) -> u64 {
        self.index_entries_scanned
    }

    /// Return number of corrupted data-key findings
    /// (raw keys that failed `DataKey` decoding).
    #[must_use]
    pub const fn corrupted_data_keys(&self) -> u64 {
        self.corrupted_data_keys
    }

    /// Return number of corrupted data-row findings
    /// (row-level scan errors classified as corruption).
    #[must_use]
    pub const fn corrupted_data_rows(&self) -> u64 {
        self.corrupted_data_rows
    }

    /// Return number of corrupted index-key findings
    /// (undecodable index keys or unresolvable owning entity).
    #[must_use]
    pub const fn corrupted_index_keys(&self) -> u64 {
        self.corrupted_index_keys
    }

    /// Return number of corrupted index-entry findings
    /// (entries whose stored primary keys failed to decode).
    #[must_use]
    pub const fn corrupted_index_entries(&self) -> u64 {
        self.corrupted_index_entries
    }

    /// Return number of missing index-entry findings
    /// (expected entries absent from the index store).
    #[must_use]
    pub const fn missing_index_entries(&self) -> u64 {
        self.missing_index_entries
    }

    /// Return number of divergent index-entry findings
    /// (entries present but different from the expected value).
    #[must_use]
    pub const fn divergent_index_entries(&self) -> u64 {
        self.divergent_index_entries
    }

    /// Return number of orphan index-reference findings
    /// (indexed primary keys with no live data row).
    #[must_use]
    pub const fn orphan_index_references(&self) -> u64 {
        self.orphan_index_references
    }

    /// Return number of compatibility findings.
    #[must_use]
    pub const fn compatibility_findings(&self) -> u64 {
        self.compatibility_findings
    }

    /// Return number of misuse findings.
    #[must_use]
    pub const fn misuse_findings(&self) -> u64 {
        self.misuse_findings
    }
}
275
///
/// IntegrityReport
/// Full integrity-scan output across all registered stores.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct IntegrityReport {
    /// Per-store findings; the report builder emits them in store-path order.
    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
    /// Saturating sums of every per-store counter.
    pub(crate) totals: IntegrityTotals,
}
286
impl IntegrityReport {
    /// Construct one integrity report payload.
    ///
    /// The values are stored as given; `totals` is not recomputed from `stores`.
    #[must_use]
    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
        Self { stores, totals }
    }

    /// Borrow per-store integrity snapshots.
    #[must_use]
    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
        self.stores.as_slice()
    }

    /// Borrow aggregated integrity totals.
    #[must_use]
    pub const fn totals(&self) -> &IntegrityTotals {
        &self.totals
    }
}
306
impl StorageReport {
    /// Construct one storage report payload.
    ///
    /// All arguments are stored as given; no sorting or validation happens here.
    #[must_use]
    pub const fn new(
        storage_data: Vec<DataStoreSnapshot>,
        storage_index: Vec<IndexStoreSnapshot>,
        entity_storage: Vec<EntitySnapshot>,
        corrupted_keys: u64,
        corrupted_entries: u64,
    ) -> Self {
        Self {
            storage_data,
            storage_index,
            entity_storage,
            corrupted_keys,
            corrupted_entries,
        }
    }

    /// Borrow data-store snapshots.
    #[must_use]
    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
        self.storage_data.as_slice()
    }

    /// Borrow index-store snapshots.
    #[must_use]
    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
        self.storage_index.as_slice()
    }

    /// Borrow entity-level storage snapshots.
    #[must_use]
    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
        self.entity_storage.as_slice()
    }

    /// Return count of corrupted decoded data keys
    /// (raw data keys that failed `DataKey` decoding).
    #[must_use]
    pub const fn corrupted_keys(&self) -> u64 {
        self.corrupted_keys
    }

    /// Return count of corrupted index entries
    /// (undecodable index keys plus index values that failed validation).
    #[must_use]
    pub const fn corrupted_entries(&self) -> u64 {
        self.corrupted_entries
    }
}
356
///
/// DataStoreSnapshot
/// Store-level snapshot metrics for one data store.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct DataStoreSnapshot {
    /// Store path as reported by the store registry.
    pub(crate) path: String,
    /// Entry count reported by the store (`len()`), including undecodable rows.
    pub(crate) entries: u64,
    /// Memory usage in bytes as reported by the store.
    pub(crate) memory_bytes: u64,
}
368
impl DataStoreSnapshot {
    /// Construct one data-store snapshot row from already-measured values.
    #[must_use]
    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
        Self {
            path,
            entries,
            memory_bytes,
        }
    }

    /// Borrow store path.
    #[must_use]
    pub const fn path(&self) -> &str {
        self.path.as_str()
    }

    /// Return row count.
    #[must_use]
    pub const fn entries(&self) -> u64 {
        self.entries
    }

    /// Return memory usage in bytes.
    #[must_use]
    pub const fn memory_bytes(&self) -> u64 {
        self.memory_bytes
    }
}
398
///
/// IndexStoreSnapshot
/// Index-store snapshot metrics for one index store.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct IndexStoreSnapshot {
    /// Store path as reported by the store registry.
    pub(crate) path: String,
    /// Entry count reported by the store (`len()`).
    pub(crate) entries: u64,
    /// Entries whose key decoded and does not use the system namespace.
    pub(crate) user_entries: u64,
    /// Entries whose key decoded and uses the system namespace.
    /// Entries with undecodable keys are counted in neither bucket, so
    /// `user_entries + system_entries` may be smaller than `entries`.
    pub(crate) system_entries: u64,
    /// Memory usage in bytes as reported by the store.
    pub(crate) memory_bytes: u64,
}
412
impl IndexStoreSnapshot {
    /// Construct one index-store snapshot row from already-measured values.
    #[must_use]
    pub const fn new(
        path: String,
        entries: u64,
        user_entries: u64,
        system_entries: u64,
        memory_bytes: u64,
    ) -> Self {
        Self {
            path,
            entries,
            user_entries,
            system_entries,
            memory_bytes,
        }
    }

    /// Borrow store path.
    #[must_use]
    pub const fn path(&self) -> &str {
        self.path.as_str()
    }

    /// Return total entry count.
    #[must_use]
    pub const fn entries(&self) -> u64 {
        self.entries
    }

    /// Return user-namespace entry count.
    #[must_use]
    pub const fn user_entries(&self) -> u64 {
        self.user_entries
    }

    /// Return system-namespace entry count.
    #[must_use]
    pub const fn system_entries(&self) -> u64 {
        self.system_entries
    }

    /// Return memory usage in bytes.
    #[must_use]
    pub const fn memory_bytes(&self) -> u64 {
        self.memory_bytes
    }
}
462
///
/// EntitySnapshot
/// Per-entity storage breakdown across stores.
/// One row is produced per (store, entity) pair that has at least one
/// decodable data key in that store.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct EntitySnapshot {
    /// Store path (e.g., icydb_schema_tests::schema::TestDataStore)
    pub(crate) store: String,

    /// Entity path (e.g., icydb_schema_tests::canister::db::Index).
    /// Falls back to the bare entity name when no name-to-path mapping exists.
    pub(crate) path: String,

    /// Number of rows for this entity in the store
    pub(crate) entries: u64,

    /// Approximate bytes used (key + value), summed per row with saturation
    pub(crate) memory_bytes: u64,

    /// Minimum primary key for this entity (entity-local ordering)
    pub(crate) min_key: Option<Value>,

    /// Maximum primary key for this entity (entity-local ordering)
    pub(crate) max_key: Option<Value>,
}
488
impl EntitySnapshot {
    /// Construct one entity-storage snapshot row from already-computed values.
    #[must_use]
    pub const fn new(
        store: String,
        path: String,
        entries: u64,
        memory_bytes: u64,
        min_key: Option<Value>,
        max_key: Option<Value>,
    ) -> Self {
        Self {
            store,
            path,
            entries,
            memory_bytes,
            min_key,
            max_key,
        }
    }

    /// Borrow store path.
    #[must_use]
    pub const fn store(&self) -> &str {
        self.store.as_str()
    }

    /// Borrow entity path.
    #[must_use]
    pub const fn path(&self) -> &str {
        self.path.as_str()
    }

    /// Return row count.
    #[must_use]
    pub const fn entries(&self) -> u64 {
        self.entries
    }

    /// Return memory usage in bytes.
    #[must_use]
    pub const fn memory_bytes(&self) -> u64 {
        self.memory_bytes
    }

    /// Borrow optional minimum primary key (`None` when no key was recorded).
    #[must_use]
    pub const fn min_key(&self) -> Option<&Value> {
        self.min_key.as_ref()
    }

    /// Borrow optional maximum primary key (`None` when no key was recorded).
    #[must_use]
    pub const fn max_key(&self) -> Option<&Value> {
        self.max_key.as_ref()
    }
}
546
///
/// EntityStats
/// Internal struct for building per-entity stats before snapshotting.
/// Keys are kept as `StorageKey` while accumulating and converted to `Value`
/// only when the `EntitySnapshot` is emitted.
///

#[derive(Default)]
struct EntityStats {
    // Rows seen so far for this entity (saturating).
    entries: u64,
    // Accumulated per-row entry size in bytes (saturating).
    memory_bytes: u64,
    // Smallest storage key seen so far; `None` until the first row.
    min_key: Option<StorageKey>,
    // Largest storage key seen so far; `None` until the first row.
    max_key: Option<StorageKey>,
}
559
560impl EntityStats {
561    // Accumulate per-entity counters and keep min/max over entity-local storage keys.
562    fn update(&mut self, dk: &DataKey, value_len: u64) {
563        self.entries = self.entries.saturating_add(1);
564        self.memory_bytes = self
565            .memory_bytes
566            .saturating_add(DataKey::entry_size_bytes(value_len));
567
568        let k = dk.storage_key();
569
570        match &mut self.min_key {
571            Some(min) if k < *min => *min = k,
572            None => self.min_key = Some(k),
573            _ => {}
574        }
575
576        match &mut self.max_key {
577            Some(max) if k > *max => *max = k,
578            None => self.max_key = Some(k),
579            _ => {}
580        }
581    }
582}
583
584/// Build one deterministic storage snapshot with per-entity rollups.
585///
586/// This path is read-only and fail-closed on decode/validation errors by counting
587/// corrupted keys/entries instead of panicking.
588pub(crate) fn storage_report<C: CanisterKind>(
589    db: &Db<C>,
590    name_to_path: &[(&'static str, &'static str)],
591) -> Result<StorageReport, InternalError> {
592    db.ensure_recovered_state()?;
593    // Build name→path map once, reuse across stores.
594    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
595    let mut data = Vec::new();
596    let mut index = Vec::new();
597    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
598    let mut corrupted_keys = 0u64;
599    let mut corrupted_entries = 0u64;
600
601    db.with_store_registry(|reg| {
602        // Keep diagnostics snapshots deterministic by traversing stores in path order.
603        let mut stores = reg.iter().collect::<Vec<_>>();
604        stores.sort_by_key(|(path, _)| *path);
605
606        for (path, store_handle) in stores {
607            // Phase 1: collect data-store snapshots and per-entity stats.
608            store_handle.with_data(|store| {
609                data.push(DataStoreSnapshot::new(
610                    path.to_string(),
611                    store.len(),
612                    store.memory_bytes(),
613                ));
614
615                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
616                let mut by_entity: BTreeMap<EntityName, EntityStats> = BTreeMap::new();
617
618                for entry in store.iter() {
619                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
620                        corrupted_keys = corrupted_keys.saturating_add(1);
621                        continue;
622                    };
623
624                    let value_len = entry.value().len() as u64;
625
626                    by_entity
627                        .entry(*dk.entity_name())
628                        .or_default()
629                        .update(&dk, value_len);
630                }
631
632                for (entity_name, stats) in by_entity {
633                    let path_name = name_map
634                        .get(entity_name.as_str())
635                        .copied()
636                        .unwrap_or(entity_name.as_str());
637                    entity_storage.push(EntitySnapshot::new(
638                        path.to_string(),
639                        path_name.to_string(),
640                        stats.entries,
641                        stats.memory_bytes,
642                        stats.min_key.map(|key| key.as_value()),
643                        stats.max_key.map(|key| key.as_value()),
644                    ));
645                }
646            });
647
648            // Phase 2: collect index-store snapshots and integrity counters.
649            store_handle.with_index(|store| {
650                let mut user_entries = 0u64;
651                let mut system_entries = 0u64;
652
653                for (key, value) in store.entries() {
654                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
655                        corrupted_entries = corrupted_entries.saturating_add(1);
656                        continue;
657                    };
658
659                    if decoded_key.uses_system_namespace() {
660                        system_entries = system_entries.saturating_add(1);
661                    } else {
662                        user_entries = user_entries.saturating_add(1);
663                    }
664
665                    if value.validate().is_err() {
666                        corrupted_entries = corrupted_entries.saturating_add(1);
667                    }
668                }
669
670                index.push(IndexStoreSnapshot::new(
671                    path.to_string(),
672                    store.len(),
673                    user_entries,
674                    system_entries,
675                    store.memory_bytes(),
676                ));
677            });
678        }
679    });
680
681    // Phase 3: enforce deterministic entity snapshot emission order.
682    // This remains stable even if outer store traversal internals change.
683    entity_storage
684        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
685
686    Ok(StorageReport::new(
687        data,
688        index,
689        entity_storage,
690        corrupted_keys,
691        corrupted_entries,
692    ))
693}
694
/// Build one deterministic integrity scan over all registered stores.
///
/// This scan is read-only and classifies findings as:
/// - corruption: malformed persisted bytes or inconsistent structural links
/// - compatibility: persisted payloads outside decode compatibility windows
/// - misuse: unsupported runtime wiring (for example missing entity hooks)
///
/// Runs `Db::ensure_recovered_state` first and then delegates to
/// `integrity_report_after_recovery`.
///
/// # Errors
///
/// Propagates a failure of the recovered-state check, or any error returned
/// by the scan itself.
pub(crate) fn integrity_report<C: CanisterKind>(
    db: &Db<C>,
) -> Result<IntegrityReport, InternalError> {
    // Gate on recovery before touching any store.
    db.ensure_recovered_state()?;

    integrity_report_after_recovery(db)
}
708
/// Build one deterministic integrity scan after recovery has already completed.
///
/// Callers running inside recovery flow should use this variant to avoid
/// recursive recovery gating.
///
/// Unlike `integrity_report`, this does not call `Db::ensure_recovered_state`;
/// it goes straight to the scan.
///
/// # Errors
///
/// Returns any error produced by `build_integrity_report`.
pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
    db: &Db<C>,
) -> Result<IntegrityReport, InternalError> {
    build_integrity_report(db)
}
718
719fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
720    let mut stores = Vec::new();
721    let mut totals = IntegrityTotals::default();
722    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
723
724    db.with_store_registry(|reg| {
725        // Keep deterministic output order across registry traversal implementations.
726        let mut store_entries = reg.iter().collect::<Vec<_>>();
727        store_entries.sort_by_key(|(path, _)| *path);
728
729        for (path, store_handle) in store_entries {
730            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
731            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
732            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
733
734            totals.add_store_snapshot(&snapshot);
735            stores.push(snapshot);
736        }
737
738        Ok::<(), InternalError>(())
739    })?;
740
741    Ok(IntegrityReport::new(stores, totals))
742}
743
744// Build one global map of live data keys grouped by entity across all stores.
745fn collect_global_live_keys_by_entity<C: CanisterKind>(
746    db: &Db<C>,
747) -> Result<BTreeMap<EntityName, BTreeSet<StorageKey>>, InternalError> {
748    let mut keys = BTreeMap::<EntityName, BTreeSet<StorageKey>>::new();
749
750    db.with_store_registry(|reg| {
751        for (_, store_handle) in reg.iter() {
752            store_handle.with_data(|data_store| {
753                for entry in data_store.iter() {
754                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
755                        keys.entry(*data_key.entity_name())
756                            .or_default()
757                            .insert(data_key.storage_key());
758                    }
759                }
760            });
761        }
762
763        Ok::<(), InternalError>(())
764    })?;
765
766    Ok(keys)
767}
768
769// Run forward (data -> index) integrity checks for one store.
770fn scan_store_forward_integrity<C: CanisterKind>(
771    db: &Db<C>,
772    store_handle: StoreHandle,
773    snapshot: &mut IntegrityStoreSnapshot,
774) -> Result<(), InternalError> {
775    store_handle.with_data(|data_store| {
776        for entry in data_store.iter() {
777            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
778
779            let raw_key = *entry.key();
780
781            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
782                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
783                continue;
784            };
785
786            let entity_name = data_key.entity_name().as_str();
787            let hooks = match db.runtime_hook_for_entity_name(entity_name) {
788                Ok(hooks) => hooks,
789                Err(err) => {
790                    classify_scan_error(err, snapshot)?;
791                    continue;
792                }
793            };
794
795            let marker_row = CommitRowOp::new(
796                hooks.entity_path,
797                raw_key.as_bytes().to_vec(),
798                None,
799                Some(entry.value().as_bytes().to_vec()),
800                (hooks.commit_schema_fingerprint)(),
801            );
802
803            // Validate envelope compatibility before typed preparation so
804            // incompatible persisted formats remain compatibility-classified.
805            if let Err(err) = deserialize_row::<CborValue>(entry.value().as_bytes()) {
806                classify_scan_error(err, snapshot)?;
807                continue;
808            }
809
810            let prepared = match db.prepare_row_commit_op(&marker_row) {
811                Ok(prepared) => prepared,
812                Err(err) => {
813                    classify_scan_error(err, snapshot)?;
814                    continue;
815                }
816            };
817
818            for index_op in prepared.index_ops {
819                let Some(expected_value) = index_op.value else {
820                    continue;
821                };
822
823                let actual = index_op
824                    .store
825                    .with_borrow(|index_store| index_store.get(&index_op.key));
826                match actual {
827                    Some(actual_value) if actual_value == expected_value => {}
828                    Some(_) => {
829                        snapshot.divergent_index_entries =
830                            snapshot.divergent_index_entries.saturating_add(1);
831                    }
832                    None => {
833                        snapshot.missing_index_entries =
834                            snapshot.missing_index_entries.saturating_add(1);
835                    }
836                }
837            }
838        }
839
840        Ok::<(), InternalError>(())
841    })
842}
843
844// Run reverse (index -> data) integrity checks for one store.
845fn scan_store_reverse_integrity(
846    store_handle: StoreHandle,
847    live_keys_by_entity: &BTreeMap<EntityName, BTreeSet<StorageKey>>,
848    snapshot: &mut IntegrityStoreSnapshot,
849) {
850    store_handle.with_index(|index_store| {
851        for (raw_index_key, raw_index_entry) in index_store.entries() {
852            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
853
854            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
855                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
856                continue;
857            };
858
859            let Some(index_entity_name) = data_entity_name_for_index_key(&decoded_index_key) else {
860                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
861                continue;
862            };
863
864            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
865                snapshot.corrupted_index_entries =
866                    snapshot.corrupted_index_entries.saturating_add(1);
867                continue;
868            };
869
870            for primary_key in indexed_primary_keys {
871                let exists = live_keys_by_entity
872                    .get(&index_entity_name)
873                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
874                if !exists {
875                    snapshot.orphan_index_references =
876                        snapshot.orphan_index_references.saturating_add(1);
877                }
878            }
879        }
880    });
881}
882
883// Map scan-time errors into explicit integrity classification buckets.
884fn classify_scan_error(
885    err: InternalError,
886    snapshot: &mut IntegrityStoreSnapshot,
887) -> Result<(), InternalError> {
888    match err.class() {
889        ErrorClass::Corruption => {
890            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
891            Ok(())
892        }
893        ErrorClass::IncompatiblePersistedFormat => {
894            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
895            Ok(())
896        }
897        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
898            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
899            Ok(())
900        }
901        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
902    }
903}
904
905// Parse the data-entity identity from one decoded index key.
906fn data_entity_name_for_index_key(index_key: &IndexKey) -> Option<EntityName> {
907    let full_name = index_key.index_id().0.as_str();
908    let entity_name = full_name
909        .split_once('|')
910        .map_or(full_name, |(entity, _)| entity);
911    EntityName::try_from_str(entity_name).ok()
912}