use crate::dispatch::DispatchHandle;
use crate::reflect::{
gap_domain, outcome_str, pr_number_of, DispatchReflection, NoopReflectionWriter,
ReflectionWriter,
};
use anyhow::{Context, Result};
use serde::Deserialize;
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::process::Command;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Terminal result of monitoring a single dispatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum DispatchOutcome {
/// The branch's PR was reported with state `MERGED`; carries the PR number.
Shipped(u32),
/// No PR was seen and the soft deadline elapsed. Also used by the monitor
/// as the fallback when no outcome was recorded for a branch.
Stalled,
/// The child exited with a non-zero code, or the monitor asked for it to be
/// killed after twice the soft deadline passed without a PR. Carries a
/// human-readable reason.
Killed(String),
/// The branch's PR was reported with state `CLOSED`; carries the PR number.
/// The monitor only inspects the state string, not why the PR was closed.
CiFailed(u32),
}
/// Subset of pull-request fields deserialized from `gh pr list --json ...` output.
#[derive(Debug, Clone, Deserialize)]
pub struct PrStatus {
/// PR number.
pub number: u32,
/// PR state string. `decide_tick` compares it case-insensitively against
/// `MERGED` and `CLOSED`; any other value is treated as still in progress.
pub state: String,
/// Value of the JSON `mergeStateStatus` key; empty when the key is absent.
#[serde(rename = "mergeStateStatus", default)]
pub merge_state_status: String,
}
/// Source of pull-request status for a branch, abstracted so the monitor loop
/// can be driven by a scripted provider in tests.
pub trait PrProvider {
/// Returns the PR the provider considers latest for `branch`, `Ok(None)`
/// when the branch has no PR, or an error when the lookup itself failed.
fn latest_pr(&self, branch: &str) -> Result<Option<PrStatus>>;
}
/// [`PrProvider`] implementation that shells out to the GitHub CLI (`gh`).
pub struct GhPrProvider;

