Skip to main content

cargo_athena_core/
lib.rs

1//! cargo-athena runtime.
2//!
3//! Every `#[workflow]`/`#[container]` becomes a unit-struct **type** that
4//! implements [`Template`]. That type *is* the cross-crate wormhole: the
5//! type system resolves a callee's Argo name/inputs across modules and
6//! crates, and the generated `Template::collect` calls
7//! `<Callee as Template>::collect` directly — a monomorphic call, so the
8//! whole reachable closure is force-linked with no `inventory`/DCE games.
9//!
10//! Two worlds share one binary:
11//!
12//! * **Emit** — `main` calls [`entrypoint::<E>()`]; we walk the closure
13//!   from `E` and print one Argo `WorkflowTemplate` document per template
14//!   (cross-refs via `templateRef`) plus a runnable `Workflow` for `E`.
15//! * **Run** — Argo invokes the binary with `--cargo-athena-template <name>`;
16//!   we deserialize inputs, run the real container body, serialize outputs.
17//!
18//! `host!` declarations are still collected statically by the attribute
19//! macros and resolved through the `#[fragment]` (`inventory`) closure here
20//! — fragments are genuinely *called* by container bodies, so unlike
21//! templates they have a real symbol reference and no DCE concern.
22
23use std::collections::{HashMap, HashSet};
24
25/// Argo API types (generated from protobuf).
26pub use cargo_athena_api as api;
27// Re-exported so macro-generated code has stable paths under `::cargo_athena`.
28pub use inventory;
29pub use serde_json;
30// Maintained serde_yaml fork: YAML 1.1-aware emitter (quotes `n`/`yes`/
31// `null`/… so Argo's Go YAML→JSON parser can't mis-type them). serde_yaml
32// itself is archived/EOL.
33pub use serde_norway;
34
/// Fan-out marker trait: `list.fan_out(|x| template(x, ..))` means "run
/// `template` once per element" (Argo `withParam`), and the binding is the
/// aggregated `Vec<U>` of the per-element returns.
///
/// The trait exists purely so the ghost code type-checks the element
/// type, the closure, and the resulting `Vec<U>`. The macro lowers the
/// call to Argo, so the default body is never executed.
pub trait AthenaList<T> {
    #[doc(hidden)]
    fn fan_out<U, F>(self, _f: F) -> Vec<U>
    where
        Self: Sized,
        F: FnOnce(T) -> U,
    {
        unimplemented!("athena ghost: never executed")
    }
}
impl<T, const N: usize> AthenaList<T> for [T; N] {}
impl<T> AthenaList<T> for Vec<T> {}
51
/// Marker for types that may be injected into a `#[container]`
/// attribute (`image = "repo:" + tag`).
///
/// Deliberately limited to `String`/`str` and the primitive numbers:
/// their `serde_json` form, once unwrapped at runtime by
/// `{{=fromJSON(...)}}`, renders to the obvious raw scalar
/// (`"v"`→`v`, `7`→`7`). A `Display` bound would wrongly admit types
/// whose `Display` differs from their JSON round-trip. The macro emits
/// a hidden `Injectable`-bounded assertion against the real arg type.
#[doc(hidden)]
pub trait Injectable {}
macro_rules! __athena_injectable {
    ($($ty:ty),* $(,)?) => {
        $(
            impl Injectable for $ty {}
        )*
    };
}
// Text.
__athena_injectable!(String, str);
// Signed integers.
__athena_injectable!(i8, i16, i32, i64, i128, isize);
// Unsigned integers.
__athena_injectable!(u8, u16, u32, u64, u128, usize);
// Floats.
__athena_injectable!(f32, f64);
67
/// `host!("/lit/path")` — declare a hostPath volume for the enclosing
/// container; at runtime it evaluates to the (already-mounted) path.
///
/// Only valid inside a `#[cargo_athena::container]` or
/// `#[cargo_athena::fragment]` fn, where the attribute macro rewrites
/// every invocation it can see into the private real macro below. This
/// public definition is therefore a *hard error*: it only expands when
/// `host!` sits somewhere the collector cannot see (a plain fn, a
/// `#[workflow]`, or nested inside another macro's tokens), where a
/// silently-unmounted path would otherwise be a footgun.
#[macro_export]
macro_rules! host {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`host!` may only be used directly inside a `#[cargo_athena::container]` \
             or `#[cargo_athena::fragment]` fn (not in a plain fn, a `#[workflow]`, \
             or nested inside another macro invocation)"
        )
    };
}

/// The real expansion of `host!`. Private: only the attribute macros
/// emit this path. Accepts exactly one literal; anything else is a
/// compile error.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_host {
    ($mount:literal) => {
        $crate::rt::host_path($mount)
    };
    ($($rest:tt)*) => {
        ::core::compile_error!("`host!` takes a single string-literal path")
    };
}
101
102// --- artifact declaration macros (native Argo artifacts, no S3) ------------
103//
104// Each is a public (gated `compile_error!`) + private (real) pair, exactly
105// like `host!`: only valid inside `#[container]`/`#[fragment]`, where the
106// attribute macro rewrites the public form to the private one.
107
/// Declare an Argo *input* artifact port and read it (bytes) at runtime.
///
/// Public gate: expanding this definition is always a compile error,
/// because the attribute macros rewrite valid uses to the private form.
#[macro_export]
macro_rules! load_artifact {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`load_artifact!` may only be used directly inside a `#[cargo_athena::container]` \
             or `#[cargo_athena::fragment]` fn"
        )
    };
}
/// Real expansion of `load_artifact!`: one literal port name, forwarded
/// to the `rt::load_artifact` shim.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_load_artifact {
    ($port:literal) => {
        $crate::rt::load_artifact($port)
    };
    ($($rest:tt)*) => {
        ::core::compile_error!("load_artifact!(\"name\")")
    };
}
128
/// Declare an Argo *input* artifact port and read it (UTF-8) at runtime.
///
/// Public gate: expanding this definition is always a compile error,
/// because the attribute macros rewrite valid uses to the private form.
#[macro_export]
macro_rules! load_artifact_str {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`load_artifact_str!` may only be used directly inside a `#[cargo_athena::container]` \
             or `#[cargo_athena::fragment]` fn"
        )
    };
}
/// Real expansion of `load_artifact_str!`: one literal port name,
/// forwarded to the `rt::load_artifact_str` shim.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_load_artifact_str {
    ($port:literal) => {
        $crate::rt::load_artifact_str($port)
    };
    ($($rest:tt)*) => {
        ::core::compile_error!("load_artifact_str!(\"name\")")
    };
}
149
/// Declare an Argo *output* artifact port and write bytes to it at runtime.
///
/// Public gate: expanding this definition is always a compile error,
/// because the attribute macros rewrite valid uses to the private form.
#[macro_export]
macro_rules! save_artifact {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`save_artifact!` may only be used directly inside a `#[cargo_athena::container]` \
             or `#[cargo_athena::fragment]` fn"
        )
    };
}
/// Real expansion of `save_artifact!`: a literal port name plus a data
/// expression, forwarded to the `rt::save_artifact` shim.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_save_artifact {
    ($port:literal, $payload:expr) => {
        $crate::rt::save_artifact($port, $payload)
    };
    ($($rest:tt)*) => {
        ::core::compile_error!("save_artifact!(\"name\", data)")
    };
}
170
/// Declare an Argo *output* artifact port and write a string at runtime.
///
/// Public gate: expanding this definition is always a compile error,
/// because the attribute macros rewrite valid uses to the private form.
#[macro_export]
macro_rules! save_artifact_str {
    ($($tokens:tt)*) => {
        ::core::compile_error!(
            "`save_artifact_str!` may only be used directly inside a `#[cargo_athena::container]` \
             or `#[cargo_athena::fragment]` fn"
        )
    };
}
/// Real expansion of `save_artifact_str!`: a literal port name plus a
/// data expression, forwarded to the `rt::save_artifact_str` shim.
#[doc(hidden)]
#[macro_export]
macro_rules! __cargo_athena_save_artifact_str {
    ($port:literal, $payload:expr) => {
        $crate::rt::save_artifact_str($port, $payload)
    };
    ($($rest:tt)*) => {
        ::core::compile_error!("save_artifact_str!(\"name\", data)")
    };
}
191
/// Runtime shims referenced by the declaration macros. Artifact ports are
/// plain files at fixed paths; Argo moves them (no S3 from us).
pub mod rt {
    use std::path::PathBuf;

