use std::{
path::Path,
pin::Pin,
task::{Context, Poll},
};
use futures::Stream;
#[cfg(not(target_os = "macos"))]
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use notify::{Event, Result as NotifyResult};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tracing::error;
/// A stream of filesystem events for a single watched directory.
///
/// Events are produced elsewhere and pushed into an unbounded channel; this
/// type owns the receiving half and yields its contents through the
/// [`Stream`] implementation. On non-macOS targets the events come from a
/// `notify` watcher; on macOS they come from the polling task started by
/// `mac::dir_watcher`.
pub struct FileSystemWatcher {
/// Receiving half of the channel that carries the watch results.
recv: UnboundedReceiver<NotifyResult<Event>>,
/// Watcher handle, stored but never read by this type.
/// NOTE(review): presumably kept so the watcher is not dropped (and stops
/// delivering events) while the stream is alive — confirm against `notify`.
#[cfg(not(target_os = "macos"))]
_watcher: RecommendedWatcher, }
impl Stream for FileSystemWatcher {
    type Item = NotifyResult<Event>;

    /// Yields the next watch result by polling the internal channel receiver.
    ///
    /// Whatever the receiver reports (pending, an item, or end of stream) is
    /// returned unchanged.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let watcher = self.get_mut();
        watcher.recv.poll_recv(cx)
    }
}
impl FileSystemWatcher {
#[cfg(not(target_os = "macos"))]
pub fn new<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
let (stream_tx, stream_rx) = unbounded_channel::<NotifyResult<Event>>();
let mut watcher: RecommendedWatcher = Watcher::new_immediate(move |res| {
if let Err(e) = stream_tx.send(res) {
error!("Unable to send inotify event into stream: {:?}", e)
}
})?;
watcher.configure(Config::PreciseEvents(true))?;
watcher.watch(path, RecursiveMode::NonRecursive)?;
Ok(FileSystemWatcher {
recv: stream_rx,
_watcher: watcher,
})
}
#[cfg(target_os = "macos")]
pub fn new<P: AsRef<Path>>(path: P) -> anyhow::Result<Self> {
Ok(FileSystemWatcher {
recv: mac::dir_watcher(path),
})
}
}
#[cfg(target_os = "macos")]
mod mac {
use std::collections::HashSet;
use std::path::PathBuf;
use super::*;
use notify::event::{CreateKind, EventKind, RemoveKind};
use notify::Error as NotifyError;
use tokio::fs::DirEntry;
use tokio::sync::mpsc::UnboundedSender;
use tokio::time::{self, Duration};
use tokio_stream::wrappers::ReadDirStream;
use tokio_stream::StreamExt;
/// Number of seconds `dir_watcher` sleeps between two consecutive directory
/// listings; the tests in this module also derive their timeouts from it.
const WAIT_TIME: u64 = 2;
/// Watches `dir` by polling and reports created and removed entries.
///
/// A Tokio task is spawned (so this must be called from within a Tokio
/// runtime) that lists the directory every `WAIT_TIME` seconds and compares
/// the listing with the previous one. Paths that appeared are sent as one
/// `Create` event and paths that disappeared as one `Remove` event. Only the
/// set of entry paths directly inside `dir` is compared, so changes to the
/// contents of an existing entry are not reported.
///
/// If a listing fails, the error is logged and forwarded to the receiver, and
/// the task tries again after the usual wait. If the very first listing fails
/// the cache starts out empty, so entries that already existed are reported
/// as created once the directory becomes readable.
///
/// The task stops once the returned receiver has been dropped or closed.
pub fn dir_watcher<P: AsRef<Path>>(dir: P) -> UnboundedReceiver<NotifyResult<Event>> {
    let (tx, rx) = unbounded_channel();
    let path = dir.as_ref().to_path_buf();
    tokio::spawn(async move {
        let mut path_cache: HashSet<PathBuf> = match get_dir_list(&path).await {
            Ok(set) => set,
            Err(e) => {
                error!(
                    "Unable to refresh directory {}, will attempt again: {:?}",
                    path.display(),
                    e
                );
                HashSet::new()
            }
        };
        loop {
            match get_dir_list(&path).await {
                Ok(current_paths) => {
                    send_creates(tx.clone(), current_paths.difference(&path_cache).cloned());
                    send_deletes(tx.clone(), path_cache.difference(&current_paths).cloned());
                    path_cache = current_paths;
                }
                Err(e) => {
                    error!(
                        "Unable to refresh directory {}, will attempt again: {:?}",
                        path.display(),
                        e
                    );
                    if let Err(e) = tx.send(Err(NotifyError::io(e))) {
                        error!("Unable to send error {:?} due to channel being closed", e.0);
                    }
                }
            }
            // Nobody is listening any more: stop polling instead of leaking the task.
            if tx.is_closed() {
                break;
            }
            // Wait on the error path too, so a persistently unreadable directory
            // does not turn this loop into a busy spin.
            time::sleep(Duration::from_secs(WAIT_TIME)).await;
        }
    });
    rx
}
async fn get_dir_list(path: &PathBuf) -> Result<HashSet<PathBuf>, std::io::Error> {
ReadDirStream::new(tokio::fs::read_dir(path).await?)
.collect::<Result<Vec<DirEntry>, _>>()
.await
.map(|entries| {
entries
.into_iter()
.map(|e| e.path())
.collect::<HashSet<PathBuf>>()
})
}
/// Sends a single `Create` event covering every path in `items` over `tx`.
///
/// Delegates to `send_event_with_kind`, which sends nothing when `items` is
/// empty.
fn send_creates(
    tx: UnboundedSender<NotifyResult<Event>>,
    items: impl Iterator<Item = PathBuf>,
) {
    let kind = EventKind::Create(CreateKind::Any);
    send_event_with_kind(tx, items, kind)
}
/// Sends a single `Remove` event covering every path in `items` over `tx`.
///
/// Delegates to `send_event_with_kind`, which sends nothing when `items` is
/// empty.
fn send_deletes(
    tx: UnboundedSender<NotifyResult<Event>>,
    items: impl Iterator<Item = PathBuf>,
) {
    let kind = EventKind::Remove(RemoveKind::Any);
    send_event_with_kind(tx, items, kind)
}
/// Bundles all paths from `items` into one event of the given `kind` and
/// sends it over `tx`.
///
/// No event is sent when `items` yields nothing. A failed send (the receiving
/// side of the channel is gone) is logged and otherwise ignored.
fn send_event_with_kind(
    tx: UnboundedSender<NotifyResult<Event>>,
    items: impl Iterator<Item = PathBuf>,
    kind: EventKind,
) {
    let mut pending = items.peekable();
    // Nothing to report: stay silent rather than emit an event without paths.
    if pending.peek().is_none() {
        return;
    }

    let event = Event {
        kind,
        paths: pending.collect(),
        ..Default::default()
    };
    if let Err(unsent) = tx.send(Ok(event)) {
        error!(
            "Unable to send event {:?} due to the channel being closed",
            unsent.0
        );
    }
}
#[cfg(test)]
mod test {
use super::*;
/// `send_deletes` must deliver one remove event that carries every path it was given.
#[tokio::test]
async fn test_send_deletes() {
    let removed = vec![PathBuf::from("/foo/bar"), PathBuf::from("/bar/foo")];
    let (tx, mut rx) = unbounded_channel();

    send_deletes(tx, removed.clone().into_iter());

    let received = rx.recv().await;
    let event = received
        .expect("got None result, which means the channel was closed prematurely")
        .expect("Got error from watch");

    assert!(event.kind.is_remove(), "Event is not a delete type");
    assert!(event.paths.len() == 2, "Event should contain two paths");
    for expected in &removed {
        assert!(event.paths.contains(expected), "Missing expected path");
    }
}
/// `send_creates` must deliver one create event that carries every path it was given.
#[tokio::test]
async fn test_send_creates() {
    let created = vec![PathBuf::from("/foo/bar"), PathBuf::from("/bar/foo")];
    let (tx, mut rx) = unbounded_channel();

    send_creates(tx, created.clone().into_iter());

    let received = rx.recv().await;
    let event = received
        .expect("got None result, which means the channel was closed prematurely")
        .expect("Got error from watch");

    assert!(event.kind.is_create(), "Event is not a create type");
    assert!(event.paths.len() == 2, "Event should contain two paths");
    for expected in &created {
        assert!(event.paths.contains(expected), "Missing expected path");
    }
}
#[tokio::test]
async fn test_watcher() {
let temp = tempfile::tempdir().expect("unable to set up temporary directory");
let first = tokio::fs::write(temp.path().join("old_foo.txt"), "");
let second = tokio::fs::write(temp.path().join("old_bar.txt"), "");
tokio::try_join!(first, second).expect("unable to write test files");
let mut rx = dir_watcher(&temp);
let base = temp.path().to_owned();
tokio::spawn(create_files(base));
let event = tokio::time::timeout(Duration::from_secs(WAIT_TIME + 1), rx.recv())
.await
.expect("Timed out waiting for event")
.expect("got None result, which means the channel was closed prematurely")
.expect("Got error from watch");
let mut found_create = false;
let mut found_delete = false;
assert_event(event, &temp, &mut found_create, &mut found_delete);
let event = tokio::time::timeout(Duration::from_secs(WAIT_TIME + 1), rx.recv())
.await
.expect("Timed out waiting for event")
.expect("got None result, which means the channel was closed prematurely")
.expect("Got error from watch");
assert_event(event, &temp, &mut found_create, &mut found_delete);
assert!(
tokio::time::timeout(Duration::from_secs(WAIT_TIME + 1), rx.recv())
.await
.is_err(),
"Should not have gotten another event"
);
}
/// Test helper: after a one-second delay, creates `new_foo.txt` and
/// `new_bar.txt` inside `base` and removes `old_foo.txt`, all concurrently.
async fn create_files(base: PathBuf) {
    tokio::time::sleep(Duration::from_secs(1)).await;
    tokio::try_join!(
        tokio::fs::write(base.join("new_foo.txt"), ""),
        tokio::fs::write(base.join("new_bar.txt"), ""),
        tokio::fs::remove_file(base.join("old_foo.txt"))
    )
    .expect("unable to write/delete test files");
}
fn assert_event(
event: Event,
base: impl AsRef<Path>,
found_create: &mut bool,
found_delete: &mut bool,
) {
match event.kind {
EventKind::Create(_) => {
if *found_create {
panic!("Got second create event");
}
assert!(event.paths.len() == 2, "Expected two created paths");
assert!(event.paths.contains(&base.as_ref().join("new_foo.txt")));
assert!(event.paths.contains(&base.as_ref().join("new_bar.txt")));
*found_create = true;
}
EventKind::Remove(_) => {
if *found_delete {
panic!("Got second delete event");
}
assert!(event.paths.len() == 1, "Expected 1 deleted path");
assert!(event.paths.contains(&base.as_ref().join("old_foo.txt")));
*found_delete = true;
}
_ => panic!("Event wasn't a create or remove"),
}
}
}
}