vault_mgmt_lib/
exec.rs

1use clap::ValueEnum;
2use futures_util::StreamExt;
3use k8s_openapi::api::core::v1::Pod;
4use kube::api::{Api, AttachParams, AttachedProcess};
5use secrecy::{ExposeSecret, Secret};
6use std::collections::HashMap;
7use tokio::io::AsyncWriteExt;
8
9use crate::{list_vault_pods, LABEL_KEY_VAULT_ACTIVE, LABEL_KEY_VAULT_SEALED};
10
11#[derive(ValueEnum, Copy, Clone, Debug, PartialEq, Eq)]
12pub enum ExecIn {
13    Active,
14    Standby,
15    Sealed,
16}
17
18impl std::fmt::Display for ExecIn {
19    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
20        self.to_possible_value()
21            .expect("no values are skipped")
22            .get_name()
23            .fmt(f)
24    }
25}
26
27impl ExecIn {
28    pub fn to_label_selector(&self) -> String {
29        match self {
30            ExecIn::Active => format!("{}=true", LABEL_KEY_VAULT_ACTIVE),
31            ExecIn::Standby => format!("{}=false", LABEL_KEY_VAULT_ACTIVE),
32            ExecIn::Sealed => format!("{}=true", LABEL_KEY_VAULT_SEALED),
33        }
34    }
35}
36
37#[tracing::instrument(skip_all, fields(cmd, exec_in = %exec_in))]
38pub async fn exec(
39    api: &Api<Pod>,
40    cmd: String,
41    exec_in: ExecIn,
42    env: HashMap<String, Secret<String>>,
43) -> anyhow::Result<()> {
44    let pods = api
45        .list(&list_vault_pods().labels(&exec_in.to_label_selector()))
46        .await?;
47    let pod = pods
48        .items
49        .first()
50        .ok_or(anyhow::anyhow!("no matching vault pod found"))?;
51
52    let (stdout, stderr) = exec_pod(api, pod, cmd, env).await?;
53
54    tokio::io::stdout().write_all(stdout.as_bytes()).await?;
55    tokio::io::stderr().write_all(stderr.as_bytes()).await?;
56
57    Ok(())
58}
59
60#[tracing::instrument(
61    skip_all,
62    fields(pod = %pod.metadata.name.clone().ok_or(anyhow::anyhow!("pod does not have a name"))?,
63    cmd = %cmd,
64    env_vars = ?env.keys()),
65)]
66pub async fn exec_pod(
67    api: &Api<Pod>,
68    pod: &Pod,
69    cmd: String,
70    env: HashMap<String, Secret<String>>,
71) -> anyhow::Result<(String, String)> {
72    let mut attached = api
73        .exec(
74            &pod.metadata
75                .name
76                .clone()
77                .ok_or(anyhow::anyhow!("pod does not have a name"))?,
78            vec!["sh"],
79            &AttachParams::default().stdin(true),
80        )
81        .await?;
82
83    let mut stdin_writer = attached
84        .stdin()
85        .ok_or(anyhow::anyhow!("no stdin available"))?;
86
87    let mut cmd_with_env_vars = String::new();
88    for (k, v) in env {
89        cmd_with_env_vars.push_str(&format!("{}={} ", k, v.expose_secret()));
90    }
91    cmd_with_env_vars.push_str(&cmd);
92    cmd_with_env_vars.push_str("\nexit\n");
93
94    stdin_writer.write_all(cmd_with_env_vars.as_bytes()).await?;
95
96    let (stdout, stderr) = get_output(attached).await?;
97
98    Ok((stdout, stderr))
99}
100
101#[tracing::instrument(skip_all)]
102pub async fn get_output(mut attached: AttachedProcess) -> anyhow::Result<(String, String)> {
103    let stdout = tokio_util::io::ReaderStream::new(
104        attached
105            .stdout()
106            .ok_or(anyhow::anyhow!("no stdout available"))?,
107    );
108    let stderr = tokio_util::io::ReaderStream::new(
109        attached
110            .stderr()
111            .ok_or(anyhow::anyhow!("no stderr available"))?,
112    );
113    attached.join().await?;
114    let out = stdout
115        .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
116        .collect::<Vec<_>>()
117        .await
118        .join("");
119    let err = stderr
120        .filter_map(|r| async { r.ok().and_then(|v| String::from_utf8(v.to_vec()).ok()) })
121        .collect::<Vec<_>>()
122        .await
123        .join("");
124    Ok((out, err))
125}