streambed_vault/
lib.rs

1#![doc = include_str!("../README.md")]
2
3pub mod args;
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use cache_loader_async::{
9    backing::{LruCacheBacking, TtlCacheBacking, TtlMeta},
10    cache_api::{CacheEntry, LoadingCache, WithMeta},
11};
12use log::debug;
13use metrics::increment_counter;
14use reqwest::{Certificate, Client, StatusCode, Url};
15use serde::{Deserialize, Serialize};
16use tokio::{sync::Mutex, time::Instant};
17
18use streambed::{
19    delayer::Delayer,
20    secret_store::{
21        AppRoleAuthReply, Error, GetSecretReply, SecretData, SecretStore, UserPassAuthReply,
22    },
23};
24
25#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
26struct AppRoleAuthRequest {
27    pub role_id: String,
28    pub secret_id: String,
29}
30
31#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
32struct UserPassAuthRequest {
33    pub password: String,
34}
35
36#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
37struct UserPassCreateUpdateRequest {
38    pub username: String,
39    pub password: String,
40}
41
42type TtlCache = LoadingCache<
43    String,
44    Option<GetSecretReply>,
45    Error,
46    TtlCacheBacking<
47        String,
48        CacheEntry<Option<GetSecretReply>, Error>,
49        LruCacheBacking<String, (CacheEntry<Option<GetSecretReply>, Error>, Instant)>,
50    >,
51>;
52
53/// A client interface that uses the Hashicorp Vault HTTP API.
54#[derive(Clone)]
55pub struct VaultSecretStore {
56    cache: TtlCache,
57    client: Client,
58    client_token: Arc<Mutex<Option<String>>>,
59    max_secrets_cached: usize,
60    server: Url,
61    ttl_field: Option<String>,
62    unauthorized_timeout: Duration,
63}
64
// Label keys attached to the metrics counters emitted by this module.

