use anyhow::Result;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Cached view of system CPU and memory load that is refreshed lazily.
///
/// All mutable state lives behind `Arc`s, so the derived `Clone` yields a
/// handle that shares the same cached readings and refresh timestamp.
#[derive(Debug, Clone)]
pub struct SystemLoadMonitor {
/// Most recent CPU reading, stored as the bit pattern of an `f64`
/// (`f64::to_bits`); the accessor decodes it and divides by 100.
cpu_usage: Arc<AtomicU64>,
/// Most recent memory reading, encoded the same way as `cpu_usage`.
memory_usage: Arc<AtomicU64>,
/// Time of the last refresh attempt; compared against `update_interval`
/// to decide whether the cached readings are refreshed.
last_update: Arc<std::sync::Mutex<Instant>>,
/// Minimum time that must elapse between two refreshes.
update_interval: Duration,
}
/// Construction, queries and platform-specific sampling for the load monitor.
impl SystemLoadMonitor {
    /// Creates a monitor whose cached metrics are refreshed at most once per second.
    pub fn new() -> Self {
        Self::with_update_interval(Duration::from_secs(1))
    }

    /// Creates a monitor whose cached metrics are refreshed at most once per `interval`.
    ///
    /// One sample is taken immediately. Without it the cache would hold its
    /// zero initial value until `interval` had elapsed, so a fresh monitor
    /// would report an idle system regardless of the real load.
    pub fn with_update_interval(interval: Duration) -> Self {
        let monitor = Self {
            cpu_usage: Arc::new(AtomicU64::new(0)),
            memory_usage: Arc::new(AtomicU64::new(0)),
            last_update: Arc::new(std::sync::Mutex::new(Instant::now())),
            update_interval: interval,
        };
        monitor.refresh();
        monitor
    }

    /// Returns the cached CPU load as a fraction (the stored percentage divided by 100).
    ///
    /// Triggers a refresh first if the update interval has elapsed.
    pub fn cpu_usage(&self) -> f64 {
        self.maybe_update();
        Self::cached_fraction(&self.cpu_usage)
    }

    /// Returns the cached memory usage as a fraction (the stored percentage divided by 100).
    ///
    /// Triggers a refresh first if the update interval has elapsed.
    pub fn memory_usage(&self) -> f64 {
        self.maybe_update();
        Self::cached_fraction(&self.memory_usage)
    }

    /// Returns a combined load figure: 60% CPU plus 40% memory, capped at `1.0`.
    pub fn overall_load(&self) -> f64 {
        let cpu = self.cpu_usage();
        let mem = self.memory_usage();
        (cpu * 0.6 + mem * 0.4).min(1.0)
    }

    /// Returns `true` when the overall load is strictly above `threshold`.
    pub fn is_high_load(&self, threshold: f64) -> bool {
        self.overall_load() > threshold
    }

    /// Returns `true` when the overall load is strictly below `threshold`.
    pub fn is_low_load(&self, threshold: f64) -> bool {
        self.overall_load() < threshold
    }

    /// Suggests a concurrency level for the current overall load.
    ///
    /// `max_concurrency` is scaled by a factor that steps down from `1.0`
    /// (load below 0.5) to `0.25` (load of 0.9 or more). The result is at
    /// least 1 and never exceeds `max_concurrency`; consequently it is 0 when
    /// `max_concurrency` is 0.
    pub fn recommended_concurrency(&self, max_concurrency: usize) -> usize {
        let load = self.overall_load();
        let scale_factor = if load < 0.5 {
            1.0
        } else if load < 0.7 {
            0.75
        } else if load < 0.8 {
            0.5
        } else if load < 0.9 {
            0.4
        } else {
            0.25
        };
        let scaled = (max_concurrency as f64 * scale_factor).max(1.0) as usize;
        scaled.min(max_concurrency)
    }

    /// Decodes a cached percentage and converts it to a fraction.
    fn cached_fraction(cell: &AtomicU64) -> f64 {
        f64::from_bits(cell.load(Ordering::Relaxed)) / 100.0
    }

    /// Refreshes the cached metrics if `update_interval` has elapsed since the last refresh.
    fn maybe_update(&self) {
        {
            // The guarded value is a plain timestamp that cannot be left in an
            // inconsistent state, so a poisoned lock is recovered instead of
            // turning every later read into a panic.
            let mut last_update = self
                .last_update
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if last_update.elapsed() < self.update_interval {
                return;
            }
            // Stamp before sampling and release the lock, so callers arriving
            // while the sample is being taken return early instead of queueing.
            *last_update = Instant::now();
        }
        self.refresh();
    }

