1use async_trait::async_trait;
2use notify::{
3 Config, Event, EventKind, RecommendedWatcher, RecursiveMode, Watcher as NotifyWatcherTrait,
4};
5use std::{
6 any::Any,
7 path::{Path, PathBuf},
8 sync::{Arc, Mutex},
9};
10use thiserror::Error;
11use tokio::sync::mpsc;
12
13pub type WatchReceiver = mpsc::UnboundedReceiver<WatchEvent>;
14pub type WatchSender = mpsc::UnboundedSender<WatchEvent>;
15
16#[derive(Debug, Clone, PartialEq, Eq)]
17pub struct WatchEvent {
18 pub path: PathBuf,
19 pub kind: WatchEventKind,
20}
21
22impl WatchEvent {
23 pub fn new(path: PathBuf, kind: WatchEventKind) -> Self {
24 Self { path, kind }
25 }
26}
27
/// Category of change carried by a [`WatchEvent`].
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum WatchEventKind {
    /// Something was created at the path.
    Created,
    /// Something at the path was modified.
    Modified,
    /// Something at the path was removed.
    Deleted,
}
34
35#[derive(Error, Debug)]
36pub enum WatchError {
37 #[error("watcher backend error: {0}")]
38 Backend(String),
39}
40
41impl From<std::io::Error> for WatchError {
42 fn from(value: std::io::Error) -> Self {
43 Self::Backend(value.to_string())
44 }
45}
46
47impl From<mpsc::error::SendError<WatchEvent>> for WatchError {
48 fn from(value: mpsc::error::SendError<WatchEvent>) -> Self {
49 Self::Backend(value.to_string())
50 }
51}
52
53impl From<notify::Error> for WatchError {
54 fn from(value: notify::Error) -> Self {
55 Self::Backend(value.to_string())
56 }
57}
58
59pub type WatchHandleRef = Arc<dyn WatchHandle>;
60
61pub struct WatchSubscription {
62 handle: WatchHandleRef,
63 receiver: WatchReceiver,
64}
65
66impl WatchSubscription {
67 pub fn new(handle: WatchHandleRef, receiver: WatchReceiver) -> Self {
68 Self { handle, receiver }
69 }
70
71 pub fn handle(&self) -> WatchHandleRef {
72 Arc::clone(&self.handle)
73 }
74
75 pub fn into_parts(self) -> (WatchHandleRef, WatchReceiver) {
76 (self.handle, self.receiver)
77 }
78}
79
/// Control surface for a running watch.
pub trait WatchHandle: Send + Sync {
    /// Stops the watch. The implementations in this file drop their channel
    /// sender here, after which the receiver reports the stream as closed.
    fn stop(&self);

