vault_mgmt_lib/
status.rs

1use http_body_util::{BodyExt, Empty};
2use hyper::body::Bytes;
3use kube::runtime::wait::Condition;
4use secrecy::Secret;
5
6use crate::{raft_configuration_request, seal_status_request, BytesBody, HttpRequest};
7
8#[derive(Debug, serde::Serialize, serde::Deserialize)]
9pub struct PodSealStatus {
10    #[serde(rename = "type")]
11    pub type_: String,
12    pub initialized: bool,
13    pub sealed: bool,
14    pub t: u8,
15    pub n: u8,
16    pub progress: u8,
17    pub nonce: String,
18    pub version: String,
19    pub build_date: String,
20    pub migration: bool,
21    pub recovery_seal: bool,
22    pub storage_type: String,
23    pub ha_enabled: Option<bool>,
24    pub cluster_name: Option<String>,
25    pub cluster_id: Option<String>,
26    pub active_time: Option<String>,
27    pub leader_address: Option<String>,
28    pub leader_cluster_address: Option<String>,
29    pub raft_committed_index: Option<u64>,
30    pub raft_applied_index: Option<u64>,
31}
32
33/// Get vault pod's seal status
34#[async_trait::async_trait]
35pub trait GetSealStatus {
36    /// Get vault pod's seal status
37    async fn seal_status(&mut self) -> anyhow::Result<PodSealStatus>;
38
39    /// Wait for vault pod's seal status to match the provided condition
40    async fn await_seal_status(
41        &mut self,
42        cond: impl Condition<PodSealStatus> + Send,
43    ) -> Result<Option<PodSealStatus>, anyhow::Error>;
44}
45
46#[async_trait::async_trait]
47impl<T> GetSealStatus for T
48where
49    T: HttpRequest<BytesBody> + Send + Sync + 'static,
50{
51    async fn seal_status(&mut self) -> anyhow::Result<PodSealStatus> {
52        let http_req = seal_status_request(Empty::<Bytes>::new().boxed())?;
53
54        let (parts, body) = self.send_request(http_req).await?.into_parts();
55
56        let body = String::from_utf8(body.to_vec())?;
57
58        if parts.status != hyper::StatusCode::OK {
59            return Err(anyhow::anyhow!("getting seal status: {}", body));
60        }
61
62        Ok(serde_json::from_str(&body).map_err(|e| anyhow::anyhow!("{}: {}", e, body))?)
63    }
64
65    async fn await_seal_status(
66        &mut self,
67        cond: impl Condition<PodSealStatus> + Send,
68    ) -> Result<Option<PodSealStatus>, anyhow::Error> {
69        loop {
70            let status = self.seal_status().await?;
71            if cond.matches_object(Some(&status)) {
72                return Ok(Some(status));
73            }
74        }
75    }
76}
77
78#[must_use]
79pub fn is_seal_status_initialized() -> impl Condition<PodSealStatus> {
80    |obj: Option<&PodSealStatus>| {
81        if let Some(status) = obj {
82            return status.initialized;
83        }
84        false
85    }
86}
87
88#[must_use]
89pub fn is_seal_status_sealed() -> impl Condition<PodSealStatus> {
90    |obj: Option<&PodSealStatus>| {
91        if let Some(status) = obj {
92            return status.sealed;
93        }
94        false
95    }
96}
97
98#[derive(Debug, serde::Serialize, serde::Deserialize)]
99pub struct RaftConfiguration {
100    pub request_id: String,
101    pub lease_id: String,
102    pub renewable: bool,
103    pub lease_duration: u64,
104    pub data: RaftConfigurationData,
105    pub wrap_info: Option<serde_json::Value>,
106    pub warnings: Option<serde_json::Value>,
107    pub auth: Option<serde_json::Value>,
108}
109
110#[derive(Debug, serde::Serialize, serde::Deserialize)]
111pub struct RaftConfigurationData {
112    pub config: RaftConfigurationDataConfig,
113}
114
115#[derive(Debug, serde::Serialize, serde::Deserialize)]
116pub struct RaftConfigurationDataConfig {
117    pub servers: Vec<RaftConfigurationServer>,
118    pub index: u64,
119}
120
121#[derive(Debug, serde::Serialize, serde::Deserialize)]
122pub struct RaftConfigurationServer {
123    pub node_id: String,
124    pub address: String,
125    pub leader: bool,
126    pub protocol_version: String,
127    pub voter: bool,
128}
129
130/// Get vault pod's raft configuration
131#[async_trait::async_trait]
132pub trait GetRaftConfiguration {
133    /// Get vault pod's raft configuration
134    async fn raft_configuration(
135        &mut self,
136        token: Secret<String>,
137    ) -> anyhow::Result<RaftConfiguration>;
138
139    /// Wait for vault pod's raft configuration to match the provided condition
140    async fn await_raft_configuration(
141        &mut self,
142        token: Secret<String>,
143        cond: impl Condition<RaftConfiguration> + Send,
144    ) -> Result<Option<RaftConfiguration>, anyhow::Error>;
145}
146
147#[async_trait::async_trait]
148impl<T> GetRaftConfiguration for T
149where
150    T: HttpRequest<BytesBody> + Send + Sync + 'static,
151{
152    async fn raft_configuration(
153        &mut self,
154        token: Secret<String>,
155    ) -> anyhow::Result<RaftConfiguration> {
156        let http_req = raft_configuration_request(token, Empty::<Bytes>::new().boxed())?;
157
158        let (parts, body) = self.send_request(http_req).await?.into_parts();
159
160        let body = String::from_utf8(body.to_vec())?;
161
162        if parts.status != hyper::StatusCode::OK {
163            return Err(anyhow::anyhow!("getting raft configuration: {}", body));
164        }
165
166        Ok(serde_json::from_str(&body).map_err(|e| anyhow::anyhow!("{}: {}", e, body))?)
167    }
168
169    async fn await_raft_configuration(
170        &mut self,
171        token: Secret<String>,
172        cond: impl Condition<RaftConfiguration> + Send,
173    ) -> Result<Option<RaftConfiguration>, anyhow::Error> {
174        loop {
175            let config = self.raft_configuration(token.clone()).await?;
176            if cond.matches_object(Some(&config)) {
177                return Ok(Some(config));
178            }
179        }
180    }
181}
182
183#[must_use]
184pub fn raft_configuration_any_leader() -> impl Condition<RaftConfiguration> {
185    |obj: Option<&RaftConfiguration>| {
186        if let Some(config) = obj {
187            return config.data.config.servers.iter().any(|s| s.leader);
188        }
189        false
190    }
191}
192
193#[must_use]
194pub fn raft_configuration_all_voters() -> impl Condition<RaftConfiguration> {
195    |obj: Option<&RaftConfiguration>| {
196        if let Some(config) = obj {
197            return config.data.config.servers.iter().all(|s| s.voter);
198        }
199        false
200    }
201}
202
203#[cfg(test)]
204mod tests {
205    use std::str::FromStr;
206
207    use secrecy::Secret;
208    use wiremock::{
209        matchers::{header, method, path},
210        Mock, MockServer, ResponseTemplate,
211    };
212
213    use crate::{
214        is_seal_status_initialized, raft_configuration_all_voters, raft_configuration_any_leader,
215        GetRaftConfiguration, GetSealStatus, HttpForwarderService, RaftConfiguration,
216    };
217
218    fn minimal_seal_status() -> serde_json::Value {
219        serde_json::json!({
220            "type": "shamir",
221            "initialized": false,
222            "sealed": true,
223            "t": 2,
224            "n": 3,
225            "progress": 0,
226            "nonce": "",
227            "version": "1.13.0",
228            "build_date": "2023-03-01T14:58:13Z",
229            "migration": false,
230            "recovery_seal": false,
231            "storage_type": "raft"
232        })
233    }
234
235    fn uninitialized_seal_status() -> serde_json::Value {
236        serde_json::json!({
237            "type": "shamir",
238            "initialized": false,
239            "sealed": true,
240            "t": 2,
241            "n": 3,
242            "progress": 0,
243            "nonce": "",
244            "version": "1.13.0",
245            "build_date": "2023-03-01T14:58:13Z",
246            "migration": false,
247            "recovery_seal": false,
248            "storage_type": "raft",
249            "ha_enabled": true,
250            "active_time": "0001-01-01T00:00:00Z"
251        })
252    }
253
254    fn initialized_seal_status() -> serde_json::Value {
255        serde_json::json!({
256            "type": "shamir",
257            "initialized": true,
258            "sealed": false,
259            "t": 2,
260            "n": 3,
261            "progress": 0,
262            "nonce": "",
263            "version": "1.13.0",
264            "build_date": "2023-03-01T14:58:13Z",
265            "migration": false,
266            "cluster_name": "vault-cluster-211d673a",
267            "cluster_id": "b7b7f5e2-803a-2484-df4a-870c6b15f22f",
268            "recovery_seal": false,
269            "storage_type": "raft",
270            "ha_enabled": true,
271            "active_time": "0001-01-01T00:00:00Z",
272            "leader_address": "http://10.42.2.25:8200",
273            "leader_cluster_address": "https://vault-0.vault-internal:8201",
274            "raft_committed_index": 40,
275            "raft_applied_index": 40
276        })
277    }
278
279    async fn mock(response: serde_json::Value) -> MockServer {
280        let mock_server = MockServer::start().await;
281
282        Mock::given(method(http::Method::GET))
283            .and(path("/v1/sys/seal-status"))
284            .and(header("X-Vault-Request", "true"))
285            .respond_with(ResponseTemplate::new(200).set_body_json(response))
286            .expect(1)
287            .mount(&mock_server)
288            .await;
289
290        mock_server
291    }
292
293    #[tokio::test]
294    async fn getting_seal_status_works_if_minimal() {
295        let mock_server = mock(minimal_seal_status()).await;
296
297        let mut client = HttpForwarderService::http(
298            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
299                .await
300                .unwrap(),
301        )
302        .await
303        .unwrap();
304
305        let status = client.seal_status().await.unwrap();
306
307        assert_eq!(status.type_, "shamir");
308        assert!(!status.initialized);
309        assert!(status.sealed);
310        assert_eq!(status.t, 2);
311        assert_eq!(status.n, 3);
312        assert_eq!(status.progress, 0);
313        assert_eq!(status.nonce, "");
314        assert_eq!(status.version, "1.13.0");
315        assert_eq!(status.build_date, "2023-03-01T14:58:13Z");
316        assert!(!status.migration);
317        assert!(!status.recovery_seal);
318        assert_eq!(status.storage_type, "raft");
319        assert_eq!(status.ha_enabled, None);
320        assert_eq!(status.cluster_name, None);
321        assert_eq!(status.cluster_id, None);
322        assert_eq!(status.active_time, None);
323        assert_eq!(status.leader_address, None);
324        assert_eq!(status.leader_cluster_address, None);
325        assert_eq!(status.raft_committed_index, None);
326        assert_eq!(status.raft_applied_index, None);
327    }
328
329    #[tokio::test]
330    async fn getting_seal_status_works_if_uninitialized() {
331        let mock_server = mock(uninitialized_seal_status()).await;
332
333        let mut client = HttpForwarderService::http(
334            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
335                .await
336                .unwrap(),
337        )
338        .await
339        .unwrap();
340
341        let status = client.seal_status().await.unwrap();
342
343        assert_eq!(status.type_, "shamir");
344        assert!(!status.initialized);
345        assert!(status.sealed);
346        assert_eq!(status.t, 2);
347        assert_eq!(status.n, 3);
348        assert_eq!(status.progress, 0);
349        assert_eq!(status.nonce, "");
350        assert_eq!(status.version, "1.13.0");
351        assert_eq!(status.build_date, "2023-03-01T14:58:13Z");
352        assert!(!status.migration);
353        assert!(!status.recovery_seal);
354        assert_eq!(status.storage_type, "raft");
355        assert!(status.ha_enabled.unwrap());
356        assert_eq!(status.cluster_name, None);
357        assert_eq!(status.cluster_id, None);
358        assert_eq!(status.active_time.unwrap(), "0001-01-01T00:00:00Z");
359        assert_eq!(status.leader_address, None);
360        assert_eq!(status.leader_cluster_address, None);
361        assert_eq!(status.raft_committed_index, None);
362        assert_eq!(status.raft_applied_index, None);
363    }
364
365    #[tokio::test]
366    async fn getting_seal_status_works_if_initialized() {
367        let mock_server = mock(initialized_seal_status()).await;
368
369        let mut client = HttpForwarderService::http(
370            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
371                .await
372                .unwrap(),
373        )
374        .await
375        .unwrap();
376
377        let status = client.seal_status().await.unwrap();
378
379        assert_eq!(status.type_, "shamir");
380        assert!(status.initialized);
381        assert!(!status.sealed);
382        assert_eq!(status.t, 2);
383        assert_eq!(status.n, 3);
384        assert_eq!(status.progress, 0);
385        assert_eq!(status.nonce, "");
386        assert_eq!(status.version, "1.13.0");
387        assert_eq!(status.build_date, "2023-03-01T14:58:13Z");
388        assert!(!status.migration);
389        assert!(!status.recovery_seal);
390        assert_eq!(status.storage_type, "raft");
391        assert!(status.ha_enabled.unwrap());
392        assert_eq!(status.cluster_name.unwrap(), "vault-cluster-211d673a");
393        assert_eq!(
394            status.cluster_id.unwrap(),
395            "b7b7f5e2-803a-2484-df4a-870c6b15f22f"
396        );
397        assert_eq!(status.active_time.unwrap(), "0001-01-01T00:00:00Z");
398        assert_eq!(status.leader_address.unwrap(), "http://10.42.2.25:8200");
399        assert_eq!(
400            status.leader_cluster_address.unwrap(),
401            "https://vault-0.vault-internal:8201"
402        );
403        assert_eq!(status.raft_committed_index.unwrap(), 40);
404        assert_eq!(status.raft_applied_index.unwrap(), 40);
405    }
406
407    #[tokio::test]
408    async fn waiting_for_seal_status_works() {
409        let mock_server = mock(initialized_seal_status()).await;
410
411        let mut client = HttpForwarderService::http(
412            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
413                .await
414                .unwrap(),
415        )
416        .await
417        .unwrap();
418
419        let status = client
420            .await_seal_status(is_seal_status_initialized())
421            .await
422            .unwrap()
423            .unwrap();
424
425        assert!(status.initialized);
426    }
427
428    fn raft_configuration() -> serde_json::Value {
429        serde_json::json!({
430            "request_id": "7f6fc909-bb7f-e48c-d850-0ad8a22cb434",
431            "lease_id": "",
432            "renewable": false,
433            "lease_duration": 0,
434            "data": {
435                "config": {
436                    "servers": [
437                        {
438                            "node_id": "147c957f-5718-07b6-424e-5522efcfbc9e",
439                            "address": "vault-0.vault-internal:8201",
440                            "leader": true,
441                            "protocol_version": "3",
442                            "voter": true
443                        },
444                        {
445                            "node_id": "04ffa935-e1c2-e891-a9e9-426bf1a6c93d",
446                            "address": "vault-1.vault-internal:8201",
447                            "leader": false,
448                            "protocol_version": "3",
449                            "voter": true
450                        },
451                        {
452                            "node_id": "124bef00-64ec-59de-1366-7050edfb5c49",
453                            "address": "vault-2.vault-internal:8201",
454                            "leader": false,
455                            "protocol_version": "3",
456                            "voter": true
457                        }
458                    ],
459                    "index": 0
460                }
461            },
462            "wrap_info": null,
463            "warnings": null,
464            "auth": null
465        })
466    }
467
468    fn raft_configuration_no_leader() -> serde_json::Value {
469        let mut rc = serde_json::from_value::<RaftConfiguration>(raft_configuration()).unwrap();
470        rc.data.config.servers[0].leader = false;
471        serde_json::to_value(rc).unwrap()
472    }
473
474    fn raft_configuration_single_non_voter() -> serde_json::Value {
475        let mut rc = serde_json::from_value::<RaftConfiguration>(raft_configuration()).unwrap();
476        rc.data.config.servers[2].voter = false;
477        serde_json::to_value(rc).unwrap()
478    }
479
480    fn raft_configuration_no_voter() -> serde_json::Value {
481        let mut rc = serde_json::from_value::<RaftConfiguration>(raft_configuration()).unwrap();
482        rc.data.config.servers[0].leader = false;
483        rc.data
484            .config
485            .servers
486            .iter_mut()
487            .for_each(|s| s.voter = false);
488        serde_json::to_value(rc).unwrap()
489    }
490
491    async fn mock_raft_configuration(response: &[serde_json::Value]) -> MockServer {
492        let mock_server = MockServer::start().await;
493
494        for r in response {
495            Mock::given(method(http::Method::GET))
496                .and(path("/v1/sys/storage/raft/configuration"))
497                .and(header("X-Vault-Request", "true"))
498                .and(header("X-Vault-Token", "abc"))
499                .respond_with(ResponseTemplate::new(200).set_body_json(r))
500                .up_to_n_times(1)
501                .expect(1)
502                .mount(&mock_server)
503                .await;
504        }
505
506        mock_server
507    }
508
509    #[tokio::test]
510    async fn getting_raft_configuration_works() {
511        let mock_server = mock_raft_configuration(&[raft_configuration()]).await;
512
513        let mut client = HttpForwarderService::http(
514            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
515                .await
516                .unwrap(),
517        )
518        .await
519        .unwrap();
520
521        let config = client
522            .raft_configuration(Secret::from_str("abc").unwrap())
523            .await
524            .unwrap();
525
526        assert_eq!(config.request_id, "7f6fc909-bb7f-e48c-d850-0ad8a22cb434");
527        assert_eq!(config.lease_id, "");
528        assert!(!config.renewable);
529        assert_eq!(config.lease_duration, 0);
530        assert_eq!(config.data.config.index, 0);
531        assert_eq!(config.data.config.servers.len(), 3);
532        assert_eq!(
533            config.data.config.servers[0].node_id,
534            "147c957f-5718-07b6-424e-5522efcfbc9e"
535        );
536        assert_eq!(
537            config.data.config.servers[0].address,
538            "vault-0.vault-internal:8201"
539        );
540        assert!(config.data.config.servers[0].leader);
541        assert_eq!(config.data.config.servers[0].protocol_version, "3");
542        assert!(config.data.config.servers[0].voter);
543        assert_eq!(
544            config.data.config.servers[1].node_id,
545            "04ffa935-e1c2-e891-a9e9-426bf1a6c93d"
546        );
547        assert_eq!(
548            config.data.config.servers[1].address,
549            "vault-1.vault-internal:8201"
550        );
551        assert!(!config.data.config.servers[1].leader);
552        assert_eq!(config.data.config.servers[1].protocol_version, "3");
553        assert!(config.data.config.servers[1].voter);
554        assert_eq!(
555            config.data.config.servers[2].node_id,
556            "124bef00-64ec-59de-1366-7050edfb5c49"
557        );
558        assert_eq!(
559            config.data.config.servers[2].address,
560            "vault-2.vault-internal:8201"
561        );
562        assert!(!config.data.config.servers[2].leader);
563        assert_eq!(config.data.config.servers[2].protocol_version, "3");
564        assert!(config.data.config.servers[2].voter);
565
566        assert_eq!(config.wrap_info, None);
567        assert_eq!(config.warnings, None);
568        assert_eq!(config.auth, None);
569    }
570
571    #[tokio::test]
572    async fn waiting_for_raft_configuration_works() {
573        let mock_server = mock_raft_configuration(&[raft_configuration()]).await;
574
575        let mut client = HttpForwarderService::http(
576            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
577                .await
578                .unwrap(),
579        )
580        .await
581        .unwrap();
582
583        let config = client
584            .await_raft_configuration(
585                Secret::from_str("abc").unwrap(),
586                raft_configuration_any_leader(),
587            )
588            .await
589            .unwrap()
590            .unwrap();
591
592        assert!(config.data.config.servers[0].leader);
593    }
594
595    #[tokio::test]
596    async fn waiting_for_raft_configuration_having_leader_works() {
597        let mock_server = mock_raft_configuration(&[
598            raft_configuration_no_leader(),
599            raft_configuration_no_leader(),
600            raft_configuration(),
601        ])
602        .await;
603
604        let mut client = HttpForwarderService::http(
605            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
606                .await
607                .unwrap(),
608        )
609        .await
610        .unwrap();
611
612        let config = client
613            .await_raft_configuration(
614                Secret::from_str("abc").unwrap(),
615                raft_configuration_any_leader(),
616            )
617            .await
618            .unwrap()
619            .unwrap();
620
621        assert!(config.data.config.servers[0].leader);
622    }
623
624    #[tokio::test]
625    async fn waiting_for_raft_configuration_having_all_voters_works() {
626        let mock_server = mock_raft_configuration(&[
627            raft_configuration_no_voter(),
628            raft_configuration_single_non_voter(),
629            raft_configuration(),
630        ])
631        .await;
632
633        let mut client = HttpForwarderService::http(
634            tokio::net::TcpStream::connect(mock_server.uri().strip_prefix("http://").unwrap())
635                .await
636                .unwrap(),
637        )
638        .await
639        .unwrap();
640
641        let config = client
642            .await_raft_configuration(
643                Secret::from_str("abc").unwrap(),
644                raft_configuration_all_voters(),
645            )
646            .await
647            .unwrap()
648            .unwrap();
649
650        assert!(config.data.config.servers.iter().all(|s| s.voter));
651    }
652}