use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
use std::collections::VecDeque;
use std::fmt;
use std::mem;
use std::path::{Path, PathBuf};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};
use std::thread;
use std::time::Duration;
use xi_rpc::RpcPeer;
/// Debounce wait for file system events, in milliseconds.
///
/// NOTE(review): nothing in this file reads this constant (`FileWatcher::new`
/// passes a literal 100 ms to the backend watcher), so it is presumably
/// consumed by callers elsewhere in the crate — confirm before changing it.
pub const DEBOUNCE_WAIT_MILLIS: u64 = 50;
/// Wraps a `notify` watcher and tags each incoming event with the tokens of
/// the registered watches that are interested in it.
///
/// Events are queued by a background thread spawned in `new` and are drained
/// by the owner through `take_events`.
pub struct FileWatcher {
/// The underlying watcher provided by the `notify` crate.
inner: RecommendedWatcher,
/// Registered watches and pending events, shared with the background thread.
state: Arc<Mutex<WatcherState>>,
}
/// Mutable state shared between a `FileWatcher` and its event thread.
#[derive(Debug, Default)]
struct WatcherState {
/// Events that have been matched to a token but not yet taken by the owner.
events: EventQueue,
/// Every active registration; several entries may refer to the same path.
watchees: Vec<Watchee>,
}
/// A single registration: a path, who asked for it, and which events under
/// that path should be reported.
#[doc(hidden)]
struct Watchee {
/// The watched path, stored in canonical form (see `watch_impl`).
path: PathBuf,
/// Whether events anywhere below `path` are of interest.
recursive: bool,
/// Identifier attached to every event delivered for this registration.
token: WatchToken,
/// Optional predicate; when present, only paths it accepts are reported.
filter: Option<Box<PathFilter>>,
}
/// Caller-chosen identifier for a watch registration.
///
/// The token is passed to `watch`/`unwatch` and is paired with each event
/// returned from `take_events`, so callers can tell registrations apart.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WatchToken(pub usize);
/// Callback through which the event thread announces that it has processed a
/// file system event.
///
/// Implementors must be `Send` because the value is moved into the thread
/// spawned by `FileWatcher::new`.
pub trait Notify: Send {
/// Called once for every event received from the backend watcher.
fn notify(&self);
}
/// Queue of pending events, each paired with the token of the registration
/// that wanted it.
pub type EventQueue = VecDeque<(WatchToken, DebouncedEvent)>;
/// Predicate deciding whether events for a given path should be reported.
/// It must be `Send` because filters are evaluated on the event thread.
pub type PathFilter = dyn Fn(&Path) -> bool + Send + 'static;
impl FileWatcher {
    /// Creates a watcher and spawns the thread that routes backend events.
    ///
    /// For every event received from the backend, the thread queues one copy
    /// per interested registration and then calls `peer.notify()`. The thread
    /// exits when the channel from the backend reports that it has hung up.
    ///
    /// # Panics
    ///
    /// Panics if the backend watcher cannot be created. The spawned thread
    /// panics if the shared state mutex is poisoned.
    pub fn new<T: Notify + 'static>(peer: T) -> Self {
        let (tx_event, rx_event) = channel();

        let state = Arc::new(Mutex::new(WatcherState::default()));
        let state_clone = state.clone();

        let inner = watcher(tx_event, Duration::from_millis(100)).expect("watcher should spawn");

        thread::spawn(move || {
            while let Ok(event) = rx_event.recv() {
                let mut state = state_clone.lock().unwrap();
                let WatcherState { ref mut events, ref mut watchees } = *state;

                // `DebouncedEvent` is copied by hand (`clone_event`) so each
                // interested token gets its own instance.
                watchees
                    .iter()
                    .filter(|w| w.wants_event(&event))
                    .map(|w| w.token)
                    .for_each(|t| events.push_back((t, clone_event(&event))));

                // The peer is notified for every backend event, including
                // ones that no registration wanted.
                peer.notify();
            }
        });

