1use http_body_util::{BodyExt, Empty};
2use hyper::body::Bytes;
3use kube::runtime::wait::Condition;
4use secrecy::Secret;
5
6use crate::{raft_configuration_request, seal_status_request, BytesBody, HttpRequest};
7
8#[derive(Debug, serde::Serialize, serde::Deserialize)]
9pub struct PodSealStatus {
10 #[serde(rename = "type")]
11 pub type_: String,
12 pub initialized: bool,
13 pub sealed: bool,
14 pub t: u8,
15 pub n: u8,
16 pub progress: u8,
17 pub nonce: String,
18 pub version: String,
19 pub build_date: String,
20 pub migration: bool,
21 pub recovery_seal: bool,
22 pub storage_type: String,
23 pub ha_enabled: Option<bool>,
24 pub cluster_name: Option<String>,
25 pub cluster_id: Option<String>,
26 pub active_time: Option<String>,
27 pub leader_address: Option<String>,
28 pub leader_cluster_address: Option<String>,
29 pub raft_committed_index: Option<u64>,
30 pub raft_applied_index: Option<u64>,
31}
32
33#[async_trait::async_trait]
35pub trait GetSealStatus {
36 async fn seal_status(&mut self) -> anyhow::Result<PodSealStatus>;
38
39 async fn await_seal_status(
41 &mut self,
42 cond: impl Condition<PodSealStatus> + Send,
43 ) -> Result<Option<PodSealStatus>, anyhow::Error>;
44}
45
46#[async_trait::async_trait]
47impl<T> GetSealStatus for T
48where
49 T: HttpRequest<BytesBody> + Send + Sync + 'static,
50{
51 async fn seal_status(&mut self) -> anyhow::Result<PodSealStatus> {
52 let http_req = seal_status_request(Empty::<Bytes>::new().boxed())?;
53
54 let (parts, body) = self.send_request(http_req).await?.into_parts();
55
56 let body = String::from_utf8(body.to_vec())?;
57
58 if parts.status != hyper::StatusCode::OK {
59 return Err(anyhow::anyhow!("getting seal status: {}", body));
60 }
61
62 Ok(serde_json::from_str(&body).map_err(|e| anyhow::anyhow!("{}: {}", e, body))?)
63 }
64
65 async fn await_seal_status(
66 &mut self,
67 cond: impl Condition<PodSealStatus> + Send,
68 ) -> Result<Option<PodSealStatus>, anyhow::Error> {
69 loop {
70 let status = self.seal_status().await?;
71 if cond.matches_object(Some(&status)) {
72 return Ok(Some(status));
73 }
74 }
75 }
76}
77
78#[must_use]
79pub fn is_seal_status_initialized() -> impl Condition<PodSealStatus> {
80 |obj: Option<&PodSealStatus>| {
81 if let Some(status) = obj {
82 return status.initialized;
83 }
84 false
85 }
86}
87
88#[must_use]
89pub fn is_seal_status_sealed() -> impl Condition<PodSealStatus> {
90 |obj: Option<&PodSealStatus>| {
91 if let Some(status) = obj {
92 return status.sealed;
93 }
94 false
95 }
96}
97
98#[derive(Debug, serde::Serialize, serde::Deserialize)]
99pub struct RaftConfiguration {
100 pub request_id: String,
101 pub lease_id: String,
102 pub renewable: bool,
103 pub lease_duration: u64,
104 pub data: RaftConfigurationData,
105 pub wrap_info: Option<serde_json::Value>,
106 pub warnings: Option<serde_json::Value>,
107 pub auth: Option<serde_json::Value>,
108}
109
110#[derive(Debug, serde::Serialize, serde::Deserialize)]
111pub struct RaftConfigurationData {
112 pub config: RaftConfigurationDataConfig,
113}
114
115#[derive(Debug, serde::Serialize, serde::Deserialize)]
116pub struct RaftConfigurationDataConfig {
117 pub servers: Vec<RaftConfigurationServer>,
118 pub index: u64,
119}
120
121#[derive(Debug, serde::Serialize, serde::Deserialize)]
122pub struct RaftConfigurationServer {
123 pub node_id: String,
124 pub address: String,
125 pub leader: bool,
126 pub protocol_version: String,
127 pub voter: bool,
128}
129
130#[async_trait::async_trait]
132pub trait GetRaftConfiguration {
133 async fn raft_configuration(
135 &mut self,
136 token: Secret<String>,
137 ) -> anyhow::Result<RaftConfiguration>;
138
139 async fn await_raft_configuration(
141 &mut self,
142 token: Secret<String>,
143 cond: impl Condition<RaftConfiguration> + Send,
144 ) -> Result<Option<RaftConfiguration>, anyhow::Error>;
145}
146
147#[async_trait::async_trait]
148impl<T> GetRaftConfiguration for T
149where
150 T: HttpRequest<BytesBody> + Send + Sync + 'static,
151{
152 async fn raft_configuration(
153 &mut self,
154 token: Secret<String>,
155 ) -> anyhow::Result<RaftConfiguration> {
156 let http_req = raft_configuration_request(token, Empty::<Bytes>::new().boxed())?;
157
158 let (parts, body) = self.send_request(http_req).await?.into_parts();
159
160 let body = String::from_utf8(body.to_vec())?;
161
162 if parts.status != hyper::StatusCode::OK {
163 return Err(anyhow::anyhow!("getting raft configuration: {}", body));
164 }
165
166 Ok(serde_json::from_str(&body).map_err(|e| anyhow::anyhow!("{}: {}", e, body))?)
167 }
168
169 async fn await_raft_configuration(
170 &mut self,
171 token: Secret<String>,
172 cond: impl Condition<RaftConfiguration> + Send,
173 ) -> Result<Option<RaftConfiguration>, anyhow::Error> {
174 loop {
175 let config = self.raft_configuration(token.clone()).await?;
176 if cond.matches_object(Some(&config)) {
177 return Ok(Some(config));
178 }
179 }
180 }
181}
182
183#[must_use]
184pub fn raft_configuration_any_leader() -> impl Condition<RaftConfiguration> {
185 |obj: Option<&RaftConfiguration>| {
186 if let Some(config) = obj {
187 return config.data.config.servers.iter().any(|s| s.leader);
188 }
189 false
190 }
191}
192
193#[must_use]
194pub fn raft_configuration_all_voters() -> impl Condition<RaftConfiguration> {
195 |obj: Option<&RaftConfiguration>| {
196 if let Some(config) = obj {
197 return config.data.config.servers.iter().all(|s| s.voter);
198 }
199 false
200 }
201}
202
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use secrecy::Secret;
    use wiremock::{
        matchers::{header, method, path},
        Mock, MockServer, ResponseTemplate,
    };

    use crate::{
        is_seal_status_initialized, raft_configuration_all_voters, raft_configuration_any_leader,
        GetRaftConfiguration, GetSealStatus, HttpForwarderService, RaftConfiguration,
    };

    /// Token that the raft configuration mocks require in `X-Vault-Token`.
    const TOKEN: &str = "abc";

    /// Opens a TCP connection to the mock server, whose URI must start with `http://`.
    async fn connect(server: &MockServer) -> tokio::net::TcpStream {
        let uri = server.uri();
        let authority = uri.strip_prefix("http://").unwrap();
        tokio::net::TcpStream::connect(authority).await.unwrap()
    }

    /// Asserts the fields that carry the same values in every seal status fixture.
    fn assert_shared_fields(status: &super::PodSealStatus) {
        assert_eq!(status.type_, "shamir");
        assert_eq!(status.t, 2);
        assert_eq!(status.n, 3);
        assert_eq!(status.progress, 0);
        assert_eq!(status.nonce, "");
        assert_eq!(status.version, "1.13.0");
        assert_eq!(status.build_date, "2023-03-01T14:58:13Z");
        assert!(!status.migration);
        assert!(!status.recovery_seal);
        assert_eq!(status.storage_type, "raft");
    }

    /// Seal status carrying only the mandatory fields.
    fn minimal_seal_status() -> serde_json::Value {
        serde_json::json!({
            "type": "shamir",
            "initialized": false,
            "sealed": true,
            "t": 2,
            "n": 3,
            "progress": 0,
            "nonce": "",
            "version": "1.13.0",
            "build_date": "2023-03-01T14:58:13Z",
            "migration": false,
            "recovery_seal": false,
            "storage_type": "raft"
        })
    }

    /// Seal status of an uninitialized, sealed node with a few optional fields.
    fn uninitialized_seal_status() -> serde_json::Value {
        serde_json::json!({
            "type": "shamir",
            "initialized": false,
            "sealed": true,
            "t": 2,
            "n": 3,
            "progress": 0,
            "nonce": "",
            "version": "1.13.0",
            "build_date": "2023-03-01T14:58:13Z",
            "migration": false,
            "recovery_seal": false,
            "storage_type": "raft",
            "ha_enabled": true,
            "active_time": "0001-01-01T00:00:00Z"
        })
    }

    /// Seal status of an initialized, unsealed node with every optional field set.
    fn initialized_seal_status() -> serde_json::Value {
        serde_json::json!({
            "type": "shamir",
            "initialized": true,
            "sealed": false,
            "t": 2,
            "n": 3,
            "progress": 0,
            "nonce": "",
            "version": "1.13.0",
            "build_date": "2023-03-01T14:58:13Z",
            "migration": false,
            "cluster_name": "vault-cluster-211d673a",
            "cluster_id": "b7b7f5e2-803a-2484-df4a-870c6b15f22f",
            "recovery_seal": false,
            "storage_type": "raft",
            "ha_enabled": true,
            "active_time": "0001-01-01T00:00:00Z",
            "leader_address": "http://10.42.2.25:8200",
            "leader_cluster_address": "https://vault-0.vault-internal:8201",
            "raft_committed_index": 40,
            "raft_applied_index": 40
        })
    }

    /// Starts a mock server that answers exactly one seal status request with `response`.
    async fn mock(response: serde_json::Value) -> MockServer {
        let mock_server = MockServer::start().await;

        let seal_status_mock = Mock::given(method(http::Method::GET))
            .and(path("/v1/sys/seal-status"))
            .and(header("X-Vault-Request", "true"))
            .respond_with(ResponseTemplate::new(200).set_body_json(response))
            .expect(1);
        seal_status_mock.mount(&mock_server).await;

        mock_server
    }

    #[tokio::test]
    async fn getting_seal_status_works_if_minimal() {
        let mock_server = mock(minimal_seal_status()).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let status = client.seal_status().await.unwrap();

        assert_shared_fields(&status);
        assert!(!status.initialized);
        assert!(status.sealed);
        assert!(status.ha_enabled.is_none());
        assert!(status.cluster_name.is_none());
        assert!(status.cluster_id.is_none());
        assert!(status.active_time.is_none());
        assert!(status.leader_address.is_none());
        assert!(status.leader_cluster_address.is_none());
        assert!(status.raft_committed_index.is_none());
        assert!(status.raft_applied_index.is_none());
    }

    #[tokio::test]
    async fn getting_seal_status_works_if_uninitialized() {
        let mock_server = mock(uninitialized_seal_status()).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let status = client.seal_status().await.unwrap();

        assert_shared_fields(&status);
        assert!(!status.initialized);
        assert!(status.sealed);
        assert_eq!(status.ha_enabled, Some(true));
        assert!(status.cluster_name.is_none());
        assert!(status.cluster_id.is_none());
        assert_eq!(status.active_time.as_deref(), Some("0001-01-01T00:00:00Z"));
        assert!(status.leader_address.is_none());
        assert!(status.leader_cluster_address.is_none());
        assert!(status.raft_committed_index.is_none());
        assert!(status.raft_applied_index.is_none());
    }

    #[tokio::test]
    async fn getting_seal_status_works_if_initialized() {
        let mock_server = mock(initialized_seal_status()).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let status = client.seal_status().await.unwrap();

        assert_shared_fields(&status);
        assert!(status.initialized);
        assert!(!status.sealed);
        assert_eq!(status.ha_enabled, Some(true));
        assert_eq!(
            status.cluster_name.as_deref(),
            Some("vault-cluster-211d673a")
        );
        assert_eq!(
            status.cluster_id.as_deref(),
            Some("b7b7f5e2-803a-2484-df4a-870c6b15f22f")
        );
        assert_eq!(status.active_time.as_deref(), Some("0001-01-01T00:00:00Z"));
        assert_eq!(
            status.leader_address.as_deref(),
            Some("http://10.42.2.25:8200")
        );
        assert_eq!(
            status.leader_cluster_address.as_deref(),
            Some("https://vault-0.vault-internal:8201")
        );
        assert_eq!(status.raft_committed_index, Some(40));
        assert_eq!(status.raft_applied_index, Some(40));
    }

    #[tokio::test]
    async fn waiting_for_seal_status_works() {
        let mock_server = mock(initialized_seal_status()).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let status = client
            .await_seal_status(is_seal_status_initialized())
            .await
            .unwrap()
            .unwrap();

        assert!(status.initialized);
    }

    /// Raft configuration with three voting servers, the first being the leader.
    fn raft_configuration() -> serde_json::Value {
        serde_json::json!({
            "request_id": "7f6fc909-bb7f-e48c-d850-0ad8a22cb434",
            "lease_id": "",
            "renewable": false,
            "lease_duration": 0,
            "data": {
                "config": {
                    "servers": [
                        {
                            "node_id": "147c957f-5718-07b6-424e-5522efcfbc9e",
                            "address": "vault-0.vault-internal:8201",
                            "leader": true,
                            "protocol_version": "3",
                            "voter": true
                        },
                        {
                            "node_id": "04ffa935-e1c2-e891-a9e9-426bf1a6c93d",
                            "address": "vault-1.vault-internal:8201",
                            "leader": false,
                            "protocol_version": "3",
                            "voter": true
                        },
                        {
                            "node_id": "124bef00-64ec-59de-1366-7050edfb5c49",
                            "address": "vault-2.vault-internal:8201",
                            "leader": false,
                            "protocol_version": "3",
                            "voter": true
                        }
                    ],
                    "index": 0
                }
            },
            "wrap_info": null,
            "warnings": null,
            "auth": null
        })
    }

    /// Base configuration with the leader flag cleared on the first server.
    fn raft_configuration_no_leader() -> serde_json::Value {
        let mut parsed: RaftConfiguration = serde_json::from_value(raft_configuration()).unwrap();
        parsed.data.config.servers[0].leader = false;
        serde_json::to_value(parsed).unwrap()
    }

    /// Base configuration with the voter flag cleared on the last server.
    fn raft_configuration_single_non_voter() -> serde_json::Value {
        let mut parsed: RaftConfiguration = serde_json::from_value(raft_configuration()).unwrap();
        parsed.data.config.servers[2].voter = false;
        serde_json::to_value(parsed).unwrap()
    }

    /// Base configuration without a leader and without any voter.
    fn raft_configuration_no_voter() -> serde_json::Value {
        let mut parsed: RaftConfiguration = serde_json::from_value(raft_configuration()).unwrap();
        parsed.data.config.servers[0].leader = false;
        for server in &mut parsed.data.config.servers {
            server.voter = false;
        }
        serde_json::to_value(parsed).unwrap()
    }

    /// Starts a mock server with one single-use mock per entry of `response`,
    /// mounted in the given order; each mock must be hit exactly once.
    async fn mock_raft_configuration(response: &[serde_json::Value]) -> MockServer {
        let mock_server = MockServer::start().await;

        for body in response {
            let single_use_mock = Mock::given(method(http::Method::GET))
                .and(path("/v1/sys/storage/raft/configuration"))
                .and(header("X-Vault-Request", "true"))
                .and(header("X-Vault-Token", TOKEN))
                .respond_with(ResponseTemplate::new(200).set_body_json(body))
                .up_to_n_times(1)
                .expect(1);
            single_use_mock.mount(&mock_server).await;
        }

        mock_server
    }

    #[tokio::test]
    async fn getting_raft_configuration_works() {
        let mock_server = mock_raft_configuration(&[raft_configuration()]).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let config = client
            .raft_configuration(Secret::from_str(TOKEN).unwrap())
            .await
            .unwrap();

        assert_eq!(config.request_id, "7f6fc909-bb7f-e48c-d850-0ad8a22cb434");
        assert_eq!(config.lease_id, "");
        assert!(!config.renewable);
        assert_eq!(config.lease_duration, 0);
        assert_eq!(config.data.config.index, 0);

        // (node_id, address, leader) per server, in response order.
        let expected = [
            ("147c957f-5718-07b6-424e-5522efcfbc9e", "vault-0.vault-internal:8201", true),
            ("04ffa935-e1c2-e891-a9e9-426bf1a6c93d", "vault-1.vault-internal:8201", false),
            ("124bef00-64ec-59de-1366-7050edfb5c49", "vault-2.vault-internal:8201", false),
        ];
        let servers = &config.data.config.servers;
        assert_eq!(servers.len(), expected.len());
        for (server, &(node_id, address, leader)) in servers.iter().zip(expected.iter()) {
            assert_eq!(server.node_id, node_id);
            assert_eq!(server.address, address);
            assert_eq!(server.leader, leader);
            assert_eq!(server.protocol_version, "3");
            assert!(server.voter);
        }

        assert!(config.wrap_info.is_none());
        assert!(config.warnings.is_none());
        assert!(config.auth.is_none());
    }

    #[tokio::test]
    async fn waiting_for_raft_configuration_works() {
        let mock_server = mock_raft_configuration(&[raft_configuration()]).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let config = client
            .await_raft_configuration(
                Secret::from_str(TOKEN).unwrap(),
                raft_configuration_any_leader(),
            )
            .await
            .unwrap()
            .unwrap();

        assert!(config.data.config.servers[0].leader);
    }

    #[tokio::test]
    async fn waiting_for_raft_configuration_having_leader_works() {
        let responses = [
            raft_configuration_no_leader(),
            raft_configuration_no_leader(),
            raft_configuration(),
        ];
        let mock_server = mock_raft_configuration(&responses).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let config = client
            .await_raft_configuration(
                Secret::from_str(TOKEN).unwrap(),
                raft_configuration_any_leader(),
            )
            .await
            .unwrap()
            .unwrap();

        assert!(config.data.config.servers[0].leader);
    }

    #[tokio::test]
    async fn waiting_for_raft_configuration_having_all_voters_works() {
        let responses = [
            raft_configuration_no_voter(),
            raft_configuration_single_non_voter(),
            raft_configuration(),
        ];
        let mock_server = mock_raft_configuration(&responses).await;
        let mut client = HttpForwarderService::http(connect(&mock_server).await)
            .await
            .unwrap();

        let config = client
            .await_raft_configuration(
                Secret::from_str(TOKEN).unwrap(),
                raft_configuration_all_voters(),
            )
            .await
            .unwrap()
            .unwrap();

        assert!(config.data.config.servers.iter().all(|server| server.voter));
    }
}