impl PrProvider for GhPrProvider {
    /// Runs `gh pr list --head <branch> --state all --limit 1` and returns the
    /// first row of its JSON output, if any.
    ///
    /// # Errors
    ///
    /// Fails when `gh` cannot be spawned, exits unsuccessfully (its stderr is
    /// included in the message), emits non-UTF-8 output, or emits JSON that
    /// does not deserialize into a list of [`PrStatus`].
    fn latest_pr(&self, branch: &str) -> Result<Option<PrStatus>> {
        let mut gh = Command::new("gh");
        gh.args(["pr", "list"])
            .args(["--head", branch])
            .args(["--state", "all"])
            .args(["--json", "number,state,mergeStateStatus"])
            .args(["--limit", "1"]);

        let output = gh.output().context("running gh pr list")?;
        if !output.status.success() {
            anyhow::bail!(
                "gh pr list failed for {branch}: {}",
                String::from_utf8_lossy(&output.stderr)
            );
        }

        let json = String::from_utf8(output.stdout).context("gh pr list output not utf-8")?;
        let prs: Vec<PrStatus> =
            serde_json::from_str(json.trim()).context("parsing gh pr list JSON")?;

        // `--limit 1` means at most one row is expected; an empty list is "no PR".
        let first = prs.into_iter().next();
        Ok(first)
    }
}
/// Maps an effort label to its soft deadline in seconds.
///
/// Labels are matched ASCII-case-insensitively: `s` → 20 min, `m` → 60 min,
/// `l` → 180 min, `xl` → 360 min. Any other label falls back to 60 min.
pub fn soft_deadline_seconds(effort: &str) -> u64 {
    const SECS_PER_MINUTE: u64 = 60;
    const DEFAULT_MINUTES: u64 = 60;
    const MINUTES_BY_EFFORT: [(&str, u64); 4] = [("s", 20), ("m", 60), ("l", 180), ("xl", 360)];

    let minutes = MINUTES_BY_EFFORT
        .iter()
        .find(|(label, _)| label.eq_ignore_ascii_case(effort))
        .map_or(DEFAULT_MINUTES, |&(_, minutes)| minutes);
    minutes * SECS_PER_MINUTE
}
/// What the monitor should do with one dispatch after a poll, as computed by
/// `decide_tick`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum TickDecision {
/// Not terminal yet; poll again on the next tick.
KeepGoing,
/// Terminal; record the carried outcome.
Done(DispatchOutcome),
/// Terminal; kill the child first, then record `DispatchOutcome::Killed`
/// with the carried reason.
KillThenDone(String),
}
/// Pure decision function for one poll of one dispatch.
///
/// Rules, in order of precedence:
///
/// 1. A non-zero `child_exit_code` ends the dispatch as `Killed("exit code N")`.
///    A zero exit code is treated the same as "still running".
/// 2. A PR whose state is `MERGED` / `CLOSED` (ASCII-case-insensitive) ends the
///    dispatch as `Shipped` / `CiFailed` with the PR number.
/// 3. Any other PR keeps the dispatch going regardless of elapsed time.
/// 4. With no PR, elapsed time (`now_unix - started_at_unix`, saturating at 0)
///    strictly greater than twice `soft_deadline_secs` yields `KillThenDone`;
///    strictly greater than `soft_deadline_secs` yields `Done(Stalled)`;
///    otherwise `KeepGoing`.
///
/// The doubled deadline saturates at `u64::MAX` instead of overflowing, so
/// very large deadlines neither panic in debug builds nor wrap in release.
pub fn decide_tick(
    pr: Option<&PrStatus>,
    started_at_unix: u64,
    now_unix: u64,
    soft_deadline_secs: u64,
    child_exit_code: Option<i32>,
) -> TickDecision {
    if let Some(code) = child_exit_code.filter(|&code| code != 0) {
        return TickDecision::Done(DispatchOutcome::Killed(format!("exit code {code}")));
    }

    match pr {
        Some(p) if p.state.eq_ignore_ascii_case("MERGED") => {
            TickDecision::Done(DispatchOutcome::Shipped(p.number))
        }
        Some(p) if p.state.eq_ignore_ascii_case("CLOSED") => {
            TickDecision::Done(DispatchOutcome::CiFailed(p.number))
        }
        // A PR exists but is not terminal: deadlines only apply when no PR exists.
        Some(_) => TickDecision::KeepGoing,
        None => {
            let elapsed = now_unix.saturating_sub(started_at_unix);
            let hard_deadline_secs = soft_deadline_secs.saturating_mul(2);
            if elapsed > hard_deadline_secs {
                TickDecision::KillThenDone(format!(
                    "soft-deadline exceeded ({soft_deadline_secs}s × 2)"
                ))
            } else if elapsed > soft_deadline_secs {
                TickDecision::Done(DispatchOutcome::Stalled)
            } else {
                TickDecision::KeepGoing
            }
        }
    }
}
/// One dispatch under observation by the monitor loop.
pub struct WatchEntry {
/// Handle of the dispatched work (branch name, gap id, start time, child process).
pub handle: DispatchHandle,
/// Soft deadline in seconds, passed to `decide_tick` on every poll.
pub soft_deadline_secs: u64,
/// Effort label for this dispatch; copied into the reflection record.
pub effort: String,
}
/// Polls a set of dispatches until each one reaches a terminal
/// [`DispatchOutcome`].
pub struct MonitorLoop<'p, P: PrProvider> {
/// Dispatches to watch; taken out of the struct by `watch_until_done`.
entries: Vec<WatchEntry>,
/// Repository root supplied to `new`. It is stored but not read by the
/// monitor code in this file, hence the `dead_code` allowance.
#[allow(dead_code)]
repo_root: PathBuf,
/// Pause between polling rounds.
tick: Duration,
/// Borrowed PR status source queried once per entry per round.
provider: &'p P,
/// Sink for one reflection record per terminal outcome.
reflection_writer: Box<dyn ReflectionWriter>,
}
impl<'p, P: PrProvider> MonitorLoop<'p, P> {
    /// Creates a monitor over `entries` that queries `provider` once per entry
    /// per round and sleeps `tick` between rounds.
    ///
    /// Reflections are discarded by default ([`NoopReflectionWriter`]); call
    /// [`MonitorLoop::with_reflection_writer`] to capture them.
    pub fn new(
        entries: Vec<WatchEntry>,
        repo_root: PathBuf,
        tick: Duration,
        provider: &'p P,
    ) -> Self {
        Self {
            entries,
            repo_root,
            tick,
            provider,
            reflection_writer: Box::new(NoopReflectionWriter),
        }
    }

