use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
use dashmap::DashMap;
use proc_connector::{ProcConnector, ProcEvent};
use crate::utils::uid_to_username;
/// Details recorded for a process when the listener observes it.
#[derive(Debug, Clone)]
pub struct ProcInfo {
    /// Trimmed contents of `/proc/<pid>/comm`, or `"unknown"` when the file
    /// could not be read.
    pub cmd: String,
    /// User name resolved from the process's `Uid:` status line, or
    /// `"unknown"` when it could not be determined.
    pub user: String,
}
/// Reference-counted, concurrently accessible map from a process ID to the
/// [`ProcInfo`] recorded for that process.
pub type ProcCache = Arc<DashMap<u32, ProcInfo>>;
/// Starts the background thread that fills the process cache.
///
/// A thread named `proc-connector` is spawned to run the listener loop. Its
/// join handle is dropped, so the thread is detached.
///
/// # Returns
///
/// * the shared [`ProcCache`] the listener inserts into, and
/// * a flag the listener sets to `true` once its `ProcConnector` has been
///   created successfully.
///
/// Failures are reported on stderr rather than returned: if the thread cannot
/// be spawned, or the listener exits with an error, the cache simply stops
/// (or never starts) receiving entries.
pub fn start_proc_listener() -> (ProcCache, Arc<AtomicBool>) {
    let cache: ProcCache = Arc::new(DashMap::new());
    let ready = Arc::new(AtomicBool::new(false));

    let spawned = std::thread::Builder::new()
        .name("proc-connector".into())
        .spawn({
            // Hand the thread its own handles; the originals go back to the caller.
            let cache = Arc::clone(&cache);
            let ready = Arc::clone(&ready);
            move || {
                if let Err(e) = run_listener(cache, ready) {
                    eprintln!("proc connector listener failed: {}", e);
                }
            }
        });

    // Previously a spawn failure was discarded silently, leaving the caller
    // with a cache that never fills and no hint as to why.
    if let Err(e) = spawned {
        eprintln!("failed to spawn proc connector thread: {}", e);
    }

    (cache, ready)
}
fn run_listener(cache: ProcCache, ready: Arc<AtomicBool>) -> anyhow::Result<()> {
let conn = ProcConnector::new()
.map_err(|e| anyhow::anyhow!("ProcConnector::new: {}", e))?;
ready.store(true, Ordering::Release);
let mut buf = vec![0u8; 4096];
loop {
match conn.recv_timeout(&mut buf, Duration::from_secs(1)) {
Ok(Some(ProcEvent::Exec { pid, .. })) => {
let cmd = std::fs::read_to_string(format!("/proc/{}/comm", pid))
.ok()
.map(|s| s.trim().to_string())
.unwrap_or_else(|| "unknown".to_string());
let user = read_proc_uid(pid).unwrap_or_else(|| "unknown".to_string());
cache.insert(pid, ProcInfo { cmd, user });
}
Ok(Some(_)) => {
}
Ok(None) => {
}
Err(proc_connector::Error::Interrupted) => {
continue;
}
Err(proc_connector::Error::Overrun) => {
eprintln!("[WARNING] proc connector overrun — some exec events may have been lost");
}
Err(e) => {
eprintln!("proc connector recv error: {}", e);
break;
}
}
}
Ok(())
}
/// Resolves the user name of process `pid` from `/proc/<pid>/status`.
///
/// The first line beginning with `Uid:` is located and its second
/// whitespace-separated token (the first numeric field; the real UID according
/// to proc(5)) is parsed as a `u32` and passed to `uid_to_username`.
///
/// Returns `None` if the file cannot be read, no `Uid:` line exists, the field
/// is missing or not a valid `u32`, or `uid_to_username` yields `None`.
fn read_proc_uid(pid: u32) -> Option<String> {
    let status_path = format!("/proc/{}/status", pid);
    let contents = std::fs::read_to_string(status_path).ok()?;

    let uid_line = contents.lines().find(|line| line.starts_with("Uid:"))?;

    let mut fields = uid_line.split_whitespace();
    // The first token is the `Uid:` label itself.
    let _label = fields.next()?;
    let uid = fields.next()?.parse::<u32>().ok()?;

    uid_to_username(uid)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `ProcInfo` from string slices to keep the tests terse.
    fn proc_info(cmd: &str, user: &str) -> ProcInfo {
        ProcInfo {
            cmd: cmd.to_string(),
            user: user.to_string(),
        }
    }

    /// Polls `done` every 10 ms until it returns `true` or `timeout` elapses.
    /// Returns whether the condition was met in time.
    fn wait_until(timeout: Duration, mut done: impl FnMut() -> bool) -> bool {
        let deadline = std::time::Instant::now() + timeout;
        loop {
            if done() {
                return true;
            }
            if std::time::Instant::now() >= deadline {
                return false;
            }
            std::thread::sleep(Duration::from_millis(10));
        }
    }

    /// An inserted entry can be read back with the same field values.
    #[test]
    fn test_proc_cache_insert_and_get() {
        let cache: ProcCache = Arc::new(DashMap::new());
        cache.insert(12345, proc_info("test_process", "testuser"));

        let info = cache
            .get(&12345)
            .expect("entry inserted above must be present");
        assert_eq!(info.cmd, "test_process");
        assert_eq!(info.user, "testuser");
    }

    /// Looking up a PID that was never inserted yields nothing.
    #[test]
    fn test_proc_cache_missing_pid() {
        let cache: ProcCache = Arc::new(DashMap::new());
        assert!(cache.get(&99999).is_none());
    }

    /// Inserting twice under the same PID keeps only the latest value.
    #[test]
    fn test_proc_cache_overwrite() {
        let cache: ProcCache = Arc::new(DashMap::new());
        cache.insert(1, proc_info("old", "a"));
        cache.insert(1, proc_info("new", "b"));

        let info = cache.get(&1).expect("entry inserted above must be present");
        assert_eq!(info.cmd, "new");
        assert_eq!(info.user, "b");
    }

    /// Ten threads each insert 100 distinct PIDs; all 1000 must be present.
    #[test]
    fn test_proc_cache_concurrent_access() {
        use std::thread;

        let cache: ProcCache = Arc::new(DashMap::new());

        // Collecting eagerly spawns every thread before any is joined.
        let handles: Vec<_> = (0..10u32)
            .map(|i| {
                let cache = Arc::clone(&cache);
                thread::spawn(move || {
                    for j in 0..100u32 {
                        let pid = i * 100 + j;
                        cache.insert(pid, proc_info(&format!("proc_{}", pid), "test"));
                    }
                })
            })
            .collect();

        for handle in handles {
            handle.join().unwrap();
        }
        assert_eq!(cache.len(), 1000);
    }

    /// Ignored by default; per its assertion message it presumably needs root.
    #[test]
    #[ignore]
    fn test_proc_connector_create() {
        assert!(
            ProcConnector::new().is_ok(),
            "Should be able to create ProcConnector with root"
        );
    }

    /// Ignored by default: exercises the real listener end to end.
    #[test]
    #[ignore]
    fn test_proc_listener_receives_events() {
        let (cache, ready) = start_proc_listener();

        // Wait for the listener to signal readiness instead of sleeping for a
        // fixed interval and hoping the connector is up by then.
        assert!(
            wait_until(Duration::from_secs(2), || ready.load(Ordering::Acquire)),
            "proc connector listener never became ready"
        );

        let mut child = std::process::Command::new("echo")
            .arg("test")
            .spawn()
            .unwrap();
        child.wait().unwrap();

        // Event delivery is asynchronous; poll with a deadline.
        assert!(
            wait_until(Duration::from_secs(2), || !cache.is_empty()),
            "Proc cache should have received some events"
        );
    }
}