cargo_athena_core/lib.rs
1//! cargo-athena runtime.
2//!
3//! Every `#[workflow]`/`#[container]` becomes a unit-struct **type** that
4//! implements [`Template`]. That type *is* the cross-crate wormhole: the
5//! type system resolves a callee's Argo name/inputs across modules and
6//! crates, and the generated `Template::collect` calls
7//! `<Callee as Template>::collect` directly — a monomorphic call, so the
8//! whole reachable closure is force-linked with no `inventory`/DCE games.
9//!
10//! Two worlds share one binary:
11//!
12//! * **Emit** — `main` calls [`entrypoint::<E>()`]; we walk the closure
13//! from `E` and print one Argo `WorkflowTemplate` document per template
14//! (cross-refs via `templateRef`) plus a runnable `Workflow` for `E`.
15//! * **Run** — Argo invokes the binary with `--cargo-athena-template <name>`;
16//! we deserialize inputs, run the real container body, serialize outputs.
17//!
18//! `host!` declarations are still collected statically by the attribute
19//! macros and resolved through the `#[fragment]` (`inventory`) closure here
20//! — fragments are genuinely *called* by container bodies, so unlike
21//! templates they have a real symbol reference and no DCE concern.
22
23use std::collections::{HashMap, HashSet};
24
25/// Argo API types (generated from protobuf).
26pub use cargo_athena_api as api;
27// Re-exported so macro-generated code has stable paths under `::cargo_athena`.
28pub use inventory;
29pub use serde_json;
30// Maintained serde_yaml fork: YAML 1.1-aware emitter (quotes `n`/`yes`/
31// `null`/… so Argo's Go YAML→JSON parser can't mis-type them). serde_yaml
32// itself is archived/EOL.
33pub use serde_norway;
34
/// Fan-out over a list: `list.fan_out(|x| template(x, ..))` invokes
/// `template` once per element (Argo `withParam`), and the binding is
/// the aggregated `Vec<U>` of the per-element returns.
///
/// The trait exists purely so the ghost type-checks the element type,
/// the closure, and the resulting `Vec<U>`. The macro lowers the call to
/// Argo; the default body below is a placeholder that is never meant to
/// execute.
pub trait AthenaList<T> {
    #[doc(hidden)]
    fn fan_out<U, F>(self, _f: F) -> Vec<U>
    where
        Self: Sized,
        F: FnOnce(T) -> U,
    {
        unimplemented!("athena ghost: never executed")
    }
}

// Both owned vectors and fixed-size arrays can be fanned out.
impl<T> AthenaList<T> for Vec<T> {}

impl<T, const N: usize> AthenaList<T> for [T; N] {}
51
/// Marker for types that may be injected into a `#[container]`
/// attribute (`image = "repo:" + tag`).
///
/// Deliberately limited to `String`/`str` and the primitive numbers:
/// their `serde_json` form, unwrapped at runtime by `{{=fromJSON(...)}}`,
/// renders to the obvious raw scalar (`"v"`→`v`, `7`→`7`). A `Display`
/// bound would wrongly admit types whose `Display` differs from their
/// JSON round-trip. The macro emits a hidden `Injectable`-bounded
/// assertion against the real arg type.
#[doc(hidden)]
pub trait Injectable {}

// Stamps out one empty `Injectable` impl per listed type.
macro_rules! __athena_injectable {
    ($($ty:ty),* $(,)?) => {
        $(
            impl Injectable for $ty {}
        )*
    };
}

__athena_injectable!(
    String, str,
    i8, i16, i32, i64, i128, isize,
    u8, u16, u32, u64, u128, usize,
    f32, f64,
);
67
/// `host!("/lit/path")` — declare a hostPath volume for the enclosing
/// container; evaluates to the (already-mounted) path at runtime.
///
/// Valid only inside a `#[cargo_athena::container]` or
/// `#[cargo_athena::fragment]` fn, where the attribute macro rewrites
/// every invocation it can see into the private real macro below. This
/// public definition is therefore a *hard error*: it only expands when
/// `host!` sits somewhere the collector cannot see it (a plain fn, a
/// `#[workflow]`, or nested inside another macro's tokens), where a
/// silently-unmounted path would otherwise be a footgun.
#[macro_export]
macro_rules! host {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`host!` may only be used directly inside a \
             `#[cargo_athena::container]` or `#[cargo_athena::fragment]` fn \
             (not in a plain fn, a `#[workflow]`, or nested inside another \
             macro invocation)"
        )
    };
}

/// Real expansion of `host!`. Private: only the attribute macros emit
/// this path. Anything other than a single literal is rejected.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_host {
    ($path:literal) => {
        $crate::rt::host_path($path)
    };
    ($($tokens:tt)*) => {
        ::core::compile_error!("`host!` takes a single string-literal path")
    };
}
101
102// --- artifact declaration macros (native Argo artifacts, no S3) ------------
103//
104// Each is a public (gated `compile_error!`) + private (real) pair, exactly
105// like `host!`: only valid inside `#[container]`/`#[fragment]`, where the
106// attribute macro rewrites the public form to the private one.
107
/// Declare an Argo *input* artifact port and read it (bytes) at runtime.
///
/// The public form always fails to compile; the attribute macros rewrite
/// it to `__cargo_athena_load_artifact!` wherever it is legal.
#[macro_export]
macro_rules! load_artifact {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`load_artifact!` may only be used directly inside a \
             `#[cargo_athena::container]` or `#[cargo_athena::fragment]` fn"
        )
    };
}

/// Real expansion of `load_artifact!`: forwards the literal port name to
/// `rt::load_artifact`.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_load_artifact {
    ($port:literal) => {
        $crate::rt::load_artifact($port)
    };
    ($($tokens:tt)*) => {
        ::core::compile_error!("load_artifact!(\"name\")")
    };
}
128
/// Declare an Argo *input* artifact port and read it (UTF-8) at runtime.
///
/// The public form always fails to compile; the attribute macros rewrite
/// it to `__cargo_athena_load_artifact_str!` wherever it is legal.
#[macro_export]
macro_rules! load_artifact_str {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`load_artifact_str!` may only be used directly inside a \
             `#[cargo_athena::container]` or `#[cargo_athena::fragment]` fn"
        )
    };
}

/// Real expansion of `load_artifact_str!`: forwards the literal port
/// name to `rt::load_artifact_str`.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_load_artifact_str {
    ($port:literal) => {
        $crate::rt::load_artifact_str($port)
    };
    ($($tokens:tt)*) => {
        ::core::compile_error!("load_artifact_str!(\"name\")")
    };
}
149
/// Declare an Argo *output* artifact port and write bytes to it at runtime.
///
/// The public form always fails to compile; the attribute macros rewrite
/// it to `__cargo_athena_save_artifact!` wherever it is legal.
#[macro_export]
macro_rules! save_artifact {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`save_artifact!` may only be used directly inside a \
             `#[cargo_athena::container]` or `#[cargo_athena::fragment]` fn"
        )
    };
}

/// Real expansion of `save_artifact!`: forwards the literal port name
/// and the payload expression to `rt::save_artifact`.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_save_artifact {
    ($port:literal, $payload:expr) => {
        $crate::rt::save_artifact($port, $payload)
    };
    ($($tokens:tt)*) => {
        ::core::compile_error!("save_artifact!(\"name\", data)")
    };
}
170
/// Declare an Argo *output* artifact port and write a string at runtime.
///
/// The public form always fails to compile; the attribute macros rewrite
/// it to `__cargo_athena_save_artifact_str!` wherever it is legal.
#[macro_export]
macro_rules! save_artifact_str {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`save_artifact_str!` may only be used directly inside a \
             `#[cargo_athena::container]` or `#[cargo_athena::fragment]` fn"
        )
    };
}

