Skip to main content

gity_watch/
lib.rs

1use async_trait::async_trait;
2use notify::{
3    Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait,
4};
5use std::{
6    any::Any,
7    path::{Path, PathBuf},
8    sync::{Arc, Mutex},
9};
10use thiserror::Error;
11use tokio::sync::mpsc;
12
13pub type WatchReceiver = mpsc::UnboundedReceiver<WatchEvent>;
14pub type WatchSender = mpsc::UnboundedSender<WatchEvent>;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct WatchEvent {
18    pub path: PathBuf,
19    pub kind: WatchEventKind,
20}
21
22impl WatchEvent {
23    pub fn new(path: PathBuf, kind: WatchEventKind) -> Self {
24        Self { path, kind }
25    }
26}
27
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum WatchEventKind {
30    Created,
31    Modified,
32    Deleted,
33}
34
35#[derive(Error, Debug)]
36pub enum WatchError {
37    #[error("watcher backend error: {0}")]
38    Backend(String),
39}
40
41impl From<std::io::Error> for WatchError {
42    fn from(value: std::io::Error) -> Self {
43        Self::Backend(value.to_string())
44    }
45}
46
47impl From<mpsc::error::SendError<WatchEvent>> for WatchError {
48    fn from(value: mpsc::error::SendError<WatchEvent>) -> Self {
49        Self::Backend(value.to_string())
50    }
51}
52
53impl From<notify::Error> for WatchError {
54    fn from(value: notify::Error) -> Self {
55        Self::Backend(value.to_string())
56    }
57}
58
59pub type WatchHandleRef = Arc<dyn WatchHandle>;
60
61pub struct WatchSubscription {
62    handle: WatchHandleRef,
63    receiver: WatchReceiver,
64}
65
66impl WatchSubscription {
67    pub fn new(handle: WatchHandleRef, receiver: WatchReceiver) -> Self {
68        Self { handle, receiver }
69    }
70
71    pub fn handle(&self) -> WatchHandleRef {
72        Arc::clone(&self.handle)
73    }
74
75    pub fn into_parts(self) -> (WatchHandleRef, WatchReceiver) {
76        (self.handle, self.receiver)
77    }
78}
79
80pub trait WatchHandle: Send + Sync {
81    fn stop(&self);
82    fn as_any(&self) -> &dyn Any;
83}
84
85#[async_trait]
86pub trait Watcher: Send + Sync {
87    async fn watch(&self, repo_path: PathBuf) -> Result<WatchSubscription, WatchError>;
88}
89
90pub type WatcherRef = Arc<dyn Watcher>;
91
92/// Manual watcher used in tests to emit events manually.
93#[derive(Debug, Default, Clone, Copy)]
94pub struct ManualWatcher;
95
96#[derive(Debug)]
97pub struct ManualWatchHandle {
98    repo_path: PathBuf,
99    sender: Mutex<Option<WatchSender>>,
100}
101
102impl ManualWatcher {
103    pub fn new() -> Self {
104        Self
105    }
106}
107
108#[async_trait]
109impl Watcher for ManualWatcher {
110    async fn watch(&self, repo_path: PathBuf) -> Result<WatchSubscription, WatchError> {
111        let (sender, receiver) = mpsc::unbounded_channel();
112        let handle = Arc::new(ManualWatchHandle {
113            repo_path,
114            sender: Mutex::new(Some(sender)),
115        });
116        let handle_ref: WatchHandleRef = handle;
117        Ok(WatchSubscription::new(handle_ref, receiver))
118    }
119}
120
121impl ManualWatchHandle {
122    pub fn emit(&self, event: WatchEvent) -> Result<(), WatchError> {
123        let sender = self
124            .sender
125            .lock()
126            .map_err(|_| WatchError::Backend("watch handle poisoned".into()))?;
127        match sender.as_ref() {
128            Some(tx) => tx.send(event).map_err(WatchError::from),
129            None => Err(WatchError::Backend("watcher stopped".into())),
130        }
131    }
132
133    pub fn emit_path(
134        &self,
135        kind: WatchEventKind,
136        path: impl Into<PathBuf>,
137    ) -> Result<(), WatchError> {
138        self.emit(WatchEvent::new(path.into(), kind))
139    }
140
141    pub fn repo_path(&self) -> &Path {
142        &self.repo_path
143    }
144}
145
146impl WatchHandle for ManualWatchHandle {
147    fn stop(&self) {
148        if let Ok(mut sender) = self.sender.lock() {
149            sender.take();
150        }
151    }
152
153    fn as_any(&self) -> &dyn Any {
154        self
155    }
156}
157
158/// Watcher that never emits events; used as a placeholder until platform
159/// backends are wired up.
160#[derive(Debug, Default, Clone, Copy)]
161pub struct NoopWatcher;
162
163#[derive(Debug)]
164pub struct NoopWatchHandle {
165    sender: Mutex<Option<WatchSender>>,
166}
167
168impl NoopWatcher {
169    pub fn new() -> Self {
170        Self
171    }
172}
173
174#[async_trait]
175impl Watcher for NoopWatcher {
176    async fn watch(&self, _repo_path: PathBuf) -> Result<WatchSubscription, WatchError> {
177        let (sender, receiver) = mpsc::unbounded_channel();
178        let handle = Arc::new(NoopWatchHandle {
179            sender: Mutex::new(Some(sender)),
180        });
181        let handle_ref: WatchHandleRef = handle;
182        Ok(WatchSubscription::new(handle_ref, receiver))
183    }
184}
185
186impl WatchHandle for NoopWatchHandle {
187    fn stop(&self) {
188        if let Ok(mut sender) = self.sender.lock() {
189            sender.take();
190        }
191    }
192
193    fn as_any(&self) -> &dyn Any {
194        self
195    }
196}
197
198/// Watcher backed by the `notify` crate for real filesystem monitoring.
199#[derive(Debug, Default, Clone, Copy)]
200pub struct NotifyWatcher;
201
202#[derive(Debug)]
203pub struct NotifyWatchHandle {
204    watcher: Mutex<Option<RecommendedWatcher>>,
205    sender: Mutex<Option<WatchSender>>,
206}
207
208impl NotifyWatcher {
209    pub fn new() -> Self {
210        Self
211    }
212}
213
214#[async_trait]
215impl Watcher for NotifyWatcher {
216    async fn watch(&self, repo_path: PathBuf) -> Result<WatchSubscription, WatchError> {
217        let (sender, receiver) = mpsc::unbounded_channel();
218        let closure_sender = sender.clone();
219        let mut watcher = RecommendedWatcher::new(
220            move |res: notify::Result<Event>| {
221                if let Err(err) = res.map(|event| dispatch_event(&closure_sender, event)) {
222                    eprintln!("notify error: {err}");
223                }
224            },
225            Config::default(),
226        )
227        .map_err(WatchError::from)?;
228        watcher
229            .watch(&repo_path, RecursiveMode::Recursive)
230            .map_err(WatchError::from)?;
231        let handle = Arc::new(NotifyWatchHandle::new(watcher, sender));
232        let handle_ref: WatchHandleRef = handle;
233        Ok(WatchSubscription::new(handle_ref, receiver))
234    }
235}
236
237impl NotifyWatchHandle {
238    fn new(watcher: RecommendedWatcher, sender: WatchSender) -> Self {
239        Self {
240            watcher: Mutex::new(Some(watcher)),
241            sender: Mutex::new(Some(sender)),
242        }
243    }
244}
245
246impl WatchHandle for NotifyWatchHandle {
247    fn stop(&self) {
248        if let Ok(mut watcher) = self.watcher.lock() {
249            watcher.take();
250        }
251        if let Ok(mut sender) = self.sender.lock() {
252            sender.take();
253        }
254    }
255
256    fn as_any(&self) -> &dyn Any {
257        self
258    }
259}
260
261fn dispatch_event(sender: &WatchSender, event: Event) -> Result<(), WatchError> {
262    if let Some(kind) = map_event_kind(&event.kind) {
263        for path in event.paths {
264            sender.send(WatchEvent::new(path, kind.clone()))?;
265        }
266    }
267    Ok(())
268}
269
270fn map_event_kind(kind: &EventKind) -> Option<WatchEventKind> {
271    use notify::event::{CreateKind, ModifyKind, RemoveKind};
272    match kind {
273        EventKind::Create(CreateKind::Any) | EventKind::Create(_) => Some(WatchEventKind::Created),
274        EventKind::Modify(ModifyKind::Data(_))
275        | EventKind::Modify(ModifyKind::Metadata(_))
276        | EventKind::Modify(ModifyKind::Any)
277        | EventKind::Modify(_) => Some(WatchEventKind::Modified),
278        EventKind::Remove(RemoveKind::Any) | EventKind::Remove(_) => Some(WatchEventKind::Deleted),
279        _ => None,
280    }
281}
282
283#[cfg(test)]
284mod tests {
285    use super::*;
286    use std::{fs, path::Path, time::Duration};
287    use tempfile::tempdir;
288    use tokio::{runtime::Runtime, time::timeout};
289
290    #[test]
291    fn manual_watcher_emits_events() {
292        let runtime = Runtime::new().expect("runtime");
293        runtime.block_on(async {
294            let watcher = ManualWatcher::new();
295            let subscription = watcher
296                .watch(PathBuf::from("/tmp/manual"))
297                .await
298                .expect("start watcher");
299            let (handle, mut receiver) = subscription.into_parts();
300            let manual = handle
301                .as_any()
302                .downcast_ref::<ManualWatchHandle>()
303                .expect("manual handle");
304            assert_eq!(manual.repo_path(), Path::new("/tmp/manual"));
305            manual
306                .emit_path(WatchEventKind::Created, "/tmp/manual/foo.txt")
307                .expect("emit event");
308            let event = receiver.recv().await.expect("receive event");
309            assert_eq!(event.kind, WatchEventKind::Created);
310            assert_eq!(event.path, PathBuf::from("/tmp/manual/foo.txt"));
311        });
312    }
313
314    #[test]
315    fn manual_handle_stop_closes_stream() {
316        let runtime = Runtime::new().expect("runtime");
317        runtime.block_on(async {
318            let watcher = ManualWatcher::new();
319            let subscription = watcher
320                .watch(PathBuf::from("/tmp/manual"))
321                .await
322                .expect("start watcher");
323            let (handle, mut receiver) = subscription.into_parts();
324            handle.stop();
325            assert!(receiver.recv().await.is_none());
326        });
327    }
328
329    #[test]
330    fn noop_watcher_stop_closes_receiver() {
331        let runtime = Runtime::new().expect("runtime");
332        runtime.block_on(async {
333            let watcher = NoopWatcher::new();
334            let subscription = watcher
335                .watch(PathBuf::from("/tmp/noop"))
336                .await
337                .expect("start watcher");
338            let (handle, mut receiver) = subscription.into_parts();
339            handle.stop();
340            assert!(receiver.recv().await.is_none());
341        });
342    }
343
344    #[test]
345    fn notify_watcher_emits_real_events() {
346        let runtime = Runtime::new().expect("runtime");
347        runtime.block_on(async {
348            let dir = tempdir().expect("temp dir");
349            // Canonicalize to handle macOS FSEvents symlink resolution
350            let canonical_dir =
351                std::fs::canonicalize(dir.path()).unwrap_or_else(|_| dir.path().to_path_buf());
352            let watcher = NotifyWatcher::new();
353            let subscription = watcher
354                .watch(canonical_dir.clone())
355                .await
356                .expect("start watcher");
357            let (handle, mut receiver) = subscription.into_parts();
358            let file_path = canonical_dir.join("notify.txt");
359            fs::write(&file_path, "data").expect("write file");
360            let event = timeout(Duration::from_secs(2), receiver.recv())
361                .await
362                .expect("watch timed out")
363                .expect("event");
364            assert!(event.path.ends_with(Path::new("notify.txt")));
365            handle.stop();
366        });
367    }
368}