vault_mgmt_lib/
upgrade.rs

1use k8s_openapi::api::{apps::v1::StatefulSet, core::v1::Pod};
2use kube::{api::DeleteParams, runtime::wait::conditions::is_pod_running};
3use secrecy::Secret;
4use tokio_retry::{
5    strategy::{jitter, ExponentialBackoff},
6    Retry,
7};
8use tracing::*;
9
10use crate::{
11    is_active, is_pod_exporting_seal_status, ExecIn, StepDown, Unseal, VaultVersion, VAULT_PORT,
12    {is_pod_ready, is_pod_standby, is_pod_unsealed}, {is_seal_status_initialized, GetSealStatus},
13    {is_sealed, list_vault_pods, PodApi, StatefulSetApi},
14};
15
16impl PodApi {
17    /// Check if the vault pod has the specified version
18    pub fn is_current(pod: &Pod, target: &VaultVersion) -> anyhow::Result<bool> {
19        let pod_version = VaultVersion::try_from(pod)?;
20        Ok(&pod_version == target)
21    }
22
23    /// Upgrade a vault pod
24    ///
25    ///  - a.1. if Pod version is outdated
26    ///     - a.1.1. Delete pod
27    ///     - a.1.2. Wait for pod to be deleted
28    ///     - a.1.3. Wait for pod to be running
29    ///  - a.2. if Pod version is current
30    ///     - a.2.1. Pod is sealed
31    ///         - a.2.1.1 Unseal pod
32    ///     - a.2.2. Wait for pod to be unsealed
33    ///     - a.2.3. Wait for pod to be ready
34    pub async fn upgrade(
35        &self,
36        pod: Pod,
37        target: &VaultVersion,
38        token: Secret<String>,
39        should_unseal: bool,
40        force_upgrade: bool,
41        keys: &[Secret<String>],
42    ) -> anyhow::Result<()> {
43        let name = pod
44            .metadata
45            .name
46            .as_ref()
47            .ok_or(anyhow::anyhow!("pod does not have a name"))?;
48
49        // if Pod version is outdated (or upgrade is forced)
50        if !Self::is_current(&pod, target)? || force_upgrade {
51            // if Pod is active
52            if is_active(&pod)? {
53                // Step down active pod
54                self.http(name, VAULT_PORT).await?.step_down(token).await?;
55
56                // Wait for other pod to take over
57                kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_standby())
58                    .await?;
59            }
60
61            // Delete pod
62            kube::runtime::wait::delete::delete_and_finalize(
63                self.api.clone(),
64                name,
65                &DeleteParams::default(),
66            )
67            .await
68            .map_err(|e| anyhow::anyhow!("deleting pod {}: {}", name, e.to_string()))?;
69        }
70
71        // Wait for pod to be running
72        kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_running())
73            .await
74            .map_err(|e| {
75                anyhow::anyhow!("waiting for pod {} to be running: {}", name, e.to_string())
76            })?;
77
78        // Wait for pod to export its seal status
79        kube::runtime::wait::await_condition(
80            self.api.clone(),
81            name,
82            is_pod_exporting_seal_status(),
83        )
84        .await?;
85
86        // Refresh pod
87        let pod = self.api.get(name).await?;
88
89        if Self::is_current(&pod, target)? {
90            // Pod is sealed
91            if is_sealed(&pod)? {
92                if should_unseal {
93                    let mut pf = Retry::spawn(
94                        ExponentialBackoff::from_millis(50).map(jitter).take(5),
95                        || async move { self.http(name, VAULT_PORT).await },
96                    )
97                    .await
98                    .map_err(|e| {
99                        anyhow::anyhow!(
100                            "attempting to forward http requests to {}: {}",
101                            name,
102                            e.to_string()
103                        )
104                    })?;
105
106                    // Wait for pod to have determined its seal status
107                    pf.await_seal_status(is_seal_status_initialized())
108                        .await
109                        .map_err(|e| {
110                            anyhow::anyhow!(
111                                "waiting for pod to have required seal status {}: {}",
112                                name,
113                                e.to_string()
114                            )
115                        })?;
116
117                    // Unseal pod
118                    pf.unseal(keys).await.map_err(|e| {
119                        anyhow::anyhow!("unsealing pod {}: {}", name, e.to_string())
120                    })?;
121                } else {
122                    info!("pod {} is sealed, waiting for external unseal", name);
123                }
124            }
125            // Wait for pod to be unsealed
126            kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_unsealed()).await?;
127            // Wait for pod to be ready
128            kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_ready()).await?;
129        }
130
131        Ok(())
132    }
133}
134
135impl StatefulSetApi {
136    /// Upgrade a vault cluster
137    ///
138    /// - Verify that the statefulset is ready to be upgraded or in the process of being upgraded
139    ///     - if statefulset is ready and all pods are ready, initialized and unsealed
140    ///         - start upgrade process
141    /// - Detect the target version from statefulset
142    /// - Repeat for all standby pods
143    ///     - Do a.1
144    ///     - Do a.2
145    /// - Upgrade active pods
146    ///     - if Pod version is outdated
147    ///         - Step down active pod
148    ///         - Wait for other pod to take over
149    ///     - Do a.1
150    ///     - Do a.2
151    ///
152    /// - a.1. if Pod version is outdated
153    ///     - a.1.1. Delete pod
154    ///     - a.1.2. Wait for pod to be deleted
155    ///     - a.1.3. Wait for pod to be running
156    /// - a.2. if Pod version is current
157    ///     - a.2.1. Pod is sealed
158    ///         - a.2.1.1 Unseal pod
159    ///     - a.2.2. Wait for pod to be unsealed
160    ///     - a.2.3. Wait for pod to be ready
161    pub async fn upgrade(
162        &self,
163        sts: StatefulSet,
164        pods: &PodApi,
165        token: Secret<String>,
166        should_unseal: bool,
167        force_upgrade: bool,
168        keys: &[Secret<String>],
169    ) -> anyhow::Result<()> {
170        let target = VaultVersion::try_from(&sts)?;
171
172        let standby = pods
173            .api
174            .list(&list_vault_pods().labels(&ExecIn::Standby.to_label_selector()))
175            .await?;
176
177        if standby.items.is_empty() {
178            warn!("no standby pods found, skipping upgrade");
179            return Ok(());
180        }
181
182        let active = pods
183            .api
184            .list(&list_vault_pods().labels(&ExecIn::Active.to_label_selector()))
185            .await?;
186
187        if active.items.is_empty() {
188            warn!("no active pods found, skipping upgrade");
189            return Ok(());
190        }
191
192        info!("upgrading standby pods");
193        for pod in standby.iter() {
194            pods.upgrade(
195                pod.clone(),
196                &target,
197                token.clone(),
198                should_unseal,
199                force_upgrade,
200                keys,
201            )
202            .await?;
203        }
204
205        info!("upgrading active pods");
206        for pod in active.iter() {
207            pods.upgrade(
208                pod.clone(),
209                &target,
210                token.clone(),
211                should_unseal,
212                force_upgrade,
213                keys,
214            )
215            .await?;
216        }
217
218        Ok(())
219    }
220}
221
#[cfg(test)]
mod tests {
    use std::str::FromStr;

