Skip to main content

icydb_core/db/diagnostics/
mod.rs

1//! Module: diagnostics
2//! Responsibility: read-only storage footprint and integrity snapshots.
3//! Does not own: recovery, write-path mutation, or query planning semantics.
4//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11    db::{
12        Db, EntityRuntimeHooks,
13        commit::CommitRowOp,
14        data::{DataKey, StorageKey, decode_structural_row_cbor},
15        index::{IndexKey, IndexState},
16        registry::StoreHandle,
17    },
18    error::{ErrorClass, InternalError},
19    traits::CanisterKind,
20    types::EntityTag,
21};
22use candid::CandidType;
23use serde::Deserialize;
24use std::collections::{BTreeMap, BTreeSet};
25
26pub use execution_trace::{
27    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
28};
29
30#[cfg_attr(doc, doc = "StorageReport\n\nLive storage snapshot payload.")]
31#[derive(CandidType, Clone, Debug, Default, Deserialize)]
32pub struct StorageReport {
33    pub(crate) storage_data: Vec<DataStoreSnapshot>,
34    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
35    pub(crate) entity_storage: Vec<EntitySnapshot>,
36    pub(crate) corrupted_keys: u64,
37    pub(crate) corrupted_entries: u64,
38}
39
40#[cfg_attr(
41    doc,
42    doc = "IntegrityTotals\n\nAggregated integrity-scan counters across all stores."
43)]
44#[derive(CandidType, Clone, Debug, Default, Deserialize)]
45pub struct IntegrityTotals {
46    pub(crate) data_rows_scanned: u64,
47    pub(crate) index_entries_scanned: u64,
48    pub(crate) corrupted_data_keys: u64,
49    pub(crate) corrupted_data_rows: u64,
50    pub(crate) corrupted_index_keys: u64,
51    pub(crate) corrupted_index_entries: u64,
52    pub(crate) missing_index_entries: u64,
53    pub(crate) divergent_index_entries: u64,
54    pub(crate) orphan_index_references: u64,
55    pub(crate) compatibility_findings: u64,
56    pub(crate) misuse_findings: u64,
57}
58
59impl IntegrityTotals {
60    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
61        self.data_rows_scanned = self
62            .data_rows_scanned
63            .saturating_add(store.data_rows_scanned);
64        self.index_entries_scanned = self
65            .index_entries_scanned
66            .saturating_add(store.index_entries_scanned);
67        self.corrupted_data_keys = self
68            .corrupted_data_keys
69            .saturating_add(store.corrupted_data_keys);
70        self.corrupted_data_rows = self
71            .corrupted_data_rows
72            .saturating_add(store.corrupted_data_rows);
73        self.corrupted_index_keys = self
74            .corrupted_index_keys
75            .saturating_add(store.corrupted_index_keys);
76        self.corrupted_index_entries = self
77            .corrupted_index_entries
78            .saturating_add(store.corrupted_index_entries);
79        self.missing_index_entries = self
80            .missing_index_entries
81            .saturating_add(store.missing_index_entries);
82        self.divergent_index_entries = self
83            .divergent_index_entries
84            .saturating_add(store.divergent_index_entries);
85        self.orphan_index_references = self
86            .orphan_index_references
87            .saturating_add(store.orphan_index_references);
88        self.compatibility_findings = self
89            .compatibility_findings
90            .saturating_add(store.compatibility_findings);
91        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
92    }
93
94    /// Return total number of data rows scanned.
95    #[must_use]
96    pub const fn data_rows_scanned(&self) -> u64 {
97        self.data_rows_scanned
98    }
99
100    /// Return total number of index entries scanned.
101    #[must_use]
102    pub const fn index_entries_scanned(&self) -> u64 {
103        self.index_entries_scanned
104    }
105
106    /// Return total number of corrupted data-key findings.
107    #[must_use]
108    pub const fn corrupted_data_keys(&self) -> u64 {
109        self.corrupted_data_keys
110    }
111
112    /// Return total number of corrupted data-row findings.
113    #[must_use]
114    pub const fn corrupted_data_rows(&self) -> u64 {
115        self.corrupted_data_rows
116    }
117
118    /// Return total number of corrupted index-key findings.
119    #[must_use]
120    pub const fn corrupted_index_keys(&self) -> u64 {
121        self.corrupted_index_keys
122    }
123
124    /// Return total number of corrupted index-entry findings.
125    #[must_use]
126    pub const fn corrupted_index_entries(&self) -> u64 {
127        self.corrupted_index_entries
128    }
129
130    /// Return total number of missing index-entry findings.
131    #[must_use]
132    pub const fn missing_index_entries(&self) -> u64 {
133        self.missing_index_entries
134    }
135
136    /// Return total number of divergent index-entry findings.
137    #[must_use]
138    pub const fn divergent_index_entries(&self) -> u64 {
139        self.divergent_index_entries
140    }
141
142    /// Return total number of orphan index-reference findings.
143    #[must_use]
144    pub const fn orphan_index_references(&self) -> u64 {
145        self.orphan_index_references
146    }
147
148    /// Return total number of compatibility findings.
149    #[must_use]
150    pub const fn compatibility_findings(&self) -> u64 {
151        self.compatibility_findings
152    }
153
154    /// Return total number of misuse findings.
155    #[must_use]
156    pub const fn misuse_findings(&self) -> u64 {
157        self.misuse_findings
158    }
159}
160
161#[cfg_attr(
162    doc,
163    doc = "IntegrityStoreSnapshot\n\nPer-store integrity findings and scan counters."
164)]
165#[derive(CandidType, Clone, Debug, Default, Deserialize)]
166pub struct IntegrityStoreSnapshot {
167    pub(crate) path: String,
168    pub(crate) data_rows_scanned: u64,
169    pub(crate) index_entries_scanned: u64,
170    pub(crate) corrupted_data_keys: u64,
171    pub(crate) corrupted_data_rows: u64,
172    pub(crate) corrupted_index_keys: u64,
173    pub(crate) corrupted_index_entries: u64,
174    pub(crate) missing_index_entries: u64,
175    pub(crate) divergent_index_entries: u64,
176    pub(crate) orphan_index_references: u64,
177    pub(crate) compatibility_findings: u64,
178    pub(crate) misuse_findings: u64,
179}
180
181impl IntegrityStoreSnapshot {
182    /// Construct one empty store-level integrity snapshot.
183    #[must_use]
184    pub fn new(path: String) -> Self {
185        Self {
186            path,
187            ..Self::default()
188        }
189    }
190
191    /// Borrow store path.
192    #[must_use]
193    pub const fn path(&self) -> &str {
194        self.path.as_str()
195    }
196
197    /// Return number of scanned data rows.
198    #[must_use]
199    pub const fn data_rows_scanned(&self) -> u64 {
200        self.data_rows_scanned
201    }
202
203    /// Return number of scanned index entries.
204    #[must_use]
205    pub const fn index_entries_scanned(&self) -> u64 {
206        self.index_entries_scanned
207    }
208
209    /// Return number of corrupted data-key findings.
210    #[must_use]
211    pub const fn corrupted_data_keys(&self) -> u64 {
212        self.corrupted_data_keys
213    }
214
215    /// Return number of corrupted data-row findings.
216    #[must_use]
217    pub const fn corrupted_data_rows(&self) -> u64 {
218        self.corrupted_data_rows
219    }
220
221    /// Return number of corrupted index-key findings.
222    #[must_use]
223    pub const fn corrupted_index_keys(&self) -> u64 {
224        self.corrupted_index_keys
225    }
226
227    /// Return number of corrupted index-entry findings.
228    #[must_use]
229    pub const fn corrupted_index_entries(&self) -> u64 {
230        self.corrupted_index_entries
231    }
232
233    /// Return number of missing index-entry findings.
234    #[must_use]
235    pub const fn missing_index_entries(&self) -> u64 {
236        self.missing_index_entries
237    }
238
239    /// Return number of divergent index-entry findings.
240    #[must_use]
241    pub const fn divergent_index_entries(&self) -> u64 {
242        self.divergent_index_entries
243    }
244
245    /// Return number of orphan index-reference findings.
246    #[must_use]
247    pub const fn orphan_index_references(&self) -> u64 {
248        self.orphan_index_references
249    }
250
251    /// Return number of compatibility findings.
252    #[must_use]
253    pub const fn compatibility_findings(&self) -> u64 {
254        self.compatibility_findings
255    }
256
257    /// Return number of misuse findings.
258    #[must_use]
259    pub const fn misuse_findings(&self) -> u64 {
260        self.misuse_findings
261    }
262}
263
264#[cfg_attr(
265    doc,
266    doc = "IntegrityReport\n\nFull integrity-scan output across all registered stores."
267)]
268#[derive(CandidType, Clone, Debug, Default, Deserialize)]
269pub struct IntegrityReport {
270    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
271    pub(crate) totals: IntegrityTotals,
272}
273
274impl IntegrityReport {
275    /// Construct one integrity report payload.
276    #[must_use]
277    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
278        Self { stores, totals }
279    }
280
281    /// Borrow per-store integrity snapshots.
282    #[must_use]
283    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
284        self.stores.as_slice()
285    }
286
287    /// Borrow aggregated integrity totals.
288    #[must_use]
289    pub const fn totals(&self) -> &IntegrityTotals {
290        &self.totals
291    }
292}
293
294impl StorageReport {
295    /// Construct one storage report payload.
296    #[must_use]
297    pub const fn new(
298        storage_data: Vec<DataStoreSnapshot>,
299        storage_index: Vec<IndexStoreSnapshot>,
300        entity_storage: Vec<EntitySnapshot>,
301        corrupted_keys: u64,
302        corrupted_entries: u64,
303    ) -> Self {
304        Self {
305            storage_data,
306            storage_index,
307            entity_storage,
308            corrupted_keys,
309            corrupted_entries,
310        }
311    }
312
313    /// Borrow data-store snapshots.
314    #[must_use]
315    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
316        self.storage_data.as_slice()
317    }
318
319    /// Borrow index-store snapshots.
320    #[must_use]
321    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
322        self.storage_index.as_slice()
323    }
324
325    /// Borrow entity-level storage snapshots.
326    #[must_use]
327    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
328        self.entity_storage.as_slice()
329    }
330
331    /// Return count of corrupted decoded data keys.
332    #[must_use]
333    pub const fn corrupted_keys(&self) -> u64 {
334        self.corrupted_keys
335    }
336
337    /// Return count of corrupted index entries.
338    #[must_use]
339    pub const fn corrupted_entries(&self) -> u64 {
340        self.corrupted_entries
341    }
342}
343
344#[cfg_attr(doc, doc = "DataStoreSnapshot\n\nData-store snapshot row.")]
345#[derive(CandidType, Clone, Debug, Default, Deserialize)]
346pub struct DataStoreSnapshot {
347    pub(crate) path: String,
348    pub(crate) entries: u64,
349    pub(crate) memory_bytes: u64,
350}
351
352impl DataStoreSnapshot {
353    /// Construct one data-store snapshot row.
354    #[must_use]
355    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
356        Self {
357            path,
358            entries,
359            memory_bytes,
360        }
361    }
362
363    /// Borrow store path.
364    #[must_use]
365    pub const fn path(&self) -> &str {
366        self.path.as_str()
367    }
368
369    /// Return row count.
370    #[must_use]
371    pub const fn entries(&self) -> u64 {
372        self.entries
373    }
374
375    /// Return memory usage in bytes.
376    #[must_use]
377    pub const fn memory_bytes(&self) -> u64 {
378        self.memory_bytes
379    }
380}
381
382#[cfg_attr(doc, doc = "IndexStoreSnapshot\n\nIndex-store snapshot row.")]
383#[derive(CandidType, Clone, Debug, Default, Deserialize)]
384pub struct IndexStoreSnapshot {
385    pub(crate) path: String,
386    pub(crate) entries: u64,
387    pub(crate) user_entries: u64,
388    pub(crate) system_entries: u64,
389    pub(crate) memory_bytes: u64,
390    pub(crate) state: IndexState,
391}
392
393impl IndexStoreSnapshot {
394    /// Construct one index-store snapshot row.
395    #[must_use]
396    pub const fn new(
397        path: String,
398        entries: u64,
399        user_entries: u64,
400        system_entries: u64,
401        memory_bytes: u64,
402        state: IndexState,
403    ) -> Self {
404        Self {
405            path,
406            entries,
407            user_entries,
408            system_entries,
409            memory_bytes,
410            state,
411        }
412    }
413
414    /// Borrow store path.
415    #[must_use]
416    pub const fn path(&self) -> &str {
417        self.path.as_str()
418    }
419
420    /// Return total entry count.
421    #[must_use]
422    pub const fn entries(&self) -> u64 {
423        self.entries
424    }
425
426    /// Return user-namespace entry count.
427    #[must_use]
428    pub const fn user_entries(&self) -> u64 {
429        self.user_entries
430    }
431
432    /// Return system-namespace entry count.
433    #[must_use]
434    pub const fn system_entries(&self) -> u64 {
435        self.system_entries
436    }
437
438    /// Return memory usage in bytes.
439    #[must_use]
440    pub const fn memory_bytes(&self) -> u64 {
441        self.memory_bytes
442    }
443
444    /// Return the current explicit runtime lifecycle state for this index
445    /// store snapshot.
446    #[must_use]
447    pub const fn state(&self) -> IndexState {
448        self.state
449    }
450}
451
452#[cfg_attr(doc, doc = "EntitySnapshot\n\nPer-entity storage snapshot row.")]
453#[derive(CandidType, Clone, Debug, Default, Deserialize)]
454pub struct EntitySnapshot {
455    pub(crate) store: String,
456
457    pub(crate) path: String,
458
459    pub(crate) entries: u64,
460
461    pub(crate) memory_bytes: u64,
462}
463
464impl EntitySnapshot {
465    /// Construct one entity-storage snapshot row.
466    #[must_use]
467    pub const fn new(store: String, path: String, entries: u64, memory_bytes: u64) -> Self {
468        Self {
469            store,
470            path,
471            entries,
472            memory_bytes,
473        }
474    }
475
476    /// Borrow store path.
477    #[must_use]
478    pub const fn store(&self) -> &str {
479        self.store.as_str()
480    }
481
482    /// Borrow entity path.
483    #[must_use]
484    pub const fn path(&self) -> &str {
485        self.path.as_str()
486    }
487
488    /// Return row count.
489    #[must_use]
490    pub const fn entries(&self) -> u64 {
491        self.entries
492    }
493
494    /// Return memory usage in bytes.
495    #[must_use]
496    pub const fn memory_bytes(&self) -> u64 {
497        self.memory_bytes
498    }
499}
500
501#[cfg_attr(
502    doc,
503    doc = "EntityStats\n\nInternal struct for building per-entity stats before snapshotting."
504)]
505#[derive(Default)]
506struct EntityStats {
507    entries: u64,
508    memory_bytes: u64,
509}
510
511impl EntityStats {
512    // Accumulate per-entity entry count and byte footprint for snapshot output.
513    const fn update(&mut self, value_len: u64) {
514        self.entries = self.entries.saturating_add(1);
515        self.memory_bytes = self
516            .memory_bytes
517            .saturating_add(DataKey::entry_size_bytes(value_len));
518    }
519}
520
521// Update one small per-store entity-stat accumulator without pulling ordered
522// map machinery into the default snapshot path. Final output ordering is still
523// enforced later on the emitted snapshot rows.
524fn update_default_entity_stats(
525    entity_stats: &mut Vec<(EntityTag, EntityStats)>,
526    entity_tag: EntityTag,
527    value_len: u64,
528) {
529    if let Some((_, stats)) = entity_stats
530        .iter_mut()
531        .find(|(existing_tag, _)| *existing_tag == entity_tag)
532    {
533        stats.update(value_len);
534        return;
535    }
536
537    let mut stats = EntityStats::default();
538    stats.update(value_len);
539    entity_stats.push((entity_tag, stats));
540}
541
542fn storage_report_name_for_hook<'a, C: CanisterKind>(
543    name_map: &BTreeMap<&'static str, &'a str>,
544    hooks: &EntityRuntimeHooks<C>,
545) -> &'a str {
546    name_map
547        .get(hooks.entity_path)
548        .copied()
549        .or_else(|| name_map.get(hooks.model.name()).copied())
550        .unwrap_or(hooks.entity_path)
551}
552
553// Resolve one default entity path label for storage snapshots without pulling
554// alias/path remapping support into the caller.
555fn storage_report_default_name_for_entity_tag<C: CanisterKind>(
556    db: &Db<C>,
557    entity_tag: EntityTag,
558) -> String {
559    db.runtime_hook_for_entity_tag(entity_tag).ok().map_or_else(
560        || format!("#{}", entity_tag.value()),
561        |hooks| hooks.entity_path.to_string(),
562    )
563}
564
565#[cfg_attr(
566    doc,
567    doc = "Build one deterministic storage snapshot with default entity-path names.\n\nThis variant is used by generated snapshot endpoints that never pass alias remapping, so it keeps the snapshot root independent from optional alias-resolution machinery."
568)]
569pub(crate) fn storage_report_default<C: CanisterKind>(
570    db: &Db<C>,
571) -> Result<StorageReport, InternalError> {
572    db.ensure_recovered_state()?;
573    let mut data = Vec::new();
574    let mut index = Vec::new();
575    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
576    let mut corrupted_keys = 0u64;
577    let mut corrupted_entries = 0u64;
578
579    db.with_store_registry(|reg| {
580        // Keep diagnostics snapshots deterministic by traversing stores in path order.
581        let mut stores = reg.iter().collect::<Vec<_>>();
582        stores.sort_by_key(|(path, _)| *path);
583
584        for (path, store_handle) in stores {
585            // Phase 1: collect data-store snapshots and per-entity stats.
586            store_handle.with_data(|store| {
587                data.push(DataStoreSnapshot::new(
588                    path.to_string(),
589                    store.len(),
590                    store.memory_bytes(),
591                ));
592
593                // Track per-entity counts and byte footprint for snapshot output.
594                let mut by_entity = Vec::<(EntityTag, EntityStats)>::new();
595
596                for entry in store.iter() {
597                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
598                        corrupted_keys = corrupted_keys.saturating_add(1);
599                        continue;
600                    };
601
602                    let value_len = entry.value().len() as u64;
603
604                    update_default_entity_stats(&mut by_entity, dk.entity_tag(), value_len);
605                }
606
607                for (entity_tag, stats) in by_entity {
608                    entity_storage.push(EntitySnapshot::new(
609                        path.to_string(),
610                        storage_report_default_name_for_entity_tag(db, entity_tag),
611                        stats.entries,
612                        stats.memory_bytes,
613                    ));
614                }
615            });
616
617            // Phase 2: collect index-store snapshots and integrity counters.
618            store_handle.with_index(|store| {
619                let mut user_entries = 0u64;
620                let mut system_entries = 0u64;
621
622                for (key, value) in store.entries() {
623                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
624                        corrupted_entries = corrupted_entries.saturating_add(1);
625                        continue;
626                    };
627
628                    if decoded_key.uses_system_namespace() {
629                        system_entries = system_entries.saturating_add(1);
630                    } else {
631                        user_entries = user_entries.saturating_add(1);
632                    }
633
634                    if value.validate().is_err() {
635                        corrupted_entries = corrupted_entries.saturating_add(1);
636                    }
637                }
638
639                index.push(IndexStoreSnapshot::new(
640                    path.to_string(),
641                    store.len(),
642                    user_entries,
643                    system_entries,
644                    store.memory_bytes(),
645                    store.state(),
646                ));
647            });
648        }
649    });
650
651    // Phase 3: enforce deterministic entity snapshot emission order.
652    // This remains stable even if outer store traversal internals change.
653    entity_storage
654        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
655
656    Ok(StorageReport::new(
657        data,
658        index,
659        entity_storage,
660        corrupted_keys,
661        corrupted_entries,
662    ))
663}
664
665#[cfg_attr(
666    doc,
667    doc = "Build one deterministic storage snapshot with per-entity rollups.\n\nThis path is read-only and fail-closed on decode/validation errors by counting corrupted keys/entries instead of panicking."
668)]
669pub(crate) fn storage_report<C: CanisterKind>(
670    db: &Db<C>,
671    name_to_path: &[(&'static str, &'static str)],
672) -> Result<StorageReport, InternalError> {
673    db.ensure_recovered_state()?;
674    // Build one optional alias map once, then resolve report names from the
675    // runtime hook table so entity tags keep distinct path identity even when
676    // multiple hooks intentionally share the same model name.
677    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
678    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
679    for hooks in db.entity_runtime_hooks {
680        tag_name_map
681            .entry(hooks.entity_tag)
682            .or_insert_with(|| storage_report_name_for_hook(&name_map, hooks));
683    }
684    let mut data = Vec::new();
685    let mut index = Vec::new();
686    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
687    let mut corrupted_keys = 0u64;
688    let mut corrupted_entries = 0u64;
689
690    db.with_store_registry(|reg| {
691        // Keep diagnostics snapshots deterministic by traversing stores in path order.
692        let mut stores = reg.iter().collect::<Vec<_>>();
693        stores.sort_by_key(|(path, _)| *path);
694
695        for (path, store_handle) in stores {
696            // Phase 1: collect data-store snapshots and per-entity stats.
697            store_handle.with_data(|store| {
698                data.push(DataStoreSnapshot::new(
699                    path.to_string(),
700                    store.len(),
701                    store.memory_bytes(),
702                ));
703
704                // Track per-entity counts and byte footprint for snapshot output.
705                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
706
707                for entry in store.iter() {
708                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
709                        corrupted_keys = corrupted_keys.saturating_add(1);
710                        continue;
711                    };
712
713                    let value_len = entry.value().len() as u64;
714
715                    by_entity
716                        .entry(dk.entity_tag())
717                        .or_default()
718                        .update(value_len);
719                }
720
721                for (entity_tag, stats) in by_entity {
722                    let path_name = tag_name_map
723                        .get(&entity_tag)
724                        .copied()
725                        .map(str::to_string)
726                        .or_else(|| {
727                            db.runtime_hook_for_entity_tag(entity_tag)
728                                .ok()
729                                .map(|hooks| {
730                                    storage_report_name_for_hook(&name_map, hooks).to_string()
731                                })
732                        })
733                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
734                    entity_storage.push(EntitySnapshot::new(
735                        path.to_string(),
736                        path_name,
737                        stats.entries,
738                        stats.memory_bytes,
739                    ));
740                }
741            });
742
743            // Phase 2: collect index-store snapshots and integrity counters.
744            store_handle.with_index(|store| {
745                let mut user_entries = 0u64;
746                let mut system_entries = 0u64;
747
748                for (key, value) in store.entries() {
749                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
750                        corrupted_entries = corrupted_entries.saturating_add(1);
751                        continue;
752                    };
753
754                    if decoded_key.uses_system_namespace() {
755                        system_entries = system_entries.saturating_add(1);
756                    } else {
757                        user_entries = user_entries.saturating_add(1);
758                    }
759
760                    if value.validate().is_err() {
761                        corrupted_entries = corrupted_entries.saturating_add(1);
762                    }
763                }
764
765                index.push(IndexStoreSnapshot::new(
766                    path.to_string(),
767                    store.len(),
768                    user_entries,
769                    system_entries,
770                    store.memory_bytes(),
771                    store.state(),
772                ));
773            });
774        }
775    });
776
777    // Phase 3: enforce deterministic entity snapshot emission order.
778    // This remains stable even if outer store traversal internals change.
779    entity_storage
780        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
781
782    Ok(StorageReport::new(
783        data,
784        index,
785        entity_storage,
786        corrupted_keys,
787        corrupted_entries,
788    ))
789}
790
791#[cfg_attr(
792    doc,
793    doc = "Build one deterministic integrity scan over all registered stores.\n\nThis scan is read-only and classifies findings as:\n- corruption: malformed persisted bytes or inconsistent structural links\n- compatibility: persisted payloads outside decode compatibility windows\n- misuse: unsupported runtime wiring (for example missing entity hooks)"
794)]
795pub(crate) fn integrity_report<C: CanisterKind>(
796    db: &Db<C>,
797) -> Result<IntegrityReport, InternalError> {
798    db.ensure_recovered_state()?;
799
800    integrity_report_after_recovery(db)
801}
802
803#[cfg_attr(
804    doc,
805    doc = "Build one deterministic integrity scan after recovery has already completed.\n\nCallers running inside recovery flow should use this variant to avoid recursive recovery gating."
806)]
807pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
808    db: &Db<C>,
809) -> Result<IntegrityReport, InternalError> {
810    build_integrity_report(db)
811}
812
813fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
814    let mut stores = Vec::new();
815    let mut totals = IntegrityTotals::default();
816    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
817
818    db.with_store_registry(|reg| {
819        // Keep deterministic output order across registry traversal implementations.
820        let mut store_entries = reg.iter().collect::<Vec<_>>();
821        store_entries.sort_by_key(|(path, _)| *path);
822
823        for (path, store_handle) in store_entries {
824            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
825            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
826            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
827
828            totals.add_store_snapshot(&snapshot);
829            stores.push(snapshot);
830        }
831
832        Ok::<(), InternalError>(())
833    })?;
834
835    Ok(IntegrityReport::new(stores, totals))
836}
837
838// Build one global map of live data keys grouped by entity across all stores.
839fn collect_global_live_keys_by_entity<C: CanisterKind>(
840    db: &Db<C>,
841) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
842    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
843
844    db.with_store_registry(|reg| {
845        for (_, store_handle) in reg.iter() {
846            store_handle.with_data(|data_store| {
847                for entry in data_store.iter() {
848                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
849                        keys.entry(data_key.entity_tag())
850                            .or_default()
851                            .insert(data_key.storage_key());
852                    }
853                }
854            });
855        }
856
857        Ok::<(), InternalError>(())
858    })?;
859
860    Ok(keys)
861}
862
863// Run forward (data -> index) integrity checks for one store.
864fn scan_store_forward_integrity<C: CanisterKind>(
865    db: &Db<C>,
866    store_handle: StoreHandle,
867    snapshot: &mut IntegrityStoreSnapshot,
868) -> Result<(), InternalError> {
869    store_handle.with_data(|data_store| {
870        for entry in data_store.iter() {
871            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
872
873            let raw_key = *entry.key();
874
875            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
876                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
877                continue;
878            };
879
880            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
881                Ok(hooks) => hooks,
882                Err(err) => {
883                    classify_scan_error(err, snapshot)?;
884                    continue;
885                }
886            };
887
888            let marker_row = CommitRowOp::new(
889                hooks.entity_path,
890                raw_key,
891                None,
892                Some(entry.value().as_bytes().to_vec()),
893                crate::db::schema::commit_schema_fingerprint_for_model(
894                    hooks.entity_path,
895                    hooks.model,
896                ),
897            );
898
899            // Validate envelope compatibility before typed preparation so
900            // incompatible persisted formats remain compatibility-classified.
901            if let Err(err) = decode_structural_row_cbor(&entry.value()) {
902                classify_scan_error(err, snapshot)?;
903                continue;
904            }
905
906            let prepared = match db.prepare_row_commit_op(&marker_row) {
907                Ok(prepared) => prepared,
908                Err(err) => {
909                    classify_scan_error(err, snapshot)?;
910                    continue;
911                }
912            };
913
914            for index_op in prepared.index_ops {
915                let Some(expected_value) = index_op.value else {
916                    continue;
917                };
918
919                let actual = index_op
920                    .store
921                    .with_borrow(|index_store| index_store.get(&index_op.key));
922                match actual {
923                    Some(actual_value) if actual_value == expected_value => {}
924                    Some(_) => {
925                        snapshot.divergent_index_entries =
926                            snapshot.divergent_index_entries.saturating_add(1);
927                    }
928                    None => {
929                        snapshot.missing_index_entries =
930                            snapshot.missing_index_entries.saturating_add(1);
931                    }
932                }
933            }
934        }
935
936        Ok::<(), InternalError>(())
937    })
938}
939
940// Run reverse (index -> data) integrity checks for one store.
941fn scan_store_reverse_integrity(
942    store_handle: StoreHandle,
943    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
944    snapshot: &mut IntegrityStoreSnapshot,
945) {
946    store_handle.with_index(|index_store| {
947        for (raw_index_key, raw_index_entry) in index_store.entries() {
948            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
949
950            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
951                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
952                continue;
953            };
954
955            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
956
957            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
958                snapshot.corrupted_index_entries =
959                    snapshot.corrupted_index_entries.saturating_add(1);
960                continue;
961            };
962
963            for primary_key in indexed_primary_keys {
964                let exists = live_keys_by_entity
965                    .get(&index_entity_tag)
966                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
967                if !exists {
968                    snapshot.orphan_index_references =
969                        snapshot.orphan_index_references.saturating_add(1);
970                }
971            }
972        }
973    });
974}
975
976// Map scan-time errors into explicit integrity classification buckets.
977fn classify_scan_error(
978    err: InternalError,
979    snapshot: &mut IntegrityStoreSnapshot,
980) -> Result<(), InternalError> {
981    match err.class() {
982        ErrorClass::Corruption => {
983            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
984            Ok(())
985        }
986        ErrorClass::IncompatiblePersistedFormat => {
987            snapshot.compatibility_findings = snapshot.compatibility_findings.saturating_add(1);
988            Ok(())
989        }
990        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
991            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
992            Ok(())
993        }
994        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
995    }
996}
997
998// Parse the data-entity identity from one decoded index key.
999const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
1000    index_key.index_id().entity_tag
1001}