#![forbid(unsafe_code)]
#![allow(clippy::implicit_hasher)]
use std::collections::HashSet;
use std::time::Duration;
use crate::btree::node::{decode_node, NodeKind};
use crate::btree::{MAX_BTREE_DEPTH, MAX_RANGE_NODES};
use crate::catalog::{Catalog, CollectionDescriptor, IndexDescriptor, IndexStatus};
use crate::error::{Error, Result};
use crate::id::Id;
use crate::index::IndexKind;
use crate::pager::checksum::page_trailer_valid;
use crate::pager::freelist::decode as decode_freelist_page;
use crate::pager::page::PageId;
use crate::pager::Pager;
use crate::platform::FileBackend;
use heapless::Vec as HeaplessVec;
/// One concrete problem reported by an integrity check.
///
/// Page ids and record ids are carried as raw `u64` values so the report can
/// be serialized without depending on the pager's id types.
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub enum IntegrityFailure {
/// A page could not be trusted: the pager reported corruption while reading
/// it, its trailer failed validation, or (for B-tree pages) its contents
/// could not be decoded as a node.
ChecksumMismatch {
/// Raw id of the affected page.
page_id: u64,
},
/// NOTE(review): not constructed anywhere in this part of the file;
/// presumably a page reachable from neither a tree nor the freelist.
OrphanPage {
/// Raw id of the affected page.
page_id: u64,
},
/// An index entry whose id is absent from the collection's primary ids, or
/// whose id could not be recovered from the entry at all.
OrphanIndexEntry {
/// Name of the collection that owns the index.
collection: String,
/// Name of the index containing the entry.
index: String,
/// Raw id the entry points at; `0` when no id could be recovered.
id: u64,
},
/// A primary id that a checked index does not reference.
MissingIndexEntry {
/// Name of the collection that owns the index.
collection: String,
/// Name of the index lacking the entry.
index: String,
/// Raw primary id that is missing from the index.
id: u64,
},
/// Keys inside a single B-tree node are not strictly ascending.
BTreeSortViolation {
/// Raw id of the node's page.
page_id: u64,
},
/// Reported by the tree walk when a page is reached a second time within
/// the same tree.
BTreeSiblingChainBroken {
/// Label of the tree being walked.
tree: String,
/// Raw id of the page that was reached again.
page_id: u64,
},
/// A walk limit was hit: either the depth limit or the per-tree page
/// budget. `limit` tells which bound applied.
BTreeDepthExceeded {
/// Label of the tree being walked.
tree: String,
/// The bound that was exceeded.
limit: usize,
},
/// A node's level does not match its kind (a leaf must be at level 0, an
/// internal node above it).
BTreeLevelInvariantViolated {
/// Label of the tree being walked.
tree: String,
/// Raw id of the node's page.
page_id: u64,
},
/// A root page id stored in the catalog (or the catalog root itself) is
/// zero or lies at or beyond the page count.
DanglingCatalogPointer {
/// Collection name; `"<catalog>"` is used for the catalog root.
collection: String,
/// Index name, or `None` when the pointer is a primary/catalog root.
index: Option<String>,
/// The raw page id that was found.
page_id: u64,
},
/// The freelist chain is malformed: too long, points at an invalid or
/// out-of-range page, revisits a page, or hits an undecodable page.
FreelistChainBroken {
/// Raw id of the link at which the problem was detected.
page_id: u64,
},
}
/// Outcome of an integrity check run.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
#[non_exhaustive]
pub struct IntegrityReport {
/// Every problem found, in the order it was recorded.
pub failures: Vec<IntegrityFailure>,
/// Number of pages the check counted as examined.
pub pages_checked: u64,
/// Wall-clock time the check took.
pub elapsed: Duration,
}
impl IntegrityReport {
/// Builds a report from its three parts.
///
/// Exists because the struct is `#[non_exhaustive]`, which prevents code
/// outside the defining crate from using a struct literal.
#[must_use]
pub fn new(failures: Vec<IntegrityFailure>, pages_checked: u64, elapsed: Duration) -> Self {
Self {
failures,
pages_checked,
elapsed,
}
}
/// Returns `true` when the check recorded no failures.
#[must_use]
pub fn is_ok(&self) -> bool {
self.failures.is_empty()
}
}
/// Identifies one B-tree for the purposes of a walk.
#[derive(Debug, Clone)]
pub struct TreeContext {
/// Human-readable name copied into the `tree` field of reported failures.
pub label: String,
/// Page at which the walk starts.
pub root: PageId,
}
/// Walks every page reachable from `ctx.root`, validating each node on the way.
///
/// Pages are taken from a LIFO work list, so the traversal is depth-first.
/// Each page seen for the first time is added to `reachable`; every problem
/// found is appended to `failures` instead of aborting the walk.
///
/// The walk ends early (still returning `Ok`) when a work-list entry deeper
/// than `MAX_BTREE_DEPTH` is popped, or when `record_btree_visit` reports that
/// the per-tree page budget is exhausted.
///
/// Returns the number of pages counted for this tree.
///
/// # Errors
///
/// Propagates any error from `read_and_validate_node`; page corruption is
/// recorded as a failure there rather than returned.
pub fn walk_btree<F: FileBackend>(
    pager: &mut Pager<F>,
    ctx: &TreeContext,
    reachable: &mut HashSet<PageId>,
    failures: &mut Vec<IntegrityFailure>,
) -> Result<u64> {
    let mut seen_in_tree: HashSet<PageId> = HashSet::new();
    let mut walked: u64 = 0;
    let mut state = WalkVisitState {
        visited: &mut seen_in_tree,
        reachable,
        pages_walked: &mut walked,
        failures,
    };

    // Entries are (page, depth below the root); popping from the back makes
    // this a stack.
    let mut pending: Vec<(PageId, u32)> = vec![(ctx.root, 0)];

    while let Some((page, level)) = pending.pop() {
        if level as usize > MAX_BTREE_DEPTH {
            state.failures.push(IntegrityFailure::BTreeDepthExceeded {
                tree: ctx.label.clone(),
                limit: MAX_BTREE_DEPTH,
            });
            break;
        }

        match record_btree_visit(page, ctx, &mut state) {
            VisitStep::Walked => {}
            VisitStep::Cycle => continue,
            VisitStep::BudgetExceeded => break,
        }

        // `None` means the page was unreadable or undecodable; the failure has
        // already been recorded, so just move on to the next pending page.
        let node = match read_and_validate_node(pager, page, &ctx.label, state.failures)? {
            Some(node) => node,
            None => continue,
        };

        if !matches!(node.kind, NodeKind::Internal) {
            continue;
        }
        for &raw_child in &node.children {
            // Child pointers that `PageId::new` rejects are skipped silently.
            let Some(child) = PageId::new(raw_child) else {
                continue;
            };
            pending.push((child, level + 1));
        }
    }

    Ok(*state.pages_walked)
}
/// Mutable bookkeeping shared between `walk_btree` and `record_btree_visit`.
struct WalkVisitState<'a> {
/// Pages already visited during the current tree walk.
visited: &'a mut HashSet<PageId>,
/// Caller-owned set that receives every visited page.
reachable: &'a mut HashSet<PageId>,
/// Running count of pages visited in the current walk.
pages_walked: &'a mut u64,
/// Caller-owned sink for detected problems.
failures: &'a mut Vec<IntegrityFailure>,
}
/// Result of registering a page with `record_btree_visit`.
enum VisitStep {
/// The page had already been visited in this walk; it must not be expanded.
Cycle,
/// The page pushed the walk past `MAX_RANGE_NODES`; the walk must stop.
BudgetExceeded,
/// First visit and still within budget; the page should be read.
Walked,
}
/// Registers a visit to `pid` and tells the walker how to proceed.
///
/// A page already present in `visit.visited` yields `VisitStep::Cycle` and a
/// `BTreeSiblingChainBroken` failure. Otherwise the page is added to
/// `visit.reachable` and counted; if the count then exceeds
/// `MAX_RANGE_NODES`, a `BTreeDepthExceeded` failure carrying that limit is
/// recorded and `VisitStep::BudgetExceeded` is returned.
fn record_btree_visit(pid: PageId, ctx: &TreeContext, visit: &mut WalkVisitState<'_>) -> VisitStep {
    let first_visit = visit.visited.insert(pid);
    if !first_visit {
        visit
            .failures
            .push(IntegrityFailure::BTreeSiblingChainBroken {
                tree: ctx.label.clone(),
                page_id: pid.get(),
            });
        return VisitStep::Cycle;
    }

    visit.reachable.insert(pid);
    let walked = visit.pages_walked.saturating_add(1);
    *visit.pages_walked = walked;

    if walked <= MAX_RANGE_NODES as u64 {
        return VisitStep::Walked;
    }

    // The page that tipped the budget stays counted and marked reachable.
    visit.failures.push(IntegrityFailure::BTreeDepthExceeded {
        tree: ctx.label.clone(),
        limit: MAX_RANGE_NODES,
    });
    VisitStep::BudgetExceeded
}
/// Reads page `pid`, decodes it as a B-tree node and runs the per-node checks.
///
/// Returns `Ok(Some(node))` when the page could be read and decoded; sort and
/// level violations are recorded in `failures` but do not prevent the node
/// from being returned. Returns `Ok(None)` after recording a
/// `ChecksumMismatch` when the pager reports corruption, the page trailer is
/// invalid, or the bytes do not decode as a node.
///
/// # Errors
///
/// Any read error other than `Error::Corruption` is returned unchanged.
fn read_and_validate_node<F: FileBackend>(
    pager: &mut Pager<F>,
    pid: PageId,
    tree_label: &str,
    failures: &mut Vec<IntegrityFailure>,
) -> Result<Option<crate::btree::node::DecodedNode>> {
    let owned = match pager.read_page(pid) {
        Err(Error::Corruption { page_id }) => {
            // Report the page id carried by the error, not `pid`.
            failures.push(IntegrityFailure::ChecksumMismatch { page_id });
            return Ok(None);
        }
        Err(other) => return Err(other),
        Ok(page_ref) => page_ref.to_owned_page(),
    };

    // Decoding is attempted only when the trailer checks out; both ways of
    // failing are reported identically against `pid`.
    let decoded = if page_trailer_valid(&owned) {
        decode_node(owned.as_bytes()).ok()
    } else {
        None
    };
    let Some(node) = decoded else {
        failures.push(IntegrityFailure::ChecksumMismatch { page_id: pid.get() });
        return Ok(None);
    };

    verify_sort_invariant(pid, &node, failures);
    verify_level_invariant(pid, &node, tree_label, failures);
    Ok(Some(node))
}
/// Checks that the keys stored in one node are strictly ascending.
///
/// Leaf nodes are checked over `decoded.leaves`, internal nodes over
/// `decoded.internals`. At most one `BTreeSortViolation` is recorded per node,
/// at the first key that is less than or equal to its predecessor.
fn verify_sort_invariant(
    pid: PageId,
    decoded: &crate::btree::node::DecodedNode,
    failures: &mut Vec<IntegrityFailure>,
) {
    /// Returns `true` when every key compares strictly greater than the one
    /// before it (trivially true for zero or one key).
    fn strictly_ascending<'k>(mut keys: impl Iterator<Item = &'k [u8]>) -> bool {
        let Some(mut previous) = keys.next() else {
            return true;
        };
        for key in keys {
            if key <= previous {
                return false;
            }
            previous = key;
        }
        true
    }

    let ordered = match decoded.kind {
        NodeKind::Leaf => strictly_ascending(decoded.leaves.iter().map(|e| e.key.as_slice())),
        NodeKind::Internal => {
            strictly_ascending(decoded.internals.iter().map(|e| e.key.as_slice()))
        }
    };
    if !ordered {
        failures.push(IntegrityFailure::BTreeSortViolation { page_id: pid.get() });
    }
}
/// Checks that a node's level agrees with its kind.
///
/// A leaf must have level `0`; an internal node must have a level greater
/// than `0`. A mismatch records `BTreeLevelInvariantViolated` for `pid`.
fn verify_level_invariant(
    pid: PageId,
    decoded: &crate::btree::node::DecodedNode,
    tree_label: &str,
    failures: &mut Vec<IntegrityFailure>,
) {
    let consistent = match decoded.kind {
        NodeKind::Internal => decoded.level > 0,
        NodeKind::Leaf => decoded.level == 0,
    };
    if consistent {
        return;
    }
    failures.push(IntegrityFailure::BTreeLevelInvariantViolated {
        tree: String::from(tree_label),
        page_id: pid.get(),
    });
}
/// Follows the freelist chain starting at `head_raw` and validates every link.
///
/// A raw id of `0` terminates the chain, so a `head_raw` of `0` yields
/// `Ok(0)` without recording anything. Every page that passes the link checks
/// is added to `reachable` before it is read.
///
/// The walk stops at the first problem and records exactly one failure:
///
/// * `FreelistChainBroken` when the chain is longer than `page_count`, a link
///   is rejected by `PageId::new`, a link is at or beyond `page_count`, a page
///   appears twice, or a page does not decode as a freelist page;
/// * `ChecksumMismatch` when the pager reports corruption or the page trailer
///   is invalid.
///
/// Returns the number of links stepped through, including the one on which a
/// problem was detected.
///
/// # Errors
///
/// Any read error other than `Error::Corruption` is returned unchanged.
pub fn walk_freelist<F: FileBackend>(
    pager: &mut Pager<F>,
    head_raw: u64,
    page_count: u64,
    reachable: &mut HashSet<PageId>,
    failures: &mut Vec<IntegrityFailure>,
) -> Result<u64> {
    let mut seen: HashSet<PageId> = HashSet::new();
    let mut steps: u64 = 0;
    let mut cursor = head_raw;

    while cursor != 0 {
        steps = steps.saturating_add(1);

        // A well-formed chain cannot have more links than the file has pages.
        if steps > page_count {
            failures.push(IntegrityFailure::FreelistChainBroken { page_id: cursor });
            return Ok(steps);
        }
        let Some(pid) = PageId::new(cursor) else {
            failures.push(IntegrityFailure::FreelistChainBroken { page_id: cursor });
            return Ok(steps);
        };
        // The range check runs first, so an out-of-range id is never added to
        // `seen`; a repeated id is a cycle.
        if pid.get() >= page_count || !seen.insert(pid) {
            failures.push(IntegrityFailure::FreelistChainBroken { page_id: pid.get() });
            return Ok(steps);
        }
        reachable.insert(pid);

        let page = match pager.read_page(pid) {
            Ok(page_ref) => page_ref.to_owned_page(),
            Err(Error::Corruption { page_id }) => {
                failures.push(IntegrityFailure::ChecksumMismatch { page_id });
                return Ok(steps);
            }
            Err(e) => return Err(e),
        };
        if !page_trailer_valid(&page) {
            failures.push(IntegrityFailure::ChecksumMismatch { page_id: pid.get() });
            return Ok(steps);
        }
        let Some(entry) = decode_freelist_page(&page) else {
            failures.push(IntegrityFailure::FreelistChainBroken { page_id: pid.get() });
            return Ok(steps);
        };
        cursor = entry.next;
    }

    Ok(steps)
}
/// Verifies that the root page ids stored in one collection descriptor point
/// inside the file.
///
/// A root id is considered dangling when it is `0` or not less than
/// `page_count`. The primary root is always checked and reported with
/// `index: None`; index roots are checked only for indexes whose status is
/// `IndexStatus::Active` and are reported with the index name.
pub fn check_catalog_pointers(
    name: &str,
    descriptor: &CollectionDescriptor,
    page_count: u64,
    failures: &mut Vec<IntegrityFailure>,
) {
    let dangling = |raw: u64| raw == 0 || raw >= page_count;

    if dangling(descriptor.primary_root) {
        failures.push(IntegrityFailure::DanglingCatalogPointer {
            collection: name.to_owned(),
            index: None,
            page_id: descriptor.primary_root,
        });
    }

    failures.extend(
        descriptor
            .indexes
            .iter()
            .filter(|ix| ix.status == IndexStatus::Active && dangling(ix.root_page_id))
            .map(|ix| IntegrityFailure::DanglingCatalogPointer {
                collection: name.to_owned(),
                index: Some(ix.name.clone()),
                page_id: ix.root_page_id,
            }),
    );
}
pub fn cross_reference_index<F: FileBackend>(
pager: &mut Pager<F>,
collection: &str,
index: &IndexDescriptor,
primary_ids: &HashSet<u64>,
referenced_ids: &mut HashSet<u64>,
failures: &mut Vec<IntegrityFailure>,
) -> Result<u64> {
let Some(root) = PageId::new(index.root_page_id) else {
return Ok(0);
};
let tree = crate::btree::BTree::<F>::open(pager, root)?;
let iter = match tree.range(pager, ..) {
Ok(it) => it,
Err(Error::Corruption { page_id }) => {
failures.push(IntegrityFailure::ChecksumMismatch { page_id });
return Ok(0);
}
Err(e) => return Err(e),
};
let mut scanned: u64 = 0;
for step in iter {
scanned = scanned
.checked_add(1)
.ok_or(Error::BTreeInvariantViolated {
reason: "index entry count overflow",
})?;
let (full_key, value) = match step {
Ok(kv) => kv,
Err(Error::Corruption { page_id }) => {
failures.push(IntegrityFailure::ChecksumMismatch { page_id });
return Ok(scanned);
}
Err(e) => return Err(e),
};
let Some(raw_id) = recover_id_from_entry(&full_key, &value, index.kind) else {
failures.push(IntegrityFailure::OrphanIndexEntry {
collection: collection.to_owned(),
index: index.name.clone(),
id: 0,
});
continue;
};
if !primary_ids.contains(&raw_id) {
failures.push(IntegrityFailure::OrphanIndexEntry {
collection: collection.to_owned(),
index: index.name.clone(),
id: raw_id,
});
continue;
}
referenced_ids.insert(raw_id);
}
Ok(scanned)
}
/// Extracts the raw primary id that an index entry refers to.
///
/// For `IndexKind::Unique` the id bytes are the entry's `value`. For every
/// other kind they are the last 8 bytes of `full_key`. Returns `None` when the
/// key is shorter than 8 bytes or `Id::from_be_bytes` rejects the bytes.
fn recover_id_from_entry(full_key: &[u8], value: &[u8], kind: IndexKind) -> Option<u64> {
    let id_bytes = match kind {
        IndexKind::Unique => value,
        IndexKind::Standard | IndexKind::Each | IndexKind::Composite => {
            // `checked_sub` fails exactly when the key has fewer than 8 bytes.
            let suffix_start = full_key.len().checked_sub(8)?;
            full_key.get(suffix_start..)?
        }
    };
    Id::from_be_bytes(id_bytes).map(|id| id.get())
}
pub fn collect_primary_ids<F: FileBackend>(
pager: &mut Pager<F>,
descriptor: &CollectionDescriptor,
primary_ids: &mut HashSet<u64>,
) -> Result<u64> {
let Some(root) = PageId::new(descriptor.primary_root) else {
return Ok(0);
};
let tree = crate::btree::BTree::<F>::open(pager, root)?;
let iter = match tree.range(pager, ..) {
Ok(it) => it,
Err(Error::Corruption { .. }) => return Ok(0),
Err(e) => return Err(e),
};
let mut count: u64 = 0;
for step in iter {
count = count.checked_add(1).ok_or(Error::BTreeInvariantViolated {
reason: "primary entry count overflow",
})?;
let (key, _value) = match step {
Ok(kv) => kv,
Err(Error::Corruption { .. }) => return Ok(count),
Err(e) => return Err(e),
};
if let Some(id) = Id::from_be_bytes(&key) {
primary_ids.insert(id.get());
}
}
Ok(count)
}
/// Reports primary ids that a checked index fails to reference.
///
/// For every `(name, kind, referenced)` triple whose kind is not
/// `IndexKind::Each`, each id in `primary_ids` that is absent from
/// `referenced` records a `MissingIndexEntry` failure.
///
/// NOTE(review): `Each` indexes are skipped — presumably because a record may
/// legitimately contribute no entries to such an index; confirm.
///
/// In debug builds this also asserts that every `Active` index in
/// `descriptor` has a matching triple in `per_index_referenced`.
pub fn check_primary_to_index(
    collection: &str,
    descriptor: &CollectionDescriptor,
    primary_ids: &HashSet<u64>,
    per_index_referenced: &[(String, IndexKind, HashSet<u64>)],
    failures: &mut Vec<IntegrityFailure>,
) {
    let checked_indexes = per_index_referenced
        .iter()
        .filter(|entry| !matches!(entry.1, IndexKind::Each));

    for (index_name, _, referenced) in checked_indexes {
        // `difference` walks `primary_ids` and keeps the ids that
        // `referenced` lacks.
        let missing = primary_ids.difference(referenced);
        failures.extend(missing.map(|&id| IntegrityFailure::MissingIndexEntry {
            collection: collection.to_owned(),
            index: index_name.clone(),
            id,
        }));
    }

    debug_assert!(
        descriptor
            .indexes
            .iter()
            .filter(|d| d.status == IndexStatus::Active)
            .all(|d| per_index_referenced
                .iter()
                .any(|(name, _, _)| *name == d.name)),
        "every Active index must contribute a referenced-ids set",
    );
}
pub fn quick_check<F: FileBackend>(pager: &mut Pager<F>) -> Result<IntegrityReport> {
let start = std::time::Instant::now();
let mut failures: Vec<IntegrityFailure> = Vec::new();
let mut reachable: HashSet<PageId> = HashSet::new();
let page_count = pager.page_count();
let mut pages_checked: u64 = 1; if let Some(root) = PageId::new(pager.root_catalog()) {
if root.get() >= page_count {
failures.push(IntegrityFailure::DanglingCatalogPointer {
collection: "<catalog>".to_owned(),
index: None,
page_id: root.get(),
});
} else {
let ctx = TreeContext {
label: "catalog".to_owned(),
root,
};
pages_checked = pages_checked.saturating_add(walk_btree(
pager,
&ctx,
&mut reachable,
&mut failures,
)?);
list_catalog_for_pointer_check(pager, page_count, &mut failures)?;
}
}
Ok(IntegrityReport {
failures,
pages_checked,
elapsed: start.elapsed(),
})
}
fn list_catalog_for_pointer_check<F: FileBackend>(
pager: &mut Pager<F>,
page_count: u64,
failures: &mut Vec<IntegrityFailure>,
) -> Result<()> {
let raw = pager.root_catalog();
if PageId::new(raw).is_none() {
return Ok(());
}
let catalog = match Catalog::<F>::open_or_init(pager) {
Ok(c) => c,
Err(Error::Corruption { .. }) => return Ok(()),
Err(e) => return Err(e),
};
let rows = match catalog.list_collections(pager) {
Ok(r) => r,
Err(Error::Corruption { .. }) => return Ok(()),
Err(e) => return Err(e),
};
for (name, descriptor) in rows {
check_catalog_pointers(&name, &descriptor, page_count, failures);
}
Ok(())
}
/// Returns an empty fixed-capacity vector of page ids whose capacity is
/// `MAX_BTREE_DEPTH`.
///
/// NOTE(review): judging by the name, callers use it as the stack of pages
/// visited while descending a tree — confirm against call sites.
#[must_use]
pub fn fresh_descent_stack() -> HeaplessVec<PageId, MAX_BTREE_DEPTH> {
HeaplessVec::new()
}