use std::sync::Arc;
use dashmap::DashMap;
use proc_connector::{NetlinkMessageIter, ProcConnector, ProcEvent};
use crate::utils::uid_to_username;
/// Attribution details remembered for a single process.
///
/// Entries are produced by `handle_proc_events` when an exec event is seen;
/// either field falls back to the literal `"unknown"` there when the
/// corresponding `/proc` lookup fails.
#[derive(Debug, Clone)]
pub struct ProcInfo {
    /// Short command name, taken from `/proc/<pid>/comm` with surrounding
    /// whitespace trimmed.
    pub cmd: String,
    /// User name resolved from the first numeric field of the `Uid:` line in
    /// `/proc/<pid>/status`.
    pub user: String,
}
/// Reference-counted concurrent map from a `u32` key to [`ProcInfo`].
///
/// `handle_proc_events` inserts entries keyed by the PID carried in an exec
/// event. Clones of the `Arc` refer to the same underlying map, so handles can
/// be given to several threads (the tests in this file do exactly that).
pub type ProcCache = Arc<DashMap<u32, ProcInfo>>;
pub fn new_cache() -> ProcCache {
Arc::new(DashMap::new())
}
/// Attempts to open a proc connector and switch it to non-blocking mode.
///
/// This is best-effort: on either failure a `[WARNING]` line is written to
/// stderr and `None` is returned, so callers can keep running without
/// process attribution.
///
/// # Returns
///
/// `Some(connector)` when both `ProcConnector::new` and `set_nonblocking`
/// succeed, otherwise `None`.
pub fn try_create_connector() -> Option<ProcConnector> {
    // Creation failure: warn once, then bail out through `?`.
    let connector = ProcConnector::new()
        .map_err(|err| {
            eprintln!("[WARNING] Failed to create proc connector: {err}. \
                       Process name attribution will be unavailable.");
        })
        .ok()?;

    // A connector that could not be made non-blocking is discarded rather than
    // handed back to the caller.
    match connector.set_nonblocking() {
        Ok(_) => Some(connector),
        Err(err) => {
            eprintln!("[WARNING] Failed to set proc connector non-blocking: {err}");
            None
        }
    }
}
/// Parses a buffer of proc-connector messages and records every exec event in
/// `cache`.
///
/// For each exec event the command name is read from `/proc/<pid>/comm` and
/// the user from `read_proc_uid`; whichever lookup fails is stored as
/// `"unknown"`. An existing entry for the same PID is replaced. All other
/// events are ignored. Parse errors are reported on stderr and the remaining
/// messages are still processed.
///
/// # Parameters
///
/// * `cache` - map that receives one [`ProcInfo`] per exec event.
/// * `data`, `n` - forwarded unchanged to `NetlinkMessageIter::new`; the
///   caller in this file's tests passes the receive buffer and the byte count
///   returned by `recv_raw`.
///
/// # Returns
///
/// `true` if at least one exec event was inserted into `cache`, else `false`.
pub fn handle_proc_events(cache: &ProcCache, data: &[u8], n: usize) -> bool {
    let mut saw_exec = false;

    for parsed in NetlinkMessageIter::new(data, n) {
        // Deal with the error cases up front; each one is logged and skipped.
        let event = match parsed {
            Ok(event) => event,
            Err(proc_connector::Error::Overrun) => {
                eprintln!("[WARNING] proc connector overrun — some exec events may have been lost");
                continue;
            }
            Err(proc_connector::Error::Truncated) => {
                eprintln!("[WARNING] proc connector truncated message, continuing...");
                continue;
            }
            Err(other) => {
                eprintln!("proc connector parse error: {other}");
                continue;
            }
        };

        // Only exec events carry information we cache; everything else
        // (including messages that decode to no event) is dropped.
        if let Some(ProcEvent::Exec { pid, .. }) = event {
            let cmd = match std::fs::read_to_string(format!("/proc/{pid}/comm")) {
                Ok(raw) => raw.trim().to_string(),
                // The process may already be gone by the time we look.
                Err(_) => "unknown".to_string(),
            };
            let user = match read_proc_uid(pid) {
                Some(name) => name,
                None => "unknown".to_string(),
            };
            cache.insert(pid, ProcInfo { cmd, user });
            saw_exec = true;
        }
    }

    saw_exec
}
/// Resolves the user name of process `pid` from `/proc/<pid>/status`.
///
/// The first line beginning with `Uid:` decides the outcome: its second
/// whitespace-separated token is parsed as a `u32` and passed to
/// `uid_to_username`.
///
/// # Returns
///
/// `None` when the status file cannot be read, no `Uid:` line exists, the
/// line has no second token, the token is not a valid `u32`, or
/// `uid_to_username` itself yields `None`.
fn read_proc_uid(pid: u32) -> Option<String> {
    let status_path = format!("/proc/{pid}/status");
    let contents = std::fs::read_to_string(status_path).ok()?;

    for line in contents.lines() {
        if !line.starts_with("Uid:") {
            continue;
        }
        // Token 0 is the "Uid:" label itself; token 1 is the first id.
        let raw_uid = line.split_whitespace().nth(1)?;
        let uid = raw_uid.parse::<u32>().ok()?;
        return uid_to_username(uid);
    }

    None
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a [`ProcInfo`] fixture from string slices.
    fn info(cmd: &str, user: &str) -> ProcInfo {
        ProcInfo {
            cmd: cmd.to_string(),
            user: user.to_string(),
        }
    }

    /// An inserted entry can be read back with both fields intact.
    #[test]
    fn test_proc_cache_insert_and_get() {
        // Use the public constructor so it is covered by a non-ignored test.
        let cache = new_cache();
        cache.insert(12345, info("test_process", "testuser"));

        let entry = cache
            .get(&12345)
            .expect("entry inserted above must be present");
        assert_eq!(entry.cmd, "test_process");
        assert_eq!(entry.user, "testuser");
    }

    /// Looking up a PID that was never inserted yields nothing.
    #[test]
    fn test_proc_cache_missing_pid() {
        let cache = new_cache();
        assert!(cache.get(&99999).is_none());
    }

    /// A second insert for the same PID replaces the first entry.
    #[test]
    fn test_proc_cache_overwrite() {
        let cache = new_cache();
        cache.insert(1, info("old", "a"));
        cache.insert(1, info("new", "b"));

        let entry = cache.get(&1).expect("pid 1 was inserted twice");
        assert_eq!(entry.cmd, "new");
        assert_eq!(entry.user, "b");
        assert_eq!(cache.len(), 1);
    }

    /// Ten threads inserting disjoint PID ranges all land in the shared map.
    #[test]
    fn test_proc_cache_concurrent_access() {
        use std::thread;

        let cache = new_cache();

        // `collect` forces every thread to be spawned before any is joined.
        // The counters are typed `u32` so no lossy `as` cast is needed.
        let workers: Vec<_> = (0..10u32)
            .map(|i| {
                let shared = Arc::clone(&cache);
                thread::spawn(move || {
                    for j in 0..100u32 {
                        let pid = i * 100 + j;
                        shared.insert(pid, info(&format!("proc_{}", pid), "test"));
                    }
                })
            })
            .collect();

        for worker in workers {
            worker.join().expect("worker thread panicked");
        }
        assert_eq!(cache.len(), 1000);
    }

    /// An empty buffer contains no events, so nothing is reported as processed.
    #[test]
    fn test_handle_proc_events_empty() {
        let cache = new_cache();
        assert!(!handle_proc_events(&cache, &[], 0));
        assert!(cache.is_empty());
    }

    /// Input without any exec event must leave existing entries untouched.
    #[test]
    fn test_handle_proc_events_non_exec_ignored() {
        let cache = new_cache();
        cache.insert(42, info("test", "root"));

        // Previously this test never invoked `handle_proc_events`. An empty
        // buffer is the one exec-free input we can build without depending on
        // the netlink wire format.
        // TODO: also feed a captured non-exec (e.g. fork/exit) frame here.
        let processed = handle_proc_events(&cache, &[], 0);

        assert!(!processed);
        assert_eq!(cache.len(), 1);
        let entry = cache.get(&42).expect("pre-seeded entry must survive");
        assert_eq!(entry.cmd, "test");
        assert_eq!(entry.user, "root");
    }

    /// Opening the connector succeeds when run with sufficient privileges.
    #[test]
    #[ignore = "needs privileges to open the proc connector"]
    fn test_proc_connector_create() {
        let conn = ProcConnector::new();
        assert!(conn.is_ok(), "Should be able to create ProcConnector with root");
    }

    /// Spawning a child process produces at least one cached exec event.
    #[test]
    #[ignore = "needs privileges to open the proc connector"]
    fn test_proc_connector_receives_events_async() {
        let conn = ProcConnector::new().expect("create connector");
        conn.set_nonblocking().expect("set non-blocking");

        let mut child = std::process::Command::new("echo")
            .arg("test")
            .spawn()
            .unwrap();
        child.wait().unwrap();
        // Give the kernel a moment to queue the events before draining.
        std::thread::sleep(std::time::Duration::from_millis(200));

        let mut buf = vec![0u8; 65536];
        let cache = new_cache();
        // Drain the socket until it reports that no more data is pending.
        loop {
            match conn.recv_raw(&mut buf) {
                Ok(n) => {
                    handle_proc_events(&cache, &buf, n);
                }
                Err(proc_connector::Error::WouldBlock) => break,
                Err(proc_connector::Error::Interrupted) => continue,
                Err(e) => {
                    panic!("recv error: {e}");
                }
            }
        }
        assert!(!cache.is_empty(), "Should have cached at least one exec event");
    }
}