1extern crate inotify as inotify_sys;
8extern crate libc;
9extern crate walkdir;
10
11use self::inotify_sys::{EventMask, Inotify, WatchDescriptor, WatchMask};
12use self::walkdir::WalkDir;
13use super::debounce::{Debounce, EventTx};
14use super::{op, DebouncedEvent, Error, Op, RawEvent, RecursiveMode, Result, Watcher};
15use mio;
16use mio_extras;
17use std::collections::HashMap;
18use std::env;
19use std::ffi::OsStr;
20use std::fs::metadata;
21use std::mem;
22use std::os::unix::io::AsRawFd;
23use std::path::{Path, PathBuf};
24use std::sync::mpsc::{self, Sender};
25use std::sync::Mutex;
26use std::thread;
27use std::time::Duration;
28
29const INOTIFY: mio::Token = mio::Token(0);
30const MESSAGE: mio::Token = mio::Token(1);
31
32struct EventLoop {
38 running: bool,
39 poll: mio::Poll,
40 event_loop_tx: mio_extras::channel::Sender<EventLoopMsg>,
41 event_loop_rx: mio_extras::channel::Receiver<EventLoopMsg>,
42 inotify: Option<Inotify>,
43 event_tx: EventTx,
44 watches: HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
45 paths: HashMap<WatchDescriptor, PathBuf>,
46 rename_event: Option<RawEvent>,
47}
48
49pub struct INotifyWatcher(Mutex<mio_extras::channel::Sender<EventLoopMsg>>);
51
52enum EventLoopMsg {
53 AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
54 RemoveWatch(PathBuf, Sender<Result<()>>),
55 Shutdown,
56 RenameTimeout(u32),
57}
58
59#[inline]
60fn send_pending_rename_event(rename_event: &mut Option<RawEvent>, event_tx: &mut EventTx) {
61 let event = mem::replace(rename_event, None);
62 if let Some(e) = event {
63 event_tx.send(RawEvent {
64 path: e.path,
65 op: Ok(op::Op::REMOVE),
66 cookie: None,
67 });
68 }
69}
70
71#[inline]
72fn add_watch_by_event(
73 path: &Option<PathBuf>,
74 event: &inotify_sys::Event<&OsStr>,
75 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
76 add_watches: &mut Vec<PathBuf>,
77) {
78 if let Some(ref path) = *path {
79 if event.mask.contains(EventMask::ISDIR) {
80 if let Some(parent_path) = path.parent() {
81 if let Some(&(_, _, is_recursive)) = watches.get(parent_path) {
82 if is_recursive {
83 add_watches.push(path.to_owned());
84 }
85 }
86 }
87 }
88 }
89}
90
91#[inline]
92fn remove_watch_by_event(
93 path: &Option<PathBuf>,
94 watches: &HashMap<PathBuf, (WatchDescriptor, WatchMask, bool)>,
95 remove_watches: &mut Vec<PathBuf>,
96) {
97 if let Some(ref path) = *path {
98 if watches.contains_key(path) {
99 remove_watches.push(path.to_owned());
100 }
101 }
102}
103
104impl EventLoop {
105 pub fn new(inotify: Inotify, event_tx: EventTx) -> Result<EventLoop> {
106 let (event_loop_tx, event_loop_rx) = mio_extras::channel::channel::<EventLoopMsg>();
107 let poll = mio::Poll::new()?;
108 poll.register(
109 &event_loop_rx,
110 MESSAGE,
111 mio::Ready::readable(),
112 mio::PollOpt::edge(),
113 )?;
114
115 let inotify_fd = inotify.as_raw_fd();
116 let evented_inotify = mio::unix::EventedFd(&inotify_fd);
117 poll.register(
118 &evented_inotify,
119 INOTIFY,
120 mio::Ready::readable(),
121 mio::PollOpt::edge(),
122 )?;
123
124 let event_loop = EventLoop {
125 running: true,
126 poll,
127 event_loop_tx,
128 event_loop_rx,
129 inotify: Some(inotify),
130 event_tx,
131 watches: HashMap::new(),
132 paths: HashMap::new(),
133 rename_event: None,
134 };
135 Ok(event_loop)
136 }
137
138 fn channel(&self) -> mio_extras::channel::Sender<EventLoopMsg> {
139 self.event_loop_tx.clone()
140 }
141
142 pub fn run(self) {
144 thread::spawn(|| self.event_loop_thread());
145 }
146
147 fn event_loop_thread(mut self) {
148 let mut events = mio::Events::with_capacity(16);
149 loop {
150 self.poll.poll(&mut events, None).expect("poll failed");
152
153 for event in &events {
155 self.handle_event(&event);
156 }
157
158 if !self.running {
160 break;
161 }
162 }
163 }
164
165 fn handle_event(&mut self, event: &mio::Event) {
167 match event.token() {
168 MESSAGE => {
169 self.handle_messages()
171 }
172 INOTIFY => {
173 self.handle_inotify()
175 }
176 _ => unreachable!(),
177 }
178 }
179
180 fn handle_messages(&mut self) {
181 while let Ok(msg) = self.event_loop_rx.try_recv() {
182 match msg {
183 EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
184 let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive(), true));
185 }
186 EventLoopMsg::RemoveWatch(path, tx) => {
187 let _ = tx.send(self.remove_watch(path, false));
188 }
189 EventLoopMsg::Shutdown => {
190 let _ = self.remove_all_watches();
191 if let Some(inotify) = self.inotify.take() {
192 let _ = inotify.close();
193 }
194 self.running = false;
195 break;
196 }
197 EventLoopMsg::RenameTimeout(cookie) => {
198 let current_cookie = self.rename_event.as_ref().and_then(|e| e.cookie);
199 if current_cookie == Some(cookie) {
201 send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
202 }
203 }
204 }
205 }
206 }
207 fn handle_inotify(&mut self) {
208 let mut add_watches = Vec::new();
209 let mut remove_watches = Vec::new();
210
211 if let Some(ref mut inotify) = self.inotify {
212 let mut buffer = [0; 1024];
213 match inotify.read_events(&mut buffer) {
214 Ok(events) => {
215 for event in events {
216 if event.mask.contains(EventMask::Q_OVERFLOW) {
217 self.event_tx.send(RawEvent {
218 path: None,
219 op: Ok(op::Op::RESCAN),
220 cookie: None,
221 });
222 }
223
224 let path = match event.name {
225 Some(name) => self.paths.get(&event.wd).map(|root| root.join(&name)),
226 None => self.paths.get(&event.wd).cloned(),
227 };
228
229 if event.mask.contains(EventMask::MOVED_FROM) {
230 send_pending_rename_event(&mut self.rename_event, &mut self.event_tx);
231 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
232 self.rename_event = Some(RawEvent {
233 path: path,
234 op: Ok(op::Op::RENAME),
235 cookie: Some(event.cookie),
236 });
237 } else {
238 let mut o = Op::empty();
239 let mut c = None;
240 if event.mask.contains(EventMask::MOVED_TO) {
241 let rename_event = mem::replace(&mut self.rename_event, None);
242 if let Some(e) = rename_event {
243 if e.cookie == Some(event.cookie) {
244 self.event_tx.send(e);
245 o.insert(op::Op::RENAME);
246 c = Some(event.cookie);
247 } else {
248 o.insert(op::Op::CREATE);
249 }
250 } else {
251 o.insert(op::Op::CREATE);
252 }
253 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
254 }
255 if event.mask.contains(EventMask::MOVE_SELF) {
256 o.insert(op::Op::RENAME);
257 }
258 if event.mask.contains(EventMask::CREATE) {
259 o.insert(op::Op::CREATE);
260 add_watch_by_event(&path, &event, &self.watches, &mut add_watches);
261 }
262 if event.mask.contains(EventMask::DELETE_SELF)
263 || event.mask.contains(EventMask::DELETE)
264 {
265 o.insert(op::Op::REMOVE);
266 remove_watch_by_event(&path, &self.watches, &mut remove_watches);
267 }
268 if event.mask.contains(EventMask::MODIFY) {
269 o.insert(op::Op::WRITE);
270 }
271 if event.mask.contains(EventMask::CLOSE_WRITE) {
272 o.insert(op::Op::CLOSE_WRITE);
273 }
274 if event.mask.contains(EventMask::ATTRIB) {
275 o.insert(op::Op::CHMOD);
276 }
277
278 if !o.is_empty() {
279 send_pending_rename_event(
280 &mut self.rename_event,
281 &mut self.event_tx,
282 );
283
284 self.event_tx.send(RawEvent {
285 path: path,
286 op: Ok(o),
287 cookie: c,
288 });
289 }
290 }
291 }
292
293 if let Some(ref rename_event) = self.rename_event {
299 let event_loop_tx = self.event_loop_tx.clone();
300 let cookie = rename_event.cookie.unwrap(); thread::spawn(move || {
302 thread::sleep(Duration::from_millis(10)); event_loop_tx
304 .send(EventLoopMsg::RenameTimeout(cookie))
305 .unwrap();
306 });
307 }
308 }
309 Err(e) => {
310 self.event_tx.send(RawEvent {
311 path: None,
312 op: Err(Error::Io(e)),
313 cookie: None,
314 });
315 }
316 }
317 }
318
319 for path in remove_watches {
320 let _ = self.remove_watch(path, true);
321 }
322
323 for path in add_watches {
324 let _ = self.add_watch(path, true, false);
325 }
326 }
327
328 fn add_watch(&mut self, path: PathBuf, is_recursive: bool, mut watch_self: bool) -> Result<()> {
329 let metadata = try!(metadata(&path).map_err(Error::Io));
330
331 if !metadata.is_dir() || !is_recursive {
332 return self.add_single_watch(path, false, true);
333 }
334
335 for entry in WalkDir::new(path)
336 .follow_links(true)
337 .into_iter()
338 .filter_map(filter_dir)
339 {
340 try!(self.add_single_watch(entry.path().to_path_buf(), is_recursive, watch_self));
341 watch_self = false;
342 }
343
344 Ok(())
345 }
346
347 fn add_single_watch(
348 &mut self,
349 path: PathBuf,
350 is_recursive: bool,
351 watch_self: bool,
352 ) -> Result<()> {
353 let mut watchmask = WatchMask::ATTRIB
354 | WatchMask::CREATE
355 | WatchMask::DELETE
356 | WatchMask::CLOSE_WRITE
357 | WatchMask::MODIFY
358 | WatchMask::MOVED_FROM
359 | WatchMask::MOVED_TO;
360
361 if watch_self {
362 watchmask.insert(WatchMask::DELETE_SELF);
363 watchmask.insert(WatchMask::MOVE_SELF);
364 }
365
366 if let Some(&(_, old_watchmask, _)) = self.watches.get(&path) {
367 watchmask.insert(old_watchmask);
368 watchmask.insert(WatchMask::MASK_ADD);
369 }
370
371 if let Some(ref mut inotify) = self.inotify {
372 match inotify.add_watch(&path, watchmask) {
373 Err(e) => Err(Error::Io(e)),
374 Ok(w) => {
375 watchmask.remove(WatchMask::MASK_ADD);
376 self.watches
377 .insert(path.clone(), (w.clone(), watchmask, is_recursive));
378 self.paths.insert(w, path);
379 Ok(())
380 }
381 }
382 } else {
383 Ok(())
384 }
385 }
386
387 fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
388 match self.watches.remove(&path) {
389 None => return Err(Error::WatchNotFound),
390 Some((w, _, is_recursive)) => {
391 if let Some(ref mut inotify) = self.inotify {
392 try!(inotify.rm_watch(w.clone()).map_err(Error::Io));
393 self.paths.remove(&w);
394
395 if is_recursive || remove_recursive {
396 let mut remove_list = Vec::new();
397 for (w, p) in &self.paths {
398 if p.starts_with(&path) {
399 try!(inotify.rm_watch(w.clone()).map_err(Error::Io));
400 self.watches.remove(p);
401 remove_list.push(w.clone());
402 }
403 }
404 for w in remove_list {
405 self.paths.remove(&w);
406 }
407 }
408 }
409 }
410 }
411 Ok(())
412 }
413
414 fn remove_all_watches(&mut self) -> Result<()> {
415 if let Some(ref mut inotify) = self.inotify {
416 for w in self.paths.keys() {
417 try!(inotify.rm_watch(w.clone()).map_err(Error::Io));
418 }
419 self.watches.clear();
420 self.paths.clear();
421 }
422 Ok(())
423 }
424}
425
426fn filter_dir(e: walkdir::Result<walkdir::DirEntry>) -> Option<walkdir::DirEntry> {
428 if let Ok(e) = e {
429 if let Ok(metadata) = e.metadata() {
430 if metadata.is_dir() {
431 return Some(e);
432 }
433 }
434 }
435 None
436}
437
438impl Watcher for INotifyWatcher {
439 fn new_raw(tx: Sender<RawEvent>) -> Result<INotifyWatcher> {
440 let inotify = Inotify::init()?;
441 let event_tx = EventTx::Raw { tx };
442 let event_loop = EventLoop::new(inotify, event_tx)?;
443 let channel = event_loop.channel();
444 event_loop.run();
445 Ok(INotifyWatcher(Mutex::new(channel)))
446 }
447
448 fn new(tx: Sender<DebouncedEvent>, delay: Duration) -> Result<INotifyWatcher> {
449 let inotify = Inotify::init()?;
450 let event_tx = EventTx::Debounced {
451 tx: tx.clone(),
452 debounce: Debounce::new(delay, tx),
453 };
454 let event_loop = EventLoop::new(inotify, event_tx)?;
455 let channel = event_loop.channel();
456 event_loop.run();
457 Ok(INotifyWatcher(Mutex::new(channel)))
458 }
459
460 fn watch<P: AsRef<Path>>(&mut self, path: P, recursive_mode: RecursiveMode) -> Result<()> {
461 let pb = if path.as_ref().is_absolute() {
462 path.as_ref().to_owned()
463 } else {
464 let p = try!(env::current_dir().map_err(Error::Io));
465 p.join(path)
466 };
467 let (tx, rx) = mpsc::channel();
468 let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
469
470 self.0.lock().unwrap().send(msg).unwrap();
472 rx.recv().unwrap()
473 }
474
475 fn unwatch<P: AsRef<Path>>(&mut self, path: P) -> Result<()> {
476 let pb = if path.as_ref().is_absolute() {
477 path.as_ref().to_owned()
478 } else {
479 let p = try!(env::current_dir().map_err(Error::Io));
480 p.join(path)
481 };
482 let (tx, rx) = mpsc::channel();
483 let msg = EventLoopMsg::RemoveWatch(pb, tx);
484
485 self.0.lock().unwrap().send(msg).unwrap();
487 rx.recv().unwrap()
488 }
489}
490
491impl Drop for INotifyWatcher {
492 fn drop(&mut self) {
493 self.0.lock().unwrap().send(EventLoopMsg::Shutdown).unwrap();
495 }
496}