scirs2_io/pipeline/advanced_optimization/
monitoring.rs

//! System resource monitoring for pipeline optimization
//!
//! This module provides real-time monitoring of system resources including
//! CPU usage, memory utilization, I/O performance, and cache efficiency.
5
6use crate::error::{IoError, Result};
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, Instant};
9
10use super::config::{CachePerformance, MemoryUsage, NumaTopology, SystemMetrics};
11
12/// Real-time system resource monitor
13#[derive(Debug)]
14pub struct ResourceMonitor {
15    /// Last update timestamp
16    last_update: Instant,
17    /// Update frequency
18    update_frequency: Duration,
19    /// Cached metrics to avoid frequent system calls
20    cached_metrics: Option<SystemMetrics>,
21    /// Monitoring history for trend analysis
22    metrics_history: Vec<SystemMetrics>,
23    /// Maximum history size
24    max_history_size: usize,
25}
26
27impl Default for ResourceMonitor {
28    fn default() -> Self {
29        Self::new()
30    }
31}
32
33impl ResourceMonitor {
34    pub fn new() -> Self {
35        Self {
36            last_update: Instant::now(),
37            update_frequency: Duration::from_millis(500), // Update every 500ms
38            cached_metrics: None,
39            metrics_history: Vec::new(),
40            max_history_size: 100, // Keep last 100 samples
41        }
42    }
43
44    /// Get current system metrics with caching
45    pub fn get_current_metrics(&mut self) -> Result<SystemMetrics> {
46        let now = Instant::now();
47
48        // Check if we need to update cached metrics
49        if self.cached_metrics.is_none()
50            || now.duration_since(self.last_update) >= self.update_frequency
51        {
52            let metrics = self.collect_system_metrics()?;
53            self.cached_metrics = Some(metrics.clone());
54            self.last_update = now;
55
56            // Add to history
57            self.metrics_history.push(metrics.clone());
58            if self.metrics_history.len() > self.max_history_size {
59                self.metrics_history.remove(0);
60            }
61
62            Ok(metrics)
63        } else {
64            Ok(self.cached_metrics.as_ref().unwrap().clone())
65        }
66    }
67
68    /// Collect fresh system metrics
69    fn collect_system_metrics(&self) -> Result<SystemMetrics> {
70        Ok(SystemMetrics {
71            cpu_usage: self.get_cpu_usage()?,
72            memory_usage: self.get_memory_usage()?,
73            io_utilization: self.get_io_utilization()?,
74            network_bandwidth_usage: self.get_network_usage()?,
75            cache_performance: self.get_cache_performance()?,
76            numa_topology: self.get_numa_topology()?,
77        })
78    }
79
80    /// Get CPU usage percentage
81    fn get_cpu_usage(&self) -> Result<f64> {
82        #[cfg(target_os = "linux")]
83        {
84            self.get_linux_cpu_usage()
85        }
86        #[cfg(target_os = "windows")]
87        {
88            self.get_windows_cpu_usage()
89        }
90        #[cfg(target_os = "macos")]
91        {
92            self.get_macos_cpu_usage()
93        }
94        #[cfg(not(any(target_os = "linux", target_os = "windows", target_os = "macos")))]
95        {
96            Ok(0.5) // Default fallback
97        }
98    }
99
100    #[cfg(target_os = "linux")]
101    fn get_linux_cpu_usage(&self) -> Result<f64> {
102        // Read /proc/stat for CPU usage
103        let stat_content = std::fs::read_to_string("/proc/stat")
104            .map_err(|e| IoError::Other(format!("Failed to read /proc/stat: {}", e)))?;
105
106        if let Some(cpu_line) = stat_content.lines().next() {
107            let values: Vec<u64> = cpu_line
108                .split_whitespace()
109                .skip(1)
110                .take(4)
111                .filter_map(|s| s.parse().ok())
112                .collect();
113
114            if values.len() >= 4 {
115                let idle = values[3];
116                let total: u64 = values.iter().sum();
117                return Ok(1.0 - (idle as f64) / (total as f64));
118            }
119        }
120
121        Ok(0.5) // Fallback
122    }
123
124    #[cfg(target_os = "windows")]
125    fn get_windows_cpu_usage(&self) -> Result<f64> {
126        // Windows-specific implementation would go here
127        // For now, return a placeholder
128        Ok(0.5)
129    }
130
131    #[cfg(target_os = "macos")]
132    fn get_macos_cpu_usage(&self) -> Result<f64> {
133        // macOS-specific implementation would go here
134        // For now, return a placeholder
135        Ok(0.5)
136    }
137
138    /// Get memory usage information
139    fn get_memory_usage(&self) -> Result<MemoryUsage> {
140        #[cfg(target_os = "linux")]
141        {
142            self.get_linux_memory_usage()
143        }
144        #[cfg(not(target_os = "linux"))]
145        {
146            Ok(MemoryUsage {
147                total: 8 * 1024 * 1024 * 1024,     // 8GB fallback
148                available: 4 * 1024 * 1024 * 1024, // 4GB fallback
149                used: 4 * 1024 * 1024 * 1024,
150                utilization: 0.5,
151            })
152        }
153    }
154
155    #[cfg(target_os = "linux")]
156    fn get_linux_memory_usage(&self) -> Result<MemoryUsage> {
157        let meminfo_content = std::fs::read_to_string("/proc/meminfo")
158            .map_err(|e| IoError::Other(format!("Failed to read /proc/meminfo: {}", e)))?;
159
160        let mut total = 0u64;
161        let mut available = 0u64;
162
163        for line in meminfo_content.lines() {
164            if line.starts_with("MemTotal:") {
165                total = line
166                    .split_whitespace()
167                    .nth(1)
168                    .and_then(|s| s.parse().ok())
169                    .unwrap_or(0)
170                    * 1024; // Convert KB to bytes
171            } else if line.starts_with("MemAvailable:") {
172                available = line
173                    .split_whitespace()
174                    .nth(1)
175                    .and_then(|s| s.parse().ok())
176                    .unwrap_or(0)
177                    * 1024; // Convert KB to bytes
178            }
179        }
180
181        let used = total - available;
182        let utilization = if total > 0 {
183            used as f64 / total as f64
184        } else {
185            0.0
186        };
187
188        Ok(MemoryUsage {
189            total,
190            available,
191            used,
192            utilization,
193        })
194    }
195
196    /// Get I/O utilization
197    fn get_io_utilization(&self) -> Result<f64> {
198        // Simplified I/O utilization - could be expanded with platform-specific code
199        #[cfg(target_os = "linux")]
200        {
201            self.get_linux_io_utilization()
202        }
203        #[cfg(not(target_os = "linux"))]
204        {
205            Ok(0.3) // Placeholder
206        }
207    }
208
209    #[cfg(target_os = "linux")]
210    fn get_linux_io_utilization(&self) -> Result<f64> {
211        // Read /proc/diskstats for I/O statistics
212        // This is a simplified implementation
213        match std::fs::read_to_string("/proc/diskstats") {
214            Ok(content) => {
215                // Parse diskstats and calculate utilization
216                // For simplicity, return a placeholder
217                Ok(0.3)
218            }
219            Err(_) => Ok(0.3), // Fallback
220        }
221    }
222
223    /// Get network bandwidth usage
224    fn get_network_usage(&self) -> Result<f64> {
225        // Simplified network usage - could be expanded with platform-specific code
226        #[cfg(target_os = "linux")]
227        {
228            self.get_linux_network_usage()
229        }
230        #[cfg(not(target_os = "linux"))]
231        {
232            Ok(0.2) // Placeholder
233        }
234    }
235
236    #[cfg(target_os = "linux")]
237    fn get_linux_network_usage(&self) -> Result<f64> {
238        // Read /proc/net/dev for network statistics
239        // This is a simplified implementation
240        match std::fs::read_to_string("/proc/net/dev") {
241            Ok(content) => {
242                // Parse network stats and calculate usage
243                // For simplicity, return a placeholder
244                Ok(0.2)
245            }
246            Err(_) => Ok(0.2), // Fallback
247        }
248    }
249
250    /// Get cache performance metrics
251    fn get_cache_performance(&self) -> Result<CachePerformance> {
252        // This would typically require hardware performance counters
253        // For now, return reasonable defaults
254        Ok(CachePerformance {
255            l1_hit_rate: 0.95,
256            l2_hit_rate: 0.85,
257            l3_hit_rate: 0.75,
258            tlb_hit_rate: 0.99,
259        })
260    }
261
262    /// Get NUMA topology information
263    fn get_numa_topology(&self) -> Result<NumaTopology> {
264        #[cfg(target_os = "linux")]
265        {
266            self.get_linux_numa_topology()
267        }
268        #[cfg(not(target_os = "linux"))]
269        {
270            Ok(NumaTopology::default())
271        }
272    }
273
274    #[cfg(target_os = "linux")]
275    fn get_linux_numa_topology(&self) -> Result<NumaTopology> {
276        // Read NUMA topology from /sys/devices/system/node/
277        // This is a simplified implementation
278        match std::fs::read_dir("/sys/devices/system/node/") {
279            Ok(_entries) => {
280                // Parse NUMA node information
281                // For simplicity, return default topology
282                Ok(NumaTopology::default())
283            }
284            Err(_) => Ok(NumaTopology::default()),
285        }
286    }
287
288    /// Get metrics trend over time
289    pub fn get_metrics_trend(&self, duration: Duration) -> Vec<&SystemMetrics> {
290        let cutoff_time = Instant::now() - duration;
291        // For simplicity, return recent metrics
292        // In a real implementation, we'd need to track timestamps
293        self.metrics_history.iter().collect()
294    }
295
296    /// Check if system is under high load
297    pub fn is_high_load(&self) -> bool {
298        if let Some(metrics) = &self.cached_metrics {
299            metrics.cpu_usage > 0.8
300                || metrics.memory_usage.utilization > 0.9
301                || metrics.io_utilization > 0.8
302        } else {
303            false
304        }
305    }
306
307    /// Get resource utilization score (0.0 to 1.0)
308    pub fn get_utilization_score(&self) -> f64 {
309        if let Some(metrics) = &self.cached_metrics {
310            (metrics.cpu_usage + metrics.memory_usage.utilization + metrics.io_utilization) / 3.0
311        } else {
312            0.5
313        }
314    }
315
316    /// Predict resource pressure in near future
317    pub fn predict_resource_pressure(&self, lookahead: Duration) -> f64 {
318        // Simple linear extrapolation based on recent trends
319        if self.metrics_history.len() < 2 {
320            return self.get_utilization_score();
321        }
322
323        let recent_scores: Vec<f64> = self
324            .metrics_history
325            .iter()
326            .rev()
327            .take(10)
328            .map(|m| (m.cpu_usage + m.memory_usage.utilization + m.io_utilization) / 3.0)
329            .collect();
330
331        if recent_scores.len() < 2 {
332            return recent_scores[0];
333        }
334
335        // Calculate trend slope
336        let n = recent_scores.len() as f64;
337        let sum_x: f64 = (0..recent_scores.len()).map(|i| i as f64).sum();
338        let sum_y: f64 = recent_scores.iter().sum();
339        let sum_xy: f64 = recent_scores
340            .iter()
341            .enumerate()
342            .map(|(i, &y)| i as f64 * y)
343            .sum();
344        let sum_x2: f64 = (0..recent_scores.len()).map(|i| (i as f64).powi(2)).sum();
345
346        let slope = (n * sum_xy - sum_x * sum_y) / (n * sum_x2 - sum_x * sum_x);
347        let intercept = (sum_y - slope * sum_x) / n;
348
349        // Project into the future
350        let future_steps = lookahead.as_secs() as f64 / self.update_frequency.as_secs() as f64;
351        let predicted = intercept + slope * (n + future_steps);
352
353        predicted.clamp(0.0, 1.0)
354    }
355}