Skip to main content

netsky_core/
process.rs

1//! Bounded subprocess execution.
2//!
3//! The watchdog tick holds the D1 lock while driving osascript (escalate)
4//! and `claude -p` (agentinit). If either hangs, the lock ages past its
5//! stale threshold and a concurrent tick force-releases it, entering the
6//! tick path twice. Every subprocess held under a lock must be bounded.
7//!
8//! `run_bounded` spawns the child, drains its pipes on dedicated threads,
9//! polls `try_wait` on the main thread, and kills + reaps on timeout.
10
use std::io::{self, Read};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::mpsc::{self, RecvTimeoutError};
use std::thread;
use std::time::{Duration, Instant};
15
/// Interval between `try_wait` polls. Short enough to keep timeout
/// latency tight, long enough to stay well below 1% CPU.
const POLL_INTERVAL: Duration = Duration::from_millis(50);

/// Minimum time granted for collecting pipe output once the child is reaped.
///
/// Only the direct child is killed on timeout. A descendant that inherited
/// stdout/stderr (e.g. the `sleep` in `sh -c 'sleep 30; true'`, or a
/// backgrounded helper) keeps the pipe open, so EOF may not arrive for a long
/// time. Output collection therefore stops at whatever remains of `timeout`,
/// or after this grace period if that is longer, instead of waiting for EOF
/// unconditionally.
const DRAIN_GRACE: Duration = Duration::from_millis(500);

/// Size of each `read` performed by the pipe reader threads.
const READ_CHUNK: usize = 8 * 1024;

/// Result of a time-bounded subprocess run.
#[derive(Debug)]
pub struct BoundedOutput {
    /// Exit status, or `None` if the process was killed on timeout.
    pub status: Option<ExitStatus>,
    /// Bytes read from the child's stdout before EOF or the collection
    /// deadline. Truncated if a descendant held the pipe open past it.
    pub stdout: Vec<u8>,
    /// Bytes read from the child's stderr, with the same caveat as `stdout`.
    pub stderr: Vec<u8>,
    /// True if we killed the child because it exceeded `timeout`.
    pub timed_out: bool,
}

impl BoundedOutput {
    /// True if the child exited on its own with a successful status.
    /// Always false after a timeout, because `status` is then `None`.
    #[must_use]
    pub fn success(&self) -> bool {
        self.status.is_some_and(|s| s.success())
    }
}

/// Run `cmd` to completion or `timeout`, whichever comes first.
///
/// stdout and stderr are always replaced with pipes; stdin is left as
/// configured on `cmd`. Both pipes are read continuously on dedicated threads
/// so the child never blocks on a full pipe buffer.
///
/// On timeout, the child is SIGKILLed via `Child::kill` and reaped;
/// `timed_out` is set and `status` is `None`.
///
/// The call is bounded by roughly `timeout + DRAIN_GRACE`, even when a
/// descendant keeps a pipe open after the direct child is gone. In that case
/// the affected output holds whatever was read before the collection
/// deadline, and its reader thread is left to exit on its own once the pipe
/// closes.
///
/// # Errors
///
/// Returns an error if the child cannot be spawned, if a reader thread
/// cannot be created, or if polling the child with `try_wait` fails. In the
/// last two cases the child is killed and reaped before returning, so it
/// never outlives the call.
pub fn run_bounded(mut cmd: Command, timeout: Duration) -> io::Result<BoundedOutput> {
    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
    let mut child = cmd.spawn()?;

    // Invariant: both streams were set to `Stdio::piped()` just above.
    let stdout_pipe = child.stdout.take().expect("piped stdout");
    let stderr_pipe = child.stderr.take().expect("piped stderr");

    let readers = spawn_reader(stdout_pipe)
        .and_then(|out_rx| spawn_reader(stderr_pipe).map(|err_rx| (out_rx, err_rx)));
    let (stdout_rx, stderr_rx) = match readers {
        Ok(pair) => pair,
        Err(e) => {
            // Don't leave a running child behind if we can't service its pipes.
            kill_and_reap(&mut child);
            return Err(e);
        }
    };

    let start = Instant::now();
    let status = loop {
        match child.try_wait() {
            Ok(Some(st)) => break Some(st),
            Ok(None) => {}
            Err(e) => {
                kill_and_reap(&mut child);
                return Err(e);
            }
        }
        let elapsed = start.elapsed();
        if elapsed >= timeout {
            kill_and_reap(&mut child);
            break None;
        }
        // Clamp the sleep so we never overshoot the deadline by a full poll interval.
        thread::sleep(POLL_INTERVAL.min(timeout - elapsed));
    };
    // `status` is `None` only on the timeout branch above.
    let timed_out = status.is_none();

    // Pipes may outlive the child (inherited by descendants), so output
    // collection gets its own bounded deadline rather than waiting for EOF.
    let budget = timeout.saturating_sub(start.elapsed()).max(DRAIN_GRACE);
    // `None` only if `timeout` is so large the deadline overflows `Instant`.
    let deadline = Instant::now().checked_add(budget);
    let stdout = collect_until(&stdout_rx, deadline);
    let stderr = collect_until(&stderr_rx, deadline);

    Ok(BoundedOutput {
        status,
        stdout,
        stderr,
        timed_out,
    })
}

