1use crate::Result;
2#[cfg(unix)]
3use crate::settings::settings;
4use miette::IntoDiagnostic;
5use once_cell::sync::Lazy;
6use std::collections::HashMap;
7use std::sync::Mutex;
8use sysinfo::ProcessesToUpdate;
9
/// Process inspection and termination helper built around a cached
/// `sysinfo::System` snapshot.
///
/// The snapshot lives behind a `Mutex`, so all methods take `&self`.
pub struct Procs {
    /// Cached process table; only as current as the last `refresh_*` call.
    system: Mutex<sysinfo::System>,
}
13
/// Process-wide [`Procs`] instance, constructed with [`Procs::new`] on first access.
pub static PROCS: Lazy<Procs> = Lazy::new(Procs::new);
15
/// `Default` simply forwards to [`Procs::new`].
impl Default for Procs {
    fn default() -> Self {
        Self::new()
    }
}
21
22impl Procs {
/// Creates a `Procs` with an empty `sysinfo::System` and immediately
/// performs one full process refresh so lookups work right away.
pub fn new() -> Self {
    let procs = Self {
        system: Mutex::new(sysinfo::System::new()),
    };
    procs.refresh_processes();
    procs
}
30
31 fn lock_system(&self) -> std::sync::MutexGuard<'_, sysinfo::System> {
32 self.system.lock().unwrap_or_else(|poisoned| {
33 warn!("System mutex was poisoned, recovering");
34 poisoned.into_inner()
35 })
36 }
37
38 pub fn title(&self, pid: u32) -> Option<String> {
39 self.lock_system()
40 .process(sysinfo::Pid::from_u32(pid))
41 .map(|p| p.name().to_string_lossy().to_string())
42 }
43
44 pub fn is_running(&self, pid: u32) -> bool {
45 self.lock_system()
46 .process(sysinfo::Pid::from_u32(pid))
47 .is_some()
48 }
49
50 #[allow(dead_code)]
53 pub fn all_children(&self, pid: u32) -> Vec<u32> {
54 let system = self.lock_system();
55 let all = system.processes();
56 let mut children = vec![];
57 for (child_pid, process) in all {
58 let mut process = process;
59 while let Some(parent) = process.parent() {
60 if parent == sysinfo::Pid::from_u32(pid) {
61 children.push(child_pid.as_u32());
62 break;
63 }
64 match system.process(parent) {
65 Some(p) => process = p,
66 None => break,
67 }
68 }
69 }
70 children
71 }
72
/// Async wrapper around `kill_process_group`.
///
/// The blocking routine is run through `tokio::task::spawn_blocking` so
/// its sleeps do not stall the async executor.
///
/// Note: the closure calls the global `PROCS`, not `self` — presumably
/// because `spawn_blocking` needs a `'static` closure. Calling this on a
/// non-global `Procs` still operates on the global snapshot.
///
/// # Errors
/// Fails if the blocking task cannot be joined, or with whatever error
/// `kill_process_group` returns.
pub async fn kill_process_group_async(
    &self,
    pid: u32,
    stop_signal: i32,
    stop_timeout: Option<std::time::Duration>,
) -> Result<bool> {
    tokio::task::spawn_blocking(move || {
        PROCS.kill_process_group(pid, stop_signal, stop_timeout)
    })
    .await
    .into_diagnostic()?
}
85
86 #[cfg(unix)]
95 fn kill_process_group(
96 &self,
97 pid: u32,
98 stop_signal: i32,
99 stop_timeout: Option<std::time::Duration>,
100 ) -> Result<bool> {
101 let pgid = pid as i32;
102 let signal_name = signal_name(stop_signal);
103
104 debug!("killing process group {pgid} with {signal_name}");
105
106 let ret = unsafe { libc::killpg(pgid, stop_signal) };
111 if ret == -1 {
112 let err = std::io::Error::last_os_error();
113 if err.raw_os_error() == Some(libc::ESRCH) {
114 debug!("process group {pgid} no longer exists");
115 return Ok(false);
116 }
117 if err.raw_os_error() == Some(libc::EPERM) {
118 return Err(miette::miette!(
119 "failed to send {signal_name} to process group {pgid}: permission denied"
120 ));
121 }
122 warn!("failed to send {signal_name} to process group {pgid}: {err}");
123 }
124
125 let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
128 let fast_ms = 10u64;
129 let slow_ms = 50u64;
130 let total_ms = stop_timeout.as_millis().max(1) as u64;
131 let fast_count = ((total_ms / fast_ms) as usize).min(10);
132 let fast_total_ms = fast_ms * fast_count as u64;
133 let remaining_ms = total_ms.saturating_sub(fast_total_ms);
134 let slow_count = (remaining_ms / slow_ms) as usize;
135
136 let fast_checks =
137 std::iter::repeat_n(std::time::Duration::from_millis(fast_ms), fast_count);
138 let slow_checks =
139 std::iter::repeat_n(std::time::Duration::from_millis(slow_ms), slow_count);
140 let mut elapsed_ms = 0u64;
141
142 for sleep_duration in fast_checks.chain(slow_checks) {
143 std::thread::sleep(sleep_duration);
144 self.refresh_pids(&[pid]);
145 elapsed_ms += sleep_duration.as_millis() as u64;
146 if self.is_terminated_or_zombie(sysinfo::Pid::from_u32(pid)) {
147 debug!("process group {pgid} terminated after {signal_name} ({elapsed_ms} ms)",);
148 return Ok(true);
149 }
150 }
151
152 warn!(
154 "process group {pgid} did not respond to {signal_name} after {}ms, sending SIGKILL",
155 stop_timeout.as_millis()
156 );
157 let ret = unsafe { libc::killpg(pgid, libc::SIGKILL) };
158 if ret == -1 {
159 let err = std::io::Error::last_os_error();
160 if err.raw_os_error() != Some(libc::ESRCH) {
161 warn!("failed to send SIGKILL to process group {pgid}: {err}");
162 }
163 }
164
165 std::thread::sleep(std::time::Duration::from_millis(100));
167 Ok(true)
168 }
169
/// Non-unix fallback: there is no process-group signalling here, so this
/// delegates to `kill` for the single process `pid`.
///
/// `_stop_signal` and `_stop_timeout` are ignored; `kill` is called with
/// signal `0` and no timeout.
#[cfg(not(unix))]
fn kill_process_group(
    &self,
    pid: u32,
    _stop_signal: i32,
    _stop_timeout: Option<std::time::Duration>,
) -> Result<bool> {
    self.kill(pid, 0, None)
}
179
/// Async wrapper around `kill`, run through `tokio::task::spawn_blocking`.
///
/// Note: the closure calls the global `PROCS`, not `self` — presumably
/// because `spawn_blocking` needs a `'static` closure.
///
/// # Errors
/// Fails if the blocking task cannot be joined, or with whatever error
/// `kill` returns.
pub async fn kill_async(
    &self,
    pid: u32,
    stop_signal: i32,
    stop_timeout: Option<std::time::Duration>,
) -> Result<bool> {
    tokio::task::spawn_blocking(move || PROCS.kill(pid, stop_signal, stop_timeout))
        .await
        .into_diagnostic()?
}
190
191 fn kill(
201 &self,
202 pid: u32,
203 stop_signal: i32,
204 stop_timeout: Option<std::time::Duration>,
205 ) -> Result<bool> {
206 let sysinfo_pid = sysinfo::Pid::from_u32(pid);
207
208 if self.is_terminated_or_zombie(sysinfo_pid) {
210 return Ok(false);
211 }
212
213 debug!("killing process {pid}");
214
215 #[cfg(windows)]
216 {
217 let _ = (stop_signal, stop_timeout);
218 if let Some(process) = self.lock_system().process(sysinfo_pid) {
219 process.kill();
220 process.wait();
221 }
222 Ok(true)
223 }
224
225 #[cfg(unix)]
226 {
227 let signal_name = signal_name(stop_signal);
228 debug!("sending {signal_name} to process {pid}");
232 let ret = unsafe { libc::kill(pid as i32, stop_signal) };
233 if ret == -1 {
234 let err = std::io::Error::last_os_error();
235 if err.raw_os_error() == Some(libc::ESRCH) {
236 debug!("process {pid} no longer exists");
237 return Ok(false);
238 }
239 if err.raw_os_error() == Some(libc::EPERM) {
240 return Err(miette::miette!(
241 "failed to send {signal_name} to process {pid}: permission denied"
242 ));
243 }
244 return Err(miette::miette!(
245 "failed to send {signal_name} to process {pid}: {err}"
246 ));
247 }
248
249 let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
252 let fast_ms = 10u64;
253 let slow_ms = 50u64;
254 let total_ms = stop_timeout.as_millis().max(1) as u64;
255 let fast_count = ((total_ms / fast_ms) as usize).min(10);
256 let fast_total_ms = fast_ms * fast_count as u64;
257 let remaining_ms = total_ms.saturating_sub(fast_total_ms);
258 let slow_count = (remaining_ms / slow_ms) as usize;
259
260 for i in 0..fast_count {
261 std::thread::sleep(std::time::Duration::from_millis(fast_ms));
262 self.refresh_pids(&[pid]);
263 if self.is_terminated_or_zombie(sysinfo_pid) {
264 debug!(
265 "process {pid} terminated after {signal_name} ({} ms)",
266 (i + 1) * fast_ms as usize
267 );
268 return Ok(true);
269 }
270 }
271
272 for i in 0..slow_count {
274 std::thread::sleep(std::time::Duration::from_millis(slow_ms));
275 self.refresh_pids(&[pid]);
276 if self.is_terminated_or_zombie(sysinfo_pid) {
277 debug!(
278 "process {pid} terminated after {signal_name} ({} ms)",
279 fast_total_ms + (i + 1) as u64 * slow_ms
280 );
281 return Ok(true);
282 }
283 }
284
285 warn!(
287 "process {pid} did not respond to {signal_name} after {}ms, sending SIGKILL",
288 stop_timeout.as_millis()
289 );
290 let ret = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
291 if ret == -1 {
292 let err = std::io::Error::last_os_error();
293 if err.raw_os_error() != Some(libc::ESRCH) {
294 warn!("failed to send SIGKILL to process {pid}: {err}");
295 }
296 }
297
298 std::thread::sleep(std::time::Duration::from_millis(100));
300 Ok(true)
301 }
302 }
303
304 fn is_terminated_or_zombie(&self, sysinfo_pid: sysinfo::Pid) -> bool {
308 let system = self.lock_system();
309 match system.process(sysinfo_pid) {
310 None => true,
311 Some(process) => {
312 #[cfg(unix)]
313 {
314 matches!(process.status(), sysinfo::ProcessStatus::Zombie)
315 }
316 #[cfg(not(unix))]
317 {
318 let _ = process;
319 false
320 }
321 }
322 }
323 }
324
/// Refreshes the snapshot for all processes.
///
/// The second argument (`true`) is sysinfo's flag for dropping processes
/// that are no longer alive — see the sysinfo docs for exact semantics.
pub(crate) fn refresh_processes(&self) {
    self.lock_system()
        .refresh_processes(ProcessesToUpdate::All, true);
}
329
330 pub(crate) fn refresh_pids(&self, pids: &[u32]) {
333 let sysinfo_pids: Vec<sysinfo::Pid> =
334 pids.iter().map(|p| sysinfo::Pid::from_u32(*p)).collect();
335 self.lock_system()
336 .refresh_processes(ProcessesToUpdate::Some(&sysinfo_pids), true);
337 }
338
339 pub fn get_batch_group_stats(&self, pids: &[u32]) -> Vec<(u32, Option<ProcessStats>)> {
345 if pids.is_empty() {
346 return Vec::new();
347 }
348
349 let system = self.lock_system();
350 let processes = system.processes();
351
352 let now = std::time::SystemTime::now()
353 .duration_since(std::time::UNIX_EPOCH)
354 .map(|d| d.as_secs())
355 .unwrap_or(0);
356
357 let mut children_map: std::collections::HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> =
359 std::collections::HashMap::new();
360 for (child_pid, child) in processes {
361 if let Some(ppid) = child.parent() {
362 children_map.entry(ppid).or_default().push(*child_pid);
363 }
364 }
365
366 pids.iter()
367 .map(|&pid| {
368 let root_pid = sysinfo::Pid::from_u32(pid);
369 let Some(root) = processes.get(&root_pid) else {
370 return (pid, None);
371 };
372
373 let root_disk = root.disk_usage();
374 let mut stats = ProcessStats {
375 cpu_percent: root.cpu_usage(),
376 memory_bytes: root.memory(),
377 uptime_secs: now.saturating_sub(root.start_time()),
378 disk_read_bytes: root_disk.read_bytes,
379 disk_write_bytes: root_disk.written_bytes,
380 };
381
382 let mut queue = std::collections::VecDeque::new();
384 if let Some(direct_children) = children_map.get(&root_pid) {
385 queue.extend(direct_children);
386 }
387 while let Some(child_pid) = queue.pop_front() {
388 if let Some(child) = processes.get(&child_pid) {
389 let disk = child.disk_usage();
390 stats.cpu_percent += child.cpu_usage();
391 stats.memory_bytes += child.memory();
392 stats.disk_read_bytes += disk.read_bytes;
393 stats.disk_write_bytes += disk.written_bytes;
394 }
395 if let Some(grandchildren) = children_map.get(&child_pid) {
396 queue.extend(grandchildren);
397 }
398 }
399
400 (pid, Some(stats))
401 })
402 .collect()
403 }
404
405 pub fn get_batch_tree_stats_map(&self, pids: &[u32]) -> HashMap<u32, ProcessStats> {
407 self.get_batch_group_stats(pids)
408 .into_iter()
409 .filter_map(|(pid, stats)| stats.map(|stats| (pid, stats)))
410 .collect()
411 }
412
413 pub fn get_stats(&self, pid: u32) -> Option<ProcessStats> {
415 self.get_batch_group_stats(&[pid])
416 .into_iter()
417 .next()
418 .and_then(|(_, stats)| stats)
419 }
420
421 pub fn get_extended_stats(&self, pid: u32) -> Option<ExtendedProcessStats> {
423 let system = self.lock_system();
424 let processes = system.processes();
425 let root_pid = sysinfo::Pid::from_u32(pid);
426 let p = processes.get(&root_pid)?;
427
428 let now = std::time::SystemTime::now()
429 .duration_since(std::time::UNIX_EPOCH)
430 .map(|d| d.as_secs())
431 .unwrap_or(0);
432
433 let root_disk = p.disk_usage();
434 let mut aggregate_stats = ProcessStats {
435 cpu_percent: p.cpu_usage(),
436 memory_bytes: p.memory(),
437 uptime_secs: now.saturating_sub(p.start_time()),
438 disk_read_bytes: root_disk.read_bytes,
439 disk_write_bytes: root_disk.written_bytes,
440 };
441
442 let mut children_map: HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> = HashMap::new();
443 for (child_pid, child) in processes {
444 if let Some(ppid) = child.parent() {
445 children_map.entry(ppid).or_default().push(*child_pid);
446 }
447 }
448
449 let mut queue = std::collections::VecDeque::new();
450 if let Some(direct_children) = children_map.get(&root_pid) {
451 queue.extend(direct_children);
452 }
453 while let Some(child_pid) = queue.pop_front() {
454 if let Some(child) = processes.get(&child_pid) {
455 let disk = child.disk_usage();
456 aggregate_stats.cpu_percent += child.cpu_usage();
457 aggregate_stats.memory_bytes += child.memory();
458 aggregate_stats.disk_read_bytes += disk.read_bytes;
459 aggregate_stats.disk_write_bytes += disk.written_bytes;
460 }
461 if let Some(grandchildren) = children_map.get(&child_pid) {
462 queue.extend(grandchildren);
463 }
464 }
465
466 Some(ExtendedProcessStats {
467 name: p.name().to_string_lossy().to_string(),
468 exe_path: p.exe().map(|e| e.to_string_lossy().to_string()),
469 cwd: p.cwd().map(|c| c.to_string_lossy().to_string()),
470 environ: p
471 .environ()
472 .iter()
473 .take(20)
474 .map(|s| s.to_string_lossy().to_string())
475 .collect(),
476 status: format!("{:?}", p.status()),
477 cpu_percent: aggregate_stats.cpu_percent,
478 memory_bytes: aggregate_stats.memory_bytes,
479 virtual_memory_bytes: p.virtual_memory(),
480 uptime_secs: aggregate_stats.uptime_secs,
481 start_time: p.start_time(),
482 disk_read_bytes: aggregate_stats.disk_read_bytes,
483 disk_write_bytes: aggregate_stats.disk_write_bytes,
484 parent_pid: p.parent().map(|pp| pp.as_u32()),
485 thread_count: p.tasks().map(|t| t.len()).unwrap_or(0),
486 user_id: p.user_id().map(|u| u.to_string()),
487 })
488 }
489}
490
/// Resource usage figures for a process, as filled in by the `Procs`
/// stats methods in this file (which sum most fields over the process
/// tree).
#[derive(Debug, Clone, Copy)]
pub struct ProcessStats {
    /// CPU usage as reported by sysinfo's `cpu_usage()`.
    pub cpu_percent: f32,
    /// Memory as reported by sysinfo's `memory()`.
    pub memory_bytes: u64,
    /// Seconds between the process start time and "now" at collection.
    pub uptime_secs: u64,
    /// `read_bytes` from sysinfo's `disk_usage()`.
    pub disk_read_bytes: u64,
    /// `written_bytes` from sysinfo's `disk_usage()`.
    pub disk_write_bytes: u64,
}
499
/// Human-readable renderings of the raw figures.
impl ProcessStats {
    /// Memory with a unit suffix, e.g. `1.5MB`.
    pub fn memory_display(&self) -> String {
        format_bytes(self.memory_bytes)
    }

