Skip to main content

icydb_core/db/diagnostics/
mod.rs

1//! Module: diagnostics
2//! Responsibility: read-only storage footprint and integrity snapshots.
3//! Does not own: recovery, write-path mutation, or query planning semantics.
4//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11    db::{
12        Db,
13        codec::deserialize_row,
14        commit::CommitRowOp,
15        data::{DataKey, StorageKey},
16        index::IndexKey,
17        registry::StoreHandle,
18    },
19    error::{ErrorClass, InternalError},
20    traits::CanisterKind,
21    types::EntityTag,
22    value::Value,
23};
24use candid::CandidType;
25use serde::{Deserialize, Serialize};
26use serde_cbor::Value as CborValue;
27use std::collections::{BTreeMap, BTreeSet};
28
29pub(crate) use execution_trace::ExecutionOptimizationCounter;
30pub(crate) use execution_trace::record_execution_optimization_hit_for_tests;
31#[cfg(test)]
32pub(crate) use execution_trace::take_execution_optimization_hits_for_tests;
33pub use execution_trace::{
34    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
35};
36
///
/// StorageReport
/// Live storage snapshot report: per-store data and index snapshots,
/// per-entity rollups, and corruption counters gathered during the scan.
///

#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
pub struct StorageReport {
    /// One snapshot row per data store.
    pub(crate) storage_data: Vec<DataStoreSnapshot>,
    /// One snapshot row per index store.
    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
    /// Per-entity storage breakdown rows.
    pub(crate) entity_storage: Vec<EntitySnapshot>,
    /// Number of data-store keys that failed to decode during the scan.
    pub(crate) corrupted_keys: u64,
    /// Number of index entries whose key failed to decode or whose value
    /// failed validation during the scan.
    pub(crate) corrupted_entries: u64,
}
50
51///
52/// IntegrityTotals
53/// Aggregated integrity-scan counters across all stores.
54///
55
56#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
57pub struct IntegrityTotals {
58    pub(crate) data_rows_scanned: u64,
59    pub(crate) index_entries_scanned: u64,
60    pub(crate) corrupted_data_keys: u64,
61    pub(crate) corrupted_data_rows: u64,
62    pub(crate) corrupted_index_keys: u64,
63    pub(crate) corrupted_index_entries: u64,
64    pub(crate) missing_index_entries: u64,
65    pub(crate) divergent_index_entries: u64,
66    pub(crate) orphan_index_references: u64,
67    pub(crate) compatibility_findings: u64,
68    pub(crate) misuse_findings: u64,
69}
70
71impl IntegrityTotals {
72    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
73        self.data_rows_scanned = self
74            .data_rows_scanned
75            .saturating_add(store.data_rows_scanned);
76        self.index_entries_scanned = self
77            .index_entries_scanned
78            .saturating_add(store.index_entries_scanned);
79        self.corrupted_data_keys = self
80            .corrupted_data_keys
81            .saturating_add(store.corrupted_data_keys);
82        self.corrupted_data_rows = self
83            .corrupted_data_rows
84            .saturating_add(store.corrupted_data_rows);
85        self.corrupted_index_keys = self
86            .corrupted_index_keys
87            .saturating_add(store.corrupted_index_keys);
88        self.corrupted_index_entries = self
89            .corrupted_index_entries
90            .saturating_add(store.corrupted_index_entries);
91        self.missing_index_entries = self
92            .missing_index_entries
93            .saturating_add(store.missing_index_entries);
94        self.divergent_index_entries = self
95            .divergent_index_entries
96            .saturating_add(store.divergent_index_entries);
97        self.orphan_index_references = self
98            .orphan_index_references
99            .saturating_add(store.orphan_index_references);
100        self.compatibility_findings = self
101            .compatibility_findings
102            .saturating_add(store.compatibility_findings);
103        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
104    }
105
106    /// Return total number of data rows scanned.
107    #[must_use]
108    pub const fn data_rows_scanned(&self) -> u64 {
109        self.data_rows_scanned
110    }
111
112    /// Return total number of index entries scanned.
113    #[must_use]
114    pub const fn index_entries_scanned(&self) -> u64 {
115        self.index_entries_scanned
116    }
117
118    /// Return total number of corrupted data-key findings.
119    #[must_use]
120    pub const fn corrupted_data_keys(&self) -> u64 {
121        self.corrupted_data_keys
122    }
123
124    /// Return total number of corrupted data-row findings.
125    #[must_use]
126    pub const fn corrupted_data_rows(&self) -> u64 {
127        self.corrupted_data_rows
128    }
129
130    /// Return total number of corrupted index-key findings.
131    #[must_use]
132    pub const fn corrupted_index_keys(&self) -> u64 {
133        self.corrupted_index_keys
134    }
135
136    /// Return total number of corrupted index-entry findings.
137    #[must_use]
138    pub const fn corrupted_index_entries(&self) -> u64 {
139        self.corrupted_index_entries
140    }
141
142    /// Return total number of missing index-entry findings.
143    #[must_use]
144    pub const fn missing_index_entries(&self) -> u64 {
145        self.missing_index_entries
146    }
147
148    /// Return total number of divergent index-entry findings.
149    #[must_use]
150    pub const fn divergent_index_entries(&self) -> u64 {
151        self.divergent_index_entries
152    }
153
154    /// Return total number of orphan index-reference findings.
155    #[must_use]
156    pub const fn orphan_index_references(&self) -> u64 {
157        self.orphan_index_references
158    }
159
160    /// Return total number of compatibility findings.
161    #[must_use]
162    pub const fn compatibility_findings(&self) -> u64 {
163        self.compatibility_findings
164    }
165
166    /// Return total number of misuse findings.
167    #[must_use]
168    pub const fn misuse_findings(&self) -> u64 {
169        self.misuse_findings
170    }
171}
172
173///
174/// IntegrityStoreSnapshot
175/// Per-store integrity findings and scan counters.
176///
177
178#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
179pub struct IntegrityStoreSnapshot {
180    pub(crate) path: String,
181    pub(crate) data_rows_scanned: u64,
182    pub(crate) index_entries_scanned: u64,
183    pub(crate) corrupted_data_keys: u64,
184    pub(crate) corrupted_data_rows: u64,
185    pub(crate) corrupted_index_keys: u64,
186    pub(crate) corrupted_index_entries: u64,
187    pub(crate) missing_index_entries: u64,
188    pub(crate) divergent_index_entries: u64,
189    pub(crate) orphan_index_references: u64,
190    pub(crate) compatibility_findings: u64,
191    pub(crate) misuse_findings: u64,
192}
193
194impl IntegrityStoreSnapshot {
195    /// Construct one empty store-level integrity snapshot.
196    #[must_use]
197    pub fn new(path: String) -> Self {
198        Self {
199            path,
200            ..Self::default()
201        }
202    }
203
204    /// Borrow store path.
205    #[must_use]
206    pub const fn path(&self) -> &str {
207        self.path.as_str()
208    }
209
210    /// Return number of scanned data rows.
211    #[must_use]
212    pub const fn data_rows_scanned(&self) -> u64 {
213        self.data_rows_scanned
214    }
215
216    /// Return number of scanned index entries.
217    #[must_use]
218    pub const fn index_entries_scanned(&self) -> u64 {
219        self.index_entries_scanned
220    }
221
222    /// Return number of corrupted data-key findings.
223    #[must_use]
224    pub const fn corrupted_data_keys(&self) -> u64 {
225        self.corrupted_data_keys
226    }
227
228    /// Return number of corrupted data-row findings.
229    #[must_use]
230    pub const fn corrupted_data_rows(&self) -> u64 {
231        self.corrupted_data_rows
232    }
233
234    /// Return number of corrupted index-key findings.
235    #[must_use]
236    pub const fn corrupted_index_keys(&self) -> u64 {
237        self.corrupted_index_keys
238    }
239
240    /// Return number of corrupted index-entry findings.
241    #[must_use]
242    pub const fn corrupted_index_entries(&self) -> u64 {
243        self.corrupted_index_entries
244    }
245
246    /// Return number of missing index-entry findings.
247    #[must_use]
248    pub const fn missing_index_entries(&self) -> u64 {
249        self.missing_index_entries
250    }
251
252    /// Return number of divergent index-entry findings.
253    #[must_use]
254    pub const fn divergent_index_entries(&self) -> u64 {
255        self.divergent_index_entries
256    }
257
258    /// Return number of orphan index-reference findings.
259    #[must_use]
260    pub const fn orphan_index_references(&self) -> u64 {
261        self.orphan_index_references
262    }
263
264    /// Return number of compatibility findings.
265    #[must_use]
266    pub const fn compatibility_findings(&self) -> u64 {
267        self.compatibility_findings
268    }
269
270    /// Return number of misuse findings.
271    #[must_use]
272    pub const fn misuse_findings(&self) -> u64 {
273        self.misuse_findings
274    }
275}
276
277///
278/// IntegrityReport
279/// Full integrity-scan output across all registered stores.
280///
281
282#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
283pub struct IntegrityReport {
284    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
285    pub(crate) totals: IntegrityTotals,
286}
287
288impl IntegrityReport {
289    /// Construct one integrity report payload.
290    #[must_use]
291    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
292        Self { stores, totals }
293    }
294
295    /// Borrow per-store integrity snapshots.
296    #[must_use]
297    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
298        self.stores.as_slice()
299    }
300
301    /// Borrow aggregated integrity totals.
302    #[must_use]
303    pub const fn totals(&self) -> &IntegrityTotals {
304        &self.totals
305    }
306}
307
308impl StorageReport {
309    /// Construct one storage report payload.
310    #[must_use]
311    pub const fn new(
312        storage_data: Vec<DataStoreSnapshot>,
313        storage_index: Vec<IndexStoreSnapshot>,
314        entity_storage: Vec<EntitySnapshot>,
315        corrupted_keys: u64,
316        corrupted_entries: u64,
317    ) -> Self {
318        Self {
319            storage_data,
320            storage_index,
321            entity_storage,
322            corrupted_keys,
323            corrupted_entries,
324        }
325    }
326
327    /// Borrow data-store snapshots.
328    #[must_use]
329    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
330        self.storage_data.as_slice()
331    }
332
333    /// Borrow index-store snapshots.
334    #[must_use]
335    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
336        self.storage_index.as_slice()
337    }
338
339    /// Borrow entity-level storage snapshots.
340    #[must_use]
341    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
342        self.entity_storage.as_slice()
343    }
344
345    /// Return count of corrupted decoded data keys.
346    #[must_use]
347    pub const fn corrupted_keys(&self) -> u64 {
348        self.corrupted_keys
349    }
350
351    /// Return count of corrupted index entries.
352    #[must_use]
353    pub const fn corrupted_entries(&self) -> u64 {
354        self.corrupted_entries
355    }
356}
357
358///
359/// DataStoreSnapshot
360/// Store-level snapshot metrics.
361///
362
363#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
364pub struct DataStoreSnapshot {
365    pub(crate) path: String,
366    pub(crate) entries: u64,
367    pub(crate) memory_bytes: u64,
368}
369
370impl DataStoreSnapshot {
371    /// Construct one data-store snapshot row.
372    #[must_use]
373    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
374        Self {
375            path,
376            entries,
377            memory_bytes,
378        }
379    }
380
381    /// Borrow store path.
382    #[must_use]
383    pub const fn path(&self) -> &str {
384        self.path.as_str()
385    }
386
387    /// Return row count.
388    #[must_use]
389    pub const fn entries(&self) -> u64 {
390        self.entries
391    }
392
393    /// Return memory usage in bytes.
394    #[must_use]
395    pub const fn memory_bytes(&self) -> u64 {
396        self.memory_bytes
397    }
398}
399
400///
401/// IndexStoreSnapshot
402/// Index-store snapshot metrics
403///
404
405#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
406pub struct IndexStoreSnapshot {
407    pub(crate) path: String,
408    pub(crate) entries: u64,
409    pub(crate) user_entries: u64,
410    pub(crate) system_entries: u64,
411    pub(crate) memory_bytes: u64,
412}
413
414impl IndexStoreSnapshot {
415    /// Construct one index-store snapshot row.
416    #[must_use]
417    pub const fn new(
418        path: String,
419        entries: u64,
420        user_entries: u64,
421        system_entries: u64,
422        memory_bytes: u64,
423    ) -> Self {
424        Self {
425            path,
426            entries,
427            user_entries,
428            system_entries,
429            memory_bytes,
430        }
431    }
432
433    /// Borrow store path.
434    #[must_use]
435    pub const fn path(&self) -> &str {
436        self.path.as_str()
437    }
438
439    /// Return total entry count.
440    #[must_use]
441    pub const fn entries(&self) -> u64 {
442        self.entries
443    }
444
445    /// Return user-namespace entry count.
446    #[must_use]
447    pub const fn user_entries(&self) -> u64 {
448        self.user_entries
449    }
450
451    /// Return system-namespace entry count.
452    #[must_use]
453    pub const fn system_entries(&self) -> u64 {
454        self.system_entries
455    }
456
457    /// Return memory usage in bytes.
458    #[must_use]
459    pub const fn memory_bytes(&self) -> u64 {
460        self.memory_bytes
461    }
462}
463
464///
465/// EntitySnapshot
466/// Per-entity storage breakdown across stores
467///
468
469#[derive(CandidType, Clone, Debug, Default, Deserialize, Serialize)]
470pub struct EntitySnapshot {
471    /// Store path (e.g., icydb_schema_tests::schema::TestDataStore)
472    pub(crate) store: String,
473
474    /// Entity path (e.g., icydb_schema_tests::canister::db::Index)
475    pub(crate) path: String,
476
477    /// Number of rows for this entity in the store
478    pub(crate) entries: u64,
479
480    /// Approximate bytes used (key + value)
481    pub(crate) memory_bytes: u64,
482
483    /// Minimum primary key for this entity (entity-local ordering)
484    pub(crate) min_key: Option<Value>,
485
486    /// Maximum primary key for this entity (entity-local ordering)
487    pub(crate) max_key: Option<Value>,
488}
489
490impl EntitySnapshot {
491    /// Construct one entity-storage snapshot row.
492    #[must_use]
493    pub const fn new(
494        store: String,
495        path: String,
496        entries: u64,
497        memory_bytes: u64,
498        min_key: Option<Value>,
499        max_key: Option<Value>,
500    ) -> Self {
501        Self {
502            store,
503            path,
504            entries,
505            memory_bytes,
506            min_key,
507            max_key,
508        }
509    }
510
511    /// Borrow store path.
512    #[must_use]
513    pub const fn store(&self) -> &str {
514        self.store.as_str()
515    }
516
517    /// Borrow entity path.
518    #[must_use]
519    pub const fn path(&self) -> &str {
520        self.path.as_str()
521    }
522
523    /// Return row count.
524    #[must_use]
525    pub const fn entries(&self) -> u64 {
526        self.entries
527    }
528
529    /// Return memory usage in bytes.
530    #[must_use]
531    pub const fn memory_bytes(&self) -> u64 {
532        self.memory_bytes
533    }
534
535    /// Borrow optional minimum primary key.
536    #[must_use]
537    pub const fn min_key(&self) -> Option<&Value> {
538        self.min_key.as_ref()
539    }
540
541    /// Borrow optional maximum primary key.
542    #[must_use]
543    pub const fn max_key(&self) -> Option<&Value> {
544        self.max_key.as_ref()
545    }
546}
547
548///
549/// EntityStats
550/// Internal struct for building per-entity stats before snapshotting.
551///
552
553#[derive(Default)]
554struct EntityStats {
555    entries: u64,
556    memory_bytes: u64,
557    min_key: Option<StorageKey>,
558    max_key: Option<StorageKey>,
559}
560
561impl EntityStats {
562    // Accumulate per-entity counters and keep min/max over entity-local storage keys.
563    fn update(&mut self, dk: &DataKey, value_len: u64) {
564        self.entries = self.entries.saturating_add(1);
565        self.memory_bytes = self
566            .memory_bytes
567            .saturating_add(DataKey::entry_size_bytes(value_len));
568
569        let k = dk.storage_key();
570
571        match &mut self.min_key {
572            Some(min) if k < *min => *min = k,
573            None => self.min_key = Some(k),
574            _ => {}
575        }
576
577        match &mut self.max_key {
578            Some(max) if k > *max => *max = k,
579            None => self.max_key = Some(k),
580            _ => {}
581        }
582    }
583}
584
585/// Build one deterministic storage snapshot with per-entity rollups.
586///
587/// This path is read-only and fail-closed on decode/validation errors by counting
588/// corrupted keys/entries instead of panicking.
589pub(crate) fn storage_report<C: CanisterKind>(
590    db: &Db<C>,
591    name_to_path: &[(&'static str, &'static str)],
592) -> Result<StorageReport, InternalError> {
593    db.ensure_recovered_state()?;
594    // Build name→path map once, reuse across stores.
595    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
596    let runtime_name_to_tag: BTreeMap<&str, EntityTag> =
597        db.runtime_entity_name_tag_pairs().into_iter().collect();
598    // Build one deterministic tag→path alias map to preserve report naming even
599    // after persisted keys move from string names to tag identities.
600    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
601    for (entity_name, entity_tag) in &runtime_name_to_tag {
602        let path_name = name_map.get(entity_name).copied().unwrap_or(*entity_name);
603        tag_name_map.entry(*entity_tag).or_insert(path_name);
604    }
605    let mut data = Vec::new();
606    let mut index = Vec::new();
607    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
608    let mut corrupted_keys = 0u64;
609    let mut corrupted_entries = 0u64;
610
611    db.with_store_registry(|reg| {
612        // Keep diagnostics snapshots deterministic by traversing stores in path order.
613        let mut stores = reg.iter().collect::<Vec<_>>();
614        stores.sort_by_key(|(path, _)| *path);
615
616        for (path, store_handle) in stores {
617            // Phase 1: collect data-store snapshots and per-entity stats.
618            store_handle.with_data(|store| {
619                data.push(DataStoreSnapshot::new(
620                    path.to_string(),
621                    store.len(),
622                    store.memory_bytes(),
623                ));
624
625                // Track per-entity counts, memory, and min/max Keys (not DataKeys)
626                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
627
628                for entry in store.iter() {
629                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
630                        corrupted_keys = corrupted_keys.saturating_add(1);
631                        continue;
632                    };
633
634                    let value_len = entry.value().len() as u64;
635
636                    by_entity
637                        .entry(dk.entity_tag())
638                        .or_default()
639                        .update(&dk, value_len);
640                }
641
642                for (entity_tag, stats) in by_entity {
643                    let path_name = tag_name_map
644                        .get(&entity_tag)
645                        .copied()
646                        .map(str::to_string)
647                        .or_else(|| {
648                            db.runtime_hook_for_entity_tag(entity_tag)
649                                .ok()
650                                .map(|hooks| {
651                                    name_map
652                                        .get(hooks.entity_name)
653                                        .copied()
654                                        .unwrap_or(hooks.entity_name)
655                                        .to_string()
656                                })
657                        })
658                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
659                    entity_storage.push(EntitySnapshot::new(
660                        path.to_string(),
661                        path_name,
662                        stats.entries,
663                        stats.memory_bytes,
664                        stats.min_key.map(|key| key.as_value()),
665                        stats.max_key.map(|key| key.as_value()),
666                    ));
667                }
668            });
669
670            // Phase 2: collect index-store snapshots and integrity counters.
671            store_handle.with_index(|store| {
672                let mut user_entries = 0u64;
673                let mut system_entries = 0u64;
674
675                for (key, value) in store.entries() {
676                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
677                        corrupted_entries = corrupted_entries.saturating_add(1);
678                        continue;
679                    };
680
681                    if decoded_key.uses_system_namespace() {
682                        system_entries = system_entries.saturating_add(1);
683                    } else {
684                        user_entries = user_entries.saturating_add(1);
685                    }
686
687                    if value.validate().is_err() {
688                        corrupted_entries = corrupted_entries.saturating_add(1);
689                    }
690                }
691
692                index.push(IndexStoreSnapshot::new(
693                    path.to_string(),
694                    store.len(),
695                    user_entries,
696                    system_entries,
697                    store.memory_bytes(),
698                ));
699            });
700        }
701    });
702
703    // Phase 3: enforce deterministic entity snapshot emission order.
704    // This remains stable even if outer store traversal internals change.
705    entity_storage
706        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
707
708    Ok(StorageReport::new(
709        data,
710        index,
711        entity_storage,
712        corrupted_keys,
713        corrupted_entries,
714    ))
715}
716
717/// Build one deterministic integrity scan over all registered stores.
718///
719/// This scan is read-only and classifies findings as:
720/// - corruption: malformed persisted bytes or inconsistent structural links
721/// - compatibility: persisted payloads outside decode compatibility windows
722/// - misuse: unsupported runtime wiring (for example missing entity hooks)
723pub(crate) fn integrity_report<C: CanisterKind>(
724    db: &Db<C>,
725) -> Result<IntegrityReport, InternalError> {
726    db.ensure_recovered_state()?;
727
728    integrity_report_after_recovery(db)
729}
730
731/// Build one deterministic integrity scan after recovery has already completed.
732///
733/// Callers running inside recovery flow should use this variant to avoid
734/// recursive recovery gating.
735pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
736    db: &Db<C>,
737) -> Result<IntegrityReport, InternalError> {
738    build_integrity_report(db)
739}
740
741fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
742    let mut stores = Vec::new();
743    let mut totals = IntegrityTotals::default();
744    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
745
746    db.with_store_registry(|reg| {
747        // Keep deterministic output order across registry traversal implementations.
748        let mut store_entries = reg.iter().collect::<Vec<_>>();
749        store_entries.sort_by_key(|(path, _)| *path);
750
751        for (path, store_handle) in store_entries {
752            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
753            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
754            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
755
756            totals.add_store_snapshot(&snapshot);
757            stores.push(snapshot);
758        }
759
760        Ok::<(), InternalError>(())
761    })?;
762
763    Ok(IntegrityReport::new(stores, totals))
764}
765
766// Build one global map of live data keys grouped by entity across all stores.
767fn collect_global_live_keys_by_entity<C: CanisterKind>(
768    db: &Db<C>,
769) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
770    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
771
772    db.with_store_registry(|reg| {
773        for (_, store_handle) in reg.iter() {
774            store_handle.with_data(|data_store| {
775                for entry in data_store.iter() {
776                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
777                        keys.entry(data_key.entity_tag())
778                            .or_default()
779                            .insert(data_key.storage_key());
780                    }
781                }
782            });
783        }
784
785        Ok::<(), InternalError>(())
786    })?;
787
788    Ok(keys)
789}
790
791// Run forward (data -> index) integrity checks for one store.
792fn scan_store_forward_integrity<C: CanisterKind>(
793    db: &Db<C>,
794    store_handle: StoreHandle,
795    snapshot: &mut IntegrityStoreSnapshot,
796) -> Result<(), InternalError> {
797    store_handle.with_data(|data_store| {
798        for entry in data_store.iter() {
799            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
800
801            let raw_key = *entry.key();
802
803            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
804                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
805                continue;
806            };
807
808            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
809                Ok(hooks) => hooks,
810                Err(err) => {
811                    classify_scan_error(err, snapshot)?;
812                    continue;
813                }
814            };
815
816            let marker_row = CommitRowOp::new(
817                hooks.entity_path,
818                raw_key.as_bytes().to_vec(),
819                None,
820                Some(entry.value().as_bytes().to_vec()),
821                (hooks.commit_schema_fingerprint)(),
822            );
823
824            // Validate envelope compatibility before typed preparation so
825            // incompatible persisted formats remain compatibility-classified.
826            if let Err(err) = deserialize_row::<CborValue>(entry.value().as_bytes()) {
827                classify_scan_error(err, snapshot)?;
828                continue;
829            }
830
831            let prepared = match db.prepare_row_commit_op(&marker_row) {
832                Ok(prepared) => prepared,
833                Err(err) => {
834                    classify_scan_error(err, snapshot)?;
835                    continue;
836                }
837            };
838
839            for index_op in prepared.index_ops {
840                let Some(expected_value) = index_op.value else {
841                    continue;
842                };
843
844                let actual = index_op
845                    .store
846                    .with_borrow(|index_store| index_store.get(&index_op.key));
847                match actual {
848                    Some(actual_value) if actual_value == expected_value => {}
849                    Some(_) => {
850                        snapshot.divergent_index_entries =
851                            snapshot.divergent_index_entries.saturating_add(1);
852                    }
853                    None => {
854                        snapshot.missing_index_entries =
855                            snapshot.missing_index_entries.saturating_add(1);
856                    }
857                }
858            }
859        }
860
861        Ok::<(), InternalError>(())
862    })
863}
864
865// Run reverse (index -> data) integrity checks for one store.
866fn scan_store_reverse_integrity(
867    store_handle: StoreHandle,
868    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
869    snapshot: &mut IntegrityStoreSnapshot,
870) {
871    store_handle.with_index(|index_store| {
872        for (raw_index_key, raw_index_entry) in index_store.entries() {
873            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
874
875            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
876                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
877                continue;
878            };
879
880            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
881
882            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
883                snapshot.corrupted_index_entries =
884                    snapshot.corrupted_index_entries.saturating_add(1);
885                continue;
886            };
887
888            for primary_key in indexed_primary_keys {
889                let exists = live_keys_by_entity
890                    .get(&index_entity_tag)
891                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
892                if !exists {
893                    snapshot.orphan_index_references =
894                        snapshot.orphan_index_references.saturating_add(1);
895                }
896            }
897        }
898    });
899}
900
901// Map scan-time errors into explicit integrity classification buckets.
902fn classify_scan_error(
903    err: InternalError,
904    snapshot: &mut IntegrityStoreSnapshot,
905) -> Result<(), InternalError> {
906    match err.class() {
907        ErrorClass::Corruption => {
908            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
909            Ok(())
910        }
911        ErrorClass::IncompatiblePersistedFormat => {
912            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
913            Ok(())
914        }
915        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
916            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
917            Ok(())
918        }
919        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
920    }
921}
922
923// Parse the data-entity identity from one decoded index key.
924const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
925    index_key.index_id().entity_tag
926}