rusty_files/watcher/
monitor.rs1use crate::core::config::SearchConfig;
2use crate::core::error::Result;
3use crate::filters::ExclusionFilter;
4use crate::storage::Database;
5use crate::watcher::debouncer::{EventDebouncer, FileEventType};
6use crate::watcher::synchronizer::{FileEvent, IndexSynchronizer};
7use notify::{Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
8use std::path::Path;
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use tokio::sync::mpsc;
12
13pub struct FileSystemMonitor {
14 exclusion_filter: Arc<ExclusionFilter>,
15 synchronizer: Arc<IndexSynchronizer>,
16 debouncer: Arc<EventDebouncer>,
17 is_running: Arc<AtomicBool>,
18 watcher: Option<RecommendedWatcher>,
19}
20
21impl FileSystemMonitor {
22 pub fn new(
23 database: Arc<Database>,
24 config: Arc<SearchConfig>,
25 exclusion_filter: Arc<ExclusionFilter>,
26 ) -> Self {
27 let synchronizer = Arc::new(IndexSynchronizer::new(
28 database,
29 Arc::clone(&config),
30 Arc::clone(&exclusion_filter),
31 ));
32
33 let debouncer = Arc::new(EventDebouncer::new(config.watch_debounce_ms));
34
35 Self {
36 exclusion_filter,
37 synchronizer,
38 debouncer,
39 is_running: Arc::new(AtomicBool::new(false)),
40 watcher: None,
41 }
42 }
43
44 pub fn start<P: AsRef<Path>>(&mut self, root: P) -> Result<()> {
45 if self.is_running.load(Ordering::Relaxed) {
46 return Ok(());
47 }
48
49 let sender = self.synchronizer.get_sender();
50 let debouncer = Arc::clone(&self.debouncer);
51 let exclusion_filter = Arc::clone(&self.exclusion_filter);
52
53 let mut watcher = notify::recommended_watcher(move |res: notify::Result<Event>| {
54 if let Ok(event) = res {
55 Self::handle_notify_event(event, &sender, &debouncer, &exclusion_filter);
56 }
57 })?;
58
59 watcher.watch(root.as_ref(), RecursiveMode::Recursive)?;
60
61 self.watcher = Some(watcher);
62 self.is_running.store(true, Ordering::Relaxed);
63
64 Ok(())
65 }
66
67 pub fn stop(&mut self) -> Result<()> {
68 if !self.is_running.load(Ordering::Relaxed) {
69 return Ok(());
70 }
71
72 self.watcher = None;
73 self.is_running.store(false, Ordering::Relaxed);
74
75 Ok(())
76 }
77
78 pub fn is_running(&self) -> bool {
79 self.is_running.load(Ordering::Relaxed)
80 }
81
82 fn handle_notify_event(
83 event: Event,
84 sender: &mpsc::UnboundedSender<FileEvent>,
85 debouncer: &Arc<EventDebouncer>,
86 exclusion_filter: &Arc<ExclusionFilter>,
87 ) {
88 let event_type = match event.kind {
89 EventKind::Create(_) => FileEventType::Created,
90 EventKind::Modify(_) => FileEventType::Modified,
91 EventKind::Remove(_) => FileEventType::Deleted,
92 EventKind::Any => FileEventType::Modified,
93 _ => return,
94 };
95
96 for path in event.paths {
97 if exclusion_filter.is_excluded(&path) {
98 continue;
99 }
100
101 if !debouncer.should_process(path.clone(), event_type) {
102 continue;
103 }
104
105 let file_event = FileEvent { path, event_type };
106
107 if sender.send(file_event).is_err() {
108 log::error!("Failed to send file event to synchronizer");
109 }
110 }
111 }
112
113 pub async fn run_cleanup_task(&self) {
114 use tokio::time::{interval, Duration};
115
116 let mut interval = interval(Duration::from_secs(60));
117 let debouncer = Arc::clone(&self.debouncer);
118
119 loop {
120 interval.tick().await;
121 debouncer.cleanup_old_events(Duration::from_secs(300));
122 }
123 }
124}
125
126impl Drop for FileSystemMonitor {
127 fn drop(&mut self) {
128 let _ = self.stop();
129 }
130}
131
132#[cfg(test)]
133mod tests {
134 use super::*;
135 use tempfile::TempDir;
136
137 #[test]
138 fn test_monitor_creation() {
139 let db = Arc::new(Database::in_memory(10).unwrap());
140 let config = Arc::new(SearchConfig::default());
141 let filter = Arc::new(ExclusionFilter::default());
142
143 let monitor = FileSystemMonitor::new(db, config, filter);
144 assert!(!monitor.is_running());
145 }
146
147 #[test]
148 fn test_monitor_start_stop() {
149 let temp_dir = TempDir::new().unwrap();
150
151 let db = Arc::new(Database::in_memory(10).unwrap());
152 let config = Arc::new(SearchConfig::default());
153 let filter = Arc::new(ExclusionFilter::default());
154
155 let mut monitor = FileSystemMonitor::new(db, config, filter);
156
157 assert!(monitor.start(temp_dir.path()).is_ok());
158 assert!(monitor.is_running());
159
160 assert!(monitor.stop().is_ok());
161 assert!(!monitor.is_running());
162 }
163}