Skip to main content

lsm_tree/compaction/
heal.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2026-present, Structured World Foundation
3
4//! Compaction strategy that rewrites SSTs flagged for ECC self-healing.
5//!
6//! A block read that recovers its payload from Page-ECC parity returns correct
7//! bytes but leaves the fault on disk (SSTs are immutable, so the block cannot
8//! be patched). The read path records the owning SST in the tree's
9//! [`HealHints`]; this strategy claims one such SST per pass and emits a
10//! single-table [`Merge`](super::Choice::Merge) back into its own level. The
11//! merge re-reads the block (correcting it once more on the way in) and writes a
12//! fresh SST with newly computed parity, so subsequent reads need no correction.
13//!
14//! Run it repeatedly (from the compaction loop, leader-only in a clustered
15//! deployment) until [`HealHints::is_empty`] reports the queue drained.
16
17use super::{Choice, CompactionStrategy, Input as CompactionInput};
18use crate::{
19    HashSet, compaction::state::CompactionState, config::Config, heal_hints::HealHints,
20    version::Version,
21};
22use alloc::sync::Arc;
23
/// Identifier this strategy hands back from [`CompactionStrategy::get_name`].
pub const NAME: &str = "EccHealCompaction";
26
27/// Rewrites one ECC-flagged SST per invocation to clear a latent parity fault.
28///
29/// Holds a shared handle to the tree's [`HealHints`]; obtain it from
30/// [`Tree::heal_hints`](crate::Tree::heal_hints).
31pub struct Strategy {
32    hints: Arc<HealHints>,
33    target_size: u64,
34}
35
36impl Strategy {
37    /// Builds a heal strategy over `hints`.
38    ///
39    /// `target_size` caps the rewritten output run's table size (use the level's
40    /// target, or [`u64::MAX`] to keep the SST a single table).
41    #[must_use]
42    pub fn new(hints: Arc<HealHints>, target_size: u64) -> Self {
43        Self { hints, target_size }
44    }
45}
46
47impl CompactionStrategy for Strategy {
48    fn get_name(&self) -> &'static str {
49        NAME
50    }
51
52    fn choose(&self, version: &Version, _cfg: &Config, state: &CompactionState) -> Choice {
53        // Claim flagged SSTs one at a time. An id no longer in the tree (already
54        // compacted away since the hint) is dropped; one currently hidden in
55        // another compaction is re-queued for the next pass.
56        while let Some(global_id) = self.hints.pop() {
57            let table_id = global_id.table_id();
58
59            let Some(level_idx) = version
60                .iter_levels()
61                .position(|level| level.list_ids().contains(&table_id))
62            else {
63                // Gone — nothing left to heal for this id.
64                continue;
65            };
66
67            if state.hidden_set().is_hidden(table_id) {
68                // Busy in another compaction; put it back and try next pass.
69                self.hints.record(global_id);
70                return Choice::DoNothing;
71            }
72
73            #[expect(
74                clippy::cast_possible_truncation,
75                reason = "level index is bounded by level_count, which is a u8"
76            )]
77            let level = level_idx as u8;
78
79            return Choice::Merge(CompactionInput {
80                table_ids: core::iter::once(table_id).collect::<HashSet<_>>(),
81                dest_level: level,
82                canonical_level: level,
83                target_size: self.target_size,
84            });
85        }
86
87        Choice::DoNothing
88    }
89}
90
91#[cfg(test)]
92mod strategy_tests;
93
94#[cfg(all(test, feature = "page_ecc"))]
95mod tests;