use crate::Result;
use crate::settings::settings;
use miette::IntoDiagnostic;
use once_cell::sync::Lazy;
use std::sync::Mutex;
use sysinfo::ProcessesToUpdate;
pub struct Procs {
system: Mutex<sysinfo::System>,
}
pub static PROCS: Lazy<Procs> = Lazy::new(Procs::new);
impl Default for Procs {
fn default() -> Self {
Self::new()
}
}
impl Procs {
pub fn new() -> Self {
let procs = Self {
system: Mutex::new(sysinfo::System::new()),
};
procs.refresh_processes();
procs
}
fn lock_system(&self) -> std::sync::MutexGuard<'_, sysinfo::System> {
self.system.lock().unwrap_or_else(|poisoned| {
warn!("System mutex was poisoned, recovering");
poisoned.into_inner()
})
}
pub fn title(&self, pid: u32) -> Option<String> {
self.lock_system()
.process(sysinfo::Pid::from_u32(pid))
.map(|p| p.name().to_string_lossy().to_string())
}
pub fn is_running(&self, pid: u32) -> bool {
self.lock_system()
.process(sysinfo::Pid::from_u32(pid))
.is_some()
}
#[allow(dead_code)]
pub fn all_children(&self, pid: u32) -> Vec<u32> {
let system = self.lock_system();
let all = system.processes();
let mut children = vec![];
for (child_pid, process) in all {
let mut process = process;
while let Some(parent) = process.parent() {
if parent == sysinfo::Pid::from_u32(pid) {
children.push(child_pid.as_u32());
break;
}
match system.process(parent) {
Some(p) => process = p,
None => break,
}
}
}
children
}
pub async fn kill_process_group_async(&self, pid: u32) -> Result<bool> {
tokio::task::spawn_blocking(move || PROCS.kill_process_group(pid))
.await
.into_diagnostic()?
}
#[cfg(unix)]
fn kill_process_group(&self, pid: u32) -> Result<bool> {
let pgid = pid as i32;
debug!("killing process group {pgid}");
let ret = unsafe { libc::killpg(pgid, libc::SIGTERM) };
if ret == -1 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
debug!("process group {pgid} no longer exists");
return Ok(false);
}
if err.raw_os_error() == Some(libc::EPERM) {
return Err(miette::miette!(
"failed to send SIGTERM to process group {pgid}: permission denied"
));
}
warn!("failed to send SIGTERM to process group {pgid}: {err}");
}
let stop_timeout = settings().supervisor_stop_timeout();
let fast_ms = 10u64;
let slow_ms = 50u64;
let total_ms = stop_timeout.as_millis().max(1) as u64;
let fast_count = ((total_ms / fast_ms) as usize).min(10);
let fast_total_ms = fast_ms * fast_count as u64;
let remaining_ms = total_ms.saturating_sub(fast_total_ms);
let slow_count = (remaining_ms / slow_ms) as usize;
let fast_checks =
std::iter::repeat_n(std::time::Duration::from_millis(fast_ms), fast_count);
let slow_checks =
std::iter::repeat_n(std::time::Duration::from_millis(slow_ms), slow_count);
let mut elapsed_ms = 0u64;
for sleep_duration in fast_checks.chain(slow_checks) {
std::thread::sleep(sleep_duration);
self.refresh_pids(&[pid]);
elapsed_ms += sleep_duration.as_millis() as u64;
if self.is_terminated_or_zombie(sysinfo::Pid::from_u32(pid)) {
debug!("process group {pgid} terminated after SIGTERM ({elapsed_ms} ms)",);
return Ok(true);
}
}
warn!(
"process group {pgid} did not respond to SIGTERM after {}ms, sending SIGKILL",
stop_timeout.as_millis()
);
let ret = unsafe { libc::killpg(pgid, libc::SIGKILL) };
if ret == -1 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(libc::ESRCH) {
warn!("failed to send SIGKILL to process group {pgid}: {err}");
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
Ok(true)
}
#[cfg(not(unix))]
fn kill_process_group(&self, pid: u32) -> Result<bool> {
self.kill(pid)
}
pub async fn kill_async(&self, pid: u32) -> Result<bool> {
tokio::task::spawn_blocking(move || PROCS.kill(pid))
.await
.into_diagnostic()?
}
fn kill(&self, pid: u32) -> Result<bool> {
let sysinfo_pid = sysinfo::Pid::from_u32(pid);
if self.is_terminated_or_zombie(sysinfo_pid) {
return Ok(false);
}
debug!("killing process {pid}");
#[cfg(windows)]
{
if let Some(process) = self.lock_system().process(sysinfo_pid) {
process.kill();
process.wait();
}
return Ok(true);
}
#[cfg(unix)]
{
debug!("sending SIGTERM to process {pid}");
let ret = unsafe { libc::kill(pid as i32, libc::SIGTERM) };
if ret == -1 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() == Some(libc::ESRCH) {
debug!("process {pid} no longer exists");
return Ok(false);
}
if err.raw_os_error() == Some(libc::EPERM) {
return Err(miette::miette!(
"failed to send SIGTERM to process {pid}: permission denied"
));
}
return Err(miette::miette!(
"failed to send SIGTERM to process {pid}: {err}"
));
}
let stop_timeout = settings().supervisor_stop_timeout();
let fast_ms = 10u64;
let slow_ms = 50u64;
let total_ms = stop_timeout.as_millis().max(1) as u64;
let fast_count = ((total_ms / fast_ms) as usize).min(10);
let fast_total_ms = fast_ms * fast_count as u64;
let remaining_ms = total_ms.saturating_sub(fast_total_ms);
let slow_count = (remaining_ms / slow_ms) as usize;
for i in 0..fast_count {
std::thread::sleep(std::time::Duration::from_millis(fast_ms));
self.refresh_pids(&[pid]);
if self.is_terminated_or_zombie(sysinfo_pid) {
debug!(
"process {pid} terminated after SIGTERM ({} ms)",
(i + 1) * fast_ms as usize
);
return Ok(true);
}
}
for i in 0..slow_count {
std::thread::sleep(std::time::Duration::from_millis(slow_ms));
self.refresh_pids(&[pid]);
if self.is_terminated_or_zombie(sysinfo_pid) {
debug!(
"process {pid} terminated after SIGTERM ({} ms)",
fast_total_ms + (i + 1) as u64 * slow_ms
);
return Ok(true);
}
}
warn!(
"process {pid} did not respond to SIGTERM after {}ms, sending SIGKILL",
stop_timeout.as_millis()
);
let ret = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
if ret == -1 {
let err = std::io::Error::last_os_error();
if err.raw_os_error() != Some(libc::ESRCH) {
warn!("failed to send SIGKILL to process {pid}: {err}");
}
}
std::thread::sleep(std::time::Duration::from_millis(100));
Ok(true)
}
}
fn is_terminated_or_zombie(&self, sysinfo_pid: sysinfo::Pid) -> bool {
let system = self.lock_system();
match system.process(sysinfo_pid) {
None => true,
Some(process) => {
#[cfg(unix)]
{
matches!(process.status(), sysinfo::ProcessStatus::Zombie)
}
#[cfg(not(unix))]
{
let _ = process;
false
}
}
}
}
pub(crate) fn refresh_processes(&self) {
self.lock_system()
.refresh_processes(ProcessesToUpdate::All, true);
}
pub(crate) fn refresh_pids(&self, pids: &[u32]) {
let sysinfo_pids: Vec<sysinfo::Pid> =
pids.iter().map(|p| sysinfo::Pid::from_u32(*p)).collect();
self.lock_system()
.refresh_processes(ProcessesToUpdate::Some(&sysinfo_pids), true);
}
pub fn get_batch_group_stats(&self, pids: &[u32]) -> Vec<(u32, Option<ProcessStats>)> {
let system = self.lock_system();
let processes = system.processes();
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let mut children_map: std::collections::HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> =
std::collections::HashMap::new();
for (child_pid, child) in processes {
if let Some(ppid) = child.parent() {
children_map.entry(ppid).or_default().push(*child_pid);
}
}
pids.iter()
.map(|&pid| {
let root_pid = sysinfo::Pid::from_u32(pid);
let Some(root) = processes.get(&root_pid) else {
return (pid, None);
};
let root_disk = root.disk_usage();
let mut stats = ProcessStats {
cpu_percent: root.cpu_usage(),
memory_bytes: root.memory(),
uptime_secs: now.saturating_sub(root.start_time()),
disk_read_bytes: root_disk.read_bytes,
disk_write_bytes: root_disk.written_bytes,
};
let mut queue = std::collections::VecDeque::new();
if let Some(direct_children) = children_map.get(&root_pid) {
queue.extend(direct_children);
}
while let Some(child_pid) = queue.pop_front() {
if let Some(child) = processes.get(&child_pid) {
let disk = child.disk_usage();
stats.cpu_percent += child.cpu_usage();
stats.memory_bytes += child.memory();
stats.disk_read_bytes += disk.read_bytes;
stats.disk_write_bytes += disk.written_bytes;
}
if let Some(grandchildren) = children_map.get(&child_pid) {
queue.extend(grandchildren);
}
}
(pid, Some(stats))
})
.collect()
}
pub fn get_stats(&self, pid: u32) -> Option<ProcessStats> {
let system = self.lock_system();
system.process(sysinfo::Pid::from_u32(pid)).map(|p| {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let disk = p.disk_usage();
ProcessStats {
cpu_percent: p.cpu_usage(),
memory_bytes: p.memory(),
uptime_secs: now.saturating_sub(p.start_time()),
disk_read_bytes: disk.read_bytes,
disk_write_bytes: disk.written_bytes,
}
})
}
pub fn get_extended_stats(&self, pid: u32) -> Option<ExtendedProcessStats> {
let system = self.lock_system();
system.process(sysinfo::Pid::from_u32(pid)).map(|p| {
let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let disk = p.disk_usage();
ExtendedProcessStats {
name: p.name().to_string_lossy().to_string(),
exe_path: p.exe().map(|e| e.to_string_lossy().to_string()),
cwd: p.cwd().map(|c| c.to_string_lossy().to_string()),
environ: p
.environ()
.iter()
.take(20)
.map(|s| s.to_string_lossy().to_string())
.collect(),
status: format!("{:?}", p.status()),
cpu_percent: p.cpu_usage(),
memory_bytes: p.memory(),
virtual_memory_bytes: p.virtual_memory(),
uptime_secs: now.saturating_sub(p.start_time()),
start_time: p.start_time(),
disk_read_bytes: disk.read_bytes,
disk_write_bytes: disk.written_bytes,
parent_pid: p.parent().map(|pp| pp.as_u32()),
thread_count: p.tasks().map(|t| t.len()).unwrap_or(0),
user_id: p.user_id().map(|u| u.to_string()),
}
})
}
}
#[derive(Debug, Clone, Copy)]
pub struct ProcessStats {
pub cpu_percent: f32,
pub memory_bytes: u64,
pub uptime_secs: u64,
pub disk_read_bytes: u64,
pub disk_write_bytes: u64,
}
impl ProcessStats {
pub fn memory_display(&self) -> String {
format_bytes(self.memory_bytes)
}
pub fn cpu_display(&self) -> String {
format!("{:.1}%", self.cpu_percent)
}
pub fn uptime_display(&self) -> String {
format_duration(self.uptime_secs)
}
pub fn disk_read_display(&self) -> String {
format_bytes_per_sec(self.disk_read_bytes)
}
pub fn disk_write_display(&self) -> String {
format_bytes_per_sec(self.disk_write_bytes)
}
}
#[derive(Debug, Clone)]
pub struct ExtendedProcessStats {
pub name: String,
pub exe_path: Option<String>,
pub cwd: Option<String>,
pub environ: Vec<String>,
pub status: String,
pub cpu_percent: f32,
pub memory_bytes: u64,
pub virtual_memory_bytes: u64,
pub uptime_secs: u64,
pub start_time: u64,
pub disk_read_bytes: u64,
pub disk_write_bytes: u64,
pub parent_pid: Option<u32>,
pub thread_count: usize,
pub user_id: Option<String>,
}
impl ExtendedProcessStats {
pub fn memory_display(&self) -> String {
format_bytes(self.memory_bytes)
}
pub fn virtual_memory_display(&self) -> String {
format_bytes(self.virtual_memory_bytes)
}
pub fn cpu_display(&self) -> String {
format!("{:.1}%", self.cpu_percent)
}
pub fn uptime_display(&self) -> String {
format_duration(self.uptime_secs)
}
pub fn start_time_display(&self) -> String {
use std::time::{Duration, UNIX_EPOCH};
let datetime = UNIX_EPOCH + Duration::from_secs(self.start_time);
chrono::DateTime::<chrono::Local>::from(datetime)
.format("%Y-%m-%d %H:%M:%S")
.to_string()
}
pub fn disk_read_display(&self) -> String {
format_bytes_per_sec(self.disk_read_bytes)
}
pub fn disk_write_display(&self) -> String {
format_bytes_per_sec(self.disk_write_bytes)
}
}
fn format_bytes(bytes: u64) -> String {
if bytes < 1024 {
format!("{bytes}B")
} else if bytes < 1024 * 1024 {
format!("{:.1}KB", bytes as f64 / 1024.0)
} else if bytes < 1024 * 1024 * 1024 {
format!("{:.1}MB", bytes as f64 / (1024.0 * 1024.0))
} else {
format!("{:.1}GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
}
}
fn format_duration(secs: u64) -> String {
if secs < 60 {
format!("{secs}s")
} else if secs < 3600 {
format!("{}m {}s", secs / 60, secs % 60)
} else if secs < 86400 {
let hours = secs / 3600;
let mins = (secs % 3600) / 60;
format!("{hours}h {mins}m")
} else {
let days = secs / 86400;
let hours = (secs % 86400) / 3600;
format!("{days}d {hours}h")
}
}
fn format_bytes_per_sec(bytes: u64) -> String {
if bytes < 1024 {
format!("{bytes}B/s")
} else if bytes < 1024 * 1024 {
format!("{:.1}KB/s", bytes as f64 / 1024.0)
} else if bytes < 1024 * 1024 * 1024 {
format!("{:.1}MB/s", bytes as f64 / (1024.0 * 1024.0))
} else {
format!("{:.1}GB/s", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
}
}