use log::debug;
use notify::event::{CreateKind, Event, EventKind};
use regex::Regex;
use std::collections::HashMap;
use std::io::Error;
use std::path::{Path, PathBuf};
use std::string::String;
use std::time::Instant;
use super::job::JobInfo;
use super::Scheduler;
use crate::utils;
/// A single Slurm job discovered on disk, together with the data read from
/// its job directory.
///
/// The entry starts out with only identifying information; the script and
/// environment contents are filled in later by `read_job_info`.
pub struct SlurmJobEntry {
/// Directory of the job; the `script` and `environment` files are read
/// relative to this path.
pub path_: PathBuf,
/// Job id, as passed to `new`.
jobid_: String,
/// Name of the cluster the job belongs to, as passed to `new`.
cluster_: String,
/// Instant at which this entry was constructed.
moment_: Instant,
/// Raw bytes of the job script, with one trailing 0 byte removed if present.
/// `None` until `read_job_info` has succeeded in reading it.
script_: Option<Vec<u8>>,
/// Raw bytes of the job environment file. `None` until `read_job_info` has
/// read it.
env_: Option<Vec<u8>>,
/// Optional regular expression; environment keys matching it are left out
/// of the map produced by `extra_info`.
filter_regex: Option<String>,
}
impl SlurmJobEntry {
pub fn new(
path: &Path,
id: &str,
cluster: &str,
filter_regex: &Option<String>,
) -> SlurmJobEntry {
SlurmJobEntry {
path_: path.to_path_buf(),
jobid_: id.to_string(),
cluster_: cluster.to_string(),
moment_: Instant::now(),
script_: None,
env_: None,
filter_regex: filter_regex.clone(),
}
}
}
/// Tells whether the environment key `env` should be filtered out.
///
/// Returns `true` only when a pattern is configured in `r` and that pattern
/// matches `env` (via `Regex::is_match`, i.e. a search rather than a
/// full-string match). With no pattern configured nothing is filtered and the
/// result is `false`.
///
/// # Panics
///
/// Panics if the configured pattern is not a valid regular expression.
fn filter_env(r: &Option<String>, env: &str) -> bool {
    match r {
        Some(pattern) => Regex::new(pattern).unwrap().is_match(env),
        None => false,
    }
}
impl JobInfo for SlurmJobEntry {
fn jobid(&self) -> String {
self.jobid_.clone()
}
fn moment(&self) -> Instant {
self.moment_
}
fn cluster(&self) -> String {
self.cluster_.clone()
}
fn read_job_info(&mut self) -> Result<(), Error> {
self.script_ = {
let mut s = utils::read_file(&self.path_, Path::new("script"), None)?;
if let Some(0) = s.last() {
s.pop();
}
Some(s)
};
self.env_ = Some(utils::read_file(
&self.path_,
Path::new("environment"),
None,
)?);
Ok(())
}
fn files(&self) -> Vec<(String, Vec<u8>)> {
[
("script", self.script_.as_ref()),
("environment", self.env_.as_ref()),
]
.iter()
.filter_map(|(filename, v)| {
v.map(|s| (format!("job.{}_{}", self.jobid_, filename), s.to_owned()))
})
.collect()
}
fn script(&self) -> String {
match &self.script_ {
Some(s) => String::from_utf8_lossy(s).to_string(),
None => panic!("No script available for job {}", self.jobid_),
}
}
fn extra_info(&self) -> Option<HashMap<String, String>> {
let r = self.filter_regex.clone();
self.env_.as_ref().map(|s| {
let env_string = String::from_utf8_lossy(s.split_at(4).1).to_string();
env_string
.split('\0')
.filter_map(|entry| {
let entry = entry.trim();
if !entry.is_empty() {
let parts: Vec<_> = entry.split('=').collect();
match parts.len() {
2 => {
let key = parts[0].trim();
println!("Checking for key {}", &key);
if !key.is_empty() && !filter_env(&r, key) {
println!("Keeping key {}", &key);
Some((key.to_owned(), parts[1].to_owned()))
} else {
None
}
}
_ => Some((entry.to_owned(), String::from(""))),
}
} else {
None
}
})
.collect::<HashMap<String, String>>()
})
}
}
/// Slurm scheduler backend configuration.
pub struct Slurm {
/// Base directory under which the watched `hash.0` .. `hash.9`
/// subdirectories are located.
pub base: PathBuf,
/// Cluster name handed to every job entry created by this scheduler.
pub cluster: String,
/// Optional regular expression handed to every job entry; it is used to
/// filter environment keys.
pub filter_regex: Option<String>,
}
impl Slurm {
pub fn new(base: &Path, cluster: &str, filter_regex: &Option<String>) -> Slurm {
Slurm {
base: base.to_path_buf(),
cluster: cluster.to_string(),
filter_regex: filter_regex.clone(),
}
}
}
impl Scheduler for Slurm {
    /// Lists the directories to watch: `hash.0` through `hash.9` below the
    /// base directory.
    fn watch_locations(&self) -> Vec<PathBuf> {
        let mut locations = Vec::with_capacity(10);
        for hash in 0..=9 {
            locations.push(self.base.join(format!("hash.{hash}")));
        }
        locations
    }

    /// Builds a job entry for `event_path` if `is_job_path` accepts it as a
    /// job directory, and returns `None` otherwise.
    fn create_job_info(&self, event_path: &Path) -> Option<Box<dyn JobInfo>> {
        let (jobid, _dirname) = is_job_path(event_path)?;
        let entry = SlurmJobEntry::new(event_path, jobid, &self.cluster, &self.filter_regex);
        Some(Box::new(entry))
    }

