Skip to main content

sqry_core/
watch.rs

//! File system watcher for real-time index updates.
//!
//! This module provides cross-platform file system monitoring using OS-level APIs:
//! - **Linux**: inotify (kernel-level, < 1ms latency)
//! - **macOS**: FSEvents (Apple's file system monitoring)
//! - **Windows**: ReadDirectoryChangesW
//!
//! The watcher detects file changes in real time and enables "watch mode" for
//! automatic index updates during development.
//!
//! # Usage
//!
//! ```rust,ignore
//! use sqry_core::watch::{FileChange, FileWatcher};
//! use std::path::Path;
//!
//! // Create a watcher for a directory
//! let watcher = FileWatcher::new(Path::new("src"))?;
//!
//! // Poll for changes (non-blocking)
//! let changes = watcher.poll_changes();
//! for change in changes {
//!     match change {
//!         FileChange::Created(path) => println!("Created: {:?}", path),
//!         FileChange::Modified(path) => println!("Modified: {:?}", path),
//!         FileChange::Deleted(path) => println!("Deleted: {:?}", path),
//!     }
//! }
//!
//! // Wait for the next change (blocking)
//! let changes = watcher.wait_for_change()?;
//! ```
33
34use anyhow::{Context, Result};
35use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcher};
36use std::path::{Path, PathBuf};
37use std::sync::mpsc::{Receiver, TryRecvError, channel};
38use std::time::Duration;
39
/// A single file-level change reported by [`FileWatcher`].
///
/// Every variant carries the path of the affected file exactly as the
/// underlying watcher backend reported it.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum FileChange {
    /// A new file appeared at this path.
    Created(PathBuf),
    /// An existing file at this path changed.
    Modified(PathBuf),
    /// The file at this path was removed.
    Deleted(PathBuf),
}
50
51/// Cross-platform file system watcher
52///
53/// Uses OS-level APIs for efficient real-time file monitoring:
54/// - Linux: inotify
55/// - macOS: `FSEvents`
56/// - Windows: `ReadDirectoryChangesW`
57pub struct FileWatcher {
58    /// Underlying notify watcher
59    _watcher: RecommendedWatcher,
60    /// Channel for receiving file system events
61    receiver: Receiver<Result<Event, notify::Error>>,
62    /// Root path being watched
63    root_path: PathBuf,
64}
65
66impl FileWatcher {
67    /// Create a new file watcher for a directory
68    ///
69    /// The watcher monitors all files recursively under `root_path`.
70    ///
71    /// # Arguments
72    ///
73    /// * `root_path` - Root directory to watch
74    ///
75    /// # Errors
76    ///
77    /// Returns an error if the watcher cannot be created or the path cannot be watched.
78    pub fn new(root_path: &Path) -> Result<Self> {
79        let (tx, rx) = channel();
80
81        let mut watcher = notify::recommended_watcher(move |res| {
82            // Send event to channel (ignore send errors if receiver dropped)
83            let _ = tx.send(res);
84        })
85        .context("Failed to create file system watcher")?;
86
87        // Start watching the directory recursively
88        watcher
89            .watch(root_path, RecursiveMode::Recursive)
90            .with_context(|| format!("Failed to watch directory: {}", root_path.display()))?;
91
92        log::info!("File watcher started for: {}", root_path.display());
93
94        Ok(Self {
95            _watcher: watcher,
96            receiver: rx,
97            root_path: root_path.to_path_buf(),
98        })
99    }
100
101    /// Poll for file changes (non-blocking)
102    ///
103    /// Returns all pending file system events without blocking.
104    /// Use this for periodic polling in a loop.
105    ///
106    /// # Returns
107    ///
108    /// Vector of file changes (empty if no changes)
109    #[must_use]
110    pub fn poll_changes(&self) -> Vec<FileChange> {
111        let mut changes = Vec::new();
112
113        // Drain all pending events
114        loop {
115            match self.receiver.try_recv() {
116                Ok(Ok(event)) => {
117                    changes.extend(Self::process_event(event));
118                }
119                Ok(Err(e)) => {
120                    log::warn!("File watcher error: {e}");
121                }
122                Err(TryRecvError::Empty) => {
123                    // No more events
124                    break;
125                }
126                Err(TryRecvError::Disconnected) => {
127                    log::error!("File watcher channel disconnected");
128                    break;
129                }
130            }
131        }
132
133        changes
134    }
135
136    /// Wait for next file change (blocking)
137    ///
138    /// Blocks until at least one file system event occurs.
139    /// Use this for event-driven processing.
140    ///
141    /// # Returns
142    ///
143    /// Vector of file changes (at least one)
144    ///
145    /// # Errors
146    ///
147    /// Returns an error if the watcher channel is disconnected.
148    pub fn wait_for_change(&self) -> Result<Vec<FileChange>> {
149        // Wait for first event (blocking)
150        let event = self
151            .receiver
152            .recv()
153            .context("File watcher channel disconnected")?
154            .context("File watcher error")?;
155
156        let mut changes = Self::process_event(event);
157
158        // Collect any additional pending events (non-blocking)
159        changes.extend(self.poll_changes());
160
161        Ok(changes)
162    }
163
164    /// Wait for next file change with debouncing
165    ///
166    /// Waits for a file system event, then waits for `debounce_duration`
167    /// to collect any additional rapid-fire events (e.g., from editor saves).
168    ///
169    /// This is useful for editors that save files multiple times in quick succession.
170    ///
171    /// # Arguments
172    ///
173    /// * `debounce_duration` - How long to wait for additional events (typically 100-500ms)
174    ///
175    /// # Returns
176    ///
177    /// Vector of deduplicated file changes
178    ///
179    /// # Errors
180    ///
181    /// Returns [`anyhow::Error`] when the underlying watcher channel disconnects or emits an
182    /// unrecoverable error while collecting events.
183    pub fn wait_with_debounce(&self, debounce_duration: Duration) -> Result<Vec<FileChange>> {
184        // Wait for first event
185        let mut changes = self.wait_for_change()?;
186
187        // Wait for debounce period while draining events
188        changes.extend(self.wait_until(debounce_duration));
189
190        // Deduplicate changes (keep last change per file)
191        Ok(Self::deduplicate_changes(changes))
192    }
193
194    /// Wait for a duration while continuously draining events from the channel
195    ///
196    /// Unlike `std::thread::sleep()`, this actively drains the event channel,
197    /// collecting all events that arrive during the wait period. This is crucial
198    /// for macOS `FSEvents` which may deliver batched notifications.
199    ///
200    /// Returns all file changes collected during the wait period.
201    ///
202    /// Reference: `CI_FAILURE_REMEDIATION_PLAN.md` Section 2 (M-4)
203    #[must_use]
204    pub fn wait_until(&self, duration: Duration) -> Vec<FileChange> {
205        let deadline = std::time::Instant::now() + duration;
206        let mut changes = Vec::new();
207
208        while std::time::Instant::now() < deadline {
209            // Try to receive with a small timeout
210            let remaining = deadline.saturating_duration_since(std::time::Instant::now());
211            let poll_interval = Duration::from_millis(10).min(remaining);
212
213            match self.receiver.recv_timeout(poll_interval) {
214                Ok(Ok(event)) => {
215                    changes.extend(Self::process_event(event));
216                }
217                Ok(Err(e)) => {
218                    log::warn!("File watcher error: {e}");
219                }
220                Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
221                    // Continue waiting
222                }
223                Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
224                    log::error!("File watcher channel disconnected");
225                    break;
226                }
227            }
228        }
229
230        changes
231    }
232
233    /// Get the root path being watched
234    #[must_use]
235    pub fn root_path(&self) -> &Path {
236        &self.root_path
237    }
238
239    /// Process a notify event into file changes
240    ///
241    /// Filters out directory events to only track files, ensuring deterministic behavior
242    /// across platforms (some report directory events, some don't).
243    fn process_event(event: Event) -> Vec<FileChange> {
244        let mut changes = Vec::new();
245
246        match event.kind {
247            EventKind::Create(_) => {
248                for path in event.paths {
249                    // Filter out directory events - only track files
250                    if path.is_file() {
251                        log::debug!("File created: {}", path.display());
252                        changes.push(FileChange::Created(path));
253                    } else {
254                        log::trace!("Ignoring directory creation: {}", path.display());
255                    }
256                }
257            }
258            EventKind::Modify(_) => {
259                for path in event.paths {
260                    // Filter out directory events - only track files
261                    if path.is_file() {
262                        log::debug!("File modified: {}", path.display());
263                        changes.push(FileChange::Modified(path));
264                    } else {
265                        log::trace!("Ignoring directory modification: {}", path.display());
266                    }
267                }
268            }
269            EventKind::Remove(_) => {
270                for path in event.paths {
271                    // For deletions, path no longer exists, so we can't check is_file()
272                    // We rely on the path extension or accept all Remove events
273                    // (directories are rare to delete and won't hurt index)
274                    log::debug!("File deleted: {}", path.display());
275                    changes.push(FileChange::Deleted(path));
276                }
277            }
278            _ => {
279                // Ignore other event types (access, metadata, etc.)
280            }
281        }
282
283        changes
284    }
285
286    /// Deduplicate file changes (keep last change per file)
287    ///
288    /// When a file is modified multiple times rapidly, we only care about the final state.
289    fn deduplicate_changes(changes: Vec<FileChange>) -> Vec<FileChange> {
290        use std::collections::HashMap;
291
292        let mut map: HashMap<PathBuf, FileChange> = HashMap::new();
293
294        for change in changes {
295            let path = match &change {
296                FileChange::Created(p) | FileChange::Modified(p) | FileChange::Deleted(p) => {
297                    p.clone()
298                }
299            };
300
301            map.insert(path, change);
302        }
303
304        map.into_values().collect()
305    }
306}
307
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use std::thread;
    use std::time::{Duration, Instant};
    use tempfile::TempDir;

    /// How long a test waits for an expected event before giving up.
    ///
    /// macOS gets a larger base budget than other platforms, and the budget is
    /// doubled whenever the `CI` environment variable is set, since CI runners are
    /// resource-constrained.
    fn event_timeout() -> Duration {
        let base_secs: u64 = if cfg!(target_os = "macos") { 3 } else { 2 };
        let ci_factor: u64 = if std::env::var("CI").is_ok() { 2 } else { 1 };
        Duration::from_secs(base_secs * ci_factor)
    }

    /// Re-checks `predicate` every 50ms until it holds or `timeout` has elapsed.
    ///
    /// The predicate is always evaluated at least once; returns whether it held.
    fn wait_for<F>(timeout: Duration, mut predicate: F) -> bool
    where
        F: FnMut() -> bool,
    {
        let deadline = Instant::now() + timeout;
        while !predicate() {
            if Instant::now() >= deadline {
                return false;
            }
            thread::sleep(Duration::from_millis(50));
        }
        true
    }

    /// Repeatedly drains `watcher` (for up to [`event_timeout`]) until some reported
    /// change satisfies `wanted`.
    fn saw_change(watcher: &FileWatcher, wanted: impl Fn(&FileChange) -> bool) -> bool {
        wait_for(event_timeout(), || {
            watcher
                .poll_changes()
                .iter()
                .any(|change| wanted(change))
        })
    }

    #[test]
    fn test_watcher_creation() {
        let workspace = TempDir::new().unwrap();
        assert!(FileWatcher::new(workspace.path()).is_ok());
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_creation() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        let target = workspace.path().join("test.txt");
        fs::write(&target, "test content").unwrap();

        assert!(
            saw_change(&watcher, |c| matches!(c, FileChange::Created(p) if p == &target)),
            "Expected FileWatcher to detect file creation"
        );
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_modification() {
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");

        // The file exists before the watcher starts, so only the rewrite is observed.
        fs::write(&target, "initial content").unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        // Let the backend settle before touching the file.
        thread::sleep(Duration::from_millis(50));
        fs::write(&target, "modified content").unwrap();

        assert!(
            saw_change(&watcher, |c| matches!(c, FileChange::Modified(p) if p == &target)),
            "Expected FileWatcher to detect file modification"
        );
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_detects_file_deletion() {
        let workspace = TempDir::new().unwrap();
        let target = workspace.path().join("test.txt");

        fs::write(&target, "test content").unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        // Let the backend settle before touching the file.
        thread::sleep(Duration::from_millis(50));
        fs::remove_file(&target).unwrap();

        assert!(
            saw_change(&watcher, |c| matches!(c, FileChange::Deleted(p) if p == &target)),
            "Expected FileWatcher to detect file deletion"
        );
    }

    #[test]
    fn test_watcher_poll_returns_empty_when_no_changes() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        // Nothing touched the workspace, so nothing may be reported.
        assert!(watcher.poll_changes().is_empty());
    }

    #[test]
    #[cfg_attr(target_os = "macos", ignore = "FSEvents timing flaky in CI")]
    fn test_watcher_ignores_directories() {
        let workspace = TempDir::new().unwrap();
        let watcher = FileWatcher::new(workspace.path()).unwrap();

        let sub_dir = workspace.path().join("subdir");
        fs::create_dir(&sub_dir).unwrap();
        thread::sleep(Duration::from_millis(100));

        // Directory creation must be filtered out by `process_event` (files only).
        let changes = watcher.poll_changes();
        assert!(
            changes.is_empty(),
            "Watcher should not report directory creation events, found: {changes:?}"
        );

        // A file created inside the new directory, however, must be reported.
        let nested = sub_dir.join("test.txt");
        fs::write(&nested, "test").unwrap();

        assert!(
            saw_change(&watcher, |c| matches!(c, FileChange::Created(p) if p == &nested)),
            "Expected watcher to detect file creation in subdirectory"
        );
    }

    #[test]
    fn test_deduplicate_changes() {
        let file1 = || PathBuf::from("file1.txt");
        let input = vec![
            FileChange::Modified(file1()),
            FileChange::Modified(file1()), // duplicate
            FileChange::Created(PathBuf::from("file2.txt")),
            FileChange::Modified(file1()), // another duplicate
        ];

        let deduped = FileWatcher::deduplicate_changes(input);

        // Two distinct paths survive.
        assert_eq!(deduped.len(), 2);

        let occurrences = |wanted: &FileChange| deduped.iter().filter(|c| *c == wanted).count();
        // file1.txt appears exactly once, as its last (Modified) change.
        assert_eq!(occurrences(&FileChange::Modified(file1())), 1);
        // file2.txt appears exactly once.
        assert_eq!(occurrences(&FileChange::Created(PathBuf::from("file2.txt"))), 1);
    }
}