        FileWatcher { inner, state }
    }

    /// Begins watching `path` on behalf of `token`.
    ///
    /// With `recursive` set, events for anything below `path` are reported;
    /// otherwise only `path` itself and its direct children are.
    pub fn watch(&mut self, path: &Path, recursive: bool, token: WatchToken) {
        self.watch_impl(path, recursive, token, None);
    }

    /// Like `watch`, but only events whose path satisfies `filter` are
    /// reported for this registration.
    pub fn watch_filtered<F>(&mut self, path: &Path, recursive: bool, token: WatchToken, filter: F)
    where
        F: Fn(&Path) -> bool + Send + 'static,
    {
        let filter = Box::new(filter) as Box<PathFilter>;
        self.watch_impl(path, recursive, token, Some(filter));
    }

    /// Shared implementation of `watch` and `watch_filtered`.
    ///
    /// The path is canonicalized before it is stored; if that fails (for
    /// example because the path does not exist) a warning is logged and
    /// nothing is registered.
    fn watch_impl(
        &mut self,
        path: &Path,
        recursive: bool,
        token: WatchToken,
        filter: Option<Box<PathFilter>>,
    ) {
        let path = match path.canonicalize() {
            Ok(canonical) => canonical,
            Err(e) => {
                warn!("error watching {:?}: {:?}", path, e);
                return;
            }
        };

        let mut state = self.state.lock().unwrap();

        let w = Watchee { path, recursive, token, filter };
        let mode = mode_from_bool(w.recursive);

        // Only register with the backend the first time a path is seen;
        // further registrations for the same path share that backend watch.
        if !state.watchees.iter().any(|w2| w.path == w2.path) {
            if let Err(e) = self.inner.watch(&w.path, mode) {
                warn!("watching error {:?}", e);
            }
        }

        // The registration is recorded even if the backend call failed.
        state.watchees.push(w);
    }

    /// Removes the registration identified by `path` and `token`.
    ///
    /// The backend watch is only dropped once no other registration refers
    /// to the same path. Unknown path/token pairs are ignored.
    pub fn unwatch(&mut self, path: &Path, token: WatchToken) {
        // Registrations are stored under their canonical path (see
        // `watch_impl`), so the caller's spelling must be canonicalized too
        // or a relative/symlinked path would never match. Canonicalization
        // fails when the file no longer exists; in that case only the raw
        // path is compared, which still matches an already-canonical input.
        let canonical = path.canonicalize().ok();

        let mut state = self.state.lock().unwrap();

        let idx = state.watchees.iter().position(|w| {
            w.token == token
                && (w.path == path || canonical.as_ref().map_or(false, |c| w.path == *c))
        });

        if let Some(idx) = idx {
            let removed = state.watchees.remove(idx);

            if !state.watchees.iter().any(|w| w.path == removed.path) {
                if let Err(e) = self.inner.unwatch(&removed.path) {
                    warn!("unwatching error {:?}", e);
                }
            }

            // After removing a recursive registration, every remaining
            // registration at or below its path is registered with the
            // backend again.
            // NOTE(review): presumably because dropping the recursive backend
            // watch can also drop coverage of contained paths — confirm
            // against the `notify` backend in use.
            if removed.recursive {
                // Collected first because `state` is already borrowed above.
                let to_add = state
                    .watchees
                    .iter()
                    .filter(|w| w.path.starts_with(&removed.path))
                    .map(|w| (w.path.to_owned(), mode_from_bool(w.recursive)))
                    .collect::<Vec<_>>();

                for (path, mode) in to_add {
                    if let Err(e) = self.inner.watch(&path, mode) {
                        warn!("watching error {:?}", e);
                    }
                }
            }
        }
    }

    /// Returns all queued events, leaving the internal queue empty.
    pub fn take_events(&mut self) -> VecDeque<(WatchToken, DebouncedEvent)> {
        let mut state = self.state.lock().unwrap();
        let WatcherState { ref mut events, .. } = *state;
        mem::replace(events, VecDeque::new())
    }
}
impl Watchee {
fn wants_event(&self, event: &DebouncedEvent) -> bool {
use self::DebouncedEvent::*;
match *event {
NoticeWrite(ref p) | NoticeRemove(ref p) | Create(ref p) | Write(ref p)
| Chmod(ref p) | Remove(ref p) => self.applies_to_path(p),
Rename(ref p1, ref p2) => self.applies_to_path(p1) || self.applies_to_path(p2),
Rescan => false,
Error(_, ref opt_p) => opt_p.as_ref().map(|p| self.applies_to_path(p)).unwrap_or(false),
}
}
fn applies_to_path(&self, path: &Path) -> bool {
let general_case = if path.starts_with(&self.path) {
(self.recursive || self.path == path) || path.parent() == Some(&self.path)
} else {
false
};
if let Some(ref filter) = self.filter {
general_case && filter(path)
} else {
general_case
}
}
}
/// Lets an `RpcPeer` act as the watcher's notification target.
impl Notify for RpcPeer {
/// Schedules an idle callback on the peer tagged with
/// `crate::tabs::WATCH_IDLE_TOKEN`.
///
/// NOTE(review): the handler for that token presumably drains the queue via
/// `FileWatcher::take_events` — confirm in `tabs`.
fn notify(&self) {
self.schedule_idle(crate::tabs::WATCH_IDLE_TOKEN);
}
}
impl fmt::Debug for Watchee {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
write!(
f,
"Watchee path: {:?}, r {}, t {} f {}",
self.path,
self.recursive,
self.token.0,
self.filter.is_some()
)
}
}
/// Maps the boolean `recursive` flag used by this module onto the backend's
/// `RecursiveMode` enum.
fn mode_from_bool(is_recursive: bool) -> RecursiveMode {
    match is_recursive {
        true => RecursiveMode::Recursive,
        false => RecursiveMode::NonRecursive,
    }
}
fn clone_event(event: &DebouncedEvent) -> DebouncedEvent {
use self::DebouncedEvent::*;
use notify::Error::*;
match *event {
NoticeWrite(ref p) => NoticeWrite(p.to_owned()),
NoticeRemove(ref p) => NoticeRemove(p.to_owned()),
Create(ref p) => Create(p.to_owned()),
Write(ref p) => Write(p.to_owned()),
Chmod(ref p) => Chmod(p.to_owned()),
Remove(ref p) => Remove(p.to_owned()),
Rename(ref p1, ref p2) => Rename(p1.to_owned(), p2.to_owned()),
Rescan => Rescan,
Error(ref e, ref opt_p) => {
let error = match *e {
PathNotFound => PathNotFound,
WatchNotFound => WatchNotFound,
Generic(ref s) => Generic(s.to_owned()),
Io(ref e) => Generic(format!("{:?}", e)),
};
Error(error, opt_p.clone())
}
}
}
#[cfg(test)]
extern crate tempdir;
#[cfg(test)]
mod tests {
use super::*;
use std::ffi::OsStr;
use std::fs;
use std::io::Write;
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};
impl PartialEq<usize> for WatchToken {
fn eq(&self, other: &usize) -> bool {
self.0 == *other
}
}
/// Test convenience: allows writing `1.into()` where a token is expected.
impl From<usize> for WatchToken {
    fn from(raw: usize) -> Self {
        WatchToken(raw)
    }
}
/// Test double: each notification is forwarded as `true` on a channel so a
/// test can wait for watcher activity.
impl Notify for mpsc::Sender<bool> {
    fn notify(&self) {
        let outcome = self.send(true);
        // Sending only fails when the receiver is gone, which would mean the
        // test dropped it too early.
        outcome.expect("send shouldn't fail")
    }
}
/// Blocks the current thread for `millis` milliseconds.
pub fn sleep(millis: u64) {
    let pause = Duration::from_millis(millis);
    thread::sleep(pause);
}
/// Sleeps for `millis` milliseconds when compiled for macOS; does nothing on
/// every other target.
pub fn sleep_if_macos(millis: u64) {
    if !cfg!(target_os = "macos") {
        return;
    }
    sleep(millis)
}
/// Collects everything that arrives on `rx` until `duration` has elapsed.
///
/// The channel is polled in 50 ms slices, so the call may overrun `duration`
/// by up to one slice. A zero `duration` returns immediately without
/// touching the channel.
///
/// # Panics
///
/// Panics if the channel reports any error other than a timeout (that is,
/// if every sender has been dropped).
pub fn recv_all<T>(rx: &mpsc::Receiver<T>, duration: Duration) -> Vec<T> {
    let poll_slice = Duration::from_millis(50);
    let started = Instant::now();
    let mut received = Vec::new();

    loop {
        if started.elapsed() >= duration {
            break;
        }
        match rx.recv_timeout(poll_slice) {
            Ok(item) => received.push(item),
            Err(mpsc::RecvTimeoutError::Timeout) => {}
            Err(e) => panic!("unexpected channel err: {:?}", e),
        }
    }

    received
}
/// File system helpers for tests; every `p`/`a`/`b` argument is a
/// `/`-separated path relative to the implementor's root directory.
pub trait TestHelpers {
/// Returns the absolute path for the relative path `p`.
fn mkpath(&self, p: &str) -> PathBuf;
/// Creates the file or directory named by `p`.
fn create(&self, p: &str);
/// Calls `create` for each entry of `paths`, in order.
fn create_all(&self, paths: Vec<&str>);
/// Renames `a` to `b`.
fn rename(&self, a: &str, b: &str);
/// Writes some data to the existing file `p`.
fn write(&self, p: &str);
/// Removes the file or directory `p`.
fn remove(&self, p: &str);
}
/// `TestHelpers` rooted at the temporary directory; every helper panics on
/// I/O failure, which is the desired behaviour inside a test.
impl TestHelpers for tempdir::TempDir {
    /// Joins the `/`-separated relative path `p` onto the canonicalized
    /// temporary directory. `.` segments are skipped.
    fn mkpath(&self, p: &str) -> PathBuf {
        let mut path = self.path().canonicalize().expect("failed to canonalize path");
        path.extend(p.split('/').filter(|part| *part != "."));
        path
    }

