Skip to main content

git_paw/broker/
watcher.rs

1//! Git-status polling watcher.
2//!
3//! Each `WatchTarget` spawns one async task that runs `git status --porcelain`
4//! in the worktree every [`POLL_INTERVAL`]. When the set of reported paths
5//! differs from the previous tick, the watcher publishes `agent.status` with
6//! the current paths in `modified_files`.
7//!
8//! The watcher inherits git's exclusion rules (`.gitignore`, `.git/` internals)
9//! instead of maintaining a hand-rolled filter list.
10
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
/// How long the watcher waits between successive `git status --porcelain` runs.
pub const POLL_INTERVAL: Duration = Duration::from_millis(2_000);
20
/// Gate for the watcher's `working` publish after an observed git-status
/// change.
///
/// The decision is made from the agent's current broker status, how long ago
/// its last `committed` event happened (if one was recorded), and the
/// configured post-commit re-entry TTL (bug 8).
///
/// Returns `true` when `working` should be published:
///
/// - Any status other than `committed` always publishes, exactly as in
///   v0.5.0.
/// - For a `committed` agent, `ttl == 0` never publishes (committed stays
///   terminal — the v0.5.0 opt-out model).
/// - For a `committed` agent with a non-zero `ttl`, the publish fires only
///   while the time since the committed event is at most `ttl`. A missing
///   timestamp, or an elapsed time past the window, means the agent is
///   considered settled and the publish is suppressed.
#[must_use]
pub fn should_republish_working(
    status: &str,
    since_committed: Option<Duration>,
    ttl: Duration,
) -> bool {
    let is_committed = status == "committed";
    // `None` never matches, so a committed agent without a timestamp is
    // treated as outside the re-entry window.
    let inside_window = matches!(since_committed, Some(age) if age <= ttl);
    !is_committed || (!ttl.is_zero() && inside_window)
}
51
/// Parses `git status --porcelain` output into a sorted, deduplicated list of paths.
///
/// Each porcelain line looks like `XY PATH` or `XY PATH1 -> PATH2` for renames.
/// For renames, both the source and destination paths are reported.
///
/// Git writes a path that contains whitespace or other special bytes as a
/// C-style quoted string (`"my file.txt"`, `"caf\303\251.txt"`). Such fields
/// are unquoted and unescaped here so the returned entries are real paths,
/// and a ` -> ` that appears *inside* a quoted path is not mistaken for the
/// rename separator.
///
/// Lines too short to carry a path, lines whose path field is empty, and
/// lines whose first three bytes are not a well-formed status prefix are
/// skipped instead of panicking.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in stdout.lines() {
        // Skip the two-character status prefix and the separating space.
        // `get` (rather than `&line[3..]`) returns `None` both for short
        // lines and when byte 3 is not a char boundary, so odd input can
        // never panic the watcher task.
        let Some(rest) = line.get(3..) else {
            continue;
        };
        let rest = rest.trim();
        if rest.is_empty() {
            continue;
        }
        let (first, second) = split_porcelain_rename(rest);
        paths.push(unquote_porcelain_path(first));
        if let Some(destination) = second {
            paths.push(unquote_porcelain_path(destination));
        }
    }
    // Equal strings are indistinguishable, so an unstable sort is sufficient.
    paths.sort_unstable();
    paths.dedup();
    paths
}

/// Splits a porcelain path field into `(first, Some(second))` for a rename
/// (`ORIG -> DEST`) or `(field, None)` for a single path.
///
/// When the field starts with a quoted path, the separator is only accepted
/// directly after the closing quote; otherwise the first ` -> ` is used.
fn split_porcelain_rename(rest: &str) -> (&str, Option<&str>) {
    const ARROW: &str = " -> ";
    let boundary = if rest.starts_with('"') {
        closing_quote_index(rest)
            .map(|end| end + 1)
            .filter(|&after| rest[after..].starts_with(ARROW))
    } else {
        rest.find(ARROW)
    };
    match boundary {
        Some(at) => (&rest[..at], Some(&rest[at + ARROW.len()..])),
        None => (rest, None),
    }
}

/// Returns the byte index of the unescaped `"` that closes a field starting
/// with `"`, or `None` when the quote is never closed.
fn closing_quote_index(field: &str) -> Option<usize> {
    let bytes = field.as_bytes();
    let mut i = 1;
    while i < bytes.len() {
        match bytes[i] {
            // A backslash escapes the following byte; skip the pair.
            b'\\' => i += 2,
            b'"' => return Some(i),
            _ => i += 1,
        }
    }
    None
}

