use debounce::EventDebouncer;
use futures::{
channel::mpsc::{channel, Receiver},
SinkExt, StreamExt,
};
use notify::{Config, Event, RecommendedWatcher, RecursiveMode, Watcher};
use std::error;
use std::{path::Path, sync::Arc, time::Duration};
type Result<T> = std::result::Result<T, Box<dyn error::Error>>;
pub struct Monitor {
dir: String,
update_interval: u64,
on_change: Arc<Box<dyn Fn(String) -> Result<()> + Send + Sync>>,
}
impl Monitor {
pub fn new<F>(dir: &str, update_interval: u64, on_change: F) -> Self
where
F: Fn(String) -> Result<()> + Send + Sync + 'static,
{
Self {
dir: dir.to_string(),
update_interval,
on_change: Arc::new(Box::new(on_change)),
}
}
pub async fn start(&self) {
if let Err(e) = self.watch_directory().await {
eprintln!("File monitoring error: {:?}", e);
}
}
async fn watch_directory(&self) -> Result<()> {
let delay = Duration::from_millis(self.update_interval);
let on_change = self.on_change.clone();
let debouncer = EventDebouncer::new(delay, move |path: String| on_change(path).unwrap());
let (mut watcher, mut rx) = self.create_watcher()?;
watcher.watch(Path::new(&self.dir), RecursiveMode::Recursive)?;
while let Some(res) = rx.next().await {
match res {
Ok(event) => {
if let Some(paths) = self.get_valid_paths(event) {
for path in paths {
debouncer.put(path);
}
}
}
Err(e) => eprintln!("File monitoring error: {:?}", e),
}
}
Ok(())
}
fn create_watcher(
&self,
) -> notify::Result<(RecommendedWatcher, Receiver<notify::Result<Event>>)> {
let (mut tx, rx) = channel(1);
let watcher = RecommendedWatcher::new(
move |res| {
futures::executor::block_on(async {
if let Err(e) = tx.send(res).await {
eprintln!("Error sending event: {:?}", e);
}
})
},
Config::default(),
)?;
Ok((watcher, rx))
}
fn get_valid_paths(&self, event: Event) -> Option<Vec<String>> {
if !matches!(
event.kind,
notify::EventKind::Modify(notify::event::ModifyKind::Data(
notify::event::DataChange::Content
))
) {
return None;
}
Some(
event
.paths
.iter()
.map(|path| path.to_str().unwrap().to_string())
.collect(),
)
}
}