1use crate::error::{Error, Result};
7use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::collections::HashMap;
9use std::path::{Path, PathBuf};
10use std::sync::{Arc, Mutex};
11use std::time::{Duration, Instant};
12use tokio::sync::mpsc;
13use tokio::time::sleep;
14
/// The kind of change reported for a watched path.
///
/// `Created`, `Modified` and `Deleted` carry no payload; the affected path
/// lives on the surrounding [`ChangeEvent`]. `Renamed` carries both ends of
/// the move itself.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ChangeKind {
    /// A new file-system entry appeared.
    Created,
    /// An existing entry was changed.
    Modified,
    /// An entry was removed.
    Deleted,
    /// An entry moved from `old` to `new`.
    Renamed { old: PathBuf, new: PathBuf },
}
32
33#[derive(Debug, Clone)]
35pub struct ChangeEvent {
36 pub repo_root: PathBuf,
38 pub path: PathBuf,
40 pub kind: ChangeKind,
42 pub timestamp: Instant,
44}
45
46impl ChangeEvent {
47 pub fn new(repo_root: PathBuf, path: PathBuf, kind: ChangeKind) -> Self {
49 Self {
50 repo_root,
51 path,
52 kind,
53 timestamp: Instant::now(),
54 }
55 }
56}
57
58struct Debouncer {
60 pending: Arc<Mutex<HashMap<PathBuf, (ChangeKind, Instant)>>>,
61 tx: mpsc::UnboundedSender<ChangeEvent>,
62 debounce_duration: Duration,
63}
64
65impl Debouncer {
66 fn new(tx: mpsc::UnboundedSender<ChangeEvent>, debounce_duration: Duration) -> Self {
68 Self {
69 pending: Arc::new(Mutex::new(HashMap::new())),
70 tx,
71 debounce_duration,
72 }
73 }
74
75 fn add_event(&self, event: ChangeEvent) {
77 let mut pending = self.pending.lock().unwrap();
78 pending.insert(event.path.clone(), (event.kind.clone(), event.timestamp));
79
80 let pending_clone = Arc::clone(&self.pending);
82 let tx = self.tx.clone();
83 let path = event.path.clone();
84 let repo_root = event.repo_root.clone();
85 let duration = self.debounce_duration;
86
87 tokio::spawn(async move {
88 sleep(duration).await;
89
90 let mut pending = pending_clone.lock().unwrap();
91 if let Some((kind, timestamp)) = pending.remove(&path) {
92 if timestamp.elapsed() >= duration {
94 let event = ChangeEvent {
95 repo_root,
96 path,
97 kind,
98 timestamp,
99 };
100 let _ = tx.send(event);
101 }
102 }
103 });
104 }
105}
106
107pub struct FileWatcher {
109 watcher: RecommendedWatcher,
110 #[allow(dead_code)] debouncer: Arc<Debouncer>,
112 change_rx: mpsc::UnboundedReceiver<ChangeEvent>,
113 watched_paths: Arc<Mutex<Vec<PathBuf>>>,
114}
115
116impl FileWatcher {
117 pub fn new() -> Result<Self> {
119 Self::with_debounce(Duration::from_millis(50))
120 }
121
122 pub fn with_debounce(debounce_duration: Duration) -> Result<Self> {
124 let (change_tx, change_rx) = mpsc::unbounded_channel();
125 let debouncer = Arc::new(Debouncer::new(change_tx.clone(), debounce_duration));
126
127 let (notify_tx, mut notify_rx) = mpsc::unbounded_channel();
129
130 let watcher = RecommendedWatcher::new(
131 move |res: notify::Result<Event>| {
132 if let Ok(event) = res {
133 let _ = notify_tx.send(event);
134 }
135 },
136 Config::default(),
137 )
138 .map_err(|e| Error::watcher(format!("Failed to create watcher: {e}")))?;
139
140 let debouncer_clone = Arc::clone(&debouncer);
142 tokio::spawn(async move {
143 while let Some(event) = notify_rx.recv().await {
144 if let Some(path) = event.paths.first() {
147 let repo_root = path.clone();
148 if let Some(change_event) = Self::convert_event(event, repo_root) {
149 debouncer_clone.add_event(change_event);
150 }
151 }
152 }
153 });
154
155 Ok(Self {
156 watcher,
157 debouncer,
158 change_rx,
159 watched_paths: Arc::new(Mutex::new(Vec::new())),
160 })
161 }
162
163 pub fn watch_dir(&mut self, path: &Path, _repo_root: PathBuf) -> Result<()> {
165 self.watcher
166 .watch(path, RecursiveMode::Recursive)
167 .map_err(|e| Error::watcher(format!("Failed to watch {}: {}", path.display(), e)))?;
168
169 let mut paths = self.watched_paths.lock().unwrap();
170 paths.push(path.to_path_buf());
171
172 Ok(())
173 }
174
175 pub fn unwatch(&mut self, path: &Path) -> Result<()> {
177 self.watcher
178 .unwatch(path)
179 .map_err(|e| Error::watcher(format!("Failed to unwatch {}: {}", path.display(), e)))?;
180
181 let mut paths = self.watched_paths.lock().unwrap();
182 paths.retain(|p| p != path);
183
184 Ok(())
185 }
186
187 pub async fn next_change(&mut self) -> Option<ChangeEvent> {
189 self.change_rx.recv().await
190 }
191
192 fn convert_event(event: Event, repo_root: PathBuf) -> Option<ChangeEvent> {
194 let path = event.paths.first()?.clone();
195
196 let kind = match event.kind {
197 EventKind::Create(_) => ChangeKind::Created,
198 EventKind::Modify(_) => ChangeKind::Modified,
199 EventKind::Remove(_) => ChangeKind::Deleted,
200 EventKind::Any => ChangeKind::Modified,
201 _ => return None,
202 };
203
204 Some(ChangeEvent::new(repo_root, path, kind))
205 }
206}
207
208impl Default for FileWatcher {
209 fn default() -> Self {
210 Self::new().expect("Failed to create file watcher")
211 }
212}
213
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};

    /// Polls `watcher` for a change whose path ends with `file_name`.
    ///
    /// Makes up to three attempts, each waiting at most five seconds for an
    /// event and pausing 500 ms afterwards. Returns `None` if no matching
    /// event shows up.
    async fn wait_for_change(watcher: &mut FileWatcher, file_name: &str) -> Option<ChangeEvent> {
        for _attempt in 0..3 {
            let outcome = timeout(Duration::from_secs(5), watcher.next_change()).await;

            if let Ok(Some(event)) = outcome {
                if event.path.ends_with(file_name) {
                    return Some(event);
                }
            }

            sleep(Duration::from_millis(500)).await;
        }

        None
    }

    #[tokio::test]
    async fn test_file_watcher_creation() {
        let watcher = FileWatcher::new();
        assert!(watcher.is_ok(), "Operation should succeed");
    }

    #[tokio::test]
    async fn test_file_watcher_with_custom_debounce() {
        let watcher = FileWatcher::with_debounce(Duration::from_millis(200));
        assert!(watcher.is_ok(), "Operation should succeed");
    }

    #[tokio::test]
    async fn test_debouncer() {
        let (tx, mut rx) = mpsc::unbounded_channel();
        let debouncer = Debouncer::new(tx, Duration::from_millis(50));

        let event = ChangeEvent::new(
            PathBuf::from("/repo"),
            PathBuf::from("/repo/file.txt"),
            ChangeKind::Modified,
        );

        debouncer.add_event(event);

        // Bound the wait: the debouncer keeps the sender alive, so a bare
        // `recv()` would hang forever if the event were never emitted.
        let received = timeout(Duration::from_secs(2), rx.recv())
            .await
            .expect("debounced event should arrive before the timeout");
        let received = received.expect("Should have value");

        assert_eq!(received.repo_root, PathBuf::from("/repo"));
        assert_eq!(received.path, PathBuf::from("/repo/file.txt"));
        assert_eq!(received.kind, ChangeKind::Modified);
    }

    #[tokio::test]
    async fn test_watch_directory() {
        let temp_dir = TempDir::new().unwrap();
        let mut watcher = FileWatcher::new().unwrap();

        let result = watcher.watch_dir(temp_dir.path(), temp_dir.path().to_path_buf());
        assert!(result.is_ok(), "Operation should succeed");
    }

    #[tokio::test]
    async fn test_watch_and_unwatch_directory() {
        let temp_dir = TempDir::new().unwrap();
        let mut watcher = FileWatcher::new().unwrap();

        let result = watcher.watch_dir(temp_dir.path(), temp_dir.path().to_path_buf());
        assert!(result.is_ok(), "Operation should succeed");

        let result = watcher.unwatch(temp_dir.path());
        assert!(result.is_ok(), "Operation should succeed");
    }

    #[tokio::test]
    async fn test_watch_invalid_directory() {
        let mut watcher = FileWatcher::new().unwrap();
        let invalid_path = Path::new("/nonexistent/directory");

        let result = watcher.watch_dir(invalid_path, PathBuf::from("/tmp"));
        assert!(
            result.is_err(),
            "Should fail to watch nonexistent directory"
        );
    }

    #[tokio::test]
    async fn test_watch_multiple_directories() {
        let temp_dir1 = TempDir::new().unwrap();
        let temp_dir2 = TempDir::new().unwrap();
        let mut watcher = FileWatcher::new().unwrap();

        let result1 = watcher.watch_dir(temp_dir1.path(), temp_dir1.path().to_path_buf());
        assert!(result1.is_ok(), "Operation should succeed");

        let result2 = watcher.watch_dir(temp_dir2.path(), temp_dir2.path().to_path_buf());
        assert!(result2.is_ok(), "Operation should succeed");

        let watched_paths = watcher.watched_paths.lock().unwrap();
        assert_eq!(watched_paths.len(), 2, "Should have 2 items");
        assert!(watched_paths.contains(&temp_dir1.path().to_path_buf()));
        assert!(watched_paths.contains(&temp_dir2.path().to_path_buf()));
    }

    // File-system notifications are timing- and platform-dependent, so a
    // missing event is logged rather than treated as a failure.
    #[tokio::test]
    #[cfg_attr(any(feature = "ci-skip"), ignore)]
    async fn test_file_creation_detection() {
        let temp_dir = TempDir::new().unwrap();
        let mut watcher = FileWatcher::with_debounce(Duration::from_millis(50)).unwrap();

        watcher
            .watch_dir(temp_dir.path(), temp_dir.path().to_path_buf())
            .unwrap();

        // Give the OS watcher time to become active before touching the tree.
        sleep(Duration::from_millis(500)).await;

        let file_path = temp_dir.path().join("new_file.txt");
        fs::write(&file_path, "content").unwrap();

        match wait_for_change(&mut watcher, "new_file.txt").await {
            Some(event) => assert!(matches!(
                event.kind,
                ChangeKind::Created | ChangeKind::Modified
            )),
            None => {
                eprintln!("File creation event not detected - this can be flaky on some systems");
            }
        }
    }

    // See the note on `test_file_creation_detection` about tolerated misses.
    #[tokio::test]
    #[cfg_attr(any(feature = "ci-skip"), ignore)]
    async fn test_file_modification_detection() {
        let temp_dir = TempDir::new().unwrap();
        let file_path = temp_dir.path().join("existing_file.txt");

        fs::write(&file_path, "initial content").unwrap();

        let mut watcher = FileWatcher::with_debounce(Duration::from_millis(50)).unwrap();
        watcher
            .watch_dir(temp_dir.path(), temp_dir.path().to_path_buf())
            .unwrap();

        // Give the OS watcher time to become active before touching the tree.
        sleep(Duration::from_millis(500)).await;

        fs::write(&file_path, "modified content").unwrap();

        match wait_for_change(&mut watcher, "existing_file.txt").await {
            Some(event) => assert!(matches!(
                event.kind,
                ChangeKind::Created | ChangeKind::Modified
            )),
            None => {
                eprintln!(
                    "File modification event not detected - this can be flaky on some systems"
                );
            }
        }
    }

    // `convert_event` is synchronous, so no async runtime is needed here.
    #[test]
    fn test_event_convert_function() {
        use notify::{Event, EventKind};

        let repo_root = PathBuf::from("/repo");
        let file_path = PathBuf::from("/repo/test.txt");

        let cases = vec![
            (
                EventKind::Create(notify::event::CreateKind::File),
                ChangeKind::Created,
            ),
            (
                EventKind::Modify(notify::event::ModifyKind::Data(
                    notify::event::DataChange::Content,
                )),
                ChangeKind::Modified,
            ),
            (
                EventKind::Remove(notify::event::RemoveKind::File),
                ChangeKind::Deleted,
            ),
        ];

        for (raw_kind, expected) in cases {
            let raw_event = Event {
                kind: raw_kind,
                paths: vec![file_path.clone()],
                attrs: Default::default(),
            };

            let change_event = FileWatcher::convert_event(raw_event, repo_root.clone());
            assert!(change_event.is_some(), "Should have value");
            let change_event = change_event.unwrap();
            assert_eq!(change_event.kind, expected);
            assert_eq!(change_event.path, file_path);
            assert_eq!(change_event.repo_root, repo_root);
        }
    }

    #[test]
    fn test_change_kind_equality_and_inequality() {
        assert_ne!(ChangeKind::Created, ChangeKind::Modified);
        assert_ne!(ChangeKind::Modified, ChangeKind::Deleted);
        assert_ne!(ChangeKind::Created, ChangeKind::Deleted);

        let renamed1 = ChangeKind::Renamed {
            old: PathBuf::from("old.txt"),
            new: PathBuf::from("new.txt"),
        };
        let renamed2 = ChangeKind::Renamed {
            old: PathBuf::from("old.txt"),
            new: PathBuf::from("new.txt"),
        };
        assert_eq!(
            renamed1, renamed2,
            "Renamed variants with same paths should be equal"
        );

        let renamed3 = ChangeKind::Renamed {
            old: PathBuf::from("different.txt"),
            new: PathBuf::from("new.txt"),
        };
        assert_ne!(
            renamed1, renamed3,
            "Renamed variants with different paths should not be equal"
        );

        assert_ne!(renamed1, ChangeKind::Created);
        assert_ne!(renamed1, ChangeKind::Modified);
        assert_ne!(renamed1, ChangeKind::Deleted);
    }

    #[test]
    fn test_change_event_creation() {
        let event = ChangeEvent::new(
            PathBuf::from("/repo"),
            PathBuf::from("/repo/file.txt"),
            ChangeKind::Modified,
        );

        assert_eq!(event.repo_root, PathBuf::from("/repo"));
        assert_eq!(event.path, PathBuf::from("/repo/file.txt"));
        assert_eq!(event.kind, ChangeKind::Modified);
    }

    #[test]
    fn test_change_event_timestamp() {
        let before = Instant::now();
        let event = ChangeEvent::new(
            PathBuf::from("/repo"),
            PathBuf::from("/repo/file.txt"),
            ChangeKind::Modified,
        );
        let after = Instant::now();

        assert!(event.timestamp >= before);
        assert!(event.timestamp <= after);
    }
}