use alloc::collections::BTreeMap;
use alloc::vec::Vec;
use libm;
/// Urgency class of a block; it decides how often the block is re-scrubbed.
///
/// Variants are declared from least to most urgent, so the derived `Ord`
/// yields `Low < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ScrubPriority {
    /// Least urgent: scanned every 30 days, I/O weight 1.
    Low,
    /// Scanned every 7 days, I/O weight 2.
    Normal,
    /// Scanned every day, I/O weight 4.
    High,
    /// Most urgent: scanned every hour, I/O weight 8.
    Critical,
}

impl ScrubPriority {
    /// Returns the variant name as a static string.
    pub fn name(&self) -> &'static str {
        match *self {
            Self::Critical => "Critical",
            Self::High => "High",
            Self::Normal => "Normal",
            Self::Low => "Low",
        }
    }

    /// Returns the time between two scrubs of a block at this priority, in seconds.
    pub fn scan_interval_sec(&self) -> u64 {
        const HOUR_SEC: u64 = 3600;
        const DAY_SEC: u64 = 24 * HOUR_SEC;

        match *self {
            Self::Critical => HOUR_SEC,
            Self::High => DAY_SEC,
            Self::Normal => 7 * DAY_SEC,
            Self::Low => 30 * DAY_SEC,
        }
    }

    /// Returns the relative I/O weight of this priority; the weight doubles
    /// with every step up in urgency.
    pub fn io_weight(&self) -> u64 {
        match *self {
            Self::Critical => 8,
            Self::High => 4,
            Self::Normal => 2,
            Self::Low => 1,
        }
    }
}
/// Scrub bookkeeping for a single block.
///
/// All timestamps handled by this type are compared against intervals that
/// have been converted to milliseconds, so they are expected to come from a
/// millisecond clock.
#[derive(Debug, Clone)]
pub struct BlockScrubState {
    /// Identifier of the dataset the block belongs to.
    pub dataset_id: u64,
    /// Offset of the block inside its dataset.
    pub block_offset: u64,
    /// Size of the block.
    pub size: u64,
    /// Timestamp of the most recent scrub (initially the creation timestamp).
    pub last_scrub: u64,
    /// Number of scrubs performed on this block.
    pub scrub_count: u64,
    /// Number of scrubs that reported an error.
    pub error_count: u64,
    /// Timestamp of the most recent error; 0 until an error is recorded.
    pub last_error: u64,
    /// Current scrub urgency of the block.
    pub priority: ScrubPriority,
    /// 32-byte checksum associated with the block.
    pub checksum: [u8; 32],
}

impl BlockScrubState {
    /// Creates the state for a block with no scrub or error history.
    ///
    /// The block starts at `ScrubPriority::Normal` with an all-zero checksum,
    /// and `timestamp` is recorded as the time of its last scrub.
    pub fn new(dataset_id: u64, block_offset: u64, size: u64, timestamp: u64) -> Self {
        Self {
            checksum: [0u8; 32],
            priority: ScrubPriority::Normal,
            last_error: 0,
            error_count: 0,
            scrub_count: 0,
            last_scrub: timestamp,
            size,
            block_offset,
            dataset_id,
        }
    }

    /// Recomputes `priority` from the error history as of `current_time`.
    ///
    /// A block without recorded errors is `Low`. Otherwise the priority decays
    /// with the number of whole days since the last error: under one day is
    /// `Critical`, under 7 is `High`, under 30 is `Normal`, anything older is
    /// `Low`.
    pub fn update_priority(&mut self, current_time: u64) {
        const MS_PER_DAY: u64 = 24 * 3600 * 1000;

        if self.error_count == 0 {
            self.priority = ScrubPriority::Low;
            return;
        }

        // A `last_error` in the future saturates to zero elapsed days.
        let whole_days = current_time.saturating_sub(self.last_error) / MS_PER_DAY;
        self.priority = match whole_days {
            0 => ScrubPriority::Critical,
            1..=6 => ScrubPriority::High,
            7..=29 => ScrubPriority::Normal,
            _ => ScrubPriority::Low,
        };
    }

