hyper_util/client/legacy/client.rs
1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::{poll_fn, Future};
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22use super::connect::capture::CaptureConnectionExtension;
23#[cfg(feature = "tokio")]
24use super::connect::HttpConnector;
25use super::connect::{Alpn, Connect, Connected, Connection};
26use super::pool::{self, Ver};
27
28use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
29
/// A pinned, boxed, `Send` future that resolves to `()`.
///
/// This is the future type the user-supplied executor must be able to run
/// (see the `hyper::rt::Executor<BoxSendFuture>` bound on `Client::builder`).
type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
31
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B> {
    // Client-level behavior flags (retry policy, `Host` header, protocol version).
    config: Config,
    // Connector used to establish new connections to a destination.
    connector: C,
    // Executor on which background tasks (connection dispatchers, idle
    // waiters, background connects) are spawned.
    exec: Exec,
    // Builder used to perform the HTTP/1 handshake on new connections.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // Builder used to perform the HTTP/2 handshake on new connections.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Pool of established connections, keyed by `PoolKey`.
    pool: pool::Pool<PoolClient<B>, PoolKey>,
}
47
/// Client-level settings copied into every `Client` clone.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When `false`, a request that was canceled before being sent is never
    // retried (checked in `send_request` and `connection_for`).
    retry_canceled_requests: bool,
    // When `true`, a `Host` header is inserted on HTTP/1 requests that do
    // not already carry one.
    set_host: bool,
    // Protocol version the client is configured for; compared against
    // `Ver::Http2` when connecting.
    ver: Ver,
}
54
/// Client errors
pub struct Error {
    // Classification of the failure.
    kind: ErrorKind,
    // Underlying cause, if one was supplied when the error was built.
    source: Option<Box<dyn StdError + Send + Sync>>,
    // Information about the connection the error occurred on, when known
    // (attached via `with_connect_info` at the call sites in this file).
    #[cfg(any(feature = "http1", feature = "http2"))]
    connect_info: Option<Connected>,
}
62
/// Internal classification of a client [`Error`].
#[derive(Debug)]
enum ErrorKind {
    // The request was canceled before it was sent (the unsent request was
    // handed back), or a connect attempt was abandoned in favor of another.
    Canceled,
    // NOTE(review): not constructed in the visible part of this file;
    // presumably produced by `Error::closed` — confirm.
    ChannelClosed,
    // Establishing or checking out a connection failed.
    Connect,
    // The request method is not allowed (`CONNECT` with HTTP/1.0).
    UserUnsupportedRequestMethod,
    // The request's HTTP version is not supported by the client or by the
    // connection that was obtained.
    UserUnsupportedVersion,
    // The request URI lacked the scheme/authority needed to pick a destination.
    UserAbsoluteUriRequired,
    // Sending failed and the request could not be recovered for a retry.
    SendRequest,
}
73
// Shorthand for building an `Error`.
//
// `e!(Kind)` builds an error of `ErrorKind::Kind` with no source;
// `e!(Kind, src)` additionally stores `src.into()` as the source.
// Neither form attaches connection info.
macro_rules! e {
    ($kind:ident) => {
        Error {
            kind: ErrorKind::$kind,
            source: None,
            connect_info: None,
        }
    };
    ($kind:ident, $src:expr) => {
        Error {
            kind: ErrorKind::$kind,
            source: Some($src.into()),
            connect_info: None,
        }
    };
}
90
// Key identifying a destination in the connection pool: the URI scheme
// together with the authority.
//
// We might change this... :shrug:
type PoolKey = (http::uri::Scheme, http::uri::Authority);
93
// Outcome of a failed single send attempt (`Client::try_send_request`).
enum TrySendError<B> {
    // The request was never sent and is handed back so the caller may retry.
    Retryable {
        // The error describing why the attempt failed.
        error: Error,
        // The unsent request.
        req: Request<B>,
        // Whether the connection used had been reused from the pool;
        // `send_request` only retries when this is `true`.
        connection_reused: bool,
    },
    // The attempt failed and must not be retried.
    Nope(Error),
}
102
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // The boxed request future.
    // NOTE(review): `SyncWrapper` presumably exists so this type can be
    // `Sync` even though the boxed future is only `Send` — confirm against
    // `crate::common::SyncWrapper`.
    inner: SyncWrapper<
        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
    >,
}
112
113// ===== impl Client =====
114
impl Client<(), ()> {
    /// Create a builder to configure a new `Client`.
    ///
    /// The `executor` is handed to [`Builder::new`] and is used by the
    /// resulting client to spawn its background tasks.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "tokio")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper_util::client::legacy::Client;
    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
    ///
    /// let client = Client::builder(TokioExecutor::new())
    ///     .pool_timer(TokioTimer::new())
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .http2_only(true)
    ///     .build_http();
    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
    /// # drop(infer);
    /// # }
    /// # fn main() {}
    /// ```
    pub fn builder<E>(executor: E) -> Builder
    where
        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
    {
        Builder::new(executor)
    }
}
144
145impl<C, B> Client<C, B>
146where
147 C: Connect + Clone + Send + Sync + 'static,
148 B: Body + Send + 'static + Unpin,
149 B::Data: Send,
150 B::Error: Into<Box<dyn StdError + Send + Sync>>,
151{
152 /// Send a `GET` request to the supplied `Uri`.
153 ///
154 /// # Note
155 ///
156 /// This requires that the `Body` type have a `Default` implementation.
157 /// It *should* return an "empty" version of itself, such that
158 /// `Body::is_end_stream` is `true`.
159 ///
160 /// # Example
161 ///
162 /// ```
163 /// # #[cfg(feature = "tokio")]
164 /// # fn run () {
165 /// use hyper::Uri;
166 /// use hyper_util::client::legacy::Client;
167 /// use hyper_util::rt::TokioExecutor;
168 /// use bytes::Bytes;
169 /// use http_body_util::Full;
170 ///
171 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
172 ///
173 /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
174 /// # }
175 /// # fn main() {}
176 /// ```
177 pub fn get(&self, uri: Uri) -> ResponseFuture
178 where
179 B: Default,
180 {
181 let body = B::default();
182 if !body.is_end_stream() {
183 warn!("default Body used for get() does not return true for is_end_stream");
184 }
185
186 let mut req = Request::new(body);
187 *req.uri_mut() = uri;
188 self.request(req)
189 }
190
191 /// Send a constructed `Request` using this `Client`.
192 ///
193 /// # Example
194 ///
195 /// ```
196 /// # #[cfg(feature = "tokio")]
197 /// # fn run () {
198 /// use hyper::{Method, Request};
199 /// use hyper_util::client::legacy::Client;
200 /// use http_body_util::Full;
201 /// use hyper_util::rt::TokioExecutor;
202 /// use bytes::Bytes;
203 ///
204 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
205 ///
206 /// let req: Request<Full<Bytes>> = Request::builder()
207 /// .method(Method::POST)
208 /// .uri("http://httpbin.org/post")
209 /// .body(Full::from("Hallo!"))
210 /// .expect("request builder");
211 ///
212 /// let future = client.request(req);
213 /// # }
214 /// # fn main() {}
215 /// ```
216 pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
217 let is_http_connect = req.method() == Method::CONNECT;
218 match req.version() {
219 Version::HTTP_11 => (),
220 Version::HTTP_10 => {
221 if is_http_connect {
222 warn!("CONNECT is not allowed for HTTP/1.0");
223 return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
224 }
225 }
226 Version::HTTP_2 => (),
227 // completely unsupported HTTP version (like HTTP/0.9)!
228 other => return ResponseFuture::error_version(other),
229 };
230
231 let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
232 Ok(s) => s,
233 Err(err) => {
234 return ResponseFuture::new(future::err(err));
235 }
236 };
237
238 ResponseFuture::new(self.clone().send_request(req, pool_key))
239 }
240
241 async fn send_request(
242 self,
243 mut req: Request<B>,
244 pool_key: PoolKey,
245 ) -> Result<Response<hyper::body::Incoming>, Error> {
246 let uri = req.uri().clone();
247
248 loop {
249 req = match self.try_send_request(req, pool_key.clone()).await {
250 Ok(resp) => return Ok(resp),
251 Err(TrySendError::Nope(err)) => return Err(err),
252 Err(TrySendError::Retryable {
253 mut req,
254 error,
255 connection_reused,
256 }) => {
257 if !self.config.retry_canceled_requests || !connection_reused {
258 // if client disabled, don't retry
259 // a fresh connection means we definitely can't retry
260 return Err(error);
261 }
262
263 trace!(
264 "unstarted request canceled, trying again (reason={:?})",
265 error
266 );
267 *req.uri_mut() = uri.clone();
268 req
269 }
270 }
271 }
272 }
273
274 async fn try_send_request(
275 &self,
276 mut req: Request<B>,
277 pool_key: PoolKey,
278 ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
279 let mut pooled = self
280 .connection_for(pool_key)
281 .await
282 // `connection_for` already retries checkout errors, so if
283 // it returns an error, there's not much else to retry
284 .map_err(TrySendError::Nope)?;
285
286 if let Some(conn) = req.extensions_mut().get_mut::<CaptureConnectionExtension>() {
287 conn.set(&pooled.conn_info);
288 }
289
290 if pooled.is_http1() {
291 if req.version() == Version::HTTP_2 {
292 warn!("Connection is HTTP/1, but request requires HTTP/2");
293 return Err(TrySendError::Nope(
294 e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
295 ));
296 }
297
298 if self.config.set_host {
299 let uri = req.uri().clone();
300 req.headers_mut().entry(HOST).or_insert_with(|| {
301 let hostname = uri.host().expect("authority implies host");
302 if let Some(port) = get_non_default_port(&uri) {
303 let s = format!("{hostname}:{port}");
304 HeaderValue::from_maybe_shared(bytes::Bytes::from(s))
305 } else {
306 HeaderValue::from_str(hostname)
307 }
308 .expect("uri host is valid header value")
309 });
310 }
311
312 // CONNECT always sends authority-form, so check it first...
313 if req.method() == Method::CONNECT {
314 authority_form(req.uri_mut());
315 } else if pooled.conn_info.is_proxied {
316 absolute_form(req.uri_mut());
317 } else {
318 origin_form(req.uri_mut());
319 }
320 } else if req.method() == Method::CONNECT && !pooled.is_http2() {
321 authority_form(req.uri_mut());
322 }
323
324 let mut res = match pooled.try_send_request(req).await {
325 Ok(res) => res,
326 Err(mut err) => {
327 return if let Some(req) = err.take_message() {
328 Err(TrySendError::Retryable {
329 connection_reused: pooled.is_reused(),
330 error: e!(Canceled, err.into_error())
331 .with_connect_info(pooled.conn_info.clone()),
332 req,
333 })
334 } else {
335 Err(TrySendError::Nope(
336 e!(SendRequest, err.into_error())
337 .with_connect_info(pooled.conn_info.clone()),
338 ))
339 };
340 }
341 };
342
343 // If the Connector included 'extra' info, add to Response...
344 if let Some(extra) = &pooled.conn_info.extra {
345 extra.set(res.extensions_mut());
346 }
347
348 // If pooled is HTTP/2, we can toss this reference immediately.
349 //
350 // when pooled is dropped, it will try to insert back into the
351 // pool. To delay that, spawn a future that completes once the
352 // sender is ready again.
353 //
354 // This *should* only be once the related `Connection` has polled
355 // for a new request to start.
356 //
357 // It won't be ready if there is a body to stream.
358 if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
359 drop(pooled);
360 } else {
361 let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
362 self.exec.execute(on_idle);
363 }
364
365 Ok(res)
366 }
367
368 async fn connection_for(
369 &self,
370 pool_key: PoolKey,
371 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
372 loop {
373 match self.one_connection_for(pool_key.clone()).await {
374 Ok(pooled) => return Ok(pooled),
375 Err(ClientConnectError::Normal(err)) => return Err(err),
376 Err(ClientConnectError::CheckoutIsClosed(reason)) => {
377 if !self.config.retry_canceled_requests {
378 return Err(e!(Connect, reason));
379 }
380
381 trace!(
382 "unstarted request canceled, trying again (reason={:?})",
383 reason,
384 );
385 continue;
386 }
387 };
388 }
389 }
390
391 async fn one_connection_for(
392 &self,
393 pool_key: PoolKey,
394 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
395 // Return a single connection if pooling is not enabled
396 if !self.pool.is_enabled() {
397 return self
398 .connect_to(pool_key)
399 .await
400 .map_err(ClientConnectError::Normal);
401 }
402
403 // This actually races 2 different futures to try to get a ready
404 // connection the fastest, and to reduce connection churn.
405 //
406 // - If the pool has an idle connection waiting, that's used
407 // immediately.
408 // - Otherwise, the Connector is asked to start connecting to
409 // the destination Uri.
410 // - Meanwhile, the pool Checkout is watching to see if any other
411 // request finishes and tries to insert an idle connection.
412 // - If a new connection is started, but the Checkout wins after
413 // (an idle connection became available first), the started
414 // connection future is spawned into the runtime to complete,
415 // and then be inserted into the pool as an idle connection.
416 let checkout = self.pool.checkout(pool_key.clone());
417 let connect = self.connect_to(pool_key);
418 let is_ver_h2 = self.config.ver == Ver::Http2;
419
420 // The order of the `select` is depended on below...
421
422 match future::select(checkout, connect).await {
423 // Checkout won, connect future may have been started or not.
424 //
425 // If it has, let it finish and insert back into the pool,
426 // so as to not waste the socket...
427 Either::Left((Ok(checked_out), connecting)) => {
428 // This depends on the `select` above having the correct
429 // order, such that if the checkout future were ready
430 // immediately, the connect future will never have been
431 // started.
432 //
433 // If it *wasn't* ready yet, then the connect future will
434 // have been started...
435 if connecting.started() {
436 let bg = connecting
437 .map_err(|err| {
438 trace!("background connect error: {}", err);
439 })
440 .map(|_pooled| {
441 // dropping here should just place it in
442 // the Pool for us...
443 });
444 // An execute error here isn't important, we're just trying
445 // to prevent a waste of a socket...
446 self.exec.execute(bg);
447 }
448 Ok(checked_out)
449 }
450 // Connect won, checkout can just be dropped.
451 Either::Right((Ok(connected), _checkout)) => Ok(connected),
452 // Either checkout or connect could get canceled:
453 //
454 // 1. Connect is canceled if this is HTTP/2 and there is
455 // an outstanding HTTP/2 connecting task.
456 // 2. Checkout is canceled if the pool cannot deliver an
457 // idle connection reliably.
458 //
459 // In both cases, we should just wait for the other future.
460 Either::Left((Err(err), connecting)) => {
461 if err.is_canceled() {
462 connecting.await.map_err(ClientConnectError::Normal)
463 } else {
464 Err(ClientConnectError::Normal(e!(Connect, err)))
465 }
466 }
467 Either::Right((Err(err), checkout)) => {
468 if err.is_canceled() {
469 checkout.await.map_err(move |err| {
470 if is_ver_h2 && err.is_canceled() {
471 ClientConnectError::CheckoutIsClosed(err)
472 } else {
473 ClientConnectError::Normal(e!(Connect, err))
474 }
475 })
476 } else {
477 Err(ClientConnectError::Normal(err))
478 }
479 }
480 }
481 }
482
483 #[cfg(any(feature = "http1", feature = "http2"))]
484 fn connect_to(
485 &self,
486 pool_key: PoolKey,
487 ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
488 {
489 let executor = self.exec.clone();
490 let pool = self.pool.clone();
491 #[cfg(feature = "http1")]
492 let h1_builder = self.h1_builder.clone();
493 #[cfg(feature = "http2")]
494 let h2_builder = self.h2_builder.clone();
495 let ver = self.config.ver;
496 let is_ver_h2 = ver == Ver::Http2;
497 let connector = self.connector.clone();
498 let dst = domain_as_uri(pool_key.clone());
499 hyper_lazy(move || {
500 // Try to take a "connecting lock".
501 //
502 // If the pool_key is for HTTP/2, and there is already a
503 // connection being established, then this can't take a
504 // second lock. The "connect_to" future is Canceled.
505 let connecting = match pool.connecting(&pool_key, ver) {
506 Some(lock) => lock,
507 None => {
508 let canceled = e!(Canceled);
509 // TODO
510 //crate::Error::new_canceled().with("HTTP/2 connection in progress");
511 return Either::Right(future::err(canceled));
512 }
513 };
514 Either::Left(
515 connector
516 .connect(super::connect::sealed::Internal, dst)
517 .map_err(|src| e!(Connect, src))
518 .and_then(move |io| {
519 let connected = io.connected();
520 // If ALPN is h2 and we aren't http2_only already,
521 // then we need to convert our pool checkout into
522 // a single HTTP2 one.
523 let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
524 match connecting.alpn_h2(&pool) {
525 Some(lock) => {
526 trace!("ALPN negotiated h2, updating pool");
527 lock
528 }
529 None => {
530 // Another connection has already upgraded,
531 // the pool checkout should finish up for us.
532 let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
533 return Either::Right(future::err(canceled));
534 }
535 }
536 } else {
537 connecting
538 };
539
540 #[cfg_attr(not(feature = "http2"), allow(unused))]
541 let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
542
543 Either::Left(Box::pin(async move {
544 let tx = if is_h2 {
545 #[cfg(feature = "http2")] {
546 let (mut tx, conn) =
547 h2_builder.handshake(io).await.map_err(Error::tx)?;
548
549 trace!(
550 "http2 handshake complete, spawning background dispatcher task"
551 );
552 executor.execute(
553 conn.map_err(|e| debug!("client connection error: {}", e))
554 .map(|_| ()),
555 );
556
557 // Wait for 'conn' to ready up before we
558 // declare this tx as usable
559 tx.ready().await.map_err(Error::tx)?;
560 PoolTx::Http2(tx)
561 }
562 #[cfg(not(feature = "http2"))]
563 panic!("http2 feature is not enabled");
564 } else {
565 #[cfg(feature = "http1")] {
566 // Perform the HTTP/1.1 handshake on the provided I/O stream.
567 // Uses the h1_builder to establish a connection, returning a sender (tx) for requests
568 // and a connection task (conn) that manages the connection lifecycle.
569 let (mut tx, conn) =
570 h1_builder.handshake(io).await.map_err(crate::client::legacy::client::Error::tx)?;
571
572 // Log that the HTTP/1.1 handshake has completed successfully.
573 // This indicates the connection is established and ready for request processing.
574 trace!(
575 "http1 handshake complete, spawning background dispatcher task"
576 );
577 // Create a oneshot channel to communicate errors from the connection task.
578 // err_tx sends errors from the connection task, and err_rx receives them
579 // to correlate connection failures with request readiness errors.
580 let (err_tx, err_rx) = tokio::sync::oneshot::channel();
581 // Spawn the connection task in the background using the executor.
582 // The task manages the HTTP/1.1 connection, including upgrades (e.g., WebSocket).
583 // Errors are sent via err_tx to ensure they can be checked if the sender (tx) fails.
584 executor.execute(
585 conn.with_upgrades()
586 .map_err(|e| {
587 // Log the connection error at debug level for diagnostic purposes.
588 debug!("client connection error: {:?}", e);
589 // Log that the error is being sent to the error channel.
590 trace!("sending connection error to error channel");
591 // Send the error via the oneshot channel, ignoring send failures
592 // (e.g., if the receiver is dropped, which is handled later).
593 let _ =err_tx.send(e);
594 })
595 .map(|_| ()),
596 );
597 // Log that the client is waiting for the connection to be ready.
598 // Readiness indicates the sender (tx) can accept a request without blocking.
599 trace!("waiting for connection to be ready");
600 // Check if the sender is ready to accept a request.
601 // This ensures the connection is fully established before proceeding.
602 // aka:
603 // Wait for 'conn' to ready up before we
604 // declare this tx as usable
605 match tx.ready().await {
606 // If ready, the connection is usable for sending requests.
607 Ok(_) => {
608 // Log that the connection is ready for use.
609 trace!("connection is ready");
610 // Drop the error receiver, as it’s no longer needed since the sender is ready.
611 // This prevents waiting for errors that won’t occur in a successful case.
612 drop(err_rx);
613 // Wrap the sender in PoolTx::Http1 for use in the connection pool.
614 PoolTx::Http1(tx)
615 }
616 // If the sender fails with a closed channel error, check for a specific connection error.
617 // This distinguishes between a vague ChannelClosed error and an actual connection failure.
618 Err(e) if e.is_closed() => {
619 // Log that the channel is closed, indicating a potential connection issue.
620 trace!("connection channel closed, checking for connection error");
621 // Check the oneshot channel for a specific error from the connection task.
622 match err_rx.await {
623 // If an error was received, it’s a specific connection failure.
624 Ok(err) => {
625 // Log the specific connection error for diagnostics.
626 trace!("received connection error: {:?}", err);
627 // Return the error wrapped in Error::tx to propagate it.
628 return Err(crate::client::legacy::client::Error::tx(err));
629 }
630 // If the error channel is closed, no specific error was sent.
631 // Fall back to the vague ChannelClosed error.
632 Err(_) => {
633 // Log that the error channel is closed, indicating no specific error.
634 trace!("error channel closed, returning the vague ChannelClosed error");
635 // Return the original error wrapped in Error::tx.
636 return Err(crate::client::legacy::client::Error::tx(e));
637 }
638 }
639 }
640 // For other errors (e.g., timeout, I/O issues), propagate them directly.
641 // These are not ChannelClosed errors and don’t require error channel checks.
642 Err(e) => {
643 // Log the specific readiness failure for diagnostics.
644 trace!("connection readiness failed: {:?}", e);
645 // Return the error wrapped in Error::tx to propagate it.
646 return Err(crate::client::legacy::client::Error::tx(e));
647 }
648 }
649 }
650 #[cfg(not(feature = "http1"))] {
651 panic!("http1 feature is not enabled");
652 }
653 };
654
655 Ok(pool.pooled(
656 connecting,
657 PoolClient {
658 conn_info: connected,
659 tx,
660 },
661 ))
662 }))
663 }),
664 )
665 })
666 }
667}
668
/// `tower_service::Service` adapter: each call is forwarded to [`Client::request`].
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: Body + Send + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<hyper::body::Incoming>;
    type Error = Error;
    type Future = ResponseFuture;

    /// Always reports ready; the client applies no backpressure here.
    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Sends `req` via [`Client::request`].
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
688
/// `tower_service::Service` adapter for a shared reference to a `Client`,
/// so the service can be used without cloning or owning the client.
impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: Body + Send + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<hyper::body::Incoming>;
    type Error = Error;
    type Future = ResponseFuture;

    /// Always reports ready; the client applies no backpressure here.
    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Sends `req` via [`Client::request`].
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
708
709impl<C: Clone, B> Clone for Client<C, B> {
710 fn clone(&self) -> Client<C, B> {
711 Client {
712 config: self.config,
713 exec: self.exec.clone(),
714 #[cfg(feature = "http1")]
715 h1_builder: self.h1_builder.clone(),
716 #[cfg(feature = "http2")]
717 h2_builder: self.h2_builder.clone(),
718 connector: self.connector.clone(),
719 pool: self.pool.clone(),
720 }
721 }
722}
723
// Prints only the type name; no fields are exposed, so neither `C` nor `B`
// needs to implement `Debug`.
impl<C, B> fmt::Debug for Client<C, B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Client").finish()
    }
}
729
730// ===== impl ResponseFuture =====
731
732impl ResponseFuture {
733 fn new<F>(value: F) -> Self
734 where
735 F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
736 {
737 Self {
738 inner: SyncWrapper::new(Box::pin(value)),
739 }
740 }
741
742 fn error_version(ver: Version) -> Self {
743 warn!("Request has unsupported version \"{:?}\"", ver);
744 ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
745 }
746}
747
// Opaque `Debug`: the boxed future cannot be inspected, so a fixed label is
// printed (honoring width/alignment flags via `pad`).
impl fmt::Debug for ResponseFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Future<Response>")
    }
}
753
impl Future for ResponseFuture {
    type Output = Result<Response<hyper::body::Incoming>, Error>;

