Skip to main content

oxios_kernel/
resource_monitor.rs

1//! Resource monitoring for the Oxios kernel.
2//!
3//! Collects system metrics (CPU, memory, disk) and agent-level metrics
4//! (active agents, pending tasks, token usage) to support scheduler decisions
5//! and admin API endpoints.
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::VecDeque;
11use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
12use std::sync::Arc;
13use sysinfo::System;
14
15/// Snapshot of system and agent resource usage at a point in time.
16#[derive(Debug, Clone, Serialize, Deserialize)]
17pub struct ResourceSnapshot {
18    /// Timestamp of the snapshot.
19    pub timestamp: DateTime<Utc>,
20    /// CPU usage percentage (0.0–100.0).
21    pub cpu_percent: f32,
22    /// Memory used in megabytes.
23    pub memory_used_mb: u64,
24    /// Total memory in megabytes.
25    pub memory_total_mb: u64,
26    /// Number of currently active agents.
27    pub active_agents: usize,
28    /// Number of pending tasks in the scheduler.
29    pub pending_tasks: usize,
30    /// Cumulative token usage across all agents.
31    pub total_token_usage: u64,
32    /// Disk usage in gigabytes (estimated from workspace directory size).
33    pub disk_used_gb: f64,
34    /// 1-minute load average.
35    pub load_avg_1m: f32,
36}
37
/// Limits at or above which the system is considered overloaded.
///
/// The comparisons are inclusive: a metric that *reaches* its limit counts as
/// overloaded. Tripping any single limit is sufficient.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OverloadThreshold {
    /// CPU usage, in percent, at or above which the system is overloaded.
    pub cpu_percent: f32,
    /// Memory usage, in percent of total memory, at or above which the system is overloaded.
    pub memory_percent: f32,
    /// 1-minute load average at or above which the system is overloaded.
    pub load_avg: f32,
}

