Skip to main content

nexus_shield/endpoint/
watcher.rs

1// ============================================================================
2// File: endpoint/watcher.rs
3// Description: Real-time filesystem monitoring via inotify (notify crate)
4// Author: Andrew Jewell Sr. - AutomataNexus
5// Updated: March 24, 2026
6// ============================================================================
7//! File Watcher — monitors directories for file creation, modification, and
8//! renames, then dispatches paths to the scanner pipeline.
9
10use super::allowlist::DeveloperAllowlist;
11use notify::{Event, EventKind, RecursiveMode, Watcher};
12use serde::{Deserialize, Serialize};
13use std::path::{Path, PathBuf};
14use std::sync::atomic::{AtomicBool, Ordering};
15use std::sync::Arc;
16
/// Configuration for the filesystem watcher.
///
/// The type is serializable and deserializable through serde; see the
/// `Default` implementation for the out-of-the-box values.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WatcherConfig {
    /// Directories to monitor recursively.
    ///
    /// Paths that do not exist when the watcher starts are not watched.
    pub watch_paths: Vec<PathBuf>,
    /// Path patterns to exclude (component or extension match).
    ///
    /// Entries of the form `*.ext` are compared against the file extension;
    /// all other entries are matched against the path by `should_exclude`.
    pub exclude_patterns: Vec<String>,
    /// Maximum file size to scan (bytes). Larger files are skipped.
    ///
    /// A file whose size is exactly this value is still dispatched.
    pub max_file_size: u64,
    /// Debounce interval in milliseconds for rapid file events.
    ///
    /// `FileWatcher::start` forwards this value to
    /// `notify::Config::with_poll_interval`.
    pub debounce_ms: u64,
}
29
30impl Default for WatcherConfig {
31    fn default() -> Self {
32        let home = std::env::var("HOME")
33            .map(PathBuf::from)
34            .unwrap_or_else(|_| PathBuf::from("/root"));
35
36        Self {
37            watch_paths: vec![home, PathBuf::from("/tmp")],
38            exclude_patterns: vec![
39                "node_modules".to_string(),
40                "target".to_string(),
41                ".git".to_string(),
42                "__pycache__".to_string(),
43                ".cache".to_string(),
44                "*.o".to_string(),
45                "*.a".to_string(),
46                "*.pyc".to_string(),
47                "*.class".to_string(),
48            ],
49            max_file_size: 104_857_600, // 100 MB
50            debounce_ms: 300,
51        }
52    }
53}
54
/// Real-time filesystem watcher that dispatches file paths to the scan pipeline.
///
/// Construct it with `FileWatcher::new` and launch it with
/// `FileWatcher::start`, which consumes the watcher and runs the event loop
/// in a background task.
pub struct FileWatcher {
    /// Watch roots, exclusion patterns, and size limit applied to each event.
    config: WatcherConfig,
    /// Channel on which every path that passes the filters is sent.
    scan_tx: tokio::sync::mpsc::UnboundedSender<PathBuf>,
    /// Run flag polled by the event loop; initialised to `true` and cleared
    /// by `stop`.
    running: Arc<AtomicBool>,
}
61
62impl FileWatcher {
63    /// Create a new file watcher.
64    pub fn new(
65        config: WatcherConfig,
66        scan_tx: tokio::sync::mpsc::UnboundedSender<PathBuf>,
67    ) -> Self {
68        Self {
69            config,
70            scan_tx,
71            running: Arc::new(AtomicBool::new(true)),
72        }
73    }
74
75    /// Start the watcher in a background task. Returns a JoinHandle.
76    pub fn start(self, allowlist: Arc<DeveloperAllowlist>) -> tokio::task::JoinHandle<()> {
77        let config = self.config.clone();
78        let scan_tx = self.scan_tx.clone();
79        let running = Arc::clone(&self.running);
80
81        tokio::spawn(async move {
82            // Use a std channel for the notify watcher (it's sync)
83            let (tx, rx) = std::sync::mpsc::channel::<notify::Result<Event>>();
84
85            let mut watcher = match notify::RecommendedWatcher::new(
86                tx,
87                notify::Config::default()
88                    .with_poll_interval(std::time::Duration::from_millis(config.debounce_ms)),
89            ) {
90                Ok(w) => w,
91                Err(e) => {
92                    tracing::error!("Failed to create file watcher: {}", e);
93                    return;
94                }
95            };
96
97            // Add watch paths
98            for path in &config.watch_paths {
99                if path.exists() {
100                    match watcher.watch(path, RecursiveMode::Recursive) {
101                        Ok(_) => tracing::info!("Watching directory: {}", path.display()),
102                        Err(e) => tracing::warn!("Cannot watch {}: {}", path.display(), e),
103                    }
104                }
105            }
106
107            // Process events
108            while running.load(Ordering::Relaxed) {
109                match rx.recv_timeout(std::time::Duration::from_secs(1)) {
110                    Ok(Ok(event)) => {
111                        // Only process create and modify events
112                        let dominated = matches!(
113                            event.kind,
114                            EventKind::Create(_) | EventKind::Modify(_)
115                        );
116
117                        if !dominated {
118                            continue;
119                        }
120
121                        for path in event.paths {
122                            // Skip directories
123                            if path.is_dir() {
124                                continue;
125                            }
126
127                            // Check exclude patterns
128                            if should_exclude(&path, &config.exclude_patterns) {
129                                continue;
130                            }
131
132                            // Check developer allowlist
133                            if allowlist.should_skip_path(&path) {
134                                tracing::trace!(file = %path.display(), "Skipped by allowlist");
135                                continue;
136                            }
137
138                            // Check file size
139                            if let Ok(meta) = std::fs::metadata(&path) {
140                                if meta.len() > config.max_file_size {
141                                    continue;
142                                }
143                                if !meta.is_file() {
144                                    continue;
145                                }
146                            } else {
147                                continue;
148                            }
149
150                            // Send to scan pipeline
151                            if scan_tx.send(path).is_err() {
152                                tracing::warn!("Scan channel closed, stopping watcher");
153                                return;
154                            }
155                        }
156                    }
157                    Ok(Err(e)) => {
158                        tracing::warn!("Watch error: {}", e);
159                    }
160                    Err(std::sync::mpsc::RecvTimeoutError::Timeout) => {
161                        // Normal timeout, check running flag
162                    }
163                    Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => {
164                        tracing::info!("Watcher channel disconnected, stopping");
165                        return;
166                    }
167                }
168            }
169
170            tracing::info!("File watcher stopped");
171        })
172    }
173
174    /// Signal the watcher to stop.
175    pub fn stop(&self) {
176        self.running.store(false, Ordering::Relaxed);
177    }
178}
179
/// Check if a path matches any exclude pattern.
///
/// Two pattern forms are recognised:
///
/// * `*.ext` — matches when the path's extension equals `ext`
///   (ASCII case-insensitive).
/// * anything else — matches when the pattern's components appear as a run of
///   consecutive *whole* components of the path. `"target"` matches
///   `/proj/target/debug/app`, and `"build/out"` matches `/src/build/out/a.bin`.
///
/// Matching is deliberately on whole components, never on substrings:
/// `"target"` must not exclude `targets.txt` and `".git"` must not exclude
/// `.github/`, otherwise a file could dodge scanning just by its name.
/// Empty patterns (and patterns with no normal components, such as `"/"`)
/// match nothing.
///
/// Returns `true` if at least one pattern matches, `false` otherwise
/// (including when `patterns` is empty).
pub fn should_exclude(path: &Path, patterns: &[String]) -> bool {
    // Split the path once; every non-extension pattern is compared against it.
    let path_parts = normal_components(path);

    patterns.iter().any(|pattern| {
        if pattern.is_empty() {
            return false;
        }

        // Extension match: *.ext
        if let Some(ext_pat) = pattern.strip_prefix("*.") {
            return path
                .extension()
                .map_or(false, |ext| ext.to_string_lossy().eq_ignore_ascii_case(ext_pat));
        }

        // Component match: the pattern's components must equal a consecutive
        // run of the path's components.
        let wanted = normal_components(Path::new(pattern.as_str()));
        // `windows(0)` panics, so a pattern without normal components is
        // rejected up front.
        !wanted.is_empty()
            && path_parts
                .windows(wanted.len())
                .any(|window| window == wanted.as_slice())
    })
}

