mago-database 1.20.1

Provides a high-performance, in-memory database for source code analysis, featuring distinct mutable and immutable states and transactional updates.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
//! Database watcher for real-time file change monitoring.

use std::borrow::Cow;
use std::collections::HashMap;
use std::collections::HashSet;
use std::mem::ManuallyDrop;
use std::path::Path;
use std::path::PathBuf;
use std::sync::mpsc;
use std::sync::mpsc::Receiver;
use std::sync::mpsc::RecvTimeoutError;
use std::time::Duration;

use globset::GlobBuilder;
use globset::GlobSet;
use globset::GlobSetBuilder;
use notify::Config;
use notify::Event;
use notify::EventKind;
use notify::RecommendedWatcher;
use notify::RecursiveMode;
use notify::Watcher as NotifyWatcher;
use notify::event::ModifyKind;

use crate::Database;
use crate::DatabaseReader;
use crate::ReadDatabase;
use crate::error::DatabaseError;
use crate::exclusion::Exclusion;
use crate::file::File;
use crate::file::FileId;
use crate::file::FileType;

/// Poll interval (ms) used by `WatchOptions::default()` and whenever `WatchOptions::poll_interval` is `None`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1000;
/// How long (ms) a single `wait` call blocks for the first batch of changes before returning an empty list.
/// NOTE(review): the name reads "INTERNAL" but this is an *interval*; consider renaming together with its uses.
const WAIT_INTERNAL_MS: u64 = 100;
/// Debounce quiet period (ms): after the first batch, `wait` keeps collecting until no batch arrives for this long.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Delay (ms) between the two reads performed by the content stability check.
const STABILITY_CHECK_MS: u64 = 10;

/// A single file change produced by the watcher's event handler and sent to `wait`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ChangedFile {
    /// ID built from the path made relative to the workspace, with `\` normalized to `/`.
    id: FileId,
    /// Path as reported by the `notify` event (presumably absolute, since the workspace prefix
    /// is stripped from it to build `id` — confirm for each backend).
    path: PathBuf,
}

/// Options for configuring the file system watcher.
#[derive(Debug, Clone)]
pub struct WatchOptions {
    /// Poll interval passed to the `notify` watcher configuration; `None` falls back to
    /// `DEFAULT_POLL_INTERVAL_MS` (1000 ms).
    pub poll_interval: Option<Duration>,
    /// Exclusions applied in addition to the built-in ones and the database configuration's
    /// own `excludes`.
    pub additional_excludes: Vec<Exclusion<'static>>,
}

impl Default for WatchOptions {
    /// Polls every `DEFAULT_POLL_INTERVAL_MS` milliseconds and adds no extra exclusions.
    fn default() -> Self {
        let interval = Duration::from_millis(DEFAULT_POLL_INTERVAL_MS);

        Self { additional_excludes: Vec::new(), poll_interval: Some(interval) }
    }
}

/// Database watcher service that monitors file changes and updates the database.
///
/// The `notify` event handler filters raw events and sends batches of changed files over a
/// channel. [`DatabaseWatcher::wait`] drains that channel and applies the changes to the owned
/// database.
pub struct DatabaseWatcher<'a> {
    /// The database kept in sync with the file system.
    database: Database<'a>,
    /// Active `notify` watcher; `None` while not watching.
    watcher: Option<RecommendedWatcher>,
    /// Paths registered with `watcher`, remembered so they can be unwatched on stop.
    watched_paths: Vec<PathBuf>,
    /// Receiving end of the channel that the event handler sends change batches into.
    receiver: Option<Receiver<Vec<ChangedFile>>>,
}

impl<'a> DatabaseWatcher<'a> {
    #[must_use]
    pub fn new(database: Database<'a>) -> Self {
        Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
    }

    /// Starts watching for file changes in the configured directories.
    ///
    /// # Errors
    ///
    /// Returns a [`DatabaseError`] if:
    /// - A glob pattern is invalid
    /// - The file system watcher cannot be created
    /// - Directories cannot be watched
    pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
        self.stop();

        let config = &self.database.configuration;

        let (tx, rx) = mpsc::channel();

        let mut all_exclusions = vec![
            Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
            Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
            Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
            Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
        ];
        all_exclusions.extend(config.excludes.iter().cloned());
        all_exclusions.extend(options.additional_excludes);

        let glob_settings = &config.glob;
        let mut glob_builder = GlobSetBuilder::new();
        for ex in &all_exclusions {
            if let Exclusion::Pattern(pat) = ex {
                let glob = GlobBuilder::new(pat)
                    .case_insensitive(glob_settings.case_insensitive)
                    .literal_separator(glob_settings.literal_separator)
                    .backslash_escape(glob_settings.backslash_escape)
                    .empty_alternates(glob_settings.empty_alternates)
                    .build()?;
                glob_builder.add(glob);
            }
        }

