Skip to main content

vault_client_rs/client/
async_client.rs

1use std::fmt::{self, Write};
2use std::fs;
3use std::sync::{Arc, LazyLock, RwLock};
4use std::time::{Duration, Instant};
5
6use rand::RngExt;
7use reqwest::{Client, Method, Response};
8use secrecy::{ExposeSecret, SecretString};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use url::Url;
12use zeroize::Zeroizing;
13
14use tracing::Instrument;
15
16use crate::api;
17use crate::api::auth::{AuthMethod, AuthMethodDyn};
18use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
19use crate::types::error::VaultError;
20use crate::types::kv::ListResponse;
21use crate::types::response::{AuthInfo, VaultResponse};
22
/// Upper bound for the retry backoff: `send_with_retry` substitutes this
/// value when the exponential delay overflows and clamps the delay to it
const MAX_BACKOFF: Duration = Duration::from_secs(30);
24
/// HTTP LIST method used by Vault's list endpoints
///
/// Parsed from bytes once, on first access, via [`LazyLock`]; `exec_list`
/// clones the cached value for each request
static METHOD_LIST: LazyLock<Method> =
    LazyLock::new(|| Method::from_bytes(b"LIST").expect("LIST is a valid HTTP method"));
28
/// An asynchronous Vault client
///
/// Build instances with [`ClientBuilder`]
///
/// Clones share the same `inner` state through an [`Arc`]; only the two
/// per-view overrides are copied
#[derive(Clone)]
pub struct VaultClient {
    /// Shared state: HTTP client, base URL, token, configuration
    pub(crate) inner: Arc<VaultClientInner>,
    /// Namespace that takes precedence over `inner.namespace` when request
    /// headers are built; set by `with_namespace`
    pub(crate) namespace_override: Option<String>,
    /// Wrap TTL that takes precedence over `inner.config.wrap_ttl` when
    /// request headers are built; set by `with_wrap_ttl`
    pub(crate) wrap_ttl_override: Option<String>,
}
38
// Compile-time check that `VaultClient` is `Send + Sync`: the helper
// functions are never called, they only have to type-check
const _: () = {
    fn _assert_send_sync<T: Send + Sync>() {}
    fn _assert() {
        _assert_send_sync::<VaultClient>();
    }
};
45
46impl fmt::Debug for VaultClient {
47    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48        f.debug_struct("VaultClient")
49            .field("base_url", &self.inner.base_url.as_str())
50            .finish_non_exhaustive()
51    }
52}
53
/// Shared callback that receives the new [`AuthInfo`] each time
/// `update_token_from_auth` replaces the stored token
type TokenChangedCallback = Arc<dyn Fn(&AuthInfo) + Send + Sync>;
55
/// State shared by every clone and view of a [`VaultClient`]
pub(crate) struct VaultClientInner {
    /// Underlying reqwest client used for all requests
    pub(crate) http: Client,
    /// Vault address; `ClientBuilder::build` guarantees the path ends in `/`
    pub(crate) base_url: Url,
    /// Current token, if any; replaced by `set_token` and
    /// `update_token_from_auth`
    pub(crate) token: RwLock<Option<TokenState>>,
    /// Default namespace, sent as `X-Vault-Namespace` unless a view overrides it
    pub(crate) namespace: Option<String>,
    /// Timeout, retry and header settings resolved at build time
    pub(crate) config: ClientConfig,
    /// Login method used by `try_re_authenticate`; `None` disables
    /// re-authentication
    pub(crate) auth_method: Option<Arc<dyn AuthMethodDyn>>,
    /// Optional circuit breaker consulted before, and updated after, each request
    pub(crate) circuit_breaker: Option<CircuitBreaker>,
    /// Optional callback invoked after the token is updated from an auth response
    pub(crate) on_token_changed: Option<TokenChangedCallback>,
}
66
/// Internal token state; fields beyond `value` are populated by
/// `update_token_from_auth` and used by the renewal daemon
pub(crate) struct TokenState {
    /// The token itself, sent as the `X-Vault-Token` header
    pub value: SecretString,
    /// When the token expires; `None` for tokens without a known expiry
    /// (such tokens are never proactively renewed)
    pub expires_at: Option<Instant>,
    /// Whether the token can be renewed via `auth/token/renew-self`
    pub renewable: bool,
    /// Lease length reported by the auth response; 20% of it is used as the
    /// renewal threshold in `token_needs_renewal`
    pub lease_duration: Duration,
}
75
/// Request settings resolved once by `ClientBuilder::build`
pub(crate) struct ClientConfig {
    /// Request timeout handed to the reqwest client builder
    pub timeout: Duration,
    /// Number of retries after the first attempt (`send_with_retry` loops
    /// over `0..=max_retries`)
    pub max_retries: u32,
    /// Base delay for the exponential retry backoff
    pub initial_retry_delay: Duration,
    /// Default value for the `X-Vault-Wrap-TTL` header, if any
    pub wrap_ttl: Option<String>,
    /// When true, requests carry `X-Vault-Forward: active-node`
    pub forward_to_leader: bool,
    /// Set to `!cli_mode` by the builder; presumably consulted by the retry
    /// logic when Vault reports it is sealed — TODO confirm in `send_with_retry`
    pub retry_on_sealed: bool,
}
84
85impl Default for ClientConfig {
86    fn default() -> Self {
87        Self {
88            timeout: Duration::from_secs(60),
89            max_retries: 2,
90            initial_retry_delay: Duration::from_millis(500),
91            wrap_ttl: None,
92            forward_to_leader: false,
93            retry_on_sealed: true,
94        }
95    }
96}
97
98// ---------------------------------------------------------------------------
99// Builder
100// ---------------------------------------------------------------------------
101
/// Builder for configuring and constructing a [`VaultClient`]
///
/// Every field is optional except `address`, which `build` requires
#[derive(Default)]
#[must_use]
pub struct ClientBuilder {
    /// Vault address; parsed as a URL by `build`
    address: Option<String>,
    /// Initial token, if any
    token: Option<SecretString>,
    /// Default namespace for all requests
    namespace: Option<String>,
    /// Request timeout; `build` falls back to 60 s
    timeout: Option<Duration>,
    /// Retry count; `build` falls back to 2
    max_retries: Option<u32>,
    /// Base retry delay; `build` falls back to 500 ms
    initial_retry_delay: Option<Duration>,
    /// Default response-wrapping TTL
    wrap_ttl: Option<String>,
    /// Whether to send `X-Vault-Forward: active-node`
    forward_to_leader: bool,
    /// Disables TLS certificate verification in the client built by `build`
    danger_disable_tls_verify: bool,
    /// Extra root certificate (PEM) to trust
    ca_cert_pem: Option<Vec<u8>>,
    /// Client certificate (PEM); only used when `client_key_pem` is also set
    client_cert_pem: Option<Vec<u8>>,
    /// Client private key (PEM), zeroized on drop
    client_key_pem: Option<Zeroizing<Vec<u8>>>,
    /// Pre-built reqwest client; when set, `build` uses it as-is and does not
    /// apply the timeout or TLS fields above
    reqwest_client: Option<Client>,
    /// Login method used for re-authentication
    auth_method: Option<Arc<dyn AuthMethodDyn>>,
    /// Circuit-breaker configuration; `None` disables the breaker
    circuit_breaker: Option<CircuitBreakerConfig>,
    /// Callback invoked when the token changes
    on_token_changed: Option<TokenChangedCallback>,
    /// When true: max_retries=0 and Sealed is not retried
    cli_mode: bool,
}
125
126impl ClientBuilder {
127    /// Pre-populate the builder from `VAULT_*` environment variables;
128    /// token resolution order: `VAULT_TOKEN` → `~/.vault-token` → `None`
129    pub fn from_env() -> Self {
130        let cli_mode = std::env::var("VAULT_CLI_MODE")
131            .ok()
132            .is_some_and(|v| v == "1" || v.eq_ignore_ascii_case("true"));
133
134        let skip_tls = std::env::var("VAULT_SKIP_VERIFY")
135            .ok()
136            .or_else(|| {
137                let v = std::env::var("VAULT_SKIP_TLS_VERIFY").ok();
138                if v.is_some() {
139                    tracing::warn!("VAULT_SKIP_TLS_VERIFY is non-standard; use VAULT_SKIP_VERIFY");
140                }
141                v
142            })
143            .is_some_and(|v| v == "1" || v.eq_ignore_ascii_case("true"));
144
145        Self {
146            address: std::env::var("VAULT_ADDR").ok(),
147            token: std::env::var("VAULT_TOKEN")
148                .ok()
149                .map(SecretString::from)
150                .or_else(read_vault_token_file),
151            namespace: std::env::var("VAULT_NAMESPACE").ok(),
152            timeout: std::env::var("VAULT_CLIENT_TIMEOUT")
153                .ok()
154                .and_then(|v| v.parse().ok())
155                .map(Duration::from_secs),
156            max_retries: if cli_mode {
157                Some(0)
158            } else {
159                std::env::var("VAULT_MAX_RETRIES")
160                    .ok()
161                    .and_then(|v| v.parse().ok())
162            },
163            wrap_ttl: std::env::var("VAULT_WRAP_TTL").ok(),
164            danger_disable_tls_verify: skip_tls,
165            ca_cert_pem: std::env::var("VAULT_CACERT")
166                .ok()
167                .and_then(|path| fs::read(path).ok()),
168            client_cert_pem: std::env::var("VAULT_CLIENT_CERT")
169                .ok()
170                .and_then(|path| fs::read(path).ok()),
171            client_key_pem: std::env::var("VAULT_CLIENT_KEY")
172                .ok()
173                .and_then(|path| fs::read(path).ok().map(Zeroizing::new)),
174            cli_mode,
175            ..Self::default()
176        }
177    }
178
179    pub fn address(mut self, addr: &str) -> Self {
180        self.address = Some(addr.to_owned());
181        self
182    }
183
184    pub fn token(mut self, token: SecretString) -> Self {
185        self.token = Some(token);
186        self
187    }
188
189    pub fn token_str(self, token: &str) -> Self {
190        self.token(SecretString::from(token))
191    }
192
193    pub fn namespace(mut self, ns: &str) -> Self {
194        self.namespace = Some(ns.to_owned());
195        self
196    }
197
198    pub fn timeout(mut self, timeout: Duration) -> Self {
199        self.timeout = Some(timeout);
200        self
201    }
202
203    pub fn max_retries(mut self, n: u32) -> Self {
204        self.max_retries = Some(n);
205        self
206    }
207
208    pub fn initial_retry_delay(mut self, d: Duration) -> Self {
209        self.initial_retry_delay = Some(d);
210        self
211    }
212
213    pub fn wrap_ttl(mut self, ttl: &str) -> Self {
214        self.wrap_ttl = Some(ttl.to_owned());
215        self
216    }
217
218    pub fn forward_to_leader(mut self, yes: bool) -> Self {
219        self.forward_to_leader = yes;
220        self
221    }
222
223    /// Optimise for short-lived CLI invocations
224    ///
225    /// Sets `max_retries(0)` and disables sealed-Vault retries —
226    /// a sealed Vault will not unseal itself between invocations —
227    /// equivalent to `VAULT_CLI_MODE=1` in `from_env()`
228    pub fn cli_mode(mut self, yes: bool) -> Self {
229        if yes {
230            self.max_retries = Some(0);
231        }
232        self.cli_mode = yes;
233        self
234    }
235
236    pub fn danger_disable_tls_verify(mut self, yes: bool) -> Self {
237        self.danger_disable_tls_verify = yes;
238        self
239    }
240
241    pub fn ca_cert_pem(mut self, pem: impl Into<Vec<u8>>) -> Self {
242        self.ca_cert_pem = Some(pem.into());
243        self
244    }
245
246    pub fn client_cert_pem(mut self, cert: impl Into<Vec<u8>>, key: impl Into<Vec<u8>>) -> Self {
247        self.client_cert_pem = Some(cert.into());
248        self.client_key_pem = Some(Zeroizing::new(key.into()));
249        self
250    }
251
252    /// Set an authentication method for automatic token lifecycle management
253    ///
254    /// When set, the client will automatically re-authenticate when the token
255    /// nears expiry or is missing
256    pub fn auth_method(mut self, method: impl AuthMethod + 'static) -> Self {
257        self.auth_method = Some(Arc::new(method));
258        self
259    }
260
261    /// Enable the circuit breaker with the given configuration
262    ///
263    /// When enabled, consecutive failures will trip the circuit and
264    /// short-circuit subsequent requests until the reset timeout elapses
265    pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
266        self.circuit_breaker = Some(config);
267        self
268    }
269
270    /// Register a callback invoked whenever the client's token changes
271    /// via renewal or re-authentication
272    pub fn on_token_changed(mut self, f: impl Fn(&AuthInfo) + Send + Sync + 'static) -> Self {
273        self.on_token_changed = Some(Arc::new(f));
274        self
275    }
276
277    pub fn with_reqwest_client(mut self, client: Client) -> Self {
278        self.reqwest_client = Some(client);
279        self
280    }
281
282    pub fn build(self) -> Result<VaultClient, VaultError> {
283        let addr = self
284            .address
285            .ok_or_else(|| VaultError::Config("address is required".into()))?;
286        let mut base_url =
287            Url::parse(&addr).map_err(|e| VaultError::Config(format!("invalid address: {e}")))?;
288        // Ensure trailing slash so path joins work correctly
289        if !base_url.path().ends_with('/') {
290            base_url.set_path(&format!("{}/", base_url.path()));
291        }
292
293        let config = ClientConfig {
294            timeout: self.timeout.unwrap_or(Duration::from_secs(60)),
295            max_retries: self.max_retries.unwrap_or(2),
296            initial_retry_delay: self
297                .initial_retry_delay
298                .unwrap_or(Duration::from_millis(500)),
299            wrap_ttl: self.wrap_ttl,
300            forward_to_leader: self.forward_to_leader,
301            retry_on_sealed: !self.cli_mode,
302        };
303
304        // Build the HTTP client. We must do this after constructing config
305        // (for timeout) but handle the partial-move by matching reqwest_client
306        // separately: in the None arm we still need &self for TLS fields.
307        let http = if let Some(c) = self.reqwest_client {
308            c
309        } else {
310            build_reqwest_client(
311                &config,
312                self.danger_disable_tls_verify,
313                self.ca_cert_pem.as_deref(),
314                self.client_cert_pem.as_deref(),
315                self.client_key_pem.as_ref().map(|k| k.as_slice()),
316            )?
317        };
318
319        let token_state = self.token.map(|t| TokenState {
320            value: t,
321            expires_at: None,
322            renewable: false,
323            lease_duration: Duration::ZERO,
324        });
325
326        if self.danger_disable_tls_verify {
327            tracing::warn!(
328                vault_address = %base_url,
329                "TLS certificate verification is DISABLED (danger_disable_tls_verify). \
330                 This must not be used in production."
331            );
332        }
333
334        Ok(VaultClient {
335            inner: Arc::new(VaultClientInner {
336                http,
337                base_url,
338                token: RwLock::new(token_state),
339                namespace: self.namespace,
340                config,
341                auth_method: self.auth_method,
342                circuit_breaker: self.circuit_breaker.map(CircuitBreaker::new),
343                on_token_changed: self.on_token_changed,
344            }),
345            namespace_override: None,
346            wrap_ttl_override: None,
347        })
348    }
349}
350
351fn build_reqwest_client(
352    config: &ClientConfig,
353    danger_disable_tls_verify: bool,
354    ca_cert_pem: Option<&[u8]>,
355    client_cert_pem: Option<&[u8]>,
356    client_key_pem: Option<&[u8]>,
357) -> Result<Client, VaultError> {
358    let mut builder = Client::builder()
359        .timeout(config.timeout)
360        .danger_accept_invalid_certs(danger_disable_tls_verify);
361
362    if let Some(ca_pem) = ca_cert_pem {
363        let cert = reqwest::tls::Certificate::from_pem(ca_pem)
364            .map_err(|e| VaultError::Config(format!("CA cert: {e}")))?;
365        builder = builder.add_root_certificate(cert);
366    }
367
368    if let (Some(cert_pem), Some(key_pem)) = (client_cert_pem, client_key_pem) {
369        let mut combined = Zeroizing::new(Vec::with_capacity(cert_pem.len() + key_pem.len()));
370        combined.extend_from_slice(cert_pem);
371        combined.extend_from_slice(key_pem);
372        let identity = reqwest::tls::Identity::from_pem(&combined)
373            .map_err(|e| VaultError::Config(format!("TLS identity: {e}")))?;
374        drop(combined); // zeroize on drop
375        builder = builder.identity(identity);
376    }
377
378    builder
379        .build()
380        .map_err(|e| VaultError::Config(format!("reqwest client: {e}")))
381}
382
383// ---------------------------------------------------------------------------
384// Handler accessors
385// ---------------------------------------------------------------------------
386
387impl VaultClient {
388    /// Create a client with an address and plaintext token
389    ///
390    /// For more options, use [`VaultClient::builder()`]
391    pub fn new(address: &str, token: &str) -> Result<Self, VaultError> {
392        Self::builder().address(address).token_str(token).build()
393    }
394
395    /// Create a client from `VAULT_*` environment variables;
396    /// token resolution order: `VAULT_TOKEN` → `~/.vault-token` → `None`
397    pub fn from_env() -> Result<Self, VaultError> {
398        ClientBuilder::from_env().build()
399    }
400
401    pub fn builder() -> ClientBuilder {
402        ClientBuilder::default()
403    }
404
405    #[must_use]
406    pub fn cubbyhole(&self, mount: &str) -> api::cubbyhole::CubbyholeHandler<'_> {
407        api::cubbyhole::CubbyholeHandler {
408            client: self,
409            mount: encode_path(mount),
410        }
411    }
412
413    #[must_use]
414    pub fn kv1(&self, mount: &str) -> api::kv1::Kv1Handler<'_> {
415        api::kv1::Kv1Handler {
416            client: self,
417            mount: encode_path(mount),
418        }
419    }
420
421    #[must_use]
422    pub fn kv2(&self, mount: &str) -> api::kv2::Kv2Handler<'_> {
423        api::kv2::Kv2Handler {
424            client: self,
425            mount: encode_path(mount),
426        }
427    }
428
429    #[must_use]
430    pub fn transit(&self, mount: &str) -> api::transit::TransitHandler<'_> {
431        api::transit::TransitHandler {
432            client: self,
433            mount: encode_path(mount),
434        }
435    }
436
437    #[must_use]
438    pub fn pki(&self, mount: &str) -> api::pki::PkiHandler<'_> {
439        api::pki::PkiHandler {
440            client: self,
441            mount: encode_path(mount),
442        }
443    }
444
445    #[must_use]
446    pub fn database(&self, mount: &str) -> api::database::DatabaseHandler<'_> {
447        api::database::DatabaseHandler {
448            client: self,
449            mount: encode_path(mount),
450        }
451    }
452
453    #[must_use]
454    pub fn ssh(&self, mount: &str) -> api::ssh::SshHandler<'_> {
455        api::ssh::SshHandler {
456            client: self,
457            mount: encode_path(mount),
458        }
459    }
460
461    #[must_use]
462    pub fn aws_secrets(&self, mount: &str) -> api::aws::AwsSecretsHandler<'_> {
463        api::aws::AwsSecretsHandler {
464            client: self,
465            mount: encode_path(mount),
466        }
467    }
468
469    #[must_use]
470    pub fn totp(&self, mount: &str) -> api::totp::TotpHandler<'_> {
471        api::totp::TotpHandler {
472            client: self,
473            mount: encode_path(mount),
474        }
475    }
476
477    #[must_use]
478    pub fn consul_secrets(&self, mount: &str) -> api::consul::ConsulHandler<'_> {
479        api::consul::ConsulHandler {
480            client: self,
481            mount: encode_path(mount),
482        }
483    }
484
485    #[must_use]
486    pub fn nomad_secrets(&self, mount: &str) -> api::nomad::NomadHandler<'_> {
487        api::nomad::NomadHandler {
488            client: self,
489            mount: encode_path(mount),
490        }
491    }
492
493    #[must_use]
494    pub fn azure_secrets(&self, mount: &str) -> api::azure::AzureHandler<'_> {
495        api::azure::AzureHandler {
496            client: self,
497            mount: encode_path(mount),
498        }
499    }
500
501    #[must_use]
502    pub fn gcp_secrets(&self, mount: &str) -> api::gcp::GcpHandler<'_> {
503        api::gcp::GcpHandler {
504            client: self,
505            mount: encode_path(mount),
506        }
507    }
508
509    #[must_use]
510    pub fn rabbitmq(&self, mount: &str) -> api::rabbitmq::RabbitmqHandler<'_> {
511        api::rabbitmq::RabbitmqHandler {
512            client: self,
513            mount: encode_path(mount),
514        }
515    }
516
517    #[must_use]
518    pub fn terraform_cloud(&self, mount: &str) -> api::terraform::TerraformCloudHandler<'_> {
519        api::terraform::TerraformCloudHandler {
520            client: self,
521            mount: encode_path(mount),
522        }
523    }
524
525    #[must_use]
526    pub fn identity(&self) -> api::identity::IdentityHandler<'_> {
527        api::identity::IdentityHandler { client: self }
528    }
529
530    #[must_use]
531    pub fn sys(&self) -> api::sys::SysHandler<'_> {
532        api::sys::SysHandler { client: self }
533    }
534
535    #[must_use]
536    pub fn auth(&self) -> api::auth::AuthHandler<'_> {
537        api::auth::AuthHandler { client: self }
538    }
539
540    /// Replace the current token at runtime
541    pub fn set_token(&self, token: SecretString) -> Result<(), VaultError> {
542        let mut guard = self
543            .inner
544            .token
545            .write()
546            .map_err(|_| VaultError::LockPoisoned)?;
547        *guard = Some(TokenState {
548            value: token,
549            expires_at: None,
550            renewable: false,
551            lease_duration: Duration::ZERO,
552        });
553        Ok(())
554    }
555
556    /// Return a client view with a different namespace (cheap Arc clone)
557    #[must_use]
558    pub fn with_namespace(&self, ns: &str) -> Self {
559        VaultClient {
560            inner: Arc::clone(&self.inner),
561            namespace_override: Some(ns.to_owned()),
562            wrap_ttl_override: self.wrap_ttl_override.clone(),
563        }
564    }
565
566    /// Return a client view with a different wrap TTL (cheap Arc clone)
567    #[must_use]
568    pub fn with_wrap_ttl(&self, ttl: &str) -> Self {
569        VaultClient {
570            inner: Arc::clone(&self.inner),
571            namespace_override: self.namespace_override.clone(),
572            wrap_ttl_override: Some(ttl.to_owned()),
573        }
574    }
575
576    /// Update internal token state from an auth response
577    pub(crate) fn update_token_from_auth(&self, auth: &AuthInfo) -> Result<(), VaultError> {
578        let mut guard = self
579            .inner
580            .token
581            .write()
582            .map_err(|_| VaultError::LockPoisoned)?;
583        *guard = Some(TokenState {
584            value: auth.client_token.clone(),
585            lease_duration: Duration::from_secs(auth.lease_duration),
586            expires_at: if auth.lease_duration > 0 {
587                Instant::now().checked_add(Duration::from_secs(auth.lease_duration))
588            } else {
589                None
590            },
591            renewable: auth.renewable,
592        });
593        drop(guard);
594
595        if let Some(cb) = &self.inner.on_token_changed {
596            cb(auth);
597        }
598        Ok(())
599    }
600}
601
602// ---------------------------------------------------------------------------
603// Generic escape hatch
604// ---------------------------------------------------------------------------
605
606impl VaultClient {
607    /// Read from an arbitrary Vault path, deserializing the `data` field
608    pub async fn read<T: DeserializeOwned>(&self, path: &str) -> Result<T, VaultError> {
609        self.exec_with_data(Method::GET, path, None).await
610    }
611
612    /// Read from an arbitrary path, returning the full Vault response envelope
613    pub async fn read_raw(
614        &self,
615        path: &str,
616    ) -> Result<VaultResponse<serde_json::Value>, VaultError> {
617        self.exec_with_auth(Method::GET, path, None).await
618    }
619
620    /// Write to an arbitrary Vault path
621    pub async fn write<T: DeserializeOwned>(
622        &self,
623        path: &str,
624        data: &impl Serialize,
625    ) -> Result<VaultResponse<T>, VaultError> {
626        let body = to_body(data)?;
627        self.exec_with_auth(Method::POST, path, Some(&body)).await
628    }
629
630    /// Delete at an arbitrary Vault path
631    pub async fn delete(&self, path: &str) -> Result<(), VaultError> {
632        self.exec_empty(Method::DELETE, path, None).await
633    }
634
635    /// List keys at an arbitrary Vault path
636    pub async fn list(&self, path: &str) -> Result<Vec<String>, VaultError> {
637        self.exec_list(path).await
638    }
639}
640
641// ---------------------------------------------------------------------------
642// Request execution
643// ---------------------------------------------------------------------------
644
645impl VaultClient {
646    pub(crate) async fn exec_with_data<T: DeserializeOwned>(
647        &self,
648        method: Method,
649        path: &str,
650        body: Option<&serde_json::Value>,
651    ) -> Result<T, VaultError> {
652        let resp = self.execute(method, path, body).await?;
653        if resp.status().as_u16() == 404 {
654            return Err(VaultError::NotFound {
655                path: path.to_owned(),
656            });
657        }
658        let envelope: VaultResponse<T> = resp.json().await?;
659        self.log_warnings(&envelope.warnings);
660        envelope.data.ok_or(VaultError::EmptyResponse)
661    }
662
663    pub(crate) async fn exec_with_auth<T: DeserializeOwned>(
664        &self,
665        method: Method,
666        path: &str,
667        body: Option<&serde_json::Value>,
668    ) -> Result<VaultResponse<T>, VaultError> {
669        let resp = self.execute(method, path, body).await?;
670        if resp.status().as_u16() == 404 {
671            return Err(VaultError::NotFound {
672                path: path.to_owned(),
673            });
674        }
675        let envelope: VaultResponse<T> = resp.json().await?;
676        self.log_warnings(&envelope.warnings);
677        Ok(envelope)
678    }
679
680    pub(crate) async fn exec_empty(
681        &self,
682        method: Method,
683        path: &str,
684        body: Option<&serde_json::Value>,
685    ) -> Result<(), VaultError> {
686        let resp = self.execute(method, path, body).await?;
687        if resp.status().as_u16() == 404 {
688            return Err(VaultError::NotFound {
689                path: path.to_owned(),
690            });
691        }
692        Ok(())
693    }
694
695    /// Deserialize response body directly (that is, not through the Vault envelope)
696    ///
697    /// Used for endpoints like /sys/health that return flat JSON
698    pub(crate) async fn exec_direct<T: DeserializeOwned>(
699        &self,
700        method: Method,
701        path: &str,
702        body: Option<&serde_json::Value>,
703    ) -> Result<T, VaultError> {
704        let resp = self.execute(method, path, body).await?;
705        Ok(resp.json().await?)
706    }
707
708    pub(crate) async fn exec_list(&self, path: &str) -> Result<Vec<String>, VaultError> {
709        let resp = self.execute(METHOD_LIST.clone(), path, None).await?;
710        if resp.status().as_u16() == 404 {
711            return Ok(vec![]);
712        }
713        let envelope: VaultResponse<ListResponse> = resp.json().await?;
714        Ok(envelope.data.map(|d| d.keys).unwrap_or_default())
715    }
716
717    pub(crate) async fn exec_patch<T: DeserializeOwned>(
718        &self,
719        path: &str,
720        body: &serde_json::Value,
721    ) -> Result<T, VaultError> {
722        let resp = self.execute(Method::PATCH, path, Some(body)).await?;
723        if resp.status().as_u16() == 404 {
724            return Err(VaultError::NotFound {
725                path: path.to_owned(),
726            });
727        }
728        let envelope: VaultResponse<T> = resp.json().await?;
729        self.log_warnings(&envelope.warnings);
730        envelope.data.ok_or(VaultError::EmptyResponse)
731    }
732
733    fn token_needs_renewal(ts: &TokenState) -> bool {
734        match ts.expires_at {
735            Some(expires) => {
736                let threshold = ts.lease_duration.mul_f64(0.2);
737                Instant::now() + threshold >= expires
738            }
739            None => false, // root token or no expiry
740        }
741    }
742
743    /// Proactively renew or re-authenticate before the token expires
744    ///
745    /// Uses a double-check pattern to avoid redundant renewals under contention;
746    /// all lock guards are dropped before any `.await` to keep futures `Send`
747    async fn ensure_valid_token(&self) -> Result<(), VaultError> {
748        enum Action {
749            Ok,
750            ReAuth,
751            Renew,
752        }
753
754        let action = {
755            let guard = self
756                .inner
757                .token
758                .read()
759                .map_err(|_| VaultError::LockPoisoned)?;
760            match guard.as_ref() {
761                Some(ts) if !Self::token_needs_renewal(ts) => Action::Ok,
762                Some(ts) if ts.renewable => Action::Renew,
763                _ if self.inner.auth_method.is_some() => Action::ReAuth,
764                // No token and no auth method: let the request through.
765                _ => Action::Ok,
766            }
767        }; // guard dropped
768
769        match action {
770            Action::Ok => Ok(()),
771            Action::ReAuth => self.try_re_authenticate().await,
772            Action::Renew => {
773                // Double-check under write lock
774                let still_needed = {
775                    let guard = self
776                        .inner
777                        .token
778                        .write()
779                        .map_err(|_| VaultError::LockPoisoned)?;
780                    guard.as_ref().is_some_and(Self::token_needs_renewal)
781                }; // write lock dropped
782
783                if !still_needed {
784                    return Ok(());
785                }
786
787                // Renew the current token directly via the Vault API.
788                // All locks are released before this await so the future stays `Send`.
789                // Uses execute_raw to bypass ensure_valid_token and avoid recursion.
790                let raw_resp = self
791                    .execute_raw(Method::POST, "auth/token/renew-self", None)
792                    .await?;
793                let resp: VaultResponse<serde_json::Value> = raw_resp.json().await?;
794                if let Some(auth) = resp.auth {
795                    self.update_token_from_auth(&auth)?;
796                }
797                Ok(())
798            }
799        }
800    }
801
802    /// Attempt re-authentication using the configured auth method
803    pub(crate) async fn try_re_authenticate(&self) -> Result<(), VaultError> {
804        match &self.inner.auth_method {
805            Some(method) => {
806                let auth = method.login_dyn(self).await?;
807                self.update_token_from_auth(&auth)?;
808                Ok(())
809            }
810            None => Err(VaultError::AuthRequired),
811        }
812    }
813
814    pub(crate) async fn execute(
815        &self,
816        method: Method,
817        path: &str,
818        body: Option<&serde_json::Value>,
819    ) -> Result<Response, VaultError> {
820        // Skip token lifecycle only for login endpoints to avoid infinite
821        // recursion (the call chain is: `try_re_authenticate` => `login_dyn` => `execute` => `ensure_valid_token`)
822        let is_login = path.starts_with("auth/") && path.contains("/login");
823        if !is_login {
824            self.ensure_valid_token().await?;
825        }
826        self.execute_raw(method, path, body).await
827    }
828
829    /// Low-level execute that bypasses token lifecycle, used internally by
830    /// `ensure_valid_token` to avoid recursion
831    pub(crate) async fn execute_raw(
832        &self,
833        method: Method,
834        path: &str,
835        body: Option<&serde_json::Value>,
836    ) -> Result<Response, VaultError> {
837        let span = tracing::info_span!(
838            "vault.request",
839            http.method = %method,
840            vault.path = %path,
841            http.status_code = tracing::field::Empty,
842        );
843
844        async {
845            if let Some(cb) = &self.inner.circuit_breaker {
846                cb.check()?;
847            }
848
849            let url_str = format!("{}v1/{}", self.inner.base_url, path.trim_start_matches('/'));
850            let url = Url::parse(&url_str)?;
851
852            let mut req = self
853                .inner
854                .http
855                .request(method.clone(), url.clone())
856                .header("X-Vault-Request", "true");
857
858            if method == Method::PATCH {
859                req = req.header("Content-Type", "application/merge-patch+json");
860            }
861
862            req = self.inject_headers(req)?;
863
864            if let Some(body) = body {
865                req = req.json(body);
866            }
867
868            match self.send_with_retry(req, &url, &method).await {
869                Ok(resp) => {
870                    if let Some(cb) = &self.inner.circuit_breaker {
871                        cb.record_success();
872                    }
873                    tracing::Span::current().record("http.status_code", resp.status().as_u16());
874                    tracing::debug!(status = resp.status().as_u16(), "vault response");
875                    Ok(resp)
876                }
877                Err(e) => {
878                    if let Some(cb) = &self.inner.circuit_breaker {
879                        cb.record_failure();
880                    }
881                    Err(e)
882                }
883            }
884        }
885        .instrument(span)
886        .await
887    }
888
889    pub(crate) fn inject_headers(
890        &self,
891        mut req: reqwest::RequestBuilder,
892    ) -> Result<reqwest::RequestBuilder, VaultError> {
893        let guard = self
894            .inner
895            .token
896            .read()
897            .map_err(|_| VaultError::LockPoisoned)?;
898        if let Some(ts) = guard.as_ref() {
899            req = req.header("X-Vault-Token", ts.value.expose_secret());
900        }
901        drop(guard);
902
903        let ns = self
904            .namespace_override
905            .as_deref()
906            .or(self.inner.namespace.as_deref());
907        if let Some(ns) = ns {
908            req = req.header("X-Vault-Namespace", ns);
909        }
910        let ttl = self
911            .wrap_ttl_override
912            .as_deref()
913            .or(self.inner.config.wrap_ttl.as_deref());
914        if let Some(ttl) = ttl {
915            req = req.header("X-Vault-Wrap-TTL", ttl);
916        }
917        if self.inner.config.forward_to_leader {
918            req = req.header("X-Vault-Forward", "active-node");
919        }
920        Ok(req)
921    }
922
923    async fn send_with_retry(
924        &self,
925        builder: reqwest::RequestBuilder,
926        url: &Url,
927        method: &Method,
928    ) -> Result<Response, VaultError> {
929        let max = self.inner.config.max_retries;
930        let mut skip_backoff = false;
931
932        for attempt in 0..=max {
933            if attempt > 0 && !skip_backoff {
934                let base = self
935                    .inner
936                    .config
937                    .initial_retry_delay
938                    .checked_mul(2u32.saturating_pow(attempt - 1))
939                    .unwrap_or(MAX_BACKOFF);
940                let capped = base.min(MAX_BACKOFF);
941                let capped_ms = u64::try_from(capped.as_millis()).unwrap_or(u64::MAX).max(1);
942                let delay = Duration::from_millis(rand::rng().random_range(0u64..capped_ms));
943                tracing::warn!(attempt, max, %url, %method, ?delay, "retrying");
944                tokio::time::sleep(delay).await;
945            }
946            skip_backoff = false;
947
948            let req = match builder.try_clone() {
949                Some(r) => r,
950                None => {
951                    return Err(VaultError::Config(
952                        "request body not cloneable (stream body?)".into(),
953                    ));
954                }
955            };
956
957            match req.send().await {
958                Ok(resp) => {
959                    let status = resp.status().as_u16();
960                    match status {
961                        200..=299 | 404 => return Ok(resp),
962                        401 => {
963                            return Err(VaultError::AuthRequired);
964                        }
965                        403 => {
966                            let errors = Self::extract_errors(resp).await;
967                            return Err(VaultError::PermissionDenied { errors });
968                        }
969                        429 => {
970                            let retry_after = resp
971                                .headers()
972                                .get("Retry-After")
973                                .and_then(|v| v.to_str().ok())
974                                .and_then(|v| v.parse::<u64>().ok());
975                            if attempt >= max {
976                                return Err(VaultError::RateLimited { retry_after });
977                            }
978                            if let Some(secs) = retry_after {
979                                let capped = Duration::from_secs(secs).min(MAX_BACKOFF);
980                                tokio::time::sleep(capped).await;
981                                skip_backoff = true;
982                            }
983                            continue;
984                        }
985                        412 => {
986                            if attempt >= max {
987                                return Err(VaultError::ConsistencyRetry);
988                            }
989                            continue;
990                        }
991                        503 => {
992                            let e = VaultError::Sealed {
993                                url: url.to_string(),
994                            };
995                            if attempt >= max || !self.inner.config.retry_on_sealed {
996                                return Err(e);
997                            }
998                            continue;
999                        }
1000                        _ => {
1001                            let errors = Self::extract_errors(resp).await;
1002                            let err = VaultError::Api { status, errors };
1003                            if err.is_retryable() && attempt < max {
1004                                continue;
1005                            }
1006                            return Err(err);
1007                        }
1008                    }
1009                }
1010                Err(e) if (e.is_timeout() || e.is_connect()) && attempt < max => {
1011                    continue;
1012                }
1013                Err(e) => return Err(VaultError::Http(e)),
1014            }
1015        }
1016
1017        unreachable!("retry loop always returns from within")
1018    }
1019
1020    async fn extract_errors(resp: Response) -> Vec<String> {
1021        let body = resp.text().await.unwrap_or_default();
1022        serde_json::from_str::<serde_json::Value>(&body)
1023            .ok()
1024            .and_then(|v| v.get("errors")?.as_array().cloned())
1025            .map(|arr| {
1026                arr.into_iter()
1027                    .filter_map(|v| v.as_str().map(String::from))
1028                    .collect()
1029            })
1030            .unwrap_or_else(|| if body.is_empty() { vec![] } else { vec![body] })
1031    }
1032
1033    fn log_warnings(&self, warnings: &Option<Vec<String>>) {
1034        if let Some(warns) = warnings {
1035            for w in warns {
1036                tracing::debug!(warning = %w, "vault response warning");
1037            }
1038        }
1039    }
1040}
1041
1042/// Serialize a value to `serde_json::Value`, mapping errors to `VaultError::Config`
1043pub(crate) fn to_body(value: &impl Serialize) -> Result<serde_json::Value, VaultError> {
1044    serde_json::to_value(value).map_err(|e| VaultError::Config(format!("serialize: {e}")))
1045}
1046
1047/// Read `~/.vault-token`, mirroring the Vault CLI token helper
1048///
1049/// Returns `None` on any I/O error or empty file. Trims trailing whitespace —
1050/// `vault login` writes a trailing newline
1051fn read_vault_token_file() -> Option<SecretString> {
1052    let path = home::home_dir()?.join(".vault-token");
1053    let raw = fs::read_to_string(path).ok()?;
1054    let trimmed = raw.trim();
1055    if trimmed.is_empty() {
1056        None
1057    } else {
1058        Some(SecretString::from(trimmed))
1059    }
1060}
1061
/// Percent-encode bytes in a Vault path that would otherwise alter URL parsing
///
/// `/` is preserved as the path separator. The bytes encoded are `?`, `#`,
/// `%`, space, `[`, `]`, `\`, ASCII control characters (including DEL), and
/// every byte of a non-ASCII character's UTF-8 encoding
///
/// `\` is encoded because URL parsers that follow the WHATWG URL Standard
/// treat it as a path separator in `http`/`https` URLs, which would silently
/// change the request path
pub fn encode_path(raw: &str) -> String {
    let mut out = String::with_capacity(raw.len());
    for &byte in raw.as_bytes() {
        match byte {
            b'?' | b'#' | b'%' | b' ' | b'[' | b']' | b'\\' | 0..=0x1F | 0x7F..=0xFF => {
                write!(out, "%{byte:02X}").expect("writing to a String cannot fail");
            }
            // Everything left is printable ASCII, so the byte maps directly to a char
            _ => out.push(char::from(byte)),
        }
    }
    out
}