1#![doc = include_str!("../README.md")]
2
3pub mod args;
4
5use std::{sync::Arc, time::Duration};
6
7use async_trait::async_trait;
8use cache_loader_async::{
9 backing::{LruCacheBacking, TtlCacheBacking, TtlMeta},
10 cache_api::{CacheEntry, LoadingCache, WithMeta},
11};
12use log::debug;
13use metrics::increment_counter;
14use reqwest::{Certificate, Client, StatusCode, Url};
15use serde::{Deserialize, Serialize};
16use tokio::{sync::Mutex, time::Instant};
17
18use streambed::{
19 delayer::Delayer,
20 secret_store::{
21 AppRoleAuthReply, Error, GetSecretReply, SecretData, SecretStore, UserPassAuthReply,
22 },
23};
24
25#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
26struct AppRoleAuthRequest {
27 pub role_id: String,
28 pub secret_id: String,
29}
30
31#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
32struct UserPassAuthRequest {
33 pub password: String,
34}
35
36#[derive(Clone, Debug, Deserialize, Eq, PartialEq, Serialize)]
37struct UserPassCreateUpdateRequest {
38 pub username: String,
39 pub password: String,
40}
41
42type TtlCache = LoadingCache<
43 String,
44 Option<GetSecretReply>,
45 Error,
46 TtlCacheBacking<
47 String,
48 CacheEntry<Option<GetSecretReply>, Error>,
49 LruCacheBacking<String, (CacheEntry<Option<GetSecretReply>, Error>, Instant)>,
50 >,
51>;
52
53#[derive(Clone)]
55pub struct VaultSecretStore {
56 cache: TtlCache,
57 client: Client,
58 client_token: Arc<Mutex<Option<String>>>,
59 max_secrets_cached: usize,
60 server: Url,
61 ttl_field: Option<String>,
62 unauthorized_timeout: Duration,
63}
64
65const APPROLE_AUTH_LABEL: &str = "approle_auth";
66const SECRET_PATH_LABEL: &str = "secret_path";
67const USERPASS_AUTH_LABEL: &str = "userpass_auth";
68const USERPASS_CREATE_UPDATE_LABEL: &str = "userpass_create_update";
69
70impl VaultSecretStore {
71 pub fn new(
81 server: Url,
82 server_cert: Option<Certificate>,
83 tls_insecure: bool,
84 unauthorized_timeout: Duration,
85 max_secrets_cached: usize,
86 ttl_field: Option<&str>,
87 ) -> Self {
88 let client = Client::builder().danger_accept_invalid_certs(tls_insecure);
89 let client = if let Some(cert) = server_cert {
90 client.add_root_certificate(cert)
91 } else {
92 client
93 };
94
95 Self::with_new_cache(
96 client.build().unwrap(),
97 server,
98 unauthorized_timeout,
99 max_secrets_cached,
100 ttl_field.map(|s| s.to_string()),
101 )
102 }
103
104 fn with_new_cache(
105 client: Client,
106 server: Url,
107 unauthorized_timeout: Duration,
108 max_secrets_cached: usize,
109 ttl_field: Option<String>,
110 ) -> Self {
111 let client_token: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
112 let retained_client_token = Arc::clone(&client_token);
113
114 let retained_client = client.clone();
115 let retained_server = server.clone();
116 let retained_ttl_field = ttl_field.clone();
117 let retained_unauthorized_timeout = unauthorized_timeout;
118
119 let cache: TtlCache = LoadingCache::with_meta_loader(
120 TtlCacheBacking::with_backing(
121 unauthorized_timeout,
122 LruCacheBacking::new(max_secrets_cached),
123 ),
124 move |secret_path| {
125 let task_client_token = Arc::clone(&client_token);
126
127 let task_client = client.clone();
128 let task_server = server.clone();
129 let task_ttl_field = ttl_field.clone();
130
131 async move {
132 let mut delayer = Delayer::default();
133 loop {
134 increment_counter!("ss_get_secret_requests", SECRET_PATH_LABEL => secret_path.clone());
135
136 let mut builder = task_client.get(
137 task_server
138 .join(&format!("{task_server}v1/secret/data/{secret_path}"))
139 .unwrap(),
140 );
141 if let Some(client_token) = task_client_token.lock().await.as_deref() {
142 builder = builder.header("X-Vault-Token", client_token)
143 }
144
145 let result = builder.send().await;
146 match result {
147 Ok(response) => {
148 if response.status() == StatusCode::FORBIDDEN {
149 increment_counter!("ss_unauthorized", SECRET_PATH_LABEL => secret_path.clone());
150 break Err(Error::Unauthorized);
151 } else {
152 let secret_reply = if response.status().is_success() {
153 response.json::<GetSecretReply>().await.ok()
154 } else {
155 debug!(
156 "Secret store failure status while getting secret: {:?}",
157 response.status()
158 );
159 increment_counter!("ss_other_reply_failures");
160 None
161 };
162 let lease_duration = secret_reply.as_ref().map(|sr| {
163 let mut lease_duration = None;
164 if let Some(ttl_field) = task_ttl_field.as_ref() {
165 if let Some(ttl) = sr.data.data.get(ttl_field) {
166 if let Ok(ttl_duration) =
167 ttl.parse::<humantime::Duration>()
168 {
169 lease_duration = Some(ttl_duration.into());
170 }
171 }
172 }
173 lease_duration.unwrap_or_else(|| {
174 Duration::from_secs(sr.lease_duration)
175 })
176 });
177 break Ok(secret_reply)
178 .with_meta(lease_duration.map(TtlMeta::from));
179 }
180 }
181 Err(e) => {
182 debug!(
183 "Secret store is unavailable while getting secret. Error: {:?}",
184 e
185 );
186 increment_counter!("ss_unavailables");
187 }
188 }
189 delayer.delay().await;
190 }
191 }
192 },
193 );
194
195 Self {
196 cache,
197 client: retained_client,
198 client_token: retained_client_token,
199 max_secrets_cached,
200 ttl_field: retained_ttl_field,
201 server: retained_server,
202 unauthorized_timeout: retained_unauthorized_timeout,
203 }
204 }
205
206 pub fn with_new_auth_prepared(ss: &Self) -> Self {
207 Self::with_new_cache(
208 ss.client.clone(),
209 ss.server.clone(),
210 ss.unauthorized_timeout,
211 ss.max_secrets_cached,
212 ss.ttl_field.clone(),
213 )
214 }
215}
216
217#[async_trait]
218impl SecretStore for VaultSecretStore {
219 async fn approle_auth(
220 &self,
221 role_id: &str,
222 secret_id: &str,
223 ) -> Result<AppRoleAuthReply, Error> {
224 loop {
225 let role_id = role_id.to_string();
226
227 increment_counter!("ss_approle_auth_requests", APPROLE_AUTH_LABEL => role_id.clone());
228
229 let task_client_token = Arc::clone(&self.client_token);
230 let result = self
231 .client
232 .post(
233 self.server
234 .join(&format!("{}v1/auth/approle/login", self.server))
235 .unwrap(),
236 )
237 .json(&AppRoleAuthRequest {
238 role_id: role_id.to_string(),
239 secret_id: secret_id.to_string(),
240 })
241 .send()
242 .await;
243 match result {
244 Ok(response) => {
245 if response.status() == StatusCode::FORBIDDEN {
246 increment_counter!("ss_unauthorized", APPROLE_AUTH_LABEL => role_id.clone());
247 break Err(Error::Unauthorized);
248 } else {
249 let secret_reply = if response.status().is_success() {
250 let approle_auth_reply = response
251 .json::<AppRoleAuthReply>()
252 .await
253 .map_err(|_| Error::Unauthorized);
254 if let Ok(r) = &approle_auth_reply {
255 let mut client_token = task_client_token.lock().await;
256 *client_token = Some(r.auth.client_token.clone());
257 }
258 approle_auth_reply
259 } else {
260 debug!(
261 "Secret store failure status while authenticating: {:?}",
262 response.status()
263 );
264 increment_counter!("ss_other_reply_failures", APPROLE_AUTH_LABEL => role_id.clone());
265 Err(Error::Unauthorized)
266 };
267 break secret_reply;
268 }
269 }
270 Err(e) => {
271 debug!(
272 "Secret store is unavailable while authenticating. Error: {:?}",
273 e
274 );
275 increment_counter!("ss_unavailables");
276 }
277 }
278 }
279 }
280
281 async fn create_secret(
282 &self,
283 _secret_path: &str,
284 _secret_data: SecretData,
285 ) -> Result<(), Error> {
286 todo!()
287 }
288
289 async fn get_secret(&self, secret_path: &str) -> Result<Option<GetSecretReply>, Error> {
290 self.cache
291 .get(secret_path.to_owned())
292 .await
293 .map_err(|e| e.as_loading_error().unwrap().clone()) }
295
296 async fn userpass_auth(
297 &self,
298 username: &str,
299 password: &str,
300 ) -> Result<UserPassAuthReply, Error> {
301 let username = username.to_string();
302
303 increment_counter!("ss_userpass_auth_requests", USERPASS_AUTH_LABEL => username.clone());
304
305 let task_client_token = Arc::clone(&self.client_token);
306 let result = self
307 .client
308 .post(
309 self.server
310 .join(&format!(
311 "{}v1/auth/userpass/login/{}",
312 self.server, username
313 ))
314 .unwrap(),
315 )
316 .json(&UserPassAuthRequest {
317 password: password.to_string(),
318 })
319 .send()
320 .await;
321 match result {
322 Ok(response) => {
323 if response.status() == StatusCode::FORBIDDEN {
324 increment_counter!("ss_unauthorized", USERPASS_AUTH_LABEL => username.clone());
325 Err(Error::Unauthorized)
326 } else if response.status().is_success() {
327 let userpass_auth_reply = response
328 .json::<UserPassAuthReply>()
329 .await
330 .map_err(|_| Error::Unauthorized);
331 if let Ok(r) = &userpass_auth_reply {
332 let mut client_token = task_client_token.lock().await;
333 *client_token = Some(r.auth.client_token.clone());
334 }
335 userpass_auth_reply
336 } else {
337 debug!(
338 "Secret store failure status while authenticating: {:?}",
339 response.status()
340 );
341 increment_counter!("ss_other_reply_failures", USERPASS_AUTH_LABEL => username.clone());
342 Err(Error::Unauthorized)
343 }
344 }
345 Err(e) => {
346 debug!(
347 "Secret store is unavailable while authenticating. Error: {:?}",
348 e
349 );
350 increment_counter!("ss_unavailables");
351 Err(Error::Unauthorized)
352 }
353 }
354 }
355
356 async fn token_auth(&self, _token: &str) -> Result<(), Error> {
357 todo!()
358 }
359
360 async fn userpass_create_update_user(
361 &self,
362 current_username: &str,
363 username: &str,
364 password: &str,
365 ) -> Result<(), Error> {
366 let username = username.to_string();
367
368 increment_counter!("ss_userpass_create_updates", USERPASS_CREATE_UPDATE_LABEL => username.clone());
369
370 let mut builder = self
371 .client
372 .post(
373 self.server
374 .join(&format!(
375 "{}v1/auth/userpass/users/{}",
376 self.server, current_username
377 ))
378 .unwrap(),
379 )
380 .json(&UserPassCreateUpdateRequest {
381 username: username.to_string(),
382 password: password.to_string(),
383 });
384 if let Some(client_token) = self.client_token.lock().await.as_deref() {
385 builder = builder.header("X-Vault-Token", client_token)
386 }
387
388 let result = builder.send().await;
389 match result {
390 Ok(response) => {
391 if response.status() == StatusCode::FORBIDDEN {
392 increment_counter!("ss_unauthorized", USERPASS_CREATE_UPDATE_LABEL => username.clone());
393 Err(Error::Unauthorized)
394 } else if response.status().is_success() {
395 let _ = response.json::<()>().await.map_err(|_| Error::Unauthorized);
396 Ok(())
397 } else {
398 debug!(
399 "Secret store failure status while creating/updating userpass: {:?}",
400 response.status()
401 );
402 increment_counter!("ss_other_reply_failures", USERPASS_CREATE_UPDATE_LABEL => username.clone());
403 Err(Error::Unauthorized)
404 }
405 }
406 Err(e) => {
407 debug!(
408 "Secret store is unavailable while creating/updating userpass. Error: {:?}",
409 e
410 );
411 increment_counter!("ss_unavailables");
412 Err(Error::Unauthorized)
413 }
414 }
415 }
416}