Skip to main content

seekr_code/scanner/
watcher.rs

1//! File system watcher.
2//!
3//! Uses the `notify` crate to watch for file changes and
4//! emit events through a tokio channel for incremental index updates.
5
6use std::path::{Path, PathBuf};
7use std::sync::mpsc;
8use std::time::Duration;
9
10use notify::{
11    Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher,
12};
13
14use crate::error::ScannerError;
15
16/// A file system event suitable for incremental indexing.
17#[derive(Debug, Clone)]
18pub enum FileEvent {
19    /// A file was created or modified.
20    Changed(PathBuf),
21    /// A file was deleted.
22    Deleted(PathBuf),
23}
24
25/// A file watcher that monitors a directory tree for changes.
26pub struct FileWatcher {
27    _watcher: RecommendedWatcher,
28    receiver: mpsc::Receiver<FileEvent>,
29}
30
31impl FileWatcher {
32    /// Create a new file watcher for the given directory.
33    ///
34    /// Starts watching recursively. File events can be received
35    /// via the `recv` method.
36    pub fn new(watch_path: &Path) -> Result<Self, ScannerError> {
37        let (tx, rx) = mpsc::channel();
38
39        let sender = tx.clone();
40        let mut watcher = RecommendedWatcher::new(
41            move |res: Result<Event, notify::Error>| {
42                match res {
43                    Ok(event) => {
44                        let file_events = convert_event(event);
45                        for fe in file_events {
46                            let _ = sender.send(fe);
47                        }
48                    }
49                    Err(e) => {
50                        tracing::warn!("File watcher error: {}", e);
51                    }
52                }
53            },
54            Config::default().with_poll_interval(Duration::from_secs(2)),
55        )
56        .map_err(|e| ScannerError::WatchError(format!("Failed to create watcher: {}", e)))?;
57
58        watcher
59            .watch(watch_path, RecursiveMode::Recursive)
60            .map_err(|e| {
61                ScannerError::WatchError(format!(
62                    "Failed to watch '{}': {}",
63                    watch_path.display(),
64                    e,
65                ))
66            })?;
67
68        tracing::info!(path = %watch_path.display(), "File watcher started");
69
70        Ok(Self {
71            _watcher: watcher,
72            receiver: rx,
73        })
74    }
75
76    /// Receive the next file event, blocking until one is available.
77    pub fn recv(&self) -> Option<FileEvent> {
78        self.receiver.recv().ok()
79    }
80
81    /// Try to receive a file event without blocking.
82    pub fn try_recv(&self) -> Option<FileEvent> {
83        self.receiver.try_recv().ok()
84    }
85
86    /// Drain all pending events (non-blocking).
87    pub fn drain_events(&self) -> Vec<FileEvent> {
88        let mut events = Vec::new();
89        while let Ok(event) = self.receiver.try_recv() {
90            events.push(event);
91        }
92        events
93    }
94
95    /// Receive with timeout.
96    pub fn recv_timeout(&self, timeout: Duration) -> Option<FileEvent> {
97        self.receiver.recv_timeout(timeout).ok()
98    }
99}
100
101/// Convert a notify Event into our FileEvent(s).
102fn convert_event(event: Event) -> Vec<FileEvent> {
103    let mut file_events = Vec::new();
104
105    match event.kind {
106        EventKind::Create(_) | EventKind::Modify(_) => {
107            for path in event.paths {
108                if path.is_file() {
109                    file_events.push(FileEvent::Changed(path));
110                }
111            }
112        }
113        EventKind::Remove(_) => {
114            for path in event.paths {
115                file_events.push(FileEvent::Deleted(path));
116            }
117        }
118        _ => {
119            // Ignore access, other events
120        }
121    }
122
123    file_events
124}
125
126/// Create an async watcher that sends events through a tokio channel.
127///
128/// This is useful for integration with the async Daemon mode.
129pub fn start_async_watcher(
130    watch_path: &Path,
131) -> Result<
132    (
133        RecommendedWatcher,
134        tokio::sync::mpsc::UnboundedReceiver<FileEvent>,
135    ),
136    ScannerError,
137> {
138    let (tx, rx) = tokio::sync::mpsc::unbounded_channel();
139
140    let sender = tx;
141    let mut watcher = RecommendedWatcher::new(
142        move |res: Result<Event, notify::Error>| {
143            match res {
144                Ok(event) => {
145                    let file_events = convert_event(event);
146                    for fe in file_events {
147                        let _ = sender.send(fe);
148                    }
149                }
150                Err(e) => {
151                    tracing::warn!("File watcher error: {}", e);
152                }
153            }
154        },
155        Config::default().with_poll_interval(Duration::from_secs(2)),
156    )
157    .map_err(|e| ScannerError::WatchError(format!("Failed to create async watcher: {}", e)))?;
158
159    watcher
160        .watch(watch_path, RecursiveMode::Recursive)
161        .map_err(|e| {
162            ScannerError::WatchError(format!(
163                "Failed to watch '{}': {}",
164                watch_path.display(),
165                e,
166            ))
167        })?;
168
169    tracing::info!(path = %watch_path.display(), "Async file watcher started");
170
171    Ok((watcher, rx))
172}
173
174/// Deduplicate events by path, keeping only the latest event type per path.
175pub fn dedup_events(events: Vec<FileEvent>) -> Vec<FileEvent> {
176    use std::collections::HashMap;
177
178    let mut latest: HashMap<PathBuf, FileEvent> = HashMap::new();
179
180    for event in events {
181        let path = match &event {
182            FileEvent::Changed(p) => p.clone(),
183            FileEvent::Deleted(p) => p.clone(),
184        };
185        latest.insert(path, event);
186    }
187
188    latest.into_values().collect()
189}
190
191#[cfg(test)]
192mod tests {
193    use super::*;
194
195    #[test]
196    fn test_dedup_events() {
197        let events = vec![
198            FileEvent::Changed(PathBuf::from("/a/b.rs")),
199            FileEvent::Changed(PathBuf::from("/a/b.rs")),
200            FileEvent::Deleted(PathBuf::from("/a/c.rs")),
201            FileEvent::Changed(PathBuf::from("/a/c.rs")),
202        ];
203
204        let deduped = dedup_events(events);
205        assert_eq!(deduped.len(), 2);
206    }
207
208    #[test]
209    fn test_convert_create_event() {
210        let dir = tempfile::tempdir().unwrap();
211        let file_path = dir.path().join("test.rs");
212        std::fs::write(&file_path, "fn main() {}").unwrap();
213
214        let event = Event {
215            kind: EventKind::Create(notify::event::CreateKind::File),
216            paths: vec![file_path.clone()],
217            attrs: Default::default(),
218        };
219
220        let file_events = convert_event(event);
221        assert_eq!(file_events.len(), 1);
222        match &file_events[0] {
223            FileEvent::Changed(p) => assert_eq!(p, &file_path),
224            _ => panic!("Expected Changed event"),
225        }
226    }
227
228    #[test]
229    fn test_convert_remove_event() {
230        let event = Event {
231            kind: EventKind::Remove(notify::event::RemoveKind::File),
232            paths: vec![PathBuf::from("/a/deleted.rs")],
233            attrs: Default::default(),
234        };
235
236        let file_events = convert_event(event);
237        assert_eq!(file_events.len(), 1);
238        match &file_events[0] {
239            FileEvent::Deleted(p) => assert_eq!(p, &PathBuf::from("/a/deleted.rs")),
240            _ => panic!("Expected Deleted event"),
241        }
242    }
243
244    #[test]
245    fn test_watcher_creation() {
246        let dir = tempfile::tempdir().unwrap();
247        let watcher = FileWatcher::new(dir.path());
248        assert!(watcher.is_ok(), "Should be able to create a watcher");
249    }
250}