Skip to main content

pitchfork_cli/
procs.rs

1use crate::Result;
2#[cfg(unix)]
3use crate::settings::settings;
4use miette::IntoDiagnostic;
5use once_cell::sync::Lazy;
6use std::sync::Mutex;
7use sysinfo::ProcessesToUpdate;
8
/// Process-table helper that wraps a `sysinfo::System` snapshot in a mutex so
/// it can be queried and refreshed through `&self`.
pub struct Procs {
    /// The `sysinfo` snapshot; the methods in this file reach it via `lock_system`.
    system: Mutex<sysinfo::System>,
}
12
/// Global `Procs` instance, built lazily with `Procs::new` on first access.
pub static PROCS: Lazy<Procs> = Lazy::new(Procs::new);
14
// `Default` is provided purely as an alias for `Procs::new`.
impl Default for Procs {
    /// Equivalent to [`Procs::new`].
    fn default() -> Self {
        Self::new()
    }
}
20
21impl Procs {
22    pub fn new() -> Self {
23        Self {
24            system: Mutex::new(sysinfo::System::new()),
25        }
26    }
27
28    fn lock_system(&self) -> std::sync::MutexGuard<'_, sysinfo::System> {
29        self.system.lock().unwrap_or_else(|poisoned| {
30            warn!("System mutex was poisoned, recovering");
31            poisoned.into_inner()
32        })
33    }
34
35    pub fn title(&self, pid: u32) -> Option<String> {
36        self.lock_system()
37            .process(sysinfo::Pid::from_u32(pid))
38            .map(|p| p.name().to_string_lossy().to_string())
39    }
40
41    pub fn is_running(&self, pid: u32) -> bool {
42        self.lock_system()
43            .process(sysinfo::Pid::from_u32(pid))
44            .is_some()
45    }
46
47    /// Walk the /proc tree to find all descendant PIDs.
48    /// Kept for diagnostics/status display; no longer used in the kill path.
49    #[allow(dead_code)]
50    pub fn all_children(&self, pid: u32) -> Vec<u32> {
51        let system = self.lock_system();
52        let all = system.processes();
53        let mut children = vec![];
54        for (child_pid, process) in all {
55            let mut process = process;
56            while let Some(parent) = process.parent() {
57                if parent == sysinfo::Pid::from_u32(pid) {
58                    children.push(child_pid.as_u32());
59                    break;
60                }
61                match system.process(parent) {
62                    Some(p) => process = p,
63                    None => break,
64                }
65            }
66        }
67        children
68    }
69
70    /// Async wrapper for `kill_process_group`. See its docs for details and precondition.
71    pub async fn kill_process_group_async(
72        &self,
73        pid: u32,
74        stop_signal: i32,
75        stop_timeout: Option<std::time::Duration>,
76    ) -> Result<bool> {
77        tokio::task::spawn_blocking(move || {
78            PROCS.kill_process_group(pid, stop_signal, stop_timeout)
79        })
80        .await
81        .into_diagnostic()?
82    }
83
84    /// Kill an entire process group with graceful shutdown strategy:
85    /// 1. Send the configured stop signal to the process group (-pgid) and wait up to ~3s
86    /// 2. If any processes remain, send SIGKILL to the group
87    ///
88    /// Since daemons are spawned with setsid(), the daemon PID == PGID,
89    /// so this atomically signals all descendant processes.
90    ///
91    /// Returns `Err` if the signal could not be sent (e.g. permission denied).
92    ///
93    /// **Precondition:** the caller must have called `refresh_pids` or `refresh_processes`
94    /// for `pid` before invoking this method. Without a prior refresh, the internal
95    /// process map is empty and this method will incorrectly conclude the process is
96    /// already dead (`Ok(false)`) without sending any signal.
97    #[cfg(unix)]
98    fn kill_process_group(
99        &self,
100        pid: u32,
101        stop_signal: i32,
102        stop_timeout: Option<std::time::Duration>,
103    ) -> Result<bool> {
104        let pgid = pid as i32;
105        let signal_name = signal_name(stop_signal);
106
107        debug!("killing process group {pgid} with {signal_name}");
108
109        // Send the stop signal to the entire process group.
110        // killpg sends to all processes in the group atomically.
111        // We intentionally skip the zombie check here because the leader may be
112        // a zombie while children in the group are still running.
113        let ret = unsafe { libc::killpg(pgid, stop_signal) };
114        if ret == -1 {
115            let err = std::io::Error::last_os_error();
116            if err.raw_os_error() == Some(libc::ESRCH) {
117                debug!("process group {pgid} no longer exists");
118                return Ok(false);
119            }
120            if err.raw_os_error() == Some(libc::EPERM) {
121                return Err(miette::miette!(
122                    "failed to send {signal_name} to process group {pgid}: permission denied"
123                ));
124            }
125            warn!("failed to send {signal_name} to process group {pgid}: {err}");
126        }
127
128        // Wait for graceful shutdown: fast initial check then slower polling.
129        // Per-daemon timeout overrides the global setting.
130        let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
131        let fast_ms = 10u64;
132        let slow_ms = 50u64;
133        let total_ms = stop_timeout.as_millis().max(1) as u64;
134        let fast_count = ((total_ms / fast_ms) as usize).min(10);
135        let fast_total_ms = fast_ms * fast_count as u64;
136        let remaining_ms = total_ms.saturating_sub(fast_total_ms);
137        let slow_count = (remaining_ms / slow_ms) as usize;
138
139        let fast_checks =
140            std::iter::repeat_n(std::time::Duration::from_millis(fast_ms), fast_count);
141        let slow_checks =
142            std::iter::repeat_n(std::time::Duration::from_millis(slow_ms), slow_count);
143        let mut elapsed_ms = 0u64;
144
145        for sleep_duration in fast_checks.chain(slow_checks) {
146            std::thread::sleep(sleep_duration);
147            self.refresh_pids(&[pid]);
148            elapsed_ms += sleep_duration.as_millis() as u64;
149            if self.is_terminated_or_zombie(sysinfo::Pid::from_u32(pid)) {
150                debug!("process group {pgid} terminated after {signal_name} ({elapsed_ms} ms)",);
151                return Ok(true);
152            }
153        }
154
155        // SIGKILL the entire process group as last resort
156        warn!(
157            "process group {pgid} did not respond to {signal_name} after {}ms, sending SIGKILL",
158            stop_timeout.as_millis()
159        );
160        let ret = unsafe { libc::killpg(pgid, libc::SIGKILL) };
161        if ret == -1 {
162            let err = std::io::Error::last_os_error();
163            if err.raw_os_error() != Some(libc::ESRCH) {
164                warn!("failed to send SIGKILL to process group {pgid}: {err}");
165            }
166        }
167
168        // Brief wait for SIGKILL to take effect
169        std::thread::sleep(std::time::Duration::from_millis(100));
170        Ok(true)
171    }
172
    /// Non-Unix variant of `kill_process_group`.
    ///
    /// No group-wide signal is sent here; the call is forwarded to `kill` for
    /// the single process `pid`. The caller's stop signal and timeout are not
    /// passed on (`kill` receives `0` and `None`).
    #[cfg(not(unix))]
    fn kill_process_group(
        &self,
        pid: u32,
        _stop_signal: i32,
        _stop_timeout: Option<std::time::Duration>,
    ) -> Result<bool> {
        self.kill(pid, 0, None)
    }
