Skip to main content

obj_core/
integrity.rs

1//! On-disk integrity check (M11 #90).
2//!
3//! [`IntegrityReport`] is the structured output of
4//! [`crate::pager::Pager`]-level walks that verify every documented
5//! invariant of the `.obj` file format. The public driver lives in
6//! the `obj` crate (`obj::Db::integrity_check`); this module hosts
7//! the data types and the obj-core-internal helpers that do the
8//! heavy lifting.
9//!
10//! # Power-of-ten posture
11//!
12//! - **Rule 2.** Every walk is bounded by `page_count` (the page-
13//!   reachability sweep), [`crate::btree::MAX_BTREE_DEPTH`] (the
14//!   B-tree descent), or [`crate::btree::MAX_RANGE_NODES`] (the
15//!   B-tree iteration).
16//! - **Rule 4.** Each entry point is short; the implementation
17//!   factors per-tree / per-index helpers.
18//! - **Rule 7.** No `unwrap` / `expect` on the production path. I/O
19//!   failures bubble up via `Result::Err`; invariant violations are
20//!   recorded into the report's `failures` vector and the walk
21//!   continues.
22//! - **Rule 9.** No `dyn`: every helper is generic over
23//!   `F: FileBackend` (no trait-object dispatch on the hot path).
24
25#![forbid(unsafe_code)]
26// The integrity walker only ever uses the default `HashSet` hasher;
27// the helpers below take `&HashSet<u64>` / `&HashSet<PageId>` rather
28// than the `HashSet<_, S: BuildHasher>` shape that
29// `clippy::pedantic::implicit_hasher` would prefer. Adding a hasher
30// type parameter to every helper signature is noise for an internal
31// integrity API that callers never instantiate with a non-default
32// hasher.
33#![allow(clippy::implicit_hasher)]
34
35use std::collections::HashSet;
36use std::time::Duration;
37
38use crate::btree::node::{decode_node, NodeKind};
39use crate::btree::{MAX_BTREE_DEPTH, MAX_RANGE_NODES};
40use crate::catalog::{Catalog, CollectionDescriptor, IndexDescriptor, IndexStatus};
41use crate::error::{Error, Result};
42use crate::id::Id;
43use crate::index::IndexKind;
44use crate::pager::checksum::page_trailer_valid;
45use crate::pager::freelist::decode as decode_freelist_page;
46use crate::pager::page::PageId;
47use crate::pager::Pager;
48use crate::platform::FileBackend;
49
50use heapless::Vec as HeaplessVec;
51
52/// Categorical reasons an integrity walk records a failure. Every
53/// variant carries the locus of the problem (page id, collection,
54/// index name, document id) so an operator can root-cause without
55/// re-running the check.
56///
57/// The integrity walker accumulates a `Vec<IntegrityFailure>` and
58/// returns it inside an [`IntegrityReport`] — every failure here is
59/// recoverable in the sense that the check **completed**; the
60/// caller decides whether to repair the file or refuse to open it.
61/// I/O failures during the walk surface as an outer `Result::Err`
62/// instead (the walk could not finish).
63#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
64#[non_exhaustive]
65pub enum IntegrityFailure {
66    /// A page's CRC32C trailer did not verify. The page-id is the
67    /// locus; page `0` would be a header CRC failure (the file
68    /// header carries its own CRC, not a page trailer — this
69    /// variant is emitted for any page whose trailer fails the
70    /// per-page integrity check).
71    ChecksumMismatch {
72        /// Page where the corruption was detected.
73        page_id: u64,
74    },
75    /// A page in `1..page_count` is not reachable from any root the
76    /// walker recognises (catalog, freelist, any primary or index
77    /// B-tree). Indicates either a leaked page (allocated, never
78    /// linked) or a corrupted root pointer somewhere upstream.
79    OrphanPage {
80        /// The unreferenced page id.
81        page_id: u64,
82    },
83    /// An index B-tree entry references a primary id that does NOT
84    /// exist in the collection's primary B-tree. Indicates the
85    /// index is out-of-sync (likely a partial write that survived
86    /// recovery, or a forensic mutation outside obj).
87    OrphanIndexEntry {
88        /// Owning collection name.
89        collection: String,
90        /// Owning index name.
91        index: String,
92        /// Primary id the index entry pointed at.
93        id: u64,
94    },
95    /// A primary B-tree entry has no matching entry in one of the
96    /// collection's `Active` indexes. The walker checks the
97    /// per-index `(id-suffix-or-value → at-least-one-entry)`
98    /// invariant; an `Each` index with an empty sequence is NOT
99    /// reported (legal: the doc contributes no keys). For
100    /// `Standard` / `Unique` / `Composite` the absence of any
101    /// matching entry is a violation.
102    MissingIndexEntry {
103        /// Owning collection name.
104        collection: String,
105        /// Owning index name.
106        index: String,
107        /// Primary id with no matching index entry.
108        id: u64,
109    },
110    /// A B+tree node failed the sort invariant: a key was followed
111    /// by a strictly-lesser-or-equal key inside the same node.
112    BTreeSortViolation {
113        /// Page where the violation was detected.
114        page_id: u64,
115    },
116    /// A B+tree's child-pointer graph contains a cycle — the same
117    /// page is reachable as the child of two distinct ancestors (or
118    /// of itself). The `tree` field names the containing B-tree by
119    /// a human-readable label (`"catalog"`,
120    /// `"primary:<collection>"`, `"index:<collection>.<index>"`).
121    ///
122    /// Historically this variant also covered leaf `next_sibling`
123    /// chain breakage; the chain check was removed in M11 because
124    /// `CoW` deliberately leaves left-neighbour `next_sibling`
125    /// pointers stale (see `crates/obj-core/src/btree/range.rs`
126    /// lines 16-31). The variant name is preserved for compatibility
127    /// — it now signals only the descent-graph cycle path.
128    BTreeSiblingChainBroken {
129        /// Label identifying which B-tree the violation surfaced on.
130        tree: String,
131        /// Page where the cycle was detected.
132        page_id: u64,
133    },
134    /// A B+tree's depth exceeded
135    /// [`crate::btree::MAX_BTREE_DEPTH`]. Indicates a runaway tree
136    /// shape — by construction the obj writer cannot produce one;
137    /// surfacing here means the file was mutated outside obj.
138    BTreeDepthExceeded {
139        /// Label identifying which B-tree tripped the depth bound.
140        tree: String,
141        /// The bound that was exceeded.
142        limit: usize,
143    },
144    /// A B+tree node's `level` value was inconsistent with its
145    /// kind — leaves must be level 0, internals strictly positive,
146    /// and the level must decrease by exactly 1 when descending
147    /// from an internal node to its child.
148    BTreeLevelInvariantViolated {
149        /// Label identifying the B-tree.
150        tree: String,
151        /// Page where the violation was detected.
152        page_id: u64,
153    },
154    /// A `CollectionDescriptor` carried a `primary_root` or
155    /// `IndexDescriptor.root_page_id` that does not point at a
156    /// page id in `1..page_count` — the catalog references a page
157    /// the file does not contain.
158    DanglingCatalogPointer {
159        /// Owning collection name.
160        collection: String,
161        /// Optional index name when the dangling pointer is on an
162        /// index descriptor; `None` when it is the
163        /// `primary_root` itself.
164        index: Option<String>,
165        /// The out-of-range page id.
166        page_id: u64,
167    },
168    /// The freelist chain was broken — a `next` link pointed at a
169    /// non-freelist page (or a freelist page failed to decode), at
170    /// a page id past `page_count`, or the chain looped.
171    FreelistChainBroken {
172        /// Page id where the broken link was detected.
173        page_id: u64,
174    },
175}
176
177/// Structured result of an [integrity check](crate::integrity).
178///
179/// The public driver returns this inside `Ok(_)` whenever the walk
180/// completes (whether or not it found any failures). An `Err`
181/// outer result means the walk could not finish — an I/O failure,
182/// not a content-level integrity violation.
183#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
184#[non_exhaustive]
185pub struct IntegrityReport {
186    /// Every detected violation, in walker-encounter order.
187    pub failures: Vec<IntegrityFailure>,
188    /// Pages the walker actually inspected (header + every tree
189    /// node + every freelist link). Provides a sanity-check vs
190    /// `page_count`: a check that ran without errors but visited
191    /// noticeably fewer pages than the file holds suggests a
192    /// reachability problem (the [`IntegrityFailure::OrphanPage`]
193    /// failures cover the concrete cases).
194    pub pages_checked: u64,
195    /// Wall-clock duration the walk took. Useful for the operator
196    /// to know whether the check finished within a maintenance
197    /// window.
198    pub elapsed: Duration,
199}
200
201impl IntegrityReport {
202    /// Assemble a report from its parts.
203    ///
204    /// Provided so callers outside `obj-core` (the `obj` layer's
205    /// `integrity_check` driver) can build a report now that the
206    /// struct is `#[non_exhaustive]` and can no longer be
207    /// struct-literal-constructed across crate boundaries.
208    #[must_use]
209    pub fn new(failures: Vec<IntegrityFailure>, pages_checked: u64, elapsed: Duration) -> Self {
210        Self {
211            failures,
212            pages_checked,
213            elapsed,
214        }
215    }
216
217    /// `true` iff no failure was recorded — the report indicates a
218    /// healthy database.
219    #[must_use]
220    pub fn is_ok(&self) -> bool {
221        self.failures.is_empty()
222    }
223}
224
225/// Snapshot of the on-disk shape needed to walk a single B-tree.
226/// Used by the obj layer's wrapper to thread per-tree context
227/// (label, root) into the obj-core helper.
228#[derive(Debug, Clone)]
229pub struct TreeContext {
230    /// Human-readable label used in failure messages (`"catalog"`,
231    /// `"primary:users"`, `"index:users.by_email"`).
232    pub label: String,
233    /// Root page id for the tree.
234    pub root: PageId,
235}
236
237/// Walk a single B-tree from `ctx.root`, recording any per-node
238/// invariant violations into `failures` and inserting every visited
239/// node page-id into `reachable`. Returns the number of pages this
240/// walk inspected.
241///
242/// The walk traverses the tree level by level via an explicit
243/// `Vec<PageId>` queue and a `HashSet<PageId>` already-seen guard
244/// (so a cycle introduced by an out-of-band mutation cannot
245/// infinitely-loop the walker). Bounded by
246/// [`MAX_RANGE_NODES`] per tree (Rule 2).
247///
248/// # Errors
249///
250/// Returns [`Error::Io`] on cache-miss read failure. Never returns
251/// an `Error::Corruption` directly — corruption surfaces as a
252/// [`IntegrityFailure`] entry in `failures`.
253pub fn walk_btree<F: FileBackend>(
254    pager: &mut Pager<F>,
255    ctx: &TreeContext,
256    reachable: &mut HashSet<PageId>,
257    failures: &mut Vec<IntegrityFailure>,
258) -> Result<u64> {
259    let mut queue: Vec<(PageId, u32)> = Vec::new();
260    queue.push((ctx.root, 0));
261    let mut visited: HashSet<PageId> = HashSet::new();
262    let mut pages_walked: u64 = 0;
263    let mut visit = WalkVisitState {
264        visited: &mut visited,
265        reachable,
266        pages_walked: &mut pages_walked,
267        failures,
268    };
269    while let Some((pid, depth)) = queue.pop() {
270        if depth as usize > MAX_BTREE_DEPTH {
271            visit.failures.push(IntegrityFailure::BTreeDepthExceeded {
272                tree: ctx.label.clone(),
273                limit: MAX_BTREE_DEPTH,
274            });
275            return Ok(*visit.pages_walked);
276        }
277        match record_btree_visit(pid, ctx, &mut visit) {
278            VisitStep::Cycle => continue,
279            VisitStep::BudgetExceeded => return Ok(*visit.pages_walked),
280            VisitStep::Walked => {}
281        }
282        let Some(decoded) = read_and_validate_node(pager, pid, &ctx.label, visit.failures)? else {
283            continue;
284        };
285        if matches!(decoded.kind, NodeKind::Internal) {
286            for raw in &decoded.children {
287                if let Some(child) = PageId::new(*raw) {
288                    queue.push((child, depth + 1));
289                }
290            }
291        }
292    }
293    // NOTE (M11): we deliberately do NOT walk leaf `next_sibling`
294    // pointers here. The `CoW` B+tree (see
295    // `crates/obj-core/src/btree/range.rs` lines 16-31) leaves
296    // left-neighbour `next_sibling` pointers stale by design — they
297    // point at the OLD page-id of a right neighbour that has since
298    // been rewritten and freed. Cascading those updates would
299    // unfold the rewrite up the tree arbitrarily. `RangeIter` does
300    // not trust `next_sibling`; `integrity_check` doesn't either.
301    // The remaining checks (per-page CRC, sort, depth, level, the
302    // bidirectional primary↔index cross-reference, and the cycle
303    // guard above) cover every corruption mode that matters.
304    let final_pages_walked = *visit.pages_walked;
305    Ok(final_pages_walked)
306}
307
308/// Mutable per-walk state shared by the [`walk_btree`] driver loop and
309/// its per-node helper.  Grouped into one struct so the helper takes
310/// a single reference rather than a long argument list (Rule 4).
311struct WalkVisitState<'a> {
312    /// Set of page-ids already popped off the queue during this walk;
313    /// used to detect cycles introduced by out-of-band mutation.
314    visited: &'a mut HashSet<PageId>,
315    /// Caller-owned set of every page-id reachable from the root.
316    reachable: &'a mut HashSet<PageId>,
317    /// Per-tree count of pages successfully popped + counted, used to
318    /// enforce the `MAX_RANGE_NODES` budget.
319    pages_walked: &'a mut u64,
320    /// Caller-owned failure log; every detected invariant violation is
321    /// pushed here rather than surfaced as `Err`.
322    failures: &'a mut Vec<IntegrityFailure>,
323}
324
325/// Outcome of [`record_btree_visit`]: the cycle guard, reachable-set
326/// update, and per-tree node-count budget consolidated into a single
327/// dispatch so the [`walk_btree`] driver loop reads as a flat
328/// state-machine step.
329enum VisitStep {
330    /// The page was already visited in this walk — a cycle.  The
331    /// failure has been recorded; the caller should skip this entry
332    /// and pop the next queued page.
333    Cycle,
334    /// The per-tree node-count budget has just been crossed.  The
335    /// failure has been recorded; the caller must stop the walk and
336    /// return the current `pages_walked` count.
337    BudgetExceeded,
338    /// The page was newly visited and counted.  The caller may now
339    /// validate the node and enqueue its children.
340    Walked,
341}
342
343/// Apply the cycle guard, mark `pid` reachable, and tick the per-tree
344/// node-count budget.  Records the appropriate [`IntegrityFailure`]
345/// when a cycle is detected or the budget is exceeded.
346fn record_btree_visit(pid: PageId, ctx: &TreeContext, visit: &mut WalkVisitState<'_>) -> VisitStep {
347    if !visit.visited.insert(pid) {
348        // Cycle in the descent graph — the same page is
349        // reachable as the child of two distinct ancestors.
350        // A `CoW` B-tree never produces this shape; surfacing
351        // here means the file was mutated outside obj.
352        // Recorded under the `BTreeSiblingChainBroken` variant
353        // (see its docstring — the variant name is retained
354        // for compatibility now that the leaf sibling-chain
355        // check is gone).
356        visit
357            .failures
358            .push(IntegrityFailure::BTreeSiblingChainBroken {
359                tree: ctx.label.clone(),
360                page_id: pid.get(),
361            });
362        return VisitStep::Cycle;
363    }
364    visit.reachable.insert(pid);
365    *visit.pages_walked = visit.pages_walked.saturating_add(1);
366    if *visit.pages_walked > MAX_RANGE_NODES as u64 {
367        visit.failures.push(IntegrityFailure::BTreeDepthExceeded {
368            tree: ctx.label.clone(),
369            limit: MAX_RANGE_NODES,
370        });
371        return VisitStep::BudgetExceeded;
372    }
373    VisitStep::Walked
374}
375
376/// Read a B-tree node page and verify per-page invariants (trailer
377/// CRC + decoder agreement + sort invariant). Returns the decoded
378/// node when validation passes; returns `Ok(None)` and records the
379/// failure when the page is so broken decoding cannot continue
380/// (the caller treats this as "do not descend further" and moves
381/// on to the next queued page).
382fn read_and_validate_node<F: FileBackend>(
383    pager: &mut Pager<F>,
384    pid: PageId,
385    tree_label: &str,
386    failures: &mut Vec<IntegrityFailure>,
387) -> Result<Option<crate::btree::node::DecodedNode>> {
388    let page = match pager.read_page(pid) {
389        Ok(pr) => pr.to_owned_page(),
390        Err(Error::Corruption { page_id }) => {
391            failures.push(IntegrityFailure::ChecksumMismatch { page_id });
392            return Ok(None);
393        }
394        Err(e) => return Err(e),
395    };
396    if !page_trailer_valid(&page) {
397        failures.push(IntegrityFailure::ChecksumMismatch { page_id: pid.get() });
398        return Ok(None);
399    }
400    if let Ok(decoded) = decode_node(page.as_bytes()) {
401        verify_sort_invariant(pid, &decoded, failures);
402        verify_level_invariant(pid, &decoded, tree_label, failures);
403        Ok(Some(decoded))
404    } else {
405        failures.push(IntegrityFailure::ChecksumMismatch { page_id: pid.get() });
406        Ok(None)
407    }
408}
409
410/// Confirm the node's keys are in strictly-ascending order.
411fn verify_sort_invariant(
412    pid: PageId,
413    decoded: &crate::btree::node::DecodedNode,
414    failures: &mut Vec<IntegrityFailure>,
415) {
416    let mut prev: Option<&[u8]> = None;
417    let keys: Box<dyn Iterator<Item = &[u8]> + '_> = match decoded.kind {
418        NodeKind::Leaf => Box::new(decoded.leaves.iter().map(|e| e.key.as_slice())),
419        NodeKind::Internal => Box::new(decoded.internals.iter().map(|e| e.key.as_slice())),
420    };
421    for key in keys {
422        if let Some(p) = prev {
423            if key <= p {
424                failures.push(IntegrityFailure::BTreeSortViolation { page_id: pid.get() });
425                return;
426            }
427        }
428        prev = Some(key);
429    }
430}
431
432/// Confirm leaf-level = 0 and internal-level > 0.
433fn verify_level_invariant(
434    pid: PageId,
435    decoded: &crate::btree::node::DecodedNode,
436    tree_label: &str,
437    failures: &mut Vec<IntegrityFailure>,
438) {
439    let level_ok = match decoded.kind {
440        NodeKind::Leaf => decoded.level == 0,
441        NodeKind::Internal => decoded.level > 0,
442    };
443    if !level_ok {
444        failures.push(IntegrityFailure::BTreeLevelInvariantViolated {
445            tree: tree_label.to_owned(),
446            page_id: pid.get(),
447        });
448    }
449}
450
451/// Walk the freelist chain starting at `head`, inserting every link
452/// page into `reachable` and recording any chain breakage into
453/// `failures`. Returns the number of freelist link pages walked.
454///
455/// Bounded by `page_count` (a freelist chain cannot have more entries
456/// than the file has pages).
457///
458/// # Errors
459///
460/// Returns [`Error::Io`] on cache-miss read failure. Content-level
461/// breakage is recorded into `failures` (the walk does NOT abort
462/// on a broken link; the walker records the failure and stops the
463/// freelist sweep so a subsequent walk does not loop).
464pub fn walk_freelist<F: FileBackend>(
465    pager: &mut Pager<F>,
466    head_raw: u64,
467    page_count: u64,
468    reachable: &mut HashSet<PageId>,
469    failures: &mut Vec<IntegrityFailure>,
470) -> Result<u64> {
471    let Some(mut current_raw) = Some(head_raw) else {
472        return Ok(0);
473    };
474    if current_raw == 0 {
475        return Ok(0);
476    }
477    let mut current = current_raw;
478    let mut steps: u64 = 0;
479    let mut seen: HashSet<PageId> = HashSet::new();
480    while current != 0 {
481        steps = steps.saturating_add(1);
482        if steps > page_count {
483            failures.push(IntegrityFailure::FreelistChainBroken { page_id: current });
484            return Ok(steps);
485        }
486        let Some(pid) = PageId::new(current) else {
487            failures.push(IntegrityFailure::FreelistChainBroken { page_id: current });
488            return Ok(steps);
489        };
490        if pid.get() >= page_count {
491            failures.push(IntegrityFailure::FreelistChainBroken { page_id: pid.get() });
492            return Ok(steps);
493        }
494        if !seen.insert(pid) {
495            failures.push(IntegrityFailure::FreelistChainBroken { page_id: pid.get() });
496            return Ok(steps);
497        }
498        reachable.insert(pid);
499        let page = match pager.read_page(pid) {
500            Ok(pr) => pr.to_owned_page(),
501            Err(Error::Corruption { page_id }) => {
502                failures.push(IntegrityFailure::ChecksumMismatch { page_id });
503                return Ok(steps);
504            }
505            Err(e) => return Err(e),
506        };
507        if !page_trailer_valid(&page) {
508            failures.push(IntegrityFailure::ChecksumMismatch { page_id: pid.get() });
509            return Ok(steps);
510        }
511        let Some(entry) = decode_freelist_page(&page) else {
512            failures.push(IntegrityFailure::FreelistChainBroken { page_id: pid.get() });
513            return Ok(steps);
514        };
515        current_raw = entry.next;
516        current = current_raw;
517    }
518    Ok(steps)
519}
520
521/// Confirm the catalog descriptor's `primary_root` + each `Active`
522/// index's `root_page_id` point at pages within `1..page_count`.
523/// Records `DanglingCatalogPointer` failures for every miss.
524pub fn check_catalog_pointers(
525    name: &str,
526    descriptor: &CollectionDescriptor,
527    page_count: u64,
528    failures: &mut Vec<IntegrityFailure>,
529) {
530    if descriptor.primary_root == 0 || descriptor.primary_root >= page_count {
531        failures.push(IntegrityFailure::DanglingCatalogPointer {
532            collection: name.to_owned(),
533            index: None,
534            page_id: descriptor.primary_root,
535        });
536    }
537    for index in &descriptor.indexes {
538        if index.status != IndexStatus::Active {
539            continue;
540        }
541        if index.root_page_id == 0 || index.root_page_id >= page_count {
542            failures.push(IntegrityFailure::DanglingCatalogPointer {
543                collection: name.to_owned(),
544                index: Some(index.name.clone()),
545                page_id: index.root_page_id,
546            });
547        }
548    }
549}
550
551/// Verify every entry in an `Active` index B-tree (`Standard`,
552/// `Each`, `Composite`: the trailing 8-byte key suffix is the id;
553/// `Unique`: the value is the id) references a primary id that
554/// exists in the primary B-tree.
555///
556/// `primary_ids` is the set of every id present in the collection's
557/// primary tree (pre-computed by the caller's primary walk).
558/// `referenced_ids` (out-param) accumulates the set of primary ids
559/// referenced by AT LEAST ONE index entry — the caller uses it to
560/// run the Primary → Index `MissingIndexEntry` check.
561///
562/// # Errors
563///
564/// Returns [`Error::Io`] on cache-miss read failure. Content-level
565/// breakage is recorded into `failures` and iteration continues.
566pub fn cross_reference_index<F: FileBackend>(
567    pager: &mut Pager<F>,
568    collection: &str,
569    index: &IndexDescriptor,
570    primary_ids: &HashSet<u64>,
571    referenced_ids: &mut HashSet<u64>,
572    failures: &mut Vec<IntegrityFailure>,
573) -> Result<u64> {
574    let Some(root) = PageId::new(index.root_page_id) else {
575        return Ok(0);
576    };
577    let tree = crate::btree::BTree::<F>::open(pager, root)?;
578    let iter = match tree.range(pager, ..) {
579        Ok(it) => it,
580        Err(Error::Corruption { page_id }) => {
581            failures.push(IntegrityFailure::ChecksumMismatch { page_id });
582            return Ok(0);
583        }
584        Err(e) => return Err(e),
585    };
586    let mut scanned: u64 = 0;
587    for step in iter {
588        scanned = scanned
589            .checked_add(1)
590            .ok_or(Error::BTreeInvariantViolated {
591                reason: "index entry count overflow",
592            })?;
593        let (full_key, value) = match step {
594            Ok(kv) => kv,
595            Err(Error::Corruption { page_id }) => {
596                failures.push(IntegrityFailure::ChecksumMismatch { page_id });
597                return Ok(scanned);
598            }
599            Err(e) => return Err(e),
600        };
601        let Some(raw_id) = recover_id_from_entry(&full_key, &value, index.kind) else {
602            failures.push(IntegrityFailure::OrphanIndexEntry {
603                collection: collection.to_owned(),
604                index: index.name.clone(),
605                id: 0,
606            });
607            continue;
608        };
609        if !primary_ids.contains(&raw_id) {
610            failures.push(IntegrityFailure::OrphanIndexEntry {
611                collection: collection.to_owned(),
612                index: index.name.clone(),
613                id: raw_id,
614            });
615            continue;
616        }
617        referenced_ids.insert(raw_id);
618    }
619    Ok(scanned)
620}
621
622/// Recover the `Id` from one index B-tree entry. Mirrors the
623/// `Collection<T>::id_from_index_entry` private helper but is
624/// type-agnostic. For `Unique` indexes the id is the value; for
625/// every other kind it is the trailing 8-byte big-endian suffix of
626/// the key.
627fn recover_id_from_entry(full_key: &[u8], value: &[u8], kind: IndexKind) -> Option<u64> {
628    let bytes: &[u8] = match kind {
629        IndexKind::Unique => value,
630        IndexKind::Standard | IndexKind::Each | IndexKind::Composite => {
631            if full_key.len() < 8 {
632                return None;
633            }
634            &full_key[full_key.len() - 8..]
635        }
636    };
637    let id = Id::from_be_bytes(bytes)?;
638    Some(id.get())
639}
640
641/// Walk every entry in the primary B-tree of `descriptor`,
642/// inserting each id into `primary_ids` and recording any
643/// per-page failures (CRC, sort, depth) along the way. Returns the
644/// number of leaf entries scanned.
645///
646/// # Errors
647///
648/// Returns [`Error::Io`] on cache-miss read failure. Content-level
649/// breakage is recorded into `failures` (via `walk_btree` if the
650/// caller paired the two walks) or surfaced as an early return
651/// from this function on a B-tree decode failure.
652pub fn collect_primary_ids<F: FileBackend>(
653    pager: &mut Pager<F>,
654    descriptor: &CollectionDescriptor,
655    primary_ids: &mut HashSet<u64>,
656) -> Result<u64> {
657    let Some(root) = PageId::new(descriptor.primary_root) else {
658        return Ok(0);
659    };
660    let tree = crate::btree::BTree::<F>::open(pager, root)?;
661    let iter = match tree.range(pager, ..) {
662        Ok(it) => it,
663        Err(Error::Corruption { .. }) => return Ok(0),
664        Err(e) => return Err(e),
665    };
666    let mut count: u64 = 0;
667    for step in iter {
668        count = count.checked_add(1).ok_or(Error::BTreeInvariantViolated {
669            reason: "primary entry count overflow",
670        })?;
671        let (key, _value) = match step {
672            Ok(kv) => kv,
673            Err(Error::Corruption { .. }) => return Ok(count),
674            Err(e) => return Err(e),
675        };
676        if let Some(id) = Id::from_be_bytes(&key) {
677            primary_ids.insert(id.get());
678        }
679    }
680    Ok(count)
681}
682
683/// Build the set of every primary id that has AT LEAST ONE
684/// surviving index entry across the collection's `Active` indexes,
685/// and emit `MissingIndexEntry` failures for any primary id that is
686/// NOT referenced by an index whose kind is `Standard`, `Unique`,
687/// or `Composite`. `Each` indexes are exempted — they legally
688/// reference zero entries when the source sequence is empty.
689pub fn check_primary_to_index(
690    collection: &str,
691    descriptor: &CollectionDescriptor,
692    primary_ids: &HashSet<u64>,
693    per_index_referenced: &[(String, IndexKind, HashSet<u64>)],
694    failures: &mut Vec<IntegrityFailure>,
695) {
696    for (index_name, kind, referenced) in per_index_referenced {
697        if matches!(*kind, IndexKind::Each) {
698            continue;
699        }
700        for id in primary_ids {
701            if !referenced.contains(id) {
702                failures.push(IntegrityFailure::MissingIndexEntry {
703                    collection: collection.to_owned(),
704                    index: index_name.clone(),
705                    id: *id,
706                });
707            }
708        }
709    }
710    debug_assert!(
711        descriptor.indexes.iter().all(|d| {
712            d.status != IndexStatus::Active
713                || per_index_referenced.iter().any(|(n, _, _)| n == &d.name)
714        }),
715        "every Active index must contribute a referenced-ids set",
716    );
717}
718
719/// Compute the fast subset of the integrity walk: validate the
720/// file-header invariants and walk the catalog tree (and only the
721/// catalog tree), without descending into any per-collection
722/// primary or index.
723///
724/// Used by the obj crate's open-time fast check (M11 #91) and by
725/// the obj crate's `Db::integrity_check` for the catalog portion
726/// of the full walk.
727///
728/// # Errors
729///
730/// - [`Error::Io`] on cache-miss read failure.
731///
732/// The returned `IntegrityReport` carries failures encountered
733/// during the catalog walk; the caller decides whether to surface
734/// them as `Err(Error::Corruption { ... })` (the open-time fast
735/// path does) or to merge them into a broader report (the on-
736/// demand full walk does).
737pub fn quick_check<F: FileBackend>(pager: &mut Pager<F>) -> Result<IntegrityReport> {
738    let start = std::time::Instant::now();
739    let mut failures: Vec<IntegrityFailure> = Vec::new();
740    let mut reachable: HashSet<PageId> = HashSet::new();
741    let page_count = pager.page_count();
742    let mut pages_checked: u64 = 1; // page 0 (header) — `Pager::open` already verified it.
743    if let Some(root) = PageId::new(pager.root_catalog()) {
744        if root.get() >= page_count {
745            failures.push(IntegrityFailure::DanglingCatalogPointer {
746                collection: "<catalog>".to_owned(),
747                index: None,
748                page_id: root.get(),
749            });
750        } else {
751            let ctx = TreeContext {
752                label: "catalog".to_owned(),
753                root,
754            };
755            pages_checked = pages_checked.saturating_add(walk_btree(
756                pager,
757                &ctx,
758                &mut reachable,
759                &mut failures,
760            )?);
761            list_catalog_for_pointer_check(pager, page_count, &mut failures)?;
762        }
763    }
764    Ok(IntegrityReport {
765        failures,
766        pages_checked,
767        elapsed: start.elapsed(),
768    })
769}
770
771/// Read every catalog row and check the `primary_root` +
772/// `root_page_id` pointers without walking the per-collection trees.
773/// Used by [`quick_check`].
774fn list_catalog_for_pointer_check<F: FileBackend>(
775    pager: &mut Pager<F>,
776    page_count: u64,
777    failures: &mut Vec<IntegrityFailure>,
778) -> Result<()> {
779    let raw = pager.root_catalog();
780    if PageId::new(raw).is_none() {
781        return Ok(());
782    }
783    let catalog = match Catalog::<F>::open_or_init(pager) {
784        Ok(c) => c,
785        Err(Error::Corruption { .. }) => return Ok(()),
786        Err(e) => return Err(e),
787    };
788    let rows = match catalog.list_collections(pager) {
789        Ok(r) => r,
790        Err(Error::Corruption { .. }) => return Ok(()),
791        Err(e) => return Err(e),
792    };
793    for (name, descriptor) in rows {
794        check_catalog_pointers(&name, &descriptor, page_count, failures);
795    }
796    Ok(())
797}
798
799/// Helper: ensure `path` is the canonical descent stack shape used
800/// by the integrity walk's caller. Re-exported for the obj crate's
801/// per-T extension so it does not have to reimplement the
802/// `HeaplessVec<PageId, MAX_BTREE_DEPTH>` pattern.
803#[must_use]
804pub fn fresh_descent_stack() -> HeaplessVec<PageId, MAX_BTREE_DEPTH> {
805    HeaplessVec::new()
806}