use crate::{checksum::Checksum, coding::Decode, table::TableId, table::block::Header};
use std::path::{Path, PathBuf};
/// A single file that failed whole-file verification in [`verify_integrity`].
#[derive(Debug)]
#[non_exhaustive]
pub enum IntegrityError {
/// The streamed digest of an SST file differs from the checksum recorded
/// for its table.
SstFileCorrupted {
/// Id of the table whose file mismatched.
table_id: TableId,
/// On-disk location of the SST file.
path: PathBuf,
/// Checksum recorded for the table.
expected: Checksum,
/// Checksum computed by hashing the file's current contents.
got: Checksum,
},
/// The streamed digest of a blob file differs from the checksum recorded
/// for it.
BlobFileCorrupted {
/// Id of the blob file that mismatched.
blob_file_id: u64,
/// On-disk location of the blob file.
path: PathBuf,
/// Checksum recorded for the blob file.
expected: Checksum,
/// Checksum computed by hashing the file's current contents.
got: Checksum,
},
/// The file could not be opened or read while hashing it.
IoError {
/// File that could not be read.
path: PathBuf,
/// Underlying I/O failure (also exposed via `Error::source`).
error: std::io::Error,
},
}
impl std::fmt::Display for IntegrityError {
    /// Renders a one-line, human-readable description naming the file and,
    /// for checksum mismatches, both digests.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::SstFileCorrupted {
                table_id,
                path,
                expected,
                got,
            } => {
                let location = path.display();
                write!(
                    f,
                    "SST table {table_id} corrupted at {location}: expected {expected}, got {got}"
                )
            }
            Self::BlobFileCorrupted {
                blob_file_id,
                path,
                expected,
                got,
            } => {
                let location = path.display();
                write!(
                    f,
                    "blob file {blob_file_id} corrupted at {location}: expected {expected}, got {got}"
                )
            }
            Self::IoError { path, error } => {
                let location = path.display();
                write!(f, "I/O error reading {location}: {error}")
            }
        }
    }
}
impl std::error::Error for IntegrityError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::IoError { error, .. } => Some(error),
_ => None,
}
}
}
/// Summary produced by [`verify_integrity`].
#[derive(Debug)]
#[non_exhaustive]
pub struct IntegrityReport {
/// Number of SST files hashed, including ones that could not be read.
pub sst_files_checked: usize,
/// Number of blob files hashed, including ones that could not be read.
pub blob_files_checked: usize,
/// At most one entry per file: a checksum mismatch or an I/O failure.
pub errors: Vec<IntegrityError>,
}
impl IntegrityReport {
    /// Returns `true` when every checked file matched its recorded checksum
    /// and could be read.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        let Self { errors, .. } = self;
        errors.is_empty()
    }

    /// Total number of files examined, SST and blob combined.
    #[must_use]
    pub fn files_checked(&self) -> usize {
        let Self {
            sst_files_checked,
            blob_files_checked,
            ..
        } = self;
        *sst_files_checked + *blob_files_checked
    }
}
/// Hashes the whole file at `path` with XXH3-128, reading it in 64 KiB
/// chunks so memory use stays constant regardless of file size.
///
/// The file is opened with `std::fs::File` directly. Reads interrupted by a
/// signal (`ErrorKind::Interrupted`) are retried.
///
/// # Errors
///
/// Returns the first non-interrupt I/O error from opening or reading the file.
fn stream_checksum(path: &std::path::Path) -> std::io::Result<Checksum> {
    use std::io::{ErrorKind, Read};

    const CHUNK_LEN: usize = 64 * 1024;

    let mut file = std::fs::File::open(path)?;
    let mut hasher = xxhash_rust::xxh3::Xxh3Default::new();
    let mut chunk = vec![0u8; CHUNK_LEN];

    loop {
        match file.read(&mut chunk) {
            // End of file.
            Ok(0) => break,
            Ok(filled) => {
                if let Some(bytes) = chunk.get(..filled) {
                    hasher.update(bytes);
                }
            }
            Err(err) if err.kind() == ErrorKind::Interrupted => {}
            Err(err) => return Err(err),
        }
    }

    Ok(Checksum::from_raw(hasher.digest128()))
}
#[must_use]
pub fn verify_integrity(tree: &impl crate::AbstractTree) -> IntegrityReport {
let version = tree.current_version();
let mut report = IntegrityReport {
sst_files_checked: 0,
blob_files_checked: 0,
errors: Vec::new(),
};
for table in version.iter_tables() {
let path = &*table.path;
let expected = table.checksum();
match stream_checksum(path) {
Ok(got) if got != expected => {
report.errors.push(IntegrityError::SstFileCorrupted {
table_id: table.id(),
path: (*table.path).clone(),
expected,
got,
});
}
Ok(_) => {}
Err(e) => {
report.errors.push(IntegrityError::IoError {
path: (*table.path).clone(),
error: e,
});
}
}
report.sst_files_checked += 1;
}
for blob_file in version.blob_files.iter() {
let path = blob_file.path();
let expected = blob_file.checksum();
match stream_checksum(path) {
Ok(got) if got != expected => {
report.errors.push(IntegrityError::BlobFileCorrupted {
blob_file_id: blob_file.id(),
path: path.to_path_buf(),
expected,
got,
});
}
Ok(_) => {}
Err(e) => {
report.errors.push(IntegrityError::IoError {
path: path.to_path_buf(),
error: e,
});
}
}
report.blob_files_checked += 1;
}
report
}
/// A problem found while walking the blocks of an SST file
/// (see [`verify_block_checksums`] and `verify_sst_file`).
#[derive(Debug)]
#[non_exhaustive]
pub enum BlockVerifyError {
/// The SST could not be opened or its SFA table of contents could not be
/// parsed; for out-of-band scrubs it is also used when the meta block
/// cannot be decoded.
SstFileUnreadable {
table_id: TableId,
path: PathBuf,
error: std::io::Error,
},
/// A block header failed to decode, or its lengths are implausible or do
/// not fit in the enclosing section. The walk of that section stops here.
HeaderCorrupted {
table_id: TableId,
path: PathBuf,
/// File offset of the offending block header.
offset: u64,
/// Human-readable explanation of what was inconsistent.
reason: String,
},
/// The hash of a block's data segment does not match the checksum stored
/// in its header.
DataCorrupted {
table_id: TableId,
path: PathBuf,
/// File offset of the block header.
offset: u64,
/// Data segment length taken from the header.
data_length: u32,
/// Checksum stored in the header.
expected: Checksum,
/// Checksum computed over the data segment read from disk.
got: Checksum,
},
/// Reading a block's data segment or skipping its parity trailer failed
/// or hit end-of-file.
DataReadError {
table_id: TableId,
path: PathBuf,
/// File offset of the block header.
offset: u64,
/// Data segment length taken from the header.
data_length: u32,
error: std::io::Error,
},
/// A TOC entry could not be used: its length overflows when added to its
/// start offset, or seeking to its start failed.
TocCorrupted {
table_id: TableId,
path: PathBuf,
/// Raw TOC section name (may not be valid UTF-8).
section_name: Vec<u8>,
/// Start offset recorded for the section.
section_offset: u64,
reason: String,
},
}
impl std::fmt::Display for BlockVerifyError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
Self::SstFileUnreadable {
table_id,
path,
error,
} => write!(
f,
"SST table {table_id} at {} could not be opened/parsed: {error}",
path.display(),
),
Self::HeaderCorrupted {
table_id,
path,
offset,
reason,
} => write!(
f,
"SST table {table_id} at {}: block header at offset {offset} is corrupt ({reason})",
path.display(),
),
Self::DataCorrupted {
table_id,
path,
offset,
data_length,
expected,
got,
} => write!(
f,
"SST table {table_id} at {}: block at offset {offset} ({data_length} bytes) data \
checksum mismatch, expected {expected}, got {got}",
path.display(),
),
Self::DataReadError {
table_id,
path,
offset,
data_length,
error,
} => write!(
f,
"SST table {table_id} at {}: failed to read {data_length}-byte data segment for \
block at offset {offset}: {error}",
path.display(),
),
Self::TocCorrupted {
table_id,
path,
section_name,
section_offset,
reason,
} => write!(
f,
"SST table {table_id} at {}: TOC section {:?} at offset {section_offset} is \
unreachable ({reason})",
path.display(),
String::from_utf8_lossy(section_name),
),
}
}
}
impl std::error::Error for BlockVerifyError {
fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
match self {
Self::SstFileUnreadable { error, .. } | Self::DataReadError { error, .. } => {
Some(error)
}
_ => None,
}
}
}
/// A non-fatal condition noticed during block verification.
#[derive(Debug)]
#[non_exhaustive]
pub enum BlockVerifyWarning {
/// The table records an ECC scheme this build does not recognize, so
/// ECC-dependent block sections were skipped rather than verified.
/// `table_id` is `0` when produced by the out-of-band `verify_sst_file`.
UnrecognizedEcc {
table_id: TableId,
path: std::path::PathBuf,
},
}
/// Aggregate result of a block-level checksum scan.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct BlockVerifyReport {
/// Number of SST files a scan was attempted on (including unreadable ones).
pub sst_files_scanned: usize,
/// Number of block headers the walker decoded and counted.
pub blocks_scanned: usize,
/// Every corruption or read problem found.
pub errors: Vec<BlockVerifyError>,
/// Conditions that limited coverage but are not errors.
pub warnings: Vec<BlockVerifyWarning>,
}
impl BlockVerifyReport {
    /// Returns `true` when no errors were recorded. Warnings do not affect
    /// the result.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        let Self { errors, .. } = self;
        errors.is_empty()
    }

    /// Returns `true` when at least one warning was recorded.
    #[must_use]
    pub fn has_warnings(&self) -> bool {
        self.warnings.first().is_some()
    }
}
/// Tuning knobs for `verify_block_checksums_with`.
///
/// Start from [`VerifyOptions::default`] (one worker, no throttling) and
/// refine it with the chainable setters.
#[derive(Clone, Debug)]
pub struct VerifyOptions {
    /// Requested number of worker threads. The scanner clamps this to at
    /// least 1 and at most the number of tables.
    pub parallelism: usize,
    /// Optional pause between consecutive table scans to limit I/O pressure;
    /// `None` disables throttling.
    pub throttle: Option<std::time::Duration>,
}

