// icydb_core/db/diagnostics/mod.rs

//! Module: diagnostics
//! Responsibility: read-only storage footprint and integrity snapshots.
//! Does not own: recovery, write-path mutation, or query planning semantics.
//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7
8use crate::{
9    db::{
10        Db,
11        codec::deserialize_row,
12        commit::CommitRowOp,
13        data::{DataKey, StorageKey},
14        index::IndexKey,
15        registry::StoreHandle,
16    },
17    error::{ErrorClass, InternalError},
18    traits::CanisterKind,
19    types::EntityTag,
20    value::Value,
21};
22use candid::CandidType;
23use serde::{Deserialize, Serialize};
24use serde_cbor::Value as CborValue;
25use std::collections::{BTreeMap, BTreeSet};
26
27pub use execution_trace::{
28    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
29};
30
31///
32/// StorageReport
33/// Live storage snapshot report
34///
35
36#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
37pub struct StorageReport {
38    pub(crate) storage_data: Vec<DataStoreSnapshot>,
39    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
40    pub(crate) entity_storage: Vec<EntitySnapshot>,
41    pub(crate) corrupted_keys: u64,
42    pub(crate) corrupted_entries: u64,
43}
44
45///
46/// IntegrityTotals
47/// Aggregated integrity-scan counters across all stores.
48///
49
50#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
51pub struct IntegrityTotals {
52    pub(crate) data_rows_scanned: u64,
53    pub(crate) index_entries_scanned: u64,
54    pub(crate) corrupted_data_keys: u64,
55    pub(crate) corrupted_data_rows: u64,
56    pub(crate) corrupted_index_keys: u64,
57    pub(crate) corrupted_index_entries: u64,
58    pub(crate) missing_index_entries: u64,
59    pub(crate) divergent_index_entries: u64,
60    pub(crate) orphan_index_references: u64,
61    pub(crate) compatibility_findings: u64,
62    pub(crate) misuse_findings: u64,
63}
64
65impl IntegrityTotals {
66    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
67        self.data_rows_scanned = self
68            .data_rows_scanned
69            .saturating_add(store.data_rows_scanned);
70        self.index_entries_scanned = self
71            .index_entries_scanned
72            .saturating_add(store.index_entries_scanned);
73        self.corrupted_data_keys = self
74            .corrupted_data_keys
75            .saturating_add(store.corrupted_data_keys);
76        self.corrupted_data_rows = self
77            .corrupted_data_rows
78            .saturating_add(store.corrupted_data_rows);
79        self.corrupted_index_keys = self
80            .corrupted_index_keys
81            .saturating_add(store.corrupted_index_keys);
82        self.corrupted_index_entries = self
83            .corrupted_index_entries
84            .saturating_add(store.corrupted_index_entries);
85        self.missing_index_entries = self
86            .missing_index_entries
87            .saturating_add(store.missing_index_entries);
88        self.divergent_index_entries = self
89            .divergent_index_entries
90            .saturating_add(store.divergent_index_entries);
91        self.orphan_index_references = self
92            .orphan_index_references
93            .saturating_add(store.orphan_index_references);
94        self.compatibility_findings = self
95            .compatibility_findings
96            .saturating_add(store.compatibility_findings);
97        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
98    }
99
100    /// Return total number of data rows scanned.
101    #[must_use]
102    pub const fn data_rows_scanned(&self) -> u64 {
103        self.data_rows_scanned
104    }
105
106    /// Return total number of index entries scanned.
107    #[must_use]
108    pub const fn index_entries_scanned(&self) -> u64 {
109        self.index_entries_scanned
110    }
111
112    /// Return total number of corrupted data-key findings.
113    #[must_use]
114    pub const fn corrupted_data_keys(&self) -> u64 {
115        self.corrupted_data_keys
116    }
117
118    /// Return total number of corrupted data-row findings.
119    #[must_use]
120    pub const fn corrupted_data_rows(&self) -> u64 {
121        self.corrupted_data_rows
122    }
123
124    /// Return total number of corrupted index-key findings.
125    #[must_use]
126    pub const fn corrupted_index_keys(&self) -> u64 {
127        self.corrupted_index_keys
128    }
129
130    /// Return total number of corrupted index-entry findings.
131    #[must_use]
132    pub const fn corrupted_index_entries(&self) -> u64 {
133        self.corrupted_index_entries
134    }
135
136    /// Return total number of missing index-entry findings.
137    #[must_use]
138    pub const fn missing_index_entries(&self) -> u64 {
139        self.missing_index_entries
140    }
141
142    /// Return total number of divergent index-entry findings.
143    #[must_use]
144    pub const fn divergent_index_entries(&self) -> u64 {
145        self.divergent_index_entries
146    }
147
148    /// Return total number of orphan index-reference findings.
149    #[must_use]
150    pub const fn orphan_index_references(&self) -> u64 {
151        self.orphan_index_references
152    }
153
154    /// Return total number of compatibility findings.
155    #[must_use]
156    pub const fn compatibility_findings(&self) -> u64 {
157        self.compatibility_findings
158    }
159
160    /// Return total number of misuse findings.
161    #[must_use]
162    pub const fn misuse_findings(&self) -> u64 {
163        self.misuse_findings
164    }
165}
166
167///
168/// IntegrityStoreSnapshot
169/// Per-store integrity findings and scan counters.
170///
171
172#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
173pub struct IntegrityStoreSnapshot {
174    pub(crate) path: String,
175    pub(crate) data_rows_scanned: u64,
176    pub(crate) index_entries_scanned: u64,
177    pub(crate) corrupted_data_keys: u64,
178    pub(crate) corrupted_data_rows: u64,
179    pub(crate) corrupted_index_keys: u64,
180    pub(crate) corrupted_index_entries: u64,
181    pub(crate) missing_index_entries: u64,
182    pub(crate) divergent_index_entries: u64,
183    pub(crate) orphan_index_references: u64,
184    pub(crate) compatibility_findings: u64,
185    pub(crate) misuse_findings: u64,
186}
187
188impl IntegrityStoreSnapshot {
189    /// Construct one empty store-level integrity snapshot.
190    #[must_use]
191    pub fn new(path: String) -> Self {
192        Self {
193            path,
194            ..Self::default()
195        }
196    }
197
198    /// Borrow store path.
199    #[must_use]
200    pub const fn path(&self) -> &str {
201        self.path.as_str()
202    }
203
204    /// Return number of scanned data rows.
205    #[must_use]
206    pub const fn data_rows_scanned(&self) -> u64 {
207        self.data_rows_scanned
208    }
209
210    /// Return number of scanned index entries.
211    #[must_use]
212    pub const fn index_entries_scanned(&self) -> u64 {
213        self.index_entries_scanned
214    }
215
216    /// Return number of corrupted data-key findings.
217    #[must_use]
218    pub const fn corrupted_data_keys(&self) -> u64 {
219        self.corrupted_data_keys
220    }
221
222    /// Return number of corrupted data-row findings.
223    #[must_use]
224    pub const fn corrupted_data_rows(&self) -> u64 {
225        self.corrupted_data_rows
226    }
227
228    /// Return number of corrupted index-key findings.
229    #[must_use]
230    pub const fn corrupted_index_keys(&self) -> u64 {
231        self.corrupted_index_keys
232    }
233
234    /// Return number of corrupted index-entry findings.
235    #[must_use]
236    pub const fn corrupted_index_entries(&self) -> u64 {
237        self.corrupted_index_entries
238    }
239
240    /// Return number of missing index-entry findings.
241    #[must_use]
242    pub const fn missing_index_entries(&self) -> u64 {
243        self.missing_index_entries
244    }
245
246    /// Return number of divergent index-entry findings.
247    #[must_use]
248    pub const fn divergent_index_entries(&self) -> u64 {
249        self.divergent_index_entries
250    }
251
252    /// Return number of orphan index-reference findings.
253    #[must_use]
254    pub const fn orphan_index_references(&self) -> u64 {
255        self.orphan_index_references
256    }
257
258    /// Return number of compatibility findings.
259    #[must_use]
260    pub const fn compatibility_findings(&self) -> u64 {
261        self.compatibility_findings
262    }
263
264    /// Return number of misuse findings.
265    #[must_use]
266    pub const fn misuse_findings(&self) -> u64 {
267        self.misuse_findings
268    }
269}
270
271///
272/// IntegrityReport
273/// Full integrity-scan output across all registered stores.
274///
275
276#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
277pub struct IntegrityReport {
278    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
279    pub(crate) totals: IntegrityTotals,
280}
281
282impl IntegrityReport {
283    /// Construct one integrity report payload.
284    #[must_use]
285    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
286        Self { stores, totals }
287    }
288
289    /// Borrow per-store integrity snapshots.
290    #[must_use]
291    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
292        self.stores.as_slice()
293    }
294
295    /// Borrow aggregated integrity totals.
296    #[must_use]
297    pub const fn totals(&self) -> &IntegrityTotals {
298        &self.totals
299    }
300}
301
302impl StorageReport {
303    /// Construct one storage report payload.
304    #[must_use]
305    pub const fn new(
306        storage_data: Vec<DataStoreSnapshot>,
307        storage_index: Vec<IndexStoreSnapshot>,
308        entity_storage: Vec<EntitySnapshot>,
309        corrupted_keys: u64,
310        corrupted_entries: u64,
311    ) -> Self {
312        Self {
313            storage_data,
314            storage_index,
315            entity_storage,
316            corrupted_keys,
317            corrupted_entries,
318        }
319    }
320
321    /// Borrow data-store snapshots.
322    #[must_use]
323    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
324        self.storage_data.as_slice()
325    }
326
327    /// Borrow index-store snapshots.
328    #[must_use]
329    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
330        self.storage_index.as_slice()
331    }
332
333    /// Borrow entity-level storage snapshots.
334    #[must_use]
335    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
336        self.entity_storage.as_slice()
337    }
338
339    /// Return count of corrupted decoded data keys.
340    #[must_use]
341    pub const fn corrupted_keys(&self) -> u64 {
342        self.corrupted_keys
343    }
344
345    /// Return count of corrupted index entries.
346    #[must_use]
347    pub const fn corrupted_entries(&self) -> u64 {
348        self.corrupted_entries
349    }
350}
351
352///
353/// DataStoreSnapshot
354/// Store-level snapshot metrics.
355///
356
357#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
358pub struct DataStoreSnapshot {
359    pub(crate) path: String,
360    pub(crate) entries: u64,
361    pub(crate) memory_bytes: u64,
362}
363
364impl DataStoreSnapshot {
365    /// Construct one data-store snapshot row.
366    #[must_use]
367    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
368        Self {
369            path,
370            entries,
371            memory_bytes,
372        }
373    }
374
375    /// Borrow store path.
376    #[must_use]
377    pub const fn path(&self) -> &str {
378        self.path.as_str()
379    }
380
381    /// Return row count.
382    #[must_use]
383    pub const fn entries(&self) -> u64 {
384        self.entries
385    }
386
387    /// Return memory usage in bytes.
388    #[must_use]
389    pub const fn memory_bytes(&self) -> u64 {
390        self.memory_bytes
391    }
392}
393
394///
395/// IndexStoreSnapshot
396/// Index-store snapshot metrics
397///
398
399#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
400pub struct IndexStoreSnapshot {
401    pub(crate) path: String,
402    pub(crate) entries: u64,
403    pub(crate) user_entries: u64,
404    pub(crate) system_entries: u64,
405    pub(crate) memory_bytes: u64,
406}
407
408impl IndexStoreSnapshot {
409    /// Construct one index-store snapshot row.
410    #[must_use]
411    pub const fn new(
412        path: String,
413        entries: u64,
414        user_entries: u64,
415        system_entries: u64,
416        memory_bytes: u64,
417    ) -> Self {
418        Self {
419            path,
420            entries,
421            user_entries,
422            system_entries,
423            memory_bytes,
424        }
425    }
426
427    /// Borrow store path.
428    #[must_use]
429    pub const fn path(&self) -> &str {
430        self.path.as_str()
431    }
432
433    /// Return total entry count.
434    #[must_use]
435    pub const fn entries(&self) -> u64 {
436        self.entries
437    }
438
439    /// Return user-namespace entry count.
440    #[must_use]
441    pub const fn user_entries(&self) -> u64 {
442        self.user_entries
443    }
444
445    /// Return system-namespace entry count.
446    #[must_use]
447    pub const fn system_entries(&self) -> u64 {
448        self.system_entries
449    }
450
451    /// Return memory usage in bytes.
452    #[must_use]
453    pub const fn memory_bytes(&self) -> u64 {
454        self.memory_bytes
455    }
456}
457
458///
459/// EntitySnapshot
460/// Per-entity storage breakdown across stores
461///
462
463#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
464pub struct EntitySnapshot {
465    /// Store path (e.g., icydb_schema_tests::schema::TestDataStore)
466    pub(crate) store: String,
467
468    /// Entity path (e.g., icydb_schema_tests::canister::db::Index)
469    pub(crate) path: String,
470
471    /// Number of rows for this entity in the store
472    pub(crate) entries: u64,
473
474    /// Approximate bytes used (key + value)
475    pub(crate) memory_bytes: u64,
476
477    /// Minimum primary key for this entity (entity-local ordering)
478    pub(crate) min_key: Option<Value>,
479
480    /// Maximum primary key for this entity (entity-local ordering)
481    pub(crate) max_key: Option<Value>,
482}
483
484impl EntitySnapshot {
485    /// Construct one entity-storage snapshot row.
486    #[must_use]
487    pub const fn new(
488        store: String,
489        path: String,
490        entries: u64,
491        memory_bytes: u64,
492        min_key: Option<Value>,
493        max_key: Option<Value>,
494    ) -> Self {
495        Self {
496            store,
497            path,
498            entries,
499            memory_bytes,
500            min_key,
501            max_key,
502        }
503    }
504
505    /// Borrow store path.
506    #[must_use]
507    pub const fn store(&self) -> &str {
508        self.store.as_str()
509    }
510
511    /// Borrow entity path.
512    #[must_use]
513    pub const fn path(&self) -> &str {
514        self.path.as_str()
515    }
516
517    /// Return row count.
518    #[must_use]
519    pub const fn entries(&self) -> u64 {
520        self.entries
521    }
522
523    /// Return memory usage in bytes.
524    #[must_use]
525    pub const fn memory_bytes(&self) -> u64 {
526        self.memory_bytes
527    }
528
529    /// Borrow optional minimum primary key.
530    #[must_use]
531    pub const fn min_key(&self) -> Option<&Value> {
532        self.min_key.as_ref()
533    }
534
535    /// Borrow optional maximum primary key.
536    #[must_use]
537    pub const fn max_key(&self) -> Option<&Value> {
538        self.max_key.as_ref()
539    }
540}
541
542///
543/// EntityStats
544/// Internal struct for building per-entity stats before snapshotting.
545///
546
547#[derive(Default)]
548struct EntityStats {
549    entries: u64,
550    memory_bytes: u64,
551    min_key: Option<StorageKey>,
552    max_key: Option<StorageKey>,
553}
554
555impl EntityStats {
556    // Accumulate per-entity counters and keep min/max over entity-local storage keys.
557    fn update(&mut self, dk: &DataKey, value_len: u64) {
558        self.entries = self.entries.saturating_add(1);
559        self.memory_bytes = self
560            .memory_bytes
561            .saturating_add(DataKey::entry_size_bytes(value_len));
562
563        let k = dk.storage_key();
564
565        match &mut self.min_key {
566            Some(min) if k < *min => *min = k,
567            None => self.min_key = Some(k),
568            _ => {}
569        }
570
571        match &mut self.max_key {
572            Some(max) if k > *max => *max = k,
573            None => self.max_key = Some(k),
574            _ => {}
575        }
576    }
577}
578
579/// Build one deterministic storage snapshot with per-entity rollups.
580///
581/// This path is read-only and fail-closed on decode/validation errors by counting
582/// corrupted keys/entries instead of panicking.
583pub(crate) fn storage_report<C: CanisterKind>(
584    db: &Db<C>,
585    name_to_path: &[(&'static str, &'static str)],
586) -> Result<StorageReport, InternalError> {
587    db.ensure_recovered_state()?;
588    // Build name→path map once, reuse across stores.
589    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
590    let runtime_name_to_tag: BTreeMap<&str, EntityTag> =
591        db.runtime_entity_name_tag_pairs().into_iter().collect();
592    // Build one deterministic tag→path alias map to preserve report naming even
593    // after persisted keys move from string names to tag identities.
594    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
595    for (entity_name, entity_tag) in &runtime_name_to_tag {
596        let path_name = name_map.get(entity_name).copied().unwrap_or(*entity_name);
597        tag_name_map.entry(*entity_tag).or_insert(path_name);
598    }
599    let mut data = Vec::new();
600    let mut index = Vec::new();
601    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
602    let mut corrupted_keys = 0u64;
603    let mut corrupted_entries = 0u64;
604
605    db.with_store_registry(|reg| {
606        // Keep diagnostics snapshots deterministic by traversing stores in path order.
607        let mut stores = reg.iter().collect::<Vec<_>>();
608        stores.sort_by_key(|(path, _)| *path);
609
610        for (path, store_handle) in stores {
611            // Phase 1: collect data-store snapshots and per-entity stats.
612            store_handle.with_data(|store| {
613                data.push(DataStoreSnapshot::new(
614                    path.to_string(),
615                    store.len(),
616                    store.memory_bytes(),
617                ));
618
619                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
620                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
621
622                for entry in store.iter() {
623                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
624                        corrupted_keys = corrupted_keys.saturating_add(1);
625                        continue;
626                    };
627
628                    let value_len = entry.value().len() as u64;
629
630                    by_entity
631                        .entry(dk.entity_tag())
632                        .or_default()
633                        .update(&dk, value_len);
634                }
635
636                for (entity_tag, stats) in by_entity {
637                    let path_name = tag_name_map
638                        .get(&entity_tag)
639                        .copied()
640                        .map(str::to_string)
641                        .or_else(|| {
642                            db.runtime_hook_for_entity_tag(entity_tag)
643                                .ok()
644                                .map(|hooks| {
645                                    name_map
646                                        .get(hooks.entity_name)
647                                        .copied()
648                                        .unwrap_or(hooks.entity_name)
649                                        .to_string()
650                                })
651                        })
652                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
653                    entity_storage.push(EntitySnapshot::new(
654                        path.to_string(),
655                        path_name,
656                        stats.entries,
657                        stats.memory_bytes,
658                        stats.min_key.map(|key| key.as_value()),
659                        stats.max_key.map(|key| key.as_value()),
660                    ));
661                }
662            });
663
664            // Phase 2: collect index-store snapshots and integrity counters.
665            store_handle.with_index(|store| {
666                let mut user_entries = 0u64;
667                let mut system_entries = 0u64;
668
669                for (key, value) in store.entries() {
670                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
671                        corrupted_entries = corrupted_entries.saturating_add(1);
672                        continue;
673                    };
674
675                    if decoded_key.uses_system_namespace() {
676                        system_entries = system_entries.saturating_add(1);
677                    } else {
678                        user_entries = user_entries.saturating_add(1);
679                    }
680
681                    if value.validate().is_err() {
682                        corrupted_entries = corrupted_entries.saturating_add(1);
683                    }
684                }
685
686                index.push(IndexStoreSnapshot::new(
687                    path.to_string(),
688                    store.len(),
689                    user_entries,
690                    system_entries,
691                    store.memory_bytes(),
692                ));
693            });
694        }
695    });
696
697    // Phase 3: enforce deterministic entity snapshot emission order.
698    // This remains stable even if outer store traversal internals change.
699    entity_storage
700        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
701
702    Ok(StorageReport::new(
703        data,
704        index,
705        entity_storage,
706        corrupted_keys,
707        corrupted_entries,
708    ))
709}
710
711/// Build one deterministic integrity scan over all registered stores.
712///
713/// This scan is read-only and classifies findings as:
714/// - corruption: malformed persisted bytes or inconsistent structural links
715/// - compatibility: persisted payloads outside decode compatibility windows
716/// - misuse: unsupported runtime wiring (for example missing entity hooks)
717pub(crate) fn integrity_report<C: CanisterKind>(
718    db: &Db<C>,
719) -> Result<IntegrityReport, InternalError> {
720    db.ensure_recovered_state()?;
721
722    integrity_report_after_recovery(db)
723}
724
725/// Build one deterministic integrity scan after recovery has already completed.
726///
727/// Callers running inside recovery flow should use this variant to avoid
728/// recursive recovery gating.
729pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
730    db: &Db<C>,
731) -> Result<IntegrityReport, InternalError> {
732    build_integrity_report(db)
733}
734
735fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
736    let mut stores = Vec::new();
737    let mut totals = IntegrityTotals::default();
738    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
739
740    db.with_store_registry(|reg| {
741        // Keep deterministic output order across registry traversal implementations.
742        let mut store_entries = reg.iter().collect::<Vec<_>>();
743        store_entries.sort_by_key(|(path, _)| *path);
744
745        for (path, store_handle) in store_entries {
746            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
747            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
748            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
749
750            totals.add_store_snapshot(&snapshot);
751            stores.push(snapshot);
752        }
753
754        Ok::<(), InternalError>(())
755    })?;
756
757    Ok(IntegrityReport::new(stores, totals))
758}
759
760// Build one global map of live data keys grouped by entity across all stores.
761fn collect_global_live_keys_by_entity<C: CanisterKind>(
762    db: &Db<C>,
763) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
764    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
765
766    db.with_store_registry(|reg| {
767        for (_, store_handle) in reg.iter() {
768            store_handle.with_data(|data_store| {
769                for entry in data_store.iter() {
770                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
771                        keys.entry(data_key.entity_tag())
772                            .or_default()
773                            .insert(data_key.storage_key());
774                    }
775                }
776            });
777        }
778
779        Ok::<(), InternalError>(())
780    })?;
781
782    Ok(keys)
783}
784
785// Run forward (data -> index) integrity checks for one store.
786fn scan_store_forward_integrity<C: CanisterKind>(
787    db: &Db<C>,
788    store_handle: StoreHandle,
789    snapshot: &mut IntegrityStoreSnapshot,
790) -> Result<(), InternalError> {
791    store_handle.with_data(|data_store| {
792        for entry in data_store.iter() {
793            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
794
795            let raw_key = *entry.key();
796
797            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
798                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
799                continue;
800            };
801
802            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
803                Ok(hooks) => hooks,
804                Err(err) => {
805                    classify_scan_error(err, snapshot)?;
806                    continue;
807                }
808            };
809
810            let marker_row = CommitRowOp::new(
811                hooks.entity_path,
812                raw_key.as_bytes().to_vec(),
813                None,
814                Some(entry.value().as_bytes().to_vec()),
815                (hooks.commit_schema_fingerprint)(),
816            );
817
818            // Validate envelope compatibility before typed preparation so
819            // incompatible persisted formats remain compatibility-classified.
820            if let Err(err) = deserialize_row::<CborValue>(entry.value().as_bytes()) {
821                classify_scan_error(err, snapshot)?;
822                continue;
823            }
824
825            let prepared = match db.prepare_row_commit_op(&marker_row) {
826                Ok(prepared) => prepared,
827                Err(err) => {
828                    classify_scan_error(err, snapshot)?;
829                    continue;
830                }
831            };
832
833            for index_op in prepared.index_ops {
834                let Some(expected_value) = index_op.value else {
835                    continue;
836                };
837
838                let actual = index_op
839                    .store
840                    .with_borrow(|index_store| index_store.get(&index_op.key));
841                match actual {
842                    Some(actual_value) if actual_value == expected_value => {}
843                    Some(_) => {
844                        snapshot.divergent_index_entries =
845                            snapshot.divergent_index_entries.saturating_add(1);
846                    }
847                    None => {
848                        snapshot.missing_index_entries =
849                            snapshot.missing_index_entries.saturating_add(1);
850                    }
851                }
852            }
853        }
854
855        Ok::<(), InternalError>(())
856    })
857}
858
859// Run reverse (index -> data) integrity checks for one store.
860fn scan_store_reverse_integrity(
861    store_handle: StoreHandle,
862    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
863    snapshot: &mut IntegrityStoreSnapshot,
864) {
865    store_handle.with_index(|index_store| {
866        for (raw_index_key, raw_index_entry) in index_store.entries() {
867            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
868
869            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
870                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
871                continue;
872            };
873
874            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
875
876            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
877                snapshot.corrupted_index_entries =
878                    snapshot.corrupted_index_entries.saturating_add(1);
879                continue;
880            };
881
882            for primary_key in indexed_primary_keys {
883                let exists = live_keys_by_entity
884                    .get(&index_entity_tag)
885                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
886                if !exists {
887                    snapshot.orphan_index_references =
888                        snapshot.orphan_index_references.saturating_add(1);
889                }
890            }
891        }
892    });
893}
894
895// Map scan-time errors into explicit integrity classification buckets.
896fn classify_scan_error(
897    err: InternalError,
898    snapshot: &mut IntegrityStoreSnapshot,
899) -> Result<(), InternalError> {
900    match err.class() {
901        ErrorClass::Corruption => {
902            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
903            Ok(())
904        }
905        ErrorClass::IncompatiblePersistedFormat => {
906            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
907            Ok(())
908        }
909        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
910            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
911            Ok(())
912        }
913        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
914    }
915}
916
917// Parse the data-entity identity from one decoded index key.
918const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
919    index_key.index_id().entity_tag
920}