// k8s_sync/kubernetes.rs

1use crate::config::KubeConfig;
2use crate::errors::KubernetesError;
3use base64;
4use chrono::DateTime;
5use http::StatusCode;
6use isahc::{
7    config::CaCertificate, config::ClientCertificate, config::Configurable, config::PrivateKey,
8    config::SslOption, Body, HttpClient, Request,
9};
10use k8s_openapi::{api::core::v1 as api, ResponseBody};
11use std::env;
12use std::fs;
13use std::{io::Read, io::Write};
14use tempfile::NamedTempFile;
15
16#[derive(Debug)]
17pub struct Kubernetes {
18    pub kubeconfig: Result<KubeConfig, KubernetesError>,
19    pub http_client: HttpClient,
20    pub base_uri: String,
21}
22
23impl Kubernetes {
24    pub fn connect(
25        kubeconfig_path: Option<String>,
26        scheme: Option<String>,
27        host: Option<String>,
28        port: Option<u32>,
29        search_uri: bool,
30    ) -> Result<Kubernetes, KubernetesError> {
31        let kubeconfig = KubeConfig::load(kubeconfig_path);
32        let token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
33        let http_client;
34
35        if std::path::Path::new(token_file).exists() {
36            let service_account_token = fs::read_to_string(token_file)
37                .map_err(|err| KubernetesError::IoError { source: err })?;
38
39            let http_client_builder = HttpClient::builder()
40                .default_header("Authorization", format!("Bearer {}", service_account_token))
41                .ssl_ca_certificate(CaCertificate::file(
42                    "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
43                ))
44                .ssl_options(SslOption::DANGER_ACCEPT_INVALID_CERTS);
45
46            http_client = match http_client_builder.build() {
47                Ok(client) => client,
48                Err(err) => {
49                    return Err(KubernetesError::HttpClientBuildError {
50                        message: format!(
51                            "Failed to initialize http client with service account token: {}",
52                            err
53                        ),
54                    })
55                }
56            };
57        } else if let Ok(conf) = &kubeconfig {
58            //TODO add options, guessed from config
59            if let Some(cluster) = conf.clusters.first() {
60                if let Some(auth_info) = conf.auth_infos.first() {
61                    let user = &auth_info.auth_info;
62                    if let Some(crt) = &user.client_certificate_data {
63                        if let Some(ca) = &cluster.cluster.certificate_authority_data {
64                            if let Some(key) = &user.client_key_data {
65                                let mut tmpfile = NamedTempFile::new()
66                                    .map_err(|err| KubernetesError::IoError { source: err })?;
67                                writeln!(tmpfile, "{}", ca)
68                                    .map_err(|err| KubernetesError::IoError { source: err })?;
69                                let http_client_builder = HttpClient::builder()
70                                    .ssl_client_certificate(ClientCertificate::pem(
71                                        base64::decode(crt).map_err(|err| {
72                                            KubernetesError::Base64DecodeError { source: err }
73                                        })?,
74                                        PrivateKey::pem(
75                                            base64::decode(key).map_err(|err| {
76                                                KubernetesError::Base64DecodeError { source: err }
77                                            })?,
78                                            None,
79                                        ),
80                                    ))
81                                    .ssl_ca_certificate(CaCertificate::file(
82                                        tmpfile.into_temp_path().to_path_buf(),
83                                    ))
84                                    .ssl_options(SslOption::DANGER_ACCEPT_INVALID_CERTS);
85                                http_client = match http_client_builder.build() {
86                                    Ok(client) => client,
87                                    Err(err) => return Err(KubernetesError::HttpClientBuildError { message: format!("Failed to initialize http client with client certificate: {}", err) })
88                                };
89                            } else {
90                                return Err(KubernetesError::HttpClientBuildError {
91                                    message: String::from(
92                                        "Couldn't get client key from kubeconfig.",
93                                    ),
94                                });
95                            }
96                        } else {
97                            return Err(KubernetesError::HttpClientBuildError {
98                                message: String::from(
99                                    "Couldn't get CA certificate from kubeconfig.",
100                                ),
101                            });
102                        }
103                    } else {
104                        return Err(KubernetesError::HttpClientBuildError {
105                            message: String::from("Couldn't get client private key."),
106                        });
107                    }
108                } else {
109                    return Err(KubernetesError::HttpClientBuildError {
110                        message: String::from("No auth_info item found in kubeconfig."),
111                    });
112                }
113            } else {
114                return Err(KubernetesError::ConfigLoadError);
115            }
116        } else {
117            return Err(KubernetesError::HttpClientBuildError {
118                message: String::from("Couldn't gather kubeconfig content."),
119            });
120        }
121
122        let scheme_part;
123        let host_part;
124        let port_part;
125
126        if search_uri {
127            if let Ok(host_var) = env::var("KUBERNETES_SERVICE_HOST") {
128                host_part = host_var;
129            } else {
130                host_part = host.unwrap_or(String::from("localhost"));
131            }
132            if let Ok(port_var) = env::var("KUBERNETES_SERVICE_PORT") {
133                port_part = port_var;
134                scheme_part = match port_part.as_str() {
135                    "443" => String::from("https"),
136                    "80" => String::from("http"),
137                    _ => String::from("https"),
138                }
139            } else {
140                scheme_part = String::from("https");
141                port_part = port.unwrap_or(6443).to_string();
142            }
143        } else {
144            scheme_part = scheme.unwrap_or(String::from("https"));
145            host_part = host.unwrap_or(String::from("localhost"));
146            port_part = port.unwrap_or(6443).to_string();
147        }
148
149        let base_uri = format!("{}://{}:{}", scheme_part, host_part, port_part);
150
151        Ok(Kubernetes {
152            kubeconfig,
153            http_client,
154            base_uri,
155        })
156    }
157
158    fn request<T>(
159        &self,
160        request: Request<Vec<u8>>,
161        response_body: fn(StatusCode) -> ResponseBody<T>,
162    ) -> Result<(Body, ResponseBody<T>), KubernetesError> {
163        let (parts, body) = request.into_parts();
164        let uri_str = format!("{}{}", self.base_uri, parts.uri);
165        let request = Request::builder().uri(uri_str).body(body).map_err(|err| {
166            KubernetesError::HttpClientBuildError {
167                message: format!("Couldn't build request. Error: {:?}", err),
168            }
169        })?;
170        let response = self
171            .http_client
172            .send(request)
173            .map_err(|_| KubernetesError::HttpClientRequestError)?;
174        let status_code = response.status();
175        if !status_code.is_success() {
176            return Err(KubernetesError::HttpClientRequestError);
177        }
178        let response_body = response_body(status_code);
179        let body = response.into_body();
180        Ok((body, response_body))
181    }
182
183    pub fn get_events(&self, since: Option<String>) -> Result<Vec<api::Event>, KubernetesError> {
184        let (request, response_body) =
185            match api::Event::list_event_for_all_namespaces(Default::default()) {
186                Ok((request, response_body)) => (request, response_body),
187                Err(err) => return Err(KubernetesError::ApiRequestError { source: err }),
188            };
189        let (mut body, mut response_body) = self.request(request, response_body)?;
190        let mut buf = Box::new([0u8; 4096]);
191        let events_list_raw = loop {
192            let read = body.read(&mut *buf).map_err(|err| {
193                KubernetesError::HttpClientParseResponseError {
194                    message: format!("Got error: {}", err),
195                }
196            })?;
197            response_body.append_slice(&buf[..read]);
198            let response = response_body.parse();
199            match response {
200                Ok(k8s_openapi::ListResponse::Ok(events_list)) => break events_list,
201                Ok(other) => {
202                    return Err(KubernetesError::HttpClientParseResponseError {
203                        message: format!("expected Ok but got {:?}", other),
204                    })
205                }
206                Err(k8s_openapi::ResponseError::NeedMoreData) => continue,
207                Err(err) => {
208                    return Err(KubernetesError::HttpClientParseResponseError {
209                        message: format!("error: {:?}", err),
210                    })
211                }
212            }
213        };
214        let events = events_list_raw.items;
215        let mut since_datetime = None;
216        if let Some(since) = since {
217            since_datetime = Some(
218                DateTime::parse_from_rfc3339(&since)
219                    .map_err(|source| KubernetesError::WrongDatetimeFormat { source })?,
220            );
221        }
222        Ok(events
223            .into_iter()
224            .filter(move |e| match &e.event_time {
225                Some(time) => {
226                    if let Some(since_dt) = since_datetime {
227                        if time.0.ge(&since_dt) {
228                            return true;
229                        } else {
230                            return false;
231                        }
232                    } else {
233                        return true;
234                    }
235                }
236                None => false,
237            })
238            .collect())
239    }
240
241    pub fn list_pods(&self, namespace: String) -> Result<Vec<api::Pod>, KubernetesError> {
242        let (request, response_body) =
243            match api::Pod::list_namespaced_pod(&namespace, Default::default()) {
244                Ok((request, response_body)) => (request, response_body),
245                Err(err) => return Err(KubernetesError::ApiRequestError { source: err }),
246            };
247        let (parts, body) = request.into_parts();
248        let uri_str = format!("{}{}", self.base_uri, parts.uri);
249        let request = Request::builder().uri(uri_str).body(body).map_err(|err| {
250            KubernetesError::HttpClientBuildError {
251                message: format!("Couldn't build request. Error: {:?}", err),
252            }
253        })?;
254        let response = self
255            .http_client
256            .send(request)
257            .map_err(|_| KubernetesError::HttpClientRequestError)?;
258        let status_code = response.status();
259        if !status_code.is_success() {
260            return Err(KubernetesError::HttpClientRequestError);
261        }
262        let mut response_body = response_body(status_code);
263        let mut buf = Box::new([0u8; 4096]);
264        let mut body = response.into_body();
265        let pods_list_raw = loop {
266            let read = body.read(&mut *buf).map_err(|err| {
267                KubernetesError::HttpClientParseResponseError {
268                    message: format!("Got error : {}", err),
269                }
270            })?;
271            response_body.append_slice(&buf[..read]);
272            let response = response_body.parse();
273            match response {
274                Ok(k8s_openapi::ListResponse::Ok(pod_list)) => break pod_list,
275                Ok(other) => {
276                    return Err(KubernetesError::HttpClientParseResponseError {
277                        message: format!("expected Ok but got {} {:?}", status_code, other),
278                    })
279                }
280                Err(k8s_openapi::ResponseError::NeedMoreData) => continue,
281                Err(err) => {
282                    return Err(KubernetesError::HttpClientParseResponseError {
283                        message: format!("error: {} {:?}", status_code, err),
284                    })
285                }
286            }
287        };
288
289        Ok(pods_list_raw.items)
290    }
291}