1use k8s_openapi::api::{apps::v1::StatefulSet, core::v1::Pod};
2use kube::{api::DeleteParams, runtime::wait::conditions::is_pod_running};
3use secrecy::Secret;
4use tokio_retry::{
5 strategy::{jitter, ExponentialBackoff},
6 Retry,
7};
8use tracing::*;
9
10use crate::{
11 is_active, is_pod_exporting_seal_status, ExecIn, StepDown, Unseal, VaultVersion, VAULT_PORT,
12 {is_pod_ready, is_pod_standby, is_pod_unsealed}, {is_seal_status_initialized, GetSealStatus},
13 {is_sealed, list_vault_pods, PodApi, StatefulSetApi},
14};
15
16impl PodApi {
17 pub fn is_current(pod: &Pod, target: &VaultVersion) -> anyhow::Result<bool> {
19 let pod_version = VaultVersion::try_from(pod)?;
20 Ok(&pod_version == target)
21 }
22
23 pub async fn upgrade(
35 &self,
36 pod: Pod,
37 target: &VaultVersion,
38 token: Secret<String>,
39 should_unseal: bool,
40 force_upgrade: bool,
41 keys: &[Secret<String>],
42 ) -> anyhow::Result<()> {
43 let name = pod
44 .metadata
45 .name
46 .as_ref()
47 .ok_or(anyhow::anyhow!("pod does not have a name"))?;
48
49 if !Self::is_current(&pod, target)? || force_upgrade {
51 if is_active(&pod)? {
53 self.http(name, VAULT_PORT).await?.step_down(token).await?;
55
56 kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_standby())
58 .await?;
59 }
60
61 kube::runtime::wait::delete::delete_and_finalize(
63 self.api.clone(),
64 name,
65 &DeleteParams::default(),
66 )
67 .await
68 .map_err(|e| anyhow::anyhow!("deleting pod {}: {}", name, e.to_string()))?;
69 }
70
71 kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_running())
73 .await
74 .map_err(|e| {
75 anyhow::anyhow!("waiting for pod {} to be running: {}", name, e.to_string())
76 })?;
77
78 kube::runtime::wait::await_condition(
80 self.api.clone(),
81 name,
82 is_pod_exporting_seal_status(),
83 )
84 .await?;
85
86 let pod = self.api.get(name).await?;
88
89 if Self::is_current(&pod, target)? {
90 if is_sealed(&pod)? {
92 if should_unseal {
93 let mut pf = Retry::spawn(
94 ExponentialBackoff::from_millis(50).map(jitter).take(5),
95 || async move { self.http(name, VAULT_PORT).await },
96 )
97 .await
98 .map_err(|e| {
99 anyhow::anyhow!(
100 "attempting to forward http requests to {}: {}",
101 name,
102 e.to_string()
103 )
104 })?;
105
106 pf.await_seal_status(is_seal_status_initialized())
108 .await
109 .map_err(|e| {
110 anyhow::anyhow!(
111 "waiting for pod to have required seal status {}: {}",
112 name,
113 e.to_string()
114 )
115 })?;
116
117 pf.unseal(keys).await.map_err(|e| {
119 anyhow::anyhow!("unsealing pod {}: {}", name, e.to_string())
120 })?;
121 } else {
122 info!("pod {} is sealed, waiting for external unseal", name);
123 }
124 }
125 kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_unsealed()).await?;
127 kube::runtime::wait::await_condition(self.api.clone(), name, is_pod_ready()).await?;
129 }
130
131 Ok(())
132 }
133}
134
135impl StatefulSetApi {
136 pub async fn upgrade(
162 &self,
163 sts: StatefulSet,
164 pods: &PodApi,
165 token: Secret<String>,
166 should_unseal: bool,
167 force_upgrade: bool,
168 keys: &[Secret<String>],
169 ) -> anyhow::Result<()> {
170 let target = VaultVersion::try_from(&sts)?;
171
172 let standby = pods
173 .api
174 .list(&list_vault_pods().labels(&ExecIn::Standby.to_label_selector()))
175 .await?;
176
177 if standby.items.is_empty() {
178 warn!("no standby pods found, skipping upgrade");
179 return Ok(());
180 }
181
182 let active = pods
183 .api
184 .list(&list_vault_pods().labels(&ExecIn::Active.to_label_selector()))
185 .await?;
186
187 if active.items.is_empty() {
188 warn!("no active pods found, skipping upgrade");
189 return Ok(());
190 }
191
192 info!("upgrading standby pods");
193 for pod in standby.iter() {
194 pods.upgrade(
195 pod.clone(),
196 &target,
197 token.clone(),
198 should_unseal,
199 force_upgrade,
200 keys,
201 )
202 .await?;
203 }
204
205 info!("upgrading active pods");
206 for pod in active.iter() {
207 pods.upgrade(
208 pod.clone(),
209 &target,
210 token.clone(),
211 should_unseal,
212 force_upgrade,
213 keys,
214 )
215 .await?;
216 }
217
218 Ok(())
219 }
220}
221
222#[cfg(test)]
223mod tests {
224 use std::str::FromStr;
225
226 use http::{Request, Response, StatusCode};
227 use hyper::body::Bytes;
228 use k8s_openapi::{api::core::v1::Pod, List};
229 use kube::{client::Body, Api, Client};
230 use secrecy::Secret;
231 use serde_yaml::Value;
232 use tokio::task::JoinHandle;
233 use tokio_util::sync::CancellationToken;
234 use tower_test::mock::{self, Handle};
235
236 use crate::{PodApi, VaultVersion};
237
238 #[tokio::test]
239 async fn is_current_returns_true_if_pod_version_is_current() {
240 let file = tokio::fs::read_to_string(format!(
241 "tests/resources/installed/{}{}.yaml",
242 "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-", 0
243 ))
244 .await
245 .unwrap();
246
247 let pod: Pod = serde_yaml::from_str(&file).unwrap();
248
249 let target = VaultVersion {
250 version: "1.13.0".to_string(),
251 };
252
253 assert!(PodApi::is_current(&pod, &target).unwrap());
254 }
255
256 #[tokio::test]
257 async fn is_current_returns_false_if_pod_version_is_outdated() {
258 let file = tokio::fs::read_to_string(format!(
259 "tests/resources/installed/{}{}.yaml",
260 "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-", 0
261 ))
262 .await
263 .unwrap();
264
265 let pod: Pod = serde_yaml::from_str(&file).unwrap();
266
267 let target = VaultVersion {
268 version: "1.14.0".to_string(),
269 };
270
271 assert!(!PodApi::is_current(&pod, &target).unwrap());
272 }
273
274 #[tokio::test]
275 async fn is_current_returns_false_if_pod_version_is_too_new() {
276 let file = tokio::fs::read_to_string(format!(
277 "tests/resources/installed/{}{}.yaml",
278 "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-", 0
279 ))
280 .await
281 .unwrap();
282
283 let pod: Pod = serde_yaml::from_str(&file).unwrap();
284
285 let target = VaultVersion {
286 version: "1.0.0".to_string(),
287 };
288
289 assert!(!PodApi::is_current(&pod, &target).unwrap());
290 }
291
292 async fn mock_list_sealed(
293 cancel: CancellationToken,
294 handle: &mut Handle<Request<Body>, Response<Body>>,
295 ) -> bool {
296 let mut delete_called = false;
297 loop {
298 tokio::select! {
299 request = handle.next_request() => {
300 let (request, send) = request.expect("Service not called");
301
302 let method = request.method().to_string();
303 let uri = request.uri().path().to_string();
304 let query = request.uri().query().unwrap_or_default().to_string();
305
306 let watch = query.contains("watch=true");
307
308 println!("{} {} {} ", method, uri, query);
309
310 let body = match (method.as_str(), uri.as_str(), query.as_str(), watch) {
311 ("GET", "/api/v1/namespaces/vault-mgmt-e2e/pods", "&fieldSelector=metadata.name%3Dvault-mgmt-e2e-2274-1&resourceVersion=0", false) => {
312 let mut pod: Pod = serde_yaml::from_str(
313 &tokio::fs::read_to_string(format!(
314 "tests/resources/installed/{}{}.yaml",
315 "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-",
316 1
317 ))
318 .await
319 .unwrap()
320 ).unwrap();
321
322 pod.metadata
323 .labels
324 .as_mut()
325 .unwrap()
326 .entry("vault-sealed".to_string())
327 .and_modify(|x| *x = "false".to_string());
328
329 let mut list = List::<Pod>::default();
330 list.items.push(pod);
331 list.metadata.resource_version = Some("0".to_string());
332 serde_json::to_string(&list).unwrap()
333 }
334 ("GET", "/api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-1", _, _) => {
335 let file =
336 tokio::fs::read_to_string(format!(
337 "tests/resources/installed/{}.yaml",
338 "api/v1/namespaces/vault-mgmt-e2e/pods/vault-mgmt-e2e-2274-1"
339 ))
340 .await
341 .unwrap();
342
343 serde_json::to_string(&serde_yaml::from_str::<Value>(&file).unwrap()).unwrap()
344 }
345 (method, _, _, _) => {
346 if method == "DELETE" {
347 delete_called = true;
348 }
349 send.send_response(Response::builder().status(StatusCode::NOT_FOUND).body(Bytes::from("404 not found").into()).unwrap());
350 continue;
351 },
352 };
353
354 send.send_response(Response::builder().body(Bytes::from(body.to_string()).into()).unwrap());
355 }
356 _ = cancel.cancelled() => {
357 return delete_called;
358 }
359 }
360 }
361 }
362
363 async fn setup() -> (Api<Pod>, JoinHandle<bool>, CancellationToken) {
364 let (mock_service, mut handle) = mock::pair::<Request<Body>, Response<Body>>();
365
366 let cancel = CancellationToken::new();
367 let cloned_token = cancel.clone();
368
369 let spawned =
370 tokio::spawn(async move { mock_list_sealed(cloned_token, &mut handle).await });
371
372 let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "vault-mgmt-e2e"));
373
374 (pods, spawned, cancel)
375 }
376
377 #[tokio::test]
378 async fn upgrade_does_not_delete_pod_if_current() {
379 let target = VaultVersion {
380 version: "1.13.0".to_string(),
381 };
382
383 let (api, service, cancel) = setup().await;
384
385 let pods = PodApi::new(api, false, "vault-mgmt-e2e".to_string());
386
387 let pod = pods.api.get("vault-mgmt-e2e-2274-1").await.unwrap();
388
389 pods.upgrade(
390 pod,
391 &target,
392 Secret::from_str("token").unwrap(),
393 false,
394 false,
395 &[],
396 )
397 .await
398 .unwrap_err();
399
400 cancel.cancel();
401
402 let delete_called = service.await.unwrap();
403
404 assert!(!delete_called);
405 }
406
407 #[tokio::test]
408 async fn upgrade_does_delete_pod_if_current_and_force_upgrade() {
409 let target = VaultVersion {
410 version: "1.13.0".to_string(),
411 };
412
413 let (api, service, cancel) = setup().await;
414
415 let pods = PodApi::new(api, false, "vault-mgmt-e2e".to_string());
416
417 let pod = pods.api.get("vault-mgmt-e2e-2274-1").await.unwrap();
418
419 pods.upgrade(
420 pod,
421 &target,
422 Secret::from_str("token").unwrap(),
423 false,
424 true,
425 &[],
426 )
427 .await
428 .unwrap_err();
429
430 cancel.cancel();
431
432 let delete_called = service.await.unwrap();
433
434 assert!(delete_called);
435 }
436}