use cargo_athena::{AthenaConfig, ContainerRunMeta, S3Ref, serde_json};
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use std::process::{Command, exit};
#[derive(clap::Args)]
pub struct PkgSel {
#[arg(short = 'p', long)]
package: Option<String>,
#[arg(long)]
bin: Option<String>,
}
impl PkgSel {
pub(crate) fn resolve(&self) -> (Option<String>, Option<String>) {
let d = AthenaConfig::load().defaults;
(
self.package.clone().or(d.package),
self.bin.clone().or(d.bin),
)
}
}
#[derive(clap::Args)]
pub struct EmulateArgs {
template: String,
#[command(flatten)]
pkg: PkgSel,
#[arg(short = 'a', long = "arg", value_name = "NAME=VALUE")]
args: Vec<String>,
#[arg(long = "input-file", value_name = "FILE")]
input_file: Option<PathBuf>,
#[arg(long, conflicts_with = "tarball")]
build: bool,
#[arg(long, value_name = "FILE", conflicts_with = "build")]
tarball: Option<PathBuf>,
#[arg(long)]
runtime: Option<String>,
#[arg(long = "skip-artifacts")]
skip_artifacts: bool,
}
#[derive(clap::Args)]
pub struct DescribeArgs {
template: String,
#[command(flatten)]
pkg: PkgSel,
}
#[derive(clap::Args)]
pub struct LsArgs {
#[command(flatten)]
pkg: PkgSel,
#[arg(long)]
all: bool,
}
#[derive(clap::Args)]
pub struct WorkflowLsArgs {
#[command(flatten)]
pkg: PkgSel,
#[arg(long)]
include_synthetic: bool,
}
pub fn describe_print(a: DescribeArgs) {
let (pkg, bin) = a.pkg.resolve();
let meta = describe(&a.template, pkg.as_deref(), bin.as_deref());
println!(
"{}",
serde_json::to_string_pretty(&meta).expect("ContainerRunMeta is serializable")
);
}
fn fetch_list(pkg: Option<&str>, bin: Option<&str>) -> Vec<ContainerRunMeta> {
let mut cmd = crate::cargo_run(pkg, bin);
cmd.env("CARGO_ATHENA_LIST", "1");
let out = cmd
.output()
.unwrap_or_else(|e| die(&format!("failed to spawn `cargo run`: {e}")));
if !out.status.success() || out.stdout.is_empty() {
eprint!("{}", String::from_utf8_lossy(&out.stderr));
die("could not list templates (run from your workflow crate, or pass --package/--bin)");
}
serde_json::from_slice(&out.stdout)
.unwrap_or_else(|e| die(&format!("could not parse template list ({e})")))
}
fn print_table(mut rows: Vec<&ContainerRunMeta>) {
rows.sort_by(|x, y| x.name.cmp(&y.name));
if rows.is_empty() {
eprintln!("(no matching templates)");
return;
}
let w = rows.iter().map(|m| m.name.len()).max().unwrap_or(4).max(4);
println!("{:<w$} KIND ARGS", "NAME", w = w);
for m in rows {
let sig = m
.params
.iter()
.map(|p| {
if p.ty.is_empty() {
p.name.clone()
} else {
format!("{}: {}", p.name, p.ty)
}
})
.collect::<Vec<_>>()
.join(", ");
println!("{:<w$} {:<9} {sig}", m.name, m.kind, w = w);
}
}
pub fn container_ls(a: LsArgs) {
let (pkg, bin) = a.pkg.resolve();
let all = fetch_list(pkg.as_deref(), bin.as_deref());
print_table(
all.iter()
.filter(|m| a.all || m.kind == "container")
.collect(),
);
}
pub fn workflow_ls(a: WorkflowLsArgs) {
let (pkg, bin) = a.pkg.resolve();
let all = fetch_list(pkg.as_deref(), bin.as_deref());
print_table(
all.iter()
.filter(|m| m.kind == "workflow" && (a.include_synthetic || !m.synthetic))
.collect(),
);
}
fn die(msg: &str) -> ! {
eprintln!("cargo athena container emulate: {msg}");
exit(2);
}
pub fn container_emulate(a: EmulateArgs) {
let (pkg, bin) = a.pkg.resolve();
let meta = describe(&a.template, pkg.as_deref(), bin.as_deref());
if meta.kind != "container" {
die(&format!(
"{:?} is a #[{}]; `container emulate` targets a single #[container]. \
A #[workflow] is a DAG with no pod — run its containers individually.",
meta.name, meta.kind
));
}
let values = check_params(&meta, &a);
let runtime = detect_runtime(a.runtime.as_deref());
let work = scratch_dir(&meta.name);
let host_of = |cpath: &str| -> PathBuf {
let rel = cpath
.strip_prefix(&meta.work_dir)
.unwrap_or(cpath)
.trim_start_matches('/');
work.join(rel)
};
let tarball = match (&a.tarball, a.build) {
(Some(p), _) => p.clone(),
(None, true) => build_local(pkg.as_deref(), bin.as_deref()),
(None, false) => {
let ba = meta.binary_artifact.as_ref().unwrap_or_else(|| {
die("template has no binary artifact (run `cargo athena build` first, \
or pass --build / --tarball)")
});
let dst = work.join("dist.tar.gz");
s3_get(&ba.s3, &dst);
dst
}
};
if let Some(ba) = &meta.binary_artifact {
let dst = host_of(&ba.path);
mkparent(&dst);
if dst != tarball {
std::fs::copy(&tarball, &dst)
.unwrap_or_else(|e| die(&format!("stage binary tarball: {e}")));
}
}
if !a.skip_artifacts {
for art in &meta.input_artifacts {
let dst = host_of(&art.path);
mkparent(&dst);
s3_get(&art.s3, &dst);
}
}
let mut c = Command::new(&runtime);
c.arg("run").arg("--rm");
c.arg("-v")
.arg(format!("{}:{}", work.display(), meta.work_dir));
for hp in &meta.host_paths {
if !Path::new(hp).exists() {
eprintln!("warning: host! path {hp} doesn't exist locally; binding anyway");
}
c.arg("-v").arg(format!("{hp}:{hp}"));
}
for (name, json) in &values {
c.arg("-e").arg(format!("{name}={json}"));
}
let (entry, rest) = meta
.command
.split_first()
.unwrap_or_else(|| die("template has no container command"));
c.arg("--entrypoint").arg(entry);
c.arg(&meta.image);
c.args(rest);
c.args(&meta.args);
eprintln!("→ {runtime} run {} ({})", meta.image, meta.name);
let status = c
.status()
.unwrap_or_else(|e| die(&format!("failed to start {runtime}: {e}")));
if !a.skip_artifacts {
for art in &meta.output_artifacts {
let src = host_of(&art.path);
if src.exists() {
s3_put(&art.s3, &src);
}
}
}
if let Some(rp) = &meta.result_path
&& let Ok(s) = std::fs::read_to_string(host_of(rp))
{
match serde_json::from_str::<serde_json::Value>(s.trim()) {
Ok(v) => println!(
"return: {}",
serde_json::to_string_pretty(&v).unwrap_or_else(|_| s.clone())
),
Err(_) => println!("return: {}", s.trim()),
}
}
let _ = std::fs::remove_dir_all(&work);
exit(status.code().unwrap_or(1));
}
pub(crate) fn describe(template: &str, package: Option<&str>, bin: Option<&str>) -> ContainerRunMeta {
let hint = || -> String {
let where_ = match (package, bin) {
(Some(p), Some(b)) => format!("--package {p} --bin {b}"),
(Some(p), None) => format!("--package {p}"),
(None, Some(b)) => format!("--bin {b}"),
(None, None) => "this directory".to_string(),
};
format!(
"could not get container metadata from your workflow binary ({where_}).\n\
\x20 - run this from your workflow crate, or pass --package/--bin;\n\
\x20 - its `main` must call `cargo_athena::entrypoint::<Root>()`;\n\
\x20 - {template:?} must be a template reachable from that root \
(`<crate>-<fn>` kebab, or the #[container(name=…)] override)."
)
};
let mut cmd = crate::cargo_run(package, bin);
cmd.env("CARGO_ATHENA_DESCRIBE", template);
let out = cmd
.output()
.unwrap_or_else(|e| die(&format!("failed to spawn `cargo run`: {e}\n{}", hint())));
if !out.status.success() || out.stdout.is_empty() {
let err = String::from_utf8_lossy(&out.stderr);
let tail: String = err
.lines()
.rev()
.take(8)
.collect::<Vec<_>>()
.into_iter()
.rev()
.map(|l| format!(" {l}"))
.collect::<Vec<_>>()
.join("\n");
die(&format!("{}\n\n binary output (tail):\n{tail}", hint()));
}
serde_json::from_slice(&out.stdout)
.unwrap_or_else(|e| die(&format!("could not parse container metadata ({e}).\n{}", hint())))
}
fn detect_runtime(over: Option<&str>) -> String {
if let Some(r) = over {
if !crate::tool_ok(r, &["--version"]) {
die(&format!("--runtime {r:?} is not runnable"));
}
return r.to_string();
}
for r in ["docker", "podman"] {
if crate::tool_ok(r, &["--version"]) {
return r.to_string();
}
}
die("neither `docker` nor `podman` found on PATH — install one, or pass --runtime");
}
fn scratch_dir(name: &str) -> PathBuf {
let d = std::env::temp_dir().join(format!("athena-run-{name}-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&d);
std::fs::create_dir_all(&d).unwrap_or_else(|e| die(&format!("mkdir scratch: {e}")));
d
}
fn mkparent(p: &Path) {
if let Some(d) = p.parent() {
std::fs::create_dir_all(d).unwrap_or_else(|e| die(&format!("mkdir {}: {e}", d.display())));
}
}
fn check_params(meta: &ContainerRunMeta, a: &EmulateArgs) -> Vec<(String, String)> {
let vals = parse_args(a.input_file.as_deref(), &a.args);
if let Err(report) = validate_args(meta, &vals) {
die(&report);
}
meta.params
.iter()
.filter_map(|p| {
vals.get(&p.name).map(|v| {
(
p.env.clone(),
serde_json::to_string(v).expect("JSON-encodable param"),
)
})
})
.collect()
}
pub(crate) fn parse_args(
input_file: Option<&Path>,
kvs: &[String],
) -> BTreeMap<String, serde_json::Value> {
let mut vals: BTreeMap<String, serde_json::Value> = BTreeMap::new();
if let Some(f) = input_file {
let txt = std::fs::read_to_string(f)
.unwrap_or_else(|e| die(&format!("--input-file {}: {e}", f.display())));
match serde_json::from_str::<serde_json::Value>(&txt) {
Ok(serde_json::Value::Object(m)) => vals.extend(m),
_ => die("--input-file must be a JSON object"),
}
}
for kv in kvs {
let (k, v) = kv
.split_once('=')
.unwrap_or_else(|| die(&format!("-a expects name=value, got {kv:?}")));
let val = serde_json::from_str::<serde_json::Value>(v)
.unwrap_or_else(|_| serde_json::Value::String(v.to_string()));
vals.insert(k.to_string(), val);
}
vals
}
pub(crate) fn validate_args(
meta: &ContainerRunMeta,
vals: &BTreeMap<String, serde_json::Value>,
) -> Result<(), String> {
let declared: std::collections::BTreeSet<&str> =
meta.params.iter().map(|p| p.name.as_str()).collect();
let mut missing = Vec::new();
let mut mism = Vec::new();
for p in &meta.params {
let norm: String = p.ty.split_whitespace().collect();
let (inner, optional) = match norm
.strip_prefix("Option<")
.and_then(|s| s.strip_suffix('>'))
{
Some(i) => (i.to_string(), true),
None => (norm.clone(), false),
};
match vals.get(&p.name) {
None if optional => {}
None => missing.push(format!(
" {}: {}",
p.name,
if p.ty.is_empty() { "?" } else { &p.ty }
)),
Some(v) => {
if let Some(exp) = expected_kind(&inner)
&& !kind_ok(exp, v)
{
mism.push(format!(
" {}: expected {} ({}), got {} {}",
p.name,
exp,
p.ty,
json_kind(v),
preview(v),
));
}
}
}
}
let unknown: Vec<String> = vals
.keys()
.filter(|k| !declared.contains(k.as_str()))
.map(|k| match suggest(k, &declared) {
Some(s) => format!(" {k} (did you mean `{s}`?)"),
None => format!(" {k}"),
})
.collect();
if missing.is_empty() && mism.is_empty() && unknown.is_empty() {
return Ok(());
}
let mut m = format!("error: invalid arguments for `{}`\n", meta.name);
if !missing.is_empty() {
m.push_str("\n missing required parameter(s):\n");
m.push_str(&missing.join("\n"));
m.push('\n');
}
if !mism.is_empty() {
m.push_str("\n type mismatch:\n");
m.push_str(&mism.join("\n"));
m.push('\n');
}
if !unknown.is_empty() {
m.push_str(&format!(
"\n unknown parameter(s) (not an input of `{}`):\n",
meta.name
));
m.push_str(&unknown.join("\n"));
m.push('\n');
}
let sig: Vec<String> = meta
.params
.iter()
.map(|p| {
if p.ty.is_empty() {
p.name.clone()
} else {
format!("{}: {}", p.name, p.ty)
}
})
.collect();
m.push_str(&format!("\n expected inputs: {}\n", sig.join(", ")));
m.push_str(" pass with -a <name>=<value> (JSON value, else string) or --input-file");
Err(m)
}
#[derive(Clone, Copy, PartialEq)]
enum Kind {
Str,
Int,
Float,
Bool,
Arr,
}
impl std::fmt::Display for Kind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(match self {
Kind::Str => "string",
Kind::Int => "integer",
Kind::Float => "number",
Kind::Bool => "bool",
Kind::Arr => "array",
})
}
}
fn expected_kind(ty: &str) -> Option<Kind> {
let t = ty.trim_start_matches('&');
let t = t.strip_prefix("'static").unwrap_or(t).trim_start_matches('&');
match t {
"String" | "str" | "char" | "PathBuf" | "Path" | "Box<str>" => Some(Kind::Str),
"i8" | "i16" | "i32" | "i64" | "i128" | "isize" | "u8" | "u16" | "u32" | "u64"
| "u128" | "usize" => Some(Kind::Int),
"f32" | "f64" => Some(Kind::Float),
"bool" => Some(Kind::Bool),
_ if t.starts_with("Vec<")
|| t.starts_with("VecDeque<")
|| t.starts_with('[')
|| t.starts_with("&[") =>
{
Some(Kind::Arr)
}
_ if t.contains("Cow<") && t.contains("str") => Some(Kind::Str),
_ => None,
}
}
fn kind_ok(k: Kind, v: &serde_json::Value) -> bool {
match k {
Kind::Str => v.is_string(),
Kind::Int => v.is_i64() || v.is_u64(),
Kind::Float => v.is_number(),
Kind::Bool => v.is_boolean(),
Kind::Arr => v.is_array(),
}
}
fn json_kind(v: &serde_json::Value) -> &'static str {
match v {
serde_json::Value::Null => "null",
serde_json::Value::Bool(_) => "bool",
serde_json::Value::Number(n) if n.is_f64() => "number",
serde_json::Value::Number(_) => "integer",
serde_json::Value::String(_) => "string",
serde_json::Value::Array(_) => "array",
serde_json::Value::Object(_) => "object",
}
}
fn preview(v: &serde_json::Value) -> String {
let s = v.to_string();
if s.len() > 40 {
format!("{}…", &s[..40])
} else {
s
}
}
fn suggest(got: &str, declared: &std::collections::BTreeSet<&str>) -> Option<String> {
declared
.iter()
.map(|d| (edit_distance(got, d), d.to_string()))
.filter(|(d, _)| *d <= 2)
.min_by_key(|(d, _)| *d)
.map(|(_, s)| s)
}
fn edit_distance(a: &str, b: &str) -> usize {
let (a, b): (Vec<char>, Vec<char>) = (a.chars().collect(), b.chars().collect());
let mut prev: Vec<usize> = (0..=b.len()).collect();
for (i, ca) in a.iter().enumerate() {
let mut cur = vec![i + 1];
for (j, cb) in b.iter().enumerate() {
let sub = prev[j] + usize::from(ca != cb);
cur.push(sub.min(prev[j + 1] + 1).min(cur[j] + 1));
}
prev = cur;
}
prev[b.len()]
}
pub(crate) fn s3_store(s3: &S3Ref) -> object_store::aws::AmazonS3 {
use object_store::aws::AmazonS3Builder;
let mut b = AmazonS3Builder::new()
.with_bucket_name(&s3.bucket)
.with_region(&s3.region)
.with_allow_http(s3.insecure);
let ep_env = std::env::var("AWS_ENDPOINT_URL")
.ok()
.or_else(|| std::env::var("AWS_ENDPOINT_URL_S3").ok())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty());
if let Some(url) = ep_env {
if url.starts_with("http://") {
b = b.with_allow_http(true);
}
b = b.with_endpoint(url);
} else {
let ep = s3.endpoint.trim();
if !ep.is_empty() && !ep.ends_with("amazonaws.com") {
let url = if ep.contains("://") {
ep.to_string()
} else if s3.insecure {
format!("http://{ep}")
} else {
format!("https://{ep}")
};
b = b.with_endpoint(url);
}
}
if let Ok(v) = std::env::var("AWS_ACCESS_KEY_ID") {
b = b.with_access_key_id(v);
}
if let Ok(v) = std::env::var("AWS_SECRET_ACCESS_KEY") {
b = b.with_secret_access_key(v);
}
if let Ok(v) = std::env::var("AWS_SESSION_TOKEN") {
b = b.with_token(v);
}
b.build()
.unwrap_or_else(|e| die(&format!("S3 client for bucket {:?}: {e}", s3.bucket)))
}
pub(crate) fn rt() -> tokio::runtime::Runtime {
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap_or_else(|e| die(&format!("async runtime: {e}")))
}
pub(crate) fn s3_exists(s3: &S3Ref) -> bool {
let store = s3_store(s3);
let key = object_store::path::Path::from(s3.key.as_str());
rt().block_on(async { object_store::ObjectStore::head(&store, &key).await })
.is_ok()
}
fn s3_get(s3: &S3Ref, dst: &Path) {
let store = s3_store(s3);
let key = object_store::path::Path::from(s3.key.as_str());
let bytes = rt()
.block_on(async {
let r = object_store::ObjectStore::get(&store, &key).await?;
r.bytes().await
})
.unwrap_or_else(|e| die(&format!("S3 GET {}: {e}", s3.key)));
mkparent(dst);
std::fs::write(dst, &bytes).unwrap_or_else(|e| die(&format!("write {}: {e}", dst.display())));
}
pub(crate) fn s3_put(s3: &S3Ref, src: &Path) {
let store = s3_store(s3);
let key = object_store::path::Path::from(s3.key.as_str());
let data = std::fs::read(src).unwrap_or_else(|e| die(&format!("read {}: {e}", src.display())));
rt().block_on(async {
object_store::ObjectStore::put(&store, &key, data.into()).await
})
.unwrap_or_else(|e| die(&format!("S3 PUT {}: {e}", s3.key)));
}
fn build_local(package: Option<&str>, bin: Option<&str>) -> PathBuf {
crate::preflight_zig();
let (krate, _ver, default_bin) = crate::package_meta(package);
let bin = bin.map(str::to_string).unwrap_or(default_bin);
let triple = if cfg!(target_arch = "aarch64") {
"aarch64-unknown-linux-musl"
} else {
"x86_64-unknown-linux-musl"
};
let st = Command::new("cargo")
.args([
"zigbuild",
"--release",
"--target",
triple,
"-p",
&krate,
"--bin",
&bin,
])
.status()
.unwrap_or_else(|e| die(&format!("cargo zigbuild: {e}")));
if !st.success() {
die("local build failed");
}
let stage = std::env::temp_dir().join(format!("athena-build-{}", std::process::id()));
let _ = std::fs::remove_dir_all(&stage);
std::fs::create_dir_all(&stage).ok();
let from = format!("target/{triple}/release/{bin}");
std::fs::copy(&from, stage.join(format!("app-{triple}")))
.unwrap_or_else(|e| die(&format!("copy {from}: {e}")));
let tb = stage.join("dist.tar.gz");
let st = Command::new("tar")
.arg("-czf")
.arg(&tb)
.arg("-C")
.arg(&stage)
.arg(format!("app-{triple}"))
.status()
.unwrap_or_else(|e| die(&format!("tar: {e}")));
if !st.success() {
die("tar failed");
}
tb
}