1use notify::{watcher, DebouncedEvent, RecommendedWatcher, RecursiveMode, Watcher};
38use std::collections::VecDeque;
39use std::fmt;
40use std::mem;
41use std::path::{Path, PathBuf};
42use std::sync::mpsc::channel;
43use std::sync::{Arc, Mutex};
44use std::thread;
45use std::time::Duration;
46
47use xi_rpc::RpcPeer;
48
49pub const DEBOUNCE_WAIT_MILLIS: u64 = 50;
51
52pub struct FileWatcher {
55 inner: RecommendedWatcher,
56 state: Arc<Mutex<WatcherState>>,
57}
58
59#[derive(Debug, Default)]
60struct WatcherState {
61 events: EventQueue,
62 watchees: Vec<Watchee>,
63}
64
65#[doc(hidden)]
67struct Watchee {
68 path: PathBuf,
69 recursive: bool,
70 token: WatchToken,
71 filter: Option<Box<PathFilter>>,
72}
73
74#[derive(Debug, Clone, Copy, PartialEq, Eq)]
81pub struct WatchToken(pub usize);
82
83pub trait Notify: Send {
86 fn notify(&self);
87}
88
89pub type EventQueue = VecDeque<(WatchToken, DebouncedEvent)>;
90
91pub type PathFilter = dyn Fn(&Path) -> bool + Send + 'static;
92
93impl FileWatcher {
94 pub fn new<T: Notify + 'static>(peer: T) -> Self {
95 let (tx_event, rx_event) = channel();
96
97 let state = Arc::new(Mutex::new(WatcherState::default()));
98 let state_clone = state.clone();
99
100 let inner = watcher(tx_event, Duration::from_millis(100)).expect("watcher should spawn");
101
102 thread::spawn(move || {
103 while let Ok(event) = rx_event.recv() {
104 let mut state = state_clone.lock().unwrap();
105 let WatcherState { ref mut events, ref mut watchees } = *state;
106
107 watchees
108 .iter()
109 .filter(|w| w.wants_event(&event))
110 .map(|w| w.token)
111 .for_each(|t| events.push_back((t, clone_event(&event))));
112
113 peer.notify();
114 }
115 });
116
117 FileWatcher { inner, state }
118 }
119
120 pub fn watch(&mut self, path: &Path, recursive: bool, token: WatchToken) {
128 self.watch_impl(path, recursive, token, None);
129 }
130
131 pub fn watch_filtered<F>(&mut self, path: &Path, recursive: bool, token: WatchToken, filter: F)
134 where
135 F: Fn(&Path) -> bool + Send + 'static,
136 {
137 let filter = Box::new(filter) as Box<PathFilter>;
138 self.watch_impl(path, recursive, token, Some(filter));
139 }
140
141 fn watch_impl(
142 &mut self,
143 path: &Path,
144 recursive: bool,
145 token: WatchToken,
146 filter: Option<Box<PathFilter>>,
147 ) {
148 let path = match path.canonicalize() {
149 Ok(ref p) => p.to_owned(),
150 Err(e) => {
151 warn!("error watching {:?}: {:?}", path, e);
152 return;
153 }
154 };
155
156 let mut state = self.state.lock().unwrap();
157
158 let w = Watchee { path, recursive, token, filter };
159 let mode = mode_from_bool(w.recursive);
160
161 if !state.watchees.iter().any(|w2| w.path == w2.path) {
162 if let Err(e) = self.inner.watch(&w.path, mode) {
163 warn!("watching error {:?}", e);
164 }
165 }
166
167 state.watchees.push(w);
168 }
169
170 pub fn unwatch(&mut self, path: &Path, token: WatchToken) {
174 let mut state = self.state.lock().unwrap();
175
176 let idx = state.watchees.iter().position(|w| w.token == token && w.path == path);
177
178 if let Some(idx) = idx {
179 let removed = state.watchees.remove(idx);
180 if !state.watchees.iter().any(|w| w.path == removed.path) {
181 if let Err(e) = self.inner.unwatch(&removed.path) {
182 warn!("unwatching error {:?}", e);
183 }
184 }
185 if removed.recursive {
198 let to_add = state
200 .watchees
201 .iter()
202 .filter(|w| w.path.starts_with(&removed.path))
203 .map(|w| (w.path.to_owned(), mode_from_bool(w.recursive)))
204 .collect::<Vec<_>>();
205
206 for (path, mode) in to_add {
207 if let Err(e) = self.inner.watch(&path, mode) {
208 warn!("watching error {:?}", e);
209 }
210 }
211 }
212 }
213 }
214
215 pub fn take_events(&mut self) -> VecDeque<(WatchToken, DebouncedEvent)> {
217 let mut state = self.state.lock().unwrap();
218 let WatcherState { ref mut events, .. } = *state;
219 mem::replace(events, VecDeque::new())
220 }
221}
222
223impl Watchee {
224 fn wants_event(&self, event: &DebouncedEvent) -> bool {
225 use self::DebouncedEvent::*;
226 match *event {
227 NoticeWrite(ref p) | NoticeRemove(ref p) | Create(ref p) | Write(ref p)
228 | Chmod(ref p) | Remove(ref p) => self.applies_to_path(p),
229 Rename(ref p1, ref p2) => self.applies_to_path(p1) || self.applies_to_path(p2),
230 Rescan => false,
231 Error(_, ref opt_p) => opt_p.as_ref().map(|p| self.applies_to_path(p)).unwrap_or(false),
232 }
233 }
234
235 fn applies_to_path(&self, path: &Path) -> bool {
236 let general_case = if path.starts_with(&self.path) {
237 (self.recursive || self.path == path) || path.parent() == Some(&self.path)
238 } else {
239 false
240 };
241
242 if let Some(ref filter) = self.filter {
243 general_case && filter(path)
244 } else {
245 general_case
246 }
247 }
248}
249
250impl Notify for RpcPeer {
251 fn notify(&self) {
252 self.schedule_idle(crate::tabs::WATCH_IDLE_TOKEN);
253 }
254}
255
256impl fmt::Debug for Watchee {
257 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
258 write!(
259 f,
260 "Watchee path: {:?}, r {}, t {} f {}",
261 self.path,
262 self.recursive,
263 self.token.0,
264 self.filter.is_some()
265 )
266 }
267}
268
269fn mode_from_bool(is_recursive: bool) -> RecursiveMode {
270 if is_recursive {
271 RecursiveMode::Recursive
272 } else {
273 RecursiveMode::NonRecursive
274 }
275}
276
277fn clone_event(event: &DebouncedEvent) -> DebouncedEvent {
280 use self::DebouncedEvent::*;
281 use notify::Error::*;
282 match *event {
283 NoticeWrite(ref p) => NoticeWrite(p.to_owned()),
284 NoticeRemove(ref p) => NoticeRemove(p.to_owned()),
285 Create(ref p) => Create(p.to_owned()),
286 Write(ref p) => Write(p.to_owned()),
287 Chmod(ref p) => Chmod(p.to_owned()),
288 Remove(ref p) => Remove(p.to_owned()),
289 Rename(ref p1, ref p2) => Rename(p1.to_owned(), p2.to_owned()),
290 Rescan => Rescan,
291 Error(ref e, ref opt_p) => {
292 let error = match *e {
293 PathNotFound => PathNotFound,
294 WatchNotFound => WatchNotFound,
295 Generic(ref s) => Generic(s.to_owned()),
296 Io(ref e) => Generic(format!("{:?}", e)),
297 };
298 Error(error, opt_p.clone())
299 }
300 }
301}
302
303#[cfg(test)]
304extern crate tempdir;
305
306#[cfg(test)]
307mod tests {
308 use super::*;
309 use std::ffi::OsStr;
310 use std::fs;
311 use std::io::Write;
312 use std::sync::mpsc;
313 use std::thread;
314 use std::time::{Duration, Instant};
315
316 impl PartialEq<usize> for WatchToken {
317 fn eq(&self, other: &usize) -> bool {
318 self.0 == *other
319 }
320 }
321
322 impl From<usize> for WatchToken {
323 fn from(err: usize) -> WatchToken {
324 WatchToken(err)
325 }
326 }
327
328 impl Notify for mpsc::Sender<bool> {
329 fn notify(&self) {
330 self.send(true).expect("send shouldn't fail")
331 }
332 }
333
334 pub fn sleep(millis: u64) {
336 thread::sleep(Duration::from_millis(millis));
337 }
338
339 pub fn sleep_if_macos(millis: u64) {
341 if cfg!(target_os = "macos") {
342 sleep(millis)
343 }
344 }
345
346 pub fn recv_all<T>(rx: &mpsc::Receiver<T>, duration: Duration) -> Vec<T> {
347 let start = Instant::now();
348 let mut events = Vec::new();
349
350 while start.elapsed() < duration {
351 match rx.recv_timeout(Duration::from_millis(50)) {
352 Ok(event) => events.push(event),
353 Err(mpsc::RecvTimeoutError::Timeout) => (),
354 Err(e) => panic!("unexpected channel err: {:?}", e),
355 }
356 }
357 events
358 }
359
360 pub trait TestHelpers {
362 fn mkpath(&self, p: &str) -> PathBuf;
366 fn create(&self, p: &str);
369 fn create_all(&self, paths: Vec<&str>);
373 fn rename(&self, a: &str, b: &str);
375 fn write(&self, p: &str);
379 fn remove(&self, p: &str);
381 }
382
383 impl TestHelpers for tempdir::TempDir {
384 fn mkpath(&self, p: &str) -> PathBuf {
385 let mut path =
386 self.path().canonicalize().expect("failed to canonalize path").to_owned();
387 for part in p.split('/').collect::<Vec<_>>() {
388 if part != "." {
389 path.push(part);
390 }
391 }
392 path
393 }
394
395 fn create(&self, p: &str) {
396 let path = self.mkpath(p);
397 if path.components().last().unwrap().as_os_str().to_str().unwrap().contains("dir") {
398 fs::create_dir_all(path).expect("failed to create directory");
399 } else {
400 let parent = path.parent().expect("failed to get parent directory").to_owned();
401 if !parent.exists() {
402 fs::create_dir_all(parent).expect("failed to create parent directory");
403 }
404 fs::File::create(path).expect("failed to create file");
405 }
406 }
407
408 fn create_all(&self, paths: Vec<&str>) {
409 for p in paths {
410 self.create(p);
411 }
412 }
413
414 fn rename(&self, a: &str, b: &str) {
415 let path_a = self.mkpath(a);
416 let path_b = self.mkpath(b);
417 fs::rename(&path_a, &path_b).expect("failed to rename file or directory");
418 }
419
420 fn write(&self, p: &str) {
421 let path = self.mkpath(p);
422
423 let mut file =
424 fs::OpenOptions::new().write(true).open(path).expect("failed to open file");
425
426 file.write(b"some data").expect("failed to write to file");
427 file.sync_all().expect("failed to sync file");
428 }
429
430 fn remove(&self, p: &str) {
431 let path = self.mkpath(p);
432 if path.is_dir() {
433 fs::remove_dir(path).expect("failed to remove directory");
434 } else {
435 fs::remove_file(path).expect("failed to remove file");
436 }
437 }
438 }
439
440 #[test]
441 fn test_applies_to_path() {
442 let mut w = Watchee {
443 path: PathBuf::from("/hi/there/"),
444 recursive: false,
445 token: WatchToken(1),
446 filter: None,
447 };
448 assert!(w.applies_to_path(&PathBuf::from("/hi/there/friend.txt")));
449 assert!(w.applies_to_path(&PathBuf::from("/hi/there/")));
450 assert!(!w.applies_to_path(&PathBuf::from("/hi/there/dear/friend.txt")));
451 assert!(!w.applies_to_path(&PathBuf::from("/oh/hi/there/")));
452
453 w.recursive = true;
454 assert!(w.applies_to_path(&PathBuf::from("/hi/there/dear/friend.txt")));
455 assert!(w.applies_to_path(&PathBuf::from("/hi/there/friend.txt")));
456 assert!(w.applies_to_path(&PathBuf::from("/hi/there/")));
457
458 w.filter = Some(Box::new(|p| p.extension().and_then(OsStr::to_str) == Some("txt")));
459 assert!(w.applies_to_path(&PathBuf::from("/hi/there/dear/friend.txt")));
460 assert!(w.applies_to_path(&PathBuf::from("/hi/there/friend.txt")));
461 assert!(!w.applies_to_path(&PathBuf::from("/hi/there/")));
462 assert!(!w.applies_to_path(&PathBuf::from("/hi/there/friend.exe")));
463 assert!(w.applies_to_path(&PathBuf::from("/hi/there/my/old/sweet/pal.txt")));
464 }
465
466 #[test]
468 #[cfg(unix)]
469 fn test_crash_repro() {
470 let (tx, _rx) = channel();
471 let path = PathBuf::from("/bin/cat");
472 let mut w = watcher(tx, Duration::from_secs(1)).unwrap();
473 w.watch(&path, RecursiveMode::NonRecursive).unwrap();
474 sleep(20);
475 w.watch(&path, RecursiveMode::NonRecursive).unwrap();
476 w.unwatch(&path).unwrap();
477 }
478
479 #[test]
480 fn recurse_with_contained() {
481 let (tx, rx) = channel();
482 let tmp = tempdir::TempDir::new("xi-test-recurse-contained").unwrap();
483 let mut w = FileWatcher::new(tx);
484 tmp.create("adir/dir2/file");
485 sleep_if_macos(35_000);
486 w.watch(&tmp.mkpath("adir"), true, 1.into());
487 sleep(10);
488 w.watch(&tmp.mkpath("adir/dir2/file"), false, 2.into());
489 sleep(10);
490 w.unwatch(&tmp.mkpath("adir"), 1.into());
491 sleep(10);
492 tmp.write("adir/dir2/file");
493 let _ = recv_all(&rx, Duration::from_millis(1000));
494 let events = w.take_events();
495 assert_eq!(
496 events,
497 vec![
498 (2.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("adir/dir2/file"))),
499 (2.into(), DebouncedEvent::Write(tmp.mkpath("adir/dir2/file"))),
500 ]
501 );
502 }
503
504 #[test]
505 fn two_watchers_one_file() {
506 let (tx, rx) = channel();
507 let tmp = tempdir::TempDir::new("xi-test-two-watchers").unwrap();
508 tmp.create("my_file");
509 sleep_if_macos(30_100);
510 let mut w = FileWatcher::new(tx);
511 w.watch(&tmp.mkpath("my_file"), false, 1.into());
512 sleep_if_macos(10);
513 w.watch(&tmp.mkpath("my_file"), false, 2.into());
514 sleep_if_macos(10);
515 tmp.write("my_file");
516
517 let _ = recv_all(&rx, Duration::from_millis(1000));
518 let events = w.take_events();
519 assert_eq!(
520 events,
521 vec![
522 (1.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("my_file"))),
523 (2.into(), DebouncedEvent::NoticeWrite(tmp.mkpath("my_file"))),
524 (1.into(), DebouncedEvent::Write(tmp.mkpath("my_file"))),
525 (2.into(), DebouncedEvent::Write(tmp.mkpath("my_file"))),
526 ]
527 );
528
529 assert_eq!(w.state.lock().unwrap().watchees.len(), 2);
530 w.unwatch(&tmp.mkpath("my_file"), 1.into());
531 assert_eq!(w.state.lock().unwrap().watchees.len(), 1);
532 sleep_if_macos(1000);
533 let path = tmp.mkpath("my_file");
534 tmp.remove("my_file");
535 sleep_if_macos(1000);
536 let _ = recv_all(&rx, Duration::from_millis(1000));
537 let events = w.take_events();
538 assert!(events.contains(&(2.into(), DebouncedEvent::NoticeRemove(path.clone()))));
539 assert!(!events.contains(&(1.into(), DebouncedEvent::NoticeRemove(path))));
540 }
541}