use std::path::{Path, PathBuf};
use std::process::Command;
use std::thread;
use std::time::{Duration, Instant};
use anyhow::{Context, Result};
use crate::compose::AgentHandle;
/// Everything a [`Supervisor`] needs to launch and address a single agent.
#[derive(Debug, Clone)]
pub struct AgentSpec {
/// Project name; first half of the `project:agent` argument handed to the wrapper.
pub project: String,
/// Agent name; second half of the `project:agent` argument handed to the wrapper.
pub agent: String,
/// Name of the tmux session that hosts this agent.
pub tmux_session: String,
/// Script that is executed inside the session to start the agent.
pub wrapper: PathBuf,
/// Directory the tmux session is started in.
pub cwd: PathBuf,
/// File of `KEY=VALUE` lines exported to the agent; a missing file means "no extra variables".
pub env_file: PathBuf,
}
impl AgentSpec {
    /// Derives the launch spec for the agent identified by `h`.
    ///
    /// * `root` is used both as the session's working directory and as the
    ///   base for the wrapper script (`<root>/bin/agent-wrapper.sh`).
    /// * `tmux_prefix` is prepended to `<project>-<agent>` to form the tmux
    ///   session name.
    ///
    /// The env file location is delegated to `crate::render::env_path`.
    pub fn from_handle(h: AgentHandle<'_>, root: &Path, tmux_prefix: &str) -> Self {
        let (project, agent) = (h.project, h.agent);
        let tmux_session = format!("{tmux_prefix}{project}-{agent}");
        let wrapper = root.join("bin/agent-wrapper.sh");
        let env_file = crate::render::env_path(root, project, agent);
        Self {
            project: project.into(),
            agent: agent.into(),
            tmux_session,
            wrapper,
            cwd: root.to_path_buf(),
            env_file,
        }
    }
}
/// Liveness of an agent as reported by [`Supervisor::state`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentState {
/// The agent is up.
Running,
/// The agent is not running; this is the state a drain waits for.
Stopped,
/// The state could not be determined (e.g. the state query itself failed).
Unknown,
}
/// How a drain request ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainOutcome {
/// The agent was observed `Stopped` before the timeout elapsed.
Graceful,
/// The agent was not observed `Stopped` in time and was taken down forcibly.
TimedOutKilled,
}
/// Lifecycle operations a process supervisor provides for one agent.
pub trait Supervisor {
    /// Starts the agent described by `spec`.
    fn up(&self, spec: &AgentSpec) -> Result<()>;

    /// Stops the agent described by `spec`.
    fn down(&self, spec: &AgentSpec) -> Result<()>;

    /// Reports the current [`AgentState`] of the agent described by `spec`.
    fn state(&self, spec: &AgentSpec) -> Result<AgentState>;

    /// Asks the agent to stop, allowing up to `_timeout` for it to do so.
    ///
    /// The default implementation has no graceful phase: it ignores the
    /// timeout, calls [`Supervisor::down`] straight away and always reports
    /// [`DrainOutcome::TimedOutKilled`].
    ///
    /// # Errors
    /// Propagates any error returned by [`Supervisor::down`].
    fn drain(&self, spec: &AgentSpec, _timeout: Duration) -> Result<DrainOutcome> {
        self.down(spec).map(|()| DrainOutcome::TimedOutKilled)
    }