    use http::{Request, Response, StatusCode};
    use hyper::body::Bytes;
    use k8s_openapi::{api::core::v1::Pod, List};
    use kube::{client::Body, Api, Client};
    use secrecy::Secret;
    use serde_yaml::Value;
    use tokio::task::JoinHandle;
    use tokio_util::sync::CancellationToken;
    use tower_test::mock::{self, Handle};

    use crate::{PodApi, VaultVersion};

    /// Read the recorded fixture of pod `vault-mgmt-e2e-2274-<index>` and
    /// deserialize it.
    async fn load_pod(index: usize) -> Pod {
        let path = format!(
            "tests/resources/installed/{}{}.yaml",
            "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-", index
        );
        let yaml = tokio::fs::read_to_string(path).await.unwrap();

        serde_yaml::from_str(&yaml).unwrap()
    }

    /// Build the target version used by a test.
    fn version(version: &str) -> VaultVersion {
        VaultVersion {
            version: version.to_string(),
        }
    }

    #[tokio::test]
    async fn is_current_returns_true_if_pod_version_is_current() {
        let pod = load_pod(0).await;

        assert!(PodApi::is_current(&pod, &version("1.13.0")).unwrap());
    }

    #[tokio::test]
    async fn is_current_returns_false_if_pod_version_is_outdated() {
        let pod = load_pod(0).await;

        assert!(!PodApi::is_current(&pod, &version("1.14.0")).unwrap());
    }

    #[tokio::test]
    async fn is_current_returns_false_if_pod_version_is_too_new() {
        let pod = load_pod(0).await;

        assert!(!PodApi::is_current(&pod, &version("1.0.0")).unwrap());
    }

