use super::event::*;
use super::{Config, Error, EventHandler, EventKindMask, RecursiveMode, Result, Watcher};
use crate::{unbounded, Receiver, Sender};
use kqueue::{EventData, EventFilter, FilterFlag, Ident};
use std::collections::HashMap;
use std::env;
use std::fs::metadata;
use std::os::unix::io::AsRawFd;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use walkdir::WalkDir;
const KQUEUE: mio::Token = mio::Token(0);
const MESSAGE: mio::Token = mio::Token(1);
struct EventLoop {
running: bool,
poll: mio::Poll,
event_loop_waker: Arc<mio::Waker>,
event_loop_tx: Sender<EventLoopMsg>,
event_loop_rx: Receiver<EventLoopMsg>,
kqueue: kqueue::Watcher,
event_handler: Box<dyn EventHandler>,
watches: HashMap<PathBuf, bool>,
follow_symlinks: bool,
event_kinds: EventKindMask,
}
#[derive(Debug)]
pub struct KqueueWatcher {
channel: Sender<EventLoopMsg>,
waker: Arc<mio::Waker>,
}
enum EventLoopMsg {
AddWatch(PathBuf, RecursiveMode, Sender<Result<()>>),
RemoveWatch(PathBuf, Sender<Result<()>>),
GetWatchedPaths(Sender<Vec<(PathBuf, RecursiveMode)>>),
Shutdown,
}
impl EventLoop {
pub fn new(
kqueue: kqueue::Watcher,
event_handler: Box<dyn EventHandler>,
follow_symlinks: bool,
event_kinds: EventKindMask,
) -> Result<Self> {
let (event_loop_tx, event_loop_rx) = unbounded::<EventLoopMsg>();
let poll = mio::Poll::new()?;
let event_loop_waker = Arc::new(mio::Waker::new(poll.registry(), MESSAGE)?);
let kqueue_fd = kqueue.as_raw_fd();
let mut evented_kqueue = mio::unix::SourceFd(&kqueue_fd);
poll.registry()
.register(&mut evented_kqueue, KQUEUE, mio::Interest::READABLE)?;
let event_loop = EventLoop {
running: true,
poll,
event_loop_waker,
event_loop_tx,
event_loop_rx,
kqueue,
event_handler,
watches: HashMap::new(),
follow_symlinks,
event_kinds,
};
Ok(event_loop)
}
pub fn run(self) {
let _ = thread::Builder::new()
.name("notify-rs kqueue loop".to_string())
.spawn(|| self.event_loop_thread());
}
fn event_loop_thread(mut self) {
let mut events = mio::Events::with_capacity(16);
loop {
match self.poll.poll(&mut events, None) {
Err(ref e) if matches!(e.kind(), std::io::ErrorKind::Interrupted) => {
}
Err(e) => panic!("poll failed: {}", e),
Ok(()) => {}
}
for event in &events {
self.handle_event(event);
}
if !self.running {
break;
}
}
}
fn handle_event(&mut self, event: &mio::event::Event) {
match event.token() {
MESSAGE => {
self.handle_messages()
}
KQUEUE => {
self.handle_kqueue()
}
_ => unreachable!(),
}
}
fn handle_messages(&mut self) {
while let Ok(msg) = self.event_loop_rx.try_recv() {
match msg {
EventLoopMsg::AddWatch(path, recursive_mode, tx) => {
let _ = tx.send(self.add_watch(path, recursive_mode.is_recursive()));
}
EventLoopMsg::RemoveWatch(path, tx) => {
let _ = tx.send(self.remove_watch(path, false));
}
EventLoopMsg::GetWatchedPaths(tx) => {
let _ = tx.send(
self.watches
.iter()
.map(|(path, is_recursive)| {
(
path.clone(),
if *is_recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
},
)
})
.collect(),
);
}
EventLoopMsg::Shutdown => {
self.running = false;
break;
}
}
}
}
fn handle_kqueue(&mut self) {
let mut add_watches = Vec::new();
let mut remove_watches = Vec::new();
while let Some(event) = self.kqueue.poll(None) {
log::trace!("kqueue event: {event:?}");
match event {
kqueue::Event {
data: EventData::Vnode(data),
ident: Ident::Filename(_, path),
} => {
let path = PathBuf::from(path);
let event = match data {
kqueue::Vnode::Delete => {
remove_watches.push(path.clone());
Ok(Event::new(EventKind::Remove(RemoveKind::Any)).add_path(path))
}
kqueue::Vnode::Write if path.is_dir() => {
std::fs::read_dir(&path)
.map(|dir| {
dir.filter_map(std::result::Result::ok)
.map(|f| f.path())
.find(|f| !self.watches.contains_key(f))
})
.map(|file| {
if let Some(file) = file {
add_watches.push(file.clone());
Event::new(EventKind::Create(if file.is_dir() {
CreateKind::Folder
} else if file.is_file() {
CreateKind::File
} else {
CreateKind::Other
}))
.add_path(file)
} else {
Event::new(EventKind::Modify(ModifyKind::Data(
DataChange::Any,
)))
.add_path(path)
}
})
.map_err(Into::into)
}
kqueue::Vnode::Write => Ok(Event::new(EventKind::Modify(
ModifyKind::Data(DataChange::Any),
))
.add_path(path)),
kqueue::Vnode::Extend | kqueue::Vnode::Truncate => Ok(Event::new(
EventKind::Modify(ModifyKind::Data(DataChange::Size)),
)
.add_path(path)),
kqueue::Vnode::Attrib => Ok(Event::new(EventKind::Modify(
ModifyKind::Metadata(MetadataKind::Any),
))
.add_path(path)),
kqueue::Vnode::Link => {
remove_watches.push(path.clone());
add_watches.push(path.clone());
Ok(Event::new(EventKind::Modify(ModifyKind::Any)).add_path(path))
}
kqueue::Vnode::Rename => {
remove_watches.push(path.clone());
Ok(
Event::new(EventKind::Modify(ModifyKind::Name(RenameMode::Any)))
.add_path(path),
)
}
kqueue::Vnode::Revoke => {
remove_watches.push(path.clone());
Ok(Event::new(EventKind::Remove(RemoveKind::Any)).add_path(path))
}
#[allow(unreachable_patterns)]
_ => Ok(Event::new(EventKind::Other)),
};
match &event {
Ok(e) if !self.event_kinds.matches(&e.kind) => {
}
_ => self.event_handler.handle_event(event),
}
}
kqueue::Event { ident: _, data: _ } => unreachable!(),
}
}
for path in remove_watches {
self.remove_watch(path, true).ok();
}
for path in add_watches {
self.add_watch(path, true).ok();
}
}
fn add_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<()> {
if !is_recursive || !metadata(&path).map_err(Error::io)?.is_dir() {
self.add_single_watch(path, false)?;
} else {
for entry in WalkDir::new(path)
.follow_links(self.follow_symlinks)
.into_iter()
{
let entry = entry.map_err(map_walkdir_error)?;
self.add_single_watch(entry.into_path(), is_recursive)?;
}
}
self.kqueue.watch()?;
Ok(())
}
fn add_single_watch(&mut self, path: PathBuf, is_recursive: bool) -> Result<()> {
let event_filter = EventFilter::EVFILT_VNODE;
let filter_flags = FilterFlag::NOTE_DELETE
| FilterFlag::NOTE_WRITE
| FilterFlag::NOTE_EXTEND
| FilterFlag::NOTE_ATTRIB
| FilterFlag::NOTE_LINK
| FilterFlag::NOTE_RENAME
| FilterFlag::NOTE_REVOKE;
log::trace!("adding kqueue watch: {}", path.display());
self.kqueue
.add_filename(&path, event_filter, filter_flags)
.map_err(|e| Error::io(e).add_path(path.clone()))?;
self.watches.insert(path, is_recursive);
Ok(())
}
fn remove_watch(&mut self, path: PathBuf, remove_recursive: bool) -> Result<()> {
log::trace!("removing kqueue watch: {}", path.display());
match self.watches.remove(&path) {
None => return Err(Error::watch_not_found()),
Some(is_recursive) => {
if is_recursive || remove_recursive {
for entry in WalkDir::new(path)
.follow_links(self.follow_symlinks)
.into_iter()
{
let p = entry.map_err(map_walkdir_error)?.into_path();
self.kqueue
.remove_filename(&p, EventFilter::EVFILT_VNODE)
.map_err(|e| Error::io(e).add_path(p))?;
}
} else {
self.kqueue
.remove_filename(&path, EventFilter::EVFILT_VNODE)
.map_err(|e| Error::io(e).add_path(path.clone()))?;
}
self.kqueue.watch()?;
}
}
Ok(())
}
}
fn map_walkdir_error(e: walkdir::Error) -> Error {
if e.io_error().is_some() {
Error::io(e.into_io_error().unwrap())
} else {
Error::generic(&e.to_string())
}
}
impl KqueueWatcher {
fn from_event_handler(
event_handler: Box<dyn EventHandler>,
follow_symlinks: bool,
event_kinds: EventKindMask,
) -> Result<Self> {
let kqueue = kqueue::Watcher::new()?;
let event_loop = EventLoop::new(kqueue, event_handler, follow_symlinks, event_kinds)?;
let channel = event_loop.event_loop_tx.clone();
let waker = event_loop.event_loop_waker.clone();
event_loop.run();
Ok(KqueueWatcher { channel, waker })
}
fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
let (tx, rx) = unbounded();
let msg = EventLoopMsg::AddWatch(pb, recursive_mode, tx);
self.channel
.send(msg)
.map_err(|e| Error::generic(&e.to_string()))?;
self.waker
.wake()
.map_err(|e| Error::generic(&e.to_string()))?;
rx.recv()
.unwrap()
.map_err(|e| Error::generic(&e.to_string()))
}
fn unwatch_inner(&mut self, path: &Path) -> Result<()> {
let pb = if path.is_absolute() {
path.to_owned()
} else {
let p = env::current_dir().map_err(Error::io)?;
p.join(path)
};
let (tx, rx) = unbounded();
let msg = EventLoopMsg::RemoveWatch(pb, tx);
self.channel
.send(msg)
.map_err(|e| Error::generic(&e.to_string()))?;
self.waker
.wake()
.map_err(|e| Error::generic(&e.to_string()))?;
rx.recv()
.unwrap()
.map_err(|e| Error::generic(&e.to_string()))
}
fn watched_paths_inner(&self) -> Result<Vec<(PathBuf, RecursiveMode)>> {
let (tx, rx) = unbounded();
self.channel.send(EventLoopMsg::GetWatchedPaths(tx))?;
self.waker.wake()?;
rx.recv().map_err(Error::from)
}
}
impl Watcher for KqueueWatcher {
fn new<F: EventHandler>(event_handler: F, config: Config) -> Result<Self> {
Self::from_event_handler(
Box::new(event_handler),
config.follow_symlinks(),
config.event_kinds(),
)
}
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
self.watch_inner(path, recursive_mode)
}
fn unwatch(&mut self, path: &Path) -> Result<()> {
self.unwatch_inner(path)
}
fn watched_paths(&self) -> Result<Vec<(PathBuf, RecursiveMode)>> {
self.watched_paths_inner()
}
fn kind() -> crate::WatcherKind {
crate::WatcherKind::Kqueue
}
}
impl Drop for KqueueWatcher {
fn drop(&mut self) {
self.channel.send(EventLoopMsg::Shutdown).unwrap();
self.waker.wake().unwrap();
}
}
#[cfg(test)]
mod tests {
use std::time::Duration;
use super::*;
use crate::test::{self, *};
fn watcher() -> (TestWatcher<KqueueWatcher>, test::Receiver) {
channel()
}
#[test]
fn test_remove_recursive() -> std::result::Result<(), Box<dyn std::error::Error>> {
let path = PathBuf::from("src");
let mut watcher = KqueueWatcher::new(|event| println!("{:?}", event), Config::default())?;
watcher.watch(&path, RecursiveMode::Recursive)?;
let result = watcher.unwatch(&path);
assert!(
result.is_ok(),
"unwatch yielded error: {}",
result.unwrap_err()
);
Ok(())
}
#[test]
fn create_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
watcher.watch_recursively(&tmpdir);
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("create");
rx.wait_unordered([expected(path).create_file()]);
}
#[test]
fn write_file() {
let tmpdir = testdir();
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("create");
let (mut watcher, mut rx) = watcher();
watcher.watch_recursively(&tmpdir);
std::fs::write(&path, b"123").expect("write");
rx.wait_unordered([expected(&path).modify_data_any()]);
}
#[test]
fn chmod_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
let file = std::fs::File::create_new(&path).expect("create");
let mut permissions = file.metadata().expect("metadata").permissions();
permissions.set_readonly(true);
watcher.watch_recursively(&tmpdir);
file.set_permissions(permissions).expect("set_permissions");
rx.wait_unordered([expected(&path).modify_meta_any()]);
}
#[test]
fn rename_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("create");
watcher.watch_recursively(&tmpdir);
let new_path = tmpdir.path().join("renamed");
std::fs::rename(&path, &new_path).expect("rename");
rx.wait_unordered([expected(path).rename_any(), expected(new_path).create()]);
}
#[test]
fn delete_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let file = tmpdir.path().join("file");
std::fs::write(&file, "").expect("write");
watcher.watch_nonrecursively(&tmpdir);
std::fs::remove_file(&file).expect("remove");
rx.wait_unordered([expected(tmpdir.path()).modify_data_any()]);
}
#[test]
fn delete_self_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let file = tmpdir.path().join("file");
std::fs::write(&file, "").expect("write");
watcher.watch_nonrecursively(&file);
std::fs::remove_file(&file).expect("remove");
rx.wait_unordered([expected(file).remove_any()]);
}
#[test]
#[ignore = "FIXME"]
fn create_write_overwrite() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let overwritten_file = tmpdir.path().join("overwritten_file");
let overwriting_file = tmpdir.path().join("overwriting_file");
std::fs::write(&overwritten_file, "123").expect("write1");
watcher.watch_nonrecursively(&tmpdir);
std::fs::File::create(&overwriting_file).expect("create");
std::fs::write(&overwriting_file, "321").expect("write2");
std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
rx.wait_unordered([
expected(&overwriting_file).create_file(),
expected(&overwriting_file).modify_data_any().multiple(),
expected(&overwriting_file).rename_any(),
expected(&overwritten_file).rename_any(),
])
.ensure_no_tail();
}
#[test]
fn create_dir() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
watcher.watch_recursively(&tmpdir);
let path = tmpdir.path().join("entry");
std::fs::create_dir(&path).expect("create");
rx.wait_unordered([expected(&path).create_folder()]);
}
#[test]
fn chmod_dir() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::create_dir(&path).expect("create_dir");
let mut permissions = std::fs::metadata(&path).expect("metadata").permissions();
permissions.set_readonly(true);
watcher.watch_recursively(&tmpdir);
std::fs::set_permissions(&path, permissions).expect("set_permissions");
rx.wait_unordered([expected(&path).modify_meta_any()]);
}
#[test]
fn rename_dir() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
let new_path = tmpdir.path().join("new_path");
std::fs::create_dir(&path).expect("create_dir");
watcher.watch_recursively(&tmpdir);
std::fs::rename(&path, &new_path).expect("rename");
rx.wait_ordered([
expected(&new_path).create_folder(),
expected(&path).rename_any(),
]);
}
#[test]
fn delete_dir() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::create_dir(&path).expect("create_dir");
watcher.watch_recursively(&tmpdir);
std::fs::remove_dir(&path).expect("remove");
rx.wait_unordered([expected(path).remove_any()]);
}
#[test]
#[ignore = "FIXME"]
fn rename_dir_twice() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
let new_path = tmpdir.path().join("new_path");
let new_path2 = tmpdir.path().join("new_path2");
std::fs::create_dir(&path).expect("create_dir");
watcher.watch_recursively(&tmpdir);
std::fs::rename(&path, &new_path).expect("rename");
std::fs::rename(&new_path, &new_path2).expect("rename2");
rx.wait_unordered([
expected(&path).rename_any(),
expected(&new_path).create_folder(),
expected(&new_path).rename_any(),
expected(&new_path2).create_folder(),
]);
}
#[test]
fn move_out_of_watched_dir() {
let tmpdir = testdir();
let subdir = tmpdir.path().join("subdir");
let (mut watcher, mut rx) = watcher();
let path = subdir.join("entry");
std::fs::create_dir_all(&subdir).expect("create_dir_all");
std::fs::File::create_new(&path).expect("create");
watcher.watch_recursively(&subdir);
let new_path = tmpdir.path().join("entry");
std::fs::rename(&path, &new_path).expect("rename");
rx.wait_unordered([expected(path).rename_any()]);
}
#[test]
#[ignore = "FIXME"]
fn create_write_write_rename_write_remove() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let file1 = tmpdir.path().join("entry");
let file2 = tmpdir.path().join("entry2");
std::fs::File::create_new(&file2).expect("create file2");
let new_path = tmpdir.path().join("renamed");
watcher.watch_recursively(&tmpdir);
std::fs::write(&file1, "123").expect("write 1");
std::fs::write(&file2, "321").expect("write 2");
std::fs::rename(&file1, &new_path).expect("rename");
std::fs::write(&new_path, b"1").expect("write 3");
std::fs::remove_file(&new_path).expect("remove");
rx.wait_ordered([
expected(&file1).create_file(),
expected(&file1).modify_data_content(),
expected(&file2).modify_data_content(),
expected(&file1).rename_any(),
expected(&new_path).rename_any(),
expected(&new_path).modify_data_content(),
expected(&new_path).remove_file(),
]);
}
#[test]
#[ignore = "FIXME"]
fn rename_twice() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("create");
watcher.watch_recursively(&tmpdir);
let new_path1 = tmpdir.path().join("renamed1");
let new_path2 = tmpdir.path().join("renamed2");
std::fs::rename(&path, &new_path1).expect("rename1");
std::fs::rename(&new_path1, &new_path2).expect("rename2");
rx.wait_unordered([
expected(&path).rename_any(),
expected(&new_path1).rename_any(),
expected(&new_path2).create(),
]);
}
#[test]
fn set_file_mtime() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
let file = std::fs::File::create_new(&path).expect("create");
watcher.watch_recursively(&tmpdir);
file.set_modified(
std::time::SystemTime::now()
.checked_sub(Duration::from_secs(60 * 60))
.expect("time"),
)
.expect("set_time");
rx.wait_unordered([expected(&path).modify_meta_any()]);
}
#[test]
fn write_file_non_recursive_watch() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("create");
watcher.watch_nonrecursively(&path);
std::fs::write(&path, b"123").expect("write");
rx.wait_unordered([expected(path).modify_data_any()]);
}
#[test]
fn write_to_a_hardlink_pointed_to_the_watched_file_triggers_an_event() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let subdir = tmpdir.path().join("subdir");
let file = subdir.join("file");
let hardlink = tmpdir.path().join("hardlink");
std::fs::create_dir(&subdir).expect("create");
std::fs::write(&file, "").expect("file");
std::fs::hard_link(&file, &hardlink).expect("hardlink");
watcher.watch_nonrecursively(&file);
std::fs::write(&hardlink, "123123").expect("write to the hard link");
rx.wait_unordered([expected(file).modify_data_any()]);
}
#[test]
#[ignore = "FIXME"]
fn recursive_creation() {
let tmpdir = testdir();
let nested1 = tmpdir.path().join("1");
let nested2 = tmpdir.path().join("1/2");
let nested3 = tmpdir.path().join("1/2/3");
let nested4 = tmpdir.path().join("1/2/3/4");
let nested5 = tmpdir.path().join("1/2/3/4/5");
let nested6 = tmpdir.path().join("1/2/3/4/5/6");
let nested7 = tmpdir.path().join("1/2/3/4/5/6/7");
let nested8 = tmpdir.path().join("1/2/3/4/5/6/7/8");
let nested9 = tmpdir.path().join("1/2/3/4/5/6/7/8/9");
let (mut watcher, mut rx) = watcher();
watcher.watch_recursively(&tmpdir);
std::fs::create_dir_all(&nested9).expect("create_dir_all");
rx.wait_unordered([
expected(&nested1).create_folder(),
expected(&nested2).create_folder(),
expected(&nested3).create_folder(),
expected(&nested4).create_folder(),
expected(&nested5).create_folder(),
expected(&nested6).create_folder(),
expected(&nested7).create_folder(),
expected(&nested8).create_folder(),
expected(&nested9).create_folder(),
]);
}
#[test]
fn kqueue_watcher_respects_event_kind_mask() {
use crate::Watcher;
use notify_types::event::EventKindMask;
let tmpdir = testdir();
let (tx, rx) = std::sync::mpsc::channel();
let config = Config::default().with_event_kinds(EventKindMask::CREATE);
let mut watcher = KqueueWatcher::new(tx, config).expect("create watcher");
watcher
.watch(tmpdir.path(), crate::RecursiveMode::Recursive)
.expect("watch");
let path = tmpdir.path().join("test_file");
std::fs::File::create_new(&path).expect("create");
std::thread::sleep(Duration::from_millis(100));
std::fs::write(&path, "modified content").expect("write modified");
std::thread::sleep(Duration::from_millis(100));
let events: Vec<_> = rx.try_iter().filter_map(|r| r.ok()).collect();
assert!(
events.iter().any(|e| e.kind.is_create()),
"Expected CREATE event, got: {:?}",
events
);
assert!(
!events.iter().any(|e| e.kind.is_modify()),
"Should not receive MODIFY events with CREATE-only mask, got: {:?}",
events
);
}
}