1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#![allow(dead_code)]

use inotify::wrapper::{INotify, Watch, Event};
use futures::{Async, Poll};
use futures::stream::Stream;
use mio::deprecated::unix::Io;
use tokio_core::reactor::{Handle, PollEvented};

use std::io;
use std::os::unix::io::FromRawFd;
use std::path::Path;

/// Wraps an INotify object and provides asynchronous methods based on the inner object.
pub struct AsyncINotify {
    inner: INotify,
    io: PollEvented<Io>,

    cached_events: Vec<Event>,
}

impl AsyncINotify {
    /// Create a new inotify stream on the loop behind `handle`.
    pub fn init(handle: &Handle) -> io::Result<AsyncINotify> {
        AsyncINotify::init_with_flags(handle, 0)
    }

    /// Create a new inotify stream with the given inotify flags (`IN_NONBLOCK` or `IN_CLOEXEC`).
    pub fn init_with_flags(handle: &Handle, flags: isize) -> io::Result<AsyncINotify> {
        let inotify = try!(INotify::init_with_flags(flags));
        let evfd = unsafe { Io::from_raw_fd(inotify.fd) };

        let pollev = try!(PollEvented::new(evfd, handle));

        Ok(AsyncINotify {
            inner: inotify,
            io: pollev,
            cached_events: Vec::new(),
        })
    }

    /// Monitor `path` for the events in `mask`. For a list of events, see
    /// https://docs.rs/tokio-inotify/0.2.1/tokio_inotify/struct.AsyncINotify.html (items prefixed with
    /// "Event")
    pub fn add_watch(&self, path: &Path, mask: u32) -> io::Result<Watch> {
        self.inner.add_watch(path, mask)
    }

    /// Remove an element currently watched.
    pub fn rm_watch(&self, watch: Watch) -> io::Result<()> {
        self.inner.rm_watch(watch)
    }

    /// Close the underlying file descriptor and remove it from the event loop.
    pub fn close(self) -> io::Result<()> {
        // FD is removed from loop by PollEvented::drop()
        self.inner.close()
    }
}

impl Stream for AsyncINotify {
    type Item = Event;
    type Error = io::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // BUG-ish: This returns cached events in a reversed order. Usually, that shouldn't be a
        // problem though.
        if self.cached_events.len() > 0 {
            if self.cached_events.len() == 1 {
                self.io.need_read()
            }
            return Ok(Async::Ready(self.cached_events.pop()));
        }

        match self.io.poll_read() {
            Async::NotReady => {
                return Ok(Async::NotReady);
            }
            Async::Ready(_) => (), // proceed
        }

        // the inner fd is non-blocking by default (set in the inotify crate)
        let events = try!(self.inner.available_events());

        // Only do vec operations if there are many events
        if events.len() < 1 {
            // If EWOULDBLOCK is returned, inotify returns an empty slice. Signal that we want
            // more.
            self.io.need_read();
            Ok(Async::NotReady)
        } else if events.len() == 1 {
            self.io.need_read();
            Ok(Async::Ready(Some(events[0].clone())))
        } else {
            // events.len() > 1
            self.cached_events.extend_from_slice(&events[1..]);
            Ok(Async::Ready(Some(events[0].clone())))
        }
    }
}