use std::path::Path;
use std::sync::Arc;
use std::time::Duration;
use super::messages::{BrokerMessage, StatusPayload};
use super::{BrokerState, WatchTarget, delivery};
/// Period of the ticker that drives each `git status` poll in [`watch_worktree`].
pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
/// Decides whether a detected worktree change should be announced as `working`.
///
/// * Any `status` other than `"committed"` always republishes.
/// * A `"committed"` status republishes only while `ttl` is non-zero and the
///   time elapsed since the commit is known and does not exceed `ttl`
///   (the boundary `elapsed == ttl` still republishes).
#[must_use]
pub fn should_republish_working(
    status: &str,
    since_committed: Option<Duration>,
    ttl: Duration,
) -> bool {
    let committed = status == "committed";
    // A zero TTL disables re-entry entirely; a missing timestamp can never be
    // "within" the TTL.
    let within_ttl = !ttl.is_zero() && matches!(since_committed, Some(age) if age <= ttl);
    !committed || within_ttl
}
/// Extracts the paths mentioned in `git status --porcelain` (v1) output.
///
/// Every entry has the shape `XY PATH`, or `XY ORIG_PATH -> PATH` when either
/// status column marks a rename (`R`) or copy (`C`); in that case both paths
/// are reported. The returned list is sorted and de-duplicated so two polls
/// can be compared with `==`.
///
/// Malformed lines (shorter than four bytes, status columns that are not two
/// single-byte characters, or no path after the status) are skipped instead
/// of panicking or yielding an empty path.
///
/// NOTE(review): paths that git emits in C-style quotes are passed through
/// verbatim (quotes included) — confirm consumers are fine with that.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in stdout.lines() {
        // `XY` + separator + at least one path byte.
        if line.len() < 4 {
            continue;
        }
        // Checked slicing: the input is lossy-decoded process output, so a
        // byte offset is not guaranteed to fall on a character boundary.
        let (Some(code), Some(rest)) = (line.get(..2), line.get(2..)) else {
            continue;
        };
        // The separator is trimmed rather than assumed, so a path character
        // is never dropped when the separator is missing.
        let rest = rest.trim();
        if rest.is_empty() {
            continue;
        }
        // Only rename/copy entries use the `ORIG_PATH -> PATH` layout; for
        // any other status an arrow is part of the path itself.
        let renamed_or_copied = code.bytes().any(|b| matches!(b, b'R' | b'C'));
        match rest.split_once(" -> ") {
            Some((from, to)) if renamed_or_copied => {
                paths.push(from.trim().to_string());
                paths.push(to.trim().to_string());
            }
            _ => paths.push(rest.to_string()),
        }
    }
    paths.sort_unstable();
    paths.dedup();
    paths
}
/// Runs `git status --porcelain` inside `worktree` and returns the parsed,
/// sorted path list produced by `parse_porcelain`.
///
/// Returns `None` when the git process cannot be run or exits with a
/// non-success status. Output is decoded lossily, so invalid UTF-8 never
/// causes a failure.
async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
    let output = tokio::process::Command::new("git")
        // This command is issued repeatedly in the background. Without this
        // flag `git status` may take the index lock to write back a refreshed
        // index, which can make a concurrent git command run in the same
        // worktree fail.
        .arg("--no-optional-locks")
        .arg("status")
        .arg("--porcelain")
        .current_dir(worktree)
        .output()
        .await
        .ok()?;
    if !output.status.success() {
        return None;
    }
    let stdout = String::from_utf8_lossy(&output.stdout);
    Some(parse_porcelain(&stdout))
}
/// Polls `target.worktree_path` with `git status` every [`POLL_INTERVAL`] and
/// publishes a `working` status message for `target.agent_id` whenever the
/// set of changed paths differs from the previous poll.
///
/// Nothing is published when:
/// * the very first successful poll reports a clean worktree, or
/// * [`should_republish_working`] rejects the change for the agent's current
///   status (the new path set is still remembered as the baseline).
///
/// The task ends when `shutdown` carries `true`, when the sending half of
/// `shutdown` has been dropped, or when `git status` fails and the worktree
/// path no longer exists (in which case `state.forget_watch_target` is called
/// first).
pub async fn watch_worktree(
    state: Arc<BrokerState>,
    target: WatchTarget,
    mut shutdown: tokio::sync::watch::Receiver<bool>,
) {
    let mut previous: Option<Vec<String>> = None;
    let mut ticker = tokio::time::interval(POLL_INTERVAL);
    // If one `git status` run takes longer than the interval, do not fire the
    // missed ticks back-to-back afterwards; just resume the normal cadence.
    ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
    // An interval's first tick completes immediately; consume it so the first
    // poll happens one full interval after start-up.
    ticker.tick().await;
    loop {
        tokio::select! {
            _ = ticker.tick() => {}
            changed = shutdown.changed() => {
                // `Err` means the sender is gone: `changed()` would then
                // resolve immediately on every call and turn this loop into a
                // busy spin, so treat it as a shutdown request.
                if changed.is_err() || *shutdown.borrow() {
                    break;
                }
            }
        }
        let Some(current) = run_git_status(&target.worktree_path).await else {
            // A failing `git status` is only fatal when the worktree is gone.
            if !target.worktree_path.exists() {
                state.forget_watch_target(&target.worktree_path);
                break;
            }
            continue;
        };
        if previous.as_ref() == Some(&current) {
            continue;
        }
        // First observation of a clean worktree: record the baseline silently.
        if previous.is_none() && current.is_empty() {
            previous = Some(current);
            continue;
        }
        // Copy what is needed out of the shared state inside a short scope so
        // the read guard is released before publishing.
        let (status, since_committed, ttl) = {
            let inner = state.read();
            let ttl = inner.republish_working_ttl;
            let rec = inner.agents.get(&target.agent_id);
            let status = rec.map(|r| r.status.clone()).unwrap_or_default();
            let since = rec.and_then(|r| r.last_committed_at).map(|t| t.elapsed());
            (status, since, ttl)
        };
        if !should_republish_working(&status, since_committed, ttl) {
            previous = Some(current);
            continue;
        }
        let msg = BrokerMessage::Status {
            agent_id: target.agent_id.clone(),
            payload: StatusPayload {
                status: "working".to_string(),
                modified_files: current.clone(),
                message: None,
                ..Default::default()
            },
        };
        delivery::publish_message(&state, &msg);
        previous = Some(current);
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // ---- should_republish_working -------------------------------------

    #[test]
    fn non_committed_status_always_publishes() {
        assert!(should_republish_working(
            "working",
            None,
            Duration::from_secs(45)
        ));
        assert!(should_republish_working("idle", None, Duration::ZERO));
    }

    #[test]
    fn committed_within_ttl_republishes() {
        assert!(should_republish_working(
            "committed",
            Some(Duration::from_secs(10)),
            Duration::from_secs(45)
        ));
    }

    #[test]
    fn committed_past_ttl_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(290)),
            Duration::from_secs(45)
        ));
    }

    #[test]
    fn committed_with_zero_ttl_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(0)),
            Duration::ZERO
        ));
    }

    #[test]
    fn committed_without_timestamp_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            None,
            Duration::from_secs(45)
        ));
    }

    // ---- parse_porcelain ----------------------------------------------
    //
    // Fixtures follow the porcelain v1 layout `XY PATH`: two status columns,
    // one separator space, then the path. A staged modification is therefore
    // `M` + space + space + path.

    #[test]
    fn parse_porcelain_handles_modified_and_untracked() {
        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(
            parsed,
            vec![
                "new_file.txt".to_string(),
                "src/lib.rs".to_string(),
                "src/main.rs".to_string(),
            ]
        );
    }

    #[test]
    fn parse_porcelain_handles_renames() {
        let input = "R  old.rs -> new.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
    }

    #[test]
    fn parse_porcelain_empty_is_empty_vec() {
        assert!(parse_porcelain("").is_empty());
    }

    #[test]
    fn parse_porcelain_dedupes() {
        let input = " M a.rs\n M a.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["a.rs".to_string()]);
    }

    // ---- helpers ------------------------------------------------------

    /// Creates a git repository with a single empty root commit in `dir`.
    ///
    /// NOTE(review): only a failure to spawn git panics here; a non-zero exit
    /// status of an individual command is not checked.
    fn init_test_repo(dir: &std::path::Path) {
        use std::process::Command;
        let run = |args: &[&str]| {
            Command::new("git")
                .args(args)
                .current_dir(dir)
                .output()
                .expect("git command failed");
        };
        run(&["init", "-q", "-b", "main"]);
        run(&["config", "user.email", "test@example.com"]);
        run(&["config", "user.name", "test"]);
        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
    }

    // ---- run_git_status / watch_worktree (real git, real time) --------
    //
    // The watcher tests spawn the task, wait 300 ms, touch the worktree, then
    // wait one poll interval plus slack before inspecting the broker state.

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_detects_new_file() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            result.iter().any(|p| p == "hello.txt"),
            "expected hello.txt in {result:?}"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_burst_republishes_working_once() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let state = Arc::new(BrokerState::new(None));
        // Put the agent into the "committed" state before the watcher starts.
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-b".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-b".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
        tokio::time::sleep(Duration::from_millis(300)).await;
        // Ten writes that all land inside a single poll interval.
        for i in 0..10 {
            std::fs::write(tmp.path().join(format!("f{i}.rs")), "x").unwrap();
        }
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
        let working_count = {
            let inner = state.read();
            inner
                .message_log
                .iter()
                .filter(|(_, _, m)| {
                    matches!(m, BrokerMessage::Status { agent_id, payload }
                        if agent_id == "feat-b" && payload.status == "working")
                })
                .count()
        };
        assert_eq!(
            working_count, 1,
            "a burst of writes within one poll interval must republish working exactly once"
        );
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_respects_gitignore() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir(tmp.path().join("target")).unwrap();
        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            !result.iter().any(|p| p.starts_with("target/")),
            "target/ should be filtered by gitignore, got {result:?}"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_publishes_on_change() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
        let msg = {
            let inner = state.read();
            let record = inner
                .agents
                .get("feat-x")
                .expect("watcher should register the agent");
            record
                .last_message
                .clone()
                .expect("watcher should publish a message")
        };
        match msg {
            BrokerMessage::Status { agent_id, payload } => {
                assert_eq!(agent_id, "feat-x");
                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
            }
            other => panic!("expected Status message, got {other:?}"),
        }
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_reenters_working_after_commit() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let state = Arc::new(BrokerState::new(None));
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-x".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        assert_eq!(state.read().agents["feat-x"].status, "committed");
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
        assert_eq!(
            state.read().agents["feat-x"].status,
            "working",
            "watcher must re-enter working after a post-commit edit within TTL"
        );
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_reenter_when_ttl_zero() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let state = Arc::new(BrokerState::new(None));
        // A zero TTL disables re-entry into "working" after a commit.
        state.set_republish_working_ttl(Duration::ZERO);
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-z".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-z".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
        assert_eq!(
            state.read().agents["feat-z"].status,
            "committed",
            "with TTL=0 the watcher must not re-enter working after commit"
        );
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_prunes_vanished_worktree() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let path = tmp.path().to_path_buf();
        let state = Arc::new(BrokerState::new(None));
        let target = WatchTarget {
            agent_id: "feat-gone".to_string(),
            cli: "claude".to_string(),
            worktree_path: path.clone(),
        };
        assert!(state.register_watch_target(&target));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
        tokio::time::sleep(Duration::from_millis(300)).await;
        // Delete the worktree out from under the running watcher.
        tmp.close().unwrap();
        let joined = tokio::time::timeout(POLL_INTERVAL * 2 + Duration::from_secs(1), handle).await;
        assert!(
            joined.is_ok(),
            "watcher task must exit after its worktree disappears"
        );
        assert!(
            !state.read().watched_paths.contains(&path),
            "the vanished worktree must be pruned from the live target set"
        );
        let _ = tx.send(true);
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_publish_when_unchanged() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-y".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
        // Let two full poll cycles pass without touching the worktree.
        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;
        let has_entry = {
            let inner = state.read();
            inner.agents.contains_key("feat-y")
        };
        assert!(
            !has_entry,
            "no publish expected when git status is unchanged"
        );
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }
}