Skip to main content

git_paw/broker/
watcher.rs

1//! Git-status polling watcher.
2//!
3//! Each `WatchTarget` spawns one async task that runs `git status --porcelain`
4//! in the worktree every [`POLL_INTERVAL`]. When the set of reported paths
5//! differs from the previous tick, the watcher publishes `agent.status` with
6//! the current paths in `modified_files`.
7//!
8//! The watcher inherits git's exclusion rules (`.gitignore`, `.git/` internals)
9//! instead of maintaining a hand-rolled filter list.
10
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
18/// Interval between `git status --porcelain` polls.
19pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
20
21/// Parses `git status --porcelain` output into a sorted, deduplicated list of paths.
22///
23/// Each porcelain line looks like `XY PATH` or `XY PATH1 -> PATH2` for renames.
24/// For renames, both the source and destination paths are reported.
25fn parse_porcelain(stdout: &str) -> Vec<String> {
26    let mut paths: Vec<String> = Vec::new();
27    for line in stdout.lines() {
28        if line.len() < 4 {
29            continue;
30        }
31        // Skip the two-character status prefix and the separating space.
32        let rest = &line[3..];
33        if let Some((from, to)) = rest.split_once(" -> ") {
34            paths.push(from.trim().to_string());
35            paths.push(to.trim().to_string());
36        } else {
37            paths.push(rest.trim().to_string());
38        }
39    }
40    paths.sort();
41    paths.dedup();
42    paths
43}
44
45/// Runs `git status --porcelain` in `worktree` and returns the parsed path list.
46///
47/// Returns `None` when git is unavailable or the command fails — callers treat
48/// that as "no change detected this tick" and retry on the next interval.
49async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
50    let output = tokio::process::Command::new("git")
51        .arg("status")
52        .arg("--porcelain")
53        .current_dir(worktree)
54        .output()
55        .await
56        .ok()?;
57    if !output.status.success() {
58        return None;
59    }
60    let stdout = String::from_utf8_lossy(&output.stdout);
61    Some(parse_porcelain(&stdout))
62}
63
64/// Watches a single worktree, publishing `agent.status` when git-status output changes.
65///
66/// The task runs until the broker's shutdown signal fires. Each iteration waits
67/// [`POLL_INTERVAL`] and then checks `git status --porcelain`. If the result
68/// differs from the previous tick, it publishes a status message.
69pub async fn watch_worktree(
70    state: Arc<BrokerState>,
71    target: WatchTarget,
72    mut shutdown: tokio::sync::watch::Receiver<bool>,
73) {
74    let mut previous: Option<Vec<String>> = None;
75    let mut ticker = tokio::time::interval(POLL_INTERVAL);
76    // Skip the immediate first tick so we wait one interval before the first poll.
77    ticker.tick().await;
78    loop {
79        tokio::select! {
80            _ = ticker.tick() => {}
81            _ = shutdown.changed() => {
82                if *shutdown.borrow() {
83                    break;
84                }
85            }
86        }
87
88        let Some(current) = run_git_status(&target.worktree_path).await else {
89            continue;
90        };
91
92        if previous.as_ref() == Some(&current) {
93            continue;
94        }
95
96        // Skip the very first baseline when the worktree is clean. We only
97        // want to announce the agent once it has actual dirty state; otherwise
98        // a quiet worktree would publish an empty status on startup with no
99        // useful information.
100        if previous.is_none() && current.is_empty() {
101            previous = Some(current);
102            continue;
103        }
104
105        let msg = BrokerMessage::Status {
106            agent_id: target.agent_id.clone(),
107            payload: StatusPayload {
108                status: "working".to_string(),
109                modified_files: current.clone(),
110                message: None,
111            },
112        };
113        delivery::publish_message(&state, &msg);
114        previous = Some(current);
115    }
116}
117
118#[cfg(test)]
119mod tests {
120    use super::*;
121
122    #[test]
123    fn parse_porcelain_handles_modified_and_untracked() {
124        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
125        let parsed = parse_porcelain(input);
126        assert_eq!(
127            parsed,
128            vec![
129                "new_file.txt".to_string(),
130                "src/lib.rs".to_string(),
131                "src/main.rs".to_string(),
132            ]
133        );
134    }
135
136    #[test]
137    fn parse_porcelain_handles_renames() {
138        let input = "R  old.rs -> new.rs\n";
139        let parsed = parse_porcelain(input);
140        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
141    }
142
143    #[test]
144    fn parse_porcelain_empty_is_empty_vec() {
145        assert!(parse_porcelain("").is_empty());
146    }
147
148    #[test]
149    fn parse_porcelain_dedupes() {
150        let input = " M a.rs\n M a.rs\n";
151        let parsed = parse_porcelain(input);
152        assert_eq!(parsed, vec!["a.rs".to_string()]);
153    }
154
155    fn init_test_repo(dir: &std::path::Path) {
156        use std::process::Command;
157        let run = |args: &[&str]| {
158            Command::new("git")
159                .args(args)
160                .current_dir(dir)
161                .output()
162                .expect("git command failed");
163        };
164        run(&["init", "-q", "-b", "main"]);
165        run(&["config", "user.email", "test@example.com"]);
166        run(&["config", "user.name", "test"]);
167        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
168    }
169
170    #[tokio::test(flavor = "current_thread")]
171    #[serial_test::serial]
172    async fn run_git_status_detects_new_file() {
173        let tmp = tempfile::tempdir().unwrap();
174        init_test_repo(tmp.path());
175        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
176        let result = run_git_status(tmp.path()).await.unwrap();
177        assert!(
178            result.iter().any(|p| p == "hello.txt"),
179            "expected hello.txt in {result:?}"
180        );
181    }
182
183    #[tokio::test(flavor = "current_thread")]
184    #[serial_test::serial]
185    async fn run_git_status_respects_gitignore() {
186        let tmp = tempfile::tempdir().unwrap();
187        init_test_repo(tmp.path());
188        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
189        std::fs::create_dir(tmp.path().join("target")).unwrap();
190        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
191        let result = run_git_status(tmp.path()).await.unwrap();
192        assert!(
193            !result.iter().any(|p| p.starts_with("target/")),
194            "target/ should be filtered by gitignore, got {result:?}"
195        );
196    }
197
198    #[tokio::test(flavor = "current_thread")]
199    #[serial_test::serial]
200    async fn watch_worktree_publishes_on_change() {
201        use crate::broker::BrokerState;
202        let tmp = tempfile::tempdir().unwrap();
203        init_test_repo(tmp.path());
204
205        let state = Arc::new(BrokerState::new(None));
206        let (tx, rx) = tokio::sync::watch::channel(false);
207        let target = WatchTarget {
208            agent_id: "feat-x".to_string(),
209            cli: "claude".to_string(),
210            worktree_path: tmp.path().to_path_buf(),
211        };
212        let state_clone = Arc::clone(&state);
213        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
214
215        // Give the first tick a chance, then create a file.
216        tokio::time::sleep(Duration::from_millis(300)).await;
217        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();
218
219        // Wait long enough for at least two poll intervals.
220        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
221
222        let msg = {
223            let inner = state.read();
224            let record = inner
225                .agents
226                .get("feat-x")
227                .expect("watcher should register the agent");
228            record
229                .last_message
230                .clone()
231                .expect("watcher should publish a message")
232        };
233        match msg {
234            BrokerMessage::Status { agent_id, payload } => {
235                assert_eq!(agent_id, "feat-x");
236                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
237            }
238            other => panic!("expected Status message, got {other:?}"),
239        }
240
241        let _ = tx.send(true);
242        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
243    }
244
245    #[tokio::test(flavor = "current_thread")]
246    #[serial_test::serial]
247    async fn watch_worktree_does_not_publish_when_unchanged() {
248        use crate::broker::BrokerState;
249        let tmp = tempfile::tempdir().unwrap();
250        init_test_repo(tmp.path());
251
252        let state = Arc::new(BrokerState::new(None));
253        let (tx, rx) = tokio::sync::watch::channel(false);
254        let target = WatchTarget {
255            agent_id: "feat-y".to_string(),
256            cli: "claude".to_string(),
257            worktree_path: tmp.path().to_path_buf(),
258        };
259        let state_clone = Arc::clone(&state);
260        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
261
262        // Let several ticks elapse with no changes.
263        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;
264
265        let has_entry = {
266            let inner = state.read();
267            inner.agents.contains_key("feat-y")
268        };
269        assert!(
270            !has_entry,
271            "no publish expected when git status is unchanged"
272        );
273
274        let _ = tx.send(true);
275        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
276    }
277}