Skip to main content

kube_client/client/
mod.rs

1//! API client for interacting with the Kubernetes API
2//!
3//! The [`Client`] uses standard kube error handling.
4//!
//! This client can be used on its own or in conjunction with the [`Api`][crate::api::Api]
6//! type for more structured interaction with the kubernetes API.
7//!
8//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
9//! retrieve the resources served by the kubernetes API.
10use either::{Either, Left, Right};
11use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")] use hyper_util::rt::TokioIo;
15use jiff::Timestamp;
16use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
17use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status};
18use serde::de::DeserializeOwned;
19use serde_json::{self, Value};
20#[cfg(feature = "ws")]
21use tokio_tungstenite::{WebSocketStream, tungstenite as ws};
22use tokio_util::{
23    codec::{FramedRead, LinesCodec, LinesCodecError},
24    io::StreamReader,
25};
26use tower::{BoxError, Service, ServiceExt as _, buffer::Buffer};
27use tower_http::ServiceExt as _;
28
29pub use self::body::Body;
30use crate::{Config, Error, Result, api::WatchEvent, config::Kubeconfig};
31
32mod auth;
33mod body;
34mod builder;
35pub use kube_core::discovery::v2::{
36    APIGroupDiscovery, APIGroupDiscoveryList, APIResourceDiscovery, APISubresourceDiscovery,
37    APIVersionDiscovery, GroupVersionKind as DiscoveryGroupVersionKind,
38};
39#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
40#[cfg(feature = "unstable-client")]
41mod client_ext;
42#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
43#[cfg(feature = "unstable-client")]
44pub use client_ext::scope;
45mod config_ext;
46pub use auth::Error as AuthError;
47pub use config_ext::ConfigExt;
48pub mod middleware;
49pub mod retry;
50
51#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))] mod tls;
52
53#[cfg(feature = "openssl-tls")]
54pub use tls::openssl_tls::Error as OpensslTlsError;
55#[cfg(feature = "rustls-tls")] pub use tls::rustls_tls::Error as RustlsTlsError;
56#[cfg(feature = "ws")] mod upgrade;
57
58#[cfg(feature = "oauth")]
59#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
60pub use auth::OAuthError;
61
62#[cfg(feature = "oidc")]
63#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
64pub use auth::oidc_errors;
65
66#[cfg(feature = "ws")] pub use upgrade::UpgradeConnectionError;
67
68#[cfg(feature = "kubelet-debug")]
69#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
70mod kubelet_debug;
71
72pub use builder::{ClientBuilder, DynBody};
73
/// Client for connecting with a Kubernetes cluster.
///
/// The easiest way to instantiate the client is either by
/// inferring the configuration from the environment using
/// [`Client::try_default`] or with an existing [`Config`]
/// using [`Client::try_from`].
#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
#[derive(Clone)]
pub struct Client {
    // The request service every call goes through:
    // - `Buffer` for cheap clone
    // - `BoxFuture` for dynamic response future type
    inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
    // Namespace handed back by `Client::default_namespace`.
    default_ns: String,
    // Optional expiration timestamp, set through `Client::with_valid_until` and read
    // through `Client::valid_until`; checking it is left to the user of the client.
    valid_until: Option<Timestamp>,
}
89
/// Represents a WebSocket connection.
/// Value returned by [`Client::connect`].
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
pub struct Connection {
    // WebSocket stream built on top of the upgraded HTTP connection.
    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
    // Stream protocol obtained while verifying the upgrade response in `Client::connect`.
    protocol: upgrade::StreamProtocol,
}
98
#[cfg(feature = "ws")]
#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
impl Connection {
    /// Report whether the negotiated stream protocol supports graceful close signaling.
    pub fn supports_stream_close(&self) -> bool {
        let Self { protocol, .. } = self;
        protocol.supports_stream_close()
    }

