k8_client/client/
client_impl.rs

1use std::fmt::Debug;
2use std::fmt::Display;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use bytes::Buf;
8use futures_util::future::FutureExt;
9use futures_util::stream::empty;
10use futures_util::stream::BoxStream;
11use futures_util::stream::Stream;
12use futures_util::stream::StreamExt;
13use futures_util::stream::TryStreamExt;
14use http::StatusCode;
15use http::header::InvalidHeaderValue;
16use hyper::body::aggregate;
17use hyper::body::Bytes;
18use hyper::header::HeaderValue;
19use hyper::header::ACCEPT;
20use hyper::header::AUTHORIZATION;
21use hyper::header::CONTENT_TYPE;
22use hyper::Body;
23use hyper::Request;
24use hyper::Uri;
25use serde::{Serialize, Deserialize};
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use tracing::debug;
29use tracing::error;
30use tracing::trace;
31
32use k8_types::{UpdatedK8Obj, MetaStatus};
33use k8_config::K8Config;
34use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
35use k8_types::options::{ListOptions, DeleteOptions};
36
37use crate::uri::{item_uri, items_uri};
38use crate::meta_client::{ListArg, MetadataClient, NameSpace, PatchMergeType, TokenStreamResult};
39
40use super::wstream::WatchStream;
41use super::{HyperClient, HyperConfigBuilder, ListStream, LogStream};
42
43const SA_TOKEN_PATH: &str = "/var/run/secrets/kubernetes.io/serviceaccount/token";
44
45/// K8 Cluster accessible thru API
46#[derive(Debug)]
47pub struct K8Client {
48    client: HyperClient,
49    host: String,
50    token: std::sync::RwLock<Option<String>>,
51}
52
53#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, Clone)]
54#[serde(rename_all = "camelCase", default)]
55pub struct VersionInfo {
56    pub major: String,
57    pub minor: String,
58    pub git_version: String,
59    pub git_commit: String,
60    pub git_treestate: String,
61    pub build_date: String,
62    pub go_version: String,
63    pub compiler: String,
64    pub platform: String,
65}
66
67impl K8Client {
68    // load using default k8 config
69    pub fn try_default() -> Result<Self> {
70        let config = K8Config::load()?;
71        Self::new(config)
72    }
73
74    pub fn new(config: K8Config) -> Result<Self> {
75        let helper = HyperConfigBuilder::new(config)?;
76        let host = helper.host();
77        let token = helper.token()?;
78        let client = helper.build()?;
79        debug!("using k8 token: {:#?}", token);
80        Ok(Self {
81            client,
82            host,
83            token: std::sync::RwLock::new(token),
84        })
85    }
86
87    pub async fn server_version(&self) -> Result<VersionInfo> {
88        let uri = format!("{}/version", self.host);
89        let info = self
90            .handle_request(Request::get(uri).body(Body::empty())?)
91            .await?;
92        trace!("version info retrieved: {:#?}", info);
93        Ok(info)
94    }
95
96    fn hostname(&self) -> &str {
97        &self.host
98    }
99
100    fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), InvalidHeaderValue>
101    where
102        B: Into<Body>,
103    {
104        if let Ok(guard) = self.token.read() {
105            if let Some(ref token) = *guard {
106                let full_token = format!("Bearer {token}");
107                request
108                    .headers_mut()
109                    .insert(AUTHORIZATION, HeaderValue::from_str(&full_token)?);
110            }
111        }
112        Ok(())
113    }
114
115    /// Send a request with a Vec<u8> body, and on 401 retry once with a freshly read SA token.
116    async fn handle_request_bytes<T>(&self, request: Request<Vec<u8>>) -> Result<T>
117    where
118        T: DeserializeOwned,
119    {
120        use std::io::Read;
121
122        let method = request.method().clone();
123        let uri = request.uri().clone();
124        let version = request.version();
125        let headers = request.headers().clone();
126        let body_buf = request.into_body();
127
128        trace!("request url: {}", uri);
129
130        // first attempt
131        let mut req1 = http::Request::builder()
132            .method(method.clone())
133            .uri(uri.clone())
134            .version(version)
135            .body(Body::from(body_buf.clone()))?;
136        {
137            let h = req1.headers_mut();
138            for (k, v) in headers.iter() {
139                h.insert(k.clone(), v.clone());
140            }
141        }
142        self.finish_request(&mut req1)?;
143
144        let resp1 = self.client.request(req1).await?;
145        let status1 = resp1.status();
146
147        let mut r1 = (aggregate(resp1).await?).reader();
148        let mut b1 = Vec::new();
149        r1.read_to_end(&mut b1)?;
150
151        // success
152        if status1.is_success() {
153            return serde_json::from_slice(&b1).map_err(|err| {
154                error!("json error: {}", err);
155                error!("source: {}", String::from_utf8_lossy(&b1));
156                err.into()
157            });
158        }
159
160        // retry once on 401 with a freshly-read token
161        if status1 == StatusCode::UNAUTHORIZED {
162            if let Ok(fresh) = std::fs::read_to_string(SA_TOKEN_PATH) {
163                let fresh_trimmed = fresh.trim().to_owned();
164
165                let mut req2 = http::Request::builder()
166                    .method(method)
167                    .uri(uri)
168                    .version(version)
169                    .body(Body::from(body_buf))?;
170                {
171                    let h = req2.headers_mut();
172                    for (k, v) in headers.iter() {
173                        h.insert(k.clone(), v.clone());
174                    }
175                    let bearer = format!("Bearer {}", fresh_trimmed);
176                    h.insert(AUTHORIZATION, HeaderValue::from_str(&bearer)?);
177                }
178
179                let resp2 = self.client.request(req2).await?;
180                let status2 = resp2.status();
181
182                let mut r2 = (aggregate(resp2).await?).reader();
183                let mut b2 = Vec::new();
184                r2.read_to_end(&mut b2)?;
185
186                if status2.is_success() {
187                    if let Ok(mut w) = self.token.write() {
188                        *w = Some(fresh_trimmed);
189                    }
190                    trace!(%status2, "success response (retry): {}", String::from_utf8_lossy(&b2));
191                    return serde_json::from_slice(&b2).map_err(|err| {
192                        error!("json error: {}", err);
193                        error!("source: {}", String::from_utf8_lossy(&b2));
194                        err.into()
195                    });
196                } else {
197                    trace!(%status2, "error response received (retry)");
198                    let api_status: MetaStatus = serde_json::from_slice(&b2).map_err(|err| {
199                        error!("json error: {}", err);
200                        err
201                    })?;
202                    return Err(api_status.into());
203                }
204            }
205        }
206
207        let api_status: MetaStatus = serde_json::from_slice(&b1).map_err(|err| {
208            error!("json error: {}", err);
209            err
210        })?;
211        Err(api_status.into())
212    }
213
214    /// handle request. this is async function
215    async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T>
216    where
217        T: DeserializeOwned,
218    {
219        use std::io::Read;
220
221        self.finish_request(&mut request)?;
222
223        trace!("request url: {}", request.uri());
224        trace!("request body: {:?}", request.body());
225
226        let resp = self.client.request(request).await?;
227
228        let status = resp.status();
229
230        if status.is_success() {
231            let mut reader = (aggregate(resp).await?).reader();
232            let mut buffer = Vec::new();
233            reader.read_to_end(&mut buffer)?;
234            trace!(%status, "success response: {}", String::from_utf8_lossy(&buffer));
235            serde_json::from_slice(&buffer).map_err(|err| {
236                error!("json error: {}", err);
237                error!("source: {}", String::from_utf8_lossy(&buffer));
238                err.into()
239            })
240        } else {
241            trace!(%status, "error response received");
242            let mut reader = (aggregate(resp).await?).reader();
243            let mut buffer = Vec::new();
244            reader.read_to_end(&mut buffer).map_err(|err| {
245                error!("unable to read error response: {}", err);
246                err
247            })?;
248            trace!("error response: {}", String::from_utf8_lossy(&buffer));
249            let api_status: MetaStatus = serde_json::from_slice(&buffer).map_err(|err| {
250                error!("json error: {}", err);
251                err
252            })?;
253            Err(api_status.into())
254        }
255    }
256
257    /// return stream of chunks, chunk is a bytes that are stream thru http channel
258    #[allow(clippy::useless_conversion)]
259    fn stream_of_chunks(&self, uri: Uri) -> impl Stream<Item = Bytes> {
260        debug!("streaming: {}", uri);
261
262        let request = http::Request::get(uri)
263            .body(Body::empty())
264            .and_then(|mut req| {
265                self.finish_request(&mut req)?;
266                Ok(req)
267            });
268
269        let http_client = self.client.clone();
270
271        let ft = async move {
272            let request = match request {
273                Ok(req) => req,
274                Err(err) => {
275                    error!("error building request: {}", err);
276                    return empty().right_stream();
277                }
278            };
279
280            // Capture for potential retry
281            let method = request.method().clone();
282            let uri2 = request.uri().clone();
283            let version = request.version();
284            let headers = request.headers().clone();
285
286            match http_client.request(request).await {
287                Ok(response) => {
288                    trace!("res status: {}", response.status());
289                    trace!("res header: {:#?}", response.headers());
290
291                    // choose final response body; retry once on 401
292                    let final_body = if response.status() == StatusCode::UNAUTHORIZED {
293                        if let Ok(fresh) = std::fs::read_to_string(SA_TOKEN_PATH) {
294                            // rebuild the same GET with a fresh Authorization header
295                            let mut retry_req = http::Request::builder()
296                                .method(method)
297                                .uri(uri2)
298                                .version(version)
299                                .body(Body::empty())
300                                .expect("failed to build retry request");
301                            {
302                                let h = retry_req.headers_mut();
303                                for (k, v) in headers.iter() {
304                                    h.insert(k.clone(), v.clone());
305                                }
306                                let bearer = format!("Bearer {}", fresh.trim());
307                                if let Ok(hv) = HeaderValue::from_str(&bearer) {
308                                    h.insert(AUTHORIZATION, hv);
309                                }
310                            }
311                            match http_client.request(retry_req).await {
312                                Ok(resp2) => {
313                                    trace!("res2 status: {}", resp2.status());
314                                    trace!("res2 header: {:#?}", resp2.headers());
315                                    resp2.into_body()
316                                }
317                                Err(err) => {
318                                    error!("error getting streaming (retry): {}", err);
319                                    return empty().right_stream();
320                                }
321                            }
322                        } else {
323                            // couldn't read fresh token, fall back to original body
324                            response.into_body()
325                        }
326                    } else {
327                        response.into_body()
328                    };
329
330                    WatchStream::new(final_body.map_err(|err| err.into())).left_stream()
331                }
332                Err(err) => {
333                    error!("error getting streaming: {}", err);
334                    empty().right_stream()
335                }
336            }
337        };
338
339        ft.flatten_stream()
340    }
341
342    /// return get stream of uri
343    fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S>> + '_
344    where
345        K8Watch<S>: DeserializeOwned,
346        S: Spec + 'static,
347        S::Status: 'static,
348        S::Header: 'static,
349    {
350        self.stream_of_chunks(uri).map(move |chunk| {
351            trace!(
352                "decoding raw stream : {}",
353                String::from_utf8_lossy(&chunk).to_string()
354            );
355
356            let result: Result<K8Watch<S>, serde_json::Error> = serde_json::from_slice(&chunk)
357                .map_err(|err| {
358                    error!("parsing error, chunk_len: {}, error: {}", chunk.len(), err);
359                    error!(
360                        "error raw stream {}",
361                        String::from_utf8_lossy(&chunk).to_string()
362                    );
363                    err
364                });
365            Ok(vec![match result {
366                Ok(obj) => {
367                    trace!("de serialized: {:#?}", obj);
368                    Ok(obj)
369                }
370                Err(err) => Err(err.into()),
371            }])
372        })
373    }
374
375    pub async fn retrieve_items_inner<S, N>(
376        &self,
377        namespace: N,
378        options: Option<ListOptions>,
379    ) -> Result<K8List<S>>
380    where
381        S: Spec,
382        N: Into<NameSpace> + Send + Sync,
383    {
384        let uri = items_uri::<S>(self.hostname(), namespace.into(), options);
385        debug!("{}: retrieving items: {}", S::label(), uri);
386        let items = self
387            .handle_request(Request::get(uri).body(Body::empty())?)
388            .await?;
389        trace!("items retrieved: {:#?}", items);
390        Ok(items)
391    }
392
393    /// replace existing object.
394    /// object must exist
395    pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>>
396    where
397        S: Spec,
398    {
399        let metadata = &value.metadata;
400        debug!( name = %metadata.name,"replace item");
401        trace!("replace {:#?}", value);
402        let uri = item_uri::<S>(
403            self.hostname(),
404            metadata.name(),
405            metadata.namespace(),
406            None,
407            None,
408        )?;
409
410        let bytes = serde_json::to_vec(&value)?;
411
412        trace!(
413            "replace uri: {}, raw: {}",
414            uri,
415            String::from_utf8_lossy(&bytes).to_string()
416        );
417
418        let request = Request::put(uri)
419            .header(CONTENT_TYPE, "application/json")
420            .body(bytes)?;
421
422        self.handle_request_bytes(request).await
423    }
424
425    pub async fn retrieve_log(
426        &self,
427        namespace: &str,
428        pod_name: &str,
429        container_name: &str,
430    ) -> Result<LogStream> {
431        let sub_resource = format!("/log?container={}&follow={}", container_name, false);
432        let uri = item_uri::<k8_types::core::pod::PodSpec>(
433            self.hostname(),
434            pod_name,
435            namespace,
436            Some(&sub_resource),
437            None,
438        )?;
439        let stream = self.stream_of_chunks(uri);
440        Ok(LogStream(Box::pin(stream)))
441    }
442}
443
444#[async_trait]
445impl MetadataClient for K8Client {
446    /// retrieval a single item
447    async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
448    where
449        S: Spec,
450        M: K8Meta + Send + Sync,
451    {
452        let uri = item_uri::<S>(
453            self.hostname(),
454            metadata.name(),
455            metadata.namespace(),
456            None,
457            None,
458        )?;
459        debug!("{}: retrieving item: {}", S::label(), uri);
460
461        let result: Result<K8Obj<S>> = self
462            .handle_request(Request::get(uri).body(Body::empty())?)
463            .await;
464
465        match result {
466            Ok(item) => Ok(Some(item)),
467            Err(err) => {
468                if let Some(MetaStatus {
469                    code: Some(code), ..
470                }) = err.downcast_ref()
471                {
472                    if *code == StatusCode::NOT_FOUND.as_u16() {
473                        Ok(None)
474                    } else {
475                        Err(err)
476                    }
477                } else {
478                    Err(err)
479                }
480            }
481        }
482    }
483
484    async fn retrieve_items_with_option<S, N>(
485        &self,
486        namespace: N,
487        option: Option<ListArg>,
488    ) -> Result<K8List<S>>
489    where
490        S: Spec,
491        N: Into<NameSpace> + Send + Sync,
492    {
493        let list_option = option.map(|opt| ListOptions {
494            field_selector: opt.field_selector,
495            label_selector: opt.label_selector,
496            ..Default::default()
497        });
498        self.retrieve_items_inner(namespace, list_option).await
499    }
500
501    fn retrieve_items_in_chunks<'a, S, N>(
502        self: Arc<Self>,
503        namespace: N,
504        limit: u32,
505        option: Option<ListArg>,
506    ) -> BoxStream<'a, K8List<S>>
507    where
508        S: Spec + 'static,
509        N: Into<NameSpace> + Send + Sync + 'static,
510    {
511        ListStream::new(namespace.into(), limit, option, self).boxed()
512    }
513
514    async fn delete_item_with_option<S, M>(
515        &self,
516        metadata: &M,
517        option: Option<DeleteOptions>,
518    ) -> Result<DeleteStatus<S>>
519    where
520        S: Spec,
521        M: K8Meta + Send + Sync,
522    {
523        use k8_types::MetaStatus;
524
525        let uri = item_uri::<S>(
526            self.hostname(),
527            metadata.name(),
528            metadata.namespace(),
529            None,
530            None,
531        )?;
532        debug!("{}: delete item on url: {}", S::label(), uri);
533
534        let body = if let Some(option_value) = option {
535            let bytes = serde_json::to_vec(&option_value)?;
536            trace!("delete raw : {}", String::from_utf8_lossy(&bytes));
537
538            bytes
539        } else {
540            Vec::new()
541        };
542        let request = Request::delete(uri)
543            .header(ACCEPT, "application/json")
544            .body(body)?;
545        let values: serde_json::Map<String, serde_json::Value> =
546            self.handle_request_bytes(request).await?;
547        if let Some(kind) = values.get("kind") {
548            if kind == "Status" {
549                let status: MetaStatus =
550                    serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
551                Ok(DeleteStatus::Deleted(status))
552            } else {
553                let status: K8Obj<S> =
554                    serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
555                Ok(DeleteStatus::ForegroundDelete(status))
556            }
557        } else {
558            Err(anyhow::anyhow!("missing kind: {:#?}", values))
559        }
560    }
561
562    /// create new object
563    async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
564    where
565        S: Spec,
566    {
567        let namespace: NameSpace = value.metadata.namespace.clone().into();
568        let uri = items_uri::<S>(self.hostname(), namespace, None);
569        debug!("creating '{}'", uri);
570        trace!("creating RUST {:#?}", &value);
571
572        let bytes = serde_json::to_vec(&value)?;
573
574        trace!(
575            "create {} raw: {}",
576            S::label(),
577            String::from_utf8_lossy(&bytes).to_string()
578        );
579
580        let request = Request::post(uri)
581            .header(CONTENT_TYPE, "application/json")
582            .body(bytes)?;
583
584        self.handle_request_bytes(request).await
585    }
586
587    /// update status
588    async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
589    where
590        S: Spec,
591    {
592        let uri = item_uri::<S>(
593            self.hostname(),
594            &value.metadata.name,
595            &value.metadata.namespace,
596            Some("/status"),
597            None,
598        )?;
599        debug!("updating '{}' status - uri: {}", value.metadata.name, uri);
600        trace!("update status: {:#?}", &value);
601
602        let bytes = serde_json::to_vec(&value)?;
603        trace!(
604            "update raw: {}",
605            String::from_utf8_lossy(&bytes).to_string()
606        );
607
608        let request = Request::put(uri)
609            .header(CONTENT_TYPE, "application/json")
610            .body(bytes)?;
611
612        self.handle_request_bytes(request).await
613    }
614
615    /// patch existing with spec
616    async fn patch<S, M>(
617        &self,
618        metadata: &M,
619        patch: &Value,
620        merge_type: PatchMergeType,
621    ) -> Result<K8Obj<S>>
622    where
623        S: Spec,
624        M: K8Meta + Display + Send + Sync,
625    {
626        debug!(%metadata, "patching");
627        trace!("patch json value: {:#?}", patch);
628        let uri = item_uri::<S>(
629            self.hostname(),
630            metadata.name(),
631            metadata.namespace(),
632            None,
633            None,
634        )?;
635
636        let bytes = serde_json::to_vec(&patch)?;
637
638        trace!(
639            "patch uri: {}, raw: {}",
640            uri,
641            String::from_utf8_lossy(&bytes).to_string()
642        );
643
644        let request = Request::patch(uri)
645            .header(ACCEPT, "application/json")
646            .header(CONTENT_TYPE, merge_type.content_type())
647            .body(bytes)?;
648
649        self.handle_request_bytes(request).await
650    }
651
652    /// patch status
653    async fn patch_status<S, M>(
654        &self,
655        metadata: &M,
656        patch: &Value,
657        merge_type: PatchMergeType,
658    ) -> Result<K8Obj<S>>
659    where
660        S: Spec,
661        M: K8Meta + Display + Send + Sync,
662    {
663        self.patch_subresource(metadata, String::from("/status"), patch, merge_type)
664            .await
665    }
666
667    async fn patch_subresource<S, M>(
668        &self,
669        metadata: &M,
670        subresource: String,
671        patch: &Value,
672        merge_type: PatchMergeType,
673    ) -> Result<K8Obj<S>>
674    where
675        S: Spec,
676        M: K8Meta + Display + Send + Sync,
677    {
678        tracing::info!(%metadata, "patching subresource");
679        tracing::info!("patch json value: {:#?}", patch);
680        let params = match &merge_type {
681            PatchMergeType::Apply(params) => {
682                let params = serde_qs::to_string(&params)?;
683                Some(params)
684            }
685            _ => None,
686        };
687        let uri = item_uri::<S>(
688            self.hostname(),
689            metadata.name(),
690            metadata.namespace(),
691            Some(&subresource),
692            params.as_deref(),
693        )?;
694
695        let bytes = serde_json::to_vec(&patch)?;
696
697        tracing::info!(
698            "patch subresource uri: {}, raw: {}",
699            uri,
700            String::from_utf8_lossy(&bytes).to_string()
701        );
702
703        let request = Request::patch(uri)
704            .header(ACCEPT, "application/json")
705            .header(CONTENT_TYPE, merge_type.content_type())
706            .body(bytes)?;
707
708        self.handle_request_bytes(request).await
709    }
710
711    /// stream items since resource versions
712    fn watch_stream_since<S, N>(
713        &self,
714        namespace: N,
715        resource_version: Option<String>,
716    ) -> BoxStream<'_, TokenStreamResult<S>>
717    where
718        S: Spec + 'static,
719        S::Status: 'static,
720        S::Header: 'static,
721        N: Into<NameSpace>,
722    {
723        let opt = ListOptions {
724            watch: Some(true),
725            resource_version,
726            timeout_seconds: Some(3600),
727            ..Default::default()
728        };
729        let uri = items_uri::<S>(self.hostname(), namespace.into(), Some(opt));
730        self.stream(uri).boxed()
731    }
732}