openhawk_core/
resource_monitor.rs1use std::collections::HashMap;
2use std::sync::{Arc, Mutex};
3use std::time::Duration;
4
5use sysinfo::{Pid, System};
6use tokio::sync::mpsc;
7
8use crate::types::ProcessId;
9
/// Per-process resource ceilings supplied when a process is registered.
///
/// Within this file only `memory_mb` is read (by the background monitor loop);
/// `cpu_percent` and `max_open_fds` are stored but not consulted here.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ResourceLimits {
    /// CPU ceiling as a percentage.
    pub cpu_percent: u8,
    /// Memory ceiling in mebibytes; the monitor multiplies it by 1024 * 1024
    /// before comparing against the process's memory usage in bytes.
    pub memory_mb: u64,
    /// Ceiling on the number of open file descriptors.
    pub max_open_fds: u64,
}
16
/// Point-in-time resource reading for a single process, as produced by
/// `ResourceMonitor::sample`.
#[derive(Debug, Clone)]
pub struct ResourceSnapshot {
    /// Process the reading belongs to.
    pub pid: ProcessId,
    /// CPU usage as reported by sysinfo's `Process::cpu_usage`.
    pub cpu_percent: f32,
    /// Memory usage as reported by sysinfo's `Process::memory`
    /// (presumably bytes, matching the field name — confirm against the pinned sysinfo version).
    pub memory_bytes: u64,
}
23
24#[derive(Debug)]
25pub enum ResourceEvent {
26 MemoryExceeded { pid: ProcessId, memory_bytes: u64, limit_bytes: u64 },
27}
28
/// Holds the table of per-process limits together with both ends of the
/// channel on which limit violations are reported.
pub struct ResourceMonitor {
    /// Limits keyed by process id; written by `register`/`deregister` and
    /// read by the background loop on every tick.
    pub limits: Arc<Mutex<HashMap<ProcessId, ResourceLimits>>>,
    /// Sending half of the event channel; the background loop publishes
    /// `ResourceEvent`s through it.
    pub event_tx: mpsc::UnboundedSender<ResourceEvent>,
    /// Receiving half of the event channel, wrapped so it can be shared;
    /// consumers lock it to drain events.
    pub event_rx: Arc<Mutex<mpsc::UnboundedReceiver<ResourceEvent>>>,
}
34
35impl ResourceMonitor {
36 pub fn new() -> Self {
37 let (tx, rx) = mpsc::unbounded_channel();
38 Self {
39 limits: Arc::new(Mutex::new(HashMap::new())),
40 event_tx: tx,
41 event_rx: Arc::new(Mutex::new(rx)),
42 }
43 }
44
45 pub fn register(&self, pid: ProcessId, limits: ResourceLimits) {
46 self.limits.lock().unwrap().insert(pid, limits);
47 }
48
49 pub fn deregister(&self, pid: ProcessId) {
50 self.limits.lock().unwrap().remove(&pid);
51 }
52
53 pub fn start_background(self: Arc<Self>, interval: Duration) {
54 let monitor = Arc::clone(&self);
55 tokio::spawn(async move {
56 let mut sys = System::new_all();
57 loop {
58 tokio::time::sleep(interval).await;
59 sys.refresh_all();
60
61 let snapshot: Vec<(ProcessId, ResourceLimits)> =
62 monitor.limits.lock().unwrap().clone().into_iter().collect();
63
64 for (pid, limits) in snapshot {
65 let sysinfo_pid = Pid::from_u32(pid);
66 if let Some(proc) = sys.process(sysinfo_pid) {
67 let mem = proc.memory();
68 let limit_bytes = limits.memory_mb * 1024 * 1024;
69 if mem > limit_bytes {
70 suspend_process(pid);
71 let _ = monitor.event_tx.send(ResourceEvent::MemoryExceeded {
72 pid,
73 memory_bytes: mem,
74 limit_bytes,
75 });
76 }
77 }
78 }
79 }
80 });
81 }
82
83 pub fn sample(&self, pid: ProcessId) -> Option<ResourceSnapshot> {
84 let mut sys = System::new_all();
85 sys.refresh_all();
86 sys.process(Pid::from_u32(pid)).map(|p| ResourceSnapshot {
87 pid,
88 cpu_percent: p.cpu_usage(),
89 memory_bytes: p.memory(),
90 })
91 }
92}
93
94impl Default for ResourceMonitor {
95 fn default() -> Self {
96 Self::new()
97 }
98}
99
100fn suspend_process(pid: ProcessId) {
101 #[cfg(target_family = "unix")]
102 {
103 use nix::sys::signal::{kill, Signal};
104 use nix::unistd::Pid as NixPid;
105 let _ = kill(NixPid::from_raw(pid as i32), Signal::SIGSTOP);
106 }
107 #[cfg(not(target_family = "unix"))]
108 let _ = pid;
109}
110
#[cfg(test)]
mod tests {
    use super::*;

    /// A registered pid appears in the limits table and disappears again
    /// once it is deregistered.
    #[test]
    fn test_register_and_deregister() {
        let pid = 1234;
        let monitor = ResourceMonitor::new();
        let limits = ResourceLimits { cpu_percent: 25, memory_mb: 512, max_open_fds: 64 };

        monitor.register(pid, limits);
        let present_after_register = monitor.limits.lock().unwrap().contains_key(&pid);
        assert!(present_after_register);

        monitor.deregister(pid);
        let present_after_deregister = monitor.limits.lock().unwrap().contains_key(&pid);
        assert!(!present_after_deregister);
    }

    /// The mebibyte-to-byte conversion used for memory limits.
    #[test]
    fn test_memory_limit_bytes_calculation() {
        let limits = ResourceLimits { cpu_percent: 10, memory_mb: 512, max_open_fds: 64 };
        let actual = limits.memory_mb * 1024 * 1024;
        assert_eq!(actual, 512 * 1024 * 1024);
    }

    /// An event pushed into the sender half can be read back, unchanged,
    /// from the receiver half held by the same monitor.
    #[tokio::test]
    async fn test_memory_exceeded_event_emitted() {
        let monitor = ResourceMonitor::new();
        let outgoing = ResourceEvent::MemoryExceeded {
            pid: 42,
            memory_bytes: 600 * 1024 * 1024,
            limit_bytes: 512 * 1024 * 1024,
        };
        monitor.event_tx.send(outgoing).unwrap();

        let received = {
            let mut receiver = monitor.event_rx.lock().unwrap();
            receiver.try_recv().unwrap()
        };

        // The enum currently has a single variant, so the pattern is irrefutable.
        let ResourceEvent::MemoryExceeded { pid, memory_bytes, limit_bytes } = received;
        assert_eq!(pid, 42);
        assert!(memory_bytes > limit_bytes);
    }
}
150}