1use std::{
2 path::{Path, PathBuf},
3 sync::mpsc::{self, Receiver},
4 thread,
5 time::Duration,
6};
7
8use anyhow::{Context, Result};
9use log::{debug, error, info, trace, warn};
10use notify_debouncer_full::{
11 new_debouncer,
12 notify::{EventKind, RecursiveMode},
13 DebouncedEvent,
14};
15
16pub struct FileWatcher {
17 debounce_seconds: u64,
18 retry_count: i32,
19}
20
21impl FileWatcher {
22 pub fn new(debounce_seconds: u64, retry_count: i32) -> Self {
23 Self {
24 debounce_seconds,
25 retry_count,
26 }
27 }
28
29 pub fn watch<F, P>(
30 &self,
31 path: &Path,
32 on_change: F,
33 is_path_ignored: P,
34 shutdown_rx: Option<Receiver<()>>,
35 ) -> Result<()>
36 where
37 F: Fn(&Vec<PathBuf>) -> Result<()>,
38 P: Fn(&Path) -> bool,
39 {
40 let (tx, rx) = mpsc::channel();
41
42 let mut debouncer = new_debouncer(Duration::from_secs(self.debounce_seconds), None, tx)?;
43
44 debouncer
45 .watch(path, RecursiveMode::Recursive)
46 .context("Failed to watch path")?;
47 info!("Watching for changes...");
48
49 loop {
50 if let Some(rx) = &shutdown_rx {
51 if rx.try_recv().is_ok() {
52 debug!("Received shutdown signal");
53 break;
54 }
55 }
56
57 match rx.recv_timeout(Duration::from_millis(100)) {
58 Ok(received) => match received {
59 Ok(events) => {
60 if let Err(e) = self.handle_events(events, &on_change, &is_path_ignored) {
61 error!("All retry attempts failed: {}", e);
62 return Err(e);
63 }
64 }
65 Err(errors) => errors.iter().for_each(|error| error!("{error:?}")),
66 },
67 Err(mpsc::RecvTimeoutError::Timeout) => continue,
68 Err(mpsc::RecvTimeoutError::Disconnected) => break,
69 }
70 }
71
72 Ok(())
73 }
74
75 fn handle_events<F, P>(
76 &self,
77 events: Vec<DebouncedEvent>,
78 on_change: F,
79 is_path_ignored: P,
80 ) -> Result<()>
81 where
82 F: Fn(&Vec<PathBuf>) -> Result<()>,
83 P: Fn(&Path) -> bool,
84 {
85 trace!("Received notify events {{ events {:?} }}", events);
86 let paths: Vec<_> = events
87 .iter()
88 .filter(|event| !matches!(event.kind, EventKind::Access(_) | EventKind::Other))
89 .flat_map(|event| event.paths.clone())
90 .filter(|path| !is_path_ignored(path))
91 .collect();
92
93 if !paths.is_empty() {
94 let mut retry_count = 0;
95 loop {
96 match on_change(&paths) {
97 Ok(()) => break,
98 Err(e) => {
99 if retry_count == self.retry_count {
100 return Err(e);
101 }
102 retry_count += 1;
103 warn!(
104 "Commiting changes failed, retrying ({}/{}): {}",
105 retry_count, self.retry_count, e
106 );
107 thread::sleep(RETRY_DELAY);
108 }
109 }
110 }
111 }
112 Ok(())
113 }
114}
115
116const RETRY_DELAY: Duration = Duration::from_secs(1);
117
118#[cfg(test)]
119mod tests {
120 use std::{
121 fs,
122 sync::{
123 atomic::{AtomicBool, AtomicU32, Ordering},
124 mpsc, Arc,
125 },
126 thread,
127 time::{Duration, Instant},
128 };
129
130 use anyhow::bail;
131 use notify_debouncer_full::notify::Event;
132 use testresult::TestResult;
133
134 use super::*;
135
136 #[test]
137 fn test_watcher_notify_error() -> TestResult {
138 let dir = tempfile::tempdir()?;
139 let path = dir.path().to_path_buf();
140
141 let watcher = FileWatcher::new(0, 2);
142
143 fs::remove_dir_all(&path)?;
145
146 let result = watcher.watch(&path, |_| Ok(()), |_path| false, None);
148 assert!(result.is_err());
149 let err = result.unwrap_err().to_string();
150
151 assert!(
152 err.contains("Failed to watch path"),
153 "Unexpected error message: {}",
154 err
155 );
156
157 Ok(())
158 }
159
160 #[test]
161 fn test_watcher_callback_error() -> TestResult {
162 let temp_dir = tempfile::tempdir()?;
163 let test_file = temp_dir.path().join("test.txt");
164
165 let attempt_count = Arc::new(AtomicU32::new(0));
167 let attempt_count_clone = attempt_count.clone();
168
169 let watcher = FileWatcher::new(0, 2);
170 let (shutdown_tx, shutdown_rx) = mpsc::channel();
171
172 let temp_dir_path = temp_dir.path().to_owned();
174 let handle = thread::spawn(move || {
175 watcher.watch(
176 &temp_dir_path,
177 |_| {
178 attempt_count_clone.fetch_add(1, Ordering::SeqCst);
179 bail!("Mock callback error")
180 },
181 |_path| false,
182 Some(shutdown_rx),
183 )
184 });
185
186 thread::sleep(Duration::from_millis(100));
188
189 fs::write(&test_file, "initial content")?;
190
191 thread::sleep(Duration::from_secs(2));
193
194 let _ = shutdown_tx.send(());
195
196 match handle.join().expect("Thread panicked") {
197 Ok(_) => panic!("Expected an error from watcher"),
198 Err(e) => {
199 assert!(e.to_string().contains("Mock callback error"));
200 assert_eq!(
202 attempt_count.load(Ordering::SeqCst),
203 3,
204 "Expected 3 attempts (1 initial + 2 retries)"
205 );
206 }
207 }
208
209 Ok(())
210 }
211
212 #[test]
213 fn test_watcher_shutdown() -> Result<()> {
214 let temp_dir = tempfile::tempdir()?;
215 let test_file = temp_dir.path().join("test.txt");
216 let test_file_2 = temp_dir.path().join("test2.txt");
217 let temp_dir_path = temp_dir.path().to_owned();
218
219 let (shutdown_tx, shutdown_rx) = mpsc::channel();
221
222 let watcher = FileWatcher::new(1, 0);
223
224 let counter = Arc::new(AtomicU32::new(0));
226 let counter_clone = counter.clone();
227
228 let handle = thread::spawn(move || {
230 watcher.watch(
231 &temp_dir_path,
232 |_| {
233 counter_clone.fetch_add(1, Ordering::SeqCst);
234 Ok(())
235 },
236 |_| false,
237 Some(shutdown_rx),
238 )
239 });
240
241 thread::sleep(Duration::from_millis(100));
243
244 fs::write(test_file, "test content")?;
245
246 thread::sleep(Duration::from_secs(3));
248
249 let counter_before_shutdown = counter.load(Ordering::SeqCst);
251 assert!(counter_before_shutdown > 0);
252
253 shutdown_tx.send(())?;
254 handle.join().unwrap()?;
255
256 fs::write(test_file_2, "test content")?;
258 thread::sleep(Duration::from_secs(2));
259
260 assert_eq!(counter_before_shutdown, counter.load(Ordering::SeqCst));
262
263 Ok(())
264 }
265
266 #[test]
267 fn test_all_paths_ignored() -> Result<()> {
268 let was_called = AtomicBool::new(false);
269 let watcher = FileWatcher::new(0, 0);
270
271 let events = vec![
272 DebouncedEvent::new(
273 Event::new(EventKind::Any).add_path(PathBuf::from("test1.txt")),
274 Instant::now(),
275 ),
276 DebouncedEvent::new(
277 Event::new(EventKind::Any).add_path(PathBuf::from("test1.txt")),
278 Instant::now(),
279 ),
280 DebouncedEvent::new(
281 Event::new(EventKind::Any).add_path(PathBuf::from("test1.txt")),
282 Instant::now(),
283 ),
284 ];
285
286 watcher.handle_events(
287 events,
288 |_| {
289 was_called.store(true, Ordering::SeqCst);
290 Ok(())
291 },
292 |_| true,
293 )?;
294
295 assert!(!was_called.load(Ordering::SeqCst));
296
297 Ok(())
298 }
299}