1use std::{
2 path::{Path, PathBuf},
3 sync::mpsc::{self, Receiver},
4 thread,
5 time::Duration,
6};
7
8use anyhow::{Context, Result};
9use log::{debug, error, info, trace, warn};
10use notify_debouncer_full::{
11 new_debouncer,
12 notify::{EventKind, RecursiveMode},
13 DebouncedEvent,
14};
15
/// Configuration for a blocking, debounced, recursive file-system watcher.
///
/// The watcher itself is started with `FileWatcher::watch`; this struct only
/// carries the tunables that call uses.
#[derive(Debug, Clone)]
pub struct FileWatcher {
    /// Debounce window, in seconds, handed to the underlying debouncer.
    debounce_seconds: u64,
    /// How many times a failing change callback is retried before its error
    /// is returned to the caller.
    retry_count: i32,
}
20
21impl FileWatcher {
22 pub fn new(debounce_seconds: u64, retry_count: i32) -> Self {
23 Self {
24 debounce_seconds,
25 retry_count,
26 }
27 }
28
29 pub fn watch<F, P>(
30 &self,
31 path: &Path,
32 on_change: F,
33 is_path_ignored: P,
34 shutdown_rx: Option<Receiver<()>>,
35 ) -> Result<()>
36 where
37 F: Fn(&Vec<PathBuf>) -> Result<()>,
38 P: Fn(&Path) -> bool,
39 {
40 let (tx, rx) = mpsc::channel();
41
42 let mut debouncer = new_debouncer(Duration::from_secs(self.debounce_seconds), None, tx)?;
43
44 debouncer
45 .watch(path, RecursiveMode::Recursive)
46 .context("Failed to watch path")?;
47 info!("Watching for changes...");
48
49 loop {
50 if let Some(rx) = &shutdown_rx {
51 if rx.try_recv().is_ok() {
52 debug!("Received shutdown signal");
53 break;
54 }
55 }
56
57 match rx.recv_timeout(Duration::from_millis(100)) {
58 Ok(received) => match received {
59 Ok(events) => {
60 if let Err(e) = self.handle_events(events, &on_change, &is_path_ignored) {
61 error!("All retry attempts failed: {e}");
62 return Err(e);
63 }
64 }
65 Err(errors) => errors.iter().for_each(|error| error!("{error:?}")),
66 },
67 Err(mpsc::RecvTimeoutError::Timeout) => continue,
68 Err(mpsc::RecvTimeoutError::Disconnected) => break,
69 }
70 }
71
72 Ok(())
73 }
74
75 fn handle_events<F, P>(
76 &self,
77 events: Vec<DebouncedEvent>,
78 on_change: F,
79 is_path_ignored: P,
80 ) -> Result<()>
81 where
82 F: Fn(&Vec<PathBuf>) -> Result<()>,
83 P: Fn(&Path) -> bool,
84 {
85 trace!("Received notify events {{ events {events:?} }}");
86 let paths: Vec<_> = events
87 .iter()
88 .filter(|event| !matches!(event.kind, EventKind::Access(_) | EventKind::Other))
89 .flat_map(|event| event.paths.clone())
90 .filter(|path| !is_path_ignored(path))
91 .collect();
92
93 if !paths.is_empty() {
94 let mut retry_count = 0;
95 loop {
96 match on_change(&paths) {
97 Ok(()) => break,
98 Err(e) => {
99 if retry_count == self.retry_count {
100 return Err(e);
101 }
102 retry_count += 1;
103 warn!(
104 "Failed to commit changes. Retrying... ({}/{}).\nError: {:?}",
105 retry_count, self.retry_count, e
106 );
107 thread::sleep(RETRY_DELAY);
108 }
109 }
110 }
111 }
112 Ok(())
113 }
114}
115
/// Pause between two consecutive attempts of a failing change callback.
const RETRY_DELAY: Duration = Duration::from_millis(1_000);
117
#[cfg(test)]
mod tests {
    use std::{
        fs,
        sync::{
            atomic::{AtomicBool, AtomicU32, Ordering},
            mpsc, Arc,
        },
        thread,
        time::{Duration, Instant},
    };

    use anyhow::bail;
    use notify_debouncer_full::notify::Event;
    use testresult::TestResult;

    use super::*;