    /// Delay between two state observations while draining (250 ms by default).
    fn drain_poll_interval(&self) -> Duration {
        const DEFAULT_POLL_MILLIS: u64 = 250;
        Duration::from_millis(DEFAULT_POLL_MILLIS)
    }
}
/// Drives a graceful shutdown: signal, wait, and force-stop on timeout.
///
/// 1. `signal_fn` is invoked exactly once to ask the agent to stop.
/// 2. `supervisor.state` is polled every `supervisor.drain_poll_interval()`
///    until it reports [`AgentState::Stopped`] or `timeout` elapses. A failing
///    state query is treated as [`AgentState::Unknown`], i.e. "not stopped yet".
/// 3. On timeout `supervisor.down` is called to force the agent down.
///
/// # Errors
/// Only a failure of the forced `down` call is propagated.
pub fn orchestrate_drain<S, F>(
    supervisor: &S,
    spec: &AgentSpec,
    timeout: Duration,
    signal_fn: F,
) -> Result<DrainOutcome>
where
    S: Supervisor + ?Sized,
    F: FnOnce(),
{
    signal_fn();

    let interval = supervisor.drain_poll_interval();
    let observe = || match supervisor.state(spec) {
        Ok(state) => state,
        Err(_) => AgentState::Unknown,
    };

    match poll_for_stopped(timeout, interval, observe) {
        DrainOutcome::Graceful => Ok(DrainOutcome::Graceful),
        DrainOutcome::TimedOutKilled => {
            supervisor.down(spec)?;
            Ok(DrainOutcome::TimedOutKilled)
        }
    }
}
fn env_assignments(env_file: &Path) -> Result<Vec<String>> {
let body = match std::fs::read_to_string(env_file) {
Ok(b) => b,
Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
Err(e) => return Err(e).with_context(|| format!("read env file {}", env_file.display())),
};
Ok(body
.lines()
.filter(|l| !l.trim().is_empty() && l.contains('='))
.map(str::to_string)
.collect())
}
/// Builds the command line that is handed to `sh -c` to start the agent.
///
/// The result has the shape `env 'K=v' … '<wrapper>' <project>:<agent>`:
/// every assignment from the env file and the wrapper path are single-quoted
/// so the shell passes them through untouched.
///
/// The trailing `project:agent` token is emitted bare when it consists only
/// of characters the shell never interprets (ASCII alphanumerics and
/// `_@%+=:,./-`), and is single-quoted otherwise, so a space or shell
/// metacharacter in a project or agent name cannot split the argument or
/// inject additional commands.
///
/// # Errors
/// Fails if the env file cannot be read or if any token contains a NUL byte.
fn build_up_command(spec: &AgentSpec) -> Result<String> {
    let mut parts: Vec<String> = vec!["env".to_string()];
    for kv in env_assignments(&spec.env_file)? {
        parts.push(shlex::try_quote(&kv)?);
    }
    parts.push(shlex::try_quote(&spec.wrapper.display().to_string())?);

    let target = format!("{}:{}", spec.project, spec.agent);
    // Characters that are literal to a POSIX shell in argument position.
    let shell_safe = target
        .bytes()
        .all(|b| b.is_ascii_alphanumeric() || b"_@%+=:,./-".contains(&b));
    parts.push(if shell_safe {
        target
    } else {
        shlex::try_quote(&target)?
    });

    Ok(parts.join(" "))
}
/// [`Supervisor`] that hosts each agent in its own detached tmux session.
pub struct TmuxSupervisor;

/// Returns a tmux `target-session` that matches `session` by exact name only.
///
/// A bare name passed to `-t` is also tried as a name prefix and as an
/// fnmatch pattern, so `proj-dev` could silently resolve to a live
/// `proj-dev2`. The leading `=` disables those fallbacks.
fn exact_session_target(session: &str) -> String {
    format!("={session}")
}

/// Returns a tmux `target-pane` for the current window and active pane of
/// exactly `session`.
///
/// The trailing `:` makes tmux parse the name as the session component
/// (with an empty window, meaning "current window"); the leading `=`
/// requires an exact session-name match.
fn exact_pane_target(session: &str) -> String {
    format!("={session}:")
}

impl Supervisor for TmuxSupervisor {
    /// Starts the agent in a new detached 200x50 tmux session.
    ///
    /// Does nothing when the session already exists. After a successful
    /// start the session is tagged with `@teamctl*` user options; tagging is
    /// best-effort and its failures are ignored.
    ///
    /// # Errors
    /// Fails if the launch command cannot be built, tmux cannot be spawned,
    /// or `tmux new-session` exits unsuccessfully.
    fn up(&self, spec: &AgentSpec) -> Result<()> {
        if matches!(self.state(spec)?, AgentState::Running) {
            return Ok(());
        }
        let cmd = build_up_command(spec)?;
        let status = Command::new("tmux")
            .args(["new-session", "-d", "-x", "200", "-y", "50", "-s"])
            .arg(&spec.tmux_session)
            .arg("-c")
            // Pass the path itself rather than a lossy string rendering so
            // non-UTF-8 directories reach tmux unchanged.
            .arg(&spec.cwd)
            .args(["sh", "-c"])
            .arg(&cmd)
            .status()
            .context("spawn tmux new-session")?;
        anyhow::ensure!(status.success(), "tmux new-session exited {status}");

        let target = exact_pane_target(&spec.tmux_session);
        let cwd_str = spec.cwd.to_string_lossy();
        for (key, value) in [
            ("@teamctl", "1"),
            ("@teamctl-project", spec.project.as_str()),
            ("@teamctl-agent", spec.agent.as_str()),
            ("@teamctl-root", cwd_str.as_ref()),
        ] {
            // Tags are informational; a failure here must not undo a
            // session that is already running.
            let _ = Command::new("tmux")
                .args(["set-option", "-q", "-t", target.as_str(), key, value])
                .status();
        }
        Ok(())
    }

