use super::{Choice, CompactionStrategy, Input as CompactionInput};
use crate::{
HashSet, compaction::state::CompactionState, config::Config, heal_hints::HealHints,
version::Version,
};
use alloc::sync::Arc;
/// Name that [`Strategy`] reports from `CompactionStrategy::get_name`.
pub const NAME: &str = "EccHealCompaction";
/// Compaction strategy driven by heal hints.
///
/// Each call to `choose` pops hints from the shared [`HealHints`] until it finds a
/// table that is still listed in one of the version's levels, then requests a
/// merge of just that table into the level it already lives in.
pub struct Strategy {
/// Shared hint store this strategy pops from (and re-records into when a
/// hinted table is currently hidden).
hints: Arc<HealHints>,
/// Copied unchanged into the `target_size` of every merge this strategy requests.
target_size: u64,
}
impl Strategy {
/// Creates a strategy that takes its work from `hints`.
///
/// `target_size` is stored as given and later used as the `target_size` of
/// each merge the strategy requests.
#[must_use]
pub fn new(hints: Arc<HealHints>, target_size: u64) -> Self {
Self { hints, target_size }
}
}
impl CompactionStrategy for Strategy {
    /// Returns [`NAME`], the identifier of this strategy.
    fn get_name(&self) -> &'static str {
        NAME
    }

    /// Picks the next hinted table that can be rewritten.
    ///
    /// Hints are popped one at a time:
    ///
    /// - a hint whose table id is not listed in any level of `version` is
    ///   discarded and the next hint is tried;
    /// - a hint whose table is in the hidden set of `state` is recorded again
    ///   and nothing is scheduled for this call;
    /// - otherwise a merge of that single table is returned, with both
    ///   `dest_level` and `canonical_level` set to the level that currently
    ///   holds the table.
    ///
    /// Returns [`Choice::DoNothing`] once no hints are left.
    fn choose(&self, version: &Version, _cfg: &Config, state: &CompactionState) -> Choice {
        while let Some(hint) = self.hints.pop() {
            let table_id = hint.table_id();

            // Index of the first level that lists the table, if any.
            let located = version
                .iter_levels()
                .enumerate()
                .find_map(|(idx, level)| level.list_ids().contains(&table_id).then_some(idx));

            let Some(level_idx) = located else {
                // The table is no longer part of this version: drop the hint.
                continue;
            };

            if state.hidden_set().is_hidden(table_id) {
                // Put the hint back so it is not lost, and stop for this call;
                // looping on would just pop the same hint again.
                self.hints.record(hint);
                return Choice::DoNothing;
            }

            #[expect(
                clippy::cast_possible_truncation,
                reason = "level index is bounded by level_count, which is a u8"
            )]
            let level = level_idx as u8;

            let table_ids = core::iter::once(table_id).collect::<HashSet<_>>();

            return Choice::Merge(CompactionInput {
                table_ids,
                dest_level: level,
                canonical_level: level,
                target_size: self.target_size,
            });
        }

        Choice::DoNothing
    }
}
#[cfg(test)]
mod strategy_tests {
    use crate::{AbstractTree, AnyTree, Config, GlobalTableId, SequenceNumberCounter};
    use alloc::sync::Arc;

