Skip to main content

team_core/
supervisor.rs

1//! Process supervision.
2//!
3//! The default back-end is a portable `TmuxSupervisor` that works on macOS
4//! and Linux. `SystemdSupervisor` and `LaunchdSupervisor` plug in behind
5//! the same trait when the host supports them.
6
7use std::path::{Path, PathBuf};
8use std::process::Command;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use anyhow::{Context, Result};
13
14use crate::compose::AgentHandle;
15
/// Everything a supervisor back-end needs to launch, inspect, and stop
/// one agent.
#[derive(Clone, Debug)]
pub struct AgentSpec {
    /// Project name; joined with `agent` as the `project:agent` argument
    /// passed to the wrapper.
    pub project: String,
    /// Agent name within the project.
    pub agent: String,
    /// Name of the tmux session that hosts this agent.
    pub tmux_session: String,
    /// Script that is executed to run the agent.
    pub wrapper: PathBuf,
    /// Working directory the agent's session is started in.
    pub cwd: PathBuf,
    /// Line-based `KEY=VALUE` file whose assignments are applied at launch.
    pub env_file: PathBuf,
}
25
26impl AgentSpec {
27    pub fn from_handle(h: AgentHandle<'_>, root: &Path, tmux_prefix: &str) -> Self {
28        Self {
29            project: h.project.into(),
30            agent: h.agent.into(),
31            tmux_session: format!("{tmux_prefix}{}-{}", h.project, h.agent),
32            wrapper: root.join("bin/agent-wrapper.sh"),
33            cwd: root.to_path_buf(),
34            env_file: crate::render::env_path(root, h.project, h.agent),
35        }
36    }
37}
38
/// What a supervisor saw when it last looked at an agent's process.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum AgentState {
    /// The supervising process exists.
    Running,
    /// The supervising process does not exist.
    Stopped,
    /// The state could not be determined (e.g. the probe itself failed).
    Unknown,
}
46
/// How a graceful drain ended.
///
/// Returned to the caller so reload can annotate which agents had to be
/// forcibly killed — an operator signal that a drain budget needs tuning.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum DrainOutcome {
    /// The agent was observed `Stopped` before the timeout elapsed.
    Graceful,
    /// The poll fell through and `down()` was used as a hard stop.
    TimedOutKilled,
}
57
58pub trait Supervisor {
59    fn up(&self, spec: &AgentSpec) -> Result<()>;
60    fn down(&self, spec: &AgentSpec) -> Result<()>;
61    fn state(&self, spec: &AgentSpec) -> Result<AgentState>;
62
63    /// Stop an agent gracefully. The default implementation falls
64    /// back to `down()` for back-ends that don't implement signal
65    /// delivery (or where graceful shutdown isn't meaningful — e.g.
66    /// a `MockSupervisor` in tests).
67    fn drain(&self, spec: &AgentSpec, _timeout: Duration) -> Result<DrainOutcome> {
68        self.down(spec)?;
69        Ok(DrainOutcome::TimedOutKilled)
70    }
71
72    /// Cadence at which `drain` polls for `Stopped` after the
73    /// graceful-stop signal is sent. Default 250ms — fine on every
74    /// host we've tested. The hook exists so tests can inject a
75    /// shorter cadence (no real-time waits) without going through
76    /// the OS, and so a future slow-tmux host has an escape valve
77    /// without forking the orchestration.
78    fn drain_poll_interval(&self) -> Duration {
79        Duration::from_millis(250)
80    }
81}
82
83/// Generic graceful-drain orchestration used by `Supervisor` impls
84/// that have a "signal a graceful stop" primitive (e.g. tmux's
85/// `send-keys C-c`). Calls `signal_fn`, polls
86/// `supervisor.state(spec)` for `Stopped` up to `timeout` at the
87/// supervisor's `drain_poll_interval`, falls through to
88/// `supervisor.down(spec)` if the agent doesn't exit in time.
89///
90/// Pulled out so the orchestration contract is testable end-to-end
91/// against a `MockSupervisor` without a real tmux runtime.
92pub fn orchestrate_drain<S, F>(
93    supervisor: &S,
94    spec: &AgentSpec,
95    timeout: Duration,
96    signal_fn: F,
97) -> Result<DrainOutcome>
98where
99    S: Supervisor + ?Sized,
100    F: FnOnce(),
101{
102    signal_fn();
103    let outcome = poll_for_stopped(timeout, supervisor.drain_poll_interval(), || {
104        supervisor.state(spec).unwrap_or(AgentState::Unknown)
105    });
106    if outcome == DrainOutcome::TimedOutKilled {
107        supervisor.down(spec)?;
108    }
109    Ok(outcome)
110}
111
112/// Read the per-agent env file into a list of `KEY=VALUE` assignment
113/// tokens, parsed in Rust so values never reach the shell unquoted.
114///
115/// The file is the line-based `KEY=VALUE` shape written by
116/// [`crate::render`]. Lines are taken verbatim (value bytes — spaces,
117/// glob metacharacters, `$` — preserved exactly); blank lines and any
118/// line without `=` are skipped. Before T-194 a no-`=` line would have
119/// been word-split into command position and broken the launch
120/// entirely, so skipping it is strictly safer and leaves the
121/// common-case contract intact. A missing env file yields no
122/// assignments — matching the prior `$(cat <missing>)` behaviour
123/// (empty substitution, agent still launches); making it fatal here
124/// would be a behaviour change outside this ticket's scope.
125fn env_assignments(env_file: &Path) -> Result<Vec<String>> {
126    let body = match std::fs::read_to_string(env_file) {
127        Ok(b) => b,
128        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
129        Err(e) => return Err(e).with_context(|| format!("read env file {}", env_file.display())),
130    };
131    Ok(body
132        .lines()
133        .filter(|l| !l.trim().is_empty() && l.contains('='))
134        .map(str::to_string)
135        .collect())
136}
137
138/// Build the `sh -c` command line that launches one agent.
139///
140/// Each env assignment is read in Rust and passed to `env` as its own
141/// single-quoted token, so the shell performs no word-splitting or
142/// pathname expansion on env values (T-194). Command shape is otherwise
143/// unchanged from the pre-T-194 form.
144fn build_up_command(spec: &AgentSpec) -> Result<String> {
145    let mut parts: Vec<String> = vec!["env".to_string()];
146    for kv in env_assignments(&spec.env_file)? {
147        parts.push(shlex::try_quote(&kv)?);
148    }
149    parts.push(shlex::try_quote(&spec.wrapper.display().to_string())?);
150    parts.push(format!("{}:{}", spec.project, spec.agent));
151    Ok(parts.join(" "))
152}
153
154/// Portable supervisor: one detached `tmux` session per agent.
155pub struct TmuxSupervisor;
156
157impl Supervisor for TmuxSupervisor {
158    fn up(&self, spec: &AgentSpec) -> Result<()> {
159        if matches!(self.state(spec)?, AgentState::Running) {
160            return Ok(());
161        }
162        let cmd = build_up_command(spec)?;
163        // -x/-y set the size of the detached pane. Without these, tmux
164        // falls back to 80x24 for off-screen windows, which is what the
165        // child's PTY inherits via TIOCGWINSZ on stdin. We pick a size
166        // larger than any common terminal so the inner TUI starts roomy;
167        // once a client attaches, SIGWINCH propagates the real size
168        // through `rl-watch`.
169        let status = Command::new("tmux")
170            .args([
171                "new-session",
172                "-d",
173                "-x",
174                "200",
175                "-y",
176                "50",
177                "-s",
178                &spec.tmux_session,
179                "-c",
180                &spec.cwd.display().to_string(),
181                "sh",
182                "-c",
183                &cmd,
184            ])
185            .status()
186            .context("spawn tmux new-session")?;
187        anyhow::ensure!(status.success(), "tmux new-session exited {status}");
188        // Tag the session so `teamctl sessions` can identify it as
189        // teamctl-managed across projects without parsing the name.
190        // Best-effort — `-q` swallows tmux errors so a stale tmux build
191        // can't break `up`.
192        let cwd_str = spec.cwd.to_string_lossy();
193        for (key, value) in [
194            ("@teamctl", "1"),
195            ("@teamctl-project", spec.project.as_str()),
196            ("@teamctl-agent", spec.agent.as_str()),
197            ("@teamctl-root", cwd_str.as_ref()),
198        ] {
199            let _ = Command::new("tmux")
200                .args(["set-option", "-q", "-t", &spec.tmux_session, key, value])
201                .status();
202        }
203        Ok(())
204    }
205
206    fn down(&self, spec: &AgentSpec) -> Result<()> {
207        let _ = Command::new("tmux")
208            .args(["kill-session", "-t", &spec.tmux_session])
209            .status();
210        Ok(())
211    }
212
213    fn state(&self, spec: &AgentSpec) -> Result<AgentState> {
214        let out = Command::new("tmux")
215            .args(["has-session", "-t", &spec.tmux_session])
216            .output();
217        Ok(match out {
218            Ok(o) if o.status.success() => AgentState::Running,
219            Ok(_) => AgentState::Stopped,
220            Err(_) => AgentState::Unknown,
221        })
222    }
223
224    /// Send Ctrl-C to the pane (kernel delivers SIGINT to the
225    /// foreground process), then poll for `Stopped` up to `timeout`.
226    /// Falls through to `kill-session` if the agent doesn't exit in
227    /// time. Used by `reload` so in-flight tool calls and partial
228    /// assistant responses get a chance to flush instead of being
229    /// SIGKILL'd by the prior `down()`.
230    fn drain(&self, spec: &AgentSpec, timeout: Duration) -> Result<DrainOutcome> {
231        orchestrate_drain(self, spec, timeout, || {
232            let _ = Command::new("tmux")
233                .args(["send-keys", "-t", &spec.tmux_session, "C-c"])
234                .status();
235        })
236    }
237}
238
239/// Poll `observe_state` every `interval` for up to `timeout`, returning
240/// `Graceful` if `Stopped` is observed in time and `TimedOutKilled`
241/// otherwise. Pulled out as a free function so it can be tested with
242/// fake observers — neither tmux nor real time is involved.
243fn poll_for_stopped<F: FnMut() -> AgentState>(
244    timeout: Duration,
245    interval: Duration,
246    mut observe_state: F,
247) -> DrainOutcome {
248    let deadline = Instant::now() + timeout;
249    loop {
250        if observe_state() == AgentState::Stopped {
251            return DrainOutcome::Graceful;
252        }
253        if Instant::now() >= deadline {
254            return DrainOutcome::TimedOutKilled;
255        }
256        thread::sleep(interval);
257    }
258}
259
#[cfg(test)]
mod drain_tests {
    use super::*;
    use std::cell::{Cell, RefCell};

