use super::{
error::{Error, FileContext},
timeout::Timeout,
Result,
};
use serde::Serialize;
use std::{
collections::BTreeMap,
convert::TryFrom,
fs::{self, File},
io::BufReader,
path::{Component, Path, PathBuf},
str::FromStr,
};
#[derive(Debug)]
pub struct GWasmBinary<'a> {
pub js: &'a [u8],
pub wasm: &'a [u8],
}
#[derive(Debug)]
pub struct TaskBuilder<'a> {
binary: GWasmBinary<'a>,
name: Option<String>,
bid: Option<f64>,
budget: Option<f64>,
timeout: Option<Timeout>,
subtask_timeout: Option<Timeout>,
input_dir_path: PathBuf,
output_dir_path: PathBuf,
output_path: Option<PathBuf>,
subtask_data: Vec<Vec<u8>>,
}
impl<'a> TaskBuilder<'a> {
#[deprecated(
since = "0.3.0",
note = "Use try_new instead, which properly handles relative paths"
)]
pub fn new<P: AsRef<Path>>(workspace: P, binary: GWasmBinary<'a>) -> Self {
Self {
binary,
name: None,
bid: None,
budget: None,
timeout: None,
subtask_timeout: None,
input_dir_path: workspace.as_ref().join("in"),
output_dir_path: workspace.as_ref().join("out"),
output_path: None,
subtask_data: Vec::new(),
}
}
pub fn try_new<P: AsRef<Path>>(workspace: P, binary: GWasmBinary<'a>) -> Result<Self> {
let abspath = workspace.as_ref().canonicalize()?;
#[allow(deprecated)]
Ok(Self::new(abspath, binary))
}
pub fn name<S: AsRef<str>>(mut self, name: S) -> Self {
self.name = Some(name.as_ref().to_owned());
self
}
pub fn bid(mut self, bid: f64) -> Self {
self.bid = Some(bid);
self
}
pub fn budget(mut self, budget: f64) -> Self {
self.budget = Some(budget);
self
}
pub fn timeout(mut self, timeout: Timeout) -> Self {
self.timeout = Some(timeout);
self
}
pub fn subtask_timeout(mut self, subtask_timeout: Timeout) -> Self {
self.subtask_timeout = Some(subtask_timeout);
self
}
pub fn output_path<P: AsRef<Path>>(mut self, output_path: P) -> Self {
self.output_path = Some(output_path.as_ref().into());
self
}
pub fn push_subtask_data<T: Into<Vec<u8>>>(mut self, data: T) -> Self {
self.subtask_data.push(data.into());
self
}
pub fn build(mut self) -> Result<Task> {
let name = self.name.take().unwrap_or("unknown".to_owned());
let bid = self.bid.unwrap_or(1.0);
let timeout = self.timeout.unwrap_or(
Timeout::from_str("00:10:00")
.expect("could correctly parse default task timeout value"),
);
let subtask_timeout = self.subtask_timeout.unwrap_or(
Timeout::from_str("00:10:00")
.expect("could correctly parse default subtask timeout value"),
);
let js_name = format!("{}.js", name);
let wasm_name = format!("{}.wasm", name);
let mut options = Options::new(
js_name,
wasm_name,
self.input_dir_path.clone(),
self.output_dir_path.clone(),
self.output_path.clone(),
);
fs::create_dir(&options.input_dir_path).file_context(&options.input_dir_path)?;
let js_filename = options.input_dir_path.join(&options.js_name);
fs::write(&js_filename, self.binary.js).file_context(&js_filename)?;
let wasm_filename = options.input_dir_path.join(&options.wasm_name);
fs::write(&wasm_filename, self.binary.wasm).file_context(&wasm_filename)?;
fs::create_dir(&options.output_dir_path).file_context(&options.output_dir_path)?;
for (i, chunk) in self.subtask_data.into_iter().enumerate() {
let name = format!("subtask_{}", i);
let input_dir_path = options.input_dir_path.join(&name);
fs::create_dir(&input_dir_path).file_context(&input_dir_path)?;
let output_dir_path = options.output_dir_path.join(&name);
fs::create_dir(&output_dir_path).file_context(&output_dir_path)?;
let input_name = format!("in{}", i);
let input_filename = input_dir_path.join(&input_name);
fs::write(&input_filename, &chunk).file_context(&input_filename)?;
let mut subtask = Subtask::new();
subtask.exec_args.push(input_name.into());
let output_name = "out";
subtask.exec_args.push(output_name.into());
subtask.output_file_paths.push(output_name.into());
options.subtasks.insert(name, subtask);
}
Ok(Task::new(
name,
bid,
self.budget,
timeout,
subtask_timeout,
options,
))
}
}
#[derive(Debug, Serialize, Clone)]
pub struct Task {
#[serde(rename = "type")]
task_type: String,
name: String,
bid: f64,
#[serde(skip_serializing_if = "Option::is_none")]
budget: Option<f64>,
timeout: Timeout,
subtask_timeout: Timeout,
options: Options,
}
impl Task {
pub fn new<S: Into<String>>(
name: S,
bid: f64,
budget: Option<f64>,
timeout: Timeout,
subtask_timeout: Timeout,
options: Options,
) -> Self {
Self {
task_type: "wasm".into(),
name: name.into(),
bid,
budget,
timeout,
subtask_timeout,
options,
}
}
pub fn name(&self) -> &str {
&self.name
}
pub fn bid(&self) -> f64 {
self.bid
}
pub fn budget(&self) -> Option<f64> {
self.budget
}
pub fn timeout(&self) -> &Timeout {
&self.timeout
}
pub fn subtask_timeout(&self) -> &Timeout {
&self.subtask_timeout
}
pub fn options(&self) -> &Options {
&self.options
}
}
#[derive(Debug, Serialize, Clone)]
pub struct Options {
js_name: String,
wasm_name: String,
#[serde(rename = "input_dir")]
input_dir_path: PathBuf,
#[serde(rename = "output_dir")]
output_dir_path: PathBuf,
#[serde(skip_serializing_if = "Option::is_none")]
output_path: Option<PathBuf>,
subtasks: BTreeMap<String, Subtask>,
}
impl Options {
pub fn new<S: Into<String>, P: Into<PathBuf>>(
js_name: S,
wasm_name: S,
input_dir_path: P,
output_dir_path: P,
output_path: Option<P>,
) -> Self {
Self {
js_name: js_name.into(),
wasm_name: wasm_name.into(),
input_dir_path: input_dir_path.into(),
output_dir_path: output_dir_path.into(),
output_path: output_path.map(Into::into),
subtasks: BTreeMap::new(),
}
}
pub fn js_name(&self) -> &str {
&self.js_name
}
pub fn wasm_name(&self) -> &str {
&self.wasm_name
}
pub fn input_dir_path(&self) -> &Path {
&self.input_dir_path
}
pub fn output_dir_path(&self) -> &Path {
&self.output_dir_path
}
pub fn output_path(&self) -> Option<&Path> {
self.output_path.as_ref().map(AsRef::as_ref)
}
pub fn subtasks(&self) -> impl Iterator<Item = (&str, &Subtask)> {
self.subtasks
.iter()
.map(|(name, subtask)| (name.as_str(), subtask))
}
pub fn add_subtask(&mut self, name: String, subtask: Subtask) {
self.subtasks.insert(name, subtask);
}
}
#[derive(Debug, Serialize, Clone)]
pub struct Subtask {
pub exec_args: Vec<String>,
pub output_file_paths: Vec<PathBuf>,
}
impl Subtask {
pub fn new() -> Self {
Self {
exec_args: Vec::new(),
output_file_paths: Vec::new(),
}
}
pub fn exec_args(&self) -> impl Iterator<Item = &str> {
self.exec_args.iter().map(|s| s.as_str())
}
pub fn output_file_paths(&self) -> impl Iterator<Item = &Path> {
self.output_file_paths.iter().map(|p| p.as_ref())
}
}
#[derive(Debug)]
pub struct ComputedTask {
pub name: String,
pub bid: f64,
pub timeout: Timeout,
pub subtask_timeout: Timeout,
pub subtasks: Vec<ComputedSubtask>,
}
#[derive(Debug)]
pub struct ComputedSubtask {
pub data: BTreeMap<PathBuf, BufReader<File>>,
pub name: String,
}
impl TryFrom<Task> for ComputedTask {
type Error = Error;
fn try_from(task: Task) -> Result<Self> {
let name = task.name;
let bid = task.bid;
let timeout = task.timeout;
let subtask_timeout = task.subtask_timeout;
let mut computed_subtasks = Vec::new();
for (s_name, subtask) in task.options.subtasks() {
let output_dir = task.options.output_dir_path().join(s_name);
let mut computed_subtask = ComputedSubtask {
data: BTreeMap::new(),
name: String::from(s_name),
};
for out_path in subtask.output_file_paths() {
let relative_path = out_path
.strip_prefix(Component::RootDir)
.unwrap_or(out_path);
let fname = output_dir.join(relative_path);
let f = File::open(&fname).file_context(&fname)?;
let reader = BufReader::new(f);
computed_subtask.data.insert(out_path.into(), reader);
}
computed_subtasks.push(computed_subtask);
}
Ok(Self {
name,
bid,
timeout,
subtask_timeout,
subtasks: computed_subtasks,
})
}
}