Skip to main content

cellos_supervisor/
spec_input.rs

1//! Cell spec path reading (`O_NOFOLLOW` on Unix) and NATS subject template resolution.
2
3use std::fs;
4use std::io::Read;
5use std::path::Path;
6
7use anyhow::Context;
8use sha2::{Digest, Sha256};
9
10/// Read the cell spec. If `path` is `-`, reads the entire document from **stdin** (no `O_NOFOLLOW`;
11/// caller supplies bytes from a sealed pipe or CI artifact). Otherwise reads from a file: on Unix,
12/// opens with `O_NOFOLLOW` on the final path component so a swapped-in symlink cannot be followed.
13pub fn read_cell_spec(path: &Path) -> anyhow::Result<String> {
14    if path.as_os_str() == std::ffi::OsStr::new("-") {
15        let mut raw = String::new();
16        std::io::stdin()
17            .read_to_string(&mut raw)
18            .context("read cell spec from stdin")?;
19        return Ok(raw);
20    }
21
22    #[cfg(unix)]
23    {
24        use std::os::unix::fs::OpenOptionsExt;
25        let mut opts = fs::OpenOptions::new();
26        opts.read(true);
27        opts.custom_flags(libc::O_RDONLY | libc::O_NOFOLLOW);
28        let mut file = opts
29            .open(path)
30            .with_context(|| format!("read cell spec {}", path.display()))?;
31        let mut raw = String::new();
32        file.read_to_string(&mut raw)
33            .with_context(|| format!("read cell spec {}", path.display()))?;
34        Ok(raw)
35    }
36    #[cfg(not(unix))]
37    {
38        fs::read_to_string(path).with_context(|| format!("read cell spec {}", path.display()))
39    }
40}
41
42/// Compute the SHA-256 of raw spec bytes and return the lowercase hex digest.
43///
44/// This digest is included in `lifecycle.started` so the audit trail can detect
45/// spec file tampering between submission and execution.  The hash covers the
46/// raw bytes as-read — JSON whitespace and key ordering are not normalized, so
47/// the hash changes if anyone edits the file.
48pub fn spec_sha256(raw: &str) -> String {
49    let digest = Sha256::digest(raw.as_bytes());
50    digest.iter().map(|b| format!("{b:02x}")).collect()
51}
52
/// Token substituted for `{tenantId}` when a spec has no `correlation.tenantId`.
///
/// Mirrors the canonical "absent tenant" placeholder used by
/// `cellos-sink-jetstream` so multi-tenant subject namespaces stay structurally
/// uniform whether or not the operator declared a tenant.
pub const TENANT_ID_DEFAULT_TOKEN: &str = "_no_tenant_";

/// Resolve the NATS event subject for this supervisor run.
///
/// Every environment variable consulted here is trimmed of surrounding
/// whitespace, and a value that is unset, not valid Unicode, or blank after
/// trimming is treated as absent. NATS subjects cannot contain whitespace, so
/// padding or a trailing newline from an env file is dropped rather than
/// forwarded to the wire.
///
/// Precedence (highest first):
///
/// 1. `CELLOS_EVENTS_SUBJECT_TEMPLATE`: the placeholders `{spec_id}`,
///    `{run_id}`, and `{tenantId}` are replaced with the corresponding values.
/// 2. `CELLOS_TENANT_ID` (T12 RBAC): produces
///    `cellos.events.<CELLOS_TENANT_ID>.<spec_id>.<run_id>` so multi-tenant
///    deployments get per-tenant subject namespaces without an explicit
///    template. This is the 1.0 RBAC default for operators that pin a single
///    tenant per supervisor process.
/// 3. `CELL_OS_EVENTS_SUBJECT` (legacy literal): used as the whole subject.
/// 4. Default: `cellos.events.v1`.
///
/// # Parameters
///
/// * `spec_id` / `run_id`: inserted as-is into the template or the tenant
///   subject.
/// * `tenant_id`: the spec-side `correlation.tenantId`. It only affects the
///   template path; when `None`, [`TENANT_ID_DEFAULT_TOKEN`] is substituted for
///   `{tenantId}`, matching the convention used by the JetStream sink.
///
/// Callers SHOULD have already validated the tenant id with
/// [`cellos_core::validate_tenant_id_for_subject_token`] at admission so no
/// NATS subject-token reserved char (`.`, `*`, `>`, whitespace) reaches the
/// wire. This function does NOT re-validate the substituted values.
pub fn resolve_event_subject(spec_id: &str, run_id: &str, tenant_id: Option<&str>) -> String {
    /// Read `key`, trimmed; `None` when unset, non-Unicode, or blank.
    fn non_blank_env(key: &str) -> Option<String> {
        std::env::var(key)
            .ok()
            .map(|value| value.trim().to_owned())
            .filter(|value| !value.is_empty())
    }

    if let Some(template) = non_blank_env("CELLOS_EVENTS_SUBJECT_TEMPLATE") {
        let tenant = tenant_id.unwrap_or(TENANT_ID_DEFAULT_TOKEN);
        return template
            .replace("{spec_id}", spec_id)
            .replace("{run_id}", run_id)
            .replace("{tenantId}", tenant);
    }

    // T12 multi-tenant default: the supervisor is single-tenant-per-process
    // (one CELLOS_TENANT_ID), so the tenant is burned directly into the subject
    // rather than templated per event.
    if let Some(env_tenant) = non_blank_env("CELLOS_TENANT_ID") {
        return format!("cellos.events.{env_tenant}.{spec_id}.{run_id}");
    }

    // A blank legacy literal would yield an empty subject, so it falls back to
    // the default exactly like an unset one.
    non_blank_env("CELL_OS_EVENTS_SUBJECT").unwrap_or_else(|| "cellos.events.v1".to_owned())
}
106
#[cfg(test)]
mod tests {
    use super::{resolve_event_subject, TENANT_ID_DEFAULT_TOKEN};
    use std::sync::{Mutex, MutexGuard, PoisonError};

