use crate::{unbounded, Config, Error, EventHandler, Receiver, RecursiveMode, Sender, Watcher};
use std::{
collections::HashMap,
path::{Path, PathBuf},
sync::{
atomic::{AtomicBool, Ordering},
Arc, Mutex,
},
thread,
time::Duration,
};
pub(crate) enum PollMessage {
Poll,
PollAndWait(Sender<crate::Result<()>>),
}
pub type ScanEvent = crate::Result<PathBuf>;
pub trait ScanEventHandler: Send + 'static {
fn handle_event(&mut self, event: ScanEvent);
}
impl<F> ScanEventHandler for F
where
F: FnMut(ScanEvent) + Send + 'static,
{
fn handle_event(&mut self, event: ScanEvent) {
(self)(event);
}
}
#[cfg(feature = "crossbeam-channel")]
impl ScanEventHandler for crossbeam_channel::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}
#[cfg(feature = "flume")]
impl ScanEventHandler for flume::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}
impl ScanEventHandler for std::sync::mpsc::Sender<ScanEvent> {
fn handle_event(&mut self, event: ScanEvent) {
let _ = self.send(event);
}
}
impl ScanEventHandler for () {
fn handle_event(&mut self, _event: ScanEvent) {}
}
use data::{DataBuilder, WatchData};
mod data {
use crate::{
event::{CreateKind, DataChange, Event, EventKind, MetadataKind, ModifyKind, RemoveKind},
EventHandler, RecursiveMode,
};
use notify_types::event::EventKindMask;
use std::{
cell::RefCell,
collections::HashMap,
fmt::{self, Debug},
fs::{self, File, Metadata},
io::{self, Read},
path::{Path, PathBuf},
time::Instant,
};
use walkdir::WalkDir;
use xxhash_rust::xxh3::Xxh3Default;
use super::ScanEventHandler;
fn system_time_to_seconds(time: std::time::SystemTime) -> i64 {
match time.duration_since(std::time::SystemTime::UNIX_EPOCH) {
Ok(d) => d.as_secs() as i64,
Err(e) => -(e.duration().as_secs() as i64),
}
}
pub(super) struct DataBuilder {
emitter: EventEmitter,
scan_emitter: Option<Box<RefCell<dyn ScanEventHandler>>>,
compare_contents: bool,
now: Instant,
}
impl DataBuilder {
pub(super) fn new<F, G>(
event_handler: F,
compare_content: bool,
scan_emitter: Option<G>,
event_kinds: EventKindMask,
) -> Self
where
F: EventHandler,
G: ScanEventHandler,
{
let scan_emitter = match scan_emitter {
None => None,
Some(v) => {
let intermediate: Box<RefCell<dyn ScanEventHandler>> =
Box::new(RefCell::new(v));
Some(intermediate)
}
};
Self {
emitter: EventEmitter::new(event_handler, event_kinds),
scan_emitter,
compare_contents: compare_content,
now: Instant::now(),
}
}
pub(super) fn update_timestamp(&mut self) {
self.now = Instant::now();
}
pub(super) fn build_watch_data(
&self,
root: PathBuf,
is_recursive: bool,
follow_symlinks: bool,
) -> Option<WatchData> {
WatchData::new(self, root, is_recursive, follow_symlinks)
}
fn build_path_data(&self, meta_path: &MetaPath) -> PathData {
PathData::new(self, meta_path)
}
}
impl Debug for DataBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("DataBuilder")
.field("compare_contents", &self.compare_contents)
.field("now", &self.now)
.finish()
}
}
#[derive(Debug)]
pub(super) struct WatchData {
root: PathBuf,
is_recursive: bool,
follow_symlinks: bool,
all_path_data: HashMap<PathBuf, PathData>,
}
impl WatchData {
fn new(
data_builder: &DataBuilder,
root: PathBuf,
is_recursive: bool,
follow_symlinks: bool,
) -> Option<Self> {
if let Err(e) = fs::metadata(&root) {
data_builder.emitter.emit_io_err(e, Some(&root));
return None;
}
let all_path_data = Self::scan_all_path_data(
data_builder,
root.clone(),
is_recursive,
follow_symlinks,
true,
)
.collect();
Some(Self {
root,
is_recursive,
follow_symlinks,
all_path_data,
})
}
pub(super) fn rescan(&mut self, data_builder: &mut DataBuilder) {
for (path, new_path_data) in Self::scan_all_path_data(
data_builder,
self.root.clone(),
self.is_recursive,
self.follow_symlinks,
false,
) {
let old_path_data = self
.all_path_data
.insert(path.clone(), new_path_data.clone());
let event =
PathData::compare_to_event(path, old_path_data.as_ref(), Some(&new_path_data));
if let Some(event) = event {
data_builder.emitter.emit_ok(event);
}
}
let mut disappeared_paths = Vec::new();
for (path, path_data) in self.all_path_data.iter() {
if path_data.last_check < data_builder.now {
disappeared_paths.push(path.clone());
}
}
for path in disappeared_paths {
let old_path_data = self.all_path_data.remove(&path);
let event = PathData::compare_to_event(path, old_path_data.as_ref(), None);
if let Some(event) = event {
data_builder.emitter.emit_ok(event);
}
}
}
fn scan_all_path_data(
data_builder: &'_ DataBuilder,
root: PathBuf,
is_recursive: bool,
follow_symlinks: bool,
is_initial: bool,
) -> impl Iterator<Item = (PathBuf, PathData)> + '_ {
log::trace!("rescanning {root:?}");
WalkDir::new(root)
.follow_links(follow_symlinks)
.max_depth(Self::dir_scan_depth(is_recursive))
.into_iter()
.filter_map(|entry_res| match entry_res {
Ok(entry) => Some(entry),
Err(err) => {
log::warn!("walkdir error scanning {err:?}");
if let Some(io_error) = err.io_error() {
let new_io_error = io::Error::new(io_error.kind(), err.to_string());
data_builder.emitter.emit_io_err(new_io_error, err.path());
} else {
let crate_err =
crate::Error::new(crate::ErrorKind::Generic(err.to_string()));
data_builder.emitter.emit(Err(crate_err));
}
None
}
})
.filter_map(move |entry| match entry.metadata() {
Ok(metadata) => {
let path = entry.into_path();
if is_initial {
if let Some(ref emitter) = data_builder.scan_emitter {
emitter.borrow_mut().handle_event(Ok(path.clone()));
}
}
let meta_path = MetaPath::from_parts_unchecked(path, metadata);
let data_path = data_builder.build_path_data(&meta_path);
Some((meta_path.into_path(), data_path))
}
Err(e) => {
let path = entry.into_path();
data_builder.emitter.emit_io_err(e, Some(path));
None
}
})
}
fn dir_scan_depth(is_recursive: bool) -> usize {
if is_recursive {
usize::MAX
} else {
1
}
}
pub(super) fn recursive_mode(&self) -> RecursiveMode {
if self.is_recursive {
RecursiveMode::Recursive
} else {
RecursiveMode::NonRecursive
}
}
}
#[derive(Debug, Clone)]
struct PathData {
mtime: i64,
hash: Option<u64>,
last_check: Instant,
}
impl PathData {
fn new(data_builder: &DataBuilder, meta_path: &MetaPath) -> PathData {
let metadata = meta_path.metadata();
PathData {
mtime: metadata.modified().map_or(0, system_time_to_seconds),
hash: if data_builder.compare_contents && metadata.is_file() {
content_hash(meta_path.path()).ok()
} else {
None
},
last_check: data_builder.now,
}
}
fn compare_to_event<P>(
path: P,
old: Option<&PathData>,
new: Option<&PathData>,
) -> Option<Event>
where
P: Into<PathBuf>,
{
Self::compare_to_kind(old, new)
.map(|event_kind| Event::new(event_kind).add_path(path.into()))
}
fn compare_to_kind(old: Option<&PathData>, new: Option<&PathData>) -> Option<EventKind> {
match (old, new) {
(Some(old), Some(new)) => {
if new.mtime > old.mtime {
Some(EventKind::Modify(ModifyKind::Metadata(
MetadataKind::WriteTime,
)))
} else if new.hash != old.hash {
Some(EventKind::Modify(ModifyKind::Data(DataChange::Any)))
} else {
None
}
}
(None, Some(_new)) => Some(EventKind::Create(CreateKind::Any)),
(Some(_old), None) => Some(EventKind::Remove(RemoveKind::Any)),
(None, None) => None,
}
}
}
pub(super) fn content_hash(path: &Path) -> io::Result<u64> {
let mut hasher = Xxh3Default::new();
let mut file = File::open(path)?;
let mut buf = [0u8; 8 * 1024];
loop {
let n = match file.read(&mut buf) {
Ok(0) => break,
Ok(len) => len,
Err(e) if e.kind() == io::ErrorKind::Interrupted => continue,
Err(e) => return Err(e),
};
hasher.update(&buf[..n]);
}
Ok(hasher.digest())
}
#[derive(Debug)]
pub(super) struct MetaPath {
path: PathBuf,
metadata: Metadata,
}
impl MetaPath {
fn from_parts_unchecked(path: PathBuf, metadata: Metadata) -> Self {
Self { path, metadata }
}
fn path(&self) -> &Path {
&self.path
}
fn metadata(&self) -> &Metadata {
&self.metadata
}
fn into_path(self) -> PathBuf {
self.path
}
}
struct EventEmitter {
handler: Box<RefCell<dyn EventHandler>>,
event_kinds: EventKindMask,
}
impl EventEmitter {
fn new<F: EventHandler>(event_handler: F, event_kinds: EventKindMask) -> Self {
Self {
handler: Box::new(RefCell::new(event_handler)),
event_kinds,
}
}
fn emit(&self, event: crate::Result<Event>) {
self.handler.borrow_mut().handle_event(event);
}
fn emit_ok(&self, event: Event) {
if self.event_kinds.matches(&event.kind) {
self.emit(Ok(event))
}
}
fn emit_io_err<E, P>(&self, err: E, path: Option<P>)
where
E: Into<io::Error>,
P: Into<PathBuf>,
{
let e = crate::Error::io(err.into());
if let Some(path) = path {
self.emit(Err(e.add_path(path.into())));
} else {
self.emit(Err(e));
}
}
}
}
#[derive(Debug)]
pub struct PollWatcher {
watches: Arc<Mutex<HashMap<PathBuf, WatchData>>>,
data_builder: Arc<Mutex<DataBuilder>>,
want_to_stop: Arc<AtomicBool>,
message_channel: Sender<PollMessage>,
delay: Option<Duration>,
follow_sylinks: bool,
}
impl PollWatcher {
pub fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<PollWatcher> {
Self::with_opt::<_, ()>(event_handler, config, None)
}
pub fn poll(&self) -> crate::Result<()> {
self.message_channel
.send(PollMessage::Poll)
.map_err(|_| Error::generic("failed to send poll message"))?;
Ok(())
}
pub fn poll_blocking(&self) -> crate::Result<()> {
let (done_tx, done_rx) = std::sync::mpsc::channel();
self.message_channel
.send(PollMessage::PollAndWait(done_tx))
.map_err(|_| Error::generic("failed to send poll message"))?;
done_rx
.recv()
.map_err(|_| Error::generic("poll thread disconnected"))?
}
#[cfg(test)]
pub(crate) fn poll_sender(&self) -> Sender<PollMessage> {
self.message_channel.clone()
}
pub fn with_initial_scan<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
scan_callback: G,
) -> crate::Result<PollWatcher> {
Self::with_opt(event_handler, config, Some(scan_callback))
}
fn with_opt<F: EventHandler, G: ScanEventHandler>(
event_handler: F,
config: Config,
scan_callback: Option<G>,
) -> crate::Result<PollWatcher> {
let data_builder = DataBuilder::new(
event_handler,
config.compare_contents(),
scan_callback,
config.event_kinds(),
);
let (tx, rx) = unbounded::<PollMessage>();
let poll_watcher = PollWatcher {
watches: Default::default(),
data_builder: Arc::new(Mutex::new(data_builder)),
want_to_stop: Arc::new(AtomicBool::new(false)),
delay: config.poll_interval(),
follow_sylinks: config.follow_symlinks(),
message_channel: tx,
};
poll_watcher.run(rx);
Ok(poll_watcher)
}
fn run(&self, rx: Receiver<PollMessage>) {
let watches = Arc::clone(&self.watches);
let data_builder = Arc::clone(&self.data_builder);
let want_to_stop = Arc::clone(&self.want_to_stop);
let delay = self.delay;
let _ = thread::Builder::new()
.name("notify-rs poll loop".to_string())
.spawn(move || {
let mut first_auto_scan = true;
loop {
if want_to_stop.load(Ordering::SeqCst) {
break;
}
let msg = match delay {
Some(_delay) if first_auto_scan => {
first_auto_scan = false;
None
}
Some(delay) => match rx.recv_timeout(delay) {
Ok(msg) => Some(msg),
Err(std::sync::mpsc::RecvTimeoutError::Timeout) => None,
Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => break,
},
None => match rx.recv() {
Ok(msg) => Some(msg),
Err(_) => break,
},
};
let scan_res = {
let mut watches = watches.lock().unwrap_or_else(|e| e.into_inner());
let mut data_builder =
data_builder.lock().unwrap_or_else(|e| e.into_inner());
data_builder.update_timestamp();
let vals = watches.values_mut();
for watch_data in vals {
watch_data.rescan(&mut data_builder);
}
Ok(())
};
if let Some(PollMessage::PollAndWait(done)) = msg {
let _ = done.send(scan_res);
}
}
});
}
fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) {
let mut watches = self.watches.lock().unwrap_or_else(|e| e.into_inner());
let mut data_builder = self.data_builder.lock().unwrap_or_else(|e| e.into_inner());
data_builder.update_timestamp();
let watch_data = data_builder.build_watch_data(
path.to_path_buf(),
recursive_mode.is_recursive(),
self.follow_sylinks,
);
if let Some(watch_data) = watch_data {
watches.insert(path.to_path_buf(), watch_data);
}
}
fn unwatch_inner(&mut self, path: &Path) -> crate::Result<()> {
self.watches
.lock()
.unwrap_or_else(|e| e.into_inner())
.remove(path)
.map(|_| ())
.ok_or_else(crate::Error::watch_not_found)
}
}
impl Watcher for PollWatcher {
fn new<F: EventHandler>(event_handler: F, config: Config) -> crate::Result<Self> {
Self::new(event_handler, config)
}
fn watch(&mut self, path: &Path, recursive_mode: RecursiveMode) -> crate::Result<()> {
self.watch_inner(path, recursive_mode);
Ok(())
}
fn unwatch(&mut self, path: &Path) -> crate::Result<()> {
self.unwatch_inner(path)
}
fn watched_paths(&self) -> crate::Result<Vec<(PathBuf, RecursiveMode)>> {
let watches = self.watches.lock().map_err(crate::Error::from)?;
Ok(watches
.iter()
.map(|(path, watch)| (path.clone(), watch.recursive_mode()))
.collect())
}
fn kind() -> crate::WatcherKind {
crate::WatcherKind::PollWatcher
}
}
impl Drop for PollWatcher {
fn drop(&mut self) {
self.want_to_stop.store(true, Ordering::Relaxed);
}
}
#[cfg(test)]
mod tests {
use super::PollWatcher;
use crate::{test::*, Config, RecursiveMode, Watcher};
fn watcher() -> (TestWatcher<PollWatcher>, Receiver) {
poll_watcher_channel()
}
#[test]
fn poll_watcher_is_send_and_sync() {
fn check<T: Send + Sync>() {}
check::<PollWatcher>();
}
#[test]
fn unwatch_with_poisoned_mutex_does_not_panic() {
use std::{path::Path, sync::Arc};
let mut watcher = PollWatcher::new(|_| {}, Config::default()).expect("create watcher");
let watches = Arc::clone(&watcher.watches);
let _ = std::thread::spawn(move || {
let _guard = watches.lock().expect("lock watches");
panic!("poison watches mutex for test");
})
.join();
let result = watcher.unwatch_inner(Path::new("/path/that/is/not/watched"));
assert!(result.is_err());
}
#[test]
fn watched_paths_reflect_watch_and_unwatch() {
let tmpdir = testdir();
let dir_a = tmpdir.path().join("a");
let dir_b = tmpdir.path().join("b");
std::fs::create_dir(&dir_a).expect("create dir a");
std::fs::create_dir(&dir_b).expect("create dir b");
let mut watcher = PollWatcher::new(|_| {}, Config::default()).expect("create watcher");
watcher
.watch(&dir_a, RecursiveMode::Recursive)
.expect("watch dir a");
watcher
.watch(&dir_b, RecursiveMode::NonRecursive)
.expect("watch dir b");
let watched = watcher.watched_paths().expect("list watched paths");
assert!(watched.contains(&(
dir_a.canonicalize().expect("canonicalize dir a"),
RecursiveMode::Recursive,
)));
assert!(watched.contains(&(
dir_b.canonicalize().expect("canonicalize dir b"),
RecursiveMode::NonRecursive,
)));
watcher.unwatch(&dir_a).expect("unwatch dir a");
let watched = watcher
.watched_paths()
.expect("list watched paths after unwatch");
assert!(!watched.contains(&(
dir_a.canonicalize().expect("canonicalize dir a"),
RecursiveMode::Recursive,
)));
assert!(watched.contains(&(
dir_b.canonicalize().expect("canonicalize dir b"),
RecursiveMode::NonRecursive,
)));
}
#[test]
fn create_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
watcher.watch_recursively(&tmpdir);
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("Unable to create");
rx.sleep_until_parent_contains(&path);
rx.sleep_until_exists(&path);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [&path]);
rx.wait_ordered_exact([expected(&path).create_any()]);
}
#[test]
fn create_dir() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
watcher.watch_recursively(&tmpdir);
let path = tmpdir.path().join("entry");
std::fs::create_dir(&path).expect("Unable to create");
rx.sleep_until_parent_contains(&path);
rx.sleep_until_exists(&path);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [&path]);
rx.wait_ordered_exact([expected(&path).create_any()]);
}
#[test]
fn modify_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("Unable to create");
rx.sleep_until_parent_contains(&path);
rx.sleep_until_exists(&path);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [&path]);
watcher.watch_recursively(&tmpdir);
std::fs::write(&path, b"123").expect("Unable to write");
assert!(
rx.sleep_until(|| std::fs::read_to_string(&path).is_ok_and(|content| content == "123")),
"the file wasn't modified"
);
rx.wait_ordered_exact([expected(&path).modify_data_any()]);
}
#[test]
fn remove_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
std::fs::File::create_new(&path).expect("Unable to create");
rx.sleep_until_parent_contains(&path);
rx.sleep_until_exists(&path);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [&path]);
watcher.watch_recursively(&tmpdir);
std::fs::remove_file(&path).expect("Unable to remove");
rx.sleep_while_exists(&path);
rx.sleep_while_parent_contains(&path);
rx.sleep_until_walkdir_returns_set::<&str>(tmpdir.path(), []);
rx.wait_ordered_exact([expected(&path).remove_any()]);
}
#[test]
fn rename_file() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let path = tmpdir.path().join("entry");
let new_path = tmpdir.path().join("new_entry");
std::fs::File::create_new(&path).expect("Unable to create");
rx.sleep_until_parent_contains(&path);
rx.sleep_until_exists(&path);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [&path]);
watcher.watch_recursively(&tmpdir);
std::fs::rename(&path, &new_path).expect("Unable to remove");
rx.sleep_while_exists(&path);
rx.sleep_until_exists(&new_path);
rx.sleep_while_parent_contains(&path);
rx.sleep_until_parent_contains(&new_path);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [&new_path]);
rx.wait_unordered_exact([
expected(&path).remove_any(),
expected(&new_path).create_any(),
]);
}
#[test]
fn create_write_overwrite() {
let tmpdir = testdir();
let (mut watcher, mut rx) = watcher();
let overwritten_file = tmpdir.path().join("overwritten_file");
let overwriting_file = tmpdir.path().join("overwriting_file");
std::fs::write(&overwritten_file, "123").expect("write1");
rx.sleep_until_parent_contains(&overwritten_file);
rx.sleep_until_exists(&overwritten_file);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [overwritten_file.clone()]);
watcher.watch_nonrecursively(&tmpdir);
std::fs::File::create(&overwriting_file).expect("create");
std::fs::write(&overwriting_file, "321").expect("write2");
std::fs::rename(&overwriting_file, &overwritten_file).expect("rename");
rx.sleep_while_exists(&overwriting_file);
rx.sleep_while_parent_contains(&overwriting_file);
rx.sleep_until_walkdir_returns_set(tmpdir.path(), [overwritten_file.clone()]);
assert!(
rx.sleep_until(
|| std::fs::read_to_string(&overwritten_file).is_ok_and(|cnt| cnt == "321")
),
"file {overwritten_file:?} was not replaced"
);
rx.wait_unordered([expected(&overwritten_file).modify_data_any()]);
}
#[test]
fn poll_watcher_respects_event_kind_mask() {
use crate::{Config, Watcher};
use notify_types::event::EventKindMask;
use std::time::Duration;
let tmpdir = testdir();
let (tx, rx) = std::sync::mpsc::channel();
let config = Config::default()
.with_event_kinds(EventKindMask::CREATE)
.with_compare_contents(true)
.with_manual_polling();
let mut watcher = PollWatcher::new(tx, config).expect("create watcher");
watcher
.watch(tmpdir.path(), crate::RecursiveMode::Recursive)
.expect("watch");
let path = tmpdir.path().join("test_file");
std::fs::write(&path, "initial").expect("write initial");
watcher.poll().expect("poll 1");
let event = rx
.recv_timeout(Duration::from_secs(1))
.expect("should receive CREATE event")
.expect("event should not be an error");
assert!(
event.kind.is_create(),
"Expected CREATE event, got: {event:?}"
);
std::fs::write(&path, "modified content").expect("write modified");
watcher.poll().expect("poll 2");
std::thread::sleep(Duration::from_millis(100));
let remaining: Vec<_> = rx.try_iter().filter_map(|r| r.ok()).collect();
assert!(
!remaining.iter().any(|e| e.kind.is_modify()),
"Should not receive MODIFY events with CREATE-only mask, got: {remaining:?}"
);
}
}