    /// Returns `true` once at least one full scan interval of the current
    /// priority has elapsed since the last scrub.
    pub fn is_scrub_due(&self, current_time: u64) -> bool {
        let deadline_ms = self.priority.scan_interval_sec() * 1000;
        let elapsed_ms = current_time.saturating_sub(self.last_scrub);
        deadline_ms <= elapsed_ms
    }

    /// Returns the milliseconds left until the next scrub is due, or 0 when
    /// the block is already due.
    pub fn time_until_scrub(&self, current_time: u64) -> u64 {
        let elapsed_ms = current_time.saturating_sub(self.last_scrub);
        (self.priority.scan_interval_sec() * 1000).saturating_sub(elapsed_ms)
    }
}
/// Aggregate counters and running scrub-rate statistics.
///
/// `Default` is derived: every counter starts at 0 and every floating-point
/// field at 0.0, which is exactly what the previous hand-written
/// implementation produced.
#[derive(Debug, Clone, Default)]
pub struct ScrubStats {
    /// Number of block scrubs recorded.
    pub blocks_scrubbed: u64,
    /// Sum of the sizes of all scrubbed blocks.
    pub bytes_scrubbed: u64,
    /// Number of scrubs that reported an error.
    pub errors_found: u64,
    /// Number of repairs recorded.
    pub errors_repaired: u64,
    /// Accumulated scrub duration in milliseconds.
    pub total_scrub_time_ms: u64,
    /// Running mean of all rate samples passed to `update_scrub_rate`.
    pub avg_scrub_rate: f64,
    /// Welford accumulator: the running sum of squared deviations from the
    /// mean. Despite its name this is NOT the variance itself;
    /// `scrub_rate_stddev` divides it by `sample_count - 1` before taking the
    /// square root.
    pub scrub_rate_variance: f64,
    /// Number of rate samples folded into the running statistics.
    pub sample_count: u64,
}
impl ScrubStats {
    /// Folds one rate sample into the running mean and the squared-deviation
    /// accumulator using Welford's online update.
    pub fn update_scrub_rate(&mut self, rate: f64) {
        let old_mean = self.avg_scrub_rate;
        self.sample_count += 1;
        let samples = self.sample_count as f64;

        // Deviation from the mean before and after this sample is absorbed;
        // their product is the increment of the squared-deviation sum.
        let dev_before = rate - old_mean;
        let new_mean = old_mean + dev_before / samples;
        let dev_after = rate - new_mean;

        self.avg_scrub_rate = new_mean;
        self.scrub_rate_variance += dev_before * dev_after;
    }

    /// Returns the sample standard deviation of the recorded rates, or 0.0
    /// while fewer than two samples have been recorded.
    pub fn scrub_rate_stddev(&self) -> f64 {
        match self.sample_count {
            0 | 1 => 0.0,
            // `scrub_rate_variance` holds the sum of squared deviations, so
            // dividing by n - 1 gives the unbiased sample variance.
            n => libm::sqrt(self.scrub_rate_variance / (n - 1) as f64),
        }
    }
}
/// Tracks the scrub state of registered blocks and decides which block
/// should be scrubbed next.
///
/// `Debug` is derived so the scheduler can be logged and inspected like the
/// other public types in this module.
#[derive(Debug)]
pub struct ScrubScheduler {
    /// Per-block state keyed by `(dataset_id, block_offset)`.
    blocks: BTreeMap<(u64, u64), BlockScrubState>,
    /// Aggregate counters and scrub-rate statistics.
    stats: ScrubStats,
    /// Configured I/O limit in bytes per second; starts at 0.
    io_throttle: u64,
    /// Timestamp passed to the most recent `scrub_block` call; starts at 0.
    last_scrub_time: u64,
}

