Skip to main content

git_paw/broker/
watcher.rs

1//! Git-status polling watcher.
2//!
3//! Each `WatchTarget` spawns one async task that runs `git status --porcelain`
4//! in the worktree every [`POLL_INTERVAL`]. When the set of reported paths
5//! differs from the previous tick, the watcher publishes `agent.status` with
6//! the current paths in `modified_files`.
7//!
8//! The watcher inherits git's exclusion rules (`.gitignore`, `.git/` internals)
9//! instead of maintaining a hand-rolled filter list.
10
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
18/// Interval between `git status --porcelain` polls.
19pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
20
/// Gate for the watcher's `working` publish after an observed git-status
/// change (bug 8).
///
/// # Arguments
///
/// - `status`: the agent's current broker status.
/// - `since_committed`: time elapsed since the agent's last `committed`
///   event, or `None` when no such event is recorded.
/// - `ttl`: the configured post-commit re-entry window.
///
/// # Returns
///
/// `true` when the watcher should publish `working`:
///
/// - Any status other than `committed` always publishes, exactly as in
///   v0.5.0.
/// - For a `committed` agent, `ttl == 0` never publishes (committed stays
///   terminal — the v0.5.0 opt-out model).
/// - For a `committed` agent with a non-zero `ttl`, the publish fires only
///   while the elapsed time is within `ttl` (inclusive). Past the window, or
///   with no recorded commit time, the agent counts as settled and the
///   publish is suppressed.
#[must_use]
pub fn should_republish_working(
    status: &str,
    since_committed: Option<Duration>,
    ttl: Duration,
) -> bool {
    let settled_at_committed = status == "committed";
    // The zero check is not redundant: an elapsed time of exactly zero would
    // otherwise satisfy `elapsed <= ttl` and defeat the TTL=0 opt-out.
    let reentry_window_open =
        !ttl.is_zero() && matches!(since_committed, Some(elapsed) if elapsed <= ttl);
    !settled_at_committed || reentry_window_open
}
51
/// Parses `git status --porcelain` output into a sorted, deduplicated list of paths.
///
/// Each porcelain line looks like `XY PATH`, or `XY PATH1 -> PATH2` for
/// renames; for renames both the source and destination paths are reported.
///
/// Git wraps a path in double quotes and backslash-escapes it (C string
/// literal style) when it contains spaces or other special bytes. Such paths
/// are decoded here so callers see the real file name rather than the quoted
/// form. Lines too short to carry a path, malformed lines, and empty paths
/// are skipped.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in stdout.lines() {
        // Skip the two-character status prefix and the separating space.
        // `get` (rather than `&line[3..]`) cannot panic when the line is too
        // short or byte 3 is not a character boundary.
        let Some(rest) = line.get(3..) else {
            continue;
        };
        let (first, remainder) = take_porcelain_path(rest.trim());
        // Anything left over is the ` -> DEST` half of a rename entry.
        let second = remainder
            .strip_prefix(" -> ")
            .map(|dest| take_porcelain_path(dest.trim()).0);
        paths.extend(
            std::iter::once(first)
                .chain(second)
                .filter(|path| !path.is_empty()),
        );
    }
    paths.sort();
    paths.dedup();
    paths
}

