Skip to main content

datasynth_core/
cpu_monitor.rs

//! CPU load monitoring for preventing system overload.
//!
//! This module provides CPU load tracking with configurable thresholds
//! and optional auto-throttling to maintain system responsiveness.

6use std::collections::VecDeque;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::{Arc, RwLock};
9use std::time::{Duration, Instant};
10
/// Point-in-time snapshot of the CPU statistics gathered by a monitor.
///
/// Load figures are fractions of total CPU capacity: `0.0` is idle and `1.0` is
/// fully busy.
#[derive(Clone, Debug, Default)]
pub struct CpuStats {
    /// Most recent load reading (0.0 - 1.0).
    pub current_load: f64,
    /// Mean of the readings currently held in the sample window.
    pub average_load: f64,
    /// Highest load reading observed so far.
    pub peak_load: f64,
    /// Total number of readings taken.
    pub samples_collected: u64,
    /// `true` while auto-throttling is engaged.
    pub is_throttling: bool,
    /// How many times throttling has been engaged.
    pub throttle_count: u64,
}
27
/// CPU monitor configuration.
///
/// Thresholds are fractions of total CPU capacity in `0.0..=1.0`. With
/// `auto_throttle` on, throttling engages when a sample reaches
/// `critical_load_threshold` and is released only after a sample falls below
/// `high_load_threshold`. For that reason `high_load_threshold` should not exceed
/// `critical_load_threshold`.
#[derive(Debug, Clone)]
pub struct CpuMonitorConfig {
    /// Enable CPU monitoring.
    pub enabled: bool,
    /// High load threshold (0.0 - 1.0). Throttling engaged by the critical
    /// threshold is released once a sample falls below this value.
    pub high_load_threshold: f64,
    /// Critical load threshold (0.0 - 1.0). Reaching it engages throttling (when
    /// `auto_throttle` is on) and makes `CpuMonitor::check` return an error.
    pub critical_load_threshold: f64,
    /// Minimum time between real CPU readings, in milliseconds.
    pub sample_interval_ms: u64,
    /// Number of recent samples kept for averaging.
    pub sample_window_size: usize,
    /// Enable automatic throttling when the critical threshold is reached.
    pub auto_throttle: bool,
    /// Throttle delay in milliseconds: the pause applied by each
    /// `CpuMonitor::maybe_throttle` call while throttling is engaged.
    pub throttle_delay_ms: u64,
}

impl Default for CpuMonitorConfig {
    /// Monitoring disabled; 85% / 95% thresholds; one reading per second over a
    /// 10-sample window; auto-throttle on with a 50 ms pause.
    fn default() -> Self {
        Self {
            enabled: false,
            high_load_threshold: 0.85,
            critical_load_threshold: 0.95,
            sample_interval_ms: 1000,
            sample_window_size: 10,
            auto_throttle: true,
            throttle_delay_ms: 50,
        }
    }
}

impl CpuMonitorConfig {
    /// Create an enabled config with the given thresholds; all other fields use
    /// their defaults.
    ///
    /// Each threshold is clamped to `0.0..=1.0`, and a NaN threshold falls back to
    /// its default value. `high` is additionally capped at `critical`, so the
    /// throttle release point never sits above the engage point.
    pub fn with_thresholds(high: f64, critical: f64) -> Self {
        let defaults = Self::default();
        let critical = Self::normalize_threshold(critical, defaults.critical_load_threshold);
        // Capping does not change throttle behavior for high > critical inputs: the
        // release branch only runs for loads below `critical` anyway. It just keeps
        // the stored config self-consistent.
        let high = Self::normalize_threshold(high, defaults.high_load_threshold).min(critical);
        Self {
            enabled: true,
            high_load_threshold: high,
            critical_load_threshold: critical,
            ..defaults
        }
    }

    /// Enable auto-throttling, pausing `delay_ms` milliseconds per throttled call.
    pub fn with_auto_throttle(mut self, delay_ms: u64) -> Self {
        self.auto_throttle = true;
        self.throttle_delay_ms = delay_ms;
        self
    }