    #[test]
    fn poll_returns_graceful_when_stopped_observed_in_time() {
        // Running on the first look, Stopped from the second onwards.
        let mut observations = 0u32;
        let outcome = poll_for_stopped(Duration::from_millis(50), Duration::from_millis(1), || {
            observations += 1;
            match observations {
                1 => AgentState::Running,
                _ => AgentState::Stopped,
            }
        });
        assert_eq!(outcome, DrainOutcome::Graceful);
    }

    #[test]
    fn poll_falls_through_to_kill_when_agent_never_stops() {
        let always_running = || AgentState::Running;
        let outcome =
            poll_for_stopped(Duration::from_millis(8), Duration::from_millis(2), always_running);
        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
    }

    #[test]
    fn poll_zero_timeout_only_checks_once_then_kills() {
        let mut calls: u32 = 0;
        let outcome = poll_for_stopped(Duration::from_millis(0), Duration::from_millis(1), || {
            calls += 1;
            AgentState::Running
        });
        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
        assert_eq!(calls, 1, "single state observation before timeout");
    }

    /// Recording test double for `Supervisor`.
    ///
    /// Logs every `up`/`down`/`state` call by name, can flip to
    /// `Stopped` after a configurable number of `state()` observations,
    /// and exposes a tunable `drain_poll_interval` so tests don't wait
    /// on real time.
    #[derive(Default)]
    struct MockSupervisor {
        /// Names of the trait methods invoked so far, in call order.
        calls: RefCell<Vec<&'static str>>,
        /// From the Nth `state()` call (1-indexed) onwards, report
        /// `Stopped`. 0 = always `Running`.
        stop_after: u32,
        /// Number of `state()` calls seen so far.
        state_calls: Cell<u32>,
        /// Value returned from `drain_poll_interval`.
        poll_interval: Duration,
    }

