Skip to main content

hyper_util/client/legacy/
client.rs

1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::{poll_fn, Future};
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22use super::connect::capture::CaptureConnectionExtension;
23#[cfg(feature = "tokio")]
24use super::connect::HttpConnector;
25use super::connect::{Alpn, Connect, Connected, Connection};
26use super::pool::{self, Ver};
27
28use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
29
30type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
31
32/// A Client to make outgoing HTTP requests.
33///
34/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
35/// underlying connection pool will be reused.
36#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
37pub struct Client<C, B> {
38    config: Config,
39    connector: C,
40    exec: Exec,
41    #[cfg(feature = "http1")]
42    h1_builder: hyper::client::conn::http1::Builder,
43    #[cfg(feature = "http2")]
44    h2_builder: hyper::client::conn::http2::Builder<Exec>,
45    pool: pool::Pool<PoolClient<B>, PoolKey>,
46}
47
48#[derive(Clone, Copy, Debug)]
49struct Config {
50    retry_canceled_requests: bool,
51    set_host: bool,
52    ver: Ver,
53}
54
55/// Client errors
56pub struct Error {
57    kind: ErrorKind,
58    source: Option<Box<dyn StdError + Send + Sync>>,
59    #[cfg(any(feature = "http1", feature = "http2"))]
60    connect_info: Option<Connected>,
61}
62
63#[derive(Debug)]
64enum ErrorKind {
65    Canceled,
66    ChannelClosed,
67    Connect,
68    UserUnsupportedRequestMethod,
69    UserUnsupportedVersion,
70    UserAbsoluteUriRequired,
71    SendRequest,
72}
73
74macro_rules! e {
75    ($kind:ident) => {
76        Error {
77            kind: ErrorKind::$kind,
78            source: None,
79            connect_info: None,
80        }
81    };
82    ($kind:ident, $src:expr) => {
83        Error {
84            kind: ErrorKind::$kind,
85            source: Some($src.into()),
86            connect_info: None,
87        }
88    };
89}
90
91// We might change this... :shrug:
92type PoolKey = (http::uri::Scheme, http::uri::Authority);
93
94enum TrySendError<B> {
95    Retryable {
96        error: Error,
97        req: Request<B>,
98        connection_reused: bool,
99    },
100    Nope(Error),
101}
102
103/// A `Future` that will resolve to an HTTP Response.
104///
105/// This is returned by `Client::request` (and `Client::get`).
106#[must_use = "futures do nothing unless polled"]
107pub struct ResponseFuture {
108    inner: SyncWrapper<
109        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
110    >,
111}
112
113// ===== impl Client =====
114
115impl Client<(), ()> {
116    /// Create a builder to configure a new `Client`.
117    ///
118    /// # Example
119    ///
120    /// ```
121    /// # #[cfg(feature = "tokio")]
122    /// # fn run () {
123    /// use std::time::Duration;
124    /// use hyper_util::client::legacy::Client;
125    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
126    ///
127    /// let client = Client::builder(TokioExecutor::new())
128    ///     .pool_timer(TokioTimer::new())
129    ///     .pool_idle_timeout(Duration::from_secs(30))
130    ///     .http2_only(true)
131    ///     .build_http();
132    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
133    /// # drop(infer);
134    /// # }
135    /// # fn main() {}
136    /// ```
137    pub fn builder<E>(executor: E) -> Builder
138    where
139        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
140    {
141        Builder::new(executor)
142    }
143}
144
145impl<C, B> Client<C, B>
146where
147    C: Connect + Clone + Send + Sync + 'static,
148    B: Body + Send + 'static + Unpin,
149    B::Data: Send,
150    B::Error: Into<Box<dyn StdError + Send + Sync>>,
151{
152    /// Send a `GET` request to the supplied `Uri`.
153    ///
154    /// # Note
155    ///
156    /// This requires that the `Body` type have a `Default` implementation.
157    /// It *should* return an "empty" version of itself, such that
158    /// `Body::is_end_stream` is `true`.
159    ///
160    /// # Example
161    ///
162    /// ```
163    /// # #[cfg(feature = "tokio")]
164    /// # fn run () {
165    /// use hyper::Uri;
166    /// use hyper_util::client::legacy::Client;
167    /// use hyper_util::rt::TokioExecutor;
168    /// use bytes::Bytes;
169    /// use http_body_util::Full;
170    ///
171    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
172    ///
173    /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
174    /// # }
175    /// # fn main() {}
176    /// ```
177    pub fn get(&self, uri: Uri) -> ResponseFuture
178    where
179        B: Default,
180    {
181        let body = B::default();
182        if !body.is_end_stream() {
183            warn!("default Body used for get() does not return true for is_end_stream");
184        }
185
186        let mut req = Request::new(body);
187        *req.uri_mut() = uri;
188        self.request(req)
189    }
190
191    /// Send a constructed `Request` using this `Client`.
192    ///
193    /// # Example
194    ///
195    /// ```
196    /// # #[cfg(feature = "tokio")]
197    /// # fn run () {
198    /// use hyper::{Method, Request};
199    /// use hyper_util::client::legacy::Client;
200    /// use http_body_util::Full;
201    /// use hyper_util::rt::TokioExecutor;
202    /// use bytes::Bytes;
203    ///
204    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
205    ///
206    /// let req: Request<Full<Bytes>> = Request::builder()
207    ///     .method(Method::POST)
208    ///     .uri("http://httpbin.org/post")
209    ///     .body(Full::from("Hallo!"))
210    ///     .expect("request builder");
211    ///
212    /// let future = client.request(req);
213    /// # }
214    /// # fn main() {}
215    /// ```
216    pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
217        let is_http_connect = req.method() == Method::CONNECT;
218        match req.version() {
219            Version::HTTP_11 => (),
220            Version::HTTP_10 => {
221                if is_http_connect {
222                    warn!("CONNECT is not allowed for HTTP/1.0");
223                    return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
224                }
225            }
226            Version::HTTP_2 => (),
227            // completely unsupported HTTP version (like HTTP/0.9)!
228            other => return ResponseFuture::error_version(other),
229        };
230
231        let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
232            Ok(s) => s,
233            Err(err) => {
234                return ResponseFuture::new(future::err(err));
235            }
236        };
237
238        ResponseFuture::new(self.clone().send_request(req, pool_key))
239    }
240
241    async fn send_request(
242        self,
243        mut req: Request<B>,
244        pool_key: PoolKey,
245    ) -> Result<Response<hyper::body::Incoming>, Error> {
246        let uri = req.uri().clone();
247
248        loop {
249            req = match self.try_send_request(req, pool_key.clone()).await {
250                Ok(resp) => return Ok(resp),
251                Err(TrySendError::Nope(err)) => return Err(err),
252                Err(TrySendError::Retryable {
253                    mut req,
254                    error,
255                    connection_reused,
256                }) => {
257                    if !self.config.retry_canceled_requests || !connection_reused {
258                        // if client disabled, don't retry
259                        // a fresh connection means we definitely can't retry
260                        return Err(error);
261                    }
262
263                    trace!(
264                        "unstarted request canceled, trying again (reason={:?})",
265                        error
266                    );
267                    *req.uri_mut() = uri.clone();
268                    req
269                }
270            }
271        }
272    }
273
274    async fn try_send_request(
275        &self,
276        mut req: Request<B>,
277        pool_key: PoolKey,
278    ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
279        let mut pooled = self
280            .connection_for(pool_key)
281            .await
282            // `connection_for` already retries checkout errors, so if
283            // it returns an error, there's not much else to retry
284            .map_err(TrySendError::Nope)?;
285
286        if let Some(conn) = req.extensions_mut().get_mut::<CaptureConnectionExtension>() {
287            conn.set(&pooled.conn_info);
288        }
289
290        if pooled.is_http1() {
291            if req.version() == Version::HTTP_2 {
292                warn!("Connection is HTTP/1, but request requires HTTP/2");
293                return Err(TrySendError::Nope(
294                    e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
295                ));
296            }
297
298            if self.config.set_host {
299                let uri = req.uri().clone();
300                req.headers_mut().entry(HOST).or_insert_with(|| {
301                    let hostname = uri.host().expect("authority implies host");
302                    if let Some(port) = get_non_default_port(&uri) {
303                        let s = format!("{hostname}:{port}");
304                        HeaderValue::from_maybe_shared(bytes::Bytes::from(s))
305                    } else {
306                        HeaderValue::from_str(hostname)
307                    }
308                    .expect("uri host is valid header value")
309                });
310            }
311
312            // CONNECT always sends authority-form, so check it first...
313            if req.method() == Method::CONNECT {
314                authority_form(req.uri_mut());
315            } else if pooled.conn_info.is_proxied {
316                absolute_form(req.uri_mut());
317            } else {
318                origin_form(req.uri_mut());
319            }
320        } else if req.method() == Method::CONNECT && !pooled.is_http2() {
321            authority_form(req.uri_mut());
322        }
323
324        let mut res = match pooled.try_send_request(req).await {
325            Ok(res) => res,
326            Err(mut err) => {
327                return if let Some(req) = err.take_message() {
328                    Err(TrySendError::Retryable {
329                        connection_reused: pooled.is_reused(),
330                        error: e!(Canceled, err.into_error())
331                            .with_connect_info(pooled.conn_info.clone()),
332                        req,
333                    })
334                } else {
335                    Err(TrySendError::Nope(
336                        e!(SendRequest, err.into_error())
337                            .with_connect_info(pooled.conn_info.clone()),
338                    ))
339                };
340            }
341        };
342
343        // If the Connector included 'extra' info, add to Response...
344        if let Some(extra) = &pooled.conn_info.extra {
345            extra.set(res.extensions_mut());
346        }
347
348        // If pooled is HTTP/2, we can toss this reference immediately.
349        //
350        // when pooled is dropped, it will try to insert back into the
351        // pool. To delay that, spawn a future that completes once the
352        // sender is ready again.
353        //
354        // This *should* only be once the related `Connection` has polled
355        // for a new request to start.
356        //
357        // It won't be ready if there is a body to stream.
358        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
359            drop(pooled);
360        } else {
361            let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
362            self.exec.execute(on_idle);
363        }
364
365        Ok(res)
366    }
367
368    async fn connection_for(
369        &self,
370        pool_key: PoolKey,
371    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
372        loop {
373            match self.one_connection_for(pool_key.clone()).await {
374                Ok(pooled) => return Ok(pooled),
375                Err(ClientConnectError::Normal(err)) => return Err(err),
376                Err(ClientConnectError::CheckoutIsClosed(reason)) => {
377                    if !self.config.retry_canceled_requests {
378                        return Err(e!(Connect, reason));
379                    }
380
381                    trace!(
382                        "unstarted request canceled, trying again (reason={:?})",
383                        reason,
384                    );
385                    continue;
386                }
387            };
388        }
389    }
390
391    async fn one_connection_for(
392        &self,
393        pool_key: PoolKey,
394    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
395        // Return a single connection if pooling is not enabled
396        if !self.pool.is_enabled() {
397            return self
398                .connect_to(pool_key)
399                .await
400                .map_err(ClientConnectError::Normal);
401        }
402
403        // This actually races 2 different futures to try to get a ready
404        // connection the fastest, and to reduce connection churn.
405        //
406        // - If the pool has an idle connection waiting, that's used
407        //   immediately.
408        // - Otherwise, the Connector is asked to start connecting to
409        //   the destination Uri.
410        // - Meanwhile, the pool Checkout is watching to see if any other
411        //   request finishes and tries to insert an idle connection.
412        // - If a new connection is started, but the Checkout wins after
413        //   (an idle connection became available first), the started
414        //   connection future is spawned into the runtime to complete,
415        //   and then be inserted into the pool as an idle connection.
416        let checkout = self.pool.checkout(pool_key.clone());
417        let connect = self.connect_to(pool_key);
418        let is_ver_h2 = self.config.ver == Ver::Http2;
419
420        // The order of the `select` is depended on below...
421
422        match future::select(checkout, connect).await {
423            // Checkout won, connect future may have been started or not.
424            //
425            // If it has, let it finish and insert back into the pool,
426            // so as to not waste the socket...
427            Either::Left((Ok(checked_out), connecting)) => {
428                // This depends on the `select` above having the correct
429                // order, such that if the checkout future were ready
430                // immediately, the connect future will never have been
431                // started.
432                //
433                // If it *wasn't* ready yet, then the connect future will
434                // have been started...
435                if connecting.started() {
436                    let bg = connecting
437                        .map_err(|err| {
438                            trace!("background connect error: {}", err);
439                        })
440                        .map(|_pooled| {
441                            // dropping here should just place it in
442                            // the Pool for us...
443                        });
444                    // An execute error here isn't important, we're just trying
445                    // to prevent a waste of a socket...
446                    self.exec.execute(bg);
447                }
448                Ok(checked_out)
449            }
450            // Connect won, checkout can just be dropped.
451            Either::Right((Ok(connected), _checkout)) => Ok(connected),
452            // Either checkout or connect could get canceled:
453            //
454            // 1. Connect is canceled if this is HTTP/2 and there is
455            //    an outstanding HTTP/2 connecting task.
456            // 2. Checkout is canceled if the pool cannot deliver an
457            //    idle connection reliably.
458            //
459            // In both cases, we should just wait for the other future.
460            Either::Left((Err(err), connecting)) => {
461                if err.is_canceled() {
462                    connecting.await.map_err(ClientConnectError::Normal)
463                } else {
464                    Err(ClientConnectError::Normal(e!(Connect, err)))
465                }
466            }
467            Either::Right((Err(err), checkout)) => {
468                if err.is_canceled() {
469                    checkout.await.map_err(move |err| {
470                        if is_ver_h2 && err.is_canceled() {
471                            ClientConnectError::CheckoutIsClosed(err)
472                        } else {
473                            ClientConnectError::Normal(e!(Connect, err))
474                        }
475                    })
476                } else {
477                    Err(ClientConnectError::Normal(err))
478                }
479            }
480        }
481    }
482
483    #[cfg(any(feature = "http1", feature = "http2"))]
484    fn connect_to(
485        &self,
486        pool_key: PoolKey,
487    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
488    {
489        let executor = self.exec.clone();
490        let pool = self.pool.clone();
491        #[cfg(feature = "http1")]
492        let h1_builder = self.h1_builder.clone();
493        #[cfg(feature = "http2")]
494        let h2_builder = self.h2_builder.clone();
495        let ver = self.config.ver;
496        let is_ver_h2 = ver == Ver::Http2;
497        let connector = self.connector.clone();
498        let dst = domain_as_uri(pool_key.clone());
499        hyper_lazy(move || {
500            // Try to take a "connecting lock".
501            //
502            // If the pool_key is for HTTP/2, and there is already a
503            // connection being established, then this can't take a
504            // second lock. The "connect_to" future is Canceled.
505            let connecting = match pool.connecting(&pool_key, ver) {
506                Some(lock) => lock,
507                None => {
508                    let canceled = e!(Canceled);
509                    // TODO
510                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
511                    return Either::Right(future::err(canceled));
512                }
513            };
514            Either::Left(
515                connector
516                    .connect(super::connect::sealed::Internal, dst)
517                    .map_err(|src| e!(Connect, src))
518                    .and_then(move |io| {
519                        let connected = io.connected();
520                        // If ALPN is h2 and we aren't http2_only already,
521                        // then we need to convert our pool checkout into
522                        // a single HTTP2 one.
523                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
524                            match connecting.alpn_h2(&pool) {
525                                Some(lock) => {
526                                    trace!("ALPN negotiated h2, updating pool");
527                                    lock
528                                }
529                                None => {
530                                    // Another connection has already upgraded,
531                                    // the pool checkout should finish up for us.
532                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
533                                    return Either::Right(future::err(canceled));
534                                }
535                            }
536                        } else {
537                            connecting
538                        };
539
540                        #[cfg_attr(not(feature = "http2"), allow(unused))]
541                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
542
543                        Either::Left(Box::pin(async move {
544                            let tx = if is_h2 {
545                                #[cfg(feature = "http2")] {
546                                    let (mut tx, conn) =
547                                        h2_builder.handshake(io).await.map_err(Error::tx)?;
548
549                                    trace!(
550                                        "http2 handshake complete, spawning background dispatcher task"
551                                    );
552                                    executor.execute(
553                                        conn.map_err(|e| debug!("client connection error: {}", e))
554                                            .map(|_| ()),
555                                    );
556
557                                    // Wait for 'conn' to ready up before we
558                                    // declare this tx as usable
559                                    tx.ready().await.map_err(Error::tx)?;
560                                    PoolTx::Http2(tx)
561                                }
562                                #[cfg(not(feature = "http2"))]
563                                panic!("http2 feature is not enabled");
564                            } else {
565                                #[cfg(feature = "http1")] {
566                                    // Perform the HTTP/1.1 handshake on the provided I/O stream.
567                                    // Uses the h1_builder to establish a connection, returning a sender (tx) for requests
568                                    // and a connection task (conn) that manages the connection lifecycle.
569                                    let (mut tx, conn) =
570                                        h1_builder.handshake(io).await.map_err(crate::client::legacy::client::Error::tx)?;
571
572                                    // Log that the HTTP/1.1 handshake has completed successfully.
573                                    // This indicates the connection is established and ready for request processing.
574                                    trace!(
575                                        "http1 handshake complete, spawning background dispatcher task"
576                                    );
577                                    // Create a oneshot channel to communicate errors from the connection task.
578                                    // err_tx sends errors from the connection task, and err_rx receives them
579                                    // to correlate connection failures with request readiness errors.
580                                    let (err_tx, err_rx) = tokio::sync::oneshot::channel();
581                                    // Spawn the connection task in the background using the executor.
582                                    // The task manages the HTTP/1.1 connection, including upgrades (e.g., WebSocket).
583                                    // Errors are sent via err_tx to ensure they can be checked if the sender (tx) fails.
584                                    executor.execute(
585                                        conn.with_upgrades()
586                                            .map_err(|e| {
587                                                // Log the connection error at debug level for diagnostic purposes.
588                                                debug!("client connection error: {:?}", e);
589                                                // Log that the error is being sent to the error channel.
590                                                trace!("sending connection error to error channel");
591                                                // Send the error via the oneshot channel, ignoring send failures
592                                                // (e.g., if the receiver is dropped, which is handled later).
593                                                let _ =err_tx.send(e);
594                                            })
595                                            .map(|_| ()),
596                                    );
597                                    // Log that the client is waiting for the connection to be ready.
598                                    // Readiness indicates the sender (tx) can accept a request without blocking.
599                                    trace!("waiting for connection to be ready");
600                                    // Check if the sender is ready to accept a request.
601                                    // This ensures the connection is fully established before proceeding.
602                                    // aka:
603                                    // Wait for 'conn' to ready up before we
604                                    // declare this tx as usable
605                                    match tx.ready().await {
606                                        // If ready, the connection is usable for sending requests.
607                                        Ok(_) => {
608                                            // Log that the connection is ready for use.
609                                            trace!("connection is ready");
610                                            // Drop the error receiver, as it’s no longer needed since the sender is ready.
611                                            // This prevents waiting for errors that won’t occur in a successful case.
612                                            drop(err_rx);
613                                            // Wrap the sender in PoolTx::Http1 for use in the connection pool.
614                                            PoolTx::Http1(tx)
615                                        }
616                                        // If the sender fails with a closed channel error, check for a specific connection error.
617                                        // This distinguishes between a vague ChannelClosed error and an actual connection failure.
618                                        Err(e) if e.is_closed() => {
619                                            // Log that the channel is closed, indicating a potential connection issue.
620                                            trace!("connection channel closed, checking for connection error");
621                                            // Check the oneshot channel for a specific error from the connection task.
622                                            match err_rx.await {
623                                                // If an error was received, it’s a specific connection failure.
624                                                Ok(err) => {
625                                                     // Log the specific connection error for diagnostics.
626                                                    trace!("received connection error: {:?}", err);
627                                                    // Return the error wrapped in Error::tx to propagate it.
628                                                    return Err(crate::client::legacy::client::Error::tx(err));
629                                                }
630                                                // If the error channel is closed, no specific error was sent.
631                                                // Fall back to the vague ChannelClosed error.
632                                                Err(_) => {
633                                                    // Log that the error channel is closed, indicating no specific error.
634                                                    trace!("error channel closed, returning the vague ChannelClosed error");
635                                                    // Return the original error wrapped in Error::tx.
636                                                    return Err(crate::client::legacy::client::Error::tx(e));
637                                                }
638                                            }
639                                        }
640                                        // For other errors (e.g., timeout, I/O issues), propagate them directly.
641                                        // These are not ChannelClosed errors and don’t require error channel checks.
642                                        Err(e) => {
643                                            // Log the specific readiness failure for diagnostics.
644                                            trace!("connection readiness failed: {:?}", e);
645                                            // Return the error wrapped in Error::tx to propagate it.
646                                            return Err(crate::client::legacy::client::Error::tx(e));
647                                        }
648                                    }
649                                }
650                                #[cfg(not(feature = "http1"))] {
651                                    panic!("http1 feature is not enabled");
652                                }
653                            };
654
655                            Ok(pool.pooled(
656                                connecting,
657                                PoolClient {
658                                    conn_info: connected,
659                                    tx,
660                                },
661                            ))
662                        }))
663                    }),
664            )
665        })
666    }
667}
668
669impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
670where
671    C: Connect + Clone + Send + Sync + 'static,
672    B: Body + Send + 'static + Unpin,
673    B::Data: Send,
674    B::Error: Into<Box<dyn StdError + Send + Sync>>,
675{
676    type Response = Response<hyper::body::Incoming>;
677    type Error = Error;
678    type Future = ResponseFuture;
679
680    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
681        Poll::Ready(Ok(()))
682    }
683
684    fn call(&mut self, req: Request<B>) -> Self::Future {
685        self.request(req)
686    }
687}
688
689impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
690where
691    C: Connect + Clone + Send + Sync + 'static,
692    B: Body + Send + 'static + Unpin,
693    B::Data: Send,
694    B::Error: Into<Box<dyn StdError + Send + Sync>>,
695{
696    type Response = Response<hyper::body::Incoming>;
697    type Error = Error;
698    type Future = ResponseFuture;
699
700    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
701        Poll::Ready(Ok(()))
702    }
703
704    fn call(&mut self, req: Request<B>) -> Self::Future {
705        self.request(req)
706    }
707}
708
709impl<C: Clone, B> Clone for Client<C, B> {
710    fn clone(&self) -> Client<C, B> {
711        Client {
712            config: self.config,
713            exec: self.exec.clone(),
714            #[cfg(feature = "http1")]
715            h1_builder: self.h1_builder.clone(),
716            #[cfg(feature = "http2")]
717            h2_builder: self.h2_builder.clone(),
718            connector: self.connector.clone(),
719            pool: self.pool.clone(),
720        }
721    }
722}
723
724impl<C, B> fmt::Debug for Client<C, B> {
725    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
726        f.debug_struct("Client").finish()
727    }
728}
729
730// ===== impl ResponseFuture =====
731
732impl ResponseFuture {
733    fn new<F>(value: F) -> Self
734    where
735        F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
736    {
737        Self {
738            inner: SyncWrapper::new(Box::pin(value)),
739        }
740    }
741
742    fn error_version(ver: Version) -> Self {
743        warn!("Request has unsupported version \"{:?}\"", ver);
744        ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
745    }
746}
747
748impl fmt::Debug for ResponseFuture {
749    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
750        f.pad("Future<Response>")
751    }
752}
753
754impl Future for ResponseFuture {
755    type Output = Result<Response<hyper::body::Incoming>, Error>;
756
757    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
758        self.inner.get_mut().as_mut().poll(cx)
759    }
760}
761
762// ===== impl PoolClient =====
763
764// FIXME: allow() required due to `impl Trait` leaking types to this lint
765#[allow(missing_debug_implementations)]
766struct PoolClient<B> {
767    conn_info: Connected,
768    tx: PoolTx<B>,
769}
770
771enum PoolTx<B> {
772    #[cfg(feature = "http1")]
773    Http1(hyper::client::conn::http1::SendRequest<B>),
774    #[cfg(feature = "http2")]
775    Http2(hyper::client::conn::http2::SendRequest<B>),
776}
777
778impl<B> PoolClient<B> {
779    fn poll_ready(
780        &mut self,
781        #[allow(unused_variables)] cx: &mut task::Context<'_>,
782    ) -> Poll<Result<(), Error>> {
783        match self.tx {
784            #[cfg(feature = "http1")]
785            PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
786            #[cfg(feature = "http2")]
787            PoolTx::Http2(_) => Poll::Ready(Ok(())),
788        }
789    }
790
791    fn is_http1(&self) -> bool {
792        !self.is_http2()
793    }
794
795    fn is_http2(&self) -> bool {
796        match self.tx {
797            #[cfg(feature = "http1")]
798            PoolTx::Http1(_) => false,
799            #[cfg(feature = "http2")]
800            PoolTx::Http2(_) => true,
801        }
802    }
803
804    fn is_poisoned(&self) -> bool {
805        self.conn_info.poisoned.poisoned()
806    }
807
808    fn is_ready(&self) -> bool {
809        match self.tx {
810            #[cfg(feature = "http1")]
811            PoolTx::Http1(ref tx) => tx.is_ready(),
812            #[cfg(feature = "http2")]
813            PoolTx::Http2(ref tx) => tx.is_ready(),
814        }
815    }
816}
817
818impl<B: Body + 'static> PoolClient<B> {
819    fn try_send_request(
820        &mut self,
821        req: Request<B>,
822    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
823    where
824        B: Send,
825    {
826        #[cfg(all(feature = "http1", feature = "http2"))]
827        return match self.tx {
828            #[cfg(feature = "http1")]
829            PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
830            #[cfg(feature = "http2")]
831            PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
832        };
833
834        #[cfg(feature = "http1")]
835        #[cfg(not(feature = "http2"))]
836        return match self.tx {
837            #[cfg(feature = "http1")]
838            PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
839        };
840
841        #[cfg(not(feature = "http1"))]
842        #[cfg(feature = "http2")]
843        return match self.tx {
844            #[cfg(feature = "http2")]
845            PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
846        };
847    }
848}
849
850impl<B> pool::Poolable for PoolClient<B>
851where
852    B: Send + 'static,
853{
854    fn is_open(&self) -> bool {
855        !self.is_poisoned() && self.is_ready()
856    }
857
858    fn reserve(self) -> pool::Reservation<Self> {
859        match self.tx {
860            #[cfg(feature = "http1")]
861            PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
862                conn_info: self.conn_info,
863                tx: PoolTx::Http1(tx),
864            }),
865            #[cfg(feature = "http2")]
866            PoolTx::Http2(tx) => {
867                let b = PoolClient {
868                    conn_info: self.conn_info.clone(),
869                    tx: PoolTx::Http2(tx.clone()),
870                };
871                let a = PoolClient {
872                    conn_info: self.conn_info,
873                    tx: PoolTx::Http2(tx),
874                };
875                pool::Reservation::Shared(a, b)
876            }
877        }
878    }
879
880    fn can_share(&self) -> bool {
881        self.is_http2()
882    }
883}
884
885enum ClientConnectError {
886    Normal(Error),
887    CheckoutIsClosed(pool::Error),
888}
889
890fn origin_form(uri: &mut Uri) {
891    let path = match uri.path_and_query() {
892        Some(path) if path.as_str() != "/" => {
893            let mut parts = ::http::uri::Parts::default();
894            parts.path_and_query = Some(path.clone());
895            Uri::from_parts(parts).expect("path is valid uri")
896        }
897        _none_or_just_slash => {
898            debug_assert!(Uri::default() == "/");
899            Uri::default()
900        }
901    };
902    *uri = path
903}
904
905fn absolute_form(uri: &mut Uri) {
906    debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
907    debug_assert!(
908        uri.authority().is_some(),
909        "absolute_form needs an authority"
910    );
911}
912
913fn authority_form(uri: &mut Uri) {
914    if let Some(path) = uri.path_and_query() {
915        // `https://hyper.rs` would parse with `/` path, don't
916        // annoy people about that...
917        if path != "/" {
918            warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
919        }
920    }
921    *uri = match uri.authority() {
922        Some(auth) => {
923            let mut parts = ::http::uri::Parts::default();
924            parts.authority = Some(auth.clone());
925            Uri::from_parts(parts).expect("authority is valid")
926        }
927        None => {
928            unreachable!("authority_form with relative uri");
929        }
930    };
931}
932
933fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
934    let uri_clone = uri.clone();
935    match (uri_clone.scheme(), uri_clone.authority()) {
936        (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
937        (None, Some(auth)) if is_http_connect => {
938            let scheme = match auth.port_u16() {
939                Some(443) => {
940                    set_scheme(uri, Scheme::HTTPS);
941                    Scheme::HTTPS
942                }
943                _ => {
944                    set_scheme(uri, Scheme::HTTP);
945                    Scheme::HTTP
946                }
947            };
948            Ok((scheme, auth.clone()))
949        }
950        _ => {
951            debug!("Client requires absolute-form URIs, received: {:?}", uri);
952            Err(e!(UserAbsoluteUriRequired))
953        }
954    }
955}
956
957fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
958    http::uri::Builder::new()
959        .scheme(scheme)
960        .authority(auth)
961        .path_and_query("/")
962        .build()
963        .expect("domain is valid Uri")
964}
965
966fn set_scheme(uri: &mut Uri, scheme: Scheme) {
967    debug_assert!(
968        uri.scheme().is_none(),
969        "set_scheme expects no existing scheme"
970    );
971    let old = std::mem::take(uri);
972    let mut parts: ::http::uri::Parts = old.into();
973    parts.scheme = Some(scheme);
974    parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
975    *uri = Uri::from_parts(parts).expect("scheme is valid");
976}
977
978fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
979    match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
980        (Some(443), true) => None,
981        (Some(80), false) => None,
982        _ => uri.port(),
983    }
984}
985
986fn is_schema_secure(uri: &Uri) -> bool {
987    uri.scheme_str()
988        .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
989        .unwrap_or_default()
990}
991
992/// A builder to configure a new [`Client`](Client).
993///
994/// # Example
995///
996/// ```
997/// # #[cfg(feature = "tokio")]
998/// # fn run () {
999/// use std::time::Duration;
1000/// use hyper_util::client::legacy::Client;
1001/// use hyper_util::rt::TokioExecutor;
1002///
1003/// let client = Client::builder(TokioExecutor::new())
1004///     .pool_idle_timeout(Duration::from_secs(30))
1005///     .http2_only(true)
1006///     .build_http();
1007/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1008/// # drop(infer);
1009/// # }
1010/// # fn main() {}
1011/// ```
1012#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
1013#[derive(Clone)]
1014pub struct Builder {
1015    client_config: Config,
1016    exec: Exec,
1017    #[cfg(feature = "http1")]
1018    h1_builder: hyper::client::conn::http1::Builder,
1019    #[cfg(feature = "http2")]
1020    h2_builder: hyper::client::conn::http2::Builder<Exec>,
1021    pool_config: pool::Config,
1022    pool_timer: Option<timer::Timer>,
1023}
1024
1025impl Builder {
1026    /// Construct a new Builder.
1027    pub fn new<E>(executor: E) -> Self
1028    where
1029        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1030    {
1031        let exec = Exec::new(executor);
1032        Self {
1033            client_config: Config {
1034                retry_canceled_requests: true,
1035                set_host: true,
1036                ver: Ver::Auto,
1037            },
1038            exec: exec.clone(),
1039            #[cfg(feature = "http1")]
1040            h1_builder: hyper::client::conn::http1::Builder::new(),
1041            #[cfg(feature = "http2")]
1042            h2_builder: hyper::client::conn::http2::Builder::new(exec),
1043            pool_config: pool::Config {
1044                idle_timeout: Some(Duration::from_secs(90)),
1045                max_idle_per_host: usize::MAX,
1046            },
1047            pool_timer: None,
1048        }
1049    }
1050    /// Set an optional timeout for idle sockets being kept-alive.
1051    /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1052    ///
1053    /// Pass `None` to disable timeout.
1054    ///
1055    /// Default is 90 seconds.
1056    ///
1057    /// # Example
1058    ///
1059    /// ```
1060    /// # #[cfg(feature = "tokio")]
1061    /// # fn run () {
1062    /// use std::time::Duration;
1063    /// use hyper_util::client::legacy::Client;
1064    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1065    ///
1066    /// let client = Client::builder(TokioExecutor::new())
1067    ///     .pool_idle_timeout(Duration::from_secs(30))
1068    ///     .pool_timer(TokioTimer::new())
1069    ///     .build_http();
1070    ///
1071    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1072    /// # }
1073    /// # fn main() {}
1074    /// ```
1075    pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1076    where
1077        D: Into<Option<Duration>>,
1078    {
1079        self.pool_config.idle_timeout = val.into();
1080        self
1081    }
1082
1083    #[doc(hidden)]
1084    #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1085    pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1086        self.pool_config.max_idle_per_host = max_idle;
1087        self
1088    }
1089
1090    /// Sets the maximum idle connection per host allowed in the pool.
1091    ///
1092    /// Default is `usize::MAX` (no limit).
1093    pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1094        self.pool_config.max_idle_per_host = max_idle;
1095        self
1096    }
1097
1098    // HTTP/1 options
1099
1100    /// Sets the exact size of the read buffer to *always* use.
1101    ///
1102    /// Note that setting this option unsets the `http1_max_buf_size` option.
1103    ///
1104    /// Default is an adaptive read buffer.
1105    #[cfg(feature = "http1")]
1106    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1107    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1108        self.h1_builder.read_buf_exact_size(Some(sz));
1109        self
1110    }
1111
1112    /// Set the maximum buffer size for the connection.
1113    ///
1114    /// Default is ~400kb.
1115    ///
1116    /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1117    ///
1118    /// # Panics
1119    ///
1120    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1121    #[cfg(feature = "http1")]
1122    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1123    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1124        self.h1_builder.max_buf_size(max);
1125        self
1126    }
1127
1128    /// Set whether HTTP/1 connections will accept spaces between header names
1129    /// and the colon that follow them in responses.
1130    ///
1131    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1132    /// parsing.
1133    ///
1134    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1135    /// to say about it:
1136    ///
1137    /// > No whitespace is allowed between the header field-name and colon. In
1138    /// > the past, differences in the handling of such whitespace have led to
1139    /// > security vulnerabilities in request routing and response handling. A
1140    /// > server MUST reject any received request message that contains
1141    /// > whitespace between a header field-name and colon with a response code
1142    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1143    /// > response message before forwarding the message downstream.
1144    ///
1145    /// Note that this setting does not affect HTTP/2.
1146    ///
1147    /// Default is false.
1148    ///
1149    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1150    #[cfg(feature = "http1")]
1151    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1152    pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1153        self.h1_builder
1154            .allow_spaces_after_header_name_in_responses(val);
1155        self
1156    }
1157
1158    /// Set whether HTTP/1 connections will accept obsolete line folding for
1159    /// header values.
1160    ///
1161    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1162    /// to say about it:
1163    ///
1164    /// > A server that receives an obs-fold in a request message that is not
1165    /// > within a message/http container MUST either reject the message by
1166    /// > sending a 400 (Bad Request), preferably with a representation
1167    /// > explaining that obsolete line folding is unacceptable, or replace
1168    /// > each received obs-fold with one or more SP octets prior to
1169    /// > interpreting the field value or forwarding the message downstream.
1170    ///
1171    /// > A proxy or gateway that receives an obs-fold in a response message
1172    /// > that is not within a message/http container MUST either discard the
1173    /// > message and replace it with a 502 (Bad Gateway) response, preferably
1174    /// > with a representation explaining that unacceptable line folding was
1175    /// > received, or replace each received obs-fold with one or more SP
1176    /// > octets prior to interpreting the field value or forwarding the
1177    /// > message downstream.
1178    ///
1179    /// > A user agent that receives an obs-fold in a response message that is
1180    /// > not within a message/http container MUST replace each received
1181    /// > obs-fold with one or more SP octets prior to interpreting the field
1182    /// > value.
1183    ///
1184    /// Note that this setting does not affect HTTP/2.
1185    ///
1186    /// Default is false.
1187    ///
1188    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1189    #[cfg(feature = "http1")]
1190    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1191    pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1192        self.h1_builder
1193            .allow_obsolete_multiline_headers_in_responses(val);
1194        self
1195    }
1196
1197    /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1198    ///
1199    /// This mimics the behaviour of major browsers. You probably don't want this.
1200    /// You should only want this if you are implementing a proxy whose main
1201    /// purpose is to sit in front of browsers whose users access arbitrary content
1202    /// which may be malformed, and they expect everything that works without
1203    /// the proxy to keep working with the proxy.
1204    ///
1205    /// This option will prevent Hyper's client from returning an error encountered
1206    /// when parsing a header, except if the error was caused by the character NUL
1207    /// (ASCII code 0), as Chrome specifically always reject those.
1208    ///
1209    /// The ignorable errors are:
1210    /// * empty header names;
1211    /// * characters that are not allowed in header names, except for `\0` and `\r`;
1212    /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1213    ///   spaces and tabs between the header name and the colon;
1214    /// * missing colon between header name and colon;
1215    /// * characters that are not allowed in header values except for `\0` and `\r`.
1216    ///
1217    /// If an ignorable error is encountered, the parser tries to find the next
1218    /// line in the input to resume parsing the rest of the headers. An error
1219    /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1220    /// looking for the next line.
1221    #[cfg(feature = "http1")]
1222    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1223    pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1224        self.h1_builder.ignore_invalid_headers_in_responses(val);
1225        self
1226    }
1227
1228    /// Set whether HTTP/1 connections should try to use vectored writes,
1229    /// or always flatten into a single buffer.
1230    ///
1231    /// Note that setting this to false may mean more copies of body data,
1232    /// but may also improve performance when an IO transport doesn't
1233    /// support vectored writes well, such as most TLS implementations.
1234    ///
1235    /// Setting this to true will force hyper to use queued strategy
1236    /// which may eliminate unnecessary cloning on some TLS backends
1237    ///
1238    /// Default is `auto`. In this mode hyper will try to guess which
1239    /// mode to use
1240    #[cfg(feature = "http1")]
1241    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1242    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1243        self.h1_builder.writev(enabled);
1244        self
1245    }
1246
1247    /// Set whether HTTP/1 connections will write header names as title case at
1248    /// the socket level.
1249    ///
1250    /// Note that this setting does not affect HTTP/2.
1251    ///
1252    /// Default is false.
1253    #[cfg(feature = "http1")]
1254    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1255    pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1256        self.h1_builder.title_case_headers(val);
1257        self
1258    }
1259
1260    /// Set whether to support preserving original header cases.
1261    ///
1262    /// Currently, this will record the original cases received, and store them
1263    /// in a private extension on the `Response`. It will also look for and use
1264    /// such an extension in any provided `Request`.
1265    ///
1266    /// Since the relevant extension is still private, there is no way to
1267    /// interact with the original cases. The only effect this can have now is
1268    /// to forward the cases in a proxy-like fashion.
1269    ///
1270    /// Note that this setting does not affect HTTP/2.
1271    ///
1272    /// Default is false.
1273    #[cfg(feature = "http1")]
1274    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1275    pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1276        self.h1_builder.preserve_header_case(val);
1277        self
1278    }
1279
1280    /// Set the maximum number of headers.
1281    ///
1282    /// When a response is received, the parser will reserve a buffer to store headers for optimal
1283    /// performance.
1284    ///
1285    /// If client receives more headers than the buffer size, the error "message header too large"
1286    /// is returned.
1287    ///
1288    /// The headers is allocated on the stack by default, which has higher performance. After
1289    /// setting this value, headers will be allocated in heap memory, that is, heap memory
1290    /// allocation will occur for each response, and there will be a performance drop of about 5%.
1291    ///
1292    /// Note that this setting does not affect HTTP/2.
1293    ///
1294    /// Default is 100.
1295    #[cfg(feature = "http1")]
1296    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1297    pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1298        self.h1_builder.max_headers(val);
1299        self
1300    }
1301
1302    /// Set whether HTTP/0.9 responses should be tolerated.
1303    ///
1304    /// Default is false.
1305    #[cfg(feature = "http1")]
1306    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1307    pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1308        self.h1_builder.http09_responses(val);
1309        self
1310    }
1311
1312    /// Set whether the connection **must** use HTTP/2.
1313    ///
1314    /// The destination must either allow HTTP2 Prior Knowledge, or the
1315    /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1316    /// as part of the connection process. This will not make the `Client`
1317    /// utilize ALPN by itself.
1318    ///
1319    /// Note that setting this to true prevents HTTP/1 from being allowed.
1320    ///
1321    /// Default is false.
1322    #[cfg(feature = "http2")]
1323    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1324    pub fn http2_only(&mut self, val: bool) -> &mut Self {
1325        self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1326        self
1327    }
1328
1329    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1330    ///
1331    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1332    /// As of v0.4.0, it is 20.
1333    ///
1334    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1335    #[cfg(feature = "http2")]
1336    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1337    pub fn http2_max_pending_accept_reset_streams(
1338        &mut self,
1339        max: impl Into<Option<usize>>,
1340    ) -> &mut Self {
1341        self.h2_builder.max_pending_accept_reset_streams(max.into());
1342        self
1343    }
1344
1345    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1346    /// stream-level flow control.
1347    ///
1348    /// Passing `None` will do nothing.
1349    ///
1350    /// If not set, hyper will use a default.
1351    ///
1352    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1353    #[cfg(feature = "http2")]
1354    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1355    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1356        self.h2_builder.initial_stream_window_size(sz.into());
1357        self
1358    }
1359
1360    /// Sets the max connection-level flow control for HTTP2
1361    ///
1362    /// Passing `None` will do nothing.
1363    ///
1364    /// If not set, hyper will use a default.
1365    #[cfg(feature = "http2")]
1366    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1367    pub fn http2_initial_connection_window_size(
1368        &mut self,
1369        sz: impl Into<Option<u32>>,
1370    ) -> &mut Self {
1371        self.h2_builder.initial_connection_window_size(sz.into());
1372        self
1373    }
1374
1375    /// Sets the initial maximum of locally initiated (send) streams.
1376    ///
1377    /// This value will be overwritten by the value included in the initial
1378    /// SETTINGS frame received from the peer as part of a [connection preface].
1379    ///
1380    /// Passing `None` will do nothing.
1381    ///
1382    /// If not set, hyper will use a default.
1383    ///
1384    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1385    #[cfg(feature = "http2")]
1386    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1387    pub fn http2_initial_max_send_streams(
1388        &mut self,
1389        initial: impl Into<Option<usize>>,
1390    ) -> &mut Self {
1391        self.h2_builder.initial_max_send_streams(initial);
1392        self
1393    }
1394
1395    /// Sets whether to use an adaptive flow control.
1396    ///
1397    /// Enabling this will override the limits set in
1398    /// `http2_initial_stream_window_size` and
1399    /// `http2_initial_connection_window_size`.
1400    #[cfg(feature = "http2")]
1401    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1402    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1403        self.h2_builder.adaptive_window(enabled);
1404        self
1405    }
1406
1407    /// Sets the maximum frame size to use for HTTP2.
1408    ///
1409    /// Passing `None` will do nothing.
1410    ///
1411    /// If not set, hyper will use a default.
1412    #[cfg(feature = "http2")]
1413    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1414    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1415        self.h2_builder.max_frame_size(sz);
1416        self
1417    }
1418
1419    /// Sets the max size of received header frames for HTTP2.
1420    ///
1421    /// Default is currently 16KB, but can change.
1422    #[cfg(feature = "http2")]
1423    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1424    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1425        self.h2_builder.max_header_list_size(max);
1426        self
1427    }
1428
1429    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1430    /// connection alive.
1431    ///
1432    /// Pass `None` to disable HTTP2 keep-alive.
1433    ///
1434    /// Default is currently disabled.
1435    ///
1436    /// # Cargo Feature
1437    ///
1438    /// Requires the `tokio` cargo feature to be enabled.
1439    #[cfg(feature = "tokio")]
1440    #[cfg(feature = "http2")]
1441    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1442    pub fn http2_keep_alive_interval(
1443        &mut self,
1444        interval: impl Into<Option<Duration>>,
1445    ) -> &mut Self {
1446        self.h2_builder.keep_alive_interval(interval);
1447        self
1448    }
1449
1450    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1451    ///
1452    /// If the ping is not acknowledged within the timeout, the connection will
1453    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1454    ///
1455    /// Default is 20 seconds.
1456    ///
1457    /// # Cargo Feature
1458    ///
1459    /// Requires the `tokio` cargo feature to be enabled.
1460    #[cfg(feature = "tokio")]
1461    #[cfg(feature = "http2")]
1462    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1463    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1464        self.h2_builder.keep_alive_timeout(timeout);
1465        self
1466    }
1467
1468    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1469    ///
1470    /// If disabled, keep-alive pings are only sent while there are open
1471    /// request/responses streams. If enabled, pings are also sent when no
1472    /// streams are active. Does nothing if `http2_keep_alive_interval` is
1473    /// disabled.
1474    ///
1475    /// Default is `false`.
1476    ///
1477    /// # Cargo Feature
1478    ///
1479    /// Requires the `tokio` cargo feature to be enabled.
1480    #[cfg(feature = "tokio")]
1481    #[cfg(feature = "http2")]
1482    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1483    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1484        self.h2_builder.keep_alive_while_idle(enabled);
1485        self
1486    }
1487
1488    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1489    ///
1490    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1491    /// details.
1492    ///
1493    /// The default value is determined by the `h2` crate.
1494    ///
1495    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1496    #[cfg(feature = "http2")]
1497    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1498    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1499        self.h2_builder.max_concurrent_reset_streams(max);
1500        self
1501    }
1502
1503    /// Provide a timer to be used for h2
1504    ///
1505    /// See the documentation of [`h2::client::Builder::timer`] for more
1506    /// details.
1507    ///
1508    /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1509    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1510    where
1511        M: Timer + Send + Sync + 'static,
1512    {
1513        #[cfg(feature = "http2")]
1514        self.h2_builder.timer(timer);
1515        self
1516    }
1517
1518    /// Provide a timer to be used for timeouts and intervals in connection pools.
1519    pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1520    where
1521        M: Timer + Clone + Send + Sync + 'static,
1522    {
1523        self.pool_timer = Some(timer::Timer::new(timer.clone()));
1524        self
1525    }
1526
1527    /// Set the maximum write buffer size for each HTTP/2 stream.
1528    ///
1529    /// Default is currently 1MB, but may change.
1530    ///
1531    /// # Panics
1532    ///
1533    /// The value must be no larger than `u32::MAX`.
1534    #[cfg(feature = "http2")]
1535    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1536    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1537        self.h2_builder.max_send_buf_size(max);
1538        self
1539    }
1540
1541    /// Set whether to retry requests that get disrupted before ever starting
1542    /// to write.
1543    ///
1544    /// This means a request that is queued, and gets given an idle, reused
1545    /// connection, and then encounters an error immediately as the idle
1546    /// connection was found to be unusable.
1547    ///
1548    /// When this is set to `false`, the related `ResponseFuture` would instead
1549    /// resolve to an `Error::Cancel`.
1550    ///
1551    /// Default is `true`.
1552    #[inline]
1553    pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1554        self.client_config.retry_canceled_requests = val;
1555        self
1556    }
1557
1558    /// Set whether to automatically add the `Host` header to requests.
1559    ///
1560    /// If true, and a request does not include a `Host` header, one will be
1561    /// added automatically, derived from the authority of the `Uri`.
1562    ///
1563    /// Default is `true`.
1564    #[inline]
1565    pub fn set_host(&mut self, val: bool) -> &mut Self {
1566        self.client_config.set_host = val;
1567        self
1568    }
1569
1570    /// Build a client with this configuration and the default `HttpConnector`.
1571    #[cfg(feature = "tokio")]
1572    pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1573    where
1574        B: Body + Send,
1575        B::Data: Send,
1576    {
1577        let mut connector = HttpConnector::new();
1578        if self.pool_config.is_enabled() {
1579            connector.set_keepalive(self.pool_config.idle_timeout);
1580        }
1581        self.build(connector)
1582    }
1583
1584    /// Combine the configuration of this builder with a connector to create a `Client`.
1585    pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1586    where
1587        C: Connect + Clone,
1588        B: Body + Send,
1589        B::Data: Send,
1590    {
1591        let exec = self.exec.clone();
1592        let timer = self.pool_timer.clone();
1593        Client {
1594            config: self.client_config,
1595            exec: exec.clone(),
1596            #[cfg(feature = "http1")]
1597            h1_builder: self.h1_builder.clone(),
1598            #[cfg(feature = "http2")]
1599            h2_builder: self.h2_builder.clone(),
1600            connector,
1601            pool: pool::Pool::new(self.pool_config, exec, timer),
1602        }
1603    }
1604}
1605
1606impl fmt::Debug for Builder {
1607    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1608        f.debug_struct("Builder")
1609            .field("client_config", &self.client_config)
1610            .field("pool_config", &self.pool_config)
1611            .finish()
1612    }
1613}
1614
1615// ==== impl Error ====
1616
1617impl fmt::Debug for Error {
1618    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1619        let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1620        f.field(&self.kind);
1621        if let Some(ref cause) = self.source {
1622            f.field(cause);
1623        }
1624        f.finish()
1625    }
1626}
1627
1628impl fmt::Display for Error {
1629    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1630        write!(f, "client error ({:?})", self.kind)
1631    }
1632}
1633
1634impl StdError for Error {
1635    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1636        self.source.as_ref().map(|e| &**e as _)
1637    }
1638}
1639
1640impl Error {
1641    /// Returns true if this was an error from `Connect`.
1642    pub fn is_connect(&self) -> bool {
1643        matches!(self.kind, ErrorKind::Connect)
1644    }
1645
1646    /// Returns the info of the client connection on which this error occurred.
1647    #[cfg(any(feature = "http1", feature = "http2"))]
1648    pub fn connect_info(&self) -> Option<&Connected> {
1649        self.connect_info.as_ref()
1650    }
1651
1652    #[cfg(any(feature = "http1", feature = "http2"))]
1653    fn with_connect_info(self, connect_info: Connected) -> Self {
1654        Self {
1655            connect_info: Some(connect_info),
1656            ..self
1657        }
1658    }
1659    fn is_canceled(&self) -> bool {
1660        matches!(self.kind, ErrorKind::Canceled)
1661    }
1662
1663    fn tx(src: hyper::Error) -> Self {
1664        e!(SendRequest, src)
1665    }
1666
1667    fn closed(src: hyper::Error) -> Self {
1668        e!(ChannelClosed, src)
1669    }
1670}