use std::collections::HashSet;
use serde::Serialize;
use super::*;
use mnem_core::objects::View;
use mnem_core::prolly::tree::{TreeChunk, load_tree_chunk};
use mnem_core::store::blockstore::recompute_cid;
const DEFAULT_LIMIT: usize = 50_000;
#[derive(clap::Args, Debug)]
#[command(after_long_help = "\
Walks all ops reachable from HEAD and verifies every referenced block is
present and CID-correct (content hashes to its CID). For each commit,
ALL blocks of the nodes, edges, schema, embeddings (G16), and sparse (G17)
Prolly trees are fetched and checked - not just the root. Missing interior
or leaf blocks are reported as errors so corruption inside a Prolly tree
is always detected.
Examples:
mnem fsck # check all ops from HEAD
mnem fsck --limit 100 # only walk the last 100 ops
mnem fsck --json # machine-readable output
")]
pub(crate) struct Args {
#[arg(long)]
pub limit: Option<usize>,
#[arg(long)]
pub json: bool,
}
#[derive(Debug, Serialize)]
struct FsckError {
op: String,
kind: String,
#[serde(skip_serializing_if = "Option::is_none")]
cid: Option<String>,
}
#[derive(Serialize)]
struct FsckReport {
ops_checked: usize,
blocks_verified: usize,
errors: Vec<FsckError>,
ok: bool,
}
fn check_block(
bs: &dyn mnem_core::store::Blockstore,
cid: &mnem_core::id::Cid,
) -> Result<(), String> {
let bytes = bs
.get(cid)
.map_err(|e| format!("store I/O error fetching {cid}: {e}"))?;
let bytes = match bytes {
Some(b) => b,
None => return Err("missing".to_string()),
};
if let Some(computed) = recompute_cid(cid, &bytes) {
if computed != *cid {
return Err(format!(
"CID mismatch: claimed {cid} but content hashes to {computed}"
));
}
}
Ok(())
}
fn walk_prolly_tree(
bs: &dyn mnem_core::store::Blockstore,
root: &mnem_core::id::Cid,
tree_name: &str,
op_cid_str: &str,
errors: &mut Vec<FsckError>,
) -> usize {
let mut stack: Vec<mnem_core::id::Cid> = vec![root.clone()];
let mut blocks_ok: usize = 0;
while let Some(cid) = stack.pop() {
let chunk = match load_tree_chunk(bs, &cid) {
Ok(c) => {
blocks_ok += 1;
c
}
Err(_) => {
errors.push(FsckError {
op: op_cid_str.to_owned(),
kind: format!("missing interior block {cid} in {tree_name} tree"),
cid: Some(cid.to_string()),
});
continue;
}
};
if let TreeChunk::Internal(internal) = chunk {
stack.extend(internal.children);
}
}
blocks_ok
}
pub(crate) fn run(override_path: Option<&Path>, args: Args) -> Result<()> {
let (_data_dir, repo, bs, _ohs) = repo::open_all(override_path)?;
let bs = bs.as_ref();
let limit = args.limit.unwrap_or(DEFAULT_LIMIT);
let head_op_cid = repo.op_id().clone();
if !args.json {
let full = head_op_cid.to_string();
let short = short_cid(&full);
println!("fsck: checking from HEAD (op {short})");
}
let mut errors: Vec<FsckError> = Vec::new();
let mut ops_checked: usize = 0;
let mut blocks_verified: usize = 0;
let mut visited: HashSet<mnem_core::id::Cid> = HashSet::new();
let mut cur = head_op_cid.clone();
loop {
if ops_checked >= limit {
break;
}
if !visited.insert(cur.clone()) {
break;
}
let op_cid_str = cur.to_string();
let op_bytes = match check_block(bs, &cur) {
Ok(()) => {
blocks_verified += 1;
bs.get(&cur)
.map_err(|e| anyhow!("store I/O: {e}"))?
.expect("just verified present")
}
Err(reason) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("op block {reason}"),
cid: Some(op_cid_str.clone()),
});
break;
}
};
let op: Operation = match from_canonical_bytes(&op_bytes) {
Ok(o) => o,
Err(e) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("op block decode failed: {e}"),
cid: Some(op_cid_str.clone()),
});
break;
}
};
ops_checked += 1;
let view_cid = &op.view;
let view_opt: Option<View> = match check_block(bs, view_cid) {
Ok(()) => {
blocks_verified += 1;
let view_bytes = bs
.get(view_cid)
.map_err(|e| anyhow!("store I/O: {e}"))?
.expect("just verified present");
match from_canonical_bytes::<View>(&view_bytes) {
Ok(v) => Some(v),
Err(e) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("view block decode failed: {e}"),
cid: Some(view_cid.to_string()),
});
None
}
}
}
Err(reason) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("view block {reason}"),
cid: Some(view_cid.to_string()),
});
None
}
};
if let Some(view) = view_opt {
for head_cid in &view.heads {
let commit_opt: Option<Commit> = match check_block(bs, head_cid) {
Ok(()) => {
blocks_verified += 1;
let commit_bytes = bs
.get(head_cid)
.map_err(|e| anyhow!("store I/O: {e}"))?
.expect("just verified present");
match from_canonical_bytes::<Commit>(&commit_bytes) {
Ok(c) => Some(c),
Err(e) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("commit block decode failed: {e}"),
cid: Some(head_cid.to_string()),
});
None
}
}
}
Err(reason) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("commit block {reason}"),
cid: Some(head_cid.to_string()),
});
None
}
};
if let Some(commit) = commit_opt {
for (tree_name, tree_cid) in [
("nodes", &commit.nodes),
("edges", &commit.edges),
("schema", &commit.schema),
] {
let tree_blocks =
walk_prolly_tree(bs, tree_cid, tree_name, &op_cid_str, &mut errors);
blocks_verified += tree_blocks;
}
for (tree_name, maybe_cid) in [
("embeddings", commit.embeddings.as_ref()),
("sparse", commit.sparse.as_ref()),
] {
if let Some(cid) = maybe_cid {
let tree_blocks =
walk_prolly_tree(bs, cid, tree_name, &op_cid_str, &mut errors);
blocks_verified += tree_blocks;
}
}
let optional_roots: &[(&str, Option<&mnem_core::id::Cid>)] = &[
("indexes root", commit.indexes.as_ref()),
("delta root", commit.delta.as_ref()),
];
for (label, maybe_cid) in optional_roots {
if let Some(opt_cid) = maybe_cid {
match check_block(bs, opt_cid) {
Ok(()) => blocks_verified += 1,
Err(reason) => {
errors.push(FsckError {
op: op_cid_str.clone(),
kind: format!("{label} {reason}"),
cid: Some(opt_cid.to_string()),
});
}
}
}
}
}
}
}
match op.parents.first() {
Some(parent_cid) => cur = parent_cid.clone(),
None => break, }
}
let ok = errors.is_empty();
if args.json {
let report = FsckReport {
ops_checked,
blocks_verified,
errors,
ok,
};
let line = serde_json::to_string(&report).context("serialising fsck report")?;
println!("{line}");
} else {
println!(" ops checked: {ops_checked}");
println!(" blocks verified: {blocks_verified}");
if errors.is_empty() {
println!(" errors: 0");
println!("ok");
} else {
for e in &errors {
let cid_suffix = e
.cid
.as_deref()
.map(|c| format!(" ({c})"))
.unwrap_or_default();
println!(" ERROR: op {} - {}{}", e.op, e.kind, cid_suffix);
}
println!(" errors: {}", errors.len());
println!("FAILED");
}
}
if !ok {
std::process::exit(1);
}
Ok(())
}
fn short_cid(full: &str) -> String {
if full.len() <= 10 {
full.to_string()
} else {
full.chars().skip(2).take(8).collect()
}
}