git_paw/broker/
watcher.rs1use std::path::Path;
12use std::sync::Arc;
13use std::time::Duration;
14
15use super::messages::{BrokerMessage, StatusPayload};
16use super::{BrokerState, WatchTarget, delivery};
17
/// Interval between two consecutive `git status` polls made by
/// [`watch_worktree`].
pub const POLL_INTERVAL: Duration = Duration::from_secs(2);
20
/// Extracts the paths mentioned in `git status --porcelain` (v1) output.
///
/// Every entry is expected to look like `XY PATH`, or `XY ORIG -> PATH` for
/// renames and copies: a two-character status code, one space, then the path
/// field. For the arrow form both sides are reported.
///
/// Returns the paths sorted and de-duplicated, so two snapshots can be
/// compared with `==`. Lines that are too short to carry a path, whose third
/// byte offset is not a character boundary, or whose path field is blank are
/// skipped instead of panicking or yielding an empty path.
///
/// NOTE(review): git wraps paths containing unusual characters in double
/// quotes with C-style escapes; such paths are returned verbatim (quotes
/// included) — confirm whether consumers need them unquoted.
fn parse_porcelain(stdout: &str) -> Vec<String> {
    let mut paths: Vec<String> = stdout
        .lines()
        // `get` rather than `&line[3..]`: slicing panics when the line is
        // shorter than the status prefix or byte 3 falls inside a multi-byte
        // character, whereas `get` lets us skip the malformed line.
        .filter_map(|line| line.get(3..))
        .flat_map(|rest| match rest.split_once(" -> ") {
            Some((from, to)) => [Some(from), Some(to)],
            None => [Some(rest), None],
        })
        .flatten()
        .map(str::trim)
        // A blank path field carries no information; never report "".
        .filter(|path| !path.is_empty())
        .map(String::from)
        .collect();
    // Equal strings are indistinguishable, so the unstable sort is safe here.
    paths.sort_unstable();
    paths.dedup();
    paths
}
44
45async fn run_git_status(worktree: &Path) -> Option<Vec<String>> {
50 let output = tokio::process::Command::new("git")
51 .arg("status")
52 .arg("--porcelain")
53 .current_dir(worktree)
54 .output()
55 .await
56 .ok()?;
57 if !output.status.success() {
58 return None;
59 }
60 let stdout = String::from_utf8_lossy(&output.stdout);
61 Some(parse_porcelain(&stdout))
62}
63
64pub async fn watch_worktree(
70 state: Arc<BrokerState>,
71 target: WatchTarget,
72 mut shutdown: tokio::sync::watch::Receiver<bool>,
73) {
74 let mut previous: Option<Vec<String>> = None;
75 let mut ticker = tokio::time::interval(POLL_INTERVAL);
76 ticker.tick().await;
78 loop {
79 tokio::select! {
80 _ = ticker.tick() => {}
81 _ = shutdown.changed() => {
82 if *shutdown.borrow() {
83 break;
84 }
85 }
86 }
87
88 let Some(current) = run_git_status(&target.worktree_path).await else {
89 continue;
90 };
91
92 if previous.as_ref() == Some(¤t) {
93 continue;
94 }
95
96 if previous.is_none() && current.is_empty() {
101 previous = Some(current);
102 continue;
103 }
104
105 let msg = BrokerMessage::Status {
106 agent_id: target.agent_id.clone(),
107 payload: StatusPayload {
108 status: "working".to_string(),
109 modified_files: current.clone(),
110 message: None,
111 ..Default::default()
112 },
113 };
114 delivery::publish_message(&state, &msg);
115 previous = Some(current);
116 }
117}
118
#[cfg(test)]
mod tests {
    use super::*;

    // Porcelain v1 lines are `XY PATH`: a two-character status code followed by
    // exactly one space. The fixtures below keep that layout on purpose
    // (e.g. `"M  path"` for a staged modification, `" M path"` for an unstaged
    // one); collapsing the whitespace shifts the path column.

