Skip to main content

forgekit_core/
watcher.rs

1//! File watching for hot-reload and incremental updates.
2//!
3//! This module provides file system monitoring using the `notify` crate,
4//! enabling automatic detection of code changes for incremental indexing.
5
6use std::path::PathBuf;
7use std::sync::Arc;
8use tokio::sync::mpsc;
9
10use crate::storage::UnifiedGraphStore;
11
/// A single change notification produced by the watcher.
///
/// Each path-carrying variant holds the path the change was reported for;
/// [`WatchEvent::Error`] carries the textual form of a watcher failure instead.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatchEvent {
    /// Something new appeared at this path.
    Created(PathBuf),
    /// The entry at this path was changed.
    Modified(PathBuf),
    /// The entry at this path was removed.
    Deleted(PathBuf),
    /// Watching failed; the payload is the error rendered as a string.
    Error(String),
}
24
25/// File watcher for monitoring codebase changes.
26///
27/// The `Watcher` spawns a background task that monitors the specified
28/// directory and emits events via a channel. Events are consumed by
29/// the incremental indexer for hot-reload capability.
30#[derive(Clone, Debug)]
31pub struct Watcher {
32    /// The graph store (for future use in event correlation).
33    _store: Arc<UnifiedGraphStore>,
34    /// Channel sender for watch events.
35    sender: mpsc::UnboundedSender<WatchEvent>,
36    /// The underlying notify watcher (kept alive to continue watching).
37    inner: WatchHandle,
38}
39
40/// Notify watcher handle, kept behind a parking-lot mutex so it can be swapped
41/// (e.g. paused) from any thread without risking poison on a panic.
42type WatchHandle = Arc<parking_lot::Mutex<Option<notify::RecommendedWatcher>>>;
43
44impl Watcher {
45    /// Creates a new watcher instance.
46    ///
47    /// # Arguments
48    ///
49    /// * `store` - The graph store for event correlation
50    /// * `sender` - Channel to send watch events
51    pub fn new(store: Arc<UnifiedGraphStore>, sender: mpsc::UnboundedSender<WatchEvent>) -> Self {
52        Self {
53            _store: store,
54            sender,
55            inner: Arc::new(parking_lot::Mutex::new(None)),
56        }
57    }
58
59    /// Starts watching the specified directory.
60    ///
61    /// Spawns a background task that recursively watches the directory
62    /// and emits events for file system changes.
63    ///
64    /// # Arguments
65    ///
66    /// * `path` - Directory path to watch
67    ///
68    /// # Returns
69    ///
70    /// `Ok(())` if watching started successfully, or an error.
71    ///
72    /// # Errors
73    ///
74    /// Returns an error if the directory cannot be watched.
75    pub async fn start(&self, path: PathBuf) -> notify::Result<()> {
76        use notify::{RecommendedWatcher, RecursiveMode, Watcher as _};
77
78        let sender = self.sender.clone();
79
80        // Create event handler function
81        let mut last_event = std::time::Instant::now();
82        let mut last_path: Option<PathBuf> = None;
83
84        let event_handler = move |res: notify::Result<notify::Event>| {
85            // Debounce: ignore events within 100ms of same path
86            let now = std::time::Instant::now();
87
88            match res {
89                Ok(event) => {
90                    for path in event.paths {
91                        // Check debounce
92                        if let Some(last) = &last_path {
93                            if last == &path && now.duration_since(last_event).as_millis() < 100 {
94                                continue;
95                            }
96                        }
97
98                        let watch_event = match event.kind {
99                            notify::EventKind::Create(_) => WatchEvent::Created(path.clone()),
100                            notify::EventKind::Modify(_) => WatchEvent::Modified(path.clone()),
101                            notify::EventKind::Remove(_) => WatchEvent::Deleted(path.clone()),
102                            _ => continue,
103                        };
104
105                        last_path = Some(path);
106                        last_event = now;
107
108                        let _ = sender.send(watch_event);
109                    }
110                }
111                Err(e) => {
112                    let _ = sender.send(WatchEvent::Error(e.to_string()));
113                }
114            }
115        };
116
117        // Create watcher and store it to keep alive
118        let mut watcher = RecommendedWatcher::new(event_handler, notify::Config::default())?;
119        watcher.watch(&path, RecursiveMode::Recursive)?;
120
121        // Store the watcher to keep it alive
122        *self.inner.lock() = Some(watcher);
123
124        Ok(())
125    }
126
127    /// Creates a new channel pair for watch events.
128    ///
129    /// # Returns
130    ///
131    /// A tuple of (sender, receiver) for watch events.
132    pub fn channel() -> (
133        mpsc::UnboundedSender<WatchEvent>,
134        mpsc::UnboundedReceiver<WatchEvent>,
135    ) {
136        mpsc::unbounded_channel()
137    }
138}
139
#[cfg(test)]
mod tests {
    use super::*;
    use crate::storage::UnifiedGraphStore;
    use tempfile::TempDir;
    use tokio::time::{timeout, Duration};

