1use async_trait::async_trait;
2use notify::{RecommendedWatcher, RecursiveMode, Config};
3use notify_debouncer_full::{new_debouncer_opt, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap};
4use std::collections::HashSet;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{mpsc, RwLock};
9use tracing::{debug, error, info, warn};
10
11use crate::error::Result;
12
/// A file system change reported to consumers of a watcher.
///
/// Every variant carries the affected path(s) exactly as they were reported
/// by the underlying watcher; no canonicalization is applied here.
///
/// The type is comparable and hashable so that consumers can deduplicate
/// events or assert on them directly in tests.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FileSystemEvent {
    /// A file or directory appeared at the given path.
    Created(PathBuf),
    /// The contents or metadata of the given path changed.
    Modified(PathBuf),
    /// The given path was removed.
    Deleted(PathBuf),
    /// A path was moved from `from` to `to`.
    Renamed { from: PathBuf, to: PathBuf },
}
25
26#[async_trait]
28pub trait FileSystemWatcher: Send + Sync {
29 async fn start_watching(&self, directories: &[PathBuf]) -> Result<()>;
31
32 async fn stop_watching(&self) -> Result<()>;
34
35 fn get_event_receiver(&self) -> mpsc::Receiver<FileSystemEvent>;
37
38 async fn add_watch_path(&self, path: &Path) -> Result<()>;
40
41 async fn remove_watch_path(&self, path: &Path) -> Result<()>;
43
44 async fn is_watching(&self, path: &Path) -> bool;
46}
47
48pub struct CrossPlatformWatcher {
50 debouncer: Arc<RwLock<Option<Debouncer<RecommendedWatcher, FileIdMap>>>>,
51 event_sender: mpsc::Sender<FileSystemEvent>,
52 event_receiver: Arc<RwLock<Option<mpsc::Receiver<FileSystemEvent>>>>,
53 watched_paths: Arc<RwLock<HashSet<PathBuf>>>,
54 media_extensions: HashSet<String>,
55 debounce_duration: Duration,
56}
57
58impl CrossPlatformWatcher {
59 pub fn new() -> Self {
61 let (event_sender, event_receiver) = mpsc::channel(256); let mut media_extensions = HashSet::with_capacity(32);
65 let extensions = [
66 "mp4", "mkv", "avi", "mov", "wmv", "flv", "webm", "m4v", "3gp", "mpg", "mpeg",
68 "mp3", "flac", "wav", "aac", "ogg", "wma", "m4a", "opus", "aiff",
70 "jpg", "jpeg", "png", "gif", "bmp", "tiff", "webp", "svg",
72 ];
73 for ext in &extensions {
74 media_extensions.insert(ext.to_lowercase());
75 }
76
77 Self {
78 debouncer: Arc::new(RwLock::new(None)),
79 event_sender,
80 event_receiver: Arc::new(RwLock::new(Some(event_receiver))),
81 watched_paths: Arc::new(RwLock::new(HashSet::with_capacity(16))), media_extensions,
83 debounce_duration: Duration::from_millis(250), }
85 }
86
87 fn is_media_file(&self, path: &Path) -> bool {
89 if let Some(extension) = path.extension() {
90 if let Some(ext_str) = extension.to_str() {
91 return self.media_extensions.contains(&ext_str.to_lowercase());
92 }
93 }
94 false
95 }
96
97 fn convert_events(&self, events: Vec<DebouncedEvent>) -> Vec<FileSystemEvent> {
99 let mut fs_events = Vec::with_capacity(events.len()); for event in events {
102 match event.event.kind {
103 notify::EventKind::Create(_) => {
104 for path in &event.event.paths {
105 if path.is_dir() {
106 info!("Directory created (detected by watcher): {:?}", path);
108 fs_events.push(FileSystemEvent::Created(path.clone()));
109 } else if self.is_media_file(path) {
110 info!("Media file created (detected by watcher): {:?}", path);
111 fs_events.push(FileSystemEvent::Created(path.clone()));
112 } else {
113 debug!("Non-media file created, ignoring: {:?}", path);
114 }
115 }
116 }
117 notify::EventKind::Modify(_) => {
118 let media_paths: Vec<_> = event.event.paths.iter()
120 .filter(|path| self.is_media_file(path))
121 .collect();
122
123 for path in media_paths {
124 debug!("Media file modified: {:?}", path);
125 fs_events.push(FileSystemEvent::Modified(path.clone()));
126 }
127 }
128 notify::EventKind::Remove(_) => {
129 for path in &event.event.paths {
130 info!("Path deleted (detected by watcher): {:?}", path);
133 fs_events.push(FileSystemEvent::Deleted(path.clone()));
134 }
135 }
136 notify::EventKind::Other => {
137 let media_paths: Vec<_> = event.event.paths.iter()
139 .filter(|path| self.is_media_file(path))
140 .collect();
141
142 for path in media_paths {
143 debug!("Media file other event: {:?}", path);
144 fs_events.push(FileSystemEvent::Modified(path.clone()));
145 }
146 }
147 _ => {
148 let media_paths: Vec<_> = event.event.paths.iter()
150 .filter(|path| self.is_media_file(path))
151 .collect();
152
153 for path in media_paths {
154 debug!("Media file generic event: {:?}", path);
155 fs_events.push(FileSystemEvent::Modified(path.clone()));
156 }
157 }
158 }
159 }
160
161 fs_events
162 }
163
164 async fn initialize_watcher(&self) -> Result<()> {
166 let event_sender = self.event_sender.clone();
167 let media_extensions = self.media_extensions.clone();
168
169 let watcher_weak = Arc::downgrade(&self.debouncer);
171
172 let debouncer = new_debouncer_opt(
173 self.debounce_duration,
174 None, move |result: DebounceEventResult| {
176 let watcher_arc = if let Some(arc) = watcher_weak.upgrade() {
178 arc
179 } else {
180 warn!("Watcher has been dropped, cannot process file events.");
182 return;
183 };
184 match result {
185 Ok(events) => {
186 if !events.is_empty() {
187 info!("Watcher callback triggered with {} events", events.len());
188 for event in &events {
189 info!(" Raw event: {:?} for paths: {:?}", event.event.kind, event.paths);
190 }
191 }
192
193 let relevant_events: Vec<_> = events.into_iter()
195 .filter(|event| {
196 event.paths.iter().any(|path| {
197 if matches!(event.event.kind, notify::EventKind::Remove(_)) {
200 info!("Including deletion event for path: {:?}", path);
201 return true;
202 }
203
204 if path.is_dir() {
206 info!("Including directory event for path: {:?}", path);
207 return true;
208 }
209
210 if let Some(extension) = path.extension() {
212 if let Some(ext_str) = extension.to_str() {
213 if media_extensions.contains(&ext_str.to_lowercase()) {
214 info!("Including media file event for path: {:?}", path);
215 return true;
216 }
217 }
218 }
219
220 debug!("Excluding non-media file event for path: {:?}", path);
221 false
222 })
223 })
224 .collect();
225
226 if !relevant_events.is_empty() {
227 info!("Processing {} relevant events", relevant_events.len());
228
229 let temp_watcher = CrossPlatformWatcher {
231 debouncer: watcher_arc.clone(),
232 event_sender: event_sender.clone(),
233 event_receiver: Arc::new(RwLock::new(None)),
234 watched_paths: Arc::new(RwLock::new(HashSet::with_capacity(16))),
235 media_extensions: media_extensions.clone(),
236 debounce_duration: Duration::from_millis(250),
237 };
238 let fs_events = temp_watcher.convert_events(relevant_events);
239 for fs_event in fs_events {
240 if let Err(e) = event_sender.try_send(fs_event) {
241 error!("Failed to send file system event: {}", e);
242 }
243 }
244 }
245 }
246 Err(errors) => {
247 for error in errors {
248 error!("File watcher error: {:?}", error);
249 }
250 }
251 }
252 },
253 FileIdMap::new(),
254 Config::default(),
255 )?;
256
257 let mut debouncer_guard = self.debouncer.write().await;
258 *debouncer_guard = Some(debouncer);
259
260 info!("File system watcher initialized with {}ms debounce", self.debounce_duration.as_millis());
261 info!("Watcher callback registered and ready to receive events");
262 Ok(())
263 }
264}
265
266#[async_trait]
267impl FileSystemWatcher for CrossPlatformWatcher {
268 async fn start_watching(&self, directories: &[PathBuf]) -> Result<()> {
269 info!("Starting file system watcher for {} directories", directories.len());
270
271 if self.debouncer.read().await.is_none() {
273 self.initialize_watcher().await?;
274 }
275
276 let mut debouncer_guard = self.debouncer.write().await;
277 if let Some(ref mut debouncer) = *debouncer_guard {
278 let mut watched_paths = self.watched_paths.write().await;
279
280 for directory in directories {
281 if !directory.exists() {
282 warn!("Directory does not exist, skipping: {:?}", directory);
283 continue;
284 }
285
286 if !directory.is_dir() {
287 warn!("Path is not a directory, skipping: {:?}", directory);
288 continue;
289 }
290
291 match debouncer.watch(directory, RecursiveMode::Recursive) {
292 Ok(()) => {
293 watched_paths.insert(directory.clone());
294 info!("Started watching directory: {:?}", directory);
295
296 if directory.exists() && directory.is_dir() {
298 info!("Directory exists and is accessible: {:?}", directory);
299 } else {
300 warn!("Directory may not be accessible: {:?}", directory);
301 }
302 }
303 Err(e) => {
304 error!("Failed to watch directory {:?}: {}", directory, e);
305 return Err(e.into());
306 }
307 }
308 }
309 }
310
311 Ok(())
312 }
313
314 async fn stop_watching(&self) -> Result<()> {
315 info!("Stopping file system watcher");
316
317 let mut debouncer_guard = self.debouncer.write().await;
318 if let Some(debouncer) = debouncer_guard.take() {
319 drop(debouncer);
321 }
322
323 let mut watched_paths = self.watched_paths.write().await;
324 watched_paths.clear();
325
326 info!("File system watcher stopped");
327 Ok(())
328 }
329
330 fn get_event_receiver(&self) -> mpsc::Receiver<FileSystemEvent> {
331 let receiver_guard = self.event_receiver.try_write();
334 if let Ok(mut guard) = receiver_guard {
335 if let Some(receiver) = guard.take() {
336 return receiver;
337 }
338 }
339
340 warn!("Creating new event receiver - original may have been consumed");
343 let (_, receiver) = mpsc::channel(256); receiver
345 }
346
347 async fn add_watch_path(&self, path: &Path) -> Result<()> {
348 if !path.exists() {
349 warn!("Path does not exist, cannot watch: {:?}", path);
350 return Ok(());
351 }
352
353 let mut debouncer_guard = self.debouncer.write().await;
354 if let Some(ref mut debouncer) = *debouncer_guard {
355 let mut watched_paths = self.watched_paths.write().await;
356
357 if watched_paths.contains(path) {
358 debug!("Path already being watched: {:?}", path);
359 return Ok(());
360 }
361
362 match debouncer.watch(path, RecursiveMode::Recursive) {
363 Ok(()) => {
364 watched_paths.insert(path.to_path_buf());
365 info!("Added watch path: {:?}", path);
366 Ok(())
367 }
368 Err(e) => {
369 error!("Failed to add watch path {:?}: {}", path, e);
370 Err(e.into())
371 }
372 }
373 } else {
374 warn!("Watcher not initialized, cannot add path: {:?}", path);
375 Ok(())
376 }
377 }
378
379 async fn remove_watch_path(&self, path: &Path) -> Result<()> {
380 let mut debouncer_guard = self.debouncer.write().await;
381 if let Some(ref mut debouncer) = *debouncer_guard {
382 let mut watched_paths = self.watched_paths.write().await;
383
384 if !watched_paths.contains(path) {
385 debug!("Path not being watched: {:?}", path);
386 return Ok(());
387 }
388
389 match debouncer.unwatch(path) {
390 Ok(()) => {
391 watched_paths.remove(path);
392 info!("Removed watch path: {:?}", path);
393 Ok(())
394 }
395 Err(e) => {
396 error!("Failed to remove watch path {:?}: {}", path, e);
397 Err(e.into())
398 }
399 }
400 } else {
401 warn!("Watcher not initialized, cannot remove path: {:?}", path);
402 Ok(())
403 }
404 }
405
406 async fn is_watching(&self, path: &Path) -> bool {
407 let watched_paths = self.watched_paths.read().await;
408 watched_paths.contains(path)
409 }
410}
411
412impl Default for CrossPlatformWatcher {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;
    use tokio::time::{sleep, timeout};

    /// A fresh watcher reports nothing as watched.
    #[tokio::test]
    async fn test_watcher_creation() {
        let watcher = CrossPlatformWatcher::new();
        let watching = watcher.is_watching(Path::new("/nonexistent")).await;
        assert!(!watching);
    }

    /// Extension matching is case-insensitive and rejects non-media names.
    #[tokio::test]
    async fn test_media_file_detection() {
        let watcher = CrossPlatformWatcher::new();

        for media in ["test.mp4", "test.MP3", "test.jpg"] {
            assert!(watcher.is_media_file(Path::new(media)));
        }
        for other in ["test.txt", "test"] {
            assert!(!watcher.is_media_file(Path::new(other)));
        }
    }

    /// Missing directories are skipped rather than reported as an error.
    #[tokio::test]
    async fn test_watch_nonexistent_directory() {
        let watcher = CrossPlatformWatcher::new();
        let missing = [PathBuf::from("/nonexistent/path")];
        assert!(watcher.start_watching(&missing).await.is_ok());
    }

    /// Starting registers the directory; stopping forgets it again.
    #[tokio::test]
    async fn test_watch_and_unwatch() {
        let temp_dir = TempDir::new().unwrap();
        let watcher = CrossPlatformWatcher::new();
        let root = temp_dir.path();

        assert!(watcher.start_watching(&[root.to_path_buf()]).await.is_ok());
        assert!(watcher.is_watching(root).await);

        assert!(watcher.stop_watching().await.is_ok());
        assert!(!watcher.is_watching(root).await);
    }

    /// Creating a media file in a watched directory should yield a matching
    /// `Created` event. A missing event only logs a warning, because
    /// delivery is not reliable in every test environment.
    #[tokio::test]
    async fn test_file_events() {
        let temp_dir = TempDir::new().unwrap();
        let watcher = CrossPlatformWatcher::new();

        // Take the receiver before any event can be produced.
        let mut receiver = watcher.get_event_receiver();

        watcher
            .start_watching(&[temp_dir.path().to_path_buf()])
            .await
            .unwrap();

        // Give the backend a moment to set up before touching the directory.
        sleep(Duration::from_millis(200)).await;

        let test_file = temp_dir.path().join("test.mp4");
        fs::write(&test_file, b"test content").unwrap();

        let timeout_duration = Duration::from_secs(5);

        // Drain events until the creation of `test_file` shows up or the
        // channel closes.
        let wait_for_creation = async {
            while let Some(event) = receiver.recv().await {
                match event {
                    FileSystemEvent::Created(path) => {
                        let received = path.canonicalize().unwrap_or_else(|_| path.clone());
                        let expected = test_file
                            .canonicalize()
                            .unwrap_or_else(|_| test_file.clone());

                        if received == expected {
                            return Some(FileSystemEvent::Created(path));
                        }
                        info!("Ignoring creation event for path: {:?}", path);
                    }
                    other_event => {
                        info!("Ignoring other event: {:?}", other_event);
                    }
                }
            }
            None
        };
        let outcome = timeout(timeout_duration, wait_for_creation).await;

        match outcome {
            Ok(Some(FileSystemEvent::Created(path))) => {
                let canonical_received = path.canonicalize().unwrap_or(path);
                let canonical_expected = test_file.canonicalize().unwrap_or(test_file);
                assert_eq!(canonical_received, canonical_expected);
            }
            Ok(Some(_)) => panic!("Received an unexpected event type after filtering"),
            _ => {
                warn!("No specific file creation event received within {:?}. This can sometimes happen in test environments.", timeout_duration);
            }
        }

        watcher.stop_watching().await.unwrap();
    }
}