/// Real expansion of `save_artifact_str!`: forwards the literal port
/// name and the payload expression to `rt::save_artifact_str`.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_save_artifact_str {
    ($port:literal, $payload:expr) => {
        $crate::rt::save_artifact_str($port, $payload)
    };
    ($($tokens:tt)*) => {
        ::core::compile_error!("save_artifact_str!(\"name\", data)")
    };
}
191
/// Runtime shims referenced by the declaration macros. Artifact ports are
/// plain files at fixed paths; Argo moves them (no S3 from us).
pub mod rt {
    use std::path::PathBuf;

    /// Identity: the volume is already mounted when the container runs.
    pub const fn host_path(path: &'static str) -> &'static str {
        path
    }

    /// Where Argo drops/collects declared artifact ports inside the pod.
    pub const IN_DIR: &str = "/athena/artifacts/in";
    pub const OUT_DIR: &str = "/athena/artifacts/out";

    /// File backing port `name` under `dir`.
    ///
    /// The emit side (`artifact_inputs` / `artifact_outputs`) declares the
    /// port path as plain `"{dir}/{name}"`. `PathBuf::join` instead
    /// *replaces* the base when handed an absolute path, so a key with a
    /// leading `/` would resolve outside `dir` and never match the path
    /// Argo was told about. Leading slashes are stripped to keep both
    /// sides pointing at the same file.
    fn port_path(dir: &str, name: &str) -> PathBuf {
        PathBuf::from(dir).join(name.trim_start_matches('/'))
    }

    /// Path of input port `name` (under [`IN_DIR`]).
    fn in_path(name: &str) -> PathBuf {
        port_path(IN_DIR, name)
    }

    /// Path of output port `name` (under [`OUT_DIR`]).
    fn out_path(name: &str) -> PathBuf {
        port_path(OUT_DIR, name)
    }

    /// Read input port `name` as raw bytes.
    ///
    /// # Panics
    /// If the file cannot be read; the message carries the port name, the
    /// resolved path, and the I/O error.
    pub fn load_artifact(name: &str) -> Vec<u8> {
        let p = in_path(name);
        std::fs::read(&p).unwrap_or_else(|e| panic!("load_artifact({name:?}) {}: {e}", p.display()))
    }

    /// Read input port `name` as a UTF-8 string.
    ///
    /// # Panics
    /// If the file cannot be read or is not valid UTF-8.
    pub fn load_artifact_str(name: &str) -> String {
        let p = in_path(name);
        std::fs::read_to_string(&p)
            .unwrap_or_else(|e| panic!("load_artifact_str({name:?}) {}: {e}", p.display()))
    }

    /// Write `data` to output port `name`, creating the parent directory
    /// (including any sub-directories implied by a `/` in `name`) first.
    ///
    /// # Panics
    /// If the directory cannot be created or the file cannot be written;
    /// both messages carry the port name, the offending path, and the
    /// I/O error.
    pub fn save_artifact(name: &str, data: impl AsRef<[u8]>) {
        let p = out_path(name);
        if let Some(d) = p.parent() {
            std::fs::create_dir_all(d)
                .unwrap_or_else(|e| panic!("save_artifact({name:?}) create {}: {e}", d.display()));
        }
        std::fs::write(&p, data.as_ref())
            .unwrap_or_else(|e| panic!("save_artifact({name:?}) {}: {e}", p.display()));
    }

    /// Write a string to output port `name` (see [`save_artifact`]).
    pub fn save_artifact_str(name: &str, data: impl AsRef<str>) {
        save_artifact(name, data.as_ref().as_bytes());
    }
}
237
/// The flavour of Argo template a type produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TemplateKind {
    /// Leaf: real code running in a pod (`#[container]`).
    Container,
    /// Composition: a DAG of other templates (`#[workflow]`).
    Workflow,
}
246
/// The cross-crate identity of a template, implemented by the unit struct
/// the `#[workflow]`/`#[container]` macros generate.
///
/// Callers never name the Argo string; they reference the *type*
/// (`<foo::ingest as Template>::ARGO_NAME`), so name/input resolution is
/// done by the compiler — collision-proof across crates, and the reference
/// itself force-links the defining crate.
pub trait Template {
    /// Globally-unique Argo resource name (`<crate>-<fn>` by default).
    const ARGO_NAME: &'static str;
    /// Declared input parameter names, in order.
    const INPUTS: &'static [&'static str];
    /// Stringified Rust types of [`Self::INPUTS`], same order — for
    /// `container emulate`'s pre-launch arg checking and the `ls`
    /// listings. Emitted by `#[container]` and `#[workflow]`; defaulted
    /// empty for synthetic/hand impls.
    const INPUT_TYPES: &'static [&'static str] = &[];
    /// `true` for athena-synthesized templates (the `if`/`else`
    /// wrapper + per-arm sub-workflows). They're an implementation
    /// detail, so `workflow ls` hides them unless `--include-synthetic`.
    const SYNTHETIC: bool = false;
    /// Whether this type is a leaf `#[container]` or a composing
    /// `#[workflow]` (see [`TemplateKind`]). Required: there is no default.
    const KIND: TemplateKind;
    /// Whole-workflow exit-handler template name, from
    /// `#[workflow(on_exit_if_root=…)]` / `#[container(on_exit_if_root=…)]`.
    /// `emit` puts it on this template's own `spec.hooks.exit`; Argo
    /// fires exit hooks workflow-scoped, so it runs only when this
    /// workflow is the one submitted (inert as a nested templateRef).
    const ON_EXIT: Option<&'static str> = None;
    /// Workflow-scoped TTL GC, from `#[…(ttl(..))]`. `build_templates`
    /// puts it on this template's own `spec.ttlStrategy` (same per-WT
    /// plumbing as `ON_EXIT`; never on synthetic `if` wrappers).
    const TTL: ::core::option::Option<crate::api::TtlStrategy> = None;
    /// Workflow-scoped pod GC strategy, from `#[…(pod_gc(strategy=..))]`.
    /// `build_templates` puts it on this template's own `spec.podGC`.
    const POD_GC: ::core::option::Option<&'static str> = None;
    /// Root-only whole-workflow runtime cap (seconds), from
    /// `#[…(active_deadline_if_root=..)]`. `build_templates` stamps it
    /// on this template's own `spec.activeDeadlineSeconds` (same per-WT,
    /// root-only plumbing as `TTL`/`POD_GC`). This is the *only* working
    /// whole-workflow timeout: Argo applies neither `Template.timeout`
    /// nor `Template.activeDeadlineSeconds` to dag/steps templates.
    const ACTIVE_DEADLINE_IF_ROOT: ::core::option::Option<i64> = None;

    /// Build this template's inner Argo `template` object.
    fn build(ctx: &BuildCtx) -> api::Template;

    /// Run-mode body — overridden by `#[container]`; never called on a
    /// `#[workflow]`.
    ///
    /// # Panics
    /// The default implementation always panics, naming
    /// [`Self::ARGO_NAME`], because only containers have a body to run.
    fn run(_input: serde_json::Value) -> serde_json::Value {
        panic!(
            "`{}` is not a #[container]; nothing to run",
            Self::ARGO_NAME
        )
    }

    /// Push self + the transitive callee closure into `out`. The macro
    /// generates `<Callee as Template>::collect(out)` per callee, so the
    /// whole reachable set is linked by direct calls.
    fn collect(out: &mut Collector);
}
307
308// ---- athena.toml ---------------------------------------------------------
309
/// `athena.toml` — required by `cargo athena` at emit time. Mirrors the
/// parts of Argo's S3 `ArtifactRepository` we inject, plus bootstrap config.
///
/// `artifact_repository` and `artifact` are mandatory tables; `bootstrap`
/// and `defaults` fall back to their `Default` impls when omitted.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct AthenaConfig {
    /// Where artifacts live (currently only an S3 repository).
    pub artifact_repository: ArtifactRepository,
    /// Coordinates of the per-binary tarball inside that repository.
    pub artifact: ArtifactSpec,
    /// Bootstrap image / target matrix; optional.
    #[serde(default)]
    pub bootstrap: Bootstrap,
    /// Project-wide defaults (service account, package, bin, namespace);
    /// optional.
    #[serde(default)]
    pub defaults: Defaults,
}
321
/// The `defaults` table of `athena.toml`. Every field is optional; a
/// missing `service_account` falls back to `default_service_account()`.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Defaults {
    /// Kubernetes ServiceAccount the workflow pods run as (so users bind
    /// their own RBAC). Per-`#[container(service_account=...)]` overrides.
    #[serde(default = "default_service_account")]
    pub service_account: String,
    /// Default cargo package the `cargo athena` subcommands drive (so
    /// you don't repeat `--package`). The `--package`/`-p` flag wins.
    #[serde(default)]
    pub package: Option<String>,
    /// Default cargo bin within that package (multi-bin crates need
    /// it). The `--bin` flag wins.
    #[serde(default)]
    pub bin: Option<String>,
    /// Default Kubernetes namespace for `cargo athena submit`. Precedence:
    /// `-n/--namespace` → `$ARGO_NAMESPACE` → this → `"default"`.
    #[serde(default)]
    pub namespace: Option<String>,
}
341
342impl Default for Defaults {
343 fn default() -> Self {
344 Self {
345 service_account: default_service_account(),
346 package: None,
347 bin: None,
348 namespace: None,
349 }
350 }
351}
352
/// ServiceAccount used when `athena.toml` does not name one.
fn default_service_account() -> String {
    String::from("default")
}
356
357/// Resolve a container template's ServiceAccount: the
358/// `#[container(service_account=...)]` override, else `[defaults]`.
359pub fn service_account(ctx: &BuildCtx, over: Option<&str>) -> String {
360 over.map(str::to_string)
361 .unwrap_or_else(|| ctx.config().defaults.service_account.clone())
362}
363
/// The `artifact_repository` table of `athena.toml`. Only an S3 backend
/// is modelled.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ArtifactRepository {
    /// S3 coordinates and credential references (required).
    pub s3: S3Repo,
}
368
/// S3 repository settings. `s3_loc` copies every field verbatim into the
/// `api::S3Artifact` attached to each emitted artifact.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct S3Repo {
    /// S3 endpoint (required).
    pub endpoint: String,
    /// Bucket name (required).
    pub bucket: String,
    /// Region; empty string when omitted.
    #[serde(default)]
    pub region: String,
    /// Passed through as the artifact's `insecure` flag; `false` when
    /// omitted.
    #[serde(default)]
    pub insecure: bool,
    /// Secret reference holding the access key.
    pub access_key_secret: SecretRef,
    /// Secret reference holding the secret key.
    pub secret_key_secret: SecretRef,
}
380
/// A `name` + `key` pair identifying one secret entry. `s3_loc` turns it
/// into an `api::SecretKeySelector` with the same two fields.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct SecretRef {
    /// Name of the secret.
    pub name: String,
    /// Key within that secret.
    pub key: String,
}
386
/// The `artifact` table of `athena.toml`: where the binary tarball lives
/// in the configured repository. `container_delivery` uses `key` as the
/// S3 object key of the binary input artifact.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct ArtifactSpec {
    /// Object key of the per-binary tarball (holds `app-<triple>` for every
    /// `bootstrap.targets`). `cargo athena build` fills any
    /// `{crate}`/`{version}`/`{bin}` placeholders before publish.
    pub key: String,
}
394
/// The `bootstrap` table of `athena.toml`. Both fields are optional and
/// fall back to `default_image()` / `default_targets()`.
#[derive(Debug, Clone, serde::Deserialize)]
pub struct Bootstrap {
    /// Fallback image when a `#[container]` doesn't set its own. Per-
    /// container `image` always wins (arbitrary by design); this is just
    /// the small default for containers that don't care.
    #[serde(default = "default_image")]
    pub default_image: String,
    /// Cross-compile / `uname` target matrix. `container_delivery` emits
    /// one bootstrap `case` arm per entry, keyed on the triple's first
    /// `-`-separated component.
    #[serde(default = "default_targets")]
    pub targets: Vec<String>,
}
406
407impl Default for Bootstrap {
408 fn default() -> Self {
409 Self {
410 default_image: default_image(),
411 targets: default_targets(),
412 }
413 }
414}
415
/// Image used when neither the container nor `athena.toml` picks one.
fn default_image() -> String {
    String::from("busybox:1.36-musl")
}
419
/// Target triples built when `athena.toml` does not list any: the two
/// musl Linux targets, x86_64 first.
fn default_targets() -> Vec<String> {
    ["x86_64-unknown-linux-musl", "aarch64-unknown-linux-musl"]
        .iter()
        .map(|triple| triple.to_string())
        .collect()
}
426
427impl AthenaConfig {
428 /// `ATHENA_CONFIG` override, else the nearest `athena.toml` walking up
429 /// from the cwd. Only ever called during emit — the in-pod binary
430 /// (run-mode) never needs `athena.toml`.
431 pub fn load() -> Self {
432 let path = std::env::var_os("ATHENA_CONFIG")
433 .map(std::path::PathBuf::from)
434 .or_else(Self::find_upwards)
435 .expect(
436 "athena.toml not found: set ATHENA_CONFIG or add athena.toml \
437 to the workspace (required by `cargo athena`)",
438 );
439 let text = std::fs::read_to_string(&path)
440 .unwrap_or_else(|e| panic!("read {}: {e}", path.display()));
441 toml::from_str(&text).unwrap_or_else(|e| panic!("parse {}: {e}", path.display()))
442 }
443
444 fn find_upwards() -> Option<std::path::PathBuf> {
445 let mut d = std::env::current_dir().ok()?;
446 loop {
447 let p = d.join("athena.toml");
448 if p.is_file() {
449 return Some(p);
450 }
451 if !d.pop() {
452 return None;
453 }
454 }
455 }
456}
457
/// Pod-scoped scratch root, backed by an `emptyDir` on every container
/// template so all athena paths are writable regardless of the image
/// (distroless / read-only rootfs) and shared with Argo's init/wait
/// containers for artifact load/collect.
pub const ATHENA_DIR: &str = "/athena";
/// Where Argo's executor (init container) extracts the per-arch
/// binaries from our `.tar.gz` input artifact. We rely on Argo's
/// built-in tarball auto-extraction (no `archive: none`, no `tar` in
/// the main container's image — see `container_delivery`).
/// Lives under [`ATHENA_DIR`].
pub const ATHENA_BIN_DIR: &str = "/athena/bin";
/// The in-pod arch-resolving + exec bootstrap, kept in a separate
/// `bootstrap.sh` so it can be read, edited, and `shellcheck`'d as a
/// plain shell file rather than buried in a Rust `format!`. `@@ARMS@@`
/// / `@@BIN_DIR@@` / `@@TEMPLATE@@` are substituted at emit time in
/// `container_delivery`.
const BOOTSTRAP_TEMPLATE: &str = include_str!("bootstrap.sh");
/// Name of the scratch `emptyDir` volume. `ContainerRunMeta` looks up
/// the mount with this name to report the work dir.
pub const SCRATCH_VOLUME: &str = "athena-work";
/// Argo input-artifact name of the binary tarball `emit` injects.
/// `ContainerRunMeta` uses this name to tell the binary tarball apart
/// from the other input artifacts.
pub const ATHENA_DIST_ARTIFACT: &str = "athena-dist";
/// Env-var prefix the in-pod bootstrap reads each input parameter from;
/// the full variable name is this prefix followed by the parameter name.
pub const ATHENA_PARAM_PREFIX: &str = "ATHENA_PARAM_";
480
/// Resolved S3 coordinates for one artifact (creds are supplied
/// locally, e.g. via AWS env vars — `cargo athena container run` uses
/// `object_store`; the in-cluster path uses the k8s Secret refs).
///
/// Filled by `ContainerRunMeta::from_template` from the artifact's
/// `s3` location; the secret selectors are intentionally not carried.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct S3Ref {
    /// S3 endpoint.
    pub endpoint: String,
    /// Bucket name.
    pub bucket: String,
    /// Region (may be empty).
    pub region: String,
    /// The artifact's `insecure` flag, copied as-is.
    pub insecure: bool,
    /// Object key.
    pub key: String,
}
492
/// One artifact bound into the container at `path`, backed by S3.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct ArtifactRef {
    /// Where the object lives.
    pub s3: S3Ref,
    /// In-container path of the artifact, copied from the Argo artifact.
    pub path: String,
}
499
/// An input parameter, the env var the bootstrap reads it from, and its
/// stringified Rust type (`""` if unknown — synthetic templates).
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct ParamRef {
    /// Parameter name as declared on the template.
    pub name: String,
    /// Env var name: `ATHENA_PARAM_PREFIX` followed by `name`.
    pub env: String,
    /// Stringified Rust type, or empty when no type was recorded for
    /// this position.
    pub ty: String,
}
508
/// Purpose-built introspection of one `#[container]`, derived from the
/// *same* `Template::build()` `emit` uses (so it never drifts), but
/// expressed in the runner's vocabulary instead of Argo's. Emitted as
/// JSON by the binary when `CARGO_ATHENA_DESCRIBE=<name>` is set;
/// consumed by `cargo athena container run` to realize the spec under
/// docker/podman locally. Also the basis for a future
/// `cargo athena container describe`.
#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
pub struct ContainerRunMeta {
    /// Argo template name (`<crate>-<fn>`).
    pub name: String,
    /// `"container"`, `"workflow"`, or `"other"`.
    pub kind: String,
    /// athena-synthesized template (an `if`/`else` wrapper or arm) —
    /// `workflow ls` hides these unless `--include-synthetic`.
    pub synthetic: bool,
    /// Resolved container image.
    pub image: String,
    /// The injected bootstrap command + args, verbatim — run as-is so
    /// the local execution path is byte-identical to the pod's.
    pub command: Vec<String>,
    /// Arguments that accompany `command` (empty for non-containers).
    pub args: Vec<String>,
    /// Mount path of the pod-scoped scratch dir (the `emptyDir`, e.g.
    /// `/athena`); bind a host temp dir here to read `result_path` back.
    pub work_dir: String,
    /// Input parameters and the env var each is delivered through.
    pub params: Vec<ParamRef>,
    /// The binary tarball artifact (always present for a container).
    pub binary_artifact: Option<ArtifactRef>,
    /// `load_artifact!` input ports (excludes the binary tarball).
    pub input_artifacts: Vec<ArtifactRef>,
    /// `save_artifact!` output ports.
    pub output_artifacts: Vec<ArtifactRef>,
    /// `host!` paths (mounted at the same path in-pod; bind 1:1 locally).
    pub host_paths: Vec<String>,
    /// File the body writes its serialized return to
    /// (`outputs.parameters.return`); read it back from the bind mount.
    /// `None` when there is no such output or its path is empty.
    pub result_path: Option<String>,
}
548
549impl ContainerRunMeta {
550 /// Derive the runner metadata from one built Argo template.
551 /// `input_types` is parallel to the template's input parameters
552 /// (same order); empty when unknown.
553 fn from_template(t: &api::Template, input_types: &[&str]) -> Self {
554 let kind = if t.container.is_some() {
555 "container"
556 } else if t.dag.is_some() || !t.steps.is_empty() {
557 "workflow"
558 } else {
559 "other"
560 };
561 let c = t.container.as_ref();
562 let mount_path = |vol: &str| {
563 c.and_then(|c| {
564 c.volume_mounts
565 .iter()
566 .find(|m| m.name == vol)
567 .map(|m| m.mount_path.clone())
568 })
569 };
570 let to_ref = |a: &api::Artifact| {
571 a.s3.as_ref().map(|s| ArtifactRef {
572 s3: S3Ref {
573 endpoint: s.endpoint.clone(),
574 bucket: s.bucket.clone(),
575 region: s.region.clone(),
576 insecure: s.insecure,
577 key: s.key.clone(),
578 },
579 path: a.path.clone(),
580 })
581 };
582 let in_arts = t.inputs.as_ref().map(|i| &i.artifacts);
583 ContainerRunMeta {
584 name: t.name.clone(),
585 kind: kind.to_string(),
586 // set by the caller from the Collector (Template::SYNTHETIC
587 // isn't visible through the type-erased builder fn here).
588 synthetic: false,
589 image: c.map(|c| c.image.clone()).unwrap_or_default(),
590 command: c.map(|c| c.command.clone()).unwrap_or_default(),
591 args: c.map(|c| c.args.clone()).unwrap_or_default(),
592 work_dir: mount_path(SCRATCH_VOLUME).unwrap_or_else(|| ATHENA_DIR.to_string()),
593 params: t
594 .inputs
595 .as_ref()
596 .map(|i| {
597 i.parameters
598 .iter()
599 .enumerate()
600 .map(|(idx, p)| ParamRef {
601 name: p.name.clone(),
602 env: format!("{ATHENA_PARAM_PREFIX}{}", p.name),
603 ty: input_types
604 .get(idx)
605 .map(|s| (*s).to_string())
606 .unwrap_or_default(),
607 })
608 .collect()
609 })
610 .unwrap_or_default(),
611 binary_artifact: in_arts
612 .and_then(|a| a.iter().find(|a| a.name == ATHENA_DIST_ARTIFACT))
613 .and_then(to_ref),
614 input_artifacts: in_arts
615 .map(|a| {
616 a.iter()
617 .filter(|a| a.name != ATHENA_DIST_ARTIFACT)
618 .filter_map(to_ref)
619 .collect()
620 })
621 .unwrap_or_default(),
622 output_artifacts: t
623 .outputs
624 .as_ref()
625 .map(|o| o.artifacts.iter().filter_map(to_ref).collect())
626 .unwrap_or_default(),
627 host_paths: t
628 .volumes
629 .iter()
630 .filter_map(|v| v.host_path.as_ref().map(|h| h.path.clone()))
631 .collect(),
632 result_path: t.outputs.as_ref().and_then(|o| {
633 o.parameters
634 .iter()
635 .find(|p| p.name == "return")
636 .and_then(|p| p.value_from.as_ref())
637 .map(|vf| vf.path.clone())
638 .filter(|p| !p.is_empty())
639 }),
640 }
641 }
642}
643
/// What [`container_delivery`] produces for one `#[container]` template.
pub struct ContainerDelivery {
    /// Resolved image: the `#[container(image=...)]` override, else
    /// `[bootstrap].default_image`.
    pub image: String,
    /// Container command: `/bin/sh -c`.
    pub command: Vec<String>,
    /// Single argument: the rendered bootstrap script.
    pub args: Vec<String>,
    /// Input artifact carrying the binary tarball, placed at
    /// [`ATHENA_BIN_DIR`].
    pub artifact: api::Artifact,
}
653
654/// The arch-resolving bootstrap + the S3 binary artifact for one container
655/// template. Called from macro-generated `Template::build` (emit only).
656///
657/// Runs inside the container's *arbitrary, user-chosen* image (it only
658/// needs a POSIX `sh` and `uname` — **no `tar`**). The binary `.tar.gz`
659/// is delivered as an Argo input artifact at [`ATHENA_BIN_DIR`]; with
660/// the default `Archive` (no `archive: none`) Argo's executor init
661/// container **auto-detects tarballs and untars them into the artifact
662/// path** (proven from `workflow/executor/executor.go:262–289`,
663/// `untar` ibid., real v4.0.5 source). So by the time our bootstrap
664/// runs the per-arch `app-<triple>` files already exist under
665/// [`ATHENA_BIN_DIR`]; the script just `uname`s, `chmod +x`'s, and
666/// `exec`s — replacing the shell so the Rust binary is the container's
667/// main process. Works whether the tarball has one entry or many.
668pub fn container_delivery(
669 ctx: &BuildCtx,
670 argo_name: &str,
671 image_override: Option<&str>,
672) -> ContainerDelivery {
673 let cfg = ctx.config();
674 let s3 = &cfg.artifact_repository.s3;
675 let image = image_override
676 .map(str::to_string)
677 .unwrap_or_else(|| cfg.bootstrap.default_image.clone());
678
679 let mut arms = String::new();
680 for triple in &cfg.bootstrap.targets {
681 let arch = triple.split('-').next().unwrap_or(triple);
682 let pat = if arch == "aarch64" {
683 "aarch64|arm64"
684 } else {
685 arch
686 };
687 arms.push_str(&format!(" {pat}) __t={triple} ;;\n"));
688 }
689
690 // Argo's executor (init container) auto-extracts the `.tar.gz` into
691 // ATHENA_BIN_DIR, so the bootstrap just picks + execs. The template
692 // lives in a sibling `bootstrap.sh` file (legible / greppable /
693 // shellcheck-able); we just substitute the @@-delimited slots.
694 let script = BOOTSTRAP_TEMPLATE
695 .replace("@@ARMS@@", &arms)
696 .replace("@@BIN_DIR@@", ATHENA_BIN_DIR)
697 .replace("@@TEMPLATE@@", argo_name);
698
699 // `archive: None` (NOT `archive: none`) lets Argo auto-detect the
700 // input as a tarball and untar it into `path` (`ATHENA_BIN_DIR`).
701 // See `container_delivery` doc above.
702 let artifact = api::Artifact {
703 name: "athena-dist".to_string(),
704 path: ATHENA_BIN_DIR.to_string(),
705 s3: Some(s3_loc(s3, &cfg.artifact.key)),
706 archive: None,
707 mode: None,
708 };
709
710 ContainerDelivery {
711 image,
712 command: vec!["/bin/sh".to_string(), "-c".to_string()],
713 args: vec![script],
714 artifact,
715 }
716}
717
/// A `#[fragment]`: a plain helper carrying `host!` decls. Still
/// `inventory`-based — a container's real body actually *calls* its
/// fragments, so the symbol reference exists and DCE is not a concern.
pub struct FragmentReg {
    /// Key the fragment is registered under in `BuildCtx`; `callees`
    /// entries are looked up by this name.
    pub rust_name: &'static str,
    /// `host!` paths declared directly in this fragment.
    pub host_paths: &'static [&'static str],
    /// Input artifact ports declared directly in this fragment.
    pub in_artifacts: &'static [&'static str],
    /// Output artifact ports declared directly in this fragment.
    pub out_artifacts: &'static [&'static str],
    /// Names this fragment calls; `BuildCtx` follows the ones that are
    /// registered fragments transitively and ignores the rest.
    pub callees: &'static [&'static str],
}
// Registers `FragmentReg` as an `inventory` collection so `BuildCtx::collect`
// can iterate every submitted fragment.
inventory::collect!(FragmentReg);
729
/// Fragment registry snapshot, passed to container `build`s. Also owns
/// the parsed `athena.toml`.
pub struct BuildCtx {
    /// Every registered fragment, keyed by `FragmentReg::rust_name`.
    fragments: HashMap<&'static str, &'static FragmentReg>,
    /// Config loaded once when the context is collected.
    config: AthenaConfig,
}
735
736impl BuildCtx {
737 /// Emit-only: gathers fragments AND loads `athena.toml`. Never called
738 /// in run-mode, so the in-pod binary needs no `athena.toml`.
739 pub fn collect() -> Self {
740 let mut fragments = HashMap::new();
741 for f in inventory::iter::<FragmentReg> {
742 fragments.insert(f.rust_name, f);
743 }
744 Self {
745 fragments,
746 config: AthenaConfig::load(),
747 }
748 }
749
750 pub fn config(&self) -> &AthenaConfig {
751 &self.config
752 }
753
754 /// Own literal decls ∪ the transitive `#[fragment]` closure for one
755 /// kind of declaration (deduped, stable order). `select` picks which
756 /// `FragmentReg` slice to pull from a reached fragment.
757 fn resolved(
758 &self,
759 own: &[&str],
760 own_callees: &[&str],
761 select: impl Fn(&FragmentReg) -> &'static [&'static str],
762 ) -> Vec<String> {
763 let mut out: Vec<String> = Vec::new();
764 let mut seen: HashSet<String> = HashSet::new();
765 let mut push = |p: &str, out: &mut Vec<String>| {
766 if seen.insert(p.to_string()) {
767 out.push(p.to_string());
768 }
769 };
770 for p in own {
771 push(p, &mut out);
772 }
773 let mut queue: Vec<&str> = own_callees.to_vec();
774 let mut visited: HashSet<&str> = HashSet::new();
775 while let Some(c) = queue.pop() {
776 if !visited.insert(c) {
777 continue;
778 }
779 if let Some(f) = self.fragments.get(c) {
780 for p in select(f) {
781 push(p, &mut out);
782 }
783 queue.extend(f.callees.iter().copied());
784 }
785 }
786 out
787 }
788
789 /// hostPaths: own `host!`s ∪ fragment closure.
790 pub fn resolved_host_paths(&self, own: &[&str], callees: &[&str]) -> Vec<String> {
791 self.resolved(own, callees, |f| f.host_paths)
792 }
793
794 /// Input artifact ports: own `load_artifact*!`s ∪ fragment closure.
795 pub fn resolved_in_artifacts(&self, own: &[&str], callees: &[&str]) -> Vec<String> {
796 self.resolved(own, callees, |f| f.in_artifacts)
797 }
798
799 /// Output artifact ports: own `save_artifact*!`s ∪ fragment closure.
800 pub fn resolved_out_artifacts(&self, own: &[&str], callees: &[&str]) -> Vec<String> {
801 self.resolved(own, callees, |f| f.out_artifacts)
802 }
803}
804
805fn archive_none() -> api::ArchiveStrategy {
806 api::ArchiveStrategy {
807 none: Some(api::NoneStrategy {}),
808 }
809}
810
811/// Build an Argo S3 location (artifact-repository creds from `athena.toml`)
812/// for an exact object `key`.
813pub fn s3_loc(s3: &S3Repo, key: &str) -> api::S3Artifact {
814 api::S3Artifact {
815 endpoint: s3.endpoint.clone(),
816 bucket: s3.bucket.clone(),
817 region: s3.region.clone(),
818 insecure: s3.insecure,
819 key: key.to_string(),
820 access_key_secret: Some(api::SecretKeySelector {
821 name: s3.access_key_secret.name.clone(),
822 key: s3.access_key_secret.key.clone(),
823 }),
824 secret_key_secret: Some(api::SecretKeySelector {
825 name: s3.secret_key_secret.name.clone(),
826 key: s3.secret_key_secret.key.clone(),
827 }),
828 }
829}
830
/// Derive an Argo artifact identifier from an S3 key (which may contain
/// `/`, `.`): ASCII letters and digits are kept (lower-cased), every
/// other character becomes `-`, and leading/trailing dashes are removed.
/// A key that reduces to nothing yields `"a"`. The key itself is
/// preserved in `s3.key`.
fn artifact_ident(key: &str) -> String {
    let dashed: String = key
        .chars()
        .map(|ch| match ch {
            ch if ch.is_ascii_alphanumeric() => ch.to_ascii_lowercase(),
            _ => '-',
        })
        .collect();
    match dashed.trim_matches('-') {
        "" => String::from("a"),
        ident => ident.to_string(),
    }
}
844
845/// `load_artifact!("key")` input ports: Argo pulls the exact S3 object
846/// `key` from the configured repo into the pod (raw, `archive: none`).
847pub fn artifact_inputs(ctx: &BuildCtx, keys: &[String]) -> Vec<api::Artifact> {
848 let s3 = &ctx.config().artifact_repository.s3;
849 keys.iter()
850 .map(|k| api::Artifact {
851 name: artifact_ident(k),
852 path: format!("{}/{k}", rt::IN_DIR),
853 s3: Some(s3_loc(s3, k)),
854 archive: Some(archive_none()),
855 mode: None,
856 })
857 .collect()
858}
859
860/// `save_artifact!("key")` output ports: Argo pushes the written file to
861/// the exact S3 object `key` in the configured repo (raw, `archive: none`).
862pub fn artifact_outputs(ctx: &BuildCtx, keys: &[String]) -> Vec<api::Artifact> {
863 let s3 = &ctx.config().artifact_repository.s3;
864 keys.iter()
865 .map(|k| api::Artifact {
866 name: artifact_ident(k),
867 path: format!("{}/{k}", rt::OUT_DIR),
868 s3: Some(s3_loc(s3, k)),
869 archive: Some(archive_none()),
870 mode: None,
871 })
872 .collect()
873}
874
/// Accumulates the reachable templates (as `WorkflowTemplate`s) and the
/// run-mode dispatch table while `Template::collect` walks the closure.
pub struct Collector {
    /// Argo names already entered; `enter` inserts here and reports
    /// whether the name was new (dedup + cycle guard).
    seen: HashSet<String>,
    /// Deferred so `athena.toml` is read only at emit, never run-mode.
    builders: Vec<fn(&BuildCtx) -> api::Template>,
    /// `<argo name> -> run-mode body` dispatch table (same signature as
    /// `Template::run`).
    runners: HashMap<String, fn(serde_json::Value) -> serde_json::Value>,
    /// `<argo name> -> <on_exit handler argo name>` for *every* template
    /// with an `on_exit` (not just the root). Each WorkflowTemplate
    /// carries its own `spec.hooks.exit`; Argo only fires the hook of
    /// the workflow that is actually submitted (workflow-scoped), so
    /// submitting a sub-workflow's template directly runs its own hook.
    exits: HashMap<String, &'static str>,
    /// `<argo name> -> ttlStrategy` for every template with `ttl(..)`.
    /// Stamped onto that WorkflowTemplate's `spec.ttlStrategy` (same
    /// per-WT, workflow-scoped semantics as `exits`).
    ttl: HashMap<String, crate::api::TtlStrategy>,
    /// `<argo name> -> podGC strategy` for every template with
    /// `pod_gc(..)`. Stamped onto that WT's `spec.podGC`.
    pod_gc: HashMap<String, String>,
    /// `<argo name> -> activeDeadlineSeconds` for every template with
    /// `active_deadline_if_root(..)`. Stamped onto that WT's
    /// `spec.activeDeadlineSeconds` (same per-WT, root-only as `ttl`).
    active_deadline: HashMap<String, i64>,
    /// `<argo name> -> stringified input types` (parallel to the
    /// template's INPUTS), for `container emulate` arg type-checking.
    types: HashMap<String, &'static [&'static str]>,
    /// Argo names of athena-synthesized templates (`Template::SYNTHETIC`)
    /// so `workflow ls` can hide them by default.
    synthetic: HashSet<String>,
}
906
907impl Default for Collector {
908 fn default() -> Self {
909 Self::new()
910 }
911}
912
913impl Collector {
914 pub fn new() -> Self {
915 Self {
916 seen: HashSet::new(),
917 builders: Vec::new(),
918 runners: HashMap::new(),
919 exits: HashMap::new(),
920 ttl: HashMap::new(),
921 pod_gc: HashMap::new(),
922 active_deadline: HashMap::new(),
923 types: HashMap::new(),
924 synthetic: HashSet::new(),
925 }
926 }
927
928 /// Returns `false` if `argo_name` was already collected (generated
929 /// `collect` impls return early in that case — dedup + cycle guard).
930 pub fn enter(&mut self, argo_name: &str) -> bool {
931 self.seen.insert(argo_name.to_string())
932 }
933
934 /// Register a template's `build` fn (invoked lazily at emit).
935 pub fn add_builder(&mut self, build: fn(&BuildCtx) -> api::Template) {
936 self.builders.push(build);
937 }
938
939 /// Register a template by type: its `build` fn plus, if it sets
940 /// `on_exit_if_root`, its exit handler keyed by Argo name (so
941 /// `emit` can put `spec.hooks.exit` on *that* WorkflowTemplate).
942 pub fn add<T: Template>(&mut self) {
943 self.builders.push(T::build);
944 if let Some(handler) = T::ON_EXIT {
945 self.exits.insert(T::ARGO_NAME.to_string(), handler);
946 }
947 if let Some(t) = T::TTL {
948 self.ttl.insert(T::ARGO_NAME.to_string(), t);
949 }
950 if let Some(s) = T::POD_GC {
951 self.pod_gc.insert(T::ARGO_NAME.to_string(), s.to_string());
952 }
953 if let Some(s) = T::ACTIVE_DEADLINE_IF_ROOT {
954 self.active_deadline.insert(T::ARGO_NAME.to_string(), s);
955 }
956 if !T::INPUT_TYPES.is_empty() {
957 self.types.insert(T::ARGO_NAME.to_string(), T::INPUT_TYPES);
958 }
959 if T::SYNTHETIC {
960 self.synthetic.insert(T::ARGO_NAME.to_string());
961 }
962 }
963
964 pub fn add_runner(&mut self, argo_name: &str, run: fn(serde_json::Value) -> serde_json::Value) {
965 self.runners.insert(argo_name.to_string(), run);
966 }
967
968 /// Emit the multi-document stream: one `WorkflowTemplate` per template
969 /// plus a runnable `Workflow` for the entrypoint `E`. Builds the
970 /// `BuildCtx` (and reads `athena.toml`) here — emit only.
971 /// Emit the multi-doc YAML. `with_workflow` appends a convenience
972 /// runnable `Workflow` (`generateName`, `workflowTemplateRef` →
973 /// root) — off by default: the deterministic, stable-named
974 /// `WorkflowTemplate`s are the artifact you register/GitOps, and
975 /// runs are triggered with `argo submit --from
976 /// workflowtemplate/<root>`. The convenience Workflow is opt-in for
977 /// quick demos / `kubectl create -f -`.
978 /// The deterministic `WorkflowTemplate` set `emit` serializes —
979 /// every reachable template, sorted, with each `on_exit_if_root`
980 /// hook stamped on its own template. Shared by YAML emit and the
981 /// `CARGO_ATHENA_EMIT_JSON` mode `cargo athena submit` consumes for
982 /// its register/drift checks.
983 pub fn build_templates(&self) -> Vec<api::WorkflowTemplate> {
984 let ctx = BuildCtx::collect();
985 let mut tpls: Vec<api::WorkflowTemplate> = self
986 .builders
987 .iter()
988 .map(|b| {
989 let inner = b(&ctx);
990 wrap_workflow_template(inner.name.clone(), inner)
991 })
992 .collect();
993 tpls.sort_by_key(name_of);
994
995 // `on_exit_if_root`: EVERY template that declares it carries the
996 // exit hook on its OWN WorkflowTemplate's `spec.hooks.exit`
997 // (`templateRef` — the legacy `spec.onExit` name-string can't
998 // cross the one-WT-per-template model). Argo only fires the hook
999 // of the workflow actually submitted (workflow-scoped, proven on
1000 // real Argo v4.0.5 via both `argo submit --from` and a
1001 // `workflowTemplateRef` Workflow); a templateRef'd sub-workflow's
1002 // own hook stays inert when nested — but submit that sub-WT
1003 // directly and its own hook fires.
1004 for t in tpls.iter_mut() {
1005 let name = name_of(t);
1006 if let Some(handler) = self.exits.get(&name)
1007 && let Some(spec) = t.spec.as_mut()
1008 {
1009 spec.hooks.insert(
1010 "exit".to_string(),
1011 api::LifecycleHook {
1012 template_ref: Some(api::TemplateRef {
1013 name: handler.to_string(),
1014 template: handler.to_string(),
1015 cluster_scope: false,
1016 }),
1017 ..Default::default()
1018 },
1019 );
1020 }
1021 // `ttl(..)`/`pod_gc(..)` are WorkflowSpec-scoped: stamped on
1022 // each declaring template's OWN WorkflowTemplate (Argo runs
1023 // GC workflow-scoped, same per-WT model as the exit hook).
1024 if let Some(ttl) = self.ttl.get(&name)
1025 && let Some(spec) = t.spec.as_mut()
1026 {
1027 spec.ttl_strategy = Some(ttl.clone());
1028 }
1029 if let Some(s) = self.pod_gc.get(&name)
1030 && let Some(spec) = t.spec.as_mut()
1031 {
1032 spec.pod_gc = Some(api::PodGc {
1033 strategy: s.clone(),
1034 });
1035 }
1036 // `active_deadline_if_root` is the genuine whole-workflow
1037 // runtime cap (Argo applies `Template.timeout` /
1038 // `Template.activeDeadlineSeconds` to neither dag nor steps);
1039 // root-only, same per-WT plumbing as `ttl`/`pod_gc`.
1040 if let Some(s) = self.active_deadline.get(&name)
1041 && let Some(spec) = t.spec.as_mut()
1042 {
1043 spec.active_deadline_seconds = Some(*s);
1044 }
1045 }
1046 tpls
1047 }
1048
1049 pub fn emit<E: Template>(&self, with_workflow: bool) -> String {
1050 let tpls = self.build_templates();
1051 let mut docs: Vec<String> = tpls
1052 .iter()
1053 .map(|t| serde_norway::to_string(t).expect("WorkflowTemplate is serializable"))
1054 .collect();
1055
1056 if !with_workflow {
1057 return docs.join("---\n");
1058 }
1059
1060 let ctx = BuildCtx::collect();
1061 let wf = api::Workflow {
1062 api_version: api::API_VERSION.to_string(),
1063 kind: api::KIND_WORKFLOW.to_string(),
1064 metadata: Some(api::ObjectMeta {
1065 generate_name: format!("{}-", E::ARGO_NAME),
1066 ..Default::default()
1067 }),
1068 spec: Some(api::WorkflowSpec {
1069 workflow_template_ref: Some(api::WorkflowTemplateRef {
1070 name: E::ARGO_NAME.to_string(),
1071 cluster_scope: false,
1072 }),
1073 service_account_name: ctx.config().defaults.service_account.clone(),
1074 // Only the emit root's on_exit reaches a runnable Workflow,
1075 // and it must be `hooks.exit` with a templateRef (the
1076 // legacy `spec.onExit` name-string can't cross the
1077 // one-WT-per-template wormhole).
1078 hooks: E::ON_EXIT
1079 .map(|n| {
1080 let mut m = std::collections::BTreeMap::new();
1081 m.insert(
1082 "exit".to_string(),
1083 api::LifecycleHook {
1084 template_ref: Some(api::TemplateRef {
1085 name: n.to_string(),
1086 template: n.to_string(),
1087 cluster_scope: false,
1088 }),
1089 ..Default::default()
1090 },
1091 );
1092 m
1093 })
1094 .unwrap_or_default(),
1095 // Only the emit root's ttl/pod_gc/active_deadline reaches
1096 // the runnable Workflow (same workflow-scoped reasoning
1097 // as the hook).
1098 ttl_strategy: E::TTL,
1099 active_deadline_seconds: E::ACTIVE_DEADLINE_IF_ROOT,
1100 pod_gc: E::POD_GC.map(|s| api::PodGc {
1101 strategy: s.to_string(),
1102 }),
1103 ..Default::default()
1104 }),
1105 };
1106 docs.push(serde_norway::to_string(&wf).expect("Workflow is serializable"));
1107 docs.join("---\n")
1108 }
1109}
1110
1111fn name_of(t: &api::WorkflowTemplate) -> String {
1112 t.metadata
1113 .as_ref()
1114 .map(|m| m.name.clone())
1115 .unwrap_or_default()
1116}
1117
1118/// Wrap one inner Argo `template` as a standalone `WorkflowTemplate` whose
1119/// resource name == the inner template name == its entrypoint.
1120pub fn wrap_workflow_template(name: String, inner: api::Template) -> api::WorkflowTemplate {
1121 api::WorkflowTemplate {
1122 api_version: api::API_VERSION.to_string(),
1123 kind: api::KIND_WORKFLOW_TEMPLATE.to_string(),
1124 metadata: Some(api::ObjectMeta {
1125 name: name.clone(),
1126 ..Default::default()
1127 }),
1128 spec: Some(api::WorkflowSpec {
1129 entrypoint: name,
1130 templates: vec![inner],
1131 arguments: None,
1132 workflow_template_ref: None,
1133 ..Default::default()
1134 }),
1135 }
1136}
1137
/// Turns a Rust ident into a kebab-style Argo identifier (DNS-1123-ish):
/// each `_` becomes `-` and ASCII letters are lowercased. CamelCase word
/// boundaries are not split (`RunFoo` becomes `runfoo`).
pub fn kebab(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '_' => '-',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
1142
/// Derives the Kubernetes volume name for a hostPath.
///
/// Non-alphanumeric characters become `-`, ASCII letters are lowercased,
/// leading/trailing `-` are stripped, and the result is prefixed with
/// `host-` (`/etc/myapp` becomes `host-etc-myapp`). A path with nothing
/// left after sanitizing yields `host-v`.
///
/// Lowercasing matters: a volume name must be a DNS-1123 label, which
/// admits only lowercase alphanumerics and `-`, so a path such as
/// `/opt/MyApp` would otherwise produce a name the API server rejects.
///
/// NOTE(review): the 63-character label limit, and collisions between
/// paths that differ only in case or punctuation, are not handled here.
fn volume_name(path: &str) -> String {
    let sanitized: String = path
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() {
                c.to_ascii_lowercase()
            } else {
                '-'
            }
        })
        .collect();
    match sanitized.trim_matches('-') {
        "" => String::from("host-v"),
        stem => format!("host-{stem}"),
    }
}
1154
1155/// `volumes` + `volumeMounts` for a set of hostPaths (from `host!`).
1156pub fn host_path_volumes(paths: &[String]) -> (Vec<api::Volume>, Vec<api::VolumeMount>) {
1157 let mut vols = Vec::new();
1158 let mut mounts = Vec::new();
1159 for p in paths {
1160 let name = volume_name(p);
1161 vols.push(api::Volume {
1162 name: name.clone(),
1163 host_path: Some(api::HostPathVolumeSource {
1164 path: p.clone(),
1165 r#type: String::new(),
1166 }),
1167 ..Default::default()
1168 });
1169 mounts.push(api::VolumeMount {
1170 name,
1171 mount_path: p.clone(),
1172 read_only: false,
1173 });
1174 }
1175 (vols, mounts)
1176}
1177
1178/// Every container template's volumes/mounts: the always-present
1179/// `emptyDir` scratch at [`ATHENA_DIR`] (binary tarball, in/out artifact
1180/// ports, extraction, result — all writable on any image) followed by the
1181/// declared hostPaths.
1182pub fn container_volumes(host_paths: &[String]) -> (Vec<api::Volume>, Vec<api::VolumeMount>) {
1183 let mut vols = vec![api::Volume {
1184 name: SCRATCH_VOLUME.to_string(),
1185 empty_dir: Some(api::EmptyDirVolumeSource {}),
1186 ..Default::default()
1187 }];
1188 let mut mounts = vec![api::VolumeMount {
1189 name: SCRATCH_VOLUME.to_string(),
1190 mount_path: ATHENA_DIR.to_string(),
1191 read_only: false,
1192 }];
1193 let (hv, hm) = host_path_volumes(host_paths);
1194 vols.extend(hv);
1195 mounts.extend(hm);
1196 (vols, mounts)
1197}
1198
1199/// The entrypoint a user's `main` calls, parameterised by the root
1200/// workflow type. Referencing `E` force-links the entire reachable
1201/// closure (each `collect` calls callees' `collect` directly).
1202pub fn entrypoint<E: Template>() {
1203 let mut collector = Collector::new();
1204 E::collect(&mut collector);
1205
1206 let args: Vec<String> = std::env::args().collect();
1207 let target = args
1208 .iter()
1209 .position(|a| a == "--cargo-athena-template")
1210 .and_then(|i| args.get(i + 1).cloned())
1211 .or_else(|| std::env::var("CARGO_ATHENA_TEMPLATE").ok());
1212
1213 if let Some(t) = target {
1214 let run = *collector
1215 .runners
1216 .get(&t)
1217 .unwrap_or_else(|| panic!("no runnable container template named {t:?}"));
1218 // Base inputs: `cargo athena run --input` (local), then overlay
1219 // Argo-supplied params delivered as `ATHENA_PARAM_<name>` env vars
1220 // (set on the container template from `{{inputs.parameters.*}}`).
1221 let mut input = std::env::var("CARGO_ATHENA_INPUT")
1222 .ok()
1223 .map(|s| serde_json::from_str(&s).expect("CARGO_ATHENA_INPUT must be JSON"))
1224 .unwrap_or(serde_json::Value::Object(Default::default()));
1225 if let serde_json::Value::Object(map) = &mut input {
1226 for (k, v) in std::env::vars() {
1227 if let Some(name) = k.strip_prefix("ATHENA_PARAM_") {
1228 // A param is JSON if it parses, else a bare string.
1229 let val =
1230 serde_json::from_str(&v).unwrap_or(serde_json::Value::String(v.clone()));
1231 map.insert(name.to_string(), val);
1232 }
1233 }
1234 }
1235 let output = run(input);
1236 if let Ok(path) = std::env::var("CARGO_ATHENA_OUTPUT") {
1237 std::fs::write(path, output.to_string()).expect("write CARGO_ATHENA_OUTPUT");
1238 } else {
1239 println!("{output}");
1240 }
1241 return;
1242 }
1243
1244 // `cargo athena container ls` sets this to enumerate every reachable
1245 // template's metadata as a JSON array (same per-template derivation
1246 // as describe — so names/params shown are exactly what runs).
1247 if std::env::var_os("CARGO_ATHENA_LIST").is_some() {
1248 let ctx = BuildCtx::collect();
1249 let all: Vec<ContainerRunMeta> = collector
1250 .builders
1251 .iter()
1252 .map(|b| {
1253 let t = b(&ctx);
1254 let it = collector.types.get(&t.name).copied().unwrap_or(&[]);
1255 let mut m = ContainerRunMeta::from_template(&t, it);
1256 m.synthetic = collector.synthetic.contains(&t.name);
1257 m
1258 })
1259 .collect();
1260 println!(
1261 "{}",
1262 serde_json::to_string(&all).expect("ContainerRunMeta is serializable")
1263 );
1264 return;
1265 }
1266
1267 // `cargo athena container emulate/describe` sets this to fetch ONE
1268 // template's metadata as JSON (it then realizes that exact spec
1269 // locally via docker/podman). Reusing `Template::build` here is what
1270 // makes the local run identical to Argo by construction — same
1271 // image, bootstrap, env, volumes, and artifacts as `emit`.
1272 if let Ok(name) = std::env::var("CARGO_ATHENA_DESCRIBE") {
1273 let ctx = BuildCtx::collect();
1274 let tpl = collector
1275 .builders
1276 .iter()
1277 .map(|b| b(&ctx))
1278 .find(|t| t.name == name)
1279 .unwrap_or_else(|| panic!("no template named {name:?}"));
1280 let input_types = collector.types.get(&name).copied().unwrap_or(&[]);
1281 let mut meta = ContainerRunMeta::from_template(&tpl, input_types);
1282 meta.synthetic = collector.synthetic.contains(&name);
1283 println!(
1284 "{}",
1285 serde_json::to_string(&meta).expect("ContainerRunMeta is serializable")
1286 );
1287 return;
1288 }
1289
1290 // `cargo athena submit` sets this to get the deterministic
1291 // `WorkflowTemplate` set as a JSON array (structured — for its
1292 // register-if-missing / drift-detect / apply checks), instead of
1293 // re-parsing the YAML `emit` prints.
1294 if std::env::var_os("CARGO_ATHENA_EMIT_JSON").is_some() {
1295 println!(
1296 "{}",
1297 serde_json::to_string(&collector.build_templates())
1298 .expect("WorkflowTemplate is serializable")
1299 );
1300 return;
1301 }
1302
1303 // `cargo athena emit --with-workflow` sets this on the child so the
1304 // convenience runnable Workflow is appended (default: templates
1305 // only — deterministic, `kubectl apply`-able, GitOps-clean).
1306 let with_workflow = std::env::var_os("CARGO_ATHENA_WITH_WORKFLOW").is_some_and(|v| v == "1");
1307 print!("{}", collector.emit::<E>(with_workflow));
1308}
1309
/// Unit tests for the pure name-sanitizing helpers.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn kebab_lowercases_and_hyphenates() {
        assert_eq!(kebab("run_a_container"), "run-a-container");
        assert_eq!(kebab("RunFoo"), "runfoo");
        assert_eq!(kebab(""), "");
    }

    #[test]
    fn volume_name_is_dns_safe() {
        assert_eq!(volume_name("/etc/myapp"), "host-etc-myapp");
        assert_eq!(volume_name("/var/lib/extra"), "host-var-lib-extra");
    }

    // A path with no alphanumerics must still yield a non-empty name.
    #[test]
    fn volume_name_falls_back_when_nothing_survives() {
        assert_eq!(volume_name("/"), "host-v");
        assert_eq!(volume_name(""), "host-v");
    }

    #[test]
    fn artifact_ident_sanitizes_and_lowercases() {
        assert_eq!(artifact_ident("data/raw/file.csv"), "data-raw-file-csv");
        assert_eq!(artifact_ident("/Models/V1.bin/"), "models-v1-bin");
    }

    // A key with no alphanumerics must still yield a non-empty name.
    #[test]
    fn artifact_ident_falls_back_when_nothing_survives() {
        assert_eq!(artifact_ident(""), "a");
        assert_eq!(artifact_ident("./-/"), "a");
    }
}
1325}