    impl MockSupervisor {
        fn record(&self, op: &'static str) {
            self.calls.borrow_mut().push(op);
        }

        /// How many times `op` has been recorded.
        fn count_of(&self, op: &str) -> usize {
            self.calls.borrow().iter().filter(|c| **c == op).count()
        }
    }

    impl Supervisor for MockSupervisor {
        fn up(&self, _spec: &AgentSpec) -> Result<()> {
            self.record("up");
            Ok(())
        }

        fn down(&self, _spec: &AgentSpec) -> Result<()> {
            self.record("down");
            Ok(())
        }

        fn state(&self, _spec: &AgentSpec) -> Result<AgentState> {
            self.record("state");
            let seen = self.state_calls.get() + 1;
            self.state_calls.set(seen);
            let stopped = self.stop_after > 0 && seen >= self.stop_after;
            Ok(if stopped {
                AgentState::Stopped
            } else {
                AgentState::Running
            })
        }

        fn drain_poll_interval(&self) -> Duration {
            self.poll_interval
        }
    }

    fn fake_spec() -> AgentSpec {
        let devnull = PathBuf::from("/dev/null");
        AgentSpec {
            project: String::from("p"),
            agent: String::from("a"),
            tmux_session: String::from("p-a"),
            wrapper: devnull.clone(),
            cwd: PathBuf::from("/tmp"),
            env_file: devnull,
        }
    }

