use std::collections::{HashMap, VecDeque};
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use sysinfo::System;
use tokio::time::Instant;
use tracing::debug;
use volli_core::env_config;
/// A point-in-time snapshot of this node's health, produced by
/// `HealthCollector::collect_metrics`.
#[derive(Debug, Clone)]
pub struct HealthMetrics {
    /// Composite health score; the computed path clamps to [0.0, 1.0], but an
    /// override value is passed through as-is (see `HealthOverride`).
    pub health_score: f32,
    /// Worker load as a percentage of estimated capacity (nominally 0–100).
    pub load_percentage: f32,
    /// Configured worker cap; `None` means uncapped.
    pub max_workers: Option<u32>,
    /// Number of distinct workers currently registered.
    pub current_workers: u32,
    /// Windowed average CPU usage (percent), if any samples were collected.
    pub avg_cpu: Option<f32>,
    /// Windowed average memory usage (percent), if any samples were collected.
    pub avg_memory: Option<f32>,
    /// Unix timestamp (seconds) at which this snapshot was produced.
    pub last_health_update: u64,
}
/// Tuning knobs for `HealthCollector`.
#[derive(Debug, Clone)]
pub struct HealthConfig {
    /// Hard cap on concurrent workers; `None` disables the cap.
    pub max_workers: Option<u32>,
    /// Minimum time between system metric samples.
    pub metric_collection_interval: Duration,
    /// Number of recent samples kept for CPU/memory averaging.
    pub sample_window_size: usize,
    /// CPU percentage above which the health score starts to degrade.
    pub cpu_threshold: f32,
    /// Memory percentage above which the health score starts to degrade.
    pub memory_threshold: f32,
}

impl Default for HealthConfig {
    /// Defaults: uncapped workers, 30-second sampling interval, 5-sample
    /// window, and 80% CPU / 85% memory degradation thresholds.
    fn default() -> Self {
        HealthConfig {
            max_workers: None,
            metric_collection_interval: Duration::from_secs(30),
            sample_window_size: 5,
            cpu_threshold: 80.0,
            memory_threshold: 85.0,
        }
    }
}
/// Collects and scores node health: worker registrations plus sliding-window
/// CPU and memory usage samples.
pub struct HealthCollector {
    config: HealthConfig,
    /// Registered workers: id -> registration count (multiple registrations
    /// of the same id are reference-counted).
    connected_workers: Arc<Mutex<HashMap<String, u32>>>,
    /// Monotonic counter used to mint ids for synthetic worker entries.
    synthetic_next: Arc<Mutex<u64>>,
    /// When system metrics were last sampled.
    last_cpu_check: Instant,
    /// Sliding window of recent CPU usage samples (percent).
    cpu_samples: VecDeque<f32>,
    /// Sliding window of recent memory usage samples (percent).
    memory_samples: VecDeque<f32>,
    /// Where CPU/memory readings come from (live sysinfo or fixed values).
    metrics_source: MetricsSource,
    /// Optional overrides applied on top of measured values.
    override_metrics: Option<HealthOverride>,
}
/// Backing source for CPU/memory readings.
#[derive(Debug)]
enum MetricsSource {
    /// Live readings via the `sysinfo` crate (boxed to keep the enum small).
    Sysinfo(Box<System>),
    /// Constant readings (used by tests); `None` means "no reading".
    Fixed {
        avg_cpu: Option<f32>,
        avg_memory: Option<f32>,
    },
}
impl MetricsSource {
    /// A source backed by a fresh `sysinfo::System`.
    fn sysinfo() -> Self {
        Self::Sysinfo(Box::new(System::new()))
    }
    /// A source that always yields the given fixed readings.
    fn fixed(avg_cpu: Option<f32>, avg_memory: Option<f32>) -> Self {
        Self::Fixed {
            avg_cpu,
            avg_memory,
        }
    }
}
impl HealthCollector {
    /// Creates a collector that samples live CPU/memory via `sysinfo`.
    pub fn new(config: HealthConfig) -> Self {
        Self::new_with_source(config, MetricsSource::sysinfo())
    }

