use crate::{
AbstractTree,
Config,
MAX_SEQNO,
SequenceNumberCounter,
runtime_config::EccScheme,
table::{block::Header, block_index::BlockIndex as _},
};
use alloc::sync::Arc;
// End-to-end check of the ECC heal path with a Reed-Solomon scheme
// (8 data shards, 2 parity shards):
//   1. flush 2,000 keys into a table and flip one bit inside its first data block,
//   2. reopen, enable `auto_heal`, and read a key through the damaged block,
//   3. run the heal compaction strategy and confirm that a re-read leaves the
//      hint queue empty.
#[test]
fn ecc_heal_strategy_rewrites_flagged_sst_so_reread_is_clean() -> crate::Result<()> {
    let dir = tempfile::tempdir()?;

    // Single place for the tree configuration; it is used for the writer
    // instance and again for the reopened instance.
    let open_tree = || {
        Config::new(
            dir.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .page_ecc(true)
        .ecc_scheme(EccScheme::ReedSolomon {
            data_shards: 8,
            parity_shards: 2,
        })
        .open()
    };

    // Write phase. The writer tree lives only inside this scope, so it is
    // dropped before the table file is modified on disk.
    let (sst_path, corrupt_pos) = {
        let crate::AnyTree::Standard(writer) = open_tree()? else {
            unreachable!("standard tree configured (no kv separation)");
        };
        for n in 0u64..2_000 {
            writer.insert(format!("key-{n:06}"), format!("v{n:06}"), n);
        }
        writer.flush_active_memtable(2_000)?;

        let latest = writer.version_history.read().latest_version();
        #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
        let sst = latest.version.iter_tables().next().expect("one table");
        #[expect(clippy::expect_used, reason = "table has at least one data block")]
        let first_block = sst.block_index.iter().next().expect("a data block")?;
        #[allow(
            clippy::cast_possible_truncation,
            reason = "in-file block offset fits usize; only narrows on 32-bit targets"
        )]
        let block_start = first_block.offset().0 as usize;

        // Target byte: 3 bytes past the minimum header length of the first
        // data block (presumably inside the block payload — confirm against
        // the block layout).
        ((*sst.path).clone(), block_start + Header::MIN_LEN + 3)
    };

    // Corruption phase: flip the top bit of the chosen byte in place.
    let mut raw = std::fs::read(&sst_path)?;
    #[expect(
        clippy::indexing_slicing,
        reason = "corrupt_pos is an in-file block offset, in range for the SST bytes"
    )]
    let target = &mut raw[corrupt_pos];
    *target ^= 0x80;
    std::fs::write(&sst_path, &raw)?;

    // Read phase on a freshly opened tree.
    let crate::AnyTree::Standard(tree) = open_tree()? else {
        unreachable!("standard tree configured (no kv separation)");
    };
    assert!(tree.heal_hints().is_empty(), "fresh tree has no hints");
    assert!(!tree.heal_hints().is_enabled(), "auto_heal defaults off");

    tree.update_runtime_config(|cfg| cfg.auto_heal = true)?;
    assert!(
        tree.heal_hints().is_enabled(),
        "auto_heal toggle syncs the gate"
    );

    #[expect(clippy::expect_used, reason = "key was inserted before flush")]
    let repaired = tree.get(b"key-000000", MAX_SEQNO)?.expect("key present");
    assert_eq!(&*repaired, b"v000000", "ECC must repair the value on read");
    assert!(
        !tree.heal_hints().is_empty(),
        "a persistent ECC correction must record a heal hint",
    );

    // Counter expectations are only checked when the metrics feature is on.
    #[cfg(feature = "metrics")]
    {
        let metrics = tree.metrics();
        assert_eq!(
            metrics.ecc_auto_heal_scheduled_count(),
            1,
            "the scheduled SST is counted once",
        );
        assert_eq!(
            metrics.ecc_shard_recovered_count(),
            1,
            "the RS recovery is counted once",
        );
        assert_eq!(
            metrics.ecc_secded_corrected_count(),
            0,
            "an RS recovery must not increment the SEC-DED counter",
        );
        assert_eq!(metrics.ecc_recovered_count(), 1, "one total recovery");
    }

    // Heal phase: run the heal strategy over the recorded hints.
    let heal_strategy = Arc::new(super::Strategy::new(tree.heal_hints(), u64::MAX));
    let outcome = tree.compact(heal_strategy, 0)?;
    assert!(
        tree.heal_hints().is_empty(),
        "heal compaction must drain the hint queue, got {outcome:?}",
    );

    // Verification phase: the same key must still resolve, and reading it
    // must not record a new hint.
    #[expect(clippy::expect_used, reason = "key survives the rewrite")]
    let healed = tree
        .get(b"key-000000", MAX_SEQNO)?
        .expect("key present after heal");
    assert_eq!(&*healed, b"v000000", "healed value must still be correct");
    assert!(
        tree.heal_hints().is_empty(),
        "the rewritten SST must read clean (no further correction)",
    );

    Ok(())
}
// With the SEC-DED scheme, a single flipped bit in the first data block must
// be corrected on read and show up in the SEC-DED counter only (not in the
// Reed-Solomon shard counter). Compiled only with the `metrics` feature.
#[cfg(feature = "metrics")]
#[test]
fn read_healing_single_bit_increments_secded_counter() -> crate::Result<()> {
    let dir = tempfile::tempdir()?;

    // Single place for the tree configuration; used for the writer instance
    // and again for the reopened instance.
    let open_tree = || {
        Config::new(
            dir.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .page_ecc(true)
        .ecc_scheme(EccScheme::Secded)
        .open()
    };

    // Write phase. The writer tree is dropped at the end of this scope,
    // before the table file is modified on disk.
    let (sst_path, corrupt_pos) = {
        let crate::AnyTree::Standard(writer) = open_tree()? else {
            unreachable!("standard tree configured (no kv separation)");
        };
        for n in 0u64..2_000 {
            writer.insert(format!("key-{n:06}"), format!("v{n:06}"), n);
        }
        writer.flush_active_memtable(2_000)?;

        let latest = writer.version_history.read().latest_version();
        #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
        let sst = latest.version.iter_tables().next().expect("one table");
        #[expect(clippy::expect_used, reason = "table has at least one data block")]
        let first_block = sst.block_index.iter().next().expect("a data block")?;
        #[allow(
            clippy::cast_possible_truncation,
            reason = "in-file block offset fits usize; only narrows on 32-bit targets"
        )]
        let block_start = first_block.offset().0 as usize;

        // Target byte: 3 bytes past the minimum header length of the first
        // data block.
        ((*sst.path).clone(), block_start + Header::MIN_LEN + 3)
    };

    // Corruption phase: flip the lowest bit of the chosen byte.
    let mut raw = std::fs::read(&sst_path)?;
    {
        #[expect(
            clippy::expect_used,
            reason = "corrupt_pos is an in-file block offset, in range for the SST bytes"
        )]
        let target = raw.get_mut(corrupt_pos).expect("corrupt_pos in range");
        *target ^= 0x01;
    }
    std::fs::write(&sst_path, &raw)?;

    // Read phase on a freshly opened tree.
    let crate::AnyTree::Standard(tree) = open_tree()? else {
        unreachable!("standard tree configured (no kv separation)");
    };
    #[expect(clippy::expect_used, reason = "key was inserted before flush")]
    let repaired = tree.get(b"key-000000", MAX_SEQNO)?.expect("key present");
    assert_eq!(
        &*repaired,
        b"v000000",
        "SEC-DED must heal the single-bit flip"
    );

    // Exactly one correction, attributed to SEC-DED and not to RS shards.
    let metrics = tree.metrics();
    assert_eq!(
        metrics.ecc_secded_corrected_count(),
        1,
        "the SEC-DED heal is counted once",
    );
    assert_eq!(
        metrics.ecc_shard_recovered_count(),
        0,
        "a SEC-DED heal must not increment the RS shard counter",
    );
    assert_eq!(metrics.ecc_recovered_count(), 1, "one total recovery");

    Ok(())
}
// A table written with page ECC enabled must yield no prewarm plan:
// `plan_prewarm` is expected to return `None` for it.
#[test]
fn plan_prewarm_skips_ecc_tables() -> crate::Result<()> {
    let dir = tempfile::tempdir()?;

    let opened = Config::new(
        dir.path(),
        SequenceNumberCounter::default(),
        SequenceNumberCounter::default(),
    )
    .page_ecc(true)
    .ecc_scheme(EccScheme::Secded)
    .open()?;
    let crate::AnyTree::Standard(tree) = opened else {
        unreachable!("standard tree configured (no kv separation)");
    };

    // Populate and flush so that a table exists on disk.
    for n in 0u64..2_000 {
        tree.insert(format!("key-{n:06}"), format!("v{n:06}"), n);
    }
    tree.flush_active_memtable(2_000)?;

    let latest = tree.version_history.read().latest_version();
    #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
    let sst = latest.version.iter_tables().next().expect("one table");

    // Two keys that were inserted above, in ascending order, each paired
    // with its 64-bit hash.
    let low_key: &[u8] = b"key-000000";
    let high_key: &[u8] = b"key-001000";
    let sorted_keys: [(&[u8], u64); 2] = [
        (low_key, crate::hash::hash64(low_key)),
        (high_key, crate::hash::hash64(high_key)),
    ];

    let plan = sst.plan_prewarm(&sorted_keys, MAX_SEQNO);
    assert!(
        plan.is_none(),
        "plan_prewarm must skip an ECC table so a silently-corrected block is \
         never prewarm-cached as clean",
    );

    Ok(())
}
// A bounded range scan over a table with one flipped bit must record a heal
// hint when `auto_heal` is on. The table uses zstd level 19 with 512 KiB data
// blocks and a Reed-Solomon (8 data / 2 parity) ECC scheme. Compiled only
// with the `zstd` feature.
#[cfg(feature = "zstd")]
#[test]
fn ecc_heal_scheduled_on_partial_decode_corrected_read() -> crate::Result<()> {
    use crate::{
        CompressionType,
        config::{BlockSizePolicy, CompressionPolicy},
    };

    // Presumably opts reads into the partial-decode path that this test is
    // named after — confirm against the code that reads `LSM_PARTIAL_DECODE`.
    // NOTE(review): this mutates the process environment and is never undone;
    // nothing visible here shows that no other thread touches the environment
    // concurrently, so the soundness of this `unsafe` block should be confirmed.
    unsafe { std::env::set_var("LSM_PARTIAL_DECODE", "1") };

    let dir = tempfile::tempdir()?;

    // Single place for the tree configuration; used for the writer instance
    // and again for the reopened instance.
    let open_tree = || {
        Config::new(
            dir.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .page_ecc(true)
        .ecc_scheme(EccScheme::ReedSolomon {
            data_shards: 8,
            parity_shards: 2,
        })
        .data_block_compression_policy(CompressionPolicy::all(
            #[expect(clippy::expect_used, reason = "19 is a valid zstd level")]
            CompressionType::zstd(19).expect("valid level"),
        ))
        .data_block_size_policy(BlockSizePolicy::all(512 * 1024))
        .open()
    };

    // Write phase. The writer tree is dropped at the end of this scope,
    // before the table file is modified on disk.
    let (sst_path, corrupt_pos) = {
        let crate::AnyTree::Standard(writer) = open_tree()? else {
            unreachable!("standard tree configured");
        };
        // Every entry is written with sequence number 0.
        for n in 0u64..20_000 {
            writer.insert(
                format!("key-{n:08}"),
                format!("value-{n:08}-padding-padding"),
                0,
            );
        }
        writer.flush_active_memtable(0)?;

        let latest = writer.version_history.read().latest_version();
        #[expect(clippy::expect_used, reason = "flush produced exactly one table")]
        let sst = latest.version.iter_tables().next().expect("one table");
        #[expect(clippy::expect_used, reason = "table has at least one data block")]
        let first_block = sst.block_index.iter().next().expect("a data block")?;
        #[allow(
            clippy::cast_possible_truncation,
            reason = "in-file block offset fits usize; only narrows on 32-bit targets"
        )]
        let block_start = first_block.offset().0 as usize;

        // Target byte: 8 bytes past the minimum header length of the first
        // data block.
        ((*sst.path).clone(), block_start + Header::MIN_LEN + 8)
    };

    // Corruption phase: flip the lowest bit of the chosen byte in place.
    let mut raw = std::fs::read(&sst_path)?;
    #[expect(
        clippy::indexing_slicing,
        reason = "corrupt_pos is an in-file block offset, in range for the SST bytes"
    )]
    let target = &mut raw[corrupt_pos];
    *target ^= 0x01;
    std::fs::write(&sst_path, &raw)?;

    // Read phase on a freshly opened tree with auto-heal switched on.
    let crate::AnyTree::Standard(tree) = open_tree()? else {
        unreachable!("standard tree configured");
    };
    tree.update_runtime_config(|cfg| cfg.auto_heal = true)?;

    // Half-open scan over the first 50 keys.
    let lower = b"key-00000000".to_vec();
    let upper = b"key-00000050".to_vec();
    let rows = tree.range(lower..upper, MAX_SEQNO, None).count();
    assert!(rows > 0, "bounded range returned rows");

    assert!(
        !tree.heal_hints().is_empty(),
        "a corrected read on the partial-decode path must schedule healing",
    );

    Ok(())
}