/// Kill the child and reap it so no zombie is left behind. Errors are
/// ignored because the child may already have exited.
fn kill_and_reap(child: &mut Child) {
    let _ = child.kill();
    let _ = child.wait();
}

/// Read `pipe` until EOF on a dedicated thread, forwarding each chunk over a channel.
///
/// The channel disconnects when the thread finishes (EOF or read error), which
/// is how `collect_until` detects end of stream. If the receiver is dropped
/// first, the thread stops after its next successful read.
fn spawn_reader<R: Read + Send + 'static>(mut pipe: R) -> io::Result<mpsc::Receiver<Vec<u8>>> {
    let (tx, rx) = mpsc::channel();
    thread::Builder::new()
        .name("run_bounded-reader".into())
        .spawn(move || {
            let mut buf = [0u8; READ_CHUNK];
            loop {
                match pipe.read(&mut buf) {
                    Ok(0) => break,
                    Ok(n) => {
                        if tx.send(buf[..n].to_vec()).is_err() {
                            break;
                        }
                    }
                    // Retry on EINTR, as `read_to_end` would.
                    Err(e) if e.kind() == io::ErrorKind::Interrupted => {}
                    // Best effort: keep what was read so far.
                    Err(_) => break,
                }
            }
        })?;
    Ok(rx)
}

/// Gather chunks from a reader channel until the stream ends or `deadline` passes.
///
/// A `None` deadline waits for end of stream without a limit. When the
/// deadline passes, chunks already queued on the channel are still included.
fn collect_until(rx: &mpsc::Receiver<Vec<u8>>, deadline: Option<Instant>) -> Vec<u8> {
    let mut out = Vec::new();
    loop {
        let next = match deadline {
            Some(d) => rx.recv_timeout(d.saturating_duration_since(Instant::now())),
            None => rx.recv().map_err(|_| RecvTimeoutError::Disconnected),
        };
        match next {
            Ok(chunk) => out.extend_from_slice(&chunk),
            Err(RecvTimeoutError::Disconnected) => break,
            Err(RecvTimeoutError::Timeout) => {
                out.extend(rx.try_iter().flatten());
                break;
            }
        }
    }
    out
}
83
#[cfg(test)]
mod tests {
    use super::*;

    /// Run `script` via `sh -c` under `run_bounded` with the given limit.
    fn run_sh(script: &str, limit: Duration) -> BoundedOutput {
        let mut sh = Command::new("sh");
        sh.args(["-c", script]);
        run_bounded(sh, limit).unwrap()
    }

    /// Lossily decode captured bytes and strip surrounding whitespace.
    fn text(bytes: &[u8]) -> String {
        String::from_utf8_lossy(bytes).trim().to_owned()
    }

    #[test]
    fn completes_under_timeout() {
        let result = run_sh("echo hi; echo bye >&2", Duration::from_secs(5));
        assert!(result.success());
        assert!(!result.timed_out);
        assert_eq!(text(&result.stdout), "hi");
        assert_eq!(text(&result.stderr), "bye");
    }

    #[test]
    fn kills_on_timeout() {
        let result = run_sh("sleep 30", Duration::from_millis(200));
        assert!(result.timed_out);
        assert!(result.status.is_none());
    }

    #[test]
    fn propagates_nonzero_exit() {
        let result = run_sh("exit 7", Duration::from_secs(5));
        assert!(!result.success());
        assert!(!result.timed_out);
        assert_eq!(result.status.and_then(|s| s.code()), Some(7));
    }
}