    /// Creates a collector that reports the given fixed CPU/memory readings
    /// (primarily for tests).
    ///
    /// The last-check timestamp is backdated by one collection interval so
    /// the first `collect_metrics` call samples immediately.
    pub fn with_fixed_metrics(
        config: HealthConfig,
        avg_cpu: Option<f32>,
        avg_memory: Option<f32>,
    ) -> Self {
        let mut collector =
            Self::new_with_source(config, MetricsSource::fixed(avg_cpu, avg_memory));
        // BUGFIX: `Instant - Duration` panics when the result would precede
        // the clock's origin (e.g. tokio's paused test clock starts at zero).
        // Use `checked_sub` and skip the backdating in that edge case.
        let now = Instant::now();
        collector.last_cpu_check = now
            .checked_sub(collector.config.metric_collection_interval)
            .unwrap_or(now);
        collector
    }

    /// Shared constructor wiring a config to a metrics source.
    fn new_with_source(config: HealthConfig, metrics_source: MetricsSource) -> Self {
        Self {
            config,
            connected_workers: Arc::new(Mutex::new(HashMap::new())),
            synthetic_next: Arc::new(Mutex::new(0)),
            last_cpu_check: Instant::now(),
            cpu_samples: VecDeque::new(),
            memory_samples: VecDeque::new(),
            metrics_source,
            override_metrics: None,
        }
    }

    /// Produces a fresh [`HealthMetrics`] snapshot.
    ///
    /// Samples the system at most once per `metric_collection_interval`,
    /// derives a score from the windowed CPU/memory averages and current
    /// load, then applies any fields set via `set_override` on top.
    pub async fn collect_metrics(&mut self) -> HealthMetrics {
        let current_workers = self.worker_count();
        let now = Instant::now();
        if now.duration_since(self.last_cpu_check) >= self.config.metric_collection_interval {
            self.collect_system_metrics().await;
            self.last_cpu_check = now;
        }
        let ov = self.override_metrics.clone().unwrap_or_default();
        // Overridden values win; otherwise fall through to measured/derived.
        let eff_avg_cpu = ov.avg_cpu.or_else(|| self.avg_cpu());
        let eff_avg_mem = ov.avg_memory.or_else(|| self.avg_memory());
        let eff_load_pct = ov
            .load_percentage
            .unwrap_or_else(|| self.calculate_load_percentage(current_workers));
        let health_score = if let Some(hs) = ov.health_score {
            // NOTE(review): an overridden score is passed through unclamped;
            // callers setting it are expected to supply a value in [0, 1].
            hs
        } else {
            // Multiplicative scoring: each dimension contributes a bounded
            // factor, so no single dimension can zero the score on its own
            // (CPU weighs 30%, memory 20%, load 50%).
            let mut score = 1.0f32;
            if let Some(avg_cpu) = eff_avg_cpu {
                // CPU only degrades the score above the configured threshold,
                // ramping linearly to the worst case at 100%.
                let cpu_factor = if avg_cpu > self.config.cpu_threshold {
                    let over =
                        (avg_cpu - self.config.cpu_threshold) / (100.0 - self.config.cpu_threshold);
                    1.0 - over.min(1.0)
                } else {
                    1.0
                };
                score *= 0.7 + (cpu_factor * 0.3);
            }
            if let Some(avg_memory) = eff_avg_mem {
                let mem_factor = if avg_memory > self.config.memory_threshold {
                    let over = (avg_memory - self.config.memory_threshold)
                        / (100.0 - self.config.memory_threshold);
                    1.0 - over.min(1.0)
                } else {
                    1.0
                };
                score *= 0.8 + (mem_factor * 0.2);
            }
            // Load contributes linearly: 0% load -> factor 1.0, 100% -> 0.0.
            let load_ratio = (eff_load_pct / 100.0).clamp(0.0, 1.0);
            let load_factor = 1.0 - load_ratio;
            score *= 0.5 + (load_factor * 0.5);
            score.clamp(0.0, 1.0)
        };
        let last_health_update = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();
        let mut metrics = HealthMetrics {
            health_score,
            load_percentage: eff_load_pct,
            max_workers: self.config.max_workers,
            current_workers,
            avg_cpu: eff_avg_cpu,
            avg_memory: eff_avg_mem,
            last_health_update,
        };
        if let Some(v) = ov.max_workers {
            metrics.max_workers = Some(v);
        }
        if let Some(v) = ov.current_workers {
            metrics.current_workers = v;
        }
        metrics
    }