/// Splits the leading path off `field`, returning the decoded path and the
/// text that follows it (either empty or starting with ` -> `).
fn take_porcelain_path(field: &str) -> (String, &str) {
    let Some(quoted) = field.strip_prefix('"') else {
        // Unquoted path: git quotes any path containing a space, so the first
        // " -> " can only be the rename separator.
        return match field.find(" -> ") {
            Some(at) => (field[..at].trim().to_string(), &field[at..]),
            None => (field.trim().to_string(), ""),
        };
    };

    // Quoted path: decode escapes byte-wise up to the closing quote. Octal
    // escapes encode raw bytes, so UTF-8 is only assembled at the end.
    let bytes = quoted.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(bytes.len());
    let mut pos = 0;
    while let Some(byte) = bytes.get(pos).copied() {
        match byte {
            b'"' => {
                let path = String::from_utf8_lossy(&decoded).into_owned();
                // `pos` indexes an ASCII quote, so `pos + 1` is a char boundary.
                return (path, &quoted[pos + 1..]);
            }
            b'\\' => {
                let Some(escaped) = bytes.get(pos + 1).copied() else {
                    break;
                };
                // Step past the backslash and the escape selector.
                pos += 2;
                match escaped {
                    b'a' => decoded.push(0x07),
                    b'b' => decoded.push(0x08),
                    b'f' => decoded.push(0x0c),
                    b'n' => decoded.push(b'\n'),
                    b'r' => decoded.push(b'\r'),
                    b't' => decoded.push(b'\t'),
                    b'v' => decoded.push(0x0b),
                    b'0'..=b'7' => {
                        // Up to three octal digits encode one byte.
                        let mut value = u32::from(escaped - b'0');
                        let mut extra = 0;
                        while extra < 2 {
                            match bytes.get(pos + extra).copied() {
                                Some(digit @ b'0'..=b'7') => {
                                    value = value * 8 + u32::from(digit - b'0');
                                    extra += 1;
                                }
                                _ => break,
                            }
                        }
                        pos += extra;
                        // Git never emits more than `\377`; anything larger is
                        // malformed input and becomes a placeholder.
                        decoded.push(u8::try_from(value).unwrap_or(b'?'));
                    }
                    // `\\`, `\"`, and any unrecognised escape: keep the
                    // escaped character itself.
                    other => decoded.push(other),
                }
            }
            other => {
                decoded.push(other);
                pos += 1;
            }
        }
    }

    // No closing quote: malformed field, so report the raw text unchanged.
    (field.trim().to_string(), "")
}
75
76/// Runs `git status --porcelain` in `worktree` and returns the parsed path list.
77///
78/// Returns `None` when git is unavailable or the command fails — callers treat
79/// that as "no change detected this tick" and retry on the next interval.
80async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
81    let output = tokio::process::Command::new("git")
82        .arg("status")
83        .arg("--porcelain")
84        .current_dir(worktree)
85        .output()
86        .await
87        .ok()?;
88    if !output.status.success() {
89        return None;
90    }
91    let stdout = String::from_utf8_lossy(&output.stdout);
92    Some(parse_porcelain(&stdout))
93}
94
95/// Watches a single worktree, publishing `agent.status` when git-status output changes.
96///
97/// The task runs until the broker's shutdown signal fires. Each iteration waits
98/// [`POLL_INTERVAL`] and then checks `git status --porcelain`. If the result
99/// differs from the previous tick, it publishes a status message.
100pub async fn watch_worktree(
101    state: Arc<BrokerState>,
102    target: WatchTarget,
103    mut shutdown: tokio::sync::watch::Receiver<bool>,
104) {
105    let mut previous: Option<Vec<String>> = None;
106    let mut ticker = tokio::time::interval(POLL_INTERVAL);
107    // Skip the immediate first tick so we wait one interval before the first poll.
108    ticker.tick().await;
109    loop {
110        tokio::select! {
111            _ = ticker.tick() => {}
112            _ = shutdown.changed() => {
113                if *shutdown.borrow() {
114                    break;
115                }
116            }
117        }
118
119        let Some(current) = run_git_status(&target.worktree_path).await else {
120            continue;
121        };
122
123        if previous.as_ref() == Some(&current) {
124            continue;
125        }
126
127        // Skip the very first baseline when the worktree is clean. We only
128        // want to announce the agent once it has actual dirty state; otherwise
129        // a quiet worktree would publish an empty status on startup with no
130        // useful information.
131        if previous.is_none() && current.is_empty() {
132            previous = Some(current);
133            continue;
134        }
135
136        // Bug 8: gate the `working` publish against the post-commit re-entry
137        // TTL. Read the agent's status, time-since-committed, and the
138        // configured TTL under a short read lock (never held across an await).
139        let (status, since_committed, ttl) = {
140            let inner = state.read();
141            let ttl = inner.republish_working_ttl;
142            let rec = inner.agents.get(&target.agent_id);
143            let status = rec.map(|r| r.status.clone()).unwrap_or_default();
144            let since = rec.and_then(|r| r.last_committed_at).map(|t| t.elapsed());
145            (status, since, ttl)
146        };
147        if !should_republish_working(&status, since_committed, ttl) {
148            // Agent is settled at `committed`; absorb the change as the new
149            // baseline without re-publishing `working`.
150            previous = Some(current);
151            continue;
152        }
153
154        let msg = BrokerMessage::Status {
155            agent_id: target.agent_id.clone(),
156            payload: StatusPayload {
157                status: "working".to_string(),
158                modified_files: current.clone(),
159                message: None,
160                ..Default::default()
161            },
162        };
163        delivery::publish_message(&state, &msg);
164        previous = Some(current);
165    }
166}
167
#[cfg(test)]
mod tests {
    use super::*;

    // === Bug 8: post-commit re-entry decision (should_republish_working) ===

