Skip to main content

opendev_runtime/
file_watcher.rs

1//! File watcher using the `notify` crate for native filesystem event detection.
2//!
3//! Monitors a working directory for file changes using OS-level filesystem
4//! notifications (FSEvents on macOS, inotify on Linux, ReadDirectoryChanges on
5//! Windows). Events are debounced with a configurable interval (default 500ms).
6//! Includes an inactivity timeout that stops watching after a configurable
7//! period of no detected changes (default 5 minutes).
8
9use std::path::{Path, PathBuf};
10use std::sync::Arc;
11use std::time::Duration;
12
13use notify::RecursiveMode;
14use notify_debouncer_mini::new_debouncer;
15use tokio::sync::mpsc;
16use tracing::{debug, info, warn};
17
/// Default debounce window applied to raw filesystem events (500 ms).
const DEFAULT_DEBOUNCE: Duration = Duration::from_millis(500);

/// Default span without any reported change after which the watcher shuts
/// itself down (five minutes).
const DEFAULT_INACTIVITY_TIMEOUT: Duration = Duration::from_secs(5 * 60);

/// Path component names ignored by default.
///
/// Entries are compared against individual path components, so the list may
/// hold directory names (`.git`, `target`) as well as file names (`.DS_Store`).
const DEFAULT_IGNORE_DIRS: &[&str] = &[".git", "target", "node_modules", ".opendev", ".DS_Store"];
26
27/// A file change detected by the watcher.
28#[derive(Debug, Clone)]
29pub struct FileChange {
30    /// Path to the changed file.
31    pub path: PathBuf,
32    /// The kind of change detected.
33    pub kind: FileChangeKind,
34}
35
/// Classification of a reported file change.
///
/// NOTE(review): the watcher loop in this file only ever produces
/// [`FileChangeKind::Modified`] (path exists) and [`FileChangeKind::Deleted`]
/// (path is gone); newly created files are therefore reported as `Modified`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FileChangeKind {
    /// A new file appeared.
    Created,
    /// An existing file changed.
    Modified,
    /// The file no longer exists.
    Deleted,
}
46
/// Tunable settings for a [`FileWatcher`].
#[derive(Debug, Clone)]
pub struct FileWatcherConfig {
    /// Debounce window handed to the notification backend before events are emitted.
    pub debounce: Duration,
    /// Maximum time without any reported change before the watcher stops itself.
    pub inactivity_timeout: Duration,
    /// Names that cause an event to be dropped when they equal a component of
    /// the event path (exact match, not globs), e.g. ".git" or "target".
    pub ignore_patterns: Vec<String>,
}
57
58impl Default for FileWatcherConfig {
59    fn default() -> Self {
60        Self {
61            debounce: DEFAULT_DEBOUNCE,
62            inactivity_timeout: DEFAULT_INACTIVITY_TIMEOUT,
63            ignore_patterns: DEFAULT_IGNORE_DIRS
64                .iter()
65                .map(|s| (*s).to_string())
66                .collect(),
67        }
68    }
69}
70
71/// Monitors a working directory for file changes using OS-native filesystem
72/// notifications via the `notify` crate.
73///
74/// The watcher runs as an async task and sends [`FileChange`] events through
75/// a channel. It automatically stops after a configurable inactivity timeout.
76pub struct FileWatcher {
77    /// Root directory to watch.
78    root: PathBuf,
79    /// Configuration.
80    config: FileWatcherConfig,
81    /// Cancel token to stop the watcher externally.
82    cancel: tokio::sync::watch::Sender<bool>,
83}
84
85impl FileWatcher {
86    /// Create a new file watcher for the given directory.
87    pub fn new(root: impl Into<PathBuf>, config: FileWatcherConfig) -> Self {
88        let (cancel, _) = tokio::sync::watch::channel(false);
89        Self {
90            root: root.into(),
91            config,
92            cancel,
93        }
94    }
95
96    /// Create a watcher with default configuration.
97    pub fn with_defaults(root: impl Into<PathBuf>) -> Self {
98        Self::new(root, FileWatcherConfig::default())
99    }
100
101    /// Start watching and return a receiver for file changes.
102    ///
103    /// The watcher runs in a background tokio task. It will stop when:
104    /// - The inactivity timeout is reached (no changes detected)
105    /// - [`stop`] is called
106    /// - The `FileWatcher` is dropped
107    pub fn start(&self) -> mpsc::UnboundedReceiver<FileChange> {
108        let (tx, rx) = mpsc::unbounded_channel();
109        let root = self.root.clone();
110        let config = self.config.clone();
111        let mut cancel_rx = self.cancel.subscribe();
112
113        tokio::spawn(async move {
114            // Create a std mpsc channel for the notify debouncer callback.
115            let (notify_tx, notify_rx) = std::sync::mpsc::channel();
116
117            let debouncer = new_debouncer(config.debounce, move |result| {
118                let _ = notify_tx.send(result);
119            });
120
121            let mut debouncer = match debouncer {
122                Ok(d) => d,
123                Err(e) => {
124                    warn!(error = %e, "Failed to create file watcher");
125                    return;
126                }
127            };
128
129            if let Err(e) = debouncer.watcher().watch(&root, RecursiveMode::Recursive) {
130                warn!(
131                    root = %root.display(),
132                    error = %e,
133                    "Failed to start watching directory"
134                );
135                return;
136            }
137
138            let ignore_patterns = Arc::new(config.ignore_patterns);
139
140            info!(
141                root = %root.display(),
142                "FileWatcher started (notify)"
143            );
144
145            let mut last_change = tokio::time::Instant::now();
146            let mut check_interval = tokio::time::interval(Duration::from_millis(100));
147
148            loop {
149                tokio::select! {
150                    _ = check_interval.tick() => {
151                        // Check inactivity timeout
152                        if last_change.elapsed() >= config.inactivity_timeout {
153                            info!(
154                                timeout_secs = config.inactivity_timeout.as_secs(),
155                                "FileWatcher stopped: inactivity timeout"
156                            );
157                            break;
158                        }
159
160                        // Drain all available events from the notify channel
161                        while let Ok(result) = notify_rx.try_recv() {
162                            match result {
163                                Ok(events) => {
164                                    for event in events {
165                                        let path = &event.path;
166
167                                        // Skip paths containing ignored directory names
168                                        if should_ignore(path, &ignore_patterns) {
169                                            continue;
170                                        }
171
172                                        let kind = if path.exists() {
173                                            FileChangeKind::Modified
174                                        } else {
175                                            FileChangeKind::Deleted
176                                        };
177
178                                        last_change = tokio::time::Instant::now();
179                                        debug!(
180                                            path = %path.display(),
181                                            kind = ?kind,
182                                            "File change detected"
183                                        );
184
185                                        if tx.send(FileChange {
186                                            path: path.clone(),
187                                            kind,
188                                        }).is_err() {
189                                            debug!("FileWatcher channel closed, stopping");
190                                            return;
191                                        }
192                                    }
193                                }
194                                Err(errors) => {
195                                    warn!(errors = ?errors, "File watcher errors");
196                                }
197                            }
198                        }
199                    }
200                    result = cancel_rx.changed() => {
201                        if result.is_err() || *cancel_rx.borrow() {
202                            info!("FileWatcher stopped: cancelled");
203                            break;
204                        }
205                    }
206                }
207            }
208
209            // debouncer is dropped here, which stops the native watcher
210        });
211
212        rx
213    }
214
215    /// Stop the watcher.
216    pub fn stop(&self) {
217        let _ = self.cancel.send(true);
218    }
219}
220
221impl Drop for FileWatcher {
222    fn drop(&mut self) {
223        self.stop();
224    }
225}
226
/// Report whether any component of `path` is exactly equal to one of
/// `ignore_patterns`.
///
/// Components are compared after a lossy UTF-8 conversion; patterns are plain
/// names, not globs. An empty path or an empty pattern list never matches.
fn should_ignore(path: &Path, ignore_patterns: &[String]) -> bool {
    path.components().any(|part| {
        let part_name = part.as_os_str().to_string_lossy();
        ignore_patterns
            .iter()
            .any(|pattern| pattern.as_str() == &*part_name)
    })
}
237
// Unit tests are kept in the sibling file `file_watcher_tests.rs` and are only
// compiled for test builds.
#[cfg(test)]
#[path = "file_watcher_tests.rs"]
mod tests;