Skip to main content

cellos_broker_vault/
lib.rs

1//! [`SecretBroker`] that fetches secrets from HashiCorp Vault using AppRole authentication.
2//!
3//! # Overview
4//!
5//! AppRole is the canonical machine-to-machine authentication method for Vault.
6//! The operator pre-stages a `role_id` and `secret_id` for the runner; the broker
7//! exchanges them for a short-lived Vault token at resolve time, then reads the
8//! requested secret from Vault KV v2.
9//!
10//! # Configuration
11//!
12//! | Env var | Required | Description |
13//! |---------|----------|-------------|
14//! | `CELLOS_VAULT_ADDR` | yes | Vault server address, e.g. `https://vault.example.com` |
15//! | `CELLOS_VAULT_ROLE_ID` | yes | AppRole role_id |
16//! | `CELLOS_VAULT_SECRET_ID` | yes | AppRole secret_id |
17//! | `CELLOS_VAULT_KV_MOUNT` | no | KV v2 mount name (default: `secret`) |
18//! | `CELLOS_VAULT_KV_PATH_PREFIX` | no | Path prefix prepended to every key (e.g. `cellos/prod`) |
19//! | `CELLOS_VAULT_NAMESPACE` | no | Vault Enterprise namespace (`X-Vault-Namespace` header) |
20//! | `CELLOS_CA_BUNDLE` | no | PEM CA bundle for TLS to private Vault endpoints |
21//!
22//! # Secret path resolution
23//!
24//! For `resolve("DB_PASSWORD", ...)`, the broker reads from:
25//! ```text
26//! GET {CELLOS_VAULT_ADDR}/v1/{KV_MOUNT}/data/{PATH_PREFIX}/{key}
27//! ```
28//!
//! If `CELLOS_VAULT_KV_PATH_PREFIX` is unset (or empty after trimming whitespace and `/`),
//! no prefix is added and the key is used directly. The `data` path segment is part of
//! the Vault KV v2 read API.
31//!
32//! # Auth flow
33//!
34//! Classic `resolve(...)`:
35//! 1. POST `{addr}/v1/auth/approle/login` with `{ "role_id": "...", "secret_id": "..." }`
36//! 2. GET `{addr}/v1/{mount}/data/{path}` with `X-Vault-Token: {client_token}`
//! 3. Extract `data.data.{key}` from the KV v2 response envelope, falling back to the
//!    first field of `data.data` when `{key}` is not present
38//!
39//! A fresh token is obtained for each `resolve` call. This avoids persistent token
40//! state in the supervisor process and ensures each secret fetch is independently
//! authenticated. The per-call token is not revoked after the read, so if you want
//! one-shot tokens, configure the AppRole role with `token_num_uses=1` or a short
//! `token_ttl`.
43//!
44//! `runtimeLeasedBroker`:
45//! 1. `prepare_runtime_secret_lease(...)` performs a single AppRole login for the cell
46//! 2. `fetch_runtime_secret(...)` reuses the cached token for on-demand KV reads
47//! 3. `revoke_for_cell(...)` calls `/v1/auth/token/revoke-self` and drops the cached token
48//!
49//! # Revocation
50//!
51//! For classic env delivery, revocation still depends on short Vault TTLs because a raw
52//! secret has already been materialized into the child process. For `runtimeLeasedBroker`,
53//! `revoke_for_cell` actively revokes the cached Vault token and drops the local runtime
54//! channel, so future fetches fail both locally and upstream.
55//!
56//! # TLS
57//!
58//! Honors `CELLOS_CA_BUNDLE` (PEM file path) for Vault endpoints behind a private CA.
59//! `HTTP_PROXY`, `HTTPS_PROXY`, and `NO_PROXY` are respected automatically by reqwest.
60//!
61//! # Timeout contract (BROKER-VAULT-TIMEOUT)
62//!
63//! The reqwest client is built with **bounded** request and connect timeouts so a
64//! hung Vault endpoint cannot stall a cell's secret-resolve phase indefinitely:
65//!
66//! - Request timeout: [`DEFAULT_REQUEST_TIMEOUT_MS`] (override via
67//!   `CELLOS_VAULT_TIMEOUT_MS`). Default is 15 s — Vault calls are interactive
68//!   (login + KV read), so a shorter ceiling than artifact upload is appropriate.
69//! - Connect timeout: [`DEFAULT_CONNECT_TIMEOUT_MS`] (override via
70//!   `CELLOS_VAULT_CONNECT_TIMEOUT_MS`).
71//!
72//! Both env vars accept a positive `u64` count of milliseconds; unparseable or
73//! zero values fall back to the default. The client is **never** constructed
74//! without explicit timeouts.
75//!
76//! # Correlation propagation (Tranche-1 seam-freeze G1)
77//!
//! Each classic `resolve` call performs a fresh AppRole login; a cell using
//! `runtimeLeasedBroker` performs a single login. The Vault `client_token` returned by
80//! that login is the natural seed for a broker-side correlation ID, but
81//! Vault tokens are sensitive material — we do **not** surface the token
82//! itself. [`SecretBroker::broker_correlation_id`] therefore returns `None`
83//! today, and the supervisor uses the operator-supplied
84//! `spec.correlation.correlationId` for cross-tool correlation. A future
85//! revision may emit `urn:vault:lease:<accessor>` once the seam-freeze
86//! consumer tools (taudit, tencrypt) commit to that shape — accessors are
87//! safe to expose because they bind to the lease's audit log entry rather
88//! than the credential.
89
90use std::collections::HashMap;
91use std::sync::Mutex;
92use std::time::Duration;
93
94use async_trait::async_trait;
95use cellos_core::ports::SecretBroker;
96use cellos_core::{CellosError, RuntimeSecretLeaseRequest, SecretView};
97use serde::Deserialize;
98use std::fmt;
99use tracing::instrument;
100use zeroize::{Zeroize, ZeroizeOnDrop};
101
/// Default total per-request timeout, in milliseconds, for every Vault HTTP call.
///
/// Fifteen seconds: login and KV reads are small interactive round-trips, so
/// this is deliberately tighter than the artifact-upload sinks. It tolerates a
/// slow private network while still keeping a black-holed Vault endpoint from
/// stalling the secret-resolve phase forever.
pub const DEFAULT_REQUEST_TIMEOUT_MS: u64 = 15 * 1_000;

