Skip to main content

whisker_dev_server/
watcher.rs

//! File watcher + change classifier for the dev loop.
//!
//! Wraps `notify` so the rest of the dev server (the builder in I4e
//! and the Tier 1 patcher in I4g) doesn't have to deal with raw
//! filesystem events. Three things happen here:
//!
//! 1. **Recursive watch** of one or more package roots.
//! 2. **Debounce** raw notify events for ~200 ms — a single file save
//!    can produce 3-5 notify events on macOS (atomic rename, chmod,
//!    …); we coalesce them into one [`Change`].
//! 3. **Classify** the affected paths into [`ChangeKind`] so callers
//!    can pick a rebuild strategy (Tier 2 cold rebuild for
//!    `RustCode`, full restart for `CargoToml`, etc).
14
15use anyhow::{Context, Result};
16use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
17use std::collections::BTreeSet;
18use std::path::PathBuf;
19use std::sync::mpsc as std_mpsc;
20use std::time::{Duration, Instant};
21use tokio::sync::mpsc;
22
/// Category of a debounced change batch, used by callers to choose a
/// rebuild strategy. When a batch mixes categories, the classifier
/// reports the *most disruptive* one: `CargoToml` outranks
/// `RustCode`, which outranks `Other`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ChangeKind {
    /// At least one `.rs` file changed. Handled by a Tier 2 cold
    /// rebuild today; Tier 1 subsecond patching once I4g lands.
    RustCode,
    /// `Cargo.toml` or `Cargo.lock` changed — requires a full
    /// `cargo build` and re-launch, since subsecond can't reload deps.
    CargoToml,
    /// Everything else (assets, README edits when the watched tree is
    /// too wide, …). Callers may choose to ignore these.
    Other,
}
38
39/// One debounced change batch.
40#[derive(Debug, Clone)]
41pub struct Change {
42    pub kind: ChangeKind,
43    pub paths: Vec<PathBuf>,
44}
45
46impl Change {
47    /// Classify a batch of paths. The most disruptive category wins.
48    pub fn classify(paths: Vec<PathBuf>) -> Self {
49        let kind = if paths.iter().any(|p| {
50            matches!(
51                p.file_name().and_then(|n| n.to_str()),
52                Some("Cargo.toml") | Some("Cargo.lock"),
53            )
54        }) {
55            ChangeKind::CargoToml
56        } else if paths
57            .iter()
58            .any(|p| p.extension().and_then(|e| e.to_str()) == Some("rs"))
59        {
60            ChangeKind::RustCode
61        } else {
62            ChangeKind::Other
63        };
64        Self { kind, paths }
65    }
66}
67
68/// Spawn a recursive watcher rooted at each path in `roots`.
69/// Debounced [`Change`] batches arrive on `tx`. The returned
70/// [`RecommendedWatcher`] keeps the OS watches alive — drop it to
71/// stop watching.
72///
73/// Roots that fail to attach (don't exist, or notify rejects them)
74/// log a warning and are skipped; the watcher still returns as long
75/// as at least one root attached. Sub-crates without a `src/` are
76/// the common case — `cargo metadata` lists every workspace member,
77/// not every member ships a `src/` (proc-macro-only crates, virtual
78/// manifest stubs, …).
79pub fn spawn_watcher(
80    roots: Vec<PathBuf>,
81    debounce: Duration,
82    tx: mpsc::Sender<Change>,
83) -> Result<RecommendedWatcher> {
84    if roots.is_empty() {
85        anyhow::bail!("spawn_watcher: no roots to watch");
86    }
87    let (raw_tx, raw_rx) = std_mpsc::channel::<Event>();
88    let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
89        if let Ok(ev) = res {
90            // Drop send errors silently — receiver gone means the
91            // dev loop is shutting down.
92            let _ = raw_tx.send(ev);
93        }
94    })
95    .context("create notify watcher")?;
96    let mut attached = 0;
97    for root in &roots {
98        match watcher.watch(root, RecursiveMode::Recursive) {
99            Ok(()) => attached += 1,
100            Err(e) => whisker_build::ui::warn(format!("skip watch {}: {e}", root.display())),
101        }
102    }
103    if attached == 0 {
104        anyhow::bail!(
105            "spawn_watcher: no roots successfully attached (of {})",
106            roots.len()
107        );
108    }
109
110    // Debounce loop: separate OS thread because notify's callback is
111    // synchronous and the std mpsc::Receiver isn't async-friendly.
112    std::thread::Builder::new()
113        .name("whisker-dev-watch".into())
114        .spawn(move || debounce_loop(raw_rx, debounce, tx))
115        .context("spawn debounce thread")?;
116
117    Ok(watcher)
118}
119
120fn debounce_loop(raw_rx: std_mpsc::Receiver<Event>, debounce: Duration, tx: mpsc::Sender<Change>) {
121    let mut pending: BTreeSet<PathBuf> = BTreeSet::new();
122    let mut deadline: Option<Instant> = None;
123
124    loop {
125        let block_for = match deadline {
126            // We have pending events — wait at most until the deadline
127            // for any new event to coalesce.
128            Some(d) => d.saturating_duration_since(Instant::now()),
129            // Idle — block indefinitely for the next event.
130            None => Duration::from_secs(60 * 60),
131        };
132
133        match raw_rx.recv_timeout(block_for) {
134            Ok(ev) if is_interesting(&ev.kind) => {
135                for p in ev.paths {
136                    pending.insert(p);
137                }
138                deadline = Some(Instant::now() + debounce);
139            }
140            Ok(_) => {} // notify event we don't care about
141            Err(std_mpsc::RecvTimeoutError::Timeout) => {
142                if pending.is_empty() {
143                    continue;
144                }
145                let paths: Vec<_> = std::mem::take(&mut pending).into_iter().collect();
146                deadline = None;
147                let change = Change::classify(paths);
148                if tx.blocking_send(change).is_err() {
149                    return; // receiver dropped, we're done
150                }
151            }
152            Err(std_mpsc::RecvTimeoutError::Disconnected) => return,
153        }
154    }
155}
156
157fn is_interesting(k: &EventKind) -> bool {
158    use notify::event::{CreateKind, ModifyKind, RemoveKind};
159    matches!(
160        k,
161        EventKind::Create(CreateKind::File)
162            | EventKind::Modify(ModifyKind::Data(_))
163            | EventKind::Modify(ModifyKind::Any)
164            | EventKind::Modify(ModifyKind::Name(_))
165            | EventKind::Remove(RemoveKind::File)
166    )
167}
168
169// ============================================================================
170// Tests
171// ============================================================================
172
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicU64, Ordering};

    // ----- shared helpers ----------------------------------------------

    /// Classifies `raw` paths and returns only the resulting kind.
    fn kind_of(raw: &[&str]) -> ChangeKind {
        let paths = raw.iter().copied().map(PathBuf::from).collect();
        Change::classify(paths).kind
    }

    /// Creates a fresh directory under the system temp dir. The name
    /// combines the process id with a per-process counter so tests
    /// running concurrently never observe each other's events.
    fn unique_tempdir() -> PathBuf {
        static SEQ: AtomicU64 = AtomicU64::new(0);
        let pid = std::process::id();
        let n = SEQ.fetch_add(1, Ordering::Relaxed);
        let dir = std::env::temp_dir().join(format!("whisker-watcher-test-{pid}-{n}"));
        std::fs::create_dir_all(&dir).unwrap();
        dir
    }

    /// Builds a synthetic `Modify(Data(Content))` event for `path`.
    /// The debounce tests feed these straight into `debounce_loop` so
    /// they don't depend on real filesystem timings; the
    /// `editing_*` tests cover the notify-to-debouncer wiring.
    fn synth_modify(path: impl Into<PathBuf>) -> Event {
        use notify::event::{DataChange, EventAttributes, ModifyKind};
        Event {
            kind: EventKind::Modify(ModifyKind::Data(DataChange::Content)),
            paths: vec![path.into()],
            attrs: EventAttributes::new(),
        }
    }

    /// Runs `debounce_loop` on its own thread and hands back the raw
    /// event sender plus the receiver for debounced changes.
    fn start_debouncer(debounce: Duration) -> (std_mpsc::Sender<Event>, mpsc::Receiver<Change>) {
        let (raw_tx, raw_rx) = std_mpsc::channel::<Event>();
        let (tx, rx) = mpsc::channel::<Change>(8);
        std::thread::spawn(move || debounce_loop(raw_rx, debounce, tx));
        (raw_tx, rx)
    }

    /// Waits up to `limit` for the next change, panicking with
    /// `on_timeout` if none arrives in time.
    async fn next_change(
        rx: &mut mpsc::Receiver<Change>,
        limit: Duration,
        on_timeout: &str,
    ) -> Change {
        tokio::time::timeout(limit, rx.recv())
            .await
            .expect(on_timeout)
            .expect("channel closed")
    }

    // ----- classifier (pure) -------------------------------------------

    #[test]
    fn classify_picks_cargo_toml_over_rust_code() {
        let kind = kind_of(&["/tmp/foo/src/lib.rs", "/tmp/foo/Cargo.toml"]);
        assert_eq!(kind, ChangeKind::CargoToml);
    }

    #[test]
    fn classify_picks_rust_code_when_no_cargo_toml() {
        let kind = kind_of(&["/tmp/foo/src/lib.rs", "/tmp/foo/src/app.rs"]);
        assert_eq!(kind, ChangeKind::RustCode);
    }

    #[test]
    fn classify_falls_through_to_other() {
        let kind = kind_of(&["/tmp/foo/README.md", "/tmp/foo/static/logo.png"]);
        assert_eq!(kind, ChangeKind::Other);
    }

    #[test]
    fn classify_handles_cargo_lock_too() {
        let kind = kind_of(&["/tmp/foo/Cargo.lock"]);
        assert_eq!(kind, ChangeKind::CargoToml);
    }

    // ----- end-to-end (notify + debounce) ------------------------------

    #[tokio::test]
    async fn editing_a_rust_file_emits_a_rustcode_change() {
        let dir = unique_tempdir();
        let source = dir.join("lib.rs");
        std::fs::write(&source, "fn old() {}").unwrap();

        let (tx, mut rx) = mpsc::channel::<Change>(8);
        let _watcher =
            spawn_watcher(vec![dir.clone()], Duration::from_millis(120), tx).expect("watcher up");

        // Let notify finish registering the watch before editing.
        tokio::time::sleep(Duration::from_millis(50)).await;
        std::fs::write(&source, "fn new() {}").unwrap();

        let change = next_change(
            &mut rx,
            Duration::from_secs(3),
            "debounced change should arrive within 3s",
        )
        .await;

        assert_eq!(change.kind, ChangeKind::RustCode);
        let saw_lib = change.paths.iter().any(|p| p.ends_with("lib.rs"));
        assert!(saw_lib, "paths={:?}", change.paths);

        std::fs::remove_dir_all(&dir).ok();
    }

    #[tokio::test]
    async fn editing_cargo_toml_classifies_as_cargo_toml() {
        let dir = unique_tempdir();
        let manifest = dir.join("Cargo.toml");
        std::fs::write(&manifest, "[package]\nname = \"x\"\nversion = \"0.0.0\"\n").unwrap();

        let (tx, mut rx) = mpsc::channel::<Change>(8);
        let _watcher =
            spawn_watcher(vec![dir.clone()], Duration::from_millis(120), tx).expect("watcher up");

        tokio::time::sleep(Duration::from_millis(50)).await;
        std::fs::write(&manifest, "[package]\nname = \"x\"\nversion = \"0.0.1\"\n").unwrap();

        let change = next_change(&mut rx, Duration::from_secs(3), "change should arrive").await;
        assert_eq!(change.kind, ChangeKind::CargoToml);

        std::fs::remove_dir_all(&dir).ok();
    }

    // ----- debouncer (synthetic events) --------------------------------

    #[tokio::test]
    async fn rapid_edits_get_coalesced_into_one_change() {
        // Synthetic events go straight into `debounce_loop`; no
        // filesystem, no notify. The earlier e2e flavour was flaky on
        // slow CI runners (#30/#33/#34): a `std::fs::write` followed
        // by a 20 ms sleep could outlast the 150 ms debounce window,
        // split the batch in two and trip the "no second change"
        // assertion. With the std channel the only timing left is the
        // debounce window itself — which is what we are testing.
        let debounce = Duration::from_millis(100);
        let (raw_tx, mut rx) = start_debouncer(debounce);

        // Five events with no pause in between: each one reaches the
        // debouncer before the deadline fires, so they all land in
        // the same `Change`.
        for i in 0..5 {
            let event = synth_modify(format!("a{i}.rs"));
            raw_tx.send(event).unwrap();
        }

        let first = next_change(
            &mut rx,
            Duration::from_secs(2),
            "debounced change should arrive within 2s",
        )
        .await;
        assert_eq!(first.kind, ChangeKind::RustCode);
        assert_eq!(
            first.paths.len(),
            5,
            "all 5 events should coalesce into one batch, got {:?}",
            first.paths,
        );

        // Nothing else was fed in and the pending set is drained, so
        // waiting several debounce windows must yield no further
        // change.
        let second = tokio::time::timeout(debounce * 3, rx.recv()).await;
        assert!(
            second.is_err(),
            "expected no second change after coalescing, got {second:?}",
        );
    }

    #[tokio::test]
    async fn events_outside_debounce_window_split_into_two_changes() {
        // Counterpart to the coalesce test: events further apart than
        // the debounce window MUST come out as two separate
        // `Change`s. Guards against "fixing" the coalesce test by
        // making the debouncer too greedy.
        let debounce = Duration::from_millis(80);
        let (raw_tx, mut rx) = start_debouncer(debounce);

        raw_tx.send(synth_modify("first.rs")).unwrap();
        let first = next_change(&mut rx, Duration::from_secs(2), "first change should arrive").await;
        assert_eq!(first.paths.len(), 1);

        // Sleep well beyond the debounce window so the first batch is
        // fully flushed before the second event goes in.
        tokio::time::sleep(debounce * 3).await;
        raw_tx.send(synth_modify("second.rs")).unwrap();
        let second =
            next_change(&mut rx, Duration::from_secs(2), "second change should arrive").await;
        assert_eq!(second.paths.len(), 1);
        assert!(second.paths[0].ends_with("second.rs"));
    }
}
372}