gitwatch_rs/
watcher.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::mpsc::{self, Receiver},
4    thread,
5    time::Duration,
6};
7
8use anyhow::{Context, Result};
9use log::{debug, error, info, trace, warn};
10use notify_debouncer_full::{
11    new_debouncer,
12    notify::{EventKind, RecursiveMode},
13    DebouncedEvent,
14};
15
/// Configuration for a debounced, recursive file system watcher.
///
/// Build one with [`FileWatcher::new`] and start it with
/// [`FileWatcher::watch`].
#[derive(Debug, Clone)]
pub struct FileWatcher {
    /// Debounce window in seconds; converted to a `Duration` and handed to
    /// the debouncer when watching starts.
    debounce_seconds: u64,
    /// Number of retries allowed after the first failed change callback.
    /// Compared by equality against a counter that starts at zero, so a
    /// negative value is never matched.
    retry_count: i32,
}
20
21impl FileWatcher {
22    pub fn new(debounce_seconds: u64, retry_count: i32) -> Self {
23        Self {
24            debounce_seconds,
25            retry_count,
26        }
27    }
28
29    pub fn watch<F, P>(
30        &self,
31        path: &Path,
32        on_change: F,
33        is_path_ignored: P,
34        shutdown_rx: Option<Receiver<()>>,
35    ) -> Result<()>
36    where
37        F: Fn(&Vec<PathBuf>) -> Result<()>,
38        P: Fn(&Path) -> bool,
39    {
40        let (tx, rx) = mpsc::channel();
41
42        let mut debouncer = new_debouncer(Duration::from_secs(self.debounce_seconds), None, tx)?;
43
44        debouncer
45            .watch(path, RecursiveMode::Recursive)
46            .context("Failed to watch path")?;
47        info!("Watching for changes...");
48
49        loop {
50            if let Some(rx) = &shutdown_rx {
51                if rx.try_recv().is_ok() {
52                    debug!("Received shutdown signal");
53                    break;
54                }
55            }
56
57            match rx.recv_timeout(Duration::from_millis(100)) {
58                Ok(received) => match received {
59                    Ok(events) => {
60                        if let Err(e) = self.handle_events(events, &on_change, &is_path_ignored) {
61                            error!("All retry attempts failed: {}", e);
62                            return Err(e);
63                        }
64                    }
65                    Err(errors) => errors.iter().for_each(|error| error!("{error:?}")),
66                },
67                Err(mpsc::RecvTimeoutError::Timeout) => continue,
68                Err(mpsc::RecvTimeoutError::Disconnected) => break,
69            }
70        }
71
72        Ok(())
73    }
74
75    fn handle_events<F, P>(
76        &self,
77        events: Vec<DebouncedEvent>,
78        on_change: F,
79        is_path_ignored: P,
80    ) -> Result<()>
81    where
82        F: Fn(&Vec<PathBuf>) -> Result<()>,
83        P: Fn(&Path) -> bool,
84    {
85        trace!("Received notify events {{ events {:?} }}", events);
86        let paths: Vec<_> = events
87            .iter()
88            .filter(|event| !matches!(event.kind, EventKind::Access(_) | EventKind::Other))
89            .flat_map(|event| event.paths.clone())
90            .filter(|path| !is_path_ignored(path))
91            .collect();
92
93        if !paths.is_empty() {
94            let mut retry_count = 0;
95            loop {
96                match on_change(&paths) {
97                    Ok(()) => break,
98                    Err(e) => {
99                        if retry_count == self.retry_count {
100                            return Err(e);
101                        }
102                        retry_count += 1;
103                        warn!(
104                            "Commiting changes failed, retrying ({}/{}): {}",
105                            retry_count, self.retry_count, e
106                        );
107                        thread::sleep(RETRY_DELAY);
108                    }
109                }
110            }
111        }
112        Ok(())
113    }
114}
115
/// Pause between two consecutive attempts of a failing change callback.
const RETRY_DELAY: Duration = Duration::from_millis(1_000);
117
#[cfg(test)]
mod tests {
    use std::{
        fs,
        sync::{
            atomic::{AtomicBool, AtomicU32, Ordering},
            mpsc, Arc,
        },
        thread,
        time::{Duration, Instant},
    };

    use anyhow::bail;
    use notify_debouncer_full::notify::Event;
    use testresult::TestResult;

    use super::*;

