notify_forked/
inotify.rs

1//! Watcher implementation for the inotify Linux API
2//!
3//! The inotify API provides a mechanism for monitoring filesystem events.  Inotify can be used to
4//! monitor individual files, or to monitor directories.  When a directory is monitored, inotify
5//! will return events for the directory itself, and for files inside the directory.
6
7extern crate inotify as inotify_sys;
8extern crate libc;
9extern crate walkdir;
10
11use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use self::walkdir::WalkDir;
13use super::debounce::{Debounce, EventTx};
14use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
15use mio;
16use mio_extras;
17use std::collections::HashMap;
18use std::env;
19use std::ffi::OsStr;
20use std::fs::metadata;
21use std::mem;
22use std::os::unix::io::AsRawFd;
23use std::path::{Path, PathBuf};
24use std::sync::mpsc::{self, Sender};
25use std::sync::Mutex;
26use std::thread;
27use std::time::Duration;
28
/// Poll token under which the inotify file descriptor is registered.
const INOTIFY: mio::Token = mio::Token(0);
/// Poll token under which the event loop's message channel is registered.
const MESSAGE: mio::Token = mio::Token(1);
31
// The EventLoop sets up a mio::Poll and uses it to wait for the following:
//
// -  messages telling it what to do (add/remove a watch, shut down, rename timeout)
//
// -  events telling it that something has happened on one of the watched files.
struct EventLoop {
    // Cleared when a `Shutdown` message is handled; the loop thread exits once it is false.
    running: bool,
    // Poll instance the message channel and the inotify descriptor are registered with.
    poll: mio::Poll,
    // Sending half of the message channel; cloned for watchers and rename-timeout threads.
    event_loop_tx: mio_extras::channel::Sender<EventLoopMsg>,
    // Receiving half of the message channel, drained by `handle_messages`.
    event_loop_rx: mio_extras::channel::Receiver<EventLoopMsg>,
    // The inotify instance; taken out and closed on shutdown, hence the `Option`.
    inotify: Option<Inotify>,
    // Sink that translated events are reported through.
    event_tx: EventTx,
    // Watched path -> (watch descriptor, mask it was registered with, is_recursive flag).
    watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
    // Reverse lookup: watch descriptor -> watched path.
    paths: HashMap<WatchDescriptor, PathBuf>,
    // A `MOVED_FROM` event held back until its `MOVED_TO` counterpart arrives or it times out.
    rename_event: Option<RawEvent>,
}
48
/// Watcher implementation based on inotify
///
/// Holds the sending half of the event loop's message channel behind a `Mutex`; every
/// operation on the watcher is forwarded to the event loop thread as an `EventLoopMsg`.
pub struct INotifyWatcher(Mutex<mio_extras::channel::Sender<EventLoopMsg>>);
51
/// Messages understood by the event loop thread.
enum EventLoopMsg {
    /// Start watching the path; the outcome is reported back through the sender.
    AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
    /// Stop watching the path; the outcome is reported back through the sender.
    RemoveWatch(PathBuf, Sender<Result<()>>),
    /// Remove all watches, close inotify and stop the event loop.
    Shutdown,
    /// The wait for the second half of the rename with this cookie has elapsed.
    RenameTimeout(u32),
}
58
59#[inline]
60fn send_pending_rename_event(rename_event: &mut Option<RawEvent>, event_tx: &mut EventTx) {
61    let event = mem::replace(rename_event, None);
62    if let Some(e) = event {
63        event_tx.send(RawEvent {
64            path: e.path,
65            op: Ok(op::Op::REMOVE),
66            cookie: None,
67        });
68    }
69}
70
71#[inline]
72fn add_watch_by_event(
73    path: &Option<PathBuf>,
74    event: &inotify_sys::Event<&OsStr>,
75    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
76    add_watches: &mut Vec<PathBuf>,
77) {
78    if let Some(ref path) = *path {
79        if event.mask.contains(EventMask::ISDIR) {
80            if let Some(parent_path) = path.parent() {
81                if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
82                    if is_recursive {
83                        add_watches.push(path.to_owned());
84                    }
85                }
86            }
87        }
88    }
89}
90
91#[inline]
92fn remove_watch_by_event(
93    path: &Option<PathBuf>,
94    watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
95    remove_watches: &mut Vec<PathBuf>,
96) {
97    if let Some(ref path) = *path {
98        if watches.contains_key(path) {
99            remove_watches.push(path.to_owned());
100        }
101    }
102}
103
104impl EventLoop {
105    pub fn new(inotify: Inotify, event_tx: EventTx) -> Result<EventLoop> {
106        let (event_loop_tx, event_loop_rx) = mio_extras::channel::channel::<EventLoopMsg>();
107        let poll = mio::Poll::new()?;
108        poll.register(
109            &event_loop_rx,
110            MESSAGE,
111            mio::Ready::readable(),
112            mio::PollOpt::edge(),
113        )?;
114
115        let inotify_fd = inotify.as_raw_fd();
116        let evented_inotify = mio::unix::EventedFd(&inotify_fd);
117        poll.register(
118            &evented_inotify,
119            INOTIFY,
120            mio::Ready::readable(),
121            mio::PollOpt::edge(),
122        )?;
123
124        let event_loop = EventLoop {
125            running: true,
126            poll,
127            event_loop_tx,
128            event_loop_rx,
129            inotify: Some(inotify),
130            event_tx,
131            watches: HashMap::new(),
132            paths: HashMap::new(),
133            rename_event: None,
134        };
135        Ok(event_loop)
136    }
137
138    fn channel(&self) -> mio_extras::channel::Sender<EventLoopMsg> {
139        self.event_loop_tx.clone()
140    }
141
142    // Run the event loop.
143    pub fn run(self) {
144        thread::spawn(|| self.event_loop_thread());
145    }
146
147    fn event_loop_thread(mut self) {
148        let mut events = mio::Events::with_capacity(16);
149        loop {
150            // Wait for something to happen.
151            self.poll.poll(&mut events, None).expect("poll failed");
152
153            // Process whatever happened.
154            for event in &events {
155                self.handle_event(&event);
156            }
157
158            // Stop, if we're done.
159            if !self.running {
160                break;
161            }
162        }
163    }
164
165    // Handle a single event.
166    fn handle_event(&mut self, event: &mio::Event) {
167        match event.token() {
168            MESSAGE => {
169                // The channel is readable - handle messages.
170                self.handle_messages()
171            }
172            INOTIFY => {
173                // inotify has something to tell us.
174                self.handle_inotify()
175            }
176            _ => unreachable!(),
177        }
178    }
179
180    fn handle_messages(&mut self) {
181        while let Ok(msg) = self.event_loop_rx.try_recv() {
182            match msg {
183                EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
184                    let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
185                }
186                EventLoopMsg::RemoveWatch(path, tx) => {
187                    let _ = tx.send(self.remove_watch(path, false));
188                }
189                EventLoopMsg::Shutdown => {
190                    let _ = self.remove_all_watches();
191                    if let Some(inotify) = self.inotify.take() {
192                        let _ = inotify.close();
193                    }
194                    self.running = false;
195                    break;
196                }
197                EventLoopMsg::RenameTimeout(cookie) => {
198                    let current_cookie = self.rename_event.as_ref().and_then(|e| e.cookie);
199                    // send pending rename event only if the rename event for which the timer has been created hasn't been handled already; otherwise ignore this timeout
200                    if current_cookie == Some(cookie) {
201                        send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
202                    }
203                }
204            }
205        }
206    }
207    fn handle_inotify(&mut self) {
208        let mut add_watches = Vec::new();
209        let mut remove_watches = Vec::new();
210
211        if let Some(ref mut inotify) = self.inotify {
212            let mut buffer = [0; 1024];
213            match inotify.read_events(&mut buffer) {
214                Ok(events) => {
215                    for event in events {
216                        if event.mask.contains(EventMask::Q_OVERFLOW) {
217                            self.event_tx.send(RawEvent {
218                                path: None,
219                                op: Ok(op::Op::RESCAN),
220                                cookie: None,
221                            });
222                        }
223
224                        let path = match event.name {
225                            Some(name) => self.paths.get(&event.wd).map(|root| root.join(&name)),
226                            None => self.paths.get(&event.wd).cloned(),
227                        };
228
229                        if event.mask.contains(EventMask::MOVED_FROM) {
230                            send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
231                            remove_watch_by_event(&path, &self.watches, &mut remove_watches);
232                            self.rename_event = Some(RawEvent {
233                                path: path,
234                                op: Ok(op::Op::RENAME),
235                                cookie: Some(event.cookie),
236                            });
237                        } else {
238                            let mut o = Op::empty();
239                            let mut c = None;
240                            if event.mask.contains(EventMask::MOVED_TO) {
241                                let rename_event = mem::replace(&mut self.rename_event, None);
242                                if let Some(e) = rename_event {
243                                    if e.cookie == Some(event.cookie) {
244                                        self.event_tx.send(e);
245                                        o.insert(op::Op::RENAME);
246                                        c = Some(event.cookie);
247                                    } else {
248                                        o.insert(op::Op::CREATE);
249                                    }
250                                } else {
251                                    o.insert(op::Op::CREATE);
252                                }
253                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
254                            }
255                            if event.mask.contains(EventMask::MOVE_SELF) {
256                                o.insert(op::Op::RENAME);
257                            }
258                            if event.mask.contains(EventMask::CREATE) {
259                                o.insert(op::Op::CREATE);
260                                add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
261                            }
262                            if event.mask.contains(EventMask::DELETE_SELF)
263                                || event.mask.contains(EventMask::DELETE)
264                            {
265                                o.insert(op::Op::REMOVE);
266                                remove_watch_by_event(&path, &self.watches, &mut remove_watches);
267                            }
268                            if event.mask.contains(EventMask::MODIFY) {
269                                o.insert(op::Op::WRITE);
270                            }
271                            if event.mask.contains(EventMask::CLOSE_WRITE) {
272                                o.insert(op::Op::CLOSE_WRITE);
273                            }
274                            if event.mask.contains(EventMask::ATTRIB) {
275                                o.insert(op::Op::CHMOD);
276                            }
277
278                            if !o.is_empty() {
279                                send_pending_rename_event(
280                                    &mut self.rename_event,
281                                    &mut self.event_tx,
282                                );
283
284                                self.event_tx.send(RawEvent {
285                                    path: path,
286                                    op: Ok(o),
287                                    cookie: c,
288                                });
289                            }
290                        }
291                    }
292
293                    // When receiving only the first part of a move event (IN_MOVED_FROM) it is unclear
294                    // whether the second part (IN_MOVED_TO) will arrive because the file or directory
295                    // could just have been moved out of the watched directory. So it's necessary to wait
296                    // for possible subsequent events in case it's a complete move event but also to make sure
297                    // that the first part of the event is handled in a timely manner in case no subsequent events arrive.
298                    if let Some(ref rename_event) = self.rename_event {
299                        let event_loop_tx = self.event_loop_tx.clone();
300                        let cookie = rename_event.cookie.unwrap(); // unwrap is safe because rename_event is always set with some cookie
301                        thread::spawn(move || {
302                            thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
303                            event_loop_tx
304                                .send(EventLoopMsg::RenameTimeout(cookie))
305                                .unwrap();
306                        });
307                    }
308                }
309                Err(e) => {
310                    self.event_tx.send(RawEvent {
311                        path: None,
312                        op: Err(Error::Io(e)),
313                        cookie: None,
314                    });
315                }
316            }
317        }
318
319        for path in remove_watches {
320            let _ = self.remove_watch(path, true);
321        }
322
323        for path in add_watches {
324            let _ = self.add_watch(path, true, false);
325        }
326    }
327
328    fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
329        let metadata = try!(metadata(&path).map_err(Error::Io));
330
331        if !metadata.is_dir() || !is_recursive {
332            return self.add_single_watch(path, false, true);
333        }
334
335        for entry in WalkDir::new(path)
336            .follow_links(true)
337            .into_iter()
338            .filter_map(filter_dir)
339        {
340            try!(self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self));
341            watch_self = false;
342        }
343
344        Ok(())
345    }
346
347    fn add_single_watch(
348        &mut self,
349        path: PathBuf,
350        is_recursive: bool,
351        watch_self: bool,
352    ) -> Result<()> {
353        let mut watchmask = WatchMask::ATTRIB
354            | WatchMask::CREATE
355            | WatchMask::DELETE
356            | WatchMask::CLOSE_WRITE
357            | WatchMask::MODIFY
358            | WatchMask::MOVED_FROM
359            | WatchMask::MOVED_TO;
360
361        if watch_self {
362            watchmask.insert(WatchMask::DELETE_SELF);
363            watchmask.insert(WatchMask::MOVE_SELF);
364        }
365
366        if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
367            watchmask.insert(old_watchmask);
368            watchmask.insert(WatchMask::MASK_ADD);
369        }
370
371        if let Some(ref mut inotify) = self.inotify {
372            match inotify.add_watch(&path, watchmask) {
373                Err(e) => Err(Error::Io(e)),
374                Ok(w) => {
375                    watchmask.remove(WatchMask::MASK_ADD);
376                    self.watches
377                        .insert(path.clone(), (w.clone(), watchmask, is_recursive));
378                    self.paths.insert(w, path);
379                    Ok(())
380                }
381            }
382        } else {
383            Ok(())
384        }
385    }
386
387    fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
388        match self.watches.remove(&path) {
389            None => return Err(Error::WatchNotFound),
390            Some((w, _, is_recursive)) => {
391                if let Some(ref mut inotify) = self.inotify {
392                    try!(inotify.rm_watch(w.clone()).map_err(Error::Io));
393                    self.paths.remove(&w);
394
395                    if is_recursive || remove_recursive {
396                        let mut remove_list = Vec::new();
397                        for (w, p) in &self.paths {
398                            if p.starts_with(&path) {
399                                try!(inotify.rm_watch(w.clone()).map_err(Error::Io));
400                                self.watches.remove(p);
401                                remove_list.push(w.clone());
402                            }
403                        }
404                        for w in remove_list {
405                            self.paths.remove(&w);
406                        }
407                    }
408                }
409            }
410        }
411        Ok(())
412    }
413
414    fn remove_all_watches(&mut self) -> Result<()> {
415        if let Some(ref mut inotify) = self.inotify {
416            for w in self.paths.keys() {
417                try!(inotify.rm_watch(w.clone()).map_err(Error::Io));
418            }
419            self.watches.clear();
420            self.paths.clear();
421        }
422        Ok(())
423    }
424}
425
426/// return `DirEntry` when it is a directory
427fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
428    if let Ok(e) = e {
429        if let Ok(metadata) = e.metadata() {
430            if metadata.is_dir() {
431                return Some(e);
432            }
433        }
434    }
435    None
436}
437
438impl Watcher for INotifyWatcher {
439    fn new_raw(tx: Sender<RawEvent>) -> Result<INotifyWatcher> {
440        let inotify = Inotify::init()?;
441        let event_tx = EventTx::Raw { tx };
442        let event_loop = EventLoop::new(inotify, event_tx)?;
443        let channel = event_loop.channel();
444        event_loop.run();
445        Ok(INotifyWatcher(Mutex::new(channel)))
446    }
447
448    fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<INotifyWatcher> {
449        let inotify = Inotify::init()?;
450        let event_tx = EventTx::Debounced {
451            tx: tx.clone(),
452            debounce: Debounce::new(delay, tx),
453        };
454        let event_loop = EventLoop::new(inotify, event_tx)?;
455        let channel = event_loop.channel();
456        event_loop.run();
457        Ok(INotifyWatcher(Mutex::new(channel)))
458    }
459
460    fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
461        let pb = if path.as_ref().is_absolute() {
462            path.as_ref().to_owned()
463        } else {
464            let p = try!(env::current_dir().map_err(Error::Io));
465            p.join(path)
466        };
467        let (tx, rx) = mpsc::channel();
468        let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
469
470        // we expect the event loop to live and reply => unwraps must not panic
471        self.0.lock().unwrap().send(msg).unwrap();
472        rx.recv().unwrap()
473    }
474
475    fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
476        let pb = if path.as_ref().is_absolute() {
477            path.as_ref().to_owned()
478        } else {
479            let p = try!(env::current_dir().map_err(Error::Io));
480            p.join(path)
481        };
482        let (tx, rx) = mpsc::channel();
483        let msg = EventLoopMsg::RemoveWatch(pb, tx);
484
485        // we expect the event loop to live and reply => unwraps must not panic
486        self.0.lock().unwrap().send(msg).unwrap();
487        rx.recv().unwrap()
488    }
489}
490
491impl Drop for INotifyWatcher {
492    fn drop(&mut self) {
493        // we expect the event loop to live => unwrap must not panic
494        self.0.lock().unwrap().send(EventLoopMsg::Shutdown).unwrap();
495    }
496}