Skip to main content

harn_vm/
op_interrupt.rs

1//! Cooperative interrupt observation for blocking sync builtins.
2//!
3//! Sync builtins (including every subprocess-spawning path: the hostlib
4//! `run_command` tool family and the VM-side `process.exec`/`exec_opts`
5//! builtins) execute inline on the VM's async task. While one of them
6//! blocks — typically waiting on a child process — the interpreter's
7//! `tokio::select!` cancel/deadline race in
8//! `vm/execution.rs::execute_op_with_scope_interrupts` cannot run: the op
9//! future never yields, so scope cancellation, `deadline` expiry, and host
10//! aborts used to wait for the child to exit on its own (orphaning it on
11//! task abort / VM drop).
12//!
13//! This module closes that gap cooperatively. Before invoking a sync
14//! builtin, the VM installs the *currently armed* interrupt sources — its
15//! host cancel token (`Arc<AtomicBool>`) and the innermost deadline — into
16//! a thread-local via [`install`]. Blocking wait loops poll [`requested`]
17//! (they already poll `try_wait` every ~20ms) and, when it fires,
18//! gracefully terminate their child process group (SIGTERM, then SIGKILL
19//! after [`SUBPROCESS_TERM_GRACE`]) and return. The VM then surfaces the
20//! ordinary cancellation / deadline error at the next op boundary.
21//!
22//! Trigger coverage:
23//! - **Scope / `parallel` cancellation and VM drop**: spawned-task child
24//!   VMs share the `Arc<AtomicBool>` stored in their `VmTaskHandle`;
25//!   `Vm::cancel_spawned_tasks` (also called from `Drop for Vm`) sets it,
26//!   which the blocked wait loop observes.
27//! - **Host abort**: hosts cancel a VM by setting its cancel token — same
28//!   observation path.
29//! - **`deadline` expiry**: the deadline `Instant` is captured when the
30//!   builtin starts; the wait loop compares against `Instant::now()`.
31
32use std::cell::RefCell;
33use std::sync::atomic::{AtomicBool, Ordering};
34use std::sync::Arc;
35use std::time::{Duration, Instant};
36
/// Grace period between the SIGTERM sent to a subprocess and the SIGKILL
/// that follows for its whole process group. Intentionally more generous
/// than the interpreter's 250ms async-op cancel grace
/// (`CANCEL_GRACE_ASYNC_OP`): children commonly flush buffers or remove
/// lock files while handling SIGTERM.
pub const SUBPROCESS_TERM_GRACE: Duration = Duration::from_millis(2_000);
42
/// Interrupt sources armed for the sync builtin running on this thread.
/// Either source may be absent.
#[derive(Default, Clone)]
struct OpInterrupt {
    /// Cooperative cancel flag; a stored `true` means an interrupt was
    /// requested.
    cancel: Option<Arc<AtomicBool>>,
    /// Instant at or after which the armed deadline counts as expired.
    deadline: Option<Instant>,
}

