1use crate::Result;
2#[cfg(unix)]
3use crate::settings::settings;
4use miette::IntoDiagnostic;
5use once_cell::sync::Lazy;
6use std::sync::Mutex;
7use sysinfo::ProcessesToUpdate;
8
9pub struct Procs {
10 system: Mutex<sysinfo::System>,
11}
12
13pub static PROCS: Lazy<Procs> = Lazy::new(Procs::new);
14
15impl Default for Procs {
16 fn default() -> Self {
17 Self::new()
18 }
19}
20
21impl Procs {
22 pub fn new() -> Self {
23 Self {
24 system: Mutex::new(sysinfo::System::new()),
25 }
26 }
27
28 fn lock_system(&self) -> std::sync::MutexGuard<'_, sysinfo::System> {
29 self.system.lock().unwrap_or_else(|poisoned| {
30 warn!("System mutex was poisoned, recovering");
31 poisoned.into_inner()
32 })
33 }
34
35 pub fn title(&self, pid: u32) -> Option<String> {
36 self.lock_system()
37 .process(sysinfo::Pid::from_u32(pid))
38 .map(|p| p.name().to_string_lossy().to_string())
39 }
40
41 pub fn is_running(&self, pid: u32) -> bool {
42 self.lock_system()
43 .process(sysinfo::Pid::from_u32(pid))
44 .is_some()
45 }
46
47 #[allow(dead_code)]
50 pub fn all_children(&self, pid: u32) -> Vec<u32> {
51 let system = self.lock_system();
52 let all = system.processes();
53 let mut children = vec![];
54 for (child_pid, process) in all {
55 let mut process = process;
56 while let Some(parent) = process.parent() {
57 if parent == sysinfo::Pid::from_u32(pid) {
58 children.push(child_pid.as_u32());
59 break;
60 }
61 match system.process(parent) {
62 Some(p) => process = p,
63 None => break,
64 }
65 }
66 }
67 children
68 }
69
70 pub async fn kill_process_group_async(
72 &self,
73 pid: u32,
74 stop_signal: i32,
75 stop_timeout: Option<std::time::Duration>,
76 ) -> Result<bool> {
77 tokio::task::spawn_blocking(move || {
78 PROCS.kill_process_group(pid, stop_signal, stop_timeout)
79 })
80 .await
81 .into_diagnostic()?
82 }
83
84 #[cfg(unix)]
98 fn kill_process_group(
99 &self,
100 pid: u32,
101 stop_signal: i32,
102 stop_timeout: Option<std::time::Duration>,
103 ) -> Result<bool> {
104 let pgid = pid as i32;
105 let signal_name = signal_name(stop_signal);
106
107 debug!("killing process group {pgid} with {signal_name}");
108
109 let ret = unsafe { libc::killpg(pgid, stop_signal) };
114 if ret == -1 {
115 let err = std::io::Error::last_os_error();
116 if err.raw_os_error() == Some(libc::ESRCH) {
117 debug!("process group {pgid} no longer exists");
118 return Ok(false);
119 }
120 if err.raw_os_error() == Some(libc::EPERM) {
121 return Err(miette::miette!(
122 "failed to send {signal_name} to process group {pgid}: permission denied"
123 ));
124 }
125 warn!("failed to send {signal_name} to process group {pgid}: {err}");
126 }
127
128 let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
131 let fast_ms = 10u64;
132 let slow_ms = 50u64;
133 let total_ms = stop_timeout.as_millis().max(1) as u64;
134 let fast_count = ((total_ms / fast_ms) as usize).min(10);
135 let fast_total_ms = fast_ms * fast_count as u64;
136 let remaining_ms = total_ms.saturating_sub(fast_total_ms);
137 let slow_count = (remaining_ms / slow_ms) as usize;
138
139 let fast_checks =
140 std::iter::repeat_n(std::time::Duration::from_millis(fast_ms), fast_count);
141 let slow_checks =
142 std::iter::repeat_n(std::time::Duration::from_millis(slow_ms), slow_count);
143 let mut elapsed_ms = 0u64;
144
145 for sleep_duration in fast_checks.chain(slow_checks) {
146 std::thread::sleep(sleep_duration);
147 self.refresh_pids(&[pid]);
148 elapsed_ms += sleep_duration.as_millis() as u64;
149 if self.is_terminated_or_zombie(sysinfo::Pid::from_u32(pid)) {
150 debug!("process group {pgid} terminated after {signal_name} ({elapsed_ms} ms)",);
151 return Ok(true);
152 }
153 }
154
155 warn!(
157 "process group {pgid} did not respond to {signal_name} after {}ms, sending SIGKILL",
158 stop_timeout.as_millis()
159 );
160 let ret = unsafe { libc::killpg(pgid, libc::SIGKILL) };
161 if ret == -1 {
162 let err = std::io::Error::last_os_error();
163 if err.raw_os_error() != Some(libc::ESRCH) {
164 warn!("failed to send SIGKILL to process group {pgid}: {err}");
165 }
166 }
167
168 std::thread::sleep(std::time::Duration::from_millis(100));
170 Ok(true)
171 }
172
/// Non-unix variant: there is no group signalling here, so this simply
/// delegates to `kill` for the single process, passing signal `0` and no
/// explicit timeout.
#[cfg(not(unix))]
fn kill_process_group(
    &self,
    pid: u32,
    _stop_signal: i32,
    _stop_timeout: Option<std::time::Duration>,
) -> Result<bool> {
    let placeholder_signal = 0;
    let no_timeout = None;
    self.kill(pid, placeholder_signal, no_timeout)
}
182
183 pub async fn kill_async(
185 &self,
186 pid: u32,
187 stop_signal: i32,
188 stop_timeout: Option<std::time::Duration>,
189 ) -> Result<bool> {
190 tokio::task::spawn_blocking(move || PROCS.kill(pid, stop_signal, stop_timeout))
191 .await
192 .into_diagnostic()?
193 }
194
195 fn kill(
210 &self,
211 pid: u32,
212 stop_signal: i32,
213 stop_timeout: Option<std::time::Duration>,
214 ) -> Result<bool> {
215 let sysinfo_pid = sysinfo::Pid::from_u32(pid);
216
217 if self.is_terminated_or_zombie(sysinfo_pid) {
219 return Ok(false);
220 }
221
222 debug!("killing process {pid}");
223
224 #[cfg(windows)]
225 {
226 let _ = (stop_signal, stop_timeout);
227 if let Some(process) = self.lock_system().process(sysinfo_pid) {
228 process.kill();
229 process.wait();
230 }
231 Ok(true)
232 }
233
234 #[cfg(unix)]
235 {
236 let signal_name = signal_name(stop_signal);
237 debug!("sending {signal_name} to process {pid}");
241 let ret = unsafe { libc::kill(pid as i32, stop_signal) };
242 if ret == -1 {
243 let err = std::io::Error::last_os_error();
244 if err.raw_os_error() == Some(libc::ESRCH) {
245 debug!("process {pid} no longer exists");
246 return Ok(false);
247 }
248 if err.raw_os_error() == Some(libc::EPERM) {
249 return Err(miette::miette!(
250 "failed to send {signal_name} to process {pid}: permission denied"
251 ));
252 }
253 return Err(miette::miette!(
254 "failed to send {signal_name} to process {pid}: {err}"
255 ));
256 }
257
258 let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
261 let fast_ms = 10u64;
262 let slow_ms = 50u64;
263 let total_ms = stop_timeout.as_millis().max(1) as u64;
264 let fast_count = ((total_ms / fast_ms) as usize).min(10);
265 let fast_total_ms = fast_ms * fast_count as u64;
266 let remaining_ms = total_ms.saturating_sub(fast_total_ms);
267 let slow_count = (remaining_ms / slow_ms) as usize;
268
269 for i in 0..fast_count {
270 std::thread::sleep(std::time::Duration::from_millis(fast_ms));
271 self.refresh_pids(&[pid]);
272 if self.is_terminated_or_zombie(sysinfo_pid) {
273 debug!(
274 "process {pid} terminated after {signal_name} ({} ms)",
275 (i + 1) * fast_ms as usize
276 );
277 return Ok(true);
278 }
279 }
280
281 for i in 0..slow_count {
283 std::thread::sleep(std::time::Duration::from_millis(slow_ms));
284 self.refresh_pids(&[pid]);
285 if self.is_terminated_or_zombie(sysinfo_pid) {
286 debug!(
287 "process {pid} terminated after {signal_name} ({} ms)",
288 fast_total_ms + (i + 1) as u64 * slow_ms
289 );
290 return Ok(true);
291 }
292 }
293
294 warn!(
296 "process {pid} did not respond to {signal_name} after {}ms, sending SIGKILL",
297 stop_timeout.as_millis()
298 );
299 let ret = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
300 if ret == -1 {
301 let err = std::io::Error::last_os_error();
302 if err.raw_os_error() != Some(libc::ESRCH) {
303 warn!("failed to send SIGKILL to process {pid}: {err}");
304 }
305 }
306
307 std::thread::sleep(std::time::Duration::from_millis(100));
309 Ok(true)
310 }
311 }
312
313 fn is_terminated_or_zombie(&self, sysinfo_pid: sysinfo::Pid) -> bool {
317 let system = self.lock_system();
318 match system.process(sysinfo_pid) {
319 None => true,
320 Some(process) => {
321 #[cfg(unix)]
322 {
323 matches!(process.status(), sysinfo::ProcessStatus::Zombie)
324 }
325 #[cfg(not(unix))]
326 {
327 let _ = process;
328 false
329 }
330 }
331 }
332 }
333
334 pub(crate) fn refresh_processes(&self) {
335 self.lock_system()
336 .refresh_processes(ProcessesToUpdate::All, true);
337 }
338
339 pub(crate) fn refresh_pids(&self, pids: &[u32]) {
342 let sysinfo_pids: Vec<sysinfo::Pid> =
343 pids.iter().map(|p| sysinfo::Pid::from_u32(*p)).collect();
344 self.lock_system()
345 .refresh_processes(ProcessesToUpdate::Some(&sysinfo_pids), true);
346 }
347
348 pub(crate) fn refresh_daemon_pids(&self, state: &crate::state_file::StateFile) {
352 let pids: Vec<u32> = state.daemons.values().filter_map(|d| d.pid).collect();
353 if !pids.is_empty() {
354 self.refresh_pids(&pids);
355 }
356 }
357
358 pub fn get_batch_group_stats(&self, pids: &[u32]) -> Vec<(u32, Option<ProcessStats>)> {
364 let system = self.lock_system();
365 let processes = system.processes();
366
367 let now = std::time::SystemTime::now()
368 .duration_since(std::time::UNIX_EPOCH)
369 .map(|d| d.as_secs())
370 .unwrap_or(0);
371
372 let mut children_map: std::collections::HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> =
374 std::collections::HashMap::new();
375 for (child_pid, child) in processes {
376 if let Some(ppid) = child.parent() {
377 children_map.entry(ppid).or_default().push(*child_pid);
378 }
379 }
380
381 pids.iter()
382 .map(|&pid| {
383 let root_pid = sysinfo::Pid::from_u32(pid);
384 let Some(root) = processes.get(&root_pid) else {
385 return (pid, None);
386 };
387
388 let root_disk = root.disk_usage();
389 let mut stats = ProcessStats {
390 cpu_percent: root.cpu_usage(),
391 memory_bytes: root.memory(),
392 uptime_secs: now.saturating_sub(root.start_time()),
393 disk_read_bytes: root_disk.read_bytes,
394 disk_write_bytes: root_disk.written_bytes,
395 };
396
397 let mut queue = std::collections::VecDeque::new();
399 if let Some(direct_children) = children_map.get(&root_pid) {
400 queue.extend(direct_children);
401 }
402 while let Some(child_pid) = queue.pop_front() {
403 if let Some(child) = processes.get(&child_pid) {
404 let disk = child.disk_usage();
405 stats.cpu_percent += child.cpu_usage();
406 stats.memory_bytes += child.memory();
407 stats.disk_read_bytes += disk.read_bytes;
408 stats.disk_write_bytes += disk.written_bytes;
409 }
410 if let Some(grandchildren) = children_map.get(&child_pid) {
411 queue.extend(grandchildren);
412 }
413 }
414
415 (pid, Some(stats))
416 })
417 .collect()
418 }
419
420 pub fn get_stats(&self, pid: u32) -> Option<ProcessStats> {
422 let system = self.lock_system();
423 system.process(sysinfo::Pid::from_u32(pid)).map(|p| {
424 let now = std::time::SystemTime::now()
425 .duration_since(std::time::UNIX_EPOCH)
426 .map(|d| d.as_secs())
427 .unwrap_or(0);
428 let disk = p.disk_usage();
429 ProcessStats {
430 cpu_percent: p.cpu_usage(),
431 memory_bytes: p.memory(),
432 uptime_secs: now.saturating_sub(p.start_time()),
433 disk_read_bytes: disk.read_bytes,
434 disk_write_bytes: disk.written_bytes,
435 }
436 })
437 }
438
439 pub fn get_extended_stats(&self, pid: u32) -> Option<ExtendedProcessStats> {
441 let system = self.lock_system();
442 system.process(sysinfo::Pid::from_u32(pid)).map(|p| {
443 let now = std::time::SystemTime::now()
444 .duration_since(std::time::UNIX_EPOCH)
445 .map(|d| d.as_secs())
446 .unwrap_or(0);
447 let disk = p.disk_usage();
448
449 ExtendedProcessStats {
450 name: p.name().to_string_lossy().to_string(),
451 exe_path: p.exe().map(|e| e.to_string_lossy().to_string()),
452 cwd: p.cwd().map(|c| c.to_string_lossy().to_string()),
453 environ: p
454 .environ()
455 .iter()
456 .take(20)
457 .map(|s| s.to_string_lossy().to_string())
458 .collect(),
459 status: format!("{:?}", p.status()),
460 cpu_percent: p.cpu_usage(),
461 memory_bytes: p.memory(),
462 virtual_memory_bytes: p.virtual_memory(),
463 uptime_secs: now.saturating_sub(p.start_time()),
464 start_time: p.start_time(),
465 disk_read_bytes: disk.read_bytes,
466 disk_write_bytes: disk.written_bytes,
467 parent_pid: p.parent().map(|pp| pp.as_u32()),
468 thread_count: p.tasks().map(|t| t.len()).unwrap_or(0),
469 user_id: p.user_id().map(|u| u.to_string()),
470 }
471 })
472 }
473}
474
/// Resource usage figures for a process (or a process plus its descendants).
#[derive(Debug, Clone, Copy)]
pub struct ProcessStats {
    /// CPU usage as reported by sysinfo's `cpu_usage()`.
    pub cpu_percent: f32,
    /// Memory as reported by sysinfo's `memory()`.
    pub memory_bytes: u64,
    /// Seconds between the process start time and the moment of sampling.
    pub uptime_secs: u64,
    /// `read_bytes` from sysinfo's disk usage report.
    pub disk_read_bytes: u64,
    /// `written_bytes` from sysinfo's disk usage report.
    pub disk_write_bytes: u64,
}
483
484impl ProcessStats {
485 pub fn memory_display(&self) -> String {
486 format_bytes(self.memory_bytes)
487 }
488
489 pub fn cpu_display(&self) -> String {
490 format!("{:.1}%", self.cpu_percent)
491 }
492
493 pub fn uptime_display(&self) -> String {
494 format_duration(self.uptime_secs)
495 }
496
497 pub fn disk_read_display(&self) -> String {
498 format_bytes_per_sec(self.disk_read_bytes)
499 }
500
501 pub fn disk_write_display(&self) -> String {
502 format_bytes_per_sec(self.disk_write_bytes)
503 }
504}
505
/// Detailed, owned description of a single process.
#[derive(Debug, Clone)]
pub struct ExtendedProcessStats {
    /// Process name.
    pub name: String,
    /// Path of the executable, when available.
    pub exe_path: Option<String>,
    /// Current working directory, when available.
    pub cwd: Option<String>,
    /// Environment entries of the process.
    pub environ: Vec<String>,
    /// Process status as text.
    pub status: String,
    /// CPU usage as reported by sysinfo's `cpu_usage()`.
    pub cpu_percent: f32,
    /// Memory as reported by sysinfo's `memory()`.
    pub memory_bytes: u64,
    /// Virtual memory as reported by sysinfo's `virtual_memory()`.
    pub virtual_memory_bytes: u64,
    /// Seconds between the process start time and the moment of sampling.
    pub uptime_secs: u64,
    /// Start time in seconds since the Unix epoch.
    pub start_time: u64,
    /// `read_bytes` from sysinfo's disk usage report.
    pub disk_read_bytes: u64,
    /// `written_bytes` from sysinfo's disk usage report.
    pub disk_write_bytes: u64,
    /// Pid of the parent process, when known.
    pub parent_pid: Option<u32>,
    /// Number of threads (tasks) of the process.
    pub thread_count: usize,
    /// Owning user id as text, when known.
    pub user_id: Option<String>,
}
525
526impl ExtendedProcessStats {
527 pub fn memory_display(&self) -> String {
528 format_bytes(self.memory_bytes)
529 }
530
531 pub fn virtual_memory_display(&self) -> String {
532 format_bytes(self.virtual_memory_bytes)
533 }
534
535 pub fn cpu_display(&self) -> String {
536 format!("{:.1}%", self.cpu_percent)
537 }
538
539 pub fn uptime_display(&self) -> String {
540 format_duration(self.uptime_secs)
541 }
542
543 pub fn start_time_display(&self) -> String {
544 use std::time::{Duration, UNIX_EPOCH};
545 let datetime = UNIX_EPOCH + Duration::from_secs(self.start_time);
546 chrono::DateTime::<chrono::Local>::from(datetime)
547 .format("%Y-%m-%d %H:%M:%S")
548 .to_string()
549 }
550
551 pub fn disk_read_display(&self) -> String {
552 format_bytes_per_sec(self.disk_read_bytes)
553 }
554
555 pub fn disk_write_display(&self) -> String {
556 format_bytes_per_sec(self.disk_write_bytes)
557 }
558}
559
/// Formats a byte count using 1024-based units.
///
/// Values below 1024 are printed as whole bytes (`B`); larger values are
/// printed with one decimal place in `KB`, `MB` or `GB`.
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = KIB * 1024;
    const GIB: u64 = MIB * 1024;

    match bytes {
        b if b < KIB => format!("{b}B"),
        b if b < MIB => format!("{:.1}KB", b as f64 / KIB as f64),
        b if b < GIB => format!("{:.1}MB", b as f64 / MIB as f64),
        b => format!("{:.1}GB", b as f64 / GIB as f64),
    }
}
571
/// Formats a number of seconds using the two most significant units.
///
/// Under a minute: `"{s}s"`; under an hour: `"{m}m {s}s"`; under a day:
/// `"{h}h {m}m"`; otherwise `"{d}d {h}h"`.
fn format_duration(secs: u64) -> String {
    const MINUTE: u64 = 60;
    const HOUR: u64 = 3600;
    const DAY: u64 = 86400;

    match secs {
        0..=59 => format!("{secs}s"),
        60..=3599 => format!("{}m {}s", secs / MINUTE, secs % MINUTE),
        3600..=86399 => {
            let hours = secs / HOUR;
            let mins = (secs % HOUR) / MINUTE;
            format!("{hours}h {mins}m")
        }
        _ => {
            let days = secs / DAY;
            let hours = (secs % DAY) / HOUR;
            format!("{days}d {hours}h")
        }
    }
}
587
/// Formats a byte count as a rate using 1024-based units and a `/s` suffix.
///
/// Values below 1024 are printed as whole bytes (`B/s`); larger values are
/// printed with one decimal place in `KB/s`, `MB/s` or `GB/s`.
fn format_bytes_per_sec(bytes: u64) -> String {
    if bytes < 1024 {
        return format!("{bytes}B/s");
    }

    let (divisor, unit) = if bytes < (1u64 << 20) {
        (1024.0, "KB/s")
    } else if bytes < (1u64 << 30) {
        (1024.0 * 1024.0, "MB/s")
    } else {
        (1024.0 * 1024.0 * 1024.0, "GB/s")
    };
    format!("{:.1}{unit}", bytes as f64 / divisor)
}
599
600#[cfg(unix)]
601fn signal_name(sig: i32) -> &'static str {
602 match sig {
603 libc::SIGHUP => "SIGHUP",
604 libc::SIGINT => "SIGINT",
605 libc::SIGQUIT => "SIGQUIT",
606 libc::SIGTERM => "SIGTERM",
607 libc::SIGUSR1 => "SIGUSR1",
608 libc::SIGUSR2 => "SIGUSR2",
609 libc::SIGKILL => "SIGKILL",
610 _ => "UNKNOWN",
611 }
612}