1use std::fmt::Debug;
2use std::fmt::Display;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use bytes::Buf;
8use futures_util::future::FutureExt;
9use futures_util::stream::empty;
10use futures_util::stream::BoxStream;
11use futures_util::stream::Stream;
12use futures_util::stream::StreamExt;
13use futures_util::stream::TryStreamExt;
14use http::StatusCode;
15use http::header::InvalidHeaderValue;
16use hyper::body::aggregate;
17use hyper::body::Bytes;
18use hyper::header::HeaderValue;
19use hyper::header::ACCEPT;
20use hyper::header::AUTHORIZATION;
21use hyper::header::CONTENT_TYPE;
22use hyper::Body;
23use hyper::Request;
24use hyper::Uri;
25use serde::{Serialize, Deserialize};
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use tracing::debug;
29use tracing::error;
30use tracing::trace;
31
32use k8_types::{UpdatedK8Obj, MetaStatus};
33use k8_config::K8Config;
34use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
35use k8_types::options::{ListOptions, DeleteOptions};
36
37use crate::uri::{item_uri, items_uri};
38use crate::meta_client::{ListArg, MetadataClient, NameSpace, PatchMergeType, TokenStreamResult};
39
40use super::wstream::WatchStream;
41use super::{HyperClient, HyperConfigBuilder, ListStream, LogStream};
42
43const SA_TOKEN_PATH: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token";
44
45#[derive(Debug)]
47pub struct K8Client {
48 client: HyperClient,
49 host: String,
50 token: std::sync::RwLock<Option<String>>,
51}
52
53#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, Clone)]
54#[serde(rename_all = "camelCase", default)]
55pub struct VersionInfo {
56 pub major: String,
57 pub minor: String,
58 pub git_version: String,
59 pub git_commit: String,
60 pub git_treestate: String,
61 pub build_date: String,
62 pub go_version: String,
63 pub compiler: String,
64 pub platform: String,
65}
66
67impl K8Client {
68 pub fn try_default() -> Result<Self> {
70 let config = K8Config::load()?;
71 Self::new(config)
72 }
73
74 pub fn new(config: K8Config) -> Result<Self> {
75 let helper = HyperConfigBuilder::new(config)?;
76 let host = helper.host();
77 let token = helper.token()?;
78 let client = helper.build()?;
79 debug!("using k8 token: {:#?}", token);
80 Ok(Self {
81 client,
82 host,
83 token: std::sync::RwLock::new(token),
84 })
85 }
86
87 pub async fn server_version(&self) -> Result<VersionInfo> {
88 let uri = format!("{}/version", self.host);
89 let info = self
90 .handle_request(Request::get(uri).body(Body::empty())?)
91 .await?;
92 trace!("version info retrieved: {:#?}", info);
93 Ok(info)
94 }
95
96 fn hostname(&self) -> &str {
97 &self.host
98 }
99
100 fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), InvalidHeaderValue>
101 where
102 B: Into<Body>,
103 {
104 if let Ok(guard) = self.token.read() {
105 if let Some(ref token) = *guard {
106 let full_token = format!("Bearer {token}");
107 request
108 .headers_mut()
109 .insert(AUTHORIZATION, HeaderValue::from_str(&full_token)?);
110 }
111 }
112 Ok(())
113 }
114
115 async fn handle_request_bytes<T>(&self, request: Request<Vec<u8>>) -> Result<T>
117 where
118 T: DeserializeOwned,
119 {
120 use std::io::Read;
121
122 let method = request.method().clone();
123 let uri = request.uri().clone();
124 let version = request.version();
125 let headers = request.headers().clone();
126 let body_buf = request.into_body();
127
128 trace!("request url: {}", uri);
129
130 let mut req1 = http::Request::builder()
132 .method(method.clone())
133 .uri(uri.clone())
134 .version(version)
135 .body(Body::from(body_buf.clone()))?;
136 {
137 let h = req1.headers_mut();
138 for (k, v) in headers.iter() {
139 h.insert(k.clone(), v.clone());
140 }
141 }
142 self.finish_request(&mut req1)?;
143
144 let resp1 = self.client.request(req1).await?;
145 let status1 = resp1.status();
146
147 let mut r1 = (aggregate(resp1).await?).reader();
148 let mut b1 = Vec::new();
149 r1.read_to_end(&mut b1)?;
150
151 if status1.is_success() {
153 return serde_json::from_slice(&b1).map_err(|err| {
154 error!("json error: {}", err);
155 error!("source: {}", String::from_utf8_lossy(&b1));
156 err.into()
157 });
158 }
159
160 if status1 == StatusCode::UNAUTHORIZED {
162 if let Ok(fresh) = std::fs::read_to_string(SA_TOKEN_PATH) {
163 let fresh_trimmed = fresh.trim().to_owned();
164
165 let mut req2 = http::Request::builder()
166 .method(method)
167 .uri(uri)
168 .version(version)
169 .body(Body::from(body_buf))?;
170 {
171 let h = req2.headers_mut();
172 for (k, v) in headers.iter() {
173 h.insert(k.clone(), v.clone());
174 }
175 let bearer = format!("Bearer {}", fresh_trimmed);
176 h.insert(AUTHORIZATION, HeaderValue::from_str(&bearer)?);
177 }
178
179 let resp2 = self.client.request(req2).await?;
180 let status2 = resp2.status();
181
182 let mut r2 = (aggregate(resp2).await?).reader();
183 let mut b2 = Vec::new();
184 r2.read_to_end(&mut b2)?;
185
186 if status2.is_success() {
187 if let Ok(mut w) = self.token.write() {
188 *w = Some(fresh_trimmed);
189 }
190 trace!(%status2, "success response (retry): {}", String::from_utf8_lossy(&b2));
191 return serde_json::from_slice(&b2).map_err(|err| {
192 error!("json error: {}", err);
193 error!("source: {}", String::from_utf8_lossy(&b2));
194 err.into()
195 });
196 } else {
197 trace!(%status2, "error response received (retry)");
198 let api_status: MetaStatus = serde_json::from_slice(&b2).map_err(|err| {
199 error!("json error: {}", err);
200 err
201 })?;
202 return Err(api_status.into());
203 }
204 }
205 }
206
207 let api_status: MetaStatus = serde_json::from_slice(&b1).map_err(|err| {
208 error!("json error: {}", err);
209 err
210 })?;
211 Err(api_status.into())
212 }
213
214 async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T>
216 where
217 T: DeserializeOwned,
218 {
219 use std::io::Read;
220
221 self.finish_request(&mut request)?;
222
223 trace!("request url: {}", request.uri());
224 trace!("request body: {:?}", request.body());
225
226 let resp = self.client.request(request).await?;
227
228 let status = resp.status();
229
230 if status.is_success() {
231 let mut reader = (aggregate(resp).await?).reader();
232 let mut buffer = Vec::new();
233 reader.read_to_end(&mut buffer)?;
234 trace!(%status, "success response: {}", String::from_utf8_lossy(&buffer));
235 serde_json::from_slice(&buffer).map_err(|err| {
236 error!("json error: {}", err);
237 error!("source: {}", String::from_utf8_lossy(&buffer));
238 err.into()
239 })
240 } else {
241 trace!(%status, "error response received");
242 let mut reader = (aggregate(resp).await?).reader();
243 let mut buffer = Vec::new();
244 reader.read_to_end(&mut buffer).map_err(|err| {
245 error!("unable to read error response: {}", err);
246 err
247 })?;
248 trace!("error response: {}", String::from_utf8_lossy(&buffer));
249 let api_status: MetaStatus = serde_json::from_slice(&buffer).map_err(|err| {
250 error!("json error: {}", err);
251 err
252 })?;
253 Err(api_status.into())
254 }
255 }
256
257 #[allow(clippy::useless_conversion)]
259 fn stream_of_chunks(&self, uri: Uri) -> impl Stream<Item = Bytes> {
260 debug!("streaming: {}", uri);
261
262 let request = http::Request::get(uri)
263 .body(Body::empty())
264 .and_then(|mut req| {
265 self.finish_request(&mut req)?;
266 Ok(req)
267 });
268
269 let http_client = self.client.clone();
270
271 let ft = async move {
272 let request = match request {
273 Ok(req) => req,
274 Err(err) => {
275 error!("error building request: {}", err);
276 return empty().right_stream();
277 }
278 };
279
280 let method = request.method().clone();
282 let uri2 = request.uri().clone();
283 let version = request.version();
284 let headers = request.headers().clone();
285
286 match http_client.request(request).await {
287 Ok(response) => {
288 trace!("res status: {}", response.status());
289 trace!("res header: {:#?}", response.headers());
290
291 let final_body = if response.status() == StatusCode::UNAUTHORIZED {
293 if let Ok(fresh) = std::fs::read_to_string(SA_TOKEN_PATH) {
294 let mut retry_req = http::Request::builder()
296 .method(method)
297 .uri(uri2)
298 .version(version)
299 .body(Body::empty())
300 .expect("failed to build retry request");
301 {
302 let h = retry_req.headers_mut();
303 for (k, v) in headers.iter() {
304 h.insert(k.clone(), v.clone());
305 }
306 let bearer = format!("Bearer {}", fresh.trim());
307 if let Ok(hv) = HeaderValue::from_str(&bearer) {
308 h.insert(AUTHORIZATION, hv);
309 }
310 }
311 match http_client.request(retry_req).await {
312 Ok(resp2) => {
313 trace!("res2 status: {}", resp2.status());
314 trace!("res2 header: {:#?}", resp2.headers());
315 resp2.into_body()
316 }
317 Err(err) => {
318 error!("error getting streaming (retry): {}", err);
319 return empty().right_stream();
320 }
321 }
322 } else {
323 response.into_body()
325 }
326 } else {
327 response.into_body()
328 };
329
330 WatchStream::new(final_body.map_err(|err| err.into())).left_stream()
331 }
332 Err(err) => {
333 error!("error getting streaming: {}", err);
334 empty().right_stream()
335 }
336 }
337 };
338
339 ft.flatten_stream()
340 }
341
342 fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S>> + '_
344 where
345 K8Watch<S>: DeserializeOwned,
346 S: Spec + 'static,
347 S::Status: 'static,
348 S::Header: 'static,
349 {
350 self.stream_of_chunks(uri).map(move |chunk| {
351 trace!(
352 "decoding raw stream : {}",
353 String::from_utf8_lossy(&chunk).to_string()
354 );
355
356 let result: Result<K8Watch<S>, serde_json::Error> = serde_json::from_slice(&chunk)
357 .map_err(|err| {
358 error!("parsing error, chunk_len: {}, error: {}", chunk.len(), err);
359 error!(
360 "error raw stream {}",
361 String::from_utf8_lossy(&chunk).to_string()
362 );
363 err
364 });
365 Ok(vec![match result {
366 Ok(obj) => {
367 trace!("de serialized: {:#?}", obj);
368 Ok(obj)
369 }
370 Err(err) => Err(err.into()),
371 }])
372 })
373 }
374
375 pub async fn retrieve_items_inner<S, N>(
376 &self,
377 namespace: N,
378 options: Option<ListOptions>,
379 ) -> Result<K8List<S>>
380 where
381 S: Spec,
382 N: Into<NameSpace> + Send + Sync,
383 {
384 let uri = items_uri::<S>(self.hostname(), namespace.into(), options);
385 debug!("{}: retrieving items: {}", S::label(), uri);
386 let items = self
387 .handle_request(Request::get(uri).body(Body::empty())?)
388 .await?;
389 trace!("items retrieved: {:#?}", items);
390 Ok(items)
391 }
392
393 pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>>
396 where
397 S: Spec,
398 {
399 let metadata = &value.metadata;
400 debug!( name = %metadata.name,"replace item");
401 trace!("replace {:#?}", value);
402 let uri = item_uri::<S>(
403 self.hostname(),
404 metadata.name(),
405 metadata.namespace(),
406 None,
407 None,
408 )?;
409
410 let bytes = serde_json::to_vec(&value)?;
411
412 trace!(
413 "replace uri: {}, raw: {}",
414 uri,
415 String::from_utf8_lossy(&bytes).to_string()
416 );
417
418 let request = Request::put(uri)
419 .header(CONTENT_TYPE, "application/json")
420 .body(bytes)?;
421
422 self.handle_request_bytes(request).await
423 }
424
425 pub async fn retrieve_log(
426 &self,
427 namespace: &str,
428 pod_name: &str,
429 container_name: &str,
430 ) -> Result<LogStream> {
431 let sub_resource = format!("/log?container={}&follow={}", container_name, false);
432 let uri = item_uri::<k8_types::core::pod::PodSpec>(
433 self.hostname(),
434 pod_name,
435 namespace,
436 Some(&sub_resource),
437 None,
438 )?;
439 let stream = self.stream_of_chunks(uri);
440 Ok(LogStream(Box::pin(stream)))
441 }
442}
443
444#[async_trait]
445impl MetadataClient for K8Client {
446 async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
448 where
449 S: Spec,
450 M: K8Meta + Send + Sync,
451 {
452 let uri = item_uri::<S>(
453 self.hostname(),
454 metadata.name(),
455 metadata.namespace(),
456 None,
457 None,
458 )?;
459 debug!("{}: retrieving item: {}", S::label(), uri);
460
461 let result: Result<K8Obj<S>> = self
462 .handle_request(Request::get(uri).body(Body::empty())?)
463 .await;
464
465 match result {
466 Ok(item) => Ok(Some(item)),
467 Err(err) => {
468 if let Some(MetaStatus {
469 code: Some(code), ..
470 }) = err.downcast_ref()
471 {
472 if *code == StatusCode::NOT_FOUND.as_u16() {
473 Ok(None)
474 } else {
475 Err(err)
476 }
477 } else {
478 Err(err)
479 }
480 }
481 }
482 }
483
484 async fn retrieve_items_with_option<S, N>(
485 &self,
486 namespace: N,
487 option: Option<ListArg>,
488 ) -> Result<K8List<S>>
489 where
490 S: Spec,
491 N: Into<NameSpace> + Send + Sync,
492 {
493 let list_option = option.map(|opt| ListOptions {
494 field_selector: opt.field_selector,
495 label_selector: opt.label_selector,
496 ..Default::default()
497 });
498 self.retrieve_items_inner(namespace, list_option).await
499 }
500
501 fn retrieve_items_in_chunks<'a, S, N>(
502 self: Arc<Self>,
503 namespace: N,
504 limit: u32,
505 option: Option<ListArg>,
506 ) -> BoxStream<'a, K8List<S>>
507 where
508 S: Spec + 'static,
509 N: Into<NameSpace> + Send + Sync + 'static,
510 {
511 ListStream::new(namespace.into(), limit, option, self).boxed()
512 }
513
514 async fn delete_item_with_option<S, M>(
515 &self,
516 metadata: &M,
517 option: Option<DeleteOptions>,
518 ) -> Result<DeleteStatus<S>>
519 where
520 S: Spec,
521 M: K8Meta + Send + Sync,
522 {
523 use k8_types::MetaStatus;
524
525 let uri = item_uri::<S>(
526 self.hostname(),
527 metadata.name(),
528 metadata.namespace(),
529 None,
530 None,
531 )?;
532 debug!("{}: delete item on url: {}", S::label(), uri);
533
534 let body = if let Some(option_value) = option {
535 let bytes = serde_json::to_vec(&option_value)?;
536 trace!("delete raw : {}", String::from_utf8_lossy(&bytes));
537
538 bytes
539 } else {
540 Vec::new()
541 };
542 let request = Request::delete(uri)
543 .header(ACCEPT, "application/json")
544 .body(body)?;
545 let values: serde_json::Map<String, serde_json::Value> =
546 self.handle_request_bytes(request).await?;
547 if let Some(kind) = values.get("kind") {
548 if kind == "Status" {
549 let status: MetaStatus =
550 serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
551 Ok(DeleteStatus::Deleted(status))
552 } else {
553 let status: K8Obj<S> =
554 serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
555 Ok(DeleteStatus::ForegroundDelete(status))
556 }
557 } else {
558 Err(anyhow::anyhow!("missing kind: {:#?}", values))
559 }
560 }
561
562 async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
564 where
565 S: Spec,
566 {
567 let namespace: NameSpace = value.metadata.namespace.clone().into();
568 let uri = items_uri::<S>(self.hostname(), namespace, None);
569 debug!("creating '{}'", uri);
570 trace!("creating RUST {:#?}", &value);
571
572 let bytes = serde_json::to_vec(&value)?;
573
574 trace!(
575 "create {} raw: {}",
576 S::label(),
577 String::from_utf8_lossy(&bytes).to_string()
578 );
579
580 let request = Request::post(uri)
581 .header(CONTENT_TYPE, "application/json")
582 .body(bytes)?;
583
584 self.handle_request_bytes(request).await
585 }
586
587 async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
589 where
590 S: Spec,
591 {
592 let uri = item_uri::<S>(
593 self.hostname(),
594 &value.metadata.name,
595 &value.metadata.namespace,
596 Some("/status"),
597 None,
598 )?;
599 debug!("updating '{}' status - uri: {}", value.metadata.name, uri);
600 trace!("update status: {:#?}", &value);
601
602 let bytes = serde_json::to_vec(&value)?;
603 trace!(
604 "update raw: {}",
605 String::from_utf8_lossy(&bytes).to_string()
606 );
607
608 let request = Request::put(uri)
609 .header(CONTENT_TYPE, "application/json")
610 .body(bytes)?;
611
612 self.handle_request_bytes(request).await
613 }
614
615 async fn patch<S, M>(
617 &self,
618 metadata: &M,
619 patch: &Value,
620 merge_type: PatchMergeType,
621 ) -> Result<K8Obj<S>>
622 where
623 S: Spec,
624 M: K8Meta + Display + Send + Sync,
625 {
626 debug!(%metadata, "patching");
627 trace!("patch json value: {:#?}", patch);
628 let uri = item_uri::<S>(
629 self.hostname(),
630 metadata.name(),
631 metadata.namespace(),
632 None,
633 None,
634 )?;
635
636 let bytes = serde_json::to_vec(&patch)?;
637
638 trace!(
639 "patch uri: {}, raw: {}",
640 uri,
641 String::from_utf8_lossy(&bytes).to_string()
642 );
643
644 let request = Request::patch(uri)
645 .header(ACCEPT, "application/json")
646 .header(CONTENT_TYPE, merge_type.content_type())
647 .body(bytes)?;
648
649 self.handle_request_bytes(request).await
650 }
651
652 async fn patch_status<S, M>(
654 &self,
655 metadata: &M,
656 patch: &Value,
657 merge_type: PatchMergeType,
658 ) -> Result<K8Obj<S>>
659 where
660 S: Spec,
661 M: K8Meta + Display + Send + Sync,
662 {
663 self.patch_subresource(metadata, String::from("/status"), patch, merge_type)
664 .await
665 }
666
667 async fn patch_subresource<S, M>(
668 &self,
669 metadata: &M,
670 subresource: String,
671 patch: &Value,
672 merge_type: PatchMergeType,
673 ) -> Result<K8Obj<S>>
674 where
675 S: Spec,
676 M: K8Meta + Display + Send + Sync,
677 {
678 tracing::info!(%metadata, "patching subresource");
679 tracing::info!("patch json value: {:#?}", patch);
680 let params = match &merge_type {
681 PatchMergeType::Apply(params) => {
682 let params = serde_qs::to_string(¶ms)?;
683 Some(params)
684 }
685 _ => None,
686 };
687 let uri = item_uri::<S>(
688 self.hostname(),
689 metadata.name(),
690 metadata.namespace(),
691 Some(&subresource),
692 params.as_deref(),
693 )?;
694
695 let bytes = serde_json::to_vec(&patch)?;
696
697 tracing::info!(
698 "patch subresource uri: {}, raw: {}",
699 uri,
700 String::from_utf8_lossy(&bytes).to_string()
701 );
702
703 let request = Request::patch(uri)
704 .header(ACCEPT, "application/json")
705 .header(CONTENT_TYPE, merge_type.content_type())
706 .body(bytes)?;
707
708 self.handle_request_bytes(request).await
709 }
710
711 fn watch_stream_since<S, N>(
713 &self,
714 namespace: N,
715 resource_version: Option<String>,
716 ) -> BoxStream<'_, TokenStreamResult<S>>
717 where
718 S: Spec + 'static,
719 S::Status: 'static,
720 S::Header: 'static,
721 N: Into<NameSpace>,
722 {
723 let opt = ListOptions {
724 watch: Some(true),
725 resource_version,
726 timeout_seconds: Some(3600),
727 ..Default::default()
728 };
729 let uri = items_uri::<S>(self.hostname(), namespace.into(), Some(opt));
730 self.stream(uri).boxed()
731 }
732}