    /// Replaces the reflection writer and returns the monitor (builder style).
    pub fn with_reflection_writer(mut self, writer: Box<dyn ReflectionWriter>) -> Self {
        self.reflection_writer = writer;
        self
    }

    /// Polls every entry until each has a terminal outcome.
    ///
    /// Each round, for every pending entry: the child's exit status is polled,
    /// the provider is asked for the branch's latest PR, and [`decide_tick`]
    /// picks the next step. Terminal entries get one reflection record and
    /// leave the pending set; the loop sleeps `tick` between rounds and does
    /// not sleep after the final one.
    ///
    /// Returns `(branch_name, outcome)` pairs in the original input order. A
    /// branch with no recorded outcome is reported as
    /// [`DispatchOutcome::Stalled`].
    pub async fn watch_until_done(mut self) -> Vec<(String, DispatchOutcome)> {
        let mut pending: Vec<WatchEntry> = std::mem::take(&mut self.entries);
        // Remember input order; outcomes are collected keyed by branch name.
        let order: Vec<String> = pending
            .iter()
            .map(|e| e.handle.branch_name.clone())
            .collect();
        let mut done: HashMap<String, DispatchOutcome> = HashMap::with_capacity(order.len());

        while !pending.is_empty() {
            // Snapshot of how many dispatches were in flight when this round
            // started; used for the `parallel_siblings` reflection field.
            let in_flight = pending.len();
            let mut still_pending: Vec<WatchEntry> = Vec::with_capacity(in_flight);

            for mut entry in pending.drain(..) {
                let exit = poll_child_exit(&mut entry.handle);
                // NOTE(review): a provider failure is logged and then treated
                // exactly like "no PR", so the deadline rules in `decide_tick`
                // still apply on that round — confirm this is intended.
                let pr = self
                    .provider
                    .latest_pr(&entry.handle.branch_name)
                    .unwrap_or_else(|e| {
                        eprintln!(
                            "[monitor] gh pr list failed for {}: {e:#} (will retry)",
                            entry.handle.branch_name
                        );
                        None
                    });
                let now = unix_now();
                let decision = decide_tick(
                    pr.as_ref(),
                    entry.handle.started_at_unix,
                    now,
                    entry.soft_deadline_secs,
                    exit,
                );

                let outcome = match decision {
                    TickDecision::KeepGoing => {
                        still_pending.push(entry);
                        continue;
                    }
                    TickDecision::Done(outcome) => outcome,
                    TickDecision::KillThenDone(reason) => {
                        // NOTE(review): this is a synchronous call inside an
                        // async fn; any waiting it does blocks this task.
                        kill_child_with_grace(&mut entry.handle);
                        DispatchOutcome::Killed(reason)
                    }
                };
                self.record_reflection(&entry, &outcome, now, in_flight);
                done.insert(entry.handle.branch_name.clone(), outcome);
            }

            pending = still_pending;
            if pending.is_empty() {
                break;
            }
            tokio::time::sleep(self.tick).await;
        }

        order
            .into_iter()
            .map(|branch| {
                let outcome = done.remove(&branch).unwrap_or(DispatchOutcome::Stalled);
                (branch, outcome)
            })
            .collect()
    }

