netsky-core 0.1.6

netsky core: agent model, prompt loader, spawner, config
Documentation
//! Bounded subprocess execution.
//!
//! The watchdog tick holds the D1 lock while driving osascript (escalate)
//! and `claude -p` (agentinit). If either hangs, the lock ages past its
//! stale threshold, a concurrent tick force-releases it, and two ticks end
//! up running the locked path at once. Every subprocess run while a lock
//! is held must therefore be bounded.
//!
//! `run_bounded` spawns the child, drains its pipes on dedicated threads,
//! polls `try_wait` on the main thread, and kills + reaps on timeout.

use std::io::{self, Read};
use std::process::{Child, Command, ExitStatus, Stdio};
use std::sync::{mpsc, Arc, Mutex, PoisonError};
use std::thread;
use std::time::{Duration, Instant};

/// Sleep between consecutive `try_wait` polls: small enough that a timeout
/// is noticed promptly, large enough that the polling loop costs negligible CPU.
const POLL_INTERVAL: Duration = Duration::from_micros(50_000);

/// Outcome of a time-bounded subprocess run: how the child finished plus
/// everything captured from its output pipes.
#[derive(Debug)]
pub struct BoundedOutput {
    /// The child's exit status when it finished before the timeout; `None`
    /// when it was killed for exceeding the timeout.
    pub status: Option<ExitStatus>,
    /// Bytes captured from the child's stdout.
    pub stdout: Vec<u8>,
    /// Bytes captured from the child's stderr.
    pub stderr: Vec<u8>,
    /// Set when the child was killed for running past the timeout.
    pub timed_out: bool,
}

impl BoundedOutput {
    /// True only when an exit status was recorded and it reports success.
    /// A timed-out run (status `None`) is never a success.
    pub fn success(&self) -> bool {
        matches!(self.status, Some(status) if status.success())
    }
}

/// Run `cmd` to completion or `timeout`, whichever comes first.
///
/// On timeout, the child is SIGKILLed via `Child::kill` and reaped;
/// `timed_out` is set and `status` is `None`. Both pipes are always
/// drained so the child never blocks on a full pipe buffer.
pub fn run_bounded(mut cmd: Command, timeout: Duration) -> io::Result<BoundedOutput> {
    cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
    let mut child = cmd.spawn()?;

    let stdout_pipe = child
        .stdout
        .take()
        .ok_or_else(|| io::Error::other("child stdout was not piped"))?;
    let stderr_pipe = child
        .stderr
        .take()
        .ok_or_else(|| io::Error::other("child stderr was not piped"))?;

    let stdout_h = thread::spawn(move || drain(stdout_pipe));
    let stderr_h = thread::spawn(move || drain(stderr_pipe));

    let start = Instant::now();
    let mut timed_out = false;
    let status = loop {
        if let Some(st) = child.try_wait()? {
            break Some(st);
        }
        if start.elapsed() >= timeout {
            let _ = child.kill();
            let _ = child.wait();
            timed_out = true;
            break None;
        }
        thread::sleep(POLL_INTERVAL);
    };

    let stdout = stdout_h.join().unwrap_or_default();
    let stderr = stderr_h.join().unwrap_or_default();

    Ok(BoundedOutput {
        status,
        stdout,
        stderr,
        timed_out,
    })
}

fn drain<R: Read>(mut r: R) -> Vec<u8> {
    let mut buf = Vec::new();
    let _ = r.read_to_end(&mut buf);
    buf
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn completes_under_timeout() {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg("echo hi; echo bye >&2");
        let out = run_bounded(cmd, Duration::from_secs(5)).unwrap();
        assert!(out.success());
        assert!(!out.timed_out);
        assert_eq!(String::from_utf8_lossy(&out.stdout).trim(), "hi");
        assert_eq!(String::from_utf8_lossy(&out.stderr).trim(), "bye");
    }

    #[test]
    fn kills_on_timeout() {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg("sleep 30");
        let out = run_bounded(cmd, Duration::from_millis(200)).unwrap();
        assert!(out.timed_out);
        assert!(out.status.is_none());
    }

    #[test]
    fn propagates_nonzero_exit() {
        let mut cmd = Command::new("sh");
        cmd.arg("-c").arg("exit 7");
        let out = run_bounded(cmd, Duration::from_secs(5)).unwrap();
        assert!(!out.success());
        assert!(!out.timed_out);
        assert_eq!(out.status.and_then(|s| s.code()), Some(7));
    }

    /// Writes far more than common pipe-buffer sizes to both streams. If
    /// either pipe were not drained while the child runs, the child would
    /// block on write and the run would end in a timeout instead.
    #[test]
    fn drains_output_larger_than_pipe_buffer() {
        let mut cmd = Command::new("sh");
        cmd.arg("-c")
            .arg("head -c 200000 /dev/zero; head -c 200000 /dev/zero >&2");
        let out = run_bounded(cmd, Duration::from_secs(5)).unwrap();
        assert!(out.success());
        assert!(!out.timed_out);
        assert_eq!(out.stdout.len(), 200_000);
        assert_eq!(out.stderr.len(), 200_000);
    }
}