    /// Exposes the handle as [`Any`] so callers can downcast to the concrete
    /// handle type.
    fn as_any(&self) -> &dyn Any;
}
84
85#[async_trait]
86pub trait Watcher: Send + Sync {
87 async fn watch(&self, repo_path: PathBuf) -> Result<WatchSubscription, WatchError>;
88}
89
90pub type WatcherRef = Arc<dyn Watcher>;
91
/// Watcher that observes nothing by itself; events are injected by hand
/// through the [`ManualWatchHandle`] it returns.
#[derive(Clone, Copy, Debug, Default)]
pub struct ManualWatcher;
95
96#[derive(Debug)]
97pub struct ManualWatchHandle {
98 repo_path: PathBuf,
99 sender: Mutex<Option<WatchSender>>,
100}
101
102impl ManualWatcher {
103 pub fn new() -> Self {
104 Self
105 }
106}
107
108#[async_trait]
109impl Watcher for ManualWatcher {
110 async fn watch(&self, repo_path: PathBuf) -> Result<WatchSubscription, WatchError> {
111 let (sender, receiver) = mpsc::unbounded_channel();
112 let handle = Arc::new(ManualWatchHandle {
113 repo_path,
114 sender: Mutex::new(Some(sender)),
115 });
116 let handle_ref: WatchHandleRef = handle;
117 Ok(WatchSubscription::new(handle_ref, receiver))
118 }
119}
120
121impl ManualWatchHandle {
122 pub fn emit(&self, event: WatchEvent) -> Result<(), WatchError> {
123 let sender = self
124 .sender
125 .lock()
126 .map_err(|_| WatchError::Backend("watch handle poisoned".into()))?;
127 match sender.as_ref() {
128 Some(tx) => tx.send(event).map_err(WatchError::from),
129 None => Err(WatchError::Backend("watcher stopped".into())),
130 }
131 }
132
133 pub fn emit_path(
134 &self,
135 kind: WatchEventKind,
136 path: impl Into<PathBuf>,
137 ) -> Result<(), WatchError> {
138 self.emit(WatchEvent::new(path.into(), kind))
139 }
140
141 pub fn repo_path(&self) -> &Path {
142 &self.repo_path
143 }
144}
145
146impl WatchHandle for ManualWatchHandle {
147 fn stop(&self) {
148 if let Ok(mut sender) = self.sender.lock() {
149 sender.take();
150 }
151 }
152
153 fn as_any(&self) -> &dyn Any {
154 self
155 }
156}
157
/// Watcher whose subscriptions never receive events: nothing in this file
/// ever sends on the channel it creates.
#[derive(Clone, Copy, Debug, Default)]
pub struct NoopWatcher;
162
163#[derive(Debug)]
164pub struct NoopWatchHandle {
165 sender: Mutex<Option<WatchSender>>,
166}
167
168impl NoopWatcher {
169 pub fn new() -> Self {
170 Self
171 }
172}
173
174#[async_trait]
175impl Watcher for NoopWatcher {
176 async fn watch(&self, _repo_path: PathBuf) -> Result<WatchSubscription, WatchError> {
177 let (sender, receiver) = mpsc::unbounded_channel();
178 let handle = Arc::new(NoopWatchHandle {
179 sender: Mutex::new(Some(sender)),
180 });
181 let handle_ref: WatchHandleRef = handle;
182 Ok(WatchSubscription::new(handle_ref, receiver))
183 }
184}
185
186impl WatchHandle for NoopWatchHandle {
187 fn stop(&self) {
188 if let Ok(mut sender) = self.sender.lock() {
189 sender.take();
190 }
191 }
192
193 fn as_any(&self) -> &dyn Any {
194 self
195 }
196}
197
/// Watcher backed by the `notify` crate's `RecommendedWatcher`.
#[derive(Clone, Copy, Debug, Default)]
pub struct NotifyWatcher;
201
202#[derive(Debug)]
203pub struct NotifyWatchHandle {
204 watcher: Mutex<Option<RecommendedWatcher>>,
205 sender: Mutex<Option<WatchSender>>,
206}
207
208impl NotifyWatcher {
209 pub fn new() -> Self {
210 Self
211 }
212}
213
214#[async_trait]
215impl Watcher for NotifyWatcher {
216 async fn watch(&self, repo_path: PathBuf) -> Result<WatchSubscription, WatchError> {
217 let (sender, receiver) = mpsc::unbounded_channel();
218 let closure_sender = sender.clone();
219 let mut watcher = RecommendedWatcher::new(
220 move |res: notify::Result<Event>| {
221 if let Err(err) = res.map(|event| dispatch_event(&closure_sender, event)) {
222 eprintln!("notify error: {err}");
223 }
224 },
225 Config::default(),
226 )
227 .map_err(WatchError::from)?;
228 watcher
229 .watch(&repo_path, RecursiveMode::Recursive)
230 .map_err(WatchError::from)?;
231 let handle = Arc::new(NotifyWatchHandle::new(watcher, sender));
232 let handle_ref: WatchHandleRef = handle;
233 Ok(WatchSubscription::new(handle_ref, receiver))
234 }
235}
236
237impl NotifyWatchHandle {
238 fn new(watcher: RecommendedWatcher, sender: WatchSender) -> Self {
239 Self {
240 watcher: Mutex::new(Some(watcher)),
241 sender: Mutex::new(Some(sender)),
242 }
243 }
244}
245
246impl WatchHandle for NotifyWatchHandle {
247 fn stop(&self) {
248 if let Ok(mut watcher) = self.watcher.lock() {
249 watcher.take();
250 }
251 if let Ok(mut sender) = self.sender.lock() {
252 sender.take();
253 }
254 }
255
256 fn as_any(&self) -> &dyn Any {
257 self
258 }
259}
260
261fn dispatch_event(sender: &WatchSender, event: Event) -> Result<(), WatchError> {
262 if let Some(kind) = map_event_kind(&event.kind) {
263 for path in event.paths {
264 sender.send(WatchEvent::new(path, kind.clone()))?;
265 }
266 }
267 Ok(())
268}
269
270fn map_event_kind(kind: &EventKind) -> Option<WatchEventKind> {
271 use notify::event::{CreateKind, ModifyKind, RemoveKind};
272 match kind {
273 EventKind::Create(CreateKind::Any) | EventKind::Create(_) => Some(WatchEventKind::Created),
274 EventKind::Modify(ModifyKind::Data(_))
275 | EventKind::Modify(ModifyKind::Metadata(_))
276 | EventKind::Modify(ModifyKind::Any)
277 | EventKind::Modify(_) => Some(WatchEventKind::Modified),
278 EventKind::Remove(RemoveKind::Any) | EventKind::Remove(_) => Some(WatchEventKind::Deleted),
279 _ => None,
280 }
281}
282
#[cfg(test)]
mod tests {
    use super::*;
    use std::{fs, path::Path, time::Duration};
    use tempfile::tempdir;
    use tokio::{runtime::Runtime, time::timeout};