    /// Polls the wrapped boxed future and forwards its result unchanged.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.inner.get_mut().as_mut().poll(cx)
    }
}
761
762// ===== impl PoolClient =====
763
// A pooled connection: the request sender plus metadata about the
// connection it belongs to.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    // Metadata reported by the connector for this connection.
    conn_info: Connected,
    // Protocol-specific handle used to send requests on the connection.
    tx: PoolTx<B>,
}
770
// Protocol-specific request sender held by a `PoolClient`. Each variant is
// only compiled in when the matching cargo feature is enabled.
enum PoolTx<B> {
    // Sender for an HTTP/1 connection.
    #[cfg(feature = "http1")]
    Http1(hyper::client::conn::http1::SendRequest<B>),
    // Sender for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(hyper::client::conn::http2::SendRequest<B>),
}
777
778impl<B> PoolClient<B> {
779 fn poll_ready(
780 &mut self,
781 #[allow(unused_variables)] cx: &mut task::Context<'_>,
782 ) -> Poll<Result<(), Error>> {
783 match self.tx {
784 #[cfg(feature = "http1")]
785 PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
786 #[cfg(feature = "http2")]
787 PoolTx::Http2(_) => Poll::Ready(Ok(())),
788 }
789 }
790
791 fn is_http1(&self) -> bool {
792 !self.is_http2()
793 }
794
795 fn is_http2(&self) -> bool {
796 match self.tx {
797 #[cfg(feature = "http1")]
798 PoolTx::Http1(_) => false,
799 #[cfg(feature = "http2")]
800 PoolTx::Http2(_) => true,
801 }
802 }
803
804 fn is_poisoned(&self) -> bool {
805 self.conn_info.poisoned.poisoned()
806 }
807
808 fn is_ready(&self) -> bool {
809 match self.tx {
810 #[cfg(feature = "http1")]
811 PoolTx::Http1(ref tx) => tx.is_ready(),
812 #[cfg(feature = "http2")]
813 PoolTx::Http2(ref tx) => tx.is_ready(),
814 }
815 }
816}
817
818impl<B: Body + 'static> PoolClient<B> {
819 fn try_send_request(
820 &mut self,
821 req: Request<B>,
822 ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
823 where
824 B: Send,
825 {
826 #[cfg(all(feature = "http1", feature = "http2"))]
827 return match self.tx {
828 #[cfg(feature = "http1")]
829 PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
830 #[cfg(feature = "http2")]
831 PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
832 };
833
834 #[cfg(feature = "http1")]
835 #[cfg(not(feature = "http2"))]
836 return match self.tx {
837 #[cfg(feature = "http1")]
838 PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
839 };
840
841 #[cfg(not(feature = "http1"))]
842 #[cfg(feature = "http2")]
843 return match self.tx {
844 #[cfg(feature = "http2")]
845 PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
846 };
847 }
848}
849
850impl<B> pool::Poolable for PoolClient<B>
851where
852 B: Send + 'static,
853{
854 fn is_open(&self) -> bool {
855 !self.is_poisoned() && self.is_ready()
856 }
857
858 fn reserve(self) -> pool::Reservation<Self> {
859 match self.tx {
860 #[cfg(feature = "http1")]
861 PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
862 conn_info: self.conn_info,
863 tx: PoolTx::Http1(tx),
864 }),
865 #[cfg(feature = "http2")]
866 PoolTx::Http2(tx) => {
867 let b = PoolClient {
868 conn_info: self.conn_info.clone(),
869 tx: PoolTx::Http2(tx.clone()),
870 };
871 let a = PoolClient {
872 conn_info: self.conn_info,
873 tx: PoolTx::Http2(tx),
874 };
875 pool::Reservation::Shared(a, b)
876 }
877 }
878 }
879
880 fn can_share(&self) -> bool {
881 self.is_http2()
882 }
883}
884
// Failure modes of `Client::one_connection_for`.
enum ClientConnectError {
    // A failure that is returned to the caller as-is.
    Normal(Error),
    // The pool checkout was canceled on an HTTP/2-only client;
    // `connection_for` may retry in this case.
    CheckoutIsClosed(pool::Error),
}
889
890fn origin_form(uri: &mut Uri) {
891 let path = match uri.path_and_query() {
892 Some(path) if path.as_str() != "/" => {
893 let mut parts = ::http::uri::Parts::default();
894 parts.path_and_query = Some(path.clone());
895 Uri::from_parts(parts).expect("path is valid uri")
896 }
897 _none_or_just_slash => {
898 debug_assert!(Uri::default() == "/");
899 Uri::default()
900 }
901 };
902 *uri = path
903}
904
905fn absolute_form(uri: &mut Uri) {
906 debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
907 debug_assert!(
908 uri.authority().is_some(),
909 "absolute_form needs an authority"
910 );
911}
912
913fn authority_form(uri: &mut Uri) {
914 if let Some(path) = uri.path_and_query() {
915 // `https://hyper.rs` would parse with `/` path, don't
916 // annoy people about that...
917 if path != "/" {
918 warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
919 }
920 }
921 *uri = match uri.authority() {
922 Some(auth) => {
923 let mut parts = ::http::uri::Parts::default();
924 parts.authority = Some(auth.clone());
925 Uri::from_parts(parts).expect("authority is valid")
926 }
927 None => {
928 unreachable!("authority_form with relative uri");
929 }
930 };
931}
932
933fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
934 let uri_clone = uri.clone();
935 match (uri_clone.scheme(), uri_clone.authority()) {
936 (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
937 (None, Some(auth)) if is_http_connect => {
938 let scheme = match auth.port_u16() {
939 Some(443) => {
940 set_scheme(uri, Scheme::HTTPS);
941 Scheme::HTTPS
942 }
943 _ => {
944 set_scheme(uri, Scheme::HTTP);
945 Scheme::HTTP
946 }
947 };
948 Ok((scheme, auth.clone()))
949 }
950 _ => {
951 debug!("Client requires absolute-form URIs, received: {:?}", uri);
952 Err(e!(UserAbsoluteUriRequired))
953 }
954 }
955}
956
957fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
958 http::uri::Builder::new()
959 .scheme(scheme)
960 .authority(auth)
961 .path_and_query("/")
962 .build()
963 .expect("domain is valid Uri")
964}
965
966fn set_scheme(uri: &mut Uri, scheme: Scheme) {
967 debug_assert!(
968 uri.scheme().is_none(),
969 "set_scheme expects no existing scheme"
970 );
971 let old = std::mem::take(uri);
972 let mut parts: ::http::uri::Parts = old.into();
973 parts.scheme = Some(scheme);
974 parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
975 *uri = Uri::from_parts(parts).expect("scheme is valid");
976}
977
978fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
979 match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
980 (Some(443), true) => None,
981 (Some(80), false) => None,
982 _ => uri.port(),
983 }
984}
985
986fn is_schema_secure(uri: &Uri) -> bool {
987 uri.scheme_str()
988 .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
989 .unwrap_or_default()
990}
991
/// A builder to configure a new [`Client`](Client).
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::TokioExecutor;
///
/// let client = Client::builder(TokioExecutor::new())
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
    // Client-level settings (retry policy, `Host` header, protocol version).
    client_config: Config,
    // Executor wrapper built from the executor given to `Builder::new`.
    exec: Exec,
    // HTTP/1 connection options.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // HTTP/2 connection options.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Connection pool settings (idle timeout, max idle per host).
    pool_config: pool::Config,
    // Optional timer for the pool; `None` until one is configured.
    pool_timer: Option<timer::Timer>,
}
1024
1025impl Builder {
1026 /// Construct a new Builder.
1027 pub fn new<E>(executor: E) -> Self
1028 where
1029 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1030 {
1031 let exec = Exec::new(executor);
1032 Self {
1033 client_config: Config {
1034 retry_canceled_requests: true,
1035 set_host: true,
1036 ver: Ver::Auto,
1037 },
1038 exec: exec.clone(),
1039 #[cfg(feature = "http1")]
1040 h1_builder: hyper::client::conn::http1::Builder::new(),
1041 #[cfg(feature = "http2")]
1042 h2_builder: hyper::client::conn::http2::Builder::new(exec),
1043 pool_config: pool::Config {
1044 idle_timeout: Some(Duration::from_secs(90)),
1045 max_idle_per_host: usize::MAX,
1046 },
1047 pool_timer: None,
1048 }
1049 }
    /// Set an optional timeout for idle sockets being kept-alive.
    /// A `Timer` is required for this to take effect. See `Builder::pool_timer`.
    ///
    /// Pass `None` to disable timeout.
    ///
    /// Default is 90 seconds.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "tokio")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper_util::client::legacy::Client;
    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
    ///
    /// let client = Client::builder(TokioExecutor::new())
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .pool_timer(TokioTimer::new())
    ///     .build_http();
    ///
    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
    /// # }
    /// # fn main() {}
    /// ```
    pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
    where
        D: Into<Option<Duration>>,
    {
        // Accepts either a `Duration` or an `Option<Duration>`.
        self.pool_config.idle_timeout = val.into();
        self
    }
1082
1083 #[doc(hidden)]
1084 #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1085 pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1086 self.pool_config.max_idle_per_host = max_idle;
1087 self
1088 }
1089
1090 /// Sets the maximum idle connection per host allowed in the pool.
1091 ///
1092 /// Default is `usize::MAX` (no limit).
1093 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1094 self.pool_config.max_idle_per_host = max_idle;
1095 self
1096 }
1097
1098 // HTTP/1 options
1099
1100 /// Sets the exact size of the read buffer to *always* use.
1101 ///
1102 /// Note that setting this option unsets the `http1_max_buf_size` option.
1103 ///
1104 /// Default is an adaptive read buffer.
1105 #[cfg(feature = "http1")]
1106 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1107 pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1108 self.h1_builder.read_buf_exact_size(Some(sz));
1109 self
1110 }
1111
1112 /// Set the maximum buffer size for the connection.
1113 ///
1114 /// Default is ~400kb.
1115 ///
1116 /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1117 ///
1118 /// # Panics
1119 ///
1120 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1121 #[cfg(feature = "http1")]
1122 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1123 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1124 self.h1_builder.max_buf_size(max);
1125 self
1126 }
1127
1128 /// Set whether HTTP/1 connections will accept spaces between header names
1129 /// and the colon that follow them in responses.
1130 ///
1131 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1132 /// parsing.
1133 ///
1134 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1135 /// to say about it:
1136 ///
1137 /// > No whitespace is allowed between the header field-name and colon. In
1138 /// > the past, differences in the handling of such whitespace have led to
1139 /// > security vulnerabilities in request routing and response handling. A
1140 /// > server MUST reject any received request message that contains
1141 /// > whitespace between a header field-name and colon with a response code
1142 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1143 /// > response message before forwarding the message downstream.
1144 ///
1145 /// Note that this setting does not affect HTTP/2.
1146 ///
1147 /// Default is false.
1148 ///
1149 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1150 #[cfg(feature = "http1")]
1151 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1152 pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1153 self.h1_builder
1154 .allow_spaces_after_header_name_in_responses(val);
1155 self
1156 }
1157
1158 /// Set whether HTTP/1 connections will accept obsolete line folding for
1159 /// header values.
1160 ///
1161 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1162 /// to say about it:
1163 ///
1164 /// > A server that receives an obs-fold in a request message that is not
1165 /// > within a message/http container MUST either reject the message by
1166 /// > sending a 400 (Bad Request), preferably with a representation
1167 /// > explaining that obsolete line folding is unacceptable, or replace
1168 /// > each received obs-fold with one or more SP octets prior to
1169 /// > interpreting the field value or forwarding the message downstream.
1170 ///
1171 /// > A proxy or gateway that receives an obs-fold in a response message
1172 /// > that is not within a message/http container MUST either discard the
1173 /// > message and replace it with a 502 (Bad Gateway) response, preferably
1174 /// > with a representation explaining that unacceptable line folding was
1175 /// > received, or replace each received obs-fold with one or more SP
1176 /// > octets prior to interpreting the field value or forwarding the
1177 /// > message downstream.
1178 ///
1179 /// > A user agent that receives an obs-fold in a response message that is
1180 /// > not within a message/http container MUST replace each received
1181 /// > obs-fold with one or more SP octets prior to interpreting the field
1182 /// > value.
1183 ///
1184 /// Note that this setting does not affect HTTP/2.
1185 ///
1186 /// Default is false.
1187 ///
1188 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1189 #[cfg(feature = "http1")]
1190 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1191 pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1192 self.h1_builder
1193 .allow_obsolete_multiline_headers_in_responses(val);
1194 self
1195 }
1196
1197 /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1198 ///
1199 /// This mimics the behaviour of major browsers. You probably don't want this.
1200 /// You should only want this if you are implementing a proxy whose main
1201 /// purpose is to sit in front of browsers whose users access arbitrary content
1202 /// which may be malformed, and they expect everything that works without
1203 /// the proxy to keep working with the proxy.
1204 ///
1205 /// This option will prevent Hyper's client from returning an error encountered
1206 /// when parsing a header, except if the error was caused by the character NUL
1207 /// (ASCII code 0), as Chrome specifically always reject those.
1208 ///
1209 /// The ignorable errors are:
1210 /// * empty header names;
1211 /// * characters that are not allowed in header names, except for `\0` and `\r`;
1212 /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1213 /// spaces and tabs between the header name and the colon;
1214 /// * missing colon between header name and colon;
1215 /// * characters that are not allowed in header values except for `\0` and `\r`.
1216 ///
1217 /// If an ignorable error is encountered, the parser tries to find the next
1218 /// line in the input to resume parsing the rest of the headers. An error
1219 /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1220 /// looking for the next line.
1221 #[cfg(feature = "http1")]
1222 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1223 pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1224 self.h1_builder.ignore_invalid_headers_in_responses(val);
1225 self
1226 }
1227
1228 /// Set whether HTTP/1 connections should try to use vectored writes,
1229 /// or always flatten into a single buffer.
1230 ///
1231 /// Note that setting this to false may mean more copies of body data,
1232 /// but may also improve performance when an IO transport doesn't
1233 /// support vectored writes well, such as most TLS implementations.
1234 ///
1235 /// Setting this to true will force hyper to use queued strategy
1236 /// which may eliminate unnecessary cloning on some TLS backends
1237 ///
1238 /// Default is `auto`. In this mode hyper will try to guess which
1239 /// mode to use
1240 #[cfg(feature = "http1")]
1241 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1242 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1243 self.h1_builder.writev(enabled);
1244 self
1245 }
1246
1247 /// Set whether HTTP/1 connections will write header names as title case at
1248 /// the socket level.
1249 ///
1250 /// Note that this setting does not affect HTTP/2.
1251 ///
1252 /// Default is false.
1253 #[cfg(feature = "http1")]
1254 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1255 pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1256 self.h1_builder.title_case_headers(val);
1257 self
1258 }
1259
1260 /// Set whether to support preserving original header cases.
1261 ///
1262 /// Currently, this will record the original cases received, and store them
1263 /// in a private extension on the `Response`. It will also look for and use
1264 /// such an extension in any provided `Request`.
1265 ///
1266 /// Since the relevant extension is still private, there is no way to
1267 /// interact with the original cases. The only effect this can have now is
1268 /// to forward the cases in a proxy-like fashion.
1269 ///
1270 /// Note that this setting does not affect HTTP/2.
1271 ///
1272 /// Default is false.
1273 #[cfg(feature = "http1")]
1274 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1275 pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1276 self.h1_builder.preserve_header_case(val);
1277 self
1278 }
1279
1280 /// Set the maximum number of headers.
1281 ///
1282 /// When a response is received, the parser will reserve a buffer to store headers for optimal
1283 /// performance.
1284 ///
1285 /// If client receives more headers than the buffer size, the error "message header too large"
1286 /// is returned.
1287 ///
1288 /// The headers is allocated on the stack by default, which has higher performance. After
1289 /// setting this value, headers will be allocated in heap memory, that is, heap memory
1290 /// allocation will occur for each response, and there will be a performance drop of about 5%.
1291 ///
1292 /// Note that this setting does not affect HTTP/2.
1293 ///
1294 /// Default is 100.
1295 #[cfg(feature = "http1")]
1296 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1297 pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1298 self.h1_builder.max_headers(val);
1299 self
1300 }
1301
1302 /// Set whether HTTP/0.9 responses should be tolerated.
1303 ///
1304 /// Default is false.
1305 #[cfg(feature = "http1")]
1306 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1307 pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1308 self.h1_builder.http09_responses(val);
1309 self
1310 }
1311
1312 /// Set whether the connection **must** use HTTP/2.
1313 ///
1314 /// The destination must either allow HTTP2 Prior Knowledge, or the
1315 /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1316 /// as part of the connection process. This will not make the `Client`
1317 /// utilize ALPN by itself.
1318 ///
1319 /// Note that setting this to true prevents HTTP/1 from being allowed.
1320 ///
1321 /// Default is false.
1322 #[cfg(feature = "http2")]
1323 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1324 pub fn http2_only(&mut self, val: bool) -> &mut Self {
1325 self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1326 self
1327 }
1328
1329 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1330 ///
1331 /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1332 /// As of v0.4.0, it is 20.
1333 ///
1334 /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1335 #[cfg(feature = "http2")]
1336 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1337 pub fn http2_max_pending_accept_reset_streams(
1338 &mut self,
1339 max: impl Into<Option<usize>>,
1340 ) -> &mut Self {
1341 self.h2_builder.max_pending_accept_reset_streams(max.into());
1342 self
1343 }
1344
1345 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1346 /// stream-level flow control.
1347 ///
1348 /// Passing `None` will do nothing.
1349 ///
1350 /// If not set, hyper will use a default.
1351 ///
1352 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1353 #[cfg(feature = "http2")]
1354 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1355 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1356 self.h2_builder.initial_stream_window_size(sz.into());
1357 self
1358 }
1359
1360 /// Sets the max connection-level flow control for HTTP2
1361 ///
1362 /// Passing `None` will do nothing.
1363 ///
1364 /// If not set, hyper will use a default.
1365 #[cfg(feature = "http2")]
1366 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1367 pub fn http2_initial_connection_window_size(
1368 &mut self,
1369 sz: impl Into<Option<u32>>,
1370 ) -> &mut Self {
1371 self.h2_builder.initial_connection_window_size(sz.into());
1372 self
1373 }
1374
1375 /// Sets the initial maximum of locally initiated (send) streams.
1376 ///
1377 /// This value will be overwritten by the value included in the initial
1378 /// SETTINGS frame received from the peer as part of a [connection preface].
1379 ///
1380 /// Passing `None` will do nothing.
1381 ///
1382 /// If not set, hyper will use a default.
1383 ///
1384 /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1385 #[cfg(feature = "http2")]
1386 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1387 pub fn http2_initial_max_send_streams(
1388 &mut self,
1389 initial: impl Into<Option<usize>>,
1390 ) -> &mut Self {
1391 self.h2_builder.initial_max_send_streams(initial);
1392 self
1393 }
1394
1395 /// Sets whether to use an adaptive flow control.
1396 ///
1397 /// Enabling this will override the limits set in
1398 /// `http2_initial_stream_window_size` and
1399 /// `http2_initial_connection_window_size`.
1400 #[cfg(feature = "http2")]
1401 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1402 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1403 self.h2_builder.adaptive_window(enabled);
1404 self
1405 }
1406
1407 /// Sets the maximum frame size to use for HTTP2.
1408 ///
1409 /// Passing `None` will do nothing.
1410 ///
1411 /// If not set, hyper will use a default.
1412 #[cfg(feature = "http2")]
1413 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1414 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1415 self.h2_builder.max_frame_size(sz);
1416 self
1417 }
1418
1419 /// Sets the max size of received header frames for HTTP2.
1420 ///
1421 /// Default is currently 16KB, but can change.
1422 #[cfg(feature = "http2")]
1423 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1424 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1425 self.h2_builder.max_header_list_size(max);
1426 self
1427 }
1428
1429 /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1430 /// connection alive.
1431 ///
1432 /// Pass `None` to disable HTTP2 keep-alive.
1433 ///
1434 /// Default is currently disabled.
1435 ///
1436 /// # Cargo Feature
1437 ///
1438 /// Requires the `tokio` cargo feature to be enabled.
1439 #[cfg(feature = "tokio")]
1440 #[cfg(feature = "http2")]
1441 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1442 pub fn http2_keep_alive_interval(
1443 &mut self,
1444 interval: impl Into<Option<Duration>>,
1445 ) -> &mut Self {
1446 self.h2_builder.keep_alive_interval(interval);
1447 self
1448 }
1449
1450 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1451 ///
1452 /// If the ping is not acknowledged within the timeout, the connection will
1453 /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1454 ///
1455 /// Default is 20 seconds.
1456 ///
1457 /// # Cargo Feature
1458 ///
1459 /// Requires the `tokio` cargo feature to be enabled.
1460 #[cfg(feature = "tokio")]
1461 #[cfg(feature = "http2")]
1462 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1463 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1464 self.h2_builder.keep_alive_timeout(timeout);
1465 self
1466 }
1467
1468 /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1469 ///
1470 /// If disabled, keep-alive pings are only sent while there are open
1471 /// request/responses streams. If enabled, pings are also sent when no
1472 /// streams are active. Does nothing if `http2_keep_alive_interval` is
1473 /// disabled.
1474 ///
1475 /// Default is `false`.
1476 ///
1477 /// # Cargo Feature
1478 ///
1479 /// Requires the `tokio` cargo feature to be enabled.
1480 #[cfg(feature = "tokio")]
1481 #[cfg(feature = "http2")]
1482 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1483 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1484 self.h2_builder.keep_alive_while_idle(enabled);
1485 self
1486 }
1487
1488 /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1489 ///
1490 /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1491 /// details.
1492 ///
1493 /// The default value is determined by the `h2` crate.
1494 ///
1495 /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1496 #[cfg(feature = "http2")]
1497 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1498 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1499 self.h2_builder.max_concurrent_reset_streams(max);
1500 self
1501 }
1502
1503 /// Provide a timer to be used for h2
1504 ///
1505 /// See the documentation of [`h2::client::Builder::timer`] for more
1506 /// details.
1507 ///
1508 /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1509 pub fn timer<M>(&mut self, timer: M) -> &mut Self
1510 where
1511 M: Timer + Send + Sync + 'static,
1512 {
1513 #[cfg(feature = "http2")]
1514 self.h2_builder.timer(timer);
1515 self
1516 }
1517
1518 /// Provide a timer to be used for timeouts and intervals in connection pools.
1519 pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1520 where
1521 M: Timer + Clone + Send + Sync + 'static,
1522 {
1523 self.pool_timer = Some(timer::Timer::new(timer.clone()));
1524 self
1525 }
1526
1527 /// Set the maximum write buffer size for each HTTP/2 stream.
1528 ///
1529 /// Default is currently 1MB, but may change.
1530 ///
1531 /// # Panics
1532 ///
1533 /// The value must be no larger than `u32::MAX`.
1534 #[cfg(feature = "http2")]
1535 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1536 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1537 self.h2_builder.max_send_buf_size(max);
1538 self
1539 }
1540
1541 /// Set whether to retry requests that get disrupted before ever starting
1542 /// to write.
1543 ///
1544 /// This means a request that is queued, and gets given an idle, reused
1545 /// connection, and then encounters an error immediately as the idle
1546 /// connection was found to be unusable.
1547 ///
1548 /// When this is set to `false`, the related `ResponseFuture` would instead
1549 /// resolve to an `Error::Cancel`.
1550 ///
1551 /// Default is `true`.
1552 #[inline]
1553 pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1554 self.client_config.retry_canceled_requests = val;
1555 self
1556 }
1557
1558 /// Set whether to automatically add the `Host` header to requests.
1559 ///
1560 /// If true, and a request does not include a `Host` header, one will be
1561 /// added automatically, derived from the authority of the `Uri`.
1562 ///
1563 /// Default is `true`.
1564 #[inline]
1565 pub fn set_host(&mut self, val: bool) -> &mut Self {
1566 self.client_config.set_host = val;
1567 self
1568 }
1569
1570 /// Build a client with this configuration and the default `HttpConnector`.
1571 #[cfg(feature = "tokio")]
1572 pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1573 where
1574 B: Body + Send,
1575 B::Data: Send,
1576 {
1577 let mut connector = HttpConnector::new();
1578 if self.pool_config.is_enabled() {
1579 connector.set_keepalive(self.pool_config.idle_timeout);
1580 }
1581 self.build(connector)
1582 }
1583
1584 /// Combine the configuration of this builder with a connector to create a `Client`.
1585 pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1586 where
1587 C: Connect + Clone,
1588 B: Body + Send,
1589 B::Data: Send,
1590 {
1591 let exec = self.exec.clone();
1592 let timer = self.pool_timer.clone();
1593 Client {
1594 config: self.client_config,
1595 exec: exec.clone(),
1596 #[cfg(feature = "http1")]
1597 h1_builder: self.h1_builder.clone(),
1598 #[cfg(feature = "http2")]
1599 h2_builder: self.h2_builder.clone(),
1600 connector,
1601 pool: pool::Pool::new(self.pool_config, exec, timer),
1602 }
1603 }
1604}
1605
1606impl fmt::Debug for Builder {
1607 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1608 f.debug_struct("Builder")
1609 .field("client_config", &self.client_config)
1610 .field("pool_config", &self.pool_config)
1611 .finish()
1612 }
1613}
1614
1615// ==== impl Error ====
1616
1617impl fmt::Debug for Error {
1618 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1619 let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1620 f.field(&self.kind);
1621 if let Some(ref cause) = self.source {
1622 f.field(cause);
1623 }
1624 f.finish()
1625 }
1626}
1627
1628impl fmt::Display for Error {
1629 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1630 write!(f, "client error ({:?})", self.kind)
1631 }
1632}
1633
1634impl StdError for Error {
1635 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1636 self.source.as_ref().map(|e| &**e as _)
1637 }
1638}
1639
1640impl Error {
1641 /// Returns true if this was an error from `Connect`.
1642 pub fn is_connect(&self) -> bool {
1643 matches!(self.kind, ErrorKind::Connect)
1644 }
1645
1646 /// Returns the info of the client connection on which this error occurred.
1647 #[cfg(any(feature = "http1", feature = "http2"))]
1648 pub fn connect_info(&self) -> Option<&Connected> {
1649 self.connect_info.as_ref()
1650 }
1651
1652 #[cfg(any(feature = "http1", feature = "http2"))]
1653 fn with_connect_info(self, connect_info: Connected) -> Self {
1654 Self {
1655 connect_info: Some(connect_info),
1656 ..self
1657 }
1658 }
1659 fn is_canceled(&self) -> bool {
1660 matches!(self.kind, ErrorKind::Canceled)
1661 }
1662
1663 fn tx(src: hyper::Error) -> Self {
1664 e!(SendRequest, src)
1665 }
1666
1667 fn closed(src: hyper::Error) -> Self {
1668 e!(ChannelClosed, src)
1669 }
1670}