use crate::fscore::impl_::LcpfsController;
use crate::fscore::structs::Dva;
use crate::hw::smart::{SmartAttribute, get_smart_data};
use alloc::collections::VecDeque;
use alloc::string::String;
use alloc::vec::Vec;
use lazy_static::lazy_static;
use libm::{fabs, sqrt};
use spin::Mutex;
/// A scalar decision threshold refined online from observed outcomes.
///
/// Alongside the threshold itself it tracks a running mean and sample variance
/// of the outcome deltas passed to `observe`, plus a decaying learning rate.
#[derive(Clone, Copy, Debug)]
pub struct LearnedThreshold {
    /// Current threshold value.
    pub value: f64,
    /// Standard error of the mean outcome; starts at `f64::MAX` in `uninformed`.
    pub uncertainty: f64,
    /// Number of outcomes observed so far.
    pub observations: u64,
    /// Step size applied to threshold adjustments; decays as observations accumulate.
    pub learning_rate: f64,
    /// Running mean of observed outcome deltas.
    pub mean_outcome: f64,
    /// Sample variance of observed outcome deltas; starts at `f64::MAX` in `uninformed`.
    pub variance: f64,
}
impl LearnedThreshold {
    /// Builds a threshold that starts at `initial_guess` with no observations.
    ///
    /// Uncertainty and variance start at `f64::MAX` and the learning rate at 1.0,
    /// so `confidence()` reports 0.0 until something has been observed.
    pub const fn uninformed(initial_guess: f64) -> Self {
        Self {
            observations: 0,
            value: initial_guess,
            learning_rate: 1.0,
            uncertainty: f64::MAX,
            variance: f64::MAX,
            mean_outcome: 0.0,
        }
    }

    /// Folds one outcome into the running statistics and nudges `value`.
    ///
    /// `outcome_delta_epsilon` updates the running mean / sample variance
    /// (Welford's method); once two or more outcomes exist, `uncertainty`
    /// becomes the standard error `sqrt(variance / n)`. A negative delta pulls
    /// `value` toward `action_value`; any other delta pushes it away at half
    /// the rate. Afterwards the learning rate becomes `1 / (1 + 0.1 * sqrt(n))`.
    pub fn observe(&mut self, action_value: f64, outcome_delta_epsilon: f64) {
        self.observations += 1;
        let count = self.observations as f64;

        let deviation_before = outcome_delta_epsilon - self.mean_outcome;
        self.mean_outcome += deviation_before / count;
        let deviation_after = outcome_delta_epsilon - self.mean_outcome;

        if self.observations > 1 {
            // The previous sample variance times (n - 2) recovers M2; at n = 2
            // this is `f64::MAX * 0.0 == 0.0`, i.e. the M2 of a single sample.
            let previous_m2 = self.variance * (count - 2.0);
            let m2 = previous_m2 + deviation_before * deviation_after;
            self.variance = m2 / (count - 1.0);
            self.uncertainty = sqrt(self.variance / count);
        }

        let (offset, scale) = if outcome_delta_epsilon < 0.0 {
            (action_value - self.value, 1.0)
        } else {
            (self.value - action_value, 0.5)
        };
        self.value += offset * self.learning_rate * scale;
        self.learning_rate = 1.0 / (1.0 + sqrt(count) * 0.1);
    }

    /// Confidence in the current threshold, in `[0.0, 1.0)`.
    ///
    /// Zero with no observations; otherwise the product of a factor that grows
    /// with the observation count and one that shrinks with `uncertainty`.
    pub fn confidence(&self) -> f64 {
        match self.observations {
            0 => 0.0,
            seen => {
                let observation_weight = 1.0 - 1.0 / (1.0 + seen as f64 * 0.01);
                let certainty_weight = 1.0 / (1.0 + fabs(self.uncertainty));
                observation_weight * certainty_weight
            }
        }
    }

