use clap::Parser;
use crossbeam_channel::{bounded, unbounded};
use crossbeam_utils::sync::Parker;
use crossbeam_utils::thread::scope;
use log::{error, info};
use std::path::PathBuf;
use std::process::exit;
use std::sync::atomic::AtomicBool;
use std::sync::Arc;
mod archive;
mod monitor;
mod scheduler;
mod utils;
use archive::{archive_builder, process, Archive, Archiver};
use monitor::monitor;
use scheduler::{create, SchedulerKind};
use utils::{register_signal_handler, signal_handler_atomic};
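
/// Configure fern logging at debug or info level, writing either to the given
/// log file (reopened on SIGHUP, e.g. after log rotation) or to stdout.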
fn setup_logging(debug: bool, logfile: Option<PathBuf>) -> Result<(), log::SetLoggerError> {
    let level_filter = if debug {
        log::LevelFilter::Debug
    } else {
        log::LevelFilter::Info
    };
    let base_config = fern::Dispatch::new()
        .format(|out, message, record| {
            out.finish(format_args!(
                "[{}][{}][{}] {}",
                chrono::Local::now().to_rfc3339(),
                record.target(),
                record.level(),
                message
            ))
        })
        .level(level_filter);
    match logfile {
        Some(filename) => {
            let r = fern::log_reopen(&filename, Some(libc::SIGHUP)).unwrap();
            base_config.chain(r)
        }
        None => base_config.chain(std::io::stdout()),
    }
    .apply()
}
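
/// Command line options, parsed with clap's derive API.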
#[derive(Parser)]
#[command(author, version, about)]
struct Cli {
    #[arg(
        long,
        help = "Name of the cluster to which the jobs have been submitted."
    )]
    cluster: String,
    #[arg(long)]
    debug: bool,
    #[arg(
        long,
        help = "[Experimental] Process events that were already received when the program is terminated with SIGINT or SIGTERM"
    )]
    cleanup: bool,
    #[arg(long, help = "Log file name.")]
    logfile: Option<PathBuf>,
    #[arg(long)]
    torque_subdirs: bool,
    #[arg(long)]
    spool: PathBuf,
    #[arg(long)]
    scheduler: SchedulerKind,
    #[command(subcommand)]
    archiver: Archiver,
}
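
/// Parse the options, set up logging and signal handling, then spawn one
/// monitor thread per watched location and a single processing thread that
/// hands incoming entries to the configured archiver.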
fn main() -> Result<(), std::io::Error> {
    let cli = Cli::parse();

    if let Err(e) = setup_logging(cli.debug, cli.logfile) {
        panic!("Cannot set up logging: {e:?}");
    }
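
    // The spool path must point to an existing directory before we start watching it.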
    let base = cli.spool.to_owned();
    if !base.is_dir() {
        error!("Provided spool {:?} is not a valid directory", &base);
        exit(1);
    }
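
    // Keep the scheduler kind and cluster name, and build the archiver backend
    // chosen on the command line.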
    let scheduler_kind = cli.scheduler;
    let archiver: Box<dyn Archive> = archive_builder(&cli.archiver).unwrap();
    let cluster = cli.cluster;

    info!("sarchive starting. Watching spool {:?}.", &base);
    let notification = Arc::new(AtomicBool::new(false));
    let parker = Parker::new();
    let unparker = parker.unparker();

    register_signal_handler(signal_hook::consts::SIGTERM, unparker, &notification);
    register_signal_handler(signal_hook::consts::SIGINT, unparker, &notification);
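
    // A bounded channel fans the shutdown notification out to the worker threads;
    // an unbounded channel carries entries from the monitors to the archiving thread.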
    let (sig_sender, sig_receiver) = bounded(20);
    let cleanup = cli.cleanup;
    let (sender, receiver) = unbounded();

    let sched = create(&scheduler_kind, &base, &cluster);
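
    // All worker threads run in a crossbeam scope so they can borrow the
    // scheduler, the channels and the spool path from this stack frame.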
    if let Err(e) = scope(|s| {
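        // Signal thread: blocks until a signal handler unparks it, then
        // notifies the other threads over the signal channel.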
        let ss = &sig_sender;
        s.spawn(move |_| {
            signal_handler_atomic(ss, notification, &parker);
            info!("Signal handled");
        });
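
        // One monitor thread per location the scheduler wants watched.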
        for loc in sched.watch_locations() {
            let t = &sender;
            let sr = &sig_receiver;
            let sl = &sched;
            let b = &base;
            s.spawn(move |_| match monitor(sl, &loc, t, sr) {
                Ok(_) => info!("Stopped watching location {:?}", &loc),
                Err(e) => {
                    error!("{:?}", e);
                    panic!("Error watching {:?}", &b);
                }
            });
        }
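
        // A single processing thread drains the event channel and passes each
        // entry to the archiver; with --cleanup it also handles events that
        // were already queued when a termination signal arrived.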
        let r = &receiver;
        let sr = &sig_receiver;
        s.spawn(move |_| {
            match process(archiver, r, sr, cleanup) {
                Ok(()) => info!("Processing completed successfully"),
                Err(e) => error!("Processing failed: {:?}", e),
            };
        });
    }) {
        error!("sarchive stopping due to error: {:?}", e);
        exit(1);
    };

    info!("sarchive finished");
    exit(0);
}