Skip to main content

team_core/
supervisor.rs

1//! Process supervision.
2//!
3//! The default back-end is a portable `TmuxSupervisor` that works on macOS
4//! and Linux. `SystemdSupervisor` and `LaunchdSupervisor` plug in behind
5//! the same trait when the host supports them.
6
7use std::path::{Path, PathBuf};
8use std::process::Command;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use anyhow::{Context, Result};
13
14use crate::compose::AgentHandle;
15
16#[derive(Debug, Clone)]
17pub struct AgentSpec {
18    pub project: String,
19    pub agent: String,
20    pub tmux_session: String,
21    pub wrapper: PathBuf,
22    pub cwd: PathBuf,
23    pub env_file: PathBuf,
24}
25
26impl AgentSpec {
27    pub fn from_handle(h: AgentHandle<'_>, root: &Path, tmux_prefix: &str) -> Self {
28        Self {
29            project: h.project.into(),
30            agent: h.agent.into(),
31            tmux_session: format!("{tmux_prefix}{}-{}", h.project, h.agent),
32            wrapper: root.join("bin/agent-wrapper.sh"),
33            cwd: root.to_path_buf(),
34            env_file: crate::render::env_path(root, h.project, h.agent),
35        }
36    }
37}
38
/// What a supervisor back-end reports about an agent's supervising process.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AgentState {
    /// The supervising process exists.
    Running,
    /// No supervising process exists for the agent.
    Stopped,
    /// The back-end could not tell, e.g. because its control tool could not
    /// be invoked.
    Unknown,
}
46
/// How a graceful drain ended.
///
/// Returned to callers so `reload` can annotate which agents had to be
/// forcibly killed; frequent kills tell the operator the drain budget needs
/// tuning.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DrainOutcome {
    /// The agent was observed as `Stopped` before the timeout elapsed.
    Graceful,
    /// Polling ran out of time and `down()` was used as a hard stop.
    TimedOutKilled,
}
57
58pub trait Supervisor {
59    fn up(&self, spec: &AgentSpec) -> Result<()>;
60    fn down(&self, spec: &AgentSpec) -> Result<()>;
61    fn state(&self, spec: &AgentSpec) -> Result<AgentState>;
62
63    /// Stop an agent gracefully. The default implementation falls
64    /// back to `down()` for back-ends that don't implement signal
65    /// delivery (or where graceful shutdown isn't meaningful — e.g.
66    /// a `MockSupervisor` in tests).
67    fn drain(&self, spec: &AgentSpec, _timeout: Duration) -> Result<DrainOutcome> {
68        self.down(spec)?;
69        Ok(DrainOutcome::TimedOutKilled)
70    }
71}
72
73/// Portable supervisor: one detached `tmux` session per agent.
74pub struct TmuxSupervisor;
75
76impl Supervisor for TmuxSupervisor {
77    fn up(&self, spec: &AgentSpec) -> Result<()> {
78        if matches!(self.state(spec)?, AgentState::Running) {
79            return Ok(());
80        }
81        let cmd = format!(
82            "env $(cat {env}) {wrapper} {project}:{agent}",
83            env = shlex::try_quote(&spec.env_file.display().to_string())?,
84            wrapper = shlex::try_quote(&spec.wrapper.display().to_string())?,
85            project = spec.project,
86            agent = spec.agent,
87        );
88        let status = Command::new("tmux")
89            .args([
90                "new-session",
91                "-d",
92                "-s",
93                &spec.tmux_session,
94                "-c",
95                &spec.cwd.display().to_string(),
96                "sh",
97                "-c",
98                &cmd,
99            ])
100            .status()
101            .context("spawn tmux new-session")?;
102        anyhow::ensure!(status.success(), "tmux new-session exited {status}");
103        Ok(())
104    }
105
106    fn down(&self, spec: &AgentSpec) -> Result<()> {
107        let _ = Command::new("tmux")
108            .args(["kill-session", "-t", &spec.tmux_session])
109            .status();
110        Ok(())
111    }
112
113    fn state(&self, spec: &AgentSpec) -> Result<AgentState> {
114        let out = Command::new("tmux")
115            .args(["has-session", "-t", &spec.tmux_session])
116            .output();
117        Ok(match out {
118            Ok(o) if o.status.success() => AgentState::Running,
119            Ok(_) => AgentState::Stopped,
120            Err(_) => AgentState::Unknown,
121        })
122    }
123
124    /// Send Ctrl-C to the pane (kernel delivers SIGINT to the
125    /// foreground process), then poll for `Stopped` up to `timeout`.
126    /// Falls through to `kill-session` if the agent doesn't exit in
127    /// time. Used by `reload` so in-flight tool calls and partial
128    /// assistant responses get a chance to flush instead of being
129    /// SIGKILL'd by the prior `down()`.
130    fn drain(&self, spec: &AgentSpec, timeout: Duration) -> Result<DrainOutcome> {
131        let _ = Command::new("tmux")
132            .args(["send-keys", "-t", &spec.tmux_session, "C-c"])
133            .status();
134        let outcome = poll_for_stopped(timeout, POLL_INTERVAL, || {
135            self.state(spec).unwrap_or(AgentState::Unknown)
136        });
137        if outcome == DrainOutcome::TimedOutKilled {
138            self.down(spec)?;
139        }
140        Ok(outcome)
141    }
142}
143
144const POLL_INTERVAL: Duration = Duration::from_millis(250);
145
146/// Poll `observe_state` every `interval` for up to `timeout`, returning
147/// `Graceful` if `Stopped` is observed in time and `TimedOutKilled`
148/// otherwise. Pulled out as a free function so it can be tested with
149/// fake observers — neither tmux nor real time is involved.
150fn poll_for_stopped<F: FnMut() -> AgentState>(
151    timeout: Duration,
152    interval: Duration,
153    mut observe_state: F,
154) -> DrainOutcome {
155    let deadline = Instant::now() + timeout;
156    loop {
157        if observe_state() == AgentState::Stopped {
158            return DrainOutcome::Graceful;
159        }
160        if Instant::now() >= deadline {
161            return DrainOutcome::TimedOutKilled;
162        }
163        thread::sleep(interval);
164    }
165}
166
/// Unit tests for `poll_for_stopped`, driven by fake state observers.
#[cfg(test)]
mod drain_tests {
    use super::*;