    /// Drives `future` to completion on a freshly created Tokio runtime.
    fn run<F: std::future::Future>(future: F) -> F::Output {
        let runtime = Runtime::new().expect("runtime");
        runtime.block_on(future)
    }

    /// An event pushed through the manual handle arrives on the receiver.
    #[test]
    fn manual_watcher_emits_events() {
        run(async {
            let watcher = ManualWatcher::new();
            let (handle, mut events) = watcher
                .watch(PathBuf::from("/tmp/manual"))
                .await
                .expect("start watcher")
                .into_parts();

            let manual = handle
                .as_any()
                .downcast_ref::<ManualWatchHandle>()
                .expect("manual handle");
            assert_eq!(manual.repo_path(), Path::new("/tmp/manual"));

            manual
                .emit_path(WatchEventKind::Created, "/tmp/manual/foo.txt")
                .expect("emit event");

            let received = events.recv().await.expect("receive event");
            assert_eq!(received.kind, WatchEventKind::Created);
            assert_eq!(received.path, PathBuf::from("/tmp/manual/foo.txt"));
        });
    }

    /// Stopping the manual handle closes the event stream.
    #[test]
    fn manual_handle_stop_closes_stream() {
        run(async {
            let watcher = ManualWatcher::new();
            let (handle, mut events) = watcher
                .watch(PathBuf::from("/tmp/manual"))
                .await
                .expect("start watcher")
                .into_parts();

            handle.stop();
            assert!(events.recv().await.is_none());
        });
    }

    /// Stopping the no-op handle closes the event stream.
    #[test]
    fn noop_watcher_stop_closes_receiver() {
        run(async {
            let watcher = NoopWatcher::new();
            let (handle, mut events) = watcher
                .watch(PathBuf::from("/tmp/noop"))
                .await
                .expect("start watcher")
                .into_parts();

            handle.stop();
            assert!(events.recv().await.is_none());
        });
    }

    /// Writing a file inside a watched temp directory yields an event for it.
    #[test]
    fn notify_watcher_emits_real_events() {
        run(async {
            let dir = tempdir().expect("temp dir");
            // Fall back to the raw temp path if canonicalization fails.
            let root = fs::canonicalize(dir.path()).unwrap_or_else(|_| dir.path().to_path_buf());

            let watcher = NotifyWatcher::new();
            let (handle, mut events) = watcher
                .watch(root.clone())
                .await
                .expect("start watcher")
                .into_parts();

            let target = root.join("notify.txt");
            fs::write(&target, "data").expect("write file");

            let received = timeout(Duration::from_secs(2), events.recv())
                .await
                .expect("watch timed out")
                .expect("event");
            assert!(received.path.ends_with(Path::new("notify.txt")));

            handle.stop();
        });
    }
}