1use crate::error::{ConfigError, ConfigResult};
4use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
5use std::path::{Path, PathBuf};
6use std::sync::{mpsc, Arc, Mutex};
7use std::thread;
8use std::time::Duration;
9
10pub type ConfigChangeCallback = Box<dyn Fn() + Send + Sync>;
12
13pub struct FileWatcher {
15 _watcher: RecommendedWatcher,
16 receiver: mpsc::Receiver<notify::Result<Event>>,
17 watched_files: Vec<PathBuf>,
18 callbacks: Arc<Mutex<Vec<ConfigChangeCallback>>>,
19 is_watching: bool,
20}
21
22impl FileWatcher {
23 pub fn new<P: AsRef<Path>>(path: P) -> ConfigResult<Self> {
25 let (sender, receiver) = mpsc::channel();
26
27 let mut watcher = notify::recommended_watcher(sender)
28 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
29
30 let path_buf = path.as_ref().to_path_buf();
31 watcher
32 .watch(&path_buf, RecursiveMode::NonRecursive)
33 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
34
35 Ok(Self {
36 _watcher: watcher,
37 receiver,
38 watched_files: vec![path_buf],
39 callbacks: Arc::new(Mutex::new(Vec::new())),
40 is_watching: false,
41 })
42 }
43
44 pub fn new_empty() -> ConfigResult<Self> {
46 let (sender, receiver) = mpsc::channel();
47
48 let watcher = notify::recommended_watcher(sender)
49 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
50
51 Ok(Self {
52 _watcher: watcher,
53 receiver,
54 watched_files: Vec::new(),
55 callbacks: Arc::new(Mutex::new(Vec::new())),
56 is_watching: false,
57 })
58 }
59
60 pub fn watch_file<P: AsRef<Path>>(&mut self, path: P) -> ConfigResult<()> {
62 let path_buf = path.as_ref().to_path_buf();
63
64 if !path_buf.exists() {
66 return Err(ConfigError::FileWatch(format!(
67 "Cannot watch non-existent file: {}",
68 path_buf.display()
69 )));
70 }
71
72 self._watcher
73 .watch(&path_buf, RecursiveMode::NonRecursive)
74 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
75
76 self.watched_files.push(path_buf);
77 Ok(())
78 }
79
80 pub fn unwatch_file<P: AsRef<Path>>(&mut self, path: P) -> ConfigResult<()> {
82 let path_buf = path.as_ref().to_path_buf();
83
84 self._watcher
85 .unwatch(&path_buf)
86 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
87
88 self.watched_files.retain(|p| p != &path_buf);
89 Ok(())
90 }
91
92 pub fn watched_files(&self) -> &[PathBuf] {
94 &self.watched_files
95 }
96
97 pub fn on_config_change<F>(&self, callback: F) -> ConfigResult<()>
99 where
100 F: Fn() + Send + Sync + 'static,
101 {
102 let mut callbacks = self.callbacks.lock().map_err(|e| {
103 ConfigError::FileWatch(format!("Failed to acquire callback lock: {e}"))
104 })?;
105
106 callbacks.push(Box::new(callback));
107 Ok(())
108 }
109
110 pub fn start_watching(&mut self) -> ConfigResult<()> {
114 if self.is_watching {
115 return Ok(()); }
117
118 let callbacks = Arc::clone(&self.callbacks);
119 let (_stop_sender, stop_receiver) = mpsc::channel::<()>();
120
121 let (event_sender, event_receiver) = mpsc::channel();
123
124 let mut new_watcher = notify::recommended_watcher(event_sender)
126 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
127
128 for path in &self.watched_files {
130 new_watcher
131 .watch(path, RecursiveMode::NonRecursive)
132 .map_err(|e| ConfigError::FileWatch(e.to_string()))?;
133 }
134
135 self._watcher = new_watcher;
136 self.is_watching = true;
137
138 thread::spawn(move || {
140 loop {
141 if stop_receiver.try_recv().is_ok() {
143 break;
144 }
145
146 match event_receiver.recv_timeout(Duration::from_millis(100)) {
148 Ok(Ok(_event)) => {
149 if let Ok(callbacks_guard) = callbacks.lock() {
151 for callback in callbacks_guard.iter() {
152 callback();
153 }
154 }
155 }
156 Ok(Err(_)) => {
157 continue;
159 }
160 Err(mpsc::RecvTimeoutError::Timeout) => {
161 continue;
163 }
164 Err(mpsc::RecvTimeoutError::Disconnected) => {
165 break;
167 }
168 }
169 }
170 });
171
172 Ok(())
173 }
174
175 pub fn stop_watching(&mut self) {
177 self.is_watching = false;
178 }
181
182 pub fn is_watching(&self) -> bool {
184 self.is_watching
185 }
186
187 #[cfg(test)]
189 pub fn trigger_callbacks_for_test(&self) {
190 if let Ok(callbacks_guard) = self.callbacks.lock() {
191 for callback in callbacks_guard.iter() {
192 callback();
193 }
194 }
195 }
196
197 pub fn check_for_changes(&self, timeout: Duration) -> ConfigResult<bool> {
201 match self.receiver.recv_timeout(timeout) {
202 Ok(Ok(_event)) => {
203 if let Ok(callbacks_guard) = self.callbacks.lock() {
205 for callback in callbacks_guard.iter() {
206 callback();
207 }
208 }
209 Ok(true)
210 }
211 Ok(Err(e)) => Err(ConfigError::FileWatch(e.to_string())),
212 Err(mpsc::RecvTimeoutError::Timeout) => Ok(false),
213 Err(mpsc::RecvTimeoutError::Disconnected) => {
214 Err(ConfigError::FileWatch("Watcher disconnected".to_string()))
215 }
216 }
217 }
218
219 pub fn wait_for_change(&self) -> ConfigResult<()> {
223 match self.receiver.recv() {
224 Ok(Ok(_event)) => {
225 if let Ok(callbacks_guard) = self.callbacks.lock() {
227 for callback in callbacks_guard.iter() {
228 callback();
229 }
230 }
231 Ok(())
232 }
233 Ok(Err(e)) => Err(ConfigError::FileWatch(e.to_string())),
234 Err(_) => Err(ConfigError::FileWatch("Watcher disconnected".to_string())),
235 }
236 }
237}
238
239#[cfg(test)]
240mod tests {
241 use super::*;
242 use std::fs;
243 use std::sync::{Arc, Mutex};
244 use std::time::Duration;
245 use tempfile::TempDir;
246
247 #[test]
248 fn test_file_watcher_creation() {
249 let temp_dir = TempDir::new().unwrap();
250 let config_path = temp_dir.path().join("config.json");
251 fs::write(&config_path, "{}").unwrap();
252
253 let watcher = FileWatcher::new(&config_path);
254 assert!(watcher.is_ok());
255
256 let watcher = watcher.unwrap();
257 assert_eq!(watcher.watched_files().len(), 1);
258 assert_eq!(watcher.watched_files()[0], config_path);
259 }
260
261 #[test]
262 fn test_empty_file_watcher() {
263 let watcher = FileWatcher::new_empty();
264 assert!(watcher.is_ok());
265
266 let watcher = watcher.unwrap();
267 assert_eq!(watcher.watched_files().len(), 0);
268 assert!(!watcher.is_watching());
269 }
270
271 #[test]
272 fn test_watch_multiple_files() {
273 let temp_dir = TempDir::new().unwrap();
274 let config1 = temp_dir.path().join("config1.json");
275 let config2 = temp_dir.path().join("config2.yaml");
276
277 fs::write(&config1, "{}").unwrap();
278 fs::write(&config2, "key: value").unwrap();
279
280 let mut watcher = FileWatcher::new_empty().unwrap();
281
282 assert!(watcher.watch_file(&config1).is_ok());
283 assert!(watcher.watch_file(&config2).is_ok());
284
285 assert_eq!(watcher.watched_files().len(), 2);
286 }
287
288 #[test]
289 fn test_watch_nonexistent_file() {
290 let mut watcher = FileWatcher::new_empty().unwrap();
291 let nonexistent = PathBuf::from("/nonexistent/file.json");
292
293 let result = watcher.watch_file(&nonexistent);
294 assert!(result.is_err());
295 assert!(result
296 .unwrap_err()
297 .to_string()
298 .contains("Cannot watch non-existent file"));
299 }
300
301 #[test]
302 fn test_callback_registration() {
303 let temp_dir = TempDir::new().unwrap();
304 let config_path = temp_dir.path().join("config.json");
305 fs::write(&config_path, "{}").unwrap();
306
307 let watcher = FileWatcher::new(&config_path).unwrap();
308
309 let callback_called = Arc::new(Mutex::new(false));
310 let callback_called_clone = Arc::clone(&callback_called);
311
312 let result = watcher.on_config_change(move || {
313 *callback_called_clone.lock().unwrap() = true;
314 });
315
316 assert!(result.is_ok());
317 }
318
319 #[test]
320 fn test_unwatch_file() {
321 let temp_dir = TempDir::new().unwrap();
322 let config_path = temp_dir.path().join("config.json");
323 fs::write(&config_path, "{}").unwrap();
324
325 let mut watcher = FileWatcher::new(&config_path).unwrap();
326 assert_eq!(watcher.watched_files().len(), 1);
327
328 assert!(watcher.unwatch_file(&config_path).is_ok());
329 assert_eq!(watcher.watched_files().len(), 0);
330 }
331
332 #[test]
333 fn test_file_change_detection() {
334 let temp_dir = TempDir::new().unwrap();
335 let config_path = temp_dir.path().join("config.json");
336 fs::write(&config_path, r#"{"key": "value1"}"#).unwrap();
337
338 let watcher = FileWatcher::new(&config_path).unwrap();
339
340 std::thread::spawn({
342 let config_path = config_path.clone();
343 move || {
344 std::thread::sleep(Duration::from_millis(50));
345 fs::write(&config_path, r#"{"key": "value2"}"#).unwrap();
346 }
347 });
348
349 let result = watcher.check_for_changes(Duration::from_millis(200));
351 assert!(result.is_ok());
352 }
354
355 #[test]
356 fn test_multiple_callbacks() {
357 let temp_dir = TempDir::new().unwrap();
358 let config_path = temp_dir.path().join("config.json");
359 fs::write(&config_path, "{}").unwrap();
360
361 let watcher = FileWatcher::new(&config_path).unwrap();
362
363 let callback1_called = Arc::new(Mutex::new(false));
364 let callback2_called = Arc::new(Mutex::new(false));
365
366 let callback1_called_clone = Arc::clone(&callback1_called);
367 let callback2_called_clone = Arc::clone(&callback2_called);
368
369 watcher
371 .on_config_change(move || {
372 *callback1_called_clone.lock().unwrap() = true;
373 })
374 .unwrap();
375
376 watcher
377 .on_config_change(move || {
378 *callback2_called_clone.lock().unwrap() = true;
379 })
380 .unwrap();
381
382 if let Ok(callbacks_guard) = watcher.callbacks.lock() {
385 for callback in callbacks_guard.iter() {
386 callback();
387 }
388 }
389
390 assert!(*callback1_called.lock().unwrap());
392 assert!(*callback2_called.lock().unwrap());
393 }
394
395 #[test]
396 fn test_start_stop_watching() {
397 let temp_dir = TempDir::new().unwrap();
398 let config_path = temp_dir.path().join("config.json");
399 fs::write(&config_path, "{}").unwrap();
400
401 let mut watcher = FileWatcher::new(&config_path).unwrap();
402 assert!(!watcher.is_watching());
403
404 watcher.start_watching().unwrap();
406 assert!(watcher.is_watching());
407
408 watcher.stop_watching();
410 assert!(!watcher.is_watching());
411 }
412
413 #[test]
414 fn test_callback_error_handling() {
415 let temp_dir = TempDir::new().unwrap();
416 let config_path = temp_dir.path().join("config.json");
417 fs::write(&config_path, "{}").unwrap();
418
419 let watcher = FileWatcher::new(&config_path).unwrap();
420
421 let result = watcher.on_config_change(|| {
423 });
425
426 assert!(result.is_ok());
427 }
428}