    #[test]
    fn drain_with_zero_timeout_returns_timed_out_killed_and_calls_down() {
        // Contract: timeout=0 → instant signal-fn invocation, single
        // state observation, fall-through to down(). No graceful path,
        // no double-kill, no other side effects.
        let mock = MockSupervisor {
            poll_interval: Duration::from_millis(1),
            ..Default::default()
        };
        let spec = fake_spec();
        let signaled = Cell::new(false);

        let outcome = orchestrate_drain(&mock, &spec, Duration::ZERO, || signaled.set(true))
            .unwrap();

        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
        assert!(signaled.get(), "signal_fn must run before the poll");
        assert_eq!(
            *mock.calls.borrow(),
            vec!["state", "down"],
            "zero-timeout: one state observation then kill"
        );
    }

    #[test]
    fn drain_with_graceful_stop_does_not_call_down() {
        // Contract: agent observed `Stopped` within timeout → no
        // fall-through kill. The down() side effect is reserved for
        // forced terminations.
        let mock = MockSupervisor {
            stop_after: 2, // Stopped on 2nd state() call.
            poll_interval: Duration::from_millis(1),
            ..Default::default()
        };
        let spec = fake_spec();

        let outcome = orchestrate_drain(&mock, &spec, Duration::from_millis(100), || {}).unwrap();

        assert_eq!(outcome, DrainOutcome::Graceful);
        assert!(
            mock.count_of("down") == 0,
            "graceful drain must not call down(); calls: {:?}",
            mock.calls.borrow()
        );
    }

