Skip to main content

obj/
integrity.rs

//! `Db::integrity_check` — full bidirectional walk (M11 #90).
//!
//! Thin orchestrator over the obj-core
//! [`obj_core::integrity`](obj_core::integrity) module: locks the
//! pager, walks every tree, cross-references each `Active` index
//! against its primary, sweeps the freelist, and compares the set
//! of reachable pages to `0..page_count` to surface orphans.
//!
//! The walk holds the pager mutex for its duration — readers and
//! writers in other threads will queue behind it. M11 ships
//! correctness; future revisions may relax the locking via a
//! snapshot-aware walker that does not block writers.
//!
//! Power-of-ten Rule 4: this module's public entry point is short;
//! helpers below the line factor per-collection work.
16
17use std::collections::HashSet;
18use std::sync::MutexGuard;
19use std::time::Instant;
20
21use obj_core::integrity::{
22    check_catalog_pointers, collect_primary_ids, cross_reference_index, walk_btree, walk_freelist,
23    IntegrityFailure, IntegrityReport, TreeContext,
24};
25use obj_core::pager::page::PageId;
26use obj_core::pager::Pager;
27use obj_core::platform::FileHandle;
28use obj_core::{Catalog, CollectionDescriptor, Error, IndexKind, IndexStatus, Result};
29
30use crate::Db;
31
32impl Db {
33    /// Run the on-demand full integrity walk and return a structured
34    /// [`IntegrityReport`].
35    ///
36    /// The walk:
37    /// 1. Opens a read snapshot (does NOT block writers).
38    /// 2. Walks the catalog B-tree and every `Active` collection's
39    ///    primary + index B-trees, validating per-page CRCs, sort
40    ///    invariants, depth and sibling-chain invariants.
41    /// 3. Cross-references each `Active` index against its primary:
42    ///    every index entry must point at an extant primary id, and
43    ///    every primary id must be referenced by at least one entry
44    ///    in each non-`Each` `Active` index.
45    /// 4. Sweeps the freelist chain.
46    /// 5. Compares the set of reachable pages to `0..page_count`,
47    ///    emitting [`IntegrityFailure::OrphanPage`] for each
48    ///    unreferenced page id.
49    ///
50    /// I/O failures during the walk surface as `Err(_)`; content-
51    /// level violations are accumulated into
52    /// `report.failures` and the walk continues.
53    ///
54    /// A lighter-weight catalog-only walk runs automatically at
55    /// [`Db::open`] time; opt out via
56    /// [`Config::skip_open_check`](crate::Config::skip_open_check).
57    ///
58    /// # Examples
59    ///
60    /// ```
61    /// # fn main() -> obj::Result<()> {
62    /// use obj::Db;
63    /// use serde::{Deserialize, Serialize};
64    ///
65    /// #[derive(Debug, Serialize, Deserialize, obj::Document)]
66    /// #[obj(collection = "users_integrity_doc")]
67    /// struct User { email: String }
68    ///
69    /// let dir = tempfile::tempdir()?;
70    /// let db = Db::open(dir.path().join("check.obj"))?;
71    /// for i in 0..16u32 {
72    ///     let _ = db.insert(User { email: format!("u{i}@example.com") })?;
73    /// }
74    /// let report = db.integrity_check()?;
75    /// assert!(report.is_ok(), "clean db must pass: {:?}", report.failures);
76    /// assert!(report.pages_checked > 0);
77    /// # Ok(())
78    /// # }
79    /// ```
80    ///
81    /// # Errors
82    ///
83    /// - [`Error::Io`] on cache-miss read failure during the walk.
84    /// - [`Error::Busy`] if the pager mutex is poisoned.
85    /// - Pager / B-tree errors propagated from the catalog walk.
86    #[cfg_attr(
87        feature = "tracing",
88        tracing::instrument(name = "db.integrity_check", level = "info", skip_all)
89    )]
90    pub fn integrity_check(&self) -> Result<IntegrityReport> {
91        let start = Instant::now();
92        let mut state = IntegrityState::new();
93        let mut pager = lock_pager(self)?;
94        state.pages_checked = state.pages_checked.saturating_add(1); // page 0 (header)
95        walk_catalog(&mut pager, &mut state)?;
96        walk_collections(&mut pager, &mut state)?;
97        walk_freelist_chain(&mut pager, &mut state)?;
98        detect_orphan_pages(&pager, &mut state);
99        Ok(IntegrityReport::new(
100            state.failures,
101            state.pages_checked,
102            start.elapsed(),
103        ))
104    }
105}
106
107/// Working state for the integrity walk. Accumulated as the walk
108/// progresses; consumed when the [`IntegrityReport`] is constructed.
109struct IntegrityState {
110    failures: Vec<IntegrityFailure>,
111    reachable: HashSet<PageId>,
112    pages_checked: u64,
113}
114
115impl IntegrityState {
116    fn new() -> Self {
117        Self {
118            failures: Vec::new(),
119            reachable: HashSet::new(),
120            pages_checked: 0,
121        }
122    }
123}
124
125fn lock_pager(db: &Db) -> Result<MutexGuard<'_, Pager<FileHandle>>> {
126    db.env.pager().lock().map_err(|_| Error::Busy {
127        kind: obj_core::LockKind::WriterInProcess,
128    })
129}
130
131fn walk_catalog(pager: &mut Pager<FileHandle>, state: &mut IntegrityState) -> Result<()> {
132    let raw = pager.root_catalog();
133    let Some(root) = PageId::new(raw) else {
134        return Ok(());
135    };
136    let page_count = pager.page_count();
137    if root.get() >= page_count {
138        state
139            .failures
140            .push(IntegrityFailure::DanglingCatalogPointer {
141                collection: "<catalog>".to_owned(),
142                index: None,
143                page_id: root.get(),
144            });
145        return Ok(());
146    }
147    let ctx = TreeContext {
148        label: "catalog".to_owned(),
149        root,
150    };
151    let walked = walk_btree(pager, &ctx, &mut state.reachable, &mut state.failures)?;
152    state.pages_checked = state.pages_checked.saturating_add(walked);
153    Ok(())
154}
155
156fn walk_collections(pager: &mut Pager<FileHandle>, state: &mut IntegrityState) -> Result<()> {
157    let raw = pager.root_catalog();
158    if PageId::new(raw).is_none() {
159        return Ok(());
160    }
161    let catalog = match Catalog::<FileHandle>::open_or_init(pager) {
162        Ok(c) => c,
163        Err(Error::Corruption { .. }) => return Ok(()),
164        Err(e) => return Err(e),
165    };
166    let rows = match catalog.list_collections(pager) {
167        Ok(r) => r,
168        Err(Error::Corruption { .. }) => return Ok(()),
169        Err(e) => return Err(e),
170    };
171    let page_count = pager.page_count();
172    for (name, descriptor) in rows {
173        check_catalog_pointers(&name, &descriptor, page_count, &mut state.failures);
174        walk_one_collection(pager, &name, &descriptor, state)?;
175    }
176    Ok(())
177}
178
179fn walk_one_collection(
180    pager: &mut Pager<FileHandle>,
181    name: &str,
182    descriptor: &CollectionDescriptor,
183    state: &mut IntegrityState,
184) -> Result<()> {
185    walk_primary_tree(pager, name, descriptor, state)?;
186    let mut primary_ids: HashSet<u64> = HashSet::new();
187    let _scanned = collect_primary_ids(pager, descriptor, &mut primary_ids)?;
188    let mut per_index: Vec<(String, IndexKind, HashSet<u64>)> = Vec::new();
189    for index in &descriptor.indexes {
190        if index.status != IndexStatus::Active {
191            continue;
192        }
193        walk_index_tree(pager, name, descriptor, index, state)?;
194        let mut referenced: HashSet<u64> = HashSet::new();
195        let _entries = cross_reference_index::<FileHandle>(
196            pager,
197            name,
198            index,
199            &primary_ids,
200            &mut referenced,
201            &mut state.failures,
202        )?;
203        per_index.push((index.name.clone(), index.kind, referenced));
204    }
205    obj_core::integrity::check_primary_to_index(
206        name,
207        descriptor,
208        &primary_ids,
209        &per_index,
210        &mut state.failures,
211    );
212    Ok(())
213}
214
215fn walk_primary_tree(
216    pager: &mut Pager<FileHandle>,
217    name: &str,
218    descriptor: &CollectionDescriptor,
219    state: &mut IntegrityState,
220) -> Result<()> {
221    let page_count = pager.page_count();
222    let Some(root) = PageId::new(descriptor.primary_root) else {
223        return Ok(());
224    };
225    if root.get() >= page_count {
226        return Ok(());
227    }
228    let ctx = TreeContext {
229        label: format!("primary:{name}"),
230        root,
231    };
232    let walked = walk_btree(pager, &ctx, &mut state.reachable, &mut state.failures)?;
233    state.pages_checked = state.pages_checked.saturating_add(walked);
234    Ok(())
235}
236
237fn walk_index_tree(
238    pager: &mut Pager<FileHandle>,
239    collection: &str,
240    _descriptor: &CollectionDescriptor,
241    index: &obj_core::IndexDescriptor,
242    state: &mut IntegrityState,
243) -> Result<()> {
244    let page_count = pager.page_count();
245    let Some(root) = PageId::new(index.root_page_id) else {
246        return Ok(());
247    };
248    if root.get() >= page_count {
249        return Ok(());
250    }
251    let ctx = TreeContext {
252        label: format!("index:{}.{}", collection, index.name),
253        root,
254    };
255    let walked = walk_btree(pager, &ctx, &mut state.reachable, &mut state.failures)?;
256    state.pages_checked = state.pages_checked.saturating_add(walked);
257    Ok(())
258}
259
260fn walk_freelist_chain(pager: &mut Pager<FileHandle>, state: &mut IntegrityState) -> Result<()> {
261    let head = pager.freelist_head();
262    let page_count = pager.page_count();
263    let walked = walk_freelist(
264        pager,
265        head,
266        page_count,
267        &mut state.reachable,
268        &mut state.failures,
269    )?;
270    state.pages_checked = state.pages_checked.saturating_add(walked);
271    Ok(())
272}
273
274fn detect_orphan_pages(pager: &Pager<FileHandle>, state: &mut IntegrityState) {
275    let page_count = pager.page_count();
276    // Page 0 is the header — reachable by definition.
277    let mut id: u64 = 1;
278    while id < page_count {
279        if let Some(pid) = PageId::new(id) {
280            if !state.reachable.contains(&pid) {
281                state
282                    .failures
283                    .push(IntegrityFailure::OrphanPage { page_id: id });
284            }
285        }
286        id = id.saturating_add(1);
287    }
288}