1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::Glob;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Default watcher poll interval in milliseconds; used by `WatchOptions::default`
/// and whenever `WatchOptions::poll_interval` is `None`.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
/// How long (ms) `DatabaseWatcher::wait` blocks waiting for the first batch of changes.
/// NOTE(review): the name is presumably a typo for `WAIT_INTERVAL_MS`.
const WAIT_INTERNAL_MS: u64 = 100;
/// Quiet period (ms) after a batch arrives; further batches received within it are merged.
const WAIT_DEBOUNCE_MS: u64 = 300;
/// Pause (ms) between the two reads performed by the content-stability check.
const STABILITY_CHECK_MS: u64 = 10;
38
39#[derive(Debug, Clone, PartialEq, Eq, Hash)]
40struct ChangedFile {
41 id: FileId,
42 path: PathBuf,
43}
44
45#[derive(Debug, Clone)]
47pub struct WatchOptions {
48 pub poll_interval: Option<Duration>,
49 pub additional_excludes: Vec<Exclusion<'static>>,
50}
51
52impl Default for WatchOptions {
53 fn default() -> Self {
54 Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
55 }
56}
57
/// Owns a [`Database`] and keeps it in sync with file-system changes.
///
/// Start observing with `watch`, then call `wait` repeatedly to apply pending
/// changes to the database.
pub struct DatabaseWatcher<'a> {
    /// The database that observed changes are applied to.
    database: Database<'a>,
    /// Active OS-level watcher; `None` while not watching (see `is_watching`).
    watcher: Option<RecommendedWatcher>,
    /// Roots registered with `watcher`, remembered so `stop` can unwatch them.
    watched_paths: Vec<PathBuf>,
    /// Receiving end of the channel fed by the watcher's event callback.
    receiver: Option<Receiver<Vec<ChangedFile>>>,
}
65
66impl<'a> DatabaseWatcher<'a> {
67 #[must_use]
68 pub fn new(database: Database<'a>) -> Self {
69 Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
70 }
71
72 pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
81 self.stop();
82
83 let config = &self.database.configuration;
84
85 let (tx, rx) = mpsc::channel();
86
87 let mut all_exclusions = vec![
88 Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
89 Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
90 Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
91 Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
92 ];
93 all_exclusions.extend(config.excludes.iter().cloned());
94 all_exclusions.extend(options.additional_excludes);
95
96 let mut glob_builder = GlobSetBuilder::new();
97 for ex in &all_exclusions {
98 if let Exclusion::Pattern(pat) = ex {
99 glob_builder.add(Glob::new(pat)?);
100 }
101 }
102 let glob_excludes = glob_builder.build()?;
103
104 let path_excludes: HashSet<PathBuf> = all_exclusions
105 .iter()
106 .filter_map(|ex| match ex {
107 Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
108 _ => None,
109 })
110 .collect();
111
112 let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
113 let workspace = config.workspace.as_ref().to_path_buf();
114
115 let mut unique_watch_paths = HashSet::new();
120
121 for path in &config.paths {
122 let watch_path = Self::extract_watch_path(path.as_ref());
123 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
124
125 unique_watch_paths.insert(absolute_path);
126 }
127
128 for path in &config.includes {
129 let watch_path = Self::extract_watch_path(path.as_ref());
130 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
131
132 unique_watch_paths.insert(absolute_path);
133 }
134
135 let explicit_watch_paths: Vec<PathBuf> = unique_watch_paths.iter().cloned().collect();
136
137 let mut watcher = RecommendedWatcher::new(
138 move |res: Result<Event, notify::Error>| {
139 if let Ok(event) = res
140 && let Some(changed) = Self::handle_event(
141 event,
142 &workspace,
143 &glob_excludes,
144 &path_excludes,
145 &extensions,
146 &explicit_watch_paths,
147 )
148 {
149 let _ = tx.send(changed);
150 }
151 },
152 Config::default()
153 .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
154 )
155 .map_err(DatabaseError::WatcherInit)?;
156
157 let mut watched_paths = Vec::new();
158 for path in unique_watch_paths {
159 watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
160 watched_paths.push(path.clone());
161 tracing::debug!("Watching path: {}", path.display());
162 }
163
164 tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
165
166 self.watcher = Some(watcher);
167 self.watched_paths = watched_paths;
168 self.receiver = Some(rx);
169
170 Ok(())
171 }
172
173 pub fn stop(&mut self) {
175 if let Some(mut watcher) = self.watcher.take() {
176 for path in &self.watched_paths {
177 let _ = watcher.unwatch(path);
178 tracing::debug!("Stopped watching: {}", path.display());
179 }
180 }
181 self.watched_paths.clear();
182 self.receiver = None;
183 }
184
185 #[must_use]
187 pub fn is_watching(&self) -> bool {
188 self.watcher.is_some()
189 }
190
191 fn extract_watch_path(pattern: &str) -> PathBuf {
202 let is_glob = pattern.contains('*') || pattern.contains('?') || pattern.contains('[') || pattern.contains('{');
203
204 if !is_glob {
205 return PathBuf::from(pattern);
206 }
207
208 let first_glob_pos = pattern.find(['*', '?', '[', '{']).unwrap_or(pattern.len());
209
210 let base = &pattern[..first_glob_pos];
211
212 let base = base.trim_end_matches('/').trim_end_matches('\\');
213
214 if base.is_empty() { PathBuf::from(".") } else { PathBuf::from(base) }
215 }
216
217 fn handle_event(
218 event: Event,
219 workspace: &Path,
220 glob_excludes: &GlobSet,
221 path_excludes: &HashSet<PathBuf>,
222 extensions: &HashSet<String>,
223 explicit_watch_paths: &[PathBuf],
224 ) -> Option<Vec<ChangedFile>> {
225 tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
226
227 if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
228 event.kind
229 {
230 tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
231
232 return None;
233 }
234
235 let mut changed_files = Vec::new();
236
237 for path in event.paths {
238 if let Some(ext) = path.extension() {
240 if !extensions.contains(ext.to_string_lossy().as_ref()) {
241 continue;
242 }
243 } else {
244 continue;
245 }
246
247 let is_explicitly_watched = explicit_watch_paths.iter().any(|wp| path.starts_with(wp));
248 if !is_explicitly_watched {
249 if glob_excludes.is_match(&path) {
251 tracing::debug!("Skipping path excluded by pattern: {}", path.display());
252 continue;
253 }
254
255 if path_excludes.contains(&path) {
257 tracing::debug!("Skipping excluded path: {}", path.display());
258 continue;
259 }
260
261 let mut should_skip = false;
263 for ancestor in path.ancestors().skip(1) {
264 if path_excludes.contains(ancestor) {
265 tracing::debug!("Skipping path under excluded directory: {}", path.display());
266 should_skip = true;
267 break;
268 }
269 }
270
271 if should_skip {
272 continue;
273 }
274 }
275
276 let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy().replace('\\', "/");
278 let file_id = FileId::new(&logical_name);
279
280 changed_files.push(ChangedFile { id: file_id, path: path.clone() });
281 }
282
283 if changed_files.is_empty() { None } else { Some(changed_files) }
284 }
285
286 pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
297 let Some(receiver) = &self.receiver else {
298 return Err(DatabaseError::WatcherNotActive);
299 };
300
301 let config = &self.database.configuration;
302 let workspace = config.workspace.as_ref().to_path_buf();
303
304 match receiver.recv_timeout(Duration::from_millis(WAIT_INTERNAL_MS)) {
305 Ok(changed_files) => {
306 let mut all_changed = changed_files;
307 loop {
308 match receiver.recv_timeout(Duration::from_millis(WAIT_DEBOUNCE_MS)) {
309 Ok(more) => all_changed.extend(more),
310 Err(RecvTimeoutError::Timeout) => break,
311 Err(RecvTimeoutError::Disconnected) => {
312 self.stop();
313 return Err(DatabaseError::WatcherNotActive);
314 }
315 }
316 }
317
318 let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
319 for changed in all_changed {
320 latest_changes.insert(changed.id, changed);
321 }
322 let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
323 let mut changed_ids = Vec::new();
324
325 for changed_file in &all_changed {
326 changed_ids.push(changed_file.id);
327
328 let Ok(file) = self.database.get(&changed_file.id) else {
329 if changed_file.path.exists() {
330 match File::read(&workspace, &changed_file.path, FileType::Host) {
331 Ok(file) => {
332 self.database.add(file);
333 tracing::debug!("Added new file to database: {}", changed_file.path.display());
334 }
335 Err(e) => {
336 tracing::error!("Failed to load new file {}: {}", changed_file.path.display(), e);
337 }
338 }
339 }
340
341 continue;
342 };
343
344 if !changed_file.path.exists() {
345 self.database.delete(changed_file.id);
346 tracing::trace!("Deleted file from database: {}", file.name);
347 }
348
349 match Self::read_stable_contents(&changed_file.path) {
350 Ok(contents) => {
351 if self.database.update(changed_file.id, Cow::Owned(contents)) {
352 tracing::trace!("Updated file in database: {}", file.name);
353 } else {
354 tracing::warn!("Failed to update file in database (ID not found): {}", file.name);
355 }
356 }
357 Err(e) => {
358 tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
359 }
360 }
361 }
362
363 Ok(changed_ids)
364 }
365 Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
366 Err(RecvTimeoutError::Disconnected) => {
367 self.stop();
368 Err(DatabaseError::WatcherNotActive)
369 }
370 }
371 }
372
373 fn read_stable_contents(path: &Path) -> std::io::Result<String> {
379 let contents = std::fs::read_to_string(path)?;
380
381 std::thread::sleep(Duration::from_millis(STABILITY_CHECK_MS));
382
383 if path.exists()
384 && let Ok(reread) = std::fs::read_to_string(path)
385 && reread != contents
386 {
387 tracing::debug!("File content changed during stability check: {}", path.display());
388
389 return Ok(reread);
390 }
391
392 Ok(contents)
393 }
394
395 #[must_use]
397 pub fn database(&self) -> &Database<'a> {
398 &self.database
399 }
400
401 #[must_use]
403 pub fn read_only_database(&self) -> ReadDatabase {
404 self.database.read_only()
405 }
406
407 pub fn database_mut(&mut self) -> &mut Database<'a> {
409 &mut self.database
410 }
411
412 pub fn with_database_mut<F, R>(&mut self, f: F) -> R
421 where
422 F: for<'x> FnOnce(&'x mut Database<'a>) -> R,
423 {
424 f(&mut self.database)
425 }
426
427 #[must_use]
429 pub fn into_database(self) -> Database<'a> {
430 let mut md = ManuallyDrop::new(self);
431 md.stop();
432 unsafe { std::ptr::read(&raw const md.database) }
433 }
434}
435
436impl Drop for DatabaseWatcher<'_> {
437 fn drop(&mut self) {
438 self.stop();
439 }
440}