Skip to main content

kube_client/client/
mod.rs

1//! API client for interacting with the Kubernetes API
2//!
3//! The [`Client`] uses standard kube error handling.
4//!
5//! This client can be used on its own or in conjunction with the [`Api`][crate::api::Api]
6//! type for more structured interaction with the kubernetes API.
7//!
8//! The [`Client`] can also be used with [`Discovery`](crate::Discovery) to dynamically
9//! retrieve the resources served by the kubernetes API.
10use either::{Either, Left, Right};
11use futures::{AsyncBufRead, StreamExt, TryStream, TryStreamExt, future::BoxFuture};
12use http::{self, Request, Response};
13use http_body_util::BodyExt;
14#[cfg(feature = "ws")]
15use hyper_util::rt::TokioIo;
16use jiff::Timestamp;
17use k8s_openapi::apimachinery::pkg::apis::meta::v1 as k8s_meta_v1;
18use kube_core::{discovery::v2::ACCEPT_AGGREGATED_DISCOVERY_V2, response::Status};
19use serde::de::DeserializeOwned;
20use serde_json::{self, Value};
21#[cfg(feature = "ws")]
22use tokio_tungstenite::{WebSocketStream, tungstenite as ws};
23use tokio_util::{
24    codec::{FramedRead, LinesCodec, LinesCodecError},
25    io::StreamReader,
26};
27use tower::{BoxError, Service, ServiceExt as _, buffer::Buffer};
28use tower_http::ServiceExt as _;
29
30pub use self::body::Body;
31use crate::{Config, Error, Result, api::WatchEvent, config::Kubeconfig};
32
33mod auth;
34mod body;
35mod builder;
36pub use kube_core::discovery::v2::{
37    APIGroupDiscovery, APIGroupDiscoveryList, APIResourceDiscovery, APISubresourceDiscovery,
38    APIVersionDiscovery, GroupVersionKind as DiscoveryGroupVersionKind,
39};
40#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
41#[cfg(feature = "unstable-client")]
42mod client_ext;
43#[cfg_attr(docsrs, doc(cfg(feature = "unstable-client")))]
44#[cfg(feature = "unstable-client")]
45pub use client_ext::scope;
46mod config_ext;
47pub use auth::Error as AuthError;
48pub use config_ext::ConfigExt;
49pub mod middleware;
50pub mod retry;
51
52#[cfg(any(feature = "rustls-tls", feature = "openssl-tls"))]
53mod tls;
54
55#[cfg(feature = "openssl-tls")]
56pub use tls::openssl_tls::Error as OpensslTlsError;
57#[cfg(feature = "rustls-tls")]
58pub use tls::rustls_tls::Error as RustlsTlsError;
59#[cfg(feature = "ws")]
60mod upgrade;
61
62#[cfg(feature = "oauth")]
63#[cfg_attr(docsrs, doc(cfg(feature = "oauth")))]
64pub use auth::OAuthError;
65
66#[cfg(feature = "oidc")]
67#[cfg_attr(docsrs, doc(cfg(feature = "oidc")))]
68pub use auth::oidc_errors;
69
70#[cfg(feature = "ws")]
71pub use upgrade::UpgradeConnectionError;
72
73#[cfg(feature = "kubelet-debug")]
74#[cfg_attr(docsrs, doc(cfg(feature = "kubelet-debug")))]
75mod kubelet_debug;
76
77pub use builder::{ClientBuilder, DynBody};
78
79/// Client for connecting with a Kubernetes cluster.
80///
81/// The easiest way to instantiate the client is either by
82/// inferring the configuration from the environment using
83/// [`Client::try_default`] or with an existing [`Config`]
84/// using [`Client::try_from`].
85#[cfg_attr(docsrs, doc(cfg(feature = "client")))]
86#[derive(Clone)]
87pub struct Client {
88    // - `Buffer` for cheap clone
89    // - `BoxFuture` for dynamic response future type
90    inner: Buffer<Request<Body>, BoxFuture<'static, Result<Response<Body>, BoxError>>>,
91    default_ns: String,
92    valid_until: Option<Timestamp>,
93}
94
95/// Represents a WebSocket connection.
96/// Value returned by [`Client::connect`].
97#[cfg(feature = "ws")]
98#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
99pub struct Connection {
100    stream: WebSocketStream<TokioIo<hyper::upgrade::Upgraded>>,
101    protocol: upgrade::StreamProtocol,
102}
103
104#[cfg(feature = "ws")]
105#[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
106impl Connection {
107    /// Return true if the stream supports graceful close signaling.
108    pub fn supports_stream_close(&self) -> bool {
109        self.protocol.supports_stream_close()
110    }
111
112    /// Transform into the raw WebSocketStream.
113    pub fn into_stream(self) -> WebSocketStream<TokioIo<hyper::upgrade::Upgraded>> {
114        self.stream
115    }
116}
117
118/// Constructors and low-level api interfaces.
119///
120/// Most users only need [`Client::try_default`] or [`Client::new`] from this block.
121///
122/// The many various lower level interfaces here are for more advanced use-cases with specific requirements.
123impl Client {
124    /// Create a [`Client`] using a custom `Service` stack.
125    ///
126    /// [`ConfigExt`](crate::client::ConfigExt) provides extensions for
127    /// building a custom stack.
128    ///
129    /// To create with the default stack with a [`Config`], use
130    /// [`Client::try_from`].
131    ///
132    /// To create with the default stack with an inferred [`Config`], use
133    /// [`Client::try_default`].
134    ///
135    /// # Example
136    ///
137    /// ```rust
138    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
139    /// use kube::{client::ConfigExt, Client, Config};
140    /// use tower::{BoxError, ServiceBuilder};
141    /// use hyper_util::rt::TokioExecutor;
142    ///
143    /// let config = Config::infer().await?;
144    /// let service = ServiceBuilder::new()
145    ///     .layer(config.base_uri_layer())
146    ///     .option_layer(config.auth_layer()?)
147    ///     .map_err(BoxError::from)
148    ///     .service(hyper_util::client::legacy::Client::builder(TokioExecutor::new()).build_http());
149    /// let client = Client::new(service, config.default_namespace);
150    /// # Ok(())
151    /// # }
152    /// ```
153    pub fn new<S, B, T>(service: S, default_namespace: T) -> Self
154    where
155        S: Service<Request<Body>, Response = Response<B>> + Send + 'static,
156        S::Future: Send + 'static,
157        S::Error: Into<BoxError>,
158        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
159        B::Error: Into<BoxError>,
160        T: Into<String>,
161    {
162        // Transform response body to `crate::client::Body` and use type erased error to avoid type parameters.
163        let service = service
164            .map_response_body(Body::wrap_body)
165            .map_err(Into::into)
166            .boxed();
167        Self {
168            inner: Buffer::new(service, 1024),
169            default_ns: default_namespace.into(),
170            valid_until: None,
171        }
172    }
173
174    /// Sets an expiration timestamp to the client, which has to be checked by the user using [`Client::valid_until`] function.
175    pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
176        Client { valid_until, ..self }
177    }
178
179    /// Get the expiration timestamp of the client, if it has been set.
180    pub fn valid_until(&self) -> &Option<Timestamp> {
181        &self.valid_until
182    }
183
184    /// Create and initialize a [`Client`] using the inferred configuration.
185    ///
186    /// Will use [`Config::infer`] which attempts to load the local kubeconfig first,
187    /// and then if that fails, trying the in-cluster environment variables.
188    ///
189    /// Will fail if neither configuration could be loaded.
190    ///
191    /// ```rust
192    /// # async fn doc() -> Result<(), Box<dyn std::error::Error>> {
193    /// # use kube::Client;
194    /// let client = Client::try_default().await?;
195    /// # Ok(())
196    /// # }
197    /// ```
198    ///
199    /// If you already have a [`Config`] then use [`Client::try_from`](Self::try_from)
200    /// instead.
201    pub async fn try_default() -> Result<Self> {
202        Self::try_from(Config::infer().await.map_err(Error::InferConfig)?)
203    }
204
205    /// Get the default namespace for the client
206    ///
207    /// The namespace is either configured on `context` in the kubeconfig,
208    /// falls back to `default` when running locally,
209    /// or uses the service account's namespace when deployed in-cluster.
210    pub fn default_namespace(&self) -> &str {
211        &self.default_ns
212    }
213
214    /// Perform a raw HTTP request against the API and return the raw response back.
215    /// This method can be used to get raw access to the API which may be used to, for example,
216    /// create a proxy server or application-level gateway between localhost and the API server.
217    pub async fn send(&self, request: Request<Body>) -> Result<Response<Body>> {
218        let mut svc = self.inner.clone();
219        let res = svc
220            .ready()
221            .await
222            .map_err(Error::Service)?
223            .call(request)
224            .await
225            .map_err(|err| {
226                // Error decorating request
227                err.downcast::<Error>()
228                    .map(|e| *e)
229                    // Error requesting
230                    .or_else(|err| err.downcast::<hyper::Error>().map(|err| Error::HyperError(*err)))
231                    // Error from another middleware
232                    .unwrap_or_else(Error::Service)
233            })?;
234        Ok(res)
235    }
236
237    /// Make WebSocket connection.
238    #[cfg(feature = "ws")]
239    #[cfg_attr(docsrs, doc(cfg(feature = "ws")))]
240    pub async fn connect(&self, request: Request<Vec<u8>>) -> Result<Connection> {
241        use http::header::HeaderValue;
242        let (mut parts, body) = request.into_parts();
243        parts
244            .headers
245            .insert(http::header::CONNECTION, HeaderValue::from_static("Upgrade"));
246        parts
247            .headers
248            .insert(http::header::UPGRADE, HeaderValue::from_static("websocket"));
249        parts.headers.insert(
250            http::header::SEC_WEBSOCKET_VERSION,
251            HeaderValue::from_static("13"),
252        );
253        let key = tokio_tungstenite::tungstenite::handshake::client::generate_key();
254        parts.headers.insert(
255            http::header::SEC_WEBSOCKET_KEY,
256            key.parse().expect("valid header value"),
257        );
258        upgrade::StreamProtocol::add_to_headers(&mut parts.headers)?;
259
260        let res = self.send(Request::from_parts(parts, Body::from(body))).await?;
261        let protocol = upgrade::verify_response(&res, &key).map_err(Error::UpgradeConnection)?;
262        match hyper::upgrade::on(res).await {
263            Ok(upgraded) => Ok(Connection {
264                stream: WebSocketStream::from_raw_socket(
265                    TokioIo::new(upgraded),
266                    ws::protocol::Role::Client,
267                    None,
268                )
269                .await,
270                protocol,
271            }),
272
273            Err(e) => Err(Error::UpgradeConnection(
274                UpgradeConnectionError::GetPendingUpgrade(e),
275            )),
276        }
277    }
278
279    /// Perform a raw HTTP request against the API and deserialize the response
280    /// as JSON to some known type.
281    pub async fn request<T>(&self, request: Request<Vec<u8>>) -> Result<T>
282    where
283        T: DeserializeOwned,
284    {
285        let text = self.request_text(request).await?;
286
287        serde_json::from_str(&text).map_err(|e| {
288            tracing::warn!("{}, {:?}", text, e);
289            Error::SerdeError(e)
290        })
291    }
292
293    /// Perform a raw HTTP request against the API and get back the response
294    /// as a string
295    pub async fn request_text(&self, request: Request<Vec<u8>>) -> Result<String> {
296        let res = self.send(request.map(Body::from)).await?;
297        let res = handle_api_errors(res).await?;
298        let body_bytes = res.into_body().collect().await?.to_bytes();
299        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
300        Ok(text)
301    }
302
303    /// Perform a raw HTTP request against the API and stream the response body.
304    ///
305    /// The response can be processed using [`AsyncReadExt`](futures::AsyncReadExt)
306    /// and [`AsyncBufReadExt`](futures::AsyncBufReadExt).
307    pub async fn request_stream(&self, request: Request<Vec<u8>>) -> Result<impl AsyncBufRead + use<>> {
308        let res = self.send(request.map(Body::from)).await?;
309        let res = handle_api_errors(res).await?;
310        // Map the error, since we want to convert this into an `AsyncBufReader` using
311        // `into_async_read` which specifies `std::io::Error` as the stream's error type.
312        let body = res.into_body().into_data_stream().map_err(std::io::Error::other);
313        Ok(body.into_async_read())
314    }
315
316    /// Perform a raw HTTP request against the API and get back either an object
317    /// deserialized as JSON or a [`Status`] Object.
318    pub async fn request_status<T>(&self, request: Request<Vec<u8>>) -> Result<Either<T, Status>>
319    where
320        T: DeserializeOwned,
321    {
322        let text = self.request_text(request).await?;
323        // It needs to be JSON:
324        let v: Value = serde_json::from_str(&text).map_err(Error::SerdeError)?;
325        if v["kind"] == "Status" {
326            tracing::trace!("Status from {}", text);
327            Ok(Right(serde_json::from_str::<Status>(&text).map_err(|e| {
328                tracing::warn!("{}, {:?}", text, e);
329                Error::SerdeError(e)
330            })?))
331        } else {
332            Ok(Left(serde_json::from_str::<T>(&text).map_err(|e| {
333                tracing::warn!("{}, {:?}", text, e);
334                Error::SerdeError(e)
335            })?))
336        }
337    }
338
339    /// Perform a raw request and get back a stream of [`WatchEvent`] objects
340    pub async fn request_events<T>(
341        &self,
342        request: Request<Vec<u8>>,
343    ) -> Result<impl TryStream<Item = Result<WatchEvent<T>>> + use<T>>
344    where
345        T: Clone + DeserializeOwned,
346    {
347        let res = self.send(request.map(Body::from)).await?;
348        // trace!("Streaming from {} -> {}", res.url(), res.status().as_str());
349        tracing::trace!("headers: {:?}", res.headers());
350
351        let frames = FramedRead::new(
352            StreamReader::new(res.into_body().into_data_stream().map_err(|e| {
353                // Unexpected EOF from chunked decoder.
354                // Tends to happen when watching for 300+s. This will be ignored.
355                if e.to_string().contains("unexpected EOF during chunk") {
356                    return std::io::Error::new(std::io::ErrorKind::UnexpectedEof, e);
357                }
358                std::io::Error::other(e)
359            })),
360            LinesCodec::new(),
361        );
362
363        Ok(frames.filter_map(|res| async {
364            match res {
365                Ok(line) => match serde_json::from_str::<WatchEvent<T>>(&line) {
366                    Ok(event) => Some(Ok(event)),
367                    Err(e) => {
368                        // Ignore EOF error that can happen for incomplete line from `decode_eof`.
369                        if e.is_eof() {
370                            return None;
371                        }
372
373                        // Got general error response
374                        if let Ok(status) = serde_json::from_str::<Status>(&line) {
375                            return Some(Err(Error::Api(status.boxed())));
376                        }
377                        // Parsing error
378                        Some(Err(Error::SerdeError(e)))
379                    }
380                },
381
382                Err(LinesCodecError::Io(e)) => match e.kind() {
383                    // Client timeout
384                    std::io::ErrorKind::TimedOut => {
385                        tracing::warn!("timeout in poll: {}", e); // our client timeout
386                        None
387                    }
388                    // Unexpected EOF from chunked decoder.
389                    // Tends to happen after 300+s of watching.
390                    std::io::ErrorKind::UnexpectedEof => {
391                        tracing::warn!("eof in poll: {}", e);
392                        None
393                    }
394                    _ => Some(Err(Error::ReadEvents(e))),
395                },
396
397                // Reached the maximum line length without finding a newline.
398                // This should never happen because we're using the default `usize::MAX`.
399                Err(LinesCodecError::MaxLineLengthExceeded) => {
400                    Some(Err(Error::LinesCodecMaxLineLengthExceeded))
401                }
402            }
403        }))
404    }
405}
406
407/// Low level discovery methods using `k8s_openapi` types.
408///
409/// Consider using the [`discovery`](crate::discovery) module for
410/// easier-to-use variants of this functionality.
411/// The following methods might be deprecated to avoid confusion between similarly named types within `discovery`.
412impl Client {
413    /// Returns apiserver version.
414    pub async fn apiserver_version(&self) -> Result<k8s_openapi::apimachinery::pkg::version::Info> {
415        self.request(Request::get("/version").body(vec![]).map_err(Error::HttpError)?)
416            .await
417    }
418
419    /// Lists api groups that apiserver serves.
420    pub async fn list_api_groups(&self) -> Result<k8s_meta_v1::APIGroupList> {
421        self.request(Request::get("/apis").body(vec![]).map_err(Error::HttpError)?)
422            .await
423    }
424
425    /// Lists resources served in given API group.
426    ///
427    /// ### Example usage:
428    /// ```rust
429    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
430    /// let apigroups = client.list_api_groups().await?;
431    /// for g in apigroups.groups {
432    ///     let ver = g
433    ///         .preferred_version
434    ///         .as_ref()
435    ///         .or_else(|| g.versions.first())
436    ///         .expect("preferred or versions exists");
437    ///     let apis = client.list_api_group_resources(&ver.group_version).await?;
438    ///     dbg!(apis);
439    /// }
440    /// # Ok(())
441    /// # }
442    /// ```
443    pub async fn list_api_group_resources(&self, apiversion: &str) -> Result<k8s_meta_v1::APIResourceList> {
444        let url = format!("/apis/{apiversion}");
445        self.request(Request::get(url).body(vec![]).map_err(Error::HttpError)?)
446            .await
447    }
448
449    /// Lists versions of `core` a.k.a. `""` legacy API group.
450    pub async fn list_core_api_versions(&self) -> Result<k8s_meta_v1::APIVersions> {
451        self.request(Request::get("/api").body(vec![]).map_err(Error::HttpError)?)
452            .await
453    }
454
455    /// Lists resources served in particular `core` group version.
456    pub async fn list_core_api_resources(&self, version: &str) -> Result<k8s_meta_v1::APIResourceList> {
457        let url = format!("/api/{version}");
458        self.request(Request::get(url).body(vec![]).map_err(Error::HttpError)?)
459            .await
460    }
461}
462
463/// Aggregated Discovery API methods
464///
465/// These methods use the Aggregated Discovery API (available since Kubernetes 1.26, stable in 1.30)
466/// to fetch all API resources in a single request, reducing the number of API calls compared to
467/// the traditional discovery methods.
468impl Client {
469    /// Returns aggregated discovery for all API groups served at /apis.
470    ///
471    /// This uses the Aggregated Discovery API to fetch all non-core API groups
472    /// and their resources in a single request.
473    ///
474    /// Requires Kubernetes 1.26+ (beta) or 1.30+ (stable).
475    ///
476    /// ### Example usage:
477    /// ```rust,no_run
478    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
479    /// let discovery = client.list_api_groups_aggregated().await?;
480    /// for group in discovery.items {
481    ///     let name = group.metadata.as_ref().and_then(|m| m.name.as_ref());
482    ///     println!("Group: {:?}", name);
483    ///     for version in group.versions {
484    ///         println!("  Version: {:?}", version.version);
485    ///         for resource in version.resources {
486    ///             println!("    Resource: {:?}", resource.resource);
487    ///         }
488    ///     }
489    /// }
490    /// # Ok(())
491    /// # }
492    /// ```
493    pub async fn list_api_groups_aggregated(&self) -> Result<APIGroupDiscoveryList> {
494        self.request(
495            Request::get("/apis")
496                .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
497                .body(vec![])
498                .map_err(Error::HttpError)?,
499        )
500        .await
501    }
502
503    /// Returns aggregated discovery for core API group served at /api.
504    ///
505    /// This uses the Aggregated Discovery API to fetch the core API group
506    /// and all its resources in a single request.
507    ///
508    /// Requires Kubernetes 1.26+ (beta) or 1.30+ (stable).
509    ///
510    /// ### Example usage:
511    /// ```rust,no_run
512    /// # async fn scope(client: kube::Client) -> Result<(), Box<dyn std::error::Error>> {
513    /// let discovery = client.list_core_api_versions_aggregated().await?;
514    /// for group in discovery.items {
515    ///     for version in group.versions {
516    ///         println!("Core version: {:?}", version.version);
517    ///         for resource in version.resources {
518    ///             println!("  Resource: {:?} (scope: {:?})", resource.resource, resource.scope);
519    ///         }
520    ///     }
521    /// }
522    /// # Ok(())
523    /// # }
524    /// ```
525    pub async fn list_core_api_versions_aggregated(&self) -> Result<APIGroupDiscoveryList> {
526        self.request(
527            Request::get("/api")
528                .header(http::header::ACCEPT, ACCEPT_AGGREGATED_DISCOVERY_V2)
529                .body(vec![])
530                .map_err(Error::HttpError)?,
531        )
532        .await
533    }
534}
535
536/// Kubernetes returned error handling
537///
538/// Either kube returned an explicit ApiError struct,
539/// or it someohow returned something we couldn't parse as one.
540///
541/// In either case, present an ApiError upstream.
542/// The latter is probably a bug if encountered.
543async fn handle_api_errors(res: Response<Body>) -> Result<Response<Body>> {
544    let status = res.status();
545    if status.is_client_error() || status.is_server_error() {
546        // trace!("Status = {:?} for {}", status, res.url());
547        let body_bytes = res.into_body().collect().await?.to_bytes();
548        let text = String::from_utf8(body_bytes.to_vec()).map_err(Error::FromUtf8)?;
549        // Print better debug when things do fail
550        // trace!("Parsing error: {}", text);
551        if let Ok(status) = serde_json::from_str::<Status>(&text) {
552            tracing::debug!("Unsuccessful: {status:?}");
553            Err(Error::Api(status.boxed()))
554        } else {
555            tracing::warn!("Unsuccessful data error parse: {text}");
556            let status = Status::failure(&text, "Failed to parse error data").with_code(status.as_u16());
557            tracing::debug!("Unsuccessful: {status:?} (reconstruct)");
558            Err(Error::Api(status.boxed()))
559        }
560    } else {
561        Ok(res)
562    }
563}
564
565impl TryFrom<Config> for Client {
566    type Error = Error;
567
568    /// Builds a default [`Client`] from a [`Config`].
569    ///
570    /// See [`ClientBuilder`] or [`Client::new`] if more customization is required
571    fn try_from(config: Config) -> Result<Self> {
572        Ok(ClientBuilder::try_from(config)?.build())
573    }
574}
575
576impl TryFrom<Kubeconfig> for Client {
577    type Error = Error;
578
579    fn try_from(kubeconfig: Kubeconfig) -> Result<Self> {
580        let config = Config::try_from(kubeconfig)?;
581        Ok(ClientBuilder::try_from(config)?.build())
582    }
583}
584
585#[cfg(test)]
586mod tests {
587    use std::pin::pin;
588
589    use crate::{
590        Api, Client,
591        client::Body,
592        config::{AuthInfo, Cluster, Context, Kubeconfig, NamedAuthInfo, NamedCluster, NamedContext},
593    };
594
595    use http::{Request, Response};
596    use k8s_openapi::api::core::v1::Pod;
597    use tower_test::mock;
598
599    #[tokio::test]
600    async fn test_default_ns() {
601        let (mock_service, _) = mock::pair::<Request<Body>, Response<Body>>();
602        let client = Client::new(mock_service, "test-namespace");
603        assert_eq!(client.default_namespace(), "test-namespace");
604    }
605
606    #[tokio::test]
607    async fn test_try_from_kubeconfig() {
608        let config = Kubeconfig {
609            current_context: Some("test-context".to_string()),
610            auth_infos: vec![NamedAuthInfo {
611                name: "test-user".to_string(),
612                auth_info: Some(AuthInfo::default()), // <-- empty but valid
613            }],
614            contexts: vec![NamedContext {
615                name: "test-context".to_string(),
616                context: Some(Context {
617                    cluster: "test-cluster".to_string(),
618                    user: Some("test-user".to_string()),
619                    namespace: Some("test-namespace".to_string()),
620                    ..Default::default()
621                }),
622            }],
623            clusters: vec![NamedCluster {
624                name: "test-cluster".to_string(),
625                cluster: Some(Cluster {
626                    server: Some("http://localhost:8080".to_string()),
627                    ..Default::default()
628                }),
629            }],
630            ..Default::default()
631        };
632        let client = Client::try_from(config).expect("Failed to create client from kubeconfig");
633        assert_eq!(client.default_namespace(), "test-namespace");
634    }
635
636    #[tokio::test]
637    async fn test_mock() {
638        let (mock_service, handle) = mock::pair::<Request<Body>, Response<Body>>();
639        let spawned = tokio::spawn(async move {
640            // Receive a request for pod and respond with some data
641            let mut handle = pin!(handle);
642            let (request, send) = handle.next_request().await.expect("service not called");
643            assert_eq!(request.method(), http::Method::GET);
644            assert_eq!(request.uri().to_string(), "/api/v1/namespaces/default/pods/test");
645            let pod: Pod = serde_json::from_value(serde_json::json!({
646                "apiVersion": "v1",
647                "kind": "Pod",
648                "metadata": {
649                    "name": "test",
650                    "annotations": { "kube-rs": "test" },
651                },
652                "spec": {
653                    "containers": [{ "name": "test", "image": "test-image" }],
654                }
655            }))
656            .unwrap();
657            send.send_response(Response::new(Body::from(serde_json::to_vec(&pod).unwrap())));
658        });
659
660        let pods: Api<Pod> = Api::default_namespaced(Client::new(mock_service, "default"));
661        let pod = pods.get("test").await.unwrap();
662        assert_eq!(pod.metadata.annotations.unwrap().get("kube-rs").unwrap(), "test");
663        spawned.await.unwrap();
664    }
665}