Skip to main content

sqry_core/watch/
mod.rs

1//! File system watcher for real-time index updates.
2//!
3//! This module provides cross-platform file system monitoring using OS-level APIs:
4//! - **Linux**: inotify (kernel-level, < 1ms latency)
5//! - **macOS**: FSEvents (Apple's file system monitoring)
6//! - **Windows**: ReadDirectoryChangesW
7//!
8//! The watcher detects file changes in real-time and enables "watch mode" for
9//! automatic index updates during development.
10//!
11//! # Usage
12//!
13//! ```rust,ignore
//! use sqry_core::watch::{FileChange, FileWatcher};
//! use std::path::Path;
//!
//! // Create watcher for a directory
//! let watcher = FileWatcher::new(Path::new("src"))?;
19//!
20//! // Poll for changes (non-blocking)
21//! let changes = watcher.poll_changes();
22//! for change in changes {
23//!     match change {
24//!         FileChange::Created(path) => println!("Created: {:?}", path),
25//!         FileChange::Modified(path) => println!("Modified: {:?}", path),
26//!         FileChange::Deleted(path) => println!("Deleted: {:?}", path),
27//!     }
28//! }
29//!
30//! // Wait for next change (blocking)
31//! let changes = watcher.wait_for_change()?;
32//! ```
33
34pub mod git_state;
35pub mod source_tree;
36
37pub use git_state::{GitChangeClass, GitStateWatcher, LastIndexedGitState};
38pub use source_tree::{ChangeSet, SourceTreeWatcher};
39
40use anyhow::{Context, Result};
41use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
42use std::path::{Path, PathBuf};
43use std::sync::mpsc::{Receiver, TryRecvError, channel};
44use std::time::Duration;
45
/// Kind of change observed for a single path under the watched root.
///
/// Each variant carries the path exactly as reported by the OS backend.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FileChange {
    /// A new file appeared at this path.
    Created(PathBuf),
    /// The file at this path changed (contents, metadata, or it was renamed into place).
    Modified(PathBuf),
    /// The entry at this path was removed. A removed path can no longer be
    /// inspected, so this may also refer to a directory.
    Deleted(PathBuf),
}
56
/// Cross-platform, recursive file system watcher.
///
/// Uses OS-level APIs for efficient real-time file monitoring:
/// - Linux: inotify
/// - macOS: `FSEvents`
/// - Windows: `ReadDirectoryChangesW`
///
/// The backend callback installed by `FileWatcher::new` sends every raw `notify`
/// result into an in-process channel. This struct owns the receiving end and turns
/// those results into [`FileChange`] values when polled or waited on.
pub struct FileWatcher {
    /// Underlying notify watcher. It is never read after construction; it is held
    /// so the backend lives exactly as long as this struct. It is declared before
    /// `receiver`, so it is dropped first (Rust drops fields in declaration order).
    _watcher: RecommendedWatcher,
    /// Receiving end of the channel the watcher callback sends events into.
    receiver: Receiver<Result<Event, notify::Error>>,
    /// Root path being watched, as passed to the constructor.
    root_path: PathBuf,
}
71
72impl FileWatcher {
73    /// Create a new file watcher for a directory
74    ///
75    /// The watcher monitors all files recursively under `root_path`.
76    ///
77    /// # Arguments
78    ///
79    /// * `root_path` - Root directory to watch
80    ///
81    /// # Errors
82    ///
83    /// Returns an error if the watcher cannot be created or the path cannot be watched.
84    pub fn new(root_path: &Path) -> Result<Self> {
85        let (tx, rx) = channel();
86
87        let mut watcher = notify::recommended_watcher(move |res| {
88            // Send event to channel (ignore send errors if receiver dropped)
89            let _ = tx.send(res);
90        })
91        .context("Failed to create file system watcher")?;
92
93        // Start watching the directory recursively
94        watcher
95            .watch(root_path, RecursiveMode::Recursive)
96            .with_context(|| format!("Failed to watch directory: {}", root_path.display()))?;
97
98        log::info!("File watcher started for: {}", root_path.display());
99
100        Ok(Self {
101            _watcher: watcher,
102            receiver: rx,
103            root_path: root_path.to_path_buf(),
104        })
105    }
106
107    /// Poll for file changes (non-blocking)
108    ///
109    /// Returns all pending file system events without blocking.
110    /// Use this for periodic polling in a loop.
111    ///
112    /// # Returns
113    ///
114    /// Vector of file changes (empty if no changes)
115    #[must_use]
116    pub fn poll_changes(&self) -> Vec<FileChange> {
117        let mut changes = Vec::new();
118
119        // Drain all pending events
120        loop {
121            match self.receiver.try_recv() {
122                Ok(Ok(event)) => {
123                    changes.extend(Self::process_event(event));
124                }
125                Ok(Err(e)) => {
126                    log::warn!("File watcher error: {e}");
127                }
128                Err(TryRecvError::Empty) => {
129                    // No more events
130                    break;
131                }
132                Err(TryRecvError::Disconnected) => {
133                    log::error!("File watcher channel disconnected");
134                    break;
135                }
136            }
137        }
138
139        changes
140    }
141
142    /// Wait for next file change (blocking)
143    ///
144    /// Blocks until at least one file system event occurs.
145    /// Use this for event-driven processing.
146    ///
147    /// # Returns
148    ///
149    /// Vector of file changes (at least one)
150    ///
151    /// # Errors
152    ///
153    /// Returns an error if the watcher channel is disconnected.
154    pub fn wait_for_change(&self) -> Result<Vec<FileChange>> {
155        // Wait for first event (blocking)
156        let event = self
157            .receiver
158            .recv()
159            .context("File watcher channel disconnected")?
160            .context("File watcher error")?;
161
162        let mut changes = Self::process_event(event);
163
164        // Collect any additional pending events (non-blocking)
165        changes.extend(self.poll_changes());
166
167        Ok(changes)
168    }
169
170    /// Wait for next file change with debouncing
171    ///
172    /// Waits for a file system event, then waits for `debounce_duration`
173    /// to collect any additional rapid-fire events (e.g., from editor saves).
174    ///
175    /// This is useful for editors that save files multiple times in quick succession.
176    ///
177    /// # Arguments
178    ///
179    /// * `debounce_duration` - How long to wait for additional events (typically 100-500ms)
180    ///
181    /// # Returns
182    ///
183    /// Vector of deduplicated file changes
184    ///
185    /// # Errors
186    ///
187    /// Returns [`anyhow::Error`] when the underlying watcher channel disconnects or emits an
188    /// unrecoverable error while collecting events.
189    pub fn wait_with_debounce(&self, debounce_duration: Duration) -> Result<Vec<FileChange>> {
190        // Wait for first event
191        let mut changes = self.wait_for_change()?;
192
193        // Wait for debounce period while draining events
194        changes.extend(self.wait_until(debounce_duration));
195
196        // Deduplicate changes (keep last change per file)
197        Ok(Self::deduplicate_changes(changes))
198    }
199
200    /// Wait for a duration while continuously draining events from the channel
201    ///
202    /// Unlike `std::thread::sleep()`, this actively drains the event channel,
203    /// collecting all events that arrive during the wait period. This is crucial
204    /// for macOS `FSEvents` which may deliver batched notifications.
205    ///
206    /// Returns all file changes collected during the wait period.
207    ///
208    /// Reference: `CI_FAILURE_REMEDIATION_PLAN.md` Section 2 (M-4)
209    #[must_use]
210    pub fn wait_until(&self, duration: Duration) -> Vec<FileChange> {
211        let deadline = std::time::Instant::now() + duration;
212        let mut changes = Vec::new();
213
214        while std::time::Instant::now() < deadline {
215            // Try to receive with a small timeout
216            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
217            let poll_interval = Duration::from_millis(10).min(remaining);
218
219            match self.receiver.recv_timeout(poll_interval) {
220                Ok(Ok(event)) => {
221                    changes.extend(Self::process_event(event));
222                }
223                Ok(Err(e)) => {
224                    log::warn!("File watcher error: {e}");
225                }
226                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
227                    // Continue waiting
228                }
229                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
230                    log::error!("File watcher channel disconnected");
231                    break;
232                }
233            }
234        }
235
236        changes
237    }
238
239    /// Get the root path being watched
240    #[must_use]
241    pub fn root_path(&self) -> &Path {
242        &self.root_path
243    }
244
245    /// Process a notify event into file changes
246    ///
247    /// Filters out directory events to only track files, ensuring deterministic behavior
248    /// across platforms (some report directory events, some don't).
249    fn process_event(event: Event) -> Vec<FileChange> {
250        let mut changes = Vec::new();
251
252        match event.kind {
253            EventKind::Create(_) => {
254                for path in event.paths {
255                    // Filter out directory events - only track files
256                    if path.is_file() {
257                        log::debug!("File created: {}", path.display());
258                        changes.push(FileChange::Created(path));
259                    } else {
260                        log::trace!("Ignoring directory creation: {}", path.display());
261                    }
262                }
263            }
264            EventKind::Modify(_) => {
265                for path in event.paths {
266                    // Filter out directory events - only track files
267                    if path.is_file() {
268                        log::debug!("File modified: {}", path.display());
269                        changes.push(FileChange::Modified(path));
270                    } else {
271                        log::trace!("Ignoring directory modification: {}", path.display());
272                    }
273                }
274            }
275            EventKind::Remove(_) => {
276                for path in event.paths {
277                    // For deletions, path no longer exists, so we can't check is_file()
278                    // We rely on the path extension or accept all Remove events
279                    // (directories are rare to delete and won't hurt index)
280                    log::debug!("File deleted: {}", path.display());
281                    changes.push(FileChange::Deleted(path));
282                }
283            }
284            _ => {
285                // Ignore other event types (access, metadata, etc.)
286            }
287        }
288
289        changes
290    }
291
292    /// Deduplicate file changes (keep last change per file)
293    ///
294    /// When a file is modified multiple times rapidly, we only care about the final state.
295    fn deduplicate_changes(changes: Vec<FileChange>) -> Vec<FileChange> {
296        use std::collections::HashMap;
297
298        let mut map: HashMap<PathBuf, FileChange> = HashMap::new();
299
300        for change in changes {
301            let path = match &change {
302                FileChange::Created(p) | FileChange::Modified(p) | FileChange::Deleted(p) => {
303                    p.clone()
304                }
305            };
306
307            map.insert(path, change);
308        }
309
310        map.into_values().collect()
311    }
312}
313
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;

    /// Upper bound on how long to keep polling for an expected event.
    ///
    /// `FSEvents` gets a larger base budget. Every budget is doubled when the `CI`
    /// environment variable is set, because shared CI runners are slower.
    fn event_timeout() -> Duration {
        let base_secs = if cfg!(target_os = "macos") { 3 } else { 2 };
        let ci_factor = if std::env::var("CI").is_ok() { 2 } else { 1 };
        Duration::from_secs(base_secs * ci_factor)
    }

    /// Re-evaluate `predicate` every 50ms until it holds (`true`) or `timeout`
    /// elapses (`false`). The predicate is always evaluated at least once.
    fn wait_for<F>(timeout: Duration, mut predicate: F) -> bool
    where
        F: FnMut() -> bool,
    {
        let deadline = Instant::now() + timeout;
        while !predicate() {
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(50));
        }
        true
    }

    /// Keep polling `watcher` until some reported change satisfies `wanted`, or
    /// the event timeout runs out.
    fn saw_change(watcher: &FileWatcher, mut wanted: impl FnMut(&FileChange) -> bool) -> bool {
        wait_for(event_timeout(), || watcher.poll_changes().iter().any(&mut wanted))
    }

    #[test]
    fn test_watcher_creation() {
        let workspace = TempDir::new().unwrap();
        assert!(FileWatcher::new(workspace.path()).is_ok());
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_creation() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        let target = workspace.path().join("test.txt");
        fs::write(&target, "test content").unwrap();

        let detected = saw_change(&watcher, |c| {
            matches!(c, FileChange::Created(p) if p == &target)
        });
        assert!(detected, "Expected FileWatcher to detect file creation");
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_modification() {
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");

        // The file must already exist before the watcher starts.
        fs::write(&target, "initial content").unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        // Let the backend finish registering its watches.
        thread::sleep(Duration::from_millis(50));

        fs::write(&target, "modified content").unwrap();

        let detected = saw_change(&watcher, |c| {
            matches!(c, FileChange::Modified(p) if p == &target)
        });
        assert!(detected, "Expected FileWatcher to detect file modification");
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_deletion() {
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");

        fs::write(&target, "test content").unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        // Let the backend finish registering its watches.
        thread::sleep(Duration::from_millis(50));

        fs::remove_file(&target).unwrap();

        let detected = saw_change(&watcher, |c| {
            matches!(c, FileChange::Deleted(p) if p == &target)
        });
        assert!(detected, "Expected FileWatcher to detect file deletion");
    }

    #[test]
    fn test_watcher_poll_returns_empty_when_no_changes() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        // Nothing has touched the workspace, so the queue must be empty.
        assert!(watcher.poll_changes().is_empty());
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_ignores_directories() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        let nested = workspace.path().join("subdir");
        fs::create_dir(&nested).unwrap();

        // Give the backend a moment to deliver (and the watcher to filter) the event.
        thread::sleep(Duration::from_millis(100));

        // Directory creation must not surface as a change: only files are tracked.
        let changes = watcher.poll_changes();
        assert!(
            changes.is_empty(),
            "Watcher should not report directory creation events, found: {changes:?}"
        );

        // A file created inside the new directory, however, must be reported.
        let nested_file = nested.join("test.txt");
        fs::write(&nested_file, "test").unwrap();

        let detected = saw_change(&watcher, |c| {
            matches!(c, FileChange::Created(p) if p == &nested_file)
        });
        assert!(
            detected,
            "Expected watcher to detect file creation in subdirectory"
        );
    }

    #[test]
    fn test_deduplicate_changes() {
        let file1 = || PathBuf::from("file1.txt");
        let input = vec![
            FileChange::Modified(file1()),
            FileChange::Modified(file1()), // duplicate
            FileChange::Created(PathBuf::from("file2.txt")),
            FileChange::Modified(file1()), // another duplicate
        ];

        let deduped = FileWatcher::deduplicate_changes(input);

        // Exactly two distinct paths survive.
        assert_eq!(deduped.len(), 2);

        let occurrences = |wanted: &FileChange| deduped.iter().filter(|c| *c == wanted).count();

        // file1.txt collapses to its last modification.
        assert_eq!(occurrences(&FileChange::Modified(file1())), 1);
        // file2.txt keeps its single creation.
        assert_eq!(
            occurrences(&FileChange::Created(PathBuf::from("file2.txt"))),
            1
        );
    }
}