Skip to main content

git_paw/broker/
watcher.rs

1//! Git-status polling watcher.
2//!
3//! Each `WatchTarget` spawns one async task that runs `git status --porcelain`
4//! in the worktree every [`POLL_INTERVAL`]. When the set of reported paths
5//! differs from the previous tick, the watcher publishes `agent.status` with
6//! the current paths in `modified_files`.
7//!
8//! The watcher inherits git's exclusion rules (`.gitignore`, `.git/` internals)
9//! instead of maintaining a hand-rolled filter list.
10
11use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
18/// Interval between `git status --porcelain` polls.
19pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
20
21/// Parses `git status --porcelain` output into a sorted, deduplicated list of paths.
22///
23/// Each porcelain line looks like `XY PATH` or `XY PATH1 -> PATH2` for renames.
24/// For renames, both the source and destination paths are reported.
25fn parse_porcelain(stdout: &str) -> Vec<String> {
26    let mut paths: Vec<String> = Vec::new();
27    for line in stdout.lines() {
28        if line.len() < 4 {
29            continue;
30        }
31        // Skip the two-character status prefix and the separating space.
32        let rest = &line[3..];
33        if let Some((from, to)) = rest.split_once(" -> ") {
34            paths.push(from.trim().to_string());
35            paths.push(to.trim().to_string());
36        } else {
37            paths.push(rest.trim().to_string());
38        }
39    }
40    paths.sort();
41    paths.dedup();
42    paths
43}
44
45/// Runs `git status --porcelain` in `worktree` and returns the parsed path list.
46///
47/// Returns `None` when git is unavailable or the command fails — callers treat
48/// that as "no change detected this tick" and retry on the next interval.
49async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
50    let output = tokio::process::Command::new("git")
51        .arg("status")
52        .arg("--porcelain")
53        .current_dir(worktree)
54        .output()
55        .await
56        .ok()?;
57    if !output.status.success() {
58        return None;
59    }
60    let stdout = String::from_utf8_lossy(&output.stdout);
61    Some(parse_porcelain(&stdout))
62}
63
64/// Watches a single worktree, publishing `agent.status` when git-status output changes.
65///
66/// The task runs until the broker's shutdown signal fires. Each iteration waits
67/// [`POLL_INTERVAL`] and then checks `git status --porcelain`. If the result
68/// differs from the previous tick, it publishes a status message.
69pub async fn watch_worktree(
70    state: Arc<BrokerState>,
71    target: WatchTarget,
72    mut shutdown: tokio::sync::watch::Receiver<bool>,
73) {
74    let mut previous: Option<Vec<String>> = None;
75    let mut ticker = tokio::time::interval(POLL_INTERVAL);
76    // Skip the immediate first tick so we wait one interval before the first poll.
77    ticker.tick().await;
78    loop {
79        tokio::select! {
80            _ = ticker.tick() => {}
81            _ = shutdown.changed() => {
82                if *shutdown.borrow() {
83                    break;
84                }
85            }
86        }
87
88        let Some(current) = run_git_status(&target.worktree_path).await else {
89            continue;
90        };
91
92        if previous.as_ref() == Some(&current) {
93            continue;
94        }
95
96        // Skip the very first baseline when the worktree is clean. We only
97        // want to announce the agent once it has actual dirty state; otherwise
98        // a quiet worktree would publish an empty status on startup with no
99        // useful information.
100        if previous.is_none() && current.is_empty() {
101            previous = Some(current);
102            continue;
103        }
104
105        let msg = BrokerMessage::Status {
106            agent_id: target.agent_id.clone(),
107            payload: StatusPayload {
108                status: "working".to_string(),
109                modified_files: current.clone(),
110                message: None,
111                ..Default::default()
112            },
113        };
114        delivery::publish_message(&state, &msg);
115        previous = Some(current);
116    }
117}
118
119#[cfg(test)]
120mod tests {
121    use super::*;
122
123    #[test]
124    fn parse_porcelain_handles_modified_and_untracked() {
125        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
126        let parsed = parse_porcelain(input);
127        assert_eq!(
128            parsed,
129            vec![
130                "new_file.txt".to_string(),
131                "src/lib.rs".to_string(),
132                "src/main.rs".to_string(),
133            ]
134        );
135    }
136
137    #[test]
138    fn parse_porcelain_handles_renames() {
139        let input = "R  old.rs -> new.rs\n";
140        let parsed = parse_porcelain(input);
141        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
142    }
143
144    #[test]
145    fn parse_porcelain_empty_is_empty_vec() {
146        assert!(parse_porcelain("").is_empty());
147    }
148
149    #[test]
150    fn parse_porcelain_dedupes() {
151        let input = " M a.rs\n M a.rs\n";
152        let parsed = parse_porcelain(input);
153        assert_eq!(parsed, vec!["a.rs".to_string()]);
154    }
155
156    fn init_test_repo(dir: &std::path::Path) {
157        use std::process::Command;
158        let run = |args: &[&str]| {
159            Command::new("git")
160                .args(args)
161                .current_dir(dir)
162                .output()
163                .expect("git command failed");
164        };
165        run(&["init", "-q", "-b", "main"]);
166        run(&["config", "user.email", "test@example.com"]);
167        run(&["config", "user.name", "test"]);
168        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
169    }
170
171    #[tokio::test(flavor = "current_thread")]
172    #[serial_test::serial]
173    async fn run_git_status_detects_new_file() {
174        let tmp = tempfile::tempdir().unwrap();
175        init_test_repo(tmp.path());
176        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
177        let result = run_git_status(tmp.path()).await.unwrap();
178        assert!(
179            result.iter().any(|p| p == "hello.txt"),
180            "expected hello.txt in {result:?}"
181        );
182    }
183
184    #[tokio::test(flavor = "current_thread")]
185    #[serial_test::serial]
186    async fn run_git_status_respects_gitignore() {
187        let tmp = tempfile::tempdir().unwrap();
188        init_test_repo(tmp.path());
189        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
190        std::fs::create_dir(tmp.path().join("target")).unwrap();
191        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
192        let result = run_git_status(tmp.path()).await.unwrap();
193        assert!(
194            !result.iter().any(|p| p.starts_with("target/")),
195            "target/ should be filtered by gitignore, got {result:?}"
196        );
197    }
198
199    #[tokio::test(flavor = "current_thread")]
200    #[serial_test::serial]
201    async fn watch_worktree_publishes_on_change() {
202        use crate::broker::BrokerState;
203        let tmp = tempfile::tempdir().unwrap();
204        init_test_repo(tmp.path());
205
206        let state = Arc::new(BrokerState::new(None));
207        let (tx, rx) = tokio::sync::watch::channel(false);
208        let target = WatchTarget {
209            agent_id: "feat-x".to_string(),
210            cli: "claude".to_string(),
211            worktree_path: tmp.path().to_path_buf(),
212        };
213        let state_clone = Arc::clone(&state);
214        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
215
216        // Give the first tick a chance, then create a file.
217        tokio::time::sleep(Duration::from_millis(300)).await;
218        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();
219
220        // Wait long enough for at least two poll intervals.
221        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;
222
223        let msg = {
224            let inner = state.read();
225            let record = inner
226                .agents
227                .get("feat-x")
228                .expect("watcher should register the agent");
229            record
230                .last_message
231                .clone()
232                .expect("watcher should publish a message")
233        };
234        match msg {
235            BrokerMessage::Status { agent_id, payload } => {
236                assert_eq!(agent_id, "feat-x");
237                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
238            }
239            other => panic!("expected Status message, got {other:?}"),
240        }
241
242        let _ = tx.send(true);
243        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
244    }
245
246    #[tokio::test(flavor = "current_thread")]
247    #[serial_test::serial]
248    async fn watch_worktree_does_not_publish_when_unchanged() {
249        use crate::broker::BrokerState;
250        let tmp = tempfile::tempdir().unwrap();
251        init_test_repo(tmp.path());
252
253        let state = Arc::new(BrokerState::new(None));
254        let (tx, rx) = tokio::sync::watch::channel(false);
255        let target = WatchTarget {
256            agent_id: "feat-y".to_string(),
257            cli: "claude".to_string(),
258            worktree_path: tmp.path().to_path_buf(),
259        };
260        let state_clone = Arc::clone(&state);
261        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));
262
263        // Let several ticks elapse with no changes.
264        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;
265
266        let has_entry = {
267            let inner = state.read();
268            inner.agents.contains_key("feat-y")
269        };
270        assert!(
271            !has_entry,
272            "no publish expected when git status is unchanged"
273        );
274
275        let _ = tx.send(true);
276        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
277    }
278}