    /// Creates `p`, including missing parent directories.
    ///
    /// A path whose final component contains the text `dir` is created as a
    /// directory; anything else is created as an empty file.
    fn create(&self, p: &str) {
        let path = self.mkpath(p);
        let names_a_dir =
            path.components().next_back().unwrap().as_os_str().to_str().unwrap().contains("dir");

        if names_a_dir {
            fs::create_dir_all(path).expect("failed to create directory");
        } else {
            let parent = path.parent().expect("failed to get parent directory");
            if !parent.exists() {
                fs::create_dir_all(parent).expect("failed to create parent directory");
            }
            fs::File::create(&path).expect("failed to create file");
        }
    }

    /// Creates each entry of `paths` in order (see `create`).
    fn create_all(&self, paths: Vec<&str>) {
        for p in paths {
            self.create(p);
        }
    }

    /// Renames `a` to `b`, both relative to the temporary directory.
    fn rename(&self, a: &str, b: &str) {
        let path_a = self.mkpath(a);
        let path_b = self.mkpath(b);
        fs::rename(&path_a, &path_b).expect("failed to rename file or directory");
    }

    /// Writes a fixed payload to the existing file `p` and syncs it to disk.
    fn write(&self, p: &str) {
        let path = self.mkpath(p);

        let mut file =
            fs::OpenOptions::new().write(true).open(path).expect("failed to open file");

        // `write_all` rather than `write`: a bare `write` may store only
        // part of the buffer and its returned byte count was being ignored.
        file.write_all(b"some data").expect("failed to write to file");
        file.sync_all().expect("failed to sync file");
    }

