1use crate::config::KubeConfig;
2use crate::errors::KubernetesError;
3use base64;
4use chrono::DateTime;
5use http::StatusCode;
6use isahc::{
7 config::CaCertificate, config::ClientCertificate, config::Configurable, config::PrivateKey,
8 config::SslOption, Body, HttpClient, Request,
9};
10use k8s_openapi::{api::core::v1 as api, ResponseBody};
11use std::env;
12use std::fs;
13use std::{io::Read, io::Write};
14use tempfile::NamedTempFile;
15
16#[derive(Debug)]
17pub struct Kubernetes {
18 pub kubeconfig: Result<KubeConfig, KubernetesError>,
19 pub http_client: HttpClient,
20 pub base_uri: String,
21}
22
23impl Kubernetes {
24 pub fn connect(
25 kubeconfig_path: Option<String>,
26 scheme: Option<String>,
27 host: Option<String>,
28 port: Option<u32>,
29 search_uri: bool,
30 ) -> Result<Kubernetes, KubernetesError> {
31 let kubeconfig = KubeConfig::load(kubeconfig_path);
32 let token_file = "/var/run/secrets/kubernetes.io/serviceaccount/token";
33 let http_client;
34
35 if std::path::Path::new(token_file).exists() {
36 let service_account_token = fs::read_to_string(token_file)
37 .map_err(|err| KubernetesError::IoError { source: err })?;
38
39 let http_client_builder = HttpClient::builder()
40 .default_header("Authorization", format!("Bearer {}", service_account_token))
41 .ssl_ca_certificate(CaCertificate::file(
42 "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
43 ))
44 .ssl_options(SslOption::DANGER_ACCEPT_INVALID_CERTS);
45
46 http_client = match http_client_builder.build() {
47 Ok(client) => client,
48 Err(err) => {
49 return Err(KubernetesError::HttpClientBuildError {
50 message: format!(
51 "Failed to initialize http client with service account token: {}",
52 err
53 ),
54 })
55 }
56 };
57 } else if let Ok(conf) = &kubeconfig {
58 if let Some(cluster) = conf.clusters.first() {
60 if let Some(auth_info) = conf.auth_infos.first() {
61 let user = &auth_info.auth_info;
62 if let Some(crt) = &user.client_certificate_data {
63 if let Some(ca) = &cluster.cluster.certificate_authority_data {
64 if let Some(key) = &user.client_key_data {
65 let mut tmpfile = NamedTempFile::new()
66 .map_err(|err| KubernetesError::IoError { source: err })?;
67 writeln!(tmpfile, "{}", ca)
68 .map_err(|err| KubernetesError::IoError { source: err })?;
69 let http_client_builder = HttpClient::builder()
70 .ssl_client_certificate(ClientCertificate::pem(
71 base64::decode(crt).map_err(|err| {
72 KubernetesError::Base64DecodeError { source: err }
73 })?,
74 PrivateKey::pem(
75 base64::decode(key).map_err(|err| {
76 KubernetesError::Base64DecodeError { source: err }
77 })?,
78 None,
79 ),
80 ))
81 .ssl_ca_certificate(CaCertificate::file(
82 tmpfile.into_temp_path().to_path_buf(),
83 ))
84 .ssl_options(SslOption::DANGER_ACCEPT_INVALID_CERTS);
85 http_client = match http_client_builder.build() {
86 Ok(client) => client,
87 Err(err) => return Err(KubernetesError::HttpClientBuildError { message: format!("Failed to initialize http client with client certificate: {}", err) })
88 };
89 } else {
90 return Err(KubernetesError::HttpClientBuildError {
91 message: String::from(
92 "Couldn't get client key from kubeconfig.",
93 ),
94 });
95 }
96 } else {
97 return Err(KubernetesError::HttpClientBuildError {
98 message: String::from(
99 "Couldn't get CA certificate from kubeconfig.",
100 ),
101 });
102 }
103 } else {
104 return Err(KubernetesError::HttpClientBuildError {
105 message: String::from("Couldn't get client private key."),
106 });
107 }
108 } else {
109 return Err(KubernetesError::HttpClientBuildError {
110 message: String::from("No auth_info item found in kubeconfig."),
111 });
112 }
113 } else {
114 return Err(KubernetesError::ConfigLoadError);
115 }
116 } else {
117 return Err(KubernetesError::HttpClientBuildError {
118 message: String::from("Couldn't gather kubeconfig content."),
119 });
120 }
121
122 let scheme_part;
123 let host_part;
124 let port_part;
125
126 if search_uri {
127 if let Ok(host_var) = env::var("KUBERNETES_SERVICE_HOST") {
128 host_part = host_var;
129 } else {
130 host_part = host.unwrap_or(String::from("localhost"));
131 }
132 if let Ok(port_var) = env::var("KUBERNETES_SERVICE_PORT") {
133 port_part = port_var;
134 scheme_part = match port_part.as_str() {
135 "443" => String::from("https"),
136 "80" => String::from("http"),
137 _ => String::from("https"),
138 }
139 } else {
140 scheme_part = String::from("https");
141 port_part = port.unwrap_or(6443).to_string();
142 }
143 } else {
144 scheme_part = scheme.unwrap_or(String::from("https"));
145 host_part = host.unwrap_or(String::from("localhost"));
146 port_part = port.unwrap_or(6443).to_string();
147 }
148
149 let base_uri = format!("{}://{}:{}", scheme_part, host_part, port_part);
150
151 Ok(Kubernetes {
152 kubeconfig,
153 http_client,
154 base_uri,
155 })
156 }
157
158 fn request<T>(
159 &self,
160 request: Request<Vec<u8>>,
161 response_body: fn(StatusCode) -> ResponseBody<T>,
162 ) -> Result<(Body, ResponseBody<T>), KubernetesError> {
163 let (parts, body) = request.into_parts();
164 let uri_str = format!("{}{}", self.base_uri, parts.uri);
165 let request = Request::builder().uri(uri_str).body(body).map_err(|err| {
166 KubernetesError::HttpClientBuildError {
167 message: format!("Couldn't build request. Error: {:?}", err),
168 }
169 })?;
170 let response = self
171 .http_client
172 .send(request)
173 .map_err(|_| KubernetesError::HttpClientRequestError)?;
174 let status_code = response.status();
175 if !status_code.is_success() {
176 return Err(KubernetesError::HttpClientRequestError);
177 }
178 let response_body = response_body(status_code);
179 let body = response.into_body();
180 Ok((body, response_body))
181 }
182
183 pub fn get_events(&self, since: Option<String>) -> Result<Vec<api::Event>, KubernetesError> {
184 let (request, response_body) =
185 match api::Event::list_event_for_all_namespaces(Default::default()) {
186 Ok((request, response_body)) => (request, response_body),
187 Err(err) => return Err(KubernetesError::ApiRequestError { source: err }),
188 };
189 let (mut body, mut response_body) = self.request(request, response_body)?;
190 let mut buf = Box::new([0u8; 4096]);
191 let events_list_raw = loop {
192 let read = body.read(&mut *buf).map_err(|err| {
193 KubernetesError::HttpClientParseResponseError {
194 message: format!("Got error: {}", err),
195 }
196 })?;
197 response_body.append_slice(&buf[..read]);
198 let response = response_body.parse();
199 match response {
200 Ok(k8s_openapi::ListResponse::Ok(events_list)) => break events_list,
201 Ok(other) => {
202 return Err(KubernetesError::HttpClientParseResponseError {
203 message: format!("expected Ok but got {:?}", other),
204 })
205 }
206 Err(k8s_openapi::ResponseError::NeedMoreData) => continue,
207 Err(err) => {
208 return Err(KubernetesError::HttpClientParseResponseError {
209 message: format!("error: {:?}", err),
210 })
211 }
212 }
213 };
214 let events = events_list_raw.items;
215 let mut since_datetime = None;
216 if let Some(since) = since {
217 since_datetime = Some(
218 DateTime::parse_from_rfc3339(&since)
219 .map_err(|source| KubernetesError::WrongDatetimeFormat { source })?,
220 );
221 }
222 Ok(events
223 .into_iter()
224 .filter(move |e| match &e.event_time {
225 Some(time) => {
226 if let Some(since_dt) = since_datetime {
227 if time.0.ge(&since_dt) {
228 return true;
229 } else {
230 return false;
231 }
232 } else {
233 return true;
234 }
235 }
236 None => false,
237 })
238 .collect())
239 }
240
241 pub fn list_pods(&self, namespace: String) -> Result<Vec<api::Pod>, KubernetesError> {
242 let (request, response_body) =
243 match api::Pod::list_namespaced_pod(&namespace, Default::default()) {
244 Ok((request, response_body)) => (request, response_body),
245 Err(err) => return Err(KubernetesError::ApiRequestError { source: err }),
246 };
247 let (parts, body) = request.into_parts();
248 let uri_str = format!("{}{}", self.base_uri, parts.uri);
249 let request = Request::builder().uri(uri_str).body(body).map_err(|err| {
250 KubernetesError::HttpClientBuildError {
251 message: format!("Couldn't build request. Error: {:?}", err),
252 }
253 })?;
254 let response = self
255 .http_client
256 .send(request)
257 .map_err(|_| KubernetesError::HttpClientRequestError)?;
258 let status_code = response.status();
259 if !status_code.is_success() {
260 return Err(KubernetesError::HttpClientRequestError);
261 }
262 let mut response_body = response_body(status_code);
263 let mut buf = Box::new([0u8; 4096]);
264 let mut body = response.into_body();
265 let pods_list_raw = loop {
266 let read = body.read(&mut *buf).map_err(|err| {
267 KubernetesError::HttpClientParseResponseError {
268 message: format!("Got error : {}", err),
269 }
270 })?;
271 response_body.append_slice(&buf[..read]);
272 let response = response_body.parse();
273 match response {
274 Ok(k8s_openapi::ListResponse::Ok(pod_list)) => break pod_list,
275 Ok(other) => {
276 return Err(KubernetesError::HttpClientParseResponseError {
277 message: format!("expected Ok but got {} {:?}", status_code, other),
278 })
279 }
280 Err(k8s_openapi::ResponseError::NeedMoreData) => continue,
281 Err(err) => {
282 return Err(KubernetesError::HttpClientParseResponseError {
283 message: format!("error: {} {:?}", status_code, err),
284 })
285 }
286 }
287 };
288
289 Ok(pods_list_raw.items)
290 }
291}