    /// Identity: the volume is already mounted when the container runs.
    pub const fn host_path(path: &'static str) -> &'static str {
        path
    }

    /// Where Argo drops/collects declared artifact ports inside the pod.
    pub const IN_DIR: &str = "/athena/artifacts/in";
    pub const OUT_DIR: &str = "/athena/artifacts/out";

    /// File backing the input port `name` (`IN_DIR/name`).
    fn in_path(name: &str) -> PathBuf {
        PathBuf::from(IN_DIR).join(name)
    }
    /// File backing the output port `name` (`OUT_DIR/name`).
    fn out_path(name: &str) -> PathBuf {
        PathBuf::from(OUT_DIR).join(name)
    }

    /// Read the whole input port `name` as bytes.
    ///
    /// # Panics
    /// If the port file cannot be read; the message names the port, the
    /// resolved path, and the I/O error.
    pub fn load_artifact(name: &str) -> Vec<u8> {
        let p = in_path(name);
        std::fs::read(&p).unwrap_or_else(|e| panic!("load_artifact({name:?}) {}: {e}", p.display()))
    }

    /// Read the whole input port `name` as a UTF-8 string.
    ///
    /// # Panics
    /// If the port file cannot be read or is not valid UTF-8.
    pub fn load_artifact_str(name: &str) -> String {
        let p = in_path(name);
        std::fs::read_to_string(&p)
            .unwrap_or_else(|e| panic!("load_artifact_str({name:?}) {}: {e}", p.display()))
    }

    /// Write `data` to the output port `name`, creating the parent
    /// directory first if needed.
    ///
    /// # Panics
    /// If the directory cannot be created or the file cannot be written.
    /// Both messages name the port, the offending path, and the I/O
    /// error, so a failing pod log points at the exact location.
    pub fn save_artifact(name: &str, data: impl AsRef<[u8]>) {
        let p = out_path(name);
        if let Some(d) = p.parent() {
            std::fs::create_dir_all(d).unwrap_or_else(|e| {
                panic!("save_artifact({name:?}) create dir {}: {e}", d.display())
            });
        }
        std::fs::write(&p, data.as_ref())
            .unwrap_or_else(|e| panic!("save_artifact({name:?}) {}: {e}", p.display()));
    }

    /// String convenience over [`save_artifact`]: writes the UTF-8 bytes.
    pub fn save_artifact_str(name: &str, data: impl AsRef<str>) {
        save_artifact(name, data.as_ref().as_bytes());
    }
}
237
/// The flavour of Argo template a type produces.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TemplateKind {
    /// A leaf: real code running in a pod (`#[container]`).
    Container,
    /// A composition: a DAG of other templates (`#[workflow]`).
    Workflow,
}
246
247/// The cross-crate identity of a template, implemented by the unit struct
248/// the `#[workflow]`/`#[container]` macros generate.
249///
250/// Callers never name the Argo string; they reference the *type*
251/// (`<foo::ingest as Template>::ARGO_NAME`), so name/input resolution is
252/// done by the compiler — collision-proof across crates, and the reference
253/// itself force-links the defining crate.
254pub trait Template {
255    /// Globally-unique Argo resource name (`<crate>-<fn>` by default).
256    const ARGO_NAME: &'static str;
257    /// Declared input parameter names, in order.
258    const INPUTS: &'static [&'static str];
259    /// Stringified Rust types of [`Self::INPUTS`], same order — for
260    /// `container emulate`'s pre-launch arg checking and the `ls`
261    /// listings. Emitted by `#[container]` and `#[workflow]`; defaulted
262    /// empty for synthetic/hand impls.
263    const INPUT_TYPES: &'static [&'static str] = &[];
264    /// `true` for athena-synthesized templates (the `if`/`else`
265    /// wrapper + per-arm sub-workflows). They're an implementation
266    /// detail, so `workflow ls` hides them unless `--include-synthetic`.
267    const SYNTHETIC: bool = false;
268    const KIND: TemplateKind;
269    /// Whole-workflow exit-handler template name, from
270    /// `#[workflow(on_exit_if_root=…)]` / `#[container(on_exit_if_root=…)]`.
271    /// `emit` puts it on this template's own `spec.hooks.exit`; Argo
272    /// fires exit hooks workflow-scoped, so it runs only when this
273    /// workflow is the one submitted (inert as a nested templateRef).
274    const ON_EXIT: Option<&'static str> = None;
275    /// Workflow-scoped TTL GC, from `#[…(ttl(..))]`. `build_templates`
276    /// puts it on this template's own `spec.ttlStrategy` (same per-WT
277    /// plumbing as `ON_EXIT`; never on synthetic `if` wrappers).
278    const TTL: ::core::option::Option<crate::api::TtlStrategy> = None;
279    /// Workflow-scoped pod GC strategy, from `#[…(pod_gc(strategy=..))]`.
280    /// `build_templates` puts it on this template's own `spec.podGC`.
281    const POD_GC: ::core::option::Option<&'static str> = None;
282    /// Root-only whole-workflow runtime cap (seconds), from
283    /// `#[…(active_deadline_if_root=..)]`. `build_templates` stamps it
284    /// on this template's own `spec.activeDeadlineSeconds` (same per-WT,
285    /// root-only plumbing as `TTL`/`POD_GC`). This is the *only* working
286    /// whole-workflow timeout: Argo applies neither `Template.timeout`
287    /// nor `Template.activeDeadlineSeconds` to dag/steps templates.
288    const ACTIVE_DEADLINE_IF_ROOT: ::core::option::Option<i64> = None;
289
290    /// Build this template's inner Argo `template` object.
291    fn build(ctx: &BuildCtx) -> api::Template;
292
293    /// Run-mode body — overridden by `#[container]`; never called on a
294    /// `#[workflow]`.
295    fn run(_input: serde_json::Value) -> serde_json::Value {
296        panic!(
297            "`{}` is not a #[container]; nothing to run",
298            Self::ARGO_NAME
299        )
300    }
301
302    /// Push self + the transitive callee closure into `out`. The macro
303    /// generates `<Callee as Template>::collect(out)` per callee, so the
304    /// whole reachable set is linked by direct calls.
305    fn collect(out: &mut Collector);
306}
307
308// ---- athena.toml ---------------------------------------------------------
309
310/// `athena.toml` — required by `cargo athena` at emit time. Mirrors the
311/// parts of Argo's S3 `ArtifactRepository` we inject, plus bootstrap config.
312#[derive(Debug, Clone, serde::Deserialize)]
313pub struct AthenaConfig {
314    pub artifact_repository: ArtifactRepository,
315    pub artifact: ArtifactSpec,
316    #[serde(default)]
317    pub bootstrap: Bootstrap,
318    #[serde(default)]
319    pub defaults: Defaults,
320}
321
322#[derive(Debug, Clone, serde::Deserialize)]
323pub struct Defaults {
324    /// Kubernetes ServiceAccount the workflow pods run as (so users bind
325    /// their own RBAC). Per-`#[container(service_account=...)]` overrides.
326    #[serde(default = "default_service_account")]
327    pub service_account: String,
328    /// Default cargo package the `cargo athena` subcommands drive (so
329    /// you don't repeat `--package`). The `--package`/`-p` flag wins.
330    #[serde(default)]
331    pub package: Option<String>,
332    /// Default cargo bin within that package (multi-bin crates need
333    /// it). The `--bin` flag wins.
334    #[serde(default)]
335    pub bin: Option<String>,
336    /// Default Kubernetes namespace for `cargo athena submit`. Precedence:
337    /// `-n/--namespace` → `$ARGO_NAMESPACE` → this → `"default"`.
338    #[serde(default)]
339    pub namespace: Option<String>,
340}
341
342impl Default for Defaults {
343    fn default() -> Self {
344        Self {
345            service_account: default_service_account(),
346            package: None,
347            bin: None,
348            namespace: None,
349        }
350    }
351}
352
/// ServiceAccount name used when `athena.toml` does not set one.
fn default_service_account() -> String {
    String::from("default")
}
356
357/// Resolve a container template's ServiceAccount: the
358/// `#[container(service_account=...)]` override, else `[defaults]`.
359pub fn service_account(ctx: &BuildCtx, over: Option<&str>) -> String {
360    over.map(str::to_string)
361        .unwrap_or_else(|| ctx.config().defaults.service_account.clone())
362}
363
364#[derive(Debug, Clone, serde::Deserialize)]
365pub struct ArtifactRepository {
366    pub s3: S3Repo,
367}
368
369#[derive(Debug, Clone, serde::Deserialize)]
370pub struct S3Repo {
371    pub endpoint: String,
372    pub bucket: String,
373    #[serde(default)]
374    pub region: String,
375    #[serde(default)]
376    pub insecure: bool,
377    pub access_key_secret: SecretRef,
378    pub secret_key_secret: SecretRef,
379}
380
381#[derive(Debug, Clone, serde::Deserialize)]
382pub struct SecretRef {
383    pub name: String,
384    pub key: String,
385}
386
387#[derive(Debug, Clone, serde::Deserialize)]
388pub struct ArtifactSpec {
389    /// Object key of the per-binary tarball (holds `app-<triple>` for every
390    /// `bootstrap.targets`). `cargo athena build` fills any
391    /// `{crate}`/`{version}`/`{bin}` placeholders before publish.
392    pub key: String,
393}
394
395#[derive(Debug, Clone, serde::Deserialize)]
396pub struct Bootstrap {
397    /// Fallback image when a `#[container]` doesn't set its own. Per-
398    /// container `image` always wins (arbitrary by design); this is just
399    /// the small default for containers that don't care.
400    #[serde(default = "default_image")]
401    pub default_image: String,
402    /// Cross-compile / `uname` target matrix.
403    #[serde(default = "default_targets")]
404    pub targets: Vec<String>,
405}
406
407impl Default for Bootstrap {
408    fn default() -> Self {
409        Self {
410            default_image: default_image(),
411            targets: default_targets(),
412        }
413    }
414}
415
/// Image used when neither the container nor `athena.toml` names one.
fn default_image() -> String {
    String::from("busybox:1.36-musl")
}
419
/// Target triples built when `athena.toml` does not list any.
fn default_targets() -> Vec<String> {
    const TRIPLES: [&str; 2] = ["x86_64-unknown-linux-musl", "aarch64-unknown-linux-musl"];
    TRIPLES.iter().map(|triple| triple.to_string()).collect()
}
426
427impl AthenaConfig {
428    /// `ATHENA_CONFIG` override, else the nearest `athena.toml` walking up
429    /// from the cwd. Only ever called during emit — the in-pod binary
430    /// (run-mode) never needs `athena.toml`.
431    pub fn load() -> Self {
432        let path = std::env::var_os("ATHENA_CONFIG")
433            .map(std::path::PathBuf::from)
434            .or_else(Self::find_upwards)
435            .expect(
436                "athena.toml not found: set ATHENA_CONFIG or add athena.toml \
437                 to the workspace (required by `cargo athena`)",
438            );
439        let text = std::fs::read_to_string(&path)
440            .unwrap_or_else(|e| panic!("read {}: {e}", path.display()));
441        toml::from_str(&text).unwrap_or_else(|e| panic!("parse {}: {e}", path.display()))
442    }
443
444    fn find_upwards() -> Option<std::path::PathBuf> {
445        let mut d = std::env::current_dir().ok()?;
446        loop {
447            let p = d.join("athena.toml");
448            if p.is_file() {
449                return Some(p);
450            }
451            if !d.pop() {
452                return None;
453            }
454        }
455    }
456}
457
458/// Pod-scoped scratch root, backed by an `emptyDir` on every container
459/// template so all athena paths are writable regardless of the image
460/// (distroless / read-only rootfs) and shared with Argo's init/wait
461/// containers for artifact load/collect.
462pub const ATHENA_DIR: &str = "/athena";
463/// Where Argo's executor (init container) extracts the per-arch
464/// binaries from our `.tar.gz` input artifact. We rely on Argo's
465/// built-in tarball auto-extraction (no `archive: none`, no `tar` in
466/// the main container's image — see `container_delivery`).
467pub const ATHENA_BIN_DIR: &str = "/athena/bin";
468/// The in-pod arch-resolving + exec bootstrap, kept in a separate
469/// `bootstrap.sh` so it can be read, edited, and `shellcheck`'d as a
470/// plain shell file rather than buried in a Rust `format!`. `@@ARMS@@`
471/// / `@@BIN_DIR@@` / `@@TEMPLATE@@` are substituted at emit time in
472/// `container_delivery`.
473const BOOTSTRAP_TEMPLATE: &str = include_str!("bootstrap.sh");
474/// Name of the scratch `emptyDir` volume.
475pub const SCRATCH_VOLUME: &str = "athena-work";
476/// Argo input-artifact name of the binary tarball `emit` injects.
477pub const ATHENA_DIST_ARTIFACT: &str = "athena-dist";
478/// Env-var prefix the in-pod bootstrap reads each input parameter from.
479pub const ATHENA_PARAM_PREFIX: &str = "ATHENA_PARAM_";
480
481/// Resolved S3 coordinates for one artifact (creds are supplied
482/// locally, e.g. via AWS env vars — `cargo athena container run` uses
483/// `object_store`; the in-cluster path uses the k8s Secret refs).
484#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
485pub struct S3Ref {
486    pub endpoint: String,
487    pub bucket: String,
488    pub region: String,
489    pub insecure: bool,
490    pub key: String,
491}
492
493/// One artifact bound into the container at `path`, backed by S3.
494#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
495pub struct ArtifactRef {
496    pub s3: S3Ref,
497    pub path: String,
498}
499
500/// An input parameter, the env var the bootstrap reads it from, and its
501/// stringified Rust type (`""` if unknown — synthetic templates).
502#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
503pub struct ParamRef {
504    pub name: String,
505    pub env: String,
506    pub ty: String,
507}
508
509/// Purpose-built introspection of one `#[container]`, derived from the
510/// *same* `Template::build()` `emit` uses (so it never drifts), but
511/// expressed in the runner's vocabulary instead of Argo's. Emitted as
512/// JSON by the binary when `CARGO_ATHENA_DESCRIBE=<name>` is set;
513/// consumed by `cargo athena container run` to realize the spec under
514/// docker/podman locally. Also the basis for a future
515/// `cargo athena container describe`.
516#[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
517pub struct ContainerRunMeta {
518    /// Argo template name (`<crate>-<fn>`).
519    pub name: String,
520    /// `"container"`, `"workflow"`, or `"other"`.
521    pub kind: String,
522    /// athena-synthesized template (an `if`/`else` wrapper or arm) —
523    /// `workflow ls` hides these unless `--include-synthetic`.
524    pub synthetic: bool,
525    /// Resolved container image.
526    pub image: String,
527    /// The injected bootstrap command + args, verbatim — run as-is so
528    /// the local execution path is byte-identical to the pod's.
529    pub command: Vec<String>,
530    pub args: Vec<String>,
531    /// Mount path of the pod-scoped scratch dir (the `emptyDir`, e.g.
532    /// `/athena`); bind a host temp dir here to read `result_path` back.
533    pub work_dir: String,
534    /// Input parameters and the env var each is delivered through.
535    pub params: Vec<ParamRef>,
536    /// The binary tarball artifact (always present for a container).
537    pub binary_artifact: Option<ArtifactRef>,
538    /// `load_artifact!` input ports (excludes the binary tarball).
539    pub input_artifacts: Vec<ArtifactRef>,
540    /// `save_artifact!` output ports.
541    pub output_artifacts: Vec<ArtifactRef>,
542    /// `host!` paths (mounted at the same path in-pod; bind 1:1 locally).
543    pub host_paths: Vec<String>,
544    /// File the body writes its serialized return to
545    /// (`outputs.parameters.return`); read it back from the bind mount.
546    pub result_path: Option<String>,
547}
548
549impl ContainerRunMeta {
550    /// Derive the runner metadata from one built Argo template.
551    /// `input_types` is parallel to the template's input parameters
552    /// (same order); empty when unknown.
553    fn from_template(t: &api::Template, input_types: &[&str]) -> Self {
554        let kind = if t.container.is_some() {
555            "container"
556        } else if t.dag.is_some() || !t.steps.is_empty() {
557            "workflow"
558        } else {
559            "other"
560        };
561        let c = t.container.as_ref();
562        let mount_path = |vol: &str| {
563            c.and_then(|c| {
564                c.volume_mounts
565                    .iter()
566                    .find(|m| m.name == vol)
567                    .map(|m| m.mount_path.clone())
568            })
569        };
570        let to_ref = |a: &api::Artifact| {
571            a.s3.as_ref().map(|s| ArtifactRef {
572                s3: S3Ref {
573                    endpoint: s.endpoint.clone(),
574                    bucket: s.bucket.clone(),
575                    region: s.region.clone(),
576                    insecure: s.insecure,
577                    key: s.key.clone(),
578                },
579                path: a.path.clone(),
580            })
581        };
582        let in_arts = t.inputs.as_ref().map(|i| &i.artifacts);
583        ContainerRunMeta {
584            name: t.name.clone(),
585            kind: kind.to_string(),
586            // set by the caller from the Collector (Template::SYNTHETIC
587            // isn't visible through the type-erased builder fn here).
588            synthetic: false,
589            image: c.map(|c| c.image.clone()).unwrap_or_default(),
590            command: c.map(|c| c.command.clone()).unwrap_or_default(),
591            args: c.map(|c| c.args.clone()).unwrap_or_default(),
592            work_dir: mount_path(SCRATCH_VOLUME).unwrap_or_else(|| ATHENA_DIR.to_string()),
593            params: t
594                .inputs
595                .as_ref()
596                .map(|i| {
597                    i.parameters
598                        .iter()
599                        .enumerate()
600                        .map(|(idx, p)| ParamRef {
601                            name: p.name.clone(),
602                            env: format!("{ATHENA_PARAM_PREFIX}{}", p.name),
603                            ty: input_types
604                                .get(idx)
605                                .map(|s| (*s).to_string())
606                                .unwrap_or_default(),
607                        })
608                        .collect()
609                })
610                .unwrap_or_default(),
611            binary_artifact: in_arts
612                .and_then(|a| a.iter().find(|a| a.name == ATHENA_DIST_ARTIFACT))
613                .and_then(to_ref),
614            input_artifacts: in_arts
615                .map(|a| {
616                    a.iter()
617                        .filter(|a| a.name != ATHENA_DIST_ARTIFACT)
618                        .filter_map(to_ref)
619                        .collect()
620                })
621                .unwrap_or_default(),
622            output_artifacts: t
623                .outputs
624                .as_ref()
625                .map(|o| o.artifacts.iter().filter_map(to_ref).collect())
626                .unwrap_or_default(),
627            host_paths: t
628                .volumes
629                .iter()
630                .filter_map(|v| v.host_path.as_ref().map(|h| h.path.clone()))
631                .collect(),
632            result_path: t.outputs.as_ref().and_then(|o| {
633                o.parameters
634                    .iter()
635                    .find(|p| p.name == "return")
636                    .and_then(|p| p.value_from.as_ref())
637                    .map(|vf| vf.path.clone())
638                    .filter(|p| !p.is_empty())
639            }),
640        }
641    }
642}
643
644/// What [`container_delivery`] produces for one `#[container]` template.
645pub struct ContainerDelivery {
646    /// Resolved image: the `#[container(image=...)]` override, else
647    /// `[bootstrap].default_image`.
648    pub image: String,
649    pub command: Vec<String>,
650    pub args: Vec<String>,
651    pub artifact: api::Artifact,
652}
653
654/// The arch-resolving bootstrap + the S3 binary artifact for one container
655/// template. Called from macro-generated `Template::build` (emit only).
656///
657/// Runs inside the container's *arbitrary, user-chosen* image (it only
658/// needs a POSIX `sh` and `uname` — **no `tar`**). The binary `.tar.gz`
659/// is delivered as an Argo input artifact at [`ATHENA_BIN_DIR`]; with
660/// the default `Archive` (no `archive: none`) Argo's executor init
661/// container **auto-detects tarballs and untars them into the artifact
662/// path** (proven from `workflow/executor/executor.go:262–289`,
663/// `untar` ibid., real v4.0.5 source). So by the time our bootstrap
664/// runs the per-arch `app-<triple>` files already exist under
665/// [`ATHENA_BIN_DIR`]; the script just `uname`s, `chmod +x`'s, and
666/// `exec`s — replacing the shell so the Rust binary is the container's
667/// main process. Works whether the tarball has one entry or many.
668pub fn container_delivery(
669    ctx: &BuildCtx,
670    argo_name: &str,
671    image_override: Option<&str>,
672) -> ContainerDelivery {
673    let cfg = ctx.config();
674    let s3 = &cfg.artifact_repository.s3;
675    let image = image_override
676        .map(str::to_string)
677        .unwrap_or_else(|| cfg.bootstrap.default_image.clone());
678
679    let mut arms = String::new();
680    for triple in &cfg.bootstrap.targets {
681        let arch = triple.split('-').next().unwrap_or(triple);
682        let pat = if arch == "aarch64" {
683            "aarch64|arm64"
684        } else {
685            arch
686        };
687        arms.push_str(&format!("  {pat}) __t={triple} ;;\n"));
688    }
689
690    // Argo's executor (init container) auto-extracts the `.tar.gz` into
691    // ATHENA_BIN_DIR, so the bootstrap just picks + execs. The template
692    // lives in a sibling `bootstrap.sh` file (legible / greppable /
693    // shellcheck-able); we just substitute the @@-delimited slots.
694    let script = BOOTSTRAP_TEMPLATE
695        .replace("@@ARMS@@", &arms)
696        .replace("@@BIN_DIR@@", ATHENA_BIN_DIR)
697        .replace("@@TEMPLATE@@", argo_name);
698
699    // `archive: None` (NOT `archive: none`) lets Argo auto-detect the
700    // input as a tarball and untar it into `path` (`ATHENA_BIN_DIR`).
701    // See `container_delivery` doc above.
702    let artifact = api::Artifact {
703        name: "athena-dist".to_string(),
704        path: ATHENA_BIN_DIR.to_string(),
705        s3: Some(s3_loc(s3, &cfg.artifact.key)),
706        archive: None,
707        mode: None,
708    };
709
710    ContainerDelivery {
711        image,
712        command: vec!["/bin/sh".to_string(), "-c".to_string()],
713        args: vec![script],
714        artifact,
715    }
716}
717
718/// A `#[fragment]`: a plain helper carrying `host!` decls. Still
719/// `inventory`-based — a container's real body actually *calls* its
720/// fragments, so the symbol reference exists and DCE is not a concern.
721pub struct FragmentReg {
722    pub rust_name: &'static str,
723    pub host_paths: &'static [&'static str],
724    pub in_artifacts: &'static [&'static str],
725    pub out_artifacts: &'static [&'static str],
726    pub callees: &'static [&'static str],
727}
728inventory::collect!(FragmentReg);
729
730/// Fragment registry snapshot, passed to container `build`s.
731pub struct BuildCtx {
732    fragments: HashMap<&'static str, &'static FragmentReg>,
733    config: AthenaConfig,
734}
735
736impl BuildCtx {
737    /// Emit-only: gathers fragments AND loads `athena.toml`. Never called
738    /// in run-mode, so the in-pod binary needs no `athena.toml`.
739    pub fn collect() -> Self {
740        let mut fragments = HashMap::new();
741        for f in inventory::iter::<FragmentReg> {
742            fragments.insert(f.rust_name, f);
743        }
744        Self {
745            fragments,
746            config: AthenaConfig::load(),
747        }
748    }
749
750    pub fn config(&self) -> &AthenaConfig {
751        &self.config
752    }
753
754    /// Own literal decls ∪ the transitive `#[fragment]` closure for one
755    /// kind of declaration (deduped, stable order). `select` picks which
756    /// `FragmentReg` slice to pull from a reached fragment.
757    fn resolved(
758        &self,
759        own: &[&str],
760        own_callees: &[&str],
761        select: impl Fn(&FragmentReg) -> &'static [&'static str],
762    ) -> Vec<String> {
763        let mut out: Vec<String> = Vec::new();
764        let mut seen: HashSet<String> = HashSet::new();
765        let mut push = |p: &str, out: &mut Vec<String>| {
766            if seen.insert(p.to_string()) {
767                out.push(p.to_string());
768            }
769        };
770        for p in own {
771            push(p, &mut out);
772        }
773        let mut queue: Vec<&str> = own_callees.to_vec();
774        let mut visited: HashSet<&str> = HashSet::new();
775        while let Some(c) = queue.pop() {
776            if !visited.insert(c) {
777                continue;
778            }
779            if let Some(f) = self.fragments.get(c) {
780                for p in select(f) {
781                    push(p, &mut out);
782                }
783                queue.extend(f.callees.iter().copied());
784            }
785        }
786        out
787    }
788
789    /// hostPaths: own `host!`s ∪ fragment closure.
790    pub fn resolved_host_paths(&self, own: &[&str], callees: &[&str]) -> Vec<String> {
791        self.resolved(own, callees, |f| f.host_paths)
792    }
793
794    /// Input artifact ports: own `load_artifact*!`s ∪ fragment closure.
795    pub fn resolved_in_artifacts(&self, own: &[&str], callees: &[&str]) -> Vec<String> {
796        self.resolved(own, callees, |f| f.in_artifacts)
797    }
798
799    /// Output artifact ports: own `save_artifact*!`s ∪ fragment closure.
800    pub fn resolved_out_artifacts(&self, own: &[&str], callees: &[&str]) -> Vec<String> {
801        self.resolved(own, callees, |f| f.out_artifacts)
802    }
803}
804
805fn archive_none() -> api::ArchiveStrategy {
806    api::ArchiveStrategy {
807        none: Some(api::NoneStrategy {}),
808    }
809}
810
811/// Build an Argo S3 location (artifact-repository creds from `athena.toml`)
812/// for an exact object `key`.
813pub fn s3_loc(s3: &S3Repo, key: &str) -> api::S3Artifact {
814    api::S3Artifact {
815        endpoint: s3.endpoint.clone(),
816        bucket: s3.bucket.clone(),
817        region: s3.region.clone(),
818        insecure: s3.insecure,
819        key: key.to_string(),
820        access_key_secret: Some(api::SecretKeySelector {
821            name: s3.access_key_secret.name.clone(),
822            key: s3.access_key_secret.key.clone(),
823        }),
824        secret_key_secret: Some(api::SecretKeySelector {
825            name: s3.secret_key_secret.name.clone(),
826            key: s3.secret_key_secret.key.clone(),
827        }),
828    }
829}
830
/// Derive an Argo artifact identifier from an S3 key (which may contain
/// `/`, `.`, …): ASCII letters are lowercased, every other non-alphanumeric
/// character becomes `-`, surrounding `-` are stripped, and an empty
/// result falls back to `"a"`. The key itself is preserved in `s3.key`.
fn artifact_ident(key: &str) -> String {
    let sanitized: String = key
        .chars()
        .map(|ch| match ch {
            'a'..='z' | '0'..='9' => ch,
            'A'..='Z' => ch.to_ascii_lowercase(),
            _ => '-',
        })
        .collect();
    match sanitized.trim_matches('-') {
        "" => String::from("a"),
        core => core.to_owned(),
    }
}
844
845/// `load_artifact!("key")` input ports: Argo pulls the exact S3 object
846/// `key` from the configured repo into the pod (raw, `archive: none`).
847pub fn artifact_inputs(ctx: &BuildCtx, keys: &[String]) -> Vec<api::Artifact> {
848    let s3 = &ctx.config().artifact_repository.s3;
849    keys.iter()
850        .map(|k| api::Artifact {
851            name: artifact_ident(k),
852            path: format!("{}/{k}", rt::IN_DIR),
853            s3: Some(s3_loc(s3, k)),
854            archive: Some(archive_none()),
855            mode: None,
856        })
857        .collect()
858}
859
860/// `save_artifact!("key")` output ports: Argo pushes the written file to
861/// the exact S3 object `key` in the configured repo (raw, `archive: none`).
862pub fn artifact_outputs(ctx: &BuildCtx, keys: &[String]) -> Vec<api::Artifact> {
863    let s3 = &ctx.config().artifact_repository.s3;
864    keys.iter()
865        .map(|k| api::Artifact {
866            name: artifact_ident(k),
867            path: format!("{}/{k}", rt::OUT_DIR),
868            s3: Some(s3_loc(s3, k)),
869            archive: Some(archive_none()),
870            mode: None,
871        })
872        .collect()
873}
874
875/// Accumulates the reachable templates (as `WorkflowTemplate`s) and the
876/// run-mode dispatch table while `Template::collect` walks the closure.
877pub struct Collector {
878    seen: HashSet<String>,
879    /// Deferred so `athena.toml` is read only at emit, never run-mode.
880    builders: Vec<fn(&BuildCtx) -> api::Template>,
881    runners: HashMap<String, fn(serde_json::Value) -> serde_json::Value>,
882    /// `<argo name> -> <on_exit handler argo name>` for *every* template
883    /// with an `on_exit` (not just the root). Each WorkflowTemplate
884    /// carries its own `spec.hooks.exit`; Argo only fires the hook of
885    /// the workflow that is actually submitted (workflow-scoped), so
886    /// submitting a sub-workflow's template directly runs its own hook.
887    exits: HashMap<String, &'static str>,
888    /// `<argo name> -> ttlStrategy` for every template with `ttl(..)`.
889    /// Stamped onto that WorkflowTemplate's `spec.ttlStrategy` (same
890    /// per-WT, workflow-scoped semantics as `exits`).
891    ttl: HashMap<String, crate::api::TtlStrategy>,
892    /// `<argo name> -> podGC strategy` for every template with
893    /// `pod_gc(..)`. Stamped onto that WT's `spec.podGC`.
894    pod_gc: HashMap<String, String>,
895    /// `<argo name> -> activeDeadlineSeconds` for every template with
896    /// `active_deadline_if_root(..)`. Stamped onto that WT's
897    /// `spec.activeDeadlineSeconds` (same per-WT, root-only as `ttl`).
898    active_deadline: HashMap<String, i64>,
899    /// `<argo name> -> stringified input types` (parallel to the
900    /// template's INPUTS), for `container emulate` arg type-checking.
901    types: HashMap<String, &'static [&'static str]>,
902    /// Argo names of athena-synthesized templates (`Template::SYNTHETIC`)
903    /// so `workflow ls` can hide them by default.
904    synthetic: HashSet<String>,
905}
906
907impl Default for Collector {
908    fn default() -> Self {
909        Self::new()
910    }
911}
912
913impl Collector {
914    pub fn new() -> Self {
915        Self {
916            seen: HashSet::new(),
917            builders: Vec::new(),
918            runners: HashMap::new(),
919            exits: HashMap::new(),
920            ttl: HashMap::new(),
921            pod_gc: HashMap::new(),
922            active_deadline: HashMap::new(),
923            types: HashMap::new(),
924            synthetic: HashSet::new(),
925        }
926    }
927
928    /// Returns `false` if `argo_name` was already collected (generated
929    /// `collect` impls return early in that case — dedup + cycle guard).
930    pub fn enter(&mut self, argo_name: &str) -> bool {
931        self.seen.insert(argo_name.to_string())
932    }
933
934    /// Register a template's `build` fn (invoked lazily at emit).
935    pub fn add_builder(&mut self, build: fn(&BuildCtx) -> api::Template) {
936        self.builders.push(build);
937    }
938
939    /// Register a template by type: its `build` fn plus, if it sets
940    /// `on_exit_if_root`, its exit handler keyed by Argo name (so
941    /// `emit` can put `spec.hooks.exit` on *that* WorkflowTemplate).
942    pub fn add<T: Template>(&mut self) {
943        self.builders.push(T::build);
944        if let Some(handler) = T::ON_EXIT {
945            self.exits.insert(T::ARGO_NAME.to_string(), handler);
946        }
947        if let Some(t) = T::TTL {
948            self.ttl.insert(T::ARGO_NAME.to_string(), t);
949        }
950        if let Some(s) = T::POD_GC {
951            self.pod_gc.insert(T::ARGO_NAME.to_string(), s.to_string());
952        }
953        if let Some(s) = T::ACTIVE_DEADLINE_IF_ROOT {
954            self.active_deadline.insert(T::ARGO_NAME.to_string(), s);
955        }
956        if !T::INPUT_TYPES.is_empty() {
957            self.types.insert(T::ARGO_NAME.to_string(), T::INPUT_TYPES);
958        }
959        if T::SYNTHETIC {
960            self.synthetic.insert(T::ARGO_NAME.to_string());
961        }
962    }
963
964    pub fn add_runner(&mut self, argo_name: &str, run: fn(serde_json::Value) -> serde_json::Value) {
965        self.runners.insert(argo_name.to_string(), run);
966    }
967
968    /// Emit the multi-document stream: one `WorkflowTemplate` per template
969    /// plus a runnable `Workflow` for the entrypoint `E`. Builds the
970    /// `BuildCtx` (and reads `athena.toml`) here — emit only.
971    /// Emit the multi-doc YAML. `with_workflow` appends a convenience
972    /// runnable `Workflow` (`generateName`, `workflowTemplateRef` →
973    /// root) — off by default: the deterministic, stable-named
974    /// `WorkflowTemplate`s are the artifact you register/GitOps, and
975    /// runs are triggered with `argo submit --from
976    /// workflowtemplate/<root>`. The convenience Workflow is opt-in for
977    /// quick demos / `kubectl create -f -`.
978    /// The deterministic `WorkflowTemplate` set `emit` serializes —
979    /// every reachable template, sorted, with each `on_exit_if_root`
980    /// hook stamped on its own template. Shared by YAML emit and the
981    /// `CARGO_ATHENA_EMIT_JSON` mode `cargo athena submit` consumes for
982    /// its register/drift checks.
983    pub fn build_templates(&self) -> Vec<api::WorkflowTemplate> {
984        let ctx = BuildCtx::collect();
985        let mut tpls: Vec<api::WorkflowTemplate> = self
986            .builders
987            .iter()
988            .map(|b| {
989                let inner = b(&ctx);
990                wrap_workflow_template(inner.name.clone(), inner)
991            })
992            .collect();
993        tpls.sort_by_key(name_of);
994
995        // `on_exit_if_root`: EVERY template that declares it carries the
996        // exit hook on its OWN WorkflowTemplate's `spec.hooks.exit`
997        // (`templateRef` — the legacy `spec.onExit` name-string can't
998        // cross the one-WT-per-template model). Argo only fires the hook
999        // of the workflow actually submitted (workflow-scoped, proven on
1000        // real Argo v4.0.5 via both `argo submit --from` and a
1001        // `workflowTemplateRef` Workflow); a templateRef'd sub-workflow's
1002        // own hook stays inert when nested — but submit that sub-WT
1003        // directly and its own hook fires.
1004        for t in tpls.iter_mut() {
1005            let name = name_of(t);
1006            if let Some(handler) = self.exits.get(&name)
1007                && let Some(spec) = t.spec.as_mut()
1008            {
1009                spec.hooks.insert(
1010                    "exit".to_string(),
1011                    api::LifecycleHook {
1012                        template_ref: Some(api::TemplateRef {
1013                            name: handler.to_string(),
1014                            template: handler.to_string(),
1015                            cluster_scope: false,
1016                        }),
1017                        ..Default::default()
1018                    },
1019                );
1020            }
1021            // `ttl(..)`/`pod_gc(..)` are WorkflowSpec-scoped: stamped on
1022            // each declaring template's OWN WorkflowTemplate (Argo runs
1023            // GC workflow-scoped, same per-WT model as the exit hook).
1024            if let Some(ttl) = self.ttl.get(&name)
1025                && let Some(spec) = t.spec.as_mut()
1026            {
1027                spec.ttl_strategy = Some(ttl.clone());
1028            }
1029            if let Some(s) = self.pod_gc.get(&name)
1030                && let Some(spec) = t.spec.as_mut()
1031            {
1032                spec.pod_gc = Some(api::PodGc {
1033                    strategy: s.clone(),
1034                });
1035            }
1036            // `active_deadline_if_root` is the genuine whole-workflow
1037            // runtime cap (Argo applies `Template.timeout` /
1038            // `Template.activeDeadlineSeconds` to neither dag nor steps);
1039            // root-only, same per-WT plumbing as `ttl`/`pod_gc`.
1040            if let Some(s) = self.active_deadline.get(&name)
1041                && let Some(spec) = t.spec.as_mut()
1042            {
1043                spec.active_deadline_seconds = Some(*s);
1044            }
1045        }
1046        tpls
1047    }
1048
1049    pub fn emit<E: Template>(&self, with_workflow: bool) -> String {
1050        let tpls = self.build_templates();
1051        let mut docs: Vec<String> = tpls
1052            .iter()
1053            .map(|t| serde_norway::to_string(t).expect("WorkflowTemplate is serializable"))
1054            .collect();
1055
1056        if !with_workflow {
1057            return docs.join("---\n");
1058        }
1059
1060        let ctx = BuildCtx::collect();
1061        let wf = api::Workflow {
1062            api_version: api::API_VERSION.to_string(),
1063            kind: api::KIND_WORKFLOW.to_string(),
1064            metadata: Some(api::ObjectMeta {
1065                generate_name: format!("{}-", E::ARGO_NAME),
1066                ..Default::default()
1067            }),
1068            spec: Some(api::WorkflowSpec {
1069                workflow_template_ref: Some(api::WorkflowTemplateRef {
1070                    name: E::ARGO_NAME.to_string(),
1071                    cluster_scope: false,
1072                }),
1073                service_account_name: ctx.config().defaults.service_account.clone(),
1074                // Only the emit root's on_exit reaches a runnable Workflow,
1075                // and it must be `hooks.exit` with a templateRef (the
1076                // legacy `spec.onExit` name-string can't cross the
1077                // one-WT-per-template wormhole).
1078                hooks: E::ON_EXIT
1079                    .map(|n| {
1080                        let mut m = std::collections::BTreeMap::new();
1081                        m.insert(
1082                            "exit".to_string(),
1083                            api::LifecycleHook {
1084                                template_ref: Some(api::TemplateRef {
1085                                    name: n.to_string(),
1086                                    template: n.to_string(),
1087                                    cluster_scope: false,
1088                                }),
1089                                ..Default::default()
1090                            },
1091                        );
1092                        m
1093                    })
1094                    .unwrap_or_default(),
1095                // Only the emit root's ttl/pod_gc/active_deadline reaches
1096                // the runnable Workflow (same workflow-scoped reasoning
1097                // as the hook).
1098                ttl_strategy: E::TTL,
1099                active_deadline_seconds: E::ACTIVE_DEADLINE_IF_ROOT,
1100                pod_gc: E::POD_GC.map(|s| api::PodGc {
1101                    strategy: s.to_string(),
1102                }),
1103                ..Default::default()
1104            }),
1105        };
1106        docs.push(serde_norway::to_string(&wf).expect("Workflow is serializable"));
1107        docs.join("---\n")
1108    }
1109}
1110
1111fn name_of(t: &api::WorkflowTemplate) -> String {
1112    t.metadata
1113        .as_ref()
1114        .map(|m| m.name.clone())
1115        .unwrap_or_default()
1116}
1117
1118/// Wrap one inner Argo `template` as a standalone `WorkflowTemplate` whose
1119/// resource name == the inner template name == its entrypoint.
1120pub fn wrap_workflow_template(name: String, inner: api::Template) -> api::WorkflowTemplate {
1121    api::WorkflowTemplate {
1122        api_version: api::API_VERSION.to_string(),
1123        kind: api::KIND_WORKFLOW_TEMPLATE.to_string(),
1124        metadata: Some(api::ObjectMeta {
1125            name: name.clone(),
1126            ..Default::default()
1127        }),
1128        spec: Some(api::WorkflowSpec {
1129            entrypoint: name,
1130            templates: vec![inner],
1131            arguments: None,
1132            workflow_template_ref: None,
1133            ..Default::default()
1134        }),
1135    }
1136}
1137
/// Turn a Rust ident into a kebab-case Argo identifier (DNS-1123-ish):
/// every `_` becomes `-` and ASCII letters are lowercased. No hyphen is
/// inserted at case boundaries (`RunFoo` → `runfoo`).
pub fn kebab(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '_' => '-',
            other => other.to_ascii_lowercase(),
        })
        .collect()
}
1142
/// Derive a Kubernetes volume name from a host path.
///
/// ASCII letters are lowercased and every other non-alphanumeric
/// character becomes `-`; surrounding `-` are stripped and the result is
/// prefixed with `host-` (an empty stem falls back to `v`). Lowercasing
/// matters because volume names must be lowercase DNS-1123 labels, so a
/// path such as `/Users/Me` must not yield `host-Users-Me`.
///
/// Note: the name is not length-limited, and distinct paths that differ
/// only in case or punctuation map to the same name.
fn volume_name(path: &str) -> String {
    let sanitized: String = path
        .chars()
        .map(|c| {
            if c.is_ascii_alphanumeric() {
                c.to_ascii_lowercase()
            } else {
                '-'
            }
        })
        .collect();
    let n = match sanitized.trim_matches('-') {
        "" => "v",
        stem => stem,
    };
    format!("host-{n}")
}
1154
1155/// `volumes` + `volumeMounts` for a set of hostPaths (from `host!`).
1156pub fn host_path_volumes(paths: &[String]) -> (Vec<api::Volume>, Vec<api::VolumeMount>) {
1157    let mut vols = Vec::new();
1158    let mut mounts = Vec::new();
1159    for p in paths {
1160        let name = volume_name(p);
1161        vols.push(api::Volume {
1162            name: name.clone(),
1163            host_path: Some(api::HostPathVolumeSource {
1164                path: p.clone(),
1165                r#type: String::new(),
1166            }),
1167            ..Default::default()
1168        });
1169        mounts.push(api::VolumeMount {
1170            name,
1171            mount_path: p.clone(),
1172            read_only: false,
1173        });
1174    }
1175    (vols, mounts)
1176}
1177
1178/// Every container template's volumes/mounts: the always-present
1179/// `emptyDir` scratch at [`ATHENA_DIR`] (binary tarball, in/out artifact
1180/// ports, extraction, result — all writable on any image) followed by the
1181/// declared hostPaths.
1182pub fn container_volumes(host_paths: &[String]) -> (Vec<api::Volume>, Vec<api::VolumeMount>) {
1183    let mut vols = vec![api::Volume {
1184        name: SCRATCH_VOLUME.to_string(),
1185        empty_dir: Some(api::EmptyDirVolumeSource {}),
1186        ..Default::default()
1187    }];
1188    let mut mounts = vec![api::VolumeMount {
1189        name: SCRATCH_VOLUME.to_string(),
1190        mount_path: ATHENA_DIR.to_string(),
1191        read_only: false,
1192    }];
1193    let (hv, hm) = host_path_volumes(host_paths);
1194    vols.extend(hv);
1195    mounts.extend(hm);
1196    (vols, mounts)
1197}
1198
1199/// The entrypoint a user's `main` calls, parameterised by the root
1200/// workflow type. Referencing `E` force-links the entire reachable
1201/// closure (each `collect` calls callees' `collect` directly).
1202pub fn entrypoint<E: Template>() {
1203    let mut collector = Collector::new();
1204    E::collect(&mut collector);
1205
1206    let args: Vec<String> = std::env::args().collect();
1207    let target = args
1208        .iter()
1209        .position(|a| a == "--cargo-athena-template")
1210        .and_then(|i| args.get(i + 1).cloned())
1211        .or_else(|| std::env::var("CARGO_ATHENA_TEMPLATE").ok());
1212
1213    if let Some(t) = target {
1214        let run = *collector
1215            .runners
1216            .get(&t)
1217            .unwrap_or_else(|| panic!("no runnable container template named {t:?}"));
1218        // Base inputs: `cargo athena run --input` (local), then overlay
1219        // Argo-supplied params delivered as `ATHENA_PARAM_<name>` env vars
1220        // (set on the container template from `{{inputs.parameters.*}}`).
1221        let mut input = std::env::var("CARGO_ATHENA_INPUT")
1222            .ok()
1223            .map(|s| serde_json::from_str(&s).expect("CARGO_ATHENA_INPUT must be JSON"))
1224            .unwrap_or(serde_json::Value::Object(Default::default()));
1225        if let serde_json::Value::Object(map) = &mut input {
1226            for (k, v) in std::env::vars() {
1227                if let Some(name) = k.strip_prefix("ATHENA_PARAM_") {
1228                    // A param is JSON if it parses, else a bare string.
1229                    let val =
1230                        serde_json::from_str(&v).unwrap_or(serde_json::Value::String(v.clone()));
1231                    map.insert(name.to_string(), val);
1232                }
1233            }
1234        }
1235        let output = run(input);
1236        if let Ok(path) = std::env::var("CARGO_ATHENA_OUTPUT") {
1237            std::fs::write(path, output.to_string()).expect("write CARGO_ATHENA_OUTPUT");
1238        } else {
1239            println!("{output}");
1240        }
1241        return;
1242    }
1243
1244    // `cargo athena container ls` sets this to enumerate every reachable
1245    // template's metadata as a JSON array (same per-template derivation
1246    // as describe — so names/params shown are exactly what runs).
1247    if std::env::var_os("CARGO_ATHENA_LIST").is_some() {
1248        let ctx = BuildCtx::collect();
1249        let all: Vec<ContainerRunMeta> = collector
1250            .builders
1251            .iter()
1252            .map(|b| {
1253                let t = b(&ctx);
1254                let it = collector.types.get(&t.name).copied().unwrap_or(&[]);
1255                let mut m = ContainerRunMeta::from_template(&t, it);
1256                m.synthetic = collector.synthetic.contains(&t.name);
1257                m
1258            })
1259            .collect();
1260        println!(
1261            "{}",
1262            serde_json::to_string(&all).expect("ContainerRunMeta is serializable")
1263        );
1264        return;
1265    }
1266
1267    // `cargo athena container emulate/describe` sets this to fetch ONE
1268    // template's metadata as JSON (it then realizes that exact spec
1269    // locally via docker/podman). Reusing `Template::build` here is what
1270    // makes the local run identical to Argo by construction — same
1271    // image, bootstrap, env, volumes, and artifacts as `emit`.
1272    if let Ok(name) = std::env::var("CARGO_ATHENA_DESCRIBE") {
1273        let ctx = BuildCtx::collect();
1274        let tpl = collector
1275            .builders
1276            .iter()
1277            .map(|b| b(&ctx))
1278            .find(|t| t.name == name)
1279            .unwrap_or_else(|| panic!("no template named {name:?}"));
1280        let input_types = collector.types.get(&name).copied().unwrap_or(&[]);
1281        let mut meta = ContainerRunMeta::from_template(&tpl, input_types);
1282        meta.synthetic = collector.synthetic.contains(&name);
1283        println!(
1284            "{}",
1285            serde_json::to_string(&meta).expect("ContainerRunMeta is serializable")
1286        );
1287        return;
1288    }
1289
1290    // `cargo athena submit` sets this to get the deterministic
1291    // `WorkflowTemplate` set as a JSON array (structured — for its
1292    // register-if-missing / drift-detect / apply checks), instead of
1293    // re-parsing the YAML `emit` prints.
1294    if std::env::var_os("CARGO_ATHENA_EMIT_JSON").is_some() {
1295        println!(
1296            "{}",
1297            serde_json::to_string(&collector.build_templates())
1298                .expect("WorkflowTemplate is serializable")
1299        );
1300        return;
1301    }
1302
1303    // `cargo athena emit --with-workflow` sets this on the child so the
1304    // convenience runnable Workflow is appended (default: templates
1305    // only — deterministic, `kubectl apply`-able, GitOps-clean).
1306    let with_workflow = std::env::var_os("CARGO_ATHENA_WITH_WORKFLOW").is_some_and(|v| v == "1");
1307    print!("{}", collector.emit::<E>(with_workflow));
1308}
1309
#[cfg(test)]
mod tests {
    use super::*;

    /// `kebab` swaps `_` for `-` and lowercases, without splitting on case
    /// boundaries.
    #[test]
    fn kebab_lowercases_and_hyphenates() {
        let cases = [("run_a_container", "run-a-container"), ("RunFoo", "runfoo")];
        for (ident, expected) in cases {
            assert_eq!(kebab(ident), expected);
        }
    }

    /// `volume_name` turns path separators into `-` and prefixes `host-`.
    #[test]
    fn volume_name_is_dns_safe() {
        let cases = [
            ("/etc/myapp", "host-etc-myapp"),
            ("/var/lib/extra", "host-var-lib-extra"),
        ];
        for (path, expected) in cases {
            assert_eq!(volume_name(path), expected);
        }
    }
}