syncor_core/watch/watcher.rs
1use std::path::PathBuf;
2use std::time::Duration;
3
4use notify::{RecommendedWatcher, RecursiveMode, Watcher};
5use tokio::sync::mpsc;
6
/// Notification delivered to the consumer once a burst of file-system
/// activity has settled.
///
/// Implements `PartialEq`/`Eq` so events can be compared directly, e.g. in
/// tests with `assert_eq!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct WatchEvent {
    /// Whether a change was observed. Every event emitted by
    /// [`DebouncedWatcher::start`] sets this to `true`.
    pub changed: bool,
}
12
13/// A file-system watcher that debounces rapid change events.
14pub struct DebouncedWatcher {
15 // Holds the underlying watcher so it is not dropped prematurely.
16 _watcher: RecommendedWatcher,
17}
18
19impl DebouncedWatcher {
20 /// Start watching `path` recursively.
21 ///
22 /// A single [`WatchEvent`] is sent to `sender` after `debounce_duration`
23 /// has elapsed with no further fs events.
24 pub fn start(
25 path: PathBuf,
26 debounce_duration: Duration,
27 sender: mpsc::Sender<WatchEvent>,
28 ) -> notify::Result<Self> {
29 // Internal channel: the notify callback sends `()` each time an fs
30 // event arrives; the tokio task drains the channel and debounces.
31 let (raw_tx, mut raw_rx) = mpsc::channel::<()>(64);
32
33 let watcher_tx = raw_tx.clone();
34 let mut watcher =
35 notify::recommended_watcher(move |res: notify::Result<notify::Event>| {
36 if res.is_ok() {
37 // Best-effort: ignore send errors (task may have exited).
38 let _ = watcher_tx.try_send(());
39 }
40 })?;
41
42 watcher.watch(&path, RecursiveMode::Recursive)?;
43
44 // Spawn the debounce task.
45 tokio::spawn(async move {
46 loop {
47 // Wait for the first raw event.
48 if raw_rx.recv().await.is_none() {
49 // Channel closed; watcher was dropped.
50 break;
51 }
52
53 // Drain all events that arrive within the debounce window.
54 loop {
55 match tokio::time::timeout(debounce_duration, raw_rx.recv()).await {
56 // Another event arrived before the timeout — reset the window.
57 Ok(Some(())) => continue,
58 // Channel closed.
59 Ok(None) => {
60 // Still emit the event before exiting.
61 let _ = sender.send(WatchEvent { changed: true }).await;
62 return;
63 }
64 // Timeout elapsed with no further events — debounce complete.
65 Err(_elapsed) => break,
66 }
67 }
68
69 // Send the debounced event; stop if the receiver is gone.
70 if sender.send(WatchEvent { changed: true }).await.is_err() {
71 break;
72 }
73 }
74 });
75
76 Ok(Self { _watcher: watcher })
77 }
78}