lsm_tree/compaction/heal.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2026-present, Structured World Foundation
3
4//! Compaction strategy that rewrites SSTs flagged for ECC self-healing.
5//!
6//! A block read that recovers its payload from Page-ECC parity returns correct
7//! bytes but leaves the fault on disk (SSTs are immutable, so the block cannot
8//! be patched). The read path records the owning SST in the tree's
9//! [`HealHints`]; this strategy claims one such SST per pass and emits a
10//! single-table [`Merge`](super::Choice::Merge) back into its own level. The
11//! merge re-reads the block (correcting it once more on the way in) and writes a
12//! fresh SST with newly computed parity, so subsequent reads need no correction.
13//!
14//! Run it repeatedly (from the compaction loop, leader-only in a clustered
15//! deployment) until [`HealHints::is_empty`] reports the queue drained.
16
17use super::{Choice, CompactionStrategy, Input as CompactionInput};
18use crate::{
19 HashSet, compaction::state::CompactionState, config::Config, heal_hints::HealHints,
20 version::Version,
21};
22use alloc::sync::Arc;
23
24/// Name reported by [`CompactionStrategy::get_name`].
25pub const NAME: &str = "EccHealCompaction";
26
27/// Rewrites one ECC-flagged SST per invocation to clear a latent parity fault.
28///
29/// Holds a shared handle to the tree's [`HealHints`]; obtain it from
30/// [`Tree::heal_hints`](crate::Tree::heal_hints).
31pub struct Strategy {
32 hints: Arc<HealHints>,
33 target_size: u64,
34}
35
36impl Strategy {
37 /// Builds a heal strategy over `hints`.
38 ///
39 /// `target_size` caps the rewritten output run's table size (use the level's
40 /// target, or [`u64::MAX`] to keep the SST a single table).
41 #[must_use]
42 pub fn new(hints: Arc<HealHints>, target_size: u64) -> Self {
43 Self { hints, target_size }
44 }
45}
46
47impl CompactionStrategy for Strategy {
48 fn get_name(&self) -> &'static str {
49 NAME
50 }
51
52 fn choose(&self, version: &Version, _cfg: &Config, state: &CompactionState) -> Choice {
53 // Claim flagged SSTs one at a time. An id no longer in the tree (already
54 // compacted away since the hint) is dropped; one currently hidden in
55 // another compaction is re-queued for the next pass.
56 while let Some(global_id) = self.hints.pop() {
57 let table_id = global_id.table_id();
58
59 let Some(level_idx) = version
60 .iter_levels()
61 .position(|level| level.list_ids().contains(&table_id))
62 else {
63 // Gone — nothing left to heal for this id.
64 continue;
65 };
66
67 if state.hidden_set().is_hidden(table_id) {
68 // Busy in another compaction; put it back and try next pass.
69 self.hints.record(global_id);
70 return Choice::DoNothing;
71 }
72
73 #[expect(
74 clippy::cast_possible_truncation,
75 reason = "level index is bounded by level_count, which is a u8"
76 )]
77 let level = level_idx as u8;
78
79 return Choice::Merge(CompactionInput {
80 table_ids: core::iter::once(table_id).collect::<HashSet<_>>(),
81 dest_level: level,
82 canonical_level: level,
83 target_size: self.target_size,
84 });
85 }
86
87 Choice::DoNothing
88 }
89}
90
91#[cfg(test)]
92mod strategy_tests;
93
94#[cfg(all(test, feature = "page_ecc"))]
95mod tests;