extern crate chrono;
extern crate crossbeam_channel;
extern crate crossbeam_utils;
use crossbeam_channel::{select, unbounded, Receiver, Sender};
use log::*;
use notify::event::Event;
use notify::{RecommendedWatcher, RecursiveMode, Watcher};
use std::io::{Error, ErrorKind};
use std::path::Path;
use super::scheduler::job::JobInfo;
use super::scheduler::Scheduler;
#[allow(clippy::borrowed_box)]
fn check_and_queue(
scheduler: &Box<dyn Scheduler>,
s: &Sender<Box<dyn JobInfo>>,
event: Event,
) -> Result<(), std::io::Error> {
debug!("Event received: {:?}", event);
match scheduler.verify_event_kind(&event) {
Some(paths) => scheduler
.create_job_info(&paths[0])
.ok_or_else(|| {
Error::new(
ErrorKind::Other,
"Could not create job info structure".to_owned(),
)
})
.and_then(|jobinfo| {
s.send(jobinfo)
.map_err(|err| Error::new(ErrorKind::Other, err.to_string()))
}),
_ => Ok(()),
}
}
#[allow(clippy::borrowed_box)]
pub fn monitor(
scheduler: &Box<dyn Scheduler>,
path: &Path,
s: &Sender<Box<dyn JobInfo>>,
sigchannel: &Receiver<bool>,
) -> notify::Result<()> {
let (tx, rx) = unbounded();
let mut watcher = RecommendedWatcher::new_immediate(move |res| tx.send(res).unwrap())?;
info!("Watching path {:?}", &path);
if let Err(e) = watcher.watch(&path, RecursiveMode::NonRecursive) {
return Err(e);
}
#[allow(clippy::zero_ptr, clippy::drop_copy)]
loop {
select! {
recv(sigchannel) -> b => if let Ok(true) = b {
break Ok(());
},
recv(rx) -> event => {
match event {
Ok(Ok(e)) => check_and_queue(&scheduler, s, e)?,
Ok(Err(_)) | Err(_) => {
error!("Error on received event: {:?}", event);
break Err(notify::Error::new(notify::ErrorKind::Generic("Problem receiving event".to_string())));
}
}
}
}
}
}
#[cfg(test)]
mod tests {}