Skip to main content

kube_client/client/
builder.rs

1use bytes::Bytes;
2use http::{Request, Response, header::HeaderMap};
3use hyper::{
4    body::Incoming,
5    rt::{Read, Write},
6};
7use hyper_timeout::TimeoutConnector;
8
9use hyper_util::{
10    client::legacy::connect::{Connection, HttpConnector},
11    rt::TokioExecutor,
12};
13
14use jiff::Timestamp;
15use std::time::Duration;
16use tower::{BoxError, Layer, Service, ServiceBuilder, ServiceExt as _, util::BoxService};
17use tower_http::{ServiceExt as _, classify::ServerErrorsFailureClass, trace::TraceLayer};
18use tracing::Span;
19
20use super::body::Body;
21use crate::{Client, Config, Error, Result, client::ConfigExt};
22
23/// HTTP body of a dynamic backing type.
24///
25/// The suggested implementation type is [`crate::client::Body`].
26pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
27
28/// Builder for [`Client`] instances with customized [tower](`Service`) middleware.
29pub struct ClientBuilder<Svc> {
30    service: Svc,
31    default_ns: String,
32    valid_until: Option<Timestamp>,
33}
34
35impl<Svc> ClientBuilder<Svc> {
36    /// Construct a [`ClientBuilder`] from scratch with a fully custom [`Service`] stack.
37    ///
38    /// This method is only intended for advanced use cases, most users will want to use [`ClientBuilder::try_from`] instead,
39    /// which provides a default stack as a starting point.
40    pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
41    where
42        Svc: Service<Request<Body>>,
43    {
44        Self {
45            service,
46            default_ns: default_namespace.into(),
47            valid_until: None,
48        }
49    }
50
51    /// Add a [`Layer`] to the current [`Service`] stack.
52    pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
53        let Self {
54            service: stack,
55            default_ns,
56            valid_until,
57        } = self;
58        ClientBuilder {
59            service: layer.layer(stack),
60            default_ns,
61            valid_until,
62        }
63    }
64
65    /// Sets an expiration timestamp for the client.
66    pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
67        ClientBuilder {
68            service: self.service,
69            default_ns: self.default_ns,
70            valid_until,
71        }
72    }
73
74    /// Build a [`Client`] instance with the current [`Service`] stack.
75    pub fn build<B>(self) -> Client
76    where
77        Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
78        Svc::Future: Send + 'static,
79        Svc::Error: Into<BoxError>,
80        B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
81        B::Error: Into<BoxError>,
82    {
83        Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
84    }
85}
86
87pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
88
89impl TryFrom<Config> for ClientBuilder<GenericService> {
90    type Error = Error;
91
92    /// Builds a default [`ClientBuilder`] stack from a given configuration
93    fn try_from(config: Config) -> Result<Self> {
94        let mut connector = HttpConnector::new();
95        connector.enforce_http(false);
96
97        #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
98        {
99            if rustls::crypto::CryptoProvider::get_default().is_none() {
100                // the only error here is if it's been initialized in between: we can ignore it
101                // since our semantic is only to set the default value if it does not exist.
102                let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
103            }
104        }
105
106        match config.proxy_url.as_ref() {
107            Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
108                #[cfg(feature = "socks5")]
109                {
110                    let connector = hyper_util::client::legacy::connect::proxy::SocksV5::new(
111                        proxy_url.clone(),
112                        connector,
113                    );
114                    make_generic_builder(connector, config)
115                }
116
117                #[cfg(not(feature = "socks5"))]
118                Err(Error::ProxyProtocolDisabled {
119                    proxy_url: proxy_url.clone(),
120                    protocol_feature: "kube/socks5",
121                })
122            }
123
124            Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
125                #[cfg(feature = "http-proxy")]
126                {
127                    let mut connector =
128                        hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_url.clone(), connector);
129
130                    if let Some(authority) = proxy_url.authority() {
131                        if let Some((userinfo, _)) = authority.as_str().split_once('@') {
132                            use base64::Engine;
133                            use http::HeaderValue;
134
135                            let value = format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(userinfo));
136                            let header = HeaderValue::from_str(&value).unwrap();
137                            connector = connector.with_auth(header);
138                        } 
139                    }
140
141                    make_generic_builder(connector, config)
142                }
143
144                #[cfg(not(feature = "http-proxy"))]
145                Err(Error::ProxyProtocolDisabled {
146                    proxy_url: proxy_url.clone(),
147                    protocol_feature: "kube/http-proxy",
148                })
149            }
150
151            Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
152                proxy_url: proxy_url.clone(),
153            }),
154
155            None => make_generic_builder(connector, config),
156        }
157    }
158}
159
160/// Helper function for implementation of [`TryFrom<Config>`] for [`ClientBuilder`].
161/// Ignores [`Config::proxy_url`], which at this point is already handled.
162fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
163where
164    H: 'static + Clone + Send + Sync + Service<http::Uri>,
165    H::Response: 'static + Connection + Read + Write + Send + Unpin,
166    H::Future: 'static + Send,
167    H::Error: 'static + Send + Sync + std::error::Error,
168{
169    let default_ns = config.default_namespace.clone();
170    let auth_layer = config.auth_layer()?;
171
172    let client: hyper_util::client::legacy::Client<_, Body> = {
173        // Current TLS feature precedence when more than one are set:
174        // 1. rustls-tls
175        // 2. openssl-tls
176        // Create a custom client to use something else.
177        // If TLS features are not enabled, http connector will be used.
178        #[cfg(feature = "rustls-tls")]
179        let connector = config.rustls_https_connector_with_connector(connector)?;
180        #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
181        let connector = config.openssl_https_connector_with_connector(connector)?;
182        #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
183        if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
184            // no tls stack situation only works with http scheme
185            return Err(Error::TlsRequired);
186        }
187
188        let mut connector = TimeoutConnector::new(connector);
189
190        // Set the timeouts for the client
191        connector.set_connect_timeout(config.connect_timeout);
192        connector.set_read_timeout(config.read_timeout);
193        connector.set_write_timeout(config.write_timeout);
194
195        hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
196    };
197
198    let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
199    #[cfg(feature = "gzip")]
200    let stack = ServiceBuilder::new()
201        .layer(stack)
202        .layer(
203            tower_http::decompression::DecompressionLayer::new()
204                .no_br()
205                .no_deflate()
206                .no_zstd()
207                .gzip(!config.disable_compression),
208        )
209        .into_inner();
210
211    let service = ServiceBuilder::new()
212        .layer(stack)
213        .option_layer(auth_layer)
214        .layer(config.extra_headers_layer()?)
215        .layer(
216            // Attribute names follow [Semantic Conventions].
217            // [Semantic Conventions]: https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/http.md
218            TraceLayer::new_for_http()
219                .make_span_with(|req: &Request<Body>| {
220                    tracing::debug_span!(
221                        "HTTP",
222                         http.method = %req.method(),
223                         http.url = %req.uri(),
224                         http.status_code = tracing::field::Empty,
225                         otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
226                         otel.kind = "client",
227                         otel.status_code = tracing::field::Empty,
228                    )
229                })
230                .on_request(|_req: &Request<Body>, _span: &Span| {
231                    tracing::debug!("requesting");
232                })
233                .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
234                    let status = res.status();
235                    span.record("http.status_code", status.as_u16());
236                    if status.is_client_error() || status.is_server_error() {
237                        span.record("otel.status_code", "ERROR");
238                    }
239                })
240                // Explicitly disable `on_body_chunk`. The default does nothing.
241                .on_body_chunk(())
242                .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
243                    tracing::debug!("stream closed");
244                })
245                .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
246                    // Called when
247                    // - Calling the inner service errored
248                    // - Polling `Body` errored
249                    // - the response was classified as failure (5xx)
250                    // - End of stream was classified as failure
251                    span.record("otel.status_code", "ERROR");
252                    match ec {
253                        ServerErrorsFailureClass::StatusCode(status) => {
254                            span.record("http.status_code", status.as_u16());
255                            tracing::error!("failed with status {}", status)
256                        }
257                        ServerErrorsFailureClass::Error(err) => {
258                            tracing::error!("failed with error {}", err)
259                        }
260                    }
261                }),
262        )
263        .map_err(BoxError::from)
264        .service(client);
265
266    let (_, expiration) = config.exec_identity_pem();
267
268    let client = ClientBuilder::new(
269        service
270            .map_response_body(|body| {
271                Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
272            })
273            .boxed(),
274        default_ns,
275    )
276    .with_valid_until(expiration);
277
278    Ok(client)
279}
280
281#[cfg(test)]
282mod tests {
283    #[cfg(feature = "gzip")] use super::*;
284
285    #[cfg(feature = "gzip")]
286    #[tokio::test]
287    async fn test_no_accept_encoding_header_sent_when_compression_disabled()
288    -> Result<(), Box<dyn std::error::Error>> {
289        use http::Uri;
290        use std::net::SocketAddr;
291        use tokio::net::{TcpListener, TcpStream};
292
293        // setup a server that echoes back any encoding header value
294        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
295        let listener = TcpListener::bind(addr).await?;
296        let local_addr = listener.local_addr()?;
297        let uri: Uri = format!("http://{}", local_addr).parse()?;
298
299        tokio::spawn(async move {
300            use http_body_util::Full;
301            use hyper::{server::conn::http1, service::service_fn};
302            use hyper_util::rt::{TokioIo, TokioTimer};
303            use std::convert::Infallible;
304
305            loop {
306                let (tcp, _) = listener.accept().await.unwrap();
307                let io: TokioIo<TcpStream> = TokioIo::new(tcp);
308
309                tokio::spawn(async move {
310                    http1::Builder::new()
311                        .timer(TokioTimer::new())
312                        .serve_connection(
313                            io,
314                            service_fn(|req| async move {
315                                let response = req
316                                    .headers()
317                                    .get(http::header::ACCEPT_ENCODING)
318                                    .map(|b| Bytes::copy_from_slice(b.as_bytes()))
319                                    .unwrap_or_default();
320                                Ok::<_, Infallible>(Response::new(Full::new(response)))
321                            }),
322                        )
323                        .await
324                        .unwrap();
325                });
326            }
327        });
328
329        // confirm gzip echoed back with default config
330        let config = Config { ..Config::new(uri) };
331        let client = make_generic_builder(HttpConnector::new(), config.clone())?.build();
332        let response = client.request_text(http::Request::default()).await?;
333        assert_eq!(&response, "gzip");
334
335        // now disable and check empty string echoed back
336        let config = Config {
337            disable_compression: true,
338            ..config
339        };
340        let client = make_generic_builder(HttpConnector::new(), config)?.build();
341        let response = client.request_text(http::Request::default()).await?;
342        assert_eq!(&response, "");
343
344        Ok(())
345    }
346}