use clap::Args;
use glob::glob;
use log::debug;
use notify::event::{CreateKind, Event, EventKind};
use std::collections::HashMap;
use std::io::Error;
use std::path::{Path, PathBuf};
use std::time::Instant;
use super::job::JobInfo;
use super::Scheduler;
use crate::utils;
#[derive(Args, Debug)]
pub struct TorqueArgs {
subdirs: bool,
}
pub struct TorqueJobEntry {
path_: PathBuf,
jobname_: Option<String>,
jobid_: String,
cluster_: String,
hostname_: String,
moment_: Instant,
script_: Option<Vec<u8>>,
env_: HashMap<String, Vec<u8>>,
}
impl TorqueJobEntry {
fn new(p: &Path, id: &str, cluster: &str, hostname: &str) -> TorqueJobEntry {
TorqueJobEntry {
path_: p.to_path_buf(),
jobname_: None,
cluster_: cluster.to_string(),
hostname_: hostname.to_string(),
jobid_: id.to_owned(),
moment_: Instant::now(),
script_: None,
env_: HashMap::new(),
}
}
}
impl JobInfo for TorqueJobEntry {
fn path(&self) -> PathBuf {
self.path_.clone()
}
fn jobid(&self) -> String {
self.jobid_.clone()
}
fn moment(&self) -> Instant {
self.moment_
}
fn cluster(&self) -> String {
self.cluster_.clone()
}
fn hostname(&self) -> String {
self.hostname_.clone()
}
fn read_job_info(&mut self) -> Result<(), Error> {
let dir = self.path_.parent().unwrap();
let filename = Path::new(self.path_.file_name().unwrap());
self.jobname_ = Some(filename.to_str().unwrap().to_string());
self.script_ = Some(utils::read_file(dir, filename, None)?);
let ta_filename = filename.with_extension("TA");
if let Ok(ta_contents) = utils::read_file(dir, &ta_filename, Some(10)) {
self.env_
.insert(ta_filename.to_str().unwrap().to_string(), ta_contents);
let job_prefix = filename.to_str().unwrap().split('.').next().unwrap();
debug!(
"Found TA file, looking for JB files in {:?} with name {}",
dir, job_prefix
);
for jb_path in glob(&format!("{}/{}-*.JB", dir.display(), job_prefix))
.unwrap()
.filter_map(Result::ok)
{
let jb_filename = Path::new(jb_path.file_name().unwrap());
if let Ok(jb_contents) = utils::read_file(dir, jb_filename, Some(10)) {
self.env_
.insert(jb_filename.to_str().unwrap().to_string(), jb_contents);
}
}
return Ok(());
}
let jb_filename = filename.with_extension("JB");
let jb_contents = utils::read_file(dir, &jb_filename, None)?;
self.env_
.insert(jb_filename.to_str().unwrap().to_string(), jb_contents);
Ok(())
}
fn files(&self) -> Vec<(String, Vec<u8>)> {
let mut fs: Vec<(String, Vec<u8>)> = Vec::new();
if let Some(jn) = &self.jobname_ {
if let Some(script) = &self.script_ {
fs.push((jn.to_string(), script.to_vec()));
}
}
for (jb, jb_contents) in self.env_.iter() {
fs.push((jb.to_string(), jb_contents.to_vec()));
}
fs
}
fn script(&self) -> String {
match &self.script_ {
Some(s) => String::from_utf8_lossy(s).to_string(),
None => panic!("No script available for job {}", self.jobid_),
}
}
fn extra_info(&self) -> Option<HashMap<String, String>> {
Some(
self.env_
.iter()
.map(|(k, v)| (k.clone(), String::from_utf8_lossy(v).to_string()))
.collect(),
)
}
}
pub struct Torque {
pub base: PathBuf,
pub cluster: String,
pub hostname: String,
pub subdirs: bool,
}
impl Torque {
pub fn new(base: &Path, cluster: &str, hostname: &str) -> Torque {
Torque {
base: base.to_path_buf(),
cluster: cluster.to_string(),
hostname: hostname.to_string(),
subdirs: true, }
}
}
impl Scheduler for Torque {
fn watch_locations(&self) -> Vec<PathBuf> {
if self.subdirs {
(0..=9).map(|sd| self.base.join(format!("{sd}"))).collect()
} else {
[self.base.clone()].to_vec()
}
}
fn create_job_info(&self, event_path: &Path) -> Option<Box<dyn JobInfo>> {
if let Some((jobid, filename)) = is_job_path(event_path) {
Some(Box::new(TorqueJobEntry::new(
filename,
jobid,
&self.cluster,
&self.hostname,
)))
} else {
None
}
}
fn verify_event_kind(&self, event: &Event) -> Option<Vec<PathBuf>> {
if let Event {
kind: EventKind::Create(CreateKind::File),
paths,
..
} = event
{
Some(paths.to_vec())
} else {
None
}
}
}
fn is_job_path(path: &Path) -> Option<(&str, &Path)> {
if path.is_file() {
let jobid = path.file_stem().unwrap().to_str().unwrap();
return match path.extension().unwrap().to_str().unwrap() {
"SC" => Some((jobid, path)),
_ => None,
};
}
debug!("{:?} is not a considered job path", &path);
None
}
#[cfg(test)]
mod tests {
use super::*;
use std::env::current_dir;
#[test]
fn test_read_info() {
let path = PathBuf::from(
current_dir()
.unwrap()
.join("tests/torque_job.1/1.mymaster.mycluster.SC"),
);
let mut torque_job_entry = TorqueJobEntry::new(&path, "1", "mycluster", "master");
torque_job_entry.read_job_info().unwrap();
assert!(torque_job_entry
.env_
.contains_key("1.mymaster.mycluster.JB"));
assert_eq!(
torque_job_entry.env_.get("1.mymaster.mycluster.JB"),
Some(&String::from("<some><xml>M</xml></some>").into_bytes())
);
}
#[test]
fn test_read_info_job_array() {
let path = PathBuf::from(
current_dir()
.unwrap()
.join("tests/torque_job.2/2.mymaster.mycluster.SC"),
);
let mut torque_job_entry = TorqueJobEntry::new(&path, "2", "mycluster", "master");
torque_job_entry.read_job_info().unwrap();
assert!(torque_job_entry
.env_
.contains_key("2.mymaster.mycluster.TA"));
assert!(torque_job_entry
.env_
.contains_key("2-1.mymaster.mycluster.JB"));
assert!(torque_job_entry
.env_
.contains_key("2-2.mymaster.mycluster.JB"));
assert_eq!(
torque_job_entry.env_.get("2-1.mymaster.mycluster.JB"),
Some(&String::from("<some><xml>M1</xml></some>").into_bytes())
);
assert_eq!(
torque_job_entry.env_.get("2-2.mymaster.mycluster.JB"),
Some(&String::from("<some><xml>M2</xml></some>").into_bytes())
);
}
}