    #[test]
    fn drain_poll_interval_default_is_250ms() {
        // Pin the documented default so a future "tighten the
        // default" change has to update the docstring + this test
        // together.
        struct Default250;
        impl Supervisor for Default250 {
            fn up(&self, _: &AgentSpec) -> Result<()> {
                Ok(())
            }
            fn down(&self, _: &AgentSpec) -> Result<()> {
                Ok(())
            }
            fn state(&self, _: &AgentSpec) -> Result<AgentState> {
                Ok(AgentState::Stopped)
            }
        }
        let expected = Duration::from_millis(250);
        assert_eq!(Default250.drain_poll_interval(), expected);
    }

    #[test]
    fn drain_poll_interval_override_is_used_by_orchestrator() {
        // Sanity check that the trait method's value flows into
        // poll_for_stopped — without this, a host-specific override
        // would silently no-op.
        let mock = MockSupervisor {
            stop_after: 0,
            poll_interval: Duration::from_millis(2),
            ..Default::default()
        };
        let spec = fake_spec();

        let started = Instant::now();
        let _ = orchestrate_drain(&mock, &spec, Duration::from_millis(8), || {});
        let elapsed = started.elapsed();

        // With a 2ms poll interval and an 8ms timeout, we expect a
        // handful of state observations, not 0 and not 100. Loose
        // bound — enough to catch a 250ms default leaking in.
        let states = mock.count_of("state");
        assert!(
            states >= 2,
            "expected several state observations at 2ms cadence, got {states}"
        );
        assert!(
            elapsed < Duration::from_millis(60),
            "drain with 2ms interval finished too slowly ({elapsed:?})"
        );
    }
}
458
459mod shlex {
460    /// Minimal POSIX shell single-quote escaper so we don't pull a full dep.
461    pub fn try_quote(s: &str) -> anyhow::Result<String> {
462        anyhow::ensure!(!s.contains('\0'), "null byte in shell arg");
463        let escaped = s.replace('\'', r"'\''");
464        Ok(format!("'{escaped}'"))
465    }
466
467    #[cfg(test)]
468    mod tests {
469        use super::*;
470
471        #[test]
472        fn quotes_plain_path() {
473            assert_eq!(try_quote("/a/b.sh").unwrap(), "'/a/b.sh'");
474        }
475
476        #[test]
477        fn escapes_embedded_single_quote() {
478            assert_eq!(try_quote("x'y").unwrap(), r"'x'\''y'");
479        }
480    }
481}
482
#[cfg(all(test, unix))]
mod env_harden_tests {
    //! T-194: env-file values must reach the agent process verbatim —
    //! no shell word-splitting, no glob expansion against the cwd.
    use super::*;
    use std::fs;
    use std::os::unix::fs::PermissionsExt;
    use std::process::Command;

    /// Spec with fixed project/agent names and caller-chosen paths.
    fn spec_with(env_file: &Path, wrapper: &Path, cwd: &Path) -> AgentSpec {
        AgentSpec {
            project: String::from("proj"),
            agent: String::from("agt"),
            tmux_session: String::from("proj-agt"),
            wrapper: wrapper.into(),
            cwd: cwd.into(),
            env_file: env_file.into(),
        }
    }

