vault_mgmt_lib/
unseal.rs

1use http::uri::Scheme;
2use http_body_util::{BodyExt, Full};
3use hyper::body::Bytes;
4use k8s_openapi::api::core::v1::Pod;
5use kube::api::Api;
6use secrecy::{ExposeSecret, Secret};
7use tokio::process::Command;
8
9use crate::{
10    get_unseal_keys_request, list_vault_pods, unseal_request, BytesBody, ExecIn,
11    HttpForwarderService, HttpRequest,
12};
13
14/// Get the unseal keys by running the specified command
15#[tracing::instrument()]
16pub async fn get_unseal_keys(key_cmd: &str) -> anyhow::Result<Vec<Secret<String>>> {
17    let output = Command::new("sh").arg("-c").arg(key_cmd).output().await?;
18
19    let stdout = String::from_utf8(output.stdout)?;
20    let keys = stdout
21        .lines()
22        .collect::<Vec<_>>()
23        .iter()
24        .map(|k| Secret::new(k.to_string()))
25        .collect();
26
27    Ok(keys)
28}
29
30/// List all pods that are sealed
31pub async fn list_sealed_pods(api: &Api<Pod>) -> anyhow::Result<Vec<Pod>> {
32    let pods = api
33        .list(&list_vault_pods().labels(&ExecIn::Sealed.to_label_selector()))
34        .await?;
35
36    Ok(pods.items)
37}
38
39/// Unseal a vault process using the provided keys
40#[async_trait::async_trait]
41pub trait Unseal {
42    /// Unseal a vault process using the provided keys
43    async fn unseal(&mut self, keys: &[Secret<String>]) -> anyhow::Result<()>;
44}
45
46#[async_trait::async_trait]
47impl<T> Unseal for T
48where
49    T: HttpRequest<BytesBody> + Send + Sync + 'static,
50{
51    async fn unseal(&mut self, keys: &[Secret<String>]) -> anyhow::Result<()> {
52        if keys.is_empty() {
53            return Err(anyhow::anyhow!("no keys provided"));
54        }
55
56        for key in keys {
57            self.ready().await?;
58
59            let body = serde_json::json!({
60                "key": key.expose_secret(),
61                "reset": false,
62                "migrate": false,
63            });
64
65            let http_req = unseal_request(Full::new(Bytes::from(body.to_string())).boxed())?;
66
67            let (parts, body) = self.send_request(http_req).await?.into_parts();
68
69            let body = String::from_utf8(body.into()).unwrap();
70
71            if !(parts.status.is_success() || parts.status.is_redirection()) {
72                return Err(anyhow::anyhow!("unsealing: {}", body));
73            }
74        }
75
76        Ok(())
77    }
78}
79
80/// Get the unseal keys from a Vault secret
81#[async_trait::async_trait]
82pub trait GetUnsealKeys {
83    /// Get the unseal keys from a Vault secret
84    async fn get_unseal_keys(
85        &mut self,
86        path: &http::uri::PathAndQuery,
87        token: Secret<String>,
88    ) -> anyhow::Result<Vec<Secret<String>>>;
89}
90
91#[async_trait::async_trait]
92impl<T> GetUnsealKeys for T
93where
94    T: HttpRequest<BytesBody> + Send + Sync + 'static,
95{
96    async fn get_unseal_keys(
97        &mut self,
98        path: &http::uri::PathAndQuery,
99        token: Secret<String>,
100    ) -> anyhow::Result<Vec<Secret<String>>> {
101        let req = get_unseal_keys_request(path.as_str(), token)?;
102
103        let (parts, body) = self.send_request(req).await?.into_parts();
104
105        let body = String::from_utf8(body.to_vec())?;
106
107        if !(parts.status.is_success()) {
108            return Err(anyhow::anyhow!("retrieving unseal keys: {}", body));
109        }
110
111        let response: vault_kvget::Response = serde_json::from_str(&body)?;
112
113        Ok(response.keys())
114    }
115}
116
117pub struct GetUnsealKeysFromVault {
118    scheme: http::uri::Scheme,
119    authority: http::uri::Authority,
120}
121
122impl GetUnsealKeysFromVault {
123    pub fn new(uri: &http::Uri) -> anyhow::Result<Self> {
124        Ok(Self {
125            scheme: uri
126                .scheme()
127                .unwrap_or_else(|| match uri.port_u16() {
128                    Some(443) => &Scheme::HTTPS,
129                    _ => &Scheme::HTTP,
130                })
131                .clone(),
132            authority: uri
133                .authority()
134                .ok_or(anyhow::anyhow!(
135                    "keys secret uri does not include an authority"
136                ))?
137                .clone(),
138        })
139    }
140}
141
142#[async_trait::async_trait]
143impl GetUnsealKeys for GetUnsealKeysFromVault {
144    async fn get_unseal_keys(
145        &mut self,
146        path: &http::uri::PathAndQuery,
147        token: Secret<String>,
148    ) -> anyhow::Result<Vec<Secret<String>>> {
149        let stream = tokio::net::TcpStream::connect((
150            self.authority.host(),
151            self.authority
152                .port_u16()
153                .unwrap_or_else(|| match self.scheme.as_str() {
154                    "https" => 443,
155                    _ => 80,
156                }),
157        ))
158        .await
159        .unwrap();
160
161        let mut client = match self.scheme.as_str() {
162            "https" => HttpForwarderService::https(self.authority.host(), stream)
163                .await
164                .unwrap(),
165            "http" => HttpForwarderService::http(stream).await.unwrap(),
166            _ => {
167                anyhow::bail!("unsupported scheme {}", self.scheme.as_str())
168            }
169        };
170
171        client.get_unseal_keys(path, token).await
172    }
173}
174
175mod vault_kvget {
176    use secrecy::Secret;
177    use serde::{Deserialize, Serialize};
178
179    #[derive(Deserialize, Serialize, Debug)]
180    pub struct Response {
181        data: DataMetadata,
182    }
183
184    impl Response {
185        pub fn keys(&self) -> Vec<Secret<String>> {
186            self.data
187                .data
188                .keys
189                .lines()
190                .collect::<Vec<_>>()
191                .iter()
192                .map(|k| Secret::new(k.to_string()))
193                .collect()
194        }
195    }
196
197    #[derive(Deserialize, Serialize, Debug)]
198    struct DataMetadata {
199        data: Data,
200    }
201
202    #[derive(Deserialize, Serialize, Debug)]
203    struct Data {
204        keys: String,
205    }
206}
207
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use http::{Method, Request, Response, StatusCode};
    use hyper::body::Bytes;
    use k8s_openapi::{api::core::v1::Pod, List};
    use kube::{client::Body, Api, Client};
    use secrecy::{ExposeSecret, Secret};
    use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;
    use tower_test::mock::{self, Handle};
    use wiremock::{
        matchers::{header, method, path},
        Mock, MockServer, ResponseTemplate,
    };

    use crate::{
        list_sealed_pods, GetUnsealKeys, GetUnsealKeysFromVault, HttpForwarderService, Unseal,
    };

    /// Keys stored, one per line, in the secret served by `mock_get_unseal_keys`.
    const EXPECTED_KEYS: [&str; 3] = ["abc", "def", "ghi"];

    /// Answer the pod list request for sealed pods with three pods read from
    /// the test resources until `cancel` fires; any other request panics.
    async fn mock_list_sealed(
        cancel: CancellationToken,
        handle: &mut Handle<Request<Body>, Response<Body>>,
    ) {
        loop {
            tokio::select! {
                request = handle.next_request() => {
                    let (request, send) = request.expect("Service not called");

                    let method = request.method().to_string();
                    let uri = request.uri().path().to_string();
                    let query = request.uri().query().unwrap_or_default().to_string();

                    let watch = query.contains("watch=true");

                    let body = match (method.as_str(), uri.as_str(), query.as_str(), watch) {
                        ("GET", "/api/v1/namespaces/vault-mgmt-e2e/pods", "&labelSelector=vault-sealed%3Dtrue", false) => {
                            let mut list = List::<Pod>::default();

                            for id in 0..=2 {
                                let file = tokio::fs::read_to_string(format!(
                                    "tests/resources/installed/{}{}.yaml",
                                    "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-",
                                    id
                                ))
                                .await
                                .unwrap();

                                let pod: Pod = serde_yaml::from_str(&file).unwrap();
                                list.items.push(pod);
                            }

                            list.metadata.resource_version = Some("1".to_string());

                            serde_json::to_string(&list).unwrap()
                        }
                        _ => panic!("Unexpected API request {:?} {:?} {:?}", method, uri, query),
                    };

                    send.send_response(Response::builder().body(Bytes::from(body).into()).unwrap());
                }
                _ = cancel.cancelled() => {
                    return;
                }
            }
        }
    }

    /// Create a pod API backed by the mocked service, together with the task
    /// driving the mock and the token that stops it.
    async fn setup() -> (Api<Pod>, JoinHandle<()>, CancellationToken) {
        let (mock_service, mut handle) = mock::pair::<Request<Body>, Response<Body>>();

        let cancel = CancellationToken::new();
        let cloned_token = cancel.clone();

        let spawned = tokio::spawn(async move {
            mock_list_sealed(cloned_token, &mut handle).await;
        });

        let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "vault-mgmt-e2e"));

        (pods, spawned, cancel)
    }

    /// Open a plain TCP connection to the given mock server.
    async fn connect(server: &MockServer) -> tokio::net::TcpStream {
        let uri = server.uri();
        let address = uri
            .strip_prefix("http://")
            .expect("mock server uri starts with http://");

        tokio::net::TcpStream::connect(address)
            .await
            .expect("connecting to the mock server")
    }

    /// Build the URI of the test secret on the given mock server.
    fn secret_uri(server: &MockServer) -> http::Uri {
        http::uri::Uri::builder()
            .scheme(http::uri::Scheme::HTTP)
            .authority(server.uri().strip_prefix("http://").unwrap())
            .path_and_query("/v1/kv/data/test")
            .build()
            .unwrap()
    }

    /// Assert that `keys` are exactly the keys stored in the mocked secret.
    fn assert_expected_keys(keys: &[Secret<String>]) {
        let exposed: Vec<&str> = keys
            .iter()
            .map(|key| key.expose_secret().as_str())
            .collect();

        assert_eq!(exposed, EXPECTED_KEYS);
    }

    #[tokio::test]
    async fn get_sealed_pods_returns_sealed_pods() {
        let (api, service, cancel) = setup().await;

        let pods = list_sealed_pods(&api).await.unwrap();

        assert_eq!(pods.len(), 3);

        cancel.cancel();

        service.await.unwrap();
    }

    #[tokio::test]
    async fn unseal_returns_err_without_keys() {
        let mock_server = MockServer::start().await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let outcome = client.unseal(&[]).await;

        assert!(outcome.is_err());
    }

    /// Matches unseal request bodies that carry the wrapped key together with
    /// the `reset` and `migrate` members.
    struct UnsealBodyMatcher(String);

    impl wiremock::Match for UnsealBodyMatcher {
        fn matches(&self, request: &wiremock::Request) -> bool {
            match serde_json::from_slice::<serde_json::Value>(&request.body) {
                Ok(body) => {
                    body.get("key").and_then(serde_json::Value::as_str) == Some(self.0.as_str())
                        && body.get("reset").is_some()
                        && body.get("migrate").is_some()
                }
                Err(_) => false,
            }
        }
    }

    #[tokio::test]
    async fn unseal_calls_api() {
        let mock_server = MockServer::start().await;

        // Every key must be submitted exactly once.
        for key in ["abc".to_string(), "def".to_string(), "ghi".to_string()] {
            Mock::given(method(Method::PUT))
                .and(path("/v1/sys/unseal"))
                .and(header("X-Vault-Request", "true"))
                .and(UnsealBodyMatcher(key))
                .respond_with(ResponseTemplate::new(StatusCode::OK))
                .expect(1)
                .mount(&mock_server)
                .await;
        }

        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        client
            .unseal(&[
                Secret::from_str("abc").unwrap(),
                Secret::from_str("def").unwrap(),
                Secret::from_str("ghi").unwrap(),
            ])
            .await
            .expect("unsealing with three keys succeeds");
    }

    /// Start a mock server that serves the test secret exactly once.
    async fn mock_get_unseal_keys() -> MockServer {
        let mock_server = MockServer::start().await;

        Mock::given(method(Method::GET))
            .and(path("/v1/kv/data/test"))
            .and(header("X-Vault-Request", "true"))
            .and(header("X-Vault-Token", "token"))
            .respond_with(
                ResponseTemplate::new(StatusCode::OK).set_body_json(serde_json::json!({
                    "request_id": "abd3b7a3-581f-8add-1a6d-1d7cdb5b9c2b",
                    "lease_id": "",
                    "lease_duration": 0,
                    "renewable": false,
                    "data": {
                        "data": {
                            "keys": "abc\ndef\nghi"
                        },
                        "metadata": {
                            "created_time": "2023-06-09T13:59:44.750984296Z",
                            "custom_metadata": null,
                            "deletion_time": "",
                            "destroyed": false,
                            "version": 1
                        }
                    },
                    "warnings": null
                })),
            )
            .expect(1)
            .mount(&mock_server)
            .await;

        mock_server
    }

    #[tokio::test]
    async fn retrieving_unseal_keys_works() {
        let mock_server = mock_get_unseal_keys().await;
        let uri = secret_uri(&mock_server);

        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let keys = client
            .get_unseal_keys(
                uri.path_and_query().unwrap(),
                Secret::new("token".to_string()),
            )
            .await
            .expect("retrieving unseal keys succeeds");

        // Success alone is not enough: the secret must be split into its keys.
        assert_expected_keys(&keys);
    }

    #[tokio::test]
    async fn retrieving_unseal_keys_works_externally() {
        let mock_server = mock_get_unseal_keys().await;
        let uri = secret_uri(&mock_server);

        let mut client = GetUnsealKeysFromVault::new(&uri).unwrap();

        let keys = client
            .get_unseal_keys(
                uri.path_and_query().unwrap(),
                Secret::new("token".to_string()),
            )
            .await
            .expect("retrieving unseal keys succeeds");

        assert_expected_keys(&keys);
    }
}