vuio/watcher/
mod.rs

1use async_trait::async_trait;
2use notify::{RecommendedWatcher, RecursiveMode, Config};
3use notify_debouncer_full::{new_debouncer_opt, DebounceEventResult, DebouncedEvent, Debouncer, FileIdMap};
4use std::collections::HashSet;
5use std::path::{Path, PathBuf};
6use std::sync::Arc;
7use std::time::Duration;
8use tokio::sync::{mpsc, RwLock};
9use tracing::{debug, error, info, warn};
10
11use crate::error::Result;
12
13/// Events that can occur in the file system for media files
14#[derive(Debug, Clone)]
15pub enum FileSystemEvent {
16    /// A new file was created
17    Created(PathBuf),
18    /// An existing file was modified
19    Modified(PathBuf),
20    /// A file was deleted
21    Deleted(PathBuf),
22    /// A file was renamed/moved
23    Renamed { from: PathBuf, to: PathBuf },
24}
25
26/// Trait for cross-platform file system watching
27#[async_trait]
28pub trait FileSystemWatcher: Send + Sync {
29    /// Start watching the specified directories for changes
30    async fn start_watching(&self, directories: &[PathBuf]) -> Result<()>;
31    
32    /// Stop watching all directories
33    async fn stop_watching(&self) -> Result<()>;
34    
35    /// Get a receiver for file system events
36    fn get_event_receiver(&self) -> mpsc::Receiver<FileSystemEvent>;
37    
38    /// Add a new path to watch
39    async fn add_watch_path(&self, path: &Path) -> Result<()>;
40    
41    /// Remove a path from watching
42    async fn remove_watch_path(&self, path: &Path) -> Result<()>;
43    
44    /// Check if a path is currently being watched
45    async fn is_watching(&self, path: &Path) -> bool;
46}
47
48/// Cross-platform file system watcher implementation
49pub struct CrossPlatformWatcher {
50    debouncer: Arc<RwLock<Option<Debouncer<RecommendedWatcher, FileIdMap>>>>,
51    event_sender: mpsc::Sender<FileSystemEvent>,
52    event_receiver: Arc<RwLock<Option<mpsc::Receiver<FileSystemEvent>>>>,
53    watched_paths: Arc<RwLock<HashSet<PathBuf>>>,
54    media_extensions: HashSet<String>,
55    debounce_duration: Duration,
56}
57
58impl CrossPlatformWatcher {
59    /// Create a new cross-platform file system watcher
60    pub fn new() -> Self {
61        let (event_sender, event_receiver) = mpsc::channel(256); // Reduced buffer size for memory efficiency
62        
63        // Define supported media file extensions with pre-allocated capacity
64        let mut media_extensions = HashSet::with_capacity(32);
65        let extensions = [
66            // Video formats
67            "mp4", "mkv", "avi", "mov", "wmv", "flv", "webm", "m4v", "3gp", "mpg", "mpeg",
68            // Audio formats  
69            "mp3", "flac", "wav", "aac", "ogg", "wma", "m4a", "opus", "aiff",
70            // Image formats
71            "jpg", "jpeg", "png", "gif", "bmp", "tiff", "webp", "svg",
72        ];
73        for ext in &extensions {
74            media_extensions.insert(ext.to_lowercase());
75        }
76
77        Self {
78            debouncer: Arc::new(RwLock::new(None)),
79            event_sender,
80            event_receiver: Arc::new(RwLock::new(Some(event_receiver))),
81            watched_paths: Arc::new(RwLock::new(HashSet::with_capacity(16))), // Pre-allocate capacity
82            media_extensions,
83            debounce_duration: Duration::from_millis(250), // 250ms debounce for reduced event frequency
84        }
85    }
86
87    /// Check if a file is a supported media file based on its extension
88    fn is_media_file(&self, path: &Path) -> bool {
89        if let Some(extension) = path.extension() {
90            if let Some(ext_str) = extension.to_str() {
91                return self.media_extensions.contains(&ext_str.to_lowercase());
92            }
93        }
94        false
95    }
96
97    /// Convert notify events to our FileSystemEvent enum
98    fn convert_events(&self, events: Vec<DebouncedEvent>) -> Vec<FileSystemEvent> {
99        let mut fs_events = Vec::with_capacity(events.len()); // Pre-allocate capacity
100        
101        for event in events {
102            match event.event.kind {
103                notify::EventKind::Create(_) => {
104                    for path in &event.event.paths {
105                        if path.is_dir() {
106                            // Handle directory creation - scan for media files
107                            info!("Directory created (detected by watcher): {:?}", path);
108                            fs_events.push(FileSystemEvent::Created(path.clone()));
109                        } else if self.is_media_file(path) {
110                            info!("Media file created (detected by watcher): {:?}", path);
111                            fs_events.push(FileSystemEvent::Created(path.clone()));
112                        } else {
113                            debug!("Non-media file created, ignoring: {:?}", path);
114                        }
115                    }
116                }
117                notify::EventKind::Modify(_) => {
118                    // Only process modify events for media files
119                    let media_paths: Vec<_> = event.event.paths.iter()
120                        .filter(|path| self.is_media_file(path))
121                        .collect();
122                    
123                    for path in media_paths {
124                        debug!("Media file modified: {:?}", path);
125                        fs_events.push(FileSystemEvent::Modified(path.clone()));
126                    }
127                }
128                notify::EventKind::Remove(_) => {
129                    for path in &event.event.paths {
130                        // Since the path is deleted, we can't check if it was a directory
131                        // We'll send all deletion events and let the handler figure it out
132                        info!("Path deleted (detected by watcher): {:?}", path);
133                        fs_events.push(FileSystemEvent::Deleted(path.clone()));
134                    }
135                }
136                notify::EventKind::Other => {
137                    // Handle platform-specific events for media files only
138                    let media_paths: Vec<_> = event.event.paths.iter()
139                        .filter(|path| self.is_media_file(path))
140                        .collect();
141                    
142                    for path in media_paths {
143                        debug!("Media file other event: {:?}", path);
144                        fs_events.push(FileSystemEvent::Modified(path.clone()));
145                    }
146                }
147                _ => {
148                    // Handle other event types as modifications for media files only
149                    let media_paths: Vec<_> = event.event.paths.iter()
150                        .filter(|path| self.is_media_file(path))
151                        .collect();
152                    
153                    for path in media_paths {
154                        debug!("Media file generic event: {:?}", path);
155                        fs_events.push(FileSystemEvent::Modified(path.clone()));
156                    }
157                }
158            }
159        }
160        
161        fs_events
162    }
163
164    /// Initialize the debounced watcher
165    async fn initialize_watcher(&self) -> Result<()> {
166        let event_sender = self.event_sender.clone();
167        let media_extensions = self.media_extensions.clone();
168        
169        // Create a weak reference to self to avoid circular references in the closure
170        let watcher_weak = Arc::downgrade(&self.debouncer);
171
172        let debouncer = new_debouncer_opt(
173            self.debounce_duration,
174            None, // Use default tick rate
175            move |result: DebounceEventResult| {
176                // Upgrade the weak reference to an Arc
177                let watcher_arc = if let Some(arc) = watcher_weak.upgrade() {
178                    arc
179                } else {
180                    // The watcher has been dropped, so we can't process events
181                    warn!("Watcher has been dropped, cannot process file events.");
182                    return;
183                };
184                match result {
185                    Ok(events) => {
186                        if !events.is_empty() {
187                            info!("Watcher callback triggered with {} events", events.len());
188                            for event in &events {
189                                info!("  Raw event: {:?} for paths: {:?}", event.event.kind, event.paths);
190                            }
191                        }
192                        
193                        // Filter events for media files OR directories
194                        let relevant_events: Vec<_> = events.into_iter()
195                            .filter(|event| {
196                                event.paths.iter().any(|path| {
197                                    // For deletion events, we can't check if path.is_dir() since it's gone
198                                    // So we include all deletion events
199                                    if matches!(event.event.kind, notify::EventKind::Remove(_)) {
200                                        info!("Including deletion event for path: {:?}", path);
201                                        return true;
202                                    }
203                                    
204                                    // Include directories and media files for other events
205                                    if path.is_dir() {
206                                        info!("Including directory event for path: {:?}", path);
207                                        return true;
208                                    }
209                                    
210                                    // Include media files
211                                    if let Some(extension) = path.extension() {
212                                        if let Some(ext_str) = extension.to_str() {
213                                            if media_extensions.contains(&ext_str.to_lowercase()) {
214                                                info!("Including media file event for path: {:?}", path);
215                                                return true;
216                                            }
217                                        }
218                                    }
219                                    
220                                    debug!("Excluding non-media file event for path: {:?}", path);
221                                    false
222                                })
223                            })
224                            .collect();
225
226                        if !relevant_events.is_empty() {
227                            info!("Processing {} relevant events", relevant_events.len());
228                            
229                            // Create a temporary watcher instance for event conversion
230                            let temp_watcher = CrossPlatformWatcher {
231                                debouncer: watcher_arc.clone(),
232                                event_sender: event_sender.clone(),
233                                event_receiver: Arc::new(RwLock::new(None)),
234                                watched_paths: Arc::new(RwLock::new(HashSet::with_capacity(16))),
235                                media_extensions: media_extensions.clone(),
236                                debounce_duration: Duration::from_millis(250),
237                            };
238                            let fs_events = temp_watcher.convert_events(relevant_events);
239                            for fs_event in fs_events {
240                                if let Err(e) = event_sender.try_send(fs_event) {
241                                    error!("Failed to send file system event: {}", e);
242                                }
243                            }
244                        }
245                    }
246                    Err(errors) => {
247                        for error in errors {
248                            error!("File watcher error: {:?}", error);
249                        }
250                    }
251                }
252            },
253            FileIdMap::new(),
254            Config::default(),
255        )?;
256
257        let mut debouncer_guard = self.debouncer.write().await;
258        *debouncer_guard = Some(debouncer);
259        
260        info!("File system watcher initialized with {}ms debounce", self.debounce_duration.as_millis());
261        info!("Watcher callback registered and ready to receive events");
262        Ok(())
263    }
264}
265
266#[async_trait]
267impl FileSystemWatcher for CrossPlatformWatcher {
268    async fn start_watching(&self, directories: &[PathBuf]) -> Result<()> {
269        info!("Starting file system watcher for {} directories", directories.len());
270        
271        // Initialize the watcher if not already done
272        if self.debouncer.read().await.is_none() {
273            self.initialize_watcher().await?;
274        }
275
276        let mut debouncer_guard = self.debouncer.write().await;
277        if let Some(ref mut debouncer) = *debouncer_guard {
278            let mut watched_paths = self.watched_paths.write().await;
279            
280            for directory in directories {
281                if !directory.exists() {
282                    warn!("Directory does not exist, skipping: {:?}", directory);
283                    continue;
284                }
285                
286                if !directory.is_dir() {
287                    warn!("Path is not a directory, skipping: {:?}", directory);
288                    continue;
289                }
290
291                match debouncer.watch(directory, RecursiveMode::Recursive) {
292                    Ok(()) => {
293                        watched_paths.insert(directory.clone());
294                        info!("Started watching directory: {:?}", directory);
295                        
296                        // Test if directory is accessible
297                        if directory.exists() && directory.is_dir() {
298                            info!("Directory exists and is accessible: {:?}", directory);
299                        } else {
300                            warn!("Directory may not be accessible: {:?}", directory);
301                        }
302                    }
303                    Err(e) => {
304                        error!("Failed to watch directory {:?}: {}", directory, e);
305                        return Err(e.into());
306                    }
307                }
308            }
309        }
310
311        Ok(())
312    }
313
314    async fn stop_watching(&self) -> Result<()> {
315        info!("Stopping file system watcher");
316        
317        let mut debouncer_guard = self.debouncer.write().await;
318        if let Some(debouncer) = debouncer_guard.take() {
319            // The debouncer will be dropped here, stopping the watcher
320            drop(debouncer);
321        }
322        
323        let mut watched_paths = self.watched_paths.write().await;
324        watched_paths.clear();
325        
326        info!("File system watcher stopped");
327        Ok(())
328    }
329
330    fn get_event_receiver(&self) -> mpsc::Receiver<FileSystemEvent> {
331        // This is a bit tricky - we need to return the receiver but can only do it once
332        // In practice, this should be called once during application startup
333        let receiver_guard = self.event_receiver.try_write();
334        if let Ok(mut guard) = receiver_guard {
335            if let Some(receiver) = guard.take() {
336                return receiver;
337            }
338        }
339        
340        // If we can't get the original receiver, create a new channel
341        // This shouldn't happen in normal usage
342        warn!("Creating new event receiver - original may have been consumed");
343        let (_, receiver) = mpsc::channel(256); // Reduced buffer size
344        receiver
345    }
346
347    async fn add_watch_path(&self, path: &Path) -> Result<()> {
348        if !path.exists() {
349            warn!("Path does not exist, cannot watch: {:?}", path);
350            return Ok(());
351        }
352
353        let mut debouncer_guard = self.debouncer.write().await;
354        if let Some(ref mut debouncer) = *debouncer_guard {
355            let mut watched_paths = self.watched_paths.write().await;
356            
357            if watched_paths.contains(path) {
358                debug!("Path already being watched: {:?}", path);
359                return Ok(());
360            }
361
362            match debouncer.watch(path, RecursiveMode::Recursive) {
363                Ok(()) => {
364                    watched_paths.insert(path.to_path_buf());
365                    info!("Added watch path: {:?}", path);
366                    Ok(())
367                }
368                Err(e) => {
369                    error!("Failed to add watch path {:?}: {}", path, e);
370                    Err(e.into())
371                }
372            }
373        } else {
374            warn!("Watcher not initialized, cannot add path: {:?}", path);
375            Ok(())
376        }
377    }
378
379    async fn remove_watch_path(&self, path: &Path) -> Result<()> {
380        let mut debouncer_guard = self.debouncer.write().await;
381        if let Some(ref mut debouncer) = *debouncer_guard {
382            let mut watched_paths = self.watched_paths.write().await;
383            
384            if !watched_paths.contains(path) {
385                debug!("Path not being watched: {:?}", path);
386                return Ok(());
387            }
388
389            match debouncer.unwatch(path) {
390                Ok(()) => {
391                    watched_paths.remove(path);
392                    info!("Removed watch path: {:?}", path);
393                    Ok(())
394                }
395                Err(e) => {
396                    error!("Failed to remove watch path {:?}: {}", path, e);
397                    Err(e.into())
398                }
399            }
400        } else {
401            warn!("Watcher not initialized, cannot remove path: {:?}", path);
402            Ok(())
403        }
404    }
405
406    async fn is_watching(&self, path: &Path) -> bool {
407        let watched_paths = self.watched_paths.read().await;
408        watched_paths.contains(path)
409    }
410}
411
412impl Default for CrossPlatformWatcher {
413    fn default() -> Self {
414        Self::new()
415    }
416}
417
418#[cfg(test)]
419mod tests {
420    use super::*;
421    use std::fs;
422    use tempfile::TempDir;
423    use tokio::time::{sleep, timeout};
424
425    #[tokio::test]
426    async fn test_watcher_creation() {
427        let watcher = CrossPlatformWatcher::new();
428        assert!(!watcher.is_watching(Path::new("/nonexistent")).await);
429    }
430
431    #[tokio::test]
432    async fn test_media_file_detection() {
433        let watcher = CrossPlatformWatcher::new();
434        
435        assert!(watcher.is_media_file(Path::new("test.mp4")));
436        assert!(watcher.is_media_file(Path::new("test.MP3")));
437        assert!(watcher.is_media_file(Path::new("test.jpg")));
438        assert!(!watcher.is_media_file(Path::new("test.txt")));
439        assert!(!watcher.is_media_file(Path::new("test")));
440    }
441
442    #[tokio::test]
443    async fn test_watch_nonexistent_directory() {
444        let watcher = CrossPlatformWatcher::new();
445        let result = watcher.start_watching(&[PathBuf::from("/nonexistent/path")]).await;
446        // Should not fail, just log a warning
447        assert!(result.is_ok());
448    }
449
450    #[tokio::test]
451    async fn test_watch_and_unwatch() {
452        let temp_dir = TempDir::new().unwrap();
453        let watcher = CrossPlatformWatcher::new();
454        
455        // Start watching
456        let result = watcher.start_watching(&[temp_dir.path().to_path_buf()]).await;
457        assert!(result.is_ok());
458        
459        // Check if watching
460        assert!(watcher.is_watching(temp_dir.path()).await);
461        
462        // Stop watching
463        let result = watcher.stop_watching().await;
464        assert!(result.is_ok());
465        
466        // Should no longer be watching
467        assert!(!watcher.is_watching(temp_dir.path()).await);
468    }
469
470    #[tokio::test]
471    async fn test_file_events() {
472        let temp_dir = TempDir::new().unwrap();
473        let watcher = CrossPlatformWatcher::new();
474        
475        // Get event receiver before starting watcher
476        let mut receiver = watcher.get_event_receiver();
477        
478        // Start watching
479        watcher.start_watching(&[temp_dir.path().to_path_buf()]).await.unwrap();
480        
481        // Give the watcher time to initialize
482        sleep(Duration::from_millis(200)).await;
483        
484        // Create a media file
485        let test_file = temp_dir.path().join("test.mp4");
486        fs::write(&test_file, b"test content").unwrap();
487        
488        // Wait for the correct event with timeout, ignoring directory creation events
489        let timeout_duration = Duration::from_secs(5);
490        let correct_event_result = timeout(timeout_duration, async {
491            loop {
492                let event = receiver.recv().await;
493                match event {
494                    Some(FileSystemEvent::Created(path)) => {
495                        let canonical_received = path.canonicalize().unwrap_or_else(|_| path.clone());
496                        let canonical_expected = test_file.canonicalize().unwrap_or_else(|_| test_file.clone());
497                        
498                        if canonical_received == canonical_expected {
499                            // This is the event we are looking for
500                            return Some(FileSystemEvent::Created(path));
501                        } else {
502                            // This is likely the directory creation event, ignore it and continue waiting
503                            info!("Ignoring creation event for path: {:?}", path);
504                        }
505                    }
506                    Some(other_event) => {
507                        // Ignore other events for this test
508                        info!("Ignoring other event: {:?}", other_event);
509                    }
510                    None => {
511                        // Channel is closed, stop waiting
512                        return None;
513                    }
514                }
515            }
516        }).await;
517
518        if let Ok(Some(event)) = correct_event_result {
519            match event {
520                FileSystemEvent::Created(path) => {
521                    let canonical_received = path.canonicalize().unwrap_or(path);
522                    let canonical_expected = test_file.canonicalize().unwrap_or(test_file);
523                    assert_eq!(canonical_received, canonical_expected);
524                }
525                _ => panic!("Received an unexpected event type after filtering"),
526            }
527        } else {
528            // Events might be flaky in test environments, so we don't fail the test
529            warn!("No specific file creation event received within {:?}. This can sometimes happen in test environments.", timeout_duration);
530        }
531        
532        watcher.stop_watching().await.unwrap();
533    }
534}