1use crate::Result;
2#[cfg(unix)]
3use crate::settings::settings;
4use miette::IntoDiagnostic;
5use once_cell::sync::Lazy;
6use std::collections::HashMap;
7use std::sync::Mutex;
8use sysinfo::ProcessesToUpdate;
9
/// Maps a parent pid to the pids of its direct children.
type ParentToChildren = std::collections::HashMap<u32, Vec<u32>>;

/// Maps a pid to its `(name, executable path)`; the path is `None` when sysinfo has none.
type ProcessNames = std::collections::HashMap<u32, (String, Option<String>)>;
15
16pub struct Procs {
17 system: Mutex<sysinfo::System>,
18}
19
20pub static PROCS: Lazy<Procs> = Lazy::new(Procs::new);
21
22impl Default for Procs {
23 fn default() -> Self {
24 Self::new()
25 }
26}
27
28impl Procs {
29 pub fn new() -> Self {
30 let procs = Self {
31 system: Mutex::new(sysinfo::System::new_all()),
32 };
33 procs.refresh_processes();
34 procs
35 }
36
37 fn lock_system(&self) -> std::sync::MutexGuard<'_, sysinfo::System> {
38 self.system.lock().unwrap_or_else(|poisoned| {
39 warn!("System mutex was poisoned, recovering");
40 poisoned.into_inner()
41 })
42 }
43
44 pub fn title(&self, pid: u32) -> Option<String> {
45 self.lock_system()
46 .process(sysinfo::Pid::from_u32(pid))
47 .map(|p| p.name().to_string_lossy().to_string())
48 }
49
50 pub fn is_running(&self, pid: u32) -> bool {
51 self.lock_system()
52 .process(sysinfo::Pid::from_u32(pid))
53 .is_some()
54 }
55
56 #[allow(dead_code)]
59 pub fn all_children(&self, pid: u32) -> Vec<u32> {
60 let system = self.lock_system();
61 let all = system.processes();
62 let mut children = vec![];
63 for (child_pid, process) in all {
64 let mut process = process;
65 while let Some(parent) = process.parent() {
66 if parent == sysinfo::Pid::from_u32(pid) {
67 children.push(child_pid.as_u32());
68 break;
69 }
70 match system.process(parent) {
71 Some(p) => process = p,
72 None => break,
73 }
74 }
75 }
76 children
77 }
78 pub fn collect_process_tree_info(&self) -> (ParentToChildren, ProcessNames) {
83 let system = self.lock_system();
84 let all = system.processes();
85 let mut parent_to_children: ParentToChildren = HashMap::new();
86 let mut process_info: ProcessNames = HashMap::new();
87
88 for (pid, proc) in all {
89 let pid_u32 = pid.as_u32();
90 process_info.insert(
91 pid_u32,
92 (
93 proc.name().to_string_lossy().to_string(),
94 proc.exe().map(|e| e.to_string_lossy().to_string()),
95 ),
96 );
97
98 if let Some(ppid) = proc.parent() {
99 parent_to_children
100 .entry(ppid.as_u32())
101 .or_default()
102 .push(pid_u32);
103 }
104 }
105
106 (parent_to_children, process_info)
107 }
108 pub async fn kill_process_group_async(
109 &self,
110 pid: u32,
111 stop_signal: i32,
112 stop_timeout: Option<std::time::Duration>,
113 ) -> Result<bool> {
114 tokio::task::spawn_blocking(move || {
115 PROCS.kill_process_group(pid, stop_signal, stop_timeout)
116 })
117 .await
118 .into_diagnostic()?
119 }
120
121 #[cfg(unix)]
130 fn kill_process_group(
131 &self,
132 pid: u32,
133 stop_signal: i32,
134 stop_timeout: Option<std::time::Duration>,
135 ) -> Result<bool> {
136 let pgid = pid as i32;
137 let signal_name = signal_name(stop_signal);
138
139 debug!("killing process group {pgid} with {signal_name}");
140
141 let ret = unsafe { libc::killpg(pgid, stop_signal) };
146 if ret == -1 {
147 let err = std::io::Error::last_os_error();
148 if err.raw_os_error() == Some(libc::ESRCH) {
149 debug!("process group {pgid} no longer exists");
150 return Ok(false);
151 }
152 if err.raw_os_error() == Some(libc::EPERM) {
153 return Err(miette::miette!(
154 "failed to send {signal_name} to process group {pgid}: permission denied"
155 ));
156 }
157 warn!("failed to send {signal_name} to process group {pgid}: {err}");
158 }
159
160 let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
163 let fast_ms = 10u64;
164 let slow_ms = 50u64;
165 let total_ms = stop_timeout.as_millis().max(1) as u64;
166 let fast_count = ((total_ms / fast_ms) as usize).min(10);
167 let fast_total_ms = fast_ms * fast_count as u64;
168 let remaining_ms = total_ms.saturating_sub(fast_total_ms);
169 let slow_count = (remaining_ms / slow_ms) as usize;
170
171 let fast_checks =
172 std::iter::repeat_n(std::time::Duration::from_millis(fast_ms), fast_count);
173 let slow_checks =
174 std::iter::repeat_n(std::time::Duration::from_millis(slow_ms), slow_count);
175 let mut elapsed_ms = 0u64;
176
177 for sleep_duration in fast_checks.chain(slow_checks) {
178 std::thread::sleep(sleep_duration);
179 self.refresh_pids(&[pid]);
180 elapsed_ms += sleep_duration.as_millis() as u64;
181 if self.is_terminated_or_zombie(sysinfo::Pid::from_u32(pid)) {
182 debug!("process group {pgid} terminated after {signal_name} ({elapsed_ms} ms)",);
183 return Ok(true);
184 }
185 }
186
187 warn!(
189 "process group {pgid} did not respond to {signal_name} after {}ms, sending SIGKILL",
190 stop_timeout.as_millis()
191 );
192 let ret = unsafe { libc::killpg(pgid, libc::SIGKILL) };
193 if ret == -1 {
194 let err = std::io::Error::last_os_error();
195 if err.raw_os_error() != Some(libc::ESRCH) {
196 warn!("failed to send SIGKILL to process group {pgid}: {err}");
197 }
198 }
199
200 std::thread::sleep(std::time::Duration::from_millis(100));
202 Ok(true)
203 }
204
205 #[cfg(not(unix))]
206 fn kill_process_group(
207 &self,
208 pid: u32,
209 _stop_signal: i32,
210 _stop_timeout: Option<std::time::Duration>,
211 ) -> Result<bool> {
212 self.kill(pid, 0, None)
213 }
214
215 pub async fn kill_async(
216 &self,
217 pid: u32,
218 stop_signal: i32,
219 stop_timeout: Option<std::time::Duration>,
220 ) -> Result<bool> {
221 tokio::task::spawn_blocking(move || PROCS.kill(pid, stop_signal, stop_timeout))
222 .await
223 .into_diagnostic()?
224 }
225
226 fn kill(
236 &self,
237 pid: u32,
238 stop_signal: i32,
239 stop_timeout: Option<std::time::Duration>,
240 ) -> Result<bool> {
241 let sysinfo_pid = sysinfo::Pid::from_u32(pid);
242
243 if self.is_terminated_or_zombie(sysinfo_pid) {
245 return Ok(false);
246 }
247
248 debug!("killing process {pid}");
249
250 #[cfg(windows)]
251 {
252 let _ = (stop_signal, stop_timeout);
253 if let Some(process) = self.lock_system().process(sysinfo_pid) {
254 process.kill();
255 process.wait();
256 }
257 Ok(true)
258 }
259
260 #[cfg(unix)]
261 {
262 let signal_name = signal_name(stop_signal);
263 debug!("sending {signal_name} to process {pid}");
267 let ret = unsafe { libc::kill(pid as i32, stop_signal) };
268 if ret == -1 {
269 let err = std::io::Error::last_os_error();
270 if err.raw_os_error() == Some(libc::ESRCH) {
271 debug!("process {pid} no longer exists");
272 return Ok(false);
273 }
274 if err.raw_os_error() == Some(libc::EPERM) {
275 return Err(miette::miette!(
276 "failed to send {signal_name} to process {pid}: permission denied"
277 ));
278 }
279 return Err(miette::miette!(
280 "failed to send {signal_name} to process {pid}: {err}"
281 ));
282 }
283
284 let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
287 let fast_ms = 10u64;
288 let slow_ms = 50u64;
289 let total_ms = stop_timeout.as_millis().max(1) as u64;
290 let fast_count = ((total_ms / fast_ms) as usize).min(10);
291 let fast_total_ms = fast_ms * fast_count as u64;
292 let remaining_ms = total_ms.saturating_sub(fast_total_ms);
293 let slow_count = (remaining_ms / slow_ms) as usize;
294
295 for i in 0..fast_count {
296 std::thread::sleep(std::time::Duration::from_millis(fast_ms));
297 self.refresh_pids(&[pid]);
298 if self.is_terminated_or_zombie(sysinfo_pid) {
299 debug!(
300 "process {pid} terminated after {signal_name} ({} ms)",
301 (i + 1) * fast_ms as usize
302 );
303 return Ok(true);
304 }
305 }
306
307 for i in 0..slow_count {
309 std::thread::sleep(std::time::Duration::from_millis(slow_ms));
310 self.refresh_pids(&[pid]);
311 if self.is_terminated_or_zombie(sysinfo_pid) {
312 debug!(
313 "process {pid} terminated after {signal_name} ({} ms)",
314 fast_total_ms + (i + 1) as u64 * slow_ms
315 );
316 return Ok(true);
317 }
318 }
319
320 warn!(
322 "process {pid} did not respond to {signal_name} after {}ms, sending SIGKILL",
323 stop_timeout.as_millis()
324 );
325 let ret = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
326 if ret == -1 {
327 let err = std::io::Error::last_os_error();
328 if err.raw_os_error() != Some(libc::ESRCH) {
329 warn!("failed to send SIGKILL to process {pid}: {err}");
330 }
331 }
332
333 std::thread::sleep(std::time::Duration::from_millis(100));
335 Ok(true)
336 }
337 }
338
339 fn is_terminated_or_zombie(&self, sysinfo_pid: sysinfo::Pid) -> bool {
343 let system = self.lock_system();
344 match system.process(sysinfo_pid) {
345 None => true,
346 Some(process) => {
347 #[cfg(unix)]
348 {
349 matches!(process.status(), sysinfo::ProcessStatus::Zombie)
350 }
351 #[cfg(not(unix))]
352 {
353 let _ = process;
354 false
355 }
356 }
357 }
358 }
359
360 pub(crate) fn refresh_processes(&self) {
361 self.lock_system()
362 .refresh_processes(ProcessesToUpdate::All, true);
363 }
364
365 pub(crate) fn refresh_pids(&self, pids: &[u32]) {
368 let sysinfo_pids: Vec<sysinfo::Pid> =
369 pids.iter().map(|p| sysinfo::Pid::from_u32(*p)).collect();
370 self.lock_system()
371 .refresh_processes(ProcessesToUpdate::Some(&sysinfo_pids), true);
372 }
373
374 pub fn get_batch_group_stats(&self, pids: &[u32]) -> Vec<(u32, Option<ProcessStats>)> {
380 if pids.is_empty() {
381 return Vec::new();
382 }
383
384 let system = self.lock_system();
385 let processes = system.processes();
386
387 let now = std::time::SystemTime::now()
388 .duration_since(std::time::UNIX_EPOCH)
389 .map(|d| d.as_secs())
390 .unwrap_or(0);
391
392 let mut children_map: std::collections::HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> =
394 std::collections::HashMap::new();
395 for (child_pid, child) in processes {
396 if child.thread_kind().is_some() {
399 continue;
400 }
401 if let Some(ppid) = child.parent() {
402 children_map.entry(ppid).or_default().push(*child_pid);
403 }
404 }
405
406 pids.iter()
407 .map(|&pid| {
408 let root_pid = sysinfo::Pid::from_u32(pid);
409 let Some(root) = processes.get(&root_pid) else {
410 return (pid, None);
411 };
412
413 let root_disk = root.disk_usage();
414 let mut stats = ProcessStats {
415 cpu_percent: root.cpu_usage(),
416 memory_bytes: root.memory(),
417 uptime_secs: now.saturating_sub(root.start_time()),
418 disk_read_bytes: root_disk.read_bytes,
419 disk_write_bytes: root_disk.written_bytes,
420 };
421
422 let mut queue = std::collections::VecDeque::new();
424 if let Some(direct_children) = children_map.get(&root_pid) {
425 queue.extend(direct_children);
426 }
427 while let Some(child_pid) = queue.pop_front() {
428 if let Some(child) = processes.get(&child_pid) {
429 let disk = child.disk_usage();
430 stats.cpu_percent += child.cpu_usage();
431 stats.memory_bytes += child.memory();
432 stats.disk_read_bytes += disk.read_bytes;
433 stats.disk_write_bytes += disk.written_bytes;
434 }
435 if let Some(grandchildren) = children_map.get(&child_pid) {
436 queue.extend(grandchildren);
437 }
438 }
439
440 (pid, Some(stats))
441 })
442 .collect()
443 }
444 pub fn refresh_and_get_batch_stats(&self, pids: &[u32]) -> HashMap<u32, ProcessStats> {
451 self.refresh_processes();
452 self.get_batch_group_stats(pids)
453 .into_iter()
454 .filter_map(|(pid, stats)| stats.map(|s| (pid, s)))
455 .collect()
456 }
457
458 pub fn get_batch_tree_stats_map(&self, pids: &[u32]) -> HashMap<u32, ProcessStats> {
460 self.get_batch_group_stats(pids)
461 .into_iter()
462 .filter_map(|(pid, stats)| stats.map(|stats| (pid, stats)))
463 .collect()
464 }
465
466 pub fn get_stats(&self, pid: u32) -> Option<ProcessStats> {
468 self.get_batch_group_stats(&[pid])
469 .into_iter()
470 .next()
471 .and_then(|(_, stats)| stats)
472 }
473
474 pub fn get_extended_stats(&self, pid: u32) -> Option<ExtendedProcessStats> {
476 let system = self.lock_system();
477 let processes = system.processes();
478 let root_pid = sysinfo::Pid::from_u32(pid);
479 let p = processes.get(&root_pid)?;
480
481 let now = std::time::SystemTime::now()
482 .duration_since(std::time::UNIX_EPOCH)
483 .map(|d| d.as_secs())
484 .unwrap_or(0);
485
486 let root_disk = p.disk_usage();
487 let mut aggregate_stats = ProcessStats {
488 cpu_percent: p.cpu_usage(),
489 memory_bytes: p.memory(),
490 uptime_secs: now.saturating_sub(p.start_time()),
491 disk_read_bytes: root_disk.read_bytes,
492 disk_write_bytes: root_disk.written_bytes,
493 };
494
495 let mut children_map: HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> = HashMap::new();
496 for (child_pid, child) in processes {
497 if let Some(ppid) = child.parent() {
498 children_map.entry(ppid).or_default().push(*child_pid);
499 }
500 }
501
502 let mut queue = std::collections::VecDeque::new();
503 if let Some(direct_children) = children_map.get(&root_pid) {
504 queue.extend(direct_children);
505 }
506 while let Some(child_pid) = queue.pop_front() {
507 if let Some(child) = processes.get(&child_pid) {
508 let disk = child.disk_usage();
509 aggregate_stats.cpu_percent += child.cpu_usage();
510 aggregate_stats.memory_bytes += child.memory();
511 aggregate_stats.disk_read_bytes += disk.read_bytes;
512 aggregate_stats.disk_write_bytes += disk.written_bytes;
513 }
514 if let Some(grandchildren) = children_map.get(&child_pid) {
515 queue.extend(grandchildren);
516 }
517 }
518
519 Some(ExtendedProcessStats {
520 name: p.name().to_string_lossy().to_string(),
521 status: format!("{:?}", p.status()),
522 cpu_percent: aggregate_stats.cpu_percent,
523 memory_bytes: aggregate_stats.memory_bytes,
524 virtual_memory_bytes: p.virtual_memory(),
525 uptime_secs: aggregate_stats.uptime_secs,
526 thread_count: p.tasks().map(|t| t.len()).unwrap_or(0),
527 })
528 }
529}
530
/// Resource usage for a process tree, as produced by `Procs`' stats methods.
///
/// In those methods, the CPU, memory and disk fields are summed over the root process and
/// its descendant processes, while `uptime_secs` comes from the root alone.
#[derive(Clone, Copy, Debug)]
pub struct ProcessStats {
    /// Summed sysinfo `cpu_usage()` percentages.
    pub cpu_percent: f32,
    /// Summed resident memory (sysinfo `memory()`), in bytes.
    pub memory_bytes: u64,
    /// Seconds since the root process started.
    pub uptime_secs: u64,
    /// Summed sysinfo `disk_usage().read_bytes`.
    pub disk_read_bytes: u64,
    /// Summed sysinfo `disk_usage().written_bytes`.
    pub disk_write_bytes: u64,
}
539
540impl ProcessStats {
541 pub fn memory_display(&self) -> String {
542 format_bytes(self.memory_bytes)
543 }
544
545 pub fn cpu_display(&self) -> String {
546 format!("{:.1}%", self.cpu_percent)
547 }
548
549 pub fn uptime_display(&self) -> String {
550 format_duration(self.uptime_secs)
551 }
552
553 pub fn disk_read_display(&self) -> String {
554 format_bytes_per_sec(self.disk_read_bytes)
555 }
556
557 pub fn disk_write_display(&self) -> String {
558 format_bytes_per_sec(self.disk_write_bytes)
559 }
560}
561
/// Detailed view of one process, as returned by `Procs::get_extended_stats`.
///
/// There, `cpu_percent` and `memory_bytes` are totals over the process and its
/// descendants; the remaining fields describe the root process only.
#[derive(Clone, Debug)]
pub struct ExtendedProcessStats {
    /// Process name.
    pub name: String,
    /// `Debug` rendering of the sysinfo process status.
    pub status: String,
    /// Tree-summed CPU usage, in percent.
    pub cpu_percent: f32,
    /// Tree-summed resident memory, in bytes.
    pub memory_bytes: u64,
    /// Virtual memory of the root process, in bytes.
    pub virtual_memory_bytes: u64,
    /// Seconds since the root process started.
    pub uptime_secs: u64,
    /// Number of tasks sysinfo lists for the root process; `0` when unavailable.
    pub thread_count: usize,
}
572
/// Formats a byte count with binary (1024-based) units: `B`, `KB`, `MB`, `GB`.
///
/// Values below 1 KiB are printed exactly (`"512B"`); larger values get one decimal place
/// (`"1.5KB"`). The unit is chosen after accounting for that rounding, so a size just under
/// a unit boundary renders as `"1.0MB"` rather than `"1024.0KB"`. Sizes of about 1 GiB and
/// above stay in `GB`.
fn format_bytes(bytes: u64) -> String {
    const UNITS: [&str; 3] = ["KB", "MB", "GB"];
    // Every f64 at or above this literal renders as "1024.0" with `{:.1}`; step up a unit.
    const ROLLOVER: f64 = 1023.95;

    if bytes < 1024 {
        return format!("{bytes}B");
    }
    let mut value = bytes as f64 / 1024.0;
    let mut unit = 0;
    while value >= ROLLOVER && unit + 1 < UNITS.len() {
        value /= 1024.0;
        unit += 1;
    }
    format!("{value:.1}{}", UNITS[unit])
}
584
/// Renders a number of seconds as a compact two-unit string.
///
/// Examples: `59` → `"59s"`, `61` → `"1m 1s"`, `3_661` → `"1h 1m"`, `90_061` → `"1d 1h"`.
/// Only the two most significant units are shown; the rest is truncated.
fn format_duration(secs: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 60 * MINUTE;
    const DAY: u64 = 24 * HOUR;

    if secs >= DAY {
        let days = secs / DAY;
        let hours = secs % DAY / HOUR;
        return format!("{days}d {hours}h");
    }
    if secs >= HOUR {
        let hours = secs / HOUR;
        let mins = secs % HOUR / MINUTE;
        return format!("{hours}h {mins}m");
    }
    if secs >= MINUTE {
        return format!("{}m {}s", secs / MINUTE, secs % MINUTE);
    }
    format!("{secs}s")
}
600
/// Formats a byte rate with binary (1024-based) units: `B/s`, `KB/s`, `MB/s`, `GB/s`.
///
/// Values below 1 KiB are printed exactly (`"512B/s"`); larger values get one decimal place.
/// The unit is chosen after accounting for that rounding, so a rate just under a unit
/// boundary renders as `"1.0MB/s"` rather than `"1024.0KB/s"`. Rates of about 1 GiB/s and
/// above stay in `GB/s`.
fn format_bytes_per_sec(bytes: u64) -> String {
    const UNITS: [&str; 3] = ["KB/s", "MB/s", "GB/s"];
    // Every f64 at or above this literal renders as "1024.0" with `{:.1}`; step up a unit.
    const ROLLOVER: f64 = 1023.95;

    if bytes < 1024 {
        return format!("{bytes}B/s");
    }
    let mut value = bytes as f64 / 1024.0;
    let mut unit = 0;
    while value >= ROLLOVER && unit + 1 < UNITS.len() {
        value /= 1024.0;
        unit += 1;
    }
    format!("{value:.1}{}", UNITS[unit])
}
612
613#[cfg(unix)]
614fn signal_name(sig: i32) -> &'static str {
615 match sig {
616 libc::SIGHUP => "SIGHUP",
617 libc::SIGINT => "SIGINT",
618 libc::SIGQUIT => "SIGQUIT",
619 libc::SIGTERM => "SIGTERM",
620 libc::SIGUSR1 => "SIGUSR1",
621 libc::SIGUSR2 => "SIGUSR2",
622 libc::SIGKILL => "SIGKILL",
623 _ => "UNKNOWN",
624 }
625}
626
// Unix-only integration test: spawns a real process tree and checks that
// `Procs::get_stats` folds descendant memory into the parent's figure.
#[cfg(all(test, unix))]
mod tests {
    use super::*;
    use std::os::unix::process::CommandExt;
    use std::process::{Child, Command, Stdio};
    use std::time::{Duration, Instant};