    /// Serve mocked API responses until `cancel` fires.
    ///
    /// Two GET requests for pod `vault-mgmt-e2e-2274-1` are answered from the
    /// recorded fixture (the list variant with the `vault-sealed` label set to
    /// `false`); every other request gets a 404. Returns whether a DELETE
    /// request was seen among the unanswered ones.
    async fn mock_list_sealed(
        cancel: CancellationToken,
        handle: &mut Handle<Request<Body>, Response<Body>>,
    ) -> bool {
        let mut saw_delete = false;

        loop {
            // Only pick the next event here; the handling follows below.
            let incoming = tokio::select! {
                incoming = handle.next_request() => incoming,
                _ = cancel.cancelled() => return saw_delete,
            };
            let (request, send) = incoming.expect("Service not called");

            let method = request.method().to_string();
            let path = request.uri().path().to_string();
            let query = request.uri().query().unwrap_or_default().to_string();
            let watch = query.contains("watch=true");

            println!("{} {} {} ", method, path, query);

            let is_get = method == "GET";
            let is_pod_list = path == "/api/v1/namespaces/vault-mgmt-e2e/pods"
                && query
                    == "&fieldSelector=metadata.name%3Dvault-mgmt-e2e-2274-1&resourceVersion=0"
                && !watch;
            let is_pod_get =
                path == "/api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-1";

            let body = if is_get && is_pod_list {
                let mut pod = load_pod(1).await;

                // Overwrite the label only if the fixture already carries it.
                if let Some(sealed) = pod
                    .metadata
                    .labels
                    .as_mut()
                    .unwrap()
                    .get_mut("vault-sealed")
                {
                    *sealed = "false".to_string();
                }

                let mut pod_list = List::<Pod>::default();
                pod_list.metadata.resource_version = Some("0".to_string());
                pod_list.items.push(pod);

                serde_json::to_string(&pod_list).unwrap()
            } else if is_get && is_pod_get {
                let yaml = tokio::fs::read_to_string(format!(
                    "tests/resources/installed/{}.yaml",
                    "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-1"
                ))
                .await
                .unwrap();
                let document = serde_yaml::from_str::<Value>(&yaml).unwrap();

                serde_json::to_string(&document).unwrap()
            } else {
                if method == "DELETE" {
                    saw_delete = true;
                }
                send.send_response(
                    Response::builder()
                        .status(StatusCode::NOT_FOUND)
                        .body(Bytes::from("404 not found").into())
                        .unwrap(),
                );
                continue;
            };

            send.send_response(
                Response::builder()
                    .body(Bytes::from(body).into())
                    .unwrap(),
            );
        }
    }

    /// Create a pod API backed by the mock service, together with the handle
    /// of the task serving it and the token that stops that task.
    async fn setup() -> (Api<Pod>, JoinHandle<bool>, CancellationToken) {
        let (mock_service, mut handle) = mock::pair::<Request<Body>, Response<Body>>();

        let cancel = CancellationToken::new();
        let service_cancel = cancel.clone();
        let service =
            tokio::spawn(async move { mock_list_sealed(service_cancel, &mut handle).await });

        let client = Client::new(mock_service, "vault-mgmt-e2e");
        let api: Api<Pod> = Api::default_namespaced(client);

        (api, service, cancel)
    }

    /// Run `PodApi::upgrade` for pod `vault-mgmt-e2e-2274-1` against the mock,
    /// expect it to fail, and report whether the mock saw a DELETE request.
    async fn delete_called_during_upgrade(force_upgrade: bool) -> bool {
        let target = version("1.13.0");

        let (api, service, cancel) = setup().await;
        let pods = PodApi::new(api, false, "vault-mgmt-e2e".to_string());
        let pod = pods.api.get("vault-mgmt-e2e-2274-1").await.unwrap();

        pods.upgrade(
            pod,
            &target,
            Secret::from_str("token").unwrap(),
            false,
            force_upgrade,
            &[],
        )
        .await
        .unwrap_err();

        cancel.cancel();

        service.await.unwrap()
    }

    #[tokio::test]
    async fn upgrade_does_not_delete_pod_if_current() {
        assert!(!delete_called_during_upgrade(false).await);
    }

    #[tokio::test]
    async fn upgrade_does_delete_pod_if_current_and_force_upgrade() {
        assert!(delete_called_during_upgrade(true).await);
    }
}
433
434        assert!(delete_called);
435    }
436}