use std::path::Path;
use std::path::PathBuf;
use std::time::Duration;
use futures::channel::mpsc;
use futures::prelude::*;
use notify::event::DataChange;
use notify::event::MetadataKind;
use notify::event::ModifyKind;
use notify::Config;
use notify::EventKind;
use notify::PollWatcher;
use notify::RecursiveMode;
use notify::Watcher;
#[cfg(not(test))]
const DEFAULT_WATCH_DURATION: Duration = Duration::from_secs(3);
#[cfg(test)]
const DEFAULT_WATCH_DURATION: Duration = Duration::from_millis(100);
pub(crate) fn watch(path: &Path) -> impl Stream<Item = ()> {
watch_with_duration(path, DEFAULT_WATCH_DURATION)
}
fn watch_with_duration(path: &Path, duration: Duration) -> impl Stream<Item = ()> {
let config_file_path = PathBuf::from(path);
let watched_path = config_file_path.clone();
let (mut watch_sender, watch_receiver) = mpsc::channel(1);
let config = Config::default()
.with_poll_interval(duration)
.with_compare_contents(true);
let mut watcher = PollWatcher::new(
move |res: Result<notify::Event, notify::Error>| match res {
Ok(event) => {
if matches!(
event.kind,
EventKind::Modify(ModifyKind::Metadata(MetadataKind::WriteTime))
| EventKind::Modify(ModifyKind::Data(DataChange::Any))
) && event.paths.contains(&watched_path)
{
loop {
match watch_sender.try_send(()) {
Ok(_) => break,
Err(err) => {
tracing::warn!(
"could not process file watch notification. {}",
err.to_string()
);
if err.is_full() {
std::thread::sleep(Duration::from_millis(50));
} else {
panic!("event channel failed: {}", err);
}
}
}
}
}
}
Err(e) => tracing::error!("event error: {:?}", e),
},
config,
)
.unwrap_or_else(|_| panic!("could not create watch on: {:?}", config_file_path));
watcher
.watch(&config_file_path, RecursiveMode::NonRecursive)
.unwrap_or_else(|_| panic!("could not watch: {:?}", config_file_path));
stream::once(future::ready(()))
.chain(watch_receiver)
.chain(stream::once(async move {
drop(watcher);
}))
.boxed()
}
#[cfg(test)]
pub(crate) mod tests {
use std::env::temp_dir;
use std::fs::File;
use std::io::Seek;
use std::io::SeekFrom;
use std::io::Write;
use std::path::PathBuf;
use test_log::test;
use super::*;
#[test(tokio::test)]
async fn basic_watch() {
let (path, mut file) = create_temp_file();
let mut watch = watch_with_duration(&path, Duration::from_millis(100));
assert!(futures::poll!(watch.next()).is_ready());
write_and_flush(&mut file, "Some data 1").await;
assert!(futures::poll!(watch.next()).is_ready());
write_and_flush(&mut file, "Some data 2").await;
assert!(futures::poll!(watch.next()).is_ready())
}
pub(crate) fn create_temp_file() -> (PathBuf, File) {
let path = temp_dir().join(format!("{}", uuid::Uuid::new_v4()));
let file = std::fs::File::create(&path).unwrap();
(path, file)
}
pub(crate) async fn write_and_flush(file: &mut File, contents: &str) {
file.seek(SeekFrom::Start(0)).unwrap();
file.set_len(0).unwrap();
file.write_all(contents.as_bytes()).unwrap();
file.flush().unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
}
}