    /// Kills the agent's tmux session.
    ///
    /// Best-effort: a missing session or a tmux failure is not reported.
    /// The exact-match target guarantees that a session whose name merely
    /// starts with this agent's session name is never killed by mistake.
    fn down(&self, spec: &AgentSpec) -> Result<()> {
        let target = exact_session_target(&spec.tmux_session);
        let _ = Command::new("tmux")
            .args(["kill-session", "-t", target.as_str()])
            .status();
        Ok(())
    }

    /// Reports whether a session with exactly the agent's session name exists.
    ///
    /// Returns `Running` when `tmux has-session` succeeds, `Stopped` when it
    /// runs but fails, and `Unknown` when tmux itself cannot be executed.
    fn state(&self, spec: &AgentSpec) -> Result<AgentState> {
        let target = exact_session_target(&spec.tmux_session);
        let out = Command::new("tmux")
            .args(["has-session", "-t", target.as_str()])
            .output();
        Ok(match out {
            Ok(o) if o.status.success() => AgentState::Running,
            Ok(_) => AgentState::Stopped,
            Err(_) => AgentState::Unknown,
        })
    }

    /// Sends `C-c` to the agent's session, then waits up to `timeout` for the
    /// session to disappear, killing it if it does not.
    fn drain(&self, spec: &AgentSpec, timeout: Duration) -> Result<DrainOutcome> {
        let target = exact_pane_target(&spec.tmux_session);
        orchestrate_drain(self, spec, timeout, || {
            let _ = Command::new("tmux")
                .args(["send-keys", "-t", target.as_str(), "C-c"])
                .status();
        })
    }
}
/// Polls `observe_state` until it reports [`AgentState::Stopped`] or
/// `timeout` has elapsed.
///
/// The state is always observed at least once, even with a zero timeout.
/// Between observations the thread sleeps for `interval`, shortened to the
/// time left so the call never overshoots the deadline by a whole interval.
///
/// A `timeout` too large to be represented as a deadline (for example
/// `Duration::MAX`) is treated as "no deadline": polling continues until the
/// agent stops instead of panicking on `Instant` overflow.
///
/// Returns [`DrainOutcome::Graceful`] when `Stopped` was observed, otherwise
/// [`DrainOutcome::TimedOutKilled`]. The caller performs the actual kill.
fn poll_for_stopped<F: FnMut() -> AgentState>(
    timeout: Duration,
    interval: Duration,
    mut observe_state: F,
) -> DrainOutcome {
    // `Instant + Duration` panics on overflow; `None` means unbounded.
    let deadline = Instant::now().checked_add(timeout);
    loop {
        if observe_state() == AgentState::Stopped {
            return DrainOutcome::Graceful;
        }
        let nap = match deadline {
            Some(deadline) => {
                let remaining = deadline.saturating_duration_since(Instant::now());
                if remaining.is_zero() {
                    return DrainOutcome::TimedOutKilled;
                }
                interval.min(remaining)
            }
            None => interval,
        };
        thread::sleep(nap);
    }
}
// Unit tests for the drain machinery: the raw polling loop
// (`poll_for_stopped`) and the signal/poll/kill sequence in
// `orchestrate_drain`, exercised through an in-memory mock supervisor.
#[cfg(test)]
mod drain_tests {
use super::*;
use std::cell::RefCell;
// The observer reports `Stopped` on its second call, well inside the 50 ms
// budget, so the poll must end gracefully.
#[test]
fn poll_returns_graceful_when_stopped_observed_in_time() {
let calls = RefCell::new(0u32);
let outcome = poll_for_stopped(Duration::from_millis(50), Duration::from_millis(1), || {
let mut n = calls.borrow_mut();
*n += 1;
if *n >= 2 {
AgentState::Stopped
} else {
AgentState::Running
}
});
assert_eq!(outcome, DrainOutcome::Graceful);
}
// An agent that never stops must end in the timed-out outcome.
#[test]
fn poll_falls_through_to_kill_when_agent_never_stops() {
let outcome = poll_for_stopped(Duration::from_millis(8), Duration::from_millis(2), || {
AgentState::Running
});
assert_eq!(outcome, DrainOutcome::TimedOutKilled);
}
// With a zero timeout the state is still observed exactly once before
// giving up.
#[test]
fn poll_zero_timeout_only_checks_once_then_kills() {
let mut calls: u32 = 0;
let outcome = poll_for_stopped(Duration::from_millis(0), Duration::from_millis(1), || {
calls += 1;
AgentState::Running
});
assert_eq!(outcome, DrainOutcome::TimedOutKilled);
assert_eq!(calls, 1, "single state observation before timeout");
}
// Supervisor double that records every trait call in order.
#[derive(Default)]
struct MockSupervisor {
// Names of the trait methods invoked so far, in call order.
calls: RefCell<Vec<&'static str>>,
// `state` reports `Stopped` from this call number on; 0 means "never stops".
stop_after: u32,
// Number of `state` calls seen so far.
state_calls: RefCell<u32>,
// Value returned from `drain_poll_interval`.
poll_interval: Duration,
}
impl MockSupervisor {
// Appends `op` to the recorded call log.
fn record(&self, op: &'static str) {
self.calls.borrow_mut().push(op);
}
}
impl Supervisor for MockSupervisor {
fn up(&self, _spec: &AgentSpec) -> Result<()> {
self.record("up");
Ok(())
}
fn down(&self, _spec: &AgentSpec) -> Result<()> {
self.record("down");
Ok(())
}
fn state(&self, _spec: &AgentSpec) -> Result<AgentState> {
self.record("state");
let mut n = self.state_calls.borrow_mut();
*n += 1;
if self.stop_after > 0 && *n >= self.stop_after {
Ok(AgentState::Stopped)
} else {
Ok(AgentState::Running)
}
}
fn drain_poll_interval(&self) -> Duration {
self.poll_interval
}
}
// Minimal spec; the mock never touches the filesystem or tmux.
fn fake_spec() -> AgentSpec {
AgentSpec {
project: "p".into(),
agent: "a".into(),
tmux_session: "p-a".into(),
wrapper: PathBuf::from("/dev/null"),
cwd: PathBuf::from("/tmp"),
env_file: PathBuf::from("/dev/null"),
}
}
// Zero timeout: the signal runs, the state is observed once, and `down`
// is called to force the stop.
#[test]
fn drain_with_zero_timeout_returns_timed_out_killed_and_calls_down() {
let mock = MockSupervisor {
poll_interval: Duration::from_millis(1),
..Default::default()
};
let spec = fake_spec();
let signaled = RefCell::new(false);
let outcome = orchestrate_drain(&mock, &spec, Duration::ZERO, || {
*signaled.borrow_mut() = true;
})
.unwrap();
assert_eq!(outcome, DrainOutcome::TimedOutKilled);
assert!(*signaled.borrow(), "signal_fn must run before the poll");
assert_eq!(
mock.calls.borrow().as_slice(),
&["state", "down"],
"zero-timeout: one state observation then kill"
);
}
// When the agent stops on its own within the timeout, `down` must not be
// called.
#[test]
fn drain_with_graceful_stop_does_not_call_down() {
let mock = MockSupervisor {
poll_interval: Duration::from_millis(1),
stop_after: 2, ..Default::default()
};
let spec = fake_spec();
let outcome = orchestrate_drain(&mock, &spec, Duration::from_millis(100), || {}).unwrap();
assert_eq!(outcome, DrainOutcome::Graceful);
assert!(
!mock.calls.borrow().contains(&"down"),
"graceful drain must not call down(); calls: {:?}",
mock.calls.borrow()
);
}
// A supervisor that does not override `drain_poll_interval` gets 250 ms.
#[test]
fn drain_poll_interval_default_is_250ms() {
struct Default250;
impl Supervisor for Default250 {
fn up(&self, _: &AgentSpec) -> Result<()> {
Ok(())
}
fn down(&self, _: &AgentSpec) -> Result<()> {
Ok(())
}
fn state(&self, _: &AgentSpec) -> Result<AgentState> {
Ok(AgentState::Stopped)
}
}
assert_eq!(Default250.drain_poll_interval(), Duration::from_millis(250));
}
// The orchestrator must poll at the supervisor's own interval: with a 2 ms
// cadence and an 8 ms timeout several observations fit, and the whole
// drain finishes far sooner than one default 250 ms interval would allow.
#[test]
fn drain_poll_interval_override_is_used_by_orchestrator() {
let mock = MockSupervisor {
poll_interval: Duration::from_millis(2),
stop_after: 0,
..Default::default()
};
let spec = fake_spec();
let start = Instant::now();
let _ = orchestrate_drain(&mock, &spec, Duration::from_millis(8), || {});
let elapsed = start.elapsed();
let states = mock
.calls
.borrow()
.iter()
.filter(|c| **c == "state")
.count();
assert!(
states >= 2,
"expected several state observations at 2ms cadence, got {states}"
);
assert!(
elapsed < Duration::from_millis(60),
"drain with 2ms interval finished too slowly ({elapsed:?})"
);
}
}
/// Minimal POSIX-shell quoting helper.
pub(crate) mod shlex {
    /// Wraps `s` in single quotes so a POSIX shell reads it back as one
    /// literal word.
    ///
    /// Each embedded single quote is emitted as `'\''` (close the quote, add
    /// an escaped quote, reopen the quote).
    ///
    /// # Errors
    /// Rejects input containing a NUL byte, which cannot be passed as a
    /// shell argument.
    pub fn try_quote(s: &str) -> anyhow::Result<String> {
        anyhow::ensure!(!s.contains('\0'), "null byte in shell arg");

        let mut quoted = String::with_capacity(s.len() + 2);
        quoted.push('\'');
        for ch in s.chars() {
            if ch == '\'' {
                quoted.push_str(r"'\''");
            } else {
                quoted.push(ch);
            }
        }
        quoted.push('\'');
        Ok(quoted)
    }

