1use std::fmt::Debug;
2use std::fmt::Display;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use bytes::Buf;
8use futures_util::future::FutureExt;
9use futures_util::stream::empty;
10use futures_util::stream::BoxStream;
11use futures_util::stream::Stream;
12use futures_util::stream::StreamExt;
13use futures_util::stream::TryStreamExt;
14use http::StatusCode;
15use http::header::InvalidHeaderValue;
16use hyper::body::aggregate;
17use hyper::body::Bytes;
18use hyper::header::HeaderValue;
19use hyper::header::ACCEPT;
20use hyper::header::AUTHORIZATION;
21use hyper::header::CONTENT_TYPE;
22use hyper::Body;
23use hyper::Request;
24use hyper::Uri;
25use serde::{Serialize, Deserialize};
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use tracing::debug;
29use tracing::error;
30use tracing::trace;
31
32use k8_types::{UpdatedK8Obj, MetaStatus};
33use k8_config::K8Config;
34use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
35use k8_types::options::{ListOptions, DeleteOptions};
36
37use crate::uri::{item_uri, items_uri};
38use crate::meta_client::{ListArg, MetadataClient, NameSpace, PatchMergeType, TokenStreamResult};
39
40use super::wstream::WatchStream;
41use super::{HyperClient, HyperConfigBuilder, ListStream, LogStream};
42
43#[derive(Debug)]
45pub struct K8Client {
46 client: HyperClient,
47 host: String,
48 token: Option<String>,
49}
50
51#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, Clone)]
52#[serde(rename_all = "camelCase", default)]
53pub struct VersionInfo {
54 pub major: String,
55 pub minor: String,
56 pub git_version: String,
57 pub git_commit: String,
58 pub git_treestate: String,
59 pub build_date: String,
60 pub go_version: String,
61 pub compiler: String,
62 pub platform: String,
63}
64
65impl K8Client {
66 pub fn try_default() -> Result<Self> {
68 let config = K8Config::load()?;
69 Self::new(config)
70 }
71
72 pub fn new(config: K8Config) -> Result<Self> {
73 let helper = HyperConfigBuilder::new(config)?;
74 let host = helper.host();
75 let token = helper.token()?;
76 let client = helper.build()?;
77 debug!("using k8 token: {:#?}", token);
78 Ok(Self {
79 client,
80 host,
81 token,
82 })
83 }
84
85 pub async fn server_version(&self) -> Result<VersionInfo> {
86 let uri = format!("{}/version", self.host);
87 let info = self
88 .handle_request(Request::get(uri).body(Body::empty())?)
89 .await?;
90 trace!("version info retrieved: {:#?}", info);
91 Ok(info)
92 }
93
94 fn hostname(&self) -> &str {
95 &self.host
96 }
97
98 fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), InvalidHeaderValue>
99 where
100 B: Into<Body>,
101 {
102 if let Some(ref token) = self.token {
103 let full_token = format!("Bearer {}", token);
104 request
105 .headers_mut()
106 .insert(AUTHORIZATION, HeaderValue::from_str(&full_token)?);
107 }
108 Ok(())
109 }
110
111 async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T>
113 where
114 T: DeserializeOwned,
115 {
116 use std::io::Read;
117
118 self.finish_request(&mut request)?;
119
120 trace!("request url: {}", request.uri());
121 trace!("request body: {:?}", request.body());
122
123 let resp = self.client.request(request).await?;
124
125 let status = resp.status();
126
127 if status.is_success() {
128 let mut reader = (aggregate(resp).await?).reader();
129 let mut buffer = Vec::new();
130 reader.read_to_end(&mut buffer)?;
131 trace!(%status, "success response: {}", String::from_utf8_lossy(&buffer));
132 serde_json::from_slice(&buffer).map_err(|err| {
133 error!("json error: {}", err);
134 error!("source: {}", String::from_utf8_lossy(&buffer));
135 err.into()
136 })
137 } else {
138 trace!(%status, "error response received");
139 let mut reader = (aggregate(resp).await?).reader();
140 let mut buffer = Vec::new();
141 reader.read_to_end(&mut buffer).map_err(|err| {
142 error!("unable to read error response: {}", err);
143 err
144 })?;
145 trace!("error response: {}", String::from_utf8_lossy(&buffer));
146 let api_status: MetaStatus = serde_json::from_slice(&buffer).map_err(|err| {
147 error!("json error: {}", err);
148 err
149 })?;
150 Err(api_status.into())
151 }
152 }
153
154 #[allow(clippy::useless_conversion)]
156 fn stream_of_chunks(&self, uri: Uri) -> impl Stream<Item = Bytes> {
157 debug!("streaming: {}", uri);
158
159 let request = http::Request::get(uri)
160 .body(Body::empty())
161 .and_then(|mut req| {
162 self.finish_request(&mut req)?;
163 Ok(req)
164 });
165
166 let http_client = self.client.clone();
167
168 let ft = async move {
169 let request = match request {
170 Ok(req) => req,
171 Err(err) => {
172 error!("error building request: {}", err);
173 return empty().right_stream();
174 }
175 };
176
177 match http_client.request(request).await {
178 Ok(response) => {
179 trace!("res status: {}", response.status());
180 trace!("res header: {:#?}", response.headers());
181 WatchStream::new(response.into_body().map_err(|err| err.into())).left_stream()
182 }
183 Err(err) => {
184 error!("error getting streaming: {}", err);
185 empty().right_stream()
186 }
187 }
188 };
189
190 ft.flatten_stream()
191 }
192
193 fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S>> + '_
195 where
196 K8Watch<S>: DeserializeOwned,
197 S: Spec + 'static,
198 S::Status: 'static,
199 S::Header: 'static,
200 {
201 self.stream_of_chunks(uri).map(move |chunk| {
202 trace!(
203 "decoding raw stream : {}",
204 String::from_utf8_lossy(&chunk).to_string()
205 );
206
207 let result: Result<K8Watch<S>, serde_json::Error> = serde_json::from_slice(&chunk)
208 .map_err(|err| {
209 error!("parsing error, chunk_len: {}, error: {}", chunk.len(), err);
210 error!(
211 "error raw stream {}",
212 String::from_utf8_lossy(&chunk).to_string()
213 );
214 err
215 });
216 Ok(vec![match result {
217 Ok(obj) => {
218 trace!("de serialized: {:#?}", obj);
219 Ok(obj)
220 }
221 Err(err) => Err(err.into()),
222 }])
223 })
224 }
225
226 pub async fn retrieve_items_inner<S, N>(
227 &self,
228 namespace: N,
229 options: Option<ListOptions>,
230 ) -> Result<K8List<S>>
231 where
232 S: Spec,
233 N: Into<NameSpace> + Send + Sync,
234 {
235 let uri = items_uri::<S>(self.hostname(), namespace.into(), options);
236 debug!("{}: retrieving items: {}", S::label(), uri);
237 let items = self
238 .handle_request(Request::get(uri).body(Body::empty())?)
239 .await?;
240 trace!("items retrieved: {:#?}", items);
241 Ok(items)
242 }
243
244 pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>>
247 where
248 S: Spec,
249 {
250 let metadata = &value.metadata;
251 debug!( name = %metadata.name,"replace item");
252 trace!("replace {:#?}", value);
253 let uri = item_uri::<S>(
254 self.hostname(),
255 metadata.name(),
256 metadata.namespace(),
257 None,
258 None,
259 )?;
260
261 let bytes = serde_json::to_vec(&value)?;
262
263 trace!(
264 "replace uri: {}, raw: {}",
265 uri,
266 String::from_utf8_lossy(&bytes).to_string()
267 );
268
269 let request = Request::put(uri)
270 .header(CONTENT_TYPE, "application/json")
271 .body(bytes.into())?;
272
273 self.handle_request(request).await
274 }
275
276 pub async fn retrieve_log(
277 &self,
278 namespace: &str,
279 pod_name: &str,
280 container_name: &str,
281 ) -> Result<LogStream> {
282 let sub_resource = format!("/log?container={}&follow={}", container_name, false);
283 let uri = item_uri::<k8_types::core::pod::PodSpec>(
284 self.hostname(),
285 pod_name,
286 namespace,
287 Some(&sub_resource),
288 None,
289 )?;
290 let stream = self.stream_of_chunks(uri);
291 Ok(LogStream(Box::pin(stream)))
292 }
293}
294
295#[async_trait]
296impl MetadataClient for K8Client {
297 async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
299 where
300 S: Spec,
301 M: K8Meta + Send + Sync,
302 {
303 let uri = item_uri::<S>(
304 self.hostname(),
305 metadata.name(),
306 metadata.namespace(),
307 None,
308 None,
309 )?;
310 debug!("{}: retrieving item: {}", S::label(), uri);
311
312 let result: Result<K8Obj<S>> = self
313 .handle_request(Request::get(uri).body(Body::empty())?)
314 .await;
315
316 match result {
317 Ok(item) => Ok(Some(item)),
318 Err(err) => {
319 if let Some(MetaStatus {
320 code: Some(code), ..
321 }) = err.downcast_ref()
322 {
323 if *code == StatusCode::NOT_FOUND.as_u16() {
324 Ok(None)
325 } else {
326 Err(err)
327 }
328 } else {
329 Err(err)
330 }
331 }
332 }
333 }
334
335 async fn retrieve_items_with_option<S, N>(
336 &self,
337 namespace: N,
338 option: Option<ListArg>,
339 ) -> Result<K8List<S>>
340 where
341 S: Spec,
342 N: Into<NameSpace> + Send + Sync,
343 {
344 let list_option = option.map(|opt| ListOptions {
345 field_selector: opt.field_selector,
346 label_selector: opt.label_selector,
347 ..Default::default()
348 });
349 self.retrieve_items_inner(namespace, list_option).await
350 }
351
352 fn retrieve_items_in_chunks<'a, S, N>(
353 self: Arc<Self>,
354 namespace: N,
355 limit: u32,
356 option: Option<ListArg>,
357 ) -> BoxStream<'a, K8List<S>>
358 where
359 S: Spec + 'static,
360 N: Into<NameSpace> + Send + Sync + 'static,
361 {
362 ListStream::new(namespace.into(), limit, option, self).boxed()
363 }
364
365 async fn delete_item_with_option<S, M>(
366 &self,
367 metadata: &M,
368 option: Option<DeleteOptions>,
369 ) -> Result<DeleteStatus<S>>
370 where
371 S: Spec,
372 M: K8Meta + Send + Sync,
373 {
374 use k8_types::MetaStatus;
375
376 let uri = item_uri::<S>(
377 self.hostname(),
378 metadata.name(),
379 metadata.namespace(),
380 None,
381 None,
382 )?;
383 debug!("{}: delete item on url: {}", S::label(), uri);
384
385 let body = if let Some(option_value) = option {
386 let bytes = serde_json::to_vec(&option_value)?;
387 trace!("delete raw : {}", String::from_utf8_lossy(&bytes));
388
389 bytes.into()
390 } else {
391 Body::empty()
392 };
393 let request = Request::delete(uri)
394 .header(ACCEPT, "application/json")
395 .body(body)?;
396 let values: serde_json::Map<String, serde_json::Value> =
397 self.handle_request(request).await?;
398 if let Some(kind) = values.get("kind") {
399 if kind == "Status" {
400 let status: MetaStatus =
401 serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
402 Ok(DeleteStatus::Deleted(status))
403 } else {
404 let status: K8Obj<S> =
405 serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
406 Ok(DeleteStatus::ForegroundDelete(status))
407 }
408 } else {
409 Err(anyhow::anyhow!("missing kind: {:#?}", values))
410 }
411 }
412
413 async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
415 where
416 S: Spec,
417 {
418 let namespace: NameSpace = value.metadata.namespace.clone().into();
419 let uri = items_uri::<S>(self.hostname(), namespace, None);
420 debug!("creating '{}'", uri);
421 trace!("creating RUST {:#?}", &value);
422
423 let bytes = serde_json::to_vec(&value)?;
424
425 trace!(
426 "create {} raw: {}",
427 S::label(),
428 String::from_utf8_lossy(&bytes).to_string()
429 );
430
431 let request = Request::post(uri)
432 .header(CONTENT_TYPE, "application/json")
433 .body(bytes.into())?;
434
435 self.handle_request(request).await
436 }
437
438 async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
440 where
441 S: Spec,
442 {
443 let uri = item_uri::<S>(
444 self.hostname(),
445 &value.metadata.name,
446 &value.metadata.namespace,
447 Some("/status"),
448 None,
449 )?;
450 debug!("updating '{}' status - uri: {}", value.metadata.name, uri);
451 trace!("update status: {:#?}", &value);
452
453 let bytes = serde_json::to_vec(&value)?;
454 trace!(
455 "update raw: {}",
456 String::from_utf8_lossy(&bytes).to_string()
457 );
458
459 let request = Request::put(uri)
460 .header(CONTENT_TYPE, "application/json")
461 .body(bytes.into())?;
462
463 self.handle_request(request).await
464 }
465
466 async fn patch<S, M>(
468 &self,
469 metadata: &M,
470 patch: &Value,
471 merge_type: PatchMergeType,
472 ) -> Result<K8Obj<S>>
473 where
474 S: Spec,
475 M: K8Meta + Display + Send + Sync,
476 {
477 debug!(%metadata, "patching");
478 trace!("patch json value: {:#?}", patch);
479 let uri = item_uri::<S>(
480 self.hostname(),
481 metadata.name(),
482 metadata.namespace(),
483 None,
484 None,
485 )?;
486
487 let bytes = serde_json::to_vec(&patch)?;
488
489 trace!(
490 "patch uri: {}, raw: {}",
491 uri,
492 String::from_utf8_lossy(&bytes).to_string()
493 );
494
495 let request = Request::patch(uri)
496 .header(ACCEPT, "application/json")
497 .header(CONTENT_TYPE, merge_type.content_type())
498 .body(bytes.into())?;
499
500 self.handle_request(request).await
501 }
502
503 async fn patch_status<S, M>(
505 &self,
506 metadata: &M,
507 patch: &Value,
508 merge_type: PatchMergeType,
509 ) -> Result<K8Obj<S>>
510 where
511 S: Spec,
512 M: K8Meta + Display + Send + Sync,
513 {
514 self.patch_subresource(metadata, String::from("/status"), patch, merge_type)
515 .await
516 }
517
518 async fn patch_subresource<S, M>(
519 &self,
520 metadata: &M,
521 subresource: String,
522 patch: &Value,
523 merge_type: PatchMergeType,
524 ) -> Result<K8Obj<S>>
525 where
526 S: Spec,
527 M: K8Meta + Display + Send + Sync,
528 {
529 tracing::info!(%metadata, "patching subresource");
530 tracing::info!("patch json value: {:#?}", patch);
531 let params = match &merge_type {
532 PatchMergeType::Apply(params) => {
533 let params = serde_qs::to_string(¶ms)?;
534 Some(params)
535 }
536 _ => None,
537 };
538 let uri = item_uri::<S>(
539 self.hostname(),
540 metadata.name(),
541 metadata.namespace(),
542 Some(&subresource),
543 params.as_deref(),
544 )?;
545
546 let bytes = serde_json::to_vec(&patch)?;
547
548 tracing::info!(
549 "patch subresource uri: {}, raw: {}",
550 uri,
551 String::from_utf8_lossy(&bytes).to_string()
552 );
553
554 let request = Request::patch(uri)
555 .header(ACCEPT, "application/json")
556 .header(CONTENT_TYPE, merge_type.content_type())
557 .body(bytes.into())?;
558
559 self.handle_request(request).await
560 }
561
562 fn watch_stream_since<S, N>(
564 &self,
565 namespace: N,
566 resource_version: Option<String>,
567 ) -> BoxStream<'_, TokenStreamResult<S>>
568 where
569 S: Spec + 'static,
570 S::Status: 'static,
571 S::Header: 'static,
572 N: Into<NameSpace>,
573 {
574 let opt = ListOptions {
575 watch: Some(true),
576 resource_version,
577 timeout_seconds: Some(3600),
578 ..Default::default()
579 };
580 let uri = items_uri::<S>(self.hostname(), namespace.into(), Some(opt));
581 self.stream(uri).boxed()
582 }
583}