/// Collect the normal (named) components of `path`, dropping root, prefix,
/// `.` and `..` entries.
fn normal_components(path: &Path) -> Vec<&std::ffi::OsStr> {
    path.components()
        .filter_map(|component| match component {
            std::path::Component::Normal(part) => Some(part),
            _ => None,
        })
        .collect()
}
212
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an owned pattern list from string literals.
    fn pattern_list(items: &[&str]) -> Vec<String> {
        items.iter().map(|&item| item.to_string()).collect()
    }

    /// A `node_modules` directory anywhere in the path excludes the file.
    #[test]
    fn exclude_node_modules() {
        let excluded = pattern_list(&["node_modules"]);
        let candidate = Path::new("/home/user/project/node_modules/express/index.js");
        assert!(should_exclude(candidate, &excluded));
    }

    /// Files nested below a `target` directory are excluded.
    #[test]
    fn exclude_deep_target() {
        let excluded = pattern_list(&["target"]);
        let candidate = Path::new("/home/user/rust-project/target/debug/myapp");
        assert!(should_exclude(candidate, &excluded));
    }

    /// `*.ext` patterns exclude by file extension.
    #[test]
    fn exclude_object_extension() {
        let excluded = pattern_list(&["*.o", "*.a"]);
        for candidate in ["/tmp/build/main.o", "/tmp/lib/libz.a"].iter() {
            assert!(should_exclude(Path::new(candidate), &excluded));
        }
    }

    /// Ordinary documents and downloads are not caught by the patterns.
    #[test]
    fn normal_file_not_excluded() {
        let excluded = pattern_list(&["node_modules", "target", "*.o"]);
        for candidate in ["/home/user/Documents/report.pdf", "/tmp/download.exe"].iter() {
            assert!(!should_exclude(Path::new(candidate), &excluded));
        }
    }

    /// The default configuration is fully populated.
    #[test]
    fn config_defaults() {
        let defaults = WatcherConfig::default();
        assert!(!defaults.watch_paths.is_empty());
        assert!(!defaults.exclude_patterns.is_empty());
        assert!(defaults.max_file_size > 0);
        assert!(defaults.debounce_ms > 0);
    }

    /// Numeric settings survive a JSON serialize/deserialize round trip.
    #[test]
    fn config_serialization_roundtrip() {
        let before = WatcherConfig::default();
        let encoded = serde_json::to_string(&before).unwrap();
        let after: WatcherConfig = serde_json::from_str(&encoded).unwrap();
        assert_eq!(before.max_file_size, after.max_file_size);
        assert_eq!(before.debounce_ms, after.debounce_ms);
    }

    /// Anything inside a `.git` directory is excluded.
    #[test]
    fn exclude_git_directory() {
        let excluded = pattern_list(&[".git"]);
        let candidate = Path::new("/home/user/repo/.git/objects/pack/pack-abc.idx");
        assert!(should_exclude(candidate, &excluded));
    }

    /// Anything inside a `__pycache__` directory is excluded.
    #[test]
    fn exclude_pycache() {
        let excluded = pattern_list(&["__pycache__"]);
        let candidate = Path::new("/home/user/app/__pycache__/module.cpython-311.pyc");
        assert!(should_exclude(candidate, &excluded));
    }
}