    /// Watching a directory that no longer exists must fail with the
    /// "Failed to watch path" context.
    #[test]
    fn test_watcher_notify_error() -> TestResult {
        let watcher = FileWatcher::new(0, 2);

        // Create a temporary directory and remove it again so the path
        // handed to the watcher does not exist.
        let dir = tempfile::tempdir()?;
        let missing_path = dir.path().to_path_buf();
        fs::remove_dir_all(&missing_path)?;

        let result = watcher.watch(&missing_path, |_| Ok(()), |_path| false, None);
        assert!(result.is_err());

        let message = result.unwrap_err().to_string();
        assert!(
            message.contains("Failed to watch path"),
            "Unexpected error message: {}",
            message
        );

        Ok(())
    }

    /// A callback that always fails is attempted once plus `retry_count`
    /// times, after which its error is returned from `watch`.
    #[test]
    fn test_watcher_callback_error() -> TestResult {
        let temp_dir = tempfile::tempdir()?;
        let watched_dir = temp_dir.path().to_owned();
        let test_file = temp_dir.path().join("test.txt");

        // Counts how often the callback has been invoked.
        let attempts = Arc::new(AtomicU32::new(0));
        let (shutdown_tx, shutdown_rx) = mpsc::channel();

        // Run the watcher on its own thread with a callback that always fails.
        let handle = {
            let attempts = Arc::clone(&attempts);
            thread::spawn(move || {
                FileWatcher::new(0, 2).watch(
                    &watched_dir,
                    |_| {
                        attempts.fetch_add(1, Ordering::SeqCst);
                        bail!("Mock callback error")
                    },
                    |_path| false,
                    Some(shutdown_rx),
                )
            })
        };

        // Let the watcher start before touching the directory.
        thread::sleep(Duration::from_millis(100));

        fs::write(&test_file, "initial content")?;

        // Two retries with a one second pause each.
        thread::sleep(Duration::from_secs(2));

        // The watcher may already have returned; a failed send is fine.
        let _ = shutdown_tx.send(());

        let outcome = handle.join().expect("Thread panicked");
        let err = match outcome {
            Err(err) => err,
            Ok(_) => panic!("Expected an error from watcher"),
        };

        assert!(err.to_string().contains("Mock callback error"));
        // One initial attempt followed by two retries.
        assert_eq!(
            attempts.load(Ordering::SeqCst),
            3,
            "Expected 3 attempts (1 initial + 2 retries)"
        );

        Ok(())
    }

    /// Changes are reported while the watcher runs and no longer reported
    /// after it has been shut down.
    #[test]
    fn test_watcher_shutdown() -> Result<()> {
        let temp_dir = tempfile::tempdir()?;
        let watched_dir = temp_dir.path().to_owned();
        let first_file = temp_dir.path().join("test.txt");
        let second_file = temp_dir.path().join("test2.txt");

        let (shutdown_tx, shutdown_rx) = mpsc::channel();

        // Counts how many batches of changes were delivered.
        let change_count = Arc::new(AtomicU32::new(0));

        let handle = {
            let change_count = Arc::clone(&change_count);
            thread::spawn(move || {
                FileWatcher::new(1, 0).watch(
                    &watched_dir,
                    |_| {
                        change_count.fetch_add(1, Ordering::SeqCst);
                        Ok(())
                    },
                    |_| false,
                    Some(shutdown_rx),
                )
            })
        };

        // Let the watcher start before touching the directory.
        thread::sleep(Duration::from_millis(100));

        fs::write(first_file, "test content")?;

        // Give the debouncer time to deliver the change.
        thread::sleep(Duration::from_secs(3));

        // At least one change must have been seen while running.
        let seen_before_shutdown = change_count.load(Ordering::SeqCst);
        assert!(seen_before_shutdown > 0);

        shutdown_tx.send(())?;
        handle.join().unwrap()?;

        // A change made after shutdown must not be counted.
        fs::write(second_file, "test content")?;
        thread::sleep(Duration::from_secs(2));

        assert_eq!(seen_before_shutdown, change_count.load(Ordering::SeqCst));

        Ok(())
    }

    /// When every path is ignored the change callback is never invoked.
    #[test]
    fn test_all_paths_ignored() -> Result<()> {
        let was_called = AtomicBool::new(false);
        let watcher = FileWatcher::new(0, 0);

        // Three events that all refer to the same path.
        let events: Vec<DebouncedEvent> = (0..3)
            .map(|_| {
                DebouncedEvent::new(
                    Event::new(EventKind::Any).add_path(PathBuf::from("test1.txt")),
                    Instant::now(),
                )
            })
            .collect();

        watcher.handle_events(
            events,
            |_| {
                was_called.store(true, Ordering::SeqCst);
                Ok(())
            },
            |_| true,
        )?;

        assert!(!was_called.load(Ordering::SeqCst));

        Ok(())
    }
}