1use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9
10use crate::storage::UnifiedGraphStore;
11
/// A notification delivered to consumers of the watcher's channel.
///
/// The path-carrying variants hold the path reported for the change;
/// `Error` holds a textual description of a failure.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatchEvent {
    /// Something was created at this path.
    Created(PathBuf),
    /// Something at this path was modified.
    Modified(PathBuf),
    /// Something at this path was removed.
    Deleted(PathBuf),
    /// A failure was reported instead of a change; the payload is its message.
    Error(String),
}
24
25#[derive(Clone, Debug)]
31pub struct Watcher {
32 _store: Arc<UnifiedGraphStore>,
34 sender: mpsc::UnboundedSender<WatchEvent>,
36 inner: WatchHandle,
38}
39
40type WatchHandle = Arc<parking_lot::Mutex<Option<notify::RecommendedWatcher>>>;
43
44impl Watcher {
45 pub fn new(store: Arc<UnifiedGraphStore>, sender: mpsc::UnboundedSender<WatchEvent>) -> Self {
52 Self {
53 _store: store,
54 sender,
55 inner: Arc::new(parking_lot::Mutex::new(None)),
56 }
57 }
58
59 pub async fn start(&self, path: PathBuf) -> notify::Result<()> {
76 use notify::{RecommendedWatcher, RecursiveMode, Watcher as _};
77
78 let sender = self.sender.clone();
79
80 let mut last_event = std::time::Instant::now();
82 let mut last_path: Option<PathBuf> = None;
83
84 let event_handler = move |res: notify::Result<notify::Event>| {
85 let now = std::time::Instant::now();
87
88 match res {
89 Ok(event) => {
90 for path in event.paths {
91 if let Some(last) = &last_path {
93 if last == &path && now.duration_since(last_event).as_millis() < 100 {
94 continue;
95 }
96 }
97
98 let watch_event = match event.kind {
99 notify::EventKind::Create(_) => WatchEvent::Created(path.clone()),
100 notify::EventKind::Modify(_) => WatchEvent::Modified(path.clone()),
101 notify::EventKind::Remove(_) => WatchEvent::Deleted(path.clone()),
102 _ => continue,
103 };
104
105 last_path = Some(path);
106 last_event = now;
107
108 let _ = sender.send(watch_event);
109 }
110 }
111 Err(e) => {
112 let _ = sender.send(WatchEvent::Error(e.to_string()));
113 }
114 }
115 };
116
117 let mut watcher = RecommendedWatcher::new(event_handler, notify::Config::default())?;
119 watcher.watch(&path, RecursiveMode::Recursive)?;
120
121 *self.inner.lock() = Some(watcher);
123
124 Ok(())
125 }
126
127 pub fn channel() -> (
133 mpsc::UnboundedSender<WatchEvent>,
134 mpsc::UnboundedReceiver<WatchEvent>,
135 ) {
136 mpsc::unbounded_channel()
137 }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::UnifiedGraphStore;
    use tempfile::TempDir;
    use tokio::time::{timeout, Duration};

    /// A started watcher together with everything that must outlive the test body.
    ///
    /// Field order is the drop order: the watcher goes first, then the
    /// receiver, and the temporary directory is removed last.
    struct Fixture {
        _watcher: Watcher,
        rx: mpsc::UnboundedReceiver<WatchEvent>,
        dir: TempDir,
    }

    /// Creates a temp directory, starts a watcher on it and then sleeps for
    /// `settle` (presumably so the backend is ready before the test touches
    /// the directory).
    async fn watch_temp_dir(settle: Duration) -> Fixture {
        let dir = TempDir::new().unwrap();
        let (tx, rx) = Watcher::channel();
        let store = Arc::new(UnifiedGraphStore::memory().await.unwrap());
        let watcher = Watcher::new(store, tx);

        watcher.start(dir.path().to_path_buf()).await.unwrap();
        tokio::time::sleep(settle).await;

        Fixture {
            _watcher: watcher,
            rx,
            dir,
        }
    }

    /// Waits up to `limit` for the next event, panicking with `on_timeout`
    /// if nothing arrives in time. `None` means the channel was closed.
    async fn recv_within(
        rx: &mut mpsc::UnboundedReceiver<WatchEvent>,
        limit: Duration,
        on_timeout: &str,
    ) -> Option<WatchEvent> {
        timeout(limit, rx.recv()).await.expect(on_timeout)
    }

    /// A watcher can be constructed and cloned without starting it.
    #[tokio::test]
    async fn test_watcher_creation() {
        let (tx, _rx) = Watcher::channel();
        let store = UnifiedGraphStore::memory().await.map(Arc::new).unwrap();
        let watcher = Watcher::new(store, tx);

        drop(watcher.clone());
    }

    /// Events pushed into the channel come out unchanged.
    #[tokio::test]
    async fn test_watcher_channel() {
        let (tx, mut rx) = Watcher::channel();

        let sent = WatchEvent::Created(PathBuf::from("/test/file.rs"));
        tx.send(sent.clone()).unwrap();

        assert_eq!(rx.recv().await.unwrap(), sent);
    }

    /// Equality compares both the variant and the path it carries.
    #[tokio::test]
    async fn test_watch_event_equality() {
        let path = PathBuf::from("/test/file.rs");
        let constructors: [fn(PathBuf) -> WatchEvent; 3] = [
            WatchEvent::Created,
            WatchEvent::Modified,
            WatchEvent::Deleted,
        ];

        for make in constructors.iter() {
            assert_eq!(make(path.clone()), make(PathBuf::from("/test/file.rs")));
        }

        assert_ne!(
            WatchEvent::Created(PathBuf::from("/a.rs")),
            WatchEvent::Created(PathBuf::from("/b.rs"))
        );
    }

    /// Writing a new file produces a `Created` event for that file.
    #[tokio::test]
    async fn test_watcher_create_event() {
        let mut fx = watch_temp_dir(Duration::from_millis(100)).await;

        let test_file = fx.dir.path().join("test_create.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();

        let event = recv_within(
            &mut fx.rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await
        .expect("No event received");

        assert_eq!(event, WatchEvent::Created(test_file));
    }

    /// Rewriting an existing file produces a `Modified` event for that file.
    #[tokio::test]
    async fn test_watcher_modify_event() {
        let mut fx = watch_temp_dir(Duration::from_millis(200)).await;

        let test_file = fx.dir.path().join("test_modify.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();

        // Consume whatever the initial write produced; only the timeout is fatal.
        let _ = recv_within(
            &mut fx.rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await;

        tokio::time::sleep(Duration::from_millis(300)).await;

        tokio::fs::write(&test_file, "fn test() { println!(\"modified\"); }")
            .await
            .unwrap();

        let event = recv_within(
            &mut fx.rx,
            Duration::from_secs(3),
            "Timeout waiting for modify event",
        )
        .await
        .expect("No event received");

        assert_eq!(event, WatchEvent::Modified(test_file));
    }

    /// Removing a file produces a `Deleted` event for that file.
    #[tokio::test]
    async fn test_watcher_delete_event() {
        let mut fx = watch_temp_dir(Duration::from_millis(200)).await;

        let test_file = fx.dir.path().join("test_delete.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();

        // Consume whatever the initial write produced; only the timeout is fatal.
        let _ = recv_within(
            &mut fx.rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await;

        tokio::time::sleep(Duration::from_millis(300)).await;

        tokio::fs::remove_file(&test_file).await.unwrap();

        let event = recv_within(
            &mut fx.rx,
            Duration::from_secs(3),
            "Timeout waiting for delete event",
        )
        .await
        .expect("No event received");

        assert_eq!(event, WatchEvent::Deleted(test_file));
    }

    /// Files created inside a freshly created subdirectory are reported too.
    #[tokio::test]
    async fn test_watcher_recursive_watching() {
        let mut fx = watch_temp_dir(Duration::from_millis(100)).await;

        let subdir = fx.dir.path().join("subdir");
        tokio::fs::create_dir(&subdir).await.unwrap();

        // Take at most one event for the directory itself; a timeout here is accepted.
        let _ = timeout(Duration::from_secs(1), fx.rx.recv()).await;

        tokio::time::sleep(Duration::from_millis(50)).await;

        let test_file = subdir.join("nested.rs");
        tokio::fs::write(&test_file, "fn nested() {}")
            .await
            .unwrap();

        let event = recv_within(
            &mut fx.rx,
            Duration::from_secs(2),
            "Timeout waiting for nested create event",
        )
        .await
        .expect("No event received");

        assert_eq!(event, WatchEvent::Created(test_file));
    }

    /// Create, modify and delete of one file arrive in that order, all for the same path.
    #[tokio::test]
    async fn test_watcher_multiple_events() {
        let mut fx = watch_temp_dir(Duration::from_millis(200)).await;
        let pause = Duration::from_millis(300);
        let limit = Duration::from_secs(3);

        let test_file = fx.dir.path().join("test_multiple.rs");

        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();

        let created = recv_within(&mut fx.rx, limit, "Timeout waiting for create event")
            .await
            .expect("No event received");
        assert!(matches!(created, WatchEvent::Created(_)));

        tokio::time::sleep(pause).await;

        tokio::fs::write(&test_file, "fn test() { println!(\"modified\"); }")
            .await
            .unwrap();

        let modified = recv_within(&mut fx.rx, limit, "Timeout waiting for modify event")
            .await
            .expect("No event received");
        assert!(matches!(modified, WatchEvent::Modified(_)));

        tokio::time::sleep(pause).await;

        tokio::fs::remove_file(&test_file).await.unwrap();

        let deleted = recv_within(&mut fx.rx, limit, "Timeout waiting for delete event")
            .await
            .expect("No event received");
        assert!(matches!(deleted, WatchEvent::Deleted(_)));

        match (created, modified, deleted) {
            (WatchEvent::Created(p1), WatchEvent::Modified(p2), WatchEvent::Deleted(p3)) => {
                assert_eq!(p1, test_file);
                assert_eq!(p2, test_file);
                assert_eq!(p3, test_file);
            }
            _ => panic!("Events did not match expected sequence"),
        }
    }

    /// Three rapid rewrites of one file yield fewer than three `Modified` events.
    #[tokio::test]
    async fn test_watcher_debounce() {
        let mut fx = watch_temp_dir(Duration::from_millis(100)).await;

        let test_file = fx.dir.path().join("test_debounce.rs");
        tokio::fs::write(&test_file, "fn test() {}").await.unwrap();

        // Consume whatever the initial write produced; only the timeout is fatal.
        let _ = recv_within(
            &mut fx.rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await;

        for i in 0..3 {
            tokio::fs::write(&test_file, format!("fn test() {{ println!(\"{}\"); }}", i))
                .await
                .unwrap();
            tokio::time::sleep(Duration::from_millis(20)).await;
        }

        // Collect modify events for up to a second, stopping at the first
        // 100 ms gap with nothing received (or if the channel closes).
        let mut modified = Vec::new();
        let started = std::time::Instant::now();

        loop {
            if started.elapsed() >= Duration::from_secs(1) {
                break;
            }
            match timeout(Duration::from_millis(100), fx.rx.recv()).await {
                Ok(Some(event @ WatchEvent::Modified(_))) => modified.push(event),
                Ok(Some(_)) => {}
                _ => break,
            }
        }

        assert!(
            modified.len() < 3,
            "Expected fewer than 3 events due to debouncing, got {}",
            modified.len()
        );

        if let Some(WatchEvent::Modified(last_path)) = modified.last() {
            assert_eq!(last_path, &test_file);
        }
    }
}