    /// Samples the system and stores the readings; keeps the old values if sampling fails.
    ///
    /// CPU and memory are stored independently with relaxed ordering, so a
    /// concurrent reader may pair a CPU value from one sample with a memory
    /// value from another.
    fn refresh(&self) {
        if let Ok((cpu, memory)) = self.read_system_metrics() {
            self.cpu_usage.store(cpu.to_bits(), Ordering::Relaxed);
            self.memory_usage.store(memory.to_bits(), Ordering::Relaxed);
        }
    }

    /// Returns `(cpu_percent, memory_percent)` for the current platform.
    ///
    /// Exactly one of the `cfg` blocks below survives compilation and becomes
    /// the function's tail expression.
    fn read_system_metrics(&self) -> Result<(f64, f64)> {
        #[cfg(target_os = "linux")]
        {
            self.read_linux_metrics()
        }
        #[cfg(target_os = "macos")]
        {
            self.read_macos_metrics()
        }
        #[cfg(target_os = "windows")]
        {
            self.read_windows_metrics()
        }
        #[cfg(not(any(target_os = "linux", target_os = "macos", target_os = "windows")))]
        {
            // No sampler for this platform: report a fixed mid-range load.
            Ok((50.0, 50.0))
        }
    }

    /// Linux: CPU from `/proc/loadavg`, memory from `/proc/meminfo`.
    #[cfg(target_os = "linux")]
    fn read_linux_metrics(&self) -> Result<(f64, f64)> {
        let cpu = self.estimate_cpu_from_loadavg()?;
        let memory = self.estimate_memory_from_available()?;
        Ok((cpu, memory))
    }

    /// macOS: placeholder that returns fixed values (30% CPU, 40% memory).
    #[cfg(target_os = "macos")]
    fn read_macos_metrics(&self) -> Result<(f64, f64)> {
        let cpu = 30.0;
        let memory = 40.0;
        Ok((cpu, memory))
    }

    /// Windows: placeholder that returns fixed values (40% CPU, 45% memory).
    #[cfg(target_os = "windows")]
    fn read_windows_metrics(&self) -> Result<(f64, f64)> {
        let cpu = 40.0;
        let memory = 45.0;
        Ok((cpu, memory))
    }

    /// Estimates CPU load in percent from the first field of `/proc/loadavg`.
    ///
    /// Best effort: falls back to 30% when the file cannot be read or parsed,
    /// so this never returns `Err`.
    #[cfg(target_os = "linux")]
    fn estimate_cpu_from_loadavg(&self) -> Result<f64> {
        let percent = std::fs::read_to_string("/proc/loadavg")
            .ok()
            .and_then(|contents| Self::parse_loadavg_percent(&contents, num_cpus::get()))
            .unwrap_or(30.0);
        Ok(percent)
    }

    /// Converts the first whitespace-separated field of `contents` (a load
    /// average) into a percentage of `cpu_count`, clamped to `0.0..=100.0`.
    ///
    /// Returns `None` when the field is missing, not a finite number, or
    /// `cpu_count` is zero.
    #[cfg(target_os = "linux")]
    fn parse_loadavg_percent(contents: &str, cpu_count: usize) -> Option<f64> {
        let load: f64 = contents.split_whitespace().next()?.parse().ok()?;
        if !load.is_finite() || cpu_count == 0 {
            return None;
        }
        Some((load / cpu_count as f64 * 100.0).clamp(0.0, 100.0))
    }

    /// Estimates memory usage in percent from `/proc/meminfo`.
    ///
    /// Best effort: falls back to 50% when the file cannot be read or lacks
    /// usable `MemTotal`/`MemAvailable` values, so this never returns `Err`.
    #[cfg(target_os = "linux")]
    fn estimate_memory_from_available(&self) -> Result<f64> {
        let percent = std::fs::read_to_string("/proc/meminfo")
            .ok()
            .and_then(|contents| Self::parse_meminfo_percent(&contents))
            .unwrap_or(50.0);
        Ok(percent)
    }