    /// CPU usage with one decimal and a `%` suffix.
    pub fn cpu_display(&self) -> String {
        format!("{:.1}%", self.cpu_percent)
    }

    /// Uptime in a compact form such as `3m 12s` or `2d 4h`.
    pub fn uptime_display(&self) -> String {
        format_duration(self.uptime_secs)
    }

    /// Disk read figure with a `/s` unit suffix.
    ///
    /// NOTE(review): the field holds sysinfo's `read_bytes` value; it is
    /// only a per-second rate if refreshes happen about once a second —
    /// confirm against the refresh cadence.
    pub fn disk_read_display(&self) -> String {
        format_bytes_per_sec(self.disk_read_bytes)
    }

    /// Disk write figure with a `/s` unit suffix (same caveat as
    /// `disk_read_display`).
    pub fn disk_write_display(&self) -> String {
        format_bytes_per_sec(self.disk_write_bytes)
    }
}
521
/// Detailed view of one process, produced by `Procs::get_extended_stats`.
///
/// `cpu_percent`, `memory_bytes`, `disk_read_bytes` and `disk_write_bytes`
/// are summed over the process and its descendants; the remaining fields
/// describe the root process only.
#[derive(Debug, Clone)]
pub struct ExtendedProcessStats {
    /// Process name (lossy UTF-8).
    pub name: String,
    /// Executable path, when sysinfo reports one.
    pub exe_path: Option<String>,
    /// Current working directory, when sysinfo reports one.
    pub cwd: Option<String>,
    /// Environment entries (at most the first 20).
    pub environ: Vec<String>,
    /// `Debug` rendering of the sysinfo process status.
    pub status: String,
    /// CPU usage summed over the process tree.
    pub cpu_percent: f32,
    /// Memory summed over the process tree.
    pub memory_bytes: u64,
    /// Virtual memory of the root process.
    pub virtual_memory_bytes: u64,
    /// Seconds since the root process started.
    pub uptime_secs: u64,
    /// Start time of the root process, in seconds since the Unix epoch.
    pub start_time: u64,
    /// Disk read bytes summed over the process tree.
    pub disk_read_bytes: u64,
    /// Disk written bytes summed over the process tree.
    pub disk_write_bytes: u64,
    /// PID of the parent process, if known.
    pub parent_pid: Option<u32>,
    /// Number of tasks reported by sysinfo, or 0 when unavailable.
    pub thread_count: usize,
    /// Owning user id rendered as a string, if known.
    pub user_id: Option<String>,
}
541
/// Human-readable renderings of the raw figures.
impl ExtendedProcessStats {
    /// Memory with a unit suffix, e.g. `1.5MB`.
    pub fn memory_display(&self) -> String {
        format_bytes(self.memory_bytes)
    }

