1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::GlobBuilder;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Poll interval, in milliseconds, handed to the watcher backend when
/// [`WatchOptions::poll_interval`] is `None`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;

/// How long, in milliseconds, `wait` blocks for the first batch of changes
/// before reporting that nothing happened.
const WAIT_INTERNAL_MS: u64 = 100;

/// Quiet period, in milliseconds, that must elapse without a new batch before
/// `wait` stops collecting and starts applying changes.
const WAIT_DEBOUNCE_MS: u64 = 300;

/// Pause, in milliseconds, between the two reads used to detect a file that is
/// still being written.
const STABILITY_CHECK_MS: u64 = 10;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40struct ChangedFile {
41 id: FileId,
42 path: PathBuf,
43}
44
45#[derive(Debug, Clone)]
47pub struct WatchOptions {
48 pub poll_interval: Option<Duration>,
49 pub additional_excludes: Vec<Exclusion<'static>>,
50}
51
52impl Default for WatchOptions {
53 #[inline]
54 fn default() -> Self {
55 Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
56 }
57}
58
59pub struct DatabaseWatcher<'config> {
61 database: Database<'config>,
62 watcher: Option<RecommendedWatcher>,
63 watched_paths: Vec<PathBuf>,
64 receiver: Option<Receiver<Vec<ChangedFile>>>,
65}
66
67impl<'config> DatabaseWatcher<'config> {
68 #[inline]
69 #[must_use]
70 pub fn new(database: Database<'config>) -> Self {
71 Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
72 }
73
74 #[inline]
83 pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
84 self.stop();
85
86 let config = &self.database.configuration;
87
88 let (tx, rx) = mpsc::channel();
89
90 let mut all_exclusions = vec![
91 Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
92 Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
93 Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
94 Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
95 ];
96 all_exclusions.extend(config.excludes.iter().cloned());
97 all_exclusions.extend(options.additional_excludes);
98
99 let glob_settings = &config.glob;
100 let mut glob_builder = GlobSetBuilder::new();
101 for ex in &all_exclusions {
102 if let Exclusion::Pattern(pat) = ex {
103 let glob = GlobBuilder::new(pat)
104 .case_insensitive(glob_settings.case_insensitive)
105 .literal_separator(glob_settings.literal_separator)
106 .backslash_escape(glob_settings.backslash_escape)
107 .empty_alternates(glob_settings.empty_alternates)
108 .build()?;
109 glob_builder.add(glob);
110 }
111 }
112
113 let glob_excludes = glob_builder.build()?;
114
115 let path_excludes: HashSet<PathBuf> = all_exclusions
116 .iter()
117 .filter_map(|ex| match ex {
118 Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
119 Exclusion::Pattern(_) => None,
120 })
121 .collect();
122
123 let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
124 let workspace = config.workspace.as_ref().to_path_buf();
125
126 let mut unique_watch_paths = HashSet::new();
131
132 for path in &config.paths {
133 let watch_path = Self::extract_watch_path(path.as_ref());
134 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
135
136 unique_watch_paths.insert(absolute_path);
137 }
138
139 for path in &config.includes {
140 let watch_path = Self::extract_watch_path(path.as_ref());
141 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
142
143 unique_watch_paths.insert(absolute_path);
144 }
145
146 let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths
147 .iter()
148 .filter(|wp| glob_excludes.is_match(wp.as_path()) || path_excludes.contains(wp.as_path()))
149 .cloned()
150 .collect();
151
152 let mut watcher = RecommendedWatcher::new(
153 move |res: Result<Event, notify::Error>| {
154 if let Ok(event) = res
155 && let Some(changed) = Self::handle_event(
156 event,
157 &workspace,
158 &glob_excludes,
159 &path_excludes,
160 &extensions,
161 &explicit_watch_paths,
162 )
163 {
164 let _ = tx.send(changed);
165 }
166 },
167 Config::default()
168 .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
169 )
170 .map_err(DatabaseError::WatcherInit)?;
171
172 let mut watched_paths = Vec::new();
173 for path in unique_watch_paths {
174 watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
175 watched_paths.push(path.clone());
176 tracing::debug!("Watching path: {}", path.display());
177 }
178
179 tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
180
181 self.watcher = Some(watcher);
182 self.watched_paths = watched_paths;
183 self.receiver = Some(rx);
184
185 Ok(())
186 }
187
188 #[inline]
190 pub fn stop(&mut self) {
191 if let Some(mut watcher) = self.watcher.take() {
192 for path in &self.watched_paths {
193 let _ = watcher.unwatch(path);
194 tracing::debug!("Stopped watching: {}", path.display());
195 }
196 }
197 self.watched_paths.clear();
198 self.receiver = None;
199 }
200
201 #[inline]
203 #[must_use]
204 pub fn is_watching(&self) -> bool {
205 self.watcher.is_some()
206 }
207
/// Reduces a configured path or glob pattern to the directory that must be
/// watched to observe every file the pattern can match.
///
/// A pattern without glob metacharacters (`*`, `?`, `[`, `{`) is returned
/// unchanged. Otherwise the result is made of the *complete* path components
/// that precede the first metacharacter:
///
/// - `src/**/*.php` → `src`
/// - `tests/fixtures/*.php` → `tests/fixtures`
/// - `src/foo*.php` → `src` (the partial component `foo` is not a directory)
/// - `*.php` → `.`
/// - `/*.php` → `/` (a root-anchored pattern stays anchored at the root)
///
/// Both `/` and `\` are treated as separators.
fn extract_watch_path(pattern: &str) -> PathBuf {
    const GLOB_METACHARACTERS: [char; 4] = ['*', '?', '[', '{'];
    const SEPARATORS: [char; 2] = ['/', '\\'];

    let Some(first_glob_pos) = pattern.find(GLOB_METACHARACTERS) else {
        return PathBuf::from(pattern);
    };

    let literal_prefix = &pattern[..first_glob_pos];

    // Without a separator before the first metacharacter the pattern is
    // relative to the current directory (e.g. `*.php` or `foo*.php`).
    let Some(last_separator) = literal_prefix.rfind(SEPARATORS) else {
        return PathBuf::from(".");
    };

    // Cut at the last separator, not at the metacharacter: whatever follows
    // that separator is only a fragment of a file or directory name.
    let directory = literal_prefix[..last_separator].trim_end_matches(SEPARATORS);

    if directory.is_empty() {
        // Only separators precede the glob, so the pattern is anchored at the
        // root; the first character is necessarily a one-byte separator.
        PathBuf::from(&literal_prefix[..1])
    } else {
        PathBuf::from(directory)
    }
}
233
234 fn handle_event(
235 event: Event,
236 workspace: &Path,
237 glob_excludes: &GlobSet,
238 path_excludes: &HashSet<PathBuf>,
239 extensions: &HashSet<String>,
240 explicit_watch_paths: &[PathBuf],
241 ) -> Option<Vec<ChangedFile>> {
242 tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
243
244 if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
245 event.kind
246 {
247 tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
248
249 return None;
250 }
251
252 let mut changed_files = Vec::new();
253
254 for path in event.paths {
255 if let Some(ext) = path.extension() {
257 if !extensions.contains(ext.to_string_lossy().as_ref()) {
258 continue;
259 }
260 } else {
261 continue;
262 }
263
264 let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
265 if !is_explicitly_watched {
266 if glob_excludes.is_match(&path) {
268 tracing::debug!("Skipping path excluded by pattern: {}", path.display());
269 continue;
270 }
271
272 if path_excludes.contains(&path) {
274 tracing::debug!("Skipping excluded path: {}", path.display());
275 continue;
276 }
277
278 let mut should_skip = false;
280 for ancestor in path.ancestors().skip(1) {
281 if path_excludes.contains(ancestor) {
282 tracing::debug!("Skipping path under excluded directory: {}", path.display());
283 should_skip = true;
284 break;
285 }
286 }
287
288 if should_skip {
289 continue;
290 }
291 }
292
293 let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy().replace('\\', "/");
295 let file_id = FileId::new(&logical_name);
296
297 changed_files.push(ChangedFile { id: file_id, path: path.clone() });
298 }
299
300 if changed_files.is_empty() { None } else { Some(changed_files) }
301 }
302
303 #[inline]
314 pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
315 let Some(receiver) = &self.receiver else {
316 return Err(DatabaseError::WatcherNotActive);
317 };
318
319 let config = &self.database.configuration;
320 let workspace = config.workspace.as_ref().to_path_buf();
321
322 match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
323 Ok(changed_files) => {
324 let mut all_changed = changed_files;
325 loop {
326 match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
327 Ok(more) => all_changed.extend(more),
328 Err(RecvTimeoutError::Timeout) => break,
329 Err(RecvTimeoutError::Disconnected) => {
330 self.stop();
331 return Err(DatabaseError::WatcherNotActive);
332 }
333 }
334 }
335
336 let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
337 for changed in all_changed {
338 latest_changes.insert(changed.id, changed);
339 }
340 let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
341 let mut changed_ids = Vec::new();
342
343 for changed_file in &all_changed {
344 changed_ids.push(changed_file.id);
345
346 let Ok(file) = self.database.get(&changed_file.id) else {
347 if changed_file.path.exists() {
348 match File::read(&workspace, &changed_file.path, FileType::Host) {
349 Ok(file) => {
350 self.database.add(file);
351 tracing::debug!("Added new file to database: {}", changed_file.path.display());
352 }
353 Err(e) => {
354 tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
355 }
356 }
357 }
358
359 continue;
360 };
361
362 if !changed_file.path.exists() {
363 self.database.delete(changed_file.id);
364 tracing::trace!("Deleted file from database: {}", file.name);
365 continue;
366 }
367
368 match Self::read_stable_contents(&changed_file.path) {
369 Ok(contents) => {
370 if self.database.update(changed_file.id, Cow::Owned(contents)) {
371 tracing::trace!("Updated file in database: {}", file.name);
372 } else {
373 tracing::warn!("Failed to update file in database (ID not found): {}", file.name);
374 }
375 }
376 Err(e) => {
377 tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
378 }
379 }
380 }
381
382 Ok(changed_ids)
383 }
384 Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
385 Err(RecvTimeoutError::Disconnected) => {
386 self.stop();
387 Err(DatabaseError::WatcherNotActive)
388 }
389 }
390 }
391
392 fn read_stable_contents(path: &Path) -> std::io::Result<String> {
398 let contents = std::fs::read_to_string(path)?;
399
400 std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
401
402 if path.exists()
403 && let Ok(reread) = std::fs::read_to_string(path)
404 && reread != contents
405 {
406 tracing::debug!("File content changed during stability check: {}", path.display());
407
408 return Ok(reread);
409 }
410
411 Ok(contents)
412 }
413
414 #[inline]
416 #[must_use]
417 pub fn database(&self) -> &Database<'config> {
418 &self.database
419 }
420
421 #[inline]
423 #[must_use]
424 pub fn read_only_database(&self) -> ReadDatabase {
425 self.database.read_only()
426 }
427
428 #[inline]
430 pub fn database_mut(&mut self) -> &mut Database<'config> {
431 &mut self.database
432 }
433
434 #[inline]
443 pub fn with_database_mut<F, R>(&mut self, f: F) -> R
444 where
445 F: for<'borrow> FnOnce(&'borrow mut Database<'config>) -> R,
446 {
447 f(&mut self.database)
448 }
449
450 #[inline]
452 #[must_use]
453 pub fn into_database(self) -> Database<'config> {
454 let mut md = ManuallyDrop::new(self);
455 md.stop();
456 unsafe { std::ptr::read(&raw const md.database) }
459 }
460}
461
462impl Drop for DatabaseWatcher<'_> {
463 #[inline]
464 fn drop(&mut self) {
465 self.stop();
466 }
467}