git_paw/broker/
watcher.rs1use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
/// Period of the ticker that drives the `git status` polls in [`watch_worktree`].
pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
20
/// Extracts the paths mentioned in `git status --porcelain` (v1) output.
///
/// Every line has the shape `XY<space>PATH`, or `XY<space>ORIG -> PATH` for
/// renames and copies; both sides of a rename are reported. Paths that git
/// wrapped in double quotes (names containing spaces or special characters)
/// are unquoted so the result holds real file names.
///
/// Returns the paths sorted and de-duplicated. Lines that are too short,
/// malformed, or carry an empty path field are skipped.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = Vec::new();
    for line in stdout.lines() {
        // Checked slicing: `&line[3..]` would panic if byte 3 fell inside a
        // multi-byte character of a malformed line.
        let Some(rest) = line.get(3..) else {
            continue;
        };
        let rest = rest.trim();
        if rest.is_empty() {
            continue;
        }

        if let Some((source, tail)) = unquote_porcelain_prefix(rest) {
            // Quoted first path; an arrow after the closing quote means rename.
            paths.push(source);
            if let Some(destination) = tail.strip_prefix(" -> ") {
                paths.push(porcelain_path(destination));
            }
        } else if let Some((from, to)) = rest.split_once(" -> ") {
            // Unquoted names never contain spaces, so the first arrow is the
            // rename separator even when the destination is quoted.
            paths.push(from.trim().to_string());
            paths.push(porcelain_path(to));
        } else {
            paths.push(rest.to_string());
        }
    }
    paths.sort();
    paths.dedup();
    paths
}

/// Converts a single path field to a plain path, unquoting it when the whole
/// field is one well-formed C-style quoted string.
fn porcelain_path(field: &str) -> String {
    let field = field.trim();
    match unquote_porcelain_prefix(field) {
        Some((path, "")) => path,
        _ => field.to_string(),
    }
}

/// Decodes a C-style quoted string at the start of `field`.
///
/// Returns the decoded path and the text following the closing quote, or
/// `None` when `field` does not start with a well-formed quoted string (the
/// caller then treats the field literally).
fn unquote_porcelain_prefix(field: &str) -> Option<(String, &str)> {
    let body = field.strip_prefix('"')?;
    let raw = body.as_bytes();
    let mut decoded: Vec<u8> = Vec::with_capacity(raw.len());
    let mut i = 0;
    while let Some(&byte) = raw.get(i) {
        i += 1;
        match byte {
            b'"' => {
                let path = String::from_utf8_lossy(&decoded).into_owned();
                return Some((path, body.get(i..)?));
            }
            b'\\' => {
                let escape = *raw.get(i)?;
                if matches!(escape, b'0'..=b'7') {
                    // Non-printable and non-ASCII bytes are written as three
                    // octal digits, e.g. `\303\251` for "é".
                    let digits = std::str::from_utf8(raw.get(i..i + 3)?).ok()?;
                    decoded.push(u8::from_str_radix(digits, 8).ok()?);
                    i += 3;
                } else {
                    decoded.push(match escape {
                        b'a' => 0x07,
                        b'b' => 0x08,
                        b'f' => 0x0c,
                        b'n' => b'\n',
                        b'r' => b'\r',
                        b't' => b'\t',
                        b'v' => 0x0b,
                        // `\"` and `\\` stand for the character itself.
                        other => other,
                    });
                    i += 1;
                }
            }
            other => decoded.push(other),
        }
    }
    // Ran out of input before the closing quote.
    None
}
44
45async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
50 let output = tokio::process::Command::new("git")
51 .arg("status")
52 .arg("--porcelain")
53 .current_dir(worktree)
54 .output()
55 .await
56 .ok()?;
57 if !output.status.success() {
58 return None;
59 }
60 let stdout = String::from_utf8_lossy(&output.stdout);
61 Some(parse_porcelain(&stdout))
62}
63
64pub async fn watch_worktree(
70 state: Arc<BrokerState>,
71 target: WatchTarget,
72 mut shutdown: tokio::sync::watch::Receiver<bool>,
73) {
74 let mut previous: Option<Vec<String>> = None;
75 let mut ticker = tokio::time::interval(POLL_INTERVAL);
76 ticker.tick().await;
78 loop {
79 tokio::select! {
80 _ = ticker.tick() => {}
81 _ = shutdown.changed() => {
82 if *shutdown.borrow() {
83 break;
84 }
85 }
86 }
87
88 let Some(current) = run_git_status(&target.worktree_path).await else {
89 continue;
90 };
91
92 if previous.as_ref() == Some(¤t) {
93 continue;
94 }
95
96 if previous.is_none() && current.is_empty() {
101 previous = Some(current);
102 continue;
103 }
104
105 let msg = BrokerMessage::Status {
106 agent_id: target.agent_id.clone(),
107 payload: StatusPayload {
108 status: "working".to_string(),
109 modified_files: current.clone(),
110 message: None,
111 },
112 };
113 delivery::publish_message(&state, &msg);
114 previous = Some(current);
115 }
116}
117
#[cfg(test)]
mod tests {
    use super::*;