    /// Disable auto-throttling.
    pub fn without_auto_throttle(mut self) -> Self {
        self.auto_throttle = false;
        self
    }

    /// Clamp `value` into `0.0..=1.0`, substituting `fallback` for NaN.
    ///
    /// `f64::clamp` passes NaN through unchanged. A NaN threshold would make every
    /// comparison false, which would silently disable throttling and `check`.
    fn normalize_threshold(value: f64, fallback: f64) -> f64 {
        if value.is_nan() {
            fallback
        } else {
            value.clamp(0.0, 1.0)
        }
    }
}
85
/// Error produced when a sampled CPU load reaches a configured threshold.
#[derive(Debug, Clone)]
pub struct CpuOverloaded {
    /// Load reading that triggered the error (0.0 - 1.0).
    pub current_load: f64,
    /// Threshold that was reached (0.0 - 1.0).
    pub threshold: f64,
    /// Whether the threshold that was reached is the critical one.
    pub is_critical: bool,
    /// Human-readable description; `Display` prints exactly this text.
    pub message: String,
}

impl std::fmt::Display for CpuOverloaded {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for CpuOverloaded {}
102
103/// Thread-safe CPU load monitor.
104#[derive(Debug)]
105pub struct CpuMonitor {
106    config: CpuMonitorConfig,
107    load_history: Arc<RwLock<VecDeque<f64>>>,
108    current_load_raw: AtomicU64,
109    peak_load_raw: AtomicU64,
110    is_throttling: AtomicBool,
111    throttle_count: AtomicU64,
112    samples_collected: AtomicU64,
113    last_sample_time: Arc<RwLock<Option<Instant>>>,
114    // CPU time tracking for load calculation
115    #[cfg(target_os = "linux")]
116    last_cpu_times: Arc<RwLock<Option<(u64, u64)>>>,
117}
118
119impl CpuMonitor {
120    /// Create a new CPU monitor with the given configuration.
121    pub fn new(config: CpuMonitorConfig) -> Self {
122        Self {
123            config,
124            load_history: Arc::new(RwLock::new(VecDeque::new())),
125            current_load_raw: AtomicU64::new(0),
126            peak_load_raw: AtomicU64::new(0),
127            is_throttling: AtomicBool::new(false),
128            throttle_count: AtomicU64::new(0),
129            samples_collected: AtomicU64::new(0),
130            last_sample_time: Arc::new(RwLock::new(None)),
131            #[cfg(target_os = "linux")]
132            last_cpu_times: Arc::new(RwLock::new(None)),
133        }
134    }
135
136    /// Create a disabled CPU monitor.
137    pub fn disabled() -> Self {
138        Self::new(CpuMonitorConfig {
139            enabled: false,
140            ..Default::default()
141        })
142    }
143
144    /// Create an Arc-wrapped CPU monitor for sharing across threads.
145    pub fn shared(config: CpuMonitorConfig) -> Arc<Self> {
146        Arc::new(Self::new(config))
147    }
148
149    /// Check if monitoring is enabled.
150    pub fn is_enabled(&self) -> bool {
151        self.config.enabled
152    }
153
154    /// Sample current CPU load and update statistics.
155    pub fn sample(&self) -> Option<f64> {
156        if !self.config.enabled {
157            return None;
158        }
159
160        // Check if enough time has passed since last sample
161        {
162            let mut last_time = self.last_sample_time.write().ok()?;
163            let now = Instant::now();
164            if let Some(last) = *last_time {
165                if now.duration_since(last).as_millis() < self.config.sample_interval_ms as u128 {
166                    // Return current load without sampling
167                    return Some(f64::from_bits(
168                        self.current_load_raw.load(Ordering::Relaxed),
169                    ));
170                }
171            }
172            *last_time = Some(now);
173        }
174
175        let load = self.get_cpu_load()?;
176
177        // Update current load
178        self.current_load_raw
179            .store(load.to_bits(), Ordering::Relaxed);
180
181        // Update peak
182        let mut peak = f64::from_bits(self.peak_load_raw.load(Ordering::Relaxed));
183        while load > peak {
184            match self.peak_load_raw.compare_exchange_weak(
185                peak.to_bits(),
186                load.to_bits(),
187                Ordering::Relaxed,
188                Ordering::Relaxed,
189            ) {
190                Ok(_) => break,
191                Err(p) => peak = f64::from_bits(p),
192            }
193        }
194
195        // Update history
196        if let Ok(mut history) = self.load_history.write() {
197            history.push_back(load);
198            while history.len() > self.config.sample_window_size {
199                history.pop_front();
200            }
201        }
202
203        self.samples_collected.fetch_add(1, Ordering::Relaxed);
204
205        // Check thresholds and apply throttling
206        if load >= self.config.critical_load_threshold {
207            if self.config.auto_throttle && !self.is_throttling.load(Ordering::Relaxed) {
208                self.is_throttling.store(true, Ordering::Relaxed);
209                self.throttle_count.fetch_add(1, Ordering::Relaxed);
210            }
211        } else if load < self.config.high_load_threshold {
212            self.is_throttling.store(false, Ordering::Relaxed);
213        }
214
215        Some(load)
216    }
217
218    /// Check CPU load and return error if threshold exceeded.
219    pub fn check(&self) -> Result<(), CpuOverloaded> {
220        if !self.config.enabled {
221            return Ok(());
222        }
223
224        let load = self.sample().unwrap_or(0.0);
225
226        if load >= self.config.critical_load_threshold {
227            return Err(CpuOverloaded {
228                current_load: load,
229                threshold: self.config.critical_load_threshold,
230                is_critical: true,
231                message: format!(
232                    "Critical CPU load: {:.1}% exceeds critical threshold of {:.1}%. \
233                     Reduce parallel workers or enable throttling.",
234                    load * 100.0,
235                    self.config.critical_load_threshold * 100.0
236                ),
237            });
238        }
239
240        Ok(())
241    }
242
243    /// Apply throttle delay if throttling is active.
244    pub fn maybe_throttle(&self) {
245        if self.config.auto_throttle && self.is_throttling.load(Ordering::Relaxed) {
246            std::thread::sleep(Duration::from_millis(self.config.throttle_delay_ms));
247        }
248    }
249
250    /// Get current statistics.
251    pub fn stats(&self) -> CpuStats {
252        let current = f64::from_bits(self.current_load_raw.load(Ordering::Relaxed));
253        let peak = f64::from_bits(self.peak_load_raw.load(Ordering::Relaxed));
254
255        let average = if let Ok(history) = self.load_history.read() {
256            if history.is_empty() {
257                0.0
258            } else {
259                history.iter().sum::<f64>() / history.len() as f64
260            }
261        } else {
262            current
263        };
264
265        CpuStats {
266            current_load: current,
267            average_load: average,
268            peak_load: peak,
269            samples_collected: self.samples_collected.load(Ordering::Relaxed),
270            is_throttling: self.is_throttling.load(Ordering::Relaxed),
271            throttle_count: self.throttle_count.load(Ordering::Relaxed),
272        }
273    }
274
275    /// Get current CPU load.
276    pub fn current_load(&self) -> f64 {
277        f64::from_bits(self.current_load_raw.load(Ordering::Relaxed))
278    }
279
280    /// Check if throttling is currently active.
281    pub fn is_throttling(&self) -> bool {
282        self.is_throttling.load(Ordering::Relaxed)
283    }
284
285    /// Check if CPU monitoring is available on this platform.
286    pub fn is_available() -> bool {
287        #[cfg(target_os = "linux")]
288        {
289            std::fs::read_to_string("/proc/stat").is_ok()
290        }
291        #[cfg(target_os = "macos")]
292        {
293            true // Uses top -l 1
294        }
295        #[cfg(not(any(target_os = "linux", target_os = "macos")))]
296        {
297            false
298        }
299    }
300
301    /// Reset statistics (for testing).
302    pub fn reset_stats(&self) {
303        self.current_load_raw.store(0, Ordering::Relaxed);
304        self.peak_load_raw.store(0, Ordering::Relaxed);
305        self.is_throttling.store(false, Ordering::Relaxed);
306        self.throttle_count.store(0, Ordering::Relaxed);
307        self.samples_collected.store(0, Ordering::Relaxed);
308        if let Ok(mut history) = self.load_history.write() {
309            history.clear();
310        }
311    }
312
313    /// Get CPU load (platform-specific implementation).
314    #[cfg(target_os = "linux")]
315    fn get_cpu_load(&self) -> Option<f64> {
316        use std::fs;
317
318        let content = fs::read_to_string("/proc/stat").ok()?;
319        let line = content.lines().next()?;
320
321        // Parse: cpu user nice system idle iowait irq softirq steal guest guest_nice
322        let parts: Vec<u64> = line
323            .split_whitespace()
324            .skip(1) // Skip "cpu"
325            .take(7)
326            .filter_map(|s| s.parse().ok())
327            .collect();
328
329        if parts.len() < 4 {
330            return None;
331        }
332
333        let idle = parts[3];
334        let total: u64 = parts.iter().sum();
335
336        // Get previous values
337        let mut last_times = self.last_cpu_times.write().ok()?;
338
339        let load = if let Some((last_idle, last_total)) = *last_times {
340            let idle_delta = idle.saturating_sub(last_idle);
341            let total_delta = total.saturating_sub(last_total);
342
343            if total_delta > 0 {
344                1.0 - (idle_delta as f64 / total_delta as f64)
345            } else {
346                0.0
347            }
348        } else {
349            0.0
350        };
351
352        *last_times = Some((idle, total));
353
354        Some(load.clamp(0.0, 1.0))
355    }
356
357    #[cfg(target_os = "macos")]
358    fn get_cpu_load(&self) -> Option<f64> {
359        use std::process::Command;
360
361        // Use top -l 1 to get CPU usage
362        let output = Command::new("top")
363            .args(["-l", "1", "-n", "0"])
364            .output()
365            .ok()?;
366
367        let stdout = String::from_utf8_lossy(&output.stdout);
368
369        // Parse "CPU usage: X% user, Y% sys, Z% idle"
370        for line in stdout.lines() {
371            if line.contains("CPU usage:") {
372                // Extract idle percentage
373                if let Some(idle_start) = line.find("idle") {
374                    let before_idle = &line[..idle_start];
375                    let parts: Vec<&str> = before_idle.split_whitespace().collect();
376                    if let Some(idle_str) = parts.last() {
377                        let idle_str = idle_str.trim_end_matches('%').trim_end_matches(',');
378                        if let Ok(idle) = idle_str.parse::<f64>() {
379                            return Some((100.0 - idle) / 100.0);
380                        }
381                    }
382                }
383            }
384        }
385
386        None
387    }
388
389    #[cfg(not(any(target_os = "linux", target_os = "macos")))]
390    fn get_cpu_load(&self) -> Option<f64> {
391        None
392    }
393}
394
395impl Default for CpuMonitor {
396    fn default() -> Self {
397        Self::disabled()
398    }
399}
400
/// Simple system load check (returns average load if available).
///
/// Reads the 1-minute load average (first field of `/proc/loadavg`), divides it by
/// the number of available CPUs, and clamps the result to `0.0..=1.0`.
/// Returns `None` if the file is missing (e.g. Unix systems without `/proc`) or the
/// field can't be parsed.
///
/// NOTE(review): a load average is a different measure from the `/proc/stat`
/// utilization used by `CpuMonitor`; treat this as a coarse signal.
#[cfg(unix)]
pub fn get_system_load() -> Option<f64> {
    let loadavg = std::fs::read_to_string("/proc/loadavg").ok()?;
    let one_minute: f64 = loadavg.split_whitespace().next()?.parse().ok()?;
    let cores = num_cpus::get() as f64;
    Some((one_minute / cores).clamp(0.0, 1.0))
}

/// Non-Unix targets have no load-average source.
#[cfg(not(unix))]
pub fn get_system_load() -> Option<f64> {
    None
}

/// CPU count fallback for non-Unix targets: always reports a single CPU.
#[cfg(not(unix))]
mod num_cpus {
    pub fn get() -> usize {
        1
    }
}

/// CPU count on Unix targets via `std::thread::available_parallelism`, or 1 if
/// it cannot be determined.
#[cfg(unix)]
mod num_cpus {
    pub fn get() -> usize {
        std::thread::available_parallelism().map_or(1, |n| n.get())
    }
}
443
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_cpu_monitor_creation() {
        let monitor = CpuMonitor::new(CpuMonitorConfig::with_thresholds(0.80, 0.95));
        assert!(monitor.is_enabled());
    }

