use super::*;
use crate::{
AbstractTree, Config, SequenceNumberCounter, compression::CompressionType,
config::CompressionPolicy,
};
use std::io::{Read, Seek, SeekFrom, Write};
use test_log::test;
fn populate_tree(dir: &std::path::Path, items: usize) {
let cfg = Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None));
let tree = cfg.open().unwrap();
for i in 0u64..items as u64 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
tree.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
tree.flush_active_memtable(1 + items as u64).unwrap();
drop(tree);
}
fn reopen_tree(dir: &std::path::Path) -> crate::AnyTree {
Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.open()
.unwrap()
}
fn populate_tree_kv_checked(dir: &std::path::Path, items: usize) {
use crate::AbstractTree;
use crate::runtime_config::KvChecksumPolicy;
let cfg = Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None));
let any = cfg.open().unwrap();
let crate::AnyTree::Standard(tree) = any else {
panic!("expected Standard tree");
};
tree.update_runtime_config(|c| {
c.kv_checksums = KvChecksumPolicy::AllLevels;
})
.unwrap();
for i in 0u64..items as u64 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
tree.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
tree.flush_active_memtable(1 + items as u64).unwrap();
drop(tree);
}
#[test]
fn verify_block_checksums_clean_tree_has_no_errors() {
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 1_000);
let tree = reopen_tree(dir.path());
let report = verify_block_checksums(&tree);
assert!(
report.is_ok(),
"expected clean tree to verify with zero errors, got {:?}",
report.errors
);
assert!(
report.blocks_scanned > 0,
"expected at least one block scanned",
);
assert!(
report.sst_files_scanned >= 1,
"expected at least one SST scanned",
);
}
#[cfg(feature = "page_ecc")]
#[test]
fn verify_block_checksums_clean_page_ecc_tree_has_no_errors() {
use crate::AbstractTree;
let dir = tempfile::tempdir().unwrap();
{
let any = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 4,
parity_shards: 2,
})
.open()
.unwrap();
for i in 0u64..2_000 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
any.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
any.flush_active_memtable(2_001).unwrap();
drop(any);
}
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 4,
parity_shards: 2,
})
.open()
.unwrap();
let report = verify_block_checksums(&tree);
assert!(
report.is_ok(),
"page_ecc tree must verify with zero errors (parity trailers skipped \
per block), got {:?}",
report.errors,
);
assert!(
report.blocks_scanned > 1,
"expected multiple blocks scanned to exercise cross-block alignment",
);
}
#[cfg(feature = "page_ecc")]
#[test]
fn verify_block_checksums_clean_nondefault_ecc_tree_has_no_errors() {
use crate::AbstractTree;
let dir = tempfile::tempdir().unwrap();
{
let any = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.open()
.unwrap();
for i in 0u64..2_000 {
let key = format!("k{i:08}");
let val = format!("v{i:08}");
any.insert(key.as_bytes(), val.as_bytes(), 1 + i);
}
any.flush_active_memtable(2_001).unwrap();
drop(any);
}
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None))
.page_ecc(true)
.ecc_scheme(crate::runtime_config::EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.open()
.unwrap();
let report = verify_block_checksums(&tree);
assert!(
report.is_ok(),
"non-default-scheme ECC tree must verify with zero errors \
(parity sized from the descriptor, not RS(4,2)), got {:?}",
report.errors,
);
assert!(
report.blocks_scanned > 1,
"expected multiple blocks scanned to exercise cross-block alignment",
);
}
fn pick_first_sst_path(dir: &std::path::Path) -> std::path::PathBuf {
let tree = reopen_tree(dir);
let path = tree
.current_version()
.iter_tables()
.next()
.map(|table| (*table.path).clone())
.expect("at least one populated SST file");
drop(tree);
path
}
#[test]
fn verify_block_checksums_detects_flipped_byte_in_data_block() {
use crate::table::block::Header;
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 1_000);
let sst_path = pick_first_sst_path(dir.path());
let flip_offset = Header::MIN_LEN as u64;
{
let mut f = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&sst_path)
.unwrap();
f.seek(SeekFrom::Start(flip_offset)).unwrap();
let mut byte = [0u8; 1];
f.read_exact(&mut byte).unwrap();
byte[0] ^= 0xFF;
f.seek(SeekFrom::Start(flip_offset)).unwrap();
f.write_all(&byte).unwrap();
f.sync_all().unwrap();
}
let tree = reopen_tree(dir.path());
let report = verify_block_checksums(&tree);
assert!(
!report.is_ok(),
"expected corruption to surface as report errors, got {report:?}",
);
let has_data_corruption = report.errors.iter().any(|e| {
matches!(
e,
BlockVerifyError::DataCorrupted { path, .. } if path == &sst_path,
)
});
assert!(
has_data_corruption,
"expected a DataCorrupted error for {}, got {:?}",
sst_path.display(),
report.errors,
);
}
#[test]
fn verify_kv_checksums_clean_kv_checked_tree_passes() {
let dir = tempfile::tempdir().unwrap();
populate_tree_kv_checked(dir.path(), 500);
let tree = reopen_tree(dir.path());
let crate::AnyTree::Standard(tree) = tree else {
panic!("expected Standard tree");
};
verify_kv_checksums(&tree).expect("clean kv-checked tree must pass per-KV scrub");
}
#[test]
fn verify_kv_checked_detects_corrupted_digest_under_valid_block_checksum() {
use crate::InternalValue;
use crate::ValueType::Value;
use crate::comparator::default_comparator;
use crate::runtime_config::ChecksumAlgorithm;
use crate::table::block::header::block_flags;
use crate::table::block::{Block, BlockIdentity, BlockTransform, BlockType, kv_checksum};
use crate::table::data_block::DataBlock;
let algo = ChecksumAlgorithm::Xxh3_64;
let items = [
InternalValue::from_components(b"alpha".to_vec(), b"one".to_vec(), 3, Value),
InternalValue::from_components(b"bravo".to_vec(), b"two".to_vec(), 2, Value),
];
let digests: Vec<u64> = items
.iter()
.map(|it| kv_checksum::kv_digest(it, algo).expect("xxh3 always available"))
.collect();
let mut payload = Vec::new();
DataBlock::encode_kv_checked_into(&mut payload, &items, &digests, algo, 2, 0.0).unwrap();
let inner_len = kv_checksum::split_inner(&payload).unwrap().len();
*payload.get_mut(inner_len).expect("digest array byte") ^= 0xFF;
let id = BlockIdentity::for_test(0, BlockType::Data);
let mut buf = Vec::new();
Block::write_into_with_flags(
&mut buf,
&payload,
id,
&BlockTransform::PLAIN,
block_flags::KV_CHECKSUM_FOOTER,
)
.unwrap();
let block = Block::from_reader(&mut &buf[..], id, &BlockTransform::PLAIN).unwrap();
let err = DataBlock::verify_kv_checked(&block.data, block.header, default_comparator(), None)
.expect_err("corrupted stored digest must fail the per-KV verifier");
assert!(
matches!(err, crate::Error::ChecksumMismatch { .. }),
"expected ChecksumMismatch, got {err:?}"
);
}
#[test]
fn verify_kv_checked_rejects_non_data_block_type() {
use crate::InternalValue;
use crate::ValueType::Value;
use crate::comparator::default_comparator;
use crate::runtime_config::ChecksumAlgorithm;
use crate::table::block::header::block_flags;
use crate::table::block::{Block, BlockIdentity, BlockTransform, BlockType, kv_checksum};
use crate::table::data_block::DataBlock;
let algo = ChecksumAlgorithm::Xxh3_64;
let items = [
InternalValue::from_components(b"alpha".to_vec(), b"one".to_vec(), 3, Value),
InternalValue::from_components(b"bravo".to_vec(), b"two".to_vec(), 2, Value),
];
let digests: Vec<u64> = items
.iter()
.map(|it| kv_checksum::kv_digest(it, algo).expect("xxh3 always available"))
.collect();
let mut payload = Vec::new();
DataBlock::encode_kv_checked_into(&mut payload, &items, &digests, algo, 2, 0.0).unwrap();
let id = BlockIdentity::for_test(0, BlockType::Data);
let mut buf = Vec::new();
Block::write_into_with_flags(
&mut buf,
&payload,
id,
&BlockTransform::PLAIN,
block_flags::KV_CHECKSUM_FOOTER,
)
.unwrap();
let block = Block::from_reader(&mut &buf[..], id, &BlockTransform::PLAIN).unwrap();
let mut bad_header = block.header;
bad_header.block_type = BlockType::Index;
let err = DataBlock::verify_kv_checked(&block.data, bad_header, default_comparator(), None)
.expect_err("non-Data block_type must be rejected, not coerced");
assert!(
matches!(err, crate::Error::InvalidTag(("BlockType", _))),
"expected InvalidTag(BlockType), got {err:?}"
);
}
#[test]
fn verify_sst_file_clean_file_has_no_errors() {
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 1_000);
let sst_path = pick_first_sst_path(dir.path());
let report = verify_sst_file(&sst_path);
assert!(
report.is_ok(),
"expected clean SST to verify with zero errors, got {:?}",
report.errors,
);
assert_eq!(
report.sst_files_scanned, 1,
"wrapper must always stamp sst_files_scanned = 1",
);
assert!(
report.blocks_scanned > 0,
"expected at least one block scanned in a populated SST",
);
}
#[test]
fn verify_sst_file_missing_file_reports_unreadable() {
let dir = tempfile::tempdir().unwrap();
let missing_path = dir.path().join("does-not-exist-sst-12345.sst");
assert!(
!missing_path.exists(),
"tempdir entry must be absent for this test to exercise the missing-file branch",
);
let report = verify_sst_file(&missing_path);
assert_eq!(
report.sst_files_scanned, 1,
"wrapper stamps sst_files_scanned = 1 even on file-open failure \
so callers see the attempt was made",
);
assert_eq!(
report.blocks_scanned, 0,
"no blocks could be walked because the file couldn't be opened",
);
assert_eq!(
report.errors.len(),
1,
"expected exactly one error, got {:?}",
report.errors,
);
let err = report.errors.first().unwrap();
assert!(
matches!(
err,
BlockVerifyError::SstFileUnreadable { table_id: 0, path, .. }
if path == &missing_path,
),
"expected SstFileUnreadable for {}, got {err:?}",
missing_path.display(),
);
}
#[test]
#[expect(
clippy::indexing_slicing,
clippy::cast_possible_truncation,
reason = "synthetic SFA forgery — offsets are all in-bounds by \
construction (we just wrote the bytes ourselves), and \
the u64 -> usize cast cannot overflow on any target \
the test runs on (the forged archive is < 1 KiB)"
)]
fn walk_block_region_reports_data_read_error_on_truncated_data_segment() {
use crate::coding::Encode;
use crate::fs::{Fs, FsOpenOptions, MemFs};
use crate::table::block::{BlockType, Header};
const TRAILER_LEN: usize = 4 + 1 + 1 + 16 + 8 + 8;
const DATA_LENGTH: u32 = 4096;
const HEADER_LEN: u64 = Header::MIN_LEN as u64;
let header = Header {
checksum: Checksum::from_raw(0xDEAD_BEEF_DEAD_BEEF),
data_length: DATA_LENGTH,
uncompressed_length: DATA_LENGTH,
..Header::test_dummy(BlockType::Data)
};
let mut archive_bytes: Vec<u8> = Vec::new();
{
let mut writer = crate::sfa::Writer::from_writer(std::io::Cursor::new(&mut archive_bytes));
writer.start("data").unwrap();
writer.write_all(&header.encode_into_vec()).unwrap();
writer.finish().unwrap();
}
let trailer_start = archive_bytes.len() - TRAILER_LEN;
let toc_pos_bytes: [u8; 8] = archive_bytes[trailer_start + 22..trailer_start + 30]
.try_into()
.unwrap();
let toc_len_bytes: [u8; 8] = archive_bytes[trailer_start + 30..trailer_start + 38]
.try_into()
.unwrap();
let toc_pos = u64::from_le_bytes(toc_pos_bytes) as usize;
let toc_len = u64::from_le_bytes(toc_len_bytes) as usize;
let first_entry_offset = toc_pos + 4 + 4;
let len_field_offset = first_entry_offset + 8;
let lied_len: u64 = HEADER_LEN + u64::from(DATA_LENGTH);
archive_bytes[len_field_offset..len_field_offset + 8].copy_from_slice(&lied_len.to_le_bytes());
let new_toc_checksum = crate::hash::hash128(&archive_bytes[toc_pos..toc_pos + toc_len]);
let csum_field_offset = trailer_start + 4 + 1 + 1;
archive_bytes[csum_field_offset..csum_field_offset + 16]
.copy_from_slice(&new_toc_checksum.to_le_bytes());
let fs = MemFs::new();
let path = std::path::Path::new("/forged.sst");
{
let mut f = fs
.open(
path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)
.unwrap();
f.write_all(&archive_bytes).unwrap();
}
let table_id: TableId = 42;
let scan = scan_sst_blocks(&fs, path, table_id, 0, None, false)
.expect("forged SFA must parse cleanly");
assert_eq!(
scan.errors.len(),
1,
"expected exactly one error, got {:?}",
scan.errors,
);
let err = scan.errors.first().unwrap();
assert!(
matches!(
err,
BlockVerifyError::DataReadError {
table_id: t,
offset: 0,
data_length: d,
..
} if *t == table_id && *d == DATA_LENGTH,
),
"expected DataReadError {{ table_id: {table_id}, offset: 0, \
data_length: {DATA_LENGTH}, .. }}; got {err:?}",
);
assert_eq!(
scan.blocks_scanned, 1,
"header decoded successfully, so blocks_scanned must count this block \
even though the data segment read failed",
);
}
#[test]
#[expect(
clippy::indexing_slicing,
clippy::cast_possible_truncation,
reason = "synthetic SFA forgery — offsets are in-bounds by construction (we wrote the \
bytes ourselves) and the archive is < 8 KiB, so the casts cannot overflow"
)]
fn walk_block_region_reports_data_read_error_on_truncated_parity_trailer() {
use crate::coding::Encode;
use crate::fs::{Fs, FsOpenOptions, MemFs};
use crate::table::block::{BlockType, EccParams, Header, expected_parity_len};
const TRAILER_LEN: usize = 4 + 1 + 1 + 16 + 8 + 8;
const DATA_LENGTH: u32 = 4096;
const HEADER_LEN: u64 = Header::MIN_LEN as u64;
let data = vec![0xABu8; DATA_LENGTH as usize];
let header = Header {
checksum: Checksum::from_raw(crate::hash::hash128(&data)),
data_length: DATA_LENGTH,
uncompressed_length: DATA_LENGTH,
..Header::test_dummy(BlockType::Data)
};
let mut archive_bytes: Vec<u8> = Vec::new();
{
let mut writer = crate::sfa::Writer::from_writer(std::io::Cursor::new(&mut archive_bytes));
writer.start("data").unwrap();
writer.write_all(&header.encode_into_vec()).unwrap();
writer.write_all(&data).unwrap();
writer.finish().unwrap();
}
let parity_len = u64::from(expected_parity_len(DATA_LENGTH, EccParams::RS_4_2));
let trailer_start = archive_bytes.len() - TRAILER_LEN;
let toc_pos = u64::from_le_bytes(
archive_bytes[trailer_start + 22..trailer_start + 30]
.try_into()
.unwrap(),
) as usize;
let toc_len = u64::from_le_bytes(
archive_bytes[trailer_start + 30..trailer_start + 38]
.try_into()
.unwrap(),
) as usize;
let len_field_offset = toc_pos + 4 + 4 + 8;
let lied_len: u64 = HEADER_LEN + u64::from(DATA_LENGTH) + parity_len;
archive_bytes[len_field_offset..len_field_offset + 8].copy_from_slice(&lied_len.to_le_bytes());
let new_toc_checksum = crate::hash::hash128(&archive_bytes[toc_pos..toc_pos + toc_len]);
let csum_field_offset = trailer_start + 4 + 1 + 1;
archive_bytes[csum_field_offset..csum_field_offset + 16]
.copy_from_slice(&new_toc_checksum.to_le_bytes());
let fs = MemFs::new();
let path = std::path::Path::new("/forged-parity.sst");
{
let mut f = fs
.open(
path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)
.unwrap();
f.write_all(&archive_bytes).unwrap();
}
let table_id: TableId = 7;
let scan = scan_sst_blocks(&fs, path, table_id, 0, Some(EccParams::RS_4_2), false)
.expect("forged SFA must parse cleanly");
assert!(
scan.errors.iter().any(|e| matches!(
e,
BlockVerifyError::DataReadError { table_id: t, offset: 0, error, .. }
if *t == table_id && error.kind() == crate::io::ErrorKind::UnexpectedEof
)),
"expected a truncated-parity DataReadError, got {:?}",
scan.errors,
);
}
#[test]
#[expect(
clippy::indexing_slicing,
clippy::cast_possible_truncation,
reason = "synthetic SFA forgery — offsets are in-bounds by construction \
and the forged archive is < 1 KiB"
)]
fn walk_block_region_reports_header_crossing_section_boundary() {
use crate::coding::Encode;
use crate::fs::{Fs, FsOpenOptions, MemFs};
use crate::table::block::{BlockType, Header};
const TRAILER_LEN: usize = 4 + 1 + 1 + 16 + 8 + 8;
let header = Header {
checksum: Checksum::from_raw(0xDEAD_BEEF_DEAD_BEEF),
data_length: 0,
uncompressed_length: 0,
..Header::test_dummy(BlockType::Meta)
};
assert_eq!(
Header::header_len(BlockType::Meta) as u64,
Header::MIN_LEN as u64 + 1,
);
let mut archive_bytes: Vec<u8> = Vec::new();
{
let mut writer = crate::sfa::Writer::from_writer(std::io::Cursor::new(&mut archive_bytes));
writer.start("data").unwrap();
writer.write_all(&header.encode_into_vec()).unwrap();
writer.finish().unwrap();
}
let trailer_start = archive_bytes.len() - TRAILER_LEN;
let toc_pos_bytes: [u8; 8] = archive_bytes[trailer_start + 22..trailer_start + 30]
.try_into()
.unwrap();
let toc_len_bytes: [u8; 8] = archive_bytes[trailer_start + 30..trailer_start + 38]
.try_into()
.unwrap();
let toc_pos = u64::from_le_bytes(toc_pos_bytes) as usize;
let toc_len = u64::from_le_bytes(toc_len_bytes) as usize;
let first_entry_offset = toc_pos + 4 + 4;
let len_field_offset = first_entry_offset + 8;
let lied_len: u64 = Header::MIN_LEN as u64;
archive_bytes[len_field_offset..len_field_offset + 8].copy_from_slice(&lied_len.to_le_bytes());
let new_toc_checksum = crate::hash::hash128(&archive_bytes[toc_pos..toc_pos + toc_len]);
let csum_field_offset = trailer_start + 4 + 1 + 1;
archive_bytes[csum_field_offset..csum_field_offset + 16]
.copy_from_slice(&new_toc_checksum.to_le_bytes());
let fs = MemFs::new();
let path = std::path::Path::new("/forged-boundary.sst");
{
let mut f = fs
.open(
path,
&FsOpenOptions::new().write(true).create(true).truncate(true),
)
.unwrap();
f.write_all(&archive_bytes).unwrap();
}
let table_id: TableId = 7;
let scan = scan_sst_blocks(&fs, path, table_id, 0, None, false)
.expect("forged SFA must parse cleanly");
assert_eq!(
scan.errors.len(),
1,
"expected exactly one error, got {:?}",
scan.errors,
);
let err = scan.errors.first().unwrap();
assert!(
matches!(
err,
BlockVerifyError::HeaderCorrupted { table_id: t, offset: 0, reason, .. }
if *t == table_id && reason.contains("extends past the section end"),
),
"expected a section-boundary HeaderCorrupted; got {err:?}",
);
}
fn populate_multi_sst(dir: &std::path::Path, batches: usize, per_batch: usize) {
let cfg = Config::new(
dir,
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.data_block_compression_policy(CompressionPolicy::all(CompressionType::None));
let tree = cfg.open().unwrap();
let mut seqno = 1u64;
for b in 0..batches {
for i in 0..per_batch {
let key = format!("b{b:03}k{i:08}");
tree.insert(key.as_bytes(), b"v".as_slice(), seqno);
seqno += 1;
}
tree.flush_active_memtable(seqno).unwrap();
seqno += 1;
}
drop(tree);
}
#[test]
fn verify_checksum_method_on_clean_tree_is_ok() {
let dir = tempfile::tempdir().unwrap();
populate_tree(dir.path(), 500);
let tree = reopen_tree(dir.path());
let report = tree.verify_checksum();
assert!(report.is_ok(), "clean tree must verify clean: {report:?}");
assert!(report.sst_files_scanned >= 1);
assert!(report.blocks_scanned >= 1);
}
#[test]
fn verify_checksum_with_parallel_matches_sequential() {
let dir = tempfile::tempdir().unwrap();
populate_multi_sst(dir.path(), 5, 300);
let tree = reopen_tree(dir.path());
let seq = tree.verify_checksum_with(&VerifyOptions::default());
let par = tree.verify_checksum_with(&VerifyOptions::default().parallelism(4));
assert!(
seq.sst_files_scanned >= 2,
"need >1 SST to exercise parallelism, got {}",
seq.sst_files_scanned,
);
assert_eq!(seq.sst_files_scanned, par.sst_files_scanned);
assert_eq!(seq.blocks_scanned, par.blocks_scanned);
assert_eq!(seq.errors.len(), par.errors.len());
assert!(
seq.is_ok() && par.is_ok(),
"clean tree: seq={seq:?} par={par:?}"
);
}
#[test]
fn verify_checksum_with_throttle_runs_inter_sst_pause() {
let dir = tempfile::tempdir().unwrap();
populate_multi_sst(dir.path(), 3, 300);
let tree = reopen_tree(dir.path());
let report = tree.verify_checksum_with(
&VerifyOptions::default().throttle(std::time::Duration::from_nanos(1)),
);
assert!(
report.sst_files_scanned >= 2,
"need >1 SST to exercise the inter-SST throttle, got {}",
report.sst_files_scanned,
);
assert!(report.is_ok(), "clean tree must verify clean: {report:?}");
}
#[test]
fn verify_checksum_with_parallel_detects_corruption() {
use crate::table::block::Header;
let dir = tempfile::tempdir().unwrap();
populate_multi_sst(dir.path(), 4, 300);
let sst_path = pick_first_sst_path(dir.path());
let flip_offset = Header::MIN_LEN as u64;
{
let mut f = std::fs::OpenOptions::new()
.read(true)
.write(true)
.open(&sst_path)
.unwrap();
f.seek(SeekFrom::Start(flip_offset)).unwrap();
let mut byte = [0u8; 1];
f.read_exact(&mut byte).unwrap();
byte[0] ^= 0xFF;
f.seek(SeekFrom::Start(flip_offset)).unwrap();
f.write_all(&byte).unwrap();
f.sync_all().unwrap();
}
let tree = reopen_tree(dir.path());
let report = tree.verify_checksum_with(&VerifyOptions::default().parallelism(4));
assert!(
!report.is_ok(),
"parallel scrub must surface the flipped byte: {report:?}",
);
}
#[test]
fn verify_checksum_with_throttle_completes_clean() {
let dir = tempfile::tempdir().unwrap();
populate_multi_sst(dir.path(), 3, 200);
let tree = reopen_tree(dir.path());
let opts = VerifyOptions::default()
.parallelism(2)
.throttle(std::time::Duration::from_millis(1));
let report = tree.verify_checksum_with(&opts);
assert!(
report.is_ok(),
"throttled scrub must still verify clean: {report:?}"
);
assert!(report.sst_files_scanned >= 2);
}
#[test]
fn verify_checksum_with_throttle_does_not_sleep_after_last_sst() {
let dir = tempfile::tempdir().unwrap();
populate_multi_sst(dir.path(), 1, 50);
let tree = reopen_tree(dir.path());
let throttle = std::time::Duration::from_millis(400);
let opts = VerifyOptions::default().parallelism(1).throttle(throttle);
let start = std::time::Instant::now();
let report = tree.verify_checksum_with(&opts);
let elapsed = start.elapsed();
assert!(report.is_ok(), "clean single-SST scrub: {report:?}");
assert_eq!(report.sst_files_scanned, 1, "test needs exactly one SST");
assert!(
elapsed < throttle / 2,
"a single-SST scrub must not sleep the inter-SST throttle after the \
last table: took {elapsed:?} with a {throttle:?} throttle",
);
}