k8_client/client/
client_impl.rs

1use std::fmt::Debug;
2use std::fmt::Display;
3use std::sync::Arc;
4
5use anyhow::Result;
6use async_trait::async_trait;
7use bytes::Buf;
8use futures_util::future::FutureExt;
9use futures_util::stream::empty;
10use futures_util::stream::BoxStream;
11use futures_util::stream::Stream;
12use futures_util::stream::StreamExt;
13use futures_util::stream::TryStreamExt;
14use http::StatusCode;
15use http::header::InvalidHeaderValue;
16use hyper::body::aggregate;
17use hyper::body::Bytes;
18use hyper::header::HeaderValue;
19use hyper::header::ACCEPT;
20use hyper::header::AUTHORIZATION;
21use hyper::header::CONTENT_TYPE;
22use hyper::Body;
23use hyper::Request;
24use hyper::Uri;
25use serde::{Serialize, Deserialize};
26use serde::de::DeserializeOwned;
27use serde_json::Value;
28use tracing::debug;
29use tracing::error;
30use tracing::trace;
31
32use k8_types::{UpdatedK8Obj, MetaStatus};
33use k8_config::K8Config;
34use k8_types::{InputK8Obj, K8List, K8Meta, K8Obj, DeleteStatus, K8Watch, Spec, UpdateK8ObjStatus};
35use k8_types::options::{ListOptions, DeleteOptions};
36
37use crate::uri::{item_uri, items_uri};
38use crate::meta_client::{ListArg, MetadataClient, NameSpace, PatchMergeType, TokenStreamResult};
39
40use super::wstream::WatchStream;
41use super::{HyperClient, HyperConfigBuilder, ListStream, LogStream};
42
43/// K8 Cluster accessible thru API
44#[derive(Debug)]
45pub struct K8Client {
46    client: HyperClient,
47    host: String,
48    token: Option<String>,
49}
50
51#[derive(Deserialize, Serialize, Debug, Eq, PartialEq, Default, Clone)]
52#[serde(rename_all = "camelCase", default)]
53pub struct VersionInfo {
54    pub major: String,
55    pub minor: String,
56    pub git_version: String,
57    pub git_commit: String,
58    pub git_treestate: String,
59    pub build_date: String,
60    pub go_version: String,
61    pub compiler: String,
62    pub platform: String,
63}
64
65impl K8Client {
66    // load using default k8 config
67    pub fn try_default() -> Result<Self> {
68        let config = K8Config::load()?;
69        Self::new(config)
70    }
71
72    pub fn new(config: K8Config) -> Result<Self> {
73        let helper = HyperConfigBuilder::new(config)?;
74        let host = helper.host();
75        let token = helper.token()?;
76        let client = helper.build()?;
77        debug!("using k8 token: {:#?}", token);
78        Ok(Self {
79            client,
80            host,
81            token,
82        })
83    }
84
85    pub async fn server_version(&self) -> Result<VersionInfo> {
86        let uri = format!("{}/version", self.host);
87        let info = self
88            .handle_request(Request::get(uri).body(Body::empty())?)
89            .await?;
90        trace!("version info retrieved: {:#?}", info);
91        Ok(info)
92    }
93
94    fn hostname(&self) -> &str {
95        &self.host
96    }
97
98    fn finish_request<B>(&self, request: &mut Request<B>) -> Result<(), InvalidHeaderValue>
99    where
100        B: Into<Body>,
101    {
102        if let Some(ref token) = self.token {
103            let full_token = format!("Bearer {}", token);
104            request
105                .headers_mut()
106                .insert(AUTHORIZATION, HeaderValue::from_str(&full_token)?);
107        }
108        Ok(())
109    }
110
111    /// handle request. this is async function
112    async fn handle_request<T>(&self, mut request: Request<Body>) -> Result<T>
113    where
114        T: DeserializeOwned,
115    {
116        use std::io::Read;
117
118        self.finish_request(&mut request)?;
119
120        trace!("request url: {}", request.uri());
121        trace!("request body: {:?}", request.body());
122
123        let resp = self.client.request(request).await?;
124
125        let status = resp.status();
126
127        if status.is_success() {
128            let mut reader = (aggregate(resp).await?).reader();
129            let mut buffer = Vec::new();
130            reader.read_to_end(&mut buffer)?;
131            trace!(%status, "success response: {}", String::from_utf8_lossy(&buffer));
132            serde_json::from_slice(&buffer).map_err(|err| {
133                error!("json error: {}", err);
134                error!("source: {}", String::from_utf8_lossy(&buffer));
135                err.into()
136            })
137        } else {
138            trace!(%status, "error response received");
139            let mut reader = (aggregate(resp).await?).reader();
140            let mut buffer = Vec::new();
141            reader.read_to_end(&mut buffer).map_err(|err| {
142                error!("unable to read error response: {}", err);
143                err
144            })?;
145            trace!("error response: {}", String::from_utf8_lossy(&buffer));
146            let api_status: MetaStatus = serde_json::from_slice(&buffer).map_err(|err| {
147                error!("json error: {}", err);
148                err
149            })?;
150            Err(api_status.into())
151        }
152    }
153
154    /// return stream of chunks, chunk is a bytes that are stream thru http channel
155    #[allow(clippy::useless_conversion)]
156    fn stream_of_chunks(&self, uri: Uri) -> impl Stream<Item = Bytes> {
157        debug!("streaming: {}", uri);
158
159        let request = http::Request::get(uri)
160            .body(Body::empty())
161            .and_then(|mut req| {
162                self.finish_request(&mut req)?;
163                Ok(req)
164            });
165
166        let http_client = self.client.clone();
167
168        let ft = async move {
169            let request = match request {
170                Ok(req) => req,
171                Err(err) => {
172                    error!("error building request: {}", err);
173                    return empty().right_stream();
174                }
175            };
176
177            match http_client.request(request).await {
178                Ok(response) => {
179                    trace!("res status: {}", response.status());
180                    trace!("res header: {:#?}", response.headers());
181                    WatchStream::new(response.into_body().map_err(|err| err.into())).left_stream()
182                }
183                Err(err) => {
184                    error!("error getting streaming: {}", err);
185                    empty().right_stream()
186                }
187            }
188        };
189
190        ft.flatten_stream()
191    }
192
193    /// return get stream of uri
194    fn stream<S>(&self, uri: Uri) -> impl Stream<Item = TokenStreamResult<S>> + '_
195    where
196        K8Watch<S>: DeserializeOwned,
197        S: Spec + 'static,
198        S::Status: 'static,
199        S::Header: 'static,
200    {
201        self.stream_of_chunks(uri).map(move |chunk| {
202            trace!(
203                "decoding raw stream : {}",
204                String::from_utf8_lossy(&chunk).to_string()
205            );
206
207            let result: Result<K8Watch<S>, serde_json::Error> = serde_json::from_slice(&chunk)
208                .map_err(|err| {
209                    error!("parsing error, chunk_len: {}, error: {}", chunk.len(), err);
210                    error!(
211                        "error raw stream {}",
212                        String::from_utf8_lossy(&chunk).to_string()
213                    );
214                    err
215                });
216            Ok(vec![match result {
217                Ok(obj) => {
218                    trace!("de serialized: {:#?}", obj);
219                    Ok(obj)
220                }
221                Err(err) => Err(err.into()),
222            }])
223        })
224    }
225
226    pub async fn retrieve_items_inner<S, N>(
227        &self,
228        namespace: N,
229        options: Option<ListOptions>,
230    ) -> Result<K8List<S>>
231    where
232        S: Spec,
233        N: Into<NameSpace> + Send + Sync,
234    {
235        let uri = items_uri::<S>(self.hostname(), namespace.into(), options);
236        debug!("{}: retrieving items: {}", S::label(), uri);
237        let items = self
238            .handle_request(Request::get(uri).body(Body::empty())?)
239            .await?;
240        trace!("items retrieved: {:#?}", items);
241        Ok(items)
242    }
243
244    /// replace existing object.
245    /// object must exist
246    pub async fn replace_item<S>(&self, value: UpdatedK8Obj<S>) -> Result<K8Obj<S>>
247    where
248        S: Spec,
249    {
250        let metadata = &value.metadata;
251        debug!( name = %metadata.name,"replace item");
252        trace!("replace {:#?}", value);
253        let uri = item_uri::<S>(
254            self.hostname(),
255            metadata.name(),
256            metadata.namespace(),
257            None,
258            None,
259        )?;
260
261        let bytes = serde_json::to_vec(&value)?;
262
263        trace!(
264            "replace uri: {}, raw: {}",
265            uri,
266            String::from_utf8_lossy(&bytes).to_string()
267        );
268
269        let request = Request::put(uri)
270            .header(CONTENT_TYPE, "application/json")
271            .body(bytes.into())?;
272
273        self.handle_request(request).await
274    }
275
276    pub async fn retrieve_log(
277        &self,
278        namespace: &str,
279        pod_name: &str,
280        container_name: &str,
281    ) -> Result<LogStream> {
282        let sub_resource = format!("/log?container={}&follow={}", container_name, false);
283        let uri = item_uri::<k8_types::core::pod::PodSpec>(
284            self.hostname(),
285            pod_name,
286            namespace,
287            Some(&sub_resource),
288            None,
289        )?;
290        let stream = self.stream_of_chunks(uri);
291        Ok(LogStream(Box::pin(stream)))
292    }
293}
294
295#[async_trait]
296impl MetadataClient for K8Client {
297    /// retrieval a single item
298    async fn retrieve_item<S, M>(&self, metadata: &M) -> Result<Option<K8Obj<S>>>
299    where
300        S: Spec,
301        M: K8Meta + Send + Sync,
302    {
303        let uri = item_uri::<S>(
304            self.hostname(),
305            metadata.name(),
306            metadata.namespace(),
307            None,
308            None,
309        )?;
310        debug!("{}: retrieving item: {}", S::label(), uri);
311
312        let result: Result<K8Obj<S>> = self
313            .handle_request(Request::get(uri).body(Body::empty())?)
314            .await;
315
316        match result {
317            Ok(item) => Ok(Some(item)),
318            Err(err) => {
319                if let Some(MetaStatus {
320                    code: Some(code), ..
321                }) = err.downcast_ref()
322                {
323                    if *code == StatusCode::NOT_FOUND.as_u16() {
324                        Ok(None)
325                    } else {
326                        Err(err)
327                    }
328                } else {
329                    Err(err)
330                }
331            }
332        }
333    }
334
335    async fn retrieve_items_with_option<S, N>(
336        &self,
337        namespace: N,
338        option: Option<ListArg>,
339    ) -> Result<K8List<S>>
340    where
341        S: Spec,
342        N: Into<NameSpace> + Send + Sync,
343    {
344        let list_option = option.map(|opt| ListOptions {
345            field_selector: opt.field_selector,
346            label_selector: opt.label_selector,
347            ..Default::default()
348        });
349        self.retrieve_items_inner(namespace, list_option).await
350    }
351
352    fn retrieve_items_in_chunks<'a, S, N>(
353        self: Arc<Self>,
354        namespace: N,
355        limit: u32,
356        option: Option<ListArg>,
357    ) -> BoxStream<'a, K8List<S>>
358    where
359        S: Spec + 'static,
360        N: Into<NameSpace> + Send + Sync + 'static,
361    {
362        ListStream::new(namespace.into(), limit, option, self).boxed()
363    }
364
365    async fn delete_item_with_option<S, M>(
366        &self,
367        metadata: &M,
368        option: Option<DeleteOptions>,
369    ) -> Result<DeleteStatus<S>>
370    where
371        S: Spec,
372        M: K8Meta + Send + Sync,
373    {
374        use k8_types::MetaStatus;
375
376        let uri = item_uri::<S>(
377            self.hostname(),
378            metadata.name(),
379            metadata.namespace(),
380            None,
381            None,
382        )?;
383        debug!("{}: delete item on url: {}", S::label(), uri);
384
385        let body = if let Some(option_value) = option {
386            let bytes = serde_json::to_vec(&option_value)?;
387            trace!("delete raw : {}", String::from_utf8_lossy(&bytes));
388
389            bytes.into()
390        } else {
391            Body::empty()
392        };
393        let request = Request::delete(uri)
394            .header(ACCEPT, "application/json")
395            .body(body)?;
396        let values: serde_json::Map<String, serde_json::Value> =
397            self.handle_request(request).await?;
398        if let Some(kind) = values.get("kind") {
399            if kind == "Status" {
400                let status: MetaStatus =
401                    serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
402                Ok(DeleteStatus::Deleted(status))
403            } else {
404                let status: K8Obj<S> =
405                    serde::Deserialize::deserialize(serde_json::Value::Object(values))?;
406                Ok(DeleteStatus::ForegroundDelete(status))
407            }
408        } else {
409            Err(anyhow::anyhow!("missing kind: {:#?}", values))
410        }
411    }
412
413    /// create new object
414    async fn create_item<S>(&self, value: InputK8Obj<S>) -> Result<K8Obj<S>>
415    where
416        S: Spec,
417    {
418        let namespace: NameSpace = value.metadata.namespace.clone().into();
419        let uri = items_uri::<S>(self.hostname(), namespace, None);
420        debug!("creating '{}'", uri);
421        trace!("creating RUST {:#?}", &value);
422
423        let bytes = serde_json::to_vec(&value)?;
424
425        trace!(
426            "create {} raw: {}",
427            S::label(),
428            String::from_utf8_lossy(&bytes).to_string()
429        );
430
431        let request = Request::post(uri)
432            .header(CONTENT_TYPE, "application/json")
433            .body(bytes.into())?;
434
435        self.handle_request(request).await
436    }
437
438    /// update status
439    async fn update_status<S>(&self, value: &UpdateK8ObjStatus<S>) -> Result<K8Obj<S>>
440    where
441        S: Spec,
442    {
443        let uri = item_uri::<S>(
444            self.hostname(),
445            &value.metadata.name,
446            &value.metadata.namespace,
447            Some("/status"),
448            None,
449        )?;
450        debug!("updating '{}' status - uri: {}", value.metadata.name, uri);
451        trace!("update status: {:#?}", &value);
452
453        let bytes = serde_json::to_vec(&value)?;
454        trace!(
455            "update raw: {}",
456            String::from_utf8_lossy(&bytes).to_string()
457        );
458
459        let request = Request::put(uri)
460            .header(CONTENT_TYPE, "application/json")
461            .body(bytes.into())?;
462
463        self.handle_request(request).await
464    }
465
466    /// patch existing with spec
467    async fn patch<S, M>(
468        &self,
469        metadata: &M,
470        patch: &Value,
471        merge_type: PatchMergeType,
472    ) -> Result<K8Obj<S>>
473    where
474        S: Spec,
475        M: K8Meta + Display + Send + Sync,
476    {
477        debug!(%metadata, "patching");
478        trace!("patch json value: {:#?}", patch);
479        let uri = item_uri::<S>(
480            self.hostname(),
481            metadata.name(),
482            metadata.namespace(),
483            None,
484            None,
485        )?;
486
487        let bytes = serde_json::to_vec(&patch)?;
488
489        trace!(
490            "patch uri: {}, raw: {}",
491            uri,
492            String::from_utf8_lossy(&bytes).to_string()
493        );
494
495        let request = Request::patch(uri)
496            .header(ACCEPT, "application/json")
497            .header(CONTENT_TYPE, merge_type.content_type())
498            .body(bytes.into())?;
499
500        self.handle_request(request).await
501    }
502
503    /// patch status
504    async fn patch_status<S, M>(
505        &self,
506        metadata: &M,
507        patch: &Value,
508        merge_type: PatchMergeType,
509    ) -> Result<K8Obj<S>>
510    where
511        S: Spec,
512        M: K8Meta + Display + Send + Sync,
513    {
514        self.patch_subresource(metadata, String::from("/status"), patch, merge_type)
515            .await
516    }
517
518    async fn patch_subresource<S, M>(
519        &self,
520        metadata: &M,
521        subresource: String,
522        patch: &Value,
523        merge_type: PatchMergeType,
524    ) -> Result<K8Obj<S>>
525    where
526        S: Spec,
527        M: K8Meta + Display + Send + Sync,
528    {
529        tracing::info!(%metadata, "patching subresource");
530        tracing::info!("patch json value: {:#?}", patch);
531        let params = match &merge_type {
532            PatchMergeType::Apply(params) => {
533                let params = serde_qs::to_string(&params)?;
534                Some(params)
535            }
536            _ => None,
537        };
538        let uri = item_uri::<S>(
539            self.hostname(),
540            metadata.name(),
541            metadata.namespace(),
542            Some(&subresource),
543            params.as_deref(),
544        )?;
545
546        let bytes = serde_json::to_vec(&patch)?;
547
548        tracing::info!(
549            "patch subresource uri: {}, raw: {}",
550            uri,
551            String::from_utf8_lossy(&bytes).to_string()
552        );
553
554        let request = Request::patch(uri)
555            .header(ACCEPT, "application/json")
556            .header(CONTENT_TYPE, merge_type.content_type())
557            .body(bytes.into())?;
558
559        self.handle_request(request).await
560    }
561
562    /// stream items since resource versions
563    fn watch_stream_since<S, N>(
564        &self,
565        namespace: N,
566        resource_version: Option<String>,
567    ) -> BoxStream<'_, TokenStreamResult<S>>
568    where
569        S: Spec + 'static,
570        S::Status: 'static,
571        S::Header: 'static,
572        N: Into<NameSpace>,
573    {
574        let opt = ListOptions {
575            watch: Some(true),
576            resource_version,
577            timeout_seconds: Some(3600),
578            ..Default::default()
579        };
580        let uri = items_uri::<S>(self.hostname(), namespace.into(), Some(opt));
581        self.stream(uri).boxed()
582    }
583}