spicex/
watcher.rs

1//! File system watching utilities for configuration files.
2
3use crate::error::{ConfigError, ConfigResult};
4use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::{mpsc, Arc, Mutex};
7use std::thread;
8use std::time::Duration;
9
10/// Type alias for configuration change callback functions.
11pub type ConfigChangeCallback = Box<dyn Fn() + Send + Sync>;
12
13/// Manages file system watching for configuration files.
14pub struct FileWatcher {
15    _watcher: RecommendedWatcher,
16    receiver: mpsc::Receiver<notify::Result<Event>>,
17    watched_files: Vec<PathBuf>,
18    callbacks: Arc<Mutex<Vec<ConfigChangeCallback>>>,
19    is_watching: bool,
20}
21
22impl FileWatcher {
23    /// Creates a new file watcher for the specified path.
24    pub fn new<P: AsRef<Path>>(path: P) -> ConfigResult<Self> {
25        let (sender, receiver) = mpsc::channel();
26
27        let mut watcher = notify::recommended_watcher(sender)
28            .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
29
30        let path_buf = path.as_ref().to_path_buf();
31        watcher
32            .watch(&path_buf, RecursiveMode::NonRecursive)
33            .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
34
35        Ok(Self {
36            _watcher: watcher,
37            receiver,
38            watched_files: vec![path_buf],
39            callbacks: Arc::new(Mutex::new(Vec::new())),
40            is_watching: false,
41        })
42    }
43
44    /// Creates a new file watcher without watching any files initially.
45    pub fn new_empty() -> ConfigResult<Self> {
46        let (sender, receiver) = mpsc::channel();
47
48        let watcher = notify::recommended_watcher(sender)
49            .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
50
51        Ok(Self {
52            _watcher: watcher,
53            receiver,
54            watched_files: Vec::new(),
55            callbacks: Arc::new(Mutex::new(Vec::new())),
56            is_watching: false,
57        })
58    }
59
60    /// Adds a file to be watched.
61    pub fn watch_file<P: AsRef<Path>>(&mut self, path: P) -> ConfigResult<()> {
62        let path_buf = path.as_ref().to_path_buf();
63
64        // Only watch if the file exists
65        if !path_buf.exists() {
66            return Err(ConfigError::FileWatch(format!(
67                "Cannot watch non-existent file: {}",
68                path_buf.display()
69            )));
70        }
71
72        self._watcher
73            .watch(&path_buf, RecursiveMode::NonRecursive)
74            .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
75
76        self.watched_files.push(path_buf);
77        Ok(())
78    }
79
80    /// Removes a file from being watched.
81    pub fn unwatch_file<P: AsRef<Path>>(&mut self, path: P) -> ConfigResult<()> {
82        let path_buf = path.as_ref().to_path_buf();
83
84        self._watcher
85            .unwatch(&path_buf)
86            .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
87
88        self.watched_files.retain(|p| p != &path_buf);
89        Ok(())
90    }
91
92    /// Gets the list of currently watched files.
93    pub fn watched_files(&self) -> &[PathBuf] {
94        &self.watched_files
95    }
96
97    /// Registers a callback to be called when configuration changes are detected.
98    pub fn on_config_change<F>(&self, callback: F) -> ConfigResult<()>
99    where
100        F: Fn() + Send + Sync + 'static,
101    {
102        let mut callbacks = self.callbacks.lock().map_err(|e| {
103            ConfigError::FileWatch(format!("Failed to acquire callback lock: {e}"))
104        })?;
105
106        callbacks.push(Box::new(callback));
107        Ok(())
108    }
109
110    /// Starts watching for file changes in a background thread.
111    /// This method spawns a background thread that monitors for file changes
112    /// and calls registered callbacks when changes are detected.
113    pub fn start_watching(&mut self) -> ConfigResult<()> {
114        if self.is_watching {
115            return Ok(()); // Already watching
116        }
117
118        let callbacks = Arc::clone(&self.callbacks);
119        let (_stop_sender, stop_receiver) = mpsc::channel::<()>();
120
121        // We need to create a new receiver since we can't clone the existing one
122        let (event_sender, event_receiver) = mpsc::channel();
123
124        // Replace the watcher with a new one that uses our new sender
125        let mut new_watcher = notify::recommended_watcher(event_sender)
126            .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
127
128        // Re-watch all previously watched files
129        for path in &self.watched_files {
130            new_watcher
131                .watch(path, RecursiveMode::NonRecursive)
132                .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
133        }
134
135        self._watcher = new_watcher;
136        self.is_watching = true;
137
138        // Spawn background thread for watching
139        thread::spawn(move || {
140            loop {
141                // Check if we should stop
142                if stop_receiver.try_recv().is_ok() {
143                    break;
144                }
145
146                // Check for file system events
147                match event_receiver.recv_timeout(Duration::from_millis(100)) {
148                    Ok(Ok(_event)) => {
149                        // File change detected, call all callbacks
150                        if let Ok(callbacks_guard) = callbacks.lock() {
151                            for callback in callbacks_guard.iter() {
152                                callback();
153                            }
154                        }
155                    }
156                    Ok(Err(_)) => {
157                        // Error in file watching, but continue
158                        continue;
159                    }
160                    Err(mpsc::RecvTimeoutError::Timeout) => {
161                        // No events, continue
162                        continue;
163                    }
164                    Err(mpsc::RecvTimeoutError::Disconnected) => {
165                        // Channel disconnected, stop watching
166                        break;
167                    }
168                }
169            }
170        });
171
172        Ok(())
173    }
174
175    /// Stops watching for file changes.
176    pub fn stop_watching(&mut self) {
177        self.is_watching = false;
178        // Note: In a full implementation, we'd send a stop signal to the background thread
179        // For now, the thread will detect disconnection and stop
180    }
181
182    /// Returns whether the watcher is currently active.
183    pub fn is_watching(&self) -> bool {
184        self.is_watching
185    }
186
187    /// Triggers all registered callbacks manually (for testing purposes).
188    #[cfg(test)]
189    pub fn trigger_callbacks_for_test(&self) {
190        if let Ok(callbacks_guard) = self.callbacks.lock() {
191            for callback in callbacks_guard.iter() {
192                callback();
193            }
194        }
195    }
196
197    /// Checks for file system events with a timeout.
198    /// This method is primarily for testing and manual polling.
199    /// For automatic reloading, use start_watching() instead.
200    pub fn check_for_changes(&self, timeout: Duration) -> ConfigResult<bool> {
201        match self.receiver.recv_timeout(timeout) {
202            Ok(Ok(_event)) => {
203                // Call callbacks when changes are detected
204                if let Ok(callbacks_guard) = self.callbacks.lock() {
205                    for callback in callbacks_guard.iter() {
206                        callback();
207                    }
208                }
209                Ok(true)
210            }
211            Ok(Err(e)) => Err(ConfigError::FileWatch(e.to_string())),
212            Err(mpsc::RecvTimeoutError::Timeout) => Ok(false),
213            Err(mpsc::RecvTimeoutError::Disconnected) => {
214                Err(ConfigError::FileWatch("Watcher disconnected".to_string()))
215            }
216        }
217    }
218
219    /// Blocks until a file change is detected.
220    /// This method is primarily for testing and manual polling.
221    /// For automatic reloading, use start_watching() instead.
222    pub fn wait_for_change(&self) -> ConfigResult<()> {
223        match self.receiver.recv() {
224            Ok(Ok(_event)) => {
225                // Call callbacks when changes are detected
226                if let Ok(callbacks_guard) = self.callbacks.lock() {
227                    for callback in callbacks_guard.iter() {
228                        callback();
229                    }
230                }
231                Ok(())
232            }
233            Ok(Err(e)) => Err(ConfigError::FileWatch(e.to_string())),
234            Err(_) => Err(ConfigError::FileWatch("Watcher disconnected".to_string())),
235        }
236    }
237}
238
239#[cfg(test)]
240mod tests {
241    use super::*;
242    use std::fs;
243    use std::sync::{Arc, Mutex};
244    use std::time::Duration;
245    use tempfile::TempDir;
246
247    #[test]
248    fn test_file_watcher_creation() {
249        let temp_dir = TempDir::new().unwrap();
250        let config_path = temp_dir.path().join("config.json");
251        fs::write(&config_path, "{}").unwrap();
252
253        let watcher = FileWatcher::new(&config_path);
254        assert!(watcher.is_ok());
255
256        let watcher = watcher.unwrap();
257        assert_eq!(watcher.watched_files().len(), 1);
258        assert_eq!(watcher.watched_files()[0], config_path);
259    }
260
261    #[test]
262    fn test_empty_file_watcher() {
263        let watcher = FileWatcher::new_empty();
264        assert!(watcher.is_ok());
265
266        let watcher = watcher.unwrap();
267        assert_eq!(watcher.watched_files().len(), 0);
268        assert!(!watcher.is_watching());
269    }
270
271    #[test]
272    fn test_watch_multiple_files() {
273        let temp_dir = TempDir::new().unwrap();
274        let config1 = temp_dir.path().join("config1.json");
275        let config2 = temp_dir.path().join("config2.yaml");
276
277        fs::write(&config1, "{}").unwrap();
278        fs::write(&config2, "key: value").unwrap();
279
280        let mut watcher = FileWatcher::new_empty().unwrap();
281
282        assert!(watcher.watch_file(&config1).is_ok());
283        assert!(watcher.watch_file(&config2).is_ok());
284
285        assert_eq!(watcher.watched_files().len(), 2);
286    }
287
288    #[test]
289    fn test_watch_nonexistent_file() {
290        let mut watcher = FileWatcher::new_empty().unwrap();
291        let nonexistent = PathBuf::from("/nonexistent/file.json");
292
293        let result = watcher.watch_file(&nonexistent);
294        assert!(result.is_err());
295        assert!(result
296            .unwrap_err()
297            .to_string()
298            .contains("Cannot watch non-existent file"));
299    }
300
301    #[test]
302    fn test_callback_registration() {
303        let temp_dir = TempDir::new().unwrap();
304        let config_path = temp_dir.path().join("config.json");
305        fs::write(&config_path, "{}").unwrap();
306
307        let watcher = FileWatcher::new(&config_path).unwrap();
308
309        let callback_called = Arc::new(Mutex::new(false));
310        let callback_called_clone = Arc::clone(&callback_called);
311
312        let result = watcher.on_config_change(move || {
313            *callback_called_clone.lock().unwrap() = true;
314        });
315
316        assert!(result.is_ok());
317    }
318
319    #[test]
320    fn test_unwatch_file() {
321        let temp_dir = TempDir::new().unwrap();
322        let config_path = temp_dir.path().join("config.json");
323        fs::write(&config_path, "{}").unwrap();
324
325        let mut watcher = FileWatcher::new(&config_path).unwrap();
326        assert_eq!(watcher.watched_files().len(), 1);
327
328        assert!(watcher.unwatch_file(&config_path).is_ok());
329        assert_eq!(watcher.watched_files().len(), 0);
330    }
331
332    #[test]
333    fn test_file_change_detection() {
334        let temp_dir = TempDir::new().unwrap();
335        let config_path = temp_dir.path().join("config.json");
336        fs::write(&config_path, r#"{"key": "value1"}"#).unwrap();
337
338        let watcher = FileWatcher::new(&config_path).unwrap();
339
340        // Modify the file
341        std::thread::spawn({
342            let config_path = config_path.clone();
343            move || {
344                std::thread::sleep(Duration::from_millis(50));
345                fs::write(&config_path, r#"{"key": "value2"}"#).unwrap();
346            }
347        });
348
349        // Check for changes with a reasonable timeout
350        let result = watcher.check_for_changes(Duration::from_millis(200));
351        assert!(result.is_ok());
352        // Note: The actual change detection depends on the file system and timing
353    }
354
355    #[test]
356    fn test_multiple_callbacks() {
357        let temp_dir = TempDir::new().unwrap();
358        let config_path = temp_dir.path().join("config.json");
359        fs::write(&config_path, "{}").unwrap();
360
361        let watcher = FileWatcher::new(&config_path).unwrap();
362
363        let callback1_called = Arc::new(Mutex::new(false));
364        let callback2_called = Arc::new(Mutex::new(false));
365
366        let callback1_called_clone = Arc::clone(&callback1_called);
367        let callback2_called_clone = Arc::clone(&callback2_called);
368
369        // Register multiple callbacks
370        watcher
371            .on_config_change(move || {
372                *callback1_called_clone.lock().unwrap() = true;
373            })
374            .unwrap();
375
376        watcher
377            .on_config_change(move || {
378                *callback2_called_clone.lock().unwrap() = true;
379            })
380            .unwrap();
381
382        // Simulate a file change by calling callbacks directly
383        // In a real scenario, this would be triggered by file system events
384        if let Ok(callbacks_guard) = watcher.callbacks.lock() {
385            for callback in callbacks_guard.iter() {
386                callback();
387            }
388        }
389
390        // Both callbacks should have been called
391        assert!(*callback1_called.lock().unwrap());
392        assert!(*callback2_called.lock().unwrap());
393    }
394
395    #[test]
396    fn test_start_stop_watching() {
397        let temp_dir = TempDir::new().unwrap();
398        let config_path = temp_dir.path().join("config.json");
399        fs::write(&config_path, "{}").unwrap();
400
401        let mut watcher = FileWatcher::new(&config_path).unwrap();
402        assert!(!watcher.is_watching());
403
404        // Start watching
405        watcher.start_watching().unwrap();
406        assert!(watcher.is_watching());
407
408        // Stop watching
409        watcher.stop_watching();
410        assert!(!watcher.is_watching());
411    }
412
413    #[test]
414    fn test_callback_error_handling() {
415        let temp_dir = TempDir::new().unwrap();
416        let config_path = temp_dir.path().join("config.json");
417        fs::write(&config_path, "{}").unwrap();
418
419        let watcher = FileWatcher::new(&config_path).unwrap();
420
421        // Register a callback that might panic (but shouldn't crash the system)
422        let result = watcher.on_config_change(|| {
423            // This callback doesn't panic, but tests the error handling path
424        });
425
426        assert!(result.is_ok());
427    }
428}