1use std::path::{Path, PathBuf};
8use std::process::Command;
9use std::thread;
10use std::time::{Duration, Instant};
11
12use anyhow::{Context, Result};
13
14use crate::compose::AgentHandle;
15
/// Everything a [`Supervisor`] needs to launch and address a single agent.
#[derive(Debug, Clone)]
pub struct AgentSpec {
    /// Project name; first half of the `project:agent` argument passed to the wrapper.
    pub project: String,
    /// Agent name; second half of the `project:agent` argument passed to the wrapper.
    pub agent: String,
    /// Name of the tmux session hosting the agent
    /// (`{tmux_prefix}{project}-{agent}` when built via [`AgentSpec::from_handle`]).
    pub tmux_session: String,
    /// Executable launched for the agent
    /// (`<root>/bin/agent-wrapper.sh` when built via [`AgentSpec::from_handle`]).
    pub wrapper: PathBuf,
    /// Working directory the agent is started in.
    pub cwd: PathBuf,
    /// File of `KEY=VALUE` lines placed in the agent's environment; a missing file means
    /// "no extra environment".
    pub env_file: PathBuf,
}
25
26impl AgentSpec {
27 pub fn from_handle(h: AgentHandle<'_>, root: &Path, tmux_prefix: &str) -> Self {
28 Self {
29 project: h.project.into(),
30 agent: h.agent.into(),
31 tmux_session: format!("{tmux_prefix}{}-{}", h.project, h.agent),
32 wrapper: root.join("bin/agent-wrapper.sh"),
33 cwd: root.to_path_buf(),
34 env_file: crate::render::env_path(root, h.project, h.agent),
35 }
36 }
37}
38
/// Liveness of an agent as reported by [`Supervisor::state`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentState {
    /// The agent is up (for [`TmuxSupervisor`]: `tmux has-session` succeeded).
    Running,
    /// The agent is not up (for [`TmuxSupervisor`]: `tmux has-session` exited unsuccessfully).
    Stopped,
    /// The state could not be determined (for [`TmuxSupervisor`]: tmux could not be run).
    /// [`orchestrate_drain`] also substitutes this when `state()` returns an error.
    Unknown,
}
46
/// How a [`Supervisor::drain`] call ended.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DrainOutcome {
    /// The agent was observed [`AgentState::Stopped`] while polling; `down()` was not called.
    Graceful,
    /// The agent was not observed stopped before the timeout and `down()` was invoked.
    /// Also what the default [`Supervisor::drain`] reports, since it calls `down()` right away.
    TimedOutKilled,
}
57
58pub trait Supervisor {
59 fn up(&self, spec: &AgentSpec) -> Result<()>;
60 fn down(&self, spec: &AgentSpec) -> Result<()>;
61 fn state(&self, spec: &AgentSpec) -> Result<AgentState>;
62
63 fn drain(&self, spec: &AgentSpec, _timeout: Duration) -> Result<DrainOutcome> {
68 self.down(spec)?;
69 Ok(DrainOutcome::TimedOutKilled)
70 }
71
72 fn drain_poll_interval(&self) -> Duration {
79 Duration::from_millis(250)
80 }
81}
82
83pub fn orchestrate_drain<S, F>(
93 supervisor: &S,
94 spec: &AgentSpec,
95 timeout: Duration,
96 signal_fn: F,
97) -> Result<DrainOutcome>
98where
99 S: Supervisor + ?Sized,
100 F: FnOnce(),
101{
102 signal_fn();
103 let outcome = poll_for_stopped(timeout, supervisor.drain_poll_interval(), || {
104 supervisor.state(spec).unwrap_or(AgentState::Unknown)
105 });
106 if outcome == DrainOutcome::TimedOutKilled {
107 supervisor.down(spec)?;
108 }
109 Ok(outcome)
110}
111
112fn env_assignments(env_file: &Path) -> Result<Vec<String>> {
126 let body = match std::fs::read_to_string(env_file) {
127 Ok(b) => b,
128 Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(Vec::new()),
129 Err(e) => return Err(e).with_context(|| format!("read env file {}", env_file.display())),
130 };
131 Ok(body
132 .lines()
133 .filter(|l| !l.trim().is_empty() && l.contains('='))
134 .map(str::to_string)
135 .collect())
136}
137
138fn build_up_command(spec: &AgentSpec) -> Result<String> {
145 let mut parts: Vec<String> = vec!["env".to_string()];
146 for kv in env_assignments(&spec.env_file)? {
147 parts.push(shlex::try_quote(&kv)?);
148 }
149 parts.push(shlex::try_quote(&spec.wrapper.display().to_string())?);
150 parts.push(format!("{}:{}", spec.project, spec.agent));
151 Ok(parts.join(" "))
152}
153
/// [`Supervisor`] that runs each agent inside its own detached tmux session.
pub struct TmuxSupervisor;
156
157impl Supervisor for TmuxSupervisor {
158 fn up(&self, spec: &AgentSpec) -> Result<()> {
159 if matches!(self.state(spec)?, AgentState::Running) {
160 return Ok(());
161 }
162 let cmd = build_up_command(spec)?;
163 let status = Command::new("tmux")
170 .args([
171 "new-session",
172 "-d",
173 "-x",
174 "200",
175 "-y",
176 "50",
177 "-s",
178 &spec.tmux_session,
179 "-c",
180 &spec.cwd.display().to_string(),
181 "sh",
182 "-c",
183 &cmd,
184 ])
185 .status()
186 .context("spawn tmux new-session")?;
187 anyhow::ensure!(status.success(), "tmux new-session exited {status}");
188 let cwd_str = spec.cwd.to_string_lossy();
193 for (key, value) in [
194 ("@teamctl", "1"),
195 ("@teamctl-project", spec.project.as_str()),
196 ("@teamctl-agent", spec.agent.as_str()),
197 ("@teamctl-root", cwd_str.as_ref()),
198 ] {
199 let _ = Command::new("tmux")
200 .args(["set-option", "-q", "-t", &spec.tmux_session, key, value])
201 .status();
202 }
203 Ok(())
204 }
205
206 fn down(&self, spec: &AgentSpec) -> Result<()> {
207 let _ = Command::new("tmux")
208 .args(["kill-session", "-t", &spec.tmux_session])
209 .status();
210 Ok(())
211 }
212
213 fn state(&self, spec: &AgentSpec) -> Result<AgentState> {
214 let out = Command::new("tmux")
215 .args(["has-session", "-t", &spec.tmux_session])
216 .output();
217 Ok(match out {
218 Ok(o) if o.status.success() => AgentState::Running,
219 Ok(_) => AgentState::Stopped,
220 Err(_) => AgentState::Unknown,
221 })
222 }
223
224 fn drain(&self, spec: &AgentSpec, timeout: Duration) -> Result<DrainOutcome> {
231 orchestrate_drain(self, spec, timeout, || {
232 let _ = Command::new("tmux")
233 .args(["send-keys", "-t", &spec.tmux_session, "C-c"])
234 .status();
235 })
236 }
237}
238
239fn poll_for_stopped<F: FnMut() -> AgentState>(
244 timeout: Duration,
245 interval: Duration,
246 mut observe_state: F,
247) -> DrainOutcome {
248 let deadline = Instant::now() + timeout;
249 loop {
250 if observe_state() == AgentState::Stopped {
251 return DrainOutcome::Graceful;
252 }
253 if Instant::now() >= deadline {
254 return DrainOutcome::TimedOutKilled;
255 }
256 thread::sleep(interval);
257 }
258}
259
/// Tests for the drain machinery: the raw polling loop (`poll_for_stopped`) and the
/// signal → poll → kill sequence (`orchestrate_drain`), driven through a recording mock.
#[cfg(test)]
mod drain_tests {
    use super::*;
    use std::cell::RefCell;

