xi_core_lib/
watcher.rs

1// Copyright 2017 The xi-editor Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Monitoring files and directories.
16//!
17//! This module contains `FileWatcher` and related types, responsible for
18//! monitoring changes to files and directories. Under the hood it is a
19//! thin wrapper around some concrete type provided by the
20//! [`notify`](https://docs.rs/notify) crate; the implementation is
21//! platform dependent, and may be using kqueue, fsevent, or another
22//! low-level monitoring system.
23//!
24//! Our wrapper provides a few useful features:
25//!
26//! - All `watch` calls are associated with a `WatchToken`; this
27//! allows for the same path to be watched multiple times,
28//! presumably by multiple interested parties. Events are delivered
29//! once per token.
30//!
31//! - There is the option (via `FileWatcher::watch_filtered`) to include
32//! a predicate along with a path, to filter paths before delivery.
33//!
34//! - We are integrated with the xi_rpc runloop; events are queued as
35//! they arrive, and an idle task is scheduled.
36
37use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
38use std::collections::VecDeque;
39use std::fmt;
40use std::mem;
41use std::path::{Path, PathBuf};
42use std::sync::mpsc::channel;
43use std::sync::{Arc, Mutex};
44use std::thread;
45use std::time::Duration;
46
47use xi_rpc::RpcPeer;
48
49/// Delay for aggregating related file system events.
///
/// The value is in milliseconds, as the name says.
///
/// NOTE(review): `FileWatcher::new` passes a literal 100 ms interval to
/// `notify::watcher` instead of this constant; confirm which of the two
/// values is the intended debounce delay.
50pub const DEBOUNCE_WAIT_MILLIS: u64 = 50;
51
52/// Wrapper around a `notify::Watcher`. It runs the inner watcher
53/// in a separate thread, and communicates with it via an `mpsc::channel`.
54pub struct FileWatcher {
    /// The watcher returned by `notify::watcher` in `new`; the `watch` and
    /// `unwatch` methods forward their (un)registration calls to it.
55    inner: RecommendedWatcher,
    /// Pending events plus the list of registered watchees. Shared, behind a
    /// mutex, with the thread spawned in `new` that receives events.
56    state: Arc<Mutex<WatcherState>>,
57}
58
59#[derive(Debug, Default)]
/// Mutable state shared between a `FileWatcher` and its event thread.
60struct WatcherState {
    /// Events that matched at least one watchee and have not yet been
    /// collected through `FileWatcher::take_events`.
61    events: EventQueue,
    /// Every active registration. The same path may appear more than once,
    /// once for each token watching it.
62    watchees: Vec<Watchee>,
63}
64
65/// Tracks a registered 'that-which-is-watched'.
66#[doc(hidden)]
67struct Watchee {
    /// The watched path. `FileWatcher::watch_impl` canonicalizes it before
    /// storing it here.
68    path: PathBuf,
    /// When `false`, only `path` itself and its direct children match (see
    /// `applies_to_path`); when `true`, any path beneath `path` matches.
69    recursive: bool,
    /// Token queued alongside every event delivered for this registration.
70    token: WatchToken,
    /// Optional predicate; when present, an event path must also satisfy it
    /// to be delivered.
71    filter: Option<Box<PathFilter>>,
72}
73
74/// Token provided to `FileWatcher`, to associate events with
75/// interested parties.
76///
77/// Note: `WatchToken`s are assumed to correspond with an
78/// 'area of interest'; that is, they are used to route delivery
79/// of events.
///
/// `FileWatcher::unwatch` locates the registration to remove by comparing
/// the token together with the path, so a party that watches several paths
/// can reuse one token for all of them.
80#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct WatchToken(pub usize);
82
83/// A trait for types which can be notified of new events.
84/// New events are accessible through the `FileWatcher` instance.
///
/// The `Send` bound exists because `FileWatcher::new` moves the notifier
/// into the thread it spawns.
85pub trait Notify: Send {
    /// Called from the thread spawned in `FileWatcher::new` once per event
    /// received from the underlying watcher, after that event has been
    /// matched against the registered watchees.
86    fn notify(&self);
87}
88
/// FIFO of pending events, each paired with the token of the watchee it is
/// being delivered to.
89pub type EventQueue = VecDeque<(WatchToken, DebouncedEvent)>;
90
/// Predicate over an event's path; returning `false` suppresses delivery of
/// that path to the watchee holding the filter. It must be `Send` because
/// watchees are shared with the watcher's event thread.
91pub type PathFilter = dyn Fn(&Path) -> bool + Send + 'static;
92
93impl FileWatcher {
94    pub fn new<T: Notify + 'static>(peer: T) -> Self {
95        let (tx_event, rx_event) = channel();
96
97        let state = Arc::new(Mutex::new(WatcherState::default()));
98        let state_clone = state.clone();
99
100        let inner = watcher(tx_event, Duration::from_millis(100)).expect("watcher should spawn");
101
102        thread::spawn(move || {
103            while let Ok(event) = rx_event.recv() {
104                let mut state = state_clone.lock().unwrap();
105                let WatcherState { ref mut events, ref mut watchees } = *state;
106
107                watchees
108                    .iter()
109                    .filter(|w| w.wants_event(&event))
110                    .map(|w| w.token)
111                    .for_each(|t| events.push_back((t, clone_event(&event))));
112
113                peer.notify();
114            }
115        });
116
117        FileWatcher { inner, state }
118    }
119
120    /// Begin watching `path`. As `DebouncedEvent`s (documented in the
121    /// [notify](https://docs.rs/notify) crate) arrive, they are stored
122    /// with the associated `token` and a task is added to the runloop's
123    /// idle queue.
124    ///
125    /// Delivery of events then requires that the runloop's handler
126    /// correctly forward the `handle_idle` call to the interested party.
127    pub fn watch(&mut self, path: &Path, recursive: bool, token: WatchToken) {
128        self.watch_impl(path, recursive, token, None);
129    }
130
131    /// Like `watch`, but taking a predicate function that filters delivery
132    /// of events based on their path.
133    pub fn watch_filtered<F>(&mut self, path: &Path, recursive: bool, token: WatchToken, filter: F)
134    where
135        F: Fn(&Path) -> bool + Send + 'static,
136    {
137        let filter = Box::new(filter) as Box<PathFilter>;
138        self.watch_impl(path, recursive, token, Some(filter));
139    }
140
141    fn watch_impl(
142        &mut self,
143        path: &Path,
144        recursive: bool,
145        token: WatchToken,
146        filter: Option<Box<PathFilter>>,
147    ) {
148        let path = match path.canonicalize() {
149            Ok(ref p) => p.to_owned(),
150            Err(e) => {
151                warn!("error watching {:?}: {:?}", path, e);
152                return;
153            }
154        };
155
156        let mut state = self.state.lock().unwrap();
157
158        let w = Watchee { path, recursive, token, filter };
159        let mode = mode_from_bool(w.recursive);
160
161        if !state.watchees.iter().any(|w2| w.path == w2.path) {
162            if let Err(e) = self.inner.watch(&w.path, mode) {
163                warn!("watching error {:?}", e);
164            }
165        }
166
167        state.watchees.push(w);
168    }
169
170    /// Removes the provided token/path pair from the watch list.
171    /// Does not stop watching this path, if it is associated with
172    /// other tokens.
173    pub fn unwatch(&mut self, path: &Path, token: WatchToken) {
174        let mut state = self.state.lock().unwrap();
175
176        let idx = state.watchees.iter().position(|w| w.token == token && w.path == path);
177
178        if let Some(idx) = idx {
179            let removed = state.watchees.remove(idx);
180            if !state.watchees.iter().any(|w| w.path == removed.path) {
181                if let Err(e) = self.inner.unwatch(&removed.path) {
182                    warn!("unwatching error {:?}", e);
183                }
184            }
185            //TODO: Ideally we would be tracking what paths we're watching with
186            // some prefix-tree-like structure, which would let us keep track
187            // of when some child path might need to be reregistered. How this
188            // works and when registration would be required is dependent on
189            // the underlying notification mechanism, however. There's an
190            // in-progress rewrite of the Notify crate which use under the
191            // hood, and a component of that rewrite is adding this
192            // functionality; so until that lands we're using a fairly coarse
193            // heuristic to determine if we need to re-watch subpaths.
194
195            // if this was recursive, check if any child paths need to be
196            // manually re-added
197            if removed.recursive {
198                // do this in two steps because we've borrowed mutably up top
199                let to_add = state
200                    .watchees
201                    .iter()
202                    .filter(|w| w.path.starts_with(&removed.path))
203                    .map(|w| (w.path.to_owned(), mode_from_bool(w.recursive)))
204                    .collect::<Vec<_>>();
205
206                for (path, mode) in to_add {
207                    if let Err(e) = self.inner.watch(&path, mode) {
208                        warn!("watching error {:?}", e);
209                    }
210                }
211            }
212        }
213    }
214
215    /// Takes ownership of this `Watcher`'s current event queue.
216    pub fn take_events(&mut self) -> VecDeque<(WatchToken, DebouncedEvent)> {
217        let mut state = self.state.lock().unwrap();
218        let WatcherState { ref mut events, .. } = *state;
219        mem::replace(events, VecDeque::new())
220    }
221}
222
223impl Watchee {
224    fn wants_event(&self, event: &DebouncedEvent) -> bool {
225        use self::DebouncedEvent::*;
226        match *event {
227            NoticeWrite(ref p) | NoticeRemove(ref p) | Create(ref p) | Write(ref p)
228            | Chmod(ref p) | Remove(ref p) => self.applies_to_path(p),
229            Rename(ref p1, ref p2) => self.applies_to_path(p1) || self.applies_to_path(p2),
230            Rescan => false,
231            Error(_, ref opt_p) => opt_p.as_ref().map(|p| self.applies_to_path(p)).unwrap_or(false),
232        }
233    }
234
235    fn applies_to_path(&self, path: &Path) -> bool {
236        let general_case = if path.starts_with(&self.path) {
237            (self.recursive || self.path == path) || path.parent() == Some(&self.path)
238        } else {
239            false
240        };
241
242        if let Some(ref filter) = self.filter {
243            general_case && filter(path)
244        } else {
245            general_case
246        }
247    }
248}
249
250impl Notify for RpcPeer {
251    fn notify(&self) {
252        self.schedule_idle(crate::tabs::WATCH_IDLE_TOKEN);
253    }
254}
255
256impl fmt::Debug for Watchee {
257    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258        write!(
259            f,
260            "Watchee path: {:?}, r {}, t {} f {}",
261            self.path,
262            self.recursive,
263            self.token.0,
264            self.filter.is_some()
265        )
266    }
267}
268
269fn mode_from_bool(is_recursive: bool) -> RecursiveMode {
270    if is_recursive {
271        RecursiveMode::Recursive
272    } else {
273        RecursiveMode::NonRecursive
274    }
275}
276
277// Debounced event does not implement clone
278// TODO: remove if https://github.com/passcod/notify/pull/133 is merged
279fn clone_event(event: &DebouncedEvent) -> DebouncedEvent {
280    use self::DebouncedEvent::*;
281    use notify::Error::*;
282    match *event {
283        NoticeWrite(ref p) => NoticeWrite(p.to_owned()),
284        NoticeRemove(ref p) => NoticeRemove(p.to_owned()),
285        Create(ref p) => Create(p.to_owned()),
286        Write(ref p) => Write(p.to_owned()),
287        Chmod(ref p) => Chmod(p.to_owned()),
288        Remove(ref p) => Remove(p.to_owned()),
289        Rename(ref p1, ref p2) => Rename(p1.to_owned(), p2.to_owned()),
290        Rescan => Rescan,
291        Error(ref e, ref opt_p) => {
292            let error = match *e {
293                PathNotFound => PathNotFound,
294                WatchNotFound => WatchNotFound,
295                Generic(ref s) => Generic(s.to_owned()),
296                Io(ref e) => Generic(format!("{:?}", e)),
297            };
298            Error(error, opt_p.clone())
299        }
300    }
301}
302
303#[cfg(test)]
304extern crate tempdir;
305
306#[cfg(test)]
307mod tests {
308    use super::*;
309    use std::ffi::OsStr;
310    use std::fs;
311    use std::io::Write;
312    use std::sync::mpsc;
313    use std::thread;
314    use std::time::{Duration, Instant};
315
316    impl PartialEq<usize> for WatchToken {
317        fn eq(&self, other: &usize) -> bool {
318            self.0 == *other
319        }
320    }
321
322    impl From<usize> for WatchToken {
323        fn from(err: usize) -> WatchToken {
324            WatchToken(err)
325        }
326    }
327
328    impl Notify for mpsc::Sender<bool> {
329        fn notify(&self) {
330            self.send(true).expect("send shouldn't fail")
331        }
332    }
333
334    // Sleep for `duration` in milliseconds
335    pub fn sleep(millis: u64) {
336        thread::sleep(Duration::from_millis(millis));
337    }
338
339    // Sleep for `duration` in milliseconds if running on OS X
340    pub fn sleep_if_macos(millis: u64) {
341        if cfg!(target_os = "macos") {
342            sleep(millis)
343        }
344    }
345
346    pub fn recv_all<T>(rx: &mpsc::Receiver<T>, duration: Duration) -> Vec<T> {
347        let start = Instant::now();
348        let mut events = Vec::new();
349
350        while start.elapsed() < duration {
351            match rx.recv_timeout(Duration::from_millis(50)) {
352                Ok(event) => events.push(event),
353                Err(mpsc::RecvTimeoutError::Timeout) => (),
354                Err(e) => panic!("unexpected channel err: {:?}", e),
355            }
356        }
357        events
358    }
359
360    // from https://github.com/passcod/notify/blob/master/tests/utils/mod.rs
    //
    // File-system helpers for the tests below. Every `p`, `a` or `b` argument
    // is a path relative to the implementor's root, as resolved by `mkpath`.
361    pub trait TestHelpers {
362        /// Return path relative to the TempDir. Directory separator must
363        /// be a forward slash, and will be converted to the platform's
364        /// native separator.
365        fn mkpath(&self, p: &str) -> PathBuf;
366        /// Create file or directory. Directories must contain the phrase
367        /// "dir" otherwise they will be interpreted as files.
368        fn create(&self, p: &str);
369        /// Create all files and directories in the `paths` list.
370        /// Directories must contain the phrase "dir" otherwise they
371        /// will be interpreted as files.
372        fn create_all(&self, paths: Vec<&str>);
373        /// Rename file or directory.
374        fn rename(&self, a: &str, b: &str);
        // `chmod` is disabled: its declaration is kept below, commented out.
375        ///// Toggle "other" rights on linux and os x and "readonly" on windows
376        //fn chmod(&self, p: &str);
377        /// Write some data to a file
378        fn write(&self, p: &str);
379        /// Remove file or directory
380        fn remove(&self, p: &str);
381    }
382
383    impl TestHelpers for tempdir::TempDir {
384        fn mkpath(&self, p: &str) -> PathBuf {
385            let mut path =
386                self.path().canonicalize().expect("failed to canonalize path").to_owned();
387            for part in p.split('/').collect::<Vec<_>>() {
388                if part != "." {
389                    path.push(part);
390                }
391            }
392            path
393        }
394
395        fn create(&self, p: &str) {
396            let path = self.mkpath(p);
397            if path.components().last().unwrap().as_os_str().to_str().unwrap().contains("dir") {
398                fs::create_dir_all(path).expect("failed to create directory");
399            } else {
400                let parent = path.parent().expect("failed to get parent directory").to_owned();
401                if !parent.exists() {
402                    fs::create_dir_all(parent).expect("failed to create parent directory");
403                }
404                fs::File::create(path).expect("failed to create file");
405            }
406        }
407
408        fn create_all(&self, paths: Vec<&str>) {
409            for p in paths {
410                self.create(p);
411            }
412        }
413
414        fn rename(&self, a: &str, b: &str) {
415            let path_a = self.mkpath(a);
416            let path_b = self.mkpath(b);
417            fs::rename(&path_a, &path_b).expect("failed to rename file or directory");
418        }
419
420        fn write(&self, p: &str) {
421            let path = self.mkpath(p);
422
423            let mut file =
424                fs::OpenOptions::new().write(true).open(path).expect("failed to open file");
425
426            file.write(b"some data").expect("failed to write to file");
427            file.sync_all().expect("failed to sync file");
428        }
429
430        fn remove(&self, p: &str) {
431            let path = self.mkpath(p);
432            if path.is_dir() {
433                fs::remove_dir(path).expect("failed to remove directory");
434            } else {
435                fs::remove_file(path).expect("failed to remove file");
436            }
437        }
438    }
439
440    #[test]
441    fn test_applies_to_path() {
442        let mut w = Watchee {
443            path: PathBuf::from("/hi/there/"),
444            recursive: false,
445            token: WatchToken(1),
446            filter: None,
447        };
448        assert!(w.applies_to_path(&PathBuf::from("/hi/there/friend.txt")));
449        assert!(w.applies_to_path(&PathBuf::from("/hi/there/")));
450        assert!(!w.applies_to_path(&PathBuf::from("/hi/there/dear/friend.txt")));
451        assert!(!w.applies_to_path(&PathBuf::from("/oh/hi/there/")));
452
453        w.recursive = true;
454        assert!(w.applies_to_path(&PathBuf::from("/hi/there/dear/friend.txt")));
455        assert!(w.applies_to_path(&PathBuf::from("/hi/there/friend.txt")));
456        assert!(w.applies_to_path(&PathBuf::from("/hi/there/")));
457
458        w.filter = Some(Box::new(|p| p.extension().and_then(OsStr::to_str) == Some("txt")));
459        assert!(w.applies_to_path(&PathBuf::from("/hi/there/dear/friend.txt")));
460        assert!(w.applies_to_path(&PathBuf::from("/hi/there/friend.txt")));
461        assert!(!w.applies_to_path(&PathBuf::from("/hi/there/")));
462        assert!(!w.applies_to_path(&PathBuf::from("/hi/there/friend.exe")));
463        assert!(w.applies_to_path(&PathBuf::from("/hi/there/my/old/sweet/pal.txt")));
464    }
465
466    //https://github.com/passcod/notify/issues/131
    //
    // Repro for the issue linked above, driven against the raw `notify`
    // watcher rather than `FileWatcher`: the same path is watched twice and
    // then unwatched once. The test passes when none of the calls returns an
    // error and the process does not crash.
467    #[test]
468    #[cfg(unix)]
469    fn test_crash_repro() {
        // `_rx` is kept alive for the duration of the test but never read.
470        let (tx, _rx) = channel();
471        let path = PathBuf::from("/bin/cat");
472        let mut w = watcher(tx, Duration::from_secs(1)).unwrap();
473        w.watch(&path, RecursiveMode::NonRecursive).unwrap();
474        sleep(20);
        // Second registration of the very same path, followed by one unwatch.
475        w.watch(&path, RecursiveMode::NonRecursive).unwrap();
476        w.unwatch(&path).unwrap();
477    }
478
479    #[test]
    // Watches a directory recursively (token 1) and a file inside it
    // non-recursively (token 2), then removes the directory watch. A later
    // write to the file must still be reported, and only for token 2. This
    // exercises the "re-add child paths" step of `FileWatcher::unwatch`.
480    fn recurse_with_contained() {
481        let (tx, rx) = channel();
482        let tmp = tempdir::TempDir::new("xi-test-recurse-contained").unwrap();
483        let mut w = FileWatcher::new(tx);
484        tmp.create("adir/dir2/file");
        // NOTE(review): the long macOS-only pause presumably lets the file
        // creation settle before any watch is registered; confirm.
485        sleep_if_macos(35_000);
486        w.watch(&tmp.mkpath("adir"), true, 1.into());
487        sleep(10);
488        w.watch(&tmp.mkpath("adir/dir2/file"), false, 2.into());
489        sleep(10);
490        w.unwatch(&tmp.mkpath("adir"), 1.into());
491        sleep(10);
492        tmp.write("adir/dir2/file");
        // Drain the notifications for one second; the values are irrelevant,
        // the events themselves are read from the watcher below.
493        let _ = recv_all(&rx, Duration::from_millis(1000));
494        let events = w.take_events();
495        assert_eq!(
496            events,
497            vec![
498                (2.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("adir/dir2/file"))),
499                (2.into(), DebouncedEvent::Write(tmp.mkpath("adir/dir2/file"))),
500            ]
501        );
502    }
503
504    #[test]
    // Registers the same file under two tokens and checks that (a) a write is
    // delivered once per token, in registration order, and (b) after token 1
    // is unwatched, a removal is delivered to token 2 only.
