use crate::path::{Path, PathBuf};
use crate::{checksum::Checksum, coding::Decode, io, table::TableId, table::block::Header};
#[cfg(not(feature = "std"))]
use alloc::{boxed::Box, string::String, vec::Vec};
/// A single problem found by [`verify_integrity`] while hashing whole files.
#[cfg(feature = "std")]
#[derive(Debug)]
#[non_exhaustive]
pub enum IntegrityError {
    /// The xxh3-128 digest of an SST file did not match the checksum recorded
    /// for its table.
    SstFileCorrupted {
        /// Id of the affected table.
        table_id: TableId,
        /// On-disk location of the SST file.
        path: PathBuf,
        /// Checksum recorded for the table.
        expected: Checksum,
        /// Checksum computed from the file's current contents.
        got: Checksum,
    },
    /// The xxh3-128 digest of a blob file did not match the checksum recorded
    /// for it.
    BlobFileCorrupted {
        /// Id of the affected blob file.
        blob_file_id: u64,
        /// On-disk location of the blob file.
        path: PathBuf,
        /// Checksum recorded for the blob file.
        expected: Checksum,
        /// Checksum computed from the file's current contents.
        got: Checksum,
    },
    /// The file could not be opened or read, so no checksum was computed.
    IoError {
        /// File that could not be hashed.
        path: PathBuf,
        /// Underlying I/O failure.
        error: io::Error,
    },
}
#[cfg(feature = "std")]
impl core::fmt::Display for IntegrityError {
    // One line per finding, always naming the offending file.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::SstFileCorrupted {
                table_id,
                path,
                expected,
                got,
            } => write!(
                f,
                "SST table {table_id} corrupted at {location}: expected {expected}, got {got}",
                location = path.display(),
            ),
            Self::BlobFileCorrupted {
                blob_file_id,
                path,
                expected,
                got,
            } => write!(
                f,
                "blob file {blob_file_id} corrupted at {location}: expected {expected}, got {got}",
                location = path.display(),
            ),
            Self::IoError { path, error } => write!(
                f,
                "I/O error reading {location}: {error}",
                location = path.display(),
            ),
        }
    }
}
#[cfg(feature = "std")]
impl core::error::Error for IntegrityError {
    // Only the I/O variant wraps an underlying error; checksum mismatches are leaves.
    fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
        match self {
            Self::SstFileCorrupted { .. } | Self::BlobFileCorrupted { .. } => None,
            Self::IoError { error, .. } => Some(error),
        }
    }
}
/// Summary produced by [`verify_integrity`].
#[cfg(feature = "std")]
#[derive(Debug)]
#[non_exhaustive]
pub struct IntegrityReport {
    /// Number of SST files hashed (or attempted, if reading failed).
    pub sst_files_checked: usize,
    /// Number of blob files hashed (or attempted, if reading failed).
    pub blob_files_checked: usize,
    /// Every mismatch or I/O failure encountered; SST findings come first.
    pub errors: Vec<IntegrityError>,
}
#[cfg(feature = "std")]
impl IntegrityReport {
    /// `true` when no mismatch and no I/O failure was recorded.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        self.errors.first().is_none()
    }

    /// Total number of files examined, SST and blob combined.
    #[must_use]
    pub fn files_checked(&self) -> usize {
        let Self {
            sst_files_checked,
            blob_files_checked,
            ..
        } = self;
        sst_files_checked + blob_files_checked
    }
}
/// Hash an entire file with xxh3-128, reading it in 64 KiB chunks.
///
/// Reads interrupted by a signal (`ErrorKind::Interrupted`) are retried; any
/// other I/O error is returned as-is.
#[cfg(feature = "std")]
fn stream_checksum(path: &std::path::Path) -> std::io::Result<Checksum> {
    use std::io::{ErrorKind, Read};

    let mut file = std::fs::File::open(path)?;
    let mut digest = xxhash_rust::xxh3::Xxh3Default::new();
    let mut chunk = vec![0u8; 64 * 1024];

    loop {
        match file.read(&mut chunk) {
            // End of file: everything has been fed to the hasher.
            Ok(0) => return Ok(Checksum::from_raw(digest.digest128())),
            Ok(len) => {
                if let Some(filled) = chunk.get(..len) {
                    digest.update(filled);
                }
            }
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }
}
/// Hash every SST and blob file of the tree's current version and compare each
/// digest with the checksum recorded for that file.
///
/// Each file is streamed through xxh3-128 (`stream_checksum`). A mismatch is
/// reported as `SstFileCorrupted` / `BlobFileCorrupted`. A file that cannot be
/// opened or read is reported as `IoError`. Every file counts as checked,
/// whether or not it could be read.
///
/// NOTE(review): files are opened with `std::fs` directly rather than through
/// the table's `Fs` handle (which `verify_block_checksums` uses). Confirm this
/// is intended for trees backed by a non-`std` filesystem.
#[cfg(feature = "std")]
#[must_use]
pub fn verify_integrity(tree: &impl crate::AbstractTree) -> IntegrityReport {
    /// Hash `path` and compare the result with `expected`.
    ///
    /// `Ok(None)`: the file matches. `Ok(Some(got))`: the file hashed to `got`
    /// instead. `Err`: the file could not be hashed.
    fn compare(
        path: &std::path::Path,
        expected: &Checksum,
    ) -> std::io::Result<Option<Checksum>> {
        let got = stream_checksum(path)?;
        Ok((got != *expected).then_some(got))
    }

    let version = tree.current_version();
    let mut report = IntegrityReport {
        sst_files_checked: 0,
        blob_files_checked: 0,
        errors: Vec::new(),
    };

    for table in version.iter_tables() {
        let expected = table.checksum();
        match compare(&table.path, &expected) {
            Ok(None) => {}
            Ok(Some(got)) => report.errors.push(IntegrityError::SstFileCorrupted {
                table_id: table.id(),
                path: (*table.path).clone(),
                expected,
                got,
            }),
            Err(error) => report.errors.push(IntegrityError::IoError {
                path: (*table.path).clone(),
                error: error.into(),
            }),
        }
        report.sst_files_checked += 1;
    }

    for blob_file in version.blob_files.iter() {
        let path = blob_file.path();
        let expected = blob_file.checksum();
        match compare(path, &expected) {
            Ok(None) => {}
            Ok(Some(got)) => report.errors.push(IntegrityError::BlobFileCorrupted {
                blob_file_id: blob_file.id(),
                path: path.to_path_buf(),
                expected,
                got,
            }),
            Err(error) => report.errors.push(IntegrityError::IoError {
                path: path.to_path_buf(),
                error: error.into(),
            }),
        }
        report.blob_files_checked += 1;
    }

    report
}
/// A hard failure found while walking the blocks of an SST file.
#[derive(Debug)]
#[non_exhaustive]
pub enum BlockVerifyError {
    /// The file could not be opened, its TOC could not be parsed, or (for a
    /// standalone `verify_sst_file` scan) its meta block could not be decoded.
    /// The block walk was skipped.
    SstFileUnreadable {
        /// Table id, or `0` for a standalone file.
        table_id: TableId,
        /// File that could not be scanned.
        path: PathBuf,
        /// Underlying failure.
        error: io::Error,
    },
    /// A block header could not be decoded, or it declared lengths that exceed
    /// the hard cap or the bytes left in its section, or the section was too
    /// short to hold a header. The rest of that section was not walked.
    HeaderCorrupted {
        /// Table id, or `0` for a standalone file.
        table_id: TableId,
        /// File containing the block.
        path: PathBuf,
        /// File offset at which the bad block starts.
        offset: u64,
        /// Human-readable description of what failed.
        reason: String,
    },
    /// The block's data segment did not hash to the checksum stored in its
    /// header.
    DataCorrupted {
        /// Table id, or `0` for a standalone file.
        table_id: TableId,
        /// File containing the block.
        path: PathBuf,
        /// File offset at which the block (its header) starts.
        offset: u64,
        /// Data length declared by the header.
        data_length: u32,
        /// Checksum stored in the header.
        expected: Checksum,
        /// Checksum computed over the data as read.
        got: Checksum,
    },
    /// Reading the block's data segment or its parity trailer failed.
    DataReadError {
        /// Table id, or `0` for a standalone file.
        table_id: TableId,
        /// File containing the block.
        path: PathBuf,
        /// File offset at which the block (its header) starts.
        offset: u64,
        /// Data length declared by the header.
        data_length: u32,
        /// Underlying read failure.
        error: io::Error,
    },
    /// A TOC entry could not be followed: its end offset overflows `u64`, or
    /// seeking to its start failed.
    TocCorrupted {
        /// Table id, or `0` for a standalone file.
        table_id: TableId,
        /// File whose TOC is bad.
        path: PathBuf,
        /// Raw section name bytes from the TOC.
        section_name: Vec<u8>,
        /// Section start offset recorded in the TOC.
        section_offset: u64,
        /// Human-readable description of what failed.
        reason: String,
    },
}
impl core::fmt::Display for BlockVerifyError {
    // Every message starts with the table id and file location so that
    // findings from a multi-file scrub can be told apart.
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            Self::SstFileUnreadable {
                table_id,
                path,
                error,
            } => write!(
                f,
                "SST table {table_id} at {location} could not be opened/parsed: {error}",
                location = path.display(),
            ),
            Self::HeaderCorrupted {
                table_id,
                path,
                offset,
                reason,
            } => write!(
                f,
                "SST table {table_id} at {location}: block header at offset {offset} is corrupt \
                 ({reason})",
                location = path.display(),
            ),
            Self::DataCorrupted {
                table_id,
                path,
                offset,
                data_length,
                expected,
                got,
            } => write!(
                f,
                "SST table {table_id} at {location}: block at offset {offset} ({data_length} \
                 bytes) data checksum mismatch, expected {expected}, got {got}",
                location = path.display(),
            ),
            Self::DataReadError {
                table_id,
                path,
                offset,
                data_length,
                error,
            } => write!(
                f,
                "SST table {table_id} at {location}: failed to read {data_length}-byte data \
                 segment for block at offset {offset}: {error}",
                location = path.display(),
            ),
            Self::TocCorrupted {
                table_id,
                path,
                section_name,
                section_offset,
                reason,
            } => write!(
                f,
                "SST table {table_id} at {location}: TOC section {name:?} at offset \
                 {section_offset} is unreachable ({reason})",
                location = path.display(),
                name = String::from_utf8_lossy(section_name),
            ),
        }
    }
}
impl core::error::Error for BlockVerifyError {
fn source(&self) -> Option<&(dyn core::error::Error + 'static)> {
match self {
Self::SstFileUnreadable { error, .. } | Self::DataReadError { error, .. } => {
Some(error)
}
_ => None,
}
}
}
/// A non-fatal finding from the block scrub.
#[derive(Debug)]
#[non_exhaustive]
pub enum BlockVerifyWarning {
    /// The table's metadata names an ECC scheme this build does not recognize.
    /// Block sections whose layout depends on that scheme were skipped; the
    /// accompanying log message suggests recompacting to re-stamp the table.
    UnrecognizedEcc {
        /// Table id, or `0` for a standalone file checked with `verify_sst_file`.
        table_id: TableId,
        /// File the warning refers to.
        path: PathBuf,
    },
}
/// Outcome of a block-level scrub (`verify_block_checksums`,
/// `verify_block_checksums_with`, or `verify_sst_file`).
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct BlockVerifyReport {
    /// Number of SST files visited, including ones that could not be opened.
    pub sst_files_scanned: usize,
    /// Number of blocks counted by the walk (their header decoded successfully).
    pub blocks_scanned: usize,
    /// Hard failures: corruption or unreadable data.
    pub errors: Vec<BlockVerifyError>,
    /// Non-fatal findings such as an unrecognized ECC scheme.
    pub warnings: Vec<BlockVerifyWarning>,
}
impl BlockVerifyReport {
    /// `true` when the scrub recorded no errors. Warnings do not affect this.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        let Self { errors, .. } = self;
        errors.is_empty()
    }

    /// `true` when at least one warning was recorded.
    #[must_use]
    pub fn has_warnings(&self) -> bool {
        self.warnings.first().is_some()
    }
}
/// Tuning knobs for `verify_block_checksums_with`.
///
/// Start from `VerifyOptions::default()` (one worker, no throttle) and adjust
/// with the chaining setters.
#[derive(Clone, Debug)]
pub struct VerifyOptions {
    /// Requested number of worker threads. `0` is treated as `1`, and the value
    /// is capped at the number of tables. Only honored with the `std` feature.
    pub parallelism: usize,
    /// Optional pause between two consecutive tables scanned by the same
    /// worker (e.g. to limit I/O pressure). Ignored without the `std` feature.
    pub throttle: Option<core::time::Duration>,
}