    /// `Stopped` on the second observation, well inside the timeout, yields `Graceful`.
    #[test]
    fn poll_returns_graceful_when_stopped_observed_in_time() {
        let calls = RefCell::new(0u32);
        let outcome = poll_for_stopped(Duration::from_millis(50), Duration::from_millis(1), || {
            let mut n = calls.borrow_mut();
            *n += 1;
            if *n >= 2 {
                AgentState::Stopped
            } else {
                AgentState::Running
            }
        });
        assert_eq!(outcome, DrainOutcome::Graceful);
    }

    /// An agent that stays `Running` for the whole timeout yields `TimedOutKilled`.
    #[test]
    fn poll_falls_through_to_kill_when_agent_never_stops() {
        let outcome = poll_for_stopped(Duration::from_millis(8), Duration::from_millis(2), || {
            AgentState::Running
        });
        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
    }

    /// A zero timeout still observes the state once before giving up.
    #[test]
    fn poll_zero_timeout_only_checks_once_then_kills() {
        let mut calls: u32 = 0;
        let outcome = poll_for_stopped(Duration::from_millis(0), Duration::from_millis(1), || {
            calls += 1;
            AgentState::Running
        });
        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
        assert_eq!(calls, 1, "single state observation before timeout");
    }

    /// Supervisor double that records every trait call and can be told when to "stop".
    #[derive(Default)]
    struct MockSupervisor {
        /// Names of the trait methods invoked, in call order.
        calls: RefCell<Vec<&'static str>>,
        /// `state()` reports `Stopped` from this (1-based) call onward; `0` means never.
        stop_after: u32,
        /// Number of `state()` calls made so far.
        state_calls: RefCell<u32>,
        /// Value returned from `drain_poll_interval()`.
        poll_interval: Duration,
    }

