pub mod file;
#[cfg(feature = "elasticsearch-7")]
pub mod elastic;
#[cfg(feature = "kafka")]
pub mod kafka;
use clap::ArgMatches;
use crossbeam_channel::{select, Receiver};
use log::{debug, error, info};
use std::io::{Error, ErrorKind};
#[cfg(feature = "elasticsearch-7")]
use self::elastic::ElasticArchive;
#[cfg(feature = "kafka")]
use self::kafka::KafkaArchive;
use super::scheduler::job::JobInfo;
use file::FileArchive;
use std::thread::sleep;
use std::time::Duration;
// NOTE(review): `&Box<dyn JobInfo>` (rather than `&dyn JobInfo`) is part of the
// public trait contract used by the file/elastic/kafka backends, so the clippy
// lint is silenced instead of fixed to keep implementors source-compatible.
#[allow(clippy::borrowed_box)]
/// Common interface for archival backends: a single, fallible `archive` call
/// that persists one job entry. Implementations must be `Send` so the archiver
/// can be handed to the processing thread.
pub trait Archive: Send {
    /// Persists the given job entry, returning an `std::io::Error` on failure.
    fn archive(&self, slurm_job_entry: &Box<dyn JobInfo>) -> Result<(), Error>;
}
/// Builds the archiver selected by the CLI subcommand.
///
/// Recognised subcommands are `file`, plus `elasticsearch` and `kafka` when
/// the corresponding cargo features are enabled. Any other (or missing)
/// subcommand yields an `ErrorKind::Other` error.
pub fn archive_builder(matches: &ArgMatches) -> Result<Box<dyn Archive>, Error> {
    match matches.subcommand() {
        Some(("file", sub)) => {
            FileArchive::build(sub).map(|archive| Box::new(archive) as Box<dyn Archive>)
        }
        #[cfg(feature = "elasticsearch-7")]
        Some(("elasticsearch", sub)) => {
            ElasticArchive::build(sub).map(|archive| Box::new(archive) as Box<dyn Archive>)
        }
        #[cfg(feature = "kafka")]
        Some(("kafka", sub)) => {
            KafkaArchive::build(sub).map(|archive| Box::new(archive) as Box<dyn Archive>)
        }
        _ => Err(Error::new(
            ErrorKind::Other,
            "No supported archival subcommand used",
        )),
    }
}
/// Event loop: pulls job entries from `r`, archives them, and stops on a signal.
///
/// For every entry received on `r`, the loop first sleeps until at least
/// 2000 ms have elapsed since the entry's `moment()`, then calls
/// `read_job_info` and hands the entry to `archiver`. When `true` arrives on
/// `sigchannel`:
/// - with `cleanup` set, every entry still queued in `r` is read and archived
///   before the loop exits;
/// - otherwise the queued entries are skipped and the loop exits immediately.
///
/// Returns the first error produced by `read_job_info` or `archive`, or
/// `Ok(())` after a normal shutdown. A receive error on `r` (all senders
/// dropped) is logged and also ends the loop with `Ok(())`.
pub fn process(
    archiver: Box<dyn Archive>,
    r: &Receiver<Box<dyn JobInfo>>,
    sigchannel: &Receiver<bool>,
    cleanup: bool,
) -> Result<(), Error> {
    info!("Start processing events");
    // The allows silence lints that fire inside crossbeam's select! expansion,
    // not in our own code.
    #[allow(clippy::zero_ptr, clippy::drop_copy)]
    loop {
        select! {
            // Shutdown signal: optionally drain the queue, then leave the loop.
            recv(sigchannel) -> b => if let Ok(true) = b {
                if !cleanup {
                    info!("Stopped processing entries, {} skipped", r.len());
                } else {
                    info!("Processing {} entries, then stopping", r.len());
                    // Drain without the 2-second grace period: at shutdown the
                    // remaining entries are archived as-is.
                    for mut entry in r.iter() {
                        entry.read_job_info()?;
                        archiver.archive(&entry)?;
                    }
                    info!("Done processing");
                }
                break;
            },
            // Normal path: one job entry at a time.
            recv(r) -> entry => {
                if let Ok(mut job_entry) = entry {
                    // Give the scheduler time to finish writing the job's
                    // files: wait out the remainder of a 2000 ms window that
                    // started at the entry's creation moment.
                    let elapsed = job_entry.moment().elapsed();
                    if let Some(dur) = Duration::from_millis(2000).checked_sub(elapsed) {
                        debug!("Waiting for {} ms to elapse before checking files", dur.as_millis());
                        sleep(dur);
                    }
                    job_entry.read_job_info()?;
                    archiver.archive(&job_entry)?;
                } else {
                    // All senders gone; nothing more will arrive.
                    error!("Error on receiving JobEntry info");
                    break;
                }
            }
        }
    }
    debug!("Processing loop exited");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::job::JobInfo;
    use crate::scheduler::slurm::SlurmJobEntry;
    use crossbeam_channel::unbounded;
    use crossbeam_utils::thread::scope;
    use std::env::current_dir;
    use std::thread::sleep;
    use std::time::Duration;

    /// Archiver stub for tests: logs and always succeeds.
    struct DummyArchiver;

    impl Archive for DummyArchiver {
        fn archive(&self, _: &Box<dyn JobInfo>) -> Result<(), Error> {
            info!("Archiving");
            Ok(())
        }
    }

    /// Sends one job entry to `process`, then signals shutdown (without
    /// cleanup) and checks that the function returns without an error.
    #[test]
    fn test_process() {
        let (tx1, rx1) = unbounded();
        let (tx2, rx2) = unbounded();
        let archiver = Box::new(DummyArchiver);
        scope(|s| {
            // `join` already yields a PathBuf; the former `PathBuf::from(...)`
            // wrapper was a redundant conversion.
            let path = current_dir().unwrap().join("tests/job.123456");
            let slurm_job_entry = SlurmJobEntry::new(&path, "123456", "mycluster");
            s.spawn(move |_| {
                // `process` returns Result<(), Error>; asserting `v == ()` is a
                // meaningless unit comparison (clippy::unit_cmp), so only an
                // unexpected error fails the test.
                if let Err(e) = process(archiver, &rx1, &rx2, false) {
                    panic!("Unexpected error from process function: {:?}", e);
                }
            });
            tx1.send(Box::new(slurm_job_entry)).unwrap();
            sleep(Duration::from_millis(1000));
            tx2.send(true).unwrap();
        })
        .unwrap();
    }
}