/// Label key carrying the role id on AppRole authentication counters.
const APPROLE_AUTH_LABEL: &str = "approle_auth";
/// Label key carrying the secret path on secret retrieval counters.
const SECRET_PATH_LABEL: &str = "secret_path";
/// Label key carrying the username on userpass authentication counters.
const USERPASS_AUTH_LABEL: &str = "userpass_auth";
/// Label key carrying the username on userpass create/update counters.
const USERPASS_CREATE_UPDATE_LABEL: &str = "userpass_create_update";
69
70impl VaultSecretStore {
71    /// Establish a new client to Hashicorp Vault. In the case where TLS is required,
72    /// a root certificate may be provided e.g. when using self-signed certificates. TLS
73    /// connections are encouraged.
74    /// An unauthorized_timeout determines how long the server should wait before being
75    /// requested again.
76    /// A max_secrets_cached arg limits the number of secrets that can be held at any time.
77    ///
78    /// Avoid creating many new Vault secret stores and clone them instead so that HTTP
79    /// connection pools can be shared.
80    pub fn new(
81        server: Url,
82        server_cert: Option<Certificate>,
83        tls_insecure: bool,
84        unauthorized_timeout: Duration,
85        max_secrets_cached: usize,
86        ttl_field: Option<&str>,
87    ) -> Self {
88        let client = Client::builder().danger_accept_invalid_certs(tls_insecure);
89        let client = if let Some(cert) = server_cert {
90            client.add_root_certificate(cert)
91        } else {
92            client
93        };
94
95        Self::with_new_cache(
96            client.build().unwrap(),
97            server,
98            unauthorized_timeout,
99            max_secrets_cached,
100            ttl_field.map(|s| s.to_string()),
101        )
102    }
103
104    fn with_new_cache(
105        client: Client,
106        server: Url,
107        unauthorized_timeout: Duration,
108        max_secrets_cached: usize,
109        ttl_field: Option<String>,
110    ) -> Self {
111        let client_token: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
112        let retained_client_token = Arc::clone(&client_token);
113
114        let retained_client = client.clone();
115        let retained_server = server.clone();
116        let retained_ttl_field = ttl_field.clone();
117        let retained_unauthorized_timeout = unauthorized_timeout;
118
119        let cache: TtlCache = LoadingCache::with_meta_loader(
120            TtlCacheBacking::with_backing(
121                unauthorized_timeout,
122                LruCacheBacking::new(max_secrets_cached),
123            ),
124            move |secret_path| {
125                let task_client_token = Arc::clone(&client_token);
126
127                let task_client = client.clone();
128                let task_server = server.clone();
129                let task_ttl_field = ttl_field.clone();
130
131                async move {
132                    let mut delayer = Delayer::default();
133                    loop {
134                        increment_counter!("ss_get_secret_requests", SECRET_PATH_LABEL => secret_path.clone());
135
136                        let mut builder = task_client.get(
137                            task_server
138                                .join(&format!("{task_server}v1/secret/data/{secret_path}"))
139                                .unwrap(),
140                        );
141                        if let Some(client_token) = task_client_token.lock().await.as_deref() {
142                            builder = builder.header("X-Vault-Token", client_token)
143                        }
144
145                        let result = builder.send().await;
146                        match result {
147                            Ok(response) => {
148                                if response.status() == StatusCode::FORBIDDEN {
149                                    increment_counter!("ss_unauthorized", SECRET_PATH_LABEL => secret_path.clone());
150                                    break Err(Error::Unauthorized);
151                                } else {
152                                    let secret_reply = if response.status().is_success() {
153                                        response.json::<GetSecretReply>().await.ok()
154                                    } else {
155                                        debug!(
156                                            "Secret store failure status while getting secret: {:?}",
157                                            response.status()
158                                        );
159                                        increment_counter!("ss_other_reply_failures");
160                                        None
161                                    };
162                                    let lease_duration = secret_reply.as_ref().map(|sr| {
163                                        let mut lease_duration = None;
164                                        if let Some(ttl_field) = task_ttl_field.as_ref() {
165                                            if let Some(ttl) = sr.data.data.get(ttl_field) {
166                                                if let Ok(ttl_duration) =
167                                                    ttl.parse::<humantime::Duration>()
168                                                {
169                                                    lease_duration = Some(ttl_duration.into());
170                                                }
171                                            }
172                                        }
173                                        lease_duration.unwrap_or_else(|| {
174                                            Duration::from_secs(sr.lease_duration)
175                                        })
176                                    });
177                                    break Ok(secret_reply)
178                                        .with_meta(lease_duration.map(TtlMeta::from));
179                                }
180                            }
181                            Err(e) => {
182                                debug!(
183                                    "Secret store is unavailable while getting secret. Error: {:?}",
184                                    e
185                                );
186                                increment_counter!("ss_unavailables");
187                            }
188                        }
189                        delayer.delay().await;
190                    }
191                }
192            },
193        );
194
195        Self {
196            cache,
197            client: retained_client,
198            client_token: retained_client_token,
199            max_secrets_cached,
200            ttl_field: retained_ttl_field,
201            server: retained_server,
202            unauthorized_timeout: retained_unauthorized_timeout,
203        }
204    }
205
206    pub fn with_new_auth_prepared(ss: &Self) -> Self {
207        Self::with_new_cache(
208            ss.client.clone(),
209            ss.server.clone(),
210            ss.unauthorized_timeout,
211            ss.max_secrets_cached,
212            ss.ttl_field.clone(),
213        )
214    }
215}
216
217#[async_trait]
218impl SecretStore for VaultSecretStore {
219    async fn approle_auth(
220        &self,
221        role_id: &str,
222        secret_id: &str,
223    ) -> Result<AppRoleAuthReply, Error> {
224        loop {
225            let role_id = role_id.to_string();
226
227            increment_counter!("ss_approle_auth_requests", APPROLE_AUTH_LABEL => role_id.clone());
228
229            let task_client_token = Arc::clone(&self.client_token);
230            let result = self
231                .client
232                .post(
233                    self.server
234                        .join(&format!("{}v1/auth/approle/login", self.server))
235                        .unwrap(),
236                )
237                .json(&AppRoleAuthRequest {
238                    role_id: role_id.to_string(),
239                    secret_id: secret_id.to_string(),
240                })
241                .send()
242                .await;
243            match result {
244                Ok(response) => {
245                    if response.status() == StatusCode::FORBIDDEN {
246                        increment_counter!("ss_unauthorized", APPROLE_AUTH_LABEL => role_id.clone());
247                        break Err(Error::Unauthorized);
248                    } else {
249                        let secret_reply = if response.status().is_success() {
250                            let approle_auth_reply = response
251                                .json::<AppRoleAuthReply>()
252                                .await
253                                .map_err(|_| Error::Unauthorized);
254                            if let Ok(r) = &approle_auth_reply {
255                                let mut client_token = task_client_token.lock().await;
256                                *client_token = Some(r.auth.client_token.clone());
257                            }
258                            approle_auth_reply
259                        } else {
260                            debug!(
261                                "Secret store failure status while authenticating: {:?}",
262                                response.status()
263                            );
264                            increment_counter!("ss_other_reply_failures", APPROLE_AUTH_LABEL => role_id.clone());
265                            Err(Error::Unauthorized)
266                        };
267                        break secret_reply;
268                    }
269                }
270                Err(e) => {
271                    debug!(
272                        "Secret store is unavailable while authenticating. Error: {:?}",
273                        e
274                    );
275                    increment_counter!("ss_unavailables");
276                }
277            }
278        }
279    }
280
281    async fn create_secret(
282        &self,
283        _secret_path: &str,
284        _secret_data: SecretData,
285    ) -> Result<(), Error> {
286        todo!()
287    }
288
289    async fn get_secret(&self, secret_path: &str) -> Result<Option<GetSecretReply>, Error> {
290        self.cache
291            .get(secret_path.to_owned())
292            .await
293            .map_err(|e| e.as_loading_error().unwrap().clone()) // Unsure how we can deal with caching issues
294    }
295
296    async fn userpass_auth(
297        &self,
298        username: &str,
299        password: &str,
300    ) -> Result<UserPassAuthReply, Error> {
301        let username = username.to_string();
302
303        increment_counter!("ss_userpass_auth_requests", USERPASS_AUTH_LABEL => username.clone());
304
305        let task_client_token = Arc::clone(&self.client_token);
306        let result = self
307            .client
308            .post(
309                self.server
310                    .join(&format!(
311                        "{}v1/auth/userpass/login/{}",
312                        self.server, username
313                    ))
314                    .unwrap(),
315            )
316            .json(&UserPassAuthRequest {
317                password: password.to_string(),
318            })
319            .send()
320            .await;
321        match result {
322            Ok(response) => {
323                if response.status() == StatusCode::FORBIDDEN {
324                    increment_counter!("ss_unauthorized", USERPASS_AUTH_LABEL => username.clone());
325                    Err(Error::Unauthorized)
326                } else if response.status().is_success() {
327                    let userpass_auth_reply = response
328                        .json::<UserPassAuthReply>()
329                        .await
330                        .map_err(|_| Error::Unauthorized);
331                    if let Ok(r) = &userpass_auth_reply {
332                        let mut client_token = task_client_token.lock().await;
333                        *client_token = Some(r.auth.client_token.clone());
334                    }
335                    userpass_auth_reply
336                } else {
337                    debug!(
338                        "Secret store failure status while authenticating: {:?}",
339                        response.status()
340                    );
341                    increment_counter!("ss_other_reply_failures", USERPASS_AUTH_LABEL => username.clone());
342                    Err(Error::Unauthorized)
343                }
344            }
345            Err(e) => {
346                debug!(
347                    "Secret store is unavailable while authenticating. Error: {:?}",
348                    e
349                );
350                increment_counter!("ss_unavailables");
351                Err(Error::Unauthorized)
352            }
353        }
354    }
355
356    async fn token_auth(&self, _token: &str) -> Result<(), Error> {
357        todo!()
358    }
359
360    async fn userpass_create_update_user(
361        &self,
362        current_username: &str,
363        username: &str,
364        password: &str,
365    ) -> Result<(), Error> {
366        let username = username.to_string();
367
368        increment_counter!("ss_userpass_create_updates", USERPASS_CREATE_UPDATE_LABEL => username.clone());
369
370        let mut builder = self
371            .client
372            .post(
373                self.server
374                    .join(&format!(
375                        "{}v1/auth/userpass/users/{}",
376                        self.server, current_username
377                    ))
378                    .unwrap(),
379            )
380            .json(&UserPassCreateUpdateRequest {
381                username: username.to_string(),
382                password: password.to_string(),
383            });
384        if let Some(client_token) = self.client_token.lock().await.as_deref() {
385            builder = builder.header("X-Vault-Token", client_token)
386        }
387
388        let result = builder.send().await;
389        match result {
390            Ok(response) => {
391                if response.status() == StatusCode::FORBIDDEN {
392                    increment_counter!("ss_unauthorized", USERPASS_CREATE_UPDATE_LABEL => username.clone());
393                    Err(Error::Unauthorized)
394                } else if response.status().is_success() {
395                    let _ = response.json::<()>().await.map_err(|_| Error::Unauthorized);
396                    Ok(())
397                } else {
398                    debug!(
399                        "Secret store failure status while creating/updating userpass: {:?}",
400                        response.status()
401                    );
402                    increment_counter!("ss_other_reply_failures", USERPASS_CREATE_UPDATE_LABEL => username.clone());
403                    Err(Error::Unauthorized)
404                }
405            }
406            Err(e) => {
407                debug!(
408                    "Secret store is unavailable while creating/updating userpass. Error: {:?}",
409                    e
410                );
411                increment_counter!("ss_unavailables");
412                Err(Error::Unauthorized)
413            }
414        }
415    }
416}