thread_local! {
    /// Per-thread slot holding the installed interrupt context; `None`
    /// while nothing is installed.
    static CURRENT: RefCell<Option<OpInterrupt>> = const { RefCell::new(None) };
}
52
53/// Guard returned by [`install`]. Restores the previously installed
54/// interrupt context on drop so nested builtin dispatch (child VMs running
55/// on the same thread) composes correctly.
56pub struct OpInterruptGuard {
57    // Outer Option = "guard owes a restore"; inner Option is the previous
58    // thread-local slot value (which can itself be None).
59    #[allow(clippy::option_option)]
60    prev: Option<Option<OpInterrupt>>,
61}
62
63impl Drop for OpInterruptGuard {
64    fn drop(&mut self) {
65        if let Some(prev) = self.prev.take() {
66            CURRENT.with(|slot| *slot.borrow_mut() = prev);
67        }
68    }
69}
70
71/// Install the interrupt sources a blocking builtin on this thread should
72/// observe: an optional cooperative cancel token and an optional deadline.
73/// The VM calls this around sync builtin dispatch; tests use it to simulate
74/// scope cancellation without booting a full interpreter.
75pub fn install(cancel: Option<Arc<AtomicBool>>, deadline: Option<Instant>) -> OpInterruptGuard {
76    let prev = CURRENT.with(|slot| slot.borrow_mut().replace(OpInterrupt { cancel, deadline }));
77    OpInterruptGuard { prev: Some(prev) }
78}
79
80/// Returns `true` when the interrupt context installed on this thread has
81/// fired: the cancel token is set, or the deadline has passed. Cheap enough
82/// to call from a ~20ms poll loop. Returns `false` when nothing is armed.
83pub fn requested() -> bool {
84    CURRENT.with(|slot| {
85        let ctx = slot.borrow();
86        let Some(ctx) = ctx.as_ref() else {
87            return false;
88        };
89        if ctx
90            .cancel
91            .as_ref()
92            .is_some_and(|token| token.load(Ordering::SeqCst))
93        {
94            return true;
95        }
96        ctx.deadline
97            .is_some_and(|deadline| Instant::now() >= deadline)
98    })
99}
100
/// Arrange for the child to lead its own process group (`setpgid(0, 0)`),
/// so that a later group-wide signal also reaches its grandchildren.
///
/// Does nothing on non-Unix targets: group semantics are Unix-first, and
/// Windows callers fall back to killing the direct child handle
/// (`TerminateProcess` via `Child::kill`).
pub fn configure_kill_group(command: &mut std::process::Command) {
    #[cfg(unix)]
    std::os::unix::process::CommandExt::process_group(command, 0);

    // Nothing to configure here; the binding only marks `command` as used.
    #[cfg(not(unix))]
    let _ = command;
}
116
/// Send `signal` to the process group led by `pid`, then to `pid` itself.
/// No-op on non-Unix targets.
///
/// Delivery is best-effort: the results of both `kill(2)` calls are
/// ignored.
///
/// A `pid` of 0, or one that does not fit a positive `pid_t`, is ignored
/// outright. After the `u32` → `i32` conversion such values would not
/// address a single child: `kill(0, ..)` targets the caller's own process
/// group, `kill(-1, ..)` targets every process the caller may signal, and
/// negating `i32::MIN` overflows.
pub fn signal_pid_and_group(pid: u32, signal: i32) {
    #[cfg(unix)]
    {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }

        // Only 1..=i32::MAX names one concrete process / process group.
        if pid == 0 || pid > i32::MAX as u32 {
            return;
        }
        // Lossless: the range check above guarantees the value fits.
        let target = pid as i32;

        // SAFETY: kill(2) takes a pid_t (i32 on all Unix targets) and a
        // signal number and has no memory-safety preconditions. `target`
        // is strictly positive, so `-target` cannot overflow and neither
        // call can hit the special 0 / -1 broadcast targets.
        unsafe {
            kill(-target, signal);
            kill(target, signal);
        }
    }
    #[cfg(not(unix))]
    {
        let _ = (pid, signal);
    }
}
136
/// How an interruptible child wait ended.
///
/// Every payload is a plain [`std::process::ExitStatus`], so the enum
/// derives the standard value traits; callers can log outcomes with
/// `{:?}` and compare them directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChildWait {
    /// The child exited on its own.
    Exited(std::process::ExitStatus),
    /// The caller-supplied timeout elapsed; the child (group) was killed.
    TimedOut,
    /// [`requested`] fired; the child group was SIGTERMed and, after
    /// [`SUBPROCESS_TERM_GRACE`], SIGKILLed. Carries the reaped status when
    /// the OS reported one.
    Interrupted(Option<std::process::ExitStatus>),
}
148
149/// Wait for `child` while polling [`requested`] and the optional timeout.
150///
151/// Used by the VM-side `process.*` builtins (`exec`, `shell`, `exec_opts`,
152/// `spawn_captured`). The hostlib `run_command` family implements the same
153/// protocol inside its `ProcessSpawner` abstraction. Callers should have
154/// spawned the child with [`configure_kill_group`] so the group signals
155/// reach grandchildren.
156pub fn wait_child_interruptible(
157    child: &mut std::process::Child,
158    timeout: Option<Duration>,
159) -> std::io::Result<ChildWait> {
160    let deadline = timeout.map(|limit| Instant::now() + limit);
161    loop {
162        if let Some(status) = child.try_wait()? {
163            return Ok(ChildWait::Exited(status));
164        }
165        if requested() {
166            let status = terminate_child_group(child);
167            return Ok(ChildWait::Interrupted(status));
168        }
169        if deadline.is_some_and(|deadline| Instant::now() >= deadline) {
170            // Timeout keeps its historical semantics: immediate SIGKILL.
171            if let Some(pid) = child_pid(child) {
172                signal_pid_and_group(pid, 9);
173            }
174            let _ = child.kill();
175            let _ = child.wait();
176            return Ok(ChildWait::TimedOut);
177        }
178        std::thread::sleep(Duration::from_millis(20));
179    }
180}
181
182/// Gracefully terminate `child` and its process group: SIGTERM, wait up to
183/// [`SUBPROCESS_TERM_GRACE`], then SIGKILL. Reaps the child and returns its
184/// exit status when available. On non-Unix targets this is a best-effort
185/// direct `Child::kill` (`TerminateProcess`), which does not reach
186/// grandchildren.
187pub fn terminate_child_group(child: &mut std::process::Child) -> Option<std::process::ExitStatus> {
188    #[cfg(unix)]
189    {
190        if let Some(pid) = child_pid(child) {
191            const SIGTERM: i32 = 15;
192            signal_pid_and_group(pid, SIGTERM);
193            let grace_deadline = Instant::now() + SUBPROCESS_TERM_GRACE;
194            loop {
195                match child.try_wait() {
196                    Ok(Some(status)) => {
197                        // The direct child is gone, but SIGTERM-immune
198                        // descendants may linger — sweep the group.
199                        signal_pid_and_group(pid, 9);
200                        return Some(status);
201                    }
202                    Ok(None) => {
203                        if Instant::now() >= grace_deadline {
204                            break;
205                        }
206                        std::thread::sleep(Duration::from_millis(20));
207                    }
208                    Err(_) => break,
209                }
210            }
211            signal_pid_and_group(pid, 9);
212        }
213    }
214    let _ = child.kill();
215    child.wait().ok()
216}
217
/// Returns the OS-assigned id of `child`, or `None` when that id is 0, so
/// callers never hand a zero pid to the signalling helpers.
fn child_pid(child: &std::process::Child) -> Option<u32> {
    match child.id() {
        0 => None,
        pid => Some(pid),
    }
}
222
223/// Collect one captured pipe from a drain thread that sends the full buffer
224/// on EOF.
225///
226/// `killed == true` (the child group was already signalled) keeps a 100ms
227/// best-effort window for partial output. Otherwise wait for EOF like
228/// `Command::output` would — but keep observing [`requested`], because a
229/// lingering grandchild that inherited the pipe can hold it open long after
230/// the direct child exited; on interrupt the group gets the same SIGTERM →
231/// grace → SIGKILL treatment.
232pub(crate) fn drain_captured_pipe(
233    rx: &std::sync::mpsc::Receiver<Vec<u8>>,
234    killed: bool,
235    child_pid: u32,
236) -> Vec<u8> {
237    use std::sync::mpsc::RecvTimeoutError;
238    if killed {
239        return rx
240            .recv_timeout(Duration::from_millis(100))
241            .unwrap_or_default();
242    }
243    loop {
244        match rx.recv_timeout(Duration::from_millis(20)) {
245            Ok(buf) => return buf,
246            Err(RecvTimeoutError::Disconnected) => return Vec::new(),
247            Err(RecvTimeoutError::Timeout) => {
248                if requested() {
249                    const SIGTERM: i32 = 15;
250                    signal_pid_and_group(child_pid, SIGTERM);
251                    if let Ok(buf) = rx.recv_timeout(SUBPROCESS_TERM_GRACE) {
252                        signal_pid_and_group(child_pid, 9);
253                        return buf;
254                    }
255                    signal_pid_and_group(child_pid, 9);
256                    return rx
257                        .recv_timeout(Duration::from_millis(100))
258                        .unwrap_or_default();
259                }
260            }
261        }
262    }
263}
264
/// Start a background thread that reads `reader` until EOF and then sends
/// everything it collected, as a single buffer, over the returned channel.
pub(crate) fn spawn_pipe_drain<R>(mut reader: R) -> std::sync::mpsc::Receiver<Vec<u8>>
where
    R: std::io::Read + Send + 'static,
{
    let (sender, receiver) = std::sync::mpsc::channel();
    std::thread::spawn(move || {
        let mut captured = Vec::new();
        // A read error is ignored on purpose: whatever was captured before
        // the failure is still delivered.
        let _ = reader.read_to_end(&mut captured);
        // Sending can only fail when the receiver was already dropped.
        let _ = sender.send(captured);
    });
    receiver
}
277
278/// Interrupt-aware replacement for `Command::output()`: the child runs in
279/// its own kill group, stdout/stderr are captured in full, stdin is closed,
280/// and the wait polls [`requested`]. When an interrupt fires the whole
281/// group is gracefully terminated and the (signal-terminated) status is
282/// returned — the interpreter surfaces the pending cancellation / deadline
283/// error at the next op boundary.
284pub fn capture_output_interruptible(
285    command: &mut std::process::Command,
286) -> std::io::Result<std::process::Output> {
287    use std::process::Stdio;
288    command
289        .stdout(Stdio::piped())
290        .stderr(Stdio::piped())
291        .stdin(Stdio::null());
292    configure_kill_group(command);
293    let mut child = command.spawn()?;
294    let pid = child.id();
295    let rx_out = child.stdout.take().map(spawn_pipe_drain);
296    let rx_err = child.stderr.take().map(spawn_pipe_drain);
297
298    let (status, killed) = match wait_child_interruptible(&mut child, None)? {
299        ChildWait::Exited(status) => (status, false),
300        // No timeout is armed here, but keep the arm total.
301        ChildWait::TimedOut => (std::process::ExitStatus::default(), true),
302        ChildWait::Interrupted(status) => (status.unwrap_or_default(), true),
303    };
304    let stdout = rx_out
305        .map(|rx| drain_captured_pipe(&rx, killed, pid))
306        .unwrap_or_default();
307    let stderr = rx_err
308        .map(|rx| drain_captured_pipe(&rx, killed, pid))
309        .unwrap_or_default();
310    Ok(std::process::Output {
311        status,
312        stdout,
313        stderr,
314    })
315}
316
#[cfg(test)]
mod tests {
    use super::*;