    /// Returns true when `current_value` has reached the threshold and
    /// `estimated_benefit` exceeds the current uncertainty.
    pub fn should_act(&self, current_value: f64, estimated_benefit: f64) -> bool {
        let signal_to_noise = estimated_benefit / (self.uncertainty + 1e-10);
        let past_threshold = current_value >= self.value;
        past_threshold && signal_to_noise > 1.0
    }
}
/// One health sample for a vdev, scored by [`VdevHealthObservation::failure_risk`].
#[derive(Clone, Copy, Debug, Default)]
pub struct VdevHealthObservation {
    /// Index of the vdev this sample describes.
    pub vdev_id: usize,
    /// Sample time in milliseconds.
    pub timestamp_ms: u64,
    /// Average latency in microseconds.
    pub latency_avg_us: f64,
    /// 99th-percentile latency in microseconds.
    pub latency_p99_us: f64,
    /// Read errors counted in this sample.
    pub read_errors: u64,
    /// Write errors counted in this sample.
    pub write_errors: u64,
    /// Reallocated-sector count.
    pub reallocated_sectors: u64,
    /// Pending-sector count.
    pub pending_sectors: u64,
    /// Device temperature in degrees Celsius.
    pub temperature_c: f64,
    /// Power-on hours.
    pub power_on_hours: u64,
    /// Uncorrectable-error count.
    pub uncorrectable_errors: u64,
    /// I/O operations the error counters are relative to.
    pub io_ops: u64,
}
impl VdevHealthObservation {
    /// Scores how likely this vdev is to fail, in `[0.0, 1.0]`, against the
    /// thresholds currently held by `learned`.
    ///
    /// Contributions are summed, then capped at 1.0:
    /// - p99 latency above threshold: `0.3 * min(latency / threshold, 2)`
    /// - error rate `(read + write errors) / io_ops` above threshold:
    ///   `0.4 * min(rate / threshold, 3)`
    /// - reallocated sectors above threshold: `0.2`
    /// - pending sectors above threshold: `0.2`
    /// - temperature above threshold: `0.1 * min((temp - threshold) / 10, 1)`
    ///
    /// `latency_avg_us`, `power_on_hours` and `uncorrectable_errors` do not
    /// contribute to the score.
    pub fn failure_risk(&self, learned: &EvacuationEngine) -> f64 {
        let mut risk = 0.0;

        let latency_limit = learned.threshold_latency.value;
        if self.latency_p99_us > latency_limit {
            risk += 0.3 * (self.latency_p99_us / latency_limit).min(2.0);
        }

        // Add the counters in f64: summing the raw u64 values can overflow,
        // which panics in debug builds and wraps to a tiny rate in release.
        let error_rate = if self.io_ops > 0 {
            (self.read_errors as f64 + self.write_errors as f64) / self.io_ops as f64
        } else {
            0.0
        };
        let error_limit = learned.threshold_error_rate.value;
        if error_rate > error_limit {
            risk += 0.4 * (error_rate / error_limit).min(3.0);
        }

        if self.reallocated_sectors as f64 > learned.threshold_reallocated.value {
            risk += 0.2;
        }
        if self.pending_sectors as f64 > learned.threshold_pending.value {
            risk += 0.2;
        }

        let temperature_limit = learned.threshold_temperature.value;
        if self.temperature_c > temperature_limit {
            risk += 0.1 * ((self.temperature_c - temperature_limit) / 10.0).min(1.0);
        }

        risk.min(1.0)
    }
}
/// Record of a finished evacuation, used to refine the evacuation thresholds.
#[derive(Clone, Copy, Debug)]
pub struct EvacuationOutcome {
    /// Vdev that was evacuated.
    pub vdev_id: usize,
    /// Failure-risk score recorded for the vdev when the evacuation started.
    pub started_at_risk: f64,
    /// Blocks successfully copied.
    pub blocks_evacuated: u64,
    /// Duration of the migration in milliseconds.
    pub time_taken_ms: u64,
    /// Whether the drive was judged to have failed during the run.
    pub drive_failed_during: bool,
    /// Engine epsilon sampled before the migration.
    pub epsilon_before: f64,
    /// Engine epsilon sampled after the migration.
    pub epsilon_after: f64,
}
impl EvacuationOutcome {
    /// Change in epsilon across the evacuation (`after - before`); negative
    /// means epsilon went down.
    pub fn delta_epsilon(&self) -> f64 {
        let Self {
            epsilon_before,
            epsilon_after,
            ..
        } = self;
        epsilon_after - epsilon_before
    }

