oxios_kernel/
resource_monitor.rs1use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::VecDeque;
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use sysinfo::System;
14
15#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ResourceSnapshot {
18 pub timestamp: DateTime<Utc>,
20 pub cpu_percent: f32,
22 pub memory_used_mb: u64,
24 pub memory_total_mb: u64,
26 pub active_agents: usize,
28 pub pending_tasks: usize,
30 pub total_token_usage: u64,
32 pub disk_used_gb: f64,
34 pub load_avg_1m: f32,
36}
37
/// Limits at or above which `ResourceMonitor::is_overloaded` reports overload.
///
/// Each limit is compared independently with `>=`; reaching any single one is enough.
#[derive(Debug, Clone, Copy)]
pub struct OverloadThreshold {
    /// Mean CPU usage, as a percentage.
    pub cpu_percent: f32,
    /// Used-to-total memory ratio, as a percentage.
    pub memory_percent: f32,
    /// One-minute load average.
    pub load_avg: f32,
}

impl Default for OverloadThreshold {
    /// 90% CPU, 90% memory, and a one-minute load average of 8.0.
    fn default() -> Self {
        let cpu_percent = 90.0;
        let memory_percent = 90.0;
        let load_avg = 8.0;
        Self {
            cpu_percent,
            memory_percent,
            load_avg,
        }
    }
}
58
59pub struct ResourceMonitor {
64 interval_secs: u64,
66 history_max: usize,
68 history: RwLock<VecDeque<ResourceSnapshot>>,
69 total_token_usage: AtomicU64,
70 active_agents: AtomicUsize,
71 pending_tasks: AtomicUsize,
72 overload_threshold: RwLock<OverloadThreshold>,
73 sys: parking_lot::Mutex<System>,
75}
76
77impl Default for ResourceMonitor {
78 fn default() -> Self {
79 Self::new(60, 60)
80 }
81}
82
83impl ResourceMonitor {
84 pub fn new(interval_secs: u64, history_max: usize) -> Self {
86 Self {
87 interval_secs,
88 history_max,
89 history: RwLock::new(VecDeque::with_capacity(history_max)),
90 total_token_usage: AtomicU64::new(0),
91 active_agents: AtomicUsize::new(0),
92 pending_tasks: AtomicUsize::new(0),
93 overload_threshold: RwLock::new(OverloadThreshold::default()),
94 sys: parking_lot::Mutex::new(System::new_all()),
95 }
96 }
97
98 pub fn snapshot(&self) -> ResourceSnapshot {
103 let mut sys = self.sys.lock();
104 sys.refresh_all();
105
106 let cpu_percent =
108 sys.cpus().iter().map(|c| c.cpu_usage()).sum::<f32>() / sys.cpus().len().max(1) as f32;
109
110 let total_memory = sys.total_memory();
111 let used_memory = sys.used_memory();
112 let memory_total_mb = total_memory / (1024 * 1024);
113 let memory_used_mb = used_memory / (1024 * 1024);
114
115 let load_avg_1m = System::load_average().one as f32;
116
117 let disk_used_gb = estimate_disk_usage();
118
119 ResourceSnapshot {
120 timestamp: Utc::now(),
121 cpu_percent,
122 memory_used_mb,
123 memory_total_mb,
124 active_agents: self.active_agents.load(Ordering::Relaxed),
125 pending_tasks: self.pending_tasks.load(Ordering::Relaxed),
126 total_token_usage: self.total_token_usage.load(Ordering::Relaxed),
127 disk_used_gb,
128 load_avg_1m,
129 }
130 }
131
132 pub fn record_snapshot(&self) {
137 let snap = self.snapshot();
138 let mut history = self.history.write();
139 if history.len() >= self.history_max {
140 history.pop_front();
141 }
142 history.push_back(snap);
143 }
144
145 pub fn start_sampling(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
150 let monitor = Arc::clone(self);
151 let interval = self.interval_secs;
152 tokio::spawn(async move {
153 let mut ticker = tokio::time::interval(std::time::Duration::from_secs(interval));
154 loop {
155 ticker.tick().await;
156 monitor.record_snapshot();
157 }
158 })
159 }
160
161 pub fn history(&self, last_n: usize) -> Vec<ResourceSnapshot> {
163 let guard = self.history.read();
164 let n = last_n.min(guard.len());
165 guard.iter().rev().take(n).cloned().collect()
166 }
167
168 pub fn is_overloaded(&self) -> bool {
170 let snap = self.snapshot();
171 let memory_percent = if snap.memory_total_mb > 0 {
172 (snap.memory_used_mb as f32 / snap.memory_total_mb as f32) * 100.0
173 } else {
174 0.0
175 };
176
177 let t = self.overload_threshold.read();
178 snap.cpu_percent >= t.cpu_percent
179 || memory_percent >= t.memory_percent
180 || snap.load_avg_1m >= t.load_avg
181 }
182
183 pub fn set_active_agents(&self, count: usize) {
185 self.active_agents.store(count, Ordering::Relaxed);
186 }
187
188 pub fn set_pending_tasks(&self, count: usize) {
190 self.pending_tasks.store(count, Ordering::Relaxed);
191 }
192
193 pub fn add_token_usage(&self, tokens: u64) {
195 self.total_token_usage.fetch_add(tokens, Ordering::Relaxed);
196 }
197
198 pub fn overload_threshold(&self) -> OverloadThreshold {
200 *self.overload_threshold.read()
201 }
202
203 pub fn set_overload_threshold(&self, threshold: OverloadThreshold) {
205 *self.overload_threshold.write() = threshold;
206 tracing::info!("ResourceMonitor thresholds hot-reloaded");
207 }
208}
209
210fn estimate_disk_usage() -> f64 {
213 let cwd = std::env::current_dir().unwrap_or_default();
214 walk_dir_size(&cwd) as f64 / (1024.0 * 1024.0 * 1024.0)
215}
216
/// Sums the sizes, in bytes, of all regular files beneath `path`.
///
/// Unreadable directories and entries are skipped rather than reported. If `path` is not
/// a readable directory (including a plain file), the result is `0`. Symlinks are neither
/// counted nor followed, because `DirEntry::metadata` does not traverse them.
///
/// The walk is iterative, using an explicit stack of pending directories. Each directory
/// listing is fully consumed before descending, so only one directory handle is open at a
/// time and deep trees cannot overflow the call stack.
fn walk_dir_size(path: &std::path::Path) -> u64 {
    let mut total = 0u64;
    let mut pending = vec![path.to_path_buf()];
    while let Some(dir) = pending.pop() {
        let Ok(entries) = std::fs::read_dir(&dir) else {
            continue;
        };
        for entry in entries.flatten() {
            let Ok(meta) = entry.metadata() else {
                continue;
            };
            if meta.is_file() {
                total = total.saturating_add(meta.len());
            } else if meta.is_dir() {
                pending.push(entry.path());
            }
        }
    }
    total
}
234
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_snapshot_structure() {
        let monitor = ResourceMonitor::default();
        let snap = monitor.snapshot();

        assert!(snap.timestamp <= Utc::now());
        assert!(snap.cpu_percent >= 0.0);
        assert!(snap.disk_used_gb >= 0.0);
        assert!(snap.load_avg_1m >= 0.0);
    }

    #[test]
    fn test_is_overloaded_default_threshold() {
        // Host-dependent result: only check that the call completes with the defaults
        // in effect.
        let monitor = ResourceMonitor::default();
        let _ = monitor.is_overloaded();
        let t = monitor.overload_threshold();
        assert_eq!(t.cpu_percent, 90.0);
        assert_eq!(t.memory_percent, 90.0);
        assert_eq!(t.load_avg, 8.0);
    }

    #[test]
    fn test_is_overloaded_high_thresholds_not_overloaded() {
        let monitor = ResourceMonitor::default();
        monitor.set_overload_threshold(OverloadThreshold {
            cpu_percent: f32::INFINITY,
            memory_percent: f32::INFINITY,
            load_avg: f32::INFINITY,
        });
        assert!(!monitor.is_overloaded());
    }

    #[test]
    fn test_is_overloaded_zero_thresholds_overloaded() {
        // Every finite, non-negative reading meets a zero threshold.
        let monitor = ResourceMonitor::default();
        monitor.set_overload_threshold(OverloadThreshold {
            cpu_percent: 0.0,
            memory_percent: 0.0,
            load_avg: 0.0,
        });
        assert!(monitor.is_overloaded());
    }

    #[test]
    fn test_set_overload_threshold_roundtrip() {
        let monitor = ResourceMonitor::default();
        monitor.set_overload_threshold(OverloadThreshold {
            cpu_percent: 75.0,
            memory_percent: 80.0,
            load_avg: 4.0,
        });
        let t = monitor.overload_threshold();
        assert_eq!(t.cpu_percent, 75.0);
        assert_eq!(t.memory_percent, 80.0);
        assert_eq!(t.load_avg, 4.0);
    }

    #[test]
    fn test_history_management() {
        let monitor = ResourceMonitor::new(1, 5);

        assert!(monitor.history(10).is_empty());

        for _ in 0..3 {
            monitor.record_snapshot();
        }

        let history = monitor.history(10);
        assert_eq!(history.len(), 3);
    }

    #[test]
    fn test_history_eviction() {
        let monitor = ResourceMonitor::new(1, 3);

        // Tag each snapshot with a distinct token total so order is checkable without
        // relying on wall-clock timestamps.
        for _ in 0..5 {
            monitor.add_token_usage(1);
            monitor.record_snapshot();
        }

        let tags: Vec<u64> = monitor
            .history(10)
            .iter()
            .map(|s| s.total_token_usage)
            .collect();
        // Oldest two were evicted; results are newest first.
        assert_eq!(tags, vec![5, 4, 3]);
    }

    #[test]
    fn test_set_active_agents() {
        let monitor = ResourceMonitor::default();
        monitor.set_active_agents(5);
        let snap = monitor.snapshot();
        assert_eq!(snap.active_agents, 5);
    }

    #[test]
    fn test_set_pending_tasks() {
        let monitor = ResourceMonitor::default();
        monitor.set_pending_tasks(3);
        let snap = monitor.snapshot();
        assert_eq!(snap.pending_tasks, 3);
    }

    #[test]
    fn test_add_token_usage() {
        let monitor = ResourceMonitor::default();
        monitor.add_token_usage(100);
        monitor.add_token_usage(200);
        let snap = monitor.snapshot();
        assert_eq!(snap.total_token_usage, 300);
    }

    #[test]
    fn test_overload_threshold_default() {
        let threshold = OverloadThreshold::default();
        assert_eq!(threshold.cpu_percent, 90.0);
        assert_eq!(threshold.memory_percent, 90.0);
        assert_eq!(threshold.load_avg, 8.0);
    }

    #[test]
    fn test_overload_threshold_custom() {
        let threshold = OverloadThreshold {
            cpu_percent: 75.0,
            memory_percent: 80.0,
            load_avg: 4.0,
        };
        assert_eq!(threshold.cpu_percent, 75.0);
        assert_eq!(threshold.memory_percent, 80.0);
        assert_eq!(threshold.load_avg, 4.0);
    }

    #[test]
    fn test_history_last_n() {
        let monitor = ResourceMonitor::new(1, 10);
        assert!(monitor.history(5).is_empty());
        assert!(monitor.history(100).is_empty());

        for _ in 0..4 {
            monitor.add_token_usage(1);
            monitor.record_snapshot();
        }

        let last_two: Vec<u64> = monitor
            .history(2)
            .iter()
            .map(|s| s.total_token_usage)
            .collect();
        assert_eq!(last_two, vec![4, 3]);
        assert_eq!(monitor.history(100).len(), 4);
        assert!(monitor.history(0).is_empty());
    }

    #[test]
    fn test_load_average_struct() {
        let la = System::load_average();
        assert!(la.one >= 0.0);
        assert!(la.five >= 0.0);
        assert!(la.fifteen >= 0.0);
    }
}