    /// Registers (or re-registers) a worker id; returns the distinct-worker
    /// count. Repeat registrations of the same id are reference-counted.
    pub fn register_worker(&self, worker_id: &str) -> u32 {
        let mut map = self.connected_workers.lock().unwrap();
        let c = map.entry(worker_id.to_string()).or_insert(0);
        *c = c.saturating_add(1);
        map.len() as u32
    }

    /// Drops one registration for `worker_id`, removing the entry when its
    /// count reaches zero; unknown ids are ignored. Returns the
    /// distinct-worker count.
    pub fn unregister_worker(&self, worker_id: &str) -> u32 {
        let mut map = self.connected_workers.lock().unwrap();
        if let Some(c) = map.get_mut(worker_id) {
            if *c > 1 {
                *c -= 1;
            } else {
                map.remove(worker_id);
            }
        }
        map.len() as u32
    }

    /// Number of distinct registered workers.
    pub fn worker_count(&self) -> u32 {
        self.connected_workers.lock().unwrap().len() as u32
    }

    /// Replaces all registrations with `count` synthetic workers and resets
    /// the synthetic id counter to match.
    pub fn set_worker_count(&self, count: u32) {
        let mut map = self.connected_workers.lock().unwrap();
        map.clear();
        for i in 0..count {
            map.insert(format!("synthetic_{i}"), 1);
        }
        let mut n = self.synthetic_next.lock().unwrap();
        *n = count as u64;
    }

    /// Adds one synthetic worker; returns the new distinct-worker count.
    pub fn increment_worker_count(&self) -> u32 {
        let mut n = self.synthetic_next.lock().unwrap();
        *n += 1;
        let id = format!("synthetic_{}", *n);
        // Release the counter lock before taking the worker-map lock so the
        // two locks are never held together.
        drop(n);
        self.register_worker(&id)
    }

    /// Removes the most recently added synthetic worker, if any; returns the
    /// distinct-worker count.
    pub fn decrement_worker_count(&self) -> u32 {
        let mut n = self.synthetic_next.lock().unwrap();
        if *n == 0 {
            // BUGFIX: release the counter lock before `worker_count()` takes
            // the map lock. Holding both here inverted the acquisition order
            // used by `set_worker_count` (map, then counter), which could
            // deadlock under concurrent calls.
            drop(n);
            return self.worker_count();
        }
        let id = format!("synthetic_{}", *n);
        *n -= 1;
        drop(n);
        self.unregister_worker(&id)
    }

    /// Installs metric overrides (`None` clears them).
    pub fn set_override(&mut self, ov: Option<HealthOverride>) {
        self.override_metrics = ov;
    }

    /// Removes any installed override.
    pub fn clear_override(&mut self) {
        self.override_metrics = None;
    }

    /// Whether another worker fits under `max_workers` (always true when
    /// uncapped).
    pub fn can_accept_worker(&self) -> bool {
        match self.config.max_workers {
            Some(max) => self.worker_count() < max,
            None => true,
        }
    }

