Skip to main content

oxios_kernel/
resource_monitor.rs

1//! Resource monitoring for the Oxios kernel.
2//!
3//! Collects system metrics (CPU, memory, disk) and agent-level metrics
4//! (active agents, pending tasks, token usage) to support scheduler decisions
5//! and admin API endpoints.
6
7use chrono::{DateTime, Utc};
8use parking_lot::RwLock;
9use serde::{Deserialize, Serialize};
10use std::collections::VecDeque;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
13use sysinfo::System;
14
/// Point-in-time view of host and agent resource usage.
///
/// Values are produced by `ResourceMonitor::snapshot()` and stored in the
/// monitor's history by `ResourceMonitor::record_snapshot()`. The type is
/// serde-serializable so it can be returned from admin endpoints.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ResourceSnapshot {
    /// Wall-clock time (UTC) at which the snapshot was assembled.
    pub timestamp: DateTime<Utc>,
    /// CPU usage percentage (0.0–100.0), averaged across all cores.
    pub cpu_percent: f32,
    /// Used memory: the sysinfo reading divided by 1024 * 1024 (integer
    /// division). NOTE(review): this is MiB provided sysinfo reports bytes —
    /// confirm against the pinned sysinfo version.
    pub memory_used_mb: u64,
    /// Total memory, converted the same way as `memory_used_mb`.
    pub memory_total_mb: u64,
    /// Number of currently active agents (last value passed to
    /// `ResourceMonitor::set_active_agents`).
    pub active_agents: usize,
    /// Number of pending tasks in the scheduler (last value passed to
    /// `ResourceMonitor::set_pending_tasks`).
    pub pending_tasks: usize,
    /// Cumulative token usage across all agents.
    pub total_token_usage: u64,
    /// Estimated size of the process's current working directory tree, in
    /// units of 1024^3 bytes. The figure comes from a bounded directory walk
    /// and is cached, so it may lag behind the real value.
    pub disk_used_gb: f64,
    /// 1-minute load average.
    pub load_avg_1m: f32,
}
37
/// Thresholds that define an "overloaded" system.
///
/// `ResourceMonitor::is_overloaded` reports overload as soon as any single
/// reading is greater than or equal to its limit here.
///
/// The type is a small `Copy` value and implements `PartialEq`, so a reloaded
/// configuration can be compared against the active one as a whole.
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct OverloadThreshold {
    /// CPU usage percentage at or above which the system counts as overloaded.
    pub cpu_percent: f32,
    /// Memory usage percentage at or above which the system counts as overloaded.
    pub memory_percent: f32,
    /// 1-minute load average at or above which the system counts as overloaded.
    pub load_avg: f32,
}
48
49impl Default for OverloadThreshold {
50    fn default() -> Self {
51        Self {
52            cpu_percent: 90.0,
53            memory_percent: 90.0,
54            load_avg: 8.0,
55        }
56    }
57}
58
/// Resource monitor collecting system and agent metrics.
///
/// Snapshots are pushed to history only when `record_snapshot()` is called;
/// `snapshot()` on its own does not touch the history.
/// Use `start_sampling()` to spawn a background task that periodically records snapshots.
///
/// All state uses interior mutability (locks and atomics), so every method
/// takes `&self`.
pub struct ResourceMonitor {
    /// Sampling interval in seconds.
    interval_secs: u64,
    /// Maximum number of history entries to retain.
    history_max: usize,
    /// Recorded snapshots, oldest at the front and newest at the back.
    history: RwLock<VecDeque<ResourceSnapshot>>,
    /// Running total fed by `add_token_usage()`.
    total_token_usage: AtomicU64,
    /// Latest agent count supplied through `set_active_agents()`.
    active_agents: AtomicUsize,
    /// Latest queue length supplied through `set_pending_tasks()`.
    pending_tasks: AtomicUsize,
    /// Limits consulted by `is_overloaded()`; replaceable at runtime.
    overload_threshold: RwLock<OverloadThreshold>,
    /// Shared sysinfo handle, refreshed on each `snapshot()` call.
    sys: parking_lot::Mutex<System>,
    /// Cached disk-usage estimate (GB) with a TTL, so `snapshot()` does not
    /// walk the filesystem on every call. Disk usage is an approximation
    /// anyway and does not change meaningfully within the TTL window.
    /// Holds `(time of measurement, value)`, or `None` before the first walk.
    disk_cache: parking_lot::Mutex<Option<(std::time::Instant, f64)>>,
}
79
80impl Default for ResourceMonitor {
81    fn default() -> Self {
82        Self::new(60, 60)
83    }
84}
85
86impl ResourceMonitor {
87    /// Create a new monitor with the given sampling interval and history size.
88    pub fn new(interval_secs: u64, history_max: usize) -> Self {
89        Self {
90            interval_secs,
91            history_max,
92            history: RwLock::new(VecDeque::with_capacity(history_max)),
93            total_token_usage: AtomicU64::new(0),
94            active_agents: AtomicUsize::new(0),
95            pending_tasks: AtomicUsize::new(0),
96            overload_threshold: RwLock::new(OverloadThreshold::default()),
97            sys: parking_lot::Mutex::new(System::new_all()),
98            disk_cache: parking_lot::Mutex::new(None),
99        }
100    }
101
102    /// Take a snapshot of current resource usage.
103    ///
104    /// Uses the shared `sysinfo::System` instance (refreshed on each call)
105    /// instead of creating a new one each time.
106    pub fn snapshot(&self) -> ResourceSnapshot {
107        let mut sys = self.sys.lock();
108        sys.refresh_all();
109
110        // CPU: average across all cores
111        let cpu_percent =
112            sys.cpus().iter().map(|c| c.cpu_usage()).sum::<f32>() / sys.cpus().len().max(1) as f32;
113
114        let total_memory = sys.total_memory();
115        let used_memory = sys.used_memory();
116        let memory_total_mb = total_memory / (1024 * 1024);
117        let memory_used_mb = used_memory / (1024 * 1024);
118
119        let load_avg_1m = System::load_average().one as f32;
120
121        let disk_used_gb = self.cached_disk_usage();
122
123        ResourceSnapshot {
124            timestamp: Utc::now(),
125            cpu_percent,
126            memory_used_mb,
127            memory_total_mb,
128            active_agents: self.active_agents.load(Ordering::Relaxed),
129            pending_tasks: self.pending_tasks.load(Ordering::Relaxed),
130            total_token_usage: self.total_token_usage.load(Ordering::Relaxed),
131            disk_used_gb,
132            load_avg_1m,
133        }
134    }
135
136    /// Return the cached disk-usage estimate, refreshing it only after the TTL
137    /// expires. Walking the working directory is bounded (see `walk_dir_size`)
138    /// and never follows symlinks, so it cannot loop or run away on a large
139    /// monorepo, but it is still far too expensive to repeat on every snapshot.
140    fn cached_disk_usage(&self) -> f64 {
141        const DISK_CACHE_TTL_SECS: u64 = 300;
142        let now = std::time::Instant::now();
143        {
144            let cache = self.disk_cache.lock();
145            if let Some((ts, val)) = *cache
146                && now.duration_since(ts).as_secs() < DISK_CACHE_TTL_SECS
147            {
148                return val;
149            }
150        }
151        let cwd = std::env::current_dir().unwrap_or_default();
152        let gb = walk_dir_size(&cwd) as f64 / (1024.0 * 1024.0 * 1024.0);
153        *self.disk_cache.lock() = Some((now, gb));
154        gb
155    }
156
157    /// Record a snapshot into the history buffer.
158    ///
159    /// Call this to push the current metrics into the history ring buffer.
160    /// Oldest entries are evicted when `history_max` is reached.
161    pub fn record_snapshot(&self) {
162        let snap = self.snapshot();
163        let mut history = self.history.write();
164        if history.len() >= self.history_max {
165            history.pop_front();
166        }
167        history.push_back(snap);
168    }
169
170    /// Spawn a background task that periodically records snapshots.
171    ///
172    /// Returns a `tokio::task::JoinHandle` that can be aborted to stop sampling.
173    /// Uses the `interval_secs` configured at construction time.
174    pub fn start_sampling(self: &Arc<Self>) -> tokio::task::JoinHandle<()> {
175        let monitor = Arc::clone(self);
176        let interval = self.interval_secs;
177        tokio::spawn(async move {
178            let mut ticker = tokio::time::interval(std::time::Duration::from_secs(interval));
179            loop {
180                ticker.tick().await;
181                monitor.record_snapshot();
182            }
183        })
184    }
185
186    /// Returns historical snapshots, newest first.
187    pub fn history(&self, last_n: usize) -> Vec<ResourceSnapshot> {
188        let guard = self.history.read();
189        let n = last_n.min(guard.len());
190        guard.iter().rev().take(n).cloned().collect()
191    }
192
193    /// Returns true if the system is currently overloaded.
194    pub fn is_overloaded(&self) -> bool {
195        let snap = self.snapshot();
196        let memory_percent = if snap.memory_total_mb > 0 {
197            (snap.memory_used_mb as f32 / snap.memory_total_mb as f32) * 100.0
198        } else {
199            0.0
200        };
201
202        let t = self.overload_threshold.read();
203        snap.cpu_percent >= t.cpu_percent
204            || memory_percent >= t.memory_percent
205            || snap.load_avg_1m >= t.load_avg
206    }
207
    /// Update the active agent count.
    ///
    /// Overwrites the previous value; the new count is reported by the next
    /// `snapshot()`. Uses `Ordering::Relaxed` for the store.
    pub fn set_active_agents(&self, count: usize) {
        self.active_agents.store(count, Ordering::Relaxed);
    }
212
    /// Update the pending tasks count.
    ///
    /// Overwrites the previous value; the new count is reported by the next
    /// `snapshot()`. Uses `Ordering::Relaxed` for the store.
    pub fn set_pending_tasks(&self, count: usize) {
        self.pending_tasks.store(count, Ordering::Relaxed);
    }
217
    /// Add to the cumulative token usage counter.
    ///
    /// `tokens` is added atomically to the running total, which is reported
    /// by the next `snapshot()`. The counter is never reset by this type;
    /// `AtomicU64::fetch_add` wraps around on overflow.
    pub fn add_token_usage(&self, tokens: u64) {
        self.total_token_usage.fetch_add(tokens, Ordering::Relaxed);
    }
222
223    /// Returns a copy of the current overload threshold.
224    pub fn overload_threshold(&self) -> OverloadThreshold {
225        *self.overload_threshold.read()
226    }
227
228    /// Hot-reload overload thresholds without restart.
229    pub fn set_overload_threshold(&self, threshold: OverloadThreshold) {
230        *self.overload_threshold.write() = threshold;
231        tracing::info!("ResourceMonitor thresholds hot-reloaded");
232    }
233}
234
/// Maximum directory depth for the disk-usage walk. Bounds stack usage and
/// traversal time on deeply-nested trees.
const DISK_WALK_MAX_DEPTH: u8 = 10;
/// Maximum number of directory entries visited per walk, counted across all
/// directories of the traversal. Bounds the total work on trees with millions
/// of entries (e.g. `node_modules`).
const DISK_WALK_MAX_ENTRIES: usize = 200_000;

