1use std::cell::RefCell;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36
37pub const SUBPROCESS_TERM_GRACE: Duration = Duration::from_secs(2);
42
43#[derive(Clone, Default)]
44struct OpInterrupt {
45 cancel: Option<Arc<AtomicBool>>,
46 deadline: Option<Instant>,
47}
48
49thread_local! {
50 static CURRENT: RefCell<Option<OpInterrupt>> = const { RefCell::new(None) };
51}
52
53pub struct OpInterruptGuard {
57 #[allow(clippy::option_option)]
60 prev: Option<Option<OpInterrupt>>,
61}
62
63impl Drop for OpInterruptGuard {
64 fn drop(&mut self) {
65 if let Some(prev) = self.prev.take() {
66 CURRENT.with(|slot| *slot.borrow_mut() = prev);
67 }
68 }
69}
70
71pub fn install(cancel: Option<Arc<AtomicBool>>, deadline: Option<Instant>) -> OpInterruptGuard {
76 let prev = CURRENT.with(|slot| slot.borrow_mut().replace(OpInterrupt { cancel, deadline }));
77 OpInterruptGuard { prev: Some(prev) }
78}
79
80pub fn requested() -> bool {
84 CURRENT.with(|slot| {
85 let ctx = slot.borrow();
86 let Some(ctx) = ctx.as_ref() else {
87 return false;
88 };
89 if ctx
90 .cancel
91 .as_ref()
92 .is_some_and(|token| token.load(Ordering::SeqCst))
93 {
94 return true;
95 }
96 ctx.deadline
97 .is_some_and(|deadline| Instant::now() >= deadline)
98 })
99}
100
101pub fn configure_kill_group(command: &mut std::process::Command) {
106 #[cfg(unix)]
107 {
108 use std::os::unix::process::CommandExt;
109 command.process_group(0);
110 }
111 #[cfg(not(unix))]
112 {
113 let _ = command;
114 }
115}
116
117pub fn signal_pid_and_group(pid: u32, signal: i32) {
119 #[cfg(unix)]
120 {
121 extern "C" {
124 fn kill(pid: i32, sig: i32) -> i32;
125 }
126 unsafe {
127 kill(-(pid as i32), signal);
128 kill(pid as i32, signal);
129 }
130 }
131 #[cfg(not(unix))]
132 {
133 let _ = (pid, signal);
134 }
135}
136
137pub enum ChildWait {
139 Exited(std::process::ExitStatus),
141 TimedOut,
143 Interrupted(Option<std::process::ExitStatus>),
147}
148
149pub fn wait_child_interruptible(
157 child: &mut std::process::Child,
158 timeout: Option<Duration>,
159) -> std::io::Result<ChildWait> {
160 let deadline = timeout.map(|limit| Instant::now() + limit);
161 loop {
162 if let Some(status) = child.try_wait()? {
163 return Ok(ChildWait::Exited(status));
164 }
165 if requested() {
166 let status = terminate_child_group(child);
167 return Ok(ChildWait::Interrupted(status));
168 }
169 if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
170 if let Some(pid) = child_pid(child) {
172 signal_pid_and_group(pid, 9);
173 }
174 let _ = child.kill();
175 let _ = child.wait();
176 return Ok(ChildWait::TimedOut);
177 }
178 std::thread::sleep(Duration::from_millis(20));
179 }
180}
181
182pub fn terminate_child_group(child: &mut std::process::Child) -> Option<std::process::ExitStatus> {
188 #[cfg(unix)]
189 {
190 if let Some(pid) = child_pid(child) {
191 const SIGTERM: i32 = 15;
192 signal_pid_and_group(pid, SIGTERM);
193 let grace_deadline = Instant::now() + SUBPROCESS_TERM_GRACE;
194 loop {
195 match child.try_wait() {
196 Ok(Some(status)) => {
197 signal_pid_and_group(pid, 9);
200 return Some(status);
201 }
202 Ok(None) => {
203 if Instant::now() >= grace_deadline {
204 break;
205 }
206 std::thread::sleep(Duration::from_millis(20));
207 }
208 Err(_) => break,
209 }
210 }
211 signal_pid_and_group(pid, 9);
212 }
213 }
214 let _ = child.kill();
215 child.wait().ok()
216}
217
218fn child_pid(child: &std::process::Child) -> Option<u32> {
219 let pid = child.id();
220 (pid > 0).then_some(pid)
221}
222
223pub(crate) fn drain_captured_pipe(
233 rx: &std::sync::mpsc::Receiver<Vec<u8>>,
234 killed: bool,
235 child_pid: u32,
236) -> Vec<u8> {
237 use std::sync::mpsc::RecvTimeoutError;
238 if killed {
239 return rx
240 .recv_timeout(Duration::from_millis(100))
241 .unwrap_or_default();
242 }
243 loop {
244 match rx.recv_timeout(Duration::from_millis(20)) {
245 Ok(buf) => return buf,
246 Err(RecvTimeoutError::Disconnected) => return Vec::new(),
247 Err(RecvTimeoutError::Timeout) => {
248 if requested() {
249 const SIGTERM: i32 = 15;
250 signal_pid_and_group(child_pid, SIGTERM);
251 if let Ok(buf) = rx.recv_timeout(SUBPROCESS_TERM_GRACE) {
252 signal_pid_and_group(child_pid, 9);
253 return buf;
254 }
255 signal_pid_and_group(child_pid, 9);
256 return rx
257 .recv_timeout(Duration::from_millis(100))
258 .unwrap_or_default();
259 }
260 }
261 }
262 }
263}
264
265pub(crate) fn spawn_pipe_drain<R: std::io::Read + Send + 'static>(
267 mut reader: R,
268) -> std::sync::mpsc::Receiver<Vec<u8>> {
269 let (tx, rx) = std::sync::mpsc::channel::<Vec<u8>>();
270 std::thread::spawn(move || {
271 let mut buf = Vec::new();
272 let _ = reader.read_to_end(&mut buf);
273 let _ = tx.send(buf);
274 });
275 rx
276}
277
278pub fn capture_output_interruptible(
285 command: &mut std::process::Command,
286) -> std::io::Result<std::process::Output> {
287 use std::process::Stdio;
288 command
289 .stdout(Stdio::piped())
290 .stderr(Stdio::piped())
291 .stdin(Stdio::null());
292 configure_kill_group(command);
293 let mut child = command.spawn()?;
294 let pid = child.id();
295 let rx_out = child.stdout.take().map(spawn_pipe_drain);
296 let rx_err = child.stderr.take().map(spawn_pipe_drain);
297
298 let (status, killed) = match wait_child_interruptible(&mut child, None)? {
299 ChildWait::Exited(status) => (status, false),
300 ChildWait::TimedOut => (std::process::ExitStatus::default(), true),
302 ChildWait::Interrupted(status) => (status.unwrap_or_default(), true),
303 };
304 let stdout = rx_out
305 .map(|rx| drain_captured_pipe(&rx, killed, pid))
306 .unwrap_or_default();
307 let stderr = rx_err
308 .map(|rx| drain_captured_pipe(&rx, killed, pid))
309 .unwrap_or_default();
310 Ok(std::process::Output {
311 status,
312 stdout,
313 stderr,
314 })
315}
316
317#[cfg(test)]
318mod tests {
319 use super::*;
320
321 #[test]
322 fn requested_is_false_without_context() {
323 assert!(!requested());
324 }
325
326 #[test]
327 fn cancel_token_trips_requested_and_guard_restores() {
328 let token = Arc::new(AtomicBool::new(false));
329 let guard = install(Some(token.clone()), None);
330 assert!(!requested());
331 token.store(true, Ordering::SeqCst);
332 assert!(requested());
333 drop(guard);
334 assert!(!requested());
335 }
336
337 #[test]
338 fn deadline_trips_requested() {
339 let expired = Instant::now()
340 .checked_sub(Duration::from_millis(1))
341 .expect("monotonic clock supports a 1ms test lookback");
342 let _guard = install(None, Some(expired));
343 assert!(requested());
344 }
345
346 #[test]
347 fn nested_installs_restore_in_order() {
348 let outer_token = Arc::new(AtomicBool::new(true));
349 let _outer = install(Some(outer_token), None);
350 assert!(requested());
351 {
352 let _inner = install(None, None);
353 assert!(!requested());
354 }
355 assert!(requested());
356 }
357
358 #[cfg(unix)]
359 #[test]
360 fn interrupted_wait_kills_process_group() {
361 let mut command = std::process::Command::new("sh");
363 command.args(["-c", "sleep 30 & wait"]);
364 configure_kill_group(&mut command);
365 let mut child = command.spawn().expect("spawn sh");
366 let pgid = child.id();
367
368 let cancel = Arc::new(AtomicBool::new(true));
369 let _guard = install(Some(cancel), None);
370 let started = Instant::now();
371 let outcome = wait_child_interruptible(&mut child, None).expect("wait");
372 assert!(matches!(outcome, ChildWait::Interrupted(_)));
373 assert!(started.elapsed() < Duration::from_secs(10));
374
375 extern "C" {
377 fn kill(pid: i32, sig: i32) -> i32;
378 }
379 let group_gone = || unsafe { kill(-(pgid as i32), 0) } != 0;
380 let deadline = Instant::now() + Duration::from_secs(5);
381 while !group_gone() && Instant::now() < deadline {
382 std::thread::sleep(Duration::from_millis(50));
383 }
384 assert!(group_gone(), "process group {pgid} survived interrupt");
385 }
386}