1use anyhow::{Context, Result};
16use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
17use std::collections::BTreeSet;
18use std::path::PathBuf;
19use std::sync::mpsc as std_mpsc;
20use std::time::{Duration, Instant};
21use tokio::sync::mpsc;
22
/// Coarse category assigned to a batch of changed paths by [`Change::classify`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// At least one path has the `rs` extension and none is a Cargo manifest
    /// or lockfile.
    RustCode,
    /// At least one path is named `Cargo.toml` or `Cargo.lock`.
    CargoToml,
    /// Neither of the above matched (this includes an empty batch).
    Other,
}

/// A batch of changed paths together with its classification.
#[derive(Debug, Clone)]
pub struct Change {
    /// Category derived from `paths`.
    pub kind: ChangeKind,
    /// The paths that make up this batch, in the order they were supplied.
    pub paths: Vec<PathBuf>,
}

impl Change {
    /// Builds a [`Change`] from `paths`, picking the most significant kind.
    ///
    /// Precedence: a `Cargo.toml` / `Cargo.lock` anywhere in the batch wins
    /// over any `.rs` file, which in turn wins over everything else.
    pub fn classify(paths: Vec<PathBuf>) -> Self {
        /// True when the final path component is a Cargo manifest or lockfile.
        fn is_manifest(path: &std::path::Path) -> bool {
            match path.file_name().and_then(|name| name.to_str()) {
                Some(name) => name == "Cargo.toml" || name == "Cargo.lock",
                None => false,
            }
        }

        /// True when the path's extension is exactly `rs`.
        fn is_rust_source(path: &std::path::Path) -> bool {
            path.extension().and_then(|ext| ext.to_str()) == Some("rs")
        }

        let mut kind = ChangeKind::Other;
        for path in &paths {
            if is_manifest(path) {
                // Highest precedence: nothing later in the batch can override it.
                kind = ChangeKind::CargoToml;
                break;
            }
            if is_rust_source(path) {
                kind = ChangeKind::RustCode;
            }
        }

        Self { kind, paths }
    }
}
67
68pub fn spawn_watcher(
80 roots: Vec<PathBuf>,
81 debounce: Duration,
82 tx: mpsc::Sender<Change>,
83) -> Result<RecommendedWatcher> {
84 if roots.is_empty() {
85 anyhow::bail!("spawn_watcher: no roots to watch");
86 }
87 let (raw_tx, raw_rx) = std_mpsc::channel::<Event>();
88 let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
89 if let Ok(ev) = res {
90 let _ = raw_tx.send(ev);
93 }
94 })
95 .context("create notify watcher")?;
96 let mut attached = 0;
97 for root in &roots {
98 match watcher.watch(root, RecursiveMode::Recursive) {
99 Ok(()) => attached += 1,
100 Err(e) => whisker_build::ui::warn(format!("skip watch {}: {e}", root.display())),
101 }
102 }
103 if attached == 0 {
104 anyhow::bail!(
105 "spawn_watcher: no roots successfully attached (of {})",
106 roots.len()
107 );
108 }
109
110 std::thread::Builder::new()
113 .name("whisker-dev-watch".into())
114 .spawn(move || debounce_loop(raw_rx, debounce, tx))
115 .context("spawn debounce thread")?;
116
117 Ok(watcher)
118}
119
120fn debounce_loop(raw_rx: std_mpsc::Receiver<Event>, debounce: Duration, tx: mpsc::Sender<Change>) {
121 let mut pending: BTreeSet<PathBuf> = BTreeSet::new();
122 let mut deadline: Option<Instant> = None;
123
124 loop {
125 let block_for = match deadline {
126 Some(d) => d.saturating_duration_since(Instant::now()),
129 None => Duration::from_secs(60 * 60),
131 };
132
133 match raw_rx.recv_timeout(block_for) {
134 Ok(ev) if is_interesting(&ev.kind) => {
135 for p in ev.paths {
136 pending.insert(p);
137 }
138 deadline = Some(Instant::now() + debounce);
139 }
140 Ok(_) => {} Err(std_mpsc::RecvTimeoutError::Timeout) => {
142 if pending.is_empty() {
143 continue;
144 }
145 let paths: Vec<_> = std::mem::take(&mut pending).into_iter().collect();
146 deadline = None;
147 let change = Change::classify(paths);
148 if tx.blocking_send(change).is_err() {
149 return; }
151 }
152 Err(std_mpsc::RecvTimeoutError::Disconnected) => return,
153 }
154 }
155}
156
157fn is_interesting(k: &EventKind) -> bool {
158 use notify::event::{CreateKind, ModifyKind, RemoveKind};
159 matches!(
160 k,
161 EventKind::Create(CreateKind::File)
162 | EventKind::Modify(ModifyKind::Data(_))
163 | EventKind::Modify(ModifyKind::Any)
164 | EventKind::Modify(ModifyKind::Name(_))
165 | EventKind::Remove(RemoveKind::File)
166 )
167}
168
// Unit tests for path classification, plus timing-based tests for the
// debounce loop (fed with synthetic events) and the real watcher (driven by
// writes into a temporary directory).
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    // A manifest anywhere in the batch outranks Rust sources.
    #[test]
    fn classify_picks_cargo_toml_over_rust_code() {
        let c = Change::classify(vec![
            "/tmp/foo/src/lib.rs".into(),
            "/tmp/foo/Cargo.toml".into(),
        ]);
        assert_eq!(c.kind, ChangeKind::CargoToml);
    }

    // Only `.rs` files in the batch: classified as Rust code.
    #[test]
    fn classify_picks_rust_code_when_no_cargo_toml() {
        let c = Change::classify(vec![
            "/tmp/foo/src/lib.rs".into(),
            "/tmp/foo/src/app.rs".into(),
        ]);
        assert_eq!(c.kind, ChangeKind::RustCode);
    }

    // Neither manifest nor Rust source: falls through to `Other`.
    #[test]
    fn classify_falls_through_to_other() {
        let c = Change::classify(vec![
            "/tmp/foo/README.md".into(),
            "/tmp/foo/static/logo.png".into(),
        ]);
        assert_eq!(c.kind, ChangeKind::Other);
    }

    // `Cargo.lock` is treated the same as `Cargo.toml`.
    #[test]
    fn classify_handles_cargo_lock_too() {
        let c = Change::classify(vec!["/tmp/foo/Cargo.lock".into()]);
        assert_eq!(c.kind, ChangeKind::CargoToml);
    }

    // Creates a fresh directory under the system temp dir. The name combines
    // the process id with a process-wide counter so that tests in the same
    // process do not share a directory.
    fn unique_tempdir() -> PathBuf {
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let n = SEQ.fetch_add(1, Ordering::Relaxed);
        let pid = std::process::id();
        let p = std::env::temp_dir().join(format!("whisker-watcher-test-{pid}-{n}"));
        std::fs::create_dir_all(&p).unwrap();
        p
    }

    // End-to-end: rewriting a `.rs` file inside a watched directory yields a
    // `RustCode` change that mentions the file.
    #[tokio::test]
    async fn editing_a_rust_file_emits_a_rustcode_change() {
        let dir = unique_tempdir();
        std::fs::write(dir.join("lib.rs"), "fn old() {}").unwrap();

        let (tx, mut rx) = mpsc::channel::<Change>(8);
        // Bound to a name so the watcher stays alive until the end of the test.
        let _watcher =
            spawn_watcher(vec![dir.clone()], Duration::from_millis(120), tx).expect("watcher up");

        // Presumably gives the backend a moment to start delivering events
        // before the file is modified.
        tokio::time::sleep(Duration::from_millis(50)).await;
        std::fs::write(dir.join("lib.rs"), "fn new() {}").unwrap();

        let change = tokio::time::timeout(Duration::from_secs(3), rx.recv())
            .await
            .expect("debounced change should arrive within 3s")
            .expect("channel closed");

        assert_eq!(change.kind, ChangeKind::RustCode);
        assert!(
            change.paths.iter().any(|p| p.ends_with("lib.rs")),
            "paths={:?}",
            change.paths,
        );

        // Best-effort cleanup; a failure to remove the directory is ignored.
        std::fs::remove_dir_all(&dir).ok();
    }

    // End-to-end: rewriting `Cargo.toml` inside a watched directory yields a
    // `CargoToml` change.
    #[tokio::test]
    async fn editing_cargo_toml_classifies_as_cargo_toml() {
        let dir = unique_tempdir();
        std::fs::write(
            dir.join("Cargo.toml"),
            "[package]\nname = \"x\"\nversion = \"0.0.0\"\n",
        )
        .unwrap();

        let (tx, mut rx) = mpsc::channel::<Change>(8);
        // Bound to a name so the watcher stays alive until the end of the test.
        let _watcher =
            spawn_watcher(vec![dir.clone()], Duration::from_millis(120), tx).expect("watcher up");

        // Presumably gives the backend a moment to start delivering events
        // before the file is modified.
        tokio::time::sleep(Duration::from_millis(50)).await;
        std::fs::write(
            dir.join("Cargo.toml"),
            "[package]\nname = \"x\"\nversion = \"0.0.1\"\n",
        )
        .unwrap();

        let change = tokio::time::timeout(Duration::from_secs(3), rx.recv())
            .await
            .expect("change should arrive")
            .expect("channel closed");
        assert_eq!(change.kind, ChangeKind::CargoToml);

        // Best-effort cleanup; a failure to remove the directory is ignored.
        std::fs::remove_dir_all(&dir).ok();
    }

    // Builds a synthetic "file content modified" event for `path`, so the
    // debounce loop can be exercised without touching the filesystem.
    fn synth_modify(path: impl Into<PathBuf>) -> Event {
        use notify::event::{DataChange, ModifyKind};
        Event {
            kind: EventKind::Modify(ModifyKind::Data(DataChange::Content)),
            paths: vec![path.into()],
            attrs: notify::event::EventAttributes::new(),
        }
    }

    // Five events sent back-to-back must come out as exactly one batch.
    #[tokio::test]
    async fn rapid_edits_get_coalesced_into_one_change() {
        let debounce = Duration::from_millis(100);
        let (raw_tx, raw_rx) = std_mpsc::channel::<Event>();
        let (tx, mut rx) = mpsc::channel::<Change>(8);
        // Drive the loop directly with synthetic events instead of a real watcher.
        std::thread::spawn(move || debounce_loop(raw_rx, debounce, tx));

        for i in 0..5 {
            raw_tx
                .send(synth_modify(PathBuf::from(format!("a{i}.rs"))))
                .unwrap();
        }

        let first = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("debounced change should arrive within 2s")
            .expect("channel closed");
        assert_eq!(first.kind, ChangeKind::RustCode);
        assert_eq!(
            first.paths.len(),
            5,
            "all 5 events should coalesce into one batch, got {:?}",
            first.paths,
        );

        // Nothing else was sent, so waiting three debounce windows must time
        // out (`Err`) rather than produce a second change.
        let second = tokio::time::timeout(debounce * 3, rx.recv()).await;
        assert!(
            second.is_err(),
            "expected no second change after coalescing, got {second:?}",
        );
    }

    // Two events separated by more than the debounce window must be reported
    // as two separate single-path changes.
    #[tokio::test]
    async fn events_outside_debounce_window_split_into_two_changes() {
        let debounce = Duration::from_millis(80);
        let (raw_tx, raw_rx) = std_mpsc::channel::<Event>();
        let (tx, mut rx) = mpsc::channel::<Change>(8);
        // Drive the loop directly with synthetic events instead of a real watcher.
        std::thread::spawn(move || debounce_loop(raw_rx, debounce, tx));

        raw_tx.send(synth_modify("first.rs")).unwrap();
        let first = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("first change should arrive")
            .expect("channel closed");
        assert_eq!(first.paths.len(), 1);

        // Wait well past the debounce window before sending the next event.
        tokio::time::sleep(debounce * 3).await;
        raw_tx.send(synth_modify("second.rs")).unwrap();
        let second = tokio::time::timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("second change should arrive")
            .expect("channel closed");
        assert_eq!(second.paths.len(), 1);
        assert!(second.paths[0].ends_with("second.rs"));
    }
}