/// Recursively compute the size of a directory in bytes.
///
/// Returns the summed length of the regular files found under `path`, or 0
/// when `path` cannot be read. The result is a lower bound whenever one of
/// the limits below cuts the traversal short.
///
/// Safety properties:
/// - Uses `symlink_metadata` so symlinks are never followed — this prevents
///   cycles and prevents the walk from escaping the workspace via a symlink.
/// - Bounded by `DISK_WALK_MAX_DEPTH` (stack-depth limit) and
///   `DISK_WALK_MAX_ENTRIES` (entry budget shared by the whole walk) so the
///   traversal cannot explode on huge or pathological trees.
fn walk_dir_size(path: &std::path::Path) -> u64 {
    walk_dir_size_bounded(path, DISK_WALK_MAX_DEPTH, DISK_WALK_MAX_ENTRIES)
}

/// Size walk with explicit limits.
///
/// `max_depth` is the number of directory levels descended (the root counts
/// as the first level). `max_entries` is the total number of directory
/// entries inspected across the whole traversal; once it is spent, the walk
/// stops and returns what has been summed so far.
fn walk_dir_size_bounded(path: &std::path::Path, max_depth: u8, max_entries: usize) -> u64 {
    fn walk(dir: &std::path::Path, depth: u8, max_depth: u8, budget: &mut usize) -> u64 {
        if depth >= max_depth || *budget == 0 {
            return 0;
        }
        // Unreadable directories (permissions, races) contribute nothing.
        let Ok(entries) = std::fs::read_dir(dir) else {
            return 0;
        };
        let mut total = 0u64;
        for entry in entries.flatten() {
            if *budget == 0 {
                break;
            }
            // Every inspected entry costs one unit, whether file or directory.
            *budget -= 1;

            let entry_path = entry.path();
            // symlink_metadata so we never follow symlinks (no cycles, no escape).
            let Ok(meta) = std::fs::symlink_metadata(&entry_path) else {
                continue;
            };
            if meta.is_file() {
                total = total.saturating_add(meta.len());
            } else if meta.is_dir() {
                total = total.saturating_add(walk(&entry_path, depth + 1, max_depth, budget));
            }
        }
        total
    }

    let mut budget = max_entries;
    walk(path, 0, max_depth, &mut budget)
}
277
#[cfg(test)]
mod tests {
    use super::*;