    /// Computes `(MemTotal - MemAvailable) / MemTotal` as a percentage.
    ///
    /// Returns `None` when either field is absent or unparsable, or when the
    /// total is zero. A missing `MemAvailable` must not be read as "nothing
    /// available", which would report 100% usage.
    #[cfg(target_os = "linux")]
    fn parse_meminfo_percent(contents: &str) -> Option<f64> {
        // First numeric token after the given `Name:` label.
        let field = |name: &str| -> Option<u64> {
            contents
                .lines()
                .find_map(|line| line.strip_prefix(name))?
                .split_whitespace()
                .next()?
                .parse()
                .ok()
        };
        let total = field("MemTotal:")?;
        let available = field("MemAvailable:")?;
        if total == 0 {
            return None;
        }
        let used = total.saturating_sub(available);
        Some((used as f64 / total as f64 * 100.0).min(100.0))
    }
}
impl Default for SystemLoadMonitor {
fn default() -> Self {
Self::new()
}
}
/// Adjusts a concurrency limit up or down based on readings from a
/// `SystemLoadMonitor`, at most once per adjustment interval.
#[derive(Debug)]
pub struct AdaptiveConcurrencyController {
/// Source of the overall-load readings that drive adjustments.
monitor: SystemLoadMonitor,
/// Upper bound that the current limit is capped at.
max_concurrency: usize,
/// Current concurrency limit (stored as `u64`, read back as `usize`).
current_concurrency: Arc<AtomicU64>,
/// Overall load above which the limit is reduced.
high_load_threshold: f64,
/// Overall load below which the limit is increased.
low_load_threshold: f64,
/// Minimum time that must elapse between two adjustments.
adjustment_interval: Duration,
/// Time of the last adjustment; starts at construction time.
last_adjustment: Arc<std::sync::Mutex<Instant>>,
}
/// Construction, configuration and the adjustment step of the controller.
impl AdaptiveConcurrencyController {
    /// Creates a controller that starts at `max_concurrency`.
    ///
    /// Defaults: shrink above a load of 0.75, grow below 0.40, adjust at most
    /// every 5 seconds. The interval is measured from construction, so the
    /// first adjustment can only happen once it has elapsed.
    pub fn new(max_concurrency: usize) -> Self {
        Self {
            monitor: SystemLoadMonitor::new(),
            max_concurrency,
            current_concurrency: Arc::new(AtomicU64::new(max_concurrency as u64)),
            high_load_threshold: 0.75,
            low_load_threshold: 0.40,
            adjustment_interval: Duration::from_secs(5),
            last_adjustment: Arc::new(std::sync::Mutex::new(Instant::now())),
        }
    }

    /// Returns the current concurrency limit.
    pub fn current_concurrency(&self) -> usize {
        self.current_concurrency.load(Ordering::Relaxed) as usize
    }

    /// Re-evaluates the concurrency limit if the adjustment interval has elapsed.
    ///
    /// * load above the high threshold: shrink to 75% of the current limit (never below 1);
    /// * load below the low threshold: grow by 25%, and by at least one slot;
    /// * otherwise: adopt the monitor's recommendation for `max_concurrency`.
    ///
    /// The result never exceeds `max_concurrency`.
    pub fn update_concurrency(&self) {
        {
            // The guarded value is a plain timestamp, so a poisoned lock is
            // recovered rather than propagated as a panic.
            let mut last_adjustment = self
                .last_adjustment
                .lock()
                .unwrap_or_else(std::sync::PoisonError::into_inner);
            if last_adjustment.elapsed() < self.adjustment_interval {
                return;
            }
            // Stamp first and release the lock before sampling the load.
            *last_adjustment = Instant::now();
        }

        let load = self.monitor.overall_load();
        let current = self.current_concurrency();
        let target = if load > self.high_load_threshold {
            Self::shrink(current)
        } else if load < self.low_load_threshold {
            Self::grow(current)
        } else {
            self.monitor.recommended_concurrency(self.max_concurrency)
        };
        self.current_concurrency.store(
            target.min(self.max_concurrency) as u64,
            Ordering::Relaxed,
        );
    }

    /// Returns the load monitor used by this controller.
    pub fn monitor(&self) -> &SystemLoadMonitor {
        &self.monitor
    }

    /// Sets the high and low load thresholds, each clamped to `0.0..=1.0`.
    ///
    /// The two values are not reordered; passing `low` greater than `high`
    /// removes the middle band in which the monitor's recommendation is used.
    pub fn with_thresholds(mut self, high: f64, low: f64) -> Self {
        self.high_load_threshold = high.clamp(0.0, 1.0);
        self.low_load_threshold = low.clamp(0.0, 1.0);
        self
    }

    /// Sets the minimum time between two adjustments.
    pub fn with_adjustment_interval(mut self, interval: Duration) -> Self {
        self.adjustment_interval = interval;
        self
    }

    /// 75% of `current`, truncated, but never below 1.
    fn shrink(current: usize) -> usize {
        (current as f64 * 0.75).max(1.0) as usize
    }