/// Decodes one porcelain path field.
///
/// Unquoted fields are returned trimmed. Fields fully wrapped in double
/// quotes have the quotes removed and C-style escapes (`\a \b \f \n \r \t
/// \v`, `\"`, `\\`, and three-digit octal bytes) decoded; the resulting bytes
/// are interpreted as UTF-8, lossily.
fn unquote_porcelain_path(raw: &str) -> String {
    let raw = raw.trim();
    let fully_quoted = raw.starts_with('"') && closing_quote_index(raw) == Some(raw.len() - 1);
    if !fully_quoted {
        return raw.to_string();
    }

    let inner = &raw.as_bytes()[1..raw.len() - 1];
    let mut decoded: Vec<u8> = Vec::with_capacity(inner.len());
    let mut bytes = inner.iter().copied().peekable();
    while let Some(byte) = bytes.next() {
        if byte != b'\\' {
            decoded.push(byte);
            continue;
        }
        match bytes.next() {
            Some(b'a') => decoded.push(0x07),
            Some(b'b') => decoded.push(0x08),
            Some(b'f') => decoded.push(0x0c),
            Some(b'n') => decoded.push(b'\n'),
            Some(b'r') => decoded.push(b'\r'),
            Some(b't') => decoded.push(b'\t'),
            Some(b'v') => decoded.push(0x0b),
            Some(first @ b'0'..=b'3') => {
                // Octal byte: a leading digit of at most 3 followed by up to
                // two more octal digits never exceeds 0o377, so `u8` cannot
                // overflow.
                let mut value = first - b'0';
                for _ in 0..2 {
                    match bytes.peek().copied() {
                        Some(digit @ b'0'..=b'7') => {
                            value = value * 8 + (digit - b'0');
                            bytes.next();
                        }
                        _ => break,
                    }
                }
                decoded.push(value);
            }
            // `\"`, `\\`, and any unrecognised escape stand for the byte itself.
            Some(other) => decoded.push(other),
            None => decoded.push(b'\\'),
        }
    }
    String::from_utf8_lossy(&decoded).into_owned()
}
75
76/// Runs `git status --porcelain` in `worktree` and returns the parsed path list.
77///
78/// Returns `None` when git is unavailable or the command fails — callers treat
79/// that as "no change detected this tick" and retry on the next interval.
80async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
81    let output = tokio::process::Command::new("git")
82        .arg("status")
83        .arg("--porcelain")
84        .current_dir(worktree)
85        .output()
86        .await
87        .ok()?;
88    if !output.status.success() {
89        return None;
90    }
91    let stdout = String::from_utf8_lossy(&output.stdout);
92    Some(parse_porcelain(&stdout))
93}
94
95/// Watches a single worktree, publishing `agent.status` when git-status output changes.
96///
97/// The task runs until the broker's shutdown signal fires. Each iteration waits
98/// [`POLL_INTERVAL`] and then checks `git status --porcelain`. If the result
99/// differs from the previous tick, it publishes a status message.
100pub async fn watch_worktree(
101    state: Arc<BrokerState>,
102    target: WatchTarget,
103    mut shutdown: tokio::sync::watch::Receiver<bool>,
104) {
105    let mut previous: Option<Vec<String>> = None;
106    let mut ticker = tokio::time::interval(POLL_INTERVAL);
107    // Skip the immediate first tick so we wait one interval before the first poll.
108    ticker.tick().await;
109    loop {
110        tokio::select! {
111            _ = ticker.tick() => {}
112            _ = shutdown.changed() => {
113                if *shutdown.borrow() {
114                    break;
115                }
116            }
117        }
118
119        let Some(current) = run_git_status(&target.worktree_path).await else {
120            // `git status` failed. Distinguish a transient failure (retry next
121            // tick) from a vanished worktree: when the worktree no longer
122            // exists on disk, deregister it and stop this task. This is the
123            // prune path for `git paw remove` — a later re-registration of the
124            // same path spawns a fresh watcher.
125            if !target.worktree_path.exists() {
126                state.forget_watch_target(&target.worktree_path);
127                break;
128            }
129            continue;
130        };
131
132        if previous.as_ref() == Some(&current) {
133            continue;
134        }
135
136        // Skip the very first baseline when the worktree is clean. We only
137        // want to announce the agent once it has actual dirty state; otherwise
138        // a quiet worktree would publish an empty status on startup with no
139        // useful information.
140        if previous.is_none() && current.is_empty() {
141            previous = Some(current);
142            continue;
143        }
144
145        // Bug 8: gate the `working` publish against the post-commit re-entry
146        // TTL. Read the agent's status, time-since-committed, and the
147        // configured TTL under a short read lock (never held across an await).
148        let (status, since_committed, ttl) = {
149            let inner = state.read();
150            let ttl = inner.republish_working_ttl;
151            let rec = inner.agents.get(&target.agent_id);
152            let status = rec.map(|r| r.status.clone()).unwrap_or_default();
153            let since = rec.and_then(|r| r.last_committed_at).map(|t| t.elapsed());
154            (status, since, ttl)
155        };
156        if !should_republish_working(&status, since_committed, ttl) {
157            // Agent is settled at `committed`; absorb the change as the new
158            // baseline without re-publishing `working`.
159            previous = Some(current);
160            continue;
161        }
162
163        let msg = BrokerMessage::Status {
164            agent_id: target.agent_id.clone(),
165            payload: StatusPayload {
166                status: "working".to_string(),
167                modified_files: current.clone(),
168                message: None,
169                ..Default::default()
170            },
171        };
172        delivery::publish_message(&state, &msg);
173        previous = Some(current);
174    }
175}
176
#[cfg(test)]
mod tests {
    use super::*;

