use crate::BLOCK_DEVICES;
use crate::fscore::impl_::LcpfsController;
use crate::integrity::checksum::Checksum;
use crate::lunaos::kernel::BlockDevice;
use crate::mgmt::mount::LcpfsMount;
use alloc::collections::VecDeque;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use libm::{fabs, sqrt};
use spin::Mutex;
/// An online-learned decision threshold with uncertainty tracking.
///
/// `observe` folds each (action value, outcome delta) pair into a running
/// mean/variance and nudges `value`; `confidence` and `should_act` read
/// the accumulated statistics to gate decisions.
#[derive(Clone, Copy)]
pub struct LearnedThreshold {
    /// Current threshold compared against in `should_act`.
    pub value: f64,
    /// Standard error of the mean outcome (`f64::MAX` until >= 2 observations).
    pub uncertainty: f64,
    /// Number of `observe` calls folded in so far.
    pub observations: u64,
    /// Step size for threshold adjustment; decays as observations accumulate.
    pub learning_rate: f64,
    /// Running mean of the observed outcome deltas.
    pub mean_outcome: f64,
    /// Running sample variance of outcome deltas (`f64::MAX` until >= 2 observations).
    pub variance: f64,
}
impl LearnedThreshold {
    /// A threshold with no prior evidence: maximal uncertainty/variance,
    /// full learning rate, and `initial_guess` as the starting value.
    pub const fn uninformed(initial_guess: f64) -> Self {
        Self {
            value: initial_guess,
            uncertainty: f64::MAX,
            observations: 0,
            learning_rate: 1.0,
            mean_outcome: 0.0,
            variance: f64::MAX,
        }
    }

    /// Fold one (action, outcome) observation into the statistics and
    /// nudge the threshold.
    ///
    /// An `outcome_delta_epsilon` < 0 pulls the threshold toward
    /// `action_value` at the full learning rate; otherwise the threshold
    /// is pushed away from it at half rate. Mean/variance follow
    /// Welford's online algorithm (the running M2 is reconstructed from
    /// the stored sample variance each call), `uncertainty` is the
    /// standard error of the mean, and the learning rate decays as
    /// 1 / (1 + 0.1 * sqrt(n)).
    pub fn observe(&mut self, action_value: f64, outcome_delta_epsilon: f64) {
        self.observations += 1;
        let n = self.observations as f64;
        let delta = outcome_delta_epsilon - self.mean_outcome;
        self.mean_outcome += delta / n;
        let delta2 = outcome_delta_epsilon - self.mean_outcome;
        if self.observations > 1 {
            // M2 = variance * (n_prev - 1); n_prev == n - 1, hence (n - 2).
            // At n == 2 the (n - 2.0) factor is 0, discarding the
            // f64::MAX sentinel left by `uninformed`.
            let m2 = self.variance * (n - 2.0) + delta * delta2;
            self.variance = m2 / (n - 1.0);
            self.uncertainty = sqrt(self.variance / n);
        }
        let adjustment = if outcome_delta_epsilon < 0.0 {
            (action_value - self.value) * self.learning_rate
        } else {
            (self.value - action_value) * self.learning_rate * 0.5
        };
        self.value += adjustment;
        self.learning_rate = 1.0 / (1.0 + sqrt(self.observations as f64) * 0.1);
    }

    /// Confidence in [0, 1): grows with observation count, shrinks with
    /// uncertainty; exactly 0.0 before the first observation.
    pub fn confidence(&self) -> f64 {
        if self.observations == 0 {
            return 0.0;
        }
        let obs_factor = 1.0 - 1.0 / (1.0 + self.observations as f64 * 0.01);
        let unc_factor = 1.0 / (1.0 + fabs(self.uncertainty));
        obs_factor * unc_factor
    }

    /// True when `current_value` has crossed the learned threshold AND
    /// the estimated benefit clearly exceeds the current uncertainty
    /// (the 1e-10 term guards against division by zero).
    pub fn should_act(&self, current_value: f64, estimated_benefit: f64) -> bool {
        let benefit_over_uncertainty = estimated_benefit / (self.uncertainty + 1e-10);
        current_value >= self.value && benefit_over_uncertainty > 1.0
    }
}
/// Snapshot of one completed scrub run plus the system state around it.
#[derive(Clone, Copy, Default)]
pub struct ScrubObservation {
    pub timestamp_ms: u64,
    pub blocks_scanned: u64,
    pub errors_found: u64,
    pub repairs_made: u64,
    pub io_latency_avg_us: f64,
    pub cpu_utilization: f64,
    pub time_since_last_scrub_ms: u64,
    pub epsilon_before: f64,
    pub epsilon_after: f64,
}