    /// Takes one CPU and one memory reading from the configured source and
    /// pushes valid readings into the sliding windows; invalid readings
    /// (non-finite or negative) are skipped.
    async fn collect_system_metrics(&mut self) {
        // Accept only finite, non-negative percentages from either source.
        let sanitize = |v: f32| if v.is_finite() && v >= 0.0 { Some(v) } else { None };
        let (cpu_avg_opt, memory_usage_opt) = match &mut self.metrics_source {
            MetricsSource::Sysinfo(system) => {
                // Two CPU refreshes separated by a short delay: sysinfo
                // derives usage from the delta between refreshes.
                system.refresh_cpu_all();
                system.refresh_memory();
                // Delay is configurable via env; defaults to 0 in tests so
                // they stay fast, 200ms otherwise.
                let delay_ms = if cfg!(test) {
                    env_config().interval_ms.unwrap_or(0)
                } else {
                    env_config().interval_ms.unwrap_or(200)
                };
                if delay_ms > 0 {
                    tokio::time::sleep(Duration::from_millis(delay_ms)).await;
                }
                system.refresh_cpu_all();
                let cpus = system.cpus();
                let cpu_avg_opt = if cpus.is_empty() {
                    None
                } else {
                    let sum = cpus.iter().map(|cpu| cpu.cpu_usage()).sum::<f32>();
                    sanitize(sum / cpus.len() as f32)
                };
                let memory_usage_opt = if system.total_memory() > 0 {
                    sanitize((system.used_memory() as f32 / system.total_memory() as f32) * 100.0)
                } else {
                    None
                };
                (cpu_avg_opt, memory_usage_opt)
            }
            MetricsSource::Fixed { avg_cpu, avg_memory } => {
                // Fixed readings go through the same validity filter as the
                // live path.
                (avg_cpu.and_then(sanitize), avg_memory.and_then(sanitize))
            }
        };
        if let Some(cpu_usage) = cpu_avg_opt {
            self.cpu_samples.push_back(cpu_usage);
            if self.cpu_samples.len() > self.config.sample_window_size {
                self.cpu_samples.pop_front();
            }
        }
        if let Some(memory_usage) = memory_usage_opt {
            self.memory_samples.push_back(memory_usage);
            if self.memory_samples.len() > self.config.sample_window_size {
                self.memory_samples.pop_front();
            }
        }
        if let (Some(c), Some(m)) = (self.avg_cpu(), self.avg_memory()) {
            debug!("Collected system metrics: CPU {:.1}%, Memory {:.1}%", c, m);
        }
    }

    /// Scores health from the raw sample windows, ignoring overrides.
    /// Currently unused (`collect_metrics` inlines an override-aware variant
    /// with a simpler linear load factor) — kept for reference.
    #[allow(dead_code)]
    fn calculate_health_score(&self, current_workers: u32) -> f32 {
        let mut score = 1.0f32;
        if let Some(avg_cpu) = self.avg_cpu() {
            let cpu_factor = if avg_cpu > self.config.cpu_threshold {
                let over_threshold =
                    (avg_cpu - self.config.cpu_threshold) / (100.0 - self.config.cpu_threshold);
                1.0 - over_threshold.min(1.0)
            } else {
                1.0
            };
            score *= 0.7 + (cpu_factor * 0.3);
        }
        if let Some(avg_memory) = self.avg_memory() {
            let memory_factor = if avg_memory > self.config.memory_threshold {
                let over_threshold = (avg_memory - self.config.memory_threshold)
                    / (100.0 - self.config.memory_threshold);
                1.0 - over_threshold.min(1.0)
            } else {
                1.0
            };
            score *= 0.8 + (memory_factor * 0.2);
        }
        // Tiered load factor: no penalty below 70% utilization, a mild ramp
        // from 70% to 90%, and a steep ramp to zero above 90%. A zero cap
        // means no capacity at all.
        let load_factor = match self.config.max_workers {
            Some(max) if max > 0 => {
                let ratio = current_workers as f32 / max as f32;
                if ratio > 0.9 {
                    1.0 - ((ratio - 0.9) / 0.1).min(1.0)
                } else if ratio > 0.7 {
                    1.0 - ((ratio - 0.7) / 0.4) * 0.3
                } else {
                    1.0
                }
            }
            Some(_) => 0.0,
            None => {
                // Uncapped: coarse buckets based on absolute worker count.
                if current_workers > 500 {
                    0.5
                } else if current_workers > 200 {
                    0.8
                } else {
                    1.0
                }
            }
        };
        score *= 0.5 + (load_factor * 0.5);
        score.clamp(0.0, 1.0)
    }