    /// Source text written when a test file is first created.
    const INITIAL_SOURCE: &str = "fn test() {}";
    /// Source text written when a test file is modified.
    const MODIFIED_SOURCE: &str = "fn test() { println!(\"modified\"); }";

    /// Builds a watcher backed by an in-memory store and starts it on `dir`.
    ///
    /// The watcher is returned alongside the receiver because it must stay
    /// alive for events to keep flowing.
    async fn watch_dir(dir: &TempDir) -> (mpsc::UnboundedReceiver<WatchEvent>, Watcher) {
        let (tx, rx) = Watcher::channel();
        let store = Arc::new(UnifiedGraphStore::memory().await.unwrap());
        let watcher = Watcher::new(store, tx);
        watcher.start(dir.path().to_path_buf()).await.unwrap();
        (rx, watcher)
    }

    /// Waits up to `wait` for the next channel item, panicking with
    /// `timeout_msg` if nothing arrives in time.
    async fn next_event(
        rx: &mut mpsc::UnboundedReceiver<WatchEvent>,
        wait: Duration,
        timeout_msg: &str,
    ) -> Option<WatchEvent> {
        timeout(wait, rx.recv()).await.expect(timeout_msg)
    }

    /// Sleeps for `ms` milliseconds so the watcher / file system can settle.
    async fn pause_ms(ms: u64) {
        tokio::time::sleep(Duration::from_millis(ms)).await;
    }

    #[tokio::test]
    async fn test_watcher_creation() {
        let (tx, _rx) = Watcher::channel();
        let store = Arc::new(UnifiedGraphStore::memory().await.unwrap());
        let watcher = Watcher::new(store, tx);

        // Cloning must compile and succeed.
        let _ = watcher.clone();
    }

    #[tokio::test]
    async fn test_watcher_channel() {
        let (tx, mut rx) = Watcher::channel();

        // Push one event through the channel and read it back.
        let path = PathBuf::from("/test/file.rs");
        tx.send(WatchEvent::Created(path.clone())).unwrap();

        let received = rx.recv().await.unwrap();
        assert_eq!(received, WatchEvent::Created(path));
    }

    #[tokio::test]
    async fn test_watch_event_equality() {
        let path = PathBuf::from("/test/file.rs");

        assert_eq!(
            WatchEvent::Created(path.clone()),
            WatchEvent::Created(path.clone())
        );
        assert_eq!(
            WatchEvent::Modified(path.clone()),
            WatchEvent::Modified(path.clone())
        );
        assert_eq!(
            WatchEvent::Deleted(path),
            WatchEvent::Deleted(PathBuf::from("/test/file.rs"))
        );
        assert_ne!(
            WatchEvent::Created(PathBuf::from("/a.rs")),
            WatchEvent::Created(PathBuf::from("/b.rs"))
        );
    }