    #[test]
    fn test_cpu_monitor_disabled() {
        let monitor = CpuMonitor::disabled();
        assert!(!monitor.is_enabled());
        assert!(monitor.check().is_ok());
    }

    #[test]
    fn test_disabled_monitor_never_samples() {
        let monitor = CpuMonitor::disabled();
        assert!(monitor.sample().is_none());
        assert_eq!(monitor.stats().samples_collected, 0);
    }

    #[test]
    fn test_stats_tracking() {
        let config = CpuMonitorConfig {
            enabled: true,
            sample_interval_ms: 0, // No delay for testing
            ..Default::default()
        };
        let monitor = CpuMonitor::new(config);

        for _ in 0..5 {
            let _ = monitor.sample();
        }

        let stats = monitor.stats();
        // `cfg!` (rather than `#[cfg]` on the statement) keeps `stats` used on every
        // platform, avoiding an unused-variable warning where sampling is unsupported.
        if cfg!(any(target_os = "linux", target_os = "macos")) {
            assert!(stats.samples_collected > 0);
        }
    }

    #[test]
    fn test_reset_stats() {
        let monitor = CpuMonitor::new(CpuMonitorConfig::with_thresholds(0.85, 0.95));
        let _ = monitor.sample();
        let _ = monitor.sample();

        monitor.reset_stats();

        let stats = monitor.stats();
        assert_eq!(stats.samples_collected, 0);
        assert_eq!(stats.throttle_count, 0);
        assert!(!stats.is_throttling);
        assert_eq!(stats.current_load, 0.0);
        assert_eq!(stats.peak_load, 0.0);
        assert_eq!(stats.average_load, 0.0);
    }