impl ScrubObservation {
    /// Change in epsilon across this scrub; a negative value means
    /// epsilon went down.
    pub fn delta_epsilon(&self) -> f64 {
        self.epsilon_after - self.epsilon_before
    }

    /// Errors per million blocks scanned; 0.0 when nothing was scanned.
    pub fn error_rate(&self) -> f64 {
        if self.blocks_scanned == 0 {
            0.0
        } else {
            (self.errors_found as f64 / self.blocks_scanned as f64) * 1_000_000.0
        }
    }
}
/// A scrub observation paired with the operating point (predicted error
/// rate and time gap) at which the scrub was scheduled, so later
/// scheduling decisions can look up "similar" past runs.
#[derive(Clone, Copy)]
pub struct ScrubOutcome {
    pub observation: ScrubObservation,
    /// Predicted error rate at the time the scrub was scheduled.
    pub scheduled_at_error_rate: f64,
    /// Time since the previous scrub (ms, as f64) at scheduling time.
    pub scheduled_at_time_gap: f64,
}
/// Redundancy scheme of the pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RaidLevel {
    /// N-way mirroring: every member holds a full copy.
    Mirror,
    /// Single parity; survives one failure per stripe.
    RaidZ1,
    /// Double parity; survives two failures per stripe.
    RaidZ2,
    /// Triple parity; survives three failures per stripe.
    RaidZ3,
    /// Plain striping, no redundancy.
    Stripe,
}

/// Geometry of one redundancy group: RAID level plus disk counts.
#[derive(Debug, Clone, Copy)]
pub struct RaidConfig {
    pub level: RaidLevel,
    pub data_disks: u8,
    pub parity_disks: u8,
}

impl RaidConfig {
    /// Single-parity (RAID-Z1) layout over `data_disks` data disks.
    pub const fn raidz1(data_disks: u8) -> Self {
        Self {
            level: RaidLevel::RaidZ1,
            data_disks,
            parity_disks: 1,
        }
    }

    /// Double-parity (RAID-Z2) layout over `data_disks` data disks.
    pub const fn raidz2(data_disks: u8) -> Self {
        Self {
            level: RaidLevel::RaidZ2,
            data_disks,
            parity_disks: 2,
        }
    }

    /// Triple-parity (RAID-Z3) layout over `data_disks` data disks.
    pub const fn raidz3(data_disks: u8) -> Self {
        Self {
            level: RaidLevel::RaidZ3,
            data_disks,
            parity_disks: 3,
        }
    }

    /// N-way mirror over `total_disks` members: one logical data disk
    /// plus `total_disks - 1` redundant copies (so `can_recover` allows
    /// up to `total_disks - 1` failures).
    ///
    /// Added for parity with the `RaidLevel::Mirror` variant used by the
    /// repair path, which previously had no constructor.
    pub const fn mirror(total_disks: u8) -> Self {
        Self {
            level: RaidLevel::Mirror,
            data_disks: 1,
            parity_disks: total_disks.saturating_sub(1),
        }
    }

    /// Plain stripe over `data_disks` disks with no redundancy
    /// (`can_recover` only holds for zero failures).
    ///
    /// Added for parity with the `RaidLevel::Stripe` variant, which
    /// previously had no constructor.
    pub const fn stripe(data_disks: u8) -> Self {
        Self {
            level: RaidLevel::Stripe,
            data_disks,
            parity_disks: 0,
        }
    }

    /// Total stripe width in disks (data + parity).
    ///
    /// Fix: widen BEFORE adding — the previous `(data + parity) as usize`
    /// performed the addition in `u8`, overflowing (panic in debug,
    /// wraparound in release) when the sum exceeded 255.
    pub const fn stripe_width(&self) -> usize {
        self.data_disks as usize + self.parity_disks as usize
    }