impl Default for OverloadThreshold {
    /// 90% CPU, 90% memory, or a 1-minute load average of 8.0.
    fn default() -> Self {
        Self {
            cpu_percent: 90.0,
            memory_percent: 90.0,
            load_avg: 8.0,
        }
    }
}
58
59/// Resource monitor collecting system and agent metrics.
60///
61/// Snapshots are automatically pushed to history when `record_snapshot()` is called.
62/// Use `start_sampling()` to spawn a background task that periodically records snapshots.
63pub struct ResourceMonitor {
64    /// Sampling interval in seconds.
65    interval_secs: u64,
66    /// Maximum number of history entries to retain.
67    history_max: usize,
68    history: RwLock<VecDeque<ResourceSnapshot>>,
69    total_token_usage: AtomicU64,
70    active_agents: AtomicUsize,
71    pending_tasks: AtomicUsize,
72    overload_threshold: OverloadThreshold,
73    /// Shared `sysinfo::System` instance to avoid recreating on every snapshot.
74    sys: parking_lot::Mutex<System>,
75}
76
77impl Default for ResourceMonitor {
78    fn default() -> Self {
79        Self::new(60, 60)
80    }
81}
82
83impl ResourceMonitor {
84    /// Create a new monitor with the given sampling interval and history size.
85    pub fn new(interval_secs: u64, history_max: usize) -> Self {
86        Self {
87            interval_secs,
88            history_max,
89            history: RwLock::new(VecDeque::with_capacity(history_max)),
90            total_token_usage: AtomicU64::new(0),
91            active_agents: AtomicUsize::new(0),
92            pending_tasks: AtomicUsize::new(0),
93            overload_threshold: OverloadThreshold::default(),
94            sys: parking_lot::Mutex::new(System::new_all()),
95        }
96    }
97
98    /// Take a snapshot of current resource usage.
99    ///
100    /// Uses the shared `sysinfo::System` instance (refreshed on each call)
101    /// instead of creating a new one each time.
102    pub fn snapshot(&self) -> ResourceSnapshot {
103        let mut sys = self.sys.lock();
104        sys.refresh_all();
105
106        // CPU: average across all cores
107        let cpu_percent =
108            sys.cpus().iter().map(|c| c.cpu_usage()).sum::<f32>() / sys.cpus().len().max(1) as f32;
109
110        let total_memory = sys.total_memory();
111        let used_memory = sys.used_memory();
112        let memory_total_mb = total_memory / (1024 * 1024);
113        let memory_used_mb = used_memory / (1024 * 1024);
114
115        let load_avg_1m = System::load_average().one as f32;
116
117        let disk_used_gb = estimate_disk_usage();
118
119        ResourceSnapshot {
120            timestamp: Utc::now(),
121            cpu_percent,
122            memory_used_mb,
123            memory_total_mb,
124            active_agents: self.active_agents.load(Ordering::Relaxed),
125            pending_tasks: self.pending_tasks.load(Ordering::Relaxed),
126            total_token_usage: self.total_token_usage.load(Ordering::Relaxed),
127            disk_used_gb,
128            load_avg_1m,
129        }
130    }
131
132    /// Record a snapshot into the history buffer.
133    ///
134    /// Call this to push the current metrics into the history ring buffer.
135    /// Oldest entries are evicted when `history_max` is reached.
136    pub fn record_snapshot(&self) {
137        let snap = self.snapshot();
138        let mut history = self.history.write();
139        if history.len() >= self.history_max {
140            history.pop_front();
141        }
142        history.push_back(snap);
143    }
144
145    /// Spawn a background task that periodically records snapshots.
146    ///
147    /// Returns a `tokio::task::JoinHandle` that can be aborted to stop sampling.
148    /// Uses the `interval_secs` configured at construction time.
149    pub fn start_sampling(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
150        let monitor = Arc::clone(self);
151        let interval = self.interval_secs;
152        tokio::spawn(async move {
153            let mut ticker = tokio::time::interval(std::time::Duration::from_secs(interval));
154            loop {
155                ticker.tick().await;
156                monitor.record_snapshot();
157            }
158        })
159    }
160
161    /// Returns historical snapshots, newest first.
162    pub fn history(&self, last_n: usize) -> Vec<ResourceSnapshot> {
163        let guard = self.history.read();
164        let n = last_n.min(guard.len());
165        guard.iter().rev().take(n).cloned().collect()
166    }
167
168    /// Returns true if the system is currently overloaded.
169    pub fn is_overloaded(&self) -> bool {
170        let snap = self.snapshot();
171        let memory_percent = if snap.memory_total_mb > 0 {
172            (snap.memory_used_mb as f32 / snap.memory_total_mb as f32) * 100.0
173        } else {
174            0.0
175        };
176
177        snap.cpu_percent >= self.overload_threshold.cpu_percent
178            || memory_percent >= self.overload_threshold.memory_percent
179            || snap.load_avg_1m >= self.overload_threshold.load_avg
180    }
181
182    /// Update the active agent count.
183    pub fn set_active_agents(&self, count: usize) {
184        self.active_agents.store(count, Ordering::Relaxed);
185    }
186
187    /// Update the pending tasks count.
188    pub fn set_pending_tasks(&self, count: usize) {
189        self.pending_tasks.store(count, Ordering::Relaxed);
190    }
191
192    /// Add to the cumulative token usage counter.
193    pub fn add_token_usage(&self, tokens: u64) {
194        self.total_token_usage.fetch_add(tokens, Ordering::Relaxed);
195    }
196
197    /// Returns a copy of the current overload threshold.
198    pub fn overload_threshold(&self) -> OverloadThreshold {
199        self.overload_threshold
200    }
201}
202
203/// Estimate disk usage by walking the current working directory.
204/// Returns size in gigabytes.
205fn estimate_disk_usage() -> f64 {
206    let cwd = std::env::current_dir().unwrap_or_default();
207    walk_dir_size(&cwd) as f64 / (1024.0 * 1024.0 * 1024.0)
208}
209
/// Compute the total apparent size, in bytes, of all regular files beneath `path`.
///
/// The traversal uses an explicit work stack rather than recursion, so deeply
/// nested directory trees cannot overflow the thread stack. The result is a
/// best-effort estimate:
/// - directories that cannot be listed, and entries whose metadata cannot be
///   read, are skipped;
/// - symbolic links are not followed (`DirEntry::metadata` does not traverse
///   them), so link cycles cannot cause endless traversal.
///
/// Returns `0` if `path` itself cannot be listed (e.g. it is missing or is a file).
fn walk_dir_size(path: &std::path::Path) -> u64 {
    let mut total = 0u64;
    let mut pending = vec![path.to_path_buf()];

    while let Some(dir) = pending.pop() {
        let entries = match std::fs::read_dir(&dir) {
            Ok(entries) => entries,
            Err(_) => continue,
        };
        for entry in entries.flatten() {
            if let Ok(meta) = entry.metadata() {
                if meta.is_file() {
                    total = total.saturating_add(meta.len());
                } else if meta.is_dir() {
                    pending.push(entry.path());
                }
            }
        }
    }

    total
}
227
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a default monitor whose overload limits are replaced by `threshold`.
    ///
    /// The tests module is a child of the module defining `ResourceMonitor`, so
    /// it may set the private field directly via struct-update syntax.
    fn monitor_with_threshold(threshold: OverloadThreshold) -> ResourceMonitor {
        ResourceMonitor {
            overload_threshold: threshold,
            ..ResourceMonitor::default()
        }
    }

    /// Cumulative token counts of the `last_n` newest history entries, newest first.
    ///
    /// Tests bump the token counter before each recording, so this identifies
    /// exactly which snapshots are retained and in what order.
    fn history_tokens(monitor: &ResourceMonitor, last_n: usize) -> Vec<u64> {
        monitor
            .history(last_n)
            .iter()
            .map(|s| s.total_token_usage)
            .collect()
    }

    #[test]
    fn test_snapshot_structure() {
        let monitor = ResourceMonitor::default();
        let snap = monitor.snapshot();

        assert!(snap.timestamp <= Utc::now());
        // These are floating-point fields, so explicitly check they are not negative.
        assert!(snap.cpu_percent >= 0.0);
        assert!(snap.disk_used_gb >= 0.0);
        assert!(snap.load_avg_1m >= 0.0);
    }

    #[test]
    fn test_is_overloaded_default_threshold() {
        let monitor = ResourceMonitor::default();
        // Smoke test only: the host may genuinely be overloaded, so the result
        // cannot be asserted. This checks that the call completes.
        let _ = monitor.is_overloaded();
    }

    #[test]
    fn test_is_overloaded_high_thresholds_not_overloaded() {
        // No real metric can reach infinity (and NaN never compares >=), so an
        // infinite limit can never trip.
        let monitor = monitor_with_threshold(OverloadThreshold {
            cpu_percent: f32::INFINITY,
            memory_percent: f32::INFINITY,
            load_avg: f32::INFINITY,
        });
        assert!(!monitor.is_overloaded());
    }

    #[test]
    fn test_is_overloaded_zero_thresholds_overloaded() {
        // Comparisons are inclusive and memory percentage is never negative
        // (it is 0.0 when total memory is unknown), so zero limits always trip.
        let monitor = monitor_with_threshold(OverloadThreshold {
            cpu_percent: 0.0,
            memory_percent: 0.0,
            load_avg: 0.0,
        });
        assert!(monitor.is_overloaded());
    }

    #[test]
    fn test_history_management() {
        let monitor = ResourceMonitor::new(1, 5);

        // Initially empty.
        assert!(monitor.history(10).is_empty());

        for _ in 0..3 {
            monitor.record_snapshot();
        }

        // Below capacity, every recorded snapshot is kept.
        assert_eq!(monitor.history(10).len(), 3);
    }

    #[test]
    fn test_history_eviction() {
        let monitor = ResourceMonitor::new(1, 3);

        // Record more than capacity, tagging each snapshot with a distinct count.
        for _ in 0..5 {
            monitor.add_token_usage(1);
            monitor.record_snapshot();
        }

        // Only the newest three remain, returned newest first.
        assert_eq!(history_tokens(&monitor, 10), vec![5, 4, 3]);
    }

    #[test]
    fn test_history_last_n() {
        let monitor = ResourceMonitor::new(1, 10);
        assert!(monitor.history(5).is_empty());
        assert!(monitor.history(100).is_empty());

        for _ in 0..4 {
            monitor.add_token_usage(1);
            monitor.record_snapshot();
        }

        // `last_n` truncates to the newest entries; larger values return everything.
        assert_eq!(history_tokens(&monitor, 2), vec![4, 3]);
        assert_eq!(history_tokens(&monitor, 100), vec![4, 3, 2, 1]);
        assert!(history_tokens(&monitor, 0).is_empty());
    }

    #[test]
    fn test_set_active_agents() {
        let monitor = ResourceMonitor::default();
        monitor.set_active_agents(5);
        let snap = monitor.snapshot();
        assert_eq!(snap.active_agents, 5);
    }

    #[test]
    fn test_set_pending_tasks() {
        let monitor = ResourceMonitor::default();
        monitor.set_pending_tasks(3);
        let snap = monitor.snapshot();
        assert_eq!(snap.pending_tasks, 3);
    }

    #[test]
    fn test_add_token_usage() {
        let monitor = ResourceMonitor::default();
        monitor.add_token_usage(100);
        monitor.add_token_usage(200);
        let snap = monitor.snapshot();
        assert_eq!(snap.total_token_usage, 300);
    }

    #[test]
    fn test_overload_threshold_default() {
        let threshold = OverloadThreshold::default();
        assert_eq!(threshold.cpu_percent, 90.0);
        assert_eq!(threshold.memory_percent, 90.0);
        assert_eq!(threshold.load_avg, 8.0);
    }

    #[test]
    fn test_overload_threshold_custom() {
        let threshold = OverloadThreshold {
            cpu_percent: 75.0,
            memory_percent: 80.0,
            load_avg: 4.0,
        };
        assert_eq!(threshold.cpu_percent, 75.0);
        assert_eq!(threshold.memory_percent, 80.0);
        assert_eq!(threshold.load_avg, 4.0);
    }

    #[test]
    fn test_load_average_struct() {
        let la = System::load_average();
        assert!(la.one >= 0.0);
        assert!(la.five >= 0.0);
        assert!(la.fifteen >= 0.0);
    }
}