Skip to main content

openhawk_core/
resource_monitor.rs

1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use sysinfo::{Pid, System};
6use tokio::sync::mpsc;
7
8use crate::types::ProcessId;
9
/// Per-process resource ceilings supplied when a process is registered.
///
/// In the code visible in this file only `memory_mb` is read by the monitor
/// loop; `cpu_percent` and `max_open_fds` are stored but not enforced here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResourceLimits {
    /// CPU ceiling (presumably a percentage, going by the name — confirm with callers).
    pub cpu_percent: u8,
    /// Memory ceiling in mebibytes; the monitor multiplies it by 1024 * 1024
    /// before comparing it with the sampled byte count.
    pub memory_mb: u64,
    /// Ceiling on open file descriptors (not consulted by the visible code).
    pub max_open_fds: u64,
}
16
17#[derive(Debug, Clone)]
18pub struct ResourceSnapshot {
19    pub pid: ProcessId,
20    pub cpu_percent: f32,
21    pub memory_bytes: u64,
22}
23
24#[derive(Debug)]
25pub enum ResourceEvent {
26    MemoryExceeded { pid: ProcessId, memory_bytes: u64, limit_bytes: u64 },
27}
28
29pub struct ResourceMonitor {
30    pub limits: Arc<Mutex<HashMap<ProcessId, ResourceLimits>>>,
31    pub event_tx: mpsc::UnboundedSender<ResourceEvent>,
32    pub event_rx: Arc<Mutex<mpsc::UnboundedReceiver<ResourceEvent>>>,
33}
34
35impl ResourceMonitor {
36    pub fn new() -> Self {
37        let (tx, rx) = mpsc::unbounded_channel();
38        Self {
39            limits: Arc::new(Mutex::new(HashMap::new())),
40            event_tx: tx,
41            event_rx: Arc::new(Mutex::new(rx)),
42        }
43    }
44
45    pub fn register(&self, pid: ProcessId, limits: ResourceLimits) {
46        self.limits.lock().unwrap().insert(pid, limits);
47    }
48
49    pub fn deregister(&self, pid: ProcessId) {
50        self.limits.lock().unwrap().remove(&pid);
51    }
52
53    pub fn start_background(self: Arc<Self>, interval: Duration) {
54        let monitor = Arc::clone(&self);
55        tokio::spawn(async move {
56            let mut sys = System::new_all();
57            loop {
58                tokio::time::sleep(interval).await;
59                sys.refresh_all();
60
61                let snapshot: Vec<(ProcessId, ResourceLimits)> =
62                    monitor.limits.lock().unwrap().clone().into_iter().collect();
63
64                for (pid, limits) in snapshot {
65                    let sysinfo_pid = Pid::from_u32(pid);
66                    if let Some(proc) = sys.process(sysinfo_pid) {
67                        let mem = proc.memory();
68                        let limit_bytes = limits.memory_mb * 1024 * 1024;
69                        if mem > limit_bytes {
70                            suspend_process(pid);
71                            let _ = monitor.event_tx.send(ResourceEvent::MemoryExceeded {
72                                pid,
73                                memory_bytes: mem,
74                                limit_bytes,
75                            });
76                        }
77                    }
78                }
79            }
80        });
81    }
82
83    pub fn sample(&self, pid: ProcessId) -> Option<ResourceSnapshot> {
84        let mut sys = System::new_all();
85        sys.refresh_all();
86        sys.process(Pid::from_u32(pid)).map(|p| ResourceSnapshot {
87            pid,
88            cpu_percent: p.cpu_usage(),
89            memory_bytes: p.memory(),
90        })
91    }
92}
93
94impl Default for ResourceMonitor {
95    fn default() -> Self {
96        Self::new()
97    }
98}
99
100fn suspend_process(pid: ProcessId) {
101    #[cfg(target_family = "unix")]
102    {
103        use nix::sys::signal::{kill, Signal};
104        use nix::unistd::Pid as NixPid;
105        let _ = kill(NixPid::from_raw(pid as i32), Signal::SIGSTOP);
106    }
107    #[cfg(not(target_family = "unix"))]
108    let _ = pid;
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// Registering adds the pid to the limits table; deregistering removes it.
    #[test]
    fn test_register_and_deregister() {
        let pid = 1234;
        let caps = ResourceLimits { cpu_percent: 25, memory_mb: 512, max_open_fds: 64 };
        let monitor = ResourceMonitor::new();

        monitor.register(pid, caps);
        let present_after_register = monitor.limits.lock().unwrap().contains_key(&pid);
        assert!(present_after_register);

        monitor.deregister(pid);
        let present_after_deregister = monitor.limits.lock().unwrap().contains_key(&pid);
        assert!(!present_after_deregister);
    }

    /// The MiB-to-bytes conversion used for memory limits.
    #[test]
    fn test_memory_limit_bytes_calculation() {
        let ResourceLimits { memory_mb, .. } =
            ResourceLimits { cpu_percent: 10, memory_mb: 512, max_open_fds: 64 };
        assert_eq!(memory_mb * 1024 * 1024, 512 * 1024 * 1024);
    }

    /// An event pushed into the sender comes back out of the monitor's receiver.
    #[tokio::test]
    async fn test_memory_exceeded_event_emitted() {
        let monitor = ResourceMonitor::new();
        let outgoing = ResourceEvent::MemoryExceeded {
            pid: 42,
            memory_bytes: 600 * 1024 * 1024,
            limit_bytes: 512 * 1024 * 1024,
        };
        monitor.event_tx.send(outgoing).unwrap();

        let received = monitor.event_rx.lock().unwrap().try_recv().unwrap();
        // The enum has a single variant, so this pattern cannot fail to match.
        let ResourceEvent::MemoryExceeded { pid, memory_bytes, limit_bytes } = received;
        assert_eq!(pid, 42);
        assert!(memory_bytes > limit_bytes);
    }
}