use std::path::PathBuf;
use std::sync::Arc;
use tokio::sync::mpsc;
use crate::storage::UnifiedGraphStore;
/// A filesystem notification forwarded to whoever holds the receiving end of the
/// watcher's channel.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatchEvent {
    /// The path was reported by a `notify` create event.
    Created(PathBuf),
    /// The path was reported by a `notify` modify event.
    Modified(PathBuf),
    /// The path was reported by a `notify` remove event.
    Deleted(PathBuf),
    /// The backend reported a failure; the payload is that error rendered as text.
    Error(String),
}
/// Shared, lockable slot for the backend watcher. It holds `None` until
/// `Watcher::start` stores a running watcher in it.
type WatchHandle = Arc<parking_lot::Mutex<Option<notify::RecommendedWatcher>>>;

/// Forwards filesystem notifications as [`WatchEvent`]s over an unbounded channel.
///
/// Cloning is cheap: the store and the watcher slot are both behind `Arc`, so every
/// clone refers to the same slot.
#[derive(Debug, Clone)]
pub struct Watcher {
    /// Graph store handed to `new`; nothing in this type reads it yet (hence the
    /// leading underscore).
    _store: Arc<UnifiedGraphStore>,
    /// Channel end on which events are published.
    sender: mpsc::UnboundedSender<WatchEvent>,
    /// The backend watcher, once `start` has installed one.
    inner: WatchHandle,
}
impl Watcher {
    /// Window during which repeat notifications for a single path are collapsed.
    const DEBOUNCE_WINDOW: std::time::Duration = std::time::Duration::from_millis(100);

    /// Creates an idle watcher that will publish on `sender`.
    ///
    /// Nothing is watched until [`Watcher::start`] is called.
    pub fn new(store: Arc<UnifiedGraphStore>, sender: mpsc::UnboundedSender<WatchEvent>) -> Self {
        Self {
            _store: store,
            sender,
            inner: Arc::new(parking_lot::Mutex::new(None)),
        }
    }

    /// Starts watching `path` recursively and forwards what happens there as
    /// [`WatchEvent`]s.
    ///
    /// Create, modify and remove notifications are forwarded; every other kind is
    /// ignored. Backend errors are forwarded as [`WatchEvent::Error`]. Failures to
    /// send (the receiver is gone) are deliberately ignored.
    ///
    /// # Debouncing
    ///
    /// For each path, an event arriving within [`Self::DEBOUNCE_WINDOW`] of the last
    /// forwarded event is dropped *unless it changes whether the path exists*. So a
    /// burst of writes collapses to one event, but a removal right after a write (or
    /// a re-creation right after a removal) is always delivered and the receiver's
    /// view of the path never goes stale.
    ///
    /// Calling `start` again replaces the previously stored backend watcher.
    ///
    /// # Errors
    ///
    /// Returns the `notify` error if the backend watcher cannot be created or `path`
    /// cannot be watched.
    pub async fn start(&self, path: PathBuf) -> notify::Result<()> {
        use notify::{EventKind, RecommendedWatcher, RecursiveMode, Watcher as _};
        use std::collections::HashMap;
        use std::time::Instant;

        let sender = self.sender.clone();
        // Per path: when an event was last forwarded, and whether that event left the
        // path present (`true`) or removed (`false`). Entries older than the debounce
        // window are pruned on every notification, so the map stays small.
        let mut recent: HashMap<PathBuf, (Instant, bool)> = HashMap::new();

        let event_handler = move |res: notify::Result<notify::Event>| {
            let now = Instant::now();
            let event = match res {
                Ok(event) => event,
                Err(e) => {
                    let _ = sender.send(WatchEvent::Error(e.to_string()));
                    return;
                }
            };

            // The kind applies to every path in the notification, so pick the
            // variant constructor once and bail out early for kinds we do not report.
            let kind = event.kind;
            let to_watch_event: fn(PathBuf) -> WatchEvent = match kind {
                EventKind::Create(_) => WatchEvent::Created,
                EventKind::Modify(_) => WatchEvent::Modified,
                EventKind::Remove(_) => WatchEvent::Deleted,
                _ => return,
            };
            let present = !matches!(kind, EventKind::Remove(_));

            recent.retain(|_, entry| now.duration_since(entry.0) < Self::DEBOUNCE_WINDOW);

            for path in event.paths {
                // After pruning, an entry means "forwarded within the window". Drop
                // the event only if it would not flip the path's existence.
                let redundant = recent
                    .get(&path)
                    .map_or(false, |&(_, was_present)| was_present == present);
                if redundant {
                    continue;
                }
                recent.insert(path.clone(), (now, present));
                let _ = sender.send(to_watch_event(path));
            }
        };

        let mut watcher = RecommendedWatcher::new(event_handler, notify::Config::default())?;
        watcher.watch(&path, RecursiveMode::Recursive)?;
        *self.inner.lock() = Some(watcher);
        Ok(())
    }

