gitwatch_rs/
watcher.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::mpsc::{self, Receiver},
4    thread,
5    time::Duration,
6};
7
8use anyhow::{Context, Result};
9use log::{debug, error, info, trace, warn};
10use notify_debouncer_full::{
11    new_debouncer,
12    notify::{EventKind, RecursiveMode},
13    DebouncedEvent,
14};
15
/// Settings for a debounced, recursive file-system watcher.
///
/// The values stored here are read by `FileWatcher::watch` and its private
/// event handler; the struct itself holds no OS resources.
#[derive(Debug, Clone)]
pub struct FileWatcher {
    /// Debounce window in seconds, passed to the notify debouncer.
    debounce_seconds: u64,
    /// Number of retries allowed after the first failed callback attempt.
    ///
    /// The retry loop stops only when its counter (which starts at 0) is
    /// *equal* to this value, so a negative value is never matched.
    /// NOTE(review): presumably a negative value means "retry forever" —
    /// confirm against the CLI documentation.
    retry_count: i32,
}
20
21impl FileWatcher {
22    pub fn new(debounce_seconds: u64, retry_count: i32) -> Self {
23        Self {
24            debounce_seconds,
25            retry_count,
26        }
27    }
28
29    pub fn watch<F, P>(
30        &self,
31        path: &Path,
32        on_change: F,
33        is_path_ignored: P,
34        shutdown_rx: Option<Receiver<()>>,
35    ) -> Result<()>
36    where
37        F: Fn(&Vec<PathBuf>) -> Result<()>,
38        P: Fn(&Path) -> bool,
39    {
40        let (tx, rx) = mpsc::channel();
41
42        let mut debouncer = new_debouncer(Duration::from_secs(self.debounce_seconds), None, tx)?;
43
44        debouncer
45            .watch(path, RecursiveMode::Recursive)
46            .context("Failed to watch path")?;
47        info!("Watching for changes...");
48
49        loop {
50            if let Some(rx) = &shutdown_rx {
51                if rx.try_recv().is_ok() {
52                    debug!("Received shutdown signal");
53                    break;
54                }
55            }
56
57            match rx.recv_timeout(Duration::from_millis(100)) {
58                Ok(received) => match received {
59                    Ok(events) => {
60                        if let Err(e) = self.handle_events(events, &on_change, &is_path_ignored) {
61                            error!("All retry attempts failed: {e}");
62                            return Err(e);
63                        }
64                    }
65                    Err(errors) => errors.iter().for_each(|error| error!("{error:?}")),
66                },
67                Err(mpsc::RecvTimeoutError::Timeout) => continue,
68                Err(mpsc::RecvTimeoutError::Disconnected) => break,
69            }
70        }
71
72        Ok(())
73    }
74
75    fn handle_events<F, P>(
76        &self,
77        events: Vec<DebouncedEvent>,
78        on_change: F,
79        is_path_ignored: P,
80    ) -> Result<()>
81    where
82        F: Fn(&Vec<PathBuf>) -> Result<()>,
83        P: Fn(&Path) -> bool,
84    {
85        trace!("Received notify events {{ events {events:?} }}");
86        let paths: Vec<_> = events
87            .iter()
88            .filter(|event| !matches!(event.kind, EventKind::Access(_) | EventKind::Other))
89            .flat_map(|event| event.paths.clone())
90            .filter(|path| !is_path_ignored(path))
91            .collect();
92
93        if !paths.is_empty() {
94            let mut retry_count = 0;
95            loop {
96                match on_change(&paths) {
97                    Ok(()) => break,
98                    Err(e) => {
99                        if retry_count == self.retry_count {
100                            return Err(e);
101                        }
102                        retry_count += 1;
103                        warn!(
104                            "Failed to commit changes. Retrying... ({}/{}).\nError: {:?}",
105                            retry_count, self.retry_count, e
106                        );
107                        thread::sleep(RETRY_DELAY);
108                    }
109                }
110            }
111        }
112        Ok(())
113    }
114}
115
/// Pause between a failed `on_change` attempt and the next retry.
const RETRY_DELAY: Duration = Duration::from_millis(1_000);
117
#[cfg(test)]
mod tests {
    use std::{
        fs,
        sync::{
            atomic::{AtomicBool, AtomicU32, Ordering},
            mpsc, Arc,
        },
        thread,
        time::{Duration, Instant},
    };

    use anyhow::bail;
    use notify_debouncer_full::notify::Event;
    use testresult::TestResult;

    use super::*;