    /// Whether this geometry can still reconstruct data after `failures`
    /// simultaneous member failures.
    pub const fn can_recover(&self, failures: u8) -> bool {
        failures <= self.parity_disks
    }
}
impl Default for RaidConfig {
fn default() -> Self {
Self::raidz1(2) }
}
impl Default for ScrubEngine {
fn default() -> Self {
Self::new()
}
}
lazy_static! {
    /// Global singleton scrub engine, guarded by a `spin::Mutex`.
    pub static ref SCRUB_ENGINE: Mutex<ScrubEngine> = Mutex::new(ScrubEngine::new());
    /// Globally active RAID geometry consumed by the repair path;
    /// accessed via `get_raid_config` / `set_raid_config`.
    pub static ref RAID_CONFIG: Mutex<RaidConfig> = Mutex::new(RaidConfig::default());
}
/// Replace the globally active RAID geometry.
pub fn set_raid_config(config: RaidConfig) {
    let mut guard = RAID_CONFIG.lock();
    *guard = config;
}

/// Snapshot the globally active RAID geometry (it is `Copy`).
pub fn get_raid_config() -> RaidConfig {
    let guard = RAID_CONFIG.lock();
    *guard
}
/// PI-controlled scrub engine: tracks run state/counters and learns its
/// scheduling thresholds and pacing knobs from past scrub outcomes.
pub struct ScrubEngine {
    /// True while a scrub task is in flight.
    pub is_running: bool,
    /// Blocks examined during the current / most recent run.
    pub blocks_scanned: u64,
    /// Blocks successfully rewritten during the current / most recent run.
    pub repairs_made: u64,
    /// Blocks that failed verification during the current / most recent run.
    pub errors_detected: u64,
    /// Timestamp (ms) recorded when the last scrub completed.
    pub last_scrub_ms: u64,
    // Bounded FIFO of raw per-scrub snapshots.
    observations: VecDeque<ScrubObservation>,
    // Bounded FIFO of snapshots paired with their scheduling operating point.
    outcomes: VecDeque<ScrubOutcome>,
    // Capacity bound shared by both FIFOs.
    max_history: usize,
    // Learned trigger: errors per million scanned blocks.
    threshold_error_rate: LearnedThreshold,
    // Learned trigger: milliseconds since the previous scrub.
    threshold_time_gap: LearnedThreshold,
    // Learned pacing knobs: throttle ratio, blocks per batch, pause in µs.
    throttle_ratio: LearnedThreshold,
    batch_size: LearnedThreshold,
    batch_pause_us: LearnedThreshold,
    // Latest epsilon reading; becomes `epsilon_before` of the next outcome.
    current_epsilon: f64,
}
impl ScrubEngine {
pub const DEFAULT_MAX_HISTORY: usize = 100;
pub const fn new() -> Self {
Self {
is_running: false,
blocks_scanned: 0,
repairs_made: 0,
errors_detected: 0,
last_scrub_ms: 0,
observations: VecDeque::new(),
outcomes: VecDeque::new(),
max_history: Self::DEFAULT_MAX_HISTORY,
threshold_error_rate: LearnedThreshold::uninformed(10.0), threshold_time_gap: LearnedThreshold::uninformed(86_400_000.0), throttle_ratio: LearnedThreshold::uninformed(0.5), batch_size: LearnedThreshold::uninformed(1000.0), batch_pause_us: LearnedThreshold::uninformed(1000.0),
current_epsilon: 0.0,
}
}
/// Change the history cap, dropping the oldest entries if either FIFO is
/// already over the new limit.
pub fn set_max_history(&mut self, max: usize) {
    self.max_history = max;
    if self.observations.len() > max {
        let excess = self.observations.len() - max;
        self.observations.drain(..excess);
    }
    if self.outcomes.len() > max {
        let excess = self.outcomes.len() - max;
        self.outcomes.drain(..excess);
    }
}
/// Record the most recent epsilon reading; it is used as the "before"
/// value when the next scrub completes (see `complete_scrub`).
pub fn update_epsilon(&mut self, epsilon: f64) {
    self.current_epsilon = epsilon;
}
/// Decide whether a scrub should be scheduled right now.
///
/// Never schedules while a scrub is in flight. Otherwise a scrub is due
/// when either learned trigger (time gap or error rate) fires with some
/// confidence, or when the combined estimated benefit exceeds the sum of
/// both trigger uncertainties.
pub fn should_scrub(&self, current_time_ms: u64, observed_error_rate: f64) -> bool {
    if self.is_running {
        return false;
    }
    let gap_ms = current_time_ms.saturating_sub(self.last_scrub_ms) as f64;
    let gap_benefit = self.estimate_time_benefit(gap_ms);
    let rate_benefit = self.estimate_error_benefit(observed_error_rate);

    let by_time = self.threshold_time_gap.should_act(gap_ms, gap_benefit)
        && self.threshold_time_gap.confidence() > 0.1;
    let by_rate = self
        .threshold_error_rate
        .should_act(observed_error_rate, rate_benefit)
        && self.threshold_error_rate.confidence() > 0.1;
    let by_benefit = gap_benefit + rate_benefit
        > self.threshold_time_gap.uncertainty + self.threshold_error_rate.uncertainty;

    by_time || by_rate || by_benefit
}
/// Estimate the expected epsilon reduction from scrubbing after
/// `time_gap` ms, averaged over past outcomes that were scheduled at a
/// similar (within ±20%) gap.
///
/// Falls back to a linear heuristic relative to the learned time-gap
/// threshold when no similar history exists. Clamped at 0 so a history
/// of harmful scrubs never reports a positive benefit.
fn estimate_time_benefit(&self, time_gap: f64) -> f64 {
    // Single pass: accumulate sum and count of similar outcomes instead
    // of collecting them into a temporary Vec just to average them.
    let (sum, count) = self
        .outcomes
        .iter()
        .filter(|o| fabs(o.scheduled_at_time_gap - time_gap) < time_gap * 0.2)
        .fold((0.0f64, 0usize), |(s, c), o| {
            // Negate: a negative delta_epsilon (epsilon dropped) is a gain.
            (s - o.observation.delta_epsilon(), c + 1)
        });
    if count == 0 {
        return time_gap / self.threshold_time_gap.value * 100.0;
    }
    let avg_reduction = sum / count as f64;
    avg_reduction.max(0.0)
}
/// Estimate the expected epsilon reduction from scrubbing at
/// `error_rate` (errors per million blocks), averaged over past
/// outcomes scheduled at a similar (within ±30%) rate.
///
/// Falls back to a linear heuristic (`rate * 50`) when no similar
/// history exists — including the `error_rate == 0.0` case, where the
/// similarity window collapses to nothing. Clamped at 0.
fn estimate_error_benefit(&self, error_rate: f64) -> f64 {
    // Single pass: accumulate sum and count of similar outcomes instead
    // of collecting them into a temporary Vec just to average them.
    let (sum, count) = self
        .outcomes
        .iter()
        .filter(|o| fabs(o.scheduled_at_error_rate - error_rate) < error_rate * 0.3)
        .fold((0.0f64, 0usize), |(s, c), o| {
            // Negate: a negative delta_epsilon (epsilon dropped) is a gain.
            (s - o.observation.delta_epsilon(), c + 1)
        });
    if count == 0 {
        return error_rate * 50.0;
    }
    let avg_reduction = sum / count as f64;
    avg_reduction.max(0.0)
}
/// Kick off a background scrub on core 2.
///
/// Resets the per-run counters, records the current epsilon as the
/// "before" baseline, notifies the OS layer, and spawns `scrub_task`.
/// Fails if a scrub is already in flight.
pub fn start(&mut self, current_time_ms: u64) -> Result<(), &'static str> {
    if self.is_running {
        return Err("Scrub already running");
    }
    // NOTE(review): `current_time_ms` is accepted but not consumed here;
    // `last_scrub_ms` is only updated by `complete_scrub` — confirm.
    let _ = current_time_ms;