    impl MockSupervisor {
        /// Appends `op` to the call log.
        fn record(&self, op: &'static str) {
            self.calls.borrow_mut().push(op);
        }
    }

    impl Supervisor for MockSupervisor {
        fn up(&self, _spec: &AgentSpec) -> Result<()> {
            self.record("up");
            Ok(())
        }
        fn down(&self, _spec: &AgentSpec) -> Result<()> {
            self.record("down");
            Ok(())
        }
        fn state(&self, _spec: &AgentSpec) -> Result<AgentState> {
            self.record("state");
            let mut n = self.state_calls.borrow_mut();
            *n += 1;
            if self.stop_after > 0 && *n >= self.stop_after {
                Ok(AgentState::Stopped)
            } else {
                Ok(AgentState::Running)
            }
        }
        fn drain_poll_interval(&self) -> Duration {
            self.poll_interval
        }
    }

    /// Placeholder spec; the mock never inspects its fields.
    fn fake_spec() -> AgentSpec {
        AgentSpec {
            project: "p".into(),
            agent: "a".into(),
            tmux_session: "p-a".into(),
            wrapper: PathBuf::from("/dev/null"),
            cwd: PathBuf::from("/tmp"),
            env_file: PathBuf::from("/dev/null"),
        }
    }

    /// With a zero timeout the orchestrator signals, observes the state once, then kills.
    #[test]
    fn drain_with_zero_timeout_returns_timed_out_killed_and_calls_down() {
        let mock = MockSupervisor {
            poll_interval: Duration::from_millis(1),
            ..Default::default()
        };
        let spec = fake_spec();
        let signaled = RefCell::new(false);

        let outcome = orchestrate_drain(&mock, &spec, Duration::ZERO, || {
            *signaled.borrow_mut() = true;
        })
        .unwrap();

        assert_eq!(outcome, DrainOutcome::TimedOutKilled);
        assert!(*signaled.borrow(), "signal_fn must run before the poll");
        assert_eq!(
            mock.calls.borrow().as_slice(),
            &["state", "down"],
            "zero-timeout: one state observation then kill"
        );
    }

    /// When the agent stops by itself within the timeout, `down()` must not be invoked.
    #[test]
    fn drain_with_graceful_stop_does_not_call_down() {
        let mock = MockSupervisor {
            poll_interval: Duration::from_millis(1),
            stop_after: 2, ..Default::default()
        };
        let spec = fake_spec();

        let outcome = orchestrate_drain(&mock, &spec, Duration::from_millis(100), || {}).unwrap();

        assert_eq!(outcome, DrainOutcome::Graceful);
        assert!(
            !mock.calls.borrow().contains(&"down"),
            "graceful drain must not call down(); calls: {:?}",
            mock.calls.borrow()
        );
    }