        let glob_excludes = glob_builder.build()?;

        let path_excludes: HashSet<PathBuf> = all_exclusions
            .iter()
            .filter_map(|ex| match ex {
                Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
                _ => None,
            })
            .collect();

        let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
        let workspace = config.workspace.as_ref().to_path_buf();

        // Build the set of explicitly configured watch paths so that events from
        // these directories are never filtered out by the default glob exclusions
        // (e.g., a project that explicitly includes `vendor/revolt` should still
        // receive change events for files under that directory).
        let mut unique_watch_paths = HashSet::new();

        for path in &config.paths {
            let watch_path = Self::extract_watch_path(path.as_ref());
            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };

            unique_watch_paths.insert(absolute_path);
        }

        for path in &config.includes {
            let watch_path = Self::extract_watch_path(path.as_ref());
            let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };

            unique_watch_paths.insert(absolute_path);
        }

        let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths.iter().cloned().collect();

        let mut watcher = RecommendedWatcher::new(
            move |res: Result<Event, notify::Error>| {
                if let Ok(event) = res
                    && let Some(changed) = Self::handle_event(
                        event,
                        &workspace,
                        &glob_excludes,
                        &path_excludes,
                        &extensions,
                        &explicit_watch_paths,
                    )
                {
                    let _ = tx.send(changed);
                }
            },
            Config::default()
                .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
        )
        .map_err(DatabaseError::WatcherInit)?;

        let mut watched_paths = Vec::new();
        for path in unique_watch_paths {
            watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
            watched_paths.push(path.clone());
            tracing::debug!("Watching path: {}", path.display());
        }

        tracing::info!("Database watcher started for workspace: {}", config.workspace.display());

        self.watcher = Some(watcher);
        self.watched_paths = watched_paths;
        self.receiver = Some(rx);

        Ok(())
    }

    /// Stops watching if currently active.
    pub fn stop(&mut self) {
        if let Some(mut watcher) = self.watcher.take() {
            for path in &self.watched_paths {
                let _ = watcher.unwatch(path);
                tracing::debug!("Stopped watching: {}", path.display());
            }
        }
        self.watched_paths.clear();
        self.receiver = None;
    }

    /// Checks if the watcher is currently active.
    #[must_use]
    pub fn is_watching(&self) -> bool {
        self.watcher.is_some()
    }

    /// Extracts the base directory path from a potentially glob-pattern path.
    ///
    /// For glob patterns (containing *, ?, [, {), this returns the directory portion
    /// before the first glob metacharacter. For regular paths, returns the path as-is.
    ///
    /// # Examples
    ///
    /// - `"src/**/*.php"` → `"src"`
    /// - `"lib/*/foo.php"` → `"lib"`
    /// - `"tests/fixtures"` → `"tests/fixtures"` (unchanged)
    fn extract_watch_path(pattern: &str) -> PathBuf {
        let is_glob = pattern.contains('*') || pattern.contains('?') || pattern.contains('[') || pattern.contains('{');

        if !is_glob {
            return PathBuf::from(pattern);
        }

        let first_glob_pos = pattern.find(['*', '?', '[', '{']).unwrap_or(pattern.len());

        let base = &pattern[..first_glob_pos];

        let base = base.trim_end_matches('/').trim_end_matches('\\');

        if base.is_empty() { PathBuf::from(".") } else { PathBuf::from(base) }
    }

    fn handle_event(
        event: Event,
        workspace: &Path,
        glob_excludes: &GlobSet,
        path_excludes: &HashSet<PathBuf>,
        extensions: &HashSet<String>,
        explicit_watch_paths: &[PathBuf],
    ) -> Option<Vec<ChangedFile>> {
        tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);

        if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
            event.kind
        {
            tracing::debug!("Ignoring non-modification event: {:?}", event.kind);

            return None;
        }

        let mut changed_files = Vec::new();

        for path in event.paths {
            // Check if file has a valid extension
            if let Some(ext) = path.extension() {
                if !extensions.contains(ext.to_string_lossy().as_ref()) {
                    continue;
                }
            } else {
                continue;
            }

            let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
            if !is_explicitly_watched {
                // Check glob pattern exclusions
                if glob_excludes.is_match(&path) {
                    tracing::debug!("Skipping path excluded by pattern: {}", path.display());
                    continue;
                }

                // Check exact path exclusions
                if path_excludes.contains(&path) {
                    tracing::debug!("Skipping excluded path: {}", path.display());
                    continue;
                }

                // Check if any parent directory is in path_excludes
                let mut should_skip = false;
                for ancestor in path.ancestors().skip(1) {
                    if path_excludes.contains(ancestor) {
                        tracing::debug!("Skipping path under excluded directory: {}", path.display());
                        should_skip = true;
                        break;
                    }
                }

                if should_skip {
                    continue;
                }
            }

            // Normalize to forward slashes for cross-platform determinism
            let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy().replace('\\', "/");
            let file_id = FileId::new(&logical_name);

            changed_files.push(ChangedFile { id: file_id, path: path.clone() });
        }

        if changed_files.is_empty() { None } else { Some(changed_files) }
    }

    /// Waits for file changes and updates the database.
    ///
    /// This method blocks until file changes are detected, then updates the database
    /// in place and returns the IDs of changed files.
    ///
    /// # Errors
    ///
    /// Returns a [`DatabaseError`] if:
    /// - The watcher is not currently active ([`DatabaseError::WatcherNotActive`])
    /// - Updating the database with changed files fails
    pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
        let Some(receiver) = &self.receiver else {
            return Err(DatabaseError::WatcherNotActive);
        };

        let config = &self.database.configuration;
        let workspace = config.workspace.as_ref().to_path_buf();

        match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
            Ok(changed_files) => {
                let mut all_changed = changed_files;
                loop {
                    match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
                        Ok(more) => all_changed.extend(more),
                        Err(RecvTimeoutError::Timeout) => break,
                        Err(RecvTimeoutError::Disconnected) => {
                            self.stop();
                            return Err(DatabaseError::WatcherNotActive);
                        }
                    }
                }

                let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
                for changed in all_changed {
                    latest_changes.insert(changed.id, changed);
                }
                let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
                let mut changed_ids = Vec::new();

                for changed_file in &all_changed {
                    changed_ids.push(changed_file.id);

                    let Ok(file) = self.database.get(&changed_file.id) else {
                        if changed_file.path.exists() {
                            match File::read(&workspace, &changed_file.path, FileType::Host) {
                                Ok(file) => {
                                    self.database.add(file);
                                    tracing::debug!("Added new file to database: {}", changed_file.path.display());
                                }
                                Err(e) => {
                                    tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
                                }
                            }
                        }

                        continue;
                    };

                    if !changed_file.path.exists() {
                        self.database.delete(changed_file.id);
                        tracing::trace!("Deleted file from database: {}", file.name);
                        continue;
                    }

                    match Self::read_stable_contents(&changed_file.path) {
                        Ok(contents) => {
                            if self.database.update(changed_file.id, Cow::Owned(contents)) {
                                tracing::trace!("Updated file in database: {}", file.name);
                            } else {
                                tracing::warn!("Failed to update file in database (ID not found): {}", file.name);
                            }
                        }
                        Err(e) => {
                            tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
                        }
                    }
                }

                Ok(changed_ids)
            }
            Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
            Err(RecvTimeoutError::Disconnected) => {
                self.stop();
                Err(DatabaseError::WatcherNotActive)
            }
        }
    }

    /// Reads file contents with a stability check to handle partial writes.
    ///
    /// Some IDEs and formatters write files in multiple steps (save, then format).
    /// This method reads the file, waits briefly, and re-reads to ensure the content
    /// has stabilized before returning.
    fn read_stable_contents(path: &Path) -> std::io::Result<String> {
        let contents = std::fs::read_to_string(path)?;

        std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));

        if path.exists()
            && let Ok(reread) = std::fs::read_to_string(path)
            && reread != contents
        {
            tracing::debug!("File content changed during stability check: {}", path.display());

            return Ok(reread);
        }

        Ok(contents)
    }

    /// Returns a reference to the database.
    #[must_use]
    pub fn database(&self) -> &Database<'a> {
        &self.database
    }

    /// Returns a reference to the database.
    #[must_use]
    pub fn read_only_database(&self) -> ReadDatabase {
        self.database.read_only()
    }

    /// Returns a mutable reference to the database.
    pub fn database_mut(&mut self) -> &mut Database<'a> {
        &mut self.database
    }

    /// Provides temporary mutable access to the database through a closure.
    ///
    /// This method helps Rust's borrow checker understand that the mutable borrow
    /// of the database is scoped to just the closure execution, allowing the watcher
    /// to be used again after the closure returns.
    ///
    /// The closure is bounded with for<'x> to explicitly show that the database
    /// reference lifetime is scoped to the closure execution only.
    pub fn with_database_mut<F, R>(&mut self, f: F) -> R
    where
        F: for<'x> FnOnce(&'x mut Database<'a>) -> R,
    {
        f(&mut self.database)
    }

    /// Consumes the watcher and returns the database.
    #[must_use]
    pub fn into_database(self) -> Database<'a> {
        let mut md = ManuallyDrop::new(self);
        md.stop();
        unsafe { std::ptr::read(&raw const md.database) }
    }
}

impl Drop for DatabaseWatcher<'_> {
    fn drop(&mut self) {
        self.stop();
    }
}