use anyhow::{Context, Result};
use bigml::{
resource::{script, source, Execution, Resource, StatusCode},
Client,
};
use cli_test_dir::*;
use futures::FutureExt;
use std::{env, future::Future, io::Write};
use tokio::runtime::Runtime;
use tracing::{debug_span, Instrument};
use tracing_subscriber::{
fmt::{format::FmtSpan, Subscriber},
prelude::*,
EnvFilter,
};
/// Build a BigML [`Client`] from credentials stored in the environment.
///
/// Reads `BIGML_USERNAME` first and then `BIGML_API_KEY`.
///
/// # Errors
///
/// Returns an error if either variable cannot be read, or if `Client::new`
/// rejects the credentials it is given.
fn new_client() -> Result<Client> {
    // Look up one environment variable, attaching `missing` as error context.
    let read_var = |name: &str, missing: &'static str| env::var(name).context(missing);

    let username = read_var("BIGML_USERNAME", "must specify BIGML_USERNAME")?;
    let api_key = read_var("BIGML_API_KEY", "must specify BIGML_API_KEY")?;

    let client = Client::new(username, api_key)?;
    Ok(client)
}
/// Drive `fut` to completion on a freshly created Tokio runtime and return
/// whatever result the future produced.
///
/// # Panics
///
/// Panics if the Tokio runtime cannot be created.
fn run_async<F, T>(fut: F) -> Result<T>
where
    F: Future<Output = Result<T>> + Send + 'static,
    T: Send + 'static,
{
    let boxed = fut.boxed();
    Runtime::new()
        .expect("Unable to create a runtime")
        .block_on(boxed)
}
/// Running the command with `--help` must succeed, and its stdout must
/// mention `bigml-parallel`.
#[test]
fn help_flag() {
    let sandbox = TestDir::new("bigml-parallel", "help_flag");
    let mut command = sandbox.cmd();
    let output = command.arg("--help").expect_success();
    assert!(output.stdout_str().contains("bigml-parallel"));
}
/// Running the command with `--version` must succeed, and its stdout must
/// contain the `CARGO_PKG_VERSION` value captured at compile time.
#[test]
fn version_flag() {
    let sandbox = TestDir::new("bigml-parallel", "version_flag");
    let mut command = sandbox.cmd();
    let output = command.arg("--version").expect_success();
    assert!(output.stdout_str().contains(env!("CARGO_PKG_VERSION")));
}
/// WhizzML source used by the `parallel_executions` test.
///
/// The script declares two inputs (`source`, `n`) and two outputs
/// (`dataset`, `n_times_2`): it creates a dataset named
/// "bigml-parallel test" from `source` and defines `n_times_2` as `n * 2`.
static WHIZZML_SCRIPT: &str = r#"
;; Input: source
;; Input: n
;; Output: dataset
;; Output: n_times_2
(define dataset
(create-and-wait-dataset {"source" source "name" "bigml-parallel test"}))
(define n_times_2 (* n 2))
"#;
/// End-to-end test: create two sources and a script through the BigML
/// client, feed the source IDs to the command on stdin, and verify that one
/// finished execution is reported per source with the expected outputs.
///
/// Marked `#[ignore]`; it needs the credentials read by `new_client`
/// (presumably why it is not run by default — confirm).
#[test]
#[ignore]
fn parallel_executions() {
    // Install a stderr tracing subscriber that logs span creation and close.
    // The result of `try_init` is discarded.
    let subscriber = Subscriber::builder()
        .with_writer(std::io::stderr)
        .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE)
        .with_env_filter(EnvFilter::from_default_env())
        .finish();
    let _ = subscriber.try_init();
    let _span = debug_span!("parallel_executions").entered();

    let workdir = TestDir::new("bigml-parallel", "parallel_executions");

    // Remote setup: one source per CSV snippet, plus the WhizzML script.
    let (sources, script) = run_async(
        async {
            let client = new_client()?;

            let csv_bodies = ["id,color\n1,green\n", "id,color\n2,blue\n"];
            let mut source_ids = vec![];
            for (index, csv) in csv_bodies.iter().copied().enumerate() {
                let mut source_args = source::Args::data(csv);
                source_args.disable_datetime = Some(true);
                source_args.name = Some(format!("bigml-parallel test {}", index));
                let created = client.create_and_wait(&source_args).await?;
                source_ids.push(created.id().to_owned());
            }

            let mut script_args = script::Args::new(WHIZZML_SCRIPT);
            script_args
                .inputs
                .push(script::Input::new("source", script::Type::ResourceId));
            script_args
                .inputs
                .push(script::Input::new("n", script::Type::Integer));
            let created_script = client.create_and_wait(&script_args).await?;

            Ok((source_ids, created_script.id().to_owned()))
        }
        .instrument(debug_span!("setup")),
    )
    .unwrap();

    // The command's stdin: one source ID per line.
    let mut stdin_bytes: Vec<u8> = Vec::new();
    sources
        .iter()
        .for_each(|id| writeln!(&mut stdin_bytes, "{}", id).unwrap());

    // Same arguments, in the same order, as a single list.
    let script_id = script.to_string();
    let output = workdir
        .cmd()
        .args(&[
            "-n",
            "bigml-parallel test",
            "-s",
            script_id.as_str(),
            "-R",
            "source",
            "-i",
            "n=2",
            "-o",
            "dataset",
            "-o",
            "n_times_2",
            "--tag",
            "bigml-parallel:test",
        ])
        .output_with_stdin(&stdin_bytes)
        .tee_output()
        .expect("error running bigml-parallel");

    // Expect exactly one line of JSON per source.
    let stdout = output.stdout_str();
    assert_eq!(stdout.lines().count(), sources.len());

    // Lines are parsed lazily, so each one is parsed and checked in turn.
    let executions = stdout.lines().map(|line| -> Execution {
        serde_json::from_str(line).expect("error parsing output JSON")
    });
    for execution in executions {
        assert_eq!(execution.status.code, StatusCode::Finished);
        let outputs = &execution.execution.outputs;
        assert!(outputs.iter().any(|output| output.name == "dataset"));
        // The command was given `n=2`, so `n_times_2` must be 4.
        assert!(outputs.iter().any(|output| {
            output.name == "n_times_2" && output.value == Some(4.into())
        }));
    }
}