Skip to main content

icydb_core/db/diagnostics/
mod.rs

1//! Module: diagnostics
2//! Responsibility: read-only storage footprint and integrity snapshots.
3//! Does not own: recovery, write-path mutation, or query planning semantics.
4//! Boundary: consumes `Db`/store read APIs and returns DTO snapshots.
5
6mod execution_trace;
7#[cfg(test)]
8mod tests;
9
10use crate::{
11    db::{
12        Db, EntityRuntimeHooks,
13        commit::CommitRowOp,
14        data::{DataKey, StorageKey, decode_structural_row_payload},
15        index::{IndexKey, IndexState},
16        registry::StoreHandle,
17    },
18    error::{ErrorClass, InternalError},
19    traits::CanisterKind,
20    types::EntityTag,
21};
22use candid::CandidType;
23use serde::Deserialize;
24use std::collections::{BTreeMap, BTreeSet};
25
26pub use execution_trace::{
27    ExecutionAccessPathVariant, ExecutionMetrics, ExecutionOptimization, ExecutionTrace,
28};
29
30#[cfg_attr(doc, doc = "StorageReport\n\nLive storage snapshot payload.")]
31#[derive(CandidType, Clone, Debug, Default, Deserialize)]
32pub struct StorageReport {
33    pub(crate) storage_data: Vec<DataStoreSnapshot>,
34    pub(crate) storage_index: Vec<IndexStoreSnapshot>,
35    pub(crate) entity_storage: Vec<EntitySnapshot>,
36    pub(crate) corrupted_keys: u64,
37    pub(crate) corrupted_entries: u64,
38}
39
40#[cfg_attr(
41    doc,
42    doc = "IntegrityTotals\n\nAggregated integrity-scan counters across all stores."
43)]
44#[derive(CandidType, Clone, Debug, Default, Deserialize)]
45pub struct IntegrityTotals {
46    pub(crate) data_rows_scanned: u64,
47    pub(crate) index_entries_scanned: u64,
48    pub(crate) corrupted_data_keys: u64,
49    pub(crate) corrupted_data_rows: u64,
50    pub(crate) corrupted_index_keys: u64,
51    pub(crate) corrupted_index_entries: u64,
52    pub(crate) missing_index_entries: u64,
53    pub(crate) divergent_index_entries: u64,
54    pub(crate) orphan_index_references: u64,
55    pub(crate) misuse_findings: u64,
56}
57
58impl IntegrityTotals {
59    const fn add_store_snapshot(&mut self, store: &IntegrityStoreSnapshot) {
60        self.data_rows_scanned = self
61            .data_rows_scanned
62            .saturating_add(store.data_rows_scanned);
63        self.index_entries_scanned = self
64            .index_entries_scanned
65            .saturating_add(store.index_entries_scanned);
66        self.corrupted_data_keys = self
67            .corrupted_data_keys
68            .saturating_add(store.corrupted_data_keys);
69        self.corrupted_data_rows = self
70            .corrupted_data_rows
71            .saturating_add(store.corrupted_data_rows);
72        self.corrupted_index_keys = self
73            .corrupted_index_keys
74            .saturating_add(store.corrupted_index_keys);
75        self.corrupted_index_entries = self
76            .corrupted_index_entries
77            .saturating_add(store.corrupted_index_entries);
78        self.missing_index_entries = self
79            .missing_index_entries
80            .saturating_add(store.missing_index_entries);
81        self.divergent_index_entries = self
82            .divergent_index_entries
83            .saturating_add(store.divergent_index_entries);
84        self.orphan_index_references = self
85            .orphan_index_references
86            .saturating_add(store.orphan_index_references);
87        self.misuse_findings = self.misuse_findings.saturating_add(store.misuse_findings);
88    }
89
90    /// Return total number of data rows scanned.
91    #[must_use]
92    pub const fn data_rows_scanned(&self) -> u64 {
93        self.data_rows_scanned
94    }
95
96    /// Return total number of index entries scanned.
97    #[must_use]
98    pub const fn index_entries_scanned(&self) -> u64 {
99        self.index_entries_scanned
100    }
101
102    /// Return total number of corrupted data-key findings.
103    #[must_use]
104    pub const fn corrupted_data_keys(&self) -> u64 {
105        self.corrupted_data_keys
106    }
107
108    /// Return total number of corrupted data-row findings.
109    #[must_use]
110    pub const fn corrupted_data_rows(&self) -> u64 {
111        self.corrupted_data_rows
112    }
113
114    /// Return total number of corrupted index-key findings.
115    #[must_use]
116    pub const fn corrupted_index_keys(&self) -> u64 {
117        self.corrupted_index_keys
118    }
119
120    /// Return total number of corrupted index-entry findings.
121    #[must_use]
122    pub const fn corrupted_index_entries(&self) -> u64 {
123        self.corrupted_index_entries
124    }
125
126    /// Return total number of missing index-entry findings.
127    #[must_use]
128    pub const fn missing_index_entries(&self) -> u64 {
129        self.missing_index_entries
130    }
131
132    /// Return total number of divergent index-entry findings.
133    #[must_use]
134    pub const fn divergent_index_entries(&self) -> u64 {
135        self.divergent_index_entries
136    }
137
138    /// Return total number of orphan index-reference findings.
139    #[must_use]
140    pub const fn orphan_index_references(&self) -> u64 {
141        self.orphan_index_references
142    }
143
144    /// Return total number of misuse findings.
145    #[must_use]
146    pub const fn misuse_findings(&self) -> u64 {
147        self.misuse_findings
148    }
149}
150
151#[cfg_attr(
152    doc,
153    doc = "IntegrityStoreSnapshot\n\nPer-store integrity findings and scan counters."
154)]
155#[derive(CandidType, Clone, Debug, Default, Deserialize)]
156pub struct IntegrityStoreSnapshot {
157    pub(crate) path: String,
158    pub(crate) data_rows_scanned: u64,
159    pub(crate) index_entries_scanned: u64,
160    pub(crate) corrupted_data_keys: u64,
161    pub(crate) corrupted_data_rows: u64,
162    pub(crate) corrupted_index_keys: u64,
163    pub(crate) corrupted_index_entries: u64,
164    pub(crate) missing_index_entries: u64,
165    pub(crate) divergent_index_entries: u64,
166    pub(crate) orphan_index_references: u64,
167    pub(crate) misuse_findings: u64,
168}
169
170impl IntegrityStoreSnapshot {
171    /// Construct one empty store-level integrity snapshot.
172    #[must_use]
173    pub fn new(path: String) -> Self {
174        Self {
175            path,
176            ..Self::default()
177        }
178    }
179
180    /// Borrow store path.
181    #[must_use]
182    pub const fn path(&self) -> &str {
183        self.path.as_str()
184    }
185
186    /// Return number of scanned data rows.
187    #[must_use]
188    pub const fn data_rows_scanned(&self) -> u64 {
189        self.data_rows_scanned
190    }
191
192    /// Return number of scanned index entries.
193    #[must_use]
194    pub const fn index_entries_scanned(&self) -> u64 {
195        self.index_entries_scanned
196    }
197
198    /// Return number of corrupted data-key findings.
199    #[must_use]
200    pub const fn corrupted_data_keys(&self) -> u64 {
201        self.corrupted_data_keys
202    }
203
204    /// Return number of corrupted data-row findings.
205    #[must_use]
206    pub const fn corrupted_data_rows(&self) -> u64 {
207        self.corrupted_data_rows
208    }
209
210    /// Return number of corrupted index-key findings.
211    #[must_use]
212    pub const fn corrupted_index_keys(&self) -> u64 {
213        self.corrupted_index_keys
214    }
215
216    /// Return number of corrupted index-entry findings.
217    #[must_use]
218    pub const fn corrupted_index_entries(&self) -> u64 {
219        self.corrupted_index_entries
220    }
221
222    /// Return number of missing index-entry findings.
223    #[must_use]
224    pub const fn missing_index_entries(&self) -> u64 {
225        self.missing_index_entries
226    }
227
228    /// Return number of divergent index-entry findings.
229    #[must_use]
230    pub const fn divergent_index_entries(&self) -> u64 {
231        self.divergent_index_entries
232    }
233
234    /// Return number of orphan index-reference findings.
235    #[must_use]
236    pub const fn orphan_index_references(&self) -> u64 {
237        self.orphan_index_references
238    }
239
240    /// Return number of misuse findings.
241    #[must_use]
242    pub const fn misuse_findings(&self) -> u64 {
243        self.misuse_findings
244    }
245}
246
247#[cfg_attr(
248    doc,
249    doc = "IntegrityReport\n\nFull integrity-scan output across all registered stores."
250)]
251#[derive(CandidType, Clone, Debug, Default, Deserialize)]
252pub struct IntegrityReport {
253    pub(crate) stores: Vec<IntegrityStoreSnapshot>,
254    pub(crate) totals: IntegrityTotals,
255}
256
257impl IntegrityReport {
258    /// Construct one integrity report payload.
259    #[must_use]
260    pub const fn new(stores: Vec<IntegrityStoreSnapshot>, totals: IntegrityTotals) -> Self {
261        Self { stores, totals }
262    }
263
264    /// Borrow per-store integrity snapshots.
265    #[must_use]
266    pub const fn stores(&self) -> &[IntegrityStoreSnapshot] {
267        self.stores.as_slice()
268    }
269
270    /// Borrow aggregated integrity totals.
271    #[must_use]
272    pub const fn totals(&self) -> &IntegrityTotals {
273        &self.totals
274    }
275}
276
277impl StorageReport {
278    /// Construct one storage report payload.
279    #[must_use]
280    pub const fn new(
281        storage_data: Vec<DataStoreSnapshot>,
282        storage_index: Vec<IndexStoreSnapshot>,
283        entity_storage: Vec<EntitySnapshot>,
284        corrupted_keys: u64,
285        corrupted_entries: u64,
286    ) -> Self {
287        Self {
288            storage_data,
289            storage_index,
290            entity_storage,
291            corrupted_keys,
292            corrupted_entries,
293        }
294    }
295
296    /// Borrow data-store snapshots.
297    #[must_use]
298    pub const fn storage_data(&self) -> &[DataStoreSnapshot] {
299        self.storage_data.as_slice()
300    }
301
302    /// Borrow index-store snapshots.
303    #[must_use]
304    pub const fn storage_index(&self) -> &[IndexStoreSnapshot] {
305        self.storage_index.as_slice()
306    }
307
308    /// Borrow entity-level storage snapshots.
309    #[must_use]
310    pub const fn entity_storage(&self) -> &[EntitySnapshot] {
311        self.entity_storage.as_slice()
312    }
313
314    /// Return count of corrupted decoded data keys.
315    #[must_use]
316    pub const fn corrupted_keys(&self) -> u64 {
317        self.corrupted_keys
318    }
319
320    /// Return count of corrupted index entries.
321    #[must_use]
322    pub const fn corrupted_entries(&self) -> u64 {
323        self.corrupted_entries
324    }
325}
326
327#[cfg_attr(doc, doc = "DataStoreSnapshot\n\nData-store snapshot row.")]
328#[derive(CandidType, Clone, Debug, Default, Deserialize)]
329pub struct DataStoreSnapshot {
330    pub(crate) path: String,
331    pub(crate) entries: u64,
332    pub(crate) memory_bytes: u64,
333}
334
335impl DataStoreSnapshot {
336    /// Construct one data-store snapshot row.
337    #[must_use]
338    pub const fn new(path: String, entries: u64, memory_bytes: u64) -> Self {
339        Self {
340            path,
341            entries,
342            memory_bytes,
343        }
344    }
345
346    /// Borrow store path.
347    #[must_use]
348    pub const fn path(&self) -> &str {
349        self.path.as_str()
350    }
351
352    /// Return row count.
353    #[must_use]
354    pub const fn entries(&self) -> u64 {
355        self.entries
356    }
357
358    /// Return memory usage in bytes.
359    #[must_use]
360    pub const fn memory_bytes(&self) -> u64 {
361        self.memory_bytes
362    }
363}
364
365#[cfg_attr(doc, doc = "IndexStoreSnapshot\n\nIndex-store snapshot row.")]
366#[derive(CandidType, Clone, Debug, Default, Deserialize)]
367pub struct IndexStoreSnapshot {
368    pub(crate) path: String,
369    pub(crate) entries: u64,
370    pub(crate) user_entries: u64,
371    pub(crate) system_entries: u64,
372    pub(crate) memory_bytes: u64,
373    pub(crate) state: IndexState,
374}
375
376impl IndexStoreSnapshot {
377    /// Construct one index-store snapshot row.
378    #[must_use]
379    pub const fn new(
380        path: String,
381        entries: u64,
382        user_entries: u64,
383        system_entries: u64,
384        memory_bytes: u64,
385        state: IndexState,
386    ) -> Self {
387        Self {
388            path,
389            entries,
390            user_entries,
391            system_entries,
392            memory_bytes,
393            state,
394        }
395    }
396
397    /// Borrow store path.
398    #[must_use]
399    pub const fn path(&self) -> &str {
400        self.path.as_str()
401    }
402
403    /// Return total entry count.
404    #[must_use]
405    pub const fn entries(&self) -> u64 {
406        self.entries
407    }
408
409    /// Return user-namespace entry count.
410    #[must_use]
411    pub const fn user_entries(&self) -> u64 {
412        self.user_entries
413    }
414
415    /// Return system-namespace entry count.
416    #[must_use]
417    pub const fn system_entries(&self) -> u64 {
418        self.system_entries
419    }
420
421    /// Return memory usage in bytes.
422    #[must_use]
423    pub const fn memory_bytes(&self) -> u64 {
424        self.memory_bytes
425    }
426
427    /// Return the current explicit runtime lifecycle state for this index
428    /// store snapshot.
429    #[must_use]
430    pub const fn state(&self) -> IndexState {
431        self.state
432    }
433}
434
435#[cfg_attr(doc, doc = "EntitySnapshot\n\nPer-entity storage snapshot row.")]
436#[derive(CandidType, Clone, Debug, Default, Deserialize)]
437pub struct EntitySnapshot {
438    pub(crate) store: String,
439
440    pub(crate) path: String,
441
442    pub(crate) entries: u64,
443
444    pub(crate) memory_bytes: u64,
445}
446
447impl EntitySnapshot {
448    /// Construct one entity-storage snapshot row.
449    #[must_use]
450    pub const fn new(store: String, path: String, entries: u64, memory_bytes: u64) -> Self {
451        Self {
452            store,
453            path,
454            entries,
455            memory_bytes,
456        }
457    }
458
459    /// Borrow store path.
460    #[must_use]
461    pub const fn store(&self) -> &str {
462        self.store.as_str()
463    }
464
465    /// Borrow entity path.
466    #[must_use]
467    pub const fn path(&self) -> &str {
468        self.path.as_str()
469    }
470
471    /// Return row count.
472    #[must_use]
473    pub const fn entries(&self) -> u64 {
474        self.entries
475    }
476
477    /// Return memory usage in bytes.
478    #[must_use]
479    pub const fn memory_bytes(&self) -> u64 {
480        self.memory_bytes
481    }
482}
483
484#[cfg_attr(
485    doc,
486    doc = "EntityStats\n\nInternal struct for building per-entity stats before snapshotting."
487)]
488#[derive(Default)]
489struct EntityStats {
490    entries: u64,
491    memory_bytes: u64,
492}
493
494impl EntityStats {
495    // Accumulate per-entity entry count and byte footprint for snapshot output.
496    const fn update(&mut self, value_len: u64) {
497        self.entries = self.entries.saturating_add(1);
498        self.memory_bytes = self
499            .memory_bytes
500            .saturating_add(DataKey::entry_size_bytes(value_len));
501    }
502}
503
504// Update one small per-store entity-stat accumulator without pulling ordered
505// map machinery into the default snapshot path. Final output ordering is still
506// enforced later on the emitted snapshot rows.
507fn update_default_entity_stats(
508    entity_stats: &mut Vec<(EntityTag, EntityStats)>,
509    entity_tag: EntityTag,
510    value_len: u64,
511) {
512    if let Some((_, stats)) = entity_stats
513        .iter_mut()
514        .find(|(existing_tag, _)| *existing_tag == entity_tag)
515    {
516        stats.update(value_len);
517        return;
518    }
519
520    let mut stats = EntityStats::default();
521    stats.update(value_len);
522    entity_stats.push((entity_tag, stats));
523}
524
525fn storage_report_name_for_hook<'a, C: CanisterKind>(
526    name_map: &BTreeMap<&'static str, &'a str>,
527    hooks: &EntityRuntimeHooks<C>,
528) -> &'a str {
529    name_map
530        .get(hooks.entity_path)
531        .copied()
532        .or_else(|| name_map.get(hooks.model.name()).copied())
533        .unwrap_or(hooks.entity_path)
534}
535
536// Resolve one default entity path label for storage snapshots without pulling
537// alias/path remapping support into the caller.
538fn storage_report_default_name_for_entity_tag<C: CanisterKind>(
539    db: &Db<C>,
540    entity_tag: EntityTag,
541) -> String {
542    db.runtime_hook_for_entity_tag(entity_tag).ok().map_or_else(
543        || format!("#{}", entity_tag.value()),
544        |hooks| hooks.entity_path.to_string(),
545    )
546}
547
548#[cfg_attr(
549    doc,
550    doc = "Build one deterministic storage snapshot with default entity-path names.\n\nThis variant is used by generated snapshot endpoints that never pass alias remapping, so it keeps the snapshot root independent from optional alias-resolution machinery."
551)]
552pub(crate) fn storage_report_default<C: CanisterKind>(
553    db: &Db<C>,
554) -> Result<StorageReport, InternalError> {
555    db.ensure_recovered_state()?;
556    let mut data = Vec::new();
557    let mut index = Vec::new();
558    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
559    let mut corrupted_keys = 0u64;
560    let mut corrupted_entries = 0u64;
561
562    db.with_store_registry(|reg| {
563        // Keep diagnostics snapshots deterministic by traversing stores in path order.
564        let mut stores = reg.iter().collect::<Vec<_>>();
565        stores.sort_by_key(|(path, _)| *path);
566
567        for (path, store_handle) in stores {
568            // Phase 1: collect data-store snapshots and per-entity stats.
569            store_handle.with_data(|store| {
570                data.push(DataStoreSnapshot::new(
571                    path.to_string(),
572                    store.len(),
573                    store.memory_bytes(),
574                ));
575
576                // Track per-entity counts and byte footprint for snapshot output.
577                let mut by_entity = Vec::<(EntityTag, EntityStats)>::new();
578
579                for entry in store.iter() {
580                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
581                        corrupted_keys = corrupted_keys.saturating_add(1);
582                        continue;
583                    };
584
585                    let value_len = entry.value().len() as u64;
586
587                    update_default_entity_stats(&mut by_entity, dk.entity_tag(), value_len);
588                }
589
590                for (entity_tag, stats) in by_entity {
591                    entity_storage.push(EntitySnapshot::new(
592                        path.to_string(),
593                        storage_report_default_name_for_entity_tag(db, entity_tag),
594                        stats.entries,
595                        stats.memory_bytes,
596                    ));
597                }
598            });
599
600            // Phase 2: collect index-store snapshots and integrity counters.
601            store_handle.with_index(|store| {
602                let mut user_entries = 0u64;
603                let mut system_entries = 0u64;
604
605                for (key, value) in store.entries() {
606                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
607                        corrupted_entries = corrupted_entries.saturating_add(1);
608                        continue;
609                    };
610
611                    if decoded_key.uses_system_namespace() {
612                        system_entries = system_entries.saturating_add(1);
613                    } else {
614                        user_entries = user_entries.saturating_add(1);
615                    }
616
617                    if value.validate().is_err() {
618                        corrupted_entries = corrupted_entries.saturating_add(1);
619                    }
620                }
621
622                index.push(IndexStoreSnapshot::new(
623                    path.to_string(),
624                    store.len(),
625                    user_entries,
626                    system_entries,
627                    store.memory_bytes(),
628                    store.state(),
629                ));
630            });
631        }
632    });
633
634    // Phase 3: enforce deterministic entity snapshot emission order.
635    // This remains stable even if outer store traversal internals change.
636    entity_storage
637        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
638
639    Ok(StorageReport::new(
640        data,
641        index,
642        entity_storage,
643        corrupted_keys,
644        corrupted_entries,
645    ))
646}
647
648#[cfg_attr(
649    doc,
650    doc = "Build one deterministic storage snapshot with per-entity rollups.\n\nThis path is read-only and fail-closed on decode/validation errors by counting corrupted keys/entries instead of panicking."
651)]
652pub(crate) fn storage_report<C: CanisterKind>(
653    db: &Db<C>,
654    name_to_path: &[(&'static str, &'static str)],
655) -> Result<StorageReport, InternalError> {
656    db.ensure_recovered_state()?;
657    // Build one optional alias map once, then resolve report names from the
658    // runtime hook table so entity tags keep distinct path identity even when
659    // multiple hooks intentionally share the same model name.
660    let name_map: BTreeMap<&'static str, &str> = name_to_path.iter().copied().collect();
661    let mut tag_name_map = BTreeMap::<EntityTag, &str>::new();
662    for hooks in db.entity_runtime_hooks {
663        tag_name_map
664            .entry(hooks.entity_tag)
665            .or_insert_with(|| storage_report_name_for_hook(&name_map, hooks));
666    }
667    let mut data = Vec::new();
668    let mut index = Vec::new();
669    let mut entity_storage: Vec<EntitySnapshot> = Vec::new();
670    let mut corrupted_keys = 0u64;
671    let mut corrupted_entries = 0u64;
672
673    db.with_store_registry(|reg| {
674        // Keep diagnostics snapshots deterministic by traversing stores in path order.
675        let mut stores = reg.iter().collect::<Vec<_>>();
676        stores.sort_by_key(|(path, _)| *path);
677
678        for (path, store_handle) in stores {
679            // Phase 1: collect data-store snapshots and per-entity stats.
680            store_handle.with_data(|store| {
681                data.push(DataStoreSnapshot::new(
682                    path.to_string(),
683                    store.len(),
684                    store.memory_bytes(),
685                ));
686
687                // Track per-entity counts and byte footprint for snapshot output.
688                let mut by_entity: BTreeMap<EntityTag, EntityStats> = BTreeMap::new();
689
690                for entry in store.iter() {
691                    let Ok(dk) = DataKey::try_from_raw(entry.key()) else {
692                        corrupted_keys = corrupted_keys.saturating_add(1);
693                        continue;
694                    };
695
696                    let value_len = entry.value().len() as u64;
697
698                    by_entity
699                        .entry(dk.entity_tag())
700                        .or_default()
701                        .update(value_len);
702                }
703
704                for (entity_tag, stats) in by_entity {
705                    let path_name = tag_name_map
706                        .get(&entity_tag)
707                        .copied()
708                        .map(str::to_string)
709                        .or_else(|| {
710                            db.runtime_hook_for_entity_tag(entity_tag)
711                                .ok()
712                                .map(|hooks| {
713                                    storage_report_name_for_hook(&name_map, hooks).to_string()
714                                })
715                        })
716                        .unwrap_or_else(|| format!("#{}", entity_tag.value()));
717                    entity_storage.push(EntitySnapshot::new(
718                        path.to_string(),
719                        path_name,
720                        stats.entries,
721                        stats.memory_bytes,
722                    ));
723                }
724            });
725
726            // Phase 2: collect index-store snapshots and integrity counters.
727            store_handle.with_index(|store| {
728                let mut user_entries = 0u64;
729                let mut system_entries = 0u64;
730
731                for (key, value) in store.entries() {
732                    let Ok(decoded_key) = IndexKey::try_from_raw(&key) else {
733                        corrupted_entries = corrupted_entries.saturating_add(1);
734                        continue;
735                    };
736
737                    if decoded_key.uses_system_namespace() {
738                        system_entries = system_entries.saturating_add(1);
739                    } else {
740                        user_entries = user_entries.saturating_add(1);
741                    }
742
743                    if value.validate().is_err() {
744                        corrupted_entries = corrupted_entries.saturating_add(1);
745                    }
746                }
747
748                index.push(IndexStoreSnapshot::new(
749                    path.to_string(),
750                    store.len(),
751                    user_entries,
752                    system_entries,
753                    store.memory_bytes(),
754                    store.state(),
755                ));
756            });
757        }
758    });
759
760    // Phase 3: enforce deterministic entity snapshot emission order.
761    // This remains stable even if outer store traversal internals change.
762    entity_storage
763        .sort_by(|left, right| (left.store(), left.path()).cmp(&(right.store(), right.path())));
764
765    Ok(StorageReport::new(
766        data,
767        index,
768        entity_storage,
769        corrupted_keys,
770        corrupted_entries,
771    ))
772}
773
774#[cfg_attr(
775    doc,
776    doc = "Build one deterministic integrity scan over all registered stores.\n\nThis scan is read-only and classifies findings as:\n- corruption: malformed persisted bytes, incompatible persisted formats, or inconsistent structural links\n- misuse: unsupported runtime wiring (for example missing entity hooks)"
777)]
778pub(crate) fn integrity_report<C: CanisterKind>(
779    db: &Db<C>,
780) -> Result<IntegrityReport, InternalError> {
781    db.ensure_recovered_state()?;
782
783    integrity_report_after_recovery(db)
784}
785
786#[cfg_attr(
787    doc,
788    doc = "Build one deterministic integrity scan after recovery has already completed.\n\nCallers running inside recovery flow should use this variant to avoid recursive recovery gating."
789)]
790pub(in crate::db) fn integrity_report_after_recovery<C: CanisterKind>(
791    db: &Db<C>,
792) -> Result<IntegrityReport, InternalError> {
793    build_integrity_report(db)
794}
795
796fn build_integrity_report<C: CanisterKind>(db: &Db<C>) -> Result<IntegrityReport, InternalError> {
797    let mut stores = Vec::new();
798    let mut totals = IntegrityTotals::default();
799    let global_live_keys_by_entity = collect_global_live_keys_by_entity(db)?;
800
801    db.with_store_registry(|reg| {
802        // Keep deterministic output order across registry traversal implementations.
803        let mut store_entries = reg.iter().collect::<Vec<_>>();
804        store_entries.sort_by_key(|(path, _)| *path);
805
806        for (path, store_handle) in store_entries {
807            let mut snapshot = IntegrityStoreSnapshot::new(path.to_string());
808            scan_store_forward_integrity(db, store_handle, &mut snapshot)?;
809            scan_store_reverse_integrity(store_handle, &global_live_keys_by_entity, &mut snapshot);
810
811            totals.add_store_snapshot(&snapshot);
812            stores.push(snapshot);
813        }
814
815        Ok::<(), InternalError>(())
816    })?;
817
818    Ok(IntegrityReport::new(stores, totals))
819}
820
821// Build one global map of live data keys grouped by entity across all stores.
822fn collect_global_live_keys_by_entity<C: CanisterKind>(
823    db: &Db<C>,
824) -> Result<BTreeMap<EntityTag, BTreeSet<StorageKey>>, InternalError> {
825    let mut keys = BTreeMap::<EntityTag, BTreeSet<StorageKey>>::new();
826
827    db.with_store_registry(|reg| {
828        for (_, store_handle) in reg.iter() {
829            store_handle.with_data(|data_store| {
830                for entry in data_store.iter() {
831                    if let Ok(data_key) = DataKey::try_from_raw(entry.key()) {
832                        keys.entry(data_key.entity_tag())
833                            .or_default()
834                            .insert(data_key.storage_key());
835                    }
836                }
837            });
838        }
839
840        Ok::<(), InternalError>(())
841    })?;
842
843    Ok(keys)
844}
845
846// Run forward (data -> index) integrity checks for one store.
847fn scan_store_forward_integrity<C: CanisterKind>(
848    db: &Db<C>,
849    store_handle: StoreHandle,
850    snapshot: &mut IntegrityStoreSnapshot,
851) -> Result<(), InternalError> {
852    store_handle.with_data(|data_store| {
853        for entry in data_store.iter() {
854            snapshot.data_rows_scanned = snapshot.data_rows_scanned.saturating_add(1);
855
856            let raw_key = *entry.key();
857
858            let Ok(data_key) = DataKey::try_from_raw(&raw_key) else {
859                snapshot.corrupted_data_keys = snapshot.corrupted_data_keys.saturating_add(1);
860                continue;
861            };
862
863            let hooks = match db.runtime_hook_for_entity_tag(data_key.entity_tag()) {
864                Ok(hooks) => hooks,
865                Err(err) => {
866                    classify_scan_error(err, snapshot)?;
867                    continue;
868                }
869            };
870
871            let marker_row = CommitRowOp::new(
872                hooks.entity_path,
873                raw_key,
874                None,
875                Some(entry.value().as_bytes().to_vec()),
876                crate::db::schema::commit_schema_fingerprint_for_model(
877                    hooks.entity_path,
878                    hooks.model,
879                ),
880            );
881
882            // Validate the outer row envelope before typed preparation so
883            // hard-cut persisted-format mismatches count as corruption.
884            if let Err(err) = decode_structural_row_payload(&entry.value()) {
885                classify_scan_error(err, snapshot)?;
886                continue;
887            }
888
889            let prepared = match db.prepare_row_commit_op(&marker_row) {
890                Ok(prepared) => prepared,
891                Err(err) => {
892                    classify_scan_error(err, snapshot)?;
893                    continue;
894                }
895            };
896
897            for index_op in prepared.index_ops {
898                let Some(expected_value) = index_op.value else {
899                    continue;
900                };
901
902                let actual = index_op
903                    .store
904                    .with_borrow(|index_store| index_store.get(&index_op.key));
905                match actual {
906                    Some(actual_value) if actual_value == expected_value => {}
907                    Some(_) => {
908                        snapshot.divergent_index_entries =
909                            snapshot.divergent_index_entries.saturating_add(1);
910                    }
911                    None => {
912                        snapshot.missing_index_entries =
913                            snapshot.missing_index_entries.saturating_add(1);
914                    }
915                }
916            }
917        }
918
919        Ok::<(), InternalError>(())
920    })
921}
922
923// Run reverse (index -> data) integrity checks for one store.
924fn scan_store_reverse_integrity(
925    store_handle: StoreHandle,
926    live_keys_by_entity: &BTreeMap<EntityTag, BTreeSet<StorageKey>>,
927    snapshot: &mut IntegrityStoreSnapshot,
928) {
929    store_handle.with_index(|index_store| {
930        for (raw_index_key, raw_index_entry) in index_store.entries() {
931            snapshot.index_entries_scanned = snapshot.index_entries_scanned.saturating_add(1);
932
933            let Ok(decoded_index_key) = IndexKey::try_from_raw(&raw_index_key) else {
934                snapshot.corrupted_index_keys = snapshot.corrupted_index_keys.saturating_add(1);
935                continue;
936            };
937
938            let index_entity_tag = data_entity_tag_for_index_key(&decoded_index_key);
939
940            let Ok(indexed_primary_keys) = raw_index_entry.decode_keys() else {
941                snapshot.corrupted_index_entries =
942                    snapshot.corrupted_index_entries.saturating_add(1);
943                continue;
944            };
945
946            for primary_key in indexed_primary_keys {
947                let exists = live_keys_by_entity
948                    .get(&index_entity_tag)
949                    .is_some_and(|entity_keys| entity_keys.contains(&primary_key));
950                if !exists {
951                    snapshot.orphan_index_references =
952                        snapshot.orphan_index_references.saturating_add(1);
953                }
954            }
955        }
956    });
957}
958
959// Map scan-time errors into explicit integrity classification buckets.
960fn classify_scan_error(
961    err: InternalError,
962    snapshot: &mut IntegrityStoreSnapshot,
963) -> Result<(), InternalError> {
964    match err.class() {
965        ErrorClass::Corruption | ErrorClass::IncompatiblePersistedFormat => {
966            snapshot.corrupted_data_rows = snapshot.corrupted_data_rows.saturating_add(1);
967            Ok(())
968        }
969        ErrorClass::Unsupported | ErrorClass::NotFound | ErrorClass::Conflict => {
970            snapshot.misuse_findings = snapshot.misuse_findings.saturating_add(1);
971            Ok(())
972        }
973        ErrorClass::Internal | ErrorClass::InvariantViolation => Err(err),
974    }
975}
976
977// Parse the data-entity identity from one decoded index key.
978const fn data_entity_tag_for_index_key(index_key: &IndexKey) -> EntityTag {
979    index_key.index_id().entity_tag
980}