use anyhow::{Context as _, Result};
use clap::Parser;
use figment::{
error::Kind,
providers::{Env, Format, Serialized, Toml},
Figment,
};
use indicatif::ProgressBar;
use maelstrom_base::{
ClientJobId, JobError, JobOutputResult, JobStatus, JobStringResult, JobSuccess,
};
use maelstrom_client::{
spec::{std_env_lookup, ImageConfig},
Client, DefaultClientDriver,
};
use maelstrom_client_cli::spec::job_spec_iter_from_reader;
use maelstrom_util::{
config::BrokerAddr,
process::{ExitCode, ExitCodeAccumulator},
};
use serde::{Deserialize, Serialize};
use serde_with::skip_serializing_none;
use std::{
cell::RefCell,
io::{self, Read, Write as _},
path::PathBuf,
sync::Arc,
};
#[derive(Parser)]
#[command(
after_help = r#"Configuration values can be specified in three ways: fields in a config file, environment variables, or command-line options. Command-line options have the highest precendence, followed by environment variables.
The configuration value 'config_value' would be set via the '--config-value' command-line option, the MAELSTROM_WORKER_CONFIG_VALUE environment variable, and the 'config_value' key in a configuration file.
All values except for 'broker' have reasonable defaults.
"#
)]
#[command(version)]
struct CliOptions {
#[arg(short = 'c', long, default_value=PathBuf::from(".config/maelstrom-client-cli.toml").into_os_string())]
config_file: PathBuf,
#[arg(short = 'P', long)]
print_config: bool,
#[arg(short = 'b', long)]
broker: Option<String>,
}
impl CliOptions {
fn to_config_options(&self) -> ConfigOptions {
ConfigOptions {
broker: self.broker.clone(),
}
}
}
#[derive(Debug, Deserialize)]
#[serde(deny_unknown_fields)]
pub struct Config {
pub broker: BrokerAddr,
}
#[skip_serializing_none]
#[derive(Default, Serialize)]
pub struct ConfigOptions {
pub broker: Option<String>,
}
fn visitor(
cjid: ClientJobId,
result: JobStringResult,
accum: Arc<ExitCodeAccumulator>,
) -> Result<()> {
match result {
Ok(JobSuccess {
status,
stdout,
stderr,
}) => {
match stdout {
JobOutputResult::None => {}
JobOutputResult::Inline(bytes) => {
io::stdout().lock().write_all(&bytes)?;
}
JobOutputResult::Truncated { first, truncated } => {
io::stdout().lock().write_all(&first)?;
io::stdout().lock().flush()?;
eprintln!("job {cjid}: stdout truncated, {truncated} bytes lost");
}
}
match stderr {
JobOutputResult::None => {}
JobOutputResult::Inline(bytes) => {
io::stderr().lock().write_all(&bytes)?;
}
JobOutputResult::Truncated { first, truncated } => {
io::stderr().lock().write_all(&first)?;
eprintln!("job {cjid}: stderr truncated, {truncated} bytes lost");
}
}
match status {
JobStatus::Exited(0) => {}
JobStatus::Exited(code) => {
io::stdout().lock().flush()?;
eprintln!("job {cjid}: exited with code {code}");
accum.add(ExitCode::from(code));
}
JobStatus::Signaled(signum) => {
io::stdout().lock().flush()?;
eprintln!("job {cjid}: killed by signal {signum}");
accum.add(ExitCode::FAILURE);
}
};
}
Err(JobError::Execution(err)) => {
eprintln!("job {cjid}: execution error: {err}");
accum.add(ExitCode::FAILURE);
}
Err(JobError::System(err)) => {
eprintln!("job {cjid}: system error: {err}");
accum.add(ExitCode::FAILURE);
}
}
Ok(())
}
fn cache_dir() -> PathBuf {
directories::BaseDirs::new()
.expect("failed to find cache dir")
.cache_dir()
.join("maelstrom")
}
fn main() -> Result<ExitCode> {
let cli_options = CliOptions::parse();
let print_config = cli_options.print_config;
let config: Config = Figment::new()
.merge(Serialized::defaults(ConfigOptions::default()))
.merge(Toml::file(&cli_options.config_file))
.merge(Env::prefixed("MAELSTROM_CLIENT_"))
.merge(Serialized::globals(cli_options.to_config_options()))
.extract()
.map_err(|mut e| {
if let Kind::MissingField(field) = &e.kind {
e.kind = Kind::Message(format!("configuration value \"{field}\" was no provided"));
e
} else {
e
}
})
.context("reading configuration")?;
if print_config {
println!("{config:#?}");
return Ok(ExitCode::SUCCESS);
}
let accum = Arc::new(ExitCodeAccumulator::default());
let client = Client::new(
DefaultClientDriver::default(),
config.broker,
".",
cache_dir(),
)?;
let client = RefCell::new(client);
let reader: Box<dyn Read> = Box::new(io::stdin().lock());
let image_lookup = |image: &str| {
let (image, version) = image.split_once(':').unwrap_or((image, "latest"));
let prog = ProgressBar::hidden();
let mut client = client.borrow_mut();
let container_image_depot = client.container_image_depot_mut();
let image = container_image_depot.get_container_image(image, version, prog)?;
Ok(ImageConfig {
layers: image.layers.clone(),
environment: image.env().cloned(),
working_directory: image.working_dir().map(From::from),
})
};
let job_specs = job_spec_iter_from_reader(
reader,
|layer| client.borrow_mut().add_layer(layer),
std_env_lookup,
image_lookup,
);
for job_spec in job_specs {
let accum_clone = accum.clone();
client.borrow_mut().add_job(
job_spec?,
Box::new(move |cjid, result| visitor(cjid, result, accum_clone)),
);
}
client.into_inner().wait_for_outstanding_jobs()?;
Ok(accum.get())
}
#[test]
fn test_cli() {
use clap::CommandFactory;
CliOptions::command().debug_assert()
}