1use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
/// Period of the ticker that drives each `git status` poll in `watch_worktree`.
pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
20
/// Decides whether a detected worktree change should be announced as a fresh
/// `"working"` status.
///
/// * Any `status` other than `"committed"` always republishes.
/// * A `"committed"` status republishes only when `ttl` is non-zero and a
///   commit age is known (`since_committed` is `Some`) that does not exceed
///   `ttl`.
#[must_use]
pub fn should_republish_working(
    status: &str,
    since_committed: Option<Duration>,
    ttl: Duration,
) -> bool {
    let already_committed = status == "committed";
    if !already_committed {
        return true;
    }
    // A zero TTL disables re-entry entirely; otherwise the commit must be
    // both timestamped and recent enough.
    !ttl.is_zero() && matches!(since_committed, Some(age) if age <= ttl)
}
51
/// Extracts the sorted, de-duplicated list of paths mentioned in
/// `git status --porcelain` output.
///
/// Each entry is read as three leading bytes (status code plus separator)
/// followed by the path. Entries of the form `<from> -> <to>` contribute both
/// paths. Lines shorter than four bytes are skipped, as are lines where byte
/// offset 3 is not a character boundary: such a line cannot be a well-formed
/// entry, and slicing it directly would panic.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in stdout.lines() {
        if line.len() < 4 {
            continue;
        }
        // Checked slicing: `&line[3..]` panics when offset 3 lands inside a
        // multi-byte character, `get` returns `None` instead.
        let Some(rest) = line.get(3..) else {
            continue;
        };
        match rest.split_once(" -> ") {
            Some((from, to)) => {
                paths.push(from.trim().to_string());
                paths.push(to.trim().to_string());
            }
            None => paths.push(rest.trim().to_string()),
        }
    }
    // Equal strings are indistinguishable, so an unstable sort gives the same
    // result as a stable one.
    paths.sort_unstable();
    paths.dedup();
    paths
}
75
76async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
81 let output = tokio::process::Command::new("git")
82 .arg("status")
83 .arg("--porcelain")
84 .current_dir(worktree)
85 .output()
86 .await
87 .ok()?;
88 if !output.status.success() {
89 return None;
90 }
91 let stdout = String::from_utf8_lossy(&output.stdout);
92 Some(parse_porcelain(&stdout))
93}
94
95pub async fn watch_worktree(
101 state: Arc<BrokerState>,
102 target: WatchTarget,
103 mut shutdown: tokio::sync::watch::Receiver<bool>,
104) {
105 let mut previous: Option<Vec<String>> = None;
106 let mut ticker = tokio::time::interval(POLL_INTERVAL);
107 ticker.tick().await;
109 loop {
110 tokio::select! {
111 _ = ticker.tick() => {}
112 _ = shutdown.changed() => {
113 if *shutdown.borrow() {
114 break;
115 }
116 }
117 }
118
119 let Some(current) = run_git_status(&target.worktree_path).await else {
120 continue;
121 };
122
123 if previous.as_ref() == Some(¤t) {
124 continue;
125 }
126
127 if previous.is_none() && current.is_empty() {
132 previous = Some(current);
133 continue;
134 }
135
136 let (status, since_committed, ttl) = {
140 let inner = state.read();
141 let ttl = inner.republish_working_ttl;
142 let rec = inner.agents.get(&target.agent_id);
143 let status = rec.map(|r| r.status.clone()).unwrap_or_default();
144 let since = rec.and_then(|r| r.last_committed_at).map(|t| t.elapsed());
145 (status, since, ttl)
146 };
147 if !should_republish_working(&status, since_committed, ttl) {
148 previous = Some(current);
151 continue;
152 }
153
154 let msg = BrokerMessage::Status {
155 agent_id: target.agent_id.clone(),
156 payload: StatusPayload {
157 status: "working".to_string(),
158 modified_files: current.clone(),
159 message: None,
160 ..Default::default()
161 },
162 };
163 delivery::publish_message(&state, &msg);
164 previous = Some(current);
165 }
166}
167
168#[cfg(test)]
169mod tests {
170 use super::*;
171
/// Statuses other than `"committed"` republish with or without a TTL.
#[test]
fn non_committed_status_always_publishes() {
    let with_ttl = should_republish_working("working", None, Duration::from_secs(45));
    assert!(with_ttl);

    let with_zero_ttl = should_republish_working("idle", None, Duration::ZERO);
    assert!(with_zero_ttl);
}
184
/// A commit younger than the TTL still allows a `"working"` republish.
#[test]
fn committed_within_ttl_republishes() {
    let ttl = Duration::from_secs(45);
    let age = Some(Duration::from_secs(10));
    assert!(should_republish_working("committed", age, ttl));
}
193
/// A commit older than the TTL suppresses the republish.
#[test]
fn committed_past_ttl_does_not_republish() {
    let ttl = Duration::from_secs(45);
    let age = Some(Duration::from_secs(290));
    let republish = should_republish_working("committed", age, ttl);
    assert!(!republish);
}
202
/// A zero TTL suppresses the republish even for a commit with zero age.
#[test]
fn committed_with_zero_ttl_does_not_republish() {
    let age = Some(Duration::from_secs(0));
    let republish = should_republish_working("committed", age, Duration::ZERO);
    assert!(!republish);
}
212
/// Without a commit age, a `"committed"` status never republishes.
#[test]
fn committed_without_timestamp_does_not_republish() {
    let ttl = Duration::from_secs(45);
    let republish = should_republish_working("committed", None, ttl);
    assert!(!republish);
}
221
/// Worktree-modified (` M`), untracked (`??`) and staged (`M `) entries are
/// all reported, sorted by path.
#[test]
fn parse_porcelain_handles_modified_and_untracked() {
    // Every entry is a two-character status code, one separator space, then
    // the path. The staged entry is `M` + space + separator, hence the two
    // spaces; with a single space the parser would cut into the path.
    let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
    let parsed = parse_porcelain(input);
    assert_eq!(
        parsed,
        vec![
            "new_file.txt".to_string(),
            "src/lib.rs".to_string(),
            "src/main.rs".to_string(),
        ]
    );
}
235
/// A rename entry contributes both the old and the new path.
#[test]
fn parse_porcelain_handles_renames() {
    // `R` + space fills the two-character status code; the second space is
    // the separator. With a single space the parser would drop the first
    // character of `old.rs`.
    let input = "R  old.rs -> new.rs\n";
    let parsed = parse_porcelain(input);
    assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
}
242
/// Empty output parses to an empty path list.
#[test]
fn parse_porcelain_empty_is_empty_vec() {
    let parsed = parse_porcelain("");
    assert!(parsed.is_empty());
}
247
/// The same path listed twice is reported once.
#[test]
fn parse_porcelain_dedupes() {
    let input = " M a.rs\n M a.rs\n";
    let expected = vec![String::from("a.rs")];
    assert_eq!(parse_porcelain(input), expected);
}
254
/// Initialises a git repository in `dir` on branch `main`, sets a test
/// identity, and creates an empty root commit.
///
/// # Panics
///
/// Panics if `git` cannot be spawned or if any command exits with a non-zero
/// status. Checking the exit status matters: if setup fails silently, the
/// tests that follow run against a directory that is not the repository they
/// expect.
fn init_test_repo(dir: &std::path::Path) {
    use std::process::Command;
    let run = |args: &[&str]| {
        let output = Command::new("git")
            .args(args)
            .current_dir(dir)
            .output()
            .expect("git command failed");
        // `expect` above only covers a failed spawn; the exit status has to
        // be checked separately.
        assert!(
            output.status.success(),
            "git {args:?} exited with {}: {}",
            output.status,
            String::from_utf8_lossy(&output.stderr)
        );
    };
    run(&["init", "-q", "-b", "main"]);
    run(&["config", "user.email", "test@example.com"]);
    run(&["config", "user.name", "test"]);
    run(&["commit", "--allow-empty", "-m", "root", "-q"]);
}
269
270 #[tokio::test(flavor = "current_thread")]
271 #[serial_test::serial]
272 async fn run_git_status_detects_new_file() {
273 let tmp = tempfile::tempdir().unwrap();
274 init_test_repo(tmp.path());
275 std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
276 let result = run_git_status(tmp.path()).await.unwrap();
277 assert!(
278 result.iter().any(|p| p == "hello.txt"),
279 "expected hello.txt in {result:?}"
280 );
281 }
282
283 #[tokio::test(flavor = "current_thread")]
286 #[serial_test::serial]
287 async fn watch_worktree_burst_republishes_working_once() {
288 use crate::broker::BrokerState;
289 use crate::broker::messages::{ArtifactPayload, BrokerMessage};
290
291 let tmp = tempfile::tempdir().unwrap();
292 init_test_repo(tmp.path());
293
294 let state = Arc::new(BrokerState::new(None));
295 super::delivery::publish_message(
296 &state,
297 &BrokerMessage::Artifact {
298 agent_id: "feat-b".to_string(),
299 payload: ArtifactPayload {
300 status: "committed".to_string(),
301 exports: vec![],
302 modified_files: vec![],
303 },
304 },
305 );
306
307 let (tx, rx) = tokio::sync::watch::channel(false);
308 let target = WatchTarget {
309 agent_id: "feat-b".to_string(),
310 cli: "claude".to_string(),
311 worktree_path: tmp.path().to_path_buf(),
312 };
313 let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
314
315 tokio::time::sleep(Duration::from_millis(300)).await;
317 for i in 0..10 {
318 std::fs::write(tmp.path().join(format!("f{i}.rs")), "x").unwrap();
319 }
320
321 tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
322
323 let working_count = {
324 let inner = state.read();
325 inner
326 .message_log
327 .iter()
328 .filter(|(_, _, m)| {
329 matches!(m, BrokerMessage::Status { agent_id, payload }
330 if agent_id == "feat-b" && payload.status == "working")
331 })
332 .count()
333 };
334 assert_eq!(
335 working_count, 1,
336 "a burst of writes within one poll interval must republish working exactly once"
337 );
338
339 let _ = tx.send(true);
340 let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
341 }
342
343 #[tokio::test(flavor = "current_thread")]
344 #[serial_test::serial]
345 async fn run_git_status_respects_gitignore() {
346 let tmp = tempfile::tempdir().unwrap();
347 init_test_repo(tmp.path());
348 std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
349 std::fs::create_dir(tmp.path().join("target")).unwrap();
350 std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
351 let result = run_git_status(tmp.path()).await.unwrap();
352 assert!(
353 !result.iter().any(|p| p.starts_with("target/")),
354 "target/ should be filtered by gitignore, got {result:?}"
355 );
356 }
357
358 #[tokio::test(flavor = "current_thread")]
359 #[serial_test::serial]
360 async fn watch_worktree_publishes_on_change() {
361 use crate::broker::BrokerState;
362 let tmp = tempfile::tempdir().unwrap();
363 init_test_repo(tmp.path());
364
365 let state = Arc::new(BrokerState::new(None));
366 let (tx, rx) = tokio::sync::watch::channel(false);
367 let target = WatchTarget {
368 agent_id: "feat-x".to_string(),
369 cli: "claude".to_string(),
370 worktree_path: tmp.path().to_path_buf(),
371 };
372 let state_clone = Arc::clone(&state);
373 let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
374
375 tokio::time::sleep(Duration::from_millis(300)).await;
377 std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();
378
379 tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
381
382 let msg = {
383 let inner = state.read();
384 let record = inner
385 .agents
386 .get("feat-x")
387 .expect("watcher should register the agent");
388 record
389 .last_message
390 .clone()
391 .expect("watcher should publish a message")
392 };
393 match msg {
394 BrokerMessage::Status { agent_id, payload } => {
395 assert_eq!(agent_id, "feat-x");
396 assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
397 }
398 other => panic!("expected Status message, got {other:?}"),
399 }
400
401 let _ = tx.send(true);
402 let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
403 }
404
405 #[tokio::test(flavor = "current_thread")]
409 #[serial_test::serial]
410 async fn watch_worktree_reenters_working_after_commit() {
411 use crate::broker::BrokerState;
412 use crate::broker::messages::{ArtifactPayload, BrokerMessage};
413
414 let tmp = tempfile::tempdir().unwrap();
415 init_test_repo(tmp.path());
416
417 let state = Arc::new(BrokerState::new(None));
418 super::delivery::publish_message(
421 &state,
422 &BrokerMessage::Artifact {
423 agent_id: "feat-x".to_string(),
424 payload: ArtifactPayload {
425 status: "committed".to_string(),
426 exports: vec![],
427 modified_files: vec![],
428 },
429 },
430 );
431 assert_eq!(state.read().agents["feat-x"].status, "committed");
432
433 let (tx, rx) = tokio::sync::watch::channel(false);
434 let target = WatchTarget {
435 agent_id: "feat-x".to_string(),
436 cli: "claude".to_string(),
437 worktree_path: tmp.path().to_path_buf(),
438 };
439 let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
440
441 tokio::time::sleep(Duration::from_millis(300)).await;
443 std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
444
445 tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
446
447 assert_eq!(
448 state.read().agents["feat-x"].status,
449 "working",
450 "watcher must re-enter working after a post-commit edit within TTL"
451 );
452
453 let _ = tx.send(true);
454 let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
455 }
456
457 #[tokio::test(flavor = "current_thread")]
460 #[serial_test::serial]
461 async fn watch_worktree_does_not_reenter_when_ttl_zero() {
462 use crate::broker::BrokerState;
463 use crate::broker::messages::{ArtifactPayload, BrokerMessage};
464
465 let tmp = tempfile::tempdir().unwrap();
466 init_test_repo(tmp.path());
467
468 let state = Arc::new(BrokerState::new(None));
469 state.set_republish_working_ttl(Duration::ZERO);
470 super::delivery::publish_message(
471 &state,
472 &BrokerMessage::Artifact {
473 agent_id: "feat-z".to_string(),
474 payload: ArtifactPayload {
475 status: "committed".to_string(),
476 exports: vec![],
477 modified_files: vec![],
478 },
479 },
480 );
481
482 let (tx, rx) = tokio::sync::watch::channel(false);
483 let target = WatchTarget {
484 agent_id: "feat-z".to_string(),
485 cli: "claude".to_string(),
486 worktree_path: tmp.path().to_path_buf(),
487 };
488 let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));
489
490 tokio::time::sleep(Duration::from_millis(300)).await;
491 std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
492 tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
493
494 assert_eq!(
495 state.read().agents["feat-z"].status,
496 "committed",
497 "with TTL=0 the watcher must not re-enter working after commit"
498 );
499
500 let _ = tx.send(true);
501 let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
502 }
503
504 #[tokio::test(flavor = "current_thread")]
505 #[serial_test::serial]
506 async fn watch_worktree_does_not_publish_when_unchanged() {
507 use crate::broker::BrokerState;
508 let tmp = tempfile::tempdir().unwrap();
509 init_test_repo(tmp.path());
510
511 let state = Arc::new(BrokerState::new(None));
512 let (tx, rx) = tokio::sync::watch::channel(false);
513 let target = WatchTarget {
514 agent_id: "feat-y".to_string(),
515 cli: "claude".to_string(),
516 worktree_path: tmp.path().to_path_buf(),
517 };
518 let state_clone = Arc::clone(&state);
519 let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
520
521 tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;
523
524 let has_entry = {
525 let inner = state.read();
526 inner.agents.contains_key("feat-y")
527 };
528 assert!(
529 !has_entry,
530 "no publish expected when git status is unchanged"
531 );
532
533 let _ = tx.send(true);
534 let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
535 }
536}