1use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
/// Poll token under which the inotify file descriptor is registered (see `EventLoop::new`).
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token under which the waker is registered; readiness on it is dispatched to
/// `EventLoop::handle_messages`.
const MESSAGE: mio::Token = mio::Token(1);
24
/// State owned by the background thread that services one inotify instance.
///
/// The loop waits on `poll` for two sources: the inotify descriptor (token `INOTIFY`)
/// and a waker (token `MESSAGE`) that signals control messages queued on the channel.
struct EventLoop {
    /// Set to `false` when a `Shutdown` message is handled; the thread loop exits afterwards.
    running: bool,
    /// Poll instance the inotify descriptor and the waker are registered with.
    poll: mio::Poll,
    /// Waker registered under the `MESSAGE` token; a clone is handed to the watcher handle.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; a clone is handed to the watcher handle.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained in `handle_messages`.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify instance; taken out (leaving `None`) and closed on shutdown.
    inotify: Option<Inotify>,
    /// Callback that receives every translated event and every reported error.
    event_handler: Box<dyn EventHandler>,
    /// Watched path -> (watch descriptor, registered mask, is_recursive, is_dir).
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup from watch descriptor to watched path, used to resolve event paths.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// Most recent `MOVED_FROM` event, kept so that a later `MOVED_TO` carrying the same
    /// cookie can be reported as a combined rename.
    rename_event: Option<Event>,
}
44
/// Watcher handle backed by inotify.
///
/// The handle itself holds no watch state: it only forwards requests to the event loop
/// thread through `channel` and then nudges that thread with `waker`.
#[derive(Debug)]
pub struct INotifyWatcher {
    /// Sender for control messages consumed by the event loop thread.
    channel: Sender<EventLoopMsg>,
    /// Waker used to interrupt the event loop's poll after a message has been queued.
    waker: Arc<mio::Waker>,
}
51
/// Control messages sent from the watcher handle to the event loop thread.
enum EventLoopMsg {
    /// Start watching the path with the given recursion mode; the outcome is sent back
    /// on the enclosed sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the outcome is sent back on the enclosed sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close the inotify instance and stop the loop.
    Shutdown,
    /// Runtime configuration request; the loop currently always answers `Ok(false)`.
    Configure(Config, BoundSender<Result<bool>>),
}
58
59#[inline]
60fn add_watch_by_event(
61 path: &Option<PathBuf>,
62 event: &inotify_sys::Event<&OsStr>,
63 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
64 add_watches: &mut Vec<PathBuf>,
65) {
66 if let Some(ref path) = *path {
67 if event.mask.contains(EventMask::ISDIR) {
68 if let Some(parent_path) = path.parent() {
69 if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
70 if is_recursive {
71 add_watches.push(path.to_owned());
72 }
73 }
74 }
75 }
76 }
77}
78
79#[inline]
80fn remove_watch_by_event(
81 path: &Option<PathBuf>,
82 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
83 remove_watches: &mut Vec<PathBuf>,
84) {
85 if let Some(ref path) = *path {
86 if watches.contains_key(path) {
87 remove_watches.push(path.to_owned());
88 }
89 }
90}
91
92impl EventLoop {
93 pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
94 let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
95 let poll = mio::Poll::new()?;
96
97 let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
98
99 let inotify_fd = inotify.as_raw_fd();
100 let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
101 poll.registry()
102 .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
103
104 let event_loop = EventLoop {
105 running: true,
106 poll,
107 event_loop_waker,
108 event_loop_tx,
109 event_loop_rx,
110 inotify: Some(inotify),
111 event_handler,
112 watches: HashMap::new(),
113 paths: HashMap::new(),
114 rename_event: None,
115 };
116 Ok(event_loop)
117 }
118
119 pub fn run(self) {
121 let _ = thread::Builder::new()
122 .name("notify-rs inotify loop".to_string())
123 .spawn(|| self.event_loop_thread());
124 }
125
126 fn event_loop_thread(mut self) {
127 let mut events = mio::Events::with_capacity(16);
128 loop {
129 match self.poll.poll(&mut events, None) {
131 Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
132 }
135 Err(e) => panic!("poll failed: {}", e),
136 Ok(()) => {}
137 }
138
139 for event in &events {
141 self.handle_event(event);
142 }
143
144 if !self.running {
146 break;
147 }
148 }
149 }
150
151 fn handle_event(&mut self, event: &mio::event::Event) {
153 match event.token() {
154 MESSAGE => {
155 self.handle_messages()
157 }
158 INOTIFY => {
159 self.handle_inotify()
161 }
162 _ => unreachable!(),
163 }
164 }
165
166 fn handle_messages(&mut self) {
167 while let Ok(msg) = self.event_loop_rx.try_recv() {
168 match msg {
169 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
170 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
171 }
172 EventLoopMsg::RemoveWatch(path, tx) => {
173 let _ = tx.send(self.remove_watch(path, false));
174 }
175 EventLoopMsg::Shutdown => {
176 let _ = self.remove_all_watches();
177 if let Some(inotify) = self.inotify.take() {
178 let _ = inotify.close();
179 }
180 self.running = false;
181 break;
182 }
183 EventLoopMsg::Configure(config, tx) => {
184 self.configure_raw_mode(config, tx);
185 }
186 }
187 }
188 }
189
190 fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
191 tx.send(Ok(false))
192 .expect("configuration channel disconnected");
193 }
194
195 fn handle_inotify(&mut self) {
196 let mut add_watches = Vec::new();
197 let mut remove_watches = Vec::new();
198
199 if let Some(ref mut inotify) = self.inotify {
200 let mut buffer = [0; 1024];
201 loop {
203 match inotify.read_events(&mut buffer) {
204 Ok(events) => {
205 let mut num_events = 0;
206 for event in events {
207 log::trace!("inotify event: {event:?}");
208
209 num_events += 1;
210 if event.mask.contains(EventMask::Q_OVERFLOW) {
211 let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
212 self.event_handler.handle_event(ev);
213 }
214
215 let path = match event.name {
216 Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
217 None => self.paths.get(&event.wd).cloned(),
218 };
219
220 let mut evs = Vec::new();
221
222 if event.mask.contains(EventMask::MOVED_FROM) {
223 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
224
225 let event = Event::new(EventKind::Modify(ModifyKind::Name(
226 RenameMode::From,
227 )))
228 .add_some_path(path.clone())
229 .set_tracker(event.cookie as usize);
230
231 self.rename_event = Some(event.clone());
232
233 evs.push(event);
234 } else if event.mask.contains(EventMask::MOVED_TO) {
235 evs.push(
236 Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
237 .set_tracker(event.cookie as usize)
238 .add_some_path(path.clone()),
239 );
240
241 let trackers_match = self
242 .rename_event
243 .as_ref()
244 .and_then(|e| e.tracker())
245 .map_or(false, |from_tracker| {
246 from_tracker == event.cookie as usize
247 });
248
249 if trackers_match {
250 let rename_event = self.rename_event.take().unwrap(); evs.push(
252 Event::new(EventKind::Modify(ModifyKind::Name(
253 RenameMode::Both,
254 )))
255 .set_tracker(event.cookie as usize)
256 .add_some_path(rename_event.paths.first().cloned())
257 .add_some_path(path.clone()),
258 );
259 }
260 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
261 }
262 if event.mask.contains(EventMask::MOVE_SELF) {
263 evs.push(
264 Event::new(EventKind::Modify(ModifyKind::Name(
265 RenameMode::From,
266 )))
267 .add_some_path(path.clone()),
268 );
269 }
273 if event.mask.contains(EventMask::CREATE) {
274 evs.push(
275 Event::new(EventKind::Create(
276 if event.mask.contains(EventMask::ISDIR) {
277 CreateKind::Folder
278 } else {
279 CreateKind::File
280 },
281 ))
282 .add_some_path(path.clone()),
283 );
284 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
285 }
286 if event.mask.contains(EventMask::DELETE) {
287 evs.push(
288 Event::new(EventKind::Remove(
289 if event.mask.contains(EventMask::ISDIR) {
290 RemoveKind::Folder
291 } else {
292 RemoveKind::File
293 },
294 ))
295 .add_some_path(path.clone()),
296 );
297 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
298 }
299 if event.mask.contains(EventMask::DELETE_SELF) {
300 let remove_kind = match &path {
301 Some(watched_path) => {
302 let current_watch = self.watches.get(watched_path);
303 match current_watch {
304 Some(&(_, _, _, true)) => RemoveKind::Folder,
305 Some(&(_, _, _, false)) => RemoveKind::File,
306 None => RemoveKind::Other,
307 }
308 }
309 None => {
310 log::trace!(
311 "No patch for DELETE_SELF event, may be a bug?"
312 );
313 RemoveKind::Other
314 }
315 };
316 evs.push(
317 Event::new(EventKind::Remove(remove_kind))
318 .add_some_path(path.clone()),
319 );
320 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
321 }
322 if event.mask.contains(EventMask::MODIFY) {
323 evs.push(
324 Event::new(EventKind::Modify(ModifyKind::Data(
325 DataChange::Any,
326 )))
327 .add_some_path(path.clone()),
328 );
329 }
330 if event.mask.contains(EventMask::CLOSE_WRITE) {
331 evs.push(
332 Event::new(EventKind::Access(AccessKind::Close(
333 AccessMode::Write,
334 )))
335 .add_some_path(path.clone()),
336 );
337 }
338 if event.mask.contains(EventMask::CLOSE_NOWRITE) {
339 evs.push(
340 Event::new(EventKind::Access(AccessKind::Close(
341 AccessMode::Read,
342 )))
343 .add_some_path(path.clone()),
344 );
345 }
346 if event.mask.contains(EventMask::ATTRIB) {
347 evs.push(
348 Event::new(EventKind::Modify(ModifyKind::Metadata(
349 MetadataKind::Any,
350 )))
351 .add_some_path(path.clone()),
352 );
353 }
354 if event.mask.contains(EventMask::OPEN) {
355 evs.push(
356 Event::new(EventKind::Access(AccessKind::Open(
357 AccessMode::Any,
358 )))
359 .add_some_path(path.clone()),
360 );
361 }
362
363 for ev in evs {
364 self.event_handler.handle_event(Ok(ev));
365 }
366 }
367
368 if num_events == 0 {
370 break;
371 }
372 }
373 Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
374 break;
376 }
377 Err(e) => {
378 self.event_handler.handle_event(Err(Error::io(e)));
379 }
380 }
381 }
382 }
383
384 for path in remove_watches {
385 self.remove_watch(path, true).ok();
386 }
387
388 for path in add_watches {
389 self.add_watch(path, true, false).ok();
390 }
391 }
392
393 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
394 if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
397 return self.add_single_watch(path, false, true);
398 }
399
400 for entry in WalkDir::new(path)
401 .follow_links(true)
402 .into_iter()
403 .filter_map(filter_dir)
404 {
405 self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
406 watch_self = false;
407 }
408
409 Ok(())
410 }
411
412 fn add_single_watch(
413 &mut self,
414 path: PathBuf,
415 is_recursive: bool,
416 watch_self: bool,
417 ) -> Result<()> {
418 let mut watchmask = WatchMask::ATTRIB
419 | WatchMask::CREATE
420 | WatchMask::DELETE
421 | WatchMask::CLOSE_WRITE
422 | WatchMask::MODIFY
423 | WatchMask::MOVED_FROM
424 | WatchMask::MOVED_TO;
425
426 if watch_self {
427 watchmask.insert(WatchMask::DELETE_SELF);
428 watchmask.insert(WatchMask::MOVE_SELF);
429 }
430
431 if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
432 watchmask.insert(old_watchmask);
433 watchmask.insert(WatchMask::MASK_ADD);
434 }
435
436 if let Some(ref mut inotify) = self.inotify {
437 log::trace!("adding inotify watch: {}", path.display());
438
439 match inotify.watches().add(&path, watchmask) {
440 Err(e) => {
441 Err(if e.raw_os_error() == Some(libc::ENOSPC) {
442 Error::new(ErrorKind::MaxFilesWatch)
444 } else {
445 Error::io(e)
446 }
447 .add_path(path))
448 }
449 Ok(w) => {
450 watchmask.remove(WatchMask::MASK_ADD);
451 let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
452 self.watches
453 .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
454 self.paths.insert(w, path);
455 Ok(())
456 }
457 }
458 } else {
459 Ok(())
460 }
461 }
462
463 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
464 match self.watches.remove(&path) {
465 None => return Err(Error::watch_not_found().add_path(path)),
466 Some((w, _, is_recursive, _)) => {
467 if let Some(ref mut inotify) = self.inotify {
468 let mut inotify_watches = inotify.watches();
469 log::trace!("removing inotify watch: {}", path.display());
470
471 inotify_watches
472 .remove(w.clone())
473 .map_err(|e| Error::io(e).add_path(path.clone()))?;
474 self.paths.remove(&w);
475
476 if is_recursive || remove_recursive {
477 let mut remove_list = Vec::new();
478 for (w, p) in &self.paths {
479 if p.starts_with(&path) {
480 inotify_watches
481 .remove(w.clone())
482 .map_err(|e| Error::io(e).add_path(p.into()))?;
483 self.watches.remove(p);
484 remove_list.push(w.clone());
485 }
486 }
487 for w in remove_list {
488 self.paths.remove(&w);
489 }
490 }
491 }
492 }
493 }
494 Ok(())
495 }
496
497 fn remove_all_watches(&mut self) -> Result<()> {
498 if let Some(ref mut inotify) = self.inotify {
499 let mut inotify_watches = inotify.watches();
500 for (w, p) in &self.paths {
501 inotify_watches
502 .remove(w.clone())
503 .map_err(|e| Error::io(e).add_path(p.into()))?;
504 }
505 self.watches.clear();
506 self.paths.clear();
507 }
508 Ok(())
509 }
510}
511
512fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
514 if let Ok(e) = e {
515 if let Ok(metadata) = e.metadata() {
516 if metadata.is_dir() {
517 return Some(e);
518 }
519 }
520 }
521 None
522}
523
524impl INotifyWatcher {
525 fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
526 let inotify = Inotify::init()?;
527 let event_loop = EventLoop::new(inotify, event_handler)?;
528 let channel = event_loop.event_loop_tx.clone();
529 let waker = event_loop.event_loop_waker.clone();
530 event_loop.run();
531 Ok(INotifyWatcher { channel, waker })
532 }
533
534 fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
535 let pb = if path.is_absolute() {
536 path.to_owned()
537 } else {
538 let p = env::current_dir().map_err(Error::io)?;
539 p.join(path)
540 };
541 let (tx, rx) = unbounded();
542 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
543
544 self.channel.send(msg).unwrap();
546 self.waker.wake().unwrap();
547 rx.recv().unwrap()
548 }
549
550 fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
551 let pb = if path.is_absolute() {
552 path.to_owned()
553 } else {
554 let p = env::current_dir().map_err(Error::io)?;
555 p.join(path)
556 };
557 let (tx, rx) = unbounded();
558 let msg = EventLoopMsg::RemoveWatch(pb, tx);
559
560 self.channel.send(msg).unwrap();
562 self.waker.wake().unwrap();
563 rx.recv().unwrap()
564 }
565}
566
/// `Watcher` implementation that delegates to the inotify event loop thread.
impl Watcher for INotifyWatcher {
    /// Creates the watcher and starts its event loop thread.
    ///
    /// The supplied configuration is currently not used by this backend.
    fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
        Self::from_event_handler(Box::new(event_handler))
    }

    /// Starts watching `path`; see `watch_inner`.
    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
        self.watch_inner(path, recursive_mode)
    }

    /// Stops watching `path`; see `unwatch_inner`.
    fn unwatch(&mut self, path: &Path) -> Result<()> {
        self.unwatch_inner(path)
    }

    /// Forwards a configuration request to the event loop and returns its reply.
    ///
    /// Errors from sending the message, waking the loop, or receiving the reply are
    /// converted into the crate error type and returned.
    fn configure(&mut self, config: Config) -> Result<bool> {
        // A capacity of one is enough: exactly one reply is expected.
        let (tx, rx) = bounded(1);
        self.channel.send(EventLoopMsg::Configure(config, tx))?;
        self.waker.wake()?;
        rx.recv()?
    }

    /// Identifies this backend as the inotify watcher.
    fn kind() -> crate::WatcherKind {
        crate::WatcherKind::Inotify
    }
}
592
593impl Drop for INotifyWatcher {
594 fn drop(&mut self) {
595 self.channel.send(EventLoopMsg::Shutdown).unwrap();
597 self.waker.wake().unwrap();
598 }
599}
600
/// Compile-time check that the watcher handle can be moved to and shared between threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    // Instantiating this helper only type-checks if `T` satisfies both bounds.
    fn assert_send_sync<T: Send + Sync>() {}
    assert_send_sync::<INotifyWatcher>();
}