    /// Watching a directory that has been removed must fail with the
    /// "Failed to watch path" context.
    #[test]
    fn test_watcher_notify_error() -> TestResult {
        let dir = tempfile::tempdir()?;
        let missing_path = dir.path().to_path_buf();

        // Remove the directory up front so registering the watch cannot succeed.
        fs::remove_dir_all(&missing_path)?;

        let watcher = FileWatcher::new(0, 2);
        let result = watcher.watch(&missing_path, |_| Ok(()), |_path| false, None);

        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("Failed to watch path"),
            "Unexpected error message: {err}"
        );

        Ok(())
    }

    /// A callback that always fails is attempted once plus `retry_count`
    /// times, after which `watch` returns the callback's error.
    #[test]
    fn test_watcher_callback_error() -> TestResult {
        let temp_dir = tempfile::tempdir()?;
        let watched_dir = temp_dir.path().to_owned();
        let trigger_file = watched_dir.join("test.txt");

        let attempts = Arc::new(AtomicU32::new(0));
        let attempts_in_callback = Arc::clone(&attempts);

        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let watcher = FileWatcher::new(0, 2);

        let handle = thread::spawn(move || {
            let always_failing = |_: &Vec<PathBuf>| -> Result<()> {
                attempts_in_callback.fetch_add(1, Ordering::SeqCst);
                bail!("Mock callback error")
            };
            watcher.watch(
                &watched_dir,
                always_failing,
                |_path| false,
                Some(shutdown_rx),
            )
        });

        // Give the watcher thread time to start before touching the directory.
        thread::sleep(Duration::from_millis(100));

        fs::write(&trigger_file, "initial content")?;

        // Leave room for the retries to run before asking for shutdown.
        thread::sleep(Duration::from_secs(2));

        // The watcher may already have returned, so a failed send is fine.
        let _ = shutdown_tx.send(());

        let outcome = handle.join().expect("Thread panicked");
        if let Err(e) = outcome {
            assert!(e.to_string().contains("Mock callback error"));
            assert_eq!(
                attempts.load(Ordering::SeqCst),
                3,
                "Expected 3 attempts (1 initial + 2 retries)"
            );
        } else {
            panic!("Expected an error from watcher");
        }

        Ok(())
    }

    /// After a shutdown signal the watcher returns cleanly and further file
    /// changes no longer reach the callback.
    #[test]
    fn test_watcher_shutdown() -> Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let watched_dir = temp_dir.path().to_owned();
        let first_file = watched_dir.join("test.txt");
        let second_file = watched_dir.join("test2.txt");

        let calls = Arc::new(AtomicU32::new(0));
        let calls_in_callback = Arc::clone(&calls);

        let (shutdown_tx, shutdown_rx) = mpsc::channel();
        let watcher = FileWatcher::new(1, 0);

        let handle = thread::spawn(move || {
            let count_call = |_: &Vec<PathBuf>| -> Result<()> {
                calls_in_callback.fetch_add(1, Ordering::SeqCst);
                Ok(())
            };
            watcher.watch(&watched_dir, count_call, |_| false, Some(shutdown_rx))
        });

        // Give the watcher thread time to start before touching the directory.
        thread::sleep(Duration::from_millis(100));

        fs::write(first_file, "test content")?;

        // Wait past the debounce window so the first change is delivered.
        thread::sleep(Duration::from_secs(3));

        let calls_before_shutdown = calls.load(Ordering::SeqCst);
        assert!(calls_before_shutdown > 0);

        shutdown_tx.send(())?;
        handle.join().unwrap()?;

        // A change made after shutdown must not be observed.
        fs::write(second_file, "test content")?;
        thread::sleep(Duration::from_secs(2));

        assert_eq!(calls_before_shutdown, calls.load(Ordering::SeqCst));

        Ok(())
    }

    /// When every path is ignored the change callback is never invoked.
    #[test]
    fn test_all_paths_ignored() -> Result<()> {
        let callback_ran = AtomicBool::new(false);
        let watcher = FileWatcher::new(0, 0);

        let events: Vec<DebouncedEvent> = (0..3)
            .map(|_| {
                DebouncedEvent::new(
                    Event::new(EventKind::Any).add_path(PathBuf::from("test1.txt")),
                    Instant::now(),
                )
            })
            .collect();

        let record_call = |_: &Vec<PathBuf>| -> Result<()> {
            callback_ran.store(true, Ordering::SeqCst);
            Ok(())
        };

        watcher.handle_events(events, record_call, |_| true)?;

        assert!(!callback_ran.load(Ordering::SeqCst));

        Ok(())
    }
}