    /// True when the drive did not fail during the run and epsilon did not rise.
    pub fn was_successful(&self) -> bool {
        if self.drive_failed_during {
            return false;
        }
        self.delta_epsilon() <= 0.0
    }
}
lazy_static! {
/// Global evacuation engine, shared by the free functions in this file and by
/// the background `evac_task`.
pub static ref EVAC_ENGINE: Mutex<EvacuationEngine> = Mutex::new(EvacuationEngine::new());
}
/// Decides when to evacuate a failing vdev and performs the block migration.
///
/// Keeps bounded histories of health observations and evacuation outcomes,
/// plus a set of [`LearnedThreshold`]s used to score risk and size batches.
pub struct EvacuationEngine {
/// True while an evacuation is in progress.
pub is_running: bool,
/// Source vdev of the in-progress evacuation, if any.
pub evacuating_vdev: Option<usize>,
/// Blocks successfully copied by the current (or most recent) evacuation.
pub blocks_migrated: u64,
/// Destination vdev of the in-progress evacuation, if any.
pub target_vdev: Option<usize>,
/// Recent health observations, oldest first.
observations: VecDeque<VdevHealthObservation>,
/// Recent evacuation outcomes, oldest first.
outcomes: VecDeque<EvacuationOutcome>,
/// Compared against `VdevHealthObservation::latency_p99_us`.
threshold_latency: LearnedThreshold,
/// Compared against `(read_errors + write_errors) / io_ops`.
threshold_error_rate: LearnedThreshold,
/// Compared against `reallocated_sectors`.
threshold_reallocated: LearnedThreshold,
/// Compared against `pending_sectors`.
threshold_pending: LearnedThreshold,
/// Compared against `temperature_c`.
threshold_temperature: LearnedThreshold,
/// Risk threshold consulted by `should_evacuate`.
threshold_evac_risk: LearnedThreshold,
/// Blocks copied per batch during migration (floored at 100 when used).
batch_size: LearnedThreshold,
/// NOTE(review): only read for the start-of-evacuation log line in the code
/// shown; no pause is applied between batches — confirm intent.
batch_pause_us: LearnedThreshold,
/// Latest value passed to `update_epsilon`; sampled before and after an evacuation.
current_epsilon: f64,
/// Risk computed by the most recent `should_evacuate` call.
last_risk_score: f64,
}
impl Default for EvacuationEngine {
fn default() -> Self {
Self::new()
}
}
impl EvacuationEngine {
    /// Maximum number of health observations retained (oldest evicted first).
    const MAX_OBSERVATIONS: usize = 1000;
    /// Maximum number of evacuation outcomes retained (oldest evicted first).
    const MAX_OUTCOMES: usize = 100;
    /// Size in bytes of the buffer used to copy one block during migration.
    const BLOCK_BUFFER_BYTES: usize = 512;
    /// A progress line is logged whenever migration crosses a multiple of this many blocks.
    const PROGRESS_INTERVAL_BLOCKS: u64 = 1000;

    /// Creates an idle engine whose thresholds all start from uninformed priors.
    pub fn new() -> Self {
        Self {
            is_running: false,
            evacuating_vdev: None,
            blocks_migrated: 0,
            target_vdev: None,
            observations: VecDeque::with_capacity(Self::MAX_OBSERVATIONS),
            outcomes: VecDeque::with_capacity(Self::MAX_OUTCOMES),
            // Initial guesses. Of these, only `threshold_evac_risk` and
            // `batch_size` are refined by `learn_from_outcome`.
            threshold_latency: LearnedThreshold::uninformed(50_000.0),
            threshold_error_rate: LearnedThreshold::uninformed(0.001),
            threshold_reallocated: LearnedThreshold::uninformed(100.0),
            threshold_pending: LearnedThreshold::uninformed(10.0),
            threshold_temperature: LearnedThreshold::uninformed(55.0),
            threshold_evac_risk: LearnedThreshold::uninformed(0.5),
            batch_size: LearnedThreshold::uninformed(1000.0),
            batch_pause_us: LearnedThreshold::uninformed(1000.0),
            current_epsilon: 0.0,
            last_risk_score: 0.0,
        }
    }

    /// Records the latest epsilon value.
    ///
    /// `evac_task` samples it before and after a migration to score the outcome.
    pub fn update_epsilon(&mut self, epsilon: f64) {
        self.current_epsilon = epsilon;
    }