    #[tokio::test]
    async fn test_watcher_create_event() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir).await;

        // Let the watcher come up before touching the directory.
        pause_ms(100).await;

        let test_file = temp_dir.path().join("test_create.rs");
        tokio::fs::write(&test_file, INITIAL_SOURCE).await.unwrap();

        let event = next_event(
            &mut rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await
        .expect("No event received");

        assert!(matches!(event, WatchEvent::Created(path) if path == test_file));
    }

    #[tokio::test]
    async fn test_watcher_modify_event() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir).await;

        // Let the watcher come up before touching the directory.
        pause_ms(200).await;

        // The file has to exist before it can be modified.
        let test_file = temp_dir.path().join("test_modify.rs");
        tokio::fs::write(&test_file, INITIAL_SOURCE).await.unwrap();

        // The creation notification is not what this test is about.
        let _ = next_event(
            &mut rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await;

        // Step outside the debounce window before the next change.
        pause_ms(300).await;

        tokio::fs::write(&test_file, MODIFIED_SOURCE).await.unwrap();

        // File systems can be slow to report; allow a longer wait here.
        let event = next_event(
            &mut rx,
            Duration::from_secs(3),
            "Timeout waiting for modify event",
        )
        .await
        .expect("No event received");

        assert!(matches!(event, WatchEvent::Modified(path) if path == test_file));
    }

    #[tokio::test]
    async fn test_watcher_delete_event() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir).await;

        // Let the watcher come up before touching the directory.
        pause_ms(200).await;

        // The file has to exist before it can be deleted.
        let test_file = temp_dir.path().join("test_delete.rs");
        tokio::fs::write(&test_file, INITIAL_SOURCE).await.unwrap();

        // The creation notification is not what this test is about.
        let _ = next_event(
            &mut rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await;

        // Step outside the debounce window before the next change.
        pause_ms(300).await;

        tokio::fs::remove_file(&test_file).await.unwrap();

        // File systems can be slow to report; allow a longer wait here.
        let event = next_event(
            &mut rx,
            Duration::from_secs(3),
            "Timeout waiting for delete event",
        )
        .await
        .expect("No event received");

        assert!(matches!(event, WatchEvent::Deleted(path) if path == test_file));
    }

    #[tokio::test]
    async fn test_watcher_recursive_watching() {
        let temp_dir = TempDir::new().unwrap();
        // Watching is expected to cover nested directories as well.
        let (mut rx, _watcher) = watch_dir(&temp_dir).await;

        // Let the watcher come up before touching the directory.
        pause_ms(100).await;

        let subdir = temp_dir.path().join("subdir");
        tokio::fs::create_dir(&subdir).await.unwrap();

        // Drain the directory-creation notification if one shows up; its
        // absence is tolerated.
        let _ = timeout(Duration::from_secs(1), rx.recv()).await;

        // Short breather before writing inside the new directory.
        pause_ms(50).await;

        let test_file = subdir.join("nested.rs");
        tokio::fs::write(&test_file, "fn nested() {}")
            .await
            .unwrap();

        let event = next_event(
            &mut rx,
            Duration::from_secs(2),
            "Timeout waiting for nested create event",
        )
        .await
        .expect("No event received");

        assert!(matches!(event, WatchEvent::Created(path) if path == test_file));
    }

    #[tokio::test]
    async fn test_watcher_multiple_events() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir).await;

        // Let the watcher come up before touching the directory.
        pause_ms(200).await;

        let test_file = temp_dir.path().join("test_multiple.rs");

        // Step 1: create.
        tokio::fs::write(&test_file, INITIAL_SOURCE).await.unwrap();

        let event1 = next_event(
            &mut rx,
            Duration::from_secs(3),
            "Timeout waiting for create event",
        )
        .await
        .expect("No event received");
        assert!(matches!(event1, WatchEvent::Created(_)));

        // Step outside the debounce window before the next change.
        pause_ms(300).await;

        // Step 2: modify.
        tokio::fs::write(&test_file, MODIFIED_SOURCE).await.unwrap();

        let event2 = next_event(
            &mut rx,
            Duration::from_secs(3),
            "Timeout waiting for modify event",
        )
        .await
        .expect("No event received");
        assert!(matches!(event2, WatchEvent::Modified(_)));

        // Step outside the debounce window before the next change.
        pause_ms(300).await;

        // Step 3: delete.
        tokio::fs::remove_file(&test_file).await.unwrap();

        let event3 = next_event(
            &mut rx,
            Duration::from_secs(3),
            "Timeout waiting for delete event",
        )
        .await
        .expect("No event received");
        assert!(matches!(event3, WatchEvent::Deleted(_)));

        // All three notifications must refer to the one file under test.
        match (event1, event2, event3) {
            (WatchEvent::Created(p1), WatchEvent::Modified(p2), WatchEvent::Deleted(p3)) => {
                assert_eq!(p1, test_file);
                assert_eq!(p2, test_file);
                assert_eq!(p3, test_file);
            }
            _ => panic!("Events did not match expected sequence"),
        }
    }

    #[tokio::test]
    async fn test_watcher_debounce() {
        let temp_dir = TempDir::new().unwrap();
        let (mut rx, _watcher) = watch_dir(&temp_dir).await;

        // Let the watcher come up before touching the directory.
        pause_ms(100).await;

        // The file has to exist before it can be modified.
        let test_file = temp_dir.path().join("test_debounce.rs");
        tokio::fs::write(&test_file, INITIAL_SOURCE).await.unwrap();

        // The creation notification is not what this test is about.
        let _ = next_event(
            &mut rx,
            Duration::from_secs(2),
            "Timeout waiting for create event",
        )
        .await;

        // Three writes in quick succession, spaced well inside the 100ms
        // debounce window.
        for i in 0..3 {
            tokio::fs::write(&test_file, format!("fn test() {{ println!(\"{}\"); }}", i))
                .await
                .unwrap();
            pause_ms(20).await;
        }

        // Gather modify notifications for up to a second, stopping as soon as
        // the channel goes quiet for 100ms (or closes).
        let mut events = Vec::new();
        let collect_started = std::time::Instant::now();
        let collect_limit = Duration::from_secs(1);

        while collect_started.elapsed() < collect_limit {
            if let Ok(Some(event)) = timeout(Duration::from_millis(100), rx.recv()).await {
                if matches!(event, WatchEvent::Modified(_)) {
                    events.push(event);
                }
            } else {
                break;
            }
        }

        // Debouncing must have collapsed the burst; the exact count is
        // timing-dependent but has to stay below the number of writes.
        assert!(
            events.len() < 3,
            "Expected fewer than 3 events due to debouncing, got {}",
            events.len()
        );

        // Whatever modify notification came last must be for the test file.
        if let Some(WatchEvent::Modified(path)) = events.last() {
            assert_eq!(path, &test_file);
        }
    }
}