1use std::fmt::{self, Write};
2use std::fs;
3use std::sync::{Arc, LazyLock, RwLock};
4use std::time::{Duration, Instant};
5
6use rand::RngExt;
7use reqwest::{Client, Method, Response};
8use secrecy::{ExposeSecret, SecretString};
9use serde::Serialize;
10use serde::de::DeserializeOwned;
11use url::Url;
12use zeroize::Zeroizing;
13
14use tracing::Instrument;
15
16use crate::api;
17use crate::api::auth::{AuthMethod, AuthMethodDyn};
18use crate::circuit_breaker::{CircuitBreaker, CircuitBreakerConfig};
19use crate::types::error::VaultError;
20use crate::types::kv::ListResponse;
21use crate::types::response::{AuthInfo, VaultResponse};
22
23const MAX_BACKOFF: Duration = Duration::from_secs(30);
24
25static METHOD_LIST: LazyLock<Method> =
27 LazyLock::new(|| Method::from_bytes(b"LIST").expect("LIST is a valid HTTP method"));
28
29#[derive(Clone)]
33pub struct VaultClient {
34 pub(crate) inner: Arc<VaultClientInner>,
35 pub(crate) namespace_override: Option<String>,
36 pub(crate) wrap_ttl_override: Option<String>,
37}
38
39const _: () = {
40 fn _assert_send_sync<T: Send + Sync>() {}
41 fn _assert() {
42 _assert_send_sync::<VaultClient>();
43 }
44};
45
46impl fmt::Debug for VaultClient {
47 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
48 f.debug_struct("VaultClient")
49 .field("base_url", &self.inner.base_url.as_str())
50 .finish_non_exhaustive()
51 }
52}
53
54type TokenChangedCallback = Arc<dyn Fn(&AuthInfo) + Send + Sync>;
55
56pub(crate) struct VaultClientInner {
57 pub(crate) http: Client,
58 pub(crate) base_url: Url,
59 pub(crate) token: RwLock<Option<TokenState>>,
60 pub(crate) namespace: Option<String>,
61 pub(crate) config: ClientConfig,
62 pub(crate) auth_method: Option<Arc<dyn AuthMethodDyn>>,
63 pub(crate) circuit_breaker: Option<CircuitBreaker>,
64 pub(crate) on_token_changed: Option<TokenChangedCallback>,
65}
66
67pub(crate) struct TokenState {
70 pub value: SecretString,
71 pub expires_at: Option<Instant>,
72 pub renewable: bool,
73 pub lease_duration: Duration,
74}
75
76pub(crate) struct ClientConfig {
77 pub timeout: Duration,
78 pub max_retries: u32,
79 pub initial_retry_delay: Duration,
80 pub wrap_ttl: Option<String>,
81 pub forward_to_leader: bool,
82 pub retry_on_sealed: bool,
83}
84
85impl Default for ClientConfig {
86 fn default() -> Self {
87 Self {
88 timeout: Duration::from_secs(60),
89 max_retries: 2,
90 initial_retry_delay: Duration::from_millis(500),
91 wrap_ttl: None,
92 forward_to_leader: false,
93 retry_on_sealed: true,
94 }
95 }
96}
97
98#[derive(Default)]
104#[must_use]
105pub struct ClientBuilder {
106 address: Option<String>,
107 token: Option<SecretString>,
108 namespace: Option<String>,
109 timeout: Option<Duration>,
110 max_retries: Option<u32>,
111 initial_retry_delay: Option<Duration>,
112 wrap_ttl: Option<String>,
113 forward_to_leader: bool,
114 danger_disable_tls_verify: bool,
115 ca_cert_pem: Option<Vec<u8>>,
116 client_cert_pem: Option<Vec<u8>>,
117 client_key_pem: Option<Zeroizing<Vec<u8>>>,
118 reqwest_client: Option<Client>,
119 auth_method: Option<Arc<dyn AuthMethodDyn>>,
120 circuit_breaker: Option<CircuitBreakerConfig>,
121 on_token_changed: Option<TokenChangedCallback>,
122 cli_mode: bool,
124}
125
126impl ClientBuilder {
127 pub fn from_env() -> Self {
130 let cli_mode = std::env::var("VAULT_CLI_MODE")
131 .ok()
132 .is_some_and(|v| v == "1" || v.eq_ignore_ascii_case("true"));
133
134 let skip_tls = std::env::var("VAULT_SKIP_VERIFY")
135 .ok()
136 .or_else(|| {
137 let v = std::env::var("VAULT_SKIP_TLS_VERIFY").ok();
138 if v.is_some() {
139 tracing::warn!("VAULT_SKIP_TLS_VERIFY is non-standard; use VAULT_SKIP_VERIFY");
140 }
141 v
142 })
143 .is_some_and(|v| v == "1" || v.eq_ignore_ascii_case("true"));
144
145 Self {
146 address: std::env::var("VAULT_ADDR").ok(),
147 token: std::env::var("VAULT_TOKEN")
148 .ok()
149 .map(SecretString::from)
150 .or_else(read_vault_token_file),
151 namespace: std::env::var("VAULT_NAMESPACE").ok(),
152 timeout: std::env::var("VAULT_CLIENT_TIMEOUT")
153 .ok()
154 .and_then(|v| v.parse().ok())
155 .map(Duration::from_secs),
156 max_retries: if cli_mode {
157 Some(0)
158 } else {
159 std::env::var("VAULT_MAX_RETRIES")
160 .ok()
161 .and_then(|v| v.parse().ok())
162 },
163 wrap_ttl: std::env::var("VAULT_WRAP_TTL").ok(),
164 danger_disable_tls_verify: skip_tls,
165 ca_cert_pem: std::env::var("VAULT_CACERT")
166 .ok()
167 .and_then(|path| fs::read(path).ok()),
168 client_cert_pem: std::env::var("VAULT_CLIENT_CERT")
169 .ok()
170 .and_then(|path| fs::read(path).ok()),
171 client_key_pem: std::env::var("VAULT_CLIENT_KEY")
172 .ok()
173 .and_then(|path| fs::read(path).ok().map(Zeroizing::new)),
174 cli_mode,
175 ..Self::default()
176 }
177 }
178
179 pub fn address(mut self, addr: &str) -> Self {
180 self.address = Some(addr.to_owned());
181 self
182 }
183
184 pub fn token(mut self, token: SecretString) -> Self {
185 self.token = Some(token);
186 self
187 }
188
189 pub fn token_str(self, token: &str) -> Self {
190 self.token(SecretString::from(token))
191 }
192
193 pub fn namespace(mut self, ns: &str) -> Self {
194 self.namespace = Some(ns.to_owned());
195 self
196 }
197
198 pub fn timeout(mut self, timeout: Duration) -> Self {
199 self.timeout = Some(timeout);
200 self
201 }
202
203 pub fn max_retries(mut self, n: u32) -> Self {
204 self.max_retries = Some(n);
205 self
206 }
207
208 pub fn initial_retry_delay(mut self, d: Duration) -> Self {
209 self.initial_retry_delay = Some(d);
210 self
211 }
212
213 pub fn wrap_ttl(mut self, ttl: &str) -> Self {
214 self.wrap_ttl = Some(ttl.to_owned());
215 self
216 }
217
218 pub fn forward_to_leader(mut self, yes: bool) -> Self {
219 self.forward_to_leader = yes;
220 self
221 }
222
223 pub fn cli_mode(mut self, yes: bool) -> Self {
229 if yes {
230 self.max_retries = Some(0);
231 }
232 self.cli_mode = yes;
233 self
234 }
235
236 pub fn danger_disable_tls_verify(mut self, yes: bool) -> Self {
237 self.danger_disable_tls_verify = yes;
238 self
239 }
240
241 pub fn ca_cert_pem(mut self, pem: impl Into<Vec<u8>>) -> Self {
242 self.ca_cert_pem = Some(pem.into());
243 self
244 }
245
246 pub fn client_cert_pem(mut self, cert: impl Into<Vec<u8>>, key: impl Into<Vec<u8>>) -> Self {
247 self.client_cert_pem = Some(cert.into());
248 self.client_key_pem = Some(Zeroizing::new(key.into()));
249 self
250 }
251
252 pub fn auth_method(mut self, method: impl AuthMethod + 'static) -> Self {
257 self.auth_method = Some(Arc::new(method));
258 self
259 }
260
261 pub fn circuit_breaker(mut self, config: CircuitBreakerConfig) -> Self {
266 self.circuit_breaker = Some(config);
267 self
268 }
269
270 pub fn on_token_changed(mut self, f: impl Fn(&AuthInfo) + Send + Sync + 'static) -> Self {
273 self.on_token_changed = Some(Arc::new(f));
274 self
275 }
276
277 pub fn with_reqwest_client(mut self, client: Client) -> Self {
278 self.reqwest_client = Some(client);
279 self
280 }
281
282 pub fn build(self) -> Result<VaultClient, VaultError> {
283 let addr = self
284 .address
285 .ok_or_else(|| VaultError::Config("address is required".into()))?;
286 let mut base_url =
287 Url::parse(&addr).map_err(|e| VaultError::Config(format!("invalid address: {e}")))?;
288 if !base_url.path().ends_with('/') {
290 base_url.set_path(&format!("{}/", base_url.path()));
291 }
292
293 let config = ClientConfig {
294 timeout: self.timeout.unwrap_or(Duration::from_secs(60)),
295 max_retries: self.max_retries.unwrap_or(2),
296 initial_retry_delay: self
297 .initial_retry_delay
298 .unwrap_or(Duration::from_millis(500)),
299 wrap_ttl: self.wrap_ttl,
300 forward_to_leader: self.forward_to_leader,
301 retry_on_sealed: !self.cli_mode,
302 };
303
304 let http = if let Some(c) = self.reqwest_client {
308 c
309 } else {
310 build_reqwest_client(
311 &config,
312 self.danger_disable_tls_verify,
313 self.ca_cert_pem.as_deref(),
314 self.client_cert_pem.as_deref(),
315 self.client_key_pem.as_ref().map(|k| k.as_slice()),
316 )?
317 };
318
319 let token_state = self.token.map(|t| TokenState {
320 value: t,
321 expires_at: None,
322 renewable: false,
323 lease_duration: Duration::ZERO,
324 });
325
326 if self.danger_disable_tls_verify {
327 tracing::warn!(
328 vault_address = %base_url,
329 "TLS certificate verification is DISABLED (danger_disable_tls_verify). \
330 This must not be used in production."
331 );
332 }
333
334 Ok(VaultClient {
335 inner: Arc::new(VaultClientInner {
336 http,
337 base_url,
338 token: RwLock::new(token_state),
339 namespace: self.namespace,
340 config,
341 auth_method: self.auth_method,
342 circuit_breaker: self.circuit_breaker.map(CircuitBreaker::new),
343 on_token_changed: self.on_token_changed,
344 }),
345 namespace_override: None,
346 wrap_ttl_override: None,
347 })
348 }
349}
350
351fn build_reqwest_client(
352 config: &ClientConfig,
353 danger_disable_tls_verify: bool,
354 ca_cert_pem: Option<&[u8]>,
355 client_cert_pem: Option<&[u8]>,
356 client_key_pem: Option<&[u8]>,
357) -> Result<Client, VaultError> {
358 let mut builder = Client::builder()
359 .timeout(config.timeout)
360 .danger_accept_invalid_certs(danger_disable_tls_verify);
361
362 if let Some(ca_pem) = ca_cert_pem {
363 let cert = reqwest::tls::Certificate::from_pem(ca_pem)
364 .map_err(|e| VaultError::Config(format!("CA cert: {e}")))?;
365 builder = builder.add_root_certificate(cert);
366 }
367
368 if let (Some(cert_pem), Some(key_pem)) = (client_cert_pem, client_key_pem) {
369 let mut combined = Zeroizing::new(Vec::with_capacity(cert_pem.len() + key_pem.len()));
370 combined.extend_from_slice(cert_pem);
371 combined.extend_from_slice(key_pem);
372 let identity = reqwest::tls::Identity::from_pem(&combined)
373 .map_err(|e| VaultError::Config(format!("TLS identity: {e}")))?;
374 drop(combined); builder = builder.identity(identity);
376 }
377
378 builder
379 .build()
380 .map_err(|e| VaultError::Config(format!("reqwest client: {e}")))
381}
382
383impl VaultClient {
388 pub fn new(address: &str, token: &str) -> Result<Self, VaultError> {
392 Self::builder().address(address).token_str(token).build()
393 }
394
395 pub fn from_env() -> Result<Self, VaultError> {
398 ClientBuilder::from_env().build()
399 }
400
401 pub fn builder() -> ClientBuilder {
402 ClientBuilder::default()
403 }
404
405 #[must_use]
406 pub fn cubbyhole(&self, mount: &str) -> api::cubbyhole::CubbyholeHandler<'_> {
407 api::cubbyhole::CubbyholeHandler {
408 client: self,
409 mount: encode_path(mount),
410 }
411 }
412
413 #[must_use]
414 pub fn kv1(&self, mount: &str) -> api::kv1::Kv1Handler<'_> {
415 api::kv1::Kv1Handler {
416 client: self,
417 mount: encode_path(mount),
418 }
419 }
420
421 #[must_use]
422 pub fn kv2(&self, mount: &str) -> api::kv2::Kv2Handler<'_> {
423 api::kv2::Kv2Handler {
424 client: self,
425 mount: encode_path(mount),
426 }
427 }
428
429 #[must_use]
430 pub fn transit(&self, mount: &str) -> api::transit::TransitHandler<'_> {
431 api::transit::TransitHandler {
432 client: self,
433 mount: encode_path(mount),
434 }
435 }
436
437 #[must_use]
438 pub fn pki(&self, mount: &str) -> api::pki::PkiHandler<'_> {
439 api::pki::PkiHandler {
440 client: self,
441 mount: encode_path(mount),
442 }
443 }
444
445 #[must_use]
446 pub fn database(&self, mount: &str) -> api::database::DatabaseHandler<'_> {
447 api::database::DatabaseHandler {
448 client: self,
449 mount: encode_path(mount),
450 }
451 }
452
453 #[must_use]
454 pub fn ssh(&self, mount: &str) -> api::ssh::SshHandler<'_> {
455 api::ssh::SshHandler {
456 client: self,
457 mount: encode_path(mount),
458 }
459 }
460
461 #[must_use]
462 pub fn aws_secrets(&self, mount: &str) -> api::aws::AwsSecretsHandler<'_> {
463 api::aws::AwsSecretsHandler {
464 client: self,
465 mount: encode_path(mount),
466 }
467 }
468
469 #[must_use]
470 pub fn totp(&self, mount: &str) -> api::totp::TotpHandler<'_> {
471 api::totp::TotpHandler {
472 client: self,
473 mount: encode_path(mount),
474 }
475 }
476
477 #[must_use]
478 pub fn consul_secrets(&self, mount: &str) -> api::consul::ConsulHandler<'_> {
479 api::consul::ConsulHandler {
480 client: self,
481 mount: encode_path(mount),
482 }
483 }
484
485 #[must_use]
486 pub fn nomad_secrets(&self, mount: &str) -> api::nomad::NomadHandler<'_> {
487 api::nomad::NomadHandler {
488 client: self,
489 mount: encode_path(mount),
490 }
491 }
492
493 #[must_use]
494 pub fn azure_secrets(&self, mount: &str) -> api::azure::AzureHandler<'_> {
495 api::azure::AzureHandler {
496 client: self,
497 mount: encode_path(mount),
498 }
499 }
500
501 #[must_use]
502 pub fn gcp_secrets(&self, mount: &str) -> api::gcp::GcpHandler<'_> {
503 api::gcp::GcpHandler {
504 client: self,
505 mount: encode_path(mount),
506 }
507 }
508
509 #[must_use]
510 pub fn rabbitmq(&self, mount: &str) -> api::rabbitmq::RabbitmqHandler<'_> {
511 api::rabbitmq::RabbitmqHandler {
512 client: self,
513 mount: encode_path(mount),
514 }
515 }
516
517 #[must_use]
518 pub fn terraform_cloud(&self, mount: &str) -> api::terraform::TerraformCloudHandler<'_> {
519 api::terraform::TerraformCloudHandler {
520 client: self,
521 mount: encode_path(mount),
522 }
523 }
524
525 #[must_use]
526 pub fn identity(&self) -> api::identity::IdentityHandler<'_> {
527 api::identity::IdentityHandler { client: self }
528 }
529
530 #[must_use]
531 pub fn sys(&self) -> api::sys::SysHandler<'_> {
532 api::sys::SysHandler { client: self }
533 }
534
535 #[must_use]
536 pub fn auth(&self) -> api::auth::AuthHandler<'_> {
537 api::auth::AuthHandler { client: self }
538 }
539
540 pub fn set_token(&self, token: SecretString) -> Result<(), VaultError> {
542 let mut guard = self
543 .inner
544 .token
545 .write()
546 .map_err(|_| VaultError::LockPoisoned)?;
547 *guard = Some(TokenState {
548 value: token,
549 expires_at: None,
550 renewable: false,
551 lease_duration: Duration::ZERO,
552 });
553 Ok(())
554 }
555
556 #[must_use]
558 pub fn with_namespace(&self, ns: &str) -> Self {
559 VaultClient {
560 inner: Arc::clone(&self.inner),
561 namespace_override: Some(ns.to_owned()),
562 wrap_ttl_override: self.wrap_ttl_override.clone(),
563 }
564 }
565
566 #[must_use]
568 pub fn with_wrap_ttl(&self, ttl: &str) -> Self {
569 VaultClient {
570 inner: Arc::clone(&self.inner),
571 namespace_override: self.namespace_override.clone(),
572 wrap_ttl_override: Some(ttl.to_owned()),
573 }
574 }
575
576 pub(crate) fn update_token_from_auth(&self, auth: &AuthInfo) -> Result<(), VaultError> {
578 let mut guard = self
579 .inner
580 .token
581 .write()
582 .map_err(|_| VaultError::LockPoisoned)?;
583 *guard = Some(TokenState {
584 value: auth.client_token.clone(),
585 lease_duration: Duration::from_secs(auth.lease_duration),
586 expires_at: if auth.lease_duration > 0 {
587 Instant::now().checked_add(Duration::from_secs(auth.lease_duration))
588 } else {
589 None
590 },
591 renewable: auth.renewable,
592 });
593 drop(guard);
594
595 if let Some(cb) = &self.inner.on_token_changed {
596 cb(auth);
597 }
598 Ok(())
599 }
600}
601
602impl VaultClient {
607 pub async fn read<T: DeserializeOwned>(&self, path: &str) -> Result<T, VaultError> {
609 self.exec_with_data(Method::GET, path, None).await
610 }
611
612 pub async fn read_raw(
614 &self,
615 path: &str,
616 ) -> Result<VaultResponse<serde_json::Value>, VaultError> {
617 self.exec_with_auth(Method::GET, path, None).await
618 }
619
620 pub async fn write<T: DeserializeOwned>(
622 &self,
623 path: &str,
624 data: &impl Serialize,
625 ) -> Result<VaultResponse<T>, VaultError> {
626 let body = to_body(data)?;
627 self.exec_with_auth(Method::POST, path, Some(&body)).await
628 }
629
630 pub async fn delete(&self, path: &str) -> Result<(), VaultError> {
632 self.exec_empty(Method::DELETE, path, None).await
633 }
634
635 pub async fn list(&self, path: &str) -> Result<Vec<String>, VaultError> {
637 self.exec_list(path).await
638 }
639}
640
641impl VaultClient {
646 pub(crate) async fn exec_with_data<T: DeserializeOwned>(
647 &self,
648 method: Method,
649 path: &str,
650 body: Option<&serde_json::Value>,
651 ) -> Result<T, VaultError> {
652 let resp = self.execute(method, path, body).await?;
653 if resp.status().as_u16() == 404 {
654 return Err(VaultError::NotFound {
655 path: path.to_owned(),
656 });
657 }
658 let envelope: VaultResponse<T> = resp.json().await?;
659 self.log_warnings(&envelope.warnings);
660 envelope.data.ok_or(VaultError::EmptyResponse)
661 }
662
663 pub(crate) async fn exec_with_auth<T: DeserializeOwned>(
664 &self,
665 method: Method,
666 path: &str,
667 body: Option<&serde_json::Value>,
668 ) -> Result<VaultResponse<T>, VaultError> {
669 let resp = self.execute(method, path, body).await?;
670 if resp.status().as_u16() == 404 {
671 return Err(VaultError::NotFound {
672 path: path.to_owned(),
673 });
674 }
675 let envelope: VaultResponse<T> = resp.json().await?;
676 self.log_warnings(&envelope.warnings);
677 Ok(envelope)
678 }
679
680 pub(crate) async fn exec_empty(
681 &self,
682 method: Method,
683 path: &str,
684 body: Option<&serde_json::Value>,
685 ) -> Result<(), VaultError> {
686 let resp = self.execute(method, path, body).await?;
687 if resp.status().as_u16() == 404 {
688 return Err(VaultError::NotFound {
689 path: path.to_owned(),
690 });
691 }
692 Ok(())
693 }
694
695 pub(crate) async fn exec_direct<T: DeserializeOwned>(
699 &self,
700 method: Method,
701 path: &str,
702 body: Option<&serde_json::Value>,
703 ) -> Result<T, VaultError> {
704 let resp = self.execute(method, path, body).await?;
705 Ok(resp.json().await?)
706 }
707
708 pub(crate) async fn exec_list(&self, path: &str) -> Result<Vec<String>, VaultError> {
709 let resp = self.execute(METHOD_LIST.clone(), path, None).await?;
710 if resp.status().as_u16() == 404 {
711 return Ok(vec![]);
712 }
713 let envelope: VaultResponse<ListResponse> = resp.json().await?;
714 Ok(envelope.data.map(|d| d.keys).unwrap_or_default())
715 }
716
717 pub(crate) async fn exec_patch<T: DeserializeOwned>(
718 &self,
719 path: &str,
720 body: &serde_json::Value,
721 ) -> Result<T, VaultError> {
722 let resp = self.execute(Method::PATCH, path, Some(body)).await?;
723 if resp.status().as_u16() == 404 {
724 return Err(VaultError::NotFound {
725 path: path.to_owned(),
726 });
727 }
728 let envelope: VaultResponse<T> = resp.json().await?;
729 self.log_warnings(&envelope.warnings);
730 envelope.data.ok_or(VaultError::EmptyResponse)
731 }
732
733 fn token_needs_renewal(ts: &TokenState) -> bool {
734 match ts.expires_at {
735 Some(expires) => {
736 let threshold = ts.lease_duration.mul_f64(0.2);
737 Instant::now() + threshold >= expires
738 }
739 None => false, }
741 }
742
743 async fn ensure_valid_token(&self) -> Result<(), VaultError> {
748 enum Action {
749 Ok,
750 ReAuth,
751 Renew,
752 }
753
754 let action = {
755 let guard = self
756 .inner
757 .token
758 .read()
759 .map_err(|_| VaultError::LockPoisoned)?;
760 match guard.as_ref() {
761 Some(ts) if !Self::token_needs_renewal(ts) => Action::Ok,
762 Some(ts) if ts.renewable => Action::Renew,
763 _ if self.inner.auth_method.is_some() => Action::ReAuth,
764 _ => Action::Ok,
766 }
767 }; match action {
770 Action::Ok => Ok(()),
771 Action::ReAuth => self.try_re_authenticate().await,
772 Action::Renew => {
773 let still_needed = {
775 let guard = self
776 .inner
777 .token
778 .write()
779 .map_err(|_| VaultError::LockPoisoned)?;
780 guard.as_ref().is_some_and(Self::token_needs_renewal)
781 }; if !still_needed {
784 return Ok(());
785 }
786
787 let raw_resp = self
791 .execute_raw(Method::POST, "auth/token/renew-self", None)
792 .await?;
793 let resp: VaultResponse<serde_json::Value> = raw_resp.json().await?;
794 if let Some(auth) = resp.auth {
795 self.update_token_from_auth(&auth)?;
796 }
797 Ok(())
798 }
799 }
800 }
801
802 pub(crate) async fn try_re_authenticate(&self) -> Result<(), VaultError> {
804 match &self.inner.auth_method {
805 Some(method) => {
806 let auth = method.login_dyn(self).await?;
807 self.update_token_from_auth(&auth)?;
808 Ok(())
809 }
810 None => Err(VaultError::AuthRequired),
811 }
812 }
813
814 pub(crate) async fn execute(
815 &self,
816 method: Method,
817 path: &str,
818 body: Option<&serde_json::Value>,
819 ) -> Result<Response, VaultError> {
820 let is_login = path.starts_with("auth/") && path.contains("/login");
823 if !is_login {
824 self.ensure_valid_token().await?;
825 }
826 self.execute_raw(method, path, body).await
827 }
828
829 pub(crate) async fn execute_raw(
832 &self,
833 method: Method,
834 path: &str,
835 body: Option<&serde_json::Value>,
836 ) -> Result<Response, VaultError> {
837 let span = tracing::info_span!(
838 "vault.request",
839 http.method = %method,
840 vault.path = %path,
841 http.status_code = tracing::field::Empty,
842 );
843
844 async {
845 if let Some(cb) = &self.inner.circuit_breaker {
846 cb.check()?;
847 }
848
849 let url_str = format!("{}v1/{}", self.inner.base_url, path.trim_start_matches('/'));
850 let url = Url::parse(&url_str)?;
851
852 let mut req = self
853 .inner
854 .http
855 .request(method.clone(), url.clone())
856 .header("X-Vault-Request", "true");
857
858 if method == Method::PATCH {
859 req = req.header("Content-Type", "application/merge-patch+json");
860 }
861
862 req = self.inject_headers(req)?;
863
864 if let Some(body) = body {
865 req = req.json(body);
866 }
867
868 match self.send_with_retry(req, &url, &method).await {
869 Ok(resp) => {
870 if let Some(cb) = &self.inner.circuit_breaker {
871 cb.record_success();
872 }
873 tracing::Span::current().record("http.status_code", resp.status().as_u16());
874 tracing::debug!(status = resp.status().as_u16(), "vault response");
875 Ok(resp)
876 }
877 Err(e) => {
878 if let Some(cb) = &self.inner.circuit_breaker {
879 cb.record_failure();
880 }
881 Err(e)
882 }
883 }
884 }
885 .instrument(span)
886 .await
887 }
888
889 pub(crate) fn inject_headers(
890 &self,
891 mut req: reqwest::RequestBuilder,
892 ) -> Result<reqwest::RequestBuilder, VaultError> {
893 let guard = self
894 .inner
895 .token
896 .read()
897 .map_err(|_| VaultError::LockPoisoned)?;
898 if let Some(ts) = guard.as_ref() {
899 req = req.header("X-Vault-Token", ts.value.expose_secret());
900 }
901 drop(guard);
902
903 let ns = self
904 .namespace_override
905 .as_deref()
906 .or(self.inner.namespace.as_deref());
907 if let Some(ns) = ns {
908 req = req.header("X-Vault-Namespace", ns);
909 }
910 let ttl = self
911 .wrap_ttl_override
912 .as_deref()
913 .or(self.inner.config.wrap_ttl.as_deref());
914 if let Some(ttl) = ttl {
915 req = req.header("X-Vault-Wrap-TTL", ttl);
916 }
917 if self.inner.config.forward_to_leader {
918 req = req.header("X-Vault-Forward", "active-node");
919 }
920 Ok(req)
921 }
922
923 async fn send_with_retry(
924 &self,
925 builder: reqwest::RequestBuilder,
926 url: &Url,
927 method: &Method,
928 ) -> Result<Response, VaultError> {
929 let max = self.inner.config.max_retries;
930 let mut skip_backoff = false;
931
932 for attempt in 0..=max {
933 if attempt > 0 && !skip_backoff {
934 let base = self
935 .inner
936 .config
937 .initial_retry_delay
938 .checked_mul(2u32.saturating_pow(attempt - 1))
939 .unwrap_or(MAX_BACKOFF);
940 let capped = base.min(MAX_BACKOFF);
941 let capped_ms = u64::try_from(capped.as_millis()).unwrap_or(u64::MAX).max(1);
942 let delay = Duration::from_millis(rand::rng().random_range(0u64..capped_ms));
943 tracing::warn!(attempt, max, %url, %method, ?delay, "retrying");
944 tokio::time::sleep(delay).await;
945 }
946 skip_backoff = false;
947
948 let req = match builder.try_clone() {
949 Some(r) => r,
950 None => {
951 return Err(VaultError::Config(
952 "request body not cloneable (stream body?)".into(),
953 ));
954 }
955 };
956
957 match req.send().await {
958 Ok(resp) => {
959 let status = resp.status().as_u16();
960 match status {
961 200..=299 | 404 => return Ok(resp),
962 401 => {
963 return Err(VaultError::AuthRequired);
964 }
965 403 => {
966 let errors = Self::extract_errors(resp).await;
967 return Err(VaultError::PermissionDenied { errors });
968 }
969 429 => {
970 let retry_after = resp
971 .headers()
972 .get("Retry-After")
973 .and_then(|v| v.to_str().ok())
974 .and_then(|v| v.parse::<u64>().ok());
975 if attempt >= max {
976 return Err(VaultError::RateLimited { retry_after });
977 }
978 if let Some(secs) = retry_after {
979 let capped = Duration::from_secs(secs).min(MAX_BACKOFF);
980 tokio::time::sleep(capped).await;
981 skip_backoff = true;
982 }
983 continue;
984 }
985 412 => {
986 if attempt >= max {
987 return Err(VaultError::ConsistencyRetry);
988 }
989 continue;
990 }
991 503 => {
992 let e = VaultError::Sealed {
993 url: url.to_string(),
994 };
995 if attempt >= max || !self.inner.config.retry_on_sealed {
996 return Err(e);
997 }
998 continue;
999 }
1000 _ => {
1001 let errors = Self::extract_errors(resp).await;
1002 let err = VaultError::Api { status, errors };
1003 if err.is_retryable() && attempt < max {
1004 continue;
1005 }
1006 return Err(err);
1007 }
1008 }
1009 }
1010 Err(e) if (e.is_timeout() || e.is_connect()) && attempt < max => {
1011 continue;
1012 }
1013 Err(e) => return Err(VaultError::Http(e)),
1014 }
1015 }
1016
1017 unreachable!("retry loop always returns from within")
1018 }
1019
1020 async fn extract_errors(resp: Response) -> Vec<String> {
1021 let body = resp.text().await.unwrap_or_default();
1022 serde_json::from_str::<serde_json::Value>(&body)
1023 .ok()
1024 .and_then(|v| v.get("errors")?.as_array().cloned())
1025 .map(|arr| {
1026 arr.into_iter()
1027 .filter_map(|v| v.as_str().map(String::from))
1028 .collect()
1029 })
1030 .unwrap_or_else(|| if body.is_empty() { vec![] } else { vec![body] })
1031 }
1032
1033 fn log_warnings(&self, warnings: &Option<Vec<String>>) {
1034 if let Some(warns) = warnings {
1035 for w in warns {
1036 tracing::debug!(warning = %w, "vault response warning");
1037 }
1038 }
1039 }
1040}
1041
1042pub(crate) fn to_body(value: &impl Serialize) -> Result<serde_json::Value, VaultError> {
1044 serde_json::to_value(value).map_err(|e| VaultError::Config(format!("serialize: {e}")))
1045}
1046
1047fn read_vault_token_file() -> Option<SecretString> {
1052 let path = home::home_dir()?.join(".vault-token");
1053 let raw = fs::read_to_string(path).ok()?;
1054 let trimmed = raw.trim();
1055 if trimmed.is_empty() {
1056 None
1057 } else {
1058 Some(SecretString::from(trimmed))
1059 }
1060}
1061
1062pub fn encode_path(raw: &str) -> String {
1066 let mut out = String::with_capacity(raw.len());
1067 for &byte in raw.as_bytes() {
1068 match byte {
1069 b'?' | b'#' | b'%' | b' ' | b'[' | b']' | 0..=0x1F | 0x7F | 0x80..=0xFF => {
1070 write!(out, "%{byte:02X}").unwrap();
1071 }
1072 _ => out.push(byte as char),
1073 }
1074 }
1075 out
1076}