    /// Appends a health observation, evicting the oldest so that at most
    /// `MAX_OBSERVATIONS` are kept.
    pub fn observe(&mut self, obs: VdevHealthObservation) {
        // Evict before pushing so the deque never grows past its initial capacity.
        while self.observations.len() >= Self::MAX_OBSERVATIONS {
            self.observations.pop_front();
        }
        self.observations.push_back(obs);
    }

    /// Decides whether `vdev_id` should be evacuated now.
    ///
    /// Scores the most recent observation of the vdev, stores that score in
    /// `last_risk_score`, and defers to `threshold_evac_risk`. Returns `false`
    /// while an evacuation is running or if the vdev has never been observed.
    ///
    /// NOTE(review): `LearnedThreshold::confidence` is 0.0 with no observations
    /// and stays at or below 0.1 until at least 12 have been recorded. Only
    /// `learn_from_outcome` feeds `threshold_evac_risk`. So this cannot return
    /// `true` until enough evacuations (e.g. started manually) have completed.
    pub fn should_evacuate(&mut self, vdev_id: usize) -> bool {
        if self.is_running {
            return false;
        }
        let risk = match self
            .observations
            .iter()
            .rev()
            .find(|o| o.vdev_id == vdev_id)
        {
            Some(obs) => obs.failure_risk(self),
            None => return false,
        };
        self.last_risk_score = risk;
        let evac_benefit = self.estimate_evac_benefit(vdev_id, risk);
        self.threshold_evac_risk.should_act(risk, evac_benefit)
            && self.threshold_evac_risk.confidence() > 0.1
    }

    /// Estimates the epsilon improvement expected from evacuating at `current_risk`.
    ///
    /// Considers past outcomes whose starting risk is within 0.2 of `current_risk`:
    /// - none: returns 80% of a potential loss of `current_risk * 1e6`;
    /// - none of them successful: returns 0.0;
    /// - otherwise: the mean epsilon reduction of the successful ones, floored at 0.
    ///
    /// `_vdev_id` is currently unused.
    fn estimate_evac_benefit(&self, _vdev_id: usize, current_risk: f64) -> f64 {
        let mut similar = 0usize;
        let mut successful = 0usize;
        let mut improvement_total = 0.0;
        for outcome in self
            .outcomes
            .iter()
            .filter(|o| fabs(o.started_at_risk - current_risk) < 0.2)
        {
            similar += 1;
            if outcome.was_successful() {
                successful += 1;
                improvement_total += -outcome.delta_epsilon();
            }
        }
        if similar == 0 {
            let potential_loss = current_risk * 1_000_000.0;
            return potential_loss * 0.8;
        }
        if successful == 0 {
            return 0.0;
        }
        (improvement_total / successful as f64).max(0.0)
    }

    /// Picks the vdev with the lowest failure risk as an evacuation target,
    /// excluding the vdev currently being evacuated.
    ///
    /// Only the most recent observation of each vdev is scored. Returns `None`
    /// if no eligible vdev has been observed.
    pub fn find_target_vdev(&self) -> Option<usize> {
        self.find_target_vdev_excluding(self.evacuating_vdev)
    }

    /// Like [`Self::find_target_vdev`], but excludes `excluded` rather than
    /// `evacuating_vdev`. Ties go to the vdev whose latest sample is newest.
    fn find_target_vdev_excluding(&self, excluded: Option<usize>) -> Option<usize> {
        // Walking newest-first, the first sample seen for a vdev is its latest;
        // older, possibly stale samples of the same vdev are skipped.
        let mut scored: Vec<usize> = Vec::new();
        let mut best: Option<(usize, f64)> = None;
        for obs in self.observations.iter().rev() {
            if Some(obs.vdev_id) == excluded || scored.contains(&obs.vdev_id) {
                continue;
            }
            scored.push(obs.vdev_id);
            let risk = obs.failure_risk(self);
            let better = match best {
                None => true,
                Some((_, best_risk)) => risk < best_risk,
            };
            if better {
                best = Some((obs.vdev_id, risk));
            }
        }
        best.map(|(id, _)| id)
    }