    self.is_running = true;
    self.blocks_scanned = 0;
    self.repairs_made = 0;
    self.errors_detected = 0;

    // Baseline epsilon so `complete_scrub` can compute the delta later.
    self.current_epsilon = crate::lunaos::integration::get_epsilon_current();
    crate::lunaos::integration::notify_scrub_start();

    // Learned pacing knobs, clamped to sane floors/ranges for logging.
    let batch_size = self.batch_size.value.max(100.0) as u64;
    let batch_pause = self.batch_pause_us.value.max(100.0) as u64;
    let throttle = self.throttle_ratio.value.clamp(0.1, 0.9);
    crate::lcpfs_println!(
        "[ SCRUB] Starting PI-controlled scrub (batch={}, pause={}μs, throttle={:.0}%)",
        batch_size,
        batch_pause,
        throttle * 100.0
    );
    crate::spawn_on_core(Self::scrub_task, Some(2));
    Ok(())
}
/// Background scrub worker, spawned on core 2 by `start`.
///
/// Imports pool 0, walks every block of its device in learned-size
/// batches, verifying (and repairing on error) each block, and pauses
/// between batches so foreground work can make progress.
///
/// NOTE(review): the SCRUB_ENGINE lock is held for the entire scan,
/// including across `cooperative_yield`, so concurrent `stats()` /
/// `should_scrub()` callers will spin until the scrub finishes —
/// confirm this is intended.
fn scrub_task() {
    let mount = match LcpfsMount::import(0) {
        Ok(m) => m,
        Err(e) => {
            crate::lcpfs_println!("[ SCRUB] Failed to mount pool: {}", e);
            SCRUB_ENGINE.lock().is_running = false;
            return;
        }
    };
    let mut engine = SCRUB_ENGINE.lock();
    let batch_size = engine.batch_size.value.max(100.0) as u64;
    // Device capacity in blocks. Every early-exit path clears is_running
    // so the engine is never left stuck in the "running" state.
    let total_blocks = {
        let devices = BLOCK_DEVICES.lock();
        if let Some(dev) = devices.get(mount.dev_id) {
            let block_size = dev.block_size();
            if block_size == 0 {
                crate::lcpfs_println!("[ SCRUB] Invalid block_size (0)");
                engine.is_running = false;
                return;
            }
            match dev.size() {
                Ok(size) => size / block_size as u64,
                Err(_) => {
                    crate::lcpfs_println!("[ SCRUB] Failed to get device size");
                    engine.is_running = false;
                    return;
                }
            }
        } else {
            crate::lcpfs_println!("[ SCRUB] Device not found");
            engine.is_running = false;
            return;
        }
    };
    let mut block_id = 0u64;
    while block_id < total_blocks {
        let batch_end = (block_id + batch_size).min(total_blocks);
        for bid in block_id..batch_end {
            engine.blocks_scanned += 1;
            // Fix: the ScrubError payload was previously bound
            // (`if let Some(error)`) but never used; test presence directly.
            if Self::verify_block(&mount, bid).is_some() {
                engine.errors_detected += 1;
                if Self::repair_block(&mount, bid) {
                    engine.repairs_made += 1;
                    crate::lcpfs_println!("[ SCRUB] Repaired block {}", bid);
                }
            }
        }
        block_id = batch_end;
        #[cfg(feature = "lunaos")]
        {
            // Base 10 ms inter-batch pause, scaled up with the observed
            // error ratio and capped at 100 ms.
            let base_pause_ms = 10u64;
            let error_ratio = if engine.blocks_scanned > 0 {
                (engine.errors_detected as f64) / (engine.blocks_scanned as f64)
            } else {
                0.0
            };
            let pause_ms = base_pause_ms + (error_ratio * 90000.0) as u64;
            let pause_ms = pause_ms.min(100);
            let epsilon = crate::lunaos::integration::get_epsilon_current();
            let cpu_util = crate::lunaos::integration::get_cpu_utilization();
            // Back off further when the CPU is busy (still capped at 100 ms
            // on the mid-load branch).
            let adjusted_pause_ms = if cpu_util > 0.8 {
                pause_ms * 2
            } else if cpu_util > 0.5 {
                (pause_ms as f64 * (1.0 + cpu_util)).min(100.0) as u64
            } else {
                pause_ms
            };
            if epsilon > 0.0 || cpu_util > 0.0 {
                crate::lcpfs_println!(
                    "[ SCRUB] Adaptive pause: {}ms (ε={:.2}J, CPU={:.1}%)",
                    adjusted_pause_ms,
                    epsilon,
                    cpu_util * 100.0
                );
            }
            // ms -> presumed µs for cooperative_yield (note the * 1000).
            crate::cooperative_yield(adjusted_pause_ms * 1000);
        }
        #[cfg(not(feature = "lunaos"))]
        {
            // Fixed 10 ms pause between batches when lunaos hooks are absent.
            crate::cooperative_yield(10_000);
        }
    }
    engine.is_running = false;
    crate::lcpfs_println!(
        "[ SCRUB] Complete: {} blocks, {} errors, {} repairs",
        engine.blocks_scanned,
        engine.errors_detected,
        engine.repairs_made
    );
}
/// Read-verify a single block on the mounted pool's device.
///
/// Returns `None` when the block reads back successfully, otherwise the
/// reason the check could not pass: lock contention, a missing or
/// zero-block-size device, an out-of-range block id, or a read failure.
///
/// NOTE(review): a checksum is computed over the block's contents but is
/// never compared to a stored value (`let _ = computed;`), so silent
/// data corruption is NOT detected here — only hard read errors are.
/// TODO: look up the expected checksum from block metadata and return
/// `ScrubError::ChecksumMismatch` on mismatch.
fn verify_block(mount: &LcpfsMount, block_id: u64) -> Option<ScrubError> {
    use crate::BLOCK_DEVICES;
    use alloc::vec;
    // Non-blocking lock: report contention instead of stalling on the
    // device list while foreground I/O holds it.
    let mut devices = match BLOCK_DEVICES.try_lock() {
        Some(d) => d,
        None => {
            crate::lcpfs_println!("[ SCRUB] WARN: Lock contention on block {}", block_id);
            return Some(ScrubError::LockContention);
        }
    };
    let dev = match devices.get_mut(mount.dev_id) {
        Some(d) => d,
        None => return Some(ScrubError::IoError),
    };
    let block_size = dev.block_size();
    if block_size == 0 {
        crate::lcpfs_println!(
            "[ SCRUB] ERROR: Device {} reports 0 block size",
            mount.dev_id
        );
        return Some(ScrubError::IoError);
    }
    let total_blocks = match dev.size() {
        Ok(s) => s / block_size as u64,
        Err(_) => return Some(ScrubError::IoError),
    };
    if block_id >= total_blocks {
        return Some(ScrubError::OutOfBounds);
    }
    let mut buffer = vec![0u8; block_size];
    if dev.read_block(block_id as usize, &mut buffer).is_err() {
        return Some(ScrubError::IoError);
    }
    // Checksum is computed but currently discarded (see NOTE above).
    let computed = Checksum::calculate(&buffer);
    drop(devices);
    let _ = computed;
    None
}
/// Attempt to rebuild `block_id` from its RAID stripe peers.
///
/// Reads the sibling blocks of the stripe containing `block_id`,
/// reconstructs the failed block from parity (or a mirror copy), and
/// writes the result back. Returns true only after a successful write.
///
/// NOTE(review): RaidZ2 and RaidZ3 currently fall through to
/// `GfSolver::reconstruct_z1`, i.e. single-parity reconstruction only —
/// presumably a placeholder until multi-parity solving lands; confirm.
/// NOTE(review): `parity_idx = stripe_width - 1` assumes parity lives
/// in the last column of every stripe — verify against on-disk layout,
/// including the case where the failed block IS that parity column.
fn repair_block(mount: &LcpfsMount, block_id: u64) -> bool {
    use crate::BLOCK_DEVICES;
    use crate::ml::gfsolver::GfSolver;
    use alloc::vec;
    use alloc::vec::Vec;
    let raid_config = get_raid_config();
    let stripe_width = raid_config.stripe_width();
    // A plain stripe carries no redundancy to rebuild from.
    if raid_config.level == RaidLevel::Stripe {
        crate::lcpfs_println!(
            "[ SCRUB] Cannot repair block {} - no RAID redundancy",
            block_id
        );
        return false;
    }
    // Non-blocking lock: give up rather than stall other I/O mid-repair.
    let mut devices = match BLOCK_DEVICES.try_lock() {
        Some(d) => d,
        None => {
            crate::lcpfs_println!(
                "[ SCRUB] Lock contention during repair of block {}",
                block_id
            );
            return false;
        }
    };
    let dev = match devices.get_mut(mount.dev_id) {
        Some(d) => d,
        None => return false,
    };
    let block_size = dev.block_size();
    if block_size == 0 {
        crate::lcpfs_println!(
            "[ SCRUB] ERROR: Device {} reports 0 block size",
            mount.dev_id
        );
        return false;
    }
    let total_blocks = match dev.size() {
        Ok(s) => s / block_size as u64,
        Err(_) => return false,
    };
    // Gather the whole stripe. The target block is treated as failed
    // without re-reading it (it already failed verification upstream),
    // so it counts as the first failure.
    let mut blocks: Vec<Vec<u8>> = Vec::new();
    let mut failed_index = None;
    let mut failure_count = 0u8;
    for i in 0..stripe_width {
        // Start of this stripe (block_id rounded down) plus column i.
        let bid = block_id - (block_id % stripe_width as u64) + i as u64;
        if bid >= total_blocks {
            crate::lcpfs_println!(
                "[ SCRUB] Block {} out of bounds (max {})",
                bid,
                total_blocks
            );
            return false;
        }
        let mut buffer = vec![0u8; block_size];
        if bid == block_id {
            // Target block: keep a zeroed placeholder at its position.
            failed_index = Some(i);
            failure_count += 1;
            blocks.push(buffer);
        } else if dev.read_block(bid as usize, &mut buffer).is_ok() {
            blocks.push(buffer);
        } else {
            // A sibling also failed to read: count it and check whether
            // the configured parity can still cover every failure.
            failure_count += 1;
            blocks.push(vec![0u8; block_size]);
            if !raid_config.can_recover(failure_count) {
                crate::lcpfs_println!(
                    "[ SCRUB] Cannot repair block {} - {} failures exceeds {} parity",
                    block_id,
                    failure_count,
                    raid_config.parity_disks
                );
                return false;
            }
        }
    }
    if let Some(fail_idx) = failed_index {
        if blocks.len() >= stripe_width {
            // Assumes the last stripe column holds parity (see NOTE above).
            let parity_idx = stripe_width - 1;
            // Everything except the failed column and the parity column.
            let surviving: Vec<&[u8]> = blocks
                .iter()
                .enumerate()
                .filter(|(i, _)| *i != fail_idx && *i != parity_idx)
                .map(|(_, b)| b.as_slice())
                .collect();
            let reconstructed = match raid_config.level {
                RaidLevel::RaidZ1 => {
                    GfSolver::reconstruct_z1(&surviving, &blocks[parity_idx], block_size)
                }
                RaidLevel::RaidZ2 => {
                    GfSolver::reconstruct_z1(&surviving, &blocks[parity_idx], block_size)
                }
                RaidLevel::RaidZ3 => {
                    GfSolver::reconstruct_z1(&surviving, &blocks[parity_idx], block_size)
                }
                RaidLevel::Mirror => {
                    // Mirror: any surviving member IS the data.
                    if !surviving.is_empty() {
                        surviving[0].to_vec()
                    } else {
                        return false;
                    }
                }
                RaidLevel::Stripe => return false, // unreachable: rejected above
            };
            if dev.write_block(block_id as usize, &reconstructed).is_ok() {
                crate::lcpfs_println!(
                    "[ SCRUB] Successfully repaired block {} using {:?}",
                    block_id,
                    raid_config.level
                );
                return true;
            }
        }
    }
    false
}
/// Fold the just-finished scrub run into the learning state.
///
/// Snapshots the run into a `ScrubObservation`, pairs it with the
/// operating point it was scheduled under, notifies the OS layer,
/// updates the learned thresholds, appends to (bounded) history, and
/// stamps `last_scrub_ms`.
pub fn complete_scrub(&mut self, current_time_ms: u64) {
    let avg_latency_us = crate::lunaos::integration::get_io_latency_us(0);
    let cpu = crate::lunaos::integration::get_cpu_utilization();
    let eps_after = crate::lunaos::integration::get_epsilon_current();
    let obs = ScrubObservation {
        timestamp_ms: current_time_ms,
        blocks_scanned: self.blocks_scanned,
        errors_found: self.errors_detected,
        repairs_made: self.repairs_made,
        io_latency_avg_us: avg_latency_us,
        cpu_utilization: cpu,
        time_since_last_scrub_ms: current_time_ms.saturating_sub(self.last_scrub_ms),
        epsilon_before: self.current_epsilon,
        epsilon_after: eps_after,
    };
    let outcome = ScrubOutcome {
        observation: obs,
        scheduled_at_error_rate: crate::lunaos::integration::get_predicted_error_rate(),
        scheduled_at_time_gap: obs.time_since_last_scrub_ms as f64,
    };
    crate::lunaos::integration::notify_scrub_complete(self.errors_detected, self.repairs_made);
    self.learn_from_outcome(&outcome);
    self.observations.push_back(obs);
    self.outcomes.push_back(outcome);
    // Keep both FIFOs within the configured history bound.
    while self.observations.len() > self.max_history {
        self.observations.pop_front();
    }
    while self.outcomes.len() > self.max_history {
        self.outcomes.pop_front();
    }
    self.last_scrub_ms = current_time_ms;
}
/// Update every learned knob from one completed scrub outcome.
fn learn_from_outcome(&mut self, outcome: &ScrubOutcome) {
    let obs = &outcome.observation;
    let delta = obs.delta_epsilon();
    // Both scheduling thresholds learn from the epsilon delta observed
    // at the operating point that triggered this scrub.
    self.threshold_time_gap
        .observe(obs.time_since_last_scrub_ms as f64, delta);
    self.threshold_error_rate.observe(obs.error_rate(), delta);
    // Repairs per unit of CPU; falls back to raw repair count when the
    // CPU reading is zero/unknown to avoid dividing by zero.
    let efficiency = if obs.cpu_utilization > 0.0 {
        obs.repairs_made as f64 / obs.cpu_utilization
    } else {
        obs.repairs_made as f64
    };
    // Negated: `observe` moves the threshold toward the action value on
    // negative outcomes, so higher efficiency reinforces the batch size.
    self.batch_size.observe(self.batch_size.value, -efficiency);
}
/// Snapshot the counters and learned-parameter state for reporting.
pub fn stats(&self) -> ScrubStats {
    let time_gap = &self.threshold_time_gap;
    let err_rate = &self.threshold_error_rate;
    ScrubStats {
        is_running: self.is_running,
        blocks_scanned: self.blocks_scanned,
        repairs_made: self.repairs_made,
        errors_detected: self.errors_detected,
        time_threshold_ms: time_gap.value as u64,
        time_threshold_confidence: time_gap.confidence(),
        error_rate_threshold: err_rate.value,
        error_rate_confidence: err_rate.confidence(),
        batch_size: self.batch_size.value as u64,
    }
}
}
/// Failure modes a scrub pass can report for a single block.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ScrubError {
    /// Stored checksum does not match the computed one.
    /// NOTE(review): never produced by `verify_block` yet — no stored
    /// checksum is compared there; confirm whether that is intended.
    ChecksumMismatch,
    /// Block read failed. (Currently unused in this file; read failures
    /// are reported as `IoError` instead.)
    ReadError,
    /// Generic device failure: missing device, zero block size, size
    /// query failure, or a failed read.
    IoError,
    /// Damage beyond the configured parity's ability to repair.
    /// (Currently unused in this file.)
    Unrecoverable,
    /// The block-device list was locked elsewhere; the check was skipped.
    LockContention,
    /// Block id beyond the device's capacity.
    OutOfBounds,
}
/// Read-only snapshot of the scrub engine's state (see `ScrubEngine::stats`).
#[derive(Debug, Clone, Copy)]
pub struct ScrubStats {
    /// True while a scrub task is in flight.
    pub is_running: bool,
    /// Blocks examined during the current / most recent run.
    pub blocks_scanned: u64,
    /// Blocks successfully rewritten during the current / most recent run.
    pub repairs_made: u64,
    /// Blocks that failed verification during the current / most recent run.
    pub errors_detected: u64,
    /// Learned time-gap trigger, truncated to whole milliseconds.
    pub time_threshold_ms: u64,
    /// Confidence in the time-gap trigger (0 until it has observations).
    pub time_threshold_confidence: f64,
    /// Learned error-rate trigger (errors per million blocks).
    pub error_rate_threshold: f64,
    /// Confidence in the error-rate trigger.
    pub error_rate_confidence: f64,
    /// Learned batch size, truncated to whole blocks.
    pub batch_size: u64,
}
/// Forward the latest epsilon reading to the global engine.
pub fn update_epsilon(epsilon: f64) {
    SCRUB_ENGINE.lock().update_epsilon(epsilon);
}

/// Ask the global engine whether a scrub should be scheduled now.
pub fn should_scrub(current_time_ms: u64, observed_error_rate: f64) -> bool {
    let engine = SCRUB_ENGINE.lock();
    engine.should_scrub(current_time_ms, observed_error_rate)
}

/// Start a scrub via the global engine.
pub fn start_scrub(current_time_ms: u64) -> Result<(), &'static str> {
    let mut engine = SCRUB_ENGINE.lock();
    engine.start(current_time_ms)
}

/// Snapshot the global engine's statistics.
pub fn stats() -> ScrubStats {
    let engine = SCRUB_ENGINE.lock();
    engine.stats()
}