    /// A hint naming a table id that is not in the tree must be popped and
    /// discarded by the strategy instead of staying queued.
    #[test]
    fn ecc_heal_drops_ids_no_longer_in_the_tree() -> crate::Result<()> {
        let tmp = tempfile::tempdir()?;
        let config = Config::new(
            tmp.path(),
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        );
        let AnyTree::Standard(tree) = config.open()? else {
            unreachable!("standard tree configured (no kv separation)");
        };

        // Flush one entry so the tree holds a real table.
        tree.insert("k", "v", 1);
        tree.flush_active_memtable(1)?;

        // Queue a hint for table id 999_999, which the single flush above did not create.
        let hints = tree.heal_hints();
        let stale = GlobalTableId::from((tree.id(), 999_999));
        hints.record(stale);
        assert!(!hints.is_empty());

        let strategy = super::Strategy::new(Arc::clone(&hints), u64::MAX);
        let result = tree.compact(Arc::new(strategy), 0)?;

        assert!(
            hints.is_empty(),
            "a stale id must be drained, not left queued; got {result:?}",
        );
        Ok(())
    }
}
// End-to-end tests for the heal strategy on trees opened with page ECC.
// Compiled only for test builds with the `page_ecc` feature enabled.
#[cfg(all(test, feature = "page_ecc"))]
mod tests {
use crate::{
AbstractTree,
Config,
MAX_SEQNO,
SequenceNumberCounter,
runtime_config::EccScheme,
table::{block::Header, block_index::BlockIndex as _},
};
use alloc::sync::Arc;
/// Corrupts one byte of a flushed table on disk, checks that a read still
/// returns the right value and records a heal hint, then runs the heal
/// strategy and checks that a second read records no further hint.
#[test]
fn ecc_heal_strategy_rewrites_flagged_sst_so_reread_is_clean() -> crate::Result<()> {
let dir = tempfile::tempdir()?;
// Phase 1: write 2 000 keys, flush them into a table and choose the byte to corrupt.
// Everything lives in an inner block so the tree handle and the version-history
// read guard are dropped before the file is modified.
let (sst_path, corrupt_pos) = {
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.page_ecc(true)
.ecc_scheme(EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.open()?;
let crate::AnyTree::Standard(tree) = tree else {
unreachable!("standard tree configured (no kv separation)");
};
for i in 0u64..2_000 {
tree.insert(format!("key-{i:06}"), format!("v{i:06}"), i);
}
tree.flush_active_memtable(2_000)?;
#[expect(clippy::expect_used, reason = "test asserts the tree shape")]
let versions = tree.version_history.read().expect("lock not poisoned");
let binding = versions.latest_version();
#[expect(clippy::expect_used, reason = "flush produced exactly one table")]
let table = binding.version.iter_tables().next().expect("one table");
#[expect(clippy::expect_used, reason = "table has at least one data block")]
let keyed = table.block_index.iter().next().expect("a data block")?;
// Target byte: `Header::MIN_LEN + 3` bytes past the offset of the first block-index
// entry. NOTE(review): presumably this lands inside the block payload just after
// its header — confirm against the block layout.
let off = keyed.offset().0 as usize;
((*table.path).clone(), off + Header::MIN_LEN + 3)
};
// Phase 2: flip the high bit of the chosen byte directly in the table file.
let mut bytes = std::fs::read(&sst_path)?;
bytes[corrupt_pos] ^= 0x80;
std::fs::write(&sst_path, &bytes)?;
// Phase 3: reopen the tree with the same ECC settings.
let tree = Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.page_ecc(true)
.ecc_scheme(EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.open()?;
let crate::AnyTree::Standard(tree) = tree else {
unreachable!("standard tree configured (no kv separation)");
};
// Hint recording starts out disabled and empty; switching `auto_heal` on in the
// runtime config must be reflected by `heal_hints().is_enabled()`.
assert!(tree.heal_hints().is_empty(), "fresh tree has no hints");
assert!(!tree.heal_hints().is_enabled(), "auto_heal defaults off");
tree.update_runtime_config(|c| c.auto_heal = true)?;
assert!(
tree.heal_hints().is_enabled(),
"auto_heal toggle syncs the gate"
);
// Read the first key: the value must come back intact despite the flipped bit,
// and the read must leave a heal hint behind.
#[expect(clippy::expect_used, reason = "key was inserted before flush")]
let got = tree.get(b"key-000000", MAX_SEQNO)?.expect("key present");
assert_eq!(&*got, b"v000000", "ECC must repair the value on read");
assert!(
!tree.heal_hints().is_empty(),
"a persistent ECC correction must record a heal hint",
);
// Checked only when the `metrics` feature is enabled.
#[cfg(feature = "metrics")]
assert_eq!(
tree.metrics().ecc_auto_heal_scheduled_count(),
1,
"the scheduled SST is counted once",
);
// Phase 4: run the heal strategy over the recorded hint; `u64::MAX` is passed
// as the strategy's `target_size`.
let result = tree.compact(
Arc::new(super::Strategy::new(tree.heal_hints(), u64::MAX)),
0,
)?;
assert!(
tree.heal_hints().is_empty(),
"heal compaction must drain the hint queue, got {result:?}",
);
// Phase 5: read the same key again. The value must be unchanged and the read
// must not record a new hint.
#[expect(clippy::expect_used, reason = "key survives the rewrite")]
let got = tree
.get(b"key-000000", MAX_SEQNO)?
.expect("key present after heal");
assert_eq!(&*got, b"v000000", "healed value must still be correct");
assert!(
tree.heal_hints().is_empty(),
"the rewritten SST must read clean (no further correction)",
);
Ok(())
}
/// Same corruption scenario with zstd-compressed data blocks and the
/// `LSM_PARTIAL_DECODE` environment variable set: a bounded range scan over a
/// corrupted table must return rows and record a heal hint.
///
/// Compiled only when the `zstd` feature is enabled.
#[cfg(feature = "zstd")]
#[test]
fn ecc_heal_scheduled_on_partial_decode_corrected_read() -> crate::Result<()> {
use crate::{
CompressionType,
config::{BlockSizePolicy, CompressionPolicy},
};
// SAFETY: NOTE(review) — `set_var` changes process-wide state and is only sound
// while no other thread reads or writes the environment; the test harness runs
// tests on multiple threads by default, so confirm nothing else touches the
// environment concurrently. The variable is never unset, so it stays set for
// whatever runs later in this process.
unsafe { std::env::set_var("LSM_PARTIAL_DECODE", "1") };
let dir = tempfile::tempdir()?;
// Both phases open the tree through this closure so they share one configuration:
// Reed-Solomon page ECC (8 data / 2 parity shards), zstd level 19 for data blocks,
// and a data block size policy of `512 * 1024` (presumably bytes).
let open = || {
Config::new(
dir.path(),
SequenceNumberCounter::default(),
SequenceNumberCounter::default(),
)
.page_ecc(true)
.ecc_scheme(EccScheme::ReedSolomon {
data_shards: 8,
parity_shards: 2,
})
.data_block_compression_policy(CompressionPolicy::all(
#[expect(clippy::expect_used, reason = "19 is a valid zstd level")]
CompressionType::zstd(19).expect("valid level"),
))
.data_block_size_policy(BlockSizePolicy::all(512 * 1024))
.open()
};
// Phase 1: write 20 000 keys (all with seqno 0), flush, and choose the byte to
// corrupt. The inner block drops the tree and the read guard before the file is touched.
let (sst_path, corrupt_pos) = {
let crate::AnyTree::Standard(tree) = open()? else {
unreachable!("standard tree configured");
};
for i in 0u64..20_000 {
tree.insert(
format!("key-{i:08}"),
format!("value-{i:08}-padding-padding"),
0,
);
}
tree.flush_active_memtable(0)?;
#[expect(clippy::expect_used, reason = "test asserts the tree shape")]
let versions = tree.version_history.read().expect("lock not poisoned");
let binding = versions.latest_version();
#[expect(clippy::expect_used, reason = "flush produced exactly one table")]
let table = binding.version.iter_tables().next().expect("one table");
#[expect(clippy::expect_used, reason = "table has at least one data block")]
let keyed = table.block_index.iter().next().expect("a data block")?;
// Target byte: `Header::MIN_LEN + 8` bytes past the offset of the first
// block-index entry.
let off = keyed.offset().0 as usize;
((*table.path).clone(), off + Header::MIN_LEN + 8)
};
// Phase 2: flip the lowest bit of the chosen byte directly in the table file.
let mut bytes = std::fs::read(&sst_path)?;
bytes[corrupt_pos] ^= 0x01;
std::fs::write(&sst_path, &bytes)?;
// Phase 3: reopen, enable hint recording, and scan the half-open key range
// `key-00000000..key-00000050`.
let crate::AnyTree::Standard(tree) = open()? else {
unreachable!("standard tree configured");
};
tree.update_runtime_config(|c| c.auto_heal = true)?;
let count = tree
.range(
b"key-00000000".to_vec()..b"key-00000050".to_vec(),
MAX_SEQNO,
None,
)
.count();
// The scan must yield rows and must have recorded a heal hint for the table.
assert!(count > 0, "bounded range returned rows");
assert!(
!tree.heal_hints().is_empty(),
"a corrected read on the partial-decode path must schedule healing",
);
Ok(())
}
}