extern crate chrono;
extern crate crossbeam_channel;
extern crate crossbeam_utils;
use crossbeam_channel::{select, unbounded, Receiver, Sender};
use log::*;
use notify::event::Event;
use notify::{recommended_watcher, RecursiveMode, Watcher};
use std::io::Error;
use std::path::Path;
use super::scheduler::job::JobInfo;
use super::scheduler::Scheduler;
/// Checks whether `event` concerns a scheduler job entry and, if so, queues
/// the corresponding job info on `s`.
///
/// The scheduler decides relevance through `verify_event_kind`. When it
/// reports `None` the event is only logged and `Ok(())` is returned. Otherwise
/// the first reported path is handed to `create_job_info` and the resulting
/// job info is sent through `s`.
///
/// # Errors
///
/// Returns an [`std::io::Error`] when
/// * the scheduler accepted the event but reported no paths,
/// * the scheduler could not create a job info structure for the path, or
/// * sending the job info through `s` failed.
// The `&Box<dyn Scheduler>` parameter is part of the signature existing
// callers (such as `monitor`) rely on, so the lint is silenced instead of
// changing the interface.
#[allow(clippy::borrowed_box)]
fn check_and_queue(
    scheduler: &Box<dyn Scheduler>,
    s: &Sender<Box<dyn JobInfo>>,
    event: Event,
) -> Result<(), std::io::Error> {
    debug!("Event received: {:?}", event);

    let Some(paths) = scheduler.verify_event_kind(&event) else {
        debug!(
            "Event does not pertain to a scheduler job entry: {:?}",
            event
        );
        return Ok(());
    };

    // Only the first path is used. Guard against an empty list so that a
    // misbehaving scheduler yields an error instead of an out-of-bounds panic.
    let Some(job_path) = paths.first() else {
        return Err(Error::other(
            "Scheduler accepted the event but returned no paths",
        ));
    };

    info!(
        "Event received for a scheduler job entry with path {:?}",
        job_path
    );
    let jobinfo = scheduler
        .create_job_info(job_path)
        .ok_or_else(|| Error::other("Could not create job info structure"))?;

    info!("Sending job info for path {:?}", job_path);
    s.send(jobinfo).map_err(|err| Error::other(err.to_string()))
}
#[allow(clippy::borrowed_box)]
pub fn monitor(
scheduler: &Box<dyn Scheduler>,
path: &Path,
s: &Sender<Box<dyn JobInfo>>,
sigchannel: &Receiver<bool>,
) -> notify::Result<()> {
let (tx, rx) = unbounded();
let mut watcher = recommended_watcher(move |res| {
if tx.send(res).is_err() {
error!("Failed to send event through channel");
}
})?;
info!("Watching path {:?}", path);
watcher.watch(path, RecursiveMode::NonRecursive)?;
#[allow(clippy::zero_ptr, dropping_copy_types)]
loop {
select! {
recv(sigchannel) -> b => if let Ok(true) = b {
break Ok(());
},
recv(rx) -> event => {
match event {
Ok(Ok(e)) => {
if let Err(err) = check_and_queue(scheduler, s, e) {
error!("Failed to check and queue event: {:?}", err);
}
},
Ok(Err(_)) | Err(_) => {
error!("Error on received event");
break Err(notify::Error::new(notify::ErrorKind::Generic("Problem receiving event".to_string())));
}
}
}
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crossbeam_channel::unbounded;
    use notify::event::{CreateKind, Event, EventKind};
    use std::collections::HashMap;
    use std::path::PathBuf;
    use std::time::{Duration, Instant};
    use tempfile::tempdir;

    /// Scheduler stub that accepts only file-creation events and always
    /// produces a `DummyJobInfo`.
    struct DummyScheduler;

    impl Scheduler for DummyScheduler {
        fn watch_locations(&self) -> Vec<PathBuf> {
            vec!["dummy_watch_location".into()]
        }

        fn create_job_info(&self, _event_path: &Path) -> Option<Box<dyn JobInfo>> {
            Some(Box::new(DummyJobInfo))
        }

        fn verify_event_kind(&self, event: &Event) -> Option<Vec<PathBuf>> {
            if let Event {
                kind: EventKind::Create(CreateKind::File),
                ..
            } = event
            {
                Some(vec![event.paths[0].clone()])
            } else {
                None
            }
        }
    }

    /// Job info stub returning fixed values.
    struct DummyJobInfo;

    impl JobInfo for DummyJobInfo {
        fn path(&self) -> PathBuf {
            PathBuf::from("/tmp/test")
        }

        fn jobid(&self) -> String {
            "dummy_job".to_string()
        }

        fn moment(&self) -> Instant {
            Instant::now()
        }

        fn cluster(&self) -> String {
            "dummy_cluster".to_string()
        }

        fn hostname(&self) -> String {
            "master".to_string()
        }

        fn read_job_info(&mut self) -> Result<(), Error> {
            Ok(())
        }

        fn files(&self) -> Vec<(String, Vec<u8>)> {
            vec![]
        }

        fn script(&self) -> String {
            "dummy_script".to_string()
        }

        fn extra_info(&self) -> Option<HashMap<String, String>> {
            Some(HashMap::new())
        }
    }

    /// Creating a file in the watched directory yields a job info, and a
    /// `true` on the signal channel stops the monitor.
    #[test]
    fn test_monitor() {
        let temp_dir = tempdir().unwrap();
        let temp_dir_path = temp_dir.path().to_owned();
        let temp_dir_path_clone = temp_dir_path.clone();
        let (tx, rx) = unbounded();
        let (sig_tx, sig_rx) = unbounded();
        let scheduler: Box<dyn Scheduler + 'static> = Box::new(DummyScheduler);

        let monitor_thread = std::thread::spawn(move || {
            monitor(&scheduler, &temp_dir_path_clone, &tx, &sig_rx)
                .expect("Monitor function failed");
        });

        // Give the monitor thread time to register the watch before the file
        // is created; there is no readiness signal to wait on.
        std::thread::sleep(Duration::from_millis(1000));

        let dummy_file_path = temp_dir_path.join("dummy_file.txt");
        std::fs::write(&dummy_file_path, "dummy_content").expect("Failed to create dummy file");

        // Wait with a generous timeout instead of sleeping a fixed interval
        // and polling once, which is flaky on slow or loaded machines.
        let job_info = rx
            .recv_timeout(Duration::from_secs(5))
            .expect("No JobInfo received");
        assert_eq!(job_info.jobid(), "dummy_job");

        sig_tx
            .send(true)
            .expect("Failed to send signal to stop the monitor thread");
        monitor_thread
            .join()
            .expect("Failed to join monitor thread");
    }

    /// A file-creation event is turned into a queued job info.
    #[test]
    fn test_check_and_queue() {
        let temp_dir = tempdir().unwrap();
        let temp_dir_path = temp_dir.path().to_owned();
        let (tx, rx) = unbounded();
        let scheduler: Box<dyn Scheduler + 'static> = Box::new(DummyScheduler);

        let dummy_file_path = temp_dir_path.join("dummy_file.txt");
        std::fs::write(&dummy_file_path, "dummy_content").expect("Failed to create dummy file");

        let dummy_event = Event {
            kind: EventKind::Create(CreateKind::File),
            paths: vec![dummy_file_path.clone()],
            ..Default::default()
        };

        let result = check_and_queue(&scheduler, &tx, dummy_event);
        assert!(result.is_ok());

        let job_info = rx.try_recv().expect("No JobInfo received");
        assert_eq!(job_info.jobid(), "dummy_job");
    }

    /// An event the scheduler does not recognise is accepted but queues nothing.
    #[test]
    fn test_check_and_queue_ignores_unrelated_event() {
        let (tx, rx) = unbounded();
        let scheduler: Box<dyn Scheduler + 'static> = Box::new(DummyScheduler);

        let result = check_and_queue(&scheduler, &tx, Event::default());
        assert!(result.is_ok());
        assert!(rx.try_recv().is_err());
    }
}