impl Default for VerifyOptions {
    /// Single-threaded and unthrottled.
    fn default() -> Self {
        Self {
            parallelism: 1,
            throttle: None,
        }
    }
}

impl VerifyOptions {
    /// Returns a copy with `parallelism` replaced by `workers`.
    #[must_use]
    pub const fn parallelism(self, workers: usize) -> Self {
        Self {
            parallelism: workers,
            ..self
        }
    }

    /// Returns a copy that sleeps for `delay` between table scans.
    #[must_use]
    pub const fn throttle(self, delay: std::time::Duration) -> Self {
        Self {
            throttle: Some(delay),
            ..self
        }
    }
}
fn merge_report(dst: &mut BlockVerifyReport, src: BlockVerifyReport) {
dst.sst_files_scanned += src.sst_files_scanned;
dst.blocks_scanned += src.blocks_scanned;
dst.errors.extend(src.errors);
dst.warnings.extend(src.warnings);
}
fn scan_one_table(table: &crate::table::Table) -> BlockVerifyReport {
let mut report = BlockVerifyReport {
sst_files_scanned: 1,
..BlockVerifyReport::default()
};
let path: &Path = &table.path;
let table_id = table.id();
let ecc_unrecognized = table.metadata.ecc_unrecognized;
if ecc_unrecognized {
log::warn!(
"table {table_id} at {}: unrecognized ECC scheme — skipping the \
ECC-dependent block sections; recompact to re-stamp with a \
supported scheme",
path.display(),
);
report.warnings.push(BlockVerifyWarning::UnrecognizedEcc {
table_id,
path: path.to_path_buf(),
});
}
let max_enc_overhead = table.encryption.as_ref().map_or(0u32, |e| e.max_overhead());
match scan_sst_blocks(
&*table.fs,
path,
table_id,
max_enc_overhead,
table.metadata.ecc_params,
ecc_unrecognized,
) {
Ok(per_file) => {
report.blocks_scanned += per_file.blocks_scanned;
report.errors.extend(per_file.errors);
}
Err(error) => {
report.errors.push(BlockVerifyError::SstFileUnreadable {
table_id,
path: path.to_path_buf(),
error,
});
}
}
report
}
/// Verifies every block checksum in the tree's current version using the
/// default options: one worker and no throttling.
///
/// See `verify_block_checksums_with` for parallel or throttled scans.
#[must_use]
pub fn verify_block_checksums(tree: &impl crate::AbstractTree) -> BlockVerifyReport {
    let sequential = VerifyOptions::default();
    verify_block_checksums_with(tree, &sequential)
}
/// Walks every block of every SST in the tree's current version and checks
/// each block's data checksum, optionally using several worker threads.
///
/// The worker count is `options.parallelism`, clamped to at least 1 and at
/// most the number of tables.
///
/// - With one worker, tables are scanned in version order.
/// - With more workers, each worker claims the next unscanned table from a
///   shared atomic cursor. The order of findings in the merged report is
///   therefore not guaranteed.
///
/// When `options.throttle` is set, a worker sleeps for that long after a
/// table only if another table remains to be claimed.
///
/// # Panics
///
/// Re-raises a panic from any worker thread.
#[must_use]
pub fn verify_block_checksums_with(
    tree: &impl crate::AbstractTree,
    options: &VerifyOptions,
) -> BlockVerifyReport {
    use std::sync::atomic::{AtomicUsize, Ordering};

    let version = tree.current_version();
    let tables: Vec<crate::table::Table> = version.iter_tables().cloned().collect();
    let table_count = tables.len();
    let workers = options.parallelism.max(1).min(table_count.max(1));

    // Sleep only while more work is pending, so the last scan never waits.
    let pause = |more_pending: bool| {
        if let (true, Some(delay)) = (more_pending, options.throttle) {
            std::thread::sleep(delay);
        }
    };

    if workers <= 1 {
        let mut report = BlockVerifyReport::default();
        for (idx, table) in tables.iter().enumerate() {
            merge_report(&mut report, scan_one_table(table));
            pause(idx + 1 < table_count);
        }
        return report;
    }

    // Each fetch_add hands out a unique index. Relaxed is enough because
    // results are published back through `join`, not through the counter.
    let next_index = AtomicUsize::new(0);
    let claim = || next_index.fetch_add(1, Ordering::Relaxed);

    let partials = std::thread::scope(|scope| {
        let handles: Vec<_> = (0..workers)
            .map(|_| {
                scope.spawn(|| {
                    let mut local = BlockVerifyReport::default();
                    let mut idx = claim();
                    while let Some(table) = tables.get(idx) {
                        merge_report(&mut local, scan_one_table(table));
                        idx = claim();
                        pause(idx < table_count);
                    }
                    local
                })
            })
            .collect();
        handles
            .into_iter()
            .map(|handle| {
                handle
                    .join()
                    .unwrap_or_else(|payload| std::panic::resume_unwind(payload))
            })
            .collect::<Vec<_>>()
    });

    partials
        .into_iter()
        .fold(BlockVerifyReport::default(), |mut acc, partial| {
            merge_report(&mut acc, partial);
            acc
        })
}
/// Runs each table's per-KV checksum verification across the tree's current
/// version, stopping at the first table that fails.
///
/// # Errors
///
/// Propagates the first error returned by a table's `verify_kv_checksums`.
pub fn verify_kv_checksums(tree: &impl crate::AbstractTree) -> crate::Result<()> {
    let version = tree.current_version();
    let mut tables = version.iter_tables();
    tables.try_for_each(|table| -> crate::Result<()> {
        table.verify_kv_checksums()?;
        Ok(())
    })
}
/// Verifies the block checksums of a single SST file on the local
/// filesystem, without opening a tree.
///
/// The ECC configuration comes from the file's own meta block, read
/// out-of-band. The file is opened through `StdFs` and no encryption
/// overhead is assumed. If the meta block cannot be decoded (corrupt, or
/// encrypted without a key), the file is reported as unreadable and the
/// block walk is skipped.
///
/// No tree is involved, so there is no real table id: every finding carries
/// `table_id: 0`. `sst_files_scanned` is always 1, even when the file cannot
/// be opened.
#[cfg(feature = "std")]
#[must_use]
pub fn verify_sst_file(path: &std::path::Path) -> BlockVerifyReport {
    use crate::fs::StdFs;

    // Placeholder id for findings not tied to a live table.
    const NO_TABLE_ID: TableId = 0;
    // No encryption context exists out-of-band, so no per-block overhead is
    // allowed on top of the data-length cap.
    const NO_ENCRYPTION_OVERHEAD: u32 = 0;

    let fs = StdFs;
    let mut report = BlockVerifyReport {
        sst_files_scanned: 1,
        ..BlockVerifyReport::default()
    };
    let unreadable = |error: std::io::Error| BlockVerifyError::SstFileUnreadable {
        table_id: NO_TABLE_ID,
        path: path.to_path_buf(),
        error,
    };

    let (ecc, ecc_unrecognized) = match read_ecc_params_out_of_band(&fs, path) {
        Ok(Some(ScrubEcc::Off)) => (None, false),
        Ok(Some(ScrubEcc::Scheme(params))) => (Some(params), false),
        Ok(Some(ScrubEcc::Unrecognized)) => {
            log::warn!(
                "{}: unrecognized ECC scheme — skipping the ECC-dependent block \
                 sections; recompact to re-stamp with a supported scheme",
                path.display(),
            );
            report.warnings.push(BlockVerifyWarning::UnrecognizedEcc {
                table_id: NO_TABLE_ID,
                path: path.to_path_buf(),
            });
            (None, true)
        }
        Ok(None) => {
            report.errors.push(unreadable(std::io::Error::new(
                std::io::ErrorKind::InvalidData,
                "could not decode the SST meta block to determine the ECC scheme \
                 (corrupt meta, or an encrypted SST with no key out-of-band); \
                 skipping the block walk — use verify_block_checksums on a live \
                 tree for ECC-aware verification",
            )));
            return report;
        }
        Err(error) => {
            report.errors.push(unreadable(error));
            return report;
        }
    };

    match scan_sst_blocks(
        &fs,
        path,
        NO_TABLE_ID,
        NO_ENCRYPTION_OVERHEAD,
        ecc,
        ecc_unrecognized,
    ) {
        // Accumulate rather than overwrite (as `scan_one_table` does), so
        // findings recorded before the walk can never be silently dropped.
        Ok(per_file) => {
            report.blocks_scanned += per_file.blocks_scanned;
            report.errors.extend(per_file.errors);
        }
        Err(error) => report.errors.push(unreadable(error)),
    }

    report
}
/// ECC configuration recovered from an SST's meta block without a live tree
/// (see `read_ecc_params_out_of_band`).
#[cfg(feature = "std")]
enum ScrubEcc {
/// The meta block decoded and records no ECC scheme.
Off,
/// The meta block decoded and names a supported scheme.
Scheme(crate::table::block::EccParams),
/// The meta block decoded but marks its ECC scheme as unrecognized.
Unrecognized,
}
/// Opens the SST at `path` and decodes its meta block to learn the table's
/// ECC configuration, trying the `meta` section first and then `meta_mid`.
///
/// A candidate is skipped when the section is missing, its length does not
/// fit in `u32`, or the meta block fails to decode. Decoding is attempted
/// with both trailing `load_with_handle` arguments set to `None`.
/// Returns `Ok(None)` when no candidate yields a decodable meta block.
///
/// # Errors
///
/// Fails if the file cannot be opened or its SFA TOC cannot be parsed
/// (mapped to `InvalidData`).
#[cfg(feature = "std")]
fn read_ecc_params_out_of_band(
    fs: &dyn crate::fs::Fs,
    path: &std::path::Path,
) -> std::io::Result<Option<ScrubEcc>> {
    const CANDIDATE_SECTIONS: [&[u8]; 2] = [b"meta", b"meta_mid"];

    let mut probe = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut probe)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    let toc = sfa_reader.toc();

    let parsed = CANDIDATE_SECTIONS.iter().find_map(|&name| {
        let (pos, len) = toc.section(name).map(|e| (e.pos(), e.len()))?;
        let size = u32::try_from(len).ok()?;
        let handle = crate::table::BlockHandle::new(crate::table::BlockOffset(pos), size);
        crate::table::meta::ParsedMeta::load_with_handle(probe.as_ref(), &handle, None, None).ok()
    });

    Ok(parsed.map(|meta| {
        if meta.ecc_unrecognized {
            ScrubEcc::Unrecognized
        } else {
            meta.ecc_params.map_or(ScrubEcc::Off, ScrubEcc::Scheme)
        }
    }))
}
/// Outcome of walking every block-framed section of one SST file.
struct PerFileScan {
    /// Problems found during the walk; empty for a clean file.
    errors: Vec<BlockVerifyError>,
    /// Number of block headers the walker decoded and counted.
    blocks_scanned: usize,
}
/// Opens the SST at `path` through `fs`, parses its SFA table of contents,
/// and walks every block-framed section with `walk_block_region`.
///
/// Sections listed in `RAW_FORMAT_SECTIONS` are skipped. A TOC entry whose
/// length overflows, or whose start cannot be seeked to, is recorded as
/// [`BlockVerifyError::TocCorrupted`], and the scan continues with the next
/// section.
///
/// # Errors
///
/// Returns an I/O error only if the file cannot be opened or its SFA TOC
/// cannot be parsed (mapped to `InvalidData`). Later problems are reported
/// inside the returned `PerFileScan`.
fn scan_sst_blocks(
    fs: &dyn crate::fs::Fs,
    path: &Path,
    table_id: TableId,
    max_enc_overhead: u32,
    ecc: Option<crate::table::block::EccParams>,
    ecc_unrecognized: bool,
) -> std::io::Result<PerFileScan> {
    use std::io::{BufReader, Seek, SeekFrom};

    let mut file = fs.open(path, &crate::fs::FsOpenOptions::new().read(true))?;
    let sfa_reader = crate::sfa::Reader::from_reader(&mut file)
        .map_err(|e| std::io::Error::new(std::io::ErrorKind::InvalidData, e))?;
    let toc = sfa_reader.toc();
    let mut reader = BufReader::with_capacity(64 * 1024, file);

    // Loop-invariant: depends only on the table's encryption overhead.
    let max_data_length = block_data_length_cap(max_enc_overhead);
    let mut blocks_scanned: usize = 0;
    let mut errors: Vec<BlockVerifyError> = Vec::new();
    // One scratch buffer reused for every data segment in the file.
    let mut data_buf: Vec<u8> = Vec::new();

    for entry in toc.iter() {
        let section_name = entry.name();
        if RAW_FORMAT_SECTIONS.contains(&section_name) {
            continue;
        }
        let start = entry.pos();
        let toc_corrupted = |reason: String| BlockVerifyError::TocCorrupted {
            table_id,
            path: path.to_path_buf(),
            section_name: section_name.to_vec(),
            section_offset: start,
            reason,
        };

        let Some(end) = start.checked_add(entry.len()) else {
            errors.push(toc_corrupted(format!(
                "section length {} overflows u64 when added to start offset {start}",
                entry.len(),
            )));
            continue;
        };
        // `walk_block_region` expects the reader to sit at the section start.
        if let Err(e) = reader.seek(SeekFrom::Start(start)) {
            errors.push(toc_corrupted(format!("seek to section start failed: {e}")));
            continue;
        }

        let mut ctx = WalkCtx {
            reader: &mut reader,
            table_id,
            path,
            data_buf: &mut data_buf,
            blocks_scanned: &mut blocks_scanned,
            errors: &mut errors,
            max_data_length,
            ecc,
            ecc_unrecognized,
        };
        walk_block_region(&mut ctx, start, end);
    }

    Ok(PerFileScan {
        blocks_scanned,
        errors,
    })
}
/// TOC sections whose payload is not a sequence of framed blocks; the block
/// walker skips them entirely.
const RAW_FORMAT_SECTIONS: &[&[u8]] = &[
    b"linked_blob_files",
    b"table_version",
    b"meta_separator",
];