    /// Virtual memory with a unit suffix.
    pub fn virtual_memory_display(&self) -> String {
        format_bytes(self.virtual_memory_bytes)
    }

    /// CPU usage with one decimal and a `%` suffix.
    pub fn cpu_display(&self) -> String {
        format!("{:.1}%", self.cpu_percent)
    }

    /// Uptime in a compact form such as `3m 12s` or `2d 4h`.
    pub fn uptime_display(&self) -> String {
        format_duration(self.uptime_secs)
    }

    /// Start time in the local time zone, formatted `YYYY-MM-DD HH:MM:SS`.
    pub fn start_time_display(&self) -> String {
        use std::time::{Duration, UNIX_EPOCH};
        // `start_time` is seconds since the Unix epoch.
        let datetime = UNIX_EPOCH + Duration::from_secs(self.start_time);
        chrono::DateTime::<chrono::Local>::from(datetime)
            .format("%Y-%m-%d %H:%M:%S")
            .to_string()
    }

    /// Disk read figure with a `/s` unit suffix.
    ///
    /// NOTE(review): the field holds a byte count from sysinfo; whether it
    /// is truly per-second depends on the refresh cadence — confirm.
    pub fn disk_read_display(&self) -> String {
        format_bytes_per_sec(self.disk_read_bytes)
    }