    /// Modified and untracked entries are returned as sorted plain paths.
    #[test]
    fn parse_porcelain_handles_modified_and_untracked() {
        // Porcelain v1 lines are `XY<space>path`: a two-character status and
        // one separator space, so the path always starts at byte 3. A staged
        // modification is therefore "M" + " " + separator.
        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(
            parsed,
            vec![
                "new_file.txt".to_string(),
                "src/lib.rs".to_string(),
                "src/main.rs".to_string(),
            ]
        );
    }

    /// Both sides of a rename entry are reported.
    #[test]
    fn parse_porcelain_handles_renames() {
        // "R" + " " (worktree column) + separator space, then `old -> new`.
        let input = "R  old.rs -> new.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
    }

    /// Empty output yields no paths.
    #[test]
    fn parse_porcelain_empty_is_empty_vec() {
        assert!(parse_porcelain("").is_empty());
    }

    /// A path listed twice appears once in the result.
    #[test]
    fn parse_porcelain_dedupes() {
        let input = " M a.rs\n M a.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["a.rs".to_string()]);
    }

    /// Initializes a git repository with one empty root commit in `dir`.
    ///
    /// Only a failure to launch `git` panics; the exit status of the
    /// individual commands is not checked.
    fn init_test_repo(dir: &std::path::Path) {
        use std::process::Command;
        let run = |args: &[&str]| {
            Command::new("git")
                .args(args)
                .current_dir(dir)
                .output()
                .expect("git command failed");
        };
        run(&["init", "-q", "-b", "main"]);
        run(&["config", "user.email", "test@example.com"]);
        run(&["config", "user.name", "test"]);
        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
    }

    /// An untracked file shows up in the status result.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_detects_new_file() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            result.iter().any(|p| p == "hello.txt"),
            "expected hello.txt in {result:?}"
        );
    }

    /// Files matched by `.gitignore` are not reported.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_respects_gitignore() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir(tmp.path().join("target")).unwrap();
        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            !result.iter().any(|p| p.starts_with("target/")),
            "target/ should be filtered by gitignore, got {result:?}"
        );
    }

    /// A file created while the watcher runs results in a published
    /// `Status` message that lists it.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_publishes_on_change() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Let the watcher task start before touching the worktree.
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();

        // Wait for at least one full poll cycle, plus slack for `git` itself.
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        let msg = {
            let inner = state.read();
            let record = inner
                .agents
                .get("feat-x")
                .expect("watcher should register the agent");
            record
                .last_message
                .clone()
                .expect("watcher should publish a message")
        };
        match msg {
            BrokerMessage::Status { agent_id, payload } => {
                assert_eq!(agent_id, "feat-x");
                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
            }
            other => panic!("expected Status message, got {other:?}"),
        }

        // Request shutdown and give the task a bounded time to finish.
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    /// A clean worktree that never changes produces no broker entry.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_publish_when_unchanged() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-y".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Two poll cycles are enough to observe the baseline and one repeat.
        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;

        let has_entry = {
            let inner = state.read();
            inner.agents.contains_key("feat-y")
        };
        assert!(
            !has_entry,
            "no publish expected when git status is unchanged"
        );

        // Request shutdown and give the task a bounded time to finish.
        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }
}