// memscope_rs/async_memory/system_monitor.rs

//! System resource monitoring for async tasks
//!
//! Provides comprehensive system resource monitoring capabilities including
//! CPU, IO, Network, and GPU usage tracking at the async task level.

use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::Instant;

use crate::async_memory::TaskId;
12
/// Comprehensive system resource metrics for an async task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskSystemMetrics {
    /// Identifier of the async task these metrics describe
    pub task_id: TaskId,
    /// CPU usage snapshot for the task
    pub cpu_metrics: CpuMetrics,
    /// IO usage snapshot for the task
    pub io_metrics: IoMetrics,
    /// Network usage snapshot for the task
    pub network_metrics: NetworkMetrics,
    /// GPU usage snapshot, `None` when no GPU is available
    pub gpu_metrics: Option<GpuMetrics>,
    /// Unix timestamp in milliseconds when this snapshot was taken
    pub timestamp: u64,
}
23
/// CPU usage metrics for a task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CpuMetrics {
    /// CPU time consumed by this task (in microseconds)
    pub cpu_time_us: u64,
    /// CPU usage percentage (0.0 - 100.0)
    pub cpu_usage_percent: f64,
    /// Number of context switches
    pub context_switches: u64,
    /// Time spent in user mode (microseconds)
    pub user_time_us: u64,
    /// Time spent in kernel mode (microseconds)
    pub kernel_time_us: u64,
    /// CPU core affinity (which cores this task ran on)
    pub core_affinity: Vec<u32>,
    /// CPU cache misses, `None` if the counter is unavailable
    pub cache_misses: Option<u64>,
    /// CPU instructions executed, `None` if the counter is unavailable
    pub instructions: Option<u64>,
}
44
/// IO operation metrics for a task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IoMetrics {
    /// Total bytes read
    pub bytes_read: u64,
    /// Total bytes written
    pub bytes_written: u64,
    /// Number of read operations
    pub read_operations: u64,
    /// Number of write operations
    pub write_operations: u64,
    /// Total time spent in IO operations (microseconds)
    pub io_wait_time_us: u64,
    /// Average IO operation latency (microseconds)
    pub avg_io_latency_us: f64,
    /// IO bandwidth utilization (MB/s)
    pub io_bandwidth_mbps: f64,
    /// Disk queue depth
    pub disk_queue_depth: u32,
    /// IO operation counts broken down by operation type
    pub io_types: IoTypeBreakdown,
}
67
/// Breakdown of IO operations by type
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IoTypeBreakdown {
    /// Operations against regular files
    pub file_operations: u64,
    /// Operations on network file descriptors
    pub network_operations: u64,
    /// Operations on pipes
    pub pipe_operations: u64,
    /// Operations on sockets
    pub socket_operations: u64,
}
76
/// Network usage metrics for a task
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NetworkMetrics {
    /// Bytes transmitted
    pub bytes_sent: u64,
    /// Bytes received
    pub bytes_received: u64,
    /// Number of packets sent
    pub packets_sent: u64,
    /// Number of packets received
    pub packets_received: u64,
    /// Number of active connections
    pub active_connections: u32,
    /// Network latency (milliseconds)
    pub network_latency_ms: f64,
    /// Bandwidth utilization (Mbps)
    pub bandwidth_utilization_mbps: f64,
    /// Active connections broken down by transport/protocol type
    pub connection_types: ConnectionTypeBreakdown,
    /// Number of network errors observed
    pub network_errors: u64,
}
99
/// Breakdown of connections by type
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectionTypeBreakdown {
    /// Active TCP connections
    pub tcp_connections: u32,
    /// Active UDP "connections" (flows)
    pub udp_connections: u32,
    /// Active Unix domain sockets
    pub unix_sockets: u32,
    /// Active WebSocket connections
    pub websocket_connections: u32,
}
108
/// GPU usage metrics for a task (if available)
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GpuMetrics {
    /// GPU device ID
    pub device_id: u32,
    /// GPU device name
    pub device_name: String,
    /// GPU utilization percentage (0.0 - 100.0)
    pub gpu_utilization_percent: f64,
    /// GPU memory used by this task (bytes)
    pub gpu_memory_used: u64,
    /// GPU memory total (bytes)
    pub gpu_memory_total: u64,
    /// GPU compute operations
    pub compute_operations: u64,
    /// GPU memory bandwidth used (GB/s)
    pub memory_bandwidth_gbps: f64,
    /// GPU temperature (Celsius)
    pub temperature_celsius: f32,
    /// Power consumption (Watts)
    pub power_consumption_watts: f32,
}
131
/// System resource monitor for async tasks
///
/// Holds one metrics snapshot per tracked task plus internal sample counters.
/// Not internally synchronized; callers need `&mut self` to mutate state.
pub struct SystemResourceMonitor {
    /// Latest metrics snapshot per tracked task
    task_metrics: HashMap<TaskId, TaskSystemMetrics>,
    /// Baseline system metrics for comparison; `None` until `initialize` runs
    baseline_metrics: Option<SystemBaseline>,
    /// Monitor start time, used as the reference point for simulated values
    start_time: Instant,
    /// Counters of how many times each metric kind has been sampled
    counters: MonitoringCounters,
}
143
/// Baseline system metrics for calculating task-specific usage
///
/// NOTE(review): all fields are underscore-prefixed and never read in this
/// file — presumably reserved for future delta calculations against per-task
/// readings; confirm before removing.
#[derive(Debug, Clone)]
struct SystemBaseline {
    /// System-wide CPU time at baseline capture
    _total_cpu_time: u64,
    /// System-wide IO byte count at baseline capture
    _total_io_bytes: u64,
    /// System-wide network byte count at baseline capture
    _total_network_bytes: u64,
    /// When the baseline was captured
    _timestamp: Instant,
}
152
/// Internal monitoring counters
///
/// Atomics so the collect methods can bump them through `&self`.
struct MonitoringCounters {
    /// Number of CPU metric collections performed
    cpu_samples: AtomicU64,
    /// Number of IO metric collections performed
    io_samples: AtomicU64,
    /// Number of network metric collections performed
    network_samples: AtomicU64,
    /// Number of GPU metric collections performed
    gpu_samples: AtomicU64,
}
160
161impl SystemResourceMonitor {
162    /// Create new system resource monitor
163    pub fn new() -> Self {
164        Self {
165            task_metrics: HashMap::new(),
166            baseline_metrics: None,
167            start_time: Instant::now(),
168            counters: MonitoringCounters {
169                cpu_samples: AtomicU64::new(0),
170                io_samples: AtomicU64::new(0),
171                network_samples: AtomicU64::new(0),
172                gpu_samples: AtomicU64::new(0),
173            },
174        }
175    }
176
177    /// Initialize monitoring with baseline measurements
178    pub fn initialize(&mut self) -> Result<(), std::io::Error> {
179        self.baseline_metrics = Some(SystemBaseline {
180            _total_cpu_time: self.get_system_cpu_time()?,
181            _total_io_bytes: self.get_system_io_bytes()?,
182            _total_network_bytes: self.get_system_network_bytes()?,
183            _timestamp: Instant::now(),
184        });
185        Ok(())
186    }
187
188    /// Start monitoring a specific task
189    pub fn start_task_monitoring(&mut self, task_id: TaskId) {
190        let metrics = TaskSystemMetrics {
191            task_id,
192            cpu_metrics: self.collect_cpu_metrics(task_id),
193            io_metrics: self.collect_io_metrics(task_id),
194            network_metrics: self.collect_network_metrics(task_id),
195            gpu_metrics: self.collect_gpu_metrics(task_id),
196            timestamp: current_timestamp(),
197        };
198
199        self.task_metrics.insert(task_id, metrics);
200    }
201
202    /// Update metrics for a running task
203    pub fn update_task_metrics(&mut self, task_id: TaskId) {
204        // Collect all metrics first to avoid borrowing conflicts
205        let cpu_metrics = self.collect_cpu_metrics(task_id);
206        let io_metrics = self.collect_io_metrics(task_id);
207        let network_metrics = self.collect_network_metrics(task_id);
208        let gpu_metrics = self.collect_gpu_metrics(task_id);
209        let timestamp = current_timestamp();
210
211        // Then update the stored metrics
212        if let Some(metrics) = self.task_metrics.get_mut(&task_id) {
213            metrics.cpu_metrics = cpu_metrics;
214            metrics.io_metrics = io_metrics;
215            metrics.network_metrics = network_metrics;
216            metrics.gpu_metrics = gpu_metrics;
217            metrics.timestamp = timestamp;
218        }
219    }
220
221    /// Get metrics for a specific task
222    pub fn get_task_metrics(&self, task_id: TaskId) -> Option<&TaskSystemMetrics> {
223        self.task_metrics.get(&task_id)
224    }
225
226    /// Get all task metrics
227    pub fn get_all_metrics(&self) -> &HashMap<TaskId, TaskSystemMetrics> {
228        &self.task_metrics
229    }
230
231    /// Collect CPU metrics for a task
232    fn collect_cpu_metrics(&self, task_id: TaskId) -> CpuMetrics {
233        self.counters.cpu_samples.fetch_add(1, Ordering::Relaxed);
234
235        // Simulate CPU metrics collection
236        // In real implementation, this would read from /proc/[pid]/stat,
237        // performance counters, or use platform-specific APIs
238        CpuMetrics {
239            cpu_time_us: self.get_task_cpu_time(task_id),
240            cpu_usage_percent: self.calculate_cpu_usage(task_id),
241            context_switches: self.get_context_switches(task_id),
242            user_time_us: self.get_user_time(task_id),
243            kernel_time_us: self.get_kernel_time(task_id),
244            core_affinity: self.get_core_affinity(task_id),
245            cache_misses: self.get_cache_misses(task_id),
246            instructions: self.get_instructions(task_id),
247        }
248    }
249
250    /// Collect IO metrics for a task
251    fn collect_io_metrics(&self, task_id: TaskId) -> IoMetrics {
252        self.counters.io_samples.fetch_add(1, Ordering::Relaxed);
253
254        // Simulate IO metrics collection
255        // In real implementation, this would read from /proc/[pid]/io,
256        // iotop data, or use BPF/eBPF tracing
257        IoMetrics {
258            bytes_read: self.get_bytes_read(task_id),
259            bytes_written: self.get_bytes_written(task_id),
260            read_operations: self.get_read_ops(task_id),
261            write_operations: self.get_write_ops(task_id),
262            io_wait_time_us: self.get_io_wait_time(task_id),
263            avg_io_latency_us: self.calculate_avg_io_latency(task_id),
264            io_bandwidth_mbps: self.calculate_io_bandwidth(task_id),
265            disk_queue_depth: self.get_disk_queue_depth(task_id),
266            io_types: self.get_io_type_breakdown(task_id),
267        }
268    }
269
270    /// Collect network metrics for a task
271    fn collect_network_metrics(&self, task_id: TaskId) -> NetworkMetrics {
272        self.counters
273            .network_samples
274            .fetch_add(1, Ordering::Relaxed);
275
276        // Simulate network metrics collection
277        // In real implementation, this would use netstat, ss, or netlink sockets
278        NetworkMetrics {
279            bytes_sent: self.get_bytes_sent(task_id),
280            bytes_received: self.get_bytes_received(task_id),
281            packets_sent: self.get_packets_sent(task_id),
282            packets_received: self.get_packets_received(task_id),
283            active_connections: self.get_active_connections(task_id),
284            network_latency_ms: self.measure_network_latency(task_id),
285            bandwidth_utilization_mbps: self.calculate_bandwidth_utilization(task_id),
286            connection_types: self.get_connection_breakdown(task_id),
287            network_errors: self.get_network_errors(task_id),
288        }
289    }
290
291    /// Collect GPU metrics for a task (if GPU is available)
292    fn collect_gpu_metrics(&self, task_id: TaskId) -> Option<GpuMetrics> {
293        if !self.is_gpu_available() {
294            return None;
295        }
296
297        self.counters.gpu_samples.fetch_add(1, Ordering::Relaxed);
298
299        // Simulate GPU metrics collection
300        // In real implementation, this would use NVIDIA ML, ROCm, or Intel GPU APIs
301        Some(GpuMetrics {
302            device_id: 0,
303            device_name: self.get_gpu_device_name(),
304            gpu_utilization_percent: self.get_gpu_utilization(task_id),
305            gpu_memory_used: self.get_gpu_memory_used(task_id),
306            gpu_memory_total: self.get_gpu_memory_total(),
307            compute_operations: self.get_gpu_compute_ops(task_id),
308            memory_bandwidth_gbps: self.get_gpu_memory_bandwidth(task_id),
309            temperature_celsius: self.get_gpu_temperature(),
310            power_consumption_watts: self.get_gpu_power_consumption(),
311        })
312    }
313
314    // Simulation methods for demonstration
315    // In real implementation, these would interface with actual system APIs
316
317    fn get_system_cpu_time(&self) -> Result<u64, std::io::Error> {
318        // Read from /proc/stat or equivalent
319        Ok(1000000) // Simulated value
320    }
321
322    fn get_system_io_bytes(&self) -> Result<u64, std::io::Error> {
323        // Read from /proc/diskstats or equivalent
324        Ok(50000000) // Simulated value
325    }
326
327    fn get_system_network_bytes(&self) -> Result<u64, std::io::Error> {
328        // Read from /proc/net/dev or equivalent
329        Ok(25000000) // Simulated value
330    }
331
332    fn get_task_cpu_time(&self, task_id: TaskId) -> u64 {
333        // Simulate CPU time based on task ID and elapsed time
334        let elapsed = self.start_time.elapsed().as_micros() as u64;
335        (task_id as u64 * 1000) + (elapsed / 100)
336    }
337
338    fn calculate_cpu_usage(&self, task_id: TaskId) -> f64 {
339        // Simulate CPU usage calculation
340        let base_usage = (task_id as f64 % 100.0) / 10.0;
341        let time_factor = (self.start_time.elapsed().as_secs() as f64).sin().abs();
342        (base_usage + time_factor * 5.0).min(100.0)
343    }
344
345    fn get_context_switches(&self, task_id: TaskId) -> u64 {
346        (task_id as u64 * 10) + (self.start_time.elapsed().as_millis() as u64 / 100)
347    }
348
349    fn get_user_time(&self, task_id: TaskId) -> u64 {
350        self.get_task_cpu_time(task_id) * 70 / 100 // 70% user time
351    }
352
353    fn get_kernel_time(&self, task_id: TaskId) -> u64 {
354        self.get_task_cpu_time(task_id) * 30 / 100 // 30% kernel time
355    }
356
357    fn get_core_affinity(&self, _task_id: TaskId) -> Vec<u32> {
358        vec![0, 1, 2, 3] // Simulate running on cores 0-3
359    }
360
361    fn get_cache_misses(&self, task_id: TaskId) -> Option<u64> {
362        Some(task_id as u64 * 500 + self.start_time.elapsed().as_millis() as u64)
363    }
364
365    fn get_instructions(&self, task_id: TaskId) -> Option<u64> {
366        Some(task_id as u64 * 1000000 + self.start_time.elapsed().as_micros() as u64 * 10)
367    }
368
369    fn get_bytes_read(&self, task_id: TaskId) -> u64 {
370        task_id as u64 * 4096 + self.start_time.elapsed().as_millis() as u64 * 100
371    }
372
373    fn get_bytes_written(&self, task_id: TaskId) -> u64 {
374        task_id as u64 * 2048 + self.start_time.elapsed().as_millis() as u64 * 50
375    }
376
377    fn get_read_ops(&self, task_id: TaskId) -> u64 {
378        task_id as u64 * 10 + self.start_time.elapsed().as_millis() as u64 / 10
379    }
380
381    fn get_write_ops(&self, task_id: TaskId) -> u64 {
382        task_id as u64 * 5 + self.start_time.elapsed().as_millis() as u64 / 20
383    }
384
385    fn get_io_wait_time(&self, task_id: TaskId) -> u64 {
386        task_id as u64 * 1000 + self.start_time.elapsed().as_micros() as u64 / 10
387    }
388
389    fn calculate_avg_io_latency(&self, task_id: TaskId) -> f64 {
390        let base_latency = 100.0 + (task_id as f64 % 50.0);
391        let load_factor = (self.start_time.elapsed().as_secs() as f64 / 10.0).sin() + 1.0;
392        base_latency * load_factor
393    }
394
395    fn calculate_io_bandwidth(&self, task_id: TaskId) -> f64 {
396        let total_bytes = self.get_bytes_read(task_id) + self.get_bytes_written(task_id);
397        let elapsed_secs = self.start_time.elapsed().as_secs_f64().max(1.0);
398        (total_bytes as f64 / 1_048_576.0) / elapsed_secs // MB/s
399    }
400
401    fn get_disk_queue_depth(&self, _task_id: TaskId) -> u32 {
402        // Simulate variable queue depth
403        ((self.start_time.elapsed().as_secs() % 10) + 1) as u32
404    }
405
406    fn get_io_type_breakdown(&self, task_id: TaskId) -> IoTypeBreakdown {
407        let total_ops = self.get_read_ops(task_id) + self.get_write_ops(task_id);
408        IoTypeBreakdown {
409            file_operations: total_ops * 60 / 100,
410            network_operations: total_ops * 25 / 100,
411            pipe_operations: total_ops * 10 / 100,
412            socket_operations: total_ops * 5 / 100,
413        }
414    }
415
416    fn get_bytes_sent(&self, task_id: TaskId) -> u64 {
417        task_id as u64 * 1024 + self.start_time.elapsed().as_millis() as u64 * 200
418    }
419
420    fn get_bytes_received(&self, task_id: TaskId) -> u64 {
421        task_id as u64 * 2048 + self.start_time.elapsed().as_millis() as u64 * 300
422    }
423
424    fn get_packets_sent(&self, task_id: TaskId) -> u64 {
425        self.get_bytes_sent(task_id) / 1500 // Assume 1500 byte packets
426    }
427
428    fn get_packets_received(&self, task_id: TaskId) -> u64 {
429        self.get_bytes_received(task_id) / 1500
430    }
431
432    fn get_active_connections(&self, task_id: TaskId) -> u32 {
433        ((task_id as u32 % 10) + 1) + (self.start_time.elapsed().as_secs() as u32 % 5)
434    }
435
436    fn measure_network_latency(&self, _task_id: TaskId) -> f64 {
437        // Simulate network latency with some variation
438        10.0 + (self.start_time.elapsed().as_millis() as f64 % 100.0) / 10.0
439    }
440
441    fn calculate_bandwidth_utilization(&self, task_id: TaskId) -> f64 {
442        let total_bytes = self.get_bytes_sent(task_id) + self.get_bytes_received(task_id);
443        let elapsed_secs = self.start_time.elapsed().as_secs_f64().max(1.0);
444        (total_bytes as f64 * 8.0) / (1_000_000.0 * elapsed_secs) // Mbps
445    }
446
447    fn get_connection_breakdown(&self, task_id: TaskId) -> ConnectionTypeBreakdown {
448        let total_conn = self.get_active_connections(task_id);
449        ConnectionTypeBreakdown {
450            tcp_connections: total_conn * 70 / 100,
451            udp_connections: total_conn * 20 / 100,
452            unix_sockets: total_conn * 5 / 100,
453            websocket_connections: total_conn * 5 / 100,
454        }
455    }
456
457    fn get_network_errors(&self, task_id: TaskId) -> u64 {
458        (task_id as u64 % 100) + self.start_time.elapsed().as_secs()
459    }
460
461    fn is_gpu_available(&self) -> bool {
462        // In real implementation, check for NVIDIA, AMD, or Intel GPU
463        true // Simulate GPU availability
464    }
465
466    fn get_gpu_device_name(&self) -> String {
467        "NVIDIA GeForce RTX 4090".to_string() // Simulated
468    }
469
470    fn get_gpu_utilization(&self, task_id: TaskId) -> f64 {
471        let base_util = (task_id as f64 % 80.0) + 10.0;
472        let time_factor = (self.start_time.elapsed().as_secs() as f64 / 5.0)
473            .sin()
474            .abs();
475        (base_util + time_factor * 15.0).min(100.0)
476    }
477
478    fn get_gpu_memory_used(&self, task_id: TaskId) -> u64 {
479        let base_mem = (task_id as u64 % 1000) * 1_048_576; // Base MB in bytes
480        base_mem + (self.start_time.elapsed().as_millis() as u64 * 1024) % 1_073_741_824
481    }
482
483    fn get_gpu_memory_total(&self) -> u64 {
484        24 * 1024 * 1024 * 1024 // 24GB in bytes
485    }
486
487    fn get_gpu_compute_ops(&self, task_id: TaskId) -> u64 {
488        task_id as u64 * 1000000 + self.start_time.elapsed().as_micros() as u64 * 100
489    }
490
491    fn get_gpu_memory_bandwidth(&self, task_id: TaskId) -> f64 {
492        let base_bandwidth = 900.0 + (task_id as f64 % 100.0); // GB/s
493        let utilization_factor = self.get_gpu_utilization(task_id) / 100.0;
494        base_bandwidth * utilization_factor
495    }
496
497    fn get_gpu_temperature(&self) -> f32 {
498        65.0 + (self.start_time.elapsed().as_secs() as f32 % 20.0) // 65-85°C
499    }
500
501    fn get_gpu_power_consumption(&self) -> f32 {
502        300.0 + (self.start_time.elapsed().as_secs() as f32 % 150.0) // 300-450W
503    }
504}
505
impl Default for SystemResourceMonitor {
    /// Equivalent to `SystemResourceMonitor::new()`.
    fn default() -> Self {
        Self::new()
    }
}
511
/// Get the current Unix timestamp in milliseconds.
///
/// Returns 0 if the system clock reports a time before the Unix epoch (e.g.
/// a badly skewed clock) instead of panicking — a monitoring helper should
/// never take down the process over a bad wall clock.
fn current_timestamp() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|d| d.as_millis() as u64)
        .unwrap_or(0)
}
520
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_system_monitor_creation() {
        // A fresh monitor tracks nothing and has no baseline yet.
        let mon = SystemResourceMonitor::new();
        assert!(mon.baseline_metrics.is_none());
        assert!(mon.task_metrics.is_empty());
    }

    #[test]
    fn test_task_monitoring() {
        let id = 1234;
        let mut mon = SystemResourceMonitor::new();

        mon.start_task_monitoring(id);
        assert!(mon.task_metrics.contains_key(&id));

        let snapshot = mon.get_task_metrics(id).expect("task was just registered");
        assert_eq!(snapshot.task_id, id);
        assert!(snapshot.cpu_metrics.cpu_time_us > 0);
        // The byte counters are u64 and therefore trivially non-negative.
    }

    #[test]
    fn test_cpu_metrics_calculation() {
        let cpu = SystemResourceMonitor::new().collect_cpu_metrics(1000);

        // Usage is a percentage; mode-split times never exceed the total.
        assert!((0.0..=100.0).contains(&cpu.cpu_usage_percent));
        assert!(cpu.user_time_us <= cpu.cpu_time_us);
        assert!(cpu.kernel_time_us <= cpu.cpu_time_us);
        assert!(!cpu.core_affinity.is_empty());
    }

    #[test]
    fn test_io_metrics_calculation() {
        let io = SystemResourceMonitor::new().collect_io_metrics(2000);

        // Byte counters are u64, so only the derived values need checks.
        assert!(io.avg_io_latency_us > 0.0);
        assert!(io.disk_queue_depth > 0);
    }

    #[test]
    fn test_network_metrics_calculation() {
        let net = SystemResourceMonitor::new().collect_network_metrics(3000);

        // Traffic counters are u64, so only the derived values need checks.
        assert!(net.active_connections > 0);
        assert!(net.network_latency_ms > 0.0);
    }

    #[test]
    fn test_gpu_metrics_availability() {
        match SystemResourceMonitor::new().collect_gpu_metrics(4000) {
            Some(gpu) => {
                assert!((0.0..=100.0).contains(&gpu.gpu_utilization_percent));
                assert!(gpu.gpu_memory_used <= gpu.gpu_memory_total);
                assert!(!gpu.device_name.is_empty());
            }
            // No GPU reported: nothing further to assert.
            None => {}
        }
    }
}