    /// Load as a percentage of capacity. With a positive cap this is
    /// workers/cap clamped to 100%; a zero cap reports 100%; uncapped nodes
    /// assume a nominal capacity of 1000 workers, capped at 95%.
    fn calculate_load_percentage(&self, current_workers: u32) -> f32 {
        match self.config.max_workers {
            Some(max) if max > 0 => ((current_workers as f32 / max as f32) * 100.0).min(100.0),
            Some(_) => 100.0,
            None => {
                let base_load = (current_workers as f32 / 1000.0) * 100.0;
                base_load.min(95.0)
            }
        }
    }

    /// Mean of the CPU sample window, or `None` when no samples exist.
    fn avg_cpu(&self) -> Option<f32> {
        if self.cpu_samples.is_empty() {
            None
        } else {
            Some(self.cpu_samples.iter().sum::<f32>() / self.cpu_samples.len() as f32)
        }
    }

    /// Mean of the memory sample window, or `None` when no samples exist.
    fn avg_memory(&self) -> Option<f32> {
        if self.memory_samples.is_empty() {
            None
        } else {
            Some(self.memory_samples.iter().sum::<f32>() / self.memory_samples.len() as f32)
        }
    }
}
/// Per-field overrides for [`HealthMetrics`]: any field left as `None` falls
/// through to the measured/derived value in `collect_metrics`.
#[derive(Debug, Clone, Default)]
pub struct HealthOverride {
    /// Replaces the computed health score (applied as-is, not clamped).
    pub health_score: Option<f32>,
    /// Replaces the computed load percentage.
    pub load_percentage: Option<f32>,
    /// Replaces the configured worker cap in the reported snapshot.
    pub max_workers: Option<u32>,
    /// Replaces the measured worker count in the reported snapshot.
    pub current_workers: Option<u32>,
    /// Replaces the windowed CPU average (also feeds the score calculation).
    pub avg_cpu: Option<f32>,
    /// Replaces the windowed memory average (also feeds the score calculation).
    pub avg_memory: Option<f32>,
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;
    use volli_core::{EnvironmentConfig, override_env_config};

    // 50 of 100 capped workers with low fixed CPU/memory should report 50%
    // load and an in-range, positive health score.
    #[tokio::test]
    async fn test_health_collector_basic_functionality() {
        let config = HealthConfig {
            max_workers: Some(100),
            ..Default::default()
        };
        let mut collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        for i in 0..50 {
            collector.register_worker(&format!("w{}", i));
        }
        let metrics = collector.collect_metrics().await;
        assert_eq!(metrics.current_workers, 50);
        assert_eq!(metrics.max_workers, Some(100));
        assert_eq!(metrics.load_percentage, 50.0);
        assert!(metrics.health_score > 0.0 && metrics.health_score <= 1.0);
    }

    // register/unregister round-trip keeps the distinct-worker count correct.
    #[test]
    fn test_worker_count_operations() {
        let config = HealthConfig::default();
        let collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        collector.register_worker("a");
        assert_eq!(collector.worker_count(), 1);
        let new_count = collector.register_worker("b");
        assert_eq!(new_count, 2);
        assert_eq!(collector.worker_count(), 2);
        let new_count = collector.unregister_worker("b");
        assert_eq!(new_count, 1);
        assert_eq!(collector.worker_count(), 1);
    }

