Skip to main content

fallow_cli/signal/
registry.rs

1//! Process-wide registry of live spawned-child PIDs.
2//!
3//! Keyed by a monotonic `AtomicU64` counter rather than `Child::id()`
4//! because POSIX recycles PIDs aggressively on long-running runners; a
5//! recycled PID would collide with a previously-deregistered entry.
6//!
7//! Stores PIDs (not `Child` handles): the `ScopedChild` wrapper owns
8//! the `Child` outright so it can call `wait_with_output` / `wait`
9//! normally, and the signal handler kills by PID via a `kill -9
10//! <pid>` shell subprocess on Unix (avoids adding `libc` as a
11//! workspace dep) or `OpenProcess + TerminateProcess` on Windows. No
12//! ownership transfer, no race between wait and kill.
13
14use std::sync::atomic::{AtomicU64, Ordering};
15use std::sync::{Mutex, OnceLock};
16use std::time::{Duration, Instant};
17
18use rustc_hash::FxHashMap;
19
/// Source of registry keys. Starts at 1 and is only ever advanced with
/// `fetch_add`, so a key is not handed out twice (short of `u64` wrap).
static NEXT_ID: AtomicU64 = AtomicU64::new(1);
/// Lazily initialised map from registry key to child PID; access it
/// through `registry()` rather than directly.
static REGISTRY: OnceLock<Mutex<FxHashMap<u64, u32>>> = OnceLock::new();