    /// Watching a directory that has been deleted must fail with the
    /// "Failed to watch path" context.
    #[test]
    fn test_watcher_notify_error() -> TestResult {
        let watched_dir = tempfile::tempdir()?;
        let watched_path = watched_dir.path().to_path_buf();

        // Remove the directory up front so that registering the watch fails.
        fs::remove_dir_all(&watched_path)?;

        let outcome = FileWatcher::new(0, 2).watch(&watched_path, |_| Ok(()), |_| false, None);
        assert!(outcome.is_err());

        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains("Failed to watch path"),
            "Unexpected error message: {err}"
        );

        Ok(())
    }

    /// A callback that always fails is attempted once plus `retry_count`
    /// times, after which `watch` returns the callback's error.
    #[test]
    fn test_watcher_callback_error() -> TestResult {
        let temp_dir = tempfile::tempdir()?;
        let watched_path = temp_dir.path().to_owned();
        let touched_file = watched_path.join("test.txt");

        // Counts every invocation of the failing callback.
        let attempts = Arc::new(AtomicU32::new(0));
        let (shutdown_tx, shutdown_rx) = mpsc::channel();

        // Run the watcher on its own thread; it owns a clone of the counter.
        let watcher_thread = {
            let attempts = Arc::clone(&attempts);
            let watcher = FileWatcher::new(0, 2);
            thread::spawn(move || {
                watcher.watch(
                    &watched_path,
                    |_| {
                        attempts.fetch_add(1, Ordering::SeqCst);
                        bail!("Mock callback error")
                    },
                    |_| false,
                    Some(shutdown_rx),
                )
            })
        };

        // Let the watcher finish registering before touching the directory.
        thread::sleep(Duration::from_millis(100));

        fs::write(&touched_file, "initial content")?;

        // Two retries, each preceded by a one-second pause.
        thread::sleep(Duration::from_secs(2));

        let _ = shutdown_tx.send(());

        let outcome = watcher_thread.join().expect("Thread panicked");
        match outcome {
            Err(e) => {
                assert!(e.to_string().contains("Mock callback error"));
                // One initial attempt followed by two retries.
                assert_eq!(
                    attempts.load(Ordering::SeqCst),
                    3,
                    "Expected 3 attempts (1 initial + 2 retries)"
                );
            }
            Ok(_) => panic!("Expected an error from watcher"),
        }

        Ok(())
    }

    /// After the shutdown signal the watcher returns and reports no further
    /// changes.
    #[test]
    fn test_watcher_shutdown() -> Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let watched_path = temp_dir.path().to_owned();
        let first_file = watched_path.join("test.txt");
        let second_file = watched_path.join("test2.txt");

        let (shutdown_tx, shutdown_rx) = mpsc::channel();

        // Number of times the change callback has fired.
        let change_count = Arc::new(AtomicU32::new(0));

        let watcher_thread = {
            let change_count = Arc::clone(&change_count);
            let watcher = FileWatcher::new(1, 0);
            thread::spawn(move || {
                watcher.watch(
                    &watched_path,
                    |_| {
                        change_count.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    },
                    |_| false,
                    Some(shutdown_rx),
                )
            })
        };

        // Give the watcher a moment to start.
        thread::sleep(Duration::from_millis(100));

        fs::write(first_file, "test content")?;

        // Wait out the debounce window.
        thread::sleep(Duration::from_secs(3));

        // At least one change must have been seen while the watcher was live.
        let seen_before_shutdown = change_count.load(Ordering::SeqCst);
        assert!(seen_before_shutdown > 0);

        shutdown_tx.send(())?;
        watcher_thread.join().unwrap()?;

        // A write after shutdown must not reach the callback.
        fs::write(second_file, "test content")?;
        thread::sleep(Duration::from_secs(2));

        assert_eq!(seen_before_shutdown, change_count.load(Ordering::SeqCst));

        Ok(())
    }

    /// When every path is ignored the change callback is never invoked.
    #[test]
    fn test_all_paths_ignored() -> Result<()> {
        let callback_invoked = AtomicBool::new(false);

        // Three identical events that all touch the same path.
        let events: Vec<DebouncedEvent> = (0..3)
            .map(|_| {
                let event = Event::new(EventKind::Any).add_path(PathBuf::from("test1.txt"));
                DebouncedEvent::new(event, Instant::now())
            })
            .collect();

        FileWatcher::new(0, 0).handle_events(
            events,
            |_| {
                callback_invoked.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| true,
        )?;

        assert!(!callback_invoked.load(Ordering::SeqCst));

        Ok(())
    }
}