1use bytes::Bytes;
2use http::{Request, Response, header::HeaderMap};
3use hyper::{
4 body::Incoming,
5 rt::{Read, Write},
6};
7use hyper_timeout::TimeoutConnector;
8
9use hyper_util::{
10 client::legacy::connect::{Connection, HttpConnector},
11 rt::TokioExecutor,
12};
13
14use jiff::Timestamp;
15use std::time::Duration;
16use tower::{BoxError, Layer, Service, ServiceBuilder, ServiceExt as _, retry::RetryLayer, util::BoxService};
17use tower_http::{ServiceExt as _, classify::ServerErrorsFailureClass, trace::TraceLayer};
18use tracing::Span;
19
20use super::body::Body;
21use crate::{Client, Config, Error, Result, client::{ConfigExt, retry::RetryPolicy}};
22
23pub type DynBody = dyn http_body::Body<Data = Bytes, Error = BoxError> + Send + Unpin;
27
28pub struct ClientBuilder<Svc> {
30 service: Svc,
31 default_ns: String,
32 valid_until: Option<Timestamp>,
33}
34
35impl<Svc> ClientBuilder<Svc> {
36 pub fn new(service: Svc, default_namespace: impl Into<String>) -> Self
41 where
42 Svc: Service<Request<Body>>,
43 {
44 Self {
45 service,
46 default_ns: default_namespace.into(),
47 valid_until: None,
48 }
49 }
50
51 pub fn with_layer<L: Layer<Svc>>(self, layer: &L) -> ClientBuilder<L::Service> {
53 let Self {
54 service: stack,
55 default_ns,
56 valid_until,
57 } = self;
58 ClientBuilder {
59 service: layer.layer(stack),
60 default_ns,
61 valid_until,
62 }
63 }
64
65 pub fn with_valid_until(self, valid_until: Option<Timestamp>) -> Self {
67 ClientBuilder {
68 service: self.service,
69 default_ns: self.default_ns,
70 valid_until,
71 }
72 }
73
74 pub fn build<B>(self) -> Client
76 where
77 Svc: Service<Request<Body>, Response = Response<B>> + Send + 'static,
78 Svc::Future: Send + 'static,
79 Svc::Error: Into<BoxError>,
80 B: http_body::Body<Data = bytes::Bytes> + Send + 'static,
81 B::Error: Into<BoxError>,
82 {
83 Client::new(self.service, self.default_ns).with_valid_until(self.valid_until)
84 }
85}
86
87pub type GenericService = BoxService<Request<Body>, Response<Box<DynBody>>, BoxError>;
88
89impl TryFrom<Config> for ClientBuilder<GenericService> {
90 type Error = Error;
91
92 fn try_from(config: Config) -> Result<Self> {
94 let mut connector = HttpConnector::new();
95 connector.enforce_http(false);
96
97 #[cfg(all(feature = "aws-lc-rs", feature = "rustls-tls"))]
98 {
99 if rustls::crypto::CryptoProvider::get_default().is_none() {
100 let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
103 }
104 }
105
106 match config.proxy_url.as_ref() {
107 Some(proxy_url) if proxy_url.scheme_str() == Some("socks5") => {
108 #[cfg(feature = "socks5")]
109 {
110 let connector = hyper_util::client::legacy::connect::proxy::SocksV5::new(
111 proxy_url.clone(),
112 connector,
113 );
114 make_generic_builder(connector, config)
115 }
116
117 #[cfg(not(feature = "socks5"))]
118 Err(Error::ProxyProtocolDisabled {
119 proxy_url: proxy_url.clone(),
120 protocol_feature: "kube/socks5",
121 })
122 }
123
124 Some(proxy_url) if proxy_url.scheme_str() == Some("http") => {
125 #[cfg(feature = "http-proxy")]
126 {
127 let mut connector =
128 hyper_util::client::legacy::connect::proxy::Tunnel::new(proxy_url.clone(), connector);
129
130 if let Some(authority) = proxy_url.authority() {
131 if let Some((userinfo, _)) = authority.as_str().split_once('@') {
132 use base64::Engine;
133 use http::HeaderValue;
134
135 let value = format!("Basic {}", base64::engine::general_purpose::STANDARD.encode(userinfo));
136 let header = HeaderValue::from_str(&value).unwrap();
137 connector = connector.with_auth(header);
138 }
139 }
140
141 make_generic_builder(connector, config)
142 }
143
144 #[cfg(not(feature = "http-proxy"))]
145 Err(Error::ProxyProtocolDisabled {
146 proxy_url: proxy_url.clone(),
147 protocol_feature: "kube/http-proxy",
148 })
149 }
150
151 Some(proxy_url) => Err(Error::ProxyProtocolUnsupported {
152 proxy_url: proxy_url.clone(),
153 }),
154
155 None => make_generic_builder(connector, config),
156 }
157 }
158}
159
160fn make_generic_builder<H>(connector: H, config: Config) -> Result<ClientBuilder<GenericService>, Error>
163where
164 H: 'static + Clone + Send + Sync + Service<http::Uri>,
165 H::Response: 'static + Connection + Read + Write + Send + Unpin,
166 H::Future: 'static + Send,
167 H::Error: 'static + Send + Sync + std::error::Error,
168{
169 let default_ns = config.default_namespace.clone();
170 let auth_layer = config.auth_layer()?;
171
172 let client: hyper_util::client::legacy::Client<_, Body> = {
173 #[cfg(feature = "rustls-tls")]
179 let connector = config.rustls_https_connector_with_connector(connector)?;
180 #[cfg(all(not(feature = "rustls-tls"), feature = "openssl-tls"))]
181 let connector = config.openssl_https_connector_with_connector(connector)?;
182 #[cfg(all(not(feature = "rustls-tls"), not(feature = "openssl-tls")))]
183 if config.cluster_url.scheme() == Some(&http::uri::Scheme::HTTPS) {
184 return Err(Error::TlsRequired);
186 }
187
188 let mut connector = TimeoutConnector::new(connector);
189
190 connector.set_connect_timeout(config.connect_timeout);
192 connector.set_read_timeout(config.read_timeout);
193 connector.set_write_timeout(config.write_timeout);
194
195 hyper_util::client::legacy::Builder::new(TokioExecutor::new()).build(connector)
196 };
197
198 let stack = ServiceBuilder::new().layer(config.base_uri_layer()).into_inner();
199 #[cfg(feature = "gzip")]
200 let stack = ServiceBuilder::new()
201 .layer(stack)
202 .layer(
203 tower_http::decompression::DecompressionLayer::new()
204 .no_br()
205 .no_deflate()
206 .no_zstd()
207 .gzip(!config.disable_compression),
208 )
209 .into_inner();
210
211 let service = ServiceBuilder::new()
212 .layer(stack)
213 .option_layer(config.default_retry.then_some(RetryLayer::new(RetryPolicy::server_retry())))
214 .option_layer(auth_layer)
215 .layer(config.extra_headers_layer()?)
216 .layer(
217 TraceLayer::new_for_http()
220 .make_span_with(|req: &Request<Body>| {
221 tracing::debug_span!(
222 "HTTP",
223 http.method = %req.method(),
224 http.url = %req.uri(),
225 http.status_code = tracing::field::Empty,
226 otel.name = req.extensions().get::<&'static str>().unwrap_or(&"HTTP"),
227 otel.kind = "client",
228 otel.status_code = tracing::field::Empty,
229 )
230 })
231 .on_request(|_req: &Request<Body>, _span: &Span| {
232 tracing::debug!("requesting");
233 })
234 .on_response(|res: &Response<Incoming>, _latency: Duration, span: &Span| {
235 let status = res.status();
236 span.record("http.status_code", status.as_u16());
237 if status.is_client_error() || status.is_server_error() {
238 span.record("otel.status_code", "ERROR");
239 }
240 })
241 .on_body_chunk(())
243 .on_eos(|_: Option<&HeaderMap>, _duration: Duration, _span: &Span| {
244 tracing::debug!("stream closed");
245 })
246 .on_failure(|ec: ServerErrorsFailureClass, _latency: Duration, span: &Span| {
247 span.record("otel.status_code", "ERROR");
253 match ec {
254 ServerErrorsFailureClass::StatusCode(status) => {
255 span.record("http.status_code", status.as_u16());
256 tracing::error!("failed with status {}", status)
257 }
258 ServerErrorsFailureClass::Error(err) => {
259 tracing::error!("failed with error {}", err)
260 }
261 }
262 }),
263 )
264 .map_err(BoxError::from)
265 .service(client);
266
267 let (_, expiration) = config.exec_identity_pem();
268
269 let client = ClientBuilder::new(
270 service
271 .map_response_body(|body| {
272 Box::new(http_body_util::BodyExt::map_err(body, BoxError::from)) as Box<DynBody>
273 })
274 .boxed(),
275 default_ns,
276 )
277 .with_valid_until(expiration);
278
279 Ok(client)
280}
281
#[cfg(test)]
mod tests {
    #[cfg(feature = "gzip")] use super::*;