    // Serializes env-var mutation so resolve_event_subject tests don't race.
    static SUBJECT_MUTEX: Mutex<()> = Mutex::new(());

    /// Every environment variable `resolve_event_subject` reads.
    const SUBJECT_ENV_VARS: [&str; 3] = [
        "CELLOS_EVENTS_SUBJECT_TEMPLATE",
        "CELLOS_TENANT_ID",
        "CELL_OS_EVENTS_SUBJECT",
    ];

    fn clear_subject_env() {
        for key in SUBJECT_ENV_VARS {
            std::env::remove_var(key);
        }
    }

    /// Holds the env lock for one test and guarantees a clean slate.
    ///
    /// All subject variables are removed on acquisition (so the ambient
    /// environment cannot influence the test) and again on drop (so a failed
    /// assertion cannot leak state into the next test).
    struct SubjectEnv {
        _lock: MutexGuard<'static, ()>,
    }

    impl SubjectEnv {
        fn acquire() -> Self {
            // The mutex guards no data, so a poisoned lock is safe to reuse;
            // recovering keeps one failing test from failing all the others.
            let lock = SUBJECT_MUTEX
                .lock()
                .unwrap_or_else(PoisonError::into_inner);
            clear_subject_env();
            Self { _lock: lock }
        }
    }

    impl Drop for SubjectEnv {
        fn drop(&mut self) {
            // Runs before the `_lock` field is released.
            clear_subject_env();
        }
    }

    #[test]
    fn resolve_subject_default_when_neither_var_set() {
        let _env = SubjectEnv::acquire();
        assert_eq!(
            resolve_event_subject("my-spec", "run-1", None),
            "cellos.events.v1"
        );
    }

    #[test]
    fn resolve_subject_literal_from_env() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELL_OS_EVENTS_SUBJECT", "cellos.events.custom");
        assert_eq!(
            resolve_event_subject("my-spec", "run-1", None),
            "cellos.events.custom"
        );
    }

    #[test]
    fn resolve_subject_template_substitutes_spec_id_and_run_id() {
        let _env = SubjectEnv::acquire();
        std::env::set_var(
            "CELLOS_EVENTS_SUBJECT_TEMPLATE",
            "cellos.{spec_id}.{run_id}.v1",
        );
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", None),
            "cellos.demo-cell.run-42.v1"
        );
    }

    #[test]
    fn resolve_subject_template_substitutes_tenant_id() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELLOS_EVENTS_SUBJECT_TEMPLATE", "cellos.{tenantId}.{spec_id}");
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", Some("acme")),
            "cellos.acme.demo-cell"
        );
    }

    #[test]
    fn resolve_subject_template_uses_default_token_without_tenant() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELLOS_EVENTS_SUBJECT_TEMPLATE", "cellos.{tenantId}.{spec_id}");
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", None),
            format!("cellos.{TENANT_ID_DEFAULT_TOKEN}.demo-cell")
        );
    }

    #[test]
    fn resolve_subject_template_takes_priority_over_literal_env() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELLOS_EVENTS_SUBJECT_TEMPLATE", "cellos.{spec_id}.v1");
        std::env::set_var("CELL_OS_EVENTS_SUBJECT", "cellos.events.custom");
        assert_eq!(
            resolve_event_subject("cell-abc", "run-1", None),
            "cellos.cell-abc.v1"
        );
    }

    // ── T12: CELLOS_TENANT_ID multi-tenant subject default ──────────────────

    #[test]
    fn resolve_subject_cellos_tenant_id_produces_namespaced_subject() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELLOS_TENANT_ID", "tenant-a");
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", None),
            "cellos.events.tenant-a.demo-cell.run-42"
        );
    }

    #[test]
    fn resolve_subject_cellos_tenant_id_beats_literal_env() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELLOS_TENANT_ID", "tenant-a");
        std::env::set_var("CELL_OS_EVENTS_SUBJECT", "cellos.events.custom");
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", None),
            "cellos.events.tenant-a.demo-cell.run-42"
        );
    }

    #[test]
    fn resolve_subject_no_cellos_tenant_id_preserves_legacy_default() {
        let _env = SubjectEnv::acquire();
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", None),
            "cellos.events.v1"
        );
    }

    #[test]
    fn resolve_subject_template_beats_cellos_tenant_id() {
        let _env = SubjectEnv::acquire();
        std::env::set_var("CELLOS_EVENTS_SUBJECT_TEMPLATE", "cellos.{spec_id}.v1");
        std::env::set_var("CELLOS_TENANT_ID", "tenant-a");
        assert_eq!(
            resolve_event_subject("demo-cell", "run-42", None),
            "cellos.demo-cell.v1"
        );
    }
}