    /// Thresholds with every limit set to the same value.
    fn uniform_thresholds(limit: f32) -> OverloadThreshold {
        OverloadThreshold {
            cpu_percent: limit,
            memory_percent: limit,
            load_avg: limit,
        }
    }

    /// Record `count` snapshots, tagging each with its 1-based index via the
    /// active-agent gauge so tests can identify them afterwards.
    fn record_tagged(monitor: &ResourceMonitor, count: usize) {
        for tag in 1..=count {
            monitor.set_active_agents(tag);
            monitor.record_snapshot();
        }
    }

    /// Tags (active-agent values) of the given snapshots, in order.
    fn tags(snapshots: &[ResourceSnapshot]) -> Vec<usize> {
        snapshots.iter().map(|s| s.active_agents).collect()
    }

    #[test]
    fn test_snapshot_structure() {
        let monitor = ResourceMonitor::default();
        let snap = monitor.snapshot();

        assert!(snap.timestamp <= Utc::now());
        // CPU and memory values should be non-negative (floats can be negative)
        assert!(snap.cpu_percent >= 0.0);
        assert!(snap.disk_used_gb >= 0.0);
        assert!(snap.load_avg_1m >= 0.0);
    }

    #[test]
    fn test_is_overloaded_default_threshold() {
        let monitor = ResourceMonitor::default();
        // With default thresholds (90% CPU, 90% memory, load 8.0) the result
        // depends on the host, so this is only a smoke test for panics.
        let _ = monitor.is_overloaded();
    }