    /// Disk write figure with a `/s` unit suffix (same caveat as
    /// `disk_read_display`).
    pub fn disk_write_display(&self) -> String {
        format_bytes_per_sec(self.disk_write_bytes)
    }
}
575
/// Renders a byte count with a 1024-based unit suffix.
///
/// Values below 1024 are printed as whole bytes (`512B`); larger values
/// are scaled to `KB`, `MB` or `GB` with one decimal place.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1 << 10;
    const MIB: u64 = 1 << 20;
    const GIB: u64 = 1 << 30;

    let (divisor, unit) = match bytes {
        b if b < KIB => return format!("{bytes}B"),
        b if b < MIB => (KIB, "KB"),
        b if b < GIB => (MIB, "MB"),
        _ => (GIB, "GB"),
    };
    format!("{:.1}{unit}", bytes as f64 / divisor as f64)
}
587
/// Renders a duration in seconds using its two most significant units:
/// `42s`, `3m 12s`, `5h 7m` or `2d 4h`.
fn format_duration(secs: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    let days = secs / DAY;
    let hours = secs % DAY / HOUR;
    let minutes = secs % HOUR / MINUTE;
    let seconds = secs % MINUTE;

    if days > 0 {
        format!("{days}d {hours}h")
    } else if hours > 0 {
        format!("{hours}h {minutes}m")
    } else if minutes > 0 {
        format!("{minutes}m {seconds}s")
    } else {
        format!("{seconds}s")
    }
}
603
/// Renders a byte figure as a rate with a 1024-based unit suffix.
///
/// Values below 1024 are printed as whole bytes (`512B/s`); larger values
/// are scaled to `KB/s`, `MB/s` or `GB/s` with one decimal place.
fn format_bytes_per_sec(bytes: u64) -> String {
    // Largest unit first so `find` picks the biggest one that fits.
    const UNITS: [(u64, &str); 3] = [
        (1 << 30, "GB/s"),
        (1 << 20, "MB/s"),
        (1 << 10, "KB/s"),
    ];

    UNITS
        .iter()
        .find(|(threshold, _)| bytes >= *threshold)
        .map(|(threshold, suffix)| format!("{:.1}{suffix}", bytes as f64 / *threshold as f64))
        .unwrap_or_else(|| format!("{bytes}B/s"))
}
615
/// Maps a signal number to its symbolic name for log messages.
///
/// Only the signals listed below are recognised; anything else is
/// reported as `"UNKNOWN"`.
#[cfg(unix)]
fn signal_name(sig: i32) -> &'static str {
    match sig {
        libc::SIGHUP => "SIGHUP",
        libc::SIGINT => "SIGINT",
        libc::SIGQUIT => "SIGQUIT",
        libc::SIGTERM => "SIGTERM",
        libc::SIGUSR1 => "SIGUSR1",
        libc::SIGUSR2 => "SIGUSR2",
        libc::SIGKILL => "SIGKILL",
        _ => "UNKNOWN",
    }
}
629
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use std::os::unix::process::CommandExt;
    use std::process::{Child, Command, Stdio};
    use std::time::{Duration, Instant};

    /// Test helper that tears down the spawned process tree when dropped,
    /// including when an assertion fails.
    struct ChildGuard(Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            let pid = self.0.id() as i32;
            // The child called setsid(), so its pid is also its process
            // group id; SIGKILL the whole group to take the background
            // `sleep` down with the shell.
            let _ = unsafe { libc::killpg(pid, libc::SIGKILL) };
            // Reap the shell so it does not linger as a zombie.
            let _ = self.0.wait();
        }
    }

    /// `get_stats` must report the parent's memory plus that of its
    /// descendants, not just the parent's own.
    #[test]
    fn get_stats_includes_descendant_rss() {
        // A shell that starts a background `sleep` and waits for it gives
        // a parent with at least one child.
        let mut command = Command::new("sh");
        command
            .args(["-c", "sleep 30 & wait"])
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        unsafe {
            // Put the shell in its own session/process group so the guard
            // can kill the whole tree without touching the test runner.
            command.pre_exec(|| {
                if libc::setsid() == -1 {
                    return Err(std::io::Error::last_os_error());
                }
                Ok(())
            });
        }

        let parent = command.spawn().expect("failed to spawn process tree");
        let parent_pid = parent.id();
        let _parent = ChildGuard(parent);

        // Poll for up to 5 s until the background child shows up.
        let procs = Procs::new();
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut child_pids = Vec::new();
        while Instant::now() < deadline {
            procs.refresh_processes();
            child_pids = procs.all_children(parent_pid);
            if !child_pids.is_empty() {
                break;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        assert!(
            !child_pids.is_empty(),
            "test process tree did not appear under parent pid {parent_pid}"
        );

        // Refresh once more so the expected values and `get_stats` below
        // are all computed from the same snapshot.
        procs.refresh_processes();
        child_pids = procs.all_children(parent_pid);
        assert!(
            !child_pids.is_empty(),
            "test process tree disappeared under parent pid {parent_pid}"
        );
        let root_pid = sysinfo::Pid::from_u32(parent_pid);
        // Expected values, read straight from the snapshot. Each lock is
        // scoped to its block so `get_stats` can take it again afterwards.
        let direct_memory = {
            let system = procs.lock_system();
            system
                .process(root_pid)
                .expect("parent process should exist")
                .memory()
        };
        let descendant_memory = {
            let system = procs.lock_system();
            child_pids
                .iter()
                .filter_map(|pid| system.process(sysinfo::Pid::from_u32(*pid)))
                .map(|process| process.memory())
                .sum::<u64>()
        };
        assert!(
            descendant_memory > 0,
            "descendants {child_pids:?} should have nonzero RSS"
        );

        let stats = procs
            .get_stats(parent_pid)
            .expect("parent process should have aggregate stats");

        assert_eq!(
            stats.memory_bytes,
            direct_memory + descendant_memory,
            "get_stats should include descendant RSS for parent pid {parent_pid}; \
             descendants: {child_pids:?}, direct RSS: {direct_memory}, \
             descendant RSS: {descendant_memory}, reported RSS: {}",
            stats.memory_bytes
        );
    }
}