    /// Creates the unbounded sender/receiver pair used to carry [`WatchEvent`]s.
    pub fn channel() -> (
        mpsc::UnboundedSender<WatchEvent>,
        mpsc::UnboundedReceiver<WatchEvent>,
    ) {
        mpsc::unbounded_channel()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::UnifiedGraphStore;
    use tempfile::TempDir;
    use tokio::time::{timeout, Duration};

    /// Receiving half of the channel a test watcher publishes on.
    type Events = tokio::sync::mpsc::UnboundedReceiver<WatchEvent>;

    /// Contents written when a test modifies an existing file.
    const MODIFIED_SRC: &str = "fn test() { println!(\"modified\"); }";

    /// Builds a watcher over an in-memory store, starts it on `root`, then sleeps for
    /// `settle_ms` milliseconds before returning.
    ///
    /// The watcher is returned so the caller can keep it bound for the whole test
    /// (presumably dropping it stops the backend watcher — confirm against `notify`).
    async fn watch_dir(root: &TempDir, settle_ms: u64) -> (Events, Watcher) {
        let (tx, rx) = Watcher::channel();
        let store = Arc::new(UnifiedGraphStore::memory().await.unwrap());
        let watcher = Watcher::new(store, tx);
        watcher.start(root.path().to_path_buf()).await.unwrap();
        tokio::time::sleep(Duration::from_millis(settle_ms)).await;
        (rx, watcher)
    }

    /// Waits up to `secs` seconds for the next event. Panics with `on_timeout` if the
    /// wait elapses, or with "No event received" if the channel yields `None`.
    async fn expect_event(rx: &mut Events, secs: u64, on_timeout: &str) -> WatchEvent {
        timeout(Duration::from_secs(secs), rx.recv())
            .await
            .expect(on_timeout)
            .expect("No event received")
    }

    /// Consumes the notification produced by the initial file creation, waiting at
    /// most two seconds. Only the timeout is checked; the received value is discarded.
    async fn skip_create_event(rx: &mut Events) {
        let _ = timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("Timeout waiting for create event");
    }

    /// A watcher can be constructed and cloned without being started.
    #[tokio::test]
    async fn test_watcher_creation() {
        let (sender, _receiver) = Watcher::channel();
        let graph = UnifiedGraphStore::memory().await.unwrap();
        let original = Watcher::new(Arc::new(graph), sender);
        drop(original.clone());
    }

    /// Whatever is sent on the channel comes out of the receiver unchanged.
    #[tokio::test]
    async fn test_watcher_channel() {
        let (sender, mut receiver) = Watcher::channel();
        let expected = WatchEvent::Created(PathBuf::from("/test/file.rs"));
        sender.send(expected.clone()).unwrap();
        assert_eq!(receiver.recv().await.unwrap(), expected);
    }

    /// Events compare by variant and by path.
    #[tokio::test]
    async fn test_watch_event_equality() {
        let sample = || PathBuf::from("/test/file.rs");
        assert_eq!(WatchEvent::Created(sample()), WatchEvent::Created(sample()));
        assert_eq!(
            WatchEvent::Modified(sample()),
            WatchEvent::Modified(sample())
        );
        assert_eq!(WatchEvent::Deleted(sample()), WatchEvent::Deleted(sample()));
        assert_ne!(
            WatchEvent::Created(PathBuf::from("/a.rs")),
            WatchEvent::Created(PathBuf::from("/b.rs"))
        );
    }

    /// Writing a new file is reported as `Created` for that file.
    #[tokio::test]
    async fn test_watcher_create_event() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir, 100).await;

        let test_file = temp_dir.path().join("test_create.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();

        let event = expect_event(&mut rx, 2, "Timeout waiting for create event").await;
        assert!(matches!(event, WatchEvent::Created(path) if path == test_file));
    }

