notify_fork/
inotify.rs

1//! Watcher implementation for the inotify Linux API
2//!
3//! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4//! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5//! will return events for the directory itself, and for files inside the directory.
6
7use super::event::*;
8use super::{Config, Error, ErrorKind, EventHandler, RecursiveMode, Result, Watcher};
9use crate::{bounded, unbounded, BoundSender, Receiver, Sender};
10use inotify as inotify_sys;
11use inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use std::collections::HashMap;
13use std::env;
14use std::ffi::OsStr;
15use std::fs::metadata;
16use std::os::unix::io::AsRawFd;
17use std::path::{Path, PathBuf};
18use std::sync::Arc;
19use std::thread;
20use walkdir::WalkDir;
21
/// Poll token under which the inotify file descriptor is registered; readiness events
/// carrying it mean inotify has data to read.
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token used by the `mio::Waker`; readiness events carrying it mean control messages
/// are waiting on the event loop's channel.
const MESSAGE: mio::Token = mio::Token(1);
24
25// The EventLoop will set up a mio::Poll and use it to wait for the following:
26//
27// -  messages telling it what to do
28//
29// -  events telling it that something has happened on one of the watched files.
30
/// State owned by the background thread that services both inotify and control messages.
struct EventLoop {
    /// Cleared when a `Shutdown` message is handled; the thread exits once it is `false`.
    running: bool,
    /// Poll instance the inotify descriptor and the waker are registered with.
    poll: mio::Poll,
    /// Waker registered under the `MESSAGE` token; a clone is handed to `INotifyWatcher`.
    event_loop_waker: Arc<mio::Waker>,
    /// Sending half of the control channel; a clone is handed to `INotifyWatcher`.
    event_loop_tx: Sender<EventLoopMsg>,
    /// Receiving half of the control channel, drained whenever the waker fires.
    event_loop_rx: Receiver<EventLoopMsg>,
    /// The inotify handle; taken (leaving `None`) and closed on shutdown.
    inotify: Option<Inotify>,
    /// Receives every translated event and every read error.
    event_handler: Box<dyn EventHandler>,
    /// PathBuf -> (WatchDescriptor, WatchMask, is_recursive, is_dir)
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
    /// Reverse lookup from a watch descriptor to the path it was registered for; used to
    /// rebuild full paths for incoming events.
    paths: HashMap<WatchDescriptor, PathBuf>,
    /// The most recent `MOVED_FROM` event, kept so that a following `MOVED_TO` with the same
    /// cookie can be reported as a combined rename.
    rename_event: Option<Event>,
}
44
/// Watcher implementation based on inotify
///
/// The watcher itself only holds the means to talk to the event loop thread: requests are
/// sent over `channel` and the thread is then woken through `waker`.
#[derive(Debug)]
pub struct INotifyWatcher {
    /// Sending half of the event loop's control channel.
    channel: Sender<EventLoopMsg>,
    /// Wakes the event loop's poll so that it drains the control channel.
    waker: Arc<mio::Waker>,
}
51
/// Control messages sent from `INotifyWatcher` to the event loop thread.
enum EventLoopMsg {
    /// Start watching the path with the given recursion mode; the outcome is sent back on
    /// the enclosed sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the outcome is sent back on the enclosed sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close inotify and stop the event loop.
    Shutdown,
    /// Apply a configuration; the event loop currently always answers `Ok(false)`.
    Configure(Config, BoundSender<Result<bool>>),
}
58
59#[inline]
60fn add_watch_by_event(
61    path: &Option<PathBuf>,
62    event: &inotify_sys::Event<&OsStr>,
63    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
64    add_watches: &mut Vec<PathBuf>,
65) {
66    if let Some(ref path) = *path {
67        if event.mask.contains(EventMask::ISDIR) {
68            if let Some(parent_path) = path.parent() {
69                if let Some(&(_, _, is_recursive, _)) = watches.get(parent_path) {
70                    if is_recursive {
71                        add_watches.push(path.to_owned());
72                    }
73                }
74            }
75        }
76    }
77}
78
79#[inline]
80fn remove_watch_by_event(
81    path: &Option<PathBuf>,
82    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool, bool)>,
83    remove_watches: &mut Vec<PathBuf>,
84) {
85    if let Some(ref path) = *path {
86        if watches.contains_key(path) {
87            remove_watches.push(path.to_owned());
88        }
89    }
90}
91
92impl EventLoop {
93    pub fn new(inotify: Inotify, event_handler: Box<dyn EventHandler>) -> Result<Self> {
94        let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
95        let poll = mio::Poll::new()?;
96
97        let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
98
99        let inotify_fd = inotify.as_raw_fd();
100        let mut evented_inotify = mio::unix::SourceFd(&inotify_fd);
101        poll.registry()
102            .register(&mut evented_inotify, INOTIFY, mio::Interest::READABLE)?;
103
104        let event_loop = EventLoop {
105            running: true,
106            poll,
107            event_loop_waker,
108            event_loop_tx,
109            event_loop_rx,
110            inotify: Some(inotify),
111            event_handler,
112            watches: HashMap::new(),
113            paths: HashMap::new(),
114            rename_event: None,
115        };
116        Ok(event_loop)
117    }
118
119    // Run the event loop.
120    pub fn run(self) {
121        let _ = thread::Builder::new()
122            .name("notify-rs inotify loop".to_string())
123            .spawn(|| self.event_loop_thread());
124    }
125
126    fn event_loop_thread(mut self) {
127        let mut events = mio::Events::with_capacity(16);
128        loop {
129            // Wait for something to happen.
130            match self.poll.poll(&mut events, None) {
131                Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
132                    // System call was interrupted, we will retry
133                    // TODO: Not covered by tests (to reproduce likely need to setup signal handlers)
134                }
135                Err(e) => panic!("poll failed: {}", e),
136                Ok(()) => {}
137            }
138
139            // Process whatever happened.
140            for event in &events {
141                self.handle_event(event);
142            }
143
144            // Stop, if we're done.
145            if !self.running {
146                break;
147            }
148        }
149    }
150
151    // Handle a single event.
152    fn handle_event(&mut self, event: &mio::event::Event) {
153        match event.token() {
154            MESSAGE => {
155                // The channel is readable - handle messages.
156                self.handle_messages()
157            }
158            INOTIFY => {
159                // inotify has something to tell us.
160                self.handle_inotify()
161            }
162            _ => unreachable!(),
163        }
164    }
165
166    fn handle_messages(&mut self) {
167        while let Ok(msg) = self.event_loop_rx.try_recv() {
168            match msg {
169                EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
170                    let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
171                }
172                EventLoopMsg::RemoveWatch(path, tx) => {
173                    let _ = tx.send(self.remove_watch(path, false));
174                }
175                EventLoopMsg::Shutdown => {
176                    let _ = self.remove_all_watches();
177                    if let Some(inotify) = self.inotify.take() {
178                        let _ = inotify.close();
179                    }
180                    self.running = false;
181                    break;
182                }
183                EventLoopMsg::Configure(config, tx) => {
184                    self.configure_raw_mode(config, tx);
185                }
186            }
187        }
188    }
189
190    fn configure_raw_mode(&mut self, _config: Config, tx: BoundSender<Result<bool>>) {
191        tx.send(Ok(false))
192            .expect("configuration channel disconnected");
193    }
194
195    fn handle_inotify(&mut self) {
196        let mut add_watches = Vec::new();
197        let mut remove_watches = Vec::new();
198
199        if let Some(ref mut inotify) = self.inotify {
200            let mut buffer = [0; 1024];
201            // Read all buffers available.
202            loop {
203                match inotify.read_events(&mut buffer) {
204                    Ok(events) => {
205                        let mut num_events = 0;
206                        for event in events {
207                            log::trace!("inotify event: {event:?}");
208
209                            num_events += 1;
210                            if event.mask.contains(EventMask::Q_OVERFLOW) {
211                                let ev = Ok(Event::new(EventKind::Other).set_flag(Flag::Rescan));
212                                self.event_handler.handle_event(ev);
213                            }
214
215                            let path = match event.name {
216                                Some(name) => self.paths.get(&event.wd).map(|root| root.join(name)),
217                                None => self.paths.get(&event.wd).cloned(),
218                            };
219
220                            let mut evs = Vec::new();
221
222                            if event.mask.contains(EventMask::MOVED_FROM) {
223                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
224
225                                let event = Event::new(EventKind::Modify(ModifyKind::Name(
226                                    RenameMode::From,
227                                )))
228                                .add_some_path(path.clone())
229                                .set_tracker(event.cookie as usize);
230
231                                self.rename_event = Some(event.clone());
232
233                                evs.push(event);
234                            } else if event.mask.contains(EventMask::MOVED_TO) {
235                                evs.push(
236                                    Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::To)))
237                                        .set_tracker(event.cookie as usize)
238                                        .add_some_path(path.clone()),
239                                );
240
241                                let trackers_match = self
242                                    .rename_event
243                                    .as_ref()
244                                    .and_then(|e| e.tracker())
245                                    .map_or(false, |from_tracker| {
246                                        from_tracker == event.cookie as usize
247                                    });
248
249                                if trackers_match {
250                                    let rename_event = self.rename_event.take().unwrap(); // unwrap is safe because `rename_event` must be set at this point
251                                    evs.push(
252                                        Event::new(EventKind::Modify(ModifyKind::Name(
253                                            RenameMode::Both,
254                                        )))
255                                        .set_tracker(event.cookie as usize)
256                                        .add_some_path(rename_event.paths.first().cloned())
257                                        .add_some_path(path.clone()),
258                                    );
259                                }
260                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
261                            }
262                            if event.mask.contains(EventMask::MOVE_SELF) {
263                                evs.push(
264                                    Event::new(EventKind::Modify(ModifyKind::Name(
265                                        RenameMode::From,
266                                    )))
267                                    .add_some_path(path.clone()),
268                                );
269                                // TODO stat the path and get to new path
270                                // - emit To and Both events
271                                // - change prefix for further events
272                            }
273                            if event.mask.contains(EventMask::CREATE) {
274                                evs.push(
275                                    Event::new(EventKind::Create(
276                                        if event.mask.contains(EventMask::ISDIR) {
277                                            CreateKind::Folder
278                                        } else {
279                                            CreateKind::File
280                                        },
281                                    ))
282                                    .add_some_path(path.clone()),
283                                );
284                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
285                            }
286                            if event.mask.contains(EventMask::DELETE) {
287                                evs.push(
288                                    Event::new(EventKind::Remove(
289                                        if event.mask.contains(EventMask::ISDIR) {
290                                            RemoveKind::Folder
291                                        } else {
292                                            RemoveKind::File
293                                        },
294                                    ))
295                                    .add_some_path(path.clone()),
296                                );
297                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
298                            }
299                            if event.mask.contains(EventMask::DELETE_SELF) {
300                                let remove_kind = match &path {
301                                    Some(watched_path) => {
302                                        let current_watch = self.watches.get(watched_path);
303                                        match current_watch {
304                                            Some(&(_, _, _, true)) => RemoveKind::Folder,
305                                            Some(&(_, _, _, false)) => RemoveKind::File,
306                                            None => RemoveKind::Other,
307                                        }
308                                    }
309                                    None => {
310                                        log::trace!(
311                                            "No patch for DELETE_SELF event, may be a bug?"
312                                        );
313                                        RemoveKind::Other
314                                    }
315                                };
316                                evs.push(
317                                    Event::new(EventKind::Remove(remove_kind))
318                                        .add_some_path(path.clone()),
319                                );
320                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
321                            }
322                            if event.mask.contains(EventMask::MODIFY) {
323                                evs.push(
324                                    Event::new(EventKind::Modify(ModifyKind::Data(
325                                        DataChange::Any,
326                                    )))
327                                    .add_some_path(path.clone()),
328                                );
329                            }
330                            if event.mask.contains(EventMask::CLOSE_WRITE) {
331                                evs.push(
332                                    Event::new(EventKind::Access(AccessKind::Close(
333                                        AccessMode::Write,
334                                    )))
335                                    .add_some_path(path.clone()),
336                                );
337                            }
338                            if event.mask.contains(EventMask::CLOSE_NOWRITE) {
339                                evs.push(
340                                    Event::new(EventKind::Access(AccessKind::Close(
341                                        AccessMode::Read,
342                                    )))
343                                    .add_some_path(path.clone()),
344                                );
345                            }
346                            if event.mask.contains(EventMask::ATTRIB) {
347                                evs.push(
348                                    Event::new(EventKind::Modify(ModifyKind::Metadata(
349                                        MetadataKind::Any,
350                                    )))
351                                    .add_some_path(path.clone()),
352                                );
353                            }
354                            if event.mask.contains(EventMask::OPEN) {
355                                evs.push(
356                                    Event::new(EventKind::Access(AccessKind::Open(
357                                        AccessMode::Any,
358                                    )))
359                                    .add_some_path(path.clone()),
360                                );
361                            }
362
363                            for ev in evs {
364                                self.event_handler.handle_event(Ok(ev));
365                            }
366                        }
367
368                        // All events read. Break out.
369                        if num_events == 0 {
370                            break;
371                        }
372                    }
373                    Err(e) if e.kind() == std::io::ErrorKind::WouldBlock => {
374                        // No events read. Break out.
375                        break;
376                    }
377                    Err(e) => {
378                        self.event_handler.handle_event(Err(Error::io(e)));
379                    }
380                }
381            }
382        }
383
384        for path in remove_watches {
385            self.remove_watch(path, true).ok();
386        }
387
388        for path in add_watches {
389            self.add_watch(path, true, false).ok();
390        }
391    }
392
393    fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
394        // If the watch is not recursive, or if we determine (by stat'ing the path to get its
395        // metadata) that the watched path is not a directory, add a single path watch.
396        if !is_recursive || !metadata(&path).map_err(Error::io_watch)?.is_dir() {
397            return self.add_single_watch(path, false, true);
398        }
399
400        for entry in WalkDir::new(path)
401            .follow_links(true)
402            .into_iter()
403            .filter_map(filter_dir)
404        {
405            self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self)?;
406            watch_self = false;
407        }
408
409        Ok(())
410    }
411
412    fn add_single_watch(
413        &mut self,
414        path: PathBuf,
415        is_recursive: bool,
416        watch_self: bool,
417    ) -> Result<()> {
418        let mut watchmask = WatchMask::ATTRIB
419            | WatchMask::CREATE
420            | WatchMask::DELETE
421            | WatchMask::CLOSE_WRITE
422            | WatchMask::MODIFY
423            | WatchMask::MOVED_FROM
424            | WatchMask::MOVED_TO;
425
426        if watch_self {
427            watchmask.insert(WatchMask::DELETE_SELF);
428            watchmask.insert(WatchMask::MOVE_SELF);
429        }
430
431        if let Some(&(_, old_watchmask, _, _)) = self.watches.get(&path) {
432            watchmask.insert(old_watchmask);
433            watchmask.insert(WatchMask::MASK_ADD);
434        }
435
436        if let Some(ref mut inotify) = self.inotify {
437            log::trace!("adding inotify watch: {}", path.display());
438
439            match inotify.watches().add(&path, watchmask) {
440                Err(e) => {
441                    Err(if e.raw_os_error() == Some(libc::ENOSPC) {
442                        // do not report inotify limits as "no more space" on linux #266
443                        Error::new(ErrorKind::MaxFilesWatch)
444                    } else {
445                        Error::io(e)
446                    }
447                    .add_path(path))
448                }
449                Ok(w) => {
450                    watchmask.remove(WatchMask::MASK_ADD);
451                    let is_dir = metadata(&path).map_err(Error::io)?.is_dir();
452                    self.watches
453                        .insert(path.clone(), (w.clone(), watchmask, is_recursive, is_dir));
454                    self.paths.insert(w, path);
455                    Ok(())
456                }
457            }
458        } else {
459            Ok(())
460        }
461    }
462
463    fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
464        match self.watches.remove(&path) {
465            None => return Err(Error::watch_not_found().add_path(path)),
466            Some((w, _, is_recursive, _)) => {
467                if let Some(ref mut inotify) = self.inotify {
468                    let mut inotify_watches = inotify.watches();
469                    log::trace!("removing inotify watch: {}", path.display());
470
471                    inotify_watches
472                        .remove(w.clone())
473                        .map_err(|e| Error::io(e).add_path(path.clone()))?;
474                    self.paths.remove(&w);
475
476                    if is_recursive || remove_recursive {
477                        let mut remove_list = Vec::new();
478                        for (w, p) in &self.paths {
479                            if p.starts_with(&path) {
480                                inotify_watches
481                                    .remove(w.clone())
482                                    .map_err(|e| Error::io(e).add_path(p.into()))?;
483                                self.watches.remove(p);
484                                remove_list.push(w.clone());
485                            }
486                        }
487                        for w in remove_list {
488                            self.paths.remove(&w);
489                        }
490                    }
491                }
492            }
493        }
494        Ok(())
495    }
496
497    fn remove_all_watches(&mut self) -> Result<()> {
498        if let Some(ref mut inotify) = self.inotify {
499            let mut inotify_watches = inotify.watches();
500            for (w, p) in &self.paths {
501                inotify_watches
502                    .remove(w.clone())
503                    .map_err(|e| Error::io(e).add_path(p.into()))?;
504            }
505            self.watches.clear();
506            self.paths.clear();
507        }
508        Ok(())
509    }
510}
511
512/// return `DirEntry` when it is a directory
513fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
514    if let Ok(e) = e {
515        if let Ok(metadata) = e.metadata() {
516            if metadata.is_dir() {
517                return Some(e);
518            }
519        }
520    }
521    None
522}
523
524impl INotifyWatcher {
525    fn from_event_handler(event_handler: Box<dyn EventHandler>) -> Result<Self> {
526        let inotify = Inotify::init()?;
527        let event_loop = EventLoop::new(inotify, event_handler)?;
528        let channel = event_loop.event_loop_tx.clone();
529        let waker = event_loop.event_loop_waker.clone();
530        event_loop.run();
531        Ok(INotifyWatcher { channel, waker })
532    }
533
534    fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
535        let pb = if path.is_absolute() {
536            path.to_owned()
537        } else {
538            let p = env::current_dir().map_err(Error::io)?;
539            p.join(path)
540        };
541        let (tx, rx) = unbounded();
542        let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
543
544        // we expect the event loop to live and reply => unwraps must not panic
545        self.channel.send(msg).unwrap();
546        self.waker.wake().unwrap();
547        rx.recv().unwrap()
548    }
549
550    fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
551        let pb = if path.is_absolute() {
552            path.to_owned()
553        } else {
554            let p = env::current_dir().map_err(Error::io)?;
555            p.join(path)
556        };
557        let (tx, rx) = unbounded();
558        let msg = EventLoopMsg::RemoveWatch(pb, tx);
559
560        // we expect the event loop to live and reply => unwraps must not panic
561        self.channel.send(msg).unwrap();
562        self.waker.wake().unwrap();
563        rx.recv().unwrap()
564    }
565}
566
567impl Watcher for INotifyWatcher {
568    /// Create a new watcher.
569    fn new<F: EventHandler>(event_handler: F, _config: Config) -> Result<Self> {
570        Self::from_event_handler(Box::new(event_handler))
571    }
572
573    fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
574        self.watch_inner(path, recursive_mode)
575    }
576
577    fn unwatch(&mut self, path: &Path) -> Result<()> {
578        self.unwatch_inner(path)
579    }
580
581    fn configure(&mut self, config: Config) -> Result<bool> {
582        let (tx, rx) = bounded(1);
583        self.channel.send(EventLoopMsg::Configure(config, tx))?;
584        self.waker.wake()?;
585        rx.recv()?
586    }
587
588    fn kind() -> crate::WatcherKind {
589        crate::WatcherKind::Inotify
590    }
591}
592
593impl Drop for INotifyWatcher {
594    fn drop(&mut self) {
595        // we expect the event loop to live => unwrap must not panic
596        self.channel.send(EventLoopMsg::Shutdown).unwrap();
597        self.waker.wake().unwrap();
598    }
599}
600
/// Compile-time check that `INotifyWatcher` can be moved to and shared between threads.
#[test]
fn inotify_watcher_is_send_and_sync() {
    fn assert_send_sync<T>()
    where
        T: Send + Sync,
    {
    }
    assert_send_sync::<INotifyWatcher>();
}