1use http::uri::Scheme;
2use http_body_util::{BodyExt, Full};
3use hyper::body::Bytes;
4use k8s_openapi::api::core::v1::Pod;
5use kube::api::Api;
6use secrecy::{ExposeSecret, Secret};
7use tokio::process::Command;
8
9use crate::{
10 get_unseal_keys_request, list_vault_pods, unseal_request, BytesBody, ExecIn,
11 HttpForwarderService, HttpRequest,
12};
13
14#[tracing::instrument()]
16pub async fn get_unseal_keys(key_cmd: &str) -> anyhow::Result<Vec<Secret<String>>> {
17 let output = Command::new("sh").arg("-c").arg(key_cmd).output().await?;
18
19 let stdout = String::from_utf8(output.stdout)?;
20 let keys = stdout
21 .lines()
22 .collect::<Vec<_>>()
23 .iter()
24 .map(|k| Secret::new(k.to_string()))
25 .collect();
26
27 Ok(keys)
28}
29
30pub async fn list_sealed_pods(api: &Api<Pod>) -> anyhow::Result<Vec<Pod>> {
32 let pods = api
33 .list(&list_vault_pods().labels(&ExecIn::Sealed.to_label_selector()))
34 .await?;
35
36 Ok(pods.items)
37}
38
39#[async_trait::async_trait]
41pub trait Unseal {
42 async fn unseal(&mut self, keys: &[Secret<String>]) -> anyhow::Result<()>;
44}
45
46#[async_trait::async_trait]
47impl<T> Unseal for T
48where
49 T: HttpRequest<BytesBody> + Send + Sync + 'static,
50{
51 async fn unseal(&mut self, keys: &[Secret<String>]) -> anyhow::Result<()> {
52 if keys.is_empty() {
53 return Err(anyhow::anyhow!("no keys provided"));
54 }
55
56 for key in keys {
57 self.ready().await?;
58
59 let body = serde_json::json!({
60 "key": key.expose_secret(),
61 "reset": false,
62 "migrate": false,
63 });
64
65 let http_req = unseal_request(Full::new(Bytes::from(body.to_string())).boxed())?;
66
67 let (parts, body) = self.send_request(http_req).await?.into_parts();
68
69 let body = String::from_utf8(body.into()).unwrap();
70
71 if !(parts.status.is_success() || parts.status.is_redirection()) {
72 return Err(anyhow::anyhow!("unsealing: {}", body));
73 }
74 }
75
76 Ok(())
77 }
78}
79
80#[async_trait::async_trait]
82pub trait GetUnsealKeys {
83 async fn get_unseal_keys(
85 &mut self,
86 path: &http::uri::PathAndQuery,
87 token: Secret<String>,
88 ) -> anyhow::Result<Vec<Secret<String>>>;
89}
90
91#[async_trait::async_trait]
92impl<T> GetUnsealKeys for T
93where
94 T: HttpRequest<BytesBody> + Send + Sync + 'static,
95{
96 async fn get_unseal_keys(
97 &mut self,
98 path: &http::uri::PathAndQuery,
99 token: Secret<String>,
100 ) -> anyhow::Result<Vec<Secret<String>>> {
101 let req = get_unseal_keys_request(path.as_str(), token)?;
102
103 let (parts, body) = self.send_request(req).await?.into_parts();
104
105 let body = String::from_utf8(body.to_vec())?;
106
107 if !(parts.status.is_success()) {
108 return Err(anyhow::anyhow!("retrieving unseal keys: {}", body));
109 }
110
111 let response: vault_kvget::Response = serde_json::from_str(&body)?;
112
113 Ok(response.keys())
114 }
115}
116
117pub struct GetUnsealKeysFromVault {
118 scheme: http::uri::Scheme,
119 authority: http::uri::Authority,
120}
121
122impl GetUnsealKeysFromVault {
123 pub fn new(uri: &http::Uri) -> anyhow::Result<Self> {
124 Ok(Self {
125 scheme: uri
126 .scheme()
127 .unwrap_or_else(|| match uri.port_u16() {
128 Some(443) => &Scheme::HTTPS,
129 _ => &Scheme::HTTP,
130 })
131 .clone(),
132 authority: uri
133 .authority()
134 .ok_or(anyhow::anyhow!(
135 "keys secret uri does not include an authority"
136 ))?
137 .clone(),
138 })
139 }
140}
141
142#[async_trait::async_trait]
143impl GetUnsealKeys for GetUnsealKeysFromVault {
144 async fn get_unseal_keys(
145 &mut self,
146 path: &http::uri::PathAndQuery,
147 token: Secret<String>,
148 ) -> anyhow::Result<Vec<Secret<String>>> {
149 let stream = tokio::net::TcpStream::connect((
150 self.authority.host(),
151 self.authority
152 .port_u16()
153 .unwrap_or_else(|| match self.scheme.as_str() {
154 "https" => 443,
155 _ => 80,
156 }),
157 ))
158 .await
159 .unwrap();
160
161 let mut client = match self.scheme.as_str() {
162 "https" => HttpForwarderService::https(self.authority.host(), stream)
163 .await
164 .unwrap(),
165 "http" => HttpForwarderService::http(stream).await.unwrap(),
166 _ => {
167 anyhow::bail!("unsupported scheme {}", self.scheme.as_str())
168 }
169 };
170
171 client.get_unseal_keys(path, token).await
172 }
173}
174
175mod vault_kvget {
176 use secrecy::Secret;
177 use serde::{Deserialize, Serialize};
178
179 #[derive(Deserialize, Serialize, Debug)]
180 pub struct Response {
181 data: DataMetadata,
182 }
183
184 impl Response {
185 pub fn keys(&self) -> Vec<Secret<String>> {
186 self.data
187 .data
188 .keys
189 .lines()
190 .collect::<Vec<_>>()
191 .iter()
192 .map(|k| Secret::new(k.to_string()))
193 .collect()
194 }
195 }
196
197 #[derive(Deserialize, Serialize, Debug)]
198 struct DataMetadata {
199 data: Data,
200 }
201
202 #[derive(Deserialize, Serialize, Debug)]
203 struct Data {
204 keys: String,
205 }
206}
207
208#[cfg(test)]
209mod tests {
210 use std::str::FromStr;
211
212 use http::{Method, Request, Response, StatusCode};
213 use hyper::body::Bytes;
214 use k8s_openapi::{api::core::v1::Pod, List};
215 use kube::{client::Body, Api, Client};
216 use secrecy::Secret;
217 use tokio::task::JoinHandle;
218 use tokio_util::sync::CancellationToken;
219 use tower_test::mock::{self, Handle};
220 use wiremock::{
221 matchers::{header, method, path},
222 Mock, MockServer, ResponseTemplate,
223 };
224
225 use crate::{
226 list_sealed_pods, GetUnsealKeys, GetUnsealKeysFromVault, HttpForwarderService, Unseal,
227 };
228
229 async fn mock_list_sealed(
230 cancel: CancellationToken,
231 handle: &mut Handle<Request<Body>, Response<Body>>,
232 ) {
233 loop {
234 tokio::select! {
235 request = handle.next_request() => {
236 let (request, send) = request.expect("Service not called");
237
238 let method = request.method().to_string();
239 let uri = request.uri().path().to_string();
240 let query = request.uri().query().unwrap_or_default().to_string();
241
242 let watch = query.contains("watch=true");
243
244 println!("{} {} {} ", method, uri, query);
245
246 let body = match (method.as_str(), uri.as_str(), query.as_str(), watch) {
247 ("GET", "/api/v1/namespaces/vault-mgmt-e2e/pods", "&labelSelector=vault-sealed%3Dtrue", false) => {
248 let mut list = List::<Pod>::default();
249
250 for id in 0..=2 {
251 let file = tokio::fs::read_to_string(format!(
252 "tests/resources/installed/{}{}.yaml",
253 "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-",
254 id
255 ))
256 .await
257 .unwrap();
258
259 let pod: Pod = serde_yaml::from_str(&file).unwrap();
260 list.items.push(pod);
261 }
262
263 list.metadata.resource_version = Some(format!("{}", 1));
264
265 serde_json::to_string(&list).unwrap()
266 }
267 _ => panic!("Unexpected API request {:?} {:?} {:?}", method, uri, query),
268 };
269
270 send.send_response(Response::builder().body(Bytes::from(body).into()).unwrap());
271 }
272 _ = cancel.cancelled() => {
273 return;
274 }
275 }
276 }
277 }
278
279 async fn setup() -> (Api<Pod>, JoinHandle<()>, CancellationToken) {
280 let (mock_service, mut handle) = mock::pair::<Request<Body>, Response<Body>>();
281
282 let cancel = CancellationToken::new();
283 let cloned_token = cancel.clone();
284
285 let spawned = tokio::spawn(async move {
286 mock_list_sealed(cloned_token, &mut handle).await;
287 });
288
289 let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "vault-mgmt-e2e"));
290
291 (pods, spawned, cancel)
292 }
293
294 #[tokio::test]
295 async fn get_sealed_pods_returns_sealed_pods() {
296 let (api, service, cancel) = setup().await;
297
298 let pods = list_sealed_pods(&api).await.unwrap();
299
300 assert_eq!(pods.len(), 3);
301
302 cancel.cancel();
303
304 service.await.unwrap();
305 }
306
307 #[tokio::test]
308 async fn unseal_returns_err_without_keys() {
309 let mock_server = MockServer::start().await;
310 let mut client = HttpForwarderService::http(
311 tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
312 .await
313 .unwrap(),
314 )
315 .await
316 .unwrap();
317
318 let outcome = client.unseal(&[]).await;
319
320 assert!(outcome.is_err());
321 }
322
323 struct UnsealBodyMatcher(String);
324
325 impl wiremock::Match for UnsealBodyMatcher {
326 fn matches(&self, request: &wiremock::Request) -> bool {
327 let result: Result<serde_json::Value, _> = serde_json::from_slice(&request.body);
328 if let Ok(body) = result {
329 body.get("key").is_some()
330 && body.get("key").unwrap() == &self.0
331 && body.get("reset").is_some()
332 && body.get("migrate").is_some()
333 } else {
334 false
335 }
336 }
337 }
338
339 #[tokio::test]
340 async fn unseal_calls_api() {
341 let mock_server = MockServer::start().await;
342
343 for key in ["abc".to_string(), "def".to_string(), "ghi".to_string()] {
344 Mock::given(method(Method::PUT))
345 .and(path("/v1/sys/unseal"))
346 .and(header("X-Vault-Request", "true"))
347 .and(UnsealBodyMatcher(key))
348 .respond_with(ResponseTemplate::new(StatusCode::OK))
349 .expect(1)
350 .mount(&mock_server)
351 .await;
352 }
353
354 let mut client = HttpForwarderService::http(
355 tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
356 .await
357 .unwrap(),
358 )
359 .await
360 .unwrap();
361
362 let outcome = client
363 .unseal(&[
364 Secret::from_str("abc").unwrap(),
365 Secret::from_str("def").unwrap(),
366 Secret::from_str("ghi").unwrap(),
367 ])
368 .await;
369
370 assert!(outcome.is_ok());
371 }
372
373 async fn mock_get_unseal_keys() -> MockServer {
374 let mock_server = MockServer::start().await;
375
376 Mock::given(method(Method::GET))
377 .and(path("/v1/kv/data/test"))
378 .and(header("X-Vault-Request", "true"))
379 .and(header("X-Vault-Token", "token"))
380 .respond_with(
381 ResponseTemplate::new(StatusCode::OK).set_body_json(serde_json::json!({
382 "request_id": "abd3b7a3-581f-8add-1a6d-1d7cdb5b9c2b",
383 "lease_id": "",
384 "lease_duration": 0,
385 "renewable": false,
386 "data": {
387 "data": {
388 "keys": "abc\ndef\nghi"
389 },
390 "metadata": {
391 "created_time": "2023-06-09T13:59:44.750984296Z",
392 "custom_metadata": null,
393 "deletion_time": "",
394 "destroyed": false,
395 "version": 1
396 }
397 },
398 "warnings": null
399 })),
400 )
401 .expect(1)
402 .mount(&mock_server)
403 .await;
404
405 mock_server
406 }
407
408 #[tokio::test]
409 async fn retrieving_unseal_keys_works() {
410 let mock_server = mock_get_unseal_keys().await;
411
412 let uri = http::uri::Uri::builder()
413 .scheme(http::uri::Scheme::HTTP)
414 .authority(mock_server.uri().strip_prefix("http://").unwrap())
415 .path_and_query("/v1/kv/data/test")
416 .build()
417 .unwrap();
418
419 let mut client = HttpForwarderService::http(
420 tokio::net::TcpStream::connect(uri.authority().unwrap().as_str())
421 .await
422 .unwrap(),
423 )
424 .await
425 .unwrap();
426
427 let outcome = client
428 .get_unseal_keys(
429 uri.path_and_query().unwrap(),
430 Secret::new("token".to_string()),
431 )
432 .await;
433
434 assert!(outcome.is_ok());
435 }
436
437 #[tokio::test]
438 async fn retrieving_unseal_keys_works_externally() {
439 let mock_server = mock_get_unseal_keys().await;
440
441 let uri = http::uri::Uri::builder()
442 .scheme(http::uri::Scheme::HTTP)
443 .authority(mock_server.uri().strip_prefix("http://").unwrap())
444 .path_and_query("/v1/kv/data/test")
445 .build()
446 .unwrap();
447
448 dbg!(mock_server.uri());
449
450 let mut client = GetUnsealKeysFromVault::new(&uri).unwrap();
451
452 let outcome = client
453 .get_unseal_keys(
454 uri.path_and_query().unwrap(),
455 Secret::new("token".to_string()),
456 )
457 .await;
458
459 assert!(outcome.is_ok());
460 }
461}