    #[test]
    fn poll_returns_graceful_when_stopped_observed_in_time() {
        // Report `Running` on the first poll and `Stopped` from then on.
        let mut polls = 0u32;
        let observer = || {
            polls += 1;
            if polls < 2 {
                AgentState::Running
            } else {
                AgentState::Stopped
            }
        };
        let result = poll_for_stopped(Duration::from_millis(50), Duration::from_millis(1), observer);
        assert_eq!(result, DrainOutcome::Graceful);
    }

    #[test]
    fn poll_falls_through_to_kill_when_agent_never_stops() {
        let always_running = || AgentState::Running;
        let result =
            poll_for_stopped(Duration::from_millis(8), Duration::from_millis(2), always_running);
        assert_eq!(result, DrainOutcome::TimedOutKilled);
    }

    #[test]
    fn poll_zero_timeout_only_checks_once_then_kills() {
        let mut observations = 0u32;
        let result = poll_for_stopped(Duration::from_millis(0), Duration::from_millis(1), || {
            observations += 1;
            AgentState::Running
        });
        assert_eq!(result, DrainOutcome::TimedOutKilled);
        assert_eq!(observations, 1, "single state observation before timeout");
    }
}
206
207mod shlex {
208    /// Minimal POSIX shell single-quote escaper so we don't pull a full dep.
209    pub fn try_quote(s: &str) -> anyhow::Result<String> {
210        anyhow::ensure!(!s.contains('\0'), "null byte in shell arg");
211        let escaped = s.replace('\'', r"'\''");
212        Ok(format!("'{escaped}'"))
213    }
214
215    #[cfg(test)]
216    mod tests {
217        use super::*;
218
219        #[test]
220        fn quotes_plain_path() {
221            assert_eq!(try_quote("/a/b.sh").unwrap(), "'/a/b.sh'");
222        }
223
224        #[test]
225        fn escapes_embedded_single_quote() {
226            assert_eq!(try_quote("x'y").unwrap(), r"'x'\''y'");
227        }
228    }
229}