1use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
/// Time between two consecutive `git status` polls of a watched worktree.
pub const POLL_INTERVAL: Duration = Duration::from_millis(2_000);
20
/// Decides whether a detected worktree change should be published as a `working` status.
///
/// * Any `status` other than `"committed"` always republishes.
/// * For `"committed"`, republishing happens only when `ttl` is non-zero, a commit timestamp
///   is known (`since_committed` is `Some`), and the elapsed time is at most `ttl`.
#[must_use]
pub fn should_republish_working(
    status: &str,
    since_committed: Option<Duration>,
    ttl: Duration,
) -> bool {
    let is_committed = status == "committed";
    if !is_committed {
        return true;
    }
    // A zero TTL disables re-entry after a commit, even for an elapsed time of zero.
    !ttl.is_zero() && matches!(since_committed, Some(elapsed) if elapsed <= ttl)
}
51
/// Extracts the paths from `git status --porcelain` output.
///
/// Each entry is expected to look like `XY path` (two status columns, one separator, then the
/// path), or `XY from -> to` for renames and copies, in which case both paths are reported.
/// Lines too short to hold a path are skipped. The result is sorted and de-duplicated so two
/// snapshots can be compared with `==`.
///
/// Paths are returned as git printed them (trimmed); quoted paths are not unquoted.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in stdout.lines() {
        // `get` rather than indexing: a malformed line whose byte offset 3 falls inside a
        // multi-byte character must be skipped, not panic the caller.
        let Some(rest) = line.get(3..).filter(|rest| !rest.is_empty()) else {
            continue;
        };
        // Only rename (`R`) and copy (`C`) entries use the `from -> to` form; any other entry
        // containing " -> " is a single path and must stay in one piece.
        let is_rename_or_copy = line
            .bytes()
            .take(2)
            .any(|code| matches!(code, b'R' | b'C'));
        match rest.split_once(" -> ") {
            Some((from, to)) if is_rename_or_copy => {
                paths.push(from.trim().to_string());
                paths.push(to.trim().to_string());
            }
            _ => paths.push(rest.trim().to_string()),
        }
    }
    paths.sort_unstable();
    paths.dedup();
    paths
}
75
76async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
81 let output = tokio::process::Command::new("git")
82 .arg("status")
83 .arg("--porcelain")
84 .current_dir(worktree)
85 .output()
86 .await
87 .ok()?;
88 if !output.status.success() {
89 return None;
90 }
91 let stdout = String::from_utf8_lossy(&output.stdout);
92 Some(parse_porcelain(&stdout))
93}
94
95pub async fn watch_worktree(
101 state: Arc<BrokerState>,
102 target: WatchTarget,
103 mut shutdown: tokio::sync::watch::Receiver<bool>,
104) {
105 let mut previous: Option<Vec<String>> = None;
106 let mut ticker = tokio::time::interval(POLL_INTERVAL);
107 ticker.tick().await;
109 loop {
110 tokio::select! {
111 _ = ticker.tick() => {}
112 _ = shutdown.changed() => {
113 if *shutdown.borrow() {
114 break;
115 }
116 }
117 }
118
119 let Some(current) = run_git_status(&target.worktree_path).await else {
120 if !target.worktree_path.exists() {
126 state.forget_watch_target(&target.worktree_path);
127 break;
128 }
129 continue;
130 };
131
132 if previous.as_ref() == Some(¤t) {
133 continue;
134 }
135
136 if previous.is_none() && current.is_empty() {
141 previous = Some(current);
142 continue;
143 }
144
145 let (status, since_committed, ttl) = {
149 let inner = state.read();
150 let ttl = inner.republish_working_ttl;
151 let rec = inner.agents.get(&target.agent_id);
152 let status = rec.map(|r| r.status.clone()).unwrap_or_default();
153 let since = rec.and_then(|r| r.last_committed_at).map(|t| t.elapsed());
154 (status, since, ttl)
155 };
156 if !should_republish_working(&status, since_committed, ttl) {
157 previous = Some(current);
160 continue;
161 }
162
163 let msg = BrokerMessage::Status {
164 agent_id: target.agent_id.clone(),
165 payload: StatusPayload {
166 status: "working".to_string(),
167 modified_files: current.clone(),
168 message: None,
169 ..Default::default()
170 },
171 };
172 delivery::publish_message(&state, &msg);
173 previous = Some(current);
174 }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    // ---- should_republish_working -------------------------------------------------------

    #[test]
    fn non_committed_status_always_publishes() {
        assert!(should_republish_working(
            "working",
            None,
            Duration::from_secs(45)
        ));
        assert!(should_republish_working("idle", None, Duration::ZERO));
    }

    #[test]
    fn committed_within_ttl_republishes() {
        assert!(should_republish_working(
            "committed",
            Some(Duration::from_secs(10)),
            Duration::from_secs(45)
        ));
    }

    #[test]
    fn committed_past_ttl_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(290)),
            Duration::from_secs(45)
        ));
    }

    #[test]
    fn committed_with_zero_ttl_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(0)),
            Duration::ZERO
        ));
    }

    #[test]
    fn committed_without_timestamp_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            None,
            Duration::from_secs(45)
        ));
    }

    // ---- parse_porcelain ----------------------------------------------------------------
    //
    // Porcelain entries are `XY<space>path`: two status columns followed by one space, so
    // the path always starts at byte offset 3. A staged modification is `M` + space in the
    // status columns, hence the two spaces in `M  src/lib.rs`.

    #[test]
    fn parse_porcelain_handles_modified_and_untracked() {
        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(
            parsed,
            vec![
                "new_file.txt".to_string(),
                "src/lib.rs".to_string(),
                "src/main.rs".to_string(),
            ]
        );
    }

    #[test]
    fn parse_porcelain_handles_renames() {
        // Status columns `R` + space, then the separator, then `from -> to`.
        let input = "R  old.rs -> new.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
    }

    #[test]
    fn parse_porcelain_empty_is_empty_vec() {
        assert!(parse_porcelain("").is_empty());
    }

    #[test]
    fn parse_porcelain_dedupes() {
        let input = " M a.rs\n M a.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["a.rs".to_string()]);
    }

    // ---- helpers ------------------------------------------------------------------------

    /// Creates a git repository with one empty root commit in `dir`.
    ///
    /// Only a failure to launch `git` panics; the exit status of each command is not checked.
    fn init_test_repo(dir: &std::path::Path) {
        use std::process::Command;
        let run = |args: &[&str]| {
            Command::new("git")
                .args(args)
                .current_dir(dir)
                .output()
                .expect("git command failed");
        };
        run(&["init", "-q", "-b", "main"]);
        run(&["config", "user.email", "test@example.com"]);
        run(&["config", "user.name", "test"]);
        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
    }

    // ---- run_git_status -----------------------------------------------------------------

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_detects_new_file() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            result.iter().any(|p| p == "hello.txt"),
            "expected hello.txt in {result:?}"
        );
    }

    // ---- watch_worktree -----------------------------------------------------------------

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_burst_republishes_working_once() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};

        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        // Put the agent into the `committed` state before the watcher starts.
        let state = Arc::new(BrokerState::new(None));
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-b".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );

        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-b".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        // Let the watcher start, then write all files before its first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        for i in 0..10 {
            std::fs::write(tmp.path().join(format!("f{i}.rs")), "x").unwrap();
        }

        // Wait long enough for exactly one poll to have run.
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        let working_count = {
            let inner = state.read();
            inner
                .message_log
                .iter()
                .filter(|(_, _, m)| {
                    matches!(m, BrokerMessage::Status { agent_id, payload }
                        if agent_id == "feat-b" && payload.status == "working")
                })
                .count()
        };
        assert_eq!(
            working_count, 1,
            "a burst of writes within one poll interval must republish working exactly once"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_respects_gitignore() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir(tmp.path().join("target")).unwrap();
        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            !result.iter().any(|p| p.starts_with("target/")),
            "target/ should be filtered by gitignore, got {result:?}"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_publishes_on_change() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Let the watcher start, then dirty the worktree before its first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();

        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        let msg = {
            let inner = state.read();
            let record = inner
                .agents
                .get("feat-x")
                .expect("watcher should register the agent");
            record
                .last_message
                .clone()
                .expect("watcher should publish a message")
        };
        match msg {
            BrokerMessage::Status { agent_id, payload } => {
                assert_eq!(agent_id, "feat-x");
                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
            }
            other => panic!("expected Status message, got {other:?}"),
        }

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_reenters_working_after_commit() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};

        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-x".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        assert_eq!(state.read().agents["feat-x"].status, "committed");

        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        // Edit after the commit but before the watcher's first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();

        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        assert_eq!(
            state.read().agents["feat-x"].status,
            "working",
            "watcher must re-enter working after a post-commit edit within TTL"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_reenter_when_ttl_zero() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};

        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        // A zero TTL switches post-commit re-entry off.
        state.set_republish_working_ttl(Duration::ZERO);
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-z".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );

        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-z".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        assert_eq!(
            state.read().agents["feat-z"].status,
            "committed",
            "with TTL=0 the watcher must not re-enter working after commit"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_prunes_vanished_worktree() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let path = tmp.path().to_path_buf();

        let state = Arc::new(BrokerState::new(None));
        let target = WatchTarget {
            agent_id: "feat-gone".to_string(),
            cli: "claude".to_string(),
            worktree_path: path.clone(),
        };
        assert!(state.register_watch_target(&target));

        let (tx, rx) = tokio::sync::watch::channel(false);
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        // Delete the worktree while the watcher is waiting for its first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        tmp.close().unwrap();

        // No shutdown signal is sent: the task has to end on its own.
        let joined =
            tokio::time::timeout(POLL_INTERVAL * 2 + Duration::from_secs(1), handle).await;
        assert!(
            joined.is_ok(),
            "watcher task must exit after its worktree disappears"
        );
        assert!(
            !state.read().watched_paths.contains(&path),
            "the vanished worktree must be pruned from the live target set"
        );

        let _ = tx.send(true);
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_publish_when_unchanged() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-y".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Cover two polls of a worktree that stays clean.
        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;

        let has_entry = {
            let inner = state.read();
            inner.agents.contains_key("feat-y")
        };
        assert!(
            !has_entry,
            "no publish expected when git status is unchanged"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }
}