1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
use k8s_openapi::api::core::v1::Pod;
use kube::{api::Api, client::Client, core::subresource::AttachParams};
use tokio::io::AsyncReadExt;

use crate::Error;
use tracing::{debug, error, warn};

#[derive(Debug)]
pub struct ExecOutput {
    pub stdout: Option<String>,
    pub stderr: Option<String>,
    pub success: bool,
}

impl ExecOutput {
    pub fn new(stdout: Option<String>, stderr: Option<String>, success: bool) -> Self {
        Self {
            stdout,
            stderr,
            success,
        }
    }
}

pub struct ExecCommand {
    pods_api: Api<Pod>,
    pod_name: String,
}

impl ExecCommand {
    pub fn new(pod_name: String, namespace: String, client: Client) -> Self {
        let pods_api: Api<Pod> = Api::namespaced(client, &namespace);
        Self { pod_name, pods_api }
    }

    pub async fn execute(&self, command: &[String]) -> Result<ExecOutput, Error> {
        let attach_params = AttachParams {
            container: Some("postgres".to_string()),
            tty: false,
            stdin: true,
            stdout: true,
            stderr: true,
            max_stdin_buf_size: Some(1024),
            max_stdout_buf_size: Some(1024),
            max_stderr_buf_size: Some(1024),
        };

        let mut attached_process = self
            .pods_api
            .exec(self.pod_name.as_str(), command, &attach_params)
            .await?;

        let result_stdout = match attached_process.stdout() {
            None => {
                warn!("No stdout from exec to pod: {:?}", self.pod_name);
                String::new()
            }
            Some(mut stdout_reader) => {
                let mut result_stdout = String::new();
                stdout_reader
                    .read_to_string(&mut result_stdout)
                    .await
                    .unwrap_or_default();
                result_stdout
            }
        };

        let result_stderr = match attached_process.stderr() {
            None => {
                warn!("No stderr from exec to pod: {:?}", self.pod_name);
                String::new()
            }
            Some(mut stderr_reader) => {
                let mut result_stderr = String::new();
                stderr_reader
                    .read_to_string(&mut result_stderr)
                    .await
                    .unwrap_or_default();
                result_stderr
            }
        };

        let status = match attached_process.take_status() {
            None => {
                return Err(Error::KubeExecError(format!(
                    "Error executing command: {:?} on pod: {:?}. Failed to find command status.",
                    command, self.pod_name
                )));
            }
            Some(status) => status.await.unwrap_or_default(),
        };
        // https://git.k8s.io/community/contributors/devel/sig-architecture/api-conventions.md#spec-and-status

        let success = match status.status.expect("no status reported").as_str() {
            "Success" => true,
            "Failure" => {
                let output = format!(
                    "stdout:\n{}\nstderr:\n{}",
                    result_stdout.clone(),
                    result_stderr.clone()
                );
                warn!(
                    "Error executing command on pod: {:?}. response: {:?}",
                    self.pod_name, output
                );
                debug!("Failed command: {:?}", command);
                false
            }
            // This is never supposed to happen because status is supposed to only be
            // Success or Failure based on how the Kube API works
            _ => {
                error!(
                    "Undefined response from kube API when exec to pod: {:?}",
                    self.pod_name
                );
                return Err(Error::KubeExecError(format!(
                    "Error executing command: {:?} on pod: {:?}.",
                    command, self.pod_name
                )));
            }
        };
        Ok(ExecOutput::new(
            Some(result_stdout),
            Some(result_stderr),
            success,
        ))
    }
}