    #[test]
    fn test_check_fails_at_zero_critical_threshold() {
        // Every load (including the 0.0 fallback when no reading is available)
        // is >= a critical threshold of 0.0.
        let monitor = CpuMonitor::new(CpuMonitorConfig::with_thresholds(0.0, 0.0));
        let err = monitor
            .check()
            .expect_err("any load reaches a 0.0 critical threshold");
        assert!(err.is_critical);
        assert_eq!(err.threshold, 0.0);
        assert_eq!(err.to_string(), err.message);
    }

    #[test]
    fn test_overloaded_display() {
        let err = CpuOverloaded {
            current_load: 0.99,
            threshold: 0.95,
            is_critical: true,
            message: "too hot".to_string(),
        };
        assert_eq!(err.to_string(), "too hot");
    }

    #[test]
    fn test_is_available() {
        #[cfg(target_os = "linux")]
        assert!(CpuMonitor::is_available());
    }

    #[test]
    fn test_throttling_flag() {
        let monitor = CpuMonitor::disabled();
        assert!(!monitor.is_throttling());
    }

    #[test]
    fn test_config_builders() {
        let config = CpuMonitorConfig::with_thresholds(0.7, 0.9).with_auto_throttle(100);
        assert!(config.auto_throttle);
        assert_eq!(config.throttle_delay_ms, 100);

        let config2 = CpuMonitorConfig::with_thresholds(0.7, 0.9).without_auto_throttle();
        assert!(!config2.auto_throttle);
    }

    #[test]
    fn test_thresholds_are_clamped() {
        let config = CpuMonitorConfig::with_thresholds(1.5, 2.0);
        assert_eq!(config.high_load_threshold, 1.0);
        assert_eq!(config.critical_load_threshold, 1.0);

        let config = CpuMonitorConfig::with_thresholds(-0.5, 0.5);
        assert_eq!(config.high_load_threshold, 0.0);
        assert_eq!(config.critical_load_threshold, 0.5);
    }
}