Skip to main content

mago_database/
watcher.rs

1//! Database watcher for real-time file change monitoring.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33use crate::utils::bytes_to_path;
34
/// Poll interval (milliseconds) given to the watcher backend when
/// `WatchOptions::poll_interval` is `None`; also the value used by `WatchOptions::default()`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
/// How long (milliseconds) `DatabaseWatcher::wait` waits for the first batch of events
/// before returning an empty result.
const WAIT_INTERNAL_MS: u64 = 100;
/// Quiet period (milliseconds) that must elapse without new events before
/// `DatabaseWatcher::wait` considers a burst of changes complete.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Pause (milliseconds) between the two reads performed by
/// `DatabaseWatcher::read_stable_contents`.
const STABILITY_CHECK_MS: u64 = 10;
39
40#[derive(Debug, Clone, PartialEq, Eq, Hash)]
41struct ChangedFile {
42    id: FileId,
43    path: PathBuf,
44}
45
46/// Options for configuring the file system watcher.
47#[derive(Debug, Clone)]
48pub struct WatchOptions {
49    pub poll_interval: Option<Duration>,
50    pub additional_excludes: Vec<Exclusion<'static>>,
51}
52
53impl Default for WatchOptions {
54    #[inline]
55    fn default() -> Self {
56        Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
57    }
58}
59
60/// Database watcher service that monitors file changes and updates the database.
61pub struct DatabaseWatcher<'config> {
62    database: Database<'config>,
63    watcher: Option<RecommendedWatcher>,
64    watched_paths: Vec<PathBuf>,
65    receiver: Option<Receiver<Vec<ChangedFile>>>,
66}
67
68impl<'config> DatabaseWatcher<'config> {
69    #[inline]
70    #[must_use]
71    pub fn new(database: Database<'config>) -> Self {
72        Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
73    }
74
75    /// Starts watching for file changes in the configured directories.
76    ///
77    /// # Errors
78    ///
79    /// Returns a [`DatabaseError`] if:
80    /// - A glob pattern is invalid
81    /// - The file system watcher cannot be created
82    /// - Directories cannot be watched
83    #[inline]
84    pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
85        self.stop();
86
87        let config = &self.database.configuration;
88
89        let (tx, rx) = mpsc::channel();
90
91        let mut all_exclusions = vec![
92            Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
93            Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
94            Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
95            Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
96        ];
97        all_exclusions.extend(config.excludes.iter().cloned());
98        all_exclusions.extend(options.additional_excludes);
99
100        let glob_settings = &config.glob;
101        let mut glob_builder = GlobSetBuilder::new();
102        for ex in &all_exclusions {
103            if let Exclusion::Pattern(pat) = ex {
104                let glob = GlobBuilder::new(pat)
105                    .case_insensitive(glob_settings.case_insensitive)
106                    .literal_separator(glob_settings.literal_separator)
107                    .backslash_escape(glob_settings.backslash_escape)
108                    .empty_alternates(glob_settings.empty_alternates)
109                    .build()?;
110                glob_builder.add(glob);
111            }
112        }
113
114        let glob_excludes = glob_builder.build()?;
115
116        let path_excludes: HashSet<PathBuf> = all_exclusions
117            .iter()
118            .filter_map(|ex| match ex {
119                Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
120                Exclusion::Pattern(_) => None,
121            })
122            .collect();
123
124        let extensions: HashSet<Vec<u8>> = config.extensions.iter().map(|c| c.to_vec()).collect();
125        let workspace = config.workspace.as_ref().to_path_buf();
126
127        // Build the set of explicitly configured watch paths so that events from
128        // these directories are never filtered out by the default glob exclusions
129        // (e.g., a project that explicitly includes `vendor/revolt` should still
130        // receive change events for files under that directory).
131        let mut unique_watch_paths = HashSet::new();
132
133        for path in &config.paths {
134            let watch_path = Self::extract_watch_path(path.as_ref());
135            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
136
137            unique_watch_paths.insert(absolute_path);
138        }
139
140        for path in &config.includes {
141            let watch_path = Self::extract_watch_path(path.as_ref());
142            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
143
144            unique_watch_paths.insert(absolute_path);
145        }
146
147        let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths
148            .iter()
149            .filter(|wp| glob_excludes.is_match(wp.as_path()) || path_excludes.contains(wp.as_path()))
150            .cloned()
151            .collect();
152
153        let mut watcher = RecommendedWatcher::new(
154            move |res: Result<Event, notify::Error>| {
155                if let Ok(event) = res
156                    && let Some(changed) = Self::handle_event(
157                        event,
158                        &workspace,
159                        &glob_excludes,
160                        &path_excludes,
161                        &extensions,
162                        &explicit_watch_paths,
163                    )
164                {
165                    let _ = tx.send(changed);
166                }
167            },
168            Config::default()
169                .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
170        )
171        .map_err(DatabaseError::WatcherInit)?;
172
173        let mut watched_paths = Vec::new();
174        for path in unique_watch_paths {
175            watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
176            watched_paths.push(path.clone());
177            tracing::debug!("Watching path: {}", path.display());
178        }
179
180        tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
181
182        self.watcher = Some(watcher);
183        self.watched_paths = watched_paths;
184        self.receiver = Some(rx);
185
186        Ok(())
187    }
188
189    /// Stops watching if currently active.
190    #[inline]
191    pub fn stop(&mut self) {
192        if let Some(mut watcher) = self.watcher.take() {
193            for path in &self.watched_paths {
194                let _ = watcher.unwatch(path);
195                tracing::debug!("Stopped watching: {}", path.display());
196            }
197        }
198        self.watched_paths.clear();
199        self.receiver = None;
200    }
201
202    /// Checks if the watcher is currently active.
203    #[inline]
204    #[must_use]
205    pub fn is_watching(&self) -> bool {
206        self.watcher.is_some()
207    }
208
209    /// Extracts the base directory path from a potentially glob-pattern path.
210    ///
211    /// For glob patterns (containing *, ?, [, {), this returns the directory portion
212    /// before the first glob metacharacter. For regular paths, returns the path as-is.
213    ///
214    /// # Examples
215    ///
216    /// - `"src/**/*.php"` → `"src"`
217    /// - `"lib/*/foo.php"` → `"lib"`
218    /// - `"tests/fixtures"` → `"tests/fixtures"` (unchanged)
219    fn extract_watch_path(pattern: &[u8]) -> PathBuf {
220        let is_glob =
221            pattern.contains(&b'*') || pattern.contains(&b'?') || pattern.contains(&b'[') || pattern.contains(&b'{');
222
223        if !is_glob {
224            return bytes_to_path(pattern).into_owned();
225        }
226
227        let first_glob_pos =
228            pattern.iter().position(|&b| matches!(b, b'*' | b'?' | b'[' | b'{')).unwrap_or(pattern.len());
229
230        let base = &pattern[..first_glob_pos];
231
232        let mut end = base.len();
233        while end > 0 && matches!(base[end - 1], b'/' | b'\\') {
234            end -= 1;
235        }
236        let base = &base[..end];
237
238        if base.is_empty() { PathBuf::from(".") } else { bytes_to_path(base).into_owned() }
239    }
240
241    fn handle_event(
242        event: Event,
243        workspace: &Path,
244        glob_excludes: &GlobSet,
245        path_excludes: &HashSet<PathBuf>,
246        extensions: &HashSet<Vec<u8>>,
247        explicit_watch_paths: &[PathBuf],
248    ) -> Option<Vec<ChangedFile>> {
249        tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
250
251        if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
252            event.kind
253        {
254            tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
255
256            return None;
257        }
258
259        let mut changed_files = Vec::new();
260
261        for path in event.paths {
262            // Check if file has a valid extension
263            if let Some(ext) = path.extension() {
264                if !extensions.contains(ext.as_encoded_bytes()) {
265                    continue;
266                }
267            } else {
268                continue;
269            }
270
271            let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
272            if !is_explicitly_watched {
273                // Check glob pattern exclusions
274                if glob_excludes.is_match(&path) {
275                    tracing::debug!("Skipping path excluded by pattern: {}", path.display());
276                    continue;
277                }
278
279                // Check exact path exclusions
280                if path_excludes.contains(&path) {
281                    tracing::debug!("Skipping excluded path: {}", path.display());
282                    continue;
283                }
284
285                // Check if any parent directory is in path_excludes
286                let mut should_skip = false;
287                for ancestor in path.ancestors().skip(1) {
288                    if path_excludes.contains(ancestor) {
289                        tracing::debug!("Skipping path under excluded directory: {}", path.display());
290                        should_skip = true;
291                        break;
292                    }
293                }
294
295                if should_skip {
296                    continue;
297                }
298            }
299
300            // Normalize to forward slashes for cross-platform determinism
301            #[cfg(windows)]
302            let logical_name = path
303                .strip_prefix(workspace)
304                .unwrap_or(&path)
305                .as_os_str()
306                .as_encoded_bytes()
307                .iter()
308                .map(|i| if *i == b'\\' { b'/' } else { *i })
309                .collect::<Vec<_>>();
310            #[cfg(not(windows))]
311            let logical_name = path.strip_prefix(workspace).unwrap_or(&path).as_os_str().as_encoded_bytes().to_owned();
312
313            let file_id = FileId::new(&logical_name);
314
315            changed_files.push(ChangedFile { id: file_id, path: path.clone() });
316        }
317
318        if changed_files.is_empty() { None } else { Some(changed_files) }
319    }
320
321    /// Waits for file changes and updates the database.
322    ///
323    /// This method blocks until file changes are detected, then updates the database
324    /// in place and returns the IDs of changed files.
325    ///
326    /// # Errors
327    ///
328    /// Returns a [`DatabaseError`] if:
329    /// - The watcher is not currently active ([`DatabaseError::WatcherNotActive`])
330    /// - Updating the database with changed files fails
331    #[inline]
332    pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
333        let Some(receiver) = &self.receiver else {
334            return Err(DatabaseError::WatcherNotActive);
335        };
336
337        let config = &self.database.configuration;
338        let workspace = config.workspace.as_ref().to_path_buf();
339
340        match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
341            Ok(changed_files) => {
342                let mut all_changed = changed_files;
343                loop {
344                    match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
345                        Ok(more) => all_changed.extend(more),
346                        Err(RecvTimeoutError::Timeout) => break,
347                        Err(RecvTimeoutError::Disconnected) => {
348                            self.stop();
349                            return Err(DatabaseError::WatcherNotActive);
350                        }
351                    }
352                }
353
354                let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
355                for changed in all_changed {
356                    latest_changes.insert(changed.id, changed);
357                }
358                let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
359                let mut changed_ids = Vec::new();
360
361                for changed_file in &all_changed {
362                    changed_ids.push(changed_file.id);
363
364                    let Ok(file) = self.database.get(&changed_file.id) else {
365                        if changed_file.path.exists() {
366                            match File::read(&workspace, &changed_file.path, FileType::Host) {
367                                Ok(file) => {
368                                    self.database.add(file);
369                                    tracing::debug!("Added new file to database: {}", changed_file.path.display());
370                                }
371                                Err(e) => {
372                                    tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
373                                }
374                            }
375                        }
376
377                        continue;
378                    };
379
380                    if !changed_file.path.exists() {
381                        self.database.delete(changed_file.id);
382                        tracing::trace!("Deleted file from database: {}", String::from_utf8_lossy(&file.name));
383                        continue;
384                    }
385
386                    match Self::read_stable_contents(&changed_file.path) {
387                        Ok(contents) => {
388                            if self.database.update(changed_file.id, Cow::Owned(contents)) {
389                                tracing::trace!("Updated file in database: {}", String::from_utf8_lossy(&file.name));
390                            } else {
391                                tracing::warn!(
392                                    "Failed to update file in database (ID not found): {}",
393                                    String::from_utf8_lossy(&file.name)
394                                );
395                            }
396                        }
397                        Err(e) => {
398                            tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
399                        }
400                    }
401                }
402
403                Ok(changed_ids)
404            }
405            Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
406            Err(RecvTimeoutError::Disconnected) => {
407                self.stop();
408                Err(DatabaseError::WatcherNotActive)
409            }
410        }
411    }
412
413    /// Reads file contents with a stability check to handle partial writes.
414    ///
415    /// Some IDEs and formatters write files in multiple steps (save, then format).
416    /// This method reads the file, waits briefly, and re-reads to ensure the content
417    /// has stabilized before returning.
418    fn read_stable_contents(path: &Path) -> std::io::Result<Vec<u8>> {
419        let contents = std::fs::read(path)?;
420
421        std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
422
423        if path.exists()
424            && let Ok(reread) = std::fs::read(path)
425            && reread != contents
426        {
427            tracing::debug!("File content changed during stability check: {}", path.display());
428
429            return Ok(reread);
430        }
431
432        Ok(contents)
433    }
434
435    /// Returns a reference to the database.
436    #[inline]
437    #[must_use]
438    pub fn database(&self) -> &Database<'config> {
439        &self.database
440    }
441
442    /// Returns a reference to the database.
443    #[inline]
444    #[must_use]
445    pub fn read_only_database(&self) -> ReadDatabase {
446        self.database.read_only()
447    }
448
449    /// Returns a mutable reference to the database.
450    #[inline]
451    pub fn database_mut(&mut self) -> &mut Database<'config> {
452        &mut self.database
453    }
454
455    /// Provides temporary mutable access to the database through a closure.
456    ///
457    /// This method helps Rust's borrow checker understand that the mutable borrow
458    /// of the database is scoped to just the closure execution, allowing the watcher
459    /// to be used again after the closure returns.
460    ///
461    /// The closure is bounded with for<'x> to explicitly show that the database
462    /// reference lifetime is scoped to the closure execution only.
463    #[inline]
464    pub fn with_database_mut<F, R>(&mut self, f: F) -> R
465    where
466        F: for<'borrow> FnOnce(&'borrow mut Database<'config>) -> R,
467    {
468        f(&mut self.database)
469    }
470
471    /// Consumes the watcher and returns the database.
472    #[inline]
473    #[must_use]
474    pub fn into_database(self) -> Database<'config> {
475        let mut md = ManuallyDrop::new(self);
476        md.stop();
477        // SAFETY: `md` is a `ManuallyDrop<Self>`, so its `Drop` impl will not run; reading the
478        // `database` field byte-for-byte is safe because we never read or drop it again.
479        unsafe { std::ptr::read(&raw const md.database) }
480    }
481}
482
483impl Drop for DatabaseWatcher<'_> {
484    #[inline]
485    fn drop(&mut self) {
486        self.stop();
487    }
488}