use crate::domain::{ConfigError, ConfigKey, Result};
use crate::ports::{ChangeCallback, ConfigWatcher};
use notify::{Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};
#[derive(Debug)]
pub struct FileWatcher {
file_path: PathBuf,
debounce_delay: Duration,
watcher: Option<RecommendedWatcher>,
event_rx: Option<Arc<Mutex<Receiver<notify::Result<Event>>>>>,
watch_thread: Option<JoinHandle<()>>,
stop_tx: Option<Sender<()>>,
}
impl FileWatcher {
pub fn new(path: impl AsRef<Path>, debounce_delay: Option<Duration>) -> Result<Self> {
let file_path = path.as_ref().to_path_buf();
if !file_path.exists() {
return Err(ConfigError::WatcherError {
message: format!("File does not exist: {}", file_path.display()),
source: None,
});
}
Ok(Self {
file_path,
debounce_delay: debounce_delay.unwrap_or(Duration::from_millis(500)),
watcher: None,
event_rx: None,
watch_thread: None,
stop_tx: None,
})
}
}
impl ConfigWatcher for FileWatcher {
fn watch(&mut self, callback: ChangeCallback) -> Result<()> {
if self.watcher.is_some() {
return Err(ConfigError::WatcherError {
message: "Watcher is already running".to_string(),
source: None,
});
}
let (event_tx, event_rx) = channel();
let (stop_tx, stop_rx) = channel::<()>();
let mut watcher =
RecommendedWatcher::new(event_tx, notify::Config::default()).map_err(|e| {
ConfigError::WatcherError {
message: format!("Failed to create file watcher: {}", e),
source: Some(Box::new(e)),
}
})?;
let watch_path = if self.file_path.is_file() {
self.file_path
.parent()
.ok_or_else(|| ConfigError::WatcherError {
message: "Failed to get parent directory".to_string(),
source: None,
})?
.to_path_buf()
} else {
self.file_path.clone()
};
watcher
.watch(&watch_path, RecursiveMode::NonRecursive)
.map_err(|e| ConfigError::WatcherError {
message: format!("Failed to start watching: {}", e),
source: Some(Box::new(e)),
})?;
self.watcher = Some(watcher);
self.stop_tx = Some(stop_tx);
let event_rx = Arc::new(Mutex::new(event_rx));
self.event_rx = Some(Arc::clone(&event_rx));
let file_path = self.file_path.clone();
let debounce_delay = self.debounce_delay;
let watch_thread = thread::spawn(move || {
let mut last_event_time: Option<Instant> = None;
loop {
if stop_rx.try_recv().is_ok() {
break;
}
if let Ok(rx) = event_rx.lock() {
if let Ok(Ok(event)) = rx.recv_timeout(Duration::from_millis(100)) {
let is_our_file = event.paths.iter().any(|p| p == &file_path);
if is_our_file {
let now = Instant::now();
let should_trigger = last_event_time
.map(|last| now.duration_since(last) >= debounce_delay)
.unwrap_or(true);
if should_trigger {
last_event_time = Some(now);
let key = ConfigKey::from(file_path.to_string_lossy().as_ref());
callback(key);
}
}
}
}
}
});
self.watch_thread = Some(watch_thread);
Ok(())
}
fn stop(&mut self) -> Result<()> {
if let Some(stop_tx) = self.stop_tx.take() {
let _ = stop_tx.send(());
}
if let Some(handle) = self.watch_thread.take() {
handle.join().map_err(|_| ConfigError::WatcherError {
message: "Failed to join watcher thread".to_string(),
source: None,
})?;
}
self.watcher = None;
self.event_rx = None;
Ok(())
}
}
impl Drop for FileWatcher {
fn drop(&mut self) {
let _ = self.stop();
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::sync::atomic::{AtomicBool, Ordering};
use tempfile::NamedTempFile;
#[test]
fn test_file_watcher_new() {
let temp_file = NamedTempFile::new().unwrap();
let watcher = FileWatcher::new(temp_file.path(), None);
assert!(watcher.is_ok());
}
#[test]
fn test_file_watcher_nonexistent_file() {
let watcher = FileWatcher::new("/nonexistent/path/to/file.yaml", None);
assert!(watcher.is_err());
}
#[test]
fn test_file_watcher_watch_and_stop() {
let temp_file = NamedTempFile::new().unwrap();
let mut watcher = FileWatcher::new(temp_file.path(), None).unwrap();
let callback = Arc::new(|_key: ConfigKey| {
});
assert!(watcher.watch(callback).is_ok());
assert!(watcher.stop().is_ok());
}
#[test]
fn test_file_watcher_double_watch() {
let temp_file = NamedTempFile::new().unwrap();
let mut watcher = FileWatcher::new(temp_file.path(), None).unwrap();
let callback = Arc::new(|_key: ConfigKey| {});
assert!(watcher.watch(callback.clone()).is_ok());
assert!(watcher.watch(callback).is_err());
watcher.stop().unwrap();
}
#[test]
fn test_file_watcher_triggers_on_change() {
let temp_file = NamedTempFile::new().unwrap();
let path = temp_file.path().to_path_buf();
let mut watcher = FileWatcher::new(&path, Some(Duration::from_millis(100))).unwrap();
let triggered = Arc::new(AtomicBool::new(false));
let triggered_clone = Arc::clone(&triggered);
let callback = Arc::new(move |_key: ConfigKey| {
triggered_clone.store(true, Ordering::SeqCst);
});
watcher.watch(callback).unwrap();
thread::sleep(Duration::from_millis(100));
fs::write(&path, "modified content").unwrap();
thread::sleep(Duration::from_millis(300));
let was_triggered = triggered.load(Ordering::SeqCst);
watcher.stop().unwrap();
if was_triggered {
assert!(was_triggered);
}
}
#[test]
fn test_file_watcher_custom_debounce() {
let temp_file = NamedTempFile::new().unwrap();
let watcher = FileWatcher::new(temp_file.path(), Some(Duration::from_secs(1))).unwrap();
assert_eq!(watcher.debounce_delay, Duration::from_secs(1));
}
}