    #[cfg(test)]
    mod tests {
        use super::*;

        /// A path without special characters is simply wrapped in quotes.
        #[test]
        fn quotes_plain_path() {
            let quoted = try_quote("/a/b.sh").unwrap();
            assert_eq!(quoted, "'/a/b.sh'");
        }

        /// An embedded single quote is spliced in as `'\''`.
        #[test]
        fn escapes_embedded_single_quote() {
            let quoted = try_quote("x'y").unwrap();
            assert_eq!(quoted, r"'x'\''y'");
        }
    }
}
// Tests that env-file values and the wrapper path survive the trip through
// `sh -c` unchanged (no word splitting, no glob expansion) and that the
// generated command has the expected shape. Unix-only: they run a real `sh`
// and set file permission bits.
#[cfg(test)]
#[cfg(unix)]
mod env_harden_tests {
use super::*;
use std::fs;
use std::os::unix::fs::PermissionsExt;
use std::process::Command;
// Builds a spec with fixed project/agent names and the given paths.
fn spec_with(env_file: &Path, wrapper: &Path, cwd: &Path) -> AgentSpec {
AgentSpec {
project: "proj".into(),
agent: "agt".into(),
tmux_session: "proj-agt".into(),
wrapper: wrapper.to_path_buf(),
cwd: cwd.to_path_buf(),
env_file: env_file.to_path_buf(),
}
}
// End-to-end: runs the generated command through a real `sh` and checks
// that the wrapper sees each value exactly as written in the env file.
#[test]
fn env_values_round_trip_through_real_shell() {
let dir = tempfile::tempdir().unwrap();
let env_file = dir.path().join("agt.env");
fs::write(
&env_file,
"MY_PATH=/some path with spaces/x\nGL=*?\nPLAIN=ok\n",
)
.unwrap();
let cwd = tempfile::tempdir().unwrap();
// A file in the working directory that an unquoted `*?` would expand to.
fs::write(cwd.path().join("decoy"), "x").unwrap();
let wrapper = dir.path().join("wrapper.sh");
// The wrapper echoes the three variables in brackets so stray whitespace
// or expansion would be visible in the output.
fs::write(
&wrapper,
"#!/bin/sh\nprintf 'MY_PATH=[%s]\\n' \"$MY_PATH\"\n\
printf 'GL=[%s]\\n' \"$GL\"\nprintf 'PLAIN=[%s]\\n' \"$PLAIN\"\n",
)
.unwrap();
fs::set_permissions(&wrapper, fs::Permissions::from_mode(0o755)).unwrap();
let spec = spec_with(&env_file, &wrapper, cwd.path());
let cmd = build_up_command(&spec).unwrap();
let out = Command::new("sh")
.arg("-c")
.arg(&cmd)
.current_dir(cwd.path())
.output()
.unwrap();
let stdout = String::from_utf8_lossy(&out.stdout);
assert!(
stdout.contains("MY_PATH=[/some path with spaces/x]"),
"spaced value mangled by the shell — cmd: {cmd}\nstdout: {stdout}"
);
assert!(
stdout.contains("GL=[*?]"),
"glob value expanded against cwd — cmd: {cmd}\nstdout: {stdout}"
);
assert!(
stdout.contains("PLAIN=[ok]"),
"common-case value lost — cmd: {cmd}\nstdout: {stdout}"
);
}
// Lines are returned untouched; blank lines and lines without `=` are
// dropped, while an empty value (`EMPTY=`) and a value containing `=` are kept.
#[test]
fn env_assignments_keeps_lines_verbatim_skips_blank_and_no_eq() {
let dir = tempfile::tempdir().unwrap();
let f = dir.path().join("a.env");
fs::write(&f, "K=v\nSP=/a b/c\nGL=*?\nEQ=a=b\nEMPTY=\n\nstray-no-eq\n").unwrap();
assert_eq!(
env_assignments(&f).unwrap(),
vec![
"K=v".to_string(),
"SP=/a b/c".to_string(),
"GL=*?".to_string(),
"EQ=a=b".to_string(),
"EMPTY=".to_string(),
]
);
}
// A nonexistent env file yields an empty list rather than an error.
#[test]
fn env_assignments_missing_file_is_empty_not_error() {
let p = Path::new("/no/such/teamctl/env/file.env");
assert_eq!(env_assignments(p).unwrap(), Vec::<String>::new());
}
// Shape check on the generated command: no `cat`/command substitution,
// each env assignment and the wrapper single-quoted, agent argument last.
#[test]
fn build_up_command_quotes_each_token_and_has_no_cat() {
let dir = tempfile::tempdir().unwrap();
let env_file = dir.path().join("a.env");
fs::write(&env_file, "SP=/a b/c\nGL=*?\n").unwrap();
let spec = spec_with(&env_file, Path::new("/w/wrap.sh"), Path::new("/tmp"));
let cmd = build_up_command(&spec).unwrap();
assert!(!cmd.contains("$(cat"), "command substitution gone: {cmd}");
assert!(!cmd.contains("cat "), "no cat at all: {cmd}");
assert!(
cmd.contains("'SP=/a b/c'"),
"spaced kv single-quoted: {cmd}"
);
assert!(cmd.contains("'GL=*?'"), "glob kv single-quoted: {cmd}");
assert!(cmd.contains("'/w/wrap.sh'"), "wrapper still quoted: {cmd}");
assert!(cmd.ends_with(" proj:agt"), "agent arg unchanged: {cmd}");
}
// An empty env file contributes no tokens between `env` and the wrapper.
#[test]
fn build_up_command_empty_env_has_no_stray_token() {
let dir = tempfile::tempdir().unwrap();
let env_file = dir.path().join("empty.env");
fs::write(&env_file, "").unwrap();
let spec = spec_with(&env_file, Path::new("/w/wrap.sh"), Path::new("/tmp"));
assert_eq!(
build_up_command(&spec).unwrap(),
"env '/w/wrap.sh' proj:agt"
);
}
}