    // Admission is denied exactly when the cap is reached, and never denied
    // for an uncapped collector.
    #[test]
    fn test_can_accept_worker() {
        let config = HealthConfig {
            max_workers: Some(5),
            ..Default::default()
        };
        let collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        collector.register_worker("a");
        collector.register_worker("b");
        collector.register_worker("c");
        assert!(collector.can_accept_worker());
        collector.register_worker("d");
        collector.register_worker("e");
        assert!(!collector.can_accept_worker());
        let config = HealthConfig {
            max_workers: None,
            ..Default::default()
        };
        let collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        for i in 0..1000 {
            collector.register_worker(&format!("w{}", i));
        }
        assert!(collector.can_accept_worker());
    }

    // Capped: linear up to the cap, clamped at 100%.
    // Uncapped: nominal 1000-worker capacity, clamped at 95%.
    #[test]
    fn test_load_percentage_calculation() {
        let config = HealthConfig {
            max_workers: Some(100),
            ..Default::default()
        };
        let collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        assert_eq!(collector.calculate_load_percentage(50), 50.0);
        assert_eq!(collector.calculate_load_percentage(100), 100.0);
        assert_eq!(collector.calculate_load_percentage(150), 100.0);
        let config = HealthConfig {
            max_workers: None,
            ..Default::default()
        };
        let collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        assert_eq!(collector.calculate_load_percentage(100), 10.0);
        assert_eq!(collector.calculate_load_percentage(1000), 95.0);
    }

    // Live sysinfo sampling (with the inter-refresh delay forced to 0) must
    // yield valid readings, if any, and an in-range score.
    #[tokio::test]
    async fn test_sysinfo_metrics_sampling() {
        let _env = override_env_config(EnvironmentConfig {
            interval_ms: Some(0),
            ..Default::default()
        });
        let config = HealthConfig {
            metric_collection_interval: Duration::from_millis(0),
            ..Default::default()
        };
        let mut collector = HealthCollector::new(config);
        let metrics = collector.collect_metrics().await;
        if let Some(cpu) = metrics.avg_cpu {
            assert!(cpu.is_finite() && cpu >= 0.0, "cpu avg should be valid");
        }
        if let Some(memory) = metrics.avg_memory {
            assert!(
                memory.is_finite() && memory >= 0.0,
                "memory avg should be valid"
            );
        }
        assert!(
            (0.0..=1.0).contains(&metrics.health_score),
            "health score should be within range"
        );
    }

    // Absent CPU/memory samples must leave the averages as None while still
    // producing a valid score from the load factor alone.
    #[tokio::test]
    async fn test_none_samples_do_not_break_scoring() {
        let config = HealthConfig {
            max_workers: Some(10),
            ..Default::default()
        };
        let mut collector = HealthCollector::with_fixed_metrics(config, None, None);
        collector.set_worker_count(5);
        let metrics = collector.collect_metrics().await;
        assert!(metrics.avg_cpu.is_none());
        assert!(metrics.avg_memory.is_none());
        assert_eq!(metrics.load_percentage, 50.0);
        assert!(
            (0.0..=1.0).contains(&metrics.health_score),
            "health score should be within range"
        );
    }

    // A partial override replaces only the fields it sets (and a worse CPU /
    // higher load can only lower the score); clearing restores measured
    // values.
    #[tokio::test]
    async fn test_health_override_partial_and_clear() {
        let config = HealthConfig {
            max_workers: Some(10),
            ..Default::default()
        };
        let mut collector = HealthCollector::with_fixed_metrics(config, Some(10.0), Some(20.0));
        collector.set_worker_count(2);
        let baseline = collector.collect_metrics().await;
        collector.set_override(Some(HealthOverride {
            avg_cpu: Some(95.0),
            load_percentage: Some(80.0),
            ..Default::default()
        }));
        let overridden = collector.collect_metrics().await;
        assert_eq!(overridden.avg_cpu, Some(95.0));
        assert_eq!(overridden.load_percentage, 80.0);
        assert!(overridden.health_score <= baseline.health_score);
        collector.clear_override();
        let restored = collector.collect_metrics().await;
        assert_eq!(restored.avg_cpu, Some(10.0));
        assert_eq!(restored.avg_memory, Some(20.0));
    }
}