/// Default TCP connect timeout, in milliseconds, for the reqwest client.
pub const DEFAULT_CONNECT_TIMEOUT_MS: u64 = 10 * 1_000;

/// Name of the env var that overrides [`DEFAULT_REQUEST_TIMEOUT_MS`].
pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_VAULT_TIMEOUT_MS";

/// Name of the env var that overrides [`DEFAULT_CONNECT_TIMEOUT_MS`].
pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_VAULT_CONNECT_TIMEOUT_MS";
113/// Env var to override [`DEFAULT_REQUEST_TIMEOUT_MS`].
114pub const ENV_REQUEST_TIMEOUT_MS: &str = "CELLOS_VAULT_TIMEOUT_MS";
115
116/// Env var to override [`DEFAULT_CONNECT_TIMEOUT_MS`].
117pub const ENV_CONNECT_TIMEOUT_MS: &str = "CELLOS_VAULT_CONNECT_TIMEOUT_MS";
118
119/// Resolve a timeout in milliseconds from the named env var.
120///
121/// Returns `default_ms` when the env var is unset, empty, non-numeric, or `0`.
122/// Pure function — exposed so callers (and contract tests) can verify the
123/// resolution policy without constructing a client.
124pub fn resolve_timeout_ms(env_var: &str, default_ms: u64) -> u64 {
125    match std::env::var(env_var) {
126        Ok(raw) => raw
127            .trim()
128            .parse::<u64>()
129            .ok()
130            .filter(|v| *v > 0)
131            .unwrap_or(default_ms),
132        Err(_) => default_ms,
133    }
134}
135
136/// Vault AppRole secret broker.
137///
138/// Authenticates with Vault per `resolve` call for classic env delivery, and can
139/// also hold a cell-scoped upstream token for `runtimeLeasedBroker`.
140///
141/// # D7 redaction
142///
143/// `Debug` is hand-written to redact both `role_id` and `secret_id` so
144/// accidental `tracing::debug!("{:?}", broker)` calls cannot leak the credential
145/// through logs. Zeroization on drop is delegated to a `ZeroizeOnDrop` derive
146/// that wipes `secret_id`; cached [`RuntimeVaultLease`] tokens are zeroized via
147/// their own `ZeroizeOnDrop` derive when the inner `Mutex<HashMap<..>>` drops.
148#[derive(ZeroizeOnDrop)]
149pub struct VaultAppRoleBroker {
150    #[zeroize(skip)]
151    client: reqwest::Client,
152    /// Vault server address — not sensitive.
153    #[zeroize(skip)]
154    addr: String,
155    /// AppRole role_id — not a credential itself but treated as non-public.
156    #[zeroize(skip)]
157    role_id: String,
158    /// AppRole secret_id — sensitive, zeroized on drop and redacted in Debug.
159    secret_id: String,
160    #[zeroize(skip)]
161    kv_mount: String,
162    #[zeroize(skip)]
163    kv_path_prefix: Option<String>,
164    #[zeroize(skip)]
165    namespace: Option<String>,
166    /// Runtime-lease map. Each contained [`RuntimeVaultLease`] derives
167    /// `ZeroizeOnDrop`, so dropping the map zeroizes every cached token.
168    #[zeroize(skip)]
169    runtime_leases: Mutex<HashMap<String, RuntimeVaultLease>>,
170}
171
172impl fmt::Debug for VaultAppRoleBroker {
173    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
174        f.debug_struct("VaultAppRoleBroker")
175            .field("addr", &self.addr)
176            .field("role_id", &"<redacted>")
177            .field("secret_id", &"<redacted>")
178            .field("kv_mount", &self.kv_mount)
179            .field("kv_path_prefix", &self.kv_path_prefix)
180            .field("namespace", &self.namespace)
181            // Mask the runtime-lease map by cardinality only — do not iterate
182            // tokens through Debug.
183            .field(
184                "runtime_leases",
185                &format_args!(
186                    "<{} cell(s)>",
187                    self.runtime_leases
188                        .lock()
189                        .map(|g| g.len())
190                        .unwrap_or_else(|e| e.into_inner().len())
191                ),
192            )
193            .finish()
194    }
195}
196
197#[derive(ZeroizeOnDrop)]
198struct RuntimeVaultLease {
199    /// Vault `client_token` — sensitive, zeroized on drop and redacted in Debug.
200    token: String,
201}
202
203impl fmt::Debug for RuntimeVaultLease {
204    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
205        f.debug_struct("RuntimeVaultLease")
206            .field("token", &"<REDACTED>")
207            .finish()
208    }
209}
210
211impl RuntimeVaultLease {
212    /// Explicit early zeroization, for paths that revoke a token before the
213    /// owning struct would otherwise drop (e.g. lease replacement).
214    fn zeroize(&mut self) {
215        self.token.zeroize();
216    }
217}
218
219// ── Vault HTTP response shapes ─────────────────────────────────────────────
220
221/// VAULT-CT-ZERO (reviewer wave 9, follow-up #1):
222///
223/// `serde::Deserialize` is not implemented for `zeroize::Zeroizing<String>` in
224/// the upstream `zeroize` crate, so we cannot wrap the field directly at the
225/// struct level. To minimize the un-zeroized window we:
226///
227/// 1. Keep `client_token: String` (serde-friendly), but
228/// 2. Use [`std::mem::take`] in [`VaultAppRoleBroker::login`] to *move* the
229///    token's heap buffer out of the parsed struct in a single op (leaving
230///    an empty `String` behind), then explicitly drop the parsed
231///    [`VaultLoginResponse`] before returning. The consumer (the caller of
232///    `login`) then wraps the moved buffer in [`zeroize::Zeroizing`] (or
233///    stores it in a [`RuntimeVaultLease`] that zeroizes on drop / revoke),
234///    so the only live `String` holding the token bytes is now under our
235///    zeroization regime.
236/// 3. Implement [`Debug`] manually so the token cannot leak via `{:?}` /
237///    `tracing::error!` spans on parse errors or future log additions.
238///
239/// We deliberately do NOT implement [`Drop`] on [`VaultLoginResponse`] /
240/// [`VaultAuth`] because that would forbid moving the inner `String` out at
241/// all (Rust forbids partial moves out of `Drop` types). The
242/// [`std::mem::take`] pattern is the canonical replacement: it leaves the
243/// original allocation in place to be freed by the normal struct drop while
244/// the consumer wraps the live token in [`zeroize::Zeroizing`].
245#[derive(Deserialize)]
246struct VaultLoginResponse {
247    auth: VaultAuth,
248}
249
250#[derive(Deserialize)]
251struct VaultAuth {
252    client_token: String,
253}
254
255impl std::fmt::Debug for VaultLoginResponse {
256    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
257        f.debug_struct("VaultLoginResponse")
258            .field("auth", &self.auth)
259            .finish()
260    }
261}
262
263impl std::fmt::Debug for VaultAuth {
264    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
265        f.debug_struct("VaultAuth")
266            .field("client_token", &"<redacted>")
267            .finish()
268    }
269}
270
271#[derive(Deserialize)]
272struct VaultKvResponse {
273    data: VaultKvDataWrapper,
274}
275
276#[derive(Deserialize)]
277struct VaultKvDataWrapper {
278    data: serde_json::Map<String, serde_json::Value>,
279}
280
281// ── CA bundle helper (same pattern as cellos-export-http) ─────────────────
282
283/// Build a reqwest client that honours `CELLOS_CA_BUNDLE` (path to a PEM CA bundle).
284///
285/// Always installs **bounded** request and connect timeouts (see module docs).
286fn http_client_builder() -> Result<reqwest::ClientBuilder, String> {
287    let request_timeout = Duration::from_millis(resolve_timeout_ms(
288        ENV_REQUEST_TIMEOUT_MS,
289        DEFAULT_REQUEST_TIMEOUT_MS,
290    ));
291    let connect_timeout = Duration::from_millis(resolve_timeout_ms(
292        ENV_CONNECT_TIMEOUT_MS,
293        DEFAULT_CONNECT_TIMEOUT_MS,
294    ));
295    let mut builder = reqwest::Client::builder()
296        .timeout(request_timeout)
297        .connect_timeout(connect_timeout);
298    if let Ok(path) = std::env::var("CELLOS_CA_BUNDLE") {
299        let pem =
300            std::fs::read(&path).map_err(|e| format!("CELLOS_CA_BUNDLE: read {path}: {e}"))?;
301        let mut added = 0usize;
302        for block in pem_cert_blocks(&pem) {
303            let cert = reqwest::Certificate::from_pem(&block)
304                .map_err(|e| format!("CELLOS_CA_BUNDLE: parse cert in {path}: {e}"))?;
305            builder = builder.add_root_certificate(cert);
306            added += 1;
307        }
308        if added == 0 {
309            return Err(format!("CELLOS_CA_BUNDLE: no certificates found in {path}"));
310        }
311        tracing::debug!(path = %path, count = added, "CELLOS_CA_BUNDLE: loaded CA certificates");
312    }
313    Ok(builder)
314}
315
316fn pem_cert_blocks(pem: &[u8]) -> Vec<Vec<u8>> {
317    let text = String::from_utf8_lossy(pem);
318    let mut blocks = Vec::new();
319    let mut current = String::new();
320    let mut in_block = false;
321    for line in text.lines() {
322        if line.starts_with("-----BEGIN ") {
323            in_block = true;
324            current.clear();
325        }
326        if in_block {
327            current.push_str(line);
328            current.push('\n');
329            if line.starts_with("-----END ") {
330                blocks.push(current.as_bytes().to_vec());
331                in_block = false;
332            }
333        }
334    }
335    blocks
336}
337
338// ── Implementation ─────────────────────────────────────────────────────────
339
340impl VaultAppRoleBroker {
341    /// Construct from environment variables. Reads all config at construction time
342    /// so misconfiguration fails at startup rather than at first secret resolve.
343    pub fn from_env() -> Result<Self, CellosError> {
344        let addr = std::env::var("CELLOS_VAULT_ADDR")
345            .map_err(|_| CellosError::SecretBroker("CELLOS_VAULT_ADDR not set".into()))?;
346        let addr = addr.trim().trim_end_matches('/').to_string();
347        if addr.is_empty() {
348            return Err(CellosError::SecretBroker(
349                "CELLOS_VAULT_ADDR is empty after trim".into(),
350            ));
351        }
352        let parsed = reqwest::Url::parse(&addr).map_err(|e| {
353            CellosError::SecretBroker(format!("CELLOS_VAULT_ADDR invalid URL: {e}"))
354        })?;
355        let scheme = parsed.scheme();
356        if scheme != "http" && scheme != "https" {
357            return Err(CellosError::SecretBroker(format!(
358                "CELLOS_VAULT_ADDR scheme must be http or https, got {scheme}"
359            )));
360        }
361
362        let role_id = std::env::var("CELLOS_VAULT_ROLE_ID")
363            .map_err(|_| CellosError::SecretBroker("CELLOS_VAULT_ROLE_ID not set".into()))?;
364        if role_id.trim().is_empty() {
365            return Err(CellosError::SecretBroker(
366                "CELLOS_VAULT_ROLE_ID is empty".into(),
367            ));
368        }
369
370        let secret_id = std::env::var("CELLOS_VAULT_SECRET_ID")
371            .map_err(|_| CellosError::SecretBroker("CELLOS_VAULT_SECRET_ID not set".into()))?;
372        if secret_id.trim().is_empty() {
373            return Err(CellosError::SecretBroker(
374                "CELLOS_VAULT_SECRET_ID is empty".into(),
375            ));
376        }
377
378        let kv_mount =
379            std::env::var("CELLOS_VAULT_KV_MOUNT").unwrap_or_else(|_| "secret".to_string());
380        let kv_mount = kv_mount.trim().trim_matches('/').to_string();
381
382        let kv_path_prefix = std::env::var("CELLOS_VAULT_KV_PATH_PREFIX")
383            .ok()
384            .map(|p| p.trim().trim_matches('/').to_string())
385            .filter(|p| !p.is_empty());
386
387        let namespace = std::env::var("CELLOS_VAULT_NAMESPACE")
388            .ok()
389            .map(|n| n.trim().to_string())
390            .filter(|n| !n.is_empty());
391
392        let client = http_client_builder()
393            .map_err(CellosError::SecretBroker)?
394            .build()
395            .map_err(|e| CellosError::SecretBroker(format!("vault http client init: {e}")))?;
396
397        Ok(Self {
398            client,
399            addr,
400            role_id,
401            secret_id,
402            kv_mount,
403            kv_path_prefix,
404            namespace,
405            runtime_leases: Mutex::new(HashMap::new()),
406        })
407    }
408
409    /// Authenticate with AppRole and return a short-lived Vault token.
410    async fn login(&self) -> Result<String, CellosError> {
411        let url = format!("{}/v1/auth/approle/login", self.addr);
412        let body = serde_json::json!({
413            "role_id": self.role_id,
414            "secret_id": self.secret_id,
415        });
416        let mut req = self.client.post(&url).json(&body);
417        if let Some(ref ns) = self.namespace {
418            req = req.header("X-Vault-Namespace", ns);
419        }
420        let resp = req
421            .send()
422            .await
423            .map_err(|e| CellosError::SecretBroker(format!("vault approle login request: {e}")))?;
424
425        if !resp.status().is_success() {
426            let status = resp.status();
427            let body = resp.text().await.unwrap_or_default();
428            return Err(CellosError::SecretBroker(format!(
429                "vault approle login returned {status}: {body}"
430            )));
431        }
432
433        let mut login: VaultLoginResponse = resp
434            .json()
435            .await
436            .map_err(|e| CellosError::SecretBroker(format!("vault login response parse: {e}")))?;
437
438        // VAULT-CT-ZERO: take ownership of the token's heap buffer immediately,
439        // leaving an empty `String` in the parsed struct. The caller wraps
440        // this into `Zeroizing<String>` (or moves it into a `RuntimeVaultLease`
441        // that is zeroized on drop / revoke), so the only `String` allocation
442        // holding the token bytes is now under our zeroization regime.
443        let token = std::mem::take(&mut login.auth.client_token);
444        drop(login);
445        tracing::debug!("vault approle login succeeded");
446        Ok(token)
447    }
448
449    /// Build the KV v2 path for a given key.
450    fn kv_path(&self, key: &str) -> String {
451        match &self.kv_path_prefix {
452            Some(prefix) => format!("{}/v1/{}/data/{}/{}", self.addr, self.kv_mount, prefix, key),
453            None => format!("{}/v1/{}/data/{}", self.addr, self.kv_mount, key),
454        }
455    }
456
457    /// Fetch a secret value from Vault KV v2 using an already-obtained token.
458    async fn fetch_secret(&self, token: &str, key: &str) -> Result<String, CellosError> {
459        let url = self.kv_path(key);
460        let mut req = self.client.get(&url).header("X-Vault-Token", token);
461        if let Some(ref ns) = self.namespace {
462            req = req.header("X-Vault-Namespace", ns);
463        }
464        let resp = req
465            .send()
466            .await
467            .map_err(|e| CellosError::SecretBroker(format!("vault kv read request: {e}")))?;
468
469        if resp.status().as_u16() == 404 {
470            return Err(CellosError::SecretBroker(format!(
471                "vault kv secret not found: {key}"
472            )));
473        }
474        if !resp.status().is_success() {
475            let status = resp.status();
476            let body = resp.text().await.unwrap_or_default();
477            return Err(CellosError::SecretBroker(format!(
478                "vault kv read returned {status}: {body}"
479            )));
480        }
481
482        let kv: VaultKvResponse = resp
483            .json()
484            .await
485            .map_err(|e| CellosError::SecretBroker(format!("vault kv response parse: {e}")))?;
486
487        // Extract the value from data.data[key]; fall back to first value if not present.
488        let value = kv
489            .data
490            .data
491            .get(key)
492            .or_else(|| kv.data.data.values().next())
493            .ok_or_else(|| {
494                CellosError::SecretBroker(format!(
495                    "vault kv secret {key:?} has no fields in data.data"
496                ))
497            })?;
498
499        match value {
500            serde_json::Value::String(s) => Ok(s.clone()),
501            other => Ok(other.to_string()),
502        }
503    }
504
505    async fn revoke_token(&self, token: &str) -> Result<(), CellosError> {
506        let url = format!("{}/v1/auth/token/revoke-self", self.addr);
507        let mut req = self.client.post(&url).header("X-Vault-Token", token);
508        if let Some(ref ns) = self.namespace {
509            req = req.header("X-Vault-Namespace", ns);
510        }
511        let resp = req.send().await.map_err(|e| {
512            CellosError::SecretBroker(format!("vault revoke-self request failed: {e}"))
513        })?;
514
515        if !resp.status().is_success() {
516            let status = resp.status();
517            let body = resp.text().await.unwrap_or_default();
518            return Err(CellosError::SecretBroker(format!(
519                "vault revoke-self returned {status}: {body}"
520            )));
521        }
522
523        Ok(())
524    }
525
526    fn take_runtime_lease(&self, cell_id: &str) -> Option<RuntimeVaultLease> {
527        self.runtime_leases
528            .lock()
529            .unwrap_or_else(|e| e.into_inner())
530            .remove(cell_id)
531    }
532
533    fn insert_runtime_lease(&self, cell_id: &str, lease: RuntimeVaultLease) {
534        if let Some(mut previous) = self
535            .runtime_leases
536            .lock()
537            .unwrap_or_else(|e| e.into_inner())
538            .insert(cell_id.to_string(), lease)
539        {
540            previous.zeroize();
541        }
542    }
543
544    /// Introspection surface (E2-03): does this broker hold a runtime lease for
545    /// `cell_id`? Used by tests to assert that a partial-resolve failure left
546    /// no orphan lease in the local map.
547    pub fn has_runtime_lease(&self, cell_id: &str) -> bool {
548        self.runtime_leases
549            .lock()
550            .unwrap_or_else(|e| e.into_inner())
551            .contains_key(cell_id)
552    }
553
554    /// Introspection surface (E2-03): total number of runtime leases currently
555    /// held by this broker. Used by tests to assert no orphan leases remain
556    /// across cells after a forced mid-resolve failure.
557    pub fn runtime_lease_count(&self) -> usize {
558        self.runtime_leases
559            .lock()
560            .unwrap_or_else(|e| e.into_inner())
561            .len()
562    }
563}
564
565#[async_trait]
566impl SecretBroker for VaultAppRoleBroker {
567    /// Resolves a secret from Vault KV v2.
568    ///
569    /// Performs AppRole login + KV read per call. No token cache.
570    #[instrument(skip(self), fields(key = %key, cell_id = %cell_id))]
571    async fn resolve(
572        &self,
573        key: &str,
574        cell_id: &str,
575        _ttl_seconds: u64,
576    ) -> Result<SecretView, CellosError> {
577        tracing::debug!(key = %key, cell_id = %cell_id, "resolving vault secret");
578        // VAULT-CT-ZERO: wrap the per-resolve Vault token in `Zeroizing` so its
579        // bytes are wiped on scope exit (success or error) before this function
580        // returns to the supervisor.
581        let token = zeroize::Zeroizing::new(self.login().await?);
582        let value = self.fetch_secret(token.as_str(), key).await?;
583        tracing::info!(key = %key, cell_id = %cell_id, "vault secret resolved");
584        Ok(SecretView {
585            key: key.to_string(),
586            value: zeroize::Zeroizing::new(value),
587        })
588    }
589
590    async fn prepare_runtime_secret_lease(
591        &self,
592        cell_id: &str,
593        requests: &[RuntimeSecretLeaseRequest],
594    ) -> Result<(), CellosError> {
595        if requests.is_empty() {
596            return Ok(());
597        }
598
599        if let Some(mut previous) = self.take_runtime_lease(cell_id) {
600            let revoke_result = self.revoke_token(&previous.token).await;
601            previous.zeroize();
602            revoke_result?;
603        }
604
605        // E2-03: From the moment login() returns, we hold a materialised
606        // upstream Vault lease. Any failure on the way out — including the
607        // per-key pre-warm validation below — must revoke that lease before
608        // returning Err, otherwise the cell's token outlives the prepare call
609        // both at Vault (zombie token) and, if it had been inserted, locally
610        // (orphan map entry). We therefore keep the token in a local before
611        // inserting and explicitly revoke on every Err exit.
612        let mut token = self.login().await?;
613
614        // Pre-warm: validate that each requested key is readable with this
615        // token. Catching failures here surfaces missing keys / policy gaps
616        // at admission time rather than at first runtime fetch, and it is
617        // also the realistic "subsequent step that fails after lease was
618        // materialised" — the failure mode E2-03 hardens against.
619        for req in requests {
620            if let Err(e) = self.fetch_secret(&token, &req.key).await {
621                // Revoke the upstream lease before propagating the error so
622                // we leave neither a zombie token at Vault nor an orphan
623                // entry in the local lease map (we have not inserted yet).
624                if let Err(revoke_err) = self.revoke_token(&token).await {
625                    tracing::warn!(
626                        cell_id = %cell_id,
627                        revoke_error = %revoke_err,
628                        "failed to revoke partial Vault lease after prepare error; \
629                         upstream may rely on TTL"
630                    );
631                }
632                tracing::warn!(
633                    cell_id = %cell_id,
634                    key = %req.key,
635                    error = %e,
636                    "Vault prepare aborted; partial lease revoked (E2-03)"
637                );
638                token.zeroize();
639                return Err(e);
640            }
641        }
642
643        self.insert_runtime_lease(cell_id, RuntimeVaultLease { token });
644        tracing::info!(
645            cell_id = %cell_id,
646            secret_count = requests.len(),
647            "prepared Vault runtime secret lease"
648        );
649        Ok(())
650    }
651
652    async fn fetch_runtime_secret(
653        &self,
654        key: &str,
655        cell_id: &str,
656        _ttl_seconds: u64,
657    ) -> Result<SecretView, CellosError> {
658        let mut token = self
659            .runtime_leases
660            .lock()
661            .unwrap_or_else(|e| e.into_inner())
662            .get(cell_id)
663            .map(|lease| lease.token.clone())
664            .ok_or_else(|| {
665                CellosError::SecretBroker(format!(
666                    "no prepared Vault runtime lease for cell {cell_id:?}"
667                ))
668            })?;
669
670        let result = self
671            .fetch_secret(&token, key)
672            .await
673            .map(|value| SecretView {
674                key: key.to_string(),
675                value: zeroize::Zeroizing::new(value),
676            });
677        token.zeroize();
678        result
679    }
680
681    async fn revoke_for_cell(&self, cell_id: &str) -> Result<(), CellosError> {
682        let Some(mut lease) = self.take_runtime_lease(cell_id) else {
683            return Ok(());
684        };
685
686        let revoke_result = self.revoke_token(&lease.token).await;
687        lease.zeroize();
688        revoke_result
689    }
690}
691
692#[cfg(test)]
693mod tests {
694    use super::*;
695    use std::io::{BufRead, BufReader, Read, Write};
696    use std::net::{TcpListener, TcpStream};
697    use std::thread;
698    use std::time::{Duration, Instant};
699
700    #[derive(Debug)]
701    struct CapturedRequest {
702        method: String,
703        target: String,
704        token: Option<String>,
705        body: String,
706    }
707
708    fn read_request(stream: &mut TcpStream) -> CapturedRequest {
709        let mut reader = BufReader::new(stream.try_clone().expect("clone stream"));
710        let mut request_line = String::new();
711        reader
712            .read_line(&mut request_line)
713            .expect("read request line");
714        assert!(!request_line.trim().is_empty(), "expected request line");
715
716        let mut content_length = 0usize;
717        let mut token = None;
718        loop {
719            let mut line = String::new();
720            reader.read_line(&mut line).expect("read header");
721            if line == "\r\n" || line.is_empty() {
722                break;
723            }
724            if let Some((name, value)) = line.split_once(':') {
725                let name = name.trim().to_ascii_lowercase();
726                let value = value.trim().to_string();
727                if name == "content-length" {
728                    content_length = value.parse::<usize>().expect("parse content-length");
729                } else if name == "x-vault-token" {
730                    token = Some(value);
731                }
732            }
733        }
734
735        let mut body = vec![0u8; content_length];
736        reader.read_exact(&mut body).expect("read request body");
737
738        let mut parts = request_line.split_whitespace();
739        let method = parts.next().expect("method").to_string();
740        let target = parts.next().expect("target").to_string();
741        CapturedRequest {
742            method,
743            target,
744            token,
745            body: String::from_utf8(body).expect("utf8 request body"),
746        }
747    }
748
749    fn write_response(stream: &mut TcpStream, status_line: &str, body: &str, content_type: &str) {
750        write!(
751            stream,
752            "HTTP/1.1 {status_line}\r\nContent-Type: {content_type}\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{body}",
753            body.len()
754        )
755        .expect("write response");
756        stream.flush().expect("flush response");
757    }
758
759    fn start_mock_vault(
760        expected_requests: usize,
761    ) -> (String, thread::JoinHandle<Vec<CapturedRequest>>) {
762        let listener = TcpListener::bind("127.0.0.1:0").expect("bind mock vault");
763        listener
764            .set_nonblocking(true)
765            .expect("set mock vault nonblocking");
766        let addr = listener.local_addr().expect("mock vault addr");
767        let handle = thread::spawn(move || {
768            let deadline = Instant::now() + Duration::from_secs(10);
769            let mut requests = Vec::new();
770            while requests.len() < expected_requests && Instant::now() < deadline {
771                match listener.accept() {
772                    Ok((mut stream, _)) => {
773                        stream
774                            .set_nonblocking(false)
775                            .expect("set accepted stream blocking");
776                        let request = read_request(&mut stream);
777                        match (request.method.as_str(), request.target.as_str()) {
778                            ("POST", "/v1/auth/approle/login") => write_response(
779                                &mut stream,
780                                "200 OK",
781                                r#"{"auth":{"client_token":"vault-token"}}"#,
782                                "application/json",
783                            ),
784                            ("GET", "/v1/secret/data/API_TOKEN") => write_response(
785                                &mut stream,
786                                "200 OK",
787                                r#"{"data":{"data":{"API_TOKEN":"leased-secret"}}}"#,
788                                "application/json",
789                            ),
790                            ("POST", "/v1/auth/token/revoke-self") => {
791                                write_response(&mut stream, "204 No Content", "", "text/plain")
792                            }
793                            _ => write_response(
794                                &mut stream,
795                                "404 Not Found",
796                                "unexpected request",
797                                "text/plain",
798                            ),
799                        }
800                        requests.push(request);
801                    }
802                    Err(err) if err.kind() == std::io::ErrorKind::WouldBlock => {
803                        thread::sleep(Duration::from_millis(20));
804                    }
805                    Err(err) => panic!("mock vault accept failed: {err}"),
806                }
807            }
808            requests
809        });
810
811        (format!("http://{addr}"), handle)
812    }
813
814    fn set_required_env() {
815        std::env::set_var("CELLOS_VAULT_ADDR", "https://vault.example.com");
816        std::env::set_var("CELLOS_VAULT_ROLE_ID", "test-role-id");
817        std::env::set_var("CELLOS_VAULT_SECRET_ID", "test-secret-id");
818        std::env::remove_var("CELLOS_VAULT_KV_MOUNT");
819        std::env::remove_var("CELLOS_VAULT_KV_PATH_PREFIX");
820        std::env::remove_var("CELLOS_VAULT_NAMESPACE");
821        std::env::remove_var("CELLOS_CA_BUNDLE");
822    }
823
824    fn clear_required_env() {
825        std::env::remove_var("CELLOS_VAULT_ADDR");
826        std::env::remove_var("CELLOS_VAULT_ROLE_ID");
827        std::env::remove_var("CELLOS_VAULT_SECRET_ID");
828    }
829
830    use std::sync::Mutex;
831    static ENV_LOCK: Mutex<()> = Mutex::new(());
832
833    /// Acquire the env lock, recovering from poison (a prior test panicked while holding it).
834    fn env_lock() -> std::sync::MutexGuard<'static, ()> {
835        ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner())
836    }
837
838    #[test]
839    fn constructs_with_required_env() {
840        let _g = env_lock();
841        set_required_env();
842        let broker = VaultAppRoleBroker::from_env();
843        clear_required_env();
844        assert!(broker.is_ok());
845    }
846
847    #[test]
848    fn fails_when_addr_missing() {
849        let _g = env_lock();
850        set_required_env();
851        std::env::remove_var("CELLOS_VAULT_ADDR");
852        let err = VaultAppRoleBroker::from_env().unwrap_err();
853        clear_required_env();
854        assert!(err.to_string().contains("CELLOS_VAULT_ADDR"), "got: {err}");
855    }
856
857    #[test]
858    fn fails_when_role_id_missing() {
859        let _g = env_lock();
860        set_required_env();
861        std::env::remove_var("CELLOS_VAULT_ROLE_ID");
862        let err = VaultAppRoleBroker::from_env().unwrap_err();
863        clear_required_env();
864        assert!(
865            err.to_string().contains("CELLOS_VAULT_ROLE_ID"),
866            "got: {err}"
867        );
868    }
869
870    #[test]
871    fn fails_when_secret_id_missing() {
872        let _g = env_lock();
873        set_required_env();
874        std::env::remove_var("CELLOS_VAULT_SECRET_ID");
875        let err = VaultAppRoleBroker::from_env().unwrap_err();
876        clear_required_env();
877        assert!(
878            err.to_string().contains("CELLOS_VAULT_SECRET_ID"),
879            "got: {err}"
880        );
881    }
882
883    #[test]
884    fn fails_when_addr_not_http() {
885        let _g = env_lock();
886        set_required_env();
887        std::env::set_var("CELLOS_VAULT_ADDR", "grpc://vault.example.com");
888        let err = VaultAppRoleBroker::from_env().unwrap_err();
889        clear_required_env();
890        assert!(err.to_string().contains("http or https"), "got: {err}");
891    }
892
893    #[test]
894    fn uses_default_kv_mount() {
895        let _g = env_lock();
896        set_required_env();
897        let broker = VaultAppRoleBroker::from_env().unwrap();
898        clear_required_env();
899        assert_eq!(broker.kv_mount, "secret");
900    }
901
902    #[test]
903    fn custom_kv_mount_and_prefix() {
904        let _g = env_lock();
905        set_required_env();
906        std::env::set_var("CELLOS_VAULT_KV_MOUNT", "kv");
907        std::env::set_var("CELLOS_VAULT_KV_PATH_PREFIX", "cellos/prod");
908        let broker = VaultAppRoleBroker::from_env().unwrap();
909        clear_required_env();
910        std::env::remove_var("CELLOS_VAULT_KV_MOUNT");
911        std::env::remove_var("CELLOS_VAULT_KV_PATH_PREFIX");
912        assert_eq!(broker.kv_mount, "kv");
913        assert_eq!(broker.kv_path_prefix.as_deref(), Some("cellos/prod"));
914    }
915
916    #[test]
917    fn kv_path_without_prefix() {
918        let _g = env_lock();
919        set_required_env();
920        let broker = VaultAppRoleBroker::from_env().unwrap();
921        clear_required_env();
922        assert_eq!(
923            broker.kv_path("DB_PASSWORD"),
924            "https://vault.example.com/v1/secret/data/DB_PASSWORD"
925        );
926    }
927
928    #[test]
929    fn kv_path_with_prefix() {
930        let _g = env_lock();
931        set_required_env();
932        std::env::set_var("CELLOS_VAULT_KV_MOUNT", "kv");
933        std::env::set_var("CELLOS_VAULT_KV_PATH_PREFIX", "cellos/prod");
934        let broker = VaultAppRoleBroker::from_env().unwrap();
935        std::env::remove_var("CELLOS_VAULT_KV_MOUNT");
936        std::env::remove_var("CELLOS_VAULT_KV_PATH_PREFIX");
937        clear_required_env();
938        assert_eq!(
939            broker.kv_path("DB_PASSWORD"),
940            "https://vault.example.com/v1/kv/data/cellos/prod/DB_PASSWORD"
941        );
942    }
943
944    #[tokio::test]
945    async fn resolve_fails_without_vault_running() {
946        let broker = {
947            let _g = env_lock();
948            set_required_env();
949            // Use localhost on an unbound port — no Vault server running.
950            std::env::set_var("CELLOS_VAULT_ADDR", "http://127.0.0.1:19999");
951            let broker = VaultAppRoleBroker::from_env().unwrap();
952            clear_required_env();
953            broker
954        };
955        let err = broker.resolve("ANY_KEY", "cell-1", 60).await.unwrap_err();
956        assert!(
957            err.to_string().contains("vault approle login"),
958            "got: {err}"
959        );
960    }
961
962    #[tokio::test]
963    async fn runtime_leased_prepare_fetches_and_revokes_token() {
964        // E2-03: prepare pre-warms each requested key, so the expected
965        // sequence is now: login → GET (prewarm) → GET (runtime fetch) →
966        // POST revoke-self = 4 requests.
967        let (addr, server) = start_mock_vault(4);
968        let broker = {
969            let _g = env_lock();
970            set_required_env();
971            std::env::set_var("CELLOS_VAULT_ADDR", addr);
972            let broker = VaultAppRoleBroker::from_env().unwrap();
973            clear_required_env();
974            broker
975        };
976
977        broker
978            .prepare_runtime_secret_lease(
979                "cell-1",
980                &[RuntimeSecretLeaseRequest {
981                    key: "API_TOKEN".into(),
982                    ttl_seconds: 60,
983                }],
984            )
985            .await
986            .unwrap();
987
988        let view = broker
989            .fetch_runtime_secret("API_TOKEN", "cell-1", 60)
990            .await
991            .unwrap();
992        assert_eq!(view.key, "API_TOKEN");
993        assert_eq!(view.value.as_str(), "leased-secret");
994
995        broker.revoke_for_cell("cell-1").await.unwrap();
996        let requests = server.join().expect("join mock vault");
997        assert_eq!(requests.len(), 4);
998        assert_eq!(requests[0].method, "POST");
999        assert_eq!(requests[0].target, "/v1/auth/approle/login");
1000        assert!(requests[0].body.contains("\"role_id\":\"test-role-id\""));
1001        assert!(requests[0]
1002            .body
1003            .contains("\"secret_id\":\"test-secret-id\""));
1004        // prewarm GET
1005        assert_eq!(requests[1].method, "GET");
1006        assert_eq!(requests[1].target, "/v1/secret/data/API_TOKEN");
1007        assert_eq!(requests[1].token.as_deref(), Some("vault-token"));
1008        // runtime fetch GET
1009        assert_eq!(requests[2].method, "GET");
1010        assert_eq!(requests[2].target, "/v1/secret/data/API_TOKEN");
1011        assert_eq!(requests[2].token.as_deref(), Some("vault-token"));
1012        assert_eq!(requests[3].method, "POST");
1013        assert_eq!(requests[3].target, "/v1/auth/token/revoke-self");
1014        assert_eq!(requests[3].token.as_deref(), Some("vault-token"));
1015    }
1016
1017    #[tokio::test]
1018    async fn runtime_leased_fetch_requires_prepared_lease() {
1019        let broker = {
1020            let _g = env_lock();
1021            set_required_env();
1022            let broker = VaultAppRoleBroker::from_env().unwrap();
1023            clear_required_env();
1024            broker
1025        };
1026        let err = broker
1027            .fetch_runtime_secret("API_TOKEN", "missing-cell", 60)
1028            .await
1029            .unwrap_err();
1030        assert!(
1031            err.to_string().contains("no prepared Vault runtime lease"),
1032            "got: {err}"
1033        );
1034    }
1035
1036    #[tokio::test]
1037    async fn revoke_without_prepared_lease_is_ok() {
1038        let broker = {
1039            let _g = env_lock();
1040            set_required_env();
1041            let broker = VaultAppRoleBroker::from_env().unwrap();
1042            clear_required_env();
1043            broker
1044        };
1045        broker.revoke_for_cell("any-cell").await.unwrap();
1046    }
1047
1048    /// VAULT-CT-ZERO: the parse-side `VaultLoginResponse` / `VaultAuth` types
1049    /// must redact `client_token` in their `Debug` representation so a stray
1050    /// `tracing::error!(?response)` cannot leak the token to logs.
1051    #[test]
1052    fn vault_login_response_debug_redacts_client_token() {
1053        let response = VaultLoginResponse {
1054            auth: VaultAuth {
1055                client_token: "VAULT-CT-ZERO-INLINE-SENTINEL".to_string(),
1056            },
1057        };
1058        let dbg = format!("{response:?}");
1059        assert!(
1060            !dbg.contains("VAULT-CT-ZERO-INLINE-SENTINEL"),
1061            "VaultLoginResponse Debug leaked client_token: {dbg}"
1062        );
1063        assert!(
1064            dbg.contains("<redacted>"),
1065            "VaultLoginResponse Debug should mark client_token as redacted: {dbg}"
1066        );
1067
1068        let auth_dbg = format!("{:?}", response.auth);
1069        assert!(
1070            !auth_dbg.contains("VAULT-CT-ZERO-INLINE-SENTINEL"),
1071            "VaultAuth Debug leaked client_token: {auth_dbg}"
1072        );
1073    }
1074}