use super::*;
use crate::coding::Encode;
use crate::fs::{FsOpenOptions, MemFs};
use crate::io::{LittleEndian, WriteBytesExt};
use crate::version::edit::{AddedBlobFile, ChangedLevel, TableDesc, VersionEdit};
use std::io::Write;
/// Builds a [`Recovery`] fixture for a standard tree.
///
/// `version_id` is used for both `snapshot_id` and `curr_version_id`;
/// `table_ids` is stored as given. Every other field starts empty/default.
fn recovery_with(version_id: u64, table_ids: Vec<Vec<Vec<RecoveredTable>>>) -> Recovery {
    let blob_file_ids = Vec::new();
    let gc_stats = crate::blob_tree::FragmentationMap::default();
    let restrictions = crate::HashMap::default();
    let stats = RecoveryStats::default();

    Recovery {
        tree_type: TreeType::Standard,
        snapshot_id: version_id,
        curr_version_id: version_id,
        table_ids,
        blob_file_ids,
        gc_stats,
        restrictions,
        stats,
    }
}
/// Builds a [`RecoveredTable`] whose checksum is derived from the id (`id * 31`),
/// mirroring what [`tdesc`] produces for the same id.
fn rtable(id: u64, seqno: u64) -> RecoveredTable {
    let raw_checksum = u128::from(id) * 31;
    RecoveredTable {
        id,
        checksum: Checksum::from_raw(raw_checksum),
        global_seqno: seqno,
    }
}
/// Builds a [`TableDesc`] whose raw checksum is derived from the id (`id * 31`),
/// so it lines up with the [`RecoveredTable`] that [`rtable`] builds for the same id.
fn tdesc(id: u64, seqno: u64) -> TableDesc {
    let raw_checksum = u128::from(id) * 31;
    TableDesc {
        id,
        checksum: raw_checksum,
        global_seqno: seqno,
    }
}
/// An edit that names level 0 replaces that level's runs with exactly the
/// runs listed in the edit, and advances the current version id.
#[test]
fn apply_replaces_a_changed_levels_run_layout_wholesale() {
    let mut recovered = recovery_with(1, vec![vec![vec![rtable(1, 10)]]]);

    let level_zero = ChangedLevel {
        level: 0,
        runs: vec![vec![tdesc(1, 10)], vec![tdesc(2, 11)]],
    };
    let edit = VersionEdit {
        new_version_id: 2,
        changed_levels: vec![level_zero],
        ..Default::default()
    };
    recovered.apply_edit(&edit).expect("apply");

    assert_eq!(recovered.curr_version_id, 2);
    let expected_layout = vec![vec![vec![rtable(1, 10)], vec![rtable(2, 11)]]];
    assert_eq!(
        recovered.table_ids, expected_layout,
        "changed level's run grouping must be reconstructed exactly",
    );
}
/// An edit that only names level 0 must leave level 1 exactly as it was.
#[test]
fn apply_leaves_unmentioned_levels_untouched() {
    let initial_levels = vec![vec![vec![rtable(1, 10)]], vec![vec![rtable(9, 5)]]];
    let mut recovered = recovery_with(1, initial_levels);

    let level_zero = ChangedLevel {
        level: 0,
        runs: vec![vec![tdesc(3, 12)]],
    };
    let edit = VersionEdit {
        new_version_id: 2,
        changed_levels: vec![level_zero],
        ..Default::default()
    };
    recovered.apply_edit(&edit).expect("apply");

    assert_eq!(recovered.table_ids[0], vec![vec![rtable(3, 12)]]);
    assert_eq!(
        recovered.table_ids[1],
        vec![vec![rtable(9, 5)]],
        "a level the edit does not mention is left as-is",
    );
}
/// A changed level that lists no runs leaves that level empty after the edit.
#[test]
fn apply_empties_a_drained_level() {
    let mut recovered = recovery_with(1, vec![vec![vec![rtable(1, 10)]]]);

    let drained = ChangedLevel {
        level: 0,
        runs: Vec::new(),
    };
    let edit = VersionEdit {
        new_version_id: 2,
        changed_levels: vec![drained],
        ..Default::default()
    };
    recovered.apply_edit(&edit).expect("apply");

    let level_zero = &recovered.table_ids[0];
    assert!(
        level_zero.is_empty(),
        "a compaction that drains a level leaves zero runs",
    );
}
/// An edit naming level 2 on a one-level recovery grows the level list to
/// three entries, leaving the intermediate level empty.
#[test]
fn apply_grows_levels_for_a_higher_index() {
    let mut recovered = recovery_with(1, vec![vec![vec![rtable(1, 10)]]]);

    let level_two = ChangedLevel {
        level: 2,
        runs: vec![vec![tdesc(5, 20)]],
    };
    let edit = VersionEdit {
        new_version_id: 2,
        changed_levels: vec![level_two],
        ..Default::default()
    };
    recovered.apply_edit(&edit).expect("apply");

    let levels = &recovered.table_ids;
    assert_eq!(levels.len(), 3, "levels grew to fit index 2");
    assert!(levels[1].is_empty(), "the gap level is empty");
    assert_eq!(levels[2], vec![vec![rtable(5, 20)]]);
}
/// Restrictions carried by successive edits are merged into the recovery
/// state; a later entry for the same table id replaces the earlier bound.
#[test]
fn apply_edit_merges_and_advances_restrictions() {
    let mut recovered = recovery_with(1, vec![vec![vec![rtable(1, 10)]]]);
    assert!(recovered.restrictions.is_empty(), "starts unrestricted");

    // First edit installs a bound for table 1.
    let first_edit = VersionEdit {
        new_version_id: 2,
        restrictions: vec![(1, crate::UserKey::from(&b"ccc"[..]))],
        ..Default::default()
    };
    recovered.apply_edit(&first_edit).expect("apply");
    let first_bound = crate::UserKey::from(&b"ccc"[..]);
    assert_eq!(recovered.restrictions.get(&1), Some(&first_bound));

    // Second edit carries a different bound for the same table.
    let second_edit = VersionEdit {
        new_version_id: 3,
        restrictions: vec![(1, crate::UserKey::from(&b"mmm"[..]))],
        ..Default::default()
    };
    recovered.apply_edit(&second_edit).expect("apply");
    let second_bound = crate::UserKey::from(&b"mmm"[..]);
    assert_eq!(
        recovered.restrictions.get(&1),
        Some(&second_bound),
        "a later slice's higher bound overwrites the earlier one",
    );
}
/// Hand-encodes a two-entry restrictions section (entry count, then per
/// entry: table id, key length, key bytes) and checks both entries parse.
#[test]
fn parse_restrictions_section_roundtrips_entries() {
    let mut bytes = Vec::new();
    bytes
        .write_u32::<LittleEndian>(2)
        .expect("encode test bytes");

    let entries = [(7_u64, 3_u32, &b"mmm"[..]), (42, 4, &b"zzzz"[..])];
    for (table_id, key_len, key) in entries {
        bytes
            .write_u64::<LittleEndian>(table_id)
            .expect("encode test bytes");
        bytes
            .write_u32::<LittleEndian>(key_len)
            .expect("encode test bytes");
        bytes.write_all(key).expect("encode test bytes");
    }

    let parsed = parse_restrictions_section(&bytes).expect("parse");
    assert_eq!(parsed.len(), 2);
    assert_eq!(parsed.get(&7), Some(&crate::UserKey::from(&b"mmm"[..])));
    assert_eq!(parsed.get(&42), Some(&crate::UserKey::from(&b"zzzz"[..])));
}
/// A section whose single entry declares an 8-byte key but supplies only two
/// bytes must be rejected by the parser.
#[test]
fn parse_restrictions_section_rejects_a_truncated_key() {
    let mut bytes = Vec::new();
    // One entry ...
    bytes
        .write_u32::<LittleEndian>(1)
        .expect("encode test bytes");
    // ... for table id 1 ...
    bytes
        .write_u64::<LittleEndian>(1)
        .expect("encode test bytes");
    // ... claiming 8 key bytes while only 2 follow.
    bytes
        .write_u32::<LittleEndian>(8)
        .expect("encode test bytes");
    bytes.write_all(b"xy").expect("encode test bytes");

    let outcome = parse_restrictions_section(&bytes);
    assert!(
        outcome.is_err(),
        "a key shorter than its length prefix must not silently un-clamp",
    );
}
/// A section that lists table id 5 twice (with different keys) must be
/// rejected by the parser.
#[test]
fn parse_restrictions_section_rejects_a_duplicate_table_id() {
    let mut bytes = Vec::new();
    bytes
        .write_u32::<LittleEndian>(2)
        .expect("encode test bytes");

    // Both entries use the same table id with 3-byte keys.
    for key in [&b"mmm"[..], &b"ccc"[..]] {
        bytes
            .write_u64::<LittleEndian>(5)
            .expect("encode test bytes");
        bytes
            .write_u32::<LittleEndian>(3)
            .expect("encode test bytes");
        bytes.write_all(key).expect("encode test bytes");
    }

    let outcome = parse_restrictions_section(&bytes);
    assert!(
        outcome.is_err(),
        "a duplicate table id must not silently un-clamp an advanced bound",
    );
}
/// One edit that adds a new blob file (300), re-adds an existing one (100)
/// with a different checksum, and removes another (200).
#[test]
fn apply_adds_updates_and_removes_blob_files() {
    let mut recovered = recovery_with(1, vec![]);
    recovered.blob_file_ids = vec![(100, Checksum::from_raw(1)), (200, Checksum::from_raw(2))];

    let brand_new = AddedBlobFile {
        id: 300,
        checksum: 9,
    };
    let rewritten = AddedBlobFile {
        id: 100,
        checksum: 7,
    };
    let edit = VersionEdit {
        new_version_id: 2,
        added_blob_files: vec![brand_new, rewritten],
        removed_blob_file_ids: vec![200],
        ..Default::default()
    };
    recovered.apply_edit(&edit).expect("apply");

    let still_has_removed = recovered.blob_file_ids.iter().any(|(id, _)| *id == 200);
    assert!(!still_has_removed, "removed blob is gone");

    let updated_checksum = recovered
        .blob_file_ids
        .iter()
        .find(|(id, _)| *id == 100)
        .map(|(_, c)| *c);
    assert_eq!(
        updated_checksum,
        Some(Checksum::from_raw(7)),
        "existing blob's checksum updated in place",
    );

    let has_new = recovered
        .blob_file_ids
        .iter()
        .any(|(id, c)| *id == 300 && *c == Checksum::from_raw(9));
    assert!(has_new, "new blob appended");
}
/// An edit carrying encoded GC stats replaces the recovery's GC stats with
/// the decoded map.
#[test]
fn apply_overwrites_gc_stats_when_present() {
    let mut recovered = recovery_with(1, vec![]);

    let mut expected_stats = crate::blob_tree::FragmentationMap::default();
    expected_stats.insert(42, crate::blob_tree::FragmentationEntry::new(2, 50, 60));

    let mut encoded = Vec::new();
    expected_stats.encode_into(&mut encoded).expect("encode gc");

    let edit = VersionEdit {
        new_version_id: 2,
        gc_stats: Some(encoded),
        ..Default::default()
    };
    recovered.apply_edit(&edit).expect("apply");

    assert_eq!(
        recovered.gc_stats, expected_stats,
        "GC stats overwritten from the edit"
    );
}
/// Writes the `CURRENT_VERSION_FILE` pointer for manifest `v{version_id}` in `folder`.
///
/// The manifest archive is opened first so its footer can be fed to
/// `current_digest::compute`; the pointer file then receives the version id
/// (u64 LE), that digest (u128 LE) and a trailing zero byte.
fn write_current(folder: &Path, version_id: u64, fs: &dyn Fs) -> crate::Result<()> {
    let runtime_config = alloc::sync::Arc::new(crate::runtime_config::RuntimeConfig::default());
    let archive = crate::manifest_blocks::reader::ManifestArchiveReader::open(
        &folder.join(format!("v{version_id}")),
        fs,
        runtime_config,
        None,
    )?;
    let digest = crate::manifest_blocks::current_digest::compute(version_id, archive.footer())?;

    let current_path = folder.join(CURRENT_VERSION_FILE);
    let mut current_file = fs.open(
        &current_path,
        &FsOpenOptions::new().write(true).create(true).truncate(true),
    )?;
    current_file.write_u64::<LittleEndian>(version_id)?;
    current_file.write_u128::<LittleEndian>(digest)?;
    // NOTE(review): trailing byte is presumably a tag/format marker — confirm against the reader.
    current_file.write_u8(0)?;
    Ok(())
}
/// Manifest archive writer used by the fixture helpers in this file.
type FixtureWriter = crate::manifest_blocks::writer::ManifestArchiveWriter;
/// Creates a [`FixtureWriter`] for the manifest file `v{id}` inside `folder`,
/// using a default runtime config and `SyncMode::Normal`.
fn open_fixture_writer(folder: &Path, id: u64, fs: &dyn Fs) -> crate::Result<FixtureWriter> {
    let runtime_config = alloc::sync::Arc::new(crate::runtime_config::RuntimeConfig::default());
    let manifest_path = folder.join(format!("v{id}"));
    FixtureWriter::create(
        &manifest_path,
        fs,
        runtime_config,
        None,
        crate::fs::SyncMode::Normal,
    )
}
/// Starts the `tree_type` section and writes a single zero byte into it.
// NOTE(review): 0 presumably encodes the standard tree type — confirm against the decoder.
fn write_tree_type(w: &mut FixtureWriter) -> crate::Result<()> {
    w.start("tree_type")?;
    w.write_u8(0)?;

    Ok(())
}
/// Starts the `blob_files` section and writes a zero (u32 LE) record count,
/// i.e. a section that declares no blob file records.
fn write_empty_blob_files(w: &mut FixtureWriter) -> crate::Result<()> {
    w.start("blob_files")?;
    w.write_u32::<LittleEndian>(0)?;

    Ok(())
}
/// Starts the `blob_gc_stats` section and writes a zero (u32 LE) entry count,
/// i.e. a section that declares no GC stat entries.
fn write_empty_blob_gc_stats(w: &mut FixtureWriter) -> crate::Result<()> {
    w.start("blob_gc_stats")?;
    w.write_u32::<LittleEndian>(0)?;

    Ok(())
}
/// Writes manifest `v{id}` whose `tables` section declares `u32::MAX` records
/// in its only run but contains none.
///
/// The three header values follow the layout the other fixtures in this file
/// use: level count (u8), run count (u8), then the run's record count (u32 LE).
fn write_corrupt_table_count(folder: &Path, id: u64, fs: &dyn Fs) -> crate::Result<()> {
    let mut writer = open_fixture_writer(folder, id, fs)?;
    write_tree_type(&mut writer)?;

    writer.start("tables")?;
    writer.write_u8(1)?;
    writer.write_u8(1)?;
    writer.write_u32::<LittleEndian>(u32::MAX)?;

    write_empty_blob_files(&mut writer)?;
    write_empty_blob_gc_stats(&mut writer)?;
    writer.finish()?;
    Ok(())
}
fn write_corrupt_blob_count(folder: &Path, id: u64, fs: &dyn Fs) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(0)?;
w.start("blob_files")?;
w.write_u32::<LittleEndian>(u32::MAX)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// Recovery under `AbsoluteConsistency` must fail with `Error::Unrecoverable`
/// when the tables section declares an absurd record count.
#[test]
fn recover_rejects_corrupt_table_count() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/corrupt/tables");
    mem_fs.create_dir_all(dir)?;
    write_corrupt_table_count(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let err = match recover(dir, &mem_fs, ManifestRecoveryMode::AbsoluteConsistency, None) {
        Err(err) => err,
        Ok(_) => panic!("corrupt table_count should fail"),
    };
    let is_unrecoverable = matches!(err, crate::Error::Unrecoverable);
    assert!(is_unrecoverable, "expected Unrecoverable, got: {err:?}");
    Ok(())
}
/// Recovery under `AbsoluteConsistency` must fail with `Error::Unrecoverable`
/// when the blob_files section declares an absurd record count.
#[test]
fn recover_rejects_corrupt_blob_file_count() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/corrupt/blobs");
    mem_fs.create_dir_all(dir)?;
    write_corrupt_blob_count(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let err = match recover(dir, &mem_fs, ManifestRecoveryMode::AbsoluteConsistency, None) {
        Err(err) => err,
        Ok(_) => panic!("corrupt blob_file_count should fail"),
    };
    let is_unrecoverable = matches!(err, crate::Error::Unrecoverable);
    assert!(is_unrecoverable, "expected Unrecoverable, got: {err:?}");
    Ok(())
}
fn write_truncated_tables_tail(
folder: &Path,
id: u64,
declared: u32,
actual: u32,
fs: &dyn Fs,
) -> crate::Result<()> {
assert!(
actual < declared,
"actual must be < declared for truncation"
);
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(1)?; w.write_u8(1)?; w.write_u32::<LittleEndian>(declared)?;
for entry_id in 0..actual {
crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
payload.write_u64::<LittleEndian>(u64::from(entry_id))?;
payload.write_u8(0)?; payload.write_u128::<LittleEndian>(0)?; payload.write_u64::<LittleEndian>(0)?; Ok(())
})?;
}
write_empty_blob_files(&mut w)?;
write_empty_blob_gc_stats(&mut w)?;
w.finish()?;
Ok(())
}
/// Under `AbsoluteConsistency`, a run that declares 5 records but holds 1
/// must abort recovery with either an `UnexpectedEof` I/O error or
/// `Error::Unrecoverable`.
#[test]
fn recover_absolute_consistency_rejects_truncated_tables_tail() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/absolute/tail");
    mem_fs.create_dir_all(dir)?;
    write_truncated_tables_tail(dir, 1, 5, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let err = recover(dir, &mem_fs, ManifestRecoveryMode::AbsoluteConsistency, None)
        .expect_err("truncated tail must abort under AbsoluteConsistency");

    let is_eof =
        matches!(&err, crate::Error::Io(e) if e.kind() == crate::io::ErrorKind::UnexpectedEof);
    let is_unrecoverable = matches!(err, crate::Error::Unrecoverable);
    assert!(
        is_eof || is_unrecoverable,
        "expected UnexpectedEof or Unrecoverable, got: {err:?}",
    );
    Ok(())
}
/// Under `TolerateCorruptedTailRecords`, a run that declares 5 records but
/// holds 1 recovers as one level with one run containing that single record.
#[test]
fn recover_tolerate_tail_keeps_consistent_prefix_of_tables() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/tolerate/tail");
    mem_fs.create_dir_all(dir)?;
    write_truncated_tables_tail(dir, 1, 5, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::TolerateCorruptedTailRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let levels = &recovery.table_ids;
    assert_eq!(
        levels.len(),
        1,
        "expected 1 level, got {}: {:?}",
        levels.len(),
        levels.iter().map(Vec::len).collect::<Vec<_>>(),
    );

    let only_level = &levels[0];
    assert_eq!(only_level.len(), 1, "expected 1 run in the recovered level");

    let only_run = &only_level[0];
    assert_eq!(
        only_run.len(),
        1,
        "expected 1 table record (the consistent prefix); got {}",
        only_run.len(),
    );
    Ok(())
}
/// A fully-framed table record whose tag byte is `0xFF` must still abort
/// recovery with `InvalidTag(("ChecksumType", 0xFF))`, even under
/// `TolerateCorruptedTailRecords`.
///
/// The empty `blob_gc_stats` section is written through the shared
/// `write_empty_blob_gc_stats` helper, like the other fixtures.
#[test]
fn recover_tolerate_tail_does_not_swallow_invalid_tag() -> crate::Result<()> {
    let fs = MemFs::new();
    let folder = Path::new("/tolerate/bad_tag");
    fs.create_dir_all(folder)?;

    let mut w = open_fixture_writer(folder, 1, &fs)?;
    write_tree_type(&mut w)?;
    w.start("tables")?;
    // Header: one level, one run, one record.
    w.write_u8(1)?;
    w.write_u8(1)?;
    w.write_u32::<LittleEndian>(1)?;
    crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
        payload.write_u64::<LittleEndian>(0)?;
        // Invalid tag value: this is what the decoder is expected to reject.
        payload.write_u8(0xFF)?;
        payload.write_u128::<LittleEndian>(0)?;
        payload.write_u64::<LittleEndian>(0)?;
        Ok(())
    })?;
    write_empty_blob_files(&mut w)?;
    write_empty_blob_gc_stats(&mut w)?;
    w.finish()?;
    write_current(folder, 1, &fs)?;

    let result = recover(
        folder,
        &fs,
        ManifestRecoveryMode::TolerateCorruptedTailRecords,
        None,
    );
    let err = result.expect_err("InvalidTag must still abort under TolerateCorruptedTailRecords");
    assert!(
        matches!(err, crate::Error::InvalidTag(("ChecksumType", 0xFF))),
        "expected InvalidTag, got: {err:?}",
    );
    Ok(())
}
fn write_truncated_at_second_run(folder: &Path, id: u64, fs: &dyn Fs) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(1)?; w.write_u8(2)?; w.write_u32::<LittleEndian>(1)?;
crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
payload.write_u64::<LittleEndian>(42)?; payload.write_u8(0)?; payload.write_u128::<LittleEndian>(0)?;
payload.write_u64::<LittleEndian>(0)?;
Ok(())
})?;
w.write_u8(0xAA)?;
w.write_u8(0xBB)?;
write_empty_blob_files(&mut w)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// When the second run of a level is truncated, tolerant recovery keeps the
/// level with its first, fully-decoded run (record id 42).
#[test]
fn recover_tolerate_tail_keeps_consistent_prefix_within_a_level() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/tolerate/midlevel");
    mem_fs.create_dir_all(dir)?;
    write_truncated_at_second_run(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::TolerateCorruptedTailRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let levels = &recovery.table_ids;
    assert_eq!(
        levels.len(),
        1,
        "expected the partially-decoded level to be present, got {} levels",
        levels.len(),
    );

    let runs = &levels[0];
    assert_eq!(
        runs.len(),
        1,
        "expected 1 surviving run (the consistent prefix), got {}",
        runs.len(),
    );

    let surviving_run = &runs[0];
    assert_eq!(
        surviving_run.len(),
        1,
        "expected the run to contain its 1 fully-decoded entry",
    );
    assert_eq!(surviving_run[0].id, 42, "wrong entry id recovered");
    Ok(())
}
fn write_truncated_blob_tail(
folder: &Path,
id: u64,
declared: u32,
actual: u32,
fs: &dyn Fs,
) -> crate::Result<()> {
assert!(
actual < declared,
"actual must be < declared for truncation"
);
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(0)?; w.start("blob_files")?;
w.write_u32::<LittleEndian>(declared)?;
for entry_id in 0..actual {
crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
payload.write_u64::<LittleEndian>(u64::from(entry_id))?;
payload.write_u8(0)?; payload.write_u128::<LittleEndian>(0)?; Ok(())
})?;
}
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// Under `TolerateCorruptedTailRecords`, a blob_files section that declares 5
/// records but holds 1 recovers exactly that one record (id 0).
#[test]
fn recover_tolerate_tail_keeps_consistent_prefix_of_blob_files() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/tolerate/blob_tail");
    mem_fs.create_dir_all(dir)?;
    write_truncated_blob_tail(dir, 1, 5, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::TolerateCorruptedTailRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let blobs = &recovery.blob_file_ids;
    assert_eq!(
        blobs.len(),
        1,
        "expected 1 surviving blob_file entry (the consistent prefix), got {}",
        blobs.len(),
    );
    assert_eq!(blobs[0].0, 0, "wrong blob_file id recovered",);
    Ok(())
}
fn write_truncated_blob_gc_stats(folder: &Path, id: u64, fs: &dyn Fs) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(0)?; w.start("blob_files")?;
w.write_u32::<LittleEndian>(0)?;
w.start("blob_gc_stats")?;
w.finish()?;
Ok(())
}
/// An empty `blob_gc_stats` section must fail strict recovery, while tolerant
/// recovery succeeds with default (empty) GC stats.
#[test]
fn recover_tolerate_tail_handles_truncated_blob_gc_stats() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/tolerate/gc_stats");
    mem_fs.create_dir_all(dir)?;
    write_truncated_blob_gc_stats(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let strict_outcome = recover(dir, &mem_fs, ManifestRecoveryMode::AbsoluteConsistency, None);
    assert!(
        strict_outcome.is_err(),
        "strict mode must abort on truncated blob_gc_stats; got Ok",
    );

    let tolerant_mode = ManifestRecoveryMode::TolerateCorruptedTailRecords;
    let tolerant = recover(dir, &mem_fs, tolerant_mode, None)?;
    let empty_stats = crate::blob_tree::FragmentationMap::default();
    assert_eq!(
        tolerant.gc_stats, empty_stats,
        "tolerant mode must produce default (empty) gc_stats on truncated section",
    );
    Ok(())
}
/// Writes one correctly-framed table record for `id` via `write_framed_record`.
///
/// Payload layout: id (u64 LE), a zero tag byte, a zero u128, a zero u64.
fn write_good_table_record<W: std::io::Write>(w: &mut W, id: u64) -> crate::Result<()> {
    let mut scratch = Vec::new();
    crate::version::framing::write_framed_record(w, &mut scratch, |record| {
        record.write_u64::<LittleEndian>(id)?;
        record.write_u8(0)?;
        record.write_u128::<LittleEndian>(0)?;
        record.write_u64::<LittleEndian>(0)?;
        Ok(())
    })
}
/// Writes a table record for `id` whose frame is hand-built with a bogus
/// u64 (`0xDEAD_BEEF_DEAD_BEEF`) between the length prefix and the payload.
///
/// The payload bytes are the same as a good record's; only the frame value
/// differs, which the recovery tests expect to surface as a frame checksum
/// mismatch.
fn write_bad_table_record<W: std::io::Write>(w: &mut W, id: u64) -> crate::Result<()> {
    let mut record: Vec<u8> = Vec::new();
    record.write_u64::<LittleEndian>(id)?;
    record.write_u8(0)?;
    record.write_u128::<LittleEndian>(0)?;
    record.write_u64::<LittleEndian>(0)?;

    #[expect(
        clippy::cast_possible_truncation,
        reason = "payload is 33 bytes — fits in u32"
    )]
    let record_len = record.len() as u32;

    // Frame: length prefix, deliberately wrong u64, then the payload itself.
    w.write_u32::<LittleEndian>(record_len)?;
    w.write_u64::<LittleEndian>(0xDEAD_BEEF_DEAD_BEEF)?;
    w.write_all(&record)?;
    Ok(())
}
fn write_manifest_with_mid_record_corruption(
folder: &Path,
id: u64,
fs: &dyn Fs,
) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(2)?; w.write_u8(1)?;
w.write_u32::<LittleEndian>(3)?;
write_good_table_record(&mut w, 100)?;
write_bad_table_record(&mut w, 101)?;
write_good_table_record(&mut w, 102)?;
w.write_u8(1)?;
w.write_u32::<LittleEndian>(2)?;
write_good_table_record(&mut w, 200)?;
write_good_table_record(&mut w, 201)?;
write_empty_blob_files(&mut w)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// Under `AbsoluteConsistency`, a record with a bad frame must abort recovery
/// with `ManifestFrameChecksumMismatch` attributed to the `tables` section.
#[test]
fn recover_absolute_consistency_rejects_mid_record_corruption() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/absolute/mid_corrupt");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_mid_record_corruption(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let outcome = recover(dir, &mem_fs, ManifestRecoveryMode::AbsoluteConsistency, None);
    let err = outcome.expect_err("corrupt record must abort AbsoluteConsistency");

    let is_tables_mismatch = matches!(
        err,
        crate::Error::ManifestFrameChecksumMismatch {
            section: "tables",
            ..
        }
    );
    assert!(
        is_tables_mismatch,
        "expected ManifestFrameChecksumMismatch on the tables section, got: {err:?}",
    );
    Ok(())
}
/// Under `PointInTimeRecovery`, everything from the first corrupt record on
/// is dropped: level 0 keeps only record 100 and level 1 ends up empty, while
/// both level slots remain present.
#[test]
fn recover_pit_truncates_at_corrupt_record() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/pit/mid_corrupt");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_mid_record_corruption(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let recovery = recover(dir, &mem_fs, ManifestRecoveryMode::PointInTimeRecovery, None)?;
    let levels = &recovery.table_ids;
    assert_eq!(
        levels.len(),
        2,
        "expected both persisted level slots to survive (level 1 padded \
         empty after PIT truncated its records); got {}",
        levels.len(),
    );

    let level_zero = &levels[0];
    assert_eq!(level_zero.len(), 1, "expected 1 run in level 0");

    let run = &level_zero[0];
    assert_eq!(
        run.len(),
        1,
        "expected only the pre-corruption record to survive in run 0; got {} records",
        run.len(),
    );

    let level_one = &levels[1];
    assert!(
        level_one.is_empty(),
        "expected level 1 to be empty (PIT dropped its records); got {} runs",
        level_one.len(),
    );
    assert_eq!(run[0].id, 100, "expected id=100 (the good prefix record)");
    Ok(())
}
/// Under `SkipAnyCorruptedRecords`, only the corrupt record (101) is dropped:
/// level 0 keeps 100 and 102, and level 1 keeps 200 and 201.
#[test]
fn recover_skip_any_skips_corrupt_record_and_keeps_neighbours() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/skip_any/mid_corrupt");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_mid_record_corruption(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::SkipAnyCorruptedRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let levels = &recovery.table_ids;
    assert_eq!(
        levels.len(),
        2,
        "expected both levels to survive under SkipAny",
    );

    let level_zero_run = &levels[0][0];
    assert_eq!(
        level_zero_run.len(),
        2,
        "expected 2 records in level-0 run 0 (id=100 + id=102, skipping the corrupt id=101); got {}",
        level_zero_run.len(),
    );
    assert_eq!(level_zero_run[0].id, 100);
    assert_eq!(level_zero_run[1].id, 102);

    let level_one_run = &levels[1][0];
    assert_eq!(
        level_one_run.len(),
        2,
        "expected level 1 to recover its full 2 records under SkipAny",
    );
    assert_eq!(level_one_run[0].id, 200);
    assert_eq!(level_one_run[1].id, 201);
    Ok(())
}
fn write_manifest_with_corrupt_blob_record(
folder: &Path,
id: u64,
fs: &dyn Fs,
) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(0)?;
w.start("blob_files")?;
w.write_u32::<LittleEndian>(3)?;
crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
payload.write_u64::<LittleEndian>(10)?;
payload.write_u8(0)?;
payload.write_u128::<LittleEndian>(0)?;
Ok(())
})?;
let mut payload: Vec<u8> = Vec::new();
payload.write_u64::<LittleEndian>(11)?;
payload.write_u8(0)?;
payload.write_u128::<LittleEndian>(0)?;
#[expect(
clippy::cast_possible_truncation,
reason = "payload is 25 bytes — fits in u32"
)]
let len = payload.len() as u32;
w.write_u32::<LittleEndian>(len)?;
w.write_u64::<LittleEndian>(0xDEAD_BEEF_DEAD_BEEF)?;
w.write_all(&payload)?;
crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
payload.write_u64::<LittleEndian>(12)?;
payload.write_u8(0)?;
payload.write_u128::<LittleEndian>(0)?;
Ok(())
})?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// Under `SkipAnyCorruptedRecords`, the corrupt blob record (11) is skipped
/// and its neighbours (10, 12) are recovered in order.
#[test]
fn recover_skip_any_skips_corrupt_blob_record() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/skip_any/blob_mid_corrupt");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_corrupt_blob_record(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::SkipAnyCorruptedRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let recovered_ids: Vec<u64> = recovery.blob_file_ids.iter().map(|(id, _)| *id).collect();
    assert_eq!(
        recovered_ids,
        vec![10, 12],
        "expected SkipAny to keep ids 10 and 12 while skipping the corrupt id 11",
    );
    Ok(())
}
fn write_manifest_with_corrupt_first_record_of_second_level(
folder: &Path,
id: u64,
fs: &dyn Fs,
) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(2)?; w.write_u8(1)?;
w.write_u32::<LittleEndian>(1)?;
write_good_table_record(&mut w, 100)?;
w.write_u8(1)?;
w.write_u32::<LittleEndian>(1)?;
write_bad_table_record(&mut w, 200)?;
write_empty_blob_files(&mut w)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// When a run's first record is corrupt, point-in-time recovery must not
/// leave an empty run behind: level 1 ends up with no runs at all, level 0
/// keeps its single record, and the level count stays at 2.
#[test]
fn recover_pit_drops_empty_run_when_corruption_hits_first_record() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/pit/empty_run");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_corrupt_first_record_of_second_level(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let recovery = recover(dir, &mem_fs, ManifestRecoveryMode::PointInTimeRecovery, None)?;
    let levels = &recovery.table_ids;
    assert_eq!(
        levels.len(),
        2,
        "expected the recovered shape to preserve the persisted \
         level_count (2); got {} levels",
        levels.len(),
    );

    // No run anywhere in the recovered shape may be empty.
    for (level_idx, level) in levels.iter().enumerate() {
        for (run_idx, run) in level.iter().enumerate() {
            assert!(
                !run.is_empty(),
                "level {level_idx} run {run_idx} is empty — \
                 Version::from_recovery calls Run::new on this and \
                 panics via the .expect(\"persisted runs should not \
                 be empty\")",
            );
        }
    }

    assert_eq!(levels[0].len(), 1, "level 0 should have 1 run");
    assert_eq!(levels[0][0][0].id, 100);
    assert!(
        levels[1].is_empty(),
        "level 1 should have no runs after PIT dropped its corrupt-only run",
    );
    Ok(())
}
fn write_manifest_with_all_records_in_run_corrupt(
folder: &Path,
id: u64,
fs: &dyn Fs,
) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(2)?; w.write_u8(1)?;
w.write_u32::<LittleEndian>(1)?;
write_bad_table_record(&mut w, 100)?;
w.write_u8(1)?;
w.write_u32::<LittleEndian>(1)?;
write_good_table_record(&mut w, 200)?;
write_empty_blob_files(&mut w)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// When every record of a run is corrupt, skip-any recovery must not keep an
/// empty run in level 0; level 1's good record (200) is still recovered.
#[test]
fn recover_skip_any_drops_run_when_all_records_corrupt() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/skip_any/all_corrupt_run");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_all_records_in_run_corrupt(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::SkipAnyCorruptedRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let levels = &recovery.table_ids;
    assert_eq!(levels.len(), 2);

    for (run_idx, run) in levels[0].iter().enumerate() {
        assert!(
            !run.is_empty(),
            "level 0 run {run_idx} is empty in Recovery — \
             Version::from_recovery's Run::new(empty).expect() panics here",
        );
    }

    assert_eq!(levels[1].len(), 1);
    assert_eq!(levels[1][0][0].id, 200);
    Ok(())
}
/// Under `PointInTimeRecovery`, the corrupt blob record (11) and everything
/// after it (12) are dropped, leaving only id 10.
#[test]
fn recover_pit_truncates_remaining_blob_records_on_corruption() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/pit/blob_mid_corrupt");
    mem_fs.create_dir_all(dir)?;
    write_manifest_with_corrupt_blob_record(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::PointInTimeRecovery;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let ids: Vec<u64> = recovery.blob_file_ids.iter().map(|(id, _)| *id).collect();
    let expected_ids = vec![10];
    assert_eq!(
        ids, expected_ids,
        "PIT must drop the corrupt blob record AND every blob record after it; \
         expected only id=10 (the good prefix), got {ids:?}",
    );
    Ok(())
}
fn write_manifest_skip_any_then_tail_truncated(
folder: &Path,
id: u64,
fs: &dyn Fs,
) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(1)?; w.write_u8(1)?; w.write_u32::<LittleEndian>(3)?; write_good_table_record(&mut w, 100)?;
write_bad_table_record(&mut w, 101)?;
write_empty_blob_files(&mut w)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// With one corrupt record followed by one missing record, skip-any recovery
/// must count exactly one table dropped to corruption and one dropped to tail.
#[test]
fn recover_skip_any_then_tail_accounts_corruption_separately() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/skip_any/then_tail");
    mem_fs.create_dir_all(dir)?;
    write_manifest_skip_any_then_tail_truncated(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::SkipAnyCorruptedRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let stats = &recovery.stats;
    assert_eq!(
        stats.tables_dropped_to_corruption, 1,
        "the corrupt record must land in the corruption counter",
    );
    assert_eq!(
        stats.tables_dropped_to_tail, 1,
        "exactly one trailing record was truncated; the previously-skipped \
         corrupt record must NOT be re-counted here as tail (pre-fix value \
         would be 2)",
    );
    Ok(())
}
fn write_manifest_blob_skip_any_then_tail_truncated(
folder: &Path,
id: u64,
fs: &dyn Fs,
) -> crate::Result<()> {
let mut w = open_fixture_writer(folder, id, fs)?;
write_tree_type(&mut w)?;
w.start("tables")?;
w.write_u8(0)?;
w.start("blob_files")?;
w.write_u32::<LittleEndian>(3)?; crate::version::framing::write_framed_record(&mut w, &mut Vec::new(), |payload| {
payload.write_u64::<LittleEndian>(10)?;
payload.write_u8(0)?;
payload.write_u128::<LittleEndian>(0)?;
Ok(())
})?;
let mut payload: Vec<u8> = Vec::new();
payload.write_u64::<LittleEndian>(11)?;
payload.write_u8(0)?;
payload.write_u128::<LittleEndian>(0)?;
#[expect(
clippy::cast_possible_truncation,
reason = "payload is 25 bytes — fits in u32"
)]
let len = payload.len() as u32;
w.write_u32::<LittleEndian>(len)?;
w.write_u64::<LittleEndian>(0xDEAD_BEEF_DEAD_BEEF)?;
w.write_all(&payload)?;
w.start("blob_gc_stats")?;
w.write_u32::<LittleEndian>(0)?;
w.finish()?;
Ok(())
}
/// With one corrupt blob record followed by one missing record, skip-any
/// recovery must count exactly one blob dropped to corruption and one dropped
/// to tail.
#[test]
fn recover_skip_any_then_tail_accounts_blob_corruption_separately() -> crate::Result<()> {
    let mem_fs = MemFs::new();
    let dir = Path::new("/skip_any/blob_then_tail");
    mem_fs.create_dir_all(dir)?;
    write_manifest_blob_skip_any_then_tail_truncated(dir, 1, &mem_fs)?;
    write_current(dir, 1, &mem_fs)?;

    let mode = ManifestRecoveryMode::SkipAnyCorruptedRecords;
    let recovery = recover(dir, &mem_fs, mode, None)?;

    let stats = &recovery.stats;
    assert_eq!(
        stats.blob_dropped_to_corruption, 1,
        "the corrupt blob record must land in the corruption counter",
    );
    assert_eq!(
        stats.blob_dropped_to_tail, 1,
        "exactly one trailing blob record was truncated; the previously-skipped \
         corrupt record must NOT be re-counted here as tail (pre-fix value \
         would be 2)",
    );
    Ok(())
}