use std::fmt::{self,Debug,Formatter};
use std::fs::{self,File,OpenOptions};
use std::str::FromStr;
use std::io::prelude::*;
use std::io::{stdout,BufReader};
use std::path::{Path,PathBuf};
use std::process::Command;
use std::net::TcpStream;
use std::collections::{HashSet};
use ssh2::Session;
use indicatif::{ProgressBar,ProgressStyle};
use crate::config_parser::{self,ConfigurationValue};
use crate::{Simulation,Plugs,source_location,error,match_object_panic};
use crate::output::{create_output,OutputEnvironment,OutputEnvironmentEntry};
use crate::config::{self,evaluate,flatten_configuration_value};
use crate::error::{Error,ErrorKind,SourceLocation};
/// The operation requested over an experiment folder.
///
/// Each variant is selected from text through the `FromStr` implementation using its
/// snake_case name; `"default"` is accepted as an alias of `LocalAndOutput`.
#[derive(Debug,Clone,Copy,PartialEq)]
pub enum Action
{
    /// Parsed from `"local_and_output"` or `"default"`.
    LocalAndOutput,
    /// Parsed from `"local"`.
    Local,
    /// Parsed from `"output"`.
    Output,
    /// Parsed from `"slurm"`.
    Slurm,
    /// Parsed from `"check"`.
    Check,
    /// Parsed from `"pull"`.
    Pull,
    /// Parsed from `"remote_check"`.
    RemoteCheck,
    /// Parsed from `"push"`.
    Push,
    /// Parsed from `"slurm_cancel"`.
    SlurmCancel,
    /// Parsed from `"shell"`.
    Shell,
    /// Parsed from `"pack"`.
    Pack,
    /// Parsed from `"discard"`.
    Discard,
    /// Parsed from `"quick_test"`.
    QuickTest,
}
impl FromStr for Action
{
type Err = Error;
fn from_str(s:&str) -> Result<Action,Error>
{
match s
{
"default" => Ok(Action::LocalAndOutput),
"local_and_output" => Ok(Action::LocalAndOutput),
"local" => Ok(Action::Local),
"output" => Ok(Action::Output),
"slurm" => Ok(Action::Slurm),
"check" => Ok(Action::Check),
"pull" => Ok(Action::Pull),
"remote_check" => Ok(Action::RemoteCheck),
"push" => Ok(Action::Push),
"slurm_cancel" => Ok(Action::SlurmCancel),
"shell" => Ok(Action::Shell),
"pack" => Ok(Action::Pack),
"discard" => Ok(Action::Discard),
"quick_test" => Ok(Action::QuickTest),
_ => Err(error!(bad_argument).with_message(format!("String {s} cannot be parsed as an Action."))),
}
}
}
impl fmt::Display for Action
{
    /// Displays the action exactly as its `Debug` representation (the variant name).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result
    {
        f.write_fmt(format_args!("{:?}", self))
    }
}
struct KeyboardInteraction;
impl KeyboardInteraction
{
fn ask_password(&self, username: &str, hostname: &str) -> String
{
println!("Asking username {} for their password at {}.",username,hostname);
stdout().lock().flush().unwrap();
rpassword::prompt_password("Password: ").unwrap() }
fn ask_confirmation(&self, action:&str) -> Result<bool,Error>
{
loop{
let prompt = format!("Asking confirmation for \"{action}\" [Yes/No/Panic]: ");
let reply = rprompt::prompt_reply_stdout( &prompt ).map_err(|e|error!(undetermined).with_message(format!("could not read from prompt: {e}")))?;
match reply.as_ref()
{
"Yes" | "yes" | "YES" | "Y" | "y" => return Ok(true),
"No" | "no" | "NO" | "N" | "n" => return Ok(false),
"Panic" => panic!("panic requested."),
_ => (),
}
println!("\nReply was not understood\n");
}
}
}
impl ssh2::KeyboardInteractivePrompt for KeyboardInteraction
{
    /// Answers a keyboard-interactive authentication request.
    ///
    /// Prints the user name, the instructions and the number of prompts received, and then
    /// replies with a single password read from the terminal, whatever the number of prompts.
    /// Panics if standard output cannot be flushed or the password cannot be read.
    fn prompt<'a>(&mut self, username:&str, instructions: &str, prompts: &[ssh2::Prompt<'a>]) -> Vec<String>
    {
        println!("Asking username {} for its password. {}",username,instructions);
        let prompt_count = prompts.len();
        println!("{} prompts?",prompt_count);
        stdout().flush().unwrap();
        let password = rpassword::prompt_password("Password: ").unwrap();
        vec![password]
    }
}
/// Options collected for launching jobs through Slurm.
struct SlurmOptions
{
    /// Value written in the `#SBATCH --time=` line of the generated script.
    time: String,
    /// When present, value written in a `#SBATCH --mem=` line of the generated script.
    mem: Option<String>,
    /// Optional bound on the number of jobs, read from the `maximum_jobs` key.
    maximum_jobs: Option<usize>,
    /// Optional number of executions per job, read from the `job_pack_size` key.
    job_pack_size: Option<usize>,
    /// Optional path read from the `wrapper` key.
    wrapper: Option<PathBuf>,
    /// Extra arguments given to `sbatch` before the script name.
    sbatch_args: Vec<String>,
}
impl Default for SlurmOptions
{
fn default() -> Self
{
SlurmOptions{
time: "0-24:00:00".to_string(),
mem: None,
maximum_jobs: None,
job_pack_size: None,
wrapper: None,
sbatch_args: vec![],
}
}
}
impl SlurmOptions
{
pub fn new(launch_configurations:&[ConfigurationValue]) -> Result<SlurmOptions,Error>
{
let mut maximum_jobs=None;
let mut time:Option<&str> =None;
let mut mem:Option<&str> =None;
let mut job_pack_size=None;
let mut wrapper = None;
let mut sbatch_args : Vec<String> = vec![];
for lc in launch_configurations.iter()
{
match_object_panic!(lc,"Slurm",slurm_value,
"maximum_jobs" => maximum_jobs=Some(slurm_value.as_f64().expect("bad value for maximum_jobs") as usize),
"job_pack_size" => job_pack_size=Some(slurm_value.as_f64().expect("bad value for job_pack_size") as usize),
"time" => time=Some(slurm_value.as_str().expect("bad value for time")),
"mem" => mem=Some(slurm_value.as_str().expect("bad value for mem")),
"wrapper" => wrapper=Some(slurm_value.as_str().expect("bad value for wrapper")),
"sbatch_args" => sbatch_args=slurm_value.as_array()?.iter().map(|x|x.as_str().expect("bad value for sbatch_args").to_string()).collect(),
);
}
Ok(SlurmOptions{
time: time.map(|x|x.to_string()).unwrap_or_else(||"2-24:00:00".to_string()),
mem: mem.map(|x|x.to_string()),
maximum_jobs,
job_pack_size,
wrapper: wrapper.map(|value|Path::new(&value).to_path_buf()),
sbatch_args,
})
}
}
fn gather_slurm_jobs() -> Result<Vec<usize>,Error>
{
let squeue_output=Command::new("squeue")
.arg("-ho")
.arg("%A")
.output().map_err(|e|Error::command_not_found(source_location!(),"squeue".to_string(),e))?;
let squeue_output=String::from_utf8_lossy(&squeue_output.stdout);
squeue_output.lines().map(|line|
line.parse::<usize>().map_err(|e|Error::nonsense_command_output(source_location!()).with_message(format!("error {} on parsing line [{}] from squeue",e,line)))
).collect()
}
fn slurm_get_association(field:&str) -> Result<String,Error>
{
let command = Command::new("sacctmgr")
.arg("list")
.arg("associations")
.arg("-p")
.output().map_err(|e|Error::command_not_found(source_location!(),"squeue".to_string(),e))?;
let output=String::from_utf8_lossy(&command.stdout);
let mut lines = output.lines();
let mut index_user=0;
let mut index_field=0;
let header = lines.next().ok_or_else( ||Error::new(source_location!(),ErrorKind::NonsenseCommandOutput) )?;
for (header_index,header_field) in header.split('|').enumerate()
{
if header_field == "User"
{
index_user=header_index;
}
if header_field == field
{
index_field =header_index;
}
}
let user = std::env::var("USER").map_err(|e|Error::missing_environment_variable(source_location!(),"USER".to_string(),e) )?;
for line in lines
{
let values:Vec<&str> = line.split('|').collect();
if values[index_user]==user
{
return Ok(values[index_field].to_string());
}
}
Err( Error::new(source_location!(),ErrorKind::NonsenseCommandOutput) )
}
fn slurm_get_qos(name:&str, field:&str) -> Result<String,Error>
{
let command=Command::new("sacctmgr")
.arg("show")
.arg("qos")
.arg("-p")
.output().map_err(|e|Error::command_not_found(source_location!(),"sacctmgr".to_string(),e))?;
let output=String::from_utf8_lossy(&command.stdout);
let mut lines = output.lines();
let mut index_name=0;
let mut index_field=0;
let header = lines.next().ok_or_else( ||Error::new(source_location!(),ErrorKind::NonsenseCommandOutput) )?;
for (header_index,header_field) in header.split('|').enumerate()
{
if header_field == "Name"
{
index_name=header_index;
}
if header_field == field
{
index_field =header_index;
}
}
for line in lines
{
let values:Vec<&str> = line.split('|').collect();
if values[index_name]==name
{
return Ok(values[index_field].to_string());
}
}
Err( Error::new(source_location!(),ErrorKind::NonsenseCommandOutput) )
}
pub fn slurm_available_space() -> Result<usize,Error>
{
let command=Command::new("squeue")
.arg("-ho")
.arg("%A")
.arg("--me")
.output().map_err(|e|Error::command_not_found(source_location!(),"squeue".to_string(),e))?;
let output=String::from_utf8_lossy(&command.stdout);
let current = output.lines().count();
let qos = slurm_get_association("Def QOS")?; let maximum = slurm_get_qos(&qos,"MaxSubmitPU")?; let maximum = maximum.parse::<usize>().map_err( |_|Error::new(source_location!(),ErrorKind::NonsenseCommandOutput) )?;
Ok(maximum - current)
}
struct Job
{
execution_code_vec: Vec<String>,
execution_id_vec: Vec<usize>,
}
impl Job
{
fn new()->Job
{
Job{
execution_code_vec:vec![],
execution_id_vec:vec![],
}
}
fn len(&self)->usize
{
self.execution_id_vec.len()
}
fn add_execution(&mut self, execution_id: usize, binary:&Path, execution_path_str: &str)
{
let job_line=format!("echo execution {}\n/bin/date\n{} {}/local.cfg --results={}/local.result",execution_id,binary.display(),execution_path_str,execution_path_str);
self.execution_code_vec.push(job_line);
self.execution_id_vec.push(execution_id);
}
fn write_slurm_script(&self, out:&mut dyn Write,prefix:&str, slurm_options:&SlurmOptions, job_lines:&str)
{
let mem_str = if let Some(s)=&slurm_options.mem { format!("#SBATCH --mem={}\n",s) } else {"".to_string()};
writeln!(out,"#!/bin/bash
#SBATCH --job-name=CAMINOS
#SBATCH -D .
#SBATCH --output={prefix}-%j.out
#SBATCH --error={prefix}-%j.err
#SBATCH --cpus-per-task=1
#SBATCH --ntasks=1
#SBATCH --time={slurm_time}
{mem_str}
sync
{job_lines}
",prefix=prefix,slurm_time=slurm_options.time,mem_str=mem_str,job_lines=job_lines).unwrap();
}
fn launch_slurm_script(&self, directory:&Path,script_name:&str, slurm_options:&SlurmOptions) -> Result<usize,Error>
{
let mut sbatch=Command::new("sbatch");
sbatch.current_dir(directory);
for argument in &slurm_options.sbatch_args
{
sbatch.arg(argument);
}
sbatch.arg(script_name);
let sbatch_output=sbatch.output().map_err(|e|Error::command_not_found(source_location!(),"sbatch".to_string(),e))?;
let mut jobids=vec![];
let sbatch_stdout=String::from_utf8_lossy(&sbatch_output.stdout);
for word in sbatch_stdout.split_whitespace()
{
match word.parse::<usize>()
{
Ok(id) => jobids.push(id),
Err(_) => (),
};
}
if jobids.len()!=1
{
return Err(Error::nonsense_command_output(source_location!()).with_message(format!("sbatch executed but we got incorrect jobids ({:?} from {})",jobids,sbatch_stdout)));
}
Ok(jobids[0])
}
fn slurm(&mut self, internal_job_id:usize, jobs_path:&Path, slurm_options:&SlurmOptions) -> Result<String,Error>
{
let job_lines=self.execution_code_vec.join("\n") + "\n/bin/date\necho job finished\n";
let launch_name=format!("launch{}",internal_job_id);
let launch_script=jobs_path.join(&launch_name);
let mut launch_script_file=File::create(&launch_script).expect("Could not create launch file");
self.write_slurm_script(&mut launch_script_file,&launch_name,slurm_options,&job_lines);
let slurm_job_id=self.launch_slurm_script(jobs_path,&launch_name,slurm_options)?;
let execution_id_string=self.execution_id_vec.iter().map(|id|format!("{}",id)).collect::<Vec<String>>().join(",");
Ok(format!("{}={}[{}], ",slurm_job_id,internal_job_id,execution_id_string))
}
}
/// Optional settings that modify how an `Experiment` executes its actions.
#[non_exhaustive]
#[derive(Default)]
pub struct ExperimentOptions
{
    /// Another experiment folder. The shell action copies `main.cfg` (and `main.od`/`remote`
    /// when present) from it; other actions load its experiments and packed results.
    pub external_source: Option<PathBuf>,
    /// First experiment index to process; 0 when absent.
    pub start_index: Option<usize>,
    /// Index after the last experiment to process; the number of experiments when absent.
    pub end_index: Option<usize>,
    /// Expression evaluated on each experiment; those evaluating to false are skipped.
    pub where_clause: Option<config_parser::Expr>,
    /// Text to be written into the journal as a `message` entry.
    pub message: Option<String>,
    /// NOTE(review): not used in the visible code; presumably whether to ask the user for confirmations.
    pub interactive: Option<bool>,
    /// When true, a failure to build the experiments from `main.cfg` is ignored.
    pub foreign: bool,
    /// NOTE(review): not used in the visible code; presumably a CSV file to read — confirm.
    pub use_csv: Option<PathBuf>,
    /// NOTE(review): not used in the visible code; presumably names of outputs to generate — confirm.
    pub targets: Option<Vec<String>>,
}
/// An experiment folder together with the state read from its journal.
pub struct Experiment<'a>
{
    /// The local files of the experiment.
    files: ExperimentFiles,
    /// The options given at construction.
    options: ExperimentOptions,
    /// Path to the `journal` file inside the root folder.
    journal: PathBuf,
    /// Index for new journal entries: one more than the last index read from the journal.
    journal_index: usize,
    /// The files at the remote host; `None` at construction.
    remote_files: Option<ExperimentFiles>,
    /// The job ids obtained from `squeue` at construction (empty if that failed).
    #[allow(dead_code)]
    visible_slurm_jobs: Vec<usize>,
    /// Slurm job ids found in the journal that were also visible in `squeue`.
    owned_slurm_jobs: Vec<usize>,
    /// Experiment indices belonging to the jobs in `owned_slurm_jobs`.
    experiments_on_slurm: Vec<usize>,
    /// For each experiment index, the last `(journal index, batch, slurm job id)` that launched it, if any.
    experiment_to_slurm: Vec<Option<(usize,usize,usize)>>,
    /// The plugs given at construction.
    plugs:&'a Plugs,
}
/// The files of an experiment folder, either local or accessed through an SSH session.
///
/// Most fields are filled lazily by the `build_*` methods.
pub struct ExperimentFiles
{
    /// Host name of the remote; `None` for local files.
    pub host: Option<String>,
    /// User name at the remote; `None` for local files.
    pub username: Option<String>,
    /// When present, `main.cfg` and `binary.results` are read through this session instead of the local file system.
    ssh2_session: Option<Session>,
    /// The binary path as given by the caller.
    pub binary_call: Option<PathBuf>,
    /// The path to the binary to execute.
    pub binary: Option<PathBuf>,
    /// The root folder of the experiment, containing `main.cfg`.
    pub root: Option<PathBuf>,
    /// The text of `main.cfg`, once read.
    pub cfg_contents: Option<String>,
    /// The result of parsing `cfg_contents`.
    pub parsed_cfg: Option<config_parser::Token>,
    /// The canonical folder containing the runs.
    pub runs_path: Option<PathBuf>,
    /// The experiments obtained by flattening the parsed configuration.
    pub experiments: Vec<ConfigurationValue>,
    /// The `launch_configurations` array of the configuration.
    pub launch_configurations: Vec<ConfigurationValue>,
    /// The contents of `binary.results`, or a placeholder when that file is not available.
    pub packed_results: ConfigurationValue,
}
impl ExperimentFiles
{
pub fn build_cfg_contents(&mut self) -> Result<(),Error>
{
if self.cfg_contents.is_none()
{
let cfg=self.root.as_ref().unwrap().join("main.cfg");
if let Some(session) = &self.ssh2_session {
let sftp = session.sftp().map_err(|e|Error::could_not_start_sftp_session(source_location!(),e))?;
let mut remote_main_cfg = sftp.open(&cfg).map_err(|e|Error::could_not_open_remote_file(source_location!(),cfg.to_path_buf(),e))?;
let mut remote_main_cfg_contents=String::new();
remote_main_cfg.read_to_string(&mut remote_main_cfg_contents).expect("Could not read remote main.cfg.");
self.cfg_contents = Some(remote_main_cfg_contents);
} else {
let cfg_contents={
let mut cfg_contents = String::new();
let mut cfg_file=File::open(&cfg).map_err(|e|Error::could_not_open_file(source_location!(),cfg.to_path_buf(),e))?;
cfg_file.read_to_string(&mut cfg_contents).expect("something went wrong reading main.cfg");
cfg_contents
};
self.cfg_contents = Some(cfg_contents);
}
}
Ok(())
}
/// Borrows the loaded text of `main.cfg`.
///
/// # Panics
/// Panics if the contents have not been loaded.
pub fn cfg_contents_ref(&self) -> &String
{
    let loaded: Option<&String> = self.cfg_contents.as_ref();
    loaded.unwrap()
}
pub fn cfg_enough_content(&self) -> bool
{
match self.cfg_contents
{
None => false,
Some(ref content) => content.len()>=2,
}
}
pub fn build_parsed_cfg(&mut self) -> Result<(),Error>
{
if self.parsed_cfg.is_none()
{
self.build_cfg_contents()?;
let parsed_cfg=config_parser::parse(self.cfg_contents_ref()).map_err(|x|{
let cfg=self.root.as_ref().unwrap().join("main.cfg");
Error::could_not_parse_file(source_location!(),cfg).with_message(format!("error:{:?}",x))
})?;
self.parsed_cfg = Some(parsed_cfg);
}
Ok(())
}
pub fn build_root_path(&mut self) -> Result<(),Error>
{
let root=self.root.as_ref().unwrap();
if let Some(session) = &self.ssh2_session {
let sftp = session.sftp().map_err(|e|Error::could_not_start_sftp_session(source_location!(),e))?;
let mut to_create = vec![root.to_owned()];
while !to_create.is_empty()
{
let dir = to_create.last().unwrap();
match sftp.stat(dir)
{
Ok(remote_stat) =>
{
if !remote_stat.is_dir()
{
panic!("remote {:?} exists, but is not a directory",&remote_stat);
}
},
Err(_err) =>
{
eprintln!("Could not open remote '{:?}', creating it",root);
match sftp.mkdir(dir,0o755)
{
Ok(_) =>
{
to_create.pop();
}
Err(e) =>
{
let parent = dir.parent().ok_or_else(||error!(remote_file_system_error,e).with_message(format!("{:?} has no parent to create",dir)))?.to_owned();
eprintln!("Trying to create its parent directory {:?}",parent);
to_create.push(parent);
}
}
},
};
}
} else {
fs::create_dir_all(root).map_err(|e|error!(could_not_generate_file,root.to_path_buf(),e))?;
}
Ok(())
}
/// Determines the folder containing the runs and stores its canonical form in `runs_path`.
/// Does nothing if it is already set.
///
/// If the root contains a `run0` directory the root itself is used (old layout);
/// otherwise `root/runs` is used, creating it when it is not a directory.
///
/// # Errors
/// Fails when the `runs` directory cannot be created or the path cannot be canonicalized.
///
/// # Panics
/// Panics when `root` is unset.
pub fn build_runs_path(&mut self) -> Result<(),Error>
{
    if self.runs_path.is_some()
    {
        return Ok(());
    }
    let root = self.root.as_ref().unwrap();
    let is_old = root.join("run0").is_dir();
    let candidate = if is_old
    {
        root.join("")
    }
    else
    {
        let runs_dir = root.join("runs");
        if !runs_dir.is_dir()
        {
            // Report the failure through the Result instead of panicking.
            fs::create_dir(&runs_dir).map_err(|e|{
                let message=format!("Something went wrong when creating the runs directory {:?} (error {})",runs_dir,e);
                Error::file_system_error(source_location!(),e).with_message(message)
            })?;
        }
        runs_dir
    };
    let resolved = candidate.canonicalize().map_err(|e|{
        let message=format!("The runs path \"{:?}\" cannot be resolved (error {})",candidate,e);
        Error::file_system_error(source_location!(),e).with_message(message)
    })?;
    self.runs_path = Some( resolved );
    Ok(())
}
pub fn build_experiments(&mut self) -> Result<(),Error>
{
self.build_parsed_cfg()?;
self.experiments=match self.parsed_cfg
{
Some(config_parser::Token::Value(ref value)) =>
{
let flat=flatten_configuration_value(value);
if let ConfigurationValue::Experiments(experiments)=flat
{
experiments
}
else
{
let cfg = self.root.as_ref().unwrap().join("main.cfg");
return Err(Error::could_not_parse_file(source_location!(),cfg).with_message("there are not experiments".to_string()));
}
},
_ =>
{
let cfg = self.root.as_ref().unwrap().join("main.cfg");
return Err(Error::could_not_parse_file(source_location!(),cfg));
}
};
Ok(())
}
pub fn build_launch_configurations(&mut self)->Result<(),Error>
{
self.build_parsed_cfg()?;
if let config_parser::Token::Value(ref value)=self.parsed_cfg.as_ref().unwrap()
{
if let &ConfigurationValue::Object(ref cv_name, ref cv_pairs)=value
{
if cv_name!="Configuration"
{
return Err( Error::ill_formed_configuration(source_location!(),value.clone()).with_message(format!("A simulation must be created from a `Configuration` object not `{}`",cv_name)) );
}
for &(ref name,ref value) in cv_pairs
{
match name.as_ref()
{
"launch_configurations" => match value
{
&ConfigurationValue::Array(ref l) => self.launch_configurations = l.clone(),
_ => return Err( Error::ill_formed_configuration(source_location!(),value.clone()).with_message("bad value for launch_configurations".to_string() ) ),
}
_ => (),
}
}
}
else
{
return Err( Error::ill_formed_configuration(source_location!(),value.clone()).with_message("Those are not experiments.".to_string() ) );
}
}
Ok(())
}
pub fn compare_cfg(&self, other:&ExperimentFiles) -> Result<(),Error>
{
let local_content = self.cfg_contents.as_ref().unwrap();
let other_content = other.cfg_contents.as_ref().unwrap();
if local_content == other_content {
println!("The configurations match");
return Ok(());
} else {
let mut last_both=None;
let mut show_both=false;
let mut count_left=0;
let mut count_right=0;
let mut show_count=true;
for diff in diff::lines(local_content, other_content)
{
match diff {
diff::Result::Left(x) =>
{
if show_count
{
println!("@left line {}, right line {}",count_left,count_right);
show_count=false;
}
if let Some(p)=last_both.take()
{
println!(" {}",p);
}
println!("-{}",x);
show_both=true;
count_left+=1;
},
diff::Result::Right(x) =>
{
if show_count
{
println!("@left line {}, right line {}",count_left,count_right);
show_count=false;
}
if let Some(p)=last_both.take()
{
println!(" {}",p);
}
println!("+{}",x);
show_both=true;
count_right+=1;
},
diff::Result::Both(x,_) =>
{
if show_both
{
println!(" {}",x);
show_both=false;
}
last_both = Some(x);
show_count=true;
count_left+=1;
count_right+=1;
},
}
}
let cfg = self.root.as_ref().unwrap().join("main.cfg");
let remote_cfg_path = other.root.as_ref().unwrap().join("main.cfg");
let username = other.username.as_ref().unwrap();
let host = other.host.as_ref().unwrap();
return Err(Error::incompatible_configurations(source_location!()).with_message(format!("The configurations do not match.\nYou may try$ vimdiff {:?} scp://{}@{}/{:?}\n",cfg,username,host,remote_cfg_path)));
}
}
pub fn build_packed_results(&mut self)
{
let packed_results_path = self.root.as_ref().unwrap().join("binary.results");
self.packed_results = if let Some(session) = &self.ssh2_session {
match session.scp_recv(&packed_results_path)
{
Ok( (mut remote_binary_results_channel, _stat) ) => {
let mut remote_binary_results_contents= vec![];
remote_binary_results_channel.read_to_end(&mut remote_binary_results_contents).expect("Could not read remote binary.results");
let got = config::config_from_binary(&remote_binary_results_contents,0).expect("something went wrong while deserializing binary.results");
match got
{
ConfigurationValue::Experiments(ref _a) => {
},
_ => panic!("A non-Experiments stored on binary.results"),
};
got
},
Err(_) => ConfigurationValue::None,
}
} else {
let n = self.experiments.len();
match File::open(&packed_results_path)
{
Err(_) => {
ConfigurationValue::Experiments( (0..n).map(|_|ConfigurationValue::None).collect() )
},
Ok(ref mut file) => {
let mut contents = Vec::with_capacity(n);
file.read_to_end(&mut contents).expect("something went wrong reading binary.results");
let got = config::config_from_binary(&contents,0).expect("something went wrong while deserializing binary.results");
match got
{
ConfigurationValue::Experiments(ref a) => {
if a.len()!=n {
panic!("The Experiments stored in binary.results has length {} instead of {} as the number of experiment items",a.len(),n);
}
},
_ => panic!("A non-Experiments stored on binary.results"),
};
got
},
}
};
}
/// Returns the path `root/outputs`, creating the directory when it does not exist.
///
/// # Panics
/// Panics when `root` is unset, when `outputs` exists but is not a directory, or when
/// the directory cannot be created.
pub fn get_outputs_path(&self) -> PathBuf
{
    let outputs = self.root.as_ref().unwrap().join("outputs");
    if outputs.is_dir()
    {
        return outputs;
    }
    if outputs.exists()
    {
        panic!("There exists \"outputs\", but it is not a directory.");
    }
    fs::create_dir(&outputs).expect("Something went wrong when creating the outputs folder.");
    outputs
}
/// The text of the bundled `defaults/main.cfg`, embedded at compile time.
pub fn example_cfg() -> &'static str
{
    const CONTENTS: &str = include_str!("defaults/main.cfg");
    CONTENTS
}
/// The text of the bundled `defaults/main.od`, embedded at compile time.
pub fn example_od() -> &'static str
{
    const CONTENTS: &str = include_str!("defaults/main.od");
    CONTENTS
}
/// The text of the bundled `defaults/remote`, embedded at compile time.
pub fn example_remote() -> &'static str
{
    const CONTENTS: &str = include_str!("defaults/remote");
    CONTENTS
}
}
impl Debug for ExperimentFiles
{
    /// Writes `ExperimentFiles{field=value,...}` with the `Debug` form of each listed field.
    ///
    /// The SSH session is only reported as `"Some session"` or `"None"`, and the
    /// `binary_call` field is not included.
    fn fmt(&self, formatter: &mut Formatter<'_>) -> std::result::Result<(), std::fmt::Error>
    {
        let session_state = match self.ssh2_session
        {
            Some(_) => "Some session",
            None => "None",
        };
        formatter.write_str("ExperimentFiles{")?;
        write!(formatter,"host={:?},",self.host)?;
        write!(formatter,"username={:?},",self.username)?;
        write!(formatter,"ssh2_session={:?},",session_state)?;
        write!(formatter,"binary={:?},",self.binary)?;
        write!(formatter,"root={:?},",self.root)?;
        write!(formatter,"cfg_contents={:?},",self.cfg_contents)?;
        write!(formatter,"parsed_cfg={:?},",self.parsed_cfg)?;
        write!(formatter,"runs_path={:?},",self.runs_path)?;
        write!(formatter,"experiments={:?},",self.experiments)?;
        write!(formatter,"launch_configurations={:?},",self.launch_configurations)?;
        write!(formatter,"packed_results={:?},",self.packed_results)?;
        formatter.write_str("}")
    }
}
impl<'a> Experiment<'a>
{
/// Creates an `Experiment` over the folder `root`, reading (or creating) its `journal` file.
///
/// `binary` is stored as `binary_call`, while `binary` in the files is the canonical path of
/// the current executable. The journal is scanned to compute the next journal index and to
/// learn which experiments were launched in which Slurm jobs.
///
/// # Panics
/// Panics if the journal cannot be opened/created or read, if a journal line does not follow
/// the `index: entry` form with an unsigned integer index, if a `Launched jobs` entry is
/// malformed, or if the path of the current executable cannot be obtained.
pub fn new(binary:&Path,root:&Path,plugs:&'a Plugs,options:ExperimentOptions)->Experiment<'a>
{
println!("Preparing experiment with {:?} as path",root);
// If squeue cannot be queried we just consider there are no visible jobs.
let visible_slurm_jobs:Vec<usize> = gather_slurm_jobs().unwrap_or_default();
let journal=root.join("journal");
let journal_file=OpenOptions::new().read(true).write(true).create(true).open(&journal).expect("Something went wrong reading or creating the journal file");
let mut journal_index=0;
let reader = BufReader::new(journal_file);
let mut owned_slurm_jobs=vec![];
let mut experiments_on_slurm=vec![];
let mut experiment_to_slurm = vec![];
for rline in reader.lines()
{
let line=rline.expect("bad line read from journal");
if ! line.is_empty()
{
// Lines have the form `index: entry`. Only the text up to a second ':' is taken as entry.
let mut s = line.split(':');
let prefix=s.next().expect("Not found the expected journal index");
// The next entry to be written gets one more than the last index read.
journal_index= 1usize+prefix.parse::<usize>().unwrap_or_else(|_|panic!("The journal index must be a non-negative integer (received {})",prefix));
let entry = s.next().expect("No content found on the journal line");
if entry.starts_with(" Launched jobs ")
{
let mut slurm_items=entry.split(' ');
// Skip the empty string before the leading space and the words "Launched" and "jobs".
slurm_items.next(); slurm_items.next(); slurm_items.next(); for slurm_item in slurm_items
{
if slurm_item.is_empty()
{
continue;
}
// Each item has the form `slurm_job_id=batch[experiment,experiment,...]`.
let mut slurm_pair = slurm_item.split('=');
let slurm_job_id = slurm_pair.next().unwrap().parse::<usize>().unwrap_or_else(|_|panic!("left term on '{}' should be an integer",slurm_item));
let slurm_job_content = slurm_pair.next().unwrap();
let left_bracket_index = slurm_job_content.find('[').unwrap();
let right_bracket_index = slurm_job_content.find(']').unwrap();
let experiments:Vec<usize> =slurm_job_content[left_bracket_index+1 .. right_bracket_index].split(',').map(|item|item.parse::<usize>().unwrap_or_else(|_|panic!("failed with content={} for item {}",slurm_job_content,slurm_item))).collect();
let batch = slurm_job_content[..left_bracket_index].parse::<usize>().unwrap_or_else(|_|panic!("failed to get batch for item {}",slurm_item));
// journal_index was already incremented, so subtract one to get the index of this line.
let track = Some( (journal_index-1, batch, slurm_job_id) );
for &experiment_index in experiments.iter()
{
// Grow the table as needed; later journal lines overwrite earlier ones.
if experiment_index>=experiment_to_slurm.len()
{
experiment_to_slurm.resize(experiment_index+1,None);
}
experiment_to_slurm[experiment_index]= track;
}
// Only jobs currently listed by squeue are recorded as owned.
if visible_slurm_jobs.contains(&slurm_job_id)
{
owned_slurm_jobs.push(slurm_job_id);
experiments_on_slurm.extend(experiments);
}
}
}
if entry==" message"
{
println!("journal message {}",line);
}
}
}
Experiment{
files: ExperimentFiles{
host: None,
username: None,
ssh2_session: None,
binary_call: Some(binary.to_path_buf()),
binary: Some(std::env::current_exe().expect("could not get the current executing binary").canonicalize().expect("could not canonicalize the path to the binary.")),
root: Some(root.to_path_buf()),
cfg_contents: None,
parsed_cfg: None,
runs_path: None,
experiments: Vec::new(),
launch_configurations: Vec::new(),
packed_results: ConfigurationValue::None,
},
options,
journal,
journal_index,
remote_files: None,
visible_slurm_jobs,
owned_slurm_jobs,
experiments_on_slurm,
experiment_to_slurm,
plugs,
}
}
/// Appends the line `{journal_index}: {entry}` to the journal file.
///
/// # Panics
/// Panics if the journal cannot be opened for appending or the line cannot be written.
fn write_journal_entry(&self, entry:&str)
{
    let mut appender = OpenOptions::new();
    appender.append(true);
    let mut journal_file = appender.open(&self.journal).expect("Something went wrong reading or creating the journal file");
    let line = format!("{}: {}\n",self.journal_index,entry);
    journal_file.write_all(line.as_bytes()).expect("Could not write to journal");
}
pub fn execute_action(&mut self,action:Action) -> Result<(),Error>
{
let now = chrono::Utc::now();
self.write_journal_entry(&format!("Executing action {} on {}.", action, now.format("%Y %m(%b) %0d(%a), %T (UTC%:z)")));
let cfg=self.files.root.as_ref().unwrap().join("main.cfg");
match action
{
Action::Shell =>
{
if cfg.exists()
{
panic!("{:?} already exists, could not proceed with the shell action. To generate new files delete main.cfg manually.",cfg);
}
let path_main_od = self.files.root.as_ref().unwrap().join("main.od");
let path_remote = self.files.root.as_ref().unwrap().join("remote");
if let Some(ref path) = self.options.external_source
{
fs::copy(path.join("main.cfg"),&cfg).map_err(|e|error!(could_not_generate_file,cfg,e).with_message(format!("trying to copy it from {path:?}")))?;
let external_main_od = path.join("main.od");
if external_main_od.exists(){
fs::copy(external_main_od,&path_main_od).map_err(|e|Error::could_not_generate_file(source_location!(),path_main_od,e))?;
} else {
println!("There is not main.od on the source given [{path:?}], creating a default one.");
let mut new_od_file=File::create(&path_main_od).map_err(|e|Error::could_not_generate_file(source_location!(),path_main_od.to_path_buf(),e))?;
writeln!(new_od_file,"{}",ExperimentFiles::example_od()).map_err(|e|Error::could_not_generate_file(source_location!(),path_main_od,e))?;
}
let external_remote = path.join("remote");
if external_remote.exists() {
let mut content=String::new();
let mut external_remote_file=File::open(&external_remote).map_err(|e|Error::could_not_open_file(source_location!(),external_remote.to_path_buf(),e))?;
external_remote_file.read_to_string(&mut content).map_err(|e|Error::could_not_generate_file(source_location!(),external_remote.to_path_buf(),e))?;
let external_directory_name = path.canonicalize().expect("path does not have canonical form").file_name().expect("could not get name of the external folder").to_str().unwrap().to_string();
let directory_name = self.files.root.as_ref().unwrap().canonicalize().expect("path does not have canonical form").file_name().expect("could not get name of the external folder").to_str().unwrap().to_string();
content = content.replace(&external_directory_name,&directory_name);
let mut new_remote_file=File::create(&path_remote).map_err(|e|Error::could_not_generate_file(source_location!(),path_remote.to_path_buf(),e))?;
writeln!(new_remote_file,"{}",content).map_err(|e|Error::could_not_generate_file(source_location!(),path_remote,e))?;
} else {
println!("There is not remote on the source given [{path:?}], creating a default one.");
let mut new_remote_file=File::create(&path_remote).map_err(|e|Error::could_not_generate_file(source_location!(),path_remote.to_path_buf(),e))?;
writeln!(new_remote_file,"{}",ExperimentFiles::example_remote()).map_err(|e|Error::could_not_generate_file(source_location!(),path_remote,e))?;
}
} else {
let mut new_cfg_file=File::create(&cfg).map_err(|e|Error::could_not_generate_file(source_location!(),cfg.to_path_buf(),e))?;
writeln!(new_cfg_file,"{}",ExperimentFiles::example_cfg()).map_err(|e|Error::could_not_generate_file(source_location!(),cfg,e))?;
let mut new_od_file=File::create(&path_main_od).map_err(|e|Error::could_not_generate_file(source_location!(),path_main_od.to_path_buf(),e))?;
writeln!(new_od_file,"{}",ExperimentFiles::example_od()).map_err(|e|Error::could_not_generate_file(source_location!(),path_main_od,e))?;
let mut new_remote_file=File::create(&path_remote).map_err(|e|Error::could_not_generate_file(source_location!(),path_remote.to_path_buf(),e))?;
writeln!(new_remote_file,"{}",ExperimentFiles::example_remote()).map_err(|e|Error::could_not_generate_file(source_location!(),path_remote,e))?;
};
},
_ => (),
}
let mut results;
self.files.build_experiments().or_else(|e|if self.options.foreign {Ok(())} else {Err(e)})?;
let external_files = if let (Some(path),true) = (self.options.external_source.as_ref(), action!=Action::Shell ) {
let mut ef = ExperimentFiles{
host: None,
username: None,
ssh2_session: None,
binary_call: None,
binary: None,
root: Some(path.to_path_buf()),
cfg_contents: None,
parsed_cfg: None,
runs_path: None,
experiments: Vec::new(),
launch_configurations: Vec::new(),
packed_results: ConfigurationValue::None,
};
ef.build_experiments().map_err(|e|e.with_message("could not build external experiments".to_string()))?;
ef.build_packed_results();
Some(ef)
} else {
None
};
if let Some(message)=&self.options.message
{
self.write_journal_entry(&format!("message: {}",message));
}
self.files.build_packed_results();
let mut added_packed_results = 0usize;
let mut removed_packed_results = 0usize;
let mut must_draw=false;
let mut job_pack_size=1; let mut job=Job::new();
let mut slurm_options: Option<SlurmOptions> = None;
let mut uses_jobs=false;
match action
{
Action::LocalAndOutput =>
{
must_draw=true;
},
Action::Local =>
{
must_draw=false;
},
Action::Output =>
{
must_draw=true;
},
Action::Slurm =>
{
uses_jobs=true;
if self.files.build_launch_configurations().is_ok()
{
let n = self.files.experiments.len();
if let Ok(got) = SlurmOptions::new(&self.files.launch_configurations)
{
if let Some(value)=got.maximum_jobs
{
let new_job_pack_size=(n + value-1 ) / value; if new_job_pack_size>=job_pack_size
{
job_pack_size=new_job_pack_size;
}
else
{
panic!("Trying to reduce job_pack_size from {} to {}.",job_pack_size,new_job_pack_size);
}
}
if let Some(value)=got.job_pack_size
{
if job_pack_size!=1 && value!=1
{
panic!("Trying to change job_pack_size unexpectedly");
}
job_pack_size = value;
}
slurm_options=Some(got);
} else {
slurm_options = Some( SlurmOptions::default() );
}
if let Ok(available) = slurm_available_space()
{
println!("Available number of jobs to send to slurm is {}",available);
}
}
},
Action::Check =>
{
must_draw=false;
},
Action::Pull =>
{
self.initialize_remote()?;
self.remote_files.as_mut().unwrap().build_cfg_contents()?;
self.files.compare_cfg(self.remote_files.as_ref().unwrap())?;
},
Action::RemoteCheck =>
{
self.initialize_remote()?;
let remote_root=self.remote_files.as_ref().unwrap().root.clone().unwrap();
let remote_binary=self.remote_files.as_ref().unwrap().binary.clone().unwrap();
let mut channel = self.remote_files.as_ref().unwrap().ssh2_session.as_ref().unwrap().channel_session().unwrap();
let remote_command = format!("{:?} {:?} --action=check",remote_binary,remote_root);
channel.exec(&remote_command).unwrap();
let mut remote_command_output = String::new();
channel.read_to_string(&mut remote_command_output).unwrap();
channel.stderr().read_to_string(&mut remote_command_output).unwrap();
channel.wait_close().expect("Could not close the channel of remote executions.");
channel.exit_status().unwrap();
for line in remote_command_output.lines()
{
println!("at remote: {}",line);
}
},
Action::Push =>
{
self.initialize_remote()?;
let remote_root=self.remote_files.as_ref().unwrap().root.clone().unwrap();
let sftp = self.remote_files.as_ref().unwrap().ssh2_session.as_ref().unwrap().sftp().unwrap();
match sftp.stat(&remote_root)
{
Ok(remote_stat) =>
{
if !remote_stat.is_dir()
{
panic!("remote {:?} exists, but is not a directory",&remote_stat);
}
},
Err(_err) =>
{
eprintln!("Could not open remote '{:?}', creating it",remote_root);
self.remote_files.as_mut().unwrap().build_root_path()?;
},
};
self.remote_files.as_mut().unwrap().build_cfg_contents().ok();
if self.remote_files.as_ref().unwrap().cfg_enough_content() {
self.files.compare_cfg(self.remote_files.as_ref().unwrap())?;
} else {
let remote_cfg_path = remote_root.join("main.cfg");
let mut remote_cfg = sftp.create(&remote_cfg_path).expect("Could not create remote main.cfg");
write!(remote_cfg,"{}",self.files.cfg_contents_ref()).expect("Could not write into remote main.cfg");
let mut remote_od = sftp.create(&remote_root.join("main.od")).expect("Could not create remote main.od");
let mut local_od = File::open(self.files.root.as_ref().unwrap().join("main.od")).expect("Could not open local main.od");
let mut od_contents = String::new();
local_od.read_to_string(&mut od_contents).expect("something went wrong reading main.od");
write!(remote_od,"{}",od_contents).expect("Could not write into remote main.od");
}
},
Action::SlurmCancel =>
{
let mut scancel=&mut Command::new("scancel");
for jobid in self.owned_slurm_jobs.iter()
{
scancel = scancel.arg(jobid.to_string());
}
scancel.output().map_err(|e|Error::command_not_found(source_location!(),"scancel".to_string(),e))?;
},
Action::Shell => (),
Action::Pack => (),
Action::Discard => (),
Action::QuickTest => (),
};
let must_draw=must_draw;
let job_pack_size=job_pack_size;
let slurm_options = slurm_options;
let uses_jobs=uses_jobs;
self.files.build_runs_path()?;
let runs_path : PathBuf = self.files.runs_path.as_ref().unwrap().to_path_buf();
let start_index = self.options.start_index.unwrap_or(0);
if start_index>self.files.experiments.len() {panic!("start_index={} > experiments.len()={}",start_index,self.files.experiments.len());}
let end_index = self.options.end_index.unwrap_or(self.files.experiments.len());
if end_index>self.files.experiments.len() {panic!("end_index={} > experiments.len()={}",end_index,self.files.experiments.len());}
let jobs_path=runs_path.join(format!("jobs{}",self.journal_index));
let mut launch_entry="".to_string();
if uses_jobs && !jobs_path.is_dir()
{
fs::create_dir(&jobs_path).expect("Something went wrong when creating the jobs directory.");
}
let before_amount_slurm=self.experiments_on_slurm.len(); let mut before_amount_inactive=0; let mut before_amount_active=0; let mut delta_amount_slurm=0;
let mut delta_completed=0;
let sftp = self.remote_files.as_ref().map(|f|f.ssh2_session.as_ref().unwrap().sftp().unwrap());
let mut progress = ActionProgress::new(&action,end_index-start_index);
for (experiment_index,experiment) in self.files.experiments.iter().enumerate().skip(start_index).take(end_index-start_index)
{
progress.inc(1);
if let Some(ref expr) = self.options.where_clause
{
match evaluate(expr,experiment,self.files.root.as_ref().unwrap())?
{
ConfigurationValue::True => (), ConfigurationValue::False => continue, x => panic!("The where clause evaluate to a non-bool type ({:?})",x),
}
}
let experiment_path=runs_path.join(format!("run{}",experiment_index));
if !experiment_path.is_dir()
{
use Action::*;
match action
{
Local|LocalAndOutput|Slurm => fs::create_dir(&experiment_path).expect("Something went wrong when creating the run directory."),
_ => (),
}
}
let is_packed = if let ConfigurationValue::Experiments(ref a) = self.files.packed_results {
! matches!(a[experiment_index],ConfigurationValue::None)
} else {false};
let result_path=experiment_path.join("local.result");
let has_file = result_path.is_file();
let has_content=if !has_file
{
before_amount_inactive+=1;
false
}
else
{
result_path.metadata().unwrap().len()>=5
};
let mut is_merged = false;
if !has_content && !is_packed
{
if let Some(ref external_files) = external_files
{
for (ext_index,ext_experiment) in external_files.experiments.iter().enumerate()
{
if config::config_relaxed_cmp(experiment,ext_experiment)
{
let mut ext_result_contents=None;
let mut ext_result_value:Option<ConfigurationValue> = None;
if let ConfigurationValue::Experiments(ref a) = external_files.packed_results
{
let external_value = &a[ext_index];
if *external_value!=ConfigurationValue::None
{
ext_result_value = Some( external_value.clone() );
}
} else {
let ext_path=self.options.external_source.as_ref().unwrap().join(format!("runs/run{}/local.result",ext_index));
let mut ext_result_file=match File::open(&ext_path)
{
Ok(rf) => rf,
Err(_error) =>
{
continue;
}
};
let mut aux=String::new();
ext_result_file.read_to_string(&mut aux).expect("Could not read remote result file.");
if aux.len()>=5
{
ext_result_contents = Some ( aux );
}
}
if ext_result_contents.is_some() || ext_result_value.is_some()
{
if let ConfigurationValue::Experiments(ref mut a) = self.files.packed_results
{
if ext_result_value.is_none()
{
if let Some(ref contents) = ext_result_contents
{
match config_parser::parse(contents)
{
Ok(cv) =>
{
let result=match cv
{
config_parser::Token::Value(value) => value,
_ => panic!("wrong token"),
};
ext_result_value = Some(result);
}
Err(_error)=>
{
eprintln!("pulled invalid results (experiment {}).",experiment_index);
}
}
}
}
a[experiment_index] = ext_result_value.unwrap();
added_packed_results+=1;
}
else
{
if ext_result_contents.is_none()
{
ext_result_contents = Some(format!("{}",ext_result_value.as_ref().unwrap()));
}
let mut new_result_file=File::create(&result_path).expect("Could not create result file.");
writeln!(new_result_file,"{}",ext_result_contents.unwrap()).unwrap();
}
progress.merged+=1;
is_merged=true;
}
}
}
}
}
if let (true,Action::Pack) = (has_content,action)
{
let mut result_file=match File::open(&result_path)
{
Ok(rf) => rf,
Err(_error) =>
{
continue;
}
};
let mut result_contents=String::new();
result_file.read_to_string(&mut result_contents).expect("something went wrong reading the result file.");
let result = match config_parser::parse(&result_contents)
{
Ok(cv) =>
{
match cv
{
config_parser::Token::Value(value) => value,
_ => panic!("wrong token"),
}
}
Err(_error)=>
{
eprintln!("There are missing results (experiment {}).",experiment_index);
ConfigurationValue::None
}
};
if let ConfigurationValue::Experiments(ref mut a) = self.files.packed_results
{
match a[experiment_index]
{
ConfigurationValue::None =>
{
a[experiment_index] = result;
added_packed_results+=1;
},
_ =>
{
if a[experiment_index] != result
{
panic!("Packed mistmatch at experiment index {}",experiment_index);
}
},
};
} else { panic!("broken pack"); }
}
if has_content || is_packed || is_merged
{
progress.before_amount_completed+=1;
if let Action::Discard = action
{
let silent = ! self.options.interactive.unwrap_or(true);
if is_merged
{
panic!("What are you doing merging and discarding simultaneously!?");
}
let keyboard = KeyboardInteraction{};
if is_packed
{
if silent || keyboard.ask_confirmation(&format!("remove experiment {experiment_index} from packed results."))?
{
if let ConfigurationValue::Experiments(ref mut a) = self.files.packed_results {
a[experiment_index] = ConfigurationValue::None;
removed_packed_results+=1;
} else { panic!("but it was packed.") };
}
}
if has_content
{
if silent || keyboard.ask_confirmation(&format!("remove file {result_path:?} for experiment {experiment_index}."))?
{
std::fs::remove_file(&result_path).map_err(|e|error!(file_system_error,e).with_message(format!("could not delete file {result_path:?}")))?;
}
}
progress.discarded+=1;
}
}
else
{
if has_file
{
before_amount_active+=1;
}
match action
{
Action::Local | Action::LocalAndOutput =>
{
println!("experiment {} of {} is {}",experiment_index,self.files.experiments.len(),experiment.format_terminal());
let mut simulation=Simulation::new(experiment,self.plugs);
simulation.run();
simulation.write_result(&mut File::create(&result_path).expect("Could not create the result file."));
},
Action::Slurm => if !self.experiments_on_slurm.contains(&experiment_index)
{
let real_experiment_path=experiment_path.canonicalize().expect("This path cannot be resolved");
let experiment_path_string = real_experiment_path.to_str().expect("You should use paths representable with unicode");
let local_cfg=experiment_path.join("local.cfg");
let mut local_cfg_file=File::create(&local_cfg).expect("Could not create local.cfg file");
writeln!(local_cfg_file,"{}",experiment).unwrap();
let slurm_options = slurm_options.as_ref().unwrap();
let binary = slurm_options.wrapper.as_ref().unwrap_or_else(||self.files.binary.as_ref().unwrap());
job.add_execution(experiment_index,binary,experiment_path_string);
if job.len()>=job_pack_size
{
delta_amount_slurm+=job.len();
let job_id=experiment_index;
match job.slurm(job_id,&jobs_path,slurm_options)
{
Ok( launched_batch ) => launch_entry += &launched_batch,
Err( e ) =>
{
eprintln!("Error when launching jobs:\n{}\ntrying to terinate the action without launching more.",e);
job=Job::new();
break;
}
}
job=Job::new();
}
},
Action::Pull =>
{
let (remote_result,remote_result_contents) =
{
self.remote_files.as_mut().unwrap().build_packed_results();
let binary_result = match self.remote_files.as_ref().unwrap().packed_results{
ConfigurationValue::Experiments(ref a) => if let ConfigurationValue::None = a[experiment_index] { None } else { Some(a[experiment_index].clone()) },
ConfigurationValue::None => None,
_ => panic!("remote binary.results is corrupted"),
};
match binary_result
{
Some(x)=> (Some(x),None),
None => {
let remote_root=self.remote_files.as_ref().unwrap().root.clone().unwrap();
let remote_result_path = remote_root.join(format!("runs/run{}/local.result",experiment_index));
let mut remote_result_file = match sftp.as_ref().unwrap().open(&remote_result_path)
{
Ok(file) => file,
Err(_err) =>
{
progress.missing+=1;
continue;
}
};
let mut remote_result_contents=String::new();
remote_result_file.read_to_string(&mut remote_result_contents).expect("Could not read remote result file.");
if remote_result_contents.len()<5
{
progress.empty+=1;
(None,Some(remote_result_contents))
} else {
match config_parser::parse(&remote_result_contents)
{
Ok(cv) =>
{
let result=match cv
{
config_parser::Token::Value(value) => value,
_ => panic!("wrong token"),
};
(Some(result),Some(remote_result_contents))
}
Err(_error)=>
{
println!("pulled invalid results (experiment {}).",experiment_index);
(None,None)
}
}
}
},
}
};
if let Some(result) = remote_result
{
if let ConfigurationValue::Experiments(ref mut a) = self.files.packed_results
{
a[experiment_index] = result;
added_packed_results+=1;
}
else
{
let remote_result_contents = match remote_result_contents
{
Some(x) => x,
None => format!("{}",result),
};
let mut new_result_file=File::create(&result_path).expect("Could not create result file.");
writeln!(new_result_file,"{}",remote_result_contents).unwrap();
}
delta_completed+=1;
progress.pulled+=1;
}
}
Action::Check =>
{
if experiment_index < self.experiment_to_slurm.len()
{
if let Some( (journal_entry,batch,slurm_id) ) = self.experiment_to_slurm[experiment_index]
{
let slurm_stderr_path = runs_path.join(format!("jobs{}/launch{}-{}.err",journal_entry,batch,slurm_id));
let mut stderr_contents = String::new();
if let Ok(mut stderr_file) = File::open(&slurm_stderr_path)
{
stderr_file.read_to_string(&mut stderr_contents).unwrap_or_else(|_|panic!("something went wrong reading {:?}",slurm_stderr_path));
if stderr_contents.len()>=2
{
println!("Experiment {} contains errors in {:?}: {} bytes",experiment_index,slurm_stderr_path,stderr_contents.len());
println!("First error line: {}",stderr_contents.lines().next().expect("Unable to read first line from errors."));
progress.errors+=1;
}
}
}
}
}
Action::QuickTest =>
{
let mut simulation=Simulation::new(experiment,self.plugs);
for _ in 0..20 {
simulation.advance();
}
},
Action::Output | Action::RemoteCheck | Action::Push | Action::SlurmCancel | Action::Shell | Action::Pack | Action::Discard =>
{
},
};
}
}
progress.finish();
if job.len()>0
{
let job_id=self.files.experiments.len();
let slurm_options = slurm_options.as_ref().unwrap();
match job.slurm(job_id,&jobs_path,slurm_options)
{
Ok( launched_batch ) => launch_entry += &launched_batch,
Err( e ) =>
{
eprintln!("Error when launching remaining jobs:\n{}\ntrying to terminate the action.",e);
}
}
drop(job);
}
if ! launch_entry.is_empty()
{
self.write_journal_entry(&format!("Launched jobs {}",launch_entry));
}
let status_string = format!("Before: completed={} of {} slurm={} inactive={} active={} Changed: slurm=+{} completed=+{}",progress.before_amount_completed,self.files.experiments.len(),before_amount_slurm,before_amount_inactive,before_amount_active,delta_amount_slurm,delta_completed);
self.write_journal_entry(&status_string);
println!("{}",status_string);
println!("Now: completed={} of {}. {} on slurm",progress.before_amount_completed+delta_completed,self.files.experiments.len(),before_amount_slurm+delta_amount_slurm);
if must_draw
{
results=Vec::with_capacity(self.files.experiments.len());
for (experiment_index,experiment) in self.files.experiments.iter().enumerate().skip(start_index).take(end_index-start_index)
{
if let ConfigurationValue::Experiments(ref a) = self.files.packed_results
{
match &a[experiment_index]
{
&ConfigurationValue::None => (),
result => {
results.push(
OutputEnvironmentEntry::new(experiment_index)
.with_experiment(experiment.clone())
.with_result(result.clone())
);
continue;
},
}
}
let experiment_path=runs_path.join(format!("run{}",experiment_index));
let result_path=experiment_path.join("local.result");
let mut result_file=match File::open(&result_path)
{
Ok(rf) => rf,
Err(_error) =>
{
continue;
}
};
let mut result_contents=String::new();
result_file.read_to_string(&mut result_contents).expect("something went wrong reading the result file.");
match config_parser::parse(&result_contents)
{
Ok(cv) =>
{
let result=match cv
{
config_parser::Token::Value(value) => value,
_ => panic!("wrong token"),
};
if let ConfigurationValue::Experiments(ref mut a) = self.files.packed_results
{
a[experiment_index] = result.clone();
added_packed_results+=1;
}
results.push(
OutputEnvironmentEntry::new(experiment_index)
.with_experiment(experiment.clone())
.with_result(result.clone())
);
}
Err(_error)=>
{
println!("There are missing results (experiment {}).",experiment_index);
}
}
}
if let Some(csv) = &self.options.use_csv
{
let mut csv_contents = String::new();
let mut csv_file=File::open(&csv).map_err(|e|Error::could_not_open_file(source_location!(),csv.to_path_buf(),e))?;
csv_file.read_to_string(&mut csv_contents).expect("something went wrong reading {csv}");
let mut lines = csv_contents.lines();
let header : Vec<String> = lines.next().ok_or_else( ||error!(could_not_parse_file,csv.to_path_buf()) )?
.split(',').map(|x|x.trim().to_string()).collect();
for (csv_index,line) in lines.enumerate()
{
let values : Vec<String> = line.split(',').map(|x|x.trim().to_string()).collect();
if values.is_empty()
{
println!("Skipping empty CSV line {csv_index}");
continue
}
if values.len() != header.len()
{
return Err(error!(could_not_parse_file,csv.to_owned()));
}
let res = if csv_index < results.len() { &mut results[csv_index] } else {
results.push(OutputEnvironmentEntry::new(csv_index));
results.last_mut().unwrap()
};
let attrs = (0..header.len()).map(|attr_index|{
let value = &values[attr_index];
let value_f64 = value.parse::<f64>().ok();
let value = if let Some(x) = value_f64 {
ConfigurationValue::Number(x)
} else {
ConfigurationValue::Literal(value.to_string())
};
(header[attr_index].clone(),value)
}).collect();
let csv = ConfigurationValue::Object("CSV".to_string(),attrs);
res.csv = Some(csv);
}
}
match results.len()
{
0 => println!("There are no results. Skipping output generation."),
result_count =>
{
println!("There are {} results.",result_count);
let od=self.files.root.as_ref().unwrap().join("main.od");
let mut od_file=File::open(&od).expect("main.od could not be opened");
let mut od_contents = String::new();
od_file.read_to_string(&mut od_contents).expect("something went wrong reading main.od");
let total = self.files.experiments.len().max(results.len());
let mut environment = OutputEnvironment::new(
results,
total,
&self.files,
&self.options.targets,
);
match config_parser::parse(&od_contents)
{
Err(x) => return Err(error!(could_not_parse_file,od).with_message(format!("error parsing output description file: {:?}",x))),
Ok(config_parser::Token::Value(ConfigurationValue::Array(ref descriptions))) => for description in descriptions.iter()
{
match create_output(description,&mut environment)
{
Ok(_) => (),
Err(err) => eprintln!("ERROR: could not create output {:?}",err),
}
},
_ => panic!("The output description file does not contain a list.")
};
}
}
}
if added_packed_results>=1 || removed_packed_results>=1
{
let packed_results_path = self.files.root.as_ref().unwrap().join("binary.results");
let mut binary_results_file=File::create(&packed_results_path).expect("Could not create binary results file.");
let binary_results = config::config_to_binary(&self.files.packed_results).expect("error while serializing into binary");
binary_results_file.write_all(&binary_results).expect("error happened when creating binary file");
println!("Added {} results to binary.results.",added_packed_results);
if removed_packed_results>=1
{
println!("Removed {} results from binary.results.",removed_packed_results);
}
}
if let (Action::Pack,ConfigurationValue::Experiments(ref a)) = (action,&self.files.packed_results)
{
for (experiment_index,value) in a.iter().enumerate()
{
match value
{
ConfigurationValue::None => (),
_ =>
{
let experiment_path=runs_path.join(format!("run{}",experiment_index));
if experiment_path.exists()
{
if !experiment_path.is_dir()
{
panic!("Somehow {:?} exists but is not a directory",experiment_path);
}
fs::remove_dir_all(&experiment_path).unwrap_or_else(|e|panic!("Error {} when removing directory {:?} and its contents",e,experiment_path));
}
}
}
}
}
let fin = format!("Finished action {} on {}.", action, now.format("%Y %m(%b) %0d(%a), %T (UTC%:z)"));
self.write_journal_entry(&fin);
println!("{}",fin);
Ok(())
}
fn initialize_remote(&mut self) -> Result<(),Error>
{
let remote_path = self.files.root.as_ref().unwrap().join("remote");
let mut remote_file = File::open(&remote_path).expect("remote could not be opened");
let mut remote_contents = String::new();
remote_file.read_to_string(&mut remote_contents).expect("something went wrong reading remote.");
let parsed_remote=match config_parser::parse(&remote_contents)
{
Err(x) => panic!("error parsing remote file: {:?}",x),
Ok(x) => x,
};
match parsed_remote
{
config_parser::Token::Value(ref value) =>
{
if let ConfigurationValue::Array(ref l)=value
{
for remote_value in l
{
let mut name:Option<String> = None;
let mut host:Option<String> = None;
let mut username:Option<String> = None;
let mut root:Option<String> = None;
let mut binary:Option<String> = None;
if let &ConfigurationValue::Object(ref cv_name, ref cv_pairs)=remote_value
{
if cv_name!="Remote"
{
panic!("A remote must be created from a `Remote` object not `{}`",cv_name);
}
for &(ref cvname,ref value) in cv_pairs
{
match cvname.as_ref()
{
"name" => match value
{
&ConfigurationValue::Literal(ref s) => name=Some(s.to_string()),
_ => panic!("bad value for a remote name"),
},
"host" => match value
{
&ConfigurationValue::Literal(ref s) => host=Some(s.to_string()),
_ => panic!("bad value for a remote host"),
},
"username" => match value
{
&ConfigurationValue::Literal(ref s) => username=Some(s.to_string()),
_ => panic!("bad value for a remote username"),
},
"root" => match value
{
&ConfigurationValue::Literal(ref s) => root=Some(s.to_string()),
_ => panic!("bad value for a remote root"),
},
"binary" => match value
{
&ConfigurationValue::Literal(ref s) => binary=Some(s.to_string()),
_ => panic!("bad value for a remote binary"),
},
_ => panic!("Nothing to do with field {} in Remote",cvname),
}
}
}
else
{
panic!("Trying to create a remote from a non-Object");
}
if name==Some("default".to_string())
{
self.remote_files = Some(ExperimentFiles {
host,
username,
ssh2_session: None,
binary_call: None,
binary: binary.map(|value|Path::new(&value).to_path_buf()),
root: root.map(|value|Path::new(&value).to_path_buf()),
cfg_contents: None,
parsed_cfg: None,
runs_path: None,
experiments: vec![],
launch_configurations: Vec::new(),
packed_results: ConfigurationValue::None,
});
}
}
}
else
{
panic!("there are not remotes");
}
},
_ => panic!("Not a value"),
};
let host=self.remote_files.as_ref().unwrap().host.as_ref().expect("there is no host").to_owned();
let tcp = TcpStream::connect(format!("{}:22",host)).unwrap();
let mut session = Session::new().unwrap();
session.set_tcp_stream(tcp);
session.handshake().map_err(|e|error!(authentication_failed,e))?;
let mut prompt = KeyboardInteraction;
let username = self.remote_files.as_ref().unwrap().username.as_ref().expect("there is no username").to_owned();
let raw_methods = session.auth_methods(&username).unwrap();
let methods: HashSet<&str> = raw_methods.split(',').collect();
println!("{} available authentication methods ({})",methods.len(),raw_methods);
let mut last_error = None;
if !session.authenticated() && methods.contains("publickey")
{
let home = dirs::home_dir().ok_or_else(||error!(undetermined).with_message(format!("could not get home path")))?;
let mut private_key_paths : Vec<PathBuf> = {
let ssh_config=Command::new("ssh")
.arg("-G")
.arg(&host)
.output().map_err(|e|Error::command_not_found(source_location!(),"squeue".to_string(),e))?;
let ssh_config_output=String::from_utf8_lossy(&ssh_config.stdout);
ssh_config_output.lines().filter_map(|line|{
line.strip_prefix("identityfile ").map(|s|{
match s.strip_prefix("~/")
{
None => PathBuf::from(s),
Some(r) => home.join(r),
}
})
}).collect()
};
if private_key_paths.is_empty()
{
let ssh_config_dir = home.join(".ssh/");
private_key_paths = ["id_rsa","id_ecdsa","id_ecdsa_sk","id_ed25519","id_ed25519_sk","id_xmss","id_dsa"]
.iter().map(|name|ssh_config_dir.join(name)).collect()
}
println!("Attempt to use private_key_paths={private_key_paths:?}");
for private_key in private_key_paths.into_iter()
{
if private_key.is_file()
{
let pub_key = private_key.with_extension("pub");
let pub_key = if pub_key.is_file() { Some(pub_key.as_ref()) } else { None };
let passphrase = None;
println!("username={username} pub_key={pub_key:?} private_key={private_key:?}");
match session.userauth_pubkey_file(&username,pub_key,&private_key,passphrase)
{
Ok(_) => break,
Err(e) =>
{
let error = Error::authentication_failed(source_location!(),e);
eprintln!("SSH method publickey failed: {error:?}");
last_error = Some(error);
}
}
}
}
}
if !session.authenticated() && methods.contains("keyboard-interactive")
{
if let Err(e) = session.userauth_keyboard_interactive(&username,&mut prompt)
{
let error = Error::authentication_failed(source_location!(),e);
eprintln!("SSH method keyboard-interactive failed: {error:?}");
last_error = Some(error);
}
}
if !session.authenticated() && methods.contains("password")
{
let password=prompt.ask_password(&username,&host);
if let Err(e) = session.userauth_password(&username,&password)
{
let error = Error::authentication_failed(source_location!(),e);
eprintln!("SSH method password failed: {error:?}");
last_error = Some(error);
}
}
if !session.authenticated()
{
eprintln!("All SSH authentication methods failed.");
return Err(last_error.unwrap());
}
assert!(session.authenticated());
self.remote_files.as_mut().unwrap().ssh2_session = Some(session);
println!("ssh2 session created with remote host");
self.remote_files.as_mut().unwrap().build_packed_results();
Ok(())
}
}
/// Progress bar for an action over the experiments, together with the counters
/// that are summarized in the message of the bar.
#[derive(Debug)]
pub struct ActionProgress
{
/// The progress bar being displayed.
bar: ProgressBar,
/// Number of results obtained from the remote by the `Pull` action.
pulled: usize,
/// Number of remote result files that were read but had fewer than 5 bytes.
empty: usize,
/// Number of remote result files that could not be opened.
missing: usize,
/// Number of results taken from the external source.
merged: usize,
/// Number of experiments with results processed by the `Discard` action.
discarded: usize,
/// Number of experiments for which the `Check` action found a non-empty slurm error file.
errors: usize,
/// Number of experiments that already had a result (in file, packed, or merged). Shown as "already".
before_amount_completed: usize,
}
impl ActionProgress
{
	/// Builds a progress bar of length `size` whose prefix describes the given `action`.
	/// All the counters start at zero.
	pub fn new(action:&Action,size:usize)->ActionProgress
	{
		let style = ProgressStyle::default_bar().template("{prefix} [{elapsed_precise}] {bar:30.blue/white.dim} {pos:5}/{len:5} {msg}");
		let bar = ProgressBar::new(size as u64);
		bar.set_style(style);
		let prefix = match action
		{
			Action::Pull => "pulling files",
			Action::Local | Action::LocalAndOutput => "running locally",
			Action::Slurm => "preparing slurm scripts",
			_ => "checking result files",
		};
		bar.set_prefix(prefix);
		ActionProgress{
			bar,
			pulled: 0,
			empty: 0,
			missing: 0,
			merged: 0,
			discarded: 0,
			errors: 0,
			before_amount_completed: 0,
		}
	}
	/// Refreshes the message and then advances the bar by `increment` positions.
	pub fn inc(&self, increment:u64)
	{
		self.update();
		self.bar.inc(increment);
	}
	/// Refreshes the message one last time and marks the bar as finished.
	pub fn finish(&self)
	{
		self.update();
		self.bar.finish()
	}
	/// Rebuilds the message of the bar: a comma-separated list with every non-zero counter
	/// followed by its label.
	pub fn update(&self)
	{
		let counters = [
			(self.pulled,"pulled"),
			(self.empty,"empty"),
			(self.missing,"missing"),
			(self.before_amount_completed,"already"),
			(self.merged,"merged"),
			(self.discarded,"discarded"),
			(self.errors,"errors"),
		];
		let mut parts : Vec<String> = Vec::new();
		for (amount,label) in counters.iter()
		{
			if *amount>0
			{
				parts.push(format!("{} {}",amount,label));
			}
		}
		self.bar.set_message(parts.join(", "));
	}
}