/// One-shot drain guard. Holds 0 until the first `drain_and_kill` call
/// flips it to 1; later calls (e.g. repeated signals during a signal
/// storm) see 1 and return immediately. Nothing in this module resets it.
static DRAINING: AtomicU64 = AtomicU64::new(0);
26
27fn registry() -> &'static Mutex<FxHashMap<u64, u32>> {
28    REGISTRY.get_or_init(|| Mutex::new(FxHashMap::default()))
29}
30
31/// Register `pid`. Returns a monotonic key the caller stores in their
32/// `ScopedChild` for deregister at wait/drop time.
33pub(super) fn register(pid: u32) -> u64 {
34    let id = NEXT_ID.fetch_add(1, Ordering::SeqCst);
35    registry()
36        .lock()
37        .unwrap_or_else(|e| e.into_inner())
38        .insert(id, pid);
39    id
40}
41
42/// Remove the registry entry for `id`. Idempotent.
43pub(super) fn deregister(id: u64) {
44    registry()
45        .lock()
46        .unwrap_or_else(|e| e.into_inner())
47        .remove(&id);
48}
49
50/// Snapshot every registered PID and kill each. Polls for liveness
51/// with a bounded budget. Caller is the platform signal handler thread.
52///
53/// First-call-wins via the `DRAINING` guard: subsequent invocations
54/// during the same shutdown skip the body to avoid re-entering the
55/// lock under signal storm.
56pub(super) fn drain_and_kill() {
57    if DRAINING
58        .compare_exchange(0, 1, Ordering::SeqCst, Ordering::SeqCst)
59        .is_err()
60    {
61        return;
62    }
63
64    let pids: Vec<u32> = {
65        registry()
66            .lock()
67            .unwrap_or_else(|e| e.into_inner())
68            .drain()
69            .map(|(_id, pid)| pid)
70            .collect()
71    };
72
73    for pid in &pids {
74        kill_pid(*pid);
75    }
76
77    let deadline = Instant::now() + drain_budget();
78    while Instant::now() < deadline {
79        if !pids.iter().copied().any(pid_is_alive) {
80            return;
81        }
82        std::thread::sleep(Duration::from_millis(50));
83    }
84}
85
/// Sends SIGKILL to `pid` by running the external `kill` utility.
///
/// Best effort: any failure to spawn `kill`, or a non-zero exit from it,
/// is ignored.
#[cfg(unix)]
fn kill_pid(pid: u32) {
    use std::process::{Command, Stdio};

    // The workspace has no libc dependency, so instead of calling
    // kill(2) directly we run `kill -9 <pid>` as a child process (the
    // bare program name is resolved through PATH). Signal number 9 is
    // SIGKILL on every POSIX system fallow targets. The extra process is
    // spawned at most once per registered child per fallow invocation,
    // so the cost is negligible.
    //
    // PIDs come from `Child::id()` and are always positive, so the
    // broadcast forms (pid 0 / -1) cannot occur on this path.
    let target = pid.to_string();
    let mut killer = Command::new("kill");
    killer
        .arg("-9")
        .arg(&target)
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    let _ = killer.status();
}
100
/// Terminates the process identified by `pid` with exit code 1.
///
/// Best effort: if the process cannot be opened with terminate rights
/// nothing happens, and the results of `TerminateProcess` and
/// `CloseHandle` are ignored.
#[cfg(windows)]
#[expect(
    unsafe_code,
    reason = "FFI to Win32 OpenProcess/TerminateProcess/CloseHandle; preconditions documented inline"
)]
fn kill_pid(pid: u32) {
    use windows_sys::Win32::Foundation::{CloseHandle, FALSE, HANDLE};
    use windows_sys::Win32::System::Threading::{OpenProcess, PROCESS_TERMINATE, TerminateProcess};

    // SAFETY: OpenProcess takes only plain values and signals failure by
    // returning a null handle, which is checked before any further use.
    let process: HANDLE = unsafe { OpenProcess(PROCESS_TERMINATE, FALSE, pid) };
    if process.is_null() {
        return;
    }

    // SAFETY: `process` is a non-null handle opened above with
    // PROCESS_TERMINATE, and it is closed exactly once, here.
    unsafe {
        let _ = TerminateProcess(process, 1);
        let _ = CloseHandle(process);
    }
}
121
/// Fallback for targets that are neither Unix nor Windows.
///
/// Does nothing: no kill primitive is available on an unknown platform.
#[cfg(not(any(unix, windows)))]
fn kill_pid(_pid: u32) {}
126
/// Reports whether `pid` still answers a signal-0 probe.
///
/// Runs `kill -0 <pid>` with its output discarded and returns `true`
/// only when the command could be spawned and exited successfully.
/// Failing to spawn `kill` is reported as "not alive".
#[cfg(unix)]
fn pid_is_alive(pid: u32) -> bool {
    use std::process::{Command, Stdio};

    let probe = Command::new("kill")
        .arg("-0")
        .arg(pid.to_string())
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();

    match probe {
        Ok(status) => status.success(),
        Err(_) => false,
    }
}
136
/// Reports whether the process identified by `pid` is still running.
///
/// Opens the process and performs a zero-timeout wait on it. A process
/// handle becomes signaled when the process exits, so `WAIT_OBJECT_0`
/// means "exited" and anything else is reported as alive. A PID that
/// cannot be opened at all is reported as not alive.
#[cfg(windows)]
#[expect(
    unsafe_code,
    reason = "FFI to Win32 OpenProcess/WaitForSingleObject/CloseHandle; preconditions documented inline"
)]
fn pid_is_alive(pid: u32) -> bool {
    use windows_sys::Win32::Foundation::{CloseHandle, FALSE, HANDLE, WAIT_OBJECT_0};
    use windows_sys::Win32::System::Threading::{
        OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION, PROCESS_SYNCHRONIZE, WaitForSingleObject,
    };

    // WaitForSingleObject requires the SYNCHRONIZE access right on the
    // handle. Without it the wait fails (WAIT_FAILED) instead of
    // reporting the process state, which would make every openable PID
    // look alive and force the drain loop to use its whole budget.
    let access = PROCESS_QUERY_LIMITED_INFORMATION | PROCESS_SYNCHRONIZE;

    // SAFETY: OpenProcess takes only plain values and returns null on
    // failure, which is checked before use. The non-null handle is passed
    // to WaitForSingleObject with a zero timeout (never blocks) and is
    // closed exactly once before returning.
    unsafe {
        let handle: HANDLE = OpenProcess(access, FALSE, pid);
        if handle.is_null() {
            return false;
        }
        let result = WaitForSingleObject(handle, 0);
        let _ = CloseHandle(handle);
        result != WAIT_OBJECT_0
    }
}
158
/// Fallback for targets that are neither Unix nor Windows.
///
/// Always answers `false`, so a caller polling for liveness stops
/// waiting immediately.
#[cfg(not(any(unix, windows)))]
fn pid_is_alive(_pid: u32) -> bool {
    false
}
163
/// Upper bound on how long `drain_and_kill` keeps polling for killed
/// children to disappear.
///
/// 1500 ms on Windows, 500 ms on Unix and on any other target. (The
/// reason for the longer Windows budget is not recorded here.)
const fn drain_budget() -> Duration {
    let millis = if cfg!(windows) { 1500 } else { 500 };
    Duration::from_millis(millis)
}
178
#[cfg(test)]
mod tests {
    use super::*;

    /// Reads the PID currently stored under `id`, if any.
    ///
    /// Tests only ever look at keys they registered themselves, so they
    /// stay independent even though the registry is process-global and
    /// tests may run in parallel.
    fn lookup(id: u64) -> Option<u32> {
        registry()
            .lock()
            .unwrap_or_else(|e| e.into_inner())
            .get(&id)
            .copied()
    }

    #[test]
    fn register_deregister_roundtrip() {
        let id = register(42);
        assert!(id > 0);
        // The entry must actually be present after registering...
        assert_eq!(lookup(id), Some(42));

        deregister(id);
        // ...and gone after deregistering.
        assert_eq!(lookup(id), None);

        // Idempotent: second deregister is a no-op.
        deregister(id);
        assert_eq!(lookup(id), None);
    }

    #[test]
    fn ids_are_monotonic() {
        let a = register(100);
        let b = register(200);
        assert!(b > a);
        // Distinct keys map to their own PIDs.
        assert_eq!(lookup(a), Some(100));
        assert_eq!(lookup(b), Some(200));
        deregister(a);
        deregister(b);
    }
}