Skip to main content

mago_database/
watcher.rs

1//! Database watcher for real-time file change monitoring.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Polling interval (milliseconds) used when `WatchOptions::poll_interval` is `None`,
/// and by `WatchOptions::default`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
/// How long (milliseconds) a single `wait` call blocks for the first batch of changes.
const WAIT_INTERNAL_MS: u64 = 100;
/// Quiet period (milliseconds) after which a burst of change batches is considered complete.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Delay (milliseconds) between the two reads of the file-content stability check.
const STABILITY_CHECK_MS: u64 = 10;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40struct ChangedFile {
41    id: FileId,
42    path: PathBuf,
43}
44
45/// Options for configuring the file system watcher.
46#[derive(Debug, Clone)]
47pub struct WatchOptions {
48    pub poll_interval: Option<Duration>,
49    pub additional_excludes: Vec<Exclusion<'static>>,
50}
51
52impl Default for WatchOptions {
53    #[inline]
54    fn default() -> Self {
55        Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
56    }
57}
58
/// Database watcher service that monitors file changes and updates the database.
///
/// The watcher owns the [`Database`] it keeps in sync. Monitoring is started with
/// `watch`, pending changes are applied by `wait`, and `stop` tears the watch down
/// (it is also invoked when the watcher is dropped).
pub struct DatabaseWatcher<'config> {
    // Database kept in sync with the file system.
    database: Database<'config>,
    // Active `notify` watcher; `None` while not watching.
    watcher: Option<RecommendedWatcher>,
    // Absolute roots registered with `watcher`, remembered so `stop` can unwatch them.
    watched_paths: Vec<PathBuf>,
    // Receiving end of the channel the `notify` callback sends filtered change batches to;
    // `None` while not watching.
    receiver: Option<Receiver<Vec<ChangedFile>>>,
}
66
67impl<'config> DatabaseWatcher<'config> {
68    #[inline]
69    #[must_use]
70    pub fn new(database: Database<'config>) -> Self {
71        Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
72    }
73
74    /// Starts watching for file changes in the configured directories.
75    ///
76    /// # Errors
77    ///
78    /// Returns a [`DatabaseError`] if:
79    /// - A glob pattern is invalid
80    /// - The file system watcher cannot be created
81    /// - Directories cannot be watched
82    #[inline]
83    pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
84        self.stop();
85
86        let config = &self.database.configuration;
87
88        let (tx, rx) = mpsc::channel();
89
90        let mut all_exclusions = vec![
91            Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
92            Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
93            Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
94            Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
95        ];
96        all_exclusions.extend(config.excludes.iter().cloned());
97        all_exclusions.extend(options.additional_excludes);
98
99        let glob_settings = &config.glob;
100        let mut glob_builder = GlobSetBuilder::new();
101        for ex in &all_exclusions {
102            if let Exclusion::Pattern(pat) = ex {
103                let glob = GlobBuilder::new(pat)
104                    .case_insensitive(glob_settings.case_insensitive)
105                    .literal_separator(glob_settings.literal_separator)
106                    .backslash_escape(glob_settings.backslash_escape)
107                    .empty_alternates(glob_settings.empty_alternates)
108                    .build()?;
109                glob_builder.add(glob);
110            }
111        }
112
113        let glob_excludes = glob_builder.build()?;
114
115        let path_excludes: HashSet<PathBuf> = all_exclusions
116            .iter()
117            .filter_map(|ex| match ex {
118                Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
119                Exclusion::Pattern(_) => None,
120            })
121            .collect();
122
123        let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
124        let workspace = config.workspace.as_ref().to_path_buf();
125
126        // Build the set of explicitly configured watch paths so that events from
127        // these directories are never filtered out by the default glob exclusions
128        // (e.g., a project that explicitly includes `vendor/revolt` should still
129        // receive change events for files under that directory).
130        let mut unique_watch_paths = HashSet::new();
131
132        for path in &config.paths {
133            let watch_path = Self::extract_watch_path(path.as_ref());
134            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
135
136            unique_watch_paths.insert(absolute_path);
137        }
138
139        for path in &config.includes {
140            let watch_path = Self::extract_watch_path(path.as_ref());
141            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
142
143            unique_watch_paths.insert(absolute_path);
144        }
145
146        let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths
147            .iter()
148            .filter(|wp| glob_excludes.is_match(wp.as_path()) || path_excludes.contains(wp.as_path()))
149            .cloned()
150            .collect();
151
152        let mut watcher = RecommendedWatcher::new(
153            move |res: Result<Event, notify::Error>| {
154                if let Ok(event) = res
155                    && let Some(changed) = Self::handle_event(
156                        event,
157                        &workspace,
158                        &glob_excludes,
159                        &path_excludes,
160                        &extensions,
161                        &explicit_watch_paths,
162                    )
163                {
164                    let _ = tx.send(changed);
165                }
166            },
167            Config::default()
168                .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
169        )
170        .map_err(DatabaseError::WatcherInit)?;
171
172        let mut watched_paths = Vec::new();
173        for path in unique_watch_paths {
174            watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
175            watched_paths.push(path.clone());
176            tracing::debug!("Watching path: {}", path.display());
177        }
178
179        tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
180
181        self.watcher = Some(watcher);
182        self.watched_paths = watched_paths;
183        self.receiver = Some(rx);
184
185        Ok(())
186    }
187
188    /// Stops watching if currently active.
189    #[inline]
190    pub fn stop(&mut self) {
191        if let Some(mut watcher) = self.watcher.take() {
192            for path in &self.watched_paths {
193                let _ = watcher.unwatch(path);
194                tracing::debug!("Stopped watching: {}", path.display());
195            }
196        }
197        self.watched_paths.clear();
198        self.receiver = None;
199    }
200
201    /// Checks if the watcher is currently active.
202    #[inline]
203    #[must_use]
204    pub fn is_watching(&self) -> bool {
205        self.watcher.is_some()
206    }
207
208    /// Extracts the base directory path from a potentially glob-pattern path.
209    ///
210    /// For glob patterns (containing *, ?, [, {), this returns the directory portion
211    /// before the first glob metacharacter. For regular paths, returns the path as-is.
212    ///
213    /// # Examples
214    ///
215    /// - `"src/**/*.php"` → `"src"`
216    /// - `"lib/*/foo.php"` → `"lib"`
217    /// - `"tests/fixtures"` → `"tests/fixtures"` (unchanged)
218    fn extract_watch_path(pattern: &str) -> PathBuf {
219        let is_glob = pattern.contains('*') || pattern.contains('?') || pattern.contains('[') || pattern.contains('{');
220
221        if !is_glob {
222            return PathBuf::from(pattern);
223        }
224
225        let first_glob_pos = pattern.find(['*', '?', '[', '{']).unwrap_or(pattern.len());
226
227        let base = &pattern[..first_glob_pos];
228
229        let base = base.trim_end_matches('/').trim_end_matches('\\');
230
231        if base.is_empty() { PathBuf::from(".") } else { PathBuf::from(base) }
232    }
233
234    fn handle_event(
235        event: Event,
236        workspace: &Path,
237        glob_excludes: &GlobSet,
238        path_excludes: &HashSet<PathBuf>,
239        extensions: &HashSet<String>,
240        explicit_watch_paths: &[PathBuf],
241    ) -> Option<Vec<ChangedFile>> {
242        tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
243
244        if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
245            event.kind
246        {
247            tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
248
249            return None;
250        }
251
252        let mut changed_files = Vec::new();
253
254        for path in event.paths {
255            // Check if file has a valid extension
256            if let Some(ext) = path.extension() {
257                if !extensions.contains(ext.to_string_lossy().as_ref()) {
258                    continue;
259                }
260            } else {
261                continue;
262            }
263
264            let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
265            if !is_explicitly_watched {
266                // Check glob pattern exclusions
267                if glob_excludes.is_match(&path) {
268                    tracing::debug!("Skipping path excluded by pattern: {}", path.display());
269                    continue;
270                }
271
272                // Check exact path exclusions
273                if path_excludes.contains(&path) {
274                    tracing::debug!("Skipping excluded path: {}", path.display());
275                    continue;
276                }
277
278                // Check if any parent directory is in path_excludes
279                let mut should_skip = false;
280                for ancestor in path.ancestors().skip(1) {
281                    if path_excludes.contains(ancestor) {
282                        tracing::debug!("Skipping path under excluded directory: {}", path.display());
283                        should_skip = true;
284                        break;
285                    }
286                }
287
288                if should_skip {
289                    continue;
290                }
291            }
292
293            // Normalize to forward slashes for cross-platform determinism
294            let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy().replace('\\', "/");
295            let file_id = FileId::new(&logical_name);
296
297            changed_files.push(ChangedFile { id: file_id, path: path.clone() });
298        }
299
300        if changed_files.is_empty() { None } else { Some(changed_files) }
301    }
302
303    /// Waits for file changes and updates the database.
304    ///
305    /// This method blocks until file changes are detected, then updates the database
306    /// in place and returns the IDs of changed files.
307    ///
308    /// # Errors
309    ///
310    /// Returns a [`DatabaseError`] if:
311    /// - The watcher is not currently active ([`DatabaseError::WatcherNotActive`])
312    /// - Updating the database with changed files fails
313    #[inline]
314    pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
315        let Some(receiver) = &self.receiver else {
316            return Err(DatabaseError::WatcherNotActive);
317        };
318
319        let config = &self.database.configuration;
320        let workspace = config.workspace.as_ref().to_path_buf();
321
322        match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
323            Ok(changed_files) => {
324                let mut all_changed = changed_files;
325                loop {
326                    match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
327                        Ok(more) => all_changed.extend(more),
328                        Err(RecvTimeoutError::Timeout) => break,
329                        Err(RecvTimeoutError::Disconnected) => {
330                            self.stop();
331                            return Err(DatabaseError::WatcherNotActive);
332                        }
333                    }
334                }
335
336                let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
337                for changed in all_changed {
338                    latest_changes.insert(changed.id, changed);
339                }
340                let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
341                let mut changed_ids = Vec::new();
342
343                for changed_file in &all_changed {
344                    changed_ids.push(changed_file.id);
345
346                    let Ok(file) = self.database.get(&changed_file.id) else {
347                        if changed_file.path.exists() {
348                            match File::read(&workspace, &changed_file.path, FileType::Host) {
349                                Ok(file) => {
350                                    self.database.add(file);
351                                    tracing::debug!("Added new file to database: {}", changed_file.path.display());
352                                }
353                                Err(e) => {
354                                    tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
355                                }
356                            }
357                        }
358
359                        continue;
360                    };
361
362                    if !changed_file.path.exists() {
363                        self.database.delete(changed_file.id);
364                        tracing::trace!("Deleted file from database: {}", file.name);
365                        continue;
366                    }
367
368                    match Self::read_stable_contents(&changed_file.path) {
369                        Ok(contents) => {
370                            if self.database.update(changed_file.id, Cow::Owned(contents)) {
371                                tracing::trace!("Updated file in database: {}", file.name);
372                            } else {
373                                tracing::warn!("Failed to update file in database (ID not found): {}", file.name);
374                            }
375                        }
376                        Err(e) => {
377                            tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
378                        }
379                    }
380                }
381
382                Ok(changed_ids)
383            }
384            Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
385            Err(RecvTimeoutError::Disconnected) => {
386                self.stop();
387                Err(DatabaseError::WatcherNotActive)
388            }
389        }
390    }
391
392    /// Reads file contents with a stability check to handle partial writes.
393    ///
394    /// Some IDEs and formatters write files in multiple steps (save, then format).
395    /// This method reads the file, waits briefly, and re-reads to ensure the content
396    /// has stabilized before returning.
397    fn read_stable_contents(path: &Path) -> std::io::Result<String> {
398        let contents = std::fs::read_to_string(path)?;
399
400        std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
401
402        if path.exists()
403            && let Ok(reread) = std::fs::read_to_string(path)
404            && reread != contents
405        {
406            tracing::debug!("File content changed during stability check: {}", path.display());
407
408            return Ok(reread);
409        }
410
411        Ok(contents)
412    }
413
414    /// Returns a reference to the database.
415    #[inline]
416    #[must_use]
417    pub fn database(&self) -> &Database<'config> {
418        &self.database
419    }
420
421    /// Returns a reference to the database.
422    #[inline]
423    #[must_use]
424    pub fn read_only_database(&self) -> ReadDatabase {
425        self.database.read_only()
426    }
427
428    /// Returns a mutable reference to the database.
429    #[inline]
430    pub fn database_mut(&mut self) -> &mut Database<'config> {
431        &mut self.database
432    }
433
434    /// Provides temporary mutable access to the database through a closure.
435    ///
436    /// This method helps Rust's borrow checker understand that the mutable borrow
437    /// of the database is scoped to just the closure execution, allowing the watcher
438    /// to be used again after the closure returns.
439    ///
440    /// The closure is bounded with for<'x> to explicitly show that the database
441    /// reference lifetime is scoped to the closure execution only.
442    #[inline]
443    pub fn with_database_mut<F, R>(&mut self, f: F) -> R
444    where
445        F: for<'borrow> FnOnce(&'borrow mut Database<'config>) -> R,
446    {
447        f(&mut self.database)
448    }
449
450    /// Consumes the watcher and returns the database.
451    #[inline]
452    #[must_use]
453    pub fn into_database(self) -> Database<'config> {
454        let mut md = ManuallyDrop::new(self);
455        md.stop();
456        // SAFETY: `md` is a `ManuallyDrop<Self>`, so its `Drop` impl will not run; reading the
457        // `database` field byte-for-byte is safe because we never read or drop it again.
458        unsafe { std::ptr::read(&raw const md.database) }
459    }
460}
461
462impl Drop for DatabaseWatcher<'_> {
463    #[inline]
464    fn drop(&mut self) {
465        self.stop();
466    }
467}