    /// Begins evacuating `dying_vdev` onto the lowest-risk other vdev.
    ///
    /// Marks the engine as running, resets `blocks_migrated`, logs the plan,
    /// and hands `evac_task` to `crate::spawn_on_core` with `Some(2)`.
    ///
    /// # Errors
    /// - `"Evacuation already in progress"` if an evacuation is running.
    /// - `"No suitable target VDEV found"` if no vdev other than `dying_vdev`
    ///   has been observed.
    pub fn start_evacuation(&mut self, dying_vdev: usize) -> Result<(), &'static str> {
        if self.is_running {
            return Err("Evacuation already in progress");
        }
        // Exclude the source explicitly: `evacuating_vdev` is not set yet, so
        // `find_target_vdev` could otherwise choose `dying_vdev` as its own target.
        let target = self
            .find_target_vdev_excluding(Some(dying_vdev))
            .ok_or("No suitable target VDEV found")?;
        self.is_running = true;
        self.evacuating_vdev = Some(dying_vdev);
        self.target_vdev = Some(target);
        self.blocks_migrated = 0;
        crate::lcpfs_println!(
            "[ EVAC ] INITIATING PI-CONTROLLED EVACUATION: VDEV {} -> VDEV {}",
            dying_vdev,
            target
        );
        crate::lcpfs_println!(
            "[ EVAC ] Parameters: batch={}, pause={}μs (learned)",
            self.batch_size.value as u64,
            self.batch_pause_us.value as u64
        );
        crate::spawn_on_core(Self::evac_task, Some(2));
        Ok(())
    }

    /// Resets the in-progress markers after an evacuation finishes or aborts.
    fn clear_evacuation_state(&mut self) {
        self.is_running = false;
        self.evacuating_vdev = None;
        self.target_vdev = None;
    }

    /// Background migration task started by `start_evacuation`.
    ///
    /// Copies every block of the source vdev to the same block index on the
    /// target, in batches of `batch_size` (at least 100). A block counts as
    /// failed if its read or write fails. When done, it records an
    /// `EvacuationOutcome`, learns from it, and clears the running state.
    ///
    /// The engine lock is taken only briefly: to snapshot parameters, to
    /// publish per-batch progress, and to record the outcome. Epsilon updates,
    /// health observations and `stats()` therefore keep working during the
    /// migration, and `epsilon_after` reflects updates made while it ran.
    fn evac_task() {
        use crate::BLOCK_DEVICES;
        use alloc::vec;

        let (dying_vdev, target_vdev, batch_size, epsilon_before, started_at_risk) = {
            let mut engine = EVAC_ENGINE.lock();
            match (engine.evacuating_vdev, engine.target_vdev) {
                (Some(dying), Some(target)) => (
                    dying,
                    target,
                    engine.batch_size.value.max(100.0) as u64,
                    engine.current_epsilon,
                    engine.last_risk_score,
                ),
                _ => {
                    engine.clear_evacuation_state();
                    return;
                }
            }
        };
        let start_time = crate::get_time();

        let source_blocks = BLOCK_DEVICES
            .lock()
            .get(dying_vdev)
            .map(|dev| dev.block_count() as u64);
        let total_blocks = match source_blocks {
            Some(count) => count,
            None => {
                crate::lcpfs_println!("[ EVAC ] Error: Source VDEV {} not found", dying_vdev);
                EVAC_ENGINE.lock().clear_evacuation_state();
                return;
            }
        };
        crate::lcpfs_println!(
            "[ EVAC ] Migrating {} blocks from VDEV {} to VDEV {}",
            total_blocks,
            dying_vdev,
            target_vdev
        );

        let mut buffer = vec![0u8; Self::BLOCK_BUFFER_BYTES];
        let mut failed_blocks = 0u64;
        let mut block_id = 0u64;
        while block_id < total_blocks {
            let batch_start = block_id;
            let batch_end = batch_start.saturating_add(batch_size).min(total_blocks);
            let mut batch_migrated = 0u64;
            for bid in batch_start..batch_end {
                // Zero the reused buffer so every read starts from the same
                // state as a freshly allocated one.
                buffer.fill(0);
                let read_ok = match BLOCK_DEVICES.lock().get_mut(dying_vdev) {
                    Some(src) => src.read_block(bid as usize, &mut buffer).is_ok(),
                    None => false,
                };
                if !read_ok {
                    failed_blocks += 1;
                    continue;
                }
                let write_ok = match BLOCK_DEVICES.lock().get_mut(target_vdev) {
                    Some(dst) => dst.write_block(bid as usize, &buffer).is_ok(),
                    None => false,
                };
                if write_ok {
                    batch_migrated += 1;
                } else {
                    failed_blocks += 1;
                }
            }
            block_id = batch_end;

            // Publish progress once per batch so `stats()` can observe it.
            let migrated_so_far = {
                let mut engine = EVAC_ENGINE.lock();
                engine.blocks_migrated += batch_migrated;
                engine.blocks_migrated
            };
            if block_id / Self::PROGRESS_INTERVAL_BLOCKS
                != batch_start / Self::PROGRESS_INTERVAL_BLOCKS
            {
                crate::lcpfs_println!(
                    "[ EVAC ] Progress: {}/{} blocks migrated ({} failed)",
                    migrated_so_far,
                    total_blocks,
                    failed_blocks
                );
            }
        }

        // NOTE(review): the division assumes `get_time` counts nanoseconds — confirm.
        let time_taken_ms = crate::get_time().saturating_sub(start_time) / 1_000_000;
        // More than 10% failed blocks is recorded as a drive failure.
        let drive_failed = failed_blocks > total_blocks / 10;

        let mut engine = EVAC_ENGINE.lock();
        let outcome = EvacuationOutcome {
            vdev_id: dying_vdev,
            started_at_risk,
            blocks_evacuated: engine.blocks_migrated,
            time_taken_ms,
            drive_failed_during: drive_failed,
            epsilon_before,
            epsilon_after: engine.current_epsilon,
        };
        engine.learn_from_outcome(&outcome);
        while engine.outcomes.len() >= Self::MAX_OUTCOMES {
            engine.outcomes.pop_front();
        }
        engine.outcomes.push_back(outcome);
        engine.clear_evacuation_state();
        crate::lcpfs_println!(
            "[ EVAC ] EVACUATION COMPLETE: {} blocks migrated from VDEV {} to VDEV {}",
            engine.blocks_migrated,
            dying_vdev,
            target_vdev
        );
    }

    /// Updates learned thresholds from a finished evacuation.
    ///
    /// The outcome's epsilon delta is fed to `threshold_evac_risk` at the
    /// starting risk. If the drive failed, a delta of -1000 is also recorded
    /// at half that risk; a negative delta pulls the threshold toward the
    /// given action value. Throughput (blocks per ms) is fed to `batch_size`
    /// as a negative delta.
    ///
    /// NOTE(review): `batch_size.observe` receives its own current value as the
    /// action, so its adjustment is always zero and `batch_size.value` never
    /// changes — confirm whether an exploratory batch size was intended.
    fn learn_from_outcome(&mut self, outcome: &EvacuationOutcome) {
        let delta = outcome.delta_epsilon();
        self.threshold_evac_risk
            .observe(outcome.started_at_risk, delta);
        if outcome.drive_failed_during {
            self.threshold_evac_risk
                .observe(outcome.started_at_risk * 0.5, -1000.0);
        }
        if outcome.time_taken_ms > 0 {
            let throughput = outcome.blocks_evacuated as f64 / outcome.time_taken_ms as f64;
            self.batch_size.observe(self.batch_size.value, -throughput);
        }
    }

    /// Returns a snapshot of the engine state and its key learned thresholds.
    pub fn stats(&self) -> EvacStats {
        EvacStats {
            is_running: self.is_running,
            evacuating_vdev: self.evacuating_vdev,
            target_vdev: self.target_vdev,
            blocks_migrated: self.blocks_migrated,
            evac_risk_threshold: self.threshold_evac_risk.value,
            evac_risk_confidence: self.threshold_evac_risk.confidence(),
            latency_threshold_us: self.threshold_latency.value as u64,
            latency_confidence: self.threshold_latency.confidence(),
        }
    }
}
/// Point-in-time snapshot of the evacuation engine, produced by `EvacuationEngine::stats`.
#[derive(Debug, Clone, Copy)]
pub struct EvacStats {
/// Whether an evacuation is in progress.
pub is_running: bool,
/// Source vdev of the in-progress evacuation, if any.
pub evacuating_vdev: Option<usize>,
/// Destination vdev of the in-progress evacuation, if any.
pub target_vdev: Option<usize>,
/// Blocks migrated by the current or most recent evacuation.
pub blocks_migrated: u64,
/// Current learned evacuation-risk threshold.
pub evac_risk_threshold: f64,
/// `LearnedThreshold::confidence` of the evacuation-risk threshold.
pub evac_risk_confidence: f64,
/// Learned p99-latency threshold, truncated to an integer by `as u64`.
pub latency_threshold_us: u64,
/// `LearnedThreshold::confidence` of the latency threshold.
pub latency_confidence: f64,
}
/// Per-vdev table of the most recent latency and a cumulative error count,
/// indexed by vdev id.
#[derive(Debug, Clone)]
pub struct HealthMonitor {
    /// Most recently recorded latency (µs) per vdev; 0 for vdevs never recorded.
    pub latencies: Vec<u64>,
    /// Number of errors recorded per vdev.
    pub error_counts: Vec<u64>,
}
impl Default for HealthMonitor {
fn default() -> Self {
Self::new()
}
}
impl HealthMonitor {
pub fn new() -> Self {
Self {
latencies: Vec::new(),
error_counts: Vec::new(),
}
}
pub fn record_latency(&mut self, vdev_id: usize, latency_us: u64) {
while self.latencies.len() <= vdev_id {
self.latencies.push(0);
}
self.latencies[vdev_id] = latency_us;
}
pub fn record_error(&mut self, vdev_id: usize) {
while self.error_counts.len() <= vdev_id {
self.error_counts.push(0);
}
self.error_counts[vdev_id] += 1;
}
}
/// Stores `epsilon` as the global engine's current epsilon.
pub fn update_epsilon(epsilon: f64) {
    let mut engine = EVAC_ENGINE.lock();
    engine.update_epsilon(epsilon);
}
/// Appends a health observation to the global engine's bounded history.
pub fn observe_health(obs: VdevHealthObservation) {
    let mut engine = EVAC_ENGINE.lock();
    engine.observe(obs);
}
/// Asks the global engine whether `vdev_id` should be evacuated now.
pub fn should_evacuate(vdev_id: usize) -> bool {
    let mut engine = EVAC_ENGINE.lock();
    engine.should_evacuate(vdev_id)
}
/// Starts evacuating `dying_vdev` through the global engine.
///
/// # Errors
/// Returns the engine's error string unchanged, e.g. when an evacuation is
/// already running or no target vdev is available.
pub fn start_evacuation(dying_vdev: usize) -> Result<(), &'static str> {
    let mut engine = EVAC_ENGINE.lock();
    engine.start_evacuation(dying_vdev)
}
/// Returns a snapshot of the global engine's state.
pub fn stats() -> EvacStats {
    let engine = EVAC_ENGINE.lock();
    engine.stats()
}
pub fn check_health(_controller: &LcpfsController) {
let vdev_id = 0;
let timestamp_ms = crate::time::now() * 1000;
let smart_data = get_smart_data(vdev_id as u64);
let obs = if let Some(data) = smart_data {
let attrs = &data.attributes;
let reallocated = attrs
.get(&SmartAttribute::ReallocatedSectors)
.map(|v| v.current)
.unwrap_or(0);
let pending = attrs
.get(&SmartAttribute::CurrentPendingSectors)
.map(|v| v.current)
.unwrap_or(0);
let uncorrectable = attrs
.get(&SmartAttribute::ReportedUncorrectable)
.map(|v| v.current)
.unwrap_or(0);
let temperature = attrs
.get(&SmartAttribute::Temperature)
.map(|v| v.current as f64)
.unwrap_or(40.0);
let power_on_hours = attrs
.get(&SmartAttribute::PowerOnHours)
.map(|v| v.current)
.unwrap_or(0);
VdevHealthObservation {
vdev_id,
timestamp_ms,
latency_avg_us: 100.0, latency_p99_us: 200.0, read_errors: 0,
write_errors: 0,
reallocated_sectors: reallocated,
pending_sectors: pending,
temperature_c: temperature,
power_on_hours,
uncorrectable_errors: uncorrectable,
io_ops: 1000,
}
} else {
VdevHealthObservation {
vdev_id,
timestamp_ms,
latency_avg_us: 100.0,
latency_p99_us: 200.0,
read_errors: 0,
write_errors: 0,
reallocated_sectors: 0,
pending_sectors: 0,
temperature_c: 40.0,
power_on_hours: 0,
uncorrectable_errors: 0,
io_ops: 1000,
}
};
observe_health(obs);
if should_evacuate(vdev_id) {
let _ = start_evacuation(vdev_id);
}
}