Skip to main content

netsky_core/
process.rs

1//! Bounded subprocess execution.
2//!
3//! The watchdog tick holds the D1 lock while driving osascript (escalate)
4//! and `claude -p` (agentinit). If either hangs, the lock ages past its
5//! stale threshold and a concurrent tick force-releases it, entering the
6//! tick path twice. Every subprocess held under a lock must be bounded.
7//!
8//! `run_bounded` spawns the child, drains its pipes on dedicated threads,
9//! polls `try_wait` on the calling thread, and kills + reaps on timeout.
10
11use std::io::{self, Read};
12use std::process::{Command, ExitStatus, Stdio};
13use std::thread;
14use std::time::{Duration, Instant};
15
/// Pause between consecutive `try_wait` polls of the child. Kept short so an
/// expired timeout is noticed promptly, yet long enough that polling stays
/// well below 1% CPU.
const POLL_INTERVAL: Duration = Duration::from_millis(50);
19
/// Result of a time-bounded subprocess run.
///
/// Cloneable and comparable so callers and tests can store results or assert
/// on a whole outcome at once.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BoundedOutput {
    /// Exit status, or `None` if the process was killed on timeout.
    pub status: Option<ExitStatus>,
    /// Bytes captured from the child's stdout.
    pub stdout: Vec<u8>,
    /// Bytes captured from the child's stderr.
    pub stderr: Vec<u8>,
    /// True if we killed the child because it exceeded `timeout`.
    pub timed_out: bool,
}
30
31impl BoundedOutput {
32    pub fn success(&self) -> bool {
33        self.status.is_some_and(|s| s.success())
34    }
35}
36
37/// Run `cmd` to completion or `timeout`, whichever comes first.
38///
39/// On timeout, the child is SIGKILLed via `Child::kill` and reaped;
40/// `timed_out` is set and `status` is `None`. Both pipes are always
41/// drained so the child never blocks on a full pipe buffer.
42pub fn run_bounded(mut cmd: Command, timeout: Duration) -> io::Result<BoundedOutput> {
43    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
44    let mut child = cmd.spawn()?;
45
46    let stdout_pipe = child
47        .stdout
48        .take()
49        .ok_or_else(|| io::Error::other("child stdout was not piped"))?;
50    let stderr_pipe = child
51        .stderr
52        .take()
53        .ok_or_else(|| io::Error::other("child stderr was not piped"))?;
54
55    let stdout_h = thread::spawn(move || drain(stdout_pipe));
56    let stderr_h = thread::spawn(move || drain(stderr_pipe));
57
58    let start = Instant::now();
59    let mut timed_out = false;
60    let status = loop {
61        if let Some(st) = child.try_wait()? {
62            break Some(st);
63        }
64        if start.elapsed() >= timeout {
65            let _ = child.kill();
66            let _ = child.wait();
67            timed_out = true;
68            break None;
69        }
70        thread::sleep(POLL_INTERVAL);
71    };
72
73    let stdout = stdout_h.join().unwrap_or_default();
74    let stderr = stderr_h.join().unwrap_or_default();
75
76    Ok(BoundedOutput {
77        status,
78        stdout,
79        stderr,
80        timed_out,
81    })
82}
83
84fn drain<R: Read>(mut r: R) -> Vec<u8> {
85    let mut buf = Vec::new();
86    let _ = r.read_to_end(&mut buf);
87    buf
88}
89
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds `sh -c <script>` so each test only states the script it runs.
    fn sh(script: &str) -> Command {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg(script);
        cmd
    }

    /// A well-behaved child: exit status and both streams are captured.
    #[test]
    fn completes_under_timeout() {
        let out = run_bounded(sh("echo hi; echo bye >&2"), Duration::from_secs(5)).unwrap();
        assert!(out.success());
        assert!(!out.timed_out);
        assert_eq!(String::from_utf8_lossy(&out.stdout).trim(), "hi");
        assert_eq!(String::from_utf8_lossy(&out.stderr).trim(), "bye");
    }

    /// A child that outlives the timeout is killed and reported as timed out,
    /// and the call itself comes back long before the child would have ended.
    #[test]
    fn kills_on_timeout() {
        // `exec` makes `sleep` the direct child, so the kill hits the process
        // that holds the pipes no matter how the shell runs `-c` scripts.
        let begun = Instant::now();
        let out = run_bounded(sh("exec sleep 30"), Duration::from_millis(200)).unwrap();
        assert!(out.timed_out);
        assert!(out.status.is_none());
        // The whole point of the bound: we must not have waited for `sleep`.
        assert!(
            begun.elapsed() < Duration::from_secs(10),
            "run_bounded took {:?} for a 200ms timeout",
            begun.elapsed()
        );
    }

    /// A non-zero exit code is passed through and is not a timeout.
    #[test]
    fn propagates_nonzero_exit() {
        let out = run_bounded(sh("exit 7"), Duration::from_secs(5)).unwrap();
        assert!(!out.success());
        assert!(!out.timed_out);
        assert_eq!(out.status.and_then(|s| s.code()), Some(7));
    }
}