182
183    /// Async wrapper for `kill`. See its docs for details and precondition.
184    pub async fn kill_async(
185        &self,
186        pid: u32,
187        stop_signal: i32,
188        stop_timeout: Option<std::time::Duration>,
189    ) -> Result<bool> {
190        tokio::task::spawn_blocking(move || PROCS.kill(pid, stop_signal, stop_timeout))
191            .await
192            .into_diagnostic()?
193    }
194
195    /// Kill a process with graceful shutdown strategy:
196    /// 1. Send the configured stop signal and wait up to ~3s (10ms intervals for first 100ms, then 50ms intervals)
197    /// 2. If still running, send SIGKILL to force termination
198    ///
199    /// This ensures fast-exiting processes don't wait unnecessarily,
200    /// while stubborn processes eventually get forcefully terminated.
201    ///
202    /// Returns `Err` if the signal could not be sent (e.g. permission denied
203    /// when targeting a process owned by another user/root).
204    ///
205    /// **Precondition:** the caller must have called `refresh_pids` or `refresh_processes`
206    /// for `pid` before invoking this method. Without a prior refresh, the internal
207    /// process map is empty and this method will incorrectly conclude the process is
208    /// already dead (`Ok(false)`) without sending any signal.
209    fn kill(
210        &self,
211        pid: u32,
212        stop_signal: i32,
213        stop_timeout: Option<std::time::Duration>,
214    ) -> Result<bool> {
215        let sysinfo_pid = sysinfo::Pid::from_u32(pid);
216
217        // Check if process exists or is a zombie (already terminated but not reaped)
218        if self.is_terminated_or_zombie(sysinfo_pid) {
219            return Ok(false);
220        }
221
222        debug!("killing process {pid}");
223
224        #[cfg(windows)]
225        {
226            let _ = (stop_signal, stop_timeout);
227            if let Some(process) = self.lock_system().process(sysinfo_pid) {
228                process.kill();
229                process.wait();
230            }
231            Ok(true)
232        }
233
234        #[cfg(unix)]
235        {
236            let signal_name = signal_name(stop_signal);
237            // Send stop signal for graceful shutdown using libc::kill directly
238            // so we can distinguish EPERM (permission denied) from ESRCH
239            // (process already gone — possible in a narrow race window).
240            debug!("sending {signal_name} to process {pid}");
241            let ret = unsafe { libc::kill(pid as i32, stop_signal) };
242            if ret == -1 {
243                let err = std::io::Error::last_os_error();
244                if err.raw_os_error() == Some(libc::ESRCH) {
245                    debug!("process {pid} no longer exists");
246                    return Ok(false);
247                }
248                if err.raw_os_error() == Some(libc::EPERM) {
249                    return Err(miette::miette!(
250                        "failed to send {signal_name} to process {pid}: permission denied"
251                    ));
252                }
253                return Err(miette::miette!(
254                    "failed to send {signal_name} to process {pid}: {err}"
255                ));
256            }
257
258            // Fast check: 10ms intervals, then slower 50ms polling for stop_timeout.
259            // Per-daemon timeout overrides the global setting.
260            let stop_timeout = stop_timeout.unwrap_or_else(|| settings().supervisor_stop_timeout());
261            let fast_ms = 10u64;
262            let slow_ms = 50u64;
263            let total_ms = stop_timeout.as_millis().max(1) as u64;
264            let fast_count = ((total_ms / fast_ms) as usize).min(10);
265            let fast_total_ms = fast_ms * fast_count as u64;
266            let remaining_ms = total_ms.saturating_sub(fast_total_ms);
267            let slow_count = (remaining_ms / slow_ms) as usize;
268
269            for i in 0..fast_count {
270                std::thread::sleep(std::time::Duration::from_millis(fast_ms));
271                self.refresh_pids(&[pid]);
272                if self.is_terminated_or_zombie(sysinfo_pid) {
273                    debug!(
274                        "process {pid} terminated after {signal_name} ({} ms)",
275                        (i + 1) * fast_ms as usize
276                    );
277                    return Ok(true);
278                }
279            }
280
281            // Slower check: 50ms intervals for the remainder of stop_timeout
282            for i in 0..slow_count {
283                std::thread::sleep(std::time::Duration::from_millis(slow_ms));
284                self.refresh_pids(&[pid]);
285                if self.is_terminated_or_zombie(sysinfo_pid) {
286                    debug!(
287                        "process {pid} terminated after {signal_name} ({} ms)",
288                        fast_total_ms + (i + 1) as u64 * slow_ms
289                    );
290                    return Ok(true);
291                }
292            }
293
294            // SIGKILL as last resort after stop_timeout
295            warn!(
296                "process {pid} did not respond to {signal_name} after {}ms, sending SIGKILL",
297                stop_timeout.as_millis()
298            );
299            let ret = unsafe { libc::kill(pid as i32, libc::SIGKILL) };
300            if ret == -1 {
301                let err = std::io::Error::last_os_error();
302                if err.raw_os_error() != Some(libc::ESRCH) {
303                    warn!("failed to send SIGKILL to process {pid}: {err}");
304                }
305            }
306
307            // Brief wait for SIGKILL to take effect
308            std::thread::sleep(std::time::Duration::from_millis(100));
309            Ok(true)
310        }
311    }
312
313    /// Check if a process is terminated or is a zombie.
314    /// On Linux, zombie processes still have /proc/[pid] entries but are effectively dead.
315    /// This prevents unnecessary signal escalation for processes that have already exited.
316    fn is_terminated_or_zombie(&self, sysinfo_pid: sysinfo::Pid) -> bool {
317        let system = self.lock_system();
318        match system.process(sysinfo_pid) {
319            None => true,
320            Some(process) => {
321                #[cfg(unix)]
322                {
323                    matches!(process.status(), sysinfo::ProcessStatus::Zombie)
324                }
325                #[cfg(not(unix))]
326                {
327                    let _ = process;
328                    false
329                }
330            }
331        }
332    }
333
334    pub(crate) fn refresh_processes(&self) {
335        self.lock_system()
336            .refresh_processes(ProcessesToUpdate::All, true);
337    }
338
339    /// Refresh only specific PIDs instead of all processes.
340    /// More efficient when you only need to check a small set of known PIDs.
341    pub(crate) fn refresh_pids(&self, pids: &[u32]) {
342        let sysinfo_pids: Vec<sysinfo::Pid> =
343            pids.iter().map(|p| sysinfo::Pid::from_u32(*p)).collect();
344        self.lock_system()
345            .refresh_processes(ProcessesToUpdate::Some(&sysinfo_pids), true);
346    }
347
348    /// Collect daemon PIDs from a StateFile and refresh only those.
349    /// Avoids the cost of refreshing all system processes when we only
350    /// need stats for managed daemons.
351    pub(crate) fn refresh_daemon_pids(&self, state: &crate::state_file::StateFile) {
352        let pids: Vec<u32> = state.daemons.values().filter_map(|d| d.pid).collect();
353        if !pids.is_empty() {
354            self.refresh_pids(&pids);
355        }
356    }
357
358    /// Get aggregated stats for multiple process groups in a single pass.
359    ///
360    /// Builds the parent→children map once (O(N)) and then BFS-es from each
361    /// root PID (O(D_i) per daemon). Total cost is O(N + ΣD_i) instead of
362    /// O(D × N) when calling `get_group_stats` in a loop.
363    pub fn get_batch_group_stats(&self, pids: &[u32]) -> Vec<(u32, Option<ProcessStats>)> {
364        let system = self.lock_system();
365        let processes = system.processes();
366
367        let now = std::time::SystemTime::now()
368            .duration_since(std::time::UNIX_EPOCH)
369            .map(|d| d.as_secs())
370            .unwrap_or(0);
371
372        // Build parent → children map once for all daemons
373        let mut children_map: std::collections::HashMap<sysinfo::Pid, Vec<sysinfo::Pid>> =
374            std::collections::HashMap::new();
375        for (child_pid, child) in processes {
376            if let Some(ppid) = child.parent() {
377                children_map.entry(ppid).or_default().push(*child_pid);
378            }
379        }
380
381        pids.iter()
382            .map(|&pid| {
383                let root_pid = sysinfo::Pid::from_u32(pid);
384                let Some(root) = processes.get(&root_pid) else {
385                    return (pid, None);
386                };
387
388                let root_disk = root.disk_usage();
389                let mut stats = ProcessStats {
390                    cpu_percent: root.cpu_usage(),
391                    memory_bytes: root.memory(),
392                    uptime_secs: now.saturating_sub(root.start_time()),
393                    disk_read_bytes: root_disk.read_bytes,
394                    disk_write_bytes: root_disk.written_bytes,
395                };
396
397                // BFS from root_pid to find all descendants
398                let mut queue = std::collections::VecDeque::new();
399                if let Some(direct_children) = children_map.get(&root_pid) {
400                    queue.extend(direct_children);
401                }
402                while let Some(child_pid) = queue.pop_front() {
403                    if let Some(child) = processes.get(&child_pid) {
404                        let disk = child.disk_usage();
405                        stats.cpu_percent += child.cpu_usage();
406                        stats.memory_bytes += child.memory();
407                        stats.disk_read_bytes += disk.read_bytes;
408                        stats.disk_write_bytes += disk.written_bytes;
409                    }
410                    if let Some(grandchildren) = children_map.get(&child_pid) {
411                        queue.extend(grandchildren);
412                    }
413                }
414
415                (pid, Some(stats))
416            })
417            .collect()
418    }
419
420    /// Get process stats (cpu%, memory bytes, uptime secs, disk I/O) for a given PID
421    pub fn get_stats(&self, pid: u32) -> Option<ProcessStats> {
422        let system = self.lock_system();
423        system.process(sysinfo::Pid::from_u32(pid)).map(|p| {
424            let now = std::time::SystemTime::now()
425                .duration_since(std::time::UNIX_EPOCH)
426                .map(|d| d.as_secs())
427                .unwrap_or(0);
428            let disk = p.disk_usage();
429            ProcessStats {
430                cpu_percent: p.cpu_usage(),
431                memory_bytes: p.memory(),
432                uptime_secs: now.saturating_sub(p.start_time()),
433                disk_read_bytes: disk.read_bytes,
434                disk_write_bytes: disk.written_bytes,
435            }
436        })
437    }
438
439    /// Get extended process information for a given PID
440    pub fn get_extended_stats(&self, pid: u32) -> Option<ExtendedProcessStats> {
441        let system = self.lock_system();
442        system.process(sysinfo::Pid::from_u32(pid)).map(|p| {
443            let now = std::time::SystemTime::now()
444                .duration_since(std::time::UNIX_EPOCH)
445                .map(|d| d.as_secs())
446                .unwrap_or(0);
447            let disk = p.disk_usage();
448
449            ExtendedProcessStats {
450                name: p.name().to_string_lossy().to_string(),
451                exe_path: p.exe().map(|e| e.to_string_lossy().to_string()),
452                cwd: p.cwd().map(|c| c.to_string_lossy().to_string()),
453                environ: p
454                    .environ()
455                    .iter()
456                    .take(20)
457                    .map(|s| s.to_string_lossy().to_string())
458                    .collect(),
459                status: format!("{:?}", p.status()),
460                cpu_percent: p.cpu_usage(),
461                memory_bytes: p.memory(),
462                virtual_memory_bytes: p.virtual_memory(),
463                uptime_secs: now.saturating_sub(p.start_time()),
464                start_time: p.start_time(),
465                disk_read_bytes: disk.read_bytes,
466                disk_write_bytes: disk.written_bytes,
467                parent_pid: p.parent().map(|pp| pp.as_u32()),
468                thread_count: p.tasks().map(|t| t.len()).unwrap_or(0),
469                user_id: p.user_id().map(|u| u.to_string()),
470            }
471        })
472    }
473}
474
475#[derive(Debug, Clone, Copy)]
476pub struct ProcessStats {
477    pub cpu_percent: f32,
478    pub memory_bytes: u64,
479    pub uptime_secs: u64,
480    pub disk_read_bytes: u64,
481    pub disk_write_bytes: u64,
482}
483
484impl ProcessStats {
485    pub fn memory_display(&self) -> String {
486        format_bytes(self.memory_bytes)
487    }
488
489    pub fn cpu_display(&self) -> String {
490        format!("{:.1}%", self.cpu_percent)
491    }
492
493    pub fn uptime_display(&self) -> String {
494        format_duration(self.uptime_secs)
495    }
496
497    pub fn disk_read_display(&self) -> String {
498        format_bytes_per_sec(self.disk_read_bytes)
499    }
500
501    pub fn disk_write_display(&self) -> String {
502        format_bytes_per_sec(self.disk_write_bytes)
503    }
504}
505
506/// Extended process stats with more detailed information
507#[derive(Debug, Clone)]
508pub struct ExtendedProcessStats {
509    pub name: String,
510    pub exe_path: Option<String>,
511    pub cwd: Option<String>,
512    pub environ: Vec<String>,
513    pub status: String,
514    pub cpu_percent: f32,
515    pub memory_bytes: u64,
516    pub virtual_memory_bytes: u64,
517    pub uptime_secs: u64,
518    pub start_time: u64,
519    pub disk_read_bytes: u64,
520    pub disk_write_bytes: u64,
521    pub parent_pid: Option<u32>,
522    pub thread_count: usize,
523    pub user_id: Option<String>,
524}
525
526impl ExtendedProcessStats {
527    pub fn memory_display(&self) -> String {
528        format_bytes(self.memory_bytes)
529    }
530
531    pub fn virtual_memory_display(&self) -> String {
532        format_bytes(self.virtual_memory_bytes)
533    }
534
535    pub fn cpu_display(&self) -> String {
536        format!("{:.1}%", self.cpu_percent)
537    }
538
539    pub fn uptime_display(&self) -> String {
540        format_duration(self.uptime_secs)
541    }
542
543    pub fn start_time_display(&self) -> String {
544        use std::time::{Duration, UNIX_EPOCH};
545        let datetime = UNIX_EPOCH + Duration::from_secs(self.start_time);
546        chrono::DateTime::<chrono::Local>::from(datetime)
547            .format("%Y-%m-%d %H:%M:%S")
548            .to_string()
549    }
550
551    pub fn disk_read_display(&self) -> String {
552        format_bytes_per_sec(self.disk_read_bytes)
553    }
554
555    pub fn disk_write_display(&self) -> String {
556        format_bytes_per_sec(self.disk_write_bytes)
557    }
558}
559
/// Render a byte count using binary (1024-based) steps: whole bytes below
/// 1024, otherwise one decimal place in KB, MB or GB (GB is the largest unit).
fn format_bytes(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * 1024;
    const GIB: u64 = 1024 * 1024 * 1024;

    // Checked from the largest unit down; the first threshold reached wins.
    if bytes >= GIB {
        format!("{:.1}GB", bytes as f64 / (1024.0 * 1024.0 * 1024.0))
    } else if bytes >= MIB {
        format!("{:.1}MB", bytes as f64 / (1024.0 * 1024.0))
    } else if bytes >= KIB {
        format!("{:.1}KB", bytes as f64 / 1024.0)
    } else {
        format!("{bytes}B")
    }
}
571
/// Render a duration in seconds using its two most significant units:
/// `Ns`, `Nm Ns`, `Nh Nm`, or `Nd Nh`.
fn format_duration(secs: u64) -> String {
    let days = secs / 86400;
    let hours = (secs % 86400) / 3600;
    let mins = (secs % 3600) / 60;

    // The first non-zero unit decides which pair is shown.
    match (days, hours, mins) {
        (0, 0, 0) => format!("{secs}s"),
        (0, 0, _) => format!("{}m {}s", secs / 60, secs % 60),
        (0, _, _) => format!("{hours}h {mins}m"),
        _ => format!("{days}d {hours}h"),
    }
}
587
/// Render a byte figure as a rate with a `/s` suffix, using binary
/// (1024-based) steps: whole bytes below 1024, otherwise one decimal place in
/// KB/s, MB/s or GB/s (GB/s is the largest unit).
fn format_bytes_per_sec(bytes: u64) -> String {
    const KIB: u64 = 1024;
    const MIB: u64 = 1024 * 1024;
    const GIB: u64 = 1024 * 1024 * 1024;

    match bytes {
        _ if bytes < KIB => format!("{bytes}B/s"),
        _ if bytes < MIB => format!("{:.1}KB/s", bytes as f64 / 1024.0),
        _ if bytes < GIB => format!("{:.1}MB/s", bytes as f64 / (1024.0 * 1024.0)),
        _ => format!("{:.1}GB/s", bytes as f64 / (1024.0 * 1024.0 * 1024.0)),
    }
}
599
600#[cfg(unix)]
601fn signal_name(sig: i32) -> &'static str {
602    match sig {
603        libc::SIGHUP => "SIGHUP",
604        libc::SIGINT => "SIGINT",
605        libc::SIGQUIT => "SIGQUIT",
606        libc::SIGTERM => "SIGTERM",
607        libc::SIGUSR1 => "SIGUSR1",
608        libc::SIGUSR2 => "SIGUSR2",
609        libc::SIGKILL => "SIGKILL",
610        _ => "UNKNOWN",
611    }
612}