    // === Bug 8: post-commit re-entry decision (should_republish_working) ===

    /// Any status other than `committed` publishes, whatever the TTL.
    #[test]
    fn non_committed_status_always_publishes() {
        // Normal operation: a working agent publishes regardless of TTL.
        assert!(should_republish_working(
            "working",
            None,
            Duration::from_secs(45)
        ));
        assert!(should_republish_working("idle", None, Duration::ZERO));
    }

    /// A committed agent still inside the TTL window re-publishes.
    #[test]
    fn committed_within_ttl_republishes() {
        assert!(should_republish_working(
            "committed",
            Some(Duration::from_secs(10)),
            Duration::from_secs(45)
        ));
    }

    /// A committed agent past the TTL window is settled and stays quiet.
    #[test]
    fn committed_past_ttl_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(290)),
            Duration::from_secs(45)
        ));
    }

    /// TTL = 0 is the opt-out: committed is terminal.
    #[test]
    fn committed_with_zero_ttl_does_not_republish() {
        // Opt-out: TTL=0 keeps committed terminal even within "0 seconds".
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(0)),
            Duration::ZERO
        ));
    }

    /// A committed agent with no recorded commit time never re-publishes.
    #[test]
    fn committed_without_timestamp_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            None,
            Duration::from_secs(45)
        ));
    }

    // === Porcelain parsing ===

    /// Modified, staged, and untracked entries all yield their path, sorted.
    #[test]
    fn parse_porcelain_handles_modified_and_untracked() {
        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(
            parsed,
            vec![
                "new_file.txt".to_string(),
                "src/lib.rs".to_string(),
                "src/main.rs".to_string(),
            ]
        );
    }

    /// A rename reports both the old and the new path.
    #[test]
    fn parse_porcelain_handles_renames() {
        let input = "R  old.rs -> new.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
    }

    /// Empty output parses to an empty list.
    #[test]
    fn parse_porcelain_empty_is_empty_vec() {
        assert!(parse_porcelain("").is_empty());
    }

    /// Repeated paths are reported once.
    #[test]
    fn parse_porcelain_dedupes() {
        let input = " M a.rs\n M a.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["a.rs".to_string()]);
    }

    // === Helpers and integration tests against a real git repository ===

    /// Creates a git repository with a single empty root commit in `dir`.
    ///
    /// Panics with git's stderr if any step fails, so a broken fixture is
    /// reported at the point of failure rather than as a confusing assertion
    /// later in the test.
    fn init_test_repo(dir: &std::path::Path) {
        use std::process::Command;
        let run = |args: &[&str]| {
            let output = Command::new("git")
                .args(args)
                .current_dir(dir)
                .output()
                .expect("failed to spawn git");
            // `output()` only errors when the process cannot be spawned; a
            // non-zero exit status has to be checked explicitly.
            assert!(
                output.status.success(),
                "git {args:?} failed: {}",
                String::from_utf8_lossy(&output.stderr)
            );
        };
        run(&["init", "-q", "-b", "main"]);
        run(&["config", "user.email", "test@example.com"]);
        run(&["config", "user.name", "test"]);
        // Keep the fixture independent of a developer's global signing setup.
        run(&["config", "commit.gpgsign", "false"]);
        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
    }

    /// An untracked file shows up in the parsed status.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_detects_new_file() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            result.iter().any(|p| p == "hello.txt"),
            "expected hello.txt in {result:?}"
        );
    }

    /// Spec scenario: multiple writes within the TTL republish `working`
    /// exactly once (the 2s poll coalesces a burst into one tick).
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_burst_republishes_working_once() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};

        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-b".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );

        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-b".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        // Burst: ten files written well within one poll interval.
        tokio::time::sleep(Duration::from_millis(300)).await;
        for i in 0..10 {
            std::fs::write(tmp.path().join(format!("f{i}.rs")), "x").unwrap();
        }

        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        let working_count = {
            let inner = state.read();
            inner
                .message_log
                .iter()
                .filter(|(_, _, m)| {
                    matches!(m, BrokerMessage::Status { agent_id, payload }
                        if agent_id == "feat-b" && payload.status == "working")
                })
                .count()
        };
        assert_eq!(
            working_count, 1,
            "a burst of writes within one poll interval must republish working exactly once"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    /// Files matched by `.gitignore` never appear in the parsed status.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_respects_gitignore() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir(tmp.path().join("target")).unwrap();
        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            !result.iter().any(|p| p.starts_with("target/")),
            "target/ should be filtered by gitignore, got {result:?}"
        );
    }

    /// A new file in the worktree results in a `Status` message that lists it.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_publishes_on_change() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Let the watcher task start, then create a file before its first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();

        // Wait past the next poll tick so the watcher has observed the file.
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        let msg = {
            let inner = state.read();
            let record = inner
                .agents
                .get("feat-x")
                .expect("watcher should register the agent");
            record
                .last_message
                .clone()
                .expect("watcher should publish a message")
        };
        match msg {
            BrokerMessage::Status { agent_id, payload } => {
                assert_eq!(agent_id, "feat-x");
                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
            }
            other => panic!("expected Status message, got {other:?}"),
        }

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    /// Spec scenario (task 5.5): an agent commits, the broker records
    /// `committed`, then the agent keeps editing — the watcher re-publishes
    /// `working`, so the record transitions `committed` -> `working`.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_reenters_working_after_commit() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};

        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        // Record a committed artifact so the agent is in the committed state
        // with a fresh last_committed_at (default TTL is 60s).
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-x".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
        assert_eq!(state.read().agents["feat-x"].status, "committed");

        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        // Agent keeps editing after the commit.
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();

        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        assert_eq!(
            state.read().agents["feat-x"].status,
            "working",
            "watcher must re-enter working after a post-commit edit within TTL"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    /// With TTL=0 the post-commit edit must NOT re-publish `working` — the
    /// dashboard keeps showing `committed` (v0.5.0 opt-out).
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_reenter_when_ttl_zero() {
        use crate::broker::BrokerState;
        use crate::broker::messages::{ArtifactPayload, BrokerMessage};

        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        state.set_republish_working_ttl(Duration::ZERO);
        super::delivery::publish_message(
            &state,
            &BrokerMessage::Artifact {
                agent_id: "feat-z".to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );

        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-z".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        assert_eq!(
            state.read().agents["feat-z"].status,
            "committed",
            "with TTL=0 the watcher must not re-enter working after commit"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    /// Spec scenario (task 3.2): when a watched worktree disappears (the
    /// `git paw remove` prune path), the watcher deregisters it from the live
    /// target set and exits, so a later re-add of the same path spawns a fresh
    /// watcher.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_prunes_vanished_worktree() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        let path = tmp.path().to_path_buf();

        let state = Arc::new(BrokerState::new(None));
        let target = WatchTarget {
            agent_id: "feat-gone".to_string(),
            cli: "claude".to_string(),
            worktree_path: path.clone(),
        };
        assert!(state.register_watch_target(&target));

        let (tx, rx) = tokio::sync::watch::channel(false);
        let handle = tokio::spawn(watch_worktree(Arc::clone(&state), target, rx));

        // Let the watcher task start, then delete the worktree before its
        // first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        tmp.close().unwrap();

        // Within a couple poll intervals the watcher detects the vanished
        // worktree, prunes it, and exits.
        let joined = tokio::time::timeout(POLL_INTERVAL * 2 + Duration::from_secs(1), handle).await;
        assert!(
            joined.is_ok(),
            "watcher task must exit after its worktree disappears"
        );
        assert!(
            !state.read().watched_paths.contains(&path),
            "the vanished worktree must be pruned from the live target set"
        );

        // The sender is kept alive until here so the watcher's exit above is
        // caused by the vanished worktree, not by the shutdown channel.
        let _ = tx.send(true);
    }

    /// A clean, unchanging worktree never causes a publish.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_publish_when_unchanged() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-y".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Let several ticks elapse with no changes.
        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;

        let has_entry = {
            let inner = state.read();
            inner.agents.contains_key("feat-y")
        };
        assert!(
            !has_entry,
            "no publish expected when git status is unchanged"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }
}