impl Default for VerifyOptions {
    fn default() -> Self {
        Self::SEQUENTIAL
    }
}

impl VerifyOptions {
    /// Baseline configuration: a single worker and no throttling.
    const SEQUENTIAL: Self = Self {
        parallelism: 1,
        throttle: None,
    };

    /// Return a copy of these options with the worker count set to `workers`.
    #[must_use]
    pub const fn parallelism(self, workers: usize) -> Self {
        Self {
            parallelism: workers,
            throttle: self.throttle,
        }
    }

    /// Return a copy of these options that pauses for `delay` between tables.
    #[must_use]
    pub const fn throttle(self, delay: core::time::Duration) -> Self {
        Self {
            parallelism: self.parallelism,
            throttle: Some(delay),
        }
    }
}
fn merge_report(dst: &mut BlockVerifyReport, src: BlockVerifyReport) {
dst.sst_files_scanned += src.sst_files_scanned;
dst.blocks_scanned += src.blocks_scanned;
dst.errors.extend(src.errors);
dst.warnings.extend(src.warnings);
}
fn scan_one_table(table: &crate::table::Table) -> BlockVerifyReport {
let mut report = BlockVerifyReport {
sst_files_scanned: 1,
..BlockVerifyReport::default()
};
let path: &Path = &table.path;
let table_id = table.id();
let ecc_unrecognized = table.metadata.ecc_unrecognized;
if ecc_unrecognized {
log::warn!(
"table {table_id} at {}: unrecognized ECC scheme — skipping the \
ECC-dependent block sections; recompact to re-stamp with a \
supported scheme",
path.display(),
);
report.warnings.push(BlockVerifyWarning::UnrecognizedEcc {
table_id,
path: path.to_path_buf(),
});
}
let max_enc_overhead = table.encryption.as_ref().map_or(0u32, |e| e.max_overhead());
match scan_sst_blocks(
&*table.fs,
path,
table_id,
max_enc_overhead,
table.metadata.ecc_params,
ecc_unrecognized,
) {
Ok(per_file) => {
report.blocks_scanned += per_file.blocks_scanned;
report.errors.extend(per_file.errors);
}
Err(error) => {
report.errors.push(BlockVerifyError::SstFileUnreadable {
table_id,
path: path.to_path_buf(),
error,
});
}
}
report
}
/// Verify the per-block checksums of every table in the tree's current
/// version using `VerifyOptions::default()` (one worker, no throttle).
#[must_use]
pub fn verify_block_checksums(tree: &impl crate::AbstractTree) -> BlockVerifyReport {
    let defaults = VerifyOptions::default();
    verify_block_checksums_with(tree, &defaults)
}
/// Verify the per-block checksums of every table in the tree's current
/// version, using the worker count and throttle from `options`.
///
/// With the `std` feature and an effective worker count above one, tables are
/// scanned on scoped threads. Each thread claims the next table index from a
/// shared atomic counter, and the per-thread partial reports are merged
/// afterwards. Errors are therefore grouped by worker, not listed in table
/// order. Otherwise tables are scanned sequentially in `iter_tables` order.
///
/// When `options.throttle` is set (and `std` is enabled), a worker sleeps for
/// that long between two consecutive tables it scans, never after its last
/// one.
///
/// # Panics
///
/// A panic inside a worker thread is re-raised on the calling thread.
#[must_use]
pub fn verify_block_checksums_with(
    tree: &impl crate::AbstractTree,
    options: &VerifyOptions,
) -> BlockVerifyReport {
    let version = tree.current_version();
    // Owned table handles, so worker threads can index into a shared slice.
    let tables: Vec<crate::table::Table> = version.iter_tables().cloned().collect();
    // Without `std` there are no threads and no sleeping, so the options go unused.
    #[cfg(not(feature = "std"))]
    let _ = options;
    #[cfg(feature = "std")]
    {
        // At least one worker, and never more workers than tables.
        let workers = options.parallelism.max(1).min(tables.len().max(1));
        if workers > 1 {
            // Next table index to hand out. `Relaxed` is enough: `fetch_add` by
            // itself guarantees each index is handed out exactly once.
            let cursor = core::sync::atomic::AtomicUsize::new(0);
            let partials = std::thread::scope(|scope| {
                let handles: Vec<_> = (0..workers)
                    .map(|_| {
                        scope.spawn(|| {
                            let mut local = BlockVerifyReport::default();
                            let mut idx =
                                cursor.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                            while let Some(table) = tables.get(idx) {
                                merge_report(&mut local, scan_one_table(table));
                                idx = cursor.fetch_add(1, core::sync::atomic::Ordering::Relaxed);
                                // Throttle only if this worker has another table to scan.
                                if tables.get(idx).is_some()
                                    && let Some(delay) = options.throttle
                                {
                                    std::thread::sleep(delay);
                                }
                            }
                            local
                        })
                    })
                    .collect();
                handles
                    .into_iter()
                    .map(|handle| match handle.join() {
                        Ok(local) => local,
                        // Re-raise a worker's panic on the calling thread.
                        Err(payload) => std::panic::resume_unwind(payload),
                    })
                    .collect::<Vec<_>>()
            });
            let mut report = BlockVerifyReport::default();
            for partial in partials {
                merge_report(&mut report, partial);
            }
            return report;
        }
    }
    // Sequential path: always used without `std`, and when one worker suffices.
    let mut report = BlockVerifyReport::default();
    for (idx, table) in tables.iter().enumerate() {
        merge_report(&mut report, scan_one_table(table));
        // Sleep between tables, not after the last one.
        #[cfg(feature = "std")]
        if idx + 1 < tables.len()
            && let Some(delay) = options.throttle
        {
            std::thread::sleep(delay);
        }
        // `idx` only drives the throttle, which requires `std`.
        #[cfg(not(feature = "std"))]
        let _ = idx;
    }
    report
}
/// Run each live table's own key/value checksum verification, stopping at the
/// first table that fails.
///
/// # Errors
///
/// Returns the first error produced by a table's `verify_kv_checksums`.
pub fn verify_kv_checksums(tree: &impl crate::AbstractTree) -> crate::Result<()> {
    let version = tree.current_version();
    version
        .iter_tables()
        .try_for_each(|table| -> crate::Result<()> {
            table.verify_kv_checksums()?;
            Ok(())
        })
}
/// Scrub a single SST file on disk without opening a tree.
///
/// The ECC scheme is recovered out-of-band from the file's own `meta` (or
/// `meta_mid`) section. The block walk is then the same one used for live
/// tables, except that `table_id` is reported as `0` and no encryption
/// overhead is allowed for.
///
/// If the file cannot be opened, or no meta block can be decoded (e.g. it is
/// corrupt, or the SST is encrypted and no key is available here), a
/// `BlockVerifyError::SstFileUnreadable` is recorded and the block walk is
/// skipped. An unrecognized ECC scheme is logged and recorded as a warning;
/// the walk then skips the sections whose layout depends on it.
#[cfg(feature = "std")]
#[must_use]
pub fn verify_sst_file(path: &std::path::Path) -> BlockVerifyReport {
    use crate::fs::StdFs;
    let fs = StdFs;
    let mut report = BlockVerifyReport {
        sst_files_scanned: 1,
        ..BlockVerifyReport::default()
    };
    // A standalone file has no tree-assigned id; `0` stands in for it.
    let unreadable = |error: io::Error| BlockVerifyError::SstFileUnreadable {
        table_id: 0,
        path: path.to_path_buf(),
        error,
    };

    let (ecc, ecc_unrecognized) = match read_ecc_params_out_of_band(&fs, path) {
        Ok(Some(ScrubEcc::Off)) => (None, false),
        Ok(Some(ScrubEcc::Scheme(params))) => (Some(params), false),
        Ok(Some(ScrubEcc::Unrecognized)) => {
            log::warn!(
                "{}: unrecognized ECC scheme — skipping the ECC-dependent block \
                 sections; recompact to re-stamp with a supported scheme",
                path.display(),
            );
            report.warnings.push(BlockVerifyWarning::UnrecognizedEcc {
                table_id: 0,
                path: path.to_path_buf(),
            });
            (None, true)
        }
        Ok(None) => {
            report.errors.push(unreadable(io::Error::new(
                io::ErrorKind::InvalidData,
                "could not decode the SST meta block to determine the ECC scheme \
                 (corrupt meta, or an encrypted SST with no key out-of-band); \
                 skipping the block walk — use verify_block_checksums on a live \
                 tree for ECC-aware verification",
            )));
            return report;
        }
        Err(error) => {
            report.errors.push(unreadable(error.into()));
            return report;
        }
    };

    match scan_sst_blocks(&fs, path, 0, 0, ecc, ecc_unrecognized) {
        Ok(per_file) => {
            // Accumulate (as `scan_one_table` does) instead of overwriting, so
            // nothing recorded before the walk can be silently discarded.
            report.blocks_scanned += per_file.blocks_scanned;
            report.errors.extend(per_file.errors);
        }
        Err(error) => report.errors.push(unreadable(error)),
    }
    report
}
/// ECC state recovered from an SST's own meta block by
/// `read_ecc_params_out_of_band`.
#[cfg(feature = "std")]
enum ScrubEcc {
    /// The decoded meta block records no ECC parameters.
    Off,
    /// The decoded meta block records these ECC parameters.
    Scheme(crate::table::block::EccParams),
    /// The meta block decoded, but flagged its ECC scheme as unrecognized.
    Unrecognized,
}
/// Recover the ECC configuration of a standalone SST from its own meta block.
///
/// The `meta` section is tried first, then `meta_mid`. A section is skipped if
/// it is missing, its length does not fit in `u32`, or its meta block fails to
/// decode. The first one that decodes determines the result. The meta is
/// loaded without decryption or decompression context (`None, None`).
///
/// Returns `Ok(None)` when no candidate section decodes.
///
/// # Errors
///
/// Fails if the file cannot be opened or its TOC cannot be parsed.
#[cfg(feature = "std")]
fn read_ecc_params_out_of_band(
    fs: &dyn crate::fs::Fs,
    path: &std::path::Path,
) -> std::io::Result<Option<ScrubEcc>> {
    let mut probe = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut probe)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    let toc = sfa_reader.toc();

    let decoded = [b"meta".as_slice(), b"meta_mid".as_slice()]
        .into_iter()
        // Keep sections that exist and whose length fits a block handle.
        .filter_map(|name| {
            let entry = toc.section(name)?;
            let size = u32::try_from(entry.len()).ok()?;
            Some((entry.pos(), size))
        })
        // Lazily try each one; the first meta block that decodes wins.
        .find_map(|(pos, size)| {
            let handle = crate::table::BlockHandle::new(crate::table::BlockOffset(pos), size);
            crate::table::meta::ParsedMeta::load_with_handle(probe.as_ref(), &handle, None, None)
                .ok()
        });

    Ok(decoded.map(|meta| {
        if meta.ecc_unrecognized {
            ScrubEcc::Unrecognized
        } else {
            meta.ecc_params.map_or(ScrubEcc::Off, ScrubEcc::Scheme)
        }
    }))
}
/// Per-file result of `scan_sst_blocks`, before callers fold it into a
/// [`BlockVerifyReport`].
struct PerFileScan {
    /// Blocks counted by the walk across all sections of the file.
    blocks_scanned: usize,
    /// Findings from all sections of the file.
    errors: Vec<BlockVerifyError>,
}
/// Open the SST at `path` through `fs`, parse its TOC, and walk the framed
/// blocks of every section not listed in `RAW_FORMAT_SECTIONS`.
///
/// `max_enc_overhead` widens the per-block data-length cap. `ecc` and
/// `ecc_unrecognized` tell the walk how to size parity trailers on blocks that
/// carry no per-block flags.
///
/// # Errors
///
/// Returns `Err` only if the file cannot be opened or its TOC cannot be
/// parsed. Problems inside individual sections or blocks are collected in the
/// returned [`PerFileScan`] instead.
fn scan_sst_blocks(
    fs: &dyn crate::fs::Fs,
    path: &Path,
    table_id: TableId,
    max_enc_overhead: u32,
    ecc: Option<crate::table::block::EccParams>,
    ecc_unrecognized: bool,
) -> io::Result<PerFileScan> {
    use io::BufReader;
    #[cfg(not(feature = "std"))]
    use io::{Seek, SeekFrom};
    #[cfg(feature = "std")]
    use std::io::{Seek, SeekFrom};

    // All TOC-level findings share the table id and path; only the section
    // and the reason differ.
    let toc_error = |name: &[u8], offset: u64, reason: String| BlockVerifyError::TocCorrupted {
        table_id,
        path: path.to_path_buf(),
        section_name: name.to_vec(),
        section_offset: offset,
        reason,
    };

    let mut file = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)
        .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, alloc::format!("{e:?}")))?;
    let toc = sfa_reader.toc();

    // Same cap for every section of this file.
    let max_data_length = block_data_length_cap(max_enc_overhead);
    let mut reader = BufReader::with_capacity(64 * 1024, file);
    let mut blocks_scanned: usize = 0;
    let mut errors: Vec<BlockVerifyError> = Vec::new();
    let mut data_buf: Vec<u8> = Vec::new();

    for entry in toc.iter() {
        let name = entry.name();
        if RAW_FORMAT_SECTIONS.contains(&name) {
            continue;
        }

        let start = entry.pos();
        let Some(end) = start.checked_add(entry.len()) else {
            errors.push(toc_error(
                name,
                start,
                format!(
                    "section length {} overflows u64 when added to start offset {start}",
                    entry.len(),
                ),
            ));
            continue;
        };

        if let Err(e) = reader.seek(SeekFrom::Start(start)) {
            errors.push(toc_error(
                name,
                start,
                format!("seek to section start failed: {e}"),
            ));
            continue;
        }

        walk_block_region(
            &mut WalkCtx {
                reader: &mut reader,
                table_id,
                path,
                data_buf: &mut data_buf,
                blocks_scanned: &mut blocks_scanned,
                errors: &mut errors,
                max_data_length,
                ecc,
                ecc_unrecognized,
            },
            start,
            end,
        );
    }

    Ok(PerFileScan {
        blocks_scanned,
        errors,
    })
}
/// TOC sections that `scan_sst_blocks` skips instead of walking as blocks.
/// NOTE(review): judging by the name, these sections are stored raw rather
/// than as checksummed framed blocks — confirm against the SST writer.
const RAW_FORMAT_SECTIONS: &[&[u8]] = &[b"linked_blob_files", b"table_version", b"meta_separator"];
/// Hard upper bound (256 MiB) on a block header's declared `data_length`,
/// before any encryption overhead is allowed for.
const MAX_BLOCK_DATA_LENGTH: u64 = 256 << 20;

