use crate::error::{RusTorchError, RusTorchResult};
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, RwLock};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum PressureLevel {
Low = 0,
Medium = 1,
High = 2,
Critical = 3,
}
impl PressureLevel {
pub fn from_ratio(ratio: f64) -> Self {
match ratio {
r if r < 0.5 => PressureLevel::Low,
r if r < 0.75 => PressureLevel::Medium,
r if r < 0.9 => PressureLevel::High,
_ => PressureLevel::Critical,
}
}
pub fn threshold(&self) -> f64 {
match self {
PressureLevel::Low => 0.5,
PressureLevel::Medium => 0.75,
PressureLevel::High => 0.9,
PressureLevel::Critical => 1.0,
}
}
pub fn gc_strategy(&self) -> GcStrategy {
match self {
PressureLevel::Low => GcStrategy::Lazy,
PressureLevel::Medium => GcStrategy::Conservative,
PressureLevel::High => GcStrategy::Aggressive,
PressureLevel::Critical => GcStrategy::Emergency,
}
}
}
#[derive(Debug, Clone, Copy, PartialEq)]
pub enum GcStrategy {
Lazy,
Conservative,
Aggressive,
Emergency,
}
impl GcStrategy {
pub fn collection_interval(&self) -> Duration {
match self {
GcStrategy::Lazy => Duration::from_secs(300), GcStrategy::Conservative => Duration::from_secs(60), GcStrategy::Aggressive => Duration::from_secs(10), GcStrategy::Emergency => Duration::from_millis(100), }
}
pub fn memory_threshold(&self) -> f64 {
match self {
GcStrategy::Lazy => 0.9,
GcStrategy::Conservative => 0.8,
GcStrategy::Aggressive => 0.7,
GcStrategy::Emergency => 0.6,
}
}
}
#[derive(Debug, Clone)]
pub struct MemorySnapshot {
pub timestamp: SystemTime,
pub system_total: usize,
pub system_available: usize,
pub process_used: usize,
pub rustorch_used: usize,
pub pressure_ratio: f64,
pub pressure_level: PressureLevel,
}
#[derive(Debug, Clone)]
pub struct PressureTrend {
pub direction: f64,
pub strength: f64,
pub predicted_pressure: f64,
pub confidence: f64,
}
pub struct AdaptivePressureMonitor {
config: MonitorConfig,
current_snapshot: RwLock<Option<MemorySnapshot>>,
history: Mutex<VecDeque<MemorySnapshot>>,
gc_strategy: RwLock<GcStrategy>,
monitor_handle: Mutex<Option<thread::JoinHandle<()>>>,
running: Arc<RwLock<bool>>,
stats: RwLock<MonitorStats>,
}
#[derive(Clone, Debug)]
pub struct MonitorConfig {
pub monitor_interval: Duration,
pub max_history_entries: usize,
pub system_memory_threshold: f64,
pub rustorch_memory_limit: usize,
pub enable_prediction: bool,
pub prediction_window: usize,
}
impl Default for MonitorConfig {
fn default() -> Self {
Self {
monitor_interval: Duration::from_secs(5),
max_history_entries: 1000,
system_memory_threshold: 0.85,
rustorch_memory_limit: 2 * 1024 * 1024 * 1024, enable_prediction: true,
prediction_window: 10,
}
}
}
#[derive(Debug, Clone)]
pub struct MonitorStats {
pub total_snapshots: usize,
pub pressure_distribution: [usize; 4], pub avg_pressure: f64,
pub peak_pressure: f64,
pub strategy_changes: usize,
pub prediction_accuracy: Option<f64>,
}
impl Default for MonitorStats {
fn default() -> Self {
Self {
total_snapshots: 0,
pressure_distribution: [0; 4],
avg_pressure: 0.0,
peak_pressure: 0.0,
strategy_changes: 0,
prediction_accuracy: None,
}
}
}
impl AdaptivePressureMonitor {
pub fn new(config: MonitorConfig) -> Self {
Self {
config,
current_snapshot: RwLock::new(None),
history: Mutex::new(VecDeque::new()),
gc_strategy: RwLock::new(GcStrategy::Conservative),
monitor_handle: Mutex::new(None),
running: Arc::new(RwLock::new(false)),
stats: RwLock::new(MonitorStats::default()),
}
}
pub fn start_monitoring(&self) -> RusTorchResult<()> {
let mut running = self.running.write().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire running write lock".to_string())
})?;
if *running {
return Err(RusTorchError::MemoryError(
"Monitor already running".to_string(),
));
}
*running = true;
let running_flag = self.running.clone();
let config = self.config.clone();
let current_snapshot = Arc::new(RwLock::new(None));
let history = Arc::new(Mutex::new(VecDeque::new()));
let gc_strategy = Arc::new(RwLock::new(GcStrategy::Conservative));
let stats = Arc::new(RwLock::new(MonitorStats::default()));
let handle = thread::spawn(move || {
Self::monitor_loop(
running_flag,
config,
current_snapshot,
history,
gc_strategy,
stats,
);
});
let mut handle_guard = self.monitor_handle.lock().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire monitor handle lock".to_string())
})?;
*handle_guard = Some(handle);
Ok(())
}
pub fn stop_monitoring(&self) -> RusTorchResult<()> {
let mut running = self.running.write().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire running write lock".to_string())
})?;
if !*running {
return Ok(());
}
*running = false;
let mut handle_guard = self.monitor_handle.lock().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire monitor handle lock".to_string())
})?;
if let Some(handle) = handle_guard.take() {
handle.join().map_err(|_| {
RusTorchError::MemoryError("Failed to join monitor thread".to_string())
})?;
}
Ok(())
}
pub fn get_current_snapshot(&self) -> RusTorchResult<Option<MemorySnapshot>> {
let snapshot = self.current_snapshot.read().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire snapshot read lock".to_string())
})?;
Ok(snapshot.clone())
}
pub fn get_gc_strategy(&self) -> RusTorchResult<GcStrategy> {
let strategy = self.gc_strategy.read().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire strategy read lock".to_string())
})?;
Ok(*strategy)
}
pub fn get_stats(&self) -> RusTorchResult<MonitorStats> {
let stats = self.stats.read().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire stats read lock".to_string())
})?;
Ok(stats.clone())
}
pub fn analyze_trend(&self) -> RusTorchResult<Option<PressureTrend>> {
if !self.config.enable_prediction {
return Ok(None);
}
let history = self.history.lock().map_err(|_| {
RusTorchError::MemoryError("Failed to acquire history lock".to_string())
})?;
if history.len() < self.config.prediction_window {
return Ok(None);
}
let recent_data: Vec<_> = history
.iter()
.rev()
.take(self.config.prediction_window)
.collect();
let mut sum_x = 0.0;
let mut sum_y = 0.0;
let mut sum_xy = 0.0;
let mut sum_x2 = 0.0;
let n = recent_data.len() as f64;
for (i, snapshot) in recent_data.iter().enumerate() {
let x = i as f64;
let y = snapshot.pressure_ratio;
sum_x += x;
sum_y += y;
sum_xy += x * y;
sum_x2 += x * x;
}
let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
let intercept = (sum_y - slope * sum_x) / n;
let mean_x = sum_x / n;
let mean_y = sum_y / n;
let mut numerator = 0.0;
let mut denom_x = 0.0;
let mut denom_y = 0.0;
for (i, snapshot) in recent_data.iter().enumerate() {
let x = i as f64;
let y = snapshot.pressure_ratio;
numerator += (x - mean_x) * (y - mean_y);
denom_x += (x - mean_x).powi(2);
denom_y += (y - mean_y).powi(2);
}
let correlation = numerator / (denom_x * denom_y).sqrt();
let confidence = correlation.abs();
let next_x = n;
let predicted_pressure = (slope * next_x + intercept).max(0.0).min(1.0);
Ok(Some(PressureTrend {
direction: slope.signum(),
strength: slope.abs(),
predicted_pressure,
confidence,
}))
}
fn monitor_loop(
running: Arc<RwLock<bool>>,
config: MonitorConfig,
current_snapshot: Arc<RwLock<Option<MemorySnapshot>>>,
history: Arc<Mutex<VecDeque<MemorySnapshot>>>,
gc_strategy: Arc<RwLock<GcStrategy>>,
stats: Arc<RwLock<MonitorStats>>,
) {
while Self::is_running(&running) {
if let Ok(snapshot) = Self::take_memory_snapshot(&config) {
if let Ok(mut current) = current_snapshot.write() {
*current = Some(snapshot.clone());
}
if let Ok(mut hist) = history.lock() {
hist.push_back(snapshot.clone());
if hist.len() > config.max_history_entries {
hist.pop_front();
}
}
Self::update_gc_strategy(&snapshot, &gc_strategy, &stats);
Self::update_stats(&snapshot, &stats);
}
thread::sleep(config.monitor_interval);
}
}
fn is_running(running: &Arc<RwLock<bool>>) -> bool {
running.read().map(|r| *r).unwrap_or(false)
}
fn take_memory_snapshot(config: &MonitorConfig) -> RusTorchResult<MemorySnapshot> {
let timestamp = SystemTime::now();
let system_total = 16 * 1024 * 1024 * 1024; let system_available = 8 * 1024 * 1024 * 1024;
let process_used = 1024 * 1024 * 1024;
let rustorch_used = 512 * 1024 * 1024;
let pressure_ratio = rustorch_used as f64 / config.rustorch_memory_limit as f64;
let pressure_level = PressureLevel::from_ratio(pressure_ratio);
Ok(MemorySnapshot {
timestamp,
system_total,
system_available,
process_used,
rustorch_used,
pressure_ratio,
pressure_level,
})
}
fn update_gc_strategy(
snapshot: &MemorySnapshot,
gc_strategy: &Arc<RwLock<GcStrategy>>,
stats: &Arc<RwLock<MonitorStats>>,
) {
let new_strategy = snapshot.pressure_level.gc_strategy();
if let (Ok(mut current_strategy), Ok(mut stat_guard)) = (gc_strategy.write(), stats.write())
{
if *current_strategy != new_strategy {
*current_strategy = new_strategy;
stat_guard.strategy_changes += 1;
}
}
}
fn update_stats(snapshot: &MemorySnapshot, stats: &Arc<RwLock<MonitorStats>>) {
if let Ok(mut stat_guard) = stats.write() {
stat_guard.total_snapshots += 1;
stat_guard.pressure_distribution[snapshot.pressure_level as usize] += 1;
let alpha = 0.1; stat_guard.avg_pressure =
alpha * snapshot.pressure_ratio + (1.0 - alpha) * stat_guard.avg_pressure;
if snapshot.pressure_ratio > stat_guard.peak_pressure {
stat_guard.peak_pressure = snapshot.pressure_ratio;
}
}
}
}
impl Drop for AdaptivePressureMonitor {
fn drop(&mut self) {
let _ = self.stop_monitoring();
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_pressure_level_conversion() {
assert_eq!(PressureLevel::from_ratio(0.3), PressureLevel::Low);
assert_eq!(PressureLevel::from_ratio(0.6), PressureLevel::Medium);
assert_eq!(PressureLevel::from_ratio(0.8), PressureLevel::High);
assert_eq!(PressureLevel::from_ratio(0.95), PressureLevel::Critical);
}
#[test]
fn test_gc_strategy_intervals() {
assert!(
GcStrategy::Emergency.collection_interval()
< GcStrategy::Aggressive.collection_interval()
);
assert!(
GcStrategy::Aggressive.collection_interval()
< GcStrategy::Conservative.collection_interval()
);
assert!(
GcStrategy::Conservative.collection_interval() < GcStrategy::Lazy.collection_interval()
);
}
#[test]
fn test_monitor_creation() {
let config = MonitorConfig::default();
let monitor = AdaptivePressureMonitor::new(config);
let strategy = monitor.get_gc_strategy().unwrap();
assert_eq!(strategy, GcStrategy::Conservative);
}
#[test]
#[cfg(not(feature = "ci-fast"))]
fn test_monitor_lifecycle() {
if std::env::var("CI").is_ok() {
return;
}
let config = MonitorConfig::default();
let monitor = AdaptivePressureMonitor::new(config);
drop(monitor);
}
#[test]
fn test_trend_analysis_insufficient_data() {
let config = MonitorConfig {
prediction_window: 5,
..MonitorConfig::default()
};
let monitor = AdaptivePressureMonitor::new(config);
let trend = monitor.analyze_trend().unwrap();
assert!(trend.is_none()); }
}