    #[test]
    fn non_committed_status_always_publishes() {
        // Normal operation: a working agent publishes regardless of TTL.
        assert!(should_republish_working(
            "working",
            None,
            Duration::from_secs(45)
        ));
        assert!(should_republish_working("idle", None, Duration::ZERO));
    }

    #[test]
    fn committed_within_ttl_republishes() {
        assert!(should_republish_working(
            "committed",
            Some(Duration::from_secs(10)),
            Duration::from_secs(45)
        ));
    }

    #[test]
    fn committed_past_ttl_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(290)),
            Duration::from_secs(45)
        ));
    }

    #[test]
    fn committed_with_zero_ttl_does_not_republish() {
        // Opt-out: TTL=0 keeps committed terminal even within "0 seconds".
        assert!(!should_republish_working(
            "committed",
            Some(Duration::from_secs(0)),
            Duration::ZERO
        ));
    }

    #[test]
    fn committed_without_timestamp_does_not_republish() {
        assert!(!should_republish_working(
            "committed",
            None,
            Duration::from_secs(45)
        ));
    }

    // === parse_porcelain ===

    #[test]
    fn parse_porcelain_handles_modified_and_untracked() {
        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(
            parsed,
            vec![
                "new_file.txt".to_string(),
                "src/lib.rs".to_string(),
                "src/main.rs".to_string(),
            ]
        );
    }

    #[test]
    fn parse_porcelain_handles_renames() {
        let input = "R  old.rs -> new.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
    }

    #[test]
    fn parse_porcelain_empty_is_empty_vec() {
        assert!(parse_porcelain("").is_empty());
    }

    #[test]
    fn parse_porcelain_dedupes() {
        let input = " M a.rs\n M a.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["a.rs".to_string()]);
    }

    // === Shared fixtures for the git-backed tests ===

    /// Initialises a git repository with one empty root commit in `dir`.
    ///
    /// Every git invocation must exit successfully; a silently failed
    /// `init` or `commit` would leave later assertions running against a
    /// directory that is not the repository the test expects.
    fn init_test_repo(dir: &std::path::Path) {
        use std::process::Command;
        let run = |args: &[&str]| {
            let output = Command::new("git")
                .args(args)
                .current_dir(dir)
                .output()
                .expect("git command failed");
            assert!(
                output.status.success(),
                "git {args:?} failed: {}",
                String::from_utf8_lossy(&output.stderr)
            );
        };
        run(&["init", "-q", "-b", "main"]);
        run(&["config", "user.email", "test@example.com"]);
        run(&["config", "user.name", "test"]);
        // `--no-gpg-sign` keeps the fixture independent of any global
        // commit-signing configuration on the machine running the tests.
        run(&[
            "commit",
            "--allow-empty",
            "--no-gpg-sign",
            "-m",
            "root",
            "-q",
        ]);
    }

    /// Spawns `watch_worktree` for `agent_id` on `worktree`.
    ///
    /// Returns the shutdown sender together with the task handle. The caller
    /// must keep the sender alive for as long as the watcher should run.
    fn spawn_watcher(
        state: &Arc<BrokerState>,
        agent_id: &str,
        worktree: &std::path::Path,
    ) -> (
        tokio::sync::watch::Sender<bool>,
        tokio::task::JoinHandle<()>,
    ) {
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: agent_id.to_string(),
            cli: "claude".to_string(),
            worktree_path: worktree.to_path_buf(),
        };
        let handle = tokio::spawn(watch_worktree(Arc::clone(state), target, rx));
        (tx, handle)
    }

    /// Signals shutdown and checks that the watcher stops cleanly.
    async fn stop_watcher(
        tx: tokio::sync::watch::Sender<bool>,
        handle: tokio::task::JoinHandle<()>,
    ) {
        // A send error means the watcher already exited; the join below
        // reports why.
        let _ = tx.send(true);
        tokio::time::timeout(Duration::from_secs(3), handle)
            .await
            .expect("watcher did not stop within 3s of the shutdown signal")
            .expect("watcher task panicked");
    }

    /// Publishes a `committed` artifact for `agent_id`, as the tests below do
    /// to put an agent into the `committed` state before editing resumes.
    fn publish_committed(state: &Arc<BrokerState>, agent_id: &str) {
        use crate::broker::messages::ArtifactPayload;
        super::delivery::publish_message(
            state,
            &BrokerMessage::Artifact {
                agent_id: agent_id.to_string(),
                payload: ArtifactPayload {
                    status: "committed".to_string(),
                    exports: vec![],
                    modified_files: vec![],
                },
            },
        );
    }

    /// Short pause after spawning the watcher, before the test edits files.
    async fn pause_before_edit() {
        tokio::time::sleep(Duration::from_millis(300)).await;
    }

    /// Waits one poll interval plus slack so the watcher has polled at least
    /// once since the preceding edit.
    async fn wait_for_next_poll() {
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
    }

    // === run_git_status ===

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_detects_new_file() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            result.iter().any(|p| p == "hello.txt"),
            "expected hello.txt in {result:?}"
        );
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_respects_gitignore() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir(tmp.path().join("target")).unwrap();
        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            !result.iter().any(|p| p.starts_with("target/")),
            "target/ should be filtered by gitignore, got {result:?}"
        );
    }

    // === watch_worktree ===

    /// Spec scenario: multiple writes within the TTL republish `working`
    /// exactly once (the 2s poll coalesces a burst into one tick).
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_burst_republishes_working_once() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        publish_committed(&state, "feat-b");
        let (tx, handle) = spawn_watcher(&state, "feat-b", tmp.path());

        // Burst: ten files written well within one poll interval.
        pause_before_edit().await;
        for i in 0..10 {
            std::fs::write(tmp.path().join(format!("f{i}.rs")), "x").unwrap();
        }
        wait_for_next_poll().await;

        let working_count = {
            let inner = state.read();
            inner
                .message_log
                .iter()
                .filter(|(_, _, m)| {
                    matches!(m, BrokerMessage::Status { agent_id, payload }
                        if agent_id == "feat-b" && payload.status == "working")
                })
                .count()
        };
        assert_eq!(
            working_count, 1,
            "a burst of writes within one poll interval must republish working exactly once"
        );

        stop_watcher(tx, handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_publishes_on_change() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, handle) = spawn_watcher(&state, "feat-x", tmp.path());

        // Let the watcher start, then create a file.
        pause_before_edit().await;
        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();

        // Wait long enough for the next poll to observe the new file.
        wait_for_next_poll().await;

        let msg = {
            let inner = state.read();
            let record = inner
                .agents
                .get("feat-x")
                .expect("watcher should register the agent");
            record
                .last_message
                .clone()
                .expect("watcher should publish a message")
        };
        match msg {
            BrokerMessage::Status { agent_id, payload } => {
                assert_eq!(agent_id, "feat-x");
                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
            }
            other => panic!("expected Status message, got {other:?}"),
        }

        stop_watcher(tx, handle).await;
    }

    /// Spec scenario (task 5.5): an agent commits, the broker records
    /// `committed`, then the agent keeps editing — the watcher re-publishes
    /// `working`, so the record transitions `committed` -> `working`.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_reenters_working_after_commit() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        // Record a committed artifact so the agent is in the committed state
        // with a fresh last_committed_at. This test relies on the default
        // re-entry TTL being longer than the few seconds the test takes.
        publish_committed(&state, "feat-x");
        assert_eq!(state.read().agents["feat-x"].status, "committed");

        let (tx, handle) = spawn_watcher(&state, "feat-x", tmp.path());

        // Agent keeps editing after the commit.
        pause_before_edit().await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
        wait_for_next_poll().await;

        assert_eq!(
            state.read().agents["feat-x"].status,
            "working",
            "watcher must re-enter working after a post-commit edit within TTL"
        );

        stop_watcher(tx, handle).await;
    }

    /// With TTL=0 the post-commit edit must NOT re-publish `working` — the
    /// dashboard keeps showing `committed` (v0.5.0 opt-out).
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_reenter_when_ttl_zero() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        state.set_republish_working_ttl(Duration::ZERO);
        publish_committed(&state, "feat-z");

        let (tx, handle) = spawn_watcher(&state, "feat-z", tmp.path());

        pause_before_edit().await;
        std::fs::write(tmp.path().join("more_work.rs"), "fn extra() {}").unwrap();
        wait_for_next_poll().await;

        assert_eq!(
            state.read().agents["feat-z"].status,
            "committed",
            "with TTL=0 the watcher must not re-enter working after commit"
        );

        stop_watcher(tx, handle).await;
    }

    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_publish_when_unchanged() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, handle) = spawn_watcher(&state, "feat-y", tmp.path());

        // Let several ticks elapse with no changes.
        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;

        let has_entry = {
            let inner = state.read();
            inner.agents.contains_key("feat-y")
        };
        assert!(
            !has_entry,
            "no publish expected when git status is unchanged"
        );

        stop_watcher(tx, handle).await;
    }
}