1use clap::ValueEnum;
2use futures_util::StreamExt;
3use k8s_openapi::api::core::v1::Pod;
4use kube::api::{Api, AttachParams, AttachedProcess};
5use secrecy::{ExposeSecret, Secret};
6use std::collections::HashMap;
7use tokio::io::AsyncWriteExt;
8
9use crate::{list_vault_pods, LABEL_KEY_VAULT_ACTIVE, LABEL_KEY_VAULT_SEALED};
10
11#[derive(ValueEnum, Copy, Clone, Debug, PartialEq, Eq)]
12pub enum ExecIn {
13 Active,
14 Standby,
15 Sealed,
16}
17
18impl std::fmt::Display for ExecIn {
19 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20 self.to_possible_value()
21 .expect("no values are skipped")
22 .get_name()
23 .fmt(f)
24 }
25}
26
27impl ExecIn {
28 pub fn to_label_selector(&self) -> String {
29 match self {
30 ExecIn::Active => format!("{}=true", LABEL_KEY_VAULT_ACTIVE),
31 ExecIn::Standby => format!("{}=false", LABEL_KEY_VAULT_ACTIVE),
32 ExecIn::Sealed => format!("{}=true", LABEL_KEY_VAULT_SEALED),
33 }
34 }
35}
36
37#[tracing::instrument(skip_all, fields(cmd, exec_in = %exec_in))]
38pub async fn exec(
39 api: &Api<Pod>,
40 cmd: String,
41 exec_in: ExecIn,
42 env: HashMap<String, Secret<String>>,
43) -> anyhow::Result<()> {
44 let pods = api
45 .list(&list_vault_pods().labels(&exec_in.to_label_selector()))
46 .await?;
47 let pod = pods
48 .items
49 .first()
50 .ok_or(anyhow::anyhow!("no matching vault pod found"))?;
51
52 let (stdout, stderr) = exec_pod(api, pod, cmd, env).await?;
53
54 tokio::io::stdout().write_all(stdout.as_bytes()).await?;
55 tokio::io::stderr().write_all(stderr.as_bytes()).await?;
56
57 Ok(())
58}
59
60#[tracing::instrument(
61 skip_all,
62 fields(pod = %pod.metadata.name.clone().ok_or(anyhow::anyhow!("pod does not have a name"))?,
63 cmd = %cmd,
64 env_vars = ?env.keys()),
65)]
66pub async fn exec_pod(
67 api: &Api<Pod>,
68 pod: &Pod,
69 cmd: String,
70 env: HashMap<String, Secret<String>>,
71) -> anyhow::Result<(String, String)> {
72 let mut attached = api
73 .exec(
74 &pod.metadata
75 .name
76 .clone()
77 .ok_or(anyhow::anyhow!("pod does not have a name"))?,
78 vec!["sh"],
79 &AttachParams::default().stdin(true),
80 )
81 .await?;
82
83 let mut stdin_writer = attached
84 .stdin()
85 .ok_or(anyhow::anyhow!("no stdin available"))?;
86
87 let mut cmd_with_env_vars = String::new();
88 for (k, v) in env {
89 cmd_with_env_vars.push_str(&format!("{}={} ", k, v.expose_secret()));
90 }
91 cmd_with_env_vars.push_str(&cmd);
92 cmd_with_env_vars.push_str("\nexit\n");
93
94 stdin_writer.write_all(cmd_with_env_vars.as_bytes()).await?;
95
96 let (stdout, stderr) = get_output(attached).await?;
97
98 Ok((stdout, stderr))
99}
100
101#[tracing::instrument(skip_all)]
102pub async fn get_output(mut attached: AttachedProcess) -> anyhow::Result<(String, String)> {
103 let stdout = tokio_util::io::ReaderStream::new(
104 attached
105 .stdout()
106 .ok_or(anyhow::anyhow!("no stdout available"))?,
107 );
108 let stderr = tokio_util::io::ReaderStream::new(
109 attached
110 .stderr()
111 .ok_or(anyhow::anyhow!("no stderr available"))?,
112 );
113 attached.join().await?;
114 let out = stdout
115 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
116 .collect::<Vec<_>>()
117 .await
118 .join("");
119 let err = stderr
120 .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
121 .collect::<Vec<_>>()
122 .await
123 .join("");
124 Ok((out, err))
125}