rusty_files/watcher/
monitor.rs

1use crate::core::config::SearchConfig;
2use crate::core::error::Result;
3use crate::filters::ExclusionFilter;
4use crate::storage::Database;
5use crate::watcher::debouncer::{EventDebouncer, FileEventType};
6use crate::watcher::synchronizer::{FileEvent, IndexSynchronizer};
7use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::Path;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use tokio::sync::mpsc;
12
13pub struct FileSystemMonitor {
14    exclusion_filter: Arc<ExclusionFilter>,
15    synchronizer: Arc<IndexSynchronizer>,
16    debouncer: Arc<EventDebouncer>,
17    is_running: Arc<AtomicBool>,
18    watcher: Option<RecommendedWatcher>,
19}
20
21impl FileSystemMonitor {
22    pub fn new(
23        database: Arc<Database>,
24        config: Arc<SearchConfig>,
25        exclusion_filter: Arc<ExclusionFilter>,
26    ) -> Self {
27        let synchronizer = Arc::new(IndexSynchronizer::new(
28            database,
29            Arc::clone(&config),
30            Arc::clone(&exclusion_filter),
31        ));
32
33        let debouncer = Arc::new(EventDebouncer::new(config.watch_debounce_ms));
34
35        Self {
36            exclusion_filter,
37            synchronizer,
38            debouncer,
39            is_running: Arc::new(AtomicBool::new(false)),
40            watcher: None,
41        }
42    }
43
44    pub fn start<P: AsRef<Path>>(&mut self, root: P) -> Result<()> {
45        if self.is_running.load(Ordering::Relaxed) {
46            return Ok(());
47        }
48
49        let sender = self.synchronizer.get_sender();
50        let debouncer = Arc::clone(&self.debouncer);
51        let exclusion_filter = Arc::clone(&self.exclusion_filter);
52
53        let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
54            if let Ok(event) = res {
55                Self::handle_notify_event(event, &sender, &debouncer, &exclusion_filter);
56            }
57        })?;
58
59        watcher.watch(root.as_ref(), RecursiveMode::Recursive)?;
60
61        self.watcher = Some(watcher);
62        self.is_running.store(true, Ordering::Relaxed);
63
64        Ok(())
65    }
66
67    pub fn stop(&mut self) -> Result<()> {
68        if !self.is_running.load(Ordering::Relaxed) {
69            return Ok(());
70        }
71
72        self.watcher = None;
73        self.is_running.store(false, Ordering::Relaxed);
74
75        Ok(())
76    }
77
78    pub fn is_running(&self) -> bool {
79        self.is_running.load(Ordering::Relaxed)
80    }
81
82    fn handle_notify_event(
83        event: Event,
84        sender: &mpsc::UnboundedSender<FileEvent>,
85        debouncer: &Arc<EventDebouncer>,
86        exclusion_filter: &Arc<ExclusionFilter>,
87    ) {
88        let event_type = match event.kind {
89            EventKind::Create(_) => FileEventType::Created,
90            EventKind::Modify(_) => FileEventType::Modified,
91            EventKind::Remove(_) => FileEventType::Deleted,
92            EventKind::Any => FileEventType::Modified,
93            _ => return,
94        };
95
96        for path in event.paths {
97            if exclusion_filter.is_excluded(&path) {
98                continue;
99            }
100
101            if !debouncer.should_process(path.clone(), event_type) {
102                continue;
103            }
104
105            let file_event = FileEvent { path, event_type };
106
107            if sender.send(file_event).is_err() {
108                log::error!("Failed to send file event to synchronizer");
109            }
110        }
111    }
112
113    pub async fn run_cleanup_task(&self) {
114        use tokio::time::{interval, Duration};
115
116        let mut interval = interval(Duration::from_secs(60));
117        let debouncer = Arc::clone(&self.debouncer);
118
119        loop {
120            interval.tick().await;
121            debouncer.cleanup_old_events(Duration::from_secs(300));
122        }
123    }
124}
125
126impl Drop for FileSystemMonitor {
127    fn drop(&mut self) {
128        let _ = self.stop();
129    }
130}
131
132#[cfg(test)]
133mod tests {
134    use super::*;
135    use tempfile::TempDir;
136
137    #[test]
138    fn test_monitor_creation() {
139        let db = Arc::new(Database::in_memory(10).unwrap());
140        let config = Arc::new(SearchConfig::default());
141        let filter = Arc::new(ExclusionFilter::default());
142
143        let monitor = FileSystemMonitor::new(db, config, filter);
144        assert!(!monitor.is_running());
145    }
146
147    #[test]
148    fn test_monitor_start_stop() {
149        let temp_dir = TempDir::new().unwrap();
150
151        let db = Arc::new(Database::in_memory(10).unwrap());
152        let config = Arc::new(SearchConfig::default());
153        let filter = Arc::new(ExclusionFilter::default());
154
155        let mut monitor = FileSystemMonitor::new(db, config, filter);
156
157        assert!(monitor.start(temp_dir.path()).is_ok());
158        assert!(monitor.is_running());
159
160        assert!(monitor.stop().is_ok());
161        assert!(!monitor.is_running());
162    }
163}