    #[test]
    fn test_is_overloaded_high_thresholds_not_overloaded() {
        // No finite reading can reach an infinite limit.
        let monitor = ResourceMonitor::default();
        monitor.set_overload_threshold(uniform_thresholds(f32::INFINITY));
        assert!(!monitor.is_overloaded());
    }

    #[test]
    fn test_is_overloaded_zero_thresholds_overloaded() {
        // Memory usage is never below 0%, so a zero limit always trips.
        let monitor = ResourceMonitor::default();
        monitor.set_overload_threshold(uniform_thresholds(0.0));
        assert!(monitor.is_overloaded());
    }

    #[test]
    fn test_set_overload_threshold_round_trip() {
        let monitor = ResourceMonitor::default();
        monitor.set_overload_threshold(OverloadThreshold {
            cpu_percent: 75.0,
            memory_percent: 80.0,
            load_avg: 4.0,
        });

        let current = monitor.overload_threshold();
        assert_eq!(current.cpu_percent, 75.0);
        assert_eq!(current.memory_percent, 80.0);
        assert_eq!(current.load_avg, 4.0);
    }

    #[test]
    fn test_history_management() {
        let monitor = ResourceMonitor::new(1, 5);

        // Initially empty
        assert!(monitor.history(10).is_empty());

        record_tagged(&monitor, 3);

        // All three entries are present, newest first.
        let history = monitor.history(10);
        assert_eq!(tags(&history), vec![3, 2, 1]);
    }

    #[test]
    fn test_history_eviction() {
        let monitor = ResourceMonitor::new(1, 3);

        // Record more than capacity
        record_tagged(&monitor, 5);

        // Only the three newest snapshots survive.
        let history = monitor.history(10);
        assert_eq!(tags(&history), vec![5, 4, 3]);
    }

    #[test]
    fn test_set_active_agents() {
        let monitor = ResourceMonitor::default();
        monitor.set_active_agents(5);
        let snap = monitor.snapshot();
        assert_eq!(snap.active_agents, 5);
    }

    #[test]
    fn test_set_pending_tasks() {
        let monitor = ResourceMonitor::default();
        monitor.set_pending_tasks(3);
        let snap = monitor.snapshot();
        assert_eq!(snap.pending_tasks, 3);
    }

    #[test]
    fn test_add_token_usage() {
        let monitor = ResourceMonitor::default();
        monitor.add_token_usage(100);
        monitor.add_token_usage(200);
        let snap = monitor.snapshot();
        assert_eq!(snap.total_token_usage, 300);
    }

    #[test]
    fn test_overload_threshold_default() {
        let threshold = OverloadThreshold::default();
        assert_eq!(threshold.cpu_percent, 90.0);
        assert_eq!(threshold.memory_percent, 90.0);
        assert_eq!(threshold.load_avg, 8.0);
    }

    #[test]
    fn test_overload_threshold_custom() {
        let threshold = OverloadThreshold {
            cpu_percent: 75.0,
            memory_percent: 80.0,
            load_avg: 4.0,
        };
        assert_eq!(threshold.cpu_percent, 75.0);
        assert_eq!(threshold.memory_percent, 80.0);
        assert_eq!(threshold.load_avg, 4.0);
    }

    #[test]
    fn test_history_last_n() {
        let monitor = ResourceMonitor::new(1, 10);
        assert!(monitor.history(5).is_empty());
        assert!(monitor.history(100).is_empty());

        record_tagged(&monitor, 4);

        // A smaller request returns just the newest entries…
        assert_eq!(tags(&monitor.history(2)), vec![4, 3]);
        // …zero returns nothing…
        assert!(monitor.history(0).is_empty());
        // …and an oversized request returns everything that is stored.
        assert_eq!(monitor.history(100).len(), 4);
    }

    #[test]
    fn test_load_average_struct() {
        let la = System::load_average();
        assert!(la.one >= 0.0);
        assert!(la.five >= 0.0);
        assert!(la.fifteen >= 0.0);
    }
}