1use std::path::{Path, PathBuf};
8use std::process::Command;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use anyhow::{Context, Result};
13
14use crate::compose::AgentHandle;
15
/// Everything a supervisor needs to know to launch and address one agent.
///
/// Values are normally derived by [`AgentSpec::from_handle`].
#[derive(Clone, Debug)]
pub struct AgentSpec {
    /// Project the agent belongs to.
    pub project: String,
    /// Agent name within the project.
    pub agent: String,
    /// Name of the tmux session hosting the agent
    /// (`from_handle` builds it as `{prefix}{project}-{agent}`).
    pub tmux_session: String,
    /// Path of the wrapper script that is executed to start the agent.
    pub wrapper: PathBuf,
    /// Working directory the agent is started in.
    pub cwd: PathBuf,
    /// File whose contents are handed to `env` when the agent is launched.
    pub env_file: PathBuf,
}
25
26impl AgentSpec {
27 pub fn from_handle(h: AgentHandle<'_>, root: &Path, tmux_prefix: &str) -> Self {
28 Self {
29 project: h.project.into(),
30 agent: h.agent.into(),
31 tmux_session: format!("{tmux_prefix}{}-{}", h.project, h.agent),
32 wrapper: root.join("bin/agent-wrapper.sh"),
33 cwd: root.to_path_buf(),
34 env_file: crate::render::env_path(root, h.project, h.agent),
35 }
36 }
37}
38
/// Coarse liveness of an agent as reported by a supervisor.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum AgentState {
    /// The supervisor found the agent's session.
    Running,
    /// The supervisor could be queried and reported no such session.
    Stopped,
    /// The supervisor itself could not be queried, so liveness is undetermined.
    Unknown,
}
46
/// How a drain request ended.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum DrainOutcome {
    /// The agent was observed as stopped before the timeout elapsed.
    Graceful,
    /// The agent did not stop in time (or no graceful path exists) and was
    /// taken down forcibly.
    TimedOutKilled,
}
57
58pub trait Supervisor {
59 fn up(&self, spec: &AgentSpec) -> Result<()>;
60 fn down(&self, spec: &AgentSpec) -> Result<()>;
61 fn state(&self, spec: &AgentSpec) -> Result<AgentState>;
62
63 fn drain(&self, spec: &AgentSpec, _timeout: Duration) -> Result<DrainOutcome> {
68 self.down(spec)?;
69 Ok(DrainOutcome::TimedOutKilled)
70 }
71}
72
/// [`Supervisor`] backend that hosts each agent in its own detached tmux
/// session and drives it by invoking the `tmux` command-line tool.
pub struct TmuxSupervisor;
75
76impl Supervisor for TmuxSupervisor {
77 fn up(&self, spec: &AgentSpec) -> Result<()> {
78 if matches!(self.state(spec)?, AgentState::Running) {
79 return Ok(());
80 }
81 let cmd = format!(
82 "env $(cat {env}) {wrapper} {project}:{agent}",
83 env = shlex::try_quote(&spec.env_file.display().to_string())?,
84 wrapper = shlex::try_quote(&spec.wrapper.display().to_string())?,
85 project = spec.project,
86 agent = spec.agent,
87 );
88 let status = Command::new("tmux")
89 .args([
90 "new-session",
91 "-d",
92 "-s",
93 &spec.tmux_session,
94 "-c",
95 &spec.cwd.display().to_string(),
96 "sh",
97 "-c",
98 &cmd,
99 ])
100 .status()
101 .context("spawn tmux new-session")?;
102 anyhow::ensure!(status.success(), "tmux new-session exited {status}");
103 Ok(())
104 }
105
106 fn down(&self, spec: &AgentSpec) -> Result<()> {
107 let _ = Command::new("tmux")
108 .args(["kill-session", "-t", &spec.tmux_session])
109 .status();
110 Ok(())
111 }
112
113 fn state(&self, spec: &AgentSpec) -> Result<AgentState> {
114 let out = Command::new("tmux")
115 .args(["has-session", "-t", &spec.tmux_session])
116 .output();
117 Ok(match out {
118 Ok(o) if o.status.success() => AgentState::Running,
119 Ok(_) => AgentState::Stopped,
120 Err(_) => AgentState::Unknown,
121 })
122 }
123
124 fn drain(&self, spec: &AgentSpec, timeout: Duration) -> Result<DrainOutcome> {
131 let _ = Command::new("tmux")
132 .args(["send-keys", "-t", &spec.tmux_session, "C-c"])
133 .status();
134 let outcome = poll_for_stopped(timeout, POLL_INTERVAL, || {
135 self.state(spec).unwrap_or(AgentState::Unknown)
136 });
137 if outcome == DrainOutcome::TimedOutKilled {
138 self.down(spec)?;
139 }
140 Ok(outcome)
141 }
142}
143
/// Pause between two consecutive state checks while a drain is waiting for
/// the agent to stop.
const POLL_INTERVAL: Duration = Duration::from_millis(250);
145
146fn poll_for_stopped<F: FnMut() -> AgentState>(
151 timeout: Duration,
152 interval: Duration,
153 mut observe_state: F,
154) -> DrainOutcome {
155 let deadline = Instant::now() + timeout;
156 loop {
157 if observe_state() == AgentState::Stopped {
158 return DrainOutcome::Graceful;
159 }
160 if Instant::now() >= deadline {
161 return DrainOutcome::TimedOutKilled;
162 }
163 thread::sleep(interval);
164 }
165}
166
#[cfg(test)]
mod drain_tests {
    use super::*;

    /// `Stopped` seen on the second observation, well inside the timeout,
    /// must be reported as a graceful drain.
    #[test]
    fn poll_returns_graceful_when_stopped_observed_in_time() {
        let budget = Duration::from_millis(50);
        let step = Duration::from_millis(1);
        let mut seen = 0u32;

        let outcome = poll_for_stopped(budget, step, || {
            seen += 1;
            match seen {
                0 | 1 => AgentState::Running,
                _ => AgentState::Stopped,
            }
        });

        assert_eq!(outcome, DrainOutcome::Graceful);
    }

    /// An agent that keeps running for the whole timeout ends in the kill path.
    #[test]
    fn poll_falls_through_to_kill_when_agent_never_stops() {
        let budget = Duration::from_millis(8);
        let step = Duration::from_millis(2);

        let outcome = poll_for_stopped(budget, step, || AgentState::Running);

        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
    }

    /// A zero timeout still performs one observation before giving up.
    #[test]
    fn poll_zero_timeout_only_checks_once_then_kills() {
        let mut observations: u32 = 0;

        let outcome = poll_for_stopped(Duration::from_millis(0), Duration::from_millis(1), || {
            observations += 1;
            AgentState::Running
        });

        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
        assert_eq!(observations, 1, "single state observation before timeout");
    }
}
206
207mod shlex {
208 pub fn try_quote(s: &str) -> anyhow::Result<String> {
210 anyhow::ensure!(!s.contains('\0'), "null byte in shell arg");
211 let escaped = s.replace('\'', r"'\''");
212 Ok(format!("'{escaped}'"))
213 }
214
215 #[cfg(test)]
216 mod tests {
217 use super::*;
218
219 #[test]
220 fn quotes_plain_path() {
221 assert_eq!(try_quote("/a/b.sh").unwrap(), "'/a/b.sh'");
222 }
223
224 #[test]
225 fn escapes_embedded_single_quote() {
226 assert_eq!(try_quote("x'y").unwrap(), r"'x'\''y'");
227 }
228 }
229}