    /// Pins the trait's default poll interval at 250 ms.
    #[test]
    fn drain_poll_interval_default_is_250ms() {
        // Implements only the required methods so the provided default is what gets called.
        struct Default250;
        impl Supervisor for Default250 {
            fn up(&self, _: &AgentSpec) -> Result<()> {
                Ok(())
            }
            fn down(&self, _: &AgentSpec) -> Result<()> {
                Ok(())
            }
            fn state(&self, _: &AgentSpec) -> Result<AgentState> {
                Ok(AgentState::Stopped)
            }
        }
        assert_eq!(Default250.drain_poll_interval(), Duration::from_millis(250));
    }

    /// The orchestrator must poll at the supervisor's own interval: with a 2 ms interval and
    /// an 8 ms timeout there are several observations and the call returns quickly.
    #[test]
    fn drain_poll_interval_override_is_used_by_orchestrator() {
        let mock = MockSupervisor {
            poll_interval: Duration::from_millis(2),
            stop_after: 0,
            ..Default::default()
        };
        let spec = fake_spec();

        let start = Instant::now();
        let _ = orchestrate_drain(&mock, &spec, Duration::from_millis(8), || {});
        let elapsed = start.elapsed();

        let states = mock
            .calls
            .borrow()
            .iter()
            .filter(|c| **c == "state")
            .count();
        assert!(
            states >= 2,
            "expected several state observations at 2ms cadence, got {states}"
        );
        // Generous wall-clock bound; anything near the 250 ms default would exceed it.
        assert!(
            elapsed < Duration::from_millis(60),
            "drain with 2ms interval finished too slowly ({elapsed:?})"
        );
    }
}
458
459pub(crate) mod shlex {
460 pub fn try_quote(s: &str) -> anyhow::Result<String> {
464 anyhow::ensure!(!s.contains('\0'), "null byte in shell arg");
465 let escaped = s.replace('\'', r"'\''");
466 Ok(format!("'{escaped}'"))
467 }
468
469 #[cfg(test)]
470 mod tests {
471 use super::*;
472
473 #[test]
474 fn quotes_plain_path() {
475 assert_eq!(try_quote("/a/b.sh").unwrap(), "'/a/b.sh'");
476 }
477
478 #[test]
479 fn escapes_embedded_single_quote() {
480 assert_eq!(try_quote("x'y").unwrap(), r"'x'\''y'");
481 }
482 }
483}
484
/// Tests that env-file values reach the wrapper intact and that the launch command is
/// built from individually quoted tokens. Unix-only: they set file modes and run `sh`.
#[cfg(test)]
#[cfg(unix)]
mod env_harden_tests {
    use super::*;
    use std::fs;
    use std::os::unix::fs::PermissionsExt;
    use std::process::Command;

    /// Builds a spec with fixed project/agent names and the given paths.
    fn spec_with(env_file: &Path, wrapper: &Path, cwd: &Path) -> AgentSpec {
        AgentSpec {
            project: "proj".into(),
            agent: "agt".into(),
            tmux_session: "proj-agt".into(),
            wrapper: wrapper.to_path_buf(),
            cwd: cwd.to_path_buf(),
            env_file: env_file.to_path_buf(),
        }
    }

