use crate::traits::{ChangeDetector, ChangeEvent};
use crate::PersistenceResult;
use chrono::Utc;
use notify::{RecursiveMode, Watcher};
use std::path::PathBuf;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use tokio::sync::broadcast;
use tokio::sync::Mutex as TokioMutex;
#[derive(Clone)]
pub struct FileWatcher {
tx: broadcast::Sender<ChangeEvent>,
task_handle: Arc<TokioMutex<Option<tokio::task::JoinHandle<()>>>>,
suppress_remaining: Arc<AtomicUsize>,
}
impl FileWatcher {
pub fn new() -> Self {
let (tx, _) = broadcast::channel(10);
Self {
tx,
task_handle: Arc::new(TokioMutex::new(None)),
suppress_remaining: Arc::new(AtomicUsize::new(0)),
}
}
#[doc(hidden)]
pub fn suppress_remaining(&self) -> usize {
self.suppress_remaining.load(Ordering::SeqCst)
}
pub fn suppress_next_event(&self) {
self.suppress_remaining.store(2, Ordering::SeqCst);
tracing::debug!("File watcher suppress counter set to 2");
}
}
impl Default for FileWatcher {
fn default() -> Self {
Self::new()
}
}
#[async_trait::async_trait]
impl ChangeDetector for FileWatcher {
async fn start_watching(&self, path: PathBuf) -> PersistenceResult<()> {
let tx = self.tx.clone();
let task_handle = self.task_handle.clone();
let suppress_remaining = self.suppress_remaining.clone();
let canonical_path = tokio::fs::canonicalize(&path).await?;
let handle = tokio::spawn(async move {
let parent = canonical_path
.parent()
.expect("Canonicalized path should always have parent")
.to_path_buf();
let watch_path = canonical_path.clone();
let suppress_remaining_clone = suppress_remaining.clone();
match notify::recommended_watcher(move |res: notify::Result<notify::Event>| match res {
Ok(event) => {
let is_relevant_event = matches!(
event.kind,
notify::EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content,
)) | notify::EventKind::Modify(notify::event::ModifyKind::Name(_),)
| notify::EventKind::Create(_)
| notify::EventKind::Remove(_)
);
let has_our_file = event.paths.iter().any(|p| p == &watch_path);
if is_relevant_event {
tracing::debug!(
"File system event detected: kind={:?}, paths={:?}, has_our_file={}",
event.kind,
event.paths,
has_our_file
);
}
if is_relevant_event && has_our_file {
let suppressed = suppress_remaining_clone
.fetch_update(Ordering::SeqCst, Ordering::SeqCst, |n| n.checked_sub(1))
.is_ok();
if suppressed {
tracing::debug!(
"Own-write event suppressed (counter): kind={:?}, path={}",
event.kind,
watch_path.display()
);
return;
}
tracing::debug!(
"File event detected: kind={:?}, path={}, our_file_exists={}",
event.kind,
watch_path.display(),
watch_path.exists()
);
let change = ChangeEvent {
path: watch_path.clone(),
detected_at: Utc::now(),
};
match tx.send(change) {
Ok(receiver_count) => {
tracing::debug!(
"File change event sent to {} receivers",
receiver_count
);
}
Err(e) => {
tracing::warn!("Failed to send file change event: {}", e);
}
}
}
}
Err(e) => {
tracing::warn!("File watcher error: {}", e);
}
}) {
Ok(mut watcher) => {
let watch_result = watcher.watch(&parent, RecursiveMode::NonRecursive);
if watch_result.is_err() {
if let Err(e) = watcher.watch(&canonical_path, RecursiveMode::NonRecursive)
{
tracing::error!("Failed to watch file or parent directory: {}", e);
return;
}
tracing::info!("Watching file: {}", canonical_path.display());
} else {
tracing::info!("Watching parent directory: {}", parent.display());
}
std::future::pending::<()>().await;
}
Err(e) => {
tracing::error!("Failed to create watcher: {}", e);
}
}
});
let mut guard = task_handle.lock().await;
*guard = Some(handle);
Ok(())
}
async fn stop_watching(&self) -> PersistenceResult<()> {
let mut guard = self.task_handle.lock().await;
if let Some(handle) = guard.take() {
handle.abort();
tracing::info!("Stopped file watching");
}
Ok(())
}
fn subscribe(&self) -> broadcast::Receiver<ChangeEvent> {
self.tx.subscribe()
}
fn is_watching(&self) -> bool {
true
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
use tokio::time::{sleep, Duration};
#[tokio::test]
async fn test_file_watcher_detects_direct_writes() {
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.json");
tokio::fs::write(&file_path, b"initial content")
.await
.unwrap();
let watcher = FileWatcher::new();
let mut rx = watcher.subscribe();
watcher.start_watching(file_path.clone()).await.unwrap();
sleep(Duration::from_millis(100)).await;
tokio::fs::write(&file_path, b"modified content")
.await
.unwrap();
let result = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
watcher.stop_watching().await.unwrap();
if let Ok(Ok(event)) = result {
let expected_path = tokio::fs::canonicalize(&file_path)
.await
.unwrap_or(file_path.clone());
let event_path = tokio::fs::canonicalize(&event.path)
.await
.unwrap_or(event.path.clone());
assert_eq!(event_path, expected_path);
}
}
#[tokio::test]
async fn test_file_watcher_detects_atomic_writes() {
use std::fs;
use tempfile::NamedTempFile;
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.json");
tokio::fs::write(&file_path, b"initial content")
.await
.unwrap();
let watcher = FileWatcher::new();
let mut rx = watcher.subscribe();
watcher.start_watching(file_path.clone()).await.unwrap();
sleep(Duration::from_millis(100)).await;
let temp_file = NamedTempFile::new_in(dir.path()).unwrap();
let temp_path = temp_file.path().to_path_buf();
std::fs::write(&temp_path, b"atomic write content").unwrap();
fs::rename(&temp_path, &file_path).unwrap();
let result = tokio::time::timeout(Duration::from_secs(2), rx.recv()).await;
watcher.stop_watching().await.unwrap();
if let Ok(Ok(event)) = result {
let expected_path = tokio::fs::canonicalize(&file_path)
.await
.unwrap_or(file_path.clone());
let event_path = tokio::fs::canonicalize(&event.path)
.await
.unwrap_or(event.path.clone());
assert_eq!(event_path, expected_path);
}
}
#[test]
fn test_suppress_next_event_sets_counter() {
let watcher = FileWatcher::new();
assert_eq!(watcher.suppress_remaining(), 0, "counter must start at 0");
watcher.suppress_next_event();
assert_eq!(
watcher.suppress_remaining(),
2,
"suppress_next_event must set counter to 2"
);
}
#[tokio::test]
async fn test_own_write_decrements_suppress_counter() {
use std::fs;
use tempfile::NamedTempFile;
let dir = tempdir().unwrap();
let file_path = dir.path().join("own.json");
tokio::fs::write(&file_path, b"initial").await.unwrap();
let watcher = FileWatcher::new();
let mut rx = watcher.subscribe();
watcher.start_watching(file_path.clone()).await.unwrap();
sleep(Duration::from_millis(100)).await;
watcher.suppress_next_event();
assert_eq!(watcher.suppress_remaining(), 2);
let temp = NamedTempFile::new_in(dir.path()).unwrap();
std::fs::write(temp.path(), b"own write").unwrap();
fs::rename(temp.path(), &file_path).unwrap();
sleep(Duration::from_millis(150)).await;
assert!(
watcher.suppress_remaining() < 2,
"counter must have been decremented by the OS event"
);
let result = rx.try_recv();
assert!(
result.is_err(),
"no event should reach the channel for an own write; got: {:?}",
result
);
watcher.stop_watching().await.unwrap();
}
#[tokio::test]
async fn test_external_write_delivered_after_counter_exhausted() {
use std::fs;
use tempfile::NamedTempFile;
let dir = tempdir().unwrap();
let file_path = dir.path().join("external.json");
tokio::fs::write(&file_path, b"initial").await.unwrap();
let watcher = FileWatcher::new();
let mut rx = watcher.subscribe();
watcher.start_watching(file_path.clone()).await.unwrap();
sleep(Duration::from_millis(100)).await;
watcher.suppress_next_event();
let temp = NamedTempFile::new_in(dir.path()).unwrap();
std::fs::write(temp.path(), b"own write").unwrap();
fs::rename(temp.path(), &file_path).unwrap();
sleep(Duration::from_millis(150)).await;
let temp2 = NamedTempFile::new_in(dir.path()).unwrap();
std::fs::write(temp2.path(), b"external write").unwrap();
fs::rename(temp2.path(), &file_path).unwrap();
let result = tokio::time::timeout(Duration::from_millis(300), rx.recv()).await;
watcher.stop_watching().await.unwrap();
assert!(
result.is_ok(),
"external write after counter is exhausted must fire an event, got: {:?}",
result
);
}
#[tokio::test]
async fn test_file_watcher_does_not_fire_for_unrelated_temp_file() {
use tempfile::NamedTempFile;
let dir = tempdir().unwrap();
let file_path = dir.path().join("test.json");
tokio::fs::write(&file_path, b"initial content")
.await
.unwrap();
let watcher = FileWatcher::new();
let mut rx = watcher.subscribe();
watcher.start_watching(file_path.clone()).await.unwrap();
sleep(Duration::from_millis(100)).await;
let temp_file = NamedTempFile::new_in(dir.path()).unwrap();
std::fs::write(temp_file.path(), b"unrelated content").unwrap();
let result = tokio::time::timeout(Duration::from_millis(500), rx.recv()).await;
watcher.stop_watching().await.unwrap();
assert!(
result.is_err(),
"Expected timeout (no event), but got: {:?}",
result
);
}
}