    /// Consume the connection and hand back the raw `WebSocketStream`.
    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
        let Self { stream, .. } = self;
        stream
    }
}
112
113/// Constructors and low-level api interfaces.
114///
115/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
116///
/// The various lower-level interfaces here are for more advanced use cases with specific requirements.
118impl Client {
119    /// Create a [`Client`] using a custom `Service` stack.
120    ///
121    /// [`ConfigExt`](crate::client::ConfigExt) provides extensions for
122    /// building a custom stack.
123    ///
124    /// To create with the default stack with a [`Config`], use
125    /// [`Client::try_from`].
126    ///
127    /// To create with the default stack with an inferred [`Config`], use
128    /// [`Client::try_default`].
129    ///
130    /// # Example
131    ///
132    /// ```rust
133    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
134    /// use kube::{client::ConfigExt, Client, Config};
135    /// use tower::{BoxError, ServiceBuilder};
136    /// use hyper_util::rt::TokioExecutor;
137    ///
138    /// let config = Config::infer().await?;
139    /// let service = ServiceBuilder::new()
140    ///     .layer(config.base_uri_layer())
141    ///     .option_layer(config.auth_layer()?)
142    ///     .map_err(BoxError::from)
143    ///     .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http());
144    /// let client = Client::new(service, config.default_namespace);
145    /// # Ok(())
146    /// # }
147    /// ```
148    pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
149    where
150        S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
151        S::Future: Send + 'static,
152        S::Error: Into<BoxError>,
153        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
154        B::Error: Into<BoxError>,
155        T: Into<String>,
156    {
157        // Transform response body to `crate::client::Body` and use type erased error to avoid type parameters.
158        let service = service
159            .map_response_body(Body::wrap_body)
160            .map_err(Into::into)
161            .boxed();
162        Self {
163            inner: Buffer::new(service, 1024),
164            default_ns: default_namespace.into(),
165            valid_until: None,
166        }
167    }
168
169    /// Sets an expiration timestamp to the client, which has to be checked by the user using [`Client::valid_until`] function.
170    pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
171        Client { valid_until, ..self }
172    }
173
174    /// Get the expiration timestamp of the client, if it has been set.
175    pub fn valid_until(&self) -> &Option<Timestamp> {
176        &self.valid_until
177    }
178
179    /// Create and initialize a [`Client`] using the inferred configuration.
180    ///
181    /// Will use [`Config::infer`] which attempts to load the local kubeconfig first,
182    /// and then if that fails, trying the in-cluster environment variables.
183    ///
184    /// Will fail if neither configuration could be loaded.
185    ///
186    /// ```rust
187    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
188    /// # use kube::Client;
189    /// let client = Client::try_default().await?;
190    /// # Ok(())
191    /// # }
192    /// ```
193    ///
194    /// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
195    /// instead.
196    pub async fn try_default() -> Result<Self> {
197        Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
198    }
199
200    /// Get the default namespace for the client
201    ///
202    /// The namespace is either configured on `context` in the kubeconfig,
203    /// falls back to `default` when running locally,
204    /// or uses the service account's namespace when deployed in-cluster.
205    pub fn default_namespace(&self) -> &str {
206        &self.default_ns
207    }
208
209    /// Perform a raw HTTP request against the API and return the raw response back.
210    /// This method can be used to get raw access to the API which may be used to, for example,
211    /// create a proxy server or application-level gateway between localhost and the API server.
212    pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
213        let mut svc = self.inner.clone();
214        let res = svc
215            .ready()
216            .await
217            .map_err(Error::Service)?
218            .call(request)
219            .await
220            .map_err(|err| {
221                // Error decorating request
222                err.downcast::<Error>()
223                    .map(|e| *e)
224                    // Error requesting
225                    .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
226                    // Error from another middleware
227                    .unwrap_or_else(Error::Service)
228            })?;
229        Ok(res)
230    }
231
232    /// Make WebSocket connection.
233    #[cfg(feature = "ws")]
234    #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
235    pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
236        use http::header::HeaderValue;
237        let (mut parts, body) = request.into_parts();
238        parts
239            .headers
240            .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
241        parts
242            .headers
243            .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
244        parts.headers.insert(
245            http::header::SEC_WEBSOCKET_VERSION,
246            HeaderValue::from_static("13"),
247        );
248        let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
249        parts.headers.insert(
250            http::header::SEC_WEBSOCKET_KEY,
251            key.parse().expect("valid header value"),
252        );
253        upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
254
255        let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
256        let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
257        match hyper::upgrade::on(res).await {
258            Ok(upgraded) => Ok(Connection {
259                stream: WebSocketStream::from_raw_socket(
260                    TokioIo::new(upgraded),
261                    ws::protocol::Role::Client,
262                    None,
263                )
264                .await,
265                protocol,
266            }),
267
268            Err(e) => Err(Error::UpgradeConnection(
269                UpgradeConnectionError::GetPendingUpgrade(e),
270            )),
271        }
272    }
273
274    /// Perform a raw HTTP request against the API and deserialize the response
275    /// as JSON to some known type.
276    pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
277    where
278        T: DeserializeOwned,
279    {
280        let text = self.request_text(request).await?;
281
282        serde_json::from_str(&text).map_err(|e| {
283            tracing::warn!("{}, {:?}", text, e);
284            Error::SerdeError(e)
285        })
286    }
287
288    /// Perform a raw HTTP request against the API and get back the response
289    /// as a string
290    pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
291        let res = self.send(request.map(Body::from)).await?;
292        let res = handle_api_errors(res).await?;
293        let body_bytes = res.into_body().collect().await?.to_bytes();
294        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
295        Ok(text)
296    }
297
298    /// Perform a raw HTTP request against the API and stream the response body.
299    ///
300    /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
301    /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
302    pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
303        let res = self.send(request.map(Body::from)).await?;
304        let res = handle_api_errors(res).await?;
305        // Map the error, since we want to convert this into an `AsyncBufReader` using
306        // `into_async_read` which specifies `std::io::Error` as the stream's error type.
307        let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
308        Ok(body.into_async_read())
309    }
310
311    /// Perform a raw HTTP request against the API and get back either an object
312    /// deserialized as JSON or a [`Status`] Object.
313    pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
314    where
315        T: DeserializeOwned,
316    {
317        let text = self.request_text(request).await?;
318        // It needs to be JSON:
319        let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
320        if v["kind"] == "Status" {
321            tracing::trace!("Status from {}", text);
322            Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
323                tracing::warn!("{}, {:?}", text, e);
324                Error::SerdeError(e)
325            })?))
326        } else {
327            Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
328                tracing::warn!("{}, {:?}", text, e);
329                Error::SerdeError(e)
330            })?))
331        }
332    }
333
334    /// Perform a raw request and get back a stream of [`WatchEvent`] objects
335    pub async fn request_events<T>(
336        &self,
337        request: Request<Vec<u8>>,
338    ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
339    where
340        T: Clone + DeserializeOwned,
341    {
342        let res = self.send(request.map(Body::from)).await?;
343        // trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
344        tracing::trace!("headers: {:?}", res.headers());
345
346        let frames = FramedRead::new(
347            StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
348                // Unexpected EOF from chunked decoder.
349                // Tends to happen when watching for 300+s. This will be ignored.
350                if e.to_string().contains("unexpected EOF during chunk") {
351                    return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
352                }
353                std::io::Error::other(e)
354            })),
355            LinesCodec::new(),
356        );
357
358        Ok(frames.filter_map(|res| async {
359            match res {
360                Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
361                    Ok(event) => Some(Ok(event)),
362                    Err(e) => {
363                        // Ignore EOF error that can happen for incomplete line from `decode_eof`.
364                        if e.is_eof() {
365                            return None;
366                        }
367
368                        // Got general error response
369                        if let Ok(status) = serde_json::from_str::<Status>(&line) {
370                            return Some(Err(Error::Api(status.boxed())));
371                        }
372                        // Parsing error
373                        Some(Err(Error::SerdeError(e)))
374                    }
375                },
376
377                Err(LinesCodecError::Io(e)) => match e.kind() {
378                    // Client timeout
379                    std::io::ErrorKind::TimedOut => {
380                        tracing::warn!("timeout in poll: {}", e); // our client timeout
381                        None
382                    }
383                    // Unexpected EOF from chunked decoder.
384                    // Tends to happen after 300+s of watching.
385                    std::io::ErrorKind::UnexpectedEof => {
386                        tracing::warn!("eof in poll: {}", e);
387                        None
388                    }
389                    _ => Some(Err(Error::ReadEvents(e))),
390                },
391
392                // Reached the maximum line length without finding a newline.
393                // This should never happen because we're using the default `usize::MAX`.
394                Err(LinesCodecError::MaxLineLengthExceeded) => {
395                    Some(Err(Error::LinesCodecMaxLineLengthExceeded))
396                }
397            }
398        }))
399    }
400}
401
402/// Low level discovery methods using `k8s_openapi` types.
403///
404/// Consider using the [`discovery`](crate::discovery) module for
405/// easier-to-use variants of this functionality.
406/// The following methods might be deprecated to avoid confusion between similarly named types within `discovery`.
407impl Client {
408    /// Returns apiserver version.
409    pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
410        self.request(
411            Request::builder()
412                .uri("/version")
413                .body(vec![])
414                .map_err(Error::HttpError)?,
415        )
416        .await
417    }
418
419    /// Lists api groups that apiserver serves.
420    pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
421        self.request(
422            Request::builder()
423                .uri("/apis")
424                .body(vec![])
425                .map_err(Error::HttpError)?,
426        )
427        .await
428    }
429
430    /// Lists resources served in given API group.
431    ///
432    /// ### Example usage:
433    /// ```rust
434    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
435    /// let apigroups = client.list_api_groups().await?;
436    /// for g in apigroups.groups {
437    ///     let ver = g
438    ///         .preferred_version
439    ///         .as_ref()
440    ///         .or_else(|| g.versions.first())
441    ///         .expect("preferred or versions exists");
442    ///     let apis = client.list_api_group_resources(&ver.group_version).await?;
443    ///     dbg!(apis);
444    /// }
445    /// # Ok(())
446    /// # }
447    /// ```
448    pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
449        let url = format!("/apis/{apiversion}");
450        self.request(
451            Request::builder()
452                .uri(url)
453                .body(vec![])
454                .map_err(Error::HttpError)?,
455        )
456        .await
457    }
458
459    /// Lists versions of `core` a.k.a. `""` legacy API group.
460    pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
461        self.request(
462            Request::builder()
463                .uri("/api")
464                .body(vec![])
465                .map_err(Error::HttpError)?,
466        )
467        .await
468    }
469
470    /// Lists resources served in particular `core` group version.
471    pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
472        let url = format!("/api/{version}");
473        self.request(
474            Request::builder()
475                .uri(url)
476                .body(vec![])
477                .map_err(Error::HttpError)?,
478        )
479        .await
480    }
481}
482
483/// Aggregated Discovery API methods
484///
485/// These methods use the Aggregated Discovery API (available since Kubernetes 1.26, stable in 1.30)
486/// to fetch all API resources in a single request, reducing the number of API calls compared to
487/// the traditional discovery methods.
488impl Client {
489    /// Returns aggregated discovery for all API groups served at /apis.
490    ///
491    /// This uses the Aggregated Discovery API to fetch all non-core API groups
492    /// and their resources in a single request.
493    ///
494    /// Requires Kubernetes 1.26+ (beta) or 1.30+ (stable).
495    ///
496    /// ### Example usage:
497    /// ```rust,no_run
498    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
499    /// let discovery = client.list_api_groups_aggregated().await?;
500    /// for group in discovery.items {
501    ///     let name = group.metadata.as_ref().and_then(|m| m.name.as_ref());
502    ///     println!("Group: {:?}", name);
503    ///     for version in group.versions {
504    ///         println!("  Version: {:?}", version.version);
505    ///         for resource in version.resources {
506    ///             println!("    Resource: {:?}", resource.resource);
507    ///         }
508    ///     }
509    /// }
510    /// # Ok(())
511    /// # }
512    /// ```
513    pub async fn list_api_groups_aggregated(&self) -> Result<APIGroupDiscoveryList> {
514        self.request(
515            Request::builder()
516                .uri("/apis")
517                .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
518                .body(vec![])
519                .map_err(Error::HttpError)?,
520        )
521        .await
522    }
523
524    /// Returns aggregated discovery for core API group served at /api.
525    ///
526    /// This uses the Aggregated Discovery API to fetch the core API group
527    /// and all its resources in a single request.
528    ///
529    /// Requires Kubernetes 1.26+ (beta) or 1.30+ (stable).
530    ///
531    /// ### Example usage:
532    /// ```rust,no_run
533    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
534    /// let discovery = client.list_core_api_versions_aggregated().await?;
535    /// for group in discovery.items {
536    ///     for version in group.versions {
537    ///         println!("Core version: {:?}", version.version);
538    ///         for resource in version.resources {
539    ///             println!("  Resource: {:?} (scope: {:?})", resource.resource, resource.scope);
540    ///         }
541    ///     }
542    /// }
543    /// # Ok(())
544    /// # }
545    /// ```
546    pub async fn list_core_api_versions_aggregated(&self) -> Result<APIGroupDiscoveryList> {
547        self.request(
548            Request::builder()
549                .uri("/api")
550                .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
551                .body(vec![])
552                .map_err(Error::HttpError)?,
553        )
554        .await
555    }
556}
557
558/// Kubernetes returned error handling
559///
560/// Either kube returned an explicit ApiError struct,
561/// or it someohow returned something we couldn't parse as one.
562///
563/// In either case, present an ApiError upstream.
564/// The latter is probably a bug if encountered.
565async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
566    let status = res.status();
567    if status.is_client_error() || status.is_server_error() {
568        // trace!("Status = {:?} for {}", status, res.url());
569        let body_bytes = res.into_body().collect().await?.to_bytes();
570        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
571        // Print better debug when things do fail
572        // trace!("Parsing error: {}", text);
573        if let Ok(status) = serde_json::from_str::<Status>(&text) {
574            tracing::debug!("Unsuccessful: {status:?}");
575            Err(Error::Api(status.boxed()))
576        } else {
577            tracing::warn!("Unsuccessful data error parse: {text}");
578            let status = Status::failure(&text, "Failed to parse error data").with_code(status.as_u16());
579            tracing::debug!("Unsuccessful: {status:?} (reconstruct)");
580            Err(Error::Api(status.boxed()))
581        }
582    } else {
583        Ok(res)
584    }
585}
586
587impl TryFrom<Config> for Client {
588    type Error = Error;
589
590    /// Builds a default [`Client`] from a [`Config`].
591    ///
592    /// See [`ClientBuilder`] or [`Client::new`] if more customization is required
593    fn try_from(config: Config) -> Result<Self> {
594        Ok(ClientBuilder::try_from(config)?.build())
595    }
596}
597
598impl TryFrom<Kubeconfig> for Client {
599    type Error = Error;
600
601    fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
602        let config = Config::try_from(kubeconfig)?;
603        Ok(ClientBuilder::try_from(config)?.build())
604    }
605}
606
#[cfg(test)]
mod tests {
    use std::pin::pin;