    /// End-to-end: runs the built command through a real `sh -c` and checks that values
    /// containing spaces and glob characters arrive in the wrapper's environment unchanged.
    #[test]
    fn env_values_round_trip_through_real_shell() {
        let dir = tempfile::tempdir().unwrap();
        let env_file = dir.path().join("agt.env");
        fs::write(
            &env_file,
            "MY_PATH=/some path with spaces/x\nGL=*?\nPLAIN=ok\n",
        )
        .unwrap();

        let cwd = tempfile::tempdir().unwrap();
        // A file in the cwd gives an unquoted `*?` something to expand to.
        fs::write(cwd.path().join("decoy"), "x").unwrap();

        // Stand-in wrapper that prints the three variables in a bracketed, greppable form.
        let wrapper = dir.path().join("wrapper.sh");
        fs::write(
            &wrapper,
            "#!/bin/sh\nprintf 'MY_PATH=[%s]\\n' \"$MY_PATH\"\n\
 printf 'GL=[%s]\\n' \"$GL\"\nprintf 'PLAIN=[%s]\\n' \"$PLAIN\"\n",
        )
        .unwrap();
        fs::set_permissions(&wrapper, fs::Permissions::from_mode(0o755)).unwrap();

        let spec = spec_with(&env_file, &wrapper, cwd.path());
        let cmd = build_up_command(&spec).unwrap();

        let out = Command::new("sh")
            .arg("-c")
            .arg(&cmd)
            .current_dir(cwd.path())
            .output()
            .unwrap();
        let stdout = String::from_utf8_lossy(&out.stdout);

        assert!(
            stdout.contains("MY_PATH=[/some path with spaces/x]"),
            "spaced value mangled by the shell — cmd: {cmd}\nstdout: {stdout}"
        );
        assert!(
            stdout.contains("GL=[*?]"),
            "glob value expanded against cwd — cmd: {cmd}\nstdout: {stdout}"
        );
        assert!(
            stdout.contains("PLAIN=[ok]"),
            "common-case value lost — cmd: {cmd}\nstdout: {stdout}"
        );
    }

    /// Lines are returned untouched (spaces, globs, extra `=`, empty value); blank lines
    /// and lines without `=` are dropped.
    #[test]
    fn env_assignments_keeps_lines_verbatim_skips_blank_and_no_eq() {
        let dir = tempfile::tempdir().unwrap();
        let f = dir.path().join("a.env");
        fs::write(&f, "K=v\nSP=/a b/c\nGL=*?\nEQ=a=b\nEMPTY=\n\nstray-no-eq\n").unwrap();
        assert_eq!(
            env_assignments(&f).unwrap(),
            vec![
                "K=v".to_string(),
                "SP=/a b/c".to_string(),
                "GL=*?".to_string(),
                "EQ=a=b".to_string(),
                "EMPTY=".to_string(),
            ]
        );
    }

    /// A missing env file means "no assignments", not a failure.
    #[test]
    fn env_assignments_missing_file_is_empty_not_error() {
        let p = Path::new("/no/such/teamctl/env/file.env");
        assert_eq!(env_assignments(p).unwrap(), Vec::<String>::new());
    }

    /// Each env assignment and the wrapper path appear as single-quoted tokens.
    /// The `cat` checks are presumably a regression guard against an earlier
    /// `$(cat <env file>)` style command — that history is not visible in this file.
    #[test]
    fn build_up_command_quotes_each_token_and_has_no_cat() {
        let dir = tempfile::tempdir().unwrap();
        let env_file = dir.path().join("a.env");
        fs::write(&env_file, "SP=/a b/c\nGL=*?\n").unwrap();
        let spec = spec_with(&env_file, Path::new("/w/wrap.sh"), Path::new("/tmp"));

        let cmd = build_up_command(&spec).unwrap();

        assert!(!cmd.contains("$(cat"), "command substitution gone: {cmd}");
        assert!(!cmd.contains("cat "), "no cat at all: {cmd}");
        assert!(
            cmd.contains("'SP=/a b/c'"),
            "spaced kv single-quoted: {cmd}"
        );
        assert!(cmd.contains("'GL=*?'"), "glob kv single-quoted: {cmd}");
        assert!(cmd.contains("'/w/wrap.sh'"), "wrapper still quoted: {cmd}");
        assert!(cmd.ends_with(" proj:agt"), "agent arg unchanged: {cmd}");
    }

    /// An empty env file contributes no tokens: the command is exactly `env`, the quoted
    /// wrapper and the `project:agent` argument.
    #[test]
    fn build_up_command_empty_env_has_no_stray_token() {
        let dir = tempfile::tempdir().unwrap();
        let env_file = dir.path().join("empty.env");
        fs::write(&env_file, "").unwrap();
        let spec = spec_with(&env_file, Path::new("/w/wrap.sh"), Path::new("/tmp"));

        assert_eq!(
            build_up_command(&spec).unwrap(),
            "env '/w/wrap.sh' proj:agt"
        );
    }
}