use crate::AbstractTree;
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
/// A finding reported by `patrol_scrub` that the scrub could not repair.
///
/// Findings are collected in `PatrolScrubReport::errors`.
#[derive(Debug)]
#[non_exhaustive]
pub enum ScrubError {
    /// A data block whose corruption could not be corrected.
    UncorrectableBlock {
        /// Identifier of the table containing the block.
        table_id: crate::table::TableId,
        /// On-disk path of the table file.
        path: PathBuf,
        /// Offset of the block within the table file.
        block_offset: u64,
        /// Human-readable description of why the block could not be recovered.
        reason: String,
    },
    /// A table whose block index could not be read. The finding carries no
    /// block offset; presumably the table's data blocks could not be
    /// enumerated at all.
    BlockIndexUnreadable {
        /// Identifier of the affected table.
        table_id: crate::table::TableId,
        /// On-disk path of the table file.
        path: PathBuf,
        /// Human-readable description of the read failure.
        reason: String,
    },
}

impl std::fmt::Display for ScrubError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::UncorrectableBlock {
                table_id,
                path,
                block_offset,
                reason,
            } => write!(
                f,
                "uncorrectable block at offset {block_offset} in table {table_id} ({}): {reason}",
                path.display()
            ),
            Self::BlockIndexUnreadable {
                table_id,
                path,
                reason,
            } => write!(
                f,
                "block index of table {table_id} ({}) is unreadable: {reason}",
                path.display()
            ),
        }
    }
}

// Lets findings be boxed/propagated as ordinary errors (`Box<dyn Error>`, `?`).
impl std::error::Error for ScrubError {}
/// Aggregate outcome of a `patrol_scrub` pass.
///
/// Counters are summed across every table (and every worker thread) that took
/// part in the pass. `errors` holds one entry per finding that could not be
/// repaired.
#[derive(Debug, Default)]
#[non_exhaustive]
pub struct PatrolScrubReport {
    /// Number of SST files whose data blocks were scrubbed.
    pub sst_files_scanned: usize,
    /// Number of data blocks read and verified.
    pub blocks_scanned: usize,
    /// Number of faults repaired while reading (ECC correction-on-read).
    pub corrections_applied: usize,
    /// Number of SSTs queued for a healing rewrite. This stays 0 while
    /// auto-heal is disabled.
    pub ssts_scheduled_for_rewrite: usize,
    /// Number of data blocks whose corruption could not be corrected.
    pub uncorrectable_blocks: usize,
    /// Detailed, unrepaired findings.
    pub errors: Vec<ScrubError>,
}

impl PatrolScrubReport {
    /// Returns `true` when the pass found nothing it could not repair.
    ///
    /// Corrected faults do not fail the scrub. The scrub fails on any
    /// uncorrectable block, and also on any recorded [`ScrubError`]. The
    /// second check matters because a `ScrubError::BlockIndexUnreadable`
    /// finding has no block offset and may not be reflected in
    /// `uncorrectable_blocks`. Checking only the counter would report a table
    /// that was never actually scrubbed as healthy.
    #[must_use]
    pub fn is_ok(&self) -> bool {
        self.uncorrectable_blocks == 0 && self.errors.is_empty()
    }

    /// Folds `other` into `self`: every counter is summed and `other`'s
    /// findings are appended after the existing ones.
    fn merge(&mut self, other: Self) {
        // Exhaustive destructuring: adding a field to the report without
        // merging it here becomes a compile error instead of a silent drop.
        let Self {
            sst_files_scanned,
            blocks_scanned,
            corrections_applied,
            ssts_scheduled_for_rewrite,
            uncorrectable_blocks,
            errors,
        } = other;
        self.sst_files_scanned += sst_files_scanned;
        self.blocks_scanned += blocks_scanned;
        self.corrections_applied += corrections_applied;
        self.ssts_scheduled_for_rewrite += ssts_scheduled_for_rewrite;
        self.uncorrectable_blocks += uncorrectable_blocks;
        self.errors.extend(errors);
    }
}
/// Tuning knobs for `patrol_scrub`.
///
/// Start from `Default` and chain the setters, e.g.
/// `PatrolScrubOptions::default().parallelism(4)`.
#[derive(Clone, Debug)]
pub struct PatrolScrubOptions {
    /// Requested number of worker threads. `patrol_scrub` treats `0` as `1`
    /// and never uses more workers than there are tables.
    pub parallelism: usize,
    /// Optional pause a worker takes between consecutive tables. `None`
    /// scrubs back-to-back.
    pub throttle: Option<std::time::Duration>,
}