    /// 125% of `current`, truncated, but always at least `current + 1`.
    ///
    /// Plain truncation maps 1, 2 and 3 back onto themselves, which would pin
    /// a small limit in place no matter how long the system stays idle.
    fn grow(current: usize) -> usize {
        let scaled = (current as f64 * 1.25) as usize;
        scaled.max(current.saturating_add(1))
    }
}
// Unit tests for the load monitor and the adaptive controller. They run
// against the real system, so they assert ranges and invariants rather than
// exact readings.
#[cfg(test)]
mod tests {
    use super::*;

    /// `cpu_usage` and `memory_usage` return fractions, not percentages.
    #[test]
    fn test_system_load_monitor_creation() {
        let monitor = SystemLoadMonitor::new();
        let cpu = monitor.cpu_usage();
        let mem = monitor.memory_usage();
        assert!((0.0..=1.0).contains(&cpu), "cpu fraction was {}", cpu);
        assert!((0.0..=1.0).contains(&mem), "memory fraction was {}", mem);
    }

    /// The weighted overall load stays within `0.0..=1.0`.
    #[test]
    fn test_overall_load_calculation() {
        let monitor = SystemLoadMonitor::new();
        let load = monitor.overall_load();
        assert!((0.0..=1.0).contains(&load), "overall load was {}", load);
    }

    /// The recommendation is at least 1 and at most the requested maximum.
    #[test]
    fn test_recommended_concurrency() {
        let monitor = SystemLoadMonitor::new();
        let rec = monitor.recommended_concurrency(16);
        assert!(rec >= 1);
        assert!(rec <= 16);
    }

    /// A load that is high for 0.75 cannot also be low for 0.40.
    #[test]
    fn test_high_low_load_detection() {
        let monitor = SystemLoadMonitor::new();
        let is_high = monitor.is_high_load(0.75);
        let is_low = monitor.is_low_load(0.40);
        if is_high {
            assert!(!is_low);
        }
    }

    /// A new controller starts at the maximum and stays within bounds.
    #[test]
    fn test_adaptive_concurrency_controller() {
        let controller = AdaptiveConcurrencyController::new(16);
        let initial = controller.current_concurrency();
        assert_eq!(initial, 16);
        controller.update_concurrency();
        let after_update = controller.current_concurrency();
        assert!(after_update >= 1);
        assert!(after_update <= 16);
    }

    /// Back-to-back updates inside the adjustment interval change nothing.
    #[test]
    fn test_concurrency_adjustment_interval() {
        let controller = AdaptiveConcurrencyController::new(16);
        controller.update_concurrency();
        let first = controller.current_concurrency();
        controller.update_concurrency();
        let second = controller.current_concurrency();
        assert_eq!(first, second, "Concurrency should not change immediately");
    }

    /// In-range thresholds are stored unchanged.
    #[test]
    fn test_threshold_configuration() {
        let controller = AdaptiveConcurrencyController::new(16).with_thresholds(0.80, 0.30);
        assert_eq!(controller.high_load_threshold, 0.80);
        assert_eq!(controller.low_load_threshold, 0.30);
    }

    /// The adjustment interval is stored as given.
    #[test]
    fn test_adjustment_interval_configuration() {
        let interval = Duration::from_secs(10);
        let controller = AdaptiveConcurrencyController::new(16).with_adjustment_interval(interval);
        assert_eq!(controller.adjustment_interval, interval);
    }

    /// The monitor exposed by the controller reports fractions as well.
    #[test]
    fn test_monitor_access() {
        let controller = AdaptiveConcurrencyController::new(16);
        let monitor = controller.monitor();
        let cpu = monitor.cpu_usage();
        let mem = monitor.memory_usage();
        assert!((0.0..=1.0).contains(&cpu), "cpu fraction was {}", cpu);
        assert!((0.0..=1.0).contains(&mem), "memory fraction was {}", mem);
    }

    /// Repeated adjustments never leave `1..=max`.
    #[test]
    fn test_concurrency_bounds() {
        // A zero interval makes every call perform an adjustment; with the
        // default 5 s interval this loop would never adjust anything.
        let controller =
            AdaptiveConcurrencyController::new(8).with_adjustment_interval(Duration::ZERO);
        for _ in 0..10 {
            controller.update_concurrency();
            let current = controller.current_concurrency();
            assert!(current >= 1, "Concurrency should never be zero");
            assert!(current <= 8, "Concurrency should not exceed max");
        }
    }
}