    /// Checks that the default stack sends `Accept-Encoding: gzip`, and that the header is
    /// absent once `disable_compression` is set.
    ///
    /// A local HTTP/1 server echoes the request's `Accept-Encoding` value back as the response
    /// body (an empty body when the header is missing).
    #[cfg(feature = "gzip")]
    #[tokio::test]
    async fn test_no_accept_encoding_header_sent_when_compression_disabled()
    -> Result<(), Box<dyn std::error::Error>> {
        use http::Uri;
        use std::net::SocketAddr;
        use tokio::net::{TcpListener, TcpStream};

        // Port 0 lets the OS pick a free port; the real address is read back below.
        let addr: SocketAddr = ([127, 0, 0, 1], 0).into();
        let listener = TcpListener::bind(addr).await?;
        let local_addr = listener.local_addr()?;
        let uri: Uri = format!("http://{}", local_addr).parse()?;

        tokio::spawn(async move {
            use http_body_util::Full;
            use hyper::{server::conn::http1, service::service_fn};
            use hyper_util::rt::{TokioIo, TokioTimer};
            use std::convert::Infallible;

            loop {
                let (tcp, _) = listener.accept().await.unwrap();
                let io: TokioIo<TcpStream> = TokioIo::new(tcp);

                tokio::spawn(async move {
                    http1::Builder::new()
                        .timer(TokioTimer::new())
                        .serve_connection(
                            io,
                            service_fn(|req| async move {
                                // Echo the `Accept-Encoding` header value as the body.
                                let response = req
                                    .headers()
                                    .get(http::header::ACCEPT_ENCODING)
                                    .map(|b| Bytes::copy_from_slice(b.as_bytes()))
                                    .unwrap_or_default();
                                Ok::<_, Infallible>(Response::new(Full::new(response)))
                            }),
                        )
                        .await
                        .unwrap();
                });
            }
        });

        // Default configuration: the client advertises gzip.
        let config = Config::new(uri);
        let client = make_generic_builder(HttpConnector::new(), config.clone())?.build();
        let response = client.request_text(http::Request::default()).await?;
        assert_eq!(&response, "gzip");

        // Compression disabled: no `Accept-Encoding` header is sent at all.
        let config = Config {
            disable_compression: true,
            ..config
        };
        let client = make_generic_builder(HttpConnector::new(), config)?.build();
        let response = client.request_text(http::Request::default()).await?;
        assert_eq!(&response, "");

        Ok(())
    }
}