mago_database/
watcher.rs

1//! Database watcher for real-time file change monitoring.
2
3use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::Glob;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Poll interval, in milliseconds, handed to the `notify` configuration when
/// [`WatchOptions::poll_interval`] is not set (and used by `WatchOptions::default`).
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
35
36#[derive(Debug, Clone, PartialEq, Eq, Hash)]
37struct ChangedFile {
38    id: FileId,
39    path: PathBuf,
40}
41
42/// Options for configuring the file system watcher.
43#[derive(Debug, Clone)]
44pub struct WatchOptions {
45    pub poll_interval: Option<Duration>,
46    pub additional_excludes: Vec<Exclusion<'static>>,
47}
48
49impl Default for WatchOptions {
50    fn default() -> Self {
51        Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
52    }
53}
54
55/// Database watcher service that monitors file changes and updates the database.
56pub struct DatabaseWatcher<'a> {
57    database: Database<'a>,
58    watcher: Option<RecommendedWatcher>,
59    watched_paths: Vec<PathBuf>,
60    receiver: Option<Receiver<Vec<ChangedFile>>>,
61}
62
63impl<'a> DatabaseWatcher<'a> {
64    #[must_use]
65    pub fn new(database: Database<'a>) -> Self {
66        Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
67    }
68
69    /// Starts watching for file changes in the configured directories.
70    ///
71    /// # Errors
72    ///
73    /// Returns a [`DatabaseError`] if:
74    /// - A glob pattern is invalid
75    /// - The file system watcher cannot be created
76    /// - Directories cannot be watched
77    pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
78        self.stop();
79
80        let config = &self.database.configuration;
81
82        let (tx, rx) = mpsc::channel();
83
84        let mut all_exclusions = vec![
85            Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
86            Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
87            Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
88            Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
89        ];
90        all_exclusions.extend(config.excludes.iter().cloned());
91        all_exclusions.extend(options.additional_excludes);
92
93        let mut glob_builder = GlobSetBuilder::new();
94        for ex in &all_exclusions {
95            if let Exclusion::Pattern(pat) = ex {
96                glob_builder.add(Glob::new(pat)?);
97            }
98        }
99        let glob_excludes = glob_builder.build()?;
100
101        let path_excludes: HashSet<PathBuf> = all_exclusions
102            .iter()
103            .filter_map(|ex| match ex {
104                Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
105                _ => None,
106            })
107            .collect();
108
109        let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
110        let workspace = config.workspace.as_ref().to_path_buf();
111
112        let mut watcher = RecommendedWatcher::new(
113            move |res: Result<Event, notify::Error>| {
114                if let Ok(event) = res
115                    && let Some(changed) =
116                        Self::handle_event(event, &workspace, &glob_excludes, &path_excludes, &extensions)
117                {
118                    let _ = tx.send(changed);
119                }
120            },
121            Config::default()
122                .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
123        )
124        .map_err(DatabaseError::WatcherInit)?;
125
126        let mut unique_watch_paths = HashSet::new();
127
128        for path in &config.paths {
129            let watch_path = Self::extract_watch_path(path.as_ref());
130            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
131
132            unique_watch_paths.insert(absolute_path);
133        }
134
135        for path in &config.includes {
136            let watch_path = Self::extract_watch_path(path.as_ref());
137            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
138
139            unique_watch_paths.insert(absolute_path);
140        }
141
142        let mut watched_paths = Vec::new();
143        for path in unique_watch_paths {
144            watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
145            watched_paths.push(path.clone());
146            tracing::debug!("Watching path: {}", path.display());
147        }
148
149        tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
150
151        self.watcher = Some(watcher);
152        self.watched_paths = watched_paths;
153        self.receiver = Some(rx);
154
155        Ok(())
156    }
157
158    /// Stops watching if currently active.
159    pub fn stop(&mut self) {
160        if let Some(mut watcher) = self.watcher.take() {
161            for path in &self.watched_paths {
162                let _ = watcher.unwatch(path);
163                tracing::debug!("Stopped watching: {}", path.display());
164            }
165        }
166        self.watched_paths.clear();
167        self.receiver = None;
168    }
169
170    /// Checks if the watcher is currently active.
171    #[must_use]
172    pub fn is_watching(&self) -> bool {
173        self.watcher.is_some()
174    }
175
/// Extracts the base directory to watch from a path that may contain glob syntax.
///
/// A path containing any of the glob metacharacters `*`, `?`, `[` or `{` is reduced to the
/// directory portion before the first metacharacter. That is everything up to the last
/// path separator (`/` or `\`) preceding it.
///
/// Cutting at a separator matters when a glob starts mid-component. For `src/foo*.php` the
/// directory to watch is `src`, not the nonexistent `src/foo`.
///
/// A pattern without any directory prefix yields `.`. A pattern anchored at the root
/// (e.g. `/*.php`) yields the root itself. Paths without metacharacters are returned
/// unchanged.
///
/// # Examples
///
/// - `"src/**/*.php"` → `"src"`
/// - `"lib/*/foo.php"` → `"lib"`
/// - `"src/foo*.php"` → `"src"`
/// - `"*.php"` → `"."`
/// - `"/*.php"` → `"/"`
/// - `"tests/fixtures"` → `"tests/fixtures"` (unchanged)
fn extract_watch_path(pattern: &str) -> PathBuf {
    const GLOB_CHARS: [char; 4] = ['*', '?', '[', '{'];
    const SEPARATORS: [char; 2] = ['/', '\\'];

    let Some(first_glob_pos) = pattern.find(GLOB_CHARS) else {
        return PathBuf::from(pattern);
    };

    let prefix = &pattern[..first_glob_pos];

    // Only whole path components before the first metacharacter are literal; a partially
    // globbed component (e.g. `foo*`) must not be treated as a directory.
    let Some(last_separator) = prefix.rfind(SEPARATORS) else {
        return PathBuf::from(".");
    };

    let base = prefix[..last_separator].trim_end_matches(SEPARATORS);
    if base.is_empty() {
        // Everything before the glob is separators, so the pattern is rooted: keep a single
        // separator (ASCII, so slicing at 1 is on a char boundary).
        PathBuf::from(&prefix[..1])
    } else {
        PathBuf::from(base)
    }
}
201
202    fn handle_event(
203        event: Event,
204        workspace: &Path,
205        glob_excludes: &GlobSet,
206        path_excludes: &HashSet<PathBuf>,
207        extensions: &HashSet<String>,
208    ) -> Option<Vec<ChangedFile>> {
209        tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
210
211        if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
212            event.kind
213        {
214            tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
215
216            return None;
217        }
218
219        let mut changed_files = Vec::new();
220
221        for path in event.paths {
222            // Check if file has a valid extension
223            if let Some(ext) = path.extension() {
224                if !extensions.contains(ext.to_string_lossy().as_ref()) {
225                    continue;
226                }
227            } else {
228                continue;
229            }
230
231            // Check glob pattern exclusions
232            if glob_excludes.is_match(&path) {
233                tracing::debug!("Skipping path excluded by pattern: {}", path.display());
234                continue;
235            }
236
237            // Check exact path exclusions
238            if path_excludes.contains(&path) {
239                tracing::debug!("Skipping excluded path: {}", path.display());
240                continue;
241            }
242
243            // Check if any parent directory is in path_excludes
244            let mut should_skip = false;
245            for ancestor in path.ancestors().skip(1) {
246                if path_excludes.contains(ancestor) {
247                    tracing::debug!("Skipping path under excluded directory: {}", path.display());
248                    should_skip = true;
249                    break;
250                }
251            }
252            if should_skip {
253                continue;
254            }
255
256            let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy();
257            let file_id = FileId::new(logical_name.as_ref());
258
259            changed_files.push(ChangedFile { id: file_id, path: path.clone() });
260        }
261
262        if changed_files.is_empty() { None } else { Some(changed_files) }
263    }
264
265    /// Waits for file changes and updates the database.
266    ///
267    /// This method blocks until file changes are detected, then updates the database
268    /// in place and returns the IDs of changed files.
269    ///
270    /// # Errors
271    ///
272    /// Returns a [`DatabaseError`] if:
273    /// - The watcher is not currently active ([`DatabaseError::WatcherNotActive`])
274    /// - Updating the database with changed files fails
275    pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
276        let Some(receiver) = &self.receiver else {
277            return Err(DatabaseError::WatcherNotActive);
278        };
279
280        let config = &self.database.configuration;
281        let workspace = config.workspace.as_ref().to_path_buf();
282
283        match receiver.recv_timeout(Duration::from_millis(100)) {
284            Ok(changed_files) => {
285                std::thread::sleep(Duration::from_millis(250));
286                let mut all_changed = changed_files;
287                while let Ok(more) = receiver.try_recv() {
288                    all_changed.extend(more);
289                }
290
291                let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
292                for changed in all_changed {
293                    latest_changes.insert(changed.id, changed);
294                }
295                let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
296                let mut changed_ids = Vec::new();
297
298                for changed_file in &all_changed {
299                    changed_ids.push(changed_file.id);
300
301                    match self.database.get(&changed_file.id) {
302                        Ok(file) => {
303                            if changed_file.path.exists() {
304                                match std::fs::read_to_string(&changed_file.path) {
305                                    Ok(contents) => {
306                                        self.database.update(changed_file.id, Cow::Owned(contents));
307                                        tracing::trace!("Updated file in database: {}", file.name);
308                                    }
309                                    Err(e) => {
310                                        tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
311                                    }
312                                }
313                            } else {
314                                self.database.delete(changed_file.id);
315                                tracing::trace!("Deleted file from database: {}", file.name);
316                            }
317                        }
318                        Err(_) => {
319                            if changed_file.path.exists() {
320                                match File::read(&workspace, &changed_file.path, FileType::Host) {
321                                    Ok(file) => {
322                                        self.database.add(file);
323                                        tracing::debug!("Added new file to database: {}", changed_file.path.display());
324                                    }
325                                    Err(e) => {
326                                        tracing::error!(
327                                            "Failed to load new file {}: {}",
328                                            changed_file.path.display(),
329                                            e
330                                        );
331                                    }
332                                }
333                            }
334                        }
335                    }
336                }
337
338                Ok(changed_ids)
339            }
340            Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
341            Err(RecvTimeoutError::Disconnected) => {
342                self.stop();
343                Err(DatabaseError::WatcherNotActive)
344            }
345        }
346    }
347
348    /// Returns a reference to the database.
349    #[must_use]
350    pub fn database(&self) -> &Database<'a> {
351        &self.database
352    }
353
354    /// Returns a reference to the database.
355    #[must_use]
356    pub fn read_only_database(&self) -> ReadDatabase {
357        self.database.read_only()
358    }
359
360    /// Returns a mutable reference to the database.
361    pub fn database_mut(&mut self) -> &mut Database<'a> {
362        &mut self.database
363    }
364
365    /// Provides temporary mutable access to the database through a closure.
366    ///
367    /// This method helps Rust's borrow checker understand that the mutable borrow
368    /// of the database is scoped to just the closure execution, allowing the watcher
369    /// to be used again after the closure returns.
370    ///
371    /// The closure is bounded with for<'x> to explicitly show that the database
372    /// reference lifetime is scoped to the closure execution only.
373    pub fn with_database_mut<F, R>(&mut self, f: F) -> R
374    where
375        F: for<'x> FnOnce(&'x mut Database<'a>) -> R,
376    {
377        f(&mut self.database)
378    }
379
380    /// Consumes the watcher and returns the database.
381    #[must_use]
382    pub fn into_database(self) -> Database<'a> {
383        let mut md = ManuallyDrop::new(self);
384        md.stop();
385        unsafe { std::ptr::read(&raw const md.database) }
386    }
387}
388
389impl Drop for DatabaseWatcher<'_> {
390    fn drop(&mut self) {
391        self.stop();
392    }
393}