Skip to main content

seekr_code/scanner/
watcher.rs

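//! File system watcher.
//!
//! Uses the `notify` crate to watch for file changes and emit events
//! through a channel for incremental index updates: a `std::sync::mpsc`
//! channel for the blocking `FileWatcher`, and a tokio unbounded channel
//! for `start_async_watcher`.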
5
6use std::path::{Path, PathBuf};
7use std::sync::mpsc;
8use std::time::Duration;
9
10use notify::{Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher};
11
12use crate::error::ScannerError;
13
/// A file system event suitable for incremental indexing.
///
/// Each variant carries the path the event refers to. The type is
/// comparable and hashable so events can be asserted on in tests and
/// stored in sets or used as map keys.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum FileEvent {
    /// A file was created or modified.
    Changed(PathBuf),
    /// A file was deleted.
    Deleted(PathBuf),
}
22
23/// A file watcher that monitors a directory tree for changes.
24pub struct FileWatcher {
25    _watcher: RecommendedWatcher,
26    receiver: mpsc::Receiver<FileEvent>,
27}
28
29impl FileWatcher {
30    /// Create a new file watcher for the given directory.
31    ///
32    /// Starts watching recursively. File events can be received
33    /// via the `recv` method.
34    pub fn new(watch_path: &Path) -> Result<Self, ScannerError> {
35        let (tx, rx) = mpsc::channel();
36
37        let sender = tx.clone();
38        let mut watcher = RecommendedWatcher::new(
39            move |res: Result<Event, notify::Error>| match res {
40                Ok(event) => {
41                    let file_events = convert_event(event);
42                    for fe in file_events {
43                        let _ = sender.send(fe);
44                    }
45                }
46                Err(e) => {
47                    tracing::warn!("File watcher error: {}", e);
48                }
49            },
50            Config::default().with_poll_interval(Duration::from_secs(2)),
51        )
52        .map_err(|e| ScannerError::WatchError(format!("Failed to create watcher: {}", e)))?;
53
54        watcher
55            .watch(watch_path, RecursiveMode::Recursive)
56            .map_err(|e| {
57                ScannerError::WatchError(format!(
58                    "Failed to watch '{}': {}",
59                    watch_path.display(),
60                    e,
61                ))
62            })?;
63
64        tracing::info!(path = %watch_path.display(), "File watcher started");
65
66        Ok(Self {
67            _watcher: watcher,
68            receiver: rx,
69        })
70    }
71
72    /// Receive the next file event, blocking until one is available.
73    pub fn recv(&self) -> Option<FileEvent> {
74        self.receiver.recv().ok()
75    }
76
77    /// Try to receive a file event without blocking.
78    pub fn try_recv(&self) -> Option<FileEvent> {
79        self.receiver.try_recv().ok()
80    }
81
82    /// Drain all pending events (non-blocking).
83    pub fn drain_events(&self) -> Vec<FileEvent> {
84        let mut events = Vec::new();
85        while let Ok(event) = self.receiver.try_recv() {
86            events.push(event);
87        }
88        events
89    }
90
91    /// Receive with timeout.
92    pub fn recv_timeout(&self, timeout: Duration) -> Option<FileEvent> {
93        self.receiver.recv_timeout(timeout).ok()
94    }
95}
96
97/// Convert a notify Event into our FileEvent(s).
98fn convert_event(event: Event) -> Vec<FileEvent> {
99    let mut file_events = Vec::new();
100
101    match event.kind {
102        EventKind::Create(_) | EventKind::Modify(_) => {
103            for path in event.paths {
104                if path.is_file() {
105                    file_events.push(FileEvent::Changed(path));
106                }
107            }
108        }
109        EventKind::Remove(_) => {
110            for path in event.paths {
111                file_events.push(FileEvent::Deleted(path));
112            }
113        }
114        _ => {
115            // Ignore access, other events
116        }
117    }
118
119    file_events
120}
121
122/// Create an async watcher that sends events through a tokio channel.
123///
124/// This is useful for integration with the async Daemon mode.
125pub fn start_async_watcher(
126    watch_path: &Path,
127) -> Result<
128    (
129        RecommendedWatcher,
130        tokio::sync::mpsc::UnboundedReceiver<FileEvent>,
131    ),
132    ScannerError,
133> {
134    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
135
136    let sender = tx;
137    let mut watcher = RecommendedWatcher::new(
138        move |res: Result<Event, notify::Error>| match res {
139            Ok(event) => {
140                let file_events = convert_event(event);
141                for fe in file_events {
142                    let _ = sender.send(fe);
143                }
144            }
145            Err(e) => {
146                tracing::warn!("File watcher error: {}", e);
147            }
148        },
149        Config::default().with_poll_interval(Duration::from_secs(2)),
150    )
151    .map_err(|e| ScannerError::WatchError(format!("Failed to create async watcher: {}", e)))?;
152
153    watcher
154        .watch(watch_path, RecursiveMode::Recursive)
155        .map_err(|e| {
156            ScannerError::WatchError(format!("Failed to watch '{}': {}", watch_path.display(), e,))
157        })?;
158
159    tracing::info!(path = %watch_path.display(), "Async file watcher started");
160
161    Ok((watcher, rx))
162}
163
164/// Deduplicate events by path, keeping only the latest event type per path.
165pub fn dedup_events(events: Vec<FileEvent>) -> Vec<FileEvent> {
166    use std::collections::HashMap;
167
168    let mut latest: HashMap<PathBuf, FileEvent> = HashMap::new();
169
170    for event in events {
171        let path = match &event {
172            FileEvent::Changed(p) => p.clone(),
173            FileEvent::Deleted(p) => p.clone(),
174        };
175        latest.insert(path, event);
176    }
177
178    latest.into_values().collect()
179}
180
#[cfg(test)]
mod tests {
    use super::*;

    /// Duplicate paths collapse to one event each, and the latest event
    /// for a path is the one that survives.
    #[test]
    fn test_dedup_events() {
        let events = vec![
            FileEvent::Changed(PathBuf::from("/a/b.rs")),
            FileEvent::Changed(PathBuf::from("/a/b.rs")),
            FileEvent::Deleted(PathBuf::from("/a/c.rs")),
            FileEvent::Changed(PathBuf::from("/a/c.rs")),
        ];

        let deduped = dedup_events(events);
        assert_eq!(deduped.len(), 2);

        // Order-independent checks: both paths end up as `Changed`, and the
        // earlier `Deleted` for /a/c.rs has been superseded.
        assert!(deduped
            .iter()
            .any(|e| matches!(e, FileEvent::Changed(p) if p.as_path() == Path::new("/a/b.rs"))));
        assert!(deduped
            .iter()
            .any(|e| matches!(e, FileEvent::Changed(p) if p.as_path() == Path::new("/a/c.rs"))));
        assert!(!deduped.iter().any(|e| matches!(e, FileEvent::Deleted(_))));
    }

    /// Deduplicating nothing yields nothing.
    #[test]
    fn test_dedup_events_empty() {
        assert!(dedup_events(Vec::new()).is_empty());
    }

    /// A create event for an existing regular file maps to `Changed`.
    #[test]
    fn test_convert_create_event() {
        let dir = tempfile::tempdir().unwrap();
        let file_path = dir.path().join("test.rs");
        std::fs::write(&file_path, "fn main() {}").unwrap();

        let event = Event {
            kind: EventKind::Create(notify::event::CreateKind::File),
            paths: vec![file_path.clone()],
            attrs: Default::default(),
        };

        let file_events = convert_event(event);
        assert_eq!(file_events.len(), 1);
        match &file_events[0] {
            FileEvent::Changed(p) => assert_eq!(p, &file_path),
            _ => panic!("Expected Changed event"),
        }
    }

    /// A create event for a directory produces no file events.
    #[test]
    fn test_convert_create_directory_is_ignored() {
        let dir = tempfile::tempdir().unwrap();

        let event = Event {
            kind: EventKind::Create(notify::event::CreateKind::Folder),
            paths: vec![dir.path().to_path_buf()],
            attrs: Default::default(),
        };

        assert!(convert_event(event).is_empty());
    }

    /// A remove event maps to `Deleted` without touching the file system.
    #[test]
    fn test_convert_remove_event() {
        let event = Event {
            kind: EventKind::Remove(notify::event::RemoveKind::File),
            paths: vec![PathBuf::from("/a/deleted.rs")],
            attrs: Default::default(),
        };

        let file_events = convert_event(event);
        assert_eq!(file_events.len(), 1);
        match &file_events[0] {
            FileEvent::Deleted(p) => assert_eq!(p, &PathBuf::from("/a/deleted.rs")),
            _ => panic!("Expected Deleted event"),
        }
    }

    /// Watching a fresh temporary directory succeeds.
    #[test]
    fn test_watcher_creation() {
        let dir = tempfile::tempdir().unwrap();
        let watcher = FileWatcher::new(dir.path());
        assert!(watcher.is_ok(), "Should be able to create a watcher");
    }
}