#![feature(async_await)]
use bigml::{
resource::{execution, Execution, Id, Resource, Script},
Client,
};
use common_failures::{quick_main, Result};
use env_logger;
use failure::{Error, ResultExt};
use futures::{compat::Future01CompatExt, Future, FutureExt, TryFutureExt};
use log::debug;
use std::{env, pin::Pin, sync::Arc};
use structopt::StructOpt;
use tokio::{
codec::{FramedRead, FramedWrite, LinesCodec},
io,
prelude::{stream, Stream},
runtime::Runtime,
};
mod execution_input;
mod line_delimited_json_codec;
use execution_input::ExecutionInput;
use line_delimited_json_codec::LineDelimitedJsonCodec;
/// A heap-allocated, type-erased `Stream` (the `tokio::prelude` trait, which
/// carries a separate `Error` associated type) yielding `T` and failing with
/// `failure::Error`. The `Send + 'static` bounds are part of the alias.
type BoxStream<T> = Box<dyn Stream<Item = T, Error = Error> + Send + 'static>;
/// A pinned, heap-allocated, type-erased `futures::Future` whose output is
/// `Result<T>` (the `common_failures` result alias). The `Send + 'static`
/// bounds are part of the alias.
type BoxFuture<T> = Pin<Box<dyn Future<Output = Result<T>> + Send + 'static>>;
// Command-line options, parsed by `structopt`.
//
// The `///` comments on the fields below are picked up by `structopt` as the
// per-flag help text shown by `--help`, so keep them short and user-facing.
// (This header is a plain `//` comment on purpose, so it cannot interfere with
// the explicit `about` string.)
#[derive(Debug, StructOpt)]
#[structopt(
    name = "bigml-parallel",
    about = "Execute WhizzML script in parallel over one or more BigML resources"
)]
struct Opt {
    /// The ID of the WhizzML script to execute.
    #[structopt(long = "script", short = "s")]
    script: Id<Script>,

    /// A name to assign to each execution.
    #[structopt(long = "name", short = "n")]
    name: Option<String>,

    /// A resource ID to run the script on (may be repeated). If omitted,
    /// resource IDs are read from standard input, one per line.
    #[structopt(long = "resource", short = "r")]
    resources: Vec<String>,

    /// The name of the script input used to pass in the resource ID.
    #[structopt(
        long = "resource-input-name",
        short = "R",
        default_value = "resource"
    )]
    resource_input_name: String,

    /// An extra input to pass to every execution (may be repeated).
    #[structopt(long = "input", short = "i")]
    inputs: Vec<ExecutionInput>,

    /// The name of an output to add to every execution (may be repeated).
    #[structopt(long = "output", short = "o")]
    outputs: Vec<String>,

    /// The maximum number of executions to run at the same time.
    #[structopt(long = "max-tasks", short = "J", default_value = "2")]
    max_tasks: usize,
}
// Generate the program's `main` function from `run` using
// `common_failures::quick_main!`.
// NOTE(review): presumably this reports any error returned by `run` and exits
// with a failure status — confirm against the `common_failures` docs.
quick_main!(run);
/// Synchronous entry point: initialise logging, parse the command line, and
/// drive `run_async` to completion on a Tokio runtime.
///
/// # Errors
///
/// Returns an error if the Tokio runtime cannot be created, or if
/// `run_async` fails.
fn run() -> Result<()> {
    env_logger::init();
    let opt = Opt::from_args();
    debug!("command-line options: {:?}", opt);

    // Creating the runtime can fail for reasons outside our control, so report
    // it through the normal error path instead of panicking.
    let mut runtime = Runtime::new().context("Unable to create a runtime")?;

    // The runtime drives futures 0.1 futures, so the `async fn` future is
    // boxed (pinning it, as `compat()` requires) and then adapted.
    let fut = run_async(opt);
    runtime.block_on(fut.boxed().compat())?;
    Ok(())
}
async fn run_async(opt: Opt) -> Result<()> {
let resources: BoxStream<String> = if !opt.resources.is_empty() {
let resources = opt.resources.clone();
Box::new(stream::iter_ok(resources.into_iter()))
} else {
let lines = FramedRead::new(io::stdin(), LinesCodec::new());
Box::new(lines.map_err(|e| -> Error { e.into() }))
};
let opt = Arc::new(opt);
let opt2 = opt.clone();
let execution_futures: BoxStream<BoxFuture<Execution>> =
Box::new(resources.map(move |resource| {
resource_id_to_execution(opt2.clone(), resource).boxed()
}));
let executions: BoxStream<Execution> = Box::new(
execution_futures
.map(|fut| fut.compat())
.buffer_unordered(opt.max_tasks),
);
let stdout = FramedWrite::new(io::stdout(), LineDelimitedJsonCodec::new());
let (_executions, _stdout) = executions.forward(stdout).compat().await?;
Ok(())
}
/// Execute the configured script against a single resource and wait for the
/// execution to finish.
///
/// The resource ID is supplied to the script as the input named
/// `opt.resource_input_name`; the inputs and outputs from `opt` are then
/// added on top. A new client is built from the environment for each call.
///
/// # Errors
///
/// Fails if an input cannot be added, if the client cannot be constructed,
/// or if creating or waiting on the execution fails.
async fn resource_id_to_execution(
    opt: Arc<Opt>,
    resource: String,
) -> Result<Execution> {
    debug!("running {} on {}", opt.script, resource);

    // Describe the execution we want to create.
    let mut args = execution::Args::default();
    args.script = Some(opt.script.clone());
    if let Some(name) = &opt.name {
        args.name = Some(name.clone());
    }

    // The resource itself goes first, followed by any user-supplied inputs.
    args.add_input(&opt.resource_input_name, &resource)?;
    for extra in opt.inputs.iter() {
        args.add_input(&extra.name, &extra.value)?;
    }
    for wanted in opt.outputs.iter() {
        args.add_output(wanted);
    }

    // Submit the execution, then wait on it by ID.
    let client = new_client()?;
    let submitted = client.create(&args).await?;
    let finished = client.wait(&submitted.id()).await?;
    debug!("finished {} on {}", finished.id(), resource);
    Ok(finished)
}
/// Build a BigML `Client` from credentials stored in the environment.
///
/// Reads `BIGML_USERNAME` first and then `BIGML_API_KEY`.
///
/// # Errors
///
/// Fails if either environment variable cannot be read, or if `Client::new`
/// rejects the credentials.
fn new_client() -> Result<Client> {
    let user = env::var("BIGML_USERNAME").context("must specify BIGML_USERNAME")?;
    let key = env::var("BIGML_API_KEY").context("must specify BIGML_API_KEY")?;
    let client = Client::new(user, key)?;
    Ok(client)
}