use std::collections::HashMap;
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::Mutex;
use tracing::{debug, info, warn};
/// Process-wide registry instance, created on first access through
/// [`global_registry`] and shared by reference count from then on.
static GLOBAL_REGISTRY: OnceLock<Arc<ProcessRegistry>> = OnceLock::new();
/// Returns a handle to the process-wide [`ProcessRegistry`].
///
/// The registry is constructed on the first call; every call (including the
/// first) hands back a new `Arc` pointing at that same instance.
#[must_use]
pub fn global_registry() -> Arc<ProcessRegistry> {
    let shared = GLOBAL_REGISTRY.get_or_init(|| Arc::new(ProcessRegistry::new()));
    // Only the reference count is bumped; the registry itself is never copied.
    Arc::clone(shared)
}
/// Tracks the child processes that are currently running so they can all be
/// terminated together (see `terminate_all`).
///
/// Each entry maps a process id to the name of the task that registered it;
/// the name is only used for log output.
#[derive(Debug)]
pub struct ProcessRegistry {
    /// Registered process ids and their task names, guarded by an async mutex
    /// so the guard may be held across `.await` points.
    pids: Mutex<HashMap<u32, String>>,
}
impl ProcessRegistry {
#[must_use]
pub fn new() -> Self {
Self {
pids: Mutex::new(HashMap::new()),
}
}
pub async fn register(&self, pid: u32, task_name: String) {
let mut pids = self.pids.lock().await;
debug!(pid, task = %task_name, "Registering process");
pids.insert(pid, task_name);
}
pub async fn unregister(&self, pid: u32) {
let mut pids = self.pids.lock().await;
if let Some(task_name) = pids.remove(&pid) {
debug!(pid, task = %task_name, "Unregistering process");
}
}
pub async fn count(&self) -> usize {
self.pids.lock().await.len()
}
pub async fn terminate_all(&self, timeout: Duration) {
let mut pids = self.pids.lock().await;
if pids.is_empty() {
return;
}
info!(count = pids.len(), "Terminating child processes");
for (pid, task_name) in pids.iter() {
debug!(pid, task = %task_name, "Sending SIGTERM");
Self::send_term_signal(*pid);
}
let deadline = std::time::Instant::now() + timeout;
while !pids.is_empty() && std::time::Instant::now() < deadline {
let mut exited = Vec::new();
for (pid, _) in pids.iter() {
if !Self::is_process_alive(*pid) {
exited.push(*pid);
}
}
for pid in exited {
if let Some(task_name) = pids.remove(&pid) {
debug!(pid, task = %task_name, "Process exited gracefully");
}
}
if !pids.is_empty() {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
for (pid, task_name) in pids.drain() {
warn!(pid, task = %task_name, "Force killing process after timeout");
Self::send_kill_signal(pid);
}
}
#[cfg(unix)]
fn send_term_signal(pid: u32) {
#[expect(unsafe_code, reason = "Required for POSIX signal handling")]
unsafe {
libc::kill(-(pid as i32), libc::SIGTERM);
}
}
#[cfg(unix)]
fn send_kill_signal(pid: u32) {
#[expect(unsafe_code, reason = "Required for POSIX signal handling")]
unsafe {
libc::kill(-(pid as i32), libc::SIGKILL);
}
}
#[cfg(unix)]
fn is_process_alive(pid: u32) -> bool {
#[expect(unsafe_code, reason = "Required for POSIX process existence check")]
unsafe {
libc::kill(pid as i32, 0) == 0
}
}
#[cfg(windows)]
fn send_term_signal(pid: u32) {
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, Signal, System};
let mut system = System::new();
let process_pid = Pid::from(pid as usize);
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[process_pid]),
false,
ProcessRefreshKind::nothing(),
);
if let Some(process) = system.process(process_pid) {
let _ = process.kill_with(Signal::Term);
}
}
#[cfg(windows)]
fn send_kill_signal(pid: u32) {
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, Signal, System};
let mut system = System::new();
let process_pid = Pid::from(pid as usize);
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[process_pid]),
false,
ProcessRefreshKind::nothing(),
);
if let Some(process) = system.process(process_pid) {
let _ = process.kill_with(Signal::Kill);
}
}
#[cfg(windows)]
fn is_process_alive(pid: u32) -> bool {
use sysinfo::{Pid, ProcessRefreshKind, ProcessesToUpdate, System};
let mut system = System::new();
let process_pid = Pid::from(pid as usize);
system.refresh_processes_specifics(
ProcessesToUpdate::Some(&[process_pid]),
false,
ProcessRefreshKind::nothing(),
);
system.process(process_pid).is_some()
}
}
impl Default for ProcessRegistry {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly constructed registry tracks nothing.
    #[tokio::test]
    async fn test_registry_new() {
        let fresh = ProcessRegistry::new();
        let tracked = fresh.count().await;
        assert_eq!(tracked, 0);
    }

    /// Registering adds one entry and unregistering removes it again.
    #[tokio::test]
    async fn test_register_unregister() {
        let reg = ProcessRegistry::new();
        let pid = 1234;

        reg.register(pid, String::from("test_task")).await;
        assert_eq!(reg.count().await, 1);

        reg.unregister(pid).await;
        assert_eq!(reg.count().await, 0);
    }

    /// Unregistering a pid that was never registered is a harmless no-op.
    #[tokio::test]
    async fn test_unregister_nonexistent() {
        let reg = ProcessRegistry::new();
        reg.unregister(9999).await;
        assert_eq!(reg.count().await, 0);
    }

    /// Terminating an empty registry returns and leaves it empty.
    #[tokio::test]
    async fn test_terminate_empty() {
        let reg = ProcessRegistry::new();
        let grace_period = Duration::from_secs(1);
        reg.terminate_all(grace_period).await;
        assert_eq!(reg.count().await, 0);
    }

    /// Every call to `global_registry` yields the same shared instance.
    #[tokio::test]
    async fn test_global_registry_singleton() {
        let first = global_registry();
        let second = global_registry();
        assert!(Arc::ptr_eq(&first, &second));
    }
}