    /// Rewriting an existing file is reported as `Modified` for that file.
    #[tokio::test]
    async fn test_watcher_modify_event() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir, 200).await;

        let test_file = temp_dir.path().join("test_modify.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();
        skip_create_event(&mut rx).await;

        // Step outside the watcher's debounce window before touching the file again.
        tokio::time::sleep(Duration::from_millis(300)).await;
        tokio::fs::write(&test_file, MODIFIED_SRC).await.unwrap();

        let event = expect_event(&mut rx, 3, "Timeout waiting for modify event").await;
        assert!(matches!(event, WatchEvent::Modified(path) if path == test_file));
    }

    /// Removing an existing file is reported as `Deleted` for that file.
    #[tokio::test]
    async fn test_watcher_delete_event() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir, 200).await;

        let test_file = temp_dir.path().join("test_delete.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();
        skip_create_event(&mut rx).await;

        // Step outside the watcher's debounce window before touching the file again.
        tokio::time::sleep(Duration::from_millis(300)).await;
        tokio::fs::remove_file(&test_file).await.unwrap();

        let event = expect_event(&mut rx, 3, "Timeout waiting for delete event").await;
        assert!(matches!(event, WatchEvent::Deleted(path) if path == test_file));
    }

    /// Files created inside a subdirectory made after the watch began are reported.
    #[tokio::test]
    async fn test_watcher_recursive_watching() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir, 100).await;

        let subdir = temp_dir.path().join("subdir");
        tokio::fs::create_dir(&subdir).await.unwrap();
        // Best effort: swallow the directory's own notification if one shows up.
        let _ = timeout(Duration::from_secs(1), rx.recv()).await;
        tokio::time::sleep(Duration::from_millis(50)).await;

        let test_file = subdir.join("nested.rs");
        tokio::fs::write(&test_file, "fn nested() {}").await.unwrap();

        let event = expect_event(&mut rx, 2, "Timeout waiting for nested create event").await;
        assert!(matches!(event, WatchEvent::Created(path) if path == test_file));
    }

    /// A create / modify / delete sequence on one file is reported in that order.
    #[tokio::test]
    async fn test_watcher_multiple_events() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir, 200).await;
        let test_file = temp_dir.path().join("test_multiple.rs");

        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();
        let event1 = expect_event(&mut rx, 3, "Timeout waiting for create event").await;
        assert!(matches!(event1, WatchEvent::Created(_)));

        tokio::time::sleep(Duration::from_millis(300)).await;
        tokio::fs::write(&test_file, MODIFIED_SRC).await.unwrap();
        let event2 = expect_event(&mut rx, 3, "Timeout waiting for modify event").await;
        assert!(matches!(event2, WatchEvent::Modified(_)));

        tokio::time::sleep(Duration::from_millis(300)).await;
        tokio::fs::remove_file(&test_file).await.unwrap();
        let event3 = expect_event(&mut rx, 3, "Timeout waiting for delete event").await;
        assert!(matches!(event3, WatchEvent::Deleted(_)));

        match (event1, event2, event3) {
            (WatchEvent::Created(p1), WatchEvent::Modified(p2), WatchEvent::Deleted(p3)) => {
                assert_eq!(p1, test_file);
                assert_eq!(p2, test_file);
                assert_eq!(p3, test_file);
            }
            _ => panic!("Events did not match expected sequence"),
        }
    }

    /// Three rapid writes to one file yield fewer than three `Modified` events.
    #[tokio::test]
    async fn test_watcher_debounce() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir, 100).await;

        let test_file = temp_dir.path().join("test_debounce.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();
        skip_create_event(&mut rx).await;

        for i in 0..3 {
            let contents = format!("fn test() {{ println!(\"{}\"); }}", i);
            tokio::fs::write(&test_file, contents).await.unwrap();
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        // Collect `Modified` events for up to a second, stopping at the first 100 ms
        // gap with nothing received (or when the channel closes).
        let mut events = Vec::new();
        let started = std::time::Instant::now();
        while started.elapsed() < Duration::from_secs(1) {
            match timeout(Duration::from_millis(100), rx.recv()).await {
                Ok(Some(event @ WatchEvent::Modified(_))) => events.push(event),
                Ok(Some(_)) => {}
                _ => break,
            }
        }

        assert!(
            events.len() < 3,
            "Expected fewer than 3 events due to debouncing, got {}",
            events.len()
        );
        if let Some(WatchEvent::Modified(path)) = events.last() {
            assert_eq!(path, &test_file);
        }
    }
}