Skip to main content

mago_database/
watcher.rs

1//! Database watcher for real-time file change monitoring.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Fallback poll interval, in milliseconds, handed to the `notify` configuration
/// when [`WatchOptions::poll_interval`] is `None`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;

/// How long, in milliseconds, a single `wait` call waits for the first batch of
/// events before giving up and reporting that nothing changed.
const WAIT_INTERNAL_MS: u64 = 100;

/// Quiet period, in milliseconds, that must elapse without new events before the
/// batches collected by `wait` are applied to the database.
const WAIT_DEBOUNCE_MS: u64 = 300;

/// Pause, in milliseconds, between the two reads performed by
/// `read_stable_contents`.
const STABILITY_CHECK_MS: u64 = 10;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40struct ChangedFile {
41    id: FileId,
42    path: PathBuf,
43}
44
45/// Options for configuring the file system watcher.
46#[derive(Debug, Clone)]
47pub struct WatchOptions {
48    pub poll_interval: Option<Duration>,
49    pub additional_excludes: Vec<Exclusion<'static>>,
50}
51
52impl Default for WatchOptions {
53    fn default() -> Self {
54        Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
55    }
56}
57
58/// Database watcher service that monitors file changes and updates the database.
59pub struct DatabaseWatcher<'a> {
60    database: Database<'a>,
61    watcher: Option<RecommendedWatcher>,
62    watched_paths: Vec<PathBuf>,
63    receiver: Option<Receiver<Vec<ChangedFile>>>,
64}
65
66impl<'a> DatabaseWatcher<'a> {
67    #[must_use]
68    pub fn new(database: Database<'a>) -> Self {
69        Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
70    }
71
72    /// Starts watching for file changes in the configured directories.
73    ///
74    /// # Errors
75    ///
76    /// Returns a [`DatabaseError`] if:
77    /// - A glob pattern is invalid
78    /// - The file system watcher cannot be created
79    /// - Directories cannot be watched
80    pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
81        self.stop();
82
83        let config = &self.database.configuration;
84
85        let (tx, rx) = mpsc::channel();
86
87        let mut all_exclusions = vec![
88            Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
89            Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
90            Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
91            Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
92        ];
93        all_exclusions.extend(config.excludes.iter().cloned());
94        all_exclusions.extend(options.additional_excludes);
95
96        let glob_settings = &config.glob;
97        let mut glob_builder = GlobSetBuilder::new();
98        for ex in &all_exclusions {
99            if let Exclusion::Pattern(pat) = ex {
100                let glob = GlobBuilder::new(pat)
101                    .case_insensitive(glob_settings.case_insensitive)
102                    .literal_separator(glob_settings.literal_separator)
103                    .backslash_escape(glob_settings.backslash_escape)
104                    .empty_alternates(glob_settings.empty_alternates)
105                    .build()?;
106                glob_builder.add(glob);
107            }
108        }
109
110        let glob_excludes = glob_builder.build()?;
111
112        let path_excludes: HashSet<PathBuf> = all_exclusions
113            .iter()
114            .filter_map(|ex| match ex {
115                Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
116                _ => None,
117            })
118            .collect();
119
120        let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
121        let workspace = config.workspace.as_ref().to_path_buf();
122
123        // Build the set of explicitly configured watch paths so that events from
124        // these directories are never filtered out by the default glob exclusions
125        // (e.g., a project that explicitly includes `vendor/revolt` should still
126        // receive change events for files under that directory).
127        let mut unique_watch_paths = HashSet::new();
128
129        for path in &config.paths {
130            let watch_path = Self::extract_watch_path(path.as_ref());
131            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
132
133            unique_watch_paths.insert(absolute_path);
134        }
135
136        for path in &config.includes {
137            let watch_path = Self::extract_watch_path(path.as_ref());
138            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
139
140            unique_watch_paths.insert(absolute_path);
141        }
142
143        let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths.iter().cloned().collect();
144
145        let mut watcher = RecommendedWatcher::new(
146            move |res: Result<Event, notify::Error>| {
147                if let Ok(event) = res
148                    && let Some(changed) = Self::handle_event(
149                        event,
150                        &workspace,
151                        &glob_excludes,
152                        &path_excludes,
153                        &extensions,
154                        &explicit_watch_paths,
155                    )
156                {
157                    let _ = tx.send(changed);
158                }
159            },
160            Config::default()
161                .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
162        )
163        .map_err(DatabaseError::WatcherInit)?;
164
165        let mut watched_paths = Vec::new();
166        for path in unique_watch_paths {
167            watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
168            watched_paths.push(path.clone());
169            tracing::debug!("Watching path: {}", path.display());
170        }
171
172        tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
173
174        self.watcher = Some(watcher);
175        self.watched_paths = watched_paths;
176        self.receiver = Some(rx);
177
178        Ok(())
179    }
180
181    /// Stops watching if currently active.
182    pub fn stop(&mut self) {
183        if let Some(mut watcher) = self.watcher.take() {
184            for path in &self.watched_paths {
185                let _ = watcher.unwatch(path);
186                tracing::debug!("Stopped watching: {}", path.display());
187            }
188        }
189        self.watched_paths.clear();
190        self.receiver = None;
191    }
192
193    /// Checks if the watcher is currently active.
194    #[must_use]
195    pub fn is_watching(&self) -> bool {
196        self.watcher.is_some()
197    }
198
199    /// Extracts the base directory path from a potentially glob-pattern path.
200    ///
201    /// For glob patterns (containing *, ?, [, {), this returns the directory portion
202    /// before the first glob metacharacter. For regular paths, returns the path as-is.
203    ///
204    /// # Examples
205    ///
206    /// - `"src/**/*.php"` → `"src"`
207    /// - `"lib/*/foo.php"` → `"lib"`
208    /// - `"tests/fixtures"` → `"tests/fixtures"` (unchanged)
209    fn extract_watch_path(pattern: &str) -> PathBuf {
210        let is_glob = pattern.contains('*') || pattern.contains('?') || pattern.contains('[') || pattern.contains('{');
211
212        if !is_glob {
213            return PathBuf::from(pattern);
214        }
215
216        let first_glob_pos = pattern.find(['*', '?', '[', '{']).unwrap_or(pattern.len());
217
218        let base = &pattern[..first_glob_pos];
219
220        let base = base.trim_end_matches('/').trim_end_matches('\\');
221
222        if base.is_empty() { PathBuf::from(".") } else { PathBuf::from(base) }
223    }
224
225    fn handle_event(
226        event: Event,
227        workspace: &Path,
228        glob_excludes: &GlobSet,
229        path_excludes: &HashSet<PathBuf>,
230        extensions: &HashSet<String>,
231        explicit_watch_paths: &[PathBuf],
232    ) -> Option<Vec<ChangedFile>> {
233        tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
234
235        if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
236            event.kind
237        {
238            tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
239
240            return None;
241        }
242
243        let mut changed_files = Vec::new();
244
245        for path in event.paths {
246            // Check if file has a valid extension
247            if let Some(ext) = path.extension() {
248                if !extensions.contains(ext.to_string_lossy().as_ref()) {
249                    continue;
250                }
251            } else {
252                continue;
253            }
254
255            let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
256            if !is_explicitly_watched {
257                // Check glob pattern exclusions
258                if glob_excludes.is_match(&path) {
259                    tracing::debug!("Skipping path excluded by pattern: {}", path.display());
260                    continue;
261                }
262
263                // Check exact path exclusions
264                if path_excludes.contains(&path) {
265                    tracing::debug!("Skipping excluded path: {}", path.display());
266                    continue;
267                }
268
269                // Check if any parent directory is in path_excludes
270                let mut should_skip = false;
271                for ancestor in path.ancestors().skip(1) {
272                    if path_excludes.contains(ancestor) {
273                        tracing::debug!("Skipping path under excluded directory: {}", path.display());
274                        should_skip = true;
275                        break;
276                    }
277                }
278
279                if should_skip {
280                    continue;
281                }
282            }
283
284            // Normalize to forward slashes for cross-platform determinism
285            let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy().replace('\\', "/");
286            let file_id = FileId::new(&logical_name);
287
288            changed_files.push(ChangedFile { id: file_id, path: path.clone() });
289        }
290
291        if changed_files.is_empty() { None } else { Some(changed_files) }
292    }
293
294    /// Waits for file changes and updates the database.
295    ///
296    /// This method blocks until file changes are detected, then updates the database
297    /// in place and returns the IDs of changed files.
298    ///
299    /// # Errors
300    ///
301    /// Returns a [`DatabaseError`] if:
302    /// - The watcher is not currently active ([`DatabaseError::WatcherNotActive`])
303    /// - Updating the database with changed files fails
304    pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
305        let Some(receiver) = &self.receiver else {
306            return Err(DatabaseError::WatcherNotActive);
307        };
308
309        let config = &self.database.configuration;
310        let workspace = config.workspace.as_ref().to_path_buf();
311
312        match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
313            Ok(changed_files) => {
314                let mut all_changed = changed_files;
315                loop {
316                    match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
317                        Ok(more) => all_changed.extend(more),
318                        Err(RecvTimeoutError::Timeout) => break,
319                        Err(RecvTimeoutError::Disconnected) => {
320                            self.stop();
321                            return Err(DatabaseError::WatcherNotActive);
322                        }
323                    }
324                }
325
326                let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
327                for changed in all_changed {
328                    latest_changes.insert(changed.id, changed);
329                }
330                let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
331                let mut changed_ids = Vec::new();
332
333                for changed_file in &all_changed {
334                    changed_ids.push(changed_file.id);
335
336                    let Ok(file) = self.database.get(&changed_file.id) else {
337                        if changed_file.path.exists() {
338                            match File::read(&workspace, &changed_file.path, FileType::Host) {
339                                Ok(file) => {
340                                    self.database.add(file);
341                                    tracing::debug!("Added new file to database: {}", changed_file.path.display());
342                                }
343                                Err(e) => {
344                                    tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
345                                }
346                            }
347                        }
348
349                        continue;
350                    };
351
352                    if !changed_file.path.exists() {
353                        self.database.delete(changed_file.id);
354                        tracing::trace!("Deleted file from database: {}", file.name);
355                        continue;
356                    }
357
358                    match Self::read_stable_contents(&changed_file.path) {
359                        Ok(contents) => {
360                            if self.database.update(changed_file.id, Cow::Owned(contents)) {
361                                tracing::trace!("Updated file in database: {}", file.name);
362                            } else {
363                                tracing::warn!("Failed to update file in database (ID not found): {}", file.name);
364                            }
365                        }
366                        Err(e) => {
367                            tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
368                        }
369                    }
370                }
371
372                Ok(changed_ids)
373            }
374            Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
375            Err(RecvTimeoutError::Disconnected) => {
376                self.stop();
377                Err(DatabaseError::WatcherNotActive)
378            }
379        }
380    }
381
382    /// Reads file contents with a stability check to handle partial writes.
383    ///
384    /// Some IDEs and formatters write files in multiple steps (save, then format).
385    /// This method reads the file, waits briefly, and re-reads to ensure the content
386    /// has stabilized before returning.
387    fn read_stable_contents(path: &Path) -> std::io::Result<String> {
388        let contents = std::fs::read_to_string(path)?;
389
390        std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
391
392        if path.exists()
393            && let Ok(reread) = std::fs::read_to_string(path)
394            && reread != contents
395        {
396            tracing::debug!("File content changed during stability check: {}", path.display());
397
398            return Ok(reread);
399        }
400
401        Ok(contents)
402    }
403
404    /// Returns a reference to the database.
405    #[must_use]
406    pub fn database(&self) -> &Database<'a> {
407        &self.database
408    }
409
410    /// Returns a reference to the database.
411    #[must_use]
412    pub fn read_only_database(&self) -> ReadDatabase {
413        self.database.read_only()
414    }
415
416    /// Returns a mutable reference to the database.
417    pub fn database_mut(&mut self) -> &mut Database<'a> {
418        &mut self.database
419    }
420
421    /// Provides temporary mutable access to the database through a closure.
422    ///
423    /// This method helps Rust's borrow checker understand that the mutable borrow
424    /// of the database is scoped to just the closure execution, allowing the watcher
425    /// to be used again after the closure returns.
426    ///
427    /// The closure is bounded with for<'x> to explicitly show that the database
428    /// reference lifetime is scoped to the closure execution only.
429    pub fn with_database_mut<F, R>(&mut self, f: F) -> R
430    where
431        F: for<'x> FnOnce(&'x mut Database<'a>) -> R,
432    {
433        f(&mut self.database)
434    }
435
436    /// Consumes the watcher and returns the database.
437    #[must_use]
438    pub fn into_database(self) -> Database<'a> {
439        let mut md = ManuallyDrop::new(self);
440        md.stop();
441        unsafe { std::ptr::read(&raw const md.database) }
442    }
443}
444
445impl Drop for DatabaseWatcher<'_> {
446    fn drop(&mut self) {
447        self.stop();
448    }
449}