    use crate::{
        Api, Client,
        client::Body,
        config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
    };

    use http::{Request, Response};
    use k8s_openapi::api::core::v1::Pod;
    use tower_test::mock;

    /// The namespace passed to `Client::new` is the one reported back.
    #[tokio::test]
    async fn test_default_ns() {
        let (service, _) = mock::pair::<Request<Body>, Response<Body>>();
        let client = Client::new(service, "test-namespace");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A minimal in-memory kubeconfig converts into a client whose default
    /// namespace is the one set on the current context.
    #[tokio::test]
    async fn test_try_from_kubeconfig() {
        let user = NamedAuthInfo {
            name: "test-user".to_string(),
            auth_info: Some(AuthInfo::default()), // <-- empty but valid
        };
        let context = NamedContext {
            name: "test-context".to_string(),
            context: Some(Context {
                cluster: "test-cluster".to_string(),
                user: Some("test-user".to_string()),
                namespace: Some("test-namespace".to_string()),
                ..Default::default()
            }),
        };
        let cluster = NamedCluster {
            name: "test-cluster".to_string(),
            cluster: Some(Cluster {
                server: Some("http://localhost:8080".to_string()),
                ..Default::default()
            }),
        };
        let kubeconfig = Kubeconfig {
            current_context: Some("test-context".to_string()),
            auth_infos: vec![user],
            contexts: vec![context],
            clusters: vec![cluster],
            ..Default::default()
        };

        let client = Client::try_from(kubeconfig).expect("Failed to create client from kubeconfig");
        assert_eq!(client.default_namespace(), "test-namespace");
    }

    /// A pod `get` through `Api` issues the expected request against the mock
    /// service and decodes the pod that the mock answers with.
    #[tokio::test]
    async fn test_mock() {
        let (service, handle) = mock::pair::<Request<Body>, Response<Body>>();

        let server = tokio::spawn(async move {
            // Receive a request for pod and respond with some data
            let mut handle = pin!(handle);
            let (request, responder) = handle.next_request().await.expect("service not called");
            assert_eq!(request.method(), http::Method::GET);
            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");

            let pod_json = serde_json::json!({
                "apiVersion": "v1",
                "kind": "Pod",
                "metadata": {
                    "name": "test",
                    "annotations": { "kube-rs": "test" },
                },
                "spec": {
                    "containers": [{ "name": "test", "image": "test-image" }],
                }
            });
            let pod: Pod = serde_json::from_value(pod_json).unwrap();
            let payload = serde_json::to_vec(&pod).unwrap();
            responder.send_response(Response::builder().body(Body::from(payload)).unwrap());
        });

        let client = Client::new(service, "default");
        let pods: Api<Pod> = Api::default_namespaced(client);
        let pod = pods.get("test").await.unwrap();
        let annotations = pod.metadata.annotations.unwrap();
        assert_eq!(annotations.get("kube-rs").unwrap(), "test");
        server.await.unwrap();
    }
}