/// Hard upper bound (256 MiB) accepted for a block header's `data_length`.
/// It keeps a corrupted header from forcing a huge scratch-buffer allocation.
const MAX_BLOCK_DATA_LENGTH: u64 = 256 << 20;

/// Largest `data_length` accepted for one table: the fixed cap plus the
/// maximum per-block overhead its encryption layer may add.
fn block_data_length_cap(max_enc_overhead: u32) -> u64 {
    let overhead = u64::from(max_enc_overhead);
    overhead + MAX_BLOCK_DATA_LENGTH
}
/// State threaded through one `walk_block_region` call. It borrows the
/// per-file accumulators owned by `scan_sst_blocks`.
struct WalkCtx<'a> {
    /// Table being walked; stamped into every error.
    table_id: TableId,
    /// File being walked; stamped into every error.
    path: &'a Path,
    /// Table-level ECC scheme, used for blocks whose type has no per-block flags.
    ecc: Option<crate::table::block::EccParams>,
    /// Set when the table's ECC scheme is unknown to this build.
    ecc_unrecognized: bool,
    /// Largest `data_length` a header may declare before it is rejected.
    max_data_length: u64,
    /// Buffered reader over the SST file.
    reader: &'a mut std::io::BufReader<Box<dyn crate::fs::FsFile>>,
    /// Scratch buffer reused across blocks for data segments.
    data_buf: &'a mut Vec<u8>,
    /// Running count of block headers decoded and counted.
    blocks_scanned: &'a mut usize,
    /// Findings for the whole file.
    errors: &'a mut Vec<BlockVerifyError>,
}
/// Walks the back-to-back blocks stored in `[start_offset, end_offset)` of
/// the open SST, checking each block's framing and data checksum.
///
/// Expects `ctx.reader` to already be positioned at `start_offset`
/// (`scan_sst_blocks` seeks there before calling). Findings are appended to
/// `ctx.errors`, and each block whose header decodes (and is not skipped)
/// increments `*ctx.blocks_scanned`.
///
/// Structural problems end the walk of this section, because the position of
/// the next block can no longer be trusted. These are: section too short,
/// undecodable header, implausible or out-of-bounds lengths, and read
/// failures. A data checksum mismatch is recorded, but the walk continues,
/// since the header's lengths still locate the next block.
fn walk_block_region(ctx: &mut WalkCtx<'_>, start_offset: u64, end_offset: u64) {
use std::io::Read;
let mut offset = start_offset;
while offset < end_offset {
let remaining_in_section = end_offset - offset;
// Not even a minimal header fits: the section length and its contents disagree.
if remaining_in_section < Header::MIN_LEN as u64 {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"section has only {remaining_in_section} bytes left at this offset, \
less than Header::MIN_LEN = {}",
Header::MIN_LEN,
),
});
return;
}
let header = match Header::decode_from(ctx.reader) {
Ok(h) => h,
Err(e) => {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!("{e:?}"),
});
return;
}
};
// With an unrecognized table-level ECC scheme, the parity length of a block
// without per-block flags cannot be derived, so the rest of this section
// cannot be framed. Stop silently; the caller has already recorded a warning.
if ctx.ecc_unrecognized && !Header::has_block_flags(header.block_type) {
return;
}
*ctx.blocks_scanned += 1;
let header_len = Header::header_len(header.block_type) as u64;
// Block types that carry per-block flags self-describe parity via the
// ECC_PARITY flag (sized as RS(4,2) here). Other blocks use the table-level
// scheme. NOTE(review): flagged blocks always assume RS_4_2, even when
// `ctx.ecc` names another scheme; confirm this matches the on-disk contract.
let block_ecc = if Header::has_block_flags(header.block_type) {
(header.block_flags & crate::table::block::header::block_flags::ECC_PARITY != 0)
.then_some(crate::table::block::EccParams::RS_4_2)
} else {
ctx.ecc
};
// Bytes of parity trailer that follow the data segment on disk (0 without ECC).
let parity_len = block_ecc.map_or(0, |scheme| {
u64::from(crate::table::block::expected_parity_len(
header.data_length,
scheme,
))
});
let data_length_u64 = u64::from(header.data_length);
// The hard cap bounds the `resize` below, so a garbage length cannot force
// a huge allocation.
if data_length_u64 > ctx.max_data_length {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"header data_length {data_length_u64} exceeds hard cap {}",
ctx.max_data_length,
),
});
return;
}
// The MIN_LEN check above only guaranteed room for the smallest header;
// longer header variants must also fit inside the section.
if header_len > remaining_in_section {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"block header ({header_len} bytes) extends past the section end \
({remaining_in_section} bytes remain)",
),
});
return;
}
let remaining = remaining_in_section - header_len;
let on_disk_payload = data_length_u64 + parity_len;
if on_disk_payload > remaining {
ctx.errors.push(BlockVerifyError::HeaderCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
reason: format!(
"header data_length {data_length_u64} + parity {parity_len} exceeds \
remaining section bytes {remaining}",
),
});
return;
}
let data_length = header.data_length as usize;
// Reuse the shared scratch buffer instead of allocating per block.
ctx.data_buf.resize(data_length, 0);
if let Err(e) = ctx.reader.read_exact(ctx.data_buf.as_mut_slice()) {
ctx.errors.push(BlockVerifyError::DataReadError {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
error: e,
});
return;
}
// The checksum is computed over the data segment alone (not the header or parity).
let computed = Checksum::from_raw(crate::hash::hash128(ctx.data_buf));
if computed != header.checksum {
ctx.errors.push(BlockVerifyError::DataCorrupted {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
expected: header.checksum,
got: computed,
});
}
// Parity bytes are not verified here. They are only consumed so the reader
// stays aligned with `offset` for the next block.
if parity_len > 0 {
match std::io::copy(
&mut ctx.reader.by_ref().take(parity_len),
&mut std::io::sink(),
) {
Ok(n) if n == parity_len => {}
Ok(n) => {
ctx.errors.push(BlockVerifyError::DataReadError {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
error: std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
format!("parity trailer truncated: read {n} of {parity_len} bytes"),
),
});
return;
}
Err(e) => {
ctx.errors.push(BlockVerifyError::DataReadError {
table_id: ctx.table_id,
path: ctx.path.to_path_buf(),
offset,
data_length: header.data_length,
error: e,
});
return;
}
}
}
offset += header_len + data_length_u64 + parity_len;
}
}
#[cfg(test)]
#[expect(clippy::unwrap_used, clippy::expect_used, reason = "test assertions")]
mod block_verify_tests {
use super::*;
use crate::{
AbstractTree, Config, SequenceNumberCounter, compression::CompressionType,
config::CompressionPolicy,
};
use std::io::{Read, Seek, SeekFrom, Write};
use test_log::test;
fn populate_tree(dir: &std::path::Path, items: usize) {
let cfg = Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None));
let tree = cfg.open().unwrap();
for i in 0u64..items as u64 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
tree.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
tree.flush_active_memtable(1 + items as u64).unwrap();
drop(tree);
}
fn reopen_tree(dir: &std::path::Path) -> crate::AnyTree {
Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.open()
.unwrap()
}
fn populate_tree_kv_checked(dir: &std::path::Path, items: usize) {
use crate::AbstractTree;
use crate::runtime_config::KvChecksumPolicy;
let cfg = Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None));
let any = cfg.open().unwrap();
let crate::AnyTree::Standard(tree) = any else {
panic!("expected Standard tree");
};
tree.update_runtime_config(|c| {
c.kv_checksums = KvChecksumPolicy::AllLevels;
})
.unwrap();
for i in 0u64..items as u64 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
tree.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
tree.flush_active_memtable(1 + items as u64).unwrap();
drop(tree);
}
#[test]
fn verify_block_checksums_clean_tree_has_no_errors() {
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 1_000);
let tree = reopen_tree(dir.path());
let report = verify_block_checksums(&tree);
assert!(
report.is_ok(),
"expected clean tree to verify with zero errors, got {:?}",
report.errors
);
assert!(
report.blocks_scanned > 0,
"expected at least one block scanned",
);
assert!(
report.sst_files_scanned >= 1,
"expected at least one SST scanned",
);
}
#[cfg(feature = "page_ecc")]
#[test]
fn verify_block_checksums_clean_page_ecc_tree_has_no_errors() {
use crate::AbstractTree;
let dir = tempfile::tempdir().unwrap();
{
let any = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 4,
parity_shards: 2,
})
.open()
.unwrap();
for i in 0u64..2_000 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
any.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
any.flush_active_memtable(2_001).unwrap();
drop(any);
}
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 4,
parity_shards: 2,
})
.open()
.unwrap();
let report = verify_block_checksums(&tree);
assert!(
report.is_ok(),
"page_ecc tree must verify with zero errors (parity trailers skipped \
per block), got {:?}",
report.errors,
);
assert!(
report.blocks_scanned > 1,
"expected multiple blocks scanned to exercise cross-block alignment",
);
}
#[cfg(feature = "page_ecc")]
#[test]
fn verify_block_checksums_clean_nondefault_ecc_tree_has_no_errors() {
use crate::AbstractTree;
let dir = tempfile::tempdir().unwrap();
{
let any = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.open()
.unwrap();
for i in 0u64..2_000 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
any.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
any.flush_active_memtable(2_001).unwrap();
drop(any);
}
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.open()
.unwrap();
let report = verify_block_checksums(&tree);
assert!(
report.is_ok(),
"non-default-scheme ECC tree must verify with zero errors \
(parity sized from the descriptor, not RS(4,2)), got {:?}",
report.errors,
);
assert!(
report.blocks_scanned > 1,
"expected multiple blocks scanned to exercise cross-block alignment",
);
}
fn pick_first_sst_path(dir: &std::path::Path) -> std::path::PathBuf {
let tree = reopen_tree(dir);
let path = tree
.current_version()
.iter_tables()
.next()
.map(|table| (*table.path).clone())
.expect("at least one populated SST file");
drop(tree);
path
}
#[test]
fn verify_block_checksums_detects_flipped_byte_in_data_block() {
use crate::table::block::Header;
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 1_000);
let sst_path = pick_first_sst_path(dir.path());
let flip_offset = Header::MIN_LEN as u64;
{
let mut f = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&sst_path)
.unwrap();
f.seek(SeekFrom::Start(flip_offset)).unwrap();
let mut byte = [0u8; 1];
f.read_exact(&mut byte).unwrap();
byte[0] ^= 0xFF;
f.seek(SeekFrom::Start(flip_offset)).unwrap();
f.write_all(&byte).unwrap();
f.sync_all().unwrap();
}
let tree = reopen_tree(dir.path());
let report = verify_block_checksums(&tree);
assert!(
!report.is_ok(),
"expected corruption to surface as report errors, got {report:?}",
);
let has_data_corruption = report.errors.iter().any(|e| {
matches!(
e,
BlockVerifyError::DataCorrupted { path, .. } if path == &sst_path,
)
});
assert!(
has_data_corruption,
"expected a DataCorrupted error for {}, got {:?}",
sst_path.display(),
report.errors,
);
}
#[test]
fn verify_kv_checksums_clean_kv_checked_tree_passes() {
let dir = tempfile::tempdir().unwrap();
populate_tree_kv_checked(dir.path(), 500);
let tree = reopen_tree(dir.path());
let crate::AnyTree::Standard(tree) = tree else {
panic!("expected Standard tree");
};
verify_kv_checksums(&tree).expect("clean kv-checked tree must pass per-KV scrub");
}
#[test]
fn verify_kv_checked_detects_corrupted_digest_under_valid_block_checksum() {
use crate::InternalValue;
use crate::ValueType::Value;
use crate::comparator::default_comparator;
use crate::runtime_config::ChecksumAlgorithm;
use crate::table::block::header::block_flags;
use crate::table::block::{Block, BlockIdentity, BlockTransform, BlockType, kv_checksum};
use crate::table::data_block::DataBlock;
let algo = ChecksumAlgorithm::Xxh3_64;
let items = [
InternalValue::from_components(b"alpha".to_vec(), b"one".to_vec(), 3, Value),
InternalValue::from_components(b"bravo".to_vec(), b"two".to_vec(), 2, Value),
];
let digests: Vec<u64> = items
.iter()
.map(|it| kv_checksum::kv_digest(it, algo).expect("xxh3 always available"))
.collect();
let mut payload = Vec::new();
DataBlock::encode_kv_checked_into(&mut payload, &items, &digests, algo, 2, 0.0).unwrap();
let inner_len = kv_checksum::split_inner(&payload).unwrap().len();
*payload.get_mut(inner_len).expect("digest array byte") ^= 0xFF;
let id = BlockIdentity::for_test(0, BlockType::Data);
let mut buf = Vec::new();
Block::write_into_with_flags(
&mut buf,
&payload,
id,
&BlockTransform::PLAIN,
block_flags::KV_CHECKSUM_FOOTER,
)
.unwrap();
let block = Block::from_reader(&mut &buf[..], id, &BlockTransform::PLAIN).unwrap();
let err =
DataBlock::verify_kv_checked(&block.data, block.header, default_comparator(), None)
.expect_err("corrupted stored digest must fail the per-KV verifier");
assert!(
matches!(err, crate::Error::ChecksumMismatch { .. }),
"expected ChecksumMismatch, got {err:?}"
);
}
#[test]
fn verify_kv_checked_rejects_non_data_block_type() {
use crate::InternalValue;
use crate::ValueType::Value;
use crate::comparator::default_comparator;
use crate::runtime_config::ChecksumAlgorithm;
use crate::table::block::header::block_flags;
use crate::table::block::{Block, BlockIdentity, BlockTransform, BlockType, kv_checksum};
use crate::table::data_block::DataBlock;
let algo = ChecksumAlgorithm::Xxh3_64;
let items = [
InternalValue::from_components(b"alpha".to_vec(), b"one".to_vec(), 3, Value),
InternalValue::from_components(b"bravo".to_vec(), b"two".to_vec(), 2, Value),
];
let digests: Vec<u64> = items
.iter()
.map(|it| kv_checksum::kv_digest(it, algo).expect("xxh3 always available"))
.collect();
let mut payload = Vec::new();
DataBlock::encode_kv_checked_into(&mut payload, &items, &digests, algo, 2, 0.0).unwrap();
let id = BlockIdentity::for_test(0, BlockType::Data);
let mut buf = Vec::new();
Block::write_into_with_flags(
&mut buf,
&payload,
id,
&BlockTransform::PLAIN,
block_flags::KV_CHECKSUM_FOOTER,
)
.unwrap();
let block = Block::from_reader(&mut &buf[..], id, &BlockTransform::PLAIN).unwrap();
let mut bad_header = block.header;
bad_header.block_type = BlockType::Index;
let err = DataBlock::verify_kv_checked(&block.data, bad_header, default_comparator(), None)
.expect_err("non-Data block_type must be rejected, not coerced");
assert!(
matches!(err, crate::Error::InvalidTag(("BlockType", _))),
"expected InvalidTag(BlockType), got {err:?}"
);
}
#[test]
fn verify_sst_file_clean_file_has_no_errors() {
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 1_000);
let sst_path = pick_first_sst_path(dir.path());
let report = verify_sst_file(&sst_path);
assert!(
report.is_ok(),
"expected clean SST to verify with zero errors, got {:?}",
report.errors,
);
assert_eq!(
report.sst_files_scanned, 1,
"wrapper must always stamp sst_files_scanned = 1",
);
assert!(
report.blocks_scanned > 0,
"expected at least one block scanned in a populated SST",
);
}
#[test]
fn verify_sst_file_missing_file_reports_unreadable() {
let dir = tempfile::tempdir().unwrap();
let missing_path = dir.path().join("does-not-exist-sst-12345.sst");
assert!(
!missing_path.exists(),
"tempdir entry must be absent for this test to exercise the missing-file branch",
);
let report = verify_sst_file(&missing_path);
assert_eq!(
report.sst_files_scanned, 1,
"wrapper stamps sst_files_scanned = 1 even on file-open failure \
so callers see the attempt was made",
);
assert_eq!(
report.blocks_scanned, 0,
"no blocks could be walked because the file couldn't be opened",
);
assert_eq!(
report.errors.len(),
1,
"expected exactly one error, got {:?}",
report.errors,
);
let err = report.errors.first().unwrap();
assert!(
matches!(
err,
BlockVerifyError::SstFileUnreadable { table_id: 0, path, .. }
if path == &missing_path,
),
"expected SstFileUnreadable for {}, got {err:?}",
missing_path.display(),
);
}
#[test]
#[expect(
clippy::indexing_slicing,
clippy::cast_possible_truncation,
reason = "synthetic SFA forgery — offsets are all in-bounds by \
construction (we just wrote the bytes ourselves), and \
the u64 -> usize cast cannot overflow on any target \
the test runs on (the forged archive is < 1 KiB)"
)]
fn walk_block_region_reports_data_read_error_on_truncated_data_segment() {
use crate::coding::Encode;
use crate::fs::{Fs, FsOpenOptions, MemFs};
use crate::table::block::{BlockType, Header};
const TRAILER_LEN: usize = 4 + 1 + 1 + 16 + 8 + 8;
const DATA_LENGTH: u32 = 4096;
const HEADER_LEN: u64 = Header::MIN_LEN as u64;
let header = Header {
checksum: Checksum::from_raw(0xDEAD_BEEF_DEAD_BEEF),
data_length: DATA_LENGTH,
uncompressed_length: DATA_LENGTH,
..Header::test_dummy(BlockType::Data)
};
let mut archive_bytes: Vec<u8> = Vec::new();
{
let mut writer =
crate::sfa::Writer::from_writer(std::io::Cursor::new(&mut archive_bytes));
writer.start("data").unwrap();
writer.write_all(&header.encode_into_vec()).unwrap();
writer.finish().unwrap();
}
let trailer_start = archive_bytes.len() - TRAILER_LEN;
let toc_pos_bytes: [u8; 8] = archive_bytes[trailer_start + 22..trailer_start + 30]
.try_into()
.unwrap();
let toc_len_bytes: [u8; 8] = archive_bytes[trailer_start + 30..trailer_start + 38]
.try_into()
.unwrap();
let toc_pos = u64::from_le_bytes(toc_pos_bytes) as usize;
let toc_len = u64::from_le_bytes(toc_len_bytes) as usize;
let first_entry_offset = toc_pos + 4 + 4;
let len_field_offset = first_entry_offset + 8;
let lied_len: u64 = HEADER_LEN + u64::from(DATA_LENGTH);
archive_bytes[len_field_offset..len_field_offset + 8]
.copy_from_slice(&lied_len.to_le_bytes());
let new_toc_checksum = crate::hash::hash128(&archive_bytes[toc_pos..toc_pos + toc_len]);
let csum_field_offset = trailer_start + 4 + 1 + 1;
archive_bytes[csum_field_offset..csum_field_offset + 16]
.copy_from_slice(&new_toc_checksum.to_le_bytes());
let fs = MemFs::new();
let path = std::path::Path::new("/forged.sst");
{
let mut f = fs
.open(
path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)
.unwrap();
f.write_all(&archive_bytes).unwrap();
}
let table_id: TableId = 42;
let scan = scan_sst_blocks(&fs, path, table_id, 0, None, false)
.expect("forged SFA must parse cleanly");
assert_eq!(
scan.errors.len(),
1,
"expected exactly one error, got {:?}",
scan.errors,
);
let err = scan.errors.first().unwrap();
assert!(
matches!(
err,
BlockVerifyError::DataReadError {
table_id: t,
offset: 0,
data_length: d,
..
} if *t == table_id && *d == DATA_LENGTH,
),
"expected DataReadError {{ table_id: {table_id}, offset: 0, \
data_length: {DATA_LENGTH}, .. }}; got {err:?}",
);
assert_eq!(
scan.blocks_scanned, 1,
"header decoded successfully, so blocks_scanned must count this block \
even though the data segment read failed",
);
}
/// Forges an SFA archive whose single `"data"` section is declared one byte
/// shorter than the block header stored in it, then checks that
/// `scan_sst_blocks` still parses the archive and reports exactly one
/// `HeaderCorrupted` error at offset 0 whose reason mentions the section end.
#[test]
#[expect(
clippy::indexing_slicing,
clippy::cast_possible_truncation,
reason = "synthetic SFA forgery — offsets are in-bounds by construction \
and the forged archive is < 1 KiB"
)]
fn walk_block_region_reports_header_crossing_section_boundary() {
use crate::coding::Encode;
use crate::fs::{Fs, FsOpenOptions, MemFs};
use crate::table::block::{BlockType, Header};
// Trailer size as the sum of its fields (38 bytes). Below, the trailer is
// read at +22 (ToC position, 8 bytes) and +30 (ToC length, 8 bytes), and a
// 16-byte checksum is written at +6. The leading 4 + 1 + 1 bytes are
// presumably magic / version / checksum type — TODO confirm vs. sfa format.
const TRAILER_LEN: usize = 4 + 1 + 1 + 16 + 8 + 8;
// A Meta header claiming an empty payload. The checksum is a sentinel value;
// presumably it is never checked because the header is rejected first.
let header = Header {
checksum: Checksum::from_raw(0xDEAD_BEEF_DEAD_BEEF),
data_length: 0,
uncompressed_length: 0,
..Header::test_dummy(BlockType::Meta)
};
// Precondition: a Meta header is exactly one byte longer than
// `Header::MIN_LEN`, so declaring the section as `MIN_LEN` bytes (below)
// cuts the stored header short by one byte.
assert_eq!(
Header::header_len(BlockType::Meta) as u64,
Header::MIN_LEN as u64 + 1,
);
// Build a genuine archive with the real SFA writer: one section named
// "data" whose contents are just the encoded header.
let mut archive_bytes: Vec<u8> = Vec::new();
{
let mut writer =
crate::sfa::Writer::from_writer(std::io::Cursor::new(&mut archive_bytes));
writer.start("data").unwrap();
writer.write_all(&header.encode_into_vec()).unwrap();
writer.finish().unwrap();
}
// Locate the trailer at the end of the archive and decode the
// little-endian ToC position and length from it.
let trailer_start = archive_bytes.len() - TRAILER_LEN;
let toc_pos_bytes: [u8; 8] = archive_bytes[trailer_start + 22..trailer_start + 30]
.try_into()
.unwrap();
let toc_len_bytes: [u8; 8] = archive_bytes[trailer_start + 30..trailer_start + 38]
.try_into()
.unwrap();
let toc_pos = u64::from_le_bytes(toc_pos_bytes) as usize;
let toc_len = u64::from_le_bytes(toc_len_bytes) as usize;
// Presumably skips a 4-byte ToC prefix and a 4-byte entry count to reach
// the first entry. The extra 8 then skips that entry's position field to
// land on its 8-byte length field — TODO confirm against the sfa ToC layout.
let first_entry_offset = toc_pos + 4 + 4;
let len_field_offset = first_entry_offset + 8;
// Lie about the section length: claim `MIN_LEN` bytes, one short of the
// header actually written.
let lied_len: u64 = Header::MIN_LEN as u64;
archive_bytes[len_field_offset..len_field_offset + 8]
.copy_from_slice(&lied_len.to_le_bytes());
// Re-hash the edited ToC and patch the trailer checksum so the archive
// still parses (see the `expect` below). Only the block walker should
// notice the inconsistency.
let new_toc_checksum = crate::hash::hash128(&archive_bytes[toc_pos..toc_pos + toc_len]);
let csum_field_offset = trailer_start + 4 + 1 + 1;
archive_bytes[csum_field_offset..csum_field_offset + 16]
.copy_from_slice(&new_toc_checksum.to_le_bytes());
// Persist the forged bytes into an in-memory filesystem for the scanner.
let fs = MemFs::new();
let path = std::path::Path::new("/forged-boundary.sst");
{
let mut f = fs
.open(
path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)
.unwrap();
f.write_all(&archive_bytes).unwrap();
}
let table_id: TableId = 7;
let scan = scan_sst_blocks(&fs, path, table_id, 0, None, false)
.expect("forged SFA must parse cleanly");
// The forged boundary must produce exactly one error...
assert_eq!(
scan.errors.len(),
1,
"expected exactly one error, got {:?}",
scan.errors,
);
// ...and it must be a header-corruption report for this table at offset 0
// that names the section-end overrun.
let err = scan.errors.first().unwrap();
assert!(
matches!(
err,
BlockVerifyError::HeaderCorrupted { table_id: t, offset: 0, reason, .. }
if *t == table_id && reason.contains("extends past the section end"),
),
"expected a section-boundary HeaderCorrupted; got {err:?}",
);
}
/// Creates a tree in `dir` and fills it with `batches` batches of
/// `per_batch` keys each, flushing the active memtable after every batch.
/// The intent is one SST per batch.
///
/// Keys are `b<batch:03>k<index:08>` and every value is `v`. Data-block
/// compression is disabled. Sequence numbers start at 1 and advance once
/// per insert and once per flush. The tree is closed before returning.
fn populate_multi_sst(dir: &std::path::Path, batches: usize, per_batch: usize) {
    let tree = Config::new(
        dir,
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
    .open()
    .unwrap();

    let mut next_seqno = 1u64;
    for b in 0..batches {
        for i in 0..per_batch {
            tree.insert(
                format!("b{b:03}k{i:08}").as_bytes(),
                b"v".as_slice(),
                next_seqno,
            );
            next_seqno += 1;
        }
        // Seal this batch on disk before starting the next one.
        tree.flush_active_memtable(next_seqno).unwrap();
        next_seqno += 1;
    }

    // Release the handle so callers can reopen the same directory.
    drop(tree);
}
/// The plain `verify_checksum()` entry point must report a freshly written,
/// untouched tree as clean, having scanned at least one SST and one block.
#[test]
fn verify_checksum_method_on_clean_tree_is_ok() {
    let tmp = tempfile::tempdir().unwrap();
    let root = tmp.path();

    populate_tree(root, 500);
    let tree = reopen_tree(root);

    let report = tree.verify_checksum();
    assert!(report.is_ok(), "clean tree must verify clean: {report:?}");
    assert!(report.sst_files_scanned >= 1);
    assert!(report.blocks_scanned >= 1);
}
/// A 4-way parallel scrub of a clean multi-SST tree must agree with a
/// single-worker scrub. Both must be clean and scan the same number of SSTs
/// and blocks.
#[test]
fn verify_checksum_with_parallel_matches_sequential() {
    let dir = tempfile::tempdir().unwrap();
    populate_multi_sst(dir.path(), 5, 300);
    let tree = reopen_tree(dir.path());

    // Pin the baseline to one worker explicitly instead of trusting whatever
    // `VerifyOptions::default()` selects, so this really compares a
    // sequential scrub against a parallel one.
    let seq = tree.verify_checksum_with(&VerifyOptions::default().parallelism(1));
    let par = tree.verify_checksum_with(&VerifyOptions::default().parallelism(4));

    assert!(
        seq.sst_files_scanned >= 2,
        "need >1 SST to exercise parallelism, got {}",
        seq.sst_files_scanned,
    );
    // Check cleanliness first: on failure this prints both full reports,
    // which is more useful than a bare counter mismatch.
    assert!(
        seq.is_ok() && par.is_ok(),
        "clean tree: seq={seq:?} par={par:?}"
    );
    assert_eq!(
        seq.sst_files_scanned, par.sst_files_scanned,
        "SST counts differ: seq={seq:?} par={par:?}"
    );
    assert_eq!(
        seq.blocks_scanned, par.blocks_scanned,
        "block counts differ: seq={seq:?} par={par:?}"
    );
    assert_eq!(
        seq.errors.len(),
        par.errors.len(),
        "error counts differ: seq={seq:?} par={par:?}"
    );
}
/// Inverts one byte of the first SST on disk and checks that a 4-way
/// parallel scrub reports the tree as not OK.
#[test]
fn verify_checksum_with_parallel_detects_corruption() {
    use crate::table::block::Header;

    let dir = tempfile::tempdir().unwrap();
    populate_multi_sst(dir.path(), 4, 300);
    let sst_path = pick_first_sst_path(dir.path());

    // The byte right after a minimum-length header at file offset 0.
    // Presumably this lands inside the first block's payload — TODO confirm.
    let flip_offset = Header::MIN_LEN as u64;
    {
        let mut file = std::fs::OpenOptions::new()
            .read(true)
            .write(true)
            .open(&sst_path)
            .unwrap();
        let mut original = [0u8; 1];
        file.seek(SeekFrom::Start(flip_offset)).unwrap();
        file.read_exact(&mut original).unwrap();
        // Rewind and overwrite the same position with every bit inverted.
        file.seek(SeekFrom::Start(flip_offset)).unwrap();
        file.write_all(&[original[0] ^ 0xFF]).unwrap();
        file.sync_all().unwrap();
    }

    let tree = reopen_tree(dir.path());
    let report = tree.verify_checksum_with(&VerifyOptions::default().parallelism(4));
    assert!(
        !report.is_ok(),
        "parallel scrub must surface the flipped byte: {report:?}",
    );
}
/// A throttled 2-worker scrub of a clean multi-SST tree must still finish
/// and report clean.
#[test]
fn verify_checksum_with_throttle_completes_clean() {
    let dir = tempfile::tempdir().unwrap();
    populate_multi_sst(dir.path(), 3, 200);
    let tree = reopen_tree(dir.path());
    let opts = VerifyOptions::default()
        .parallelism(2)
        .throttle(std::time::Duration::from_millis(1));
    let report = tree.verify_checksum_with(&opts);
    assert!(
        report.is_ok(),
        "throttled scrub must still verify clean: {report:?}"
    );
    // The throttle is applied between SSTs, so with fewer than two tables it
    // would never be exercised. Report the actual count if this fails.
    assert!(
        report.sst_files_scanned >= 2,
        "need >1 SST to exercise the inter-SST throttle, got {}",
        report.sst_files_scanned,
    );
}
/// With exactly one SST there is no "next" table to wait for. A single-worker
/// scrub with a large throttle must therefore finish well under the throttle.
#[test]
fn verify_checksum_with_throttle_does_not_sleep_after_last_sst() {
    let tmp = tempfile::tempdir().unwrap();
    populate_multi_sst(tmp.path(), 1, 50);
    let tree = reopen_tree(tmp.path());

    let throttle = std::time::Duration::from_millis(400);
    let opts = VerifyOptions::default().parallelism(1).throttle(throttle);

    // Time only the scrub call itself.
    let (report, elapsed) = {
        let started = std::time::Instant::now();
        let outcome = tree.verify_checksum_with(&opts);
        (outcome, started.elapsed())
    };

    assert!(report.is_ok(), "clean single-SST scrub: {report:?}");
    assert_eq!(report.sst_files_scanned, 1, "test needs exactly one SST");
    assert!(
        elapsed < throttle / 2,
        "a single-SST scrub must not sleep the inter-SST throttle after the \
         last table: took {elapsed:?} with a {throttle:?} throttle",
    );
}
}