    /// Staged, unstaged and untracked entries are all reported, sorted.
    #[test]
    fn parse_porcelain_handles_modified_and_untracked() {
        let input = " M src/main.rs\n?? new_file.txt\nM  src/lib.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(
            parsed,
            vec![
                "new_file.txt".to_string(),
                "src/lib.rs".to_string(),
                "src/main.rs".to_string(),
            ]
        );
    }

    /// A rename entry contributes both its old and its new path.
    #[test]
    fn parse_porcelain_handles_renames() {
        let input = "R  old.rs -> new.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["new.rs".to_string(), "old.rs".to_string()]);
    }

    /// Empty output means a clean worktree.
    #[test]
    fn parse_porcelain_empty_is_empty_vec() {
        assert!(parse_porcelain("").is_empty());
    }

    /// The same path listed twice is reported once.
    #[test]
    fn parse_porcelain_dedupes() {
        let input = " M a.rs\n M a.rs\n";
        let parsed = parse_porcelain(input);
        assert_eq!(parsed, vec!["a.rs".to_string()]);
    }

    /// Creates a git repository with one empty root commit in `dir`.
    ///
    /// Only a failure to launch `git` panics; the exit status of the individual
    /// commands is not inspected.
    fn init_test_repo(dir: &std::path::Path) {
        use std::process::Command;
        let run = |args: &[&str]| {
            Command::new("git")
                .args(args)
                .current_dir(dir)
                .output()
                .expect("git command failed");
        };
        run(&["init", "-q", "-b", "main"]);
        run(&["config", "user.email", "test@example.com"]);
        run(&["config", "user.name", "test"]);
        run(&["commit", "--allow-empty", "-m", "root", "-q"]);
    }

    /// An untracked file shows up in the status snapshot.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_detects_new_file() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join("hello.txt"), "hi").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            result.iter().any(|p| p == "hello.txt"),
            "expected hello.txt in {result:?}"
        );
    }

    /// Files matched by `.gitignore` never appear in the snapshot.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn run_git_status_respects_gitignore() {
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());
        std::fs::write(tmp.path().join(".gitignore"), "target/\n").unwrap();
        std::fs::create_dir(tmp.path().join("target")).unwrap();
        std::fs::write(tmp.path().join("target").join("build.o"), "x").unwrap();
        let result = run_git_status(tmp.path()).await.unwrap();
        assert!(
            !result.iter().any(|p| p.starts_with("target/")),
            "target/ should be filtered by gitignore, got {result:?}"
        );
    }

    /// A file created after the watcher started leads to a published status
    /// message that lists it.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_publishes_on_change() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-x".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Let the watcher start, then dirty the worktree before its first poll.
        tokio::time::sleep(Duration::from_millis(300)).await;
        std::fs::write(tmp.path().join("change.txt"), "hello").unwrap();

        // Wait long enough for at least one poll to have run.
        tokio::time::sleep(POLL_INTERVAL + Duration::from_millis(800)).await;

        let msg = {
            let inner = state.read();
            let record = inner
                .agents
                .get("feat-x")
                .expect("watcher should register the agent");
            record
                .last_message
                .clone()
                .expect("watcher should publish a message")
        };
        match msg {
            BrokerMessage::Status { agent_id, payload } => {
                assert_eq!(agent_id, "feat-x");
                assert!(payload.modified_files.iter().any(|p| p == "change.txt"));
            }
            other => panic!("expected Status message, got {other:?}"),
        }

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }

    /// A worktree that stays clean never causes a publish.
    #[tokio::test(flavor = "current_thread")]
    #[serial_test::serial]
    async fn watch_worktree_does_not_publish_when_unchanged() {
        use crate::broker::BrokerState;
        let tmp = tempfile::tempdir().unwrap();
        init_test_repo(tmp.path());

        let state = Arc::new(BrokerState::new(None));
        let (tx, rx) = tokio::sync::watch::channel(false);
        let target = WatchTarget {
            agent_id: "feat-y".to_string(),
            cli: "claude".to_string(),
            worktree_path: tmp.path().to_path_buf(),
        };
        let state_clone = Arc::clone(&state);
        let handle = tokio::spawn(watch_worktree(state_clone, target, rx));

        // Two full poll cycles without touching the worktree.
        tokio::time::sleep(POLL_INTERVAL * 2 + Duration::from_millis(200)).await;

        let has_entry = {
            let inner = state.read();
            inner.agents.contains_key("feat-y")
        };
        assert!(
            !has_entry,
            "no publish expected when git status is unchanged"
        );

        let _ = tx.send(true);
        let _ = tokio::time::timeout(Duration::from_secs(3), handle).await;
    }
}