    /// Owns a spawned child and tears down its whole process group on drop.
    struct ChildGuard(Child);

    impl Drop for ChildGuard {
        fn drop(&mut self) {
            let pid = self.0.id() as i32;
            // The child called setsid() before exec, so its pid is also its process-group
            // id; SIGKILL the group so the backgrounded `sleep` does not outlive the test.
            let _ = unsafe { libc::killpg(pid, libc::SIGKILL) };
            // Reap the direct child so it is not left as a zombie.
            let _ = self.0.wait();
        }
    }

    #[test]
    fn get_stats_includes_descendant_rss() {
        // `sh` backgrounds a `sleep` and waits on it: one parent with one child process.
        let mut command = Command::new("sh");
        command
            .args(["-c", "sleep 30 & wait"])
            .stdin(Stdio::null())
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        unsafe {
            // Put the child in a new session/process group so ChildGuard can killpg it.
            command.pre_exec(|| {
                if libc::setsid() == -1 {
                    return Err(std::io::Error::last_os_error());
                }
                Ok(())
            });
        }

        let parent = command.spawn().expect("failed to spawn process tree");
        let parent_pid = parent.id();
        let _parent = ChildGuard(parent);

        // Poll (for up to 5 s) until the background child appears in a fresh snapshot.
        let procs = Procs::new();
        let deadline = Instant::now() + Duration::from_secs(5);
        let mut child_pids = Vec::new();
        while Instant::now() < deadline {
            procs.refresh_processes();
            child_pids = procs.all_children(parent_pid);
            if !child_pids.is_empty() {
                break;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        assert!(
            !child_pids.is_empty(),
            "test process tree did not appear under parent pid {parent_pid}"
        );

        // Take one more snapshot; every figure below (including get_stats, which does not
        // refresh) is read from this same snapshot, so they must agree exactly.
        procs.refresh_processes();
        child_pids = procs.all_children(parent_pid);
        assert!(
            !child_pids.is_empty(),
            "test process tree disappeared under parent pid {parent_pid}"
        );
        // Expected total computed by hand: the parent's own RSS ...
        let root_pid = sysinfo::Pid::from_u32(parent_pid);
        let direct_memory = {
            let system = procs.lock_system();
            system
                .process(root_pid)
                .expect("parent process should exist")
                .memory()
        };
        // ... plus the RSS of every descendant.
        let descendant_memory = {
            let system = procs.lock_system();
            child_pids
                .iter()
                .filter_map(|pid| system.process(sysinfo::Pid::from_u32(*pid)))
                .map(|process| process.memory())
                .sum::<u64>()
        };
        assert!(
            descendant_memory > 0,
            "descendants {child_pids:?} should have nonzero RSS"
        );

        let stats = procs
            .get_stats(parent_pid)
            .expect("parent process should have aggregate stats");

        // The aggregate must equal parent RSS + descendant RSS.
        assert_eq!(
            stats.memory_bytes,
            direct_memory + descendant_memory,
            "get_stats should include descendant RSS for parent pid {parent_pid}; \
             descendants: {child_pids:?}, direct RSS: {direct_memory}, \
             descendant RSS: {descendant_memory}, reported RSS: {}",
            stats.memory_bytes
        );
    }
}