    /// A thread with nothing installed never reports an interrupt.
    #[test]
    fn requested_is_false_without_context() {
        assert!(!requested());
    }

    /// The cancel token drives `requested`, and dropping the guard removes
    /// the context again.
    #[test]
    fn cancel_token_trips_requested_and_guard_restores() {
        let flag = Arc::new(AtomicBool::new(false));
        let guard = install(Some(Arc::clone(&flag)), None);
        assert!(!requested());

        flag.store(true, Ordering::SeqCst);
        assert!(requested());

        drop(guard);
        assert!(!requested());
    }

    /// A deadline that already lies in the past fires immediately.
    #[test]
    fn deadline_trips_requested() {
        let already_past = Instant::now()
            .checked_sub(Duration::from_millis(1))
            .expect("monotonic clock supports a 1ms test lookback");
        let _guard = install(None, Some(already_past));
        assert!(requested());
    }

    /// An inner install shadows the outer context and hands it back on drop.
    #[test]
    fn nested_installs_restore_in_order() {
        let fired = Arc::new(AtomicBool::new(true));
        let _outer = install(Some(fired), None);
        assert!(requested());

        let inner = install(None, None);
        assert!(!requested());
        drop(inner);

        assert!(requested());
    }

    #[cfg(unix)]
    #[test]
    fn interrupted_wait_kills_process_group() {
        extern "C" {
            fn kill(pid: i32, sig: i32) -> i32;
        }

        // Child spawns a grandchild; the whole group must die on interrupt.
        let mut command = std::process::Command::new("sh");
        command.args(["-c", "sleep 30 & wait"]);
        configure_kill_group(&mut command);
        let mut child = command.spawn().expect("spawn sh");
        let pgid = child.id();

        // The token is already set, so the very first poll interrupts.
        let _guard = install(Some(Arc::new(AtomicBool::new(true))), None);
        let started = Instant::now();
        let outcome = wait_child_interruptible(&mut child, None).expect("wait");
        assert!(matches!(outcome, ChildWait::Interrupted(_)));
        assert!(started.elapsed() < Duration::from_secs(10));

        // kill(-pgid, 0) fails with ESRCH once every member is gone.
        let group_gone = || unsafe { kill(-(pgid as i32), 0) } != 0;
        let give_up_at = Instant::now() + Duration::from_secs(5);
        loop {
            if group_gone() || Instant::now() >= give_up_at {
                break;
            }
            std::thread::sleep(Duration::from_millis(50));
        }
        assert!(group_gone(), "process group {pgid} survived interrupt");
    }
}
386}