505    fn two_watchers_one_file() {
506        let (tx, rx) = channel();
507        let tmp = tempdir::TempDir::new("xi-test-two-watchers").unwrap();
508        tmp.create("my_file");
        // NOTE(review): the long macOS-only pause presumably lets the file
        // creation settle before any watch is registered; confirm.
509        sleep_if_macos(30_100);
510        let mut w = FileWatcher::new(tx);
511        w.watch(&tmp.mkpath("my_file"), false, 1.into());
512        sleep_if_macos(10);
513        w.watch(&tmp.mkpath("my_file"), false, 2.into());
514        sleep_if_macos(10);
515        tmp.write("my_file");
516
        // Drain notifications for one second, then inspect the queued events.
517        let _ = recv_all(&rx, Duration::from_millis(1000));
518        let events = w.take_events();
519        assert_eq!(
520            events,
521            vec![
522                (1.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("my_file"))),
523                (2.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("my_file"))),
524                (1.into(), DebouncedEvent::Write(tmp.mkpath("my_file"))),
525                (2.into(), DebouncedEvent::Write(tmp.mkpath("my_file"))),
526            ]
527        );
528
        // Unwatching token 1 removes exactly one of the two registrations.
529        assert_eq!(w.state.lock().unwrap().watchees.len(), 2);
530        w.unwatch(&tmp.mkpath("my_file"), 1.into());
531        assert_eq!(w.state.lock().unwrap().watchees.len(), 1);
532        sleep_if_macos(1000);
        // Resolve the path before deleting the file: `mkpath` only
        // canonicalizes the temp dir root, but the value is needed afterwards.
533        let path = tmp.mkpath("my_file");
534        tmp.remove("my_file");
535        sleep_if_macos(1000);
536        let _ = recv_all(&rx, Duration::from_millis(1000));
537        let events = w.take_events();
538        assert!(events.contains(&(2.into(), DebouncedEvent::NoticeRemove(path.clone()))));
539        assert!(!events.contains(&(1.into(), DebouncedEvent::NoticeRemove(path))));
540    }
541}