use anyhow::{Error, Result};
use bigml::{
self,
resource::{execution, Execution, Id, Resource, Script},
try_wait, try_with_permanent_failure,
wait::{wait, BackoffType, WaitOptions, WaitStatus},
Client,
};
use clap::Parser;
use futures::{self, stream, FutureExt, StreamExt, TryStreamExt};
use regex::Regex;
use std::{process, sync::Arc, time::Duration};
use tokio::io;
use tokio_util::codec::{FramedRead, FramedWrite, LinesCodec};
use tracing::{debug, error, instrument};
use tracing_subscriber::{
fmt::{format::FmtSpan, Subscriber},
prelude::*,
EnvFilter,
};
mod execution_input;
mod line_delimited_json_codec;
use execution_input::ExecutionInput;
use line_delimited_json_codec::LineDelimitedJsonCodec;
/// Boxed, `Send`, `'static` stream whose items are `anyhow::Result<T>`.
type BoxStream<T> = futures::stream::BoxStream<'static, Result<T>>;
/// Boxed, `Send`, `'static` future that resolves to `anyhow::Result<T>`.
type BoxFuture<T> = futures::future::BoxFuture<'static, Result<T>>;
// Command-line options. The `///` comments on the fields double as the
// per-option `--help` text that clap generates.
#[derive(Debug, Parser)]
#[command(
    name = "bigml-parallel",
    about = "Execute WhizzML script in parallel over one or more BigML resources",
    author,
    version
)]
struct Opt {
    /// ID of the WhizzML script to execute.
    #[arg(long = "script", short = 's')]
    script: Id<Script>,
    /// Name to give each execution.
    #[arg(long = "name", short = 'n')]
    name: Option<String>,
    /// BigML resource ID to run the script on; may be repeated. If omitted, IDs are read from stdin, one per line.
    #[arg(long = "resource", short = 'r')]
    resources: Vec<String>,
    /// Name of the script input that receives each resource ID.
    #[arg(long = "resource-input-name", short = 'R', default_value = "resource")]
    resource_input_name: String,
    /// Additional input passed to every execution; may be repeated.
    #[arg(long = "input", short = 'i')]
    inputs: Vec<ExecutionInput>,
    /// Name of a script output to request from every execution; may be repeated.
    #[arg(long = "output", short = 'o')]
    outputs: Vec<String>,
    /// Maximum number of executions to run at the same time.
    #[arg(long = "max-tasks", short = 'J', default_value = "2")]
    max_tasks: usize,
    /// Tag to add to every execution; may be repeated.
    #[arg(long = "tag")]
    tags: Vec<String>,
    /// Regex matched against an execution's failure message; on a match the execution is retried.
    #[arg(long = "retry-on")]
    retry_on: Option<Regex>,
    /// How many retryable failures (see --retry-on) to tolerate per resource.
    #[arg(long = "retry-count", default_value = "0")]
    retry_count: u16,
}
#[tokio::main]
async fn main() {
    // Diagnostics go to stderr so that stdout carries only the JSON written by
    // `run`; verbosity is taken from the environment (`RUST_LOG`).
    Subscriber::builder()
        .with_writer(std::io::stderr)
        .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
        .with_env_filter(EnvFilter::from_default_env())
        .finish()
        .init();

    match run().await {
        Ok(()) => {}
        Err(err) => {
            // Report the top-level error, every underlying cause, and the
            // backtrace, then exit with a non-zero status.
            eprintln!("Error: {}", err);
            err.chain()
                .skip(1)
                .for_each(|cause| eprintln!(" caused by: {}", cause));
            eprintln!("{}", err.backtrace());
            process::exit(1);
        }
    }
}
#[instrument(level = "trace", name = "bigml_parallel")]
async fn run() -> Result<()> {
let opt = Opt::parse();
debug!("command-line options: {:?}", opt);
let resources: BoxStream<String> = if !opt.resources.is_empty() {
let resources = opt.resources.clone();
stream::iter(resources.into_iter().map(Ok)).boxed()
} else {
let lines = FramedRead::new(io::stdin(), LinesCodec::new());
lines.map_err(|e| -> Error { e.into() }).boxed()
};
let opt = Arc::new(opt);
let opt2 = opt.clone();
let execution_futures: BoxStream<BoxFuture<Execution>> = resources
.map_ok(move |resource| {
resource_id_to_execution(opt2.clone(), resource).boxed()
})
.boxed();
let executions: BoxStream<Execution> = execution_futures
.try_buffer_unordered(opt.max_tasks)
.boxed();
let stdout = FramedWrite::new(io::stdout(), LineDelimitedJsonCodec::new());
executions.forward(stdout).await?;
Ok(())
}
/// Run the configured script against one resource, retrying per `--retry-on`
/// / `--retry-count`, and return the finished execution.
#[instrument(level = "debug", fields(script = %opt.script), skip(opt))]
async fn resource_id_to_execution(
    opt: Arc<Opt>,
    resource: String,
) -> Result<Execution> {
    // Assemble the execution arguments: the script, an optional name, the
    // resource itself, any extra inputs, requested outputs, and tags.
    let mut args = execution::Args::default();
    args.script = Some(opt.script.clone());
    if let Some(name) = &opt.name {
        args.name = Some(name.clone());
    }
    args.add_input(&opt.resource_input_name, &resource)?;
    for extra in opt.inputs.iter() {
        args.add_input(&extra.name, &extra.value)?;
    }
    for output_name in opt.outputs.iter() {
        args.add_output(output_name);
    }
    args.tags = opt.tags.clone();

    // Outer retry loop: every attempt creates a brand-new execution. It starts
    // at two minutes, backs off exponentially, and tolerates `--retry-count`
    // temporary failures.
    let retry_opt = WaitOptions::default()
        .retry_interval(Duration::from_secs(120))
        .backoff_type(BackoffType::Exponential)
        .allowed_errors(opt.retry_count);
    let retry_on = opt.retry_on.as_ref();
    Ok(wait(&retry_opt, || create_and_wait_execution(&args, retry_on)).await?)
}
#[instrument(level = "trace")]
async fn create_and_wait_execution(
args: &execution::Args,
retry_on: Option<&Regex>,
) -> WaitStatus<Execution, bigml::Error> {
let client = try_with_permanent_failure!(Client::new_from_env());
let create_wait_opt = WaitOptions::default()
.retry_interval(Duration::from_secs(60))
.backoff_type(BackoffType::Exponential)
.allowed_errors(6);
let execution = try_with_permanent_failure!(
wait(&create_wait_opt, || {
async {
WaitStatus::Finished(try_wait!(client.create(args).await))
}
})
.await
);
match client.wait(execution.id()).await {
Ok(execution) => WaitStatus::Finished(execution),
Err(err) => match (err.original_bigml_error(), retry_on) {
(bigml::Error::WaitFailed { message, .. }, Some(retry_on))
if retry_on.is_match(message) =>
{
error!("{} failed with temporary error: {}", execution.id(), err);
WaitStatus::FailedTemporarily(err)
}
_ => WaitStatus::FailedPermanently(err),
},
}
}