use super::{DropReason, salvage_sst};
use crate::comparator::default_comparator;
use crate::fs::{Fs, StdFs};
use crate::table::{Table, Writer};
use crate::{InternalValue, ValueType};
use alloc::sync::Arc;
use tempfile::tempdir;
use test_log::test;
fn iv(i: u32) -> InternalValue {
InternalValue::from_components(
format!("key{i:05}").into_bytes(),
format!("val{i:05}").into_bytes(),
1,
ValueType::Value,
)
}
fn open(path: std::path::PathBuf, fs: &Arc<dyn Fs>) -> crate::Result<Table> {
let checksum = crate::Checksum::from_raw(crate::repair::compute_table_checksum(&**fs, &path)?);
Table::recover(
path,
checksum,
0,
0,
0,
Arc::new(crate::cache::Cache::with_capacity_bytes(1 << 20)),
Some(Arc::new(crate::descriptor_table::DescriptorTable::new(8))),
Arc::clone(fs),
false,
false,
None,
#[cfg(zstd_any)]
None,
default_comparator(),
#[cfg(feature = "metrics")]
Arc::new(crate::Metrics::default()),
)
}
fn reopen_item_count(path: std::path::PathBuf, fs: &Arc<dyn Fs>) -> crate::Result<u64> {
Ok(open(path, fs)?.metadata.item_count)
}
#[test]
fn salvage_of_a_healthy_sst_recovers_every_block() -> crate::Result<()> {
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?.use_data_block_size(256);
let n = 200u32;
for i in 0..n {
writer.write(iv(i))?;
}
assert!(writer.finish()?.is_some(), "source SST is non-empty");
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert!(
report.is_complete(),
"a healthy SST salvages with no dropped blocks: {report:?}",
);
assert!(
report.blocks_total >= 2,
"256-byte blocks over 200 entries should yield several data blocks, got {}",
report.blocks_total,
);
assert_eq!(
report.blocks_salvaged, report.blocks_total,
"every block of a healthy SST is salvaged",
);
assert_eq!(
report.entries_salvaged,
u64::from(n),
"every entry is recovered",
);
assert_eq!(
report.salvaged_path.as_deref(),
Some(dest.as_path()),
"a salvaged file is written when at least one block is recovered",
);
assert_eq!(
reopen_item_count(dest, &fs)?,
u64::from(n),
"the salvaged SST reopens with the full item count",
);
Ok(())
}
#[test]
fn salvage_drops_a_corrupted_block_and_keeps_the_rest() -> crate::Result<()> {
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?.use_data_block_size(256);
let n = 200u32;
for i in 0..n {
writer.write(iv(i))?;
}
assert!(writer.finish()?.is_some(), "source SST is non-empty");
let target = {
let table = open(source.clone(), &fs)?;
let offsets: alloc::vec::Vec<u64> = table
.data_block_handles()
.filter_map(Result::ok)
.map(|kh| *kh.as_ref().offset())
.collect();
let Some(&second) = offsets.get(1) else {
panic!("source SST must have at least two data blocks, got {offsets:?}");
};
second
};
let flip = usize::try_from(target).unwrap_or(0) + 16;
let mut bytes = std::fs::read(&source)?;
if let Some(b) = bytes.get_mut(flip) {
*b ^= 0xFF;
}
std::fs::write(&source, &bytes)?;
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert!(
!report.is_complete(),
"a corrupted block must be reported as dropped: {report:?}",
);
assert_eq!(
report.dropped.len(),
1,
"exactly the one corrupted block is dropped: {report:?}",
);
assert_eq!(
report.blocks_salvaged,
report.blocks_total - 1,
"every block but the corrupted one is recovered",
);
assert!(
report.entries_salvaged > 0 && report.entries_salvaged < u64::from(n),
"a partial key range is recovered, got {} of {n}",
report.entries_salvaged,
);
assert!(
report.dropped.first().is_some_and(|d| {
matches!(d.reason, DropReason::ChecksumMismatch) && d.key_range.is_some()
}),
"the dropped block reports a checksum mismatch and names the key range it lost: {report:?}",
);
assert_eq!(report.salvaged_path.as_deref(), Some(dest.as_path()));
assert_eq!(
reopen_item_count(dest, &fs)?,
report.entries_salvaged,
"the salvaged SST holds exactly the entries the report counted",
);
Ok(())
}
#[cfg(feature = "columnar")]
#[test]
fn salvage_drops_a_corrupted_columnar_block_and_keeps_the_rest() -> crate::Result<()> {
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?
.use_columnar(true)
.use_zone_map(true)
.use_data_block_size(256);
let n = 200u32;
for i in 0..n {
writer.write(iv(i))?;
}
assert!(
writer.finish()?.is_some(),
"source columnar SST is non-empty"
);
let target = {
let table = open(source.clone(), &fs)?;
let offsets: alloc::vec::Vec<u64> = table
.data_block_handles()
.filter_map(Result::ok)
.map(|kh| *kh.as_ref().offset())
.collect();
let Some(&second) = offsets.get(1) else {
panic!("source columnar SST must have at least two data blocks, got {offsets:?}");
};
second
};
let flip = usize::try_from(target).unwrap_or(0) + 16;
let mut bytes = std::fs::read(&source)?;
if let Some(b) = bytes.get_mut(flip) {
*b ^= 0xFF;
}
std::fs::write(&source, &bytes)?;
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert_eq!(
report.dropped.len(),
1,
"exactly the one corrupted columnar block is dropped: {report:?}",
);
assert_eq!(
report.blocks_salvaged,
report.blocks_total - 1,
"every columnar block but the corrupted one is recovered",
);
assert!(
report.entries_salvaged > 0 && report.entries_salvaged < u64::from(n),
"a partial key range is recovered, got {} of {n}",
report.entries_salvaged,
);
assert_eq!(report.salvaged_path.as_deref(), Some(dest.as_path()));
assert_eq!(reopen_item_count(dest, &fs)?, report.entries_salvaged);
Ok(())
}
#[cfg(feature = "columnar")]
#[test]
fn salvage_tolerates_a_corrupt_delete_bitmap_as_all_live() -> crate::Result<()> {
use crate::config::DeleteStrategy;
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let n = 200u32;
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?
.use_columnar(true)
.use_zone_map(true)
.use_data_block_size(256)
.delete_strategy(DeleteStrategy::MergeOnRead);
for i in 0..n {
writer.write(iv(i))?;
}
for pos in [5u32, 50, 150] {
writer.delete_bitmap_mut().insert(pos);
}
assert!(
writer.finish()?.is_some(),
"source columnar+deletes SST is non-empty",
);
let (db_pos, db_len) = {
let mut f = std::fs::File::open(&source)?;
let reader = match crate::sfa::Reader::from_reader(&mut f) {
Ok(r) => r,
Err(e) => panic!("reading the SFA trailer failed: {e:?}"),
};
let Some(entry) = reader.toc().iter().find(|e| e.name() == b"delete_bitmap") else {
panic!("source must carry a delete_bitmap section");
};
(entry.pos(), entry.len())
};
let flip = usize::try_from(db_pos + db_len / 2).unwrap_or(0);
let mut bytes = std::fs::read(&source)?;
if let Some(b) = bytes.get_mut(flip) {
*b ^= 0xFF;
}
std::fs::write(&source, &bytes)?;
assert!(
open(source.clone(), &fs).is_err(),
"normal recovery must fail closed on a corrupt delete-bitmap",
);
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert!(
report.is_complete(),
"the data blocks are intact; only the sidecar was corrupt: {report:?}",
);
assert_eq!(
report.entries_salvaged,
u64::from(n),
"every row is recovered live, the corrupt bitmap is ignored",
);
assert_eq!(reopen_item_count(dest, &fs)?, u64::from(n));
Ok(())
}
#[cfg(feature = "columnar")]
#[test]
fn salvage_ignores_a_delete_bitmap_without_a_readable_zone_map() -> crate::Result<()> {
use crate::config::DeleteStrategy;
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let n = 200u32;
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?
.use_columnar(true)
.use_zone_map(true)
.use_data_block_size(256)
.delete_strategy(DeleteStrategy::MergeOnRead);
for i in 0..n {
writer.write(iv(i))?;
}
for pos in [5u32, 50, 150] {
writer.delete_bitmap_mut().insert(pos);
}
assert!(
writer.finish()?.is_some(),
"source columnar+deletes SST is non-empty",
);
let (zm_pos, zm_len) = {
let mut f = std::fs::File::open(&source)?;
let reader = match crate::sfa::Reader::from_reader(&mut f) {
Ok(r) => r,
Err(e) => panic!("reading the SFA trailer failed: {e:?}"),
};
let Some(entry) = reader.toc().iter().find(|e| e.name() == b"zone_map") else {
panic!("source must carry a zone_map section");
};
(entry.pos(), entry.len())
};
let flip = usize::try_from(zm_pos + zm_len / 2).unwrap_or(0);
let mut bytes = std::fs::read(&source)?;
if let Some(b) = bytes.get_mut(flip) {
*b ^= 0xFF;
}
std::fs::write(&source, &bytes)?;
assert!(
open(source.clone(), &fs).is_err(),
"normal recovery must reject a bitmap with no readable zone map",
);
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert!(
report.is_complete(),
"the data blocks are intact; only the zone map was corrupt: {report:?}",
);
assert_eq!(
report.entries_salvaged,
u64::from(n),
"every row is recovered live once the unpositionable bitmap is ignored",
);
assert_eq!(reopen_item_count(dest, &fs)?, u64::from(n));
Ok(())
}
#[test]
fn salvage_sst_errors_when_the_source_cannot_be_opened() -> crate::Result<()> {
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?;
for i in 0..50 {
writer.write(iv(i))?;
}
assert!(writer.finish()?.is_some(), "source SST is non-empty");
let mut bytes = std::fs::read(&source)?;
bytes.truncate(bytes.len() / 2);
std::fs::write(&source, &bytes)?;
assert!(
salvage_sst(&source, dest.clone(), &fs).is_err(),
"an unparseable container must fail salvage, not write a partial file",
);
assert!(
!dest.exists(),
"no destination is written on an open failure"
);
Ok(())
}
#[test]
fn salvage_sst_recovers_nothing_when_the_only_block_is_corrupt() -> crate::Result<()> {
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?;
for i in 0..8 {
writer.write(iv(i))?;
}
assert!(writer.finish()?.is_some(), "source SST is non-empty");
let target = {
let table = open(source.clone(), &fs)?;
let offsets: alloc::vec::Vec<u64> = table
.data_block_handles()
.filter_map(Result::ok)
.map(|kh| *kh.as_ref().offset())
.collect();
let Some(&only) = offsets.first() else {
panic!("expected a single data block, got {offsets:?}");
};
assert_eq!(
offsets.len(),
1,
"expected a single data block, got {offsets:?}"
);
only
};
let flip = usize::try_from(target).unwrap_or(0) + 16;
let mut bytes = std::fs::read(&source)?;
if let Some(b) = bytes.get_mut(flip) {
*b ^= 0xFF;
}
std::fs::write(&source, &bytes)?;
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert_eq!(report.blocks_salvaged, 0, "the only block was corrupt");
assert_eq!(report.entries_salvaged, 0, "no entries recovered");
assert_eq!(report.dropped.len(), 1, "the dropped block is reported");
assert!(
report.salvaged_path.is_none(),
"nothing recoverable means no file is written",
);
assert!(!dest.exists(), "no destination file on an empty salvage");
Ok(())
}
#[cfg(feature = "columnar")]
#[test]
fn salvage_skips_a_wholly_deleted_block() -> crate::Result<()> {
use crate::config::DeleteStrategy;
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let n = 200u32;
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?
.use_columnar(true)
.use_zone_map(true)
.use_data_block_size(256)
.delete_strategy(DeleteStrategy::MergeOnRead);
for i in 0..n {
writer.write(iv(i))?;
}
let deleted = 60u32;
for pos in 0..deleted {
writer.delete_bitmap_mut().insert(pos);
}
assert!(
writer.finish()?.is_some(),
"source columnar+deletes SST is non-empty",
);
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert!(
report.is_complete(),
"wholly-deleted blocks are skipped, not dropped: {report:?}",
);
assert!(
report.blocks_salvaged < report.blocks_total,
"at least one leading block was wholly deleted and skipped: {report:?}",
);
assert_eq!(
report.entries_salvaged,
u64::from(n - deleted),
"every live row is recovered, the deleted prefix is skipped",
);
assert_eq!(reopen_item_count(dest, &fs)?, u64::from(n - deleted));
Ok(())
}
#[test]
fn salvage_rejects_an_sst_with_range_tombstones() -> crate::Result<()> {
use crate::UserKey;
use crate::range_tombstone::RangeTombstone;
let dir = tempdir()?;
let source = dir.path().join("source");
let dest = dir.path().join("salvaged");
let fs: Arc<dyn Fs> = Arc::new(StdFs);
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?;
for i in 0..20 {
writer.write(iv(i))?;
}
writer.write_range_tombstone(RangeTombstone::new(
UserKey::from(b"key00005".as_slice()),
UserKey::from(b"key00010".as_slice()),
2,
));
assert!(writer.finish()?.is_some(), "source SST is non-empty");
let result = salvage_sst(&source, dest.clone(), &fs);
assert!(
matches!(result, Err(crate::Error::FeatureUnsupported(_))),
"an SST with range tombstones must fail closed, got {result:?}",
);
assert!(
!dest.exists(),
"no salvaged file is written when salvage fails closed",
);
Ok(())
}
#[test]
fn salvage_sst_reads_and_writes_through_the_injected_fs() -> crate::Result<()> {
use crate::fs::MemFs;
let fs: Arc<dyn Fs> = Arc::new(MemFs::new());
let dir = std::path::absolute("/memfs")?;
fs.create_dir_all(&dir)?;
let source = dir.join("source");
let dest = dir.join("salvaged");
let mut writer = Writer::new(source.clone(), 0, 0, Arc::clone(&fs))?.use_data_block_size(256);
let n = 200u32;
for i in 0..n {
writer.write(iv(i))?;
}
assert!(
writer.finish()?.is_some(),
"in-memory source SST is non-empty"
);
let report = salvage_sst(&source, dest.clone(), &fs)?;
assert!(
report.is_complete(),
"a healthy in-memory SST salvages with no dropped blocks: {report:?}",
);
assert_eq!(
report.entries_salvaged,
u64::from(n),
"every entry is recovered through the in-memory backend",
);
assert_eq!(report.salvaged_path.as_deref(), Some(dest.as_path()));
assert_eq!(
reopen_item_count(dest, &fs)?,
u64::from(n),
"the salvaged SST reopens through the same in-memory backend",
);
Ok(())
}