1use std::borrow::Cow;
4use std::collections::HashMap;
5use std::collections::HashSet;
6use std::mem::ManuallyDrop;
7use std::path::Path;
8use std::path::PathBuf;
9use std::sync::mpsc;
10use std::sync::mpsc::Receiver;
11use std::sync::mpsc::RecvTimeoutError;
12use std::time::Duration;
13
14use globset::Glob;
15use globset::GlobSet;
16use globset::GlobSetBuilder;
17use notify::Config;
18use notify::Event;
19use notify::EventKind;
20use notify::RecommendedWatcher;
21use notify::RecursiveMode;
22use notify::Watcher as NotifyWatcher;
23use notify::event::ModifyKind;
24
25use crate::Database;
26use crate::DatabaseReader;
27use crate::ReadDatabase;
28use crate::error::DatabaseError;
29use crate::exclusion::Exclusion;
30use crate::file::File;
31use crate::file::FileId;
32use crate::file::FileType;
33
/// Polling interval, in milliseconds, used when no explicit interval is supplied.
const DEFAULT_POLL_INTERVAL_MS: u64 = 1_000;
35
/// A file reported by the filesystem watcher: its database id together with the
/// path that appeared in the watcher event.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
struct ChangedFile {
    /// Id derived from the path relative to the workspace, or from the full path
    /// when the file does not live under the workspace.
    id: FileId,
    /// Path exactly as delivered by the watcher event.
    path: PathBuf,
}
41
/// Options controlling how a [`DatabaseWatcher`] observes the filesystem.
#[derive(Debug, Clone)]
pub struct WatchOptions {
    /// Poll interval handed to the watcher configuration; `None` falls back to
    /// `DEFAULT_POLL_INTERVAL_MS` milliseconds.
    pub poll_interval: Option<Duration>,
    /// Extra exclusions applied on top of the built-in ones and those from the
    /// database configuration.
    pub additional_excludes: Vec<Exclusion<'static>>,
}
48
49impl Default for WatchOptions {
50 fn default() -> Self {
51 Self { poll_interval: Some(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS)), additional_excludes: vec![] }
52 }
53}
54
/// Owns a [`Database`] and keeps it in sync with filesystem changes reported by a
/// `notify` watcher.
pub struct DatabaseWatcher<'a> {
    /// The database that `wait` updates when files change.
    database: Database<'a>,
    /// The active filesystem watcher; `None` while not watching.
    watcher: Option<RecommendedWatcher>,
    /// Paths currently registered with the watcher, kept so `stop` can unwatch them.
    watched_paths: Vec<PathBuf>,
    /// Receiving end of the channel fed by the watcher callback; `None` while not watching.
    receiver: Option<Receiver<Vec<ChangedFile>>>,
}
62
63impl<'a> DatabaseWatcher<'a> {
64 #[must_use]
65 pub fn new(database: Database<'a>) -> Self {
66 Self { database, watcher: None, watched_paths: Vec::new(), receiver: None }
67 }
68
69 pub fn watch(&mut self, options: WatchOptions) -> Result<(), DatabaseError> {
78 self.stop();
79
80 let config = &self.database.configuration;
81
82 let (tx, rx) = mpsc::channel();
83
84 let mut all_exclusions = vec![
85 Exclusion::Pattern(Cow::Borrowed("**/node_modules/**")),
86 Exclusion::Pattern(Cow::Borrowed("**/.git/**")),
87 Exclusion::Pattern(Cow::Borrowed("**/.idea/**")),
88 Exclusion::Pattern(Cow::Borrowed("**/vendor/**")),
89 ];
90 all_exclusions.extend(config.excludes.iter().cloned());
91 all_exclusions.extend(options.additional_excludes);
92
93 let mut glob_builder = GlobSetBuilder::new();
94 for ex in &all_exclusions {
95 if let Exclusion::Pattern(pat) = ex {
96 glob_builder.add(Glob::new(pat)?);
97 }
98 }
99 let glob_excludes = glob_builder.build()?;
100
101 let path_excludes: HashSet<PathBuf> = all_exclusions
102 .iter()
103 .filter_map(|ex| match ex {
104 Exclusion::Path(p) => Some(p.as_ref().to_path_buf()),
105 _ => None,
106 })
107 .collect();
108
109 let extensions: HashSet<String> = config.extensions.iter().map(std::string::ToString::to_string).collect();
110 let workspace = config.workspace.as_ref().to_path_buf();
111
112 let mut watcher = RecommendedWatcher::new(
113 move |res: Result<Event, notify::Error>| {
114 if let Ok(event) = res
115 && let Some(changed) =
116 Self::handle_event(event, &workspace, &glob_excludes, &path_excludes, &extensions)
117 {
118 let _ = tx.send(changed);
119 }
120 },
121 Config::default()
122 .with_poll_interval(options.poll_interval.unwrap_or(Duration::from_millis(DEFAULT_POLL_INTERVAL_MS))),
123 )
124 .map_err(DatabaseError::WatcherInit)?;
125
126 let mut unique_watch_paths = HashSet::new();
127
128 for path in &config.paths {
129 let watch_path = Self::extract_watch_path(path.as_ref());
130 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
131
132 unique_watch_paths.insert(absolute_path);
133 }
134
135 for path in &config.includes {
136 let watch_path = Self::extract_watch_path(path.as_ref());
137 let absolute_path = if watch_path.is_absolute() { watch_path } else { config.workspace.join(watch_path) };
138
139 unique_watch_paths.insert(absolute_path);
140 }
141
142 let mut watched_paths = Vec::new();
143 for path in unique_watch_paths {
144 watcher.watch(&path, RecursiveMode::Recursive).map_err(DatabaseError::WatcherWatch)?;
145 watched_paths.push(path.clone());
146 tracing::debug!("Watching path: {}", path.display());
147 }
148
149 tracing::info!("Database watcher started for workspace: {}", config.workspace.display());
150
151 self.watcher = Some(watcher);
152 self.watched_paths = watched_paths;
153 self.receiver = Some(rx);
154
155 Ok(())
156 }
157
158 pub fn stop(&mut self) {
160 if let Some(mut watcher) = self.watcher.take() {
161 for path in &self.watched_paths {
162 let _ = watcher.unwatch(path);
163 tracing::debug!("Stopped watching: {}", path.display());
164 }
165 }
166 self.watched_paths.clear();
167 self.receiver = None;
168 }
169
170 #[must_use]
172 pub fn is_watching(&self) -> bool {
173 self.watcher.is_some()
174 }
175
/// Reduces a path or glob pattern to the directory that has to be watched.
///
/// A pattern without glob metacharacters (`*`, `?`, `[`, `{`) is returned unchanged.
/// Otherwise the result is the deepest directory that precedes the first
/// metacharacter, i.e. the pattern is cut at the last path separator before it:
///
/// - `src/**/*.php` becomes `src`
/// - `src/test_*.php` becomes `src` (not the non-existent `src/test_`)
/// - `*.php` becomes `.`
/// - `/*.php` becomes `/`
fn extract_watch_path(pattern: &str) -> PathBuf {
    const GLOB_CHARS: [char; 4] = ['*', '?', '[', '{'];
    const SEPARATORS: [char; 2] = ['/', '\\'];

    let Some(first_glob_pos) = pattern.find(GLOB_CHARS) else {
        return PathBuf::from(pattern);
    };

    let literal_prefix = &pattern[..first_glob_pos];

    // The segment containing the first metacharacter is itself part of the glob, so
    // only complete directory segments in front of it can be watched.
    let Some(last_separator) = literal_prefix.rfind(SEPARATORS) else {
        return PathBuf::from(".");
    };

    let base = literal_prefix[..last_separator].trim_end_matches(SEPARATORS);

    if base.is_empty() {
        // Only separators precede the glob: the pattern is rooted, so keep the root
        // (separators are single-byte, which makes the one-byte slice valid).
        PathBuf::from(&literal_prefix[..1])
    } else {
        PathBuf::from(base)
    }
}
201
202 fn handle_event(
203 event: Event,
204 workspace: &Path,
205 glob_excludes: &GlobSet,
206 path_excludes: &HashSet<PathBuf>,
207 extensions: &HashSet<String>,
208 ) -> Option<Vec<ChangedFile>> {
209 tracing::debug!("Watcher received event: kind={:?}, paths={:?}", event.kind, event.paths);
210
211 if let EventKind::Other | EventKind::Any | EventKind::Access(_) | EventKind::Modify(ModifyKind::Metadata(_)) =
212 event.kind
213 {
214 tracing::debug!("Ignoring non-modification event: {:?}", event.kind);
215
216 return None;
217 }
218
219 let mut changed_files = Vec::new();
220
221 for path in event.paths {
222 if let Some(ext) = path.extension() {
224 if !extensions.contains(ext.to_string_lossy().as_ref()) {
225 continue;
226 }
227 } else {
228 continue;
229 }
230
231 if glob_excludes.is_match(&path) {
233 tracing::debug!("Skipping path excluded by pattern: {}", path.display());
234 continue;
235 }
236
237 if path_excludes.contains(&path) {
239 tracing::debug!("Skipping excluded path: {}", path.display());
240 continue;
241 }
242
243 let mut should_skip = false;
245 for ancestor in path.ancestors().skip(1) {
246 if path_excludes.contains(ancestor) {
247 tracing::debug!("Skipping path under excluded directory: {}", path.display());
248 should_skip = true;
249 break;
250 }
251 }
252 if should_skip {
253 continue;
254 }
255
256 let logical_name = path.strip_prefix(workspace).unwrap_or(&path).to_string_lossy();
257 let file_id = FileId::new(logical_name.as_ref());
258
259 changed_files.push(ChangedFile { id: file_id, path: path.clone() });
260 }
261
262 if changed_files.is_empty() { None } else { Some(changed_files) }
263 }
264
265 pub fn wait(&mut self) -> Result<Vec<FileId>, DatabaseError> {
276 let Some(receiver) = &self.receiver else {
277 return Err(DatabaseError::WatcherNotActive);
278 };
279
280 let config = &self.database.configuration;
281 let workspace = config.workspace.as_ref().to_path_buf();
282
283 match receiver.recv_timeout(Duration::from_millis(100)) {
284 Ok(changed_files) => {
285 std::thread::sleep(Duration::from_millis(250));
286 let mut all_changed = changed_files;
287 while let Ok(more) = receiver.try_recv() {
288 all_changed.extend(more);
289 }
290
291 let mut latest_changes: HashMap<FileId, ChangedFile> = HashMap::new();
292 for changed in all_changed {
293 latest_changes.insert(changed.id, changed);
294 }
295 let all_changed: Vec<ChangedFile> = latest_changes.into_values().collect();
296 let mut changed_ids = Vec::new();
297
298 for changed_file in &all_changed {
299 changed_ids.push(changed_file.id);
300
301 match self.database.get(&changed_file.id) {
302 Ok(file) => {
303 if changed_file.path.exists() {
304 match std::fs::read_to_string(&changed_file.path) {
305 Ok(contents) => {
306 self.database.update(changed_file.id, Cow::Owned(contents));
307 tracing::trace!("Updated file in database: {}", file.name);
308 }
309 Err(e) => {
310 tracing::error!("Failed to read file {}: {}", changed_file.path.display(), e);
311 }
312 }
313 } else {
314 self.database.delete(changed_file.id);
315 tracing::trace!("Deleted file from database: {}", file.name);
316 }
317 }
318 Err(_) => {
319 if changed_file.path.exists() {
320 match File::read(&workspace, &changed_file.path, FileType::Host) {
321 Ok(file) => {
322 self.database.add(file);
323 tracing::debug!("Added new file to database: {}", changed_file.path.display());
324 }
325 Err(e) => {
326 tracing::error!(
327 "Failed to load new file {}: {}",
328 changed_file.path.display(),
329 e
330 );
331 }
332 }
333 }
334 }
335 }
336 }
337
338 Ok(changed_ids)
339 }
340 Err(RecvTimeoutError::Timeout) => Ok(Vec::new()),
341 Err(RecvTimeoutError::Disconnected) => {
342 self.stop();
343 Err(DatabaseError::WatcherNotActive)
344 }
345 }
346 }
347
348 #[must_use]
350 pub fn database(&self) -> &Database<'a> {
351 &self.database
352 }
353
354 #[must_use]
356 pub fn read_only_database(&self) -> ReadDatabase {
357 self.database.read_only()
358 }
359
360 pub fn database_mut(&mut self) -> &mut Database<'a> {
362 &mut self.database
363 }
364
365 pub fn with_database_mut<F, R>(&mut self, f: F) -> R
374 where
375 F: for<'x> FnOnce(&'x mut Database<'a>) -> R,
376 {
377 f(&mut self.database)
378 }
379
380 #[must_use]
382 pub fn into_database(self) -> Database<'a> {
383 let mut md = ManuallyDrop::new(self);
384 md.stop();
385 unsafe { std::ptr::read(&raw const md.database) }
386 }
387}
388
389impl Drop for DatabaseWatcher<'_> {
390 fn drop(&mut self) {
391 self.stop();
392 }
393}