    /// Returns the paths of `event` when it reports the creation of a folder;
    /// every other kind of event yields `None`.
    fn verify_event_kind(&self, event: &Event) -> Option<Vec<PathBuf>> {
        match event.kind {
            EventKind::Create(CreateKind::Folder) => Some(event.paths.clone()),
            _ => None,
        }
    }
}
/// Checks whether `path` is a job directory and, if so, extracts its job id.
///
/// A job directory is an existing directory whose name starts with `job.`.
/// On success the result is `(jobid, dirname)`, where `dirname` is the final
/// path component and `jobid` is its extension (the text after the last `.`),
/// e.g. `("1234", "job.1234")`.
///
/// Returns `None`, after logging a debug message, when `path` is not a
/// directory, has no final component (such as `/` or a path ending in `..`),
/// is not valid UTF-8, or does not start with `job.`.
pub fn is_job_path(path: &Path) -> Option<(&str, &str)> {
    if path.is_dir() {
        // Directories without a usable UTF-8 name cannot be job directories;
        // fall through to the "not a job path" case instead of panicking.
        let dirname = path.file_name().and_then(|name| name.to_str());
        if let Some(dirname) = dirname {
            if dirname.starts_with("job.") {
                if let Some(jobid) = path.extension().and_then(|ext| ext.to_str()) {
                    return Some((jobid, dirname));
                }
            }
        }
    }
    debug!("{:?} is not a considered job path", &path);
    None
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::env::current_dir;
    use std::fs::create_dir;
    use tempfile::tempdir;

    /// A directory named `job.<id>` is recognised; any other directory is not.
    #[test]
    fn test_is_job_path() {
        let tdir = tempdir().unwrap();

        let jobdir = tdir.path().join("job.1234");
        create_dir(&jobdir).unwrap();
        assert_eq!(is_job_path(&jobdir), Some(("1234", "job.1234")));

        let fdir = tdir.path().join("fubar");
        create_dir(&fdir).unwrap();
        assert_eq!(is_job_path(&fdir), None);
    }

    /// The trailing 0 byte of the script file is dropped when reading.
    #[test]
    fn test_read_job_script_drop_zero() {
        let path = current_dir().unwrap().join("tests/job.123456");
        let mut slurm_job_entry = SlurmJobEntry::new(&path, "123456", "mycluster", &None);
        slurm_job_entry.read_job_info().unwrap();
        let script = slurm_job_entry
            .script_
            .expect("script should have been read");
        assert_ne!(script.last(), Some(&0));
    }

    /// The environment fixture is parsed into the expected key/value pairs.
    #[test]
    fn test_read_job_extra_info() {
        let path = current_dir().unwrap().join("tests/job.123456");
        let mut slurm_job_entry = SlurmJobEntry::new(&path, "123456", "mycluster", &None);
        slurm_job_entry.read_job_info().unwrap();
        let hm = slurm_job_entry
            .extra_info()
            .expect("environment should have been read");
        println!("hm length: {}", hm.len());
        assert_eq!(hm.len(), 45);
        assert_eq!(hm.get("SLURM_CLUSTERS").unwrap(), "cluster");
        assert_eq!(hm.get("SLURM_NTASKS_PER_NODE").unwrap(), "1");
    }

    /// Reading and parsing a second fixture works, including skipping the
    /// prefix at the start of the environment file.
    #[test]
    fn test_extra_info_drop_u32_prefix() {
        let path = current_dir().unwrap().join("tests/job.8897161");
        let mut slurm_job_entry = SlurmJobEntry::new(&path, "8897161", "mycluster", &None);
        slurm_job_entry
            .read_job_info()
            .expect("Could not read job info");
        assert!(slurm_job_entry.extra_info().is_some());
    }

    /// Keys matching the filter regex are left out of the extra info.
    #[test]
    fn test_extra_info() {
        let env_data = b"\0\0\0\0VAR1=value1\0VAR2=value2\0VAR3=value3\0";
        let filter_regex = Some("VAR2".to_string());
        let job_entry = SlurmJobEntry {
            path_: PathBuf::from("/some/path"),
            jobid_: "12345".to_string(),
            cluster_: "mycluster".to_string(),
            moment_: Instant::now(),
            script_: None,
            env_: Some(env_data.to_vec()),
            filter_regex,
        };
        let extra_info = job_entry.extra_info().unwrap();
        assert_eq!(extra_info.get("VAR1"), Some(&"value1".to_string()));
        assert_eq!(extra_info.get("VAR2"), None);
        assert_eq!(extra_info.get("VAR3"), Some(&"value3".to_string()));
    }

    /// With a pattern configured, only matching keys are filtered.
    #[test]
    fn test_filter_env() {
        let regex = Some("VAR.*".to_string());
        assert!(filter_env(&regex, "VAR1"));
        assert!(filter_env(&regex, "VAR2"));
        assert!(!filter_env(&regex, "OTHER"));
    }

    /// Without a pattern nothing is filtered.
    #[test]
    fn test_filter_env_none() {
        let regex: Option<String> = None;
        assert!(!filter_env(&regex, "VAR1"));
        assert!(!filter_env(&regex, "VAR2"));
        assert!(!filter_env(&regex, "OTHER"));
    }
}