1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
#![allow(dead_code)]
use inotify::wrapper::{INotify, Watch, Event};
use futures::{Async, Poll};
use futures::stream::Stream;
use mio::deprecated::unix::Io;
use tokio_core::reactor::{Handle, PollEvented};
use std::io;
use std::os::unix::io::FromRawFd;
use std::path::Path;
/// An inotify instance that can be consumed as a `futures` `Stream` of
/// `Event`s (see the `Stream` impl for this type in this file).
///
/// It pairs the `INotify` wrapper with a `PollEvented<Io>` built from the
/// same file descriptor, so readiness is polled through `io` while watches
/// are managed and events are read through `inner`.
pub struct AsyncINotify {
// The wrapped inotify instance; used for `add_watch`, `rm_watch`, `close`
// and for reading events via `available_events`.
inner: INotify,
// Readiness handle created from `inner.fd` and registered with the
// reactor `Handle` passed to `init_with_flags`.
io: PollEvented<Io>,
// Events that were already read from `inner` but have not yet been
// handed out by `poll`.
cached_events: Vec<Event>,
}
impl AsyncINotify {
pub fn init(handle: &Handle) -> io::Result<AsyncINotify> {
AsyncINotify::init_with_flags(handle, 0)
}
pub fn init_with_flags(handle: &Handle, flags: isize) -> io::Result<AsyncINotify> {
let inotify = try!(INotify::init_with_flags(flags));
let evfd = unsafe { Io::from_raw_fd(inotify.fd) };
let pollev = try!(PollEvented::new(evfd, handle));
Ok(AsyncINotify {
inner: inotify,
io: pollev,
cached_events: Vec::new(),
})
}
pub fn add_watch(&self, path: &Path, mask: u32) -> io::Result<Watch> {
self.inner.add_watch(path, mask)
}
pub fn rm_watch(&self, watch: Watch) -> io::Result<()> {
self.inner.rm_watch(watch)
}
pub fn close(self) -> io::Result<()> {
self.inner.close()
}
}
/// Exposes the inotify instance as a stream of `Event`s.
///
/// Events are yielded in the order `available_events` returned them.
impl Stream for AsyncINotify {
    type Item = Event;
    type Error = io::Error;

    /// Yields the next event.
    ///
    /// Events left over from an earlier read are handed out first. Otherwise
    /// `io` is polled for readability and, if it is ready, all currently
    /// available events are read from `inner`: the first one is returned and
    /// the remainder is cached for subsequent calls.
    ///
    /// Returns `Async::NotReady` when `io` is not readable or when the read
    /// produced no events. This implementation never returns
    /// `Async::Ready(None)`.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `available_events`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        // The cache is stored in reverse arrival order (see below), so taking
        // from the back yields the oldest pending event.
        if let Some(event) = self.cached_events.pop() {
            if self.cached_events.is_empty() {
                // That was the last cached event; the next call has to go
                // back to the descriptor, so flag that a read is needed.
                self.io.need_read();
            }
            return Ok(Async::Ready(Some(event)));
        }

        if let Async::NotReady = self.io.poll_read() {
            return Ok(Async::NotReady);
        }

        let events = self.inner.available_events()?;
        match events.split_first() {
            None => {
                // Reported readable but nothing was read: wait for the next
                // readiness notification.
                self.io.need_read();
                Ok(Async::NotReady)
            }
            Some((first, rest)) => {
                if rest.is_empty() {
                    self.io.need_read();
                } else {
                    // Store the tail reversed so that `pop()` above returns
                    // the events in their original order. Appending them
                    // as-is would deliver e0, eN, ..., e1.
                    self.cached_events.extend(rest.iter().rev().cloned());
                }
                Ok(Async::Ready(Some(first.clone())))
            }
        }
    }
}