use std::fs::read_dir;
use std::fs::{create_dir, remove_dir_all, File};
use std::io::{BufReader, BufWriter, Write};
use std::path::{Path, PathBuf};
use std::result::Result as StdResult;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use bincode;
use ctrlc::set_handler;
use log::LevelFilter;
use ron::{self, ser::PrettyConfig};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use simple_logging::log_to_file;
use tar::{Archive as TarArchive, Builder as TarBuilder};
use time::{Duration, Error as TimeError, OffsetDateTime};
use crate::error::{IndexOutOfBounds, Result, StatesNotRecorded};
use crate::params::FromParams;
pub trait Runnable: FromParams + Sized {
type State: Clone;
fn step(&mut self) -> Option<&Self::State>;
}
pub const DATA_DIR_NAME: &'static str = "data";
pub const SETTINGS_FILE_NAME: &'static str = "settings.ron";
pub const STATS_FILE_NAME: &'static str = "stats.ron";
pub const PARAMS_FILE_NAME: &'static str = "params.ron";
pub const LOG_FILE_NAME: &'static str = "run.log";
pub fn run_and_save<R: Runnable>(mut settings: Settings) -> Result<Run<R>>
where
R::Params: Serialize + DeserializeOwned,
R::State: Serialize,
{
let mut stats = Stats::new_paused()?;
let is_running = Arc::new(AtomicBool::new(true));
let r = is_running.clone();
set_handler(move || {
r.store(false, Ordering::SeqCst);
})?;
if settings.verbose {
match settings.name {
Some(ref name) => println!("Starting run for {}", name),
None => println!("Starting run"),
}
println!("\tmax_iterations: {:?}", settings.max_iterations);
println!("\tkeep_in_memory: {}", settings.keep_in_memory);
println!("\toutput_dir: {:?}", settings.output_dir);
println!("\tfilename_prefix: {:?}", settings.filename_prefix);
println!("\tfilename_suffix: {:?}", settings.filename_suffix);
println!();
}
let params: R::Params = load_ron(&settings.parameter_file)?;
if settings.verbose {
println!("Parameters loaded");
}
let (run_dir, data_dir) = if let Some(output_dir) = settings.output_dir {
let run_dir = output_dir;
create_dir(&run_dir)?;
log_to_file(run_dir.join(LOG_FILE_NAME), LevelFilter::Info)?;
let data_dir = run_dir.join(DATA_DIR_NAME);
create_dir(&data_dir)?;
(Some(run_dir), Some(data_dir))
} else {
(None, None)
};
let mut runnable = R::from_params(params.clone())?;
settings.output_dir = run_dir;
let mut states = if settings.keep_in_memory {
Some(StateStorage::new_memory())
} else {
match data_dir {
Some(dir) => Some(StateStorage::new_disk(
dir,
settings.filename_prefix.clone(),
settings.filename_suffix.clone(),
)),
None => None,
}
};
while is_running.load(Ordering::SeqCst) {
if let Some(max_iterations) = settings.max_iterations {
if settings.verbose {
print!(
"\rRunning iteration: ({}/{})",
stats.iterations + 1,
max_iterations
);
std::io::stdout().flush()?;
}
log::info!(
"Running iteration: ({}/{})",
stats.iterations + 1,
max_iterations
);
} else {
if settings.verbose {
print!("\rRunning iteration {}", stats.iterations + 1);
std::io::stdout().flush()?;
}
log::info!("Running iteration {}", stats.iterations + 1);
}
stats.start()?; let state_opt = runnable.step();
stats.pause()?;
match state_opt {
Some(state) => {
match states.as_mut() {
Some(StateStorage::InMemory(states)) => {
states.push(stats.timestamp(state.clone())?);
if settings.verbose {
print!(", Saved to memory");
}
}
Some(StateStorage::OnDisk {
dir,
prefix,
suffix,
count,
..
}) => {
let path =
get_indexed_path(dir, prefix.as_ref(), suffix.as_ref(), *count);
save_bincode(&path, &stats.timestamp(state)?)?;
*count += 1;
if settings.verbose {
print!(", Saved to disk");
}
}
None => {
if settings.verbose {
print!(", State information not saved");
}
}
}
stats.next_iteration();
if let Some(max_iterations) = settings.max_iterations {
if stats.iterations >= max_iterations {
break;
}
}
}
None => break,
}
}
println!();
stats.pause()?;
let mut run = Run {
settings,
stats,
params,
states,
};
run.save()?;
if run.settings.verbose {
println!(
"Algorithm running time: {:.4} seconds",
run.stats.get_running_time().as_seconds_f64()
);
if let Some(total_time) = run.stats.get_total_time() {
println!("Total time: {:.4} seconds", total_time.as_seconds_f64());
}
}
return Ok(run);
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Settings {
pub verbose: bool,
pub algorithm: String,
pub name: Option<String>,
pub parameter_file: PathBuf,
pub max_iterations: Option<usize>,
pub keep_in_memory: bool,
pub double: bool,
pub output_dir: Option<PathBuf>,
pub filename_prefix: Option<String>,
pub filename_suffix: Option<String>,
}
impl Settings {
pub fn load<P: AsRef<Path>>(dir: P) -> Result<Self> {
let path = dir.as_ref().join(SETTINGS_FILE_NAME);
let mut settings: Self = load_ron(&path)?;
if settings.verbose {
println!("Settings loaded from \"{}\"", path.display());
}
settings.output_dir = Some(dir.as_ref().to_path_buf());
Ok(settings)
}
}
#[derive(Debug, Clone, Copy)]
enum TimerState {
Running { last_start_time: OffsetDateTime },
Paused,
Finished,
}
impl Default for TimerState {
fn default() -> Self {
TimerState::Finished
}
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct Stats {
start_time: OffsetDateTime,
#[serde(skip)]
state: TimerState,
total_time: Option<Duration>,
running_time: Duration,
iterations: usize,
}
impl Stats {
fn new_paused() -> Result<Self> {
Ok(Self {
start_time: Self::time_now()?,
state: TimerState::Paused,
total_time: None,
running_time: Duration::ZERO,
iterations: 0,
})
}
fn timestamp<T>(&self, obj: T) -> Result<TimeStamped<T>> {
Ok(TimeStamped {
time_since_start: self.total_time_since_start()?,
time_running_since_start: self.running_time_since_start()?,
obj,
})
}
fn time_now() -> StdResult<OffsetDateTime, TimeError> {
Ok(OffsetDateTime::now_utc())
}
fn running_time_since_start(&self) -> Result<Duration> {
match self.state {
TimerState::Running { last_start_time } => {
let since_last_start = Self::time_now()? - last_start_time;
Ok(since_last_start + self.get_running_time())
}
TimerState::Paused | TimerState::Finished => Ok(self.get_running_time()),
}
}
fn total_time_since_start(&self) -> Result<Duration> {
Ok(Self::time_now()? - self.start_time)
}
fn start(&mut self) -> Result<()> {
match self.state {
TimerState::Running { .. } => {
Ok(())
}
TimerState::Paused => {
self.state = TimerState::Running {
last_start_time: Self::time_now()?,
};
Ok(())
}
TimerState::Finished => {
Ok(())
}
}
}
fn pause(&mut self) -> Result<()> {
match self.state {
TimerState::Running { .. } => {
self.running_time = self.running_time_since_start()?;
self.state = TimerState::Paused;
Ok(())
}
TimerState::Paused => {
Ok(())
}
TimerState::Finished => {
Ok(())
}
}
}
fn finish(&mut self) -> Result<()> {
self.pause()?;
self.state = TimerState::Finished;
self.total_time = Some(self.total_time_since_start()?);
Ok(())
}
pub fn get_total_time(&self) -> Option<&Duration> {
self.total_time.as_ref()
}
pub fn get_running_time(&self) -> Duration {
self.running_time
}
fn next_iteration(&mut self) {
self.iterations += 1;
}
pub fn get_iterations(&self) -> usize {
self.iterations
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(
serialize = "T: Serialize",
deserialize = "T: DeserializeOwned"
))]
enum StateStorage<T> {
InMemory(Vec<T>),
OnDisk {
dir: PathBuf,
prefix: Option<String>,
suffix: Option<String>,
count: usize,
#[serde(skip)]
last: Option<T>,
},
}
impl<T> StateStorage<T> {
fn new_memory() -> Self {
Self::InMemory(Vec::new())
}
fn load_memory(
dir: &PathBuf,
prefix: Option<String>,
suffix: Option<String>,
count: usize,
verbose: bool,
) -> Result<Self>
where
T: DeserializeOwned,
{
let mut vec = Vec::new();
for i in 0..count {
let path = get_indexed_path(dir, prefix.as_ref(), suffix.as_ref(), i);
let new = load_bincode(path)?;
vec.push(new);
if verbose {
print!("\r> Loading states from file: ({}/{})", i + 1, count);
}
}
if verbose {
println!(" Done");
}
Ok(Self::InMemory(vec))
}
fn new_disk(
dir: PathBuf,
prefix: Option<String>,
suffix: Option<String>,
) -> Self {
Self::OnDisk {
dir,
prefix,
suffix,
count: 0,
last: None,
}
}
fn load_disk(
dir: &PathBuf,
prefix: Option<String>,
suffix: Option<String>,
count: usize,
) -> Result<Self> {
Ok(Self::OnDisk {
dir: dir.clone(),
prefix,
suffix,
count,
last: None,
})
}
fn get(&mut self, index: usize) -> Result<&T>
where
T: DeserializeOwned,
{
match self {
StateStorage::InMemory(states) => match states.get(index) {
Some(obj) => Ok(obj),
None => Err(IndexOutOfBounds)?,
},
StateStorage::OnDisk {
dir,
prefix,
suffix,
count,
last,
} => {
if index >= *count {
Err(IndexOutOfBounds)?;
}
let path =
get_indexed_path(dir, prefix.as_ref(), suffix.as_ref(), index);
let ref_ = last.insert(load_bincode(&path)?);
Ok(ref_)
}
}
}
fn try_get_all(&self) -> Option<&Vec<T>> {
match self {
StateStorage::InMemory(states) => Some(states),
StateStorage::OnDisk {
dir: _,
prefix: _,
suffix: _,
count: _,
last: _,
} => None,
}
}
fn count(&self) -> usize {
match self {
StateStorage::InMemory(states) => states.len(),
StateStorage::OnDisk {
dir: _,
prefix: _,
suffix: _,
count,
last: _,
} => *count,
}
}
}
impl<T> Drop for StateStorage<T> {
fn drop(&mut self) {
if let StateStorage::OnDisk {
dir,
prefix: _,
suffix: _,
count: _,
last: _,
} = self
{
remove_dir_all(&dir).unwrap();
println!("> Removed \"{}\"", dir.display());
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(bound(
serialize = "T: Serialize",
deserialize = "T: DeserializeOwned"
))]
pub struct TimeStamped<T> {
time_since_start: Duration,
time_running_since_start: Duration,
obj: T,
}
impl<T> TimeStamped<T> {
pub fn get_timestamp(&self) -> Duration {
self.time_since_start
}
pub fn get_running_timestamp(&self) -> Duration {
self.time_running_since_start
}
pub fn get_value(&self) -> &T {
&self.obj
}
}
impl<T> From<TimeStamped<&T>> for TimeStamped<T>
where
T: Clone,
{
fn from(timestamped: TimeStamped<&T>) -> Self {
Self {
time_since_start: timestamped.time_since_start,
time_running_since_start: timestamped.time_running_since_start,
obj: timestamped.obj.clone(),
}
}
}
#[derive(Debug)]
pub struct Run<R: Runnable> {
settings: Settings,
stats: Stats,
params: R::Params,
states: Option<StateStorage<TimeStamped<R::State>>>,
}
impl<R: Runnable> Clone for Run<R> {
fn clone(&self) -> Self {
Self {
settings: self.settings.clone(),
stats: self.stats.clone(),
params: self.params.clone(),
states: self.states.clone(),
}
}
}
impl<R: Runnable> Run<R> {
pub fn name(&self) -> Option<&String> {
self.settings.name.as_ref()
}
pub fn settings(&self) -> &Settings {
&self.settings
}
pub fn params(&self) -> &R::Params {
&self.params
}
pub fn max_iterations(&self) -> Option<usize> {
self.settings.max_iterations
}
pub fn count(&self) -> usize {
match self.states {
Some(ref states) => states.count(),
None => 0,
}
}
pub fn get(&mut self, index: usize) -> Result<&TimeStamped<R::State>>
where
R::State: DeserializeOwned,
{
match self.states {
Some(ref mut states) => states.get(index),
None => Err(StatesNotRecorded)?,
}
}
pub fn try_get_all(&self) -> Option<&Vec<TimeStamped<R::State>>> {
match self.states {
Some(ref states) => states.try_get_all(),
None => None,
}
}
pub fn save(&mut self) -> Result<()>
where
R::Params: Serialize,
R::State: Serialize,
{
match &self.states {
Some(StateStorage::InMemory(states)) => {
if let Some(ref output_dir) = self.settings.output_dir {
let dir = output_dir.join(DATA_DIR_NAME);
for (i, state) in states.iter().enumerate() {
let path = get_indexed_path(
&dir,
self.settings.filename_prefix.clone(),
self.settings.filename_suffix.clone(),
i,
);
save_bincode(&path, state)?;
if self.settings.verbose {
print!("\r> Saving states to file: ({}/{})", i + 1, states.len());
}
}
if self.settings.verbose {
println!(" Done");
}
}
}
Some(StateStorage::OnDisk {
dir: _,
prefix: _,
suffix: _,
count: _,
last: _,
}) => {
}
None => (), }
if let Some(ref output_dir) = self.settings.output_dir {
let data_dir = output_dir.join(DATA_DIR_NAME);
let data_file = output_dir.join(format!("{}.tar", DATA_DIR_NAME));
let file = File::create(&data_file)?;
let mut tar = TarBuilder::new(file);
for (i, entry) in read_dir(&data_dir)?.enumerate() {
let entry = entry?;
if entry.file_type()?.is_file() {
let file_path = entry.path();
let mut file = File::open(&file_path)?;
let archive_path = file_path.strip_prefix(&data_dir)?;
if self.settings.verbose {
print!(
"\r> Compressing states: ({}/{})",
i + 1,
self.stats.iterations
);
}
tar.append_file(archive_path, &mut file)?;
}
}
let file = tar.into_inner()?;
drop(file);
if self.settings.verbose {
println!(" Done\n> Data archived to \"{}\"", data_file.display());
}
let path = output_dir.join(SETTINGS_FILE_NAME);
save_ron(&path, &self.settings)?;
if self.settings.verbose {
println!("> Settings saved to \"{}\"", path.display());
}
let path = output_dir.join(PARAMS_FILE_NAME);
save_ron(&path, &self.params)?;
if self.settings.verbose {
println!("> Parameters saved to \"{}\"", path.display());
}
self.stats.finish()?;
let path = output_dir.join(STATS_FILE_NAME);
save_ron(&path, &self.stats)?;
if self.settings.verbose {
println!("> Statistics saved to \"{}\"", path.display());
}
}
Ok(())
}
pub fn load(settings: Settings) -> Result<Self>
where
R::Params: DeserializeOwned,
R::State: Serialize + DeserializeOwned,
{
let dir = match settings.output_dir {
Some(ref output_dir) => output_dir,
None => Err(StatesNotRecorded)?,
};
if settings.verbose {
match settings.name {
Some(ref name) => println!("Visualizing {}", name),
None => println!("Visualizing"),
}
println!("\tmax_iterations: {:?}", settings.max_iterations);
println!("\tkeep_in_memory: {}", settings.keep_in_memory);
println!("\toutput_dir: {:?}", settings.output_dir);
println!("\tfilename_prefix: {:?}", settings.filename_prefix);
println!("\tfilename_suffix: {:?}", settings.filename_suffix);
println!();
}
let path = dir.join(STATS_FILE_NAME);
let stats: Stats = load_ron(&path)?;
if settings.verbose {
println!("> Statistics loaded from \"{}\"", path.display());
}
let path = dir.join(PARAMS_FILE_NAME);
let params: R::Params = load_ron(&path)?;
if settings.verbose {
println!("> Parameters loaded from \"{}\"", path.display());
}
let data_dir = dir.join(DATA_DIR_NAME);
create_dir(&data_dir)?;
let data_file = dir.join(format!("{}.tar", DATA_DIR_NAME));
let file = File::open(data_file)?;
let mut archive = TarArchive::new(file);
for (i, entry) in archive.entries()?.enumerate() {
let mut file = entry?;
print!("\r> Uncompressing states: ({}/{})", i + 1, stats.iterations);
file.unpack_in(&data_dir)?;
}
println!(" Done");
let states = if settings.keep_in_memory {
Some(StateStorage::load_memory(
&data_dir,
settings.filename_prefix.clone(),
settings.filename_suffix.clone(),
stats.get_iterations(),
settings.verbose,
)?)
} else {
Some(StateStorage::load_disk(
&data_dir,
settings.filename_prefix.clone(),
settings.filename_suffix.clone(),
stats.get_iterations(),
)?)
};
Ok(Run {
settings,
stats,
params,
states,
})
}
}
pub fn save_ron<T, P>(path: P, obj: &T) -> Result<()>
where
T: Serialize,
P: AsRef<Path>,
{
let file = BufWriter::new(File::create(path)?);
Ok(ron::ser::to_writer_pretty(
file,
obj,
PrettyConfig::default(),
)?)
}
pub fn load_ron<T, P>(path: P) -> Result<T>
where
T: DeserializeOwned,
P: AsRef<Path>,
{
let file = BufReader::new(File::open(path)?);
Ok(ron::de::from_reader(file)?)
}
pub fn save_bincode<T, P>(path: P, obj: &T) -> Result<()>
where
T: Serialize,
P: AsRef<Path>,
{
let file = BufWriter::new(File::create(path)?);
Ok(bincode::serialize_into(file, obj)?)
}
pub fn load_bincode<T, P>(path: P) -> Result<T>
where
T: DeserializeOwned,
P: AsRef<Path>,
{
let file = BufReader::new(File::open(path)?);
Ok(bincode::deserialize_from(file)?)
}
fn get_indexed_path<P, S1, S2>(
directory: P,
prefix: Option<S1>,
suffix: Option<S2>,
index: usize,
) -> PathBuf
where
P: AsRef<Path>,
S1: AsRef<str>,
S2: AsRef<str>,
{
let filename = match (prefix, suffix) {
(Some(prefix), Some(suffix)) => {
prefix.as_ref().to_owned() + &index.to_string() + suffix.as_ref()
}
(Some(prefix), None) => prefix.as_ref().to_owned() + &index.to_string(),
(None, Some(suffix)) => index.to_string() + suffix.as_ref(),
(None, None) => index.to_string(),
};
directory.as_ref().join(filename)
}