/// Data-length cap applied during the block walk: [`MAX_BLOCK_DATA_LENGTH`]
/// plus the largest per-block overhead the table's encryption may add.
fn block_data_length_cap(max_enc_overhead: u32) -> u64 {
    let overhead = u64::from(max_enc_overhead);
    overhead + MAX_BLOCK_DATA_LENGTH
}
/// State handed to `walk_block_region` for one section. The borrowed reader,
/// buffer, counter and error list are the same objects for every section of
/// a file, so results accumulate across sections.
struct WalkCtx<'a> {
    /// Buffered reader over the SST file; the caller seeks it to the section start.
    reader: &'a mut io::BufReader<Box<dyn crate::fs::FsFile>>,
    /// Table id attached to every finding.
    table_id: TableId,
    /// File path attached to every finding.
    path: &'a Path,
    /// Reusable buffer for block data, so each block does not allocate anew.
    data_buf: &'a mut Vec<u8>,
    /// Running count of blocks walked so far.
    blocks_scanned: &'a mut usize,
    /// Accumulated findings for the file.
    errors: &'a mut Vec<BlockVerifyError>,
    /// Upper bound on a header's declared `data_length`.
    max_data_length: u64,
    /// Table-level ECC scheme, used to size parity on blocks without per-block flags.
    ecc: Option<crate::table::block::EccParams>,
    /// The table's ECC scheme is unrecognized, so flag-less blocks cannot be sized.
    ecc_unrecognized: bool,
}
fn walk_block_region(ctx: &mut WalkCtx<'_>, start_offset: u64, end_offset: u64) {
#[cfg(not(feature = "std"))]
use io::Read;
#[cfg(feature = "std")]
use std::io::Read;
let mut offset = start_offset;
while offset < end_offset {
let remaining_in_section = end_offset - offset;
if remaining_in_section < Header::MIN_LEN as u64 {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"section has only {remaining_in_section} bytes left at this offset, \
less than Header::MIN_LEN = {}",
Header::MIN_LEN,
),
});
return;
}
let header = match Header::decode_from(ctx.reader) {
Ok(h) => h,
Err(e) => {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!("{e:?}"),
});
return;
}
};
if ctx.ecc_unrecognized && !Header::has_block_flags(header.block_type) {
return;
}
*ctx.blocks_scanned += 1;
let header_len = Header::header_len(header.block_type) as u64;
let block_ecc = if Header::has_block_flags(header.block_type) {
(header.block_flags & crate::table::block::header::block_flags::ECC_PARITY != 0)
.then_some(crate::table::block::EccParams::RS_4_2)
} else {
ctx.ecc
};
let parity_len = block_ecc.map_or(0, |scheme| {
u64::from(crate::table::block::expected_parity_len(
header.data_length,
scheme,
))
});
let data_length_u64 = u64::from(header.data_length);
if data_length_u64 > ctx.max_data_length {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"header data_length {data_length_u64} exceeds hard cap {}",
ctx.max_data_length,
),
});
return;
}
if header_len > remaining_in_section {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"block header ({header_len} bytes) extends past the section end \
({remaining_in_section} bytes remain)",
),
});
return;
}
let remaining = remaining_in_section - header_len;
let on_disk_payload = data_length_u64 + parity_len;
if on_disk_payload > remaining {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"header data_length {data_length_u64} + parity {parity_len} exceeds \
remaining section bytes {remaining}",
),
});
return;
}
let data_length = header.data_length as usize;
ctx.data_buf.resize(data_length, 0);
if let Err(e) = ctx.reader.read_exact(ctx.data_buf.as_mut_slice()) {
ctx.errors.push(BlockVerifyError::DataReadError {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
error: e.into(),
});
return;
}
let computed = Checksum::from_raw(crate::hash::hash128(ctx.data_buf));
if computed != header.checksum {
ctx.errors.push(BlockVerifyError::DataCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
expected: header.checksum,
got: computed,
});
}
if parity_len > 0 {
let mut scratch = [0u8; 512];
let mut remaining = parity_len;
let drain: io::Result<()> = loop {
if remaining == 0 {
break Ok(());
}
let want =
usize::try_from(remaining.min(scratch.len() as u64)).unwrap_or(scratch.len());
let (head, _) = scratch.split_at_mut(want);
match ctx.reader.read(head) {
Ok(0) => {
break Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
alloc::format!(
"parity trailer truncated: read {} of {parity_len} bytes",
parity_len - remaining
),
));
}
Ok(n) => remaining -= n as u64,
Err(e) => {
let e: io::Error = e.into();
if e.kind() != io::ErrorKind::Interrupted {
break Err(e);
}
}
}
};
if let Err(error) = drain {
ctx.errors.push(BlockVerifyError::DataReadError {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
error,
});
return;
}
}
offset += header_len + data_length_u64 + parity_len;
}
}
#[cfg(test)]
#[expect(clippy::unwrap_used, clippy::expect_used, reason = "test assertions")]
mod block_verify_tests;