#![deny(missing_docs)]
pub use config::{Config, PathOp, RecursiveMode, WatchPathConfig, WindowsPathSeparatorStyle};
pub use error::{Error, ErrorKind, Result, UpdatePathsError};
pub use notify_types::event::{self, Event, EventKind, EventKindMask};
use std::path::{Path, PathBuf};
/// Crate-internal shorthand for `std::result::Result`, distinct from the crate's own `Result` re-exported from `error`.
pub(crate) type StdResult<T, E> = std::result::Result<T, E>;
/// Receiving half of a standard-library mpsc channel.
pub(crate) type Receiver<T> = std::sync::mpsc::Receiver<T>;
/// Sending half of a standard-library asynchronous (unbounded) mpsc channel.
pub(crate) type Sender<T> = std::sync::mpsc::Sender<T>;
/// Sending half of a standard-library synchronous (bounded) mpsc channel.
///
/// Only defined for Linux, Android and Windows targets.
#[cfg(any(target_os = "linux", target_os = "android", target_os = "windows"))]
pub(crate) type BoundSender<T> = std::sync::mpsc::SyncSender<T>;
/// Creates an unbounded channel by delegating to `std::sync::mpsc::channel`.
///
/// Returns the `(Sender, Receiver)` pair of the new channel.
#[inline]
pub(crate) fn unbounded<T>() -> (Sender<T>, Receiver<T>) {
std::sync::mpsc::channel()
}
/// Creates a bounded channel by delegating to `std::sync::mpsc::sync_channel`.
///
/// `cap` is passed through unchanged as the channel's buffer size.
/// Only compiled for Linux, Android and Windows targets.
#[cfg(any(target_os = "linux", target_os = "android", target_os = "windows"))]
#[inline]
pub(crate) fn bounded<T>(cap: usize) -> (BoundSender<T>, Receiver<T>) {
std::sync::mpsc::sync_channel(cap)
}
#[cfg(all(target_os = "macos", not(feature = "macos_kqueue")))]
pub use crate::fsevent::FsEventWatcher;
#[cfg(any(target_os = "linux", target_os = "android"))]
pub use crate::inotify::INotifyWatcher;
#[cfg(any(
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
target_os = "ios",
all(target_os = "macos", feature = "macos_kqueue")
))]
pub use crate::kqueue::KqueueWatcher;
pub use null::NullWatcher;
pub use poll::PollWatcher;
#[cfg(target_os = "windows")]
pub use windows::ReadDirectoryChangesWatcher;
#[cfg(all(target_os = "macos", not(feature = "macos_kqueue")))]
pub mod fsevent;
#[cfg(any(target_os = "linux", target_os = "android"))]
pub mod inotify;
#[cfg(any(
target_os = "freebsd",
target_os = "openbsd",
target_os = "dragonfly",
target_os = "netbsd",
target_os = "ios",
all(target_os = "macos", feature = "macos_kqueue")
))]
pub mod kqueue;
#[cfg(target_os = "windows")]
pub mod windows;
pub mod null;
pub mod poll;
mod config;
mod error;
#[cfg(test)]
pub(crate) mod test;
/// The consumer of a watcher's output.
///
/// A value implementing this trait is handed to [`Watcher::new`] (or [`recommended_watcher`]).
/// Implementors must be `Send + 'static`. This file provides implementations for
/// `FnMut(Result<Event>)` callables and for several channel sender types.
pub trait EventHandler: Send + 'static {
/// Handles a single item: either an [`Event`] or the [`Error`] carried by the crate's `Result`.
fn handle_event(&mut self, event: Result<Event>);
}
/// Lets any `Send + 'static` callable taking a `Result<Event>` serve as an event handler.
impl<F: FnMut(Result<Event>) + Send + 'static> EventHandler for F {
    /// Invokes the callable with `event`.
    fn handle_event(&mut self, event: Result<Event>) {
        (*self)(event)
    }
}
/// Allows a `crossbeam_channel::Sender` to be used directly as an event handler
/// (requires the `crossbeam-channel` feature).
#[cfg(feature = "crossbeam-channel")]
impl EventHandler for crossbeam_channel::Sender<Result<Event>> {
/// Sends `event` into the channel.
fn handle_event(&mut self, event: Result<Event>) {
// The result of `send` is deliberately discarded.
// NOTE(review): presumably it only fails once the receiving side is gone — confirm.
let _ = self.send(event);
}
}
/// Allows a `flume::Sender` to be used directly as an event handler
/// (requires the `flume` feature).
#[cfg(feature = "flume")]
impl EventHandler for flume::Sender<Result<Event>> {
/// Sends `event` into the channel.
fn handle_event(&mut self, event: Result<Event>) {
// The result of `send` is deliberately discarded.
// NOTE(review): presumably it only fails once the receiving side is gone — confirm.
let _ = self.send(event);
}
}
/// Allows a `futures::channel::mpsc::UnboundedSender` to be used directly as an event handler
/// (requires the `futures` feature).
#[cfg(feature = "futures")]
impl EventHandler for futures::channel::mpsc::UnboundedSender<Result<Event>> {
/// Sends `event` into the channel via `unbounded_send`.
fn handle_event(&mut self, event: Result<Event>) {
// The result of `unbounded_send` is deliberately discarded.
// NOTE(review): presumably it only fails once the receiving side is gone — confirm.
let _ = self.unbounded_send(event);
}
}
/// Allows a `tokio::sync::mpsc::UnboundedSender` to be used directly as an event handler
/// (requires the `tokio` feature).
#[cfg(feature = "tokio")]
impl EventHandler for tokio::sync::mpsc::UnboundedSender<Result<Event>> {
/// Sends `event` into the channel.
fn handle_event(&mut self, event: Result<Event>) {
// The result of `send` is deliberately discarded.
// NOTE(review): presumably it only fails once the receiving side is gone — confirm.
let _ = self.send(event);
}
}
/// Allows a standard-library `std::sync::mpsc::Sender` to be used directly as an event handler.
impl EventHandler for std::sync::mpsc::Sender<Result<Event>> {
/// Sends `event` into the channel.
fn handle_event(&mut self, event: Result<Event>) {
// `Sender::send` only fails when the receiver has been dropped; that error is
// deliberately discarded here.
let _ = self.send(event);
}
}
/// Identifies a watcher implementation; returned by [`Watcher::kind`].
///
/// The enum is `#[non_exhaustive]`, so downstream `match`es need a wildcard arm.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
#[non_exhaustive]
pub enum WatcherKind {
/// The inotify backend (see `INotifyWatcher`, exported on Linux and Android).
Inotify,
/// The FSEvents backend (see `FsEventWatcher`, exported on macOS without the `macos_kqueue` feature).
Fsevent,
/// The kqueue backend (see `KqueueWatcher`, exported on the BSDs, iOS, and macOS with the `macos_kqueue` feature).
Kqueue,
/// The polling backend (see `PollWatcher`).
PollWatcher,
/// The Windows backend (see `ReadDirectoryChangesWatcher`, exported on Windows).
ReadDirectoryChangesWatcher,
/// The null backend (see `NullWatcher`).
NullWatcher,
}
/// Common interface of the watcher implementations exported by this crate.
///
/// `new` and `kind` require `Self: Sized`; all other methods are callable through `dyn Watcher`.
pub trait Watcher {
/// Creates a watcher from an event handler and a configuration.
fn new<F: EventHandler>(event_handler: F, config: config::Config) -> Result<Self>
where
Self: Sized;
/// Starts watching `path` using the given `recursive_mode`.
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()>;
/// Stops watching `path`.
fn unwatch(&mut self, path: &Path) -> Result<()>;
/// Applies a batch of [`PathOp`]s in order, stopping at the first failure.
///
/// The default implementation forwards `PathOp::Watch` to [`Watcher::watch`] (with
/// `config.recursive_mode()`) and `PathOp::Unwatch` to [`Watcher::unwatch`].
///
/// # Errors
///
/// On the first failing operation an [`UpdatePathsError`] is returned that carries the
/// underlying error, the failed operation (`origin`) and the operations that were not
/// attempted (`remaining`). Operations applied before the failure are not undone by this
/// default implementation.
fn update_paths(&mut self, ops: Vec<PathOp>) -> StdResult<(), UpdatePathsError> {
update_paths(ops, |op| match op {
// On failure the consumed operation is rebuilt so it can be reported back to the caller.
PathOp::Watch(path, config) => self
.watch(&path, config.recursive_mode())
.map_err(|e| (PathOp::Watch(path, config), e)),
PathOp::Unwatch(path) => self.unwatch(&path).map_err(|e| (PathOp::Unwatch(path), e)),
})
}
/// Applies a configuration to an existing watcher.
///
/// The default implementation ignores the argument and returns `Ok(false)`.
/// NOTE(review): the meaning of the returned `bool` is not visible in this file —
/// presumably "was the option applied"; confirm against the implementors.
fn configure(&mut self, _option: Config) -> Result<bool> {
Ok(false)
}
/// Lists the currently watched paths together with their recursive mode.
///
/// # Errors
///
/// The default implementation always returns a generic error stating that listing watched
/// paths is not supported by this watcher.
fn watched_paths(&self) -> Result<Vec<(PathBuf, RecursiveMode)>> {
Err(Error::generic(
"listing watched paths is not supported by this watcher",
))
}
/// Returns the [`WatcherKind`] identifying this implementation.
fn kind() -> WatcherKind
where
Self: Sized;
}
/// The recommended watcher for the current target: `INotifyWatcher` on Linux and Android.
#[cfg(any(target_os = "linux", target_os = "android"))]
pub type RecommendedWatcher = INotifyWatcher;
/// The recommended watcher for the current target: `FsEventWatcher` on macOS when the
/// `macos_kqueue` feature is disabled.
#[cfg(all(target_os = "macos", not(feature = "macos_kqueue")))]
pub type RecommendedWatcher = FsEventWatcher;
/// The recommended watcher for the current target: `ReadDirectoryChangesWatcher` on Windows.
#[cfg(target_os = "windows")]
pub type RecommendedWatcher = ReadDirectoryChangesWatcher;
/// The recommended watcher for the current target: `KqueueWatcher` on FreeBSD, OpenBSD, NetBSD,
/// DragonFly, iOS, and on macOS when the `macos_kqueue` feature is enabled.
#[cfg(any(
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
target_os = "ios",
all(target_os = "macos", feature = "macos_kqueue")
))]
pub type RecommendedWatcher = KqueueWatcher;
/// The recommended watcher for the current target: `PollWatcher` on every target that has no
/// dedicated backend listed above.
#[cfg(not(any(
target_os = "linux",
target_os = "android",
target_os = "macos",
target_os = "windows",
target_os = "freebsd",
target_os = "openbsd",
target_os = "netbsd",
target_os = "dragonfly",
target_os = "ios"
)))]
pub type RecommendedWatcher = PollWatcher;
/// Convenience constructor for the platform's [`RecommendedWatcher`].
///
/// Equivalent to calling `RecommendedWatcher::new` with `event_handler` and `Config::default()`.
///
/// # Errors
///
/// Returns whatever error the watcher's constructor reports.
pub fn recommended_watcher<F: EventHandler>(event_handler: F) -> Result<RecommendedWatcher> {
    let default_config = Config::default();
    RecommendedWatcher::new(event_handler, default_config)
}
/// Feeds `ops` to `apply` one at a time, in order, stopping at the first failure.
///
/// `apply` consumes each operation; on failure it hands the operation back together with the
/// error so both can be reported.
///
/// # Errors
///
/// When `apply` fails, returns an [`UpdatePathsError`] whose `source` is the reported error,
/// whose `origin` is the operation that failed, and whose `remaining` holds every operation
/// that had not been attempted yet.
pub(crate) fn update_paths<F>(ops: Vec<PathOp>, mut apply: F) -> StdResult<(), UpdatePathsError>
where
    F: FnMut(PathOp) -> StdResult<(), (PathOp, Error)>,
{
    let mut pending = ops.into_iter();
    loop {
        // Pull operations manually: the iterator itself is needed again on the error path.
        let Some(next_op) = pending.next() else {
            return Ok(());
        };
        match apply(next_op) {
            Ok(()) => {}
            Err((failed_op, source)) => {
                return Err(UpdatePathsError {
                    source,
                    origin: Some(failed_op),
                    remaining: pending.collect(),
                });
            }
        }
    }
}
#[cfg(test)]
mod tests {
use std::{
collections::HashSet,
fs, iter,
path::{Path, PathBuf},
sync::mpsc,
time::{Duration, Instant},
};
use tempfile::tempdir;
use super::{
Config, Error, ErrorKind, Event, NullWatcher, PathOp, PollWatcher, RecommendedWatcher,
RecursiveMode, Result, StdResult, WatchPathConfig, Watcher, WatcherKind,
};
use crate::test::*;
/// Compile-time check that `Watcher` can be used as a trait object.
#[test]
fn test_object_safe() {
// This coercion only compiles while `Watcher` stays object safe.
let _watcher: &dyn Watcher = &NullWatcher;
}
/// Compile-time check that the listed public types implement `std::fmt::Debug`.
#[test]
fn test_debug_impl() {
// Declares a local trait with `Debug` as a supertrait and implements it for `$t`;
// the impl fails to compile if `$t` is not `Debug`. Nothing runs at test time.
macro_rules! assert_debug_impl {
($t:ty) => {{
#[allow(dead_code)]
trait NeedsDebug: std::fmt::Debug {}
impl NeedsDebug for $t {}
}};
}
assert_debug_impl!(Config);
assert_debug_impl!(Error);
assert_debug_impl!(ErrorKind);
assert_debug_impl!(NullWatcher);
assert_debug_impl!(PollWatcher);
assert_debug_impl!(RecommendedWatcher);
assert_debug_impl!(RecursiveMode);
assert_debug_impl!(WatcherKind);
}
/// Returns an iterator over the events received on `rx`, bounded by one shared 10-second
/// deadline that starts when this function is called.
///
/// Each call to `next()` blocks for at most the time left until the deadline. The iterator
/// yields `None` once the deadline has already been reached when `next()` is called.
///
/// # Panics
///
/// Panics with "did not receive expected event" if `recv_timeout` fails (the wait timed out or
/// the sender disconnected), and with "received an error" if the received item is an `Err`.
fn iter_with_timeout(rx: &mpsc::Receiver<Result<Event>>) -> impl Iterator<Item = Event> + '_ {
    let deadline = Instant::now() + Duration::from_secs(10);
    iter::from_fn(move || {
        // Read the clock once and use the checked subtraction: this avoids relying on
        // `Instant - Instant` saturating when the deadline passes between two clock reads.
        // A zero remainder is treated as "deadline reached", matching the `>=` comparison
        // this replaces.
        let remaining = deadline
            .checked_duration_since(Instant::now())
            .filter(|left| !left.is_zero())?;
        Some(
            rx.recv_timeout(remaining)
                .expect("did not receive expected event")
                .expect("received an error"),
        )
    })
}
/// Returns the canonical form of `path`, or `path` itself when it cannot be canonicalized.
///
/// Falling back instead of panicking matches the function's name and the way the other
/// helpers in this module treat canonicalization as optional (`.canonicalize().ok()`).
fn canonical_or_path(path: &Path) -> PathBuf {
    path.canonicalize().unwrap_or_else(|_| path.to_path_buf())
}
/// Converts a list of `(path, mode)` pairs into a set, passing each path through
/// `canonical_or_path` so that lookups are insensitive to how the path was spelled.
fn canonical_watch_set(
    paths: Vec<(PathBuf, RecursiveMode)>,
) -> HashSet<(PathBuf, RecursiveMode)> {
    let mut watch_set = HashSet::new();
    for (raw_path, mode) in paths {
        watch_set.insert((canonical_or_path(&raw_path), mode));
    }
    watch_set
}
/// Reports whether `path` equals `expected` or, when one is supplied, its canonical form.
///
/// With `canonical_expected == None` only the direct comparison is performed.
fn matches_path(path: &Path, expected: &Path, canonical_expected: Option<&PathBuf>) -> bool {
    if path == expected {
        return true;
    }
    match canonical_expected {
        Some(canonical) => path == canonical,
        None => false,
    }
}
/// Unwatches `path` through `Watcher::update_paths`, retrying a specific failure on the
/// FSEvents backend.
///
/// A retry happens only when the recommended watcher's kind is `WatcherKind::Fsevent` and the
/// error is an I/O error with raw OS error 9 (`EBADF` on Unix-like systems). Up to five
/// retries are made, sleeping 50 ms, 100 ms, 200 ms, ... between attempts. Any other error,
/// or the error of the final attempt, is returned to the caller.
fn update_paths_unwatch_with_retry(
    watcher: &mut RecommendedWatcher,
    path: &Path,
) -> Result<()> {
    const FSEVENT_UNWATCH_RETRIES: usize = 5;
    const FSEVENT_UNWATCH_RETRY_BASE_DELAY: Duration = Duration::from_millis(50);
    for attempt in 0..=FSEVENT_UNWATCH_RETRIES {
        let Err(err) = watcher.update_paths(vec![PathOp::unwatch(path)]) else {
            return Ok(());
        };
        let should_retry = RecommendedWatcher::kind() == WatcherKind::Fsevent
            && matches!(
                &err.source.kind,
                ErrorKind::Io(io_err) if io_err.raw_os_error() == Some(9)
            )
            && attempt < FSEVENT_UNWATCH_RETRIES;
        if !should_retry {
            return Err(err.into());
        }
        // Exponential backoff: the base delay doubles with every attempt.
        let backoff = FSEVENT_UNWATCH_RETRY_BASE_DELAY * (1u32 << attempt);
        std::thread::sleep(backoff);
    }
    unreachable!("fsevent unwatch retries must return or error")
}
/// End-to-end smoke test: a file written inside a recursively watched temporary directory
/// must produce an event whose paths are exactly that file.
#[test]
fn integration() -> std::result::Result<(), Box<dyn std::error::Error>> {
let dir = tempdir()?;
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
// The watch is registered before the write so the write is observed.
watcher.watch(dir.path(), RecursiveMode::Recursive)?;
let file_path = dir.path().join("file.txt");
fs::write(&file_path, b"Lorem ipsum")?;
println!("waiting for event at {}", file_path.display());
for event in iter_with_timeout(&rx) {
// Accept the path either as constructed or in canonical form.
if event.paths == vec![file_path.clone()]
|| event.paths == vec![file_path.canonicalize()?]
{
return Ok(());
}
println!("unexpected event: {event:?}");
}
panic!("did not receive expected event");
}
/// Windows-only: after a watched child directory has been moved to the trash, watching its
/// parent directory with the same watcher must still succeed.
#[test]
#[cfg(target_os = "windows")]
fn test_windows_trash_dir() -> std::result::Result<(), Box<dyn std::error::Error>> {
use crate::recommended_watcher;
let dir = tempdir()?;
let child_dir = dir.path().join("child");
fs::create_dir(&child_dir)?;
// Events are irrelevant here, so the handler is an empty closure.
let mut watcher = recommended_watcher(|_| {
})?;
watcher.watch(&child_dir, RecursiveMode::NonRecursive)?;
trash::delete(&child_dir)?;
// The test passes as long as this call returns `Ok`.
watcher.watch(dir.path(), RecursiveMode::NonRecursive)?;
Ok(())
}
/// Exercises `Watcher::update_paths`: two directories are watched in one batch, events from
/// both must arrive, and after unwatching the first directory only the second may still
/// produce events.
#[test]
fn test_update_paths() -> std::result::Result<(), Box<dyn std::error::Error>> {
let dir = tempdir()?;
let dir_a = dir.path().join("a");
let dir_b = dir.path().join("b");
fs::create_dir(&dir_a)?;
fs::create_dir(&dir_b)?;
let (tx, rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
// Phase 1: watch both directories with a single batch.
watcher.update_paths(vec![
PathOp::Watch(
dir_a.clone(),
WatchPathConfig::new(RecursiveMode::Recursive),
),
PathOp::Watch(
dir_b.clone(),
WatchPathConfig::new(RecursiveMode::Recursive),
),
])?;
let a_file1 = dir_a.join("file1");
let b_file1 = dir_b.join("file1");
fs::write(&a_file1, b"Lorem ipsum")?;
fs::write(&b_file1, b"Lorem ipsum")?;
// Canonical forms are optional; `matches_path` accepts either spelling.
let a_file1_canonical = a_file1.canonicalize().ok();
let b_file1_canonical = b_file1.canonicalize().ok();
let mut a_file1_encountered: bool = false;
let mut b_file1_encountered: bool = false;
for event in iter_with_timeout(&rx) {
for path in event.paths {
a_file1_encountered = a_file1_encountered
|| matches_path(&path, &a_file1, a_file1_canonical.as_ref());
b_file1_encountered = b_file1_encountered
|| matches_path(&path, &b_file1, b_file1_canonical.as_ref());
}
if a_file1_encountered && b_file1_encountered {
break;
}
}
assert!(a_file1_encountered, "Did not receive event of {a_file1:?}");
assert!(b_file1_encountered, "Did not receive event of {b_file1:?}");
// Phase 2: stop watching `a`; from now on only `b` may report.
update_paths_unwatch_with_retry(&mut watcher, &dir_a)?;
let a_file2 = dir_a.join("file2");
let b_file2 = dir_b.join("file2");
// `a` is written first, so seeing `b`'s event without having seen `a`'s is the success condition.
fs::write(&a_file2, b"Lorem ipsum")?;
fs::write(&b_file2, b"Lorem ipsum")?;
let a_file2_canonical = a_file2.canonicalize().ok();
let b_file2_canonical = b_file2.canonicalize().ok();
for event in iter_with_timeout(&rx) {
for path in event.paths {
assert!(
!matches_path(&path, &a_file2, a_file2_canonical.as_ref()),
"Event of {a_file2:?} should not be received"
);
if matches_path(&path, &b_file2, b_file2_canonical.as_ref()) {
return Ok(());
}
}
}
panic!("Did not receive the event of {b_file2:?}");
}
/// Checks that `Watcher::watched_paths` lists each watched directory with its recursive mode
/// and that an unwatched directory disappears from the list.
#[test]
fn watched_paths_reflect_watch_and_unwatch(
) -> std::result::Result<(), Box<dyn std::error::Error>> {
let dir = tempdir()?;
let dir_a = dir.path().join("a");
let dir_b = dir.path().join("b");
fs::create_dir(&dir_a)?;
fs::create_dir(&dir_b)?;
// The receiver is kept alive (bound to `_rx`) but never read.
let (tx, _rx) = std::sync::mpsc::channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
watcher.watch(&dir_a, RecursiveMode::Recursive)?;
watcher.watch(&dir_b, RecursiveMode::NonRecursive)?;
// Both sides of every comparison go through `canonical_or_path`.
let watched = canonical_watch_set(watcher.watched_paths()?);
assert!(watched.contains(&(canonical_or_path(&dir_a), RecursiveMode::Recursive)));
assert!(watched.contains(&(canonical_or_path(&dir_b), RecursiveMode::NonRecursive)));
watcher.unwatch(&dir_a)?;
let watched = canonical_watch_set(watcher.watched_paths()?);
assert!(!watched.contains(&(canonical_or_path(&dir_a), RecursiveMode::Recursive)));
assert!(watched.contains(&(canonical_or_path(&dir_b), RecursiveMode::NonRecursive)));
Ok(())
}
/// Checks that a caller can recover from a failing batch by retrying with the `remaining`
/// operations of the returned `UpdatePathsError`.
///
/// The batch contains a path that does not exist between two existing directories. The retry
/// loop drops the failing operation and continues with the rest, so the last directory must
/// end up watched while the non-existent path must never produce an event.
#[test]
fn update_paths_in_a_loop_with_errors() -> StdResult<(), Box<dyn std::error::Error>> {
    let dir = tempdir()?;
    let existing_dir_1 = dir.path().join("existing_dir_1");
    let not_existent_file = dir.path().join("not_existent_file");
    let existing_dir_2 = dir.path().join("existing_dir_2");
    fs::create_dir(&existing_dir_1)?;
    fs::create_dir(&existing_dir_2)?;

    let mut paths_to_add = vec![
        PathOp::watch_recursive(existing_dir_1.clone()),
        PathOp::watch_recursive(not_existent_file.clone()),
        PathOp::watch_recursive(existing_dir_2.clone()),
    ];

    let (tx, rx) = std::sync::mpsc::channel();
    let mut watcher = RecommendedWatcher::new(tx, Config::default())?;

    // On error, continue with the operations that were not attempted; the failed one
    // (`origin`) is intentionally dropped.
    while !paths_to_add.is_empty() {
        if let Err(e) = watcher.update_paths(std::mem::take(&mut paths_to_add)) {
            paths_to_add = e.remaining;
        }
    }

    fs::write(existing_dir_1.join("1"), "")?;
    // Created only now, after its watch attempt failed, so it must stay unobserved.
    fs::write(not_existent_file.as_path(), "")?;
    let waiting_path = existing_dir_2.join("1");
    fs::write(&waiting_path, "")?;
    let waiting_path_canonical = waiting_path.canonicalize().ok();

    for event in iter_with_timeout(&rx) {
        let path = event
            .paths
            .first()
            .unwrap_or_else(|| panic!("event must have a path: {event:?}"));
        assert!(
            *path != not_existent_file,
            "unexpected {not_existent_file:?} event"
        );
        if matches_path(path, &waiting_path, waiting_path_canonical.as_ref()) {
            return Ok(());
        }
    }
    panic!("Did not receive the event of {waiting_path:?}");
}
/// Creating a file inside a recursively watched directory must be reported as a create event
/// for that path.
#[test]
fn create_file() {
// `testdir`, `recommended_channel` and `expected` come from `crate::test`.
let tmpdir = testdir();
let (mut watcher, mut rx) = recommended_channel();
watcher.watch_recursively(&tmpdir);
let path = tmpdir.path().join("entry");
// `create_new` fails if the file already exists, so this really is a creation.
std::fs::File::create_new(&path).expect("create");
rx.wait_unordered([expected(path).create()]);
}
/// Creating a directory inside a recursively watched directory must be reported as a create
/// event for that path.
#[test]
fn create_dir() {
// `testdir`, `recommended_channel` and `expected` come from `crate::test`.
let tmpdir = testdir();
let (mut watcher, mut rx) = recommended_channel();
watcher.watch_recursively(&tmpdir);
let path = tmpdir.path().join("entry");
std::fs::create_dir(&path).expect("create");
rx.wait_unordered([expected(path).create()]);
}
/// Writing to an existing file inside a recursively watched directory must be reported as a
/// modify event for that path.
#[test]
fn modify_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = recommended_channel();
let path = tmpdir.path().join("entry");
// The file is created before the watch is registered, so only the write below is watched.
std::fs::File::create_new(&path).expect("create");
watcher.watch_recursively(&tmpdir);
std::fs::write(&path, b"123").expect("write");
rx.wait_unordered([expected(path).modify()]);
}
/// Removing an existing file inside a recursively watched directory must be reported as a
/// remove event for that path.
#[test]
fn remove_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = recommended_channel();
let path = tmpdir.path().join("entry");
// The file is created before the watch is registered, so only the removal below is watched.
std::fs::File::create_new(&path).expect("create");
watcher.watch_recursively(&tmpdir);
std::fs::remove_file(&path).expect("remove");
rx.wait_unordered([expected(path).remove()]);
}
/// With the `futures` feature: a `futures` unbounded sender passed as the event handler must
/// deliver at least one successful event within five seconds of a directory being created.
#[cfg(feature = "futures")]
#[tokio::test]
async fn futures_unbounded_sender_as_handler() {
use crate::recommended_watcher;
use futures::StreamExt;
let tmpdir = testdir();
let (tx, mut rx) = futures::channel::mpsc::unbounded();
let mut watcher = recommended_watcher(tx).unwrap();
watcher
.watch(tmpdir.path(), RecursiveMode::NonRecursive)
.unwrap();
std::fs::create_dir(tmpdir.path().join("1")).unwrap();
// The three `expect`s fail, in order, on: timeout elapsed, stream ended, item was an `Err`.
tokio::time::timeout(Duration::from_secs(5), rx.next())
.await
.expect("timeout")
.expect("No event")
.expect("Error");
}
/// With the `tokio` feature: a `tokio` unbounded sender passed as the event handler must
/// deliver at least one successful event within five seconds of a directory being created.
#[cfg(feature = "tokio")]
#[tokio::test]
async fn tokio_unbounded_sender_as_handler() {
use crate::recommended_watcher;
let tmpdir = testdir();
let (tx, mut rx) = tokio::sync::mpsc::unbounded_channel();
let mut watcher = recommended_watcher(tx).unwrap();
watcher
.watch(tmpdir.path(), RecursiveMode::NonRecursive)
.unwrap();
std::fs::create_dir(tmpdir.path().join("1")).unwrap();
// The three `expect`s fail, in order, on: timeout elapsed, channel closed, item was an `Err`.
tokio::time::timeout(Duration::from_secs(5), rx.recv())
.await
.expect("timeout")
.expect("No event")
.expect("Error");
}
/// Unit test for the free `update_paths` helper: when the second of three operations fails,
/// iterating the returned error must yield the failed operation followed by the one that was
/// never attempted (paths "2" and "3"), and not the one that succeeded.
#[test]
fn update_paths_error_contains_errored_path() {
let err = super::update_paths(
[
PathOp::unwatch("1"),
PathOp::unwatch("2"),
PathOp::unwatch("3"),
]
.into(),
// Stand-in for a watcher: only the operation on path "2" fails.
|op| {
if op.as_path() == Path::new("2") {
Err((op, super::Error::path_not_found()))
} else {
Ok(())
}
},
)
.unwrap_err();
assert_eq!(
&err.into_iter().map(PathOp::into_path).collect::<Vec<_>>(),
&[PathBuf::from("2"), PathBuf::from("3"),]
)
}
}