1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Poll interval, in milliseconds, used when `WatchOptions::poll_interval` is `None`
/// and as the value supplied by `WatchOptions::default()`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
/// How long, in milliseconds, `DatabaseWatcher::wait` blocks waiting for the first batch
/// of change events before reporting that nothing changed.
const WAIT_INTERNAL_MS: u64 = 100;
/// Quiet period, in milliseconds: after a first batch arrives, `DatabaseWatcher::wait`
/// keeps collecting further batches until none shows up for this long.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Pause, in milliseconds, between the two reads performed by
/// `DatabaseWatcher::read_stable_contents`.
const STABILITY_CHECK_MS: u64 = 10;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40struct ChangedFile {
41 id: FileId,
42 path: PathBuf,
43}
44
45#[derive(Debug, Clone)]
47pub struct WatchOptions {
48 pub poll_interval: Option<Duration>,
49 pub additional_excludes: Vec<Exclusion<'static>>,
50}
51
52impl Default for WatchOptions {
53 fn default() -> Self {
54 Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
55 }
56}
57
58pub struct DatabaseWatcher<'a> {
60 database: Database<'a>,
61 watcher: Option<RecommendedWatcher>,
62 watched_paths: Vec<PathBuf>,
63 receiver: Option<Receiver<Vec<ChangedFile>>>,
64}
65
66impl<'a> DatabaseWatcher<'a> {
67 #[must_use]
68 pub fn new(database: Database<'a>) -> Self {
69 Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
70 }
71
72 pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
81 self.stop();
82
83 let config = &self.database.configuration;
84
85 let (tx, rx) = mpsc::channel();
86
87 let mut all_exclusions = vec![
88 Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
89 Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
90 Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
91 Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
92 ];
93 all_exclusions.extend(config.excludes.iter().cloned());
94 all_exclusions.extend(options.additional_excludes);
95
96 let glob_settings = &config.glob;
97 let mut glob_builder = GlobSetBuilder::new();
98 for ex in &all_exclusions {
99 if let Exclusion::Pattern(pat) = ex {
100 let glob = GlobBuilder::new(pat)
101 .case_insensitive(glob_settings.case_insensitive)
102 .literal_separator(glob_settings.literal_separator)
103 .backslash_escape(glob_settings.backslash_escape)
104 .empty_alternates(glob_settings.empty_alternates)
105 .build()?;
106 glob_builder.add(glob);
107 }
108 }
109
110 let glob_excludes = glob_builder.build()?;
111
112 let path_excludes: HashSet<PathBuf> = all_exclusions
113 .iter()
114 .filter_map(|ex| match ex {
115 Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
116 _ => None,
117 })
118 .collect();
119
120 let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
121 let workspace = config.workspace.as_ref().to_path_buf();
122
123 let mut unique_watch_paths = HashSet::new();
128
129 for path in &config.paths {
130 let watch_path = Self::extract_watch_path(path.as_ref());
131 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
132
133 unique_watch_paths.insert(absolute_path);
134 }
135
136 for path in &config.includes {
137 let watch_path = Self::extract_watch_path(path.as_ref());
138 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
139
140 unique_watch_paths.insert(absolute_path);
141 }
142
143 let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths
144 .iter()
145 .filter(|wp| glob_excludes.is_match(wp.as_path()) || path_excludes.contains(wp.as_path()))
146 .cloned()
147 .collect();
148
149 let mut watcher = RecommendedWatcher::new(
150 move |res: Result<Event, notify::Error>| {
151 if let Ok(event) = res
152 && let Some(changed) = Self::handle_event(
153 event,
154 &workspace,
155 &glob_excludes,
156 &path_excludes,
157 &extensions,
158 &explicit_watch_paths,
159 )
160 {
161 let _ = tx.send(changed);
162 }
163 },
164 Config::default()
165 .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
166 )
167 .map_err(DatabaseError::WatcherInit)?;
168
169 let mut watched_paths = Vec::new();
170 for path in unique_watch_paths {
171 watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
172 watched_paths.push(path.clone());
173 tracing::debug!("Watching path: {}", path.display());
174 }
175
176 tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
177
178 self.watcher = Some(watcher);
179 self.watched_paths = watched_paths;
180 self.receiver = Some(rx);
181
182 Ok(())
183 }
184
185 pub fn stop(&mut self) {
187 if let Some(mut watcher) = self.watcher.take() {
188 for path in &self.watched_paths {
189 let _ = watcher.unwatch(path);
190 tracing::debug!("Stopped watching: {}", path.display());
191 }
192 }
193 self.watched_paths.clear();
194 self.receiver = None;
195 }
196
197 #[must_use]
199 pub fn is_watching(&self) -> bool {
200 self.watcher.is_some()
201 }
202
203 fn extract_watch_path(pattern: &str) -> PathBuf {
214 let is_glob = pattern.contains('*') || pattern.contains('?') || pattern.contains('[') || pattern.contains('{');
215
216 if !is_glob {
217 return PathBuf::from(pattern);
218 }
219
220 let first_glob_pos = pattern.find(['*', '?', '[', '{']).unwrap_or(pattern.len());
221
222 let base = &pattern[..first_glob_pos];
223
224 let base = base.trim_end_matches('/').trim_end_matches('\\');
225
226 if base.is_empty() { PathBuf::from(".") } else { PathBuf::from(base) }
227 }
228
229 fn handle_event(
230 event: Event,
231 workspace: &Path,
232 glob_excludes: &GlobSet,
233 path_excludes: &HashSet<PathBuf>,
234 extensions: &HashSet<String>,
235 explicit_watch_paths: &[PathBuf],
236 ) -> Option<Vec<ChangedFile>> {
237 tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
238
239 if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
240 event.kind
241 {
242 tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
243
244 return None;
245 }
246
247 let mut changed_files = Vec::new();
248
249 for path in event.paths {
250 if let Some(ext) = path.extension() {
252 if !extensions.contains(ext.to_string_lossy().as_ref()) {
253 continue;
254 }
255 } else {
256 continue;
257 }
258
259 let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
260 if !is_explicitly_watched {
261 if glob_excludes.is_match(&path) {
263 tracing::debug!("Skipping path excluded by pattern: {}", path.display());
264 continue;
265 }
266
267 if path_excludes.contains(&path) {
269 tracing::debug!("Skipping excluded path: {}", path.display());
270 continue;
271 }
272
273 let mut should_skip = false;
275 for ancestor in path.ancestors().skip(1) {
276 if path_excludes.contains(ancestor) {
277 tracing::debug!("Skipping path under excluded directory: {}", path.display());
278 should_skip = true;
279 break;
280 }
281 }
282
283 if should_skip {
284 continue;
285 }
286 }
287
288 let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy().replace('\\', "/");
290 let file_id = FileId::new(&logical_name);
291
292 changed_files.push(ChangedFile { id: file_id, path: path.clone() });
293 }
294
295 if changed_files.is_empty() { None } else { Some(changed_files) }
296 }
297
298 pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
309 let Some(receiver) = &self.receiver else {
310 return Err(DatabaseError::WatcherNotActive);
311 };
312
313 let config = &self.database.configuration;
314 let workspace = config.workspace.as_ref().to_path_buf();
315
316 match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
317 Ok(changed_files) => {
318 let mut all_changed = changed_files;
319 loop {
320 match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
321 Ok(more) => all_changed.extend(more),
322 Err(RecvTimeoutError::Timeout) => break,
323 Err(RecvTimeoutError::Disconnected) => {
324 self.stop();
325 return Err(DatabaseError::WatcherNotActive);
326 }
327 }
328 }
329
330 let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
331 for changed in all_changed {
332 latest_changes.insert(changed.id, changed);
333 }
334 let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
335 let mut changed_ids = Vec::new();
336
337 for changed_file in &all_changed {
338 changed_ids.push(changed_file.id);
339
340 let Ok(file) = self.database.get(&changed_file.id) else {
341 if changed_file.path.exists() {
342 match File::read(&workspace, &changed_file.path, FileType::Host) {
343 Ok(file) => {
344 self.database.add(file);
345 tracing::debug!("Added new file to database: {}", changed_file.path.display());
346 }
347 Err(e) => {
348 tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
349 }
350 }
351 }
352
353 continue;
354 };
355
356 if !changed_file.path.exists() {
357 self.database.delete(changed_file.id);
358 tracing::trace!("Deleted file from database: {}", file.name);
359 continue;
360 }
361
362 match Self::read_stable_contents(&changed_file.path) {
363 Ok(contents) => {
364 if self.database.update(changed_file.id, Cow::Owned(contents)) {
365 tracing::trace!("Updated file in database: {}", file.name);
366 } else {
367 tracing::warn!("Failed to update file in database (ID not found): {}", file.name);
368 }
369 }
370 Err(e) => {
371 tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
372 }
373 }
374 }
375
376 Ok(changed_ids)
377 }
378 Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
379 Err(RecvTimeoutError::Disconnected) => {
380 self.stop();
381 Err(DatabaseError::WatcherNotActive)
382 }
383 }
384 }
385
386 fn read_stable_contents(path: &Path) -> std::io::Result<String> {
392 let contents = std::fs::read_to_string(path)?;
393
394 std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
395
396 if path.exists()
397 && let Ok(reread) = std::fs::read_to_string(path)
398 && reread != contents
399 {
400 tracing::debug!("File content changed during stability check: {}", path.display());
401
402 return Ok(reread);
403 }
404
405 Ok(contents)
406 }
407
408 #[must_use]
410 pub fn database(&self) -> &Database<'a> {
411 &self.database
412 }
413
414 #[must_use]
416 pub fn read_only_database(&self) -> ReadDatabase {
417 self.database.read_only()
418 }
419
420 pub fn database_mut(&mut self) -> &mut Database<'a> {
422 &mut self.database
423 }
424
425 pub fn with_database_mut<F, R>(&mut self, f: F) -> R
434 where
435 F: for<'x> FnOnce(&'x mut Database<'a>) -> R,
436 {
437 f(&mut self.database)
438 }
439
440 #[must_use]
442 pub fn into_database(self) -> Database<'a> {
443 let mut md = ManuallyDrop::new(self);
444 md.stop();
445 unsafe { std::ptr::read(&raw const md.database) }
446 }
447}
448
449impl Drop for DatabaseWatcher<'_> {
450 fn drop(&mut self) {
451 self.stop();
452 }
453}