pub mod file;
#[cfg(feature = "kafka")]
pub mod kafka;
use clap::Subcommand;
use crossbeam_channel::{select, Receiver};
use log::{debug, error, info};
use std::io::Error;
#[cfg(feature = "kafka")]
use self::kafka::{KafkaArchive, KafkaArgs};
use super::scheduler::job::JobInfo;
use file::{FileArchive, FileArgs};
use std::thread::sleep;
use std::time::Duration;
/// CLI-selectable archive backends.
///
/// Parsed by clap as a subcommand; each variant carries the backend-specific
/// arguments. The Kafka backend is only compiled in with the `kafka` feature.
#[derive(Subcommand, Debug)]
pub enum Archiver {
    /// Archive job scripts to a location on the filesystem.
    File(FileArgs),
    /// Ship job scripts to a Kafka topic (requires the `kafka` feature).
    #[cfg(feature = "kafka")]
    Kafka(KafkaArgs),
}
/// Common interface implemented by every archive backend.
///
/// Implementations must be `Send`, since archiving runs on a worker thread
/// fed by a crossbeam channel.
// The `borrowed_box` allow is deliberate: entries travel through the channel
// as `Box<dyn JobInfo>`, and changing the signature to `&dyn JobInfo` would
// break every implementor, so the boxed borrow is kept as-is.
#[allow(clippy::borrowed_box)]
pub trait Archive: Send {
    /// Persist the given job entry; returns an I/O error on failure.
    fn archive(&self, slurm_job_entry: &Box<dyn JobInfo>) -> Result<(), Error>;
}
/// Construct the concrete archive backend selected on the command line.
///
/// Consumes the parsed `Archiver` subcommand and returns the matching
/// implementation as a boxed trait object, or the I/O error produced while
/// setting the backend up.
pub fn archive_builder(archiver: Archiver) -> Result<Box<dyn Archive>, Error> {
    match archiver {
        Archiver::File(file_args) => Ok(Box::new(FileArchive::build(file_args)?)),
        #[cfg(feature = "kafka")]
        Archiver::Kafka(args) => Ok(Box::new(KafkaArchive::build(args)?)),
    }
}
/// Main worker loop: receives job entries from `r` and hands each one to
/// `archiver`, until a `true` arrives on `sigchannel`.
///
/// For every entry, the loop waits until at least 2000 ms have elapsed since
/// the entry's creation moment before reading its job info — presumably to
/// give the scheduler time to finish writing the job's files; confirm against
/// the producer side. Per-entry read/archive failures are logged and the loop
/// continues; only a closed entry channel or a shutdown signal ends it.
///
/// Shutdown behaviour:
/// * `cleanup == false`: queued entries are dropped and their count logged.
/// * `cleanup == true`: all entries still in the channel are drained and
///   archived before returning, and any failure is propagated via `?`.
///   NOTE(review): the drain uses `r.iter()`, which blocks until every sender
///   is dropped — if a producer is still alive, this will not return once the
///   queue is empty; verify shutdown ordering on the sending side. Also note
///   the drain skips the 2000 ms settle delay applied in the main path, and
///   propagates errors where the main path only logs them — confirm both are
///   intentional.
pub fn process(
    archiver: Box<dyn Archive>,
    r: &Receiver<Box<dyn JobInfo>>,
    sigchannel: &Receiver<bool>,
    cleanup: bool,
) -> Result<(), Error> {
    info!("Start processing events");
    // The allow silences lints triggered by crossbeam's select! macro
    // expansion, not by code written here.
    #[allow(clippy::zero_ptr, dropping_copy_types)]
    loop {
        select! {
            // Shutdown signal: only `Ok(true)` triggers termination.
            recv(sigchannel) -> b => if let Ok(true) = b {
                if !cleanup {
                    info!("Stopped processing entries, {} skipped", r.len());
                } else {
                    info!("Processing {} entries, then stopping", r.len());
                    // Drain the remaining queue; see the doc comment for the
                    // blocking and error-propagation caveats.
                    for mut entry in r.iter() {
                        entry.read_job_info()?;
                        archiver.archive(&entry)?;
                    }
                    info!("Done processing");
                }
                break;
            },
            // Regular entry: settle, read, archive; log-and-continue on error.
            recv(r) -> entry => {
                if let Ok(mut job_entry) = entry {
                    let elapsed = job_entry.moment().elapsed();
                    // Sleep only for the remainder of the 2000 ms window; if
                    // it has already passed, checked_sub yields None.
                    if let Some(dur) = Duration::from_millis(2000).checked_sub(elapsed) {
                        debug!("Waiting for {} ms to elapse before checking files", dur.as_millis());
                        sleep(dur);
                    }
                    if let Err(e) = job_entry.read_job_info() {
                        error!("Error on reading JobEntry ({}) info for path {:?}: {}", job_entry.jobid(), job_entry.path(), e);
                    } else if let Err(e) = archiver.archive(&job_entry) {
                        error!("Error on archiving JobEntry ({}) for path {:?}: {}", job_entry.jobid(), job_entry.path(), e);
                    } else {
                        info!("JobEntry ({}) archived successfully for path {:?}", job_entry.jobid(), job_entry.path());
                    }
                } else {
                    // Err means every sender was dropped: nothing more will
                    // ever arrive, so leave the loop.
                    error!("Error on receiving JobEntry info");
                    break;
                }
            }
        }
    }
    debug!("Processing loop exited");
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::scheduler::job::JobInfo;
    use crate::scheduler::slurm::SlurmJobEntry;
    use crossbeam_channel::unbounded;
    use crossbeam_utils::thread::scope;
    use std::env::current_dir;
    use std::path::PathBuf;
    use std::thread::sleep;
    use std::time::Duration;

    /// Minimal `Archive` implementation that accepts every entry.
    struct DummyArchiver;

    impl Archive for DummyArchiver {
        fn archive(&self, _: &Box<dyn JobInfo>) -> Result<(), Error> {
            info!("Archiving");
            Ok(())
        }
    }

    /// Feeds one job entry into `process`, then signals shutdown and checks
    /// that the worker exits cleanly.
    ///
    /// NOTE(review): despite the name, `cleanup` is passed as `false`, so
    /// this exercises the skip-on-shutdown path, not the drain path —
    /// confirm whether a companion test with `cleanup = true` is wanted.
    #[test]
    fn test_process_cleanup() {
        let (tx1, rx1) = unbounded();
        let (tx2, rx2) = unbounded();
        let archiver = Box::new(DummyArchiver);
        scope(|s| {
            // `join` already returns a PathBuf; the extra `PathBuf::from`
            // wrapper the test used to carry was a no-op conversion.
            let path: PathBuf = current_dir().unwrap().join("tests/job.123456");
            let slurm_job_entry = SlurmJobEntry::new(&path, "123456", "mycluster", "master", None);
            s.spawn(move |_| {
                // `assert_eq!(v, ())` on the Ok value compared unit with unit
                // (clippy::unit_cmp); failing only on Err asserts the same.
                if let Err(e) = process(archiver, &rx1, &rx2, false) {
                    panic!("Unexpected error from process function: {}", e);
                }
            });
            tx1.send(Box::new(slurm_job_entry)).unwrap();
            // Give the worker time to pick the entry up before signalling stop.
            sleep(Duration::from_millis(1000));
            tx2.send(true).unwrap();
        })
        .unwrap();
    }
}