    /// The end-to-end Done-when pin: a value with spaces and a value
    /// with glob metacharacters must survive `sh -c <cmd>` into the
    /// launched process unchanged, even when cwd holds files a glob
    /// would otherwise match.
    #[test]
    fn env_values_round_trip_through_real_shell() {
        let scratch = tempfile::tempdir().unwrap();

        let env_file = scratch.path().join("agt.env");
        let env_body = "MY_PATH=/some path with spaces/x\nGL=*?\nPLAIN=ok\n";
        fs::write(&env_file, env_body).unwrap();

        // cwd holds a decoy file: under the pre-T-194 `$(cat)` form the
        // unquoted `*?` would glob-expand to this name.
        let cwd = tempfile::tempdir().unwrap();
        fs::write(cwd.path().join("decoy"), "x").unwrap();

        // Wrapper echoes each variable back in brackets so the test can
        // see exactly what the launched process received.
        let wrapper = scratch.path().join("wrapper.sh");
        let script = concat!(
            "#!/bin/sh\n",
            "printf 'MY_PATH=[%s]\\n' \"$MY_PATH\"\n",
            "printf 'GL=[%s]\\n' \"$GL\"\n",
            "printf 'PLAIN=[%s]\\n' \"$PLAIN\"\n",
        );
        fs::write(&wrapper, script).unwrap();
        fs::set_permissions(&wrapper, fs::Permissions::from_mode(0o755)).unwrap();

        let cmd = build_up_command(&spec_with(&env_file, &wrapper, cwd.path())).unwrap();

        let out = Command::new("sh")
            .args(["-c", cmd.as_str()])
            .current_dir(cwd.path())
            .output()
            .unwrap();
        let stdout = String::from_utf8_lossy(&out.stdout);

        let expectations = [
            (
                "MY_PATH=[/some path with spaces/x]",
                "spaced value mangled by the shell",
            ),
            ("GL=[*?]", "glob value expanded against cwd"),
            ("PLAIN=[ok]", "common-case value lost"),
        ];
        for (needle, why) in expectations {
            assert!(
                stdout.contains(needle),
                "{why} — cmd: {cmd}\nstdout: {stdout}"
            );
        }
    }

    #[test]
    fn env_assignments_keeps_lines_verbatim_skips_blank_and_no_eq() {
        let scratch = tempfile::tempdir().unwrap();
        let f = scratch.path().join("a.env");
        // Spaced value, glob value, `=` inside value, empty value,
        // a blank line, and a stray no-`=` line.
        fs::write(&f, "K=v\nSP=/a b/c\nGL=*?\nEQ=a=b\nEMPTY=\n\nstray-no-eq\n").unwrap();

        let expected: Vec<String> = ["K=v", "SP=/a b/c", "GL=*?", "EQ=a=b", "EMPTY="]
            .iter()
            .map(|s| s.to_string())
            .collect();
        assert_eq!(env_assignments(&f).unwrap(), expected);
    }

    #[test]
    fn env_assignments_missing_file_is_empty_not_error() {
        let missing = Path::new("/no/such/teamctl/env/file.env");
        let parsed = env_assignments(missing).unwrap();
        assert_eq!(parsed, Vec::<String>::new());
    }

    #[test]
    fn build_up_command_quotes_each_token_and_has_no_cat() {
        let scratch = tempfile::tempdir().unwrap();
        let env_file = scratch.path().join("a.env");
        fs::write(&env_file, "SP=/a b/c\nGL=*?\n").unwrap();
        let spec = spec_with(&env_file, Path::new("/w/wrap.sh"), Path::new("/tmp"));

        let cmd = build_up_command(&spec).unwrap();

        for (forbidden, label) in [
            ("$(cat", "command substitution gone"),
            ("cat ", "no cat at all"),
        ] {
            assert!(!cmd.contains(forbidden), "{label}: {cmd}");
        }
        for (required, label) in [
            ("'SP=/a b/c'", "spaced kv single-quoted"),
            ("'GL=*?'", "glob kv single-quoted"),
            ("'/w/wrap.sh'", "wrapper still quoted"),
        ] {
            assert!(cmd.contains(required), "{label}: {cmd}");
        }
        assert!(cmd.ends_with(" proj:agt"), "agent arg unchanged: {cmd}");
    }

    #[test]
    fn build_up_command_empty_env_has_no_stray_token() {
        let scratch = tempfile::tempdir().unwrap();
        let env_file = scratch.path().join("empty.env");
        fs::write(&env_file, "").unwrap();
        let spec = spec_with(&env_file, Path::new("/w/wrap.sh"), Path::new("/tmp"));

        let cmd = build_up_command(&spec).unwrap();
        assert_eq!(cmd, "env '/w/wrap.sh' proj:agt");
    }
}