    /// Removes `p`; directories must already be empty.
    fn remove(&self, p: &str) {
        let path = self.mkpath(p);
        if path.is_dir() {
            fs::remove_dir(path).expect("failed to remove directory");
        } else {
            fs::remove_file(path).expect("failed to remove file");
        }
    }
}
/// Checks `Watchee::applies_to_path` for non-recursive, recursive and
/// filtered registrations of `/hi/there/`.
#[test]
fn test_applies_to_path() {
    fn applies(watchee: &Watchee, candidate: &str) -> bool {
        watchee.applies_to_path(&PathBuf::from(candidate))
    }

    let mut w = Watchee {
        path: PathBuf::from("/hi/there/"),
        recursive: false,
        token: WatchToken(1),
        filter: None,
    };

    // Non-recursive: the path itself and direct children only.
    assert!(applies(&w, "/hi/there/friend.txt"));
    assert!(applies(&w, "/hi/there/"));
    assert!(!applies(&w, "/hi/there/dear/friend.txt"));
    assert!(!applies(&w, "/oh/hi/there/"));

    // Recursive: any depth below the path.
    w.recursive = true;
    assert!(applies(&w, "/hi/there/dear/friend.txt"));
    assert!(applies(&w, "/hi/there/friend.txt"));
    assert!(applies(&w, "/hi/there/"));

    // Recursive and filtered: only `.txt` paths pass.
    w.filter = Some(Box::new(|p| p.extension().and_then(OsStr::to_str) == Some("txt")));
    assert!(applies(&w, "/hi/there/dear/friend.txt"));
    assert!(applies(&w, "/hi/there/friend.txt"));
    assert!(!applies(&w, "/hi/there/"));
    assert!(!applies(&w, "/hi/there/friend.exe"));
    assert!(applies(&w, "/hi/there/my/old/sweet/pal.txt"));
}
// Drives the raw `notify` watcher (not `FileWatcher`): the same path is
// registered twice, with a short pause in between, and then unwatched once.
// Every call is required to succeed.
// NOTE(review): judging by the name this reproduces a crash seen with double
// registration in the backend — confirm against the original bug report.
#[test]
#[cfg(unix)]
fn test_crash_repro() {
// The receiver is kept alive (but unused) for the lifetime of the watcher.
let (tx, _rx) = channel();
let path = PathBuf::from("/bin/cat");
let mut w = watcher(tx, Duration::from_secs(1)).unwrap();
w.watch(&path, RecursiveMode::NonRecursive).unwrap();
sleep(20);
w.watch(&path, RecursiveMode::NonRecursive).unwrap();
w.unwatch(&path).unwrap();
}
// A file watched on its own (token 2) must keep receiving events after a
// recursive watch on one of its ancestor directories (token 1) is removed.
#[test]
fn recurse_with_contained() {
let (tx, rx) = channel();
let tmp = tempdir::TempDir::new("xi-test-recurse-contained").unwrap();
let mut w = FileWatcher::new(tx);
tmp.create("adir/dir2/file");
// NOTE(review): long macOS-only pause; presumably lets the platform backend
// settle so the creation above is not reported later — confirm.
sleep_if_macos(35_000);
w.watch(&tmp.mkpath("adir"), true, 1.into());
sleep(10);
w.watch(&tmp.mkpath("adir/dir2/file"), false, 2.into());
sleep(10);
w.unwatch(&tmp.mkpath("adir"), 1.into());
sleep(10);
tmp.write("adir/dir2/file");
// Drain notifications for a second so the event thread has time to queue
// everything; the notification values themselves are not inspected.
let _ = recv_all(&rx, Duration::from_millis(1000));
let events = w.take_events();
// Only token 2 is still registered, so it alone sees the write.
assert_eq!(
events,
vec![
(2.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("adir/dir2/file"))),
(2.into(), DebouncedEvent::Write(tmp.mkpath("adir/dir2/file"))),
]
);
}
// Two registrations on the same file each receive their own copy of every
// event, and removing one registration leaves the other working.
#[test]
fn two_watchers_one_file() {
let (tx, rx) = channel();
let tmp = tempdir::TempDir::new("xi-test-two-watchers").unwrap();
tmp.create("my_file");
// NOTE(review): long macOS-only pause; presumably lets the platform backend
// settle so the creation above is not reported later — confirm.
sleep_if_macos(30_100);
let mut w = FileWatcher::new(tx);
w.watch(&tmp.mkpath("my_file"), false, 1.into());
sleep_if_macos(10);
w.watch(&tmp.mkpath("my_file"), false, 2.into());
sleep_if_macos(10);
tmp.write("my_file");
// Drain notifications for a second so the event thread has time to queue
// everything; the notification values themselves are not inspected.
let _ = recv_all(&rx, Duration::from_millis(1000));
let events = w.take_events();
// Each backend event is queued once per token, in registration order.
assert_eq!(
events,
vec![
(1.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("my_file"))),
(2.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("my_file"))),
(1.into(), DebouncedEvent::Write(tmp.mkpath("my_file"))),
(2.into(), DebouncedEvent::Write(tmp.mkpath("my_file"))),
]
);
assert_eq!(w.state.lock().unwrap().watchees.len(), 2);
w.unwatch(&tmp.mkpath("my_file"), 1.into());
assert_eq!(w.state.lock().unwrap().watchees.len(), 1);
sleep_if_macos(1000);
// Resolve the path before deleting the file: `mkpath` canonicalizes the
// temp dir, and the expected event path must be built the same way.
let path = tmp.mkpath("my_file");
tmp.remove("my_file");
sleep_if_macos(1000);
let _ = recv_all(&rx, Duration::from_millis(1000));
let events = w.take_events();
// Token 2 is still registered and sees the removal; token 1 must not.
assert!(events.contains(&(2.into(), DebouncedEvent::NoticeRemove(path.clone()))));
assert!(!events.contains(&(1.into(), DebouncedEvent::NoticeRemove(path))));
}
}