openhawk-core 0.1.0

Core runtime for OpenHawk Agent OS — agent lifecycle, orchestration, config, healing
Documentation
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use sysinfo::{Pid, System};
use tokio::sync::mpsc;

use crate::types::ProcessId;

#[derive(Debug, Clone)]
pub struct ResourceLimits {
    pub cpu_percent: u8,
    pub memory_mb: u64,
    pub max_open_fds: u64,
}

#[derive(Debug, Clone)]
pub struct ResourceSnapshot {
    pub pid: ProcessId,
    pub cpu_percent: f32,
    pub memory_bytes: u64,
}

#[derive(Debug)]
pub enum ResourceEvent {
    MemoryExceeded { pid: ProcessId, memory_bytes: u64, limit_bytes: u64 },
}

pub struct ResourceMonitor {
    pub limits: Arc<Mutex<HashMap<ProcessId, ResourceLimits>>>,
    pub event_tx: mpsc::UnboundedSender<ResourceEvent>,
    pub event_rx: Arc<Mutex<mpsc::UnboundedReceiver<ResourceEvent>>>,
}

impl ResourceMonitor {
    pub fn new() -> Self {
        let (tx, rx) = mpsc::unbounded_channel();
        Self {
            limits: Arc::new(Mutex::new(HashMap::new())),
            event_tx: tx,
            event_rx: Arc::new(Mutex::new(rx)),
        }
    }

    pub fn register(&self, pid: ProcessId, limits: ResourceLimits) {
        self.limits.lock().unwrap().insert(pid, limits);
    }

    pub fn deregister(&self, pid: ProcessId) {
        self.limits.lock().unwrap().remove(&pid);
    }

    pub fn start_background(self: Arc<Self>, interval: Duration) {
        let monitor = Arc::clone(&self);
        tokio::spawn(async move {
            let mut sys = System::new_all();
            loop {
                tokio::time::sleep(interval).await;
                sys.refresh_all();

                let snapshot: Vec<(ProcessId, ResourceLimits)> =
                    monitor.limits.lock().unwrap().clone().into_iter().collect();

                for (pid, limits) in snapshot {
                    let sysinfo_pid = Pid::from_u32(pid);
                    if let Some(proc) = sys.process(sysinfo_pid) {
                        let mem = proc.memory();
                        let limit_bytes = limits.memory_mb * 1024 * 1024;
                        if mem > limit_bytes {
                            suspend_process(pid);
                            let _ = monitor.event_tx.send(ResourceEvent::MemoryExceeded {
                                pid,
                                memory_bytes: mem,
                                limit_bytes,
                            });
                        }
                    }
                }
            }
        });
    }

    pub fn sample(&self, pid: ProcessId) -> Option<ResourceSnapshot> {
        let mut sys = System::new_all();
        sys.refresh_all();
        sys.process(Pid::from_u32(pid)).map(|p| ResourceSnapshot {
            pid,
            cpu_percent: p.cpu_usage(),
            memory_bytes: p.memory(),
        })
    }
}

impl Default for ResourceMonitor {
    fn default() -> Self {
        Self::new()
    }
}

fn suspend_process(pid: ProcessId) {
    #[cfg(target_family = "unix")]
    {
        use nix::sys::signal::{kill, Signal};
        use nix::unistd::Pid as NixPid;
        let _ = kill(NixPid::from_raw(pid as i32), Signal::SIGSTOP);
    }
    #[cfg(not(target_family = "unix"))]
    let _ = pid;
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_register_and_deregister() {
        let monitor = ResourceMonitor::new();
        monitor.register(1234, ResourceLimits { cpu_percent: 25, memory_mb: 512, max_open_fds: 64 });
        assert!(monitor.limits.lock().unwrap().contains_key(&1234));
        monitor.deregister(1234);
        assert!(!monitor.limits.lock().unwrap().contains_key(&1234));
    }

    #[test]
    fn test_memory_limit_bytes_calculation() {
        let limits = ResourceLimits { cpu_percent: 10, memory_mb: 512, max_open_fds: 64 };
        assert_eq!(limits.memory_mb * 1024 * 1024, 512 * 1024 * 1024);
    }

    #[tokio::test]
    async fn test_memory_exceeded_event_emitted() {
        let monitor = ResourceMonitor::new();
        monitor
            .event_tx
            .send(ResourceEvent::MemoryExceeded {
                pid: 42,
                memory_bytes: 600 * 1024 * 1024,
                limit_bytes: 512 * 1024 * 1024,
            })
            .unwrap();

        let event = monitor.event_rx.lock().unwrap().try_recv().unwrap();
        match event {
            ResourceEvent::MemoryExceeded { pid, memory_bytes, limit_bytes } => {
                assert_eq!(pid, 42);
                assert!(memory_bytes > limit_bytes);
            }
        }
    }
}