Skip to main content

syncor_core/watch/
watcher.rs

1use std::path::PathBuf;
2use std::time::Duration;
3
4use notify::{RecommendedWatcher, RecursiveMode, Watcher};
5use tokio::sync::mpsc;
6
/// Notification emitted by [`DebouncedWatcher`] once a burst of file-system
/// activity has settled.
///
/// The event deliberately carries no per-path detail: it only signals that
/// something under the watched path changed.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchEvent {
    /// Set to `true` by the watcher for every event it emits.
    pub changed: bool,
}
12
13/// A file-system watcher that debounces rapid change events.
14pub struct DebouncedWatcher {
15    // Holds the underlying watcher so it is not dropped prematurely.
16    _watcher: RecommendedWatcher,
17}
18
19impl DebouncedWatcher {
20    /// Start watching `path` recursively.
21    ///
22    /// A single [`WatchEvent`] is sent to `sender` after `debounce_duration`
23    /// has elapsed with no further fs events.
24    pub fn start(
25        path: PathBuf,
26        debounce_duration: Duration,
27        sender: mpsc::Sender<WatchEvent>,
28    ) -> notify::Result<Self> {
29        // Internal channel: the notify callback sends `()` each time an fs
30        // event arrives; the tokio task drains the channel and debounces.
31        let (raw_tx, mut raw_rx) = mpsc::channel::<()>(64);
32
33        let watcher_tx = raw_tx.clone();
34        let mut watcher =
35            notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
36                if res.is_ok() {
37                    // Best-effort: ignore send errors (task may have exited).
38                    let _ = watcher_tx.try_send(());
39                }
40            })?;
41
42        watcher.watch(&path, RecursiveMode::Recursive)?;
43
44        // Spawn the debounce task.
45        tokio::spawn(async move {
46            loop {
47                // Wait for the first raw event.
48                if raw_rx.recv().await.is_none() {
49                    // Channel closed; watcher was dropped.
50                    break;
51                }
52
53                // Drain all events that arrive within the debounce window.
54                loop {
55                    match tokio::time::timeout(debounce_duration, raw_rx.recv()).await {
56                        // Another event arrived before the timeout — reset the window.
57                        Ok(Some(())) => continue,
58                        // Channel closed.
59                        Ok(None) => {
60                            // Still emit the event before exiting.
61                            let _ = sender.send(WatchEvent { changed: true }).await;
62                            return;
63                        }
64                        // Timeout elapsed with no further events — debounce complete.
65                        Err(_elapsed) => break,
66                    }
67                }
68
69                // Send the debounced event; stop if the receiver is gone.
70                if sender.send(WatchEvent { changed: true }).await.is_err() {
71                    break;
72                }
73            }
74        });
75
76        Ok(Self { _watcher: watcher })
77    }
78}