    /// Builds and writes the reflection record for a terminal `outcome`.
    ///
    /// `now_unix` is the timestamp used for the decision, so `duration_s`
    /// matches what [`decide_tick`] saw. `in_flight_at_tick_start` counts this
    /// entry too, so siblings are that count minus one. Write failures are
    /// logged to stderr and otherwise ignored so monitoring continues.
    fn record_reflection(
        &self,
        entry: &WatchEntry,
        outcome: &DispatchOutcome,
        now_unix: u64,
        in_flight_at_tick_start: usize,
    ) {
        let duration_s = now_unix.saturating_sub(entry.handle.started_at_unix);
        let parallel_siblings = in_flight_at_tick_start.saturating_sub(1);
        let backend_label = entry.handle.backend.label();
        let stderr_tail = entry.handle.stderr_tail_snapshot();
        // Notes always name the backend; the stderr tail is appended when present.
        let notes = if stderr_tail.is_empty() {
            format!("backend={backend_label}")
        } else {
            format!("backend={backend_label} {stderr_tail}")
        };
        let reflection = DispatchReflection {
            gap_id: entry.handle.gap_id.clone(),
            effort: entry.effort.clone(),
            gap_domain: gap_domain(&entry.handle.gap_id),
            outcome: outcome_str(outcome).to_string(),
            duration_s,
            parallel_siblings,
            pr_number: pr_number_of(outcome),
            notes,
        };
        if let Err(e) = self.reflection_writer.write(&reflection) {
            eprintln!(
                "[monitor] reflection write failed for {} ({}): {e:#} (continuing)",
                entry.handle.gap_id, reflection.outcome
            );
        }
    }
}
/// Non-blocking check of the dispatch's child process.
///
/// Returns `Some(code)` once the child has exited, using `-1` when the exit
/// status carries no code. Returns `None` when the handle has no child, the
/// child is still running, or `try_wait` itself failed (the failure is logged
/// to stderr).
fn poll_child_exit(handle: &mut DispatchHandle) -> Option<i32> {
    let polled = handle.child.as_mut()?.try_wait();
    match polled {
        Ok(status) => status.map(|exited| exited.code().unwrap_or(-1)),
        Err(e) => {
            eprintln!(
                "[monitor] try_wait failed for pid {:?}: {e:#}",
                handle.child_pid
            );
            None
        }
    }
}
/// Stops the dispatch's child process, politely first where possible.
///
/// Does nothing when the handle has no child or the child has already exited.
/// On Unix, when a usable pid is recorded, `SIGTERM` is sent and the child is
/// polled for up to 30 seconds; the function returns as soon as it exits. If
/// the child is still alive after that, or the signal could not be sent, or
/// on non-Unix targets, the child is killed outright and reaped.
///
/// This function blocks the calling thread for up to the grace period.
fn kill_child_with_grace(handle: &mut DispatchHandle) {
    let Some(child) = handle.child.as_mut() else {
        return;
    };
    // An already-reaped child must not be signalled: its pid may have been
    // reused by an unrelated process.
    if matches!(child.try_wait(), Ok(Some(_))) {
        return;
    }
    #[cfg(unix)]
    {
        const SIGTERM: i32 = 15;
        const GRACE: Duration = Duration::from_secs(30);
        const POLL_INTERVAL: Duration = Duration::from_millis(100);

        if let Some(pid) = handle.child_pid {
            let raw_pid = pid as i32;
            // `kill` treats pid 0 and negative pids as process-group targets,
            // so only a strictly positive pid is ever signalled.
            if raw_pid > 0 {
                // SAFETY: `kill` takes two plain integers and touches no
                // memory owned by this process; the child was observed as
                // not yet reaped just above, so the pid still refers to it.
                let rc = unsafe { libc_kill(raw_pid, SIGTERM) };
                if rc == 0 {
                    let deadline = std::time::Instant::now() + GRACE;
                    while std::time::Instant::now() < deadline {
                        if matches!(child.try_wait(), Ok(Some(_))) {
                            return;
                        }
                        std::thread::sleep(POLL_INTERVAL);
                    }
                }
            }
        }
    }
    // Best effort: errors here mean the child is already gone.
    let _ = child.kill();
    let _ = child.wait();
}
/// Thin binding to the C library's `kill`, declared locally rather than
/// through a bindings crate.
///
/// Returns whatever the C function returns.
///
/// # Safety
///
/// The caller must make sure `pid` identifies the process (or process group)
/// it actually intends to signal.
#[cfg(unix)]
unsafe fn libc_kill(pid: i32, sig: i32) -> i32 {
    extern "C" {
        #[link_name = "kill"]
        fn c_kill(target: i32, signal: i32) -> i32;
    }
    // SAFETY: the foreign function takes two integers by value and does not
    // read or write memory owned by this process.
    unsafe { c_kill(pid, sig) }
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// Returns 0 if the system clock reports a time before the epoch.
fn unix_now() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_secs(),
        Err(_) => 0,
    }
}
/// Wraps each dispatch handle in a [`WatchEntry`], preserving input order.
///
/// The effort label is looked up by the handle's `gap_id` in
/// `efforts_by_gap`; handles without an entry get `"m"`. The soft deadline is
/// derived from that label via [`soft_deadline_seconds`].
pub fn watch_entries(
    handles: Vec<DispatchHandle>,
    efforts_by_gap: &HashMap<String, String>,
) -> Vec<WatchEntry> {
    let mut entries = Vec::with_capacity(handles.len());
    for handle in handles {
        let effort = match efforts_by_gap.get(&handle.gap_id) {
            Some(label) => label.clone(),
            None => String::from("m"),
        };
        let soft_deadline_secs = soft_deadline_seconds(&effort);
        entries.push(WatchEntry {
            handle,
            soft_deadline_secs,
            effort,
        });
    }
    entries
}
/// Builds the production monitor: `gh`-backed PR lookups and a 30-second
/// pause between polling rounds.
///
/// The provider is a zero-sized `static`, which is what lets the returned
/// monitor carry a `'static` provider borrow.
pub fn default_monitor(
    entries: Vec<WatchEntry>,
    repo_root: &Path,
) -> MonitorLoop<'static, GhPrProvider> {
    static PROVIDER: GhPrProvider = GhPrProvider;
    let tick = Duration::from_secs(30);
    let root = repo_root.to_path_buf();
    MonitorLoop::new(entries, root, tick, &PROVIDER)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::cell::RefCell;
    use std::collections::HashMap as Map;

    /// Builds a handle with no child process attached.
    fn handle(branch: &str, gap: &str, started: u64) -> DispatchHandle {
        DispatchHandle {
            gap_id: gap.into(),
            worktree_path: PathBuf::from(format!("/tmp/{branch}")),
            branch_name: branch.into(),
            child_pid: None,
            started_at_unix: started,
            child: None,
            stderr_tail: None,
            backend: crate::dispatch::DispatchBackend::Claude,
        }
    }

    /// Wraps a handle in a medium-effort entry with the given soft deadline.
    fn entry(h: DispatchHandle, soft: u64) -> WatchEntry {
        WatchEntry {
            handle: h,
            soft_deadline_secs: soft,
            effort: "m".into(),
        }
    }

    /// Builds a PR status with an empty merge state.
    fn pr(num: u32, state: &str) -> PrStatus {
        PrStatus {
            number: num,
            state: state.into(),
            merge_state_status: String::new(),
        }
    }

    #[test]
    fn soft_deadline_table() {
        assert_eq!(soft_deadline_seconds("s"), 20 * 60);
        assert_eq!(soft_deadline_seconds("M"), 60 * 60);
        assert_eq!(soft_deadline_seconds("L"), 180 * 60);
        assert_eq!(soft_deadline_seconds("xl"), 360 * 60);
        assert_eq!(soft_deadline_seconds("weird"), 60 * 60);
    }

    #[test]
    fn decide_shipped_when_pr_merged() {
        let p = pr(42, "MERGED");
        let d = decide_tick(Some(&p), 0, 100, 1000, None);
        assert_eq!(d, TickDecision::Done(DispatchOutcome::Shipped(42)));
    }

    #[test]
    fn decide_ci_failed_when_pr_closed() {
        let p = pr(7, "CLOSED");
        let d = decide_tick(Some(&p), 0, 100, 1000, None);
        assert_eq!(d, TickDecision::Done(DispatchOutcome::CiFailed(7)));
    }

    #[test]
    fn decide_pr_state_is_case_insensitive() {
        let merged = pr(5, "merged");
        let closed = pr(6, "Closed");
        assert_eq!(
            decide_tick(Some(&merged), 0, 100, 1000, None),
            TickDecision::Done(DispatchOutcome::Shipped(5))
        );
        assert_eq!(
            decide_tick(Some(&closed), 0, 100, 1000, None),
            TickDecision::Done(DispatchOutcome::CiFailed(6))
        );
    }

    #[test]
    fn decide_keep_going_when_pr_open_within_deadline() {
        let p = pr(99, "OPEN");
        let d = decide_tick(Some(&p), 0, 100, 1000, None);
        assert_eq!(d, TickDecision::KeepGoing);
    }

    #[test]
    fn decide_keep_going_no_pr_within_deadline() {
        let d = decide_tick(None, 0, 100, 1000, None);
        assert_eq!(d, TickDecision::KeepGoing);
    }

    #[test]
    fn decide_stalled_no_pr_past_soft_deadline() {
        let d = decide_tick(None, 0, 1500, 1000, None);
        assert_eq!(d, TickDecision::Done(DispatchOutcome::Stalled));
    }

    #[test]
    fn decide_kill_no_pr_past_2x_soft_deadline() {
        let d = decide_tick(None, 0, 2500, 1000, None);
        match d {
            TickDecision::KillThenDone(reason) => {
                assert!(reason.contains("1000s × 2"), "got: {reason}");
            }
            other => panic!("expected KillThenDone, got {other:?}"),
        }
    }

    #[test]
    fn decide_deadline_boundaries_are_exclusive() {
        // Exactly at the soft deadline is still within it.
        assert_eq!(
            decide_tick(None, 0, 1000, 1000, None),
            TickDecision::KeepGoing
        );
        // Exactly at twice the soft deadline is stalled, not yet killed.
        assert_eq!(
            decide_tick(None, 0, 2000, 1000, None),
            TickDecision::Done(DispatchOutcome::Stalled)
        );
    }

    #[test]
    fn decide_killed_when_child_exits_nonzero() {
        let d = decide_tick(None, 0, 100, 1000, Some(2));
        assert_eq!(
            d,
            TickDecision::Done(DispatchOutcome::Killed("exit code 2".into()))
        );
    }

    #[test]
    fn decide_nonzero_exit_beats_merged_pr() {
        let p = pr(3, "MERGED");
        let d = decide_tick(Some(&p), 0, 100, 1000, Some(1));
        assert_eq!(
            d,
            TickDecision::Done(DispatchOutcome::Killed("exit code 1".into()))
        );
    }

    #[test]
    fn decide_clean_exit_without_pr_keeps_going_until_deadline() {
        let d = decide_tick(None, 0, 100, 1000, Some(0));
        assert_eq!(d, TickDecision::KeepGoing);
    }

    #[test]
    fn decide_pr_takes_precedence_over_deadline() {
        let p = pr(11, "OPEN");
        let d = decide_tick(Some(&p), 0, 9_999, 1000, None);
        assert_eq!(d, TickDecision::KeepGoing);
    }

    /// Provider that replays a per-branch script of responses. Each call
    /// consumes the next response; the last response is repeated forever, and
    /// an unknown or empty script yields `None`.
    struct ScriptedProvider {
        scripts: RefCell<Map<String, Vec<Option<PrStatus>>>>,
    }

    impl ScriptedProvider {
        fn new(scripts: Map<String, Vec<Option<PrStatus>>>) -> Self {
            Self {
                scripts: RefCell::new(scripts),
            }
        }
    }

    impl PrProvider for ScriptedProvider {
        fn latest_pr(&self, branch: &str) -> Result<Option<PrStatus>> {
            let mut s = self.scripts.borrow_mut();
            let v = s.entry(branch.to_string()).or_default();
            if v.len() > 1 {
                Ok(v.remove(0))
            } else {
                Ok(v.first().cloned().unwrap_or(None))
            }
        }
    }

    #[tokio::test]
    async fn watch_returns_immediately_on_empty_input() {
        let provider = ScriptedProvider::new(Map::new());
        let m = MonitorLoop::new(
            vec![],
            PathBuf::from("/tmp"),
            Duration::from_millis(10),
            &provider,
        );
        let out = m.watch_until_done().await;
        assert!(out.is_empty());
    }

    #[tokio::test]
    async fn watch_picks_up_merged_pr_on_first_tick() {
        let mut scripts = Map::new();
        scripts.insert("claude/x".to_string(), vec![Some(pr(101, "MERGED"))]);
        let provider = ScriptedProvider::new(scripts);
        let entries = vec![entry(handle("claude/x", "X-1", unix_now()), 600)];
        let m = MonitorLoop::new(
            entries,
            PathBuf::from("/tmp"),
            Duration::from_millis(5),
            &provider,
        );
        let out = m.watch_until_done().await;
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].0, "claude/x");
        assert_eq!(out[0].1, DispatchOutcome::Shipped(101));
    }

    /// The entry started at Unix time 0 with a 1-second deadline, so the very
    /// first tick is already past twice the deadline and the outcome is
    /// `Killed`, not `Stalled`.
    #[tokio::test]
    async fn watch_kills_when_no_pr_past_2x_deadline() {
        let mut scripts = Map::new();
        scripts.insert("claude/y".to_string(), vec![None]);
        let provider = ScriptedProvider::new(scripts);
        let entries = vec![entry(handle("claude/y", "Y-1", 0), 1)];
        let m = MonitorLoop::new(
            entries,
            PathBuf::from("/tmp"),
            Duration::from_millis(5),
            &provider,
        );
        let out = m.watch_until_done().await;
        assert_eq!(out.len(), 1);
        match &out[0].1 {
            DispatchOutcome::Killed(_) => {}
            other => panic!("expected Killed (past 2x), got {other:?}"),
        }
    }

    #[tokio::test]
    async fn watch_processes_multiple_branches_in_input_order() {
        let mut scripts = Map::new();
        scripts.insert("claude/a".to_string(), vec![Some(pr(1, "MERGED"))]);
        scripts.insert("claude/b".to_string(), vec![Some(pr(2, "MERGED"))]);
        let provider = ScriptedProvider::new(scripts);
        let entries = vec![
            entry(handle("claude/a", "A", unix_now()), 600),
            entry(handle("claude/b", "B", unix_now()), 600),
        ];
        let m = MonitorLoop::new(
            entries,
            PathBuf::from("/tmp"),
            Duration::from_millis(5),
            &provider,
        );
        let out = m.watch_until_done().await;
        assert_eq!(out.len(), 2);
        assert_eq!(out[0].0, "claude/a");
        assert_eq!(out[1].0, "claude/b");
        assert_eq!(out[0].1, DispatchOutcome::Shipped(1));
        assert_eq!(out[1].1, DispatchOutcome::Shipped(2));
    }

    #[test]
    fn watch_entries_uses_effort_table_and_defaults_to_medium() {
        let h1 = handle("claude/a", "A", 0);
        let h2 = handle("claude/b", "B", 0);
        let h3 = handle("claude/c", "C-NO-EFFORT", 0);
        let mut efforts: Map<String, String> = Map::new();
        efforts.insert("A".into(), "s".into());
        efforts.insert("B".into(), "l".into());
        let entries = watch_entries(vec![h1, h2, h3], &efforts);
        assert_eq!(entries[0].soft_deadline_secs, 20 * 60);
        assert_eq!(entries[0].effort, "s");
        assert_eq!(entries[1].soft_deadline_secs, 180 * 60);
        assert_eq!(entries[1].effort, "l");
        assert_eq!(entries[2].soft_deadline_secs, 60 * 60);
        assert_eq!(entries[2].effort, "m");
    }

    #[tokio::test]
    async fn watch_writes_one_reflection_per_outcome() {
        use crate::reflect::MemoryReflectionWriter;
        let mut scripts = Map::new();
        scripts.insert("claude/a".to_string(), vec![Some(pr(1, "MERGED"))]);
        scripts.insert("claude/b".to_string(), vec![Some(pr(2, "CLOSED"))]);
        // No PR ever appears for claude/c; combined with a start time of 0 and
        // a 1-second deadline below, it is killed on the first tick.
        scripts.insert("claude/c".to_string(), vec![None]);
        let provider = ScriptedProvider::new(scripts);
        let entries = vec![
            entry(handle("claude/a", "AUTO-1", unix_now()), 600),
            entry(handle("claude/b", "EVAL-9", unix_now()), 600),
            entry(handle("claude/c", "PRODUCT-3", 0), 1),
        ];
        let writer = std::sync::Arc::new(MemoryReflectionWriter::new());
        // The monitor takes ownership of its writer, so share the in-memory
        // writer through an `Arc` to inspect it afterwards.
        struct ArcAdapter(std::sync::Arc<MemoryReflectionWriter>);
        impl crate::reflect::ReflectionWriter for ArcAdapter {
            fn write(&self, r: &crate::reflect::DispatchReflection) -> Result<()> {
                self.0.write(r)
            }
        }
        let m = MonitorLoop::new(
            entries,
            PathBuf::from("/tmp"),
            Duration::from_millis(5),
            &provider,
        )
        .with_reflection_writer(Box::new(ArcAdapter(std::sync::Arc::clone(&writer))));
        let out = m.watch_until_done().await;
        assert_eq!(out.len(), 3);
        let snap = writer.snapshot();
        assert_eq!(snap.len(), 3, "one reflection per terminal outcome");
        let by_gap: Map<String, _> = snap.iter().map(|r| (r.gap_id.clone(), r.clone())).collect();
        assert_eq!(by_gap["AUTO-1"].outcome, "shipped");
        assert_eq!(by_gap["AUTO-1"].pr_number, Some(1));
        assert_eq!(by_gap["AUTO-1"].gap_domain, "auto");
        assert_eq!(by_gap["EVAL-9"].outcome, "ci_failed");
        assert_eq!(by_gap["EVAL-9"].pr_number, Some(2));
        assert_eq!(by_gap["EVAL-9"].gap_domain, "eval");
        assert_eq!(by_gap["PRODUCT-3"].outcome, "killed");
        assert_eq!(by_gap["PRODUCT-3"].pr_number, None);
        assert_eq!(by_gap["PRODUCT-3"].gap_domain, "product");
        let d = by_gap["AUTO-1"].directive();
        assert!(d.contains("gap=AUTO-1"));
        assert!(d.contains("effort=m"));
        assert!(d.contains("outcome=shipped"));
        assert!(d.contains("pr_number=Some(1)"));
        assert!(d.contains("parallel_siblings="));
    }

    #[tokio::test]
    async fn watch_default_writer_is_noop() {
        let mut scripts = Map::new();
        scripts.insert("claude/x".to_string(), vec![Some(pr(7, "MERGED"))]);
        let provider = ScriptedProvider::new(scripts);
        let entries = vec![entry(handle("claude/x", "X", unix_now()), 600)];
        let m = MonitorLoop::new(
            entries,
            PathBuf::from("/tmp"),
            Duration::from_millis(5),
            &provider,
        );
        let out = m.watch_until_done().await;
        assert_eq!(out.len(), 1);
        assert_eq!(out[0].1, DispatchOutcome::Shipped(7));
    }
}