impl Default for PatrolScrubOptions {
    /// A single worker and no throttling.
    fn default() -> Self {
        Self {
            throttle: None,
            parallelism: 1,
        }
    }
}

impl PatrolScrubOptions {
    /// Returns these options with the requested worker count replaced by `workers`.
    #[must_use]
    pub const fn parallelism(self, workers: usize) -> Self {
        Self {
            parallelism: workers,
            ..self
        }
    }

    /// Returns these options with a `delay` pause between consecutive tables.
    #[must_use]
    pub const fn throttle(self, delay: std::time::Duration) -> Self {
        Self {
            throttle: Some(delay),
            ..self
        }
    }
}
/// Scrubs the data blocks of every table in the tree's current version.
///
/// Each table is passed to `Table::scrub_data_blocks` exactly once, and the
/// per-table reports are summed into a single [`PatrolScrubReport`].
///
/// `options.parallelism` is clamped to `1..=max(1, table_count)`:
/// - With one worker, the scrub runs on the calling thread.
/// - Otherwise, scoped worker threads pull tables from a shared cursor until
///   none remain.
///
/// When `options.throttle` is set, a worker that has finished a table and
/// already claimed another one sleeps for that duration before starting it.
/// No sleep follows the last table.
///
/// # Panics
///
/// A panic inside any worker thread is re-raised on the calling thread.
#[must_use]
pub fn patrol_scrub(tree: &impl AbstractTree, options: &PatrolScrubOptions) -> PatrolScrubReport {
    /// Claims table indices from `next` until the list is exhausted, scrubbing
    /// each claimed table and accumulating its report.
    fn drain_tables(
        tables: &[crate::table::Table],
        next: &AtomicUsize,
        throttle: Option<std::time::Duration>,
    ) -> PatrolScrubReport {
        let mut acc = PatrolScrubReport::default();
        // `Relaxed` suffices: the counter only hands out unique indices, and
        // results travel back through `join`, which synchronizes on its own.
        let mut slot = next.fetch_add(1, Ordering::Relaxed);
        while let Some(table) = tables.get(slot) {
            acc.merge(table.scrub_data_blocks());
            slot = next.fetch_add(1, Ordering::Relaxed);
            // Pause only when this worker has more work lined up.
            if slot < tables.len()
                && let Some(pause) = throttle
            {
                std::thread::sleep(pause);
            }
        }
        acc
    }

    let version = tree.current_version();
    let tables: Vec<crate::table::Table> = version.iter_tables().cloned().collect();
    let worker_count = options.parallelism.clamp(1, tables.len().max(1));
    let cursor = AtomicUsize::new(0);

    if worker_count == 1 {
        // A lone worker gains nothing from a thread; scrub inline.
        return drain_tables(&tables, &cursor, options.throttle);
    }

    let shared_tables: &[crate::table::Table] = &tables;
    let shared_cursor = &cursor;
    let throttle = options.throttle;
    std::thread::scope(|scope| {
        let workers: Vec<_> = (0..worker_count)
            .map(|_| scope.spawn(move || drain_tables(shared_tables, shared_cursor, throttle)))
            .collect();
        let mut report = PatrolScrubReport::default();
        for worker in workers {
            match worker.join() {
                Ok(partial) => report.merge(partial),
                Err(payload) => std::panic::resume_unwind(payload),
            }
        }
        report
    })
}
// Unit tests for the report/options plumbing, plus end-to-end scrubs of trees
// opened without page ECC.
#[cfg(test)]
mod tests {
    #![expect(
        clippy::expect_used,
        reason = "tests assert on known-present values; a panic is the failure signal"
    )]
    use super::*;
    use crate::{AbstractTree, AnyTree, Config, SequenceNumberCounter};

    // Opens a tree in `dir` with `Config::new` defaults (page ECC not enabled).
    fn standard_tree(dir: &std::path::Path) -> AnyTree {
        Config::new(
            dir,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .open()
        .expect("open tree")
    }

    // `merge` must add every counter field-by-field and append the other
    // report's findings.
    #[test]
    fn report_merge_sums_every_counter_and_concatenates_errors() {
        let mut acc = PatrolScrubReport {
            sst_files_scanned: 1,
            blocks_scanned: 10,
            corrections_applied: 2,
            ssts_scheduled_for_rewrite: 1,
            uncorrectable_blocks: 0,
            errors: vec![],
        };
        acc.merge(PatrolScrubReport {
            sst_files_scanned: 2,
            blocks_scanned: 5,
            corrections_applied: 1,
            ssts_scheduled_for_rewrite: 1,
            uncorrectable_blocks: 3,
            errors: vec![ScrubError::UncorrectableBlock {
                table_id: 7,
                path: "/x".into(),
                block_offset: 42,
                reason: "boom".into(),
            }],
        });
        assert_eq!(acc.sst_files_scanned, 3);
        assert_eq!(acc.blocks_scanned, 15);
        assert_eq!(acc.corrections_applied, 3);
        assert_eq!(acc.ssts_scheduled_for_rewrite, 2);
        assert_eq!(acc.uncorrectable_blocks, 3);
        assert_eq!(acc.errors.len(), 1);
    }

    // Corrections alone leave the report ok; one uncorrectable block flips it.
    #[test]
    fn report_is_ok_only_when_no_uncorrectable_blocks() {
        let mut report = PatrolScrubReport::default();
        assert!(report.is_ok(), "a fresh empty report is ok");
        report.corrections_applied = 5;
        assert!(report.is_ok(), "corrected blocks do not make a scrub fail");
        report.uncorrectable_blocks = 1;
        assert!(!report.is_ok(), "an uncorrectable block fails the scrub");
    }

    // The chaining setters write through to the public fields.
    #[test]
    fn options_builder_sets_parallelism_and_throttle() {
        let opts = PatrolScrubOptions::default()
            .parallelism(4)
            .throttle(std::time::Duration::from_millis(7));
        assert_eq!(opts.parallelism, 4);
        assert_eq!(opts.throttle, Some(std::time::Duration::from_millis(7)));
    }

    // 500 keys flushed once => exactly one SST; without ECC nothing can be
    // corrected, and a clean file has no findings.
    #[test]
    fn patrol_scrub_on_clean_non_ecc_tree_reads_blocks_without_findings() {
        let dir = tempfile::tempdir().expect("tempdir");
        let AnyTree::Standard(tree) = standard_tree(dir.path()) else {
            unreachable!("standard tree configured");
        };
        for i in 0u64..500 {
            tree.insert(format!("key-{i:06}"), format!("v{i:06}"), i);
        }
        tree.flush_active_memtable(500).expect("flush");
        let report = patrol_scrub(&tree, &PatrolScrubOptions::default());
        assert_eq!(report.sst_files_scanned, 1, "one flushed SST");
        assert!(report.blocks_scanned >= 1, "at least one data block read");
        assert_eq!(report.corrections_applied, 0, "no ECC, nothing to correct");
        assert_eq!(
            report.uncorrectable_blocks, 0,
            "clean tree has no corruption"
        );
        assert!(report.is_ok());
    }

    // With no tables, the scrub does no work and reports success.
    #[test]
    fn patrol_scrub_empty_tree_scans_nothing() {
        let dir = tempfile::tempdir().expect("tempdir");
        let AnyTree::Standard(tree) = standard_tree(dir.path()) else {
            unreachable!("standard tree configured");
        };
        let report = patrol_scrub(&tree, &PatrolScrubOptions::default());
        assert_eq!(report.sst_files_scanned, 0);
        assert_eq!(report.blocks_scanned, 0);
        assert!(report.is_ok());
    }

    // Four separate flushes produce four SSTs. Three throttled workers must
    // together scrub each SST exactly once (no table skipped or duplicated).
    #[test]
    fn patrol_scrub_parallel_over_many_ssts_visits_every_file() {
        let dir = tempfile::tempdir().expect("tempdir");
        let AnyTree::Standard(tree) = standard_tree(dir.path()) else {
            unreachable!("standard tree configured");
        };
        for batch in 0u64..4 {
            for i in 0u64..200 {
                let k = batch * 1_000 + i;
                tree.insert(format!("key-{k:06}"), format!("v{k:06}"), k);
            }
            tree.flush_active_memtable((batch + 1) * 1_000)
                .expect("flush");
        }
        let opts = PatrolScrubOptions::default()
            .parallelism(3)
            .throttle(std::time::Duration::from_millis(1));
        let report = patrol_scrub(&tree, &opts);
        assert_eq!(report.sst_files_scanned, 4, "every SST scrubbed once");
        assert!(report.blocks_scanned >= 4);
        assert!(report.is_ok());
    }
}
// Patrol-scrub tests against trees written with page ECC (Reed-Solomon,
// 8 data + 2 parity shards). Compiled only with the `page_ecc` feature.
// Each fault-injection test corrupts the SST file on disk and then reopens the
// tree before scrubbing.
#[cfg(all(test, feature = "page_ecc"))]
mod ecc_tests {
    #![expect(
        clippy::expect_used,
        reason = "tests assert on known-present values; a panic is the failure signal"
    )]
    #![allow(
        clippy::cast_possible_truncation,
        reason = "in-file block offsets fit usize; only narrow on 32-bit targets"
    )]
    use super::*;
    use crate::{
        AbstractTree,
        MAX_SEQNO,
        SequenceNumberCounter,
        runtime_config::EccScheme,
        table::{block::Header, block_index::BlockIndex as _},
    };

    // Opens (or reopens) a standard tree in `dir` with page ECC enabled,
    // using Reed-Solomon with 8 data shards and 2 parity shards.
    fn open_ecc_tree(dir: &std::path::Path) -> crate::Tree {
        let crate::AnyTree::Standard(tree) = crate::Config::new(
            dir,
            SequenceNumberCounter::default(),
            SequenceNumberCounter::default(),
        )
        .page_ecc(true)
        .ecc_scheme(EccScheme::ReedSolomon {
            data_shards: 8,
            parity_shards: 2,
        })
        .open()
        .expect("open ecc tree") else {
            unreachable!("standard tree configured (no kv separation)");
        };
        tree
    }

    // Writes 2000 keys through an ECC tree and flushes them into one SST.
    // Returns that SST's path and the handle (offset, size) of the first
    // data block yielded by its block index. The tree is dropped on return,
    // so the caller can edit the file and reopen it.
    fn write_ecc_sst(dir: &std::path::Path) -> (std::path::PathBuf, crate::table::BlockHandle) {
        let tree = open_ecc_tree(dir);
        for i in 0u64..2_000 {
            tree.insert(format!("key-{i:06}"), format!("v{i:06}"), i);
        }
        tree.flush_active_memtable(2_000).expect("flush");
        let binding = tree.version_history.read().latest_version();
        let table = binding
            .version
            .iter_tables()
            .next()
            .expect("flush produced one table");
        let keyed = table
            .block_index
            .iter()
            .next()
            .expect("table has at least one data block")
            .expect("block index entry decodes");
        let handle = crate::table::BlockHandle::new(keyed.offset(), keyed.size());
        ((*table.path).clone(), handle)
    }

    // Flips the top bit of one byte, 3 bytes past the block header, i.e. a
    // single-bit fault. With auto_heal on, the scrub must correct it and queue
    // the SST for rewrite exactly once.
    #[test]
    fn patrol_scrub_corrects_seeded_single_bit_fault_and_schedules_heal() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let (sst_path, block) = write_ecc_sst(dir.path());
        let corrupt_pos = block.offset().0 as usize + Header::MIN_LEN + 3;
        let mut bytes = std::fs::read(&sst_path)?;
        let slot = bytes
            .get_mut(corrupt_pos)
            .expect("corrupt_pos in range for the SST bytes");
        *slot ^= 0x80;
        std::fs::write(&sst_path, &bytes)?;
        let tree = open_ecc_tree(dir.path());
        tree.update_runtime_config(|c| c.auto_heal = true)?;
        assert!(tree.heal_hints().is_empty(), "fresh tree has no hints");
        let report = patrol_scrub(&tree, &PatrolScrubOptions::default());
        assert!(
            report.corrections_applied >= 1,
            "scrub must correct the seeded fault: {report:?}",
        );
        assert_eq!(
            report.ssts_scheduled_for_rewrite, 1,
            "the corrected SST is queued for healing exactly once: {report:?}",
        );
        assert_eq!(report.uncorrectable_blocks, 0, "{report:?}");
        assert!(
            report.is_ok(),
            "a fully-correctable scrub is ok: {report:?}"
        );
        assert!(
            !tree.heal_hints().is_empty(),
            "the SST is recorded in the heal queue",
        );
        #[cfg(feature = "metrics")]
        assert_eq!(
            tree.metrics().ecc_auto_heal_scheduled_count(),
            1,
            "the scheduled SST is counted once in metrics",
        );
        Ok(())
    }

    // Same single-bit fault, but auto_heal is left at its default (off).
    // The correction still happens, but nothing is scheduled or queued.
    #[test]
    fn patrol_scrub_corrects_without_scheduling_when_auto_heal_off() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let (sst_path, block) = write_ecc_sst(dir.path());
        let corrupt_pos = block.offset().0 as usize + Header::MIN_LEN + 3;
        let mut bytes = std::fs::read(&sst_path)?;
        let slot = bytes.get_mut(corrupt_pos).expect("corrupt_pos in range");
        *slot ^= 0x80;
        std::fs::write(&sst_path, &bytes)?;
        let tree = open_ecc_tree(dir.path());
        assert!(!tree.heal_hints().is_enabled(), "auto_heal defaults off");
        let report = patrol_scrub(&tree, &PatrolScrubOptions::default());
        assert!(
            report.corrections_applied >= 1,
            "correction-on-read still happens with auto_heal off: {report:?}",
        );
        assert_eq!(
            report.ssts_scheduled_for_rewrite, 0,
            "auto_heal off suppresses rewrite scheduling: {report:?}",
        );
        assert!(
            tree.heal_hints().is_empty(),
            "no SST queued when scheduling is off",
        );
        assert!(report.is_ok());
        Ok(())
    }

    // Inverts every payload byte of the first data block (header left
    // intact), which is far beyond what 2 parity shards can repair. The scrub
    // must surface an UncorrectableBlock finding and fail.
    #[test]
    fn patrol_scrub_reports_uncorrectable_block_not_silently_skipped() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let (sst_path, block) = write_ecc_sst(dir.path());
        let payload_start = block.offset().0 as usize + Header::MIN_LEN;
        let payload_end = block.offset().0 as usize + block.size() as usize;
        let mut bytes = std::fs::read(&sst_path)?;
        for slot in bytes
            .get_mut(payload_start..payload_end)
            .expect("block payload range in bounds")
        {
            *slot ^= 0xFF;
        }
        std::fs::write(&sst_path, &bytes)?;
        let tree = open_ecc_tree(dir.path());
        tree.update_runtime_config(|c| c.auto_heal = true)?;
        let report = patrol_scrub(&tree, &PatrolScrubOptions::default());
        assert!(
            report.uncorrectable_blocks >= 1,
            "an unrecoverable block must be reported, not skipped: {report:?}",
        );
        assert!(!report.is_ok(), "uncorrectable corruption fails the scrub");
        assert!(
            report
                .errors
                .iter()
                .any(|e| matches!(e, ScrubError::UncorrectableBlock { .. })),
            "the finding is an UncorrectableBlock: {report:?}",
        );
        Ok(())
    }

    // An uncorrupted ECC SST scrubs cleanly, and its data stays readable
    // afterwards.
    #[test]
    fn patrol_scrub_clean_ecc_tree_reports_no_corrections() -> crate::Result<()> {
        let dir = tempfile::tempdir()?;
        let _ = write_ecc_sst(dir.path());
        let tree = open_ecc_tree(dir.path());
        let report = patrol_scrub(&tree, &PatrolScrubOptions::default());
        assert_eq!(report.sst_files_scanned, 1);
        assert!(report.blocks_scanned >= 1);
        assert_eq!(report.corrections_applied, 0, "no fault → no correction");
        assert_eq!(report.uncorrectable_blocks, 0);
        assert!(report.is_ok());
        let got = tree.get(b"key-000000", MAX_SEQNO)?.expect("key present");
        assert_eq!(&*got, b"v000000");
        Ok(())
    }
}