impl Default for ScrubScheduler {
    /// Equivalent to [`ScrubScheduler::new`].
    fn default() -> Self {
        Self::new()
    }
}
impl ScrubScheduler {
pub fn new() -> Self {
Self {
blocks: BTreeMap::new(),
stats: ScrubStats::default(),
io_throttle: 0,
last_scrub_time: 0,
}
}
pub fn register_block(
&mut self,
dataset_id: u64,
block_offset: u64,
size: u64,
checksum: [u8; 32],
timestamp: u64,
) {
let mut state = BlockScrubState::new(dataset_id, block_offset, size, timestamp);
state.checksum = checksum;
self.blocks.insert((dataset_id, block_offset), state);
}
pub fn set_throttle(&mut self, bytes_per_sec: u64) {
self.io_throttle = bytes_per_sec;
crate::lcpfs_println!("[ SCRUB ] I/O throttle set to {} bytes/sec", bytes_per_sec);
}
pub fn get_next_block(&mut self, current_time: u64) -> Option<(u64, u64)> {
let mut candidates: Vec<_> = self
.blocks
.iter()
.filter(|(_, state)| state.is_scrub_due(current_time))
.collect();
if candidates.is_empty() {
return None;
}
candidates.sort_by(|a, b| {
b.1.priority.cmp(&a.1.priority).then_with(|| {
a.1.time_until_scrub(current_time)
.cmp(&b.1.time_until_scrub(current_time))
})
});
candidates.first().map(|(key, _)| **key)
}
pub fn scrub_block(
&mut self,
dataset_id: u64,
block_offset: u64,
timestamp: u64,
scrub_time_ms: u64,
checksum_valid: bool,
) -> Result<(), &'static str> {
let state = self
.blocks
.get_mut(&(dataset_id, block_offset))
.ok_or("Block not registered")?;
state.last_scrub = timestamp;
state.scrub_count += 1;
self.stats.blocks_scrubbed += 1;
self.stats.bytes_scrubbed += state.size;
self.stats.total_scrub_time_ms += scrub_time_ms;
let rate = (state.size as f64 * 1000.0) / scrub_time_ms.max(1) as f64;
self.stats.update_scrub_rate(rate);
if !checksum_valid {
state.error_count += 1;
state.last_error = timestamp;
self.stats.errors_found += 1;
crate::lcpfs_println!(
"[ SCRUB ] ERROR: Dataset {} block 0x{:x} checksum mismatch",
dataset_id,
block_offset
);
}
state.update_priority(timestamp);
self.last_scrub_time = timestamp;
Ok(())
}
pub fn repair_block(&mut self, dataset_id: u64, block_offset: u64) -> Result<(), &'static str> {
let state = self
.blocks
.get_mut(&(dataset_id, block_offset))
.ok_or("Block not registered")?;
if state.error_count == 0 {
return Err("No errors to repair");
}
self.stats.errors_repaired += 1;
crate::lcpfs_println!(
"[ SCRUB ] Repaired dataset {} block 0x{:x}",
dataset_id,
block_offset
);
Ok(())
}
pub fn get_blocks_by_priority(&self, priority: ScrubPriority) -> Vec<&BlockScrubState> {
self.blocks
.values()
.filter(|state| state.priority == priority)
.collect()
}
pub fn get_error_blocks(&self) -> Vec<&BlockScrubState> {
self.blocks
.values()
.filter(|state| state.error_count > 0)
.collect()
}
pub fn get_stats(&self) -> ScrubStats {
self.stats.clone()
}
pub fn get_progress(&self, current_time: u64) -> f64 {
if self.blocks.is_empty() {
return 1.0;
}
let scrubbed = self
.blocks
.values()
.filter(|state| !state.is_scrub_due(current_time))
.count();
scrubbed as f64 / self.blocks.len() as f64
}
pub fn estimate_completion(&self, current_time: u64) -> u64 {
if self.stats.avg_scrub_rate == 0.0 {
return u64::MAX;
}
let remaining_bytes: u64 = self
.blocks
.values()
.filter(|state| state.is_scrub_due(current_time))
.map(|state| state.size)
.sum();
((remaining_bytes as f64 / self.stats.avg_scrub_rate) * 1000.0) as u64
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// One day on the millisecond clock the scheduler works with.
    const DAY_MS: u64 = 24 * 3600 * 1000;
    /// Scan interval of `ScrubPriority::Normal` in milliseconds.
    const WEEK_MS: u64 = 7 * DAY_MS;
    /// Placeholder checksum used by every fixture block.
    const NO_CHECKSUM: [u8; 32] = [0u8; 32];

    /// Builds a scheduler holding one 4096-byte block per key, all registered
    /// at timestamp 0.
    fn scheduler_with_blocks(keys: &[(u64, u64)]) -> ScrubScheduler {
        let mut scheduler = ScrubScheduler::new();
        for &(dataset_id, block_offset) in keys {
            scheduler.register_block(dataset_id, block_offset, 4096, NO_CHECKSUM, 0);
        }
        scheduler
    }

    // Higher urgency must mean a shorter scan interval.
    #[test]
    fn test_scrub_priority_intervals() {
        let critical = ScrubPriority::Critical.scan_interval_sec();
        let high = ScrubPriority::High.scan_interval_sec();
        let normal = ScrubPriority::Normal.scan_interval_sec();
        let low = ScrubPriority::Low.scan_interval_sec();

        assert!(critical < high);
        assert!(high < normal);
        assert!(normal < low);
    }

    // Higher urgency must mean a larger I/O weight.
    #[test]
    fn test_scrub_priority_io_weight() {
        let critical = ScrubPriority::Critical.io_weight();
        let high = ScrubPriority::High.io_weight();
        let normal = ScrubPriority::Normal.io_weight();
        let low = ScrubPriority::Low.io_weight();

        assert!(critical > high);
        assert!(high > normal);
        assert!(normal > low);
    }

    #[test]
    fn test_block_state_creation() {
        let fresh = BlockScrubState::new(1, 0x1000, 4096, 1000);

        assert_eq!(fresh.dataset_id, 1);
        assert_eq!(fresh.block_offset, 0x1000);
        assert_eq!(fresh.size, 4096);
        assert_eq!(fresh.scrub_count, 0);
        assert_eq!(fresh.error_count, 0);
        assert_eq!(fresh.priority, ScrubPriority::Normal);
    }

    #[test]
    fn test_block_priority_update() {
        let mut block = BlockScrubState::new(1, 0x1000, 4096, 0);

        // Without any recorded error the block drops to the lowest priority.
        block.update_priority(1000);
        assert_eq!(block.priority, ScrubPriority::Low);

        // An error at t = 1000 decays through the priorities as time passes.
        block.error_count = 1;
        block.last_error = 1000;

        block.update_priority(1000);
        assert_eq!(block.priority, ScrubPriority::Critical);

        block.update_priority(1000 + DAY_MS);
        assert_eq!(block.priority, ScrubPriority::High);

        block.update_priority(1000 + 8 * DAY_MS);
        assert_eq!(block.priority, ScrubPriority::Normal);

        block.update_priority(1000 + 31 * DAY_MS);
        assert_eq!(block.priority, ScrubPriority::Low);
    }

    #[test]
    fn test_block_scrub_due() {
        let mut block = BlockScrubState::new(1, 0x1000, 4096, 0);
        block.priority = ScrubPriority::Normal;

        // Due exactly when the interval has elapsed, not one millisecond earlier.
        assert!(!block.is_scrub_due(WEEK_MS - 1));
        assert!(block.is_scrub_due(WEEK_MS));
        assert!(block.is_scrub_due(WEEK_MS + 1));
    }

    #[test]
    fn test_scheduler_register_block() {
        let mut scheduler = ScrubScheduler::new();
        scheduler.register_block(1, 0x1000, 4096, NO_CHECKSUM, 1000);

        assert_eq!(scheduler.blocks.len(), 1);
        let registered = scheduler
            .blocks
            .get(&(1, 0x1000))
            .expect("test: operation should succeed");
        assert_eq!(registered.size, 4096);
    }

    #[test]
    fn test_scheduler_get_next_block() {
        let mut scheduler = ScrubScheduler::new();
        scheduler.register_block(1, 0x1000, 4096, NO_CHECKSUM, 0);
        scheduler.register_block(2, 0x2000, 8192, NO_CHECKSUM, 0);

        if let Some(urgent) = scheduler.blocks.get_mut(&(2, 0x2000)) {
            urgent.priority = ScrubPriority::Critical;
        }

        // Both blocks are due after a week; the critical one must come first.
        assert_eq!(scheduler.get_next_block(WEEK_MS), Some((2, 0x2000)));
    }

    #[test]
    fn test_scheduler_scrub_block() {
        let mut scheduler = scheduler_with_blocks(&[(1, 0x1000)]);

        scheduler
            .scrub_block(1, 0x1000, 1000, 100, true)
            .expect("test: operation should succeed");

        assert_eq!(scheduler.stats.blocks_scrubbed, 1);
        assert_eq!(scheduler.stats.bytes_scrubbed, 4096);

        let scrubbed = scheduler
            .blocks
            .get(&(1, 0x1000))
            .expect("test: operation should succeed");
        assert_eq!(scrubbed.scrub_count, 1);
        assert_eq!(scrubbed.last_scrub, 1000);
    }

    #[test]
    fn test_scheduler_error_detection() {
        let mut scheduler = scheduler_with_blocks(&[(1, 0x1000)]);

        scheduler
            .scrub_block(1, 0x1000, 1000, 100, false)
            .expect("test: operation should succeed");

        assert_eq!(scheduler.stats.errors_found, 1);

        let failed = scheduler
            .blocks
            .get(&(1, 0x1000))
            .expect("test: operation should succeed");
        assert_eq!(failed.error_count, 1);
        assert_eq!(failed.last_error, 1000);
    }

    #[test]
    fn test_scheduler_repair() {
        let mut scheduler = scheduler_with_blocks(&[(1, 0x1000)]);

        scheduler
            .scrub_block(1, 0x1000, 1000, 100, false)
            .expect("test: operation should succeed");
        scheduler
            .repair_block(1, 0x1000)
            .expect("test: operation should succeed");

        assert_eq!(scheduler.stats.errors_repaired, 1);
    }

    #[test]
    fn test_scheduler_priority_filtering() {
        let mut scheduler = scheduler_with_blocks(&[(1, 0x1000), (2, 0x2000), (3, 0x3000)]);

        scheduler
            .scrub_block(1, 0x1000, 1000, 100, false)
            .expect("test: operation should succeed");
        if let Some(failed) = scheduler.blocks.get_mut(&(1, 0x1000)) {
            failed.update_priority(1000);
        }

        let critical = scheduler.get_blocks_by_priority(ScrubPriority::Critical);
        assert_eq!(critical.len(), 1);
    }

    #[test]
    fn test_scheduler_progress() {
        let mut scheduler = scheduler_with_blocks(&[(1, 0x1000), (2, 0x2000)]);

        // After a week both blocks are due, so nothing counts as done.
        let before = scheduler.get_progress(WEEK_MS);
        assert_eq!(before, 0.0);

        scheduler
            .scrub_block(1, 0x1000, WEEK_MS, 100, true)
            .expect("test: operation should succeed");

        let after = scheduler.get_progress(WEEK_MS);
        assert_eq!(after, 0.5);
    }

    #[test]
    fn test_welford_scrub_rate() {
        let mut stats = ScrubStats::default();
        for sample in [100.0, 200.0, 300.0] {
            stats.update_scrub_rate(sample);
        }

        assert_eq!(stats.avg_scrub_rate, 200.0);
        assert!(stats.scrub_rate_stddev() > 0.0);
    }

    #[test]
    fn test_scheduler_throttle() {
        let mut scheduler = ScrubScheduler::new();
        scheduler.set_throttle(1_000_000);
        assert_eq!(scheduler.io_throttle, 1_000_000);
    }

    #[test]
    fn test_error_blocks_filter() {
        let mut scheduler = scheduler_with_blocks(&[(1, 0x1000), (2, 0x2000), (3, 0x3000)]);

        // Blocks 1 and 3 fail their checksum, block 2 passes.
        scheduler
            .scrub_block(1, 0x1000, 1000, 100, false)
            .expect("test: operation should succeed");
        scheduler
            .scrub_block(2, 0x2000, 1000, 100, true)
            .expect("test: operation should succeed");
        scheduler
            .scrub_block(3, 0x3000, 1000, 100, false)
            .expect("test: operation should succeed");

        let with_errors = scheduler.get_error_blocks();
        assert_eq!(with_errors.len(), 2);
    }
}