use std::path::PathBuf;
use std::time::Duration;
use anyhow::{Context, Result};
use notify::{RecommendedWatcher, RecursiveMode};
use notify_debouncer_mini::{new_debouncer, DebounceEventResult, Debouncer};
use tokio::sync::mpsc::UnboundedSender;
const DEBOUNCE_MS: u64 = 500;
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum WatchEvent {
Modified(PathBuf),
Removed(PathBuf),
}
pub struct FileWatcher {
_debouncer: Debouncer<RecommendedWatcher>,
}
impl FileWatcher {
pub fn start(root_path: PathBuf, tx: UnboundedSender<WatchEvent>) -> Result<Self> {
let mut debouncer = new_debouncer(
Duration::from_millis(DEBOUNCE_MS),
move |res: DebounceEventResult| match res {
Ok(events) => {
for ev in events {
let path = ev.path.clone();
let event = if path.exists() {
WatchEvent::Modified(path)
} else {
WatchEvent::Removed(path)
};
let _ = tx.send(event);
}
}
Err(err) => {
tracing::warn!(?err, "filesystem watcher error");
}
},
)
.context("create notify debouncer")?;
debouncer
.watcher()
.watch(&root_path, RecursiveMode::Recursive)
.with_context(|| format!("watch path {}", root_path.display()))?;
Ok(Self {
_debouncer: debouncer,
})
}
pub fn stop(self) {
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::fs;
use std::time::Duration;
use tokio::sync::mpsc;
use tokio::time::timeout;
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn modified_event_emitted_within_one_second() {
let dir = tempfile::tempdir().expect("tempdir");
let (tx, mut rx) = mpsc::unbounded_channel();
let _watcher = FileWatcher::start(dir.path().to_path_buf(), tx).expect("watcher starts");
tokio::time::sleep(Duration::from_millis(100)).await;
let file_path = dir.path().join("hello.txt");
fs::write(&file_path, b"hello").expect("write file");
let event = timeout(Duration::from_secs(2), rx.recv())
.await
.expect("event arrives within 2s")
.expect("channel still open");
match event {
WatchEvent::Modified(p) => {
assert!(
p.ends_with("hello.txt"),
"expected path to end with hello.txt, got {p:?}"
);
}
other => panic!("expected Modified, got {other:?}"),
}
}
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn removed_event_emitted_on_delete() {
let dir = tempfile::tempdir().expect("tempdir");
let file_path = dir.path().join("doomed.txt");
fs::write(&file_path, b"transient").expect("write file");
let (tx, mut rx) = mpsc::unbounded_channel();
let _watcher = FileWatcher::start(dir.path().to_path_buf(), tx).expect("watcher starts");
tokio::time::sleep(Duration::from_millis(100)).await;
fs::remove_file(&file_path).expect("delete file");
let deadline = tokio::time::Instant::now() + Duration::from_secs(2);
loop {
let remaining = deadline.saturating_duration_since(tokio::time::Instant::now());
let event = timeout(remaining, rx.recv())
.await
.expect("event arrives before deadline")
.expect("channel still open");
if let WatchEvent::Removed(p) = event {
if p.ends_with("doomed.txt") {
return;
}
}
}
}
}