hyper_util_wasm/client/legacy/
client.rs

1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22#[cfg(feature = "tokio")]
23use super::connect::HttpConnector;
24use super::connect::{Alpn, Connect, Connected, Connection};
25use super::pool::{self, Ver};
26
27use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
28
29type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
30
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B> {
    // Client-wide behavior flags (retry policy, `Host` header handling, protocol version).
    config: Config,
    // Connector asked to establish a connection whenever a new one is needed.
    connector: C,
    // Executor used to spawn background work: connection dispatchers and
    // the tasks that wait for a connection to become idle again.
    exec: Exec,
    // Handshake builder applied to new HTTP/1 connections.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // Handshake builder applied to new HTTP/2 connections.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Connection pool; entries are looked up by `PoolKey`.
    pool: pool::Pool<PoolClient<B>, PoolKey>,
}
46
// Client-wide settings. The struct is `Copy`, so each clone of the `Client`
// carries its own copy.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When `false`, a request that was handed back unsent is reported as an
    // error instead of being retried.
    retry_canceled_requests: bool,
    // When `true`, HTTP/1 requests that lack a `Host` header get one derived
    // from the request URI.
    set_host: bool,
    // Protocol version setting; compared against `Ver::Http2` when connecting.
    ver: Ver,
}
53
/// Client errors
pub struct Error {
    // Category of the failure.
    kind: ErrorKind,
    // Underlying cause, when one was supplied at construction (see the `e!` macro).
    source: Option<Box<dyn StdError + Send + Sync>>,
}
59
// Internal classification of client errors.
#[derive(Debug)]
enum ErrorKind {
    // The operation was canceled before completing, e.g. the request was
    // handed back unsent or a competing HTTP/2 connect already holds the lock.
    Canceled,
    // NOTE(review): not constructed in the visible part of this file;
    // presumably produced by `Error::closed` — confirm.
    ChannelClosed,
    // Establishing a connection or checking one out of the pool failed.
    Connect,
    // The request method is not usable with the request version
    // (CONNECT over HTTP/1.0).
    UserUnsupportedRequestMethod,
    // The request version is unsupported, or requires HTTP/2 while the
    // connection is HTTP/1.
    UserUnsupportedVersion,
    // NOTE(review): not constructed in the visible part of this file;
    // presumably raised by `extract_domain` — confirm.
    UserAbsoluteUriRequired,
    // Sending failed and the request was not handed back.
    SendRequest,
}
70
// Shorthand for building an `Error`.
//
// - `e!(Kind)` creates an error of that `ErrorKind` with no source.
// - `e!(Kind, src)` additionally stores `src.into()` as the source.
macro_rules! e {
    ($kind:ident) => {
        Error {
            kind: ErrorKind::$kind,
            source: None,
        }
    };
    ($kind:ident, $src:expr) => {
        Error {
            kind: ErrorKind::$kind,
            source: Some($src.into()),
        }
    };
}
85
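// Key under which connections are stored in the pool: one (scheme, authority)
// pair per destination. The exact shape of this key may change in the future.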
87type PoolKey = (http::uri::Scheme, http::uri::Authority);
88
// Failure of a single send attempt.
enum TrySendError<B> {
    // The request was handed back unsent, so the caller may try again with `req`.
    Retryable { error: Error, req: Request<B> },
    // The failure is final; the request is not available for another attempt.
    Nope(Error),
}
93
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // The boxed, type-erased future doing the actual work. The boxed future is
    // only `Send`; NOTE(review): `SyncWrapper` presumably exists so that
    // `ResponseFuture` itself can be `Sync` — confirm in `crate::common`.
    inner: SyncWrapper<
        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
    >,
}
103
104// ===== impl Client =====
105
impl Client<(), ()> {
    /// Create a builder to configure a new `Client`.
    ///
    /// The supplied `executor` is passed on to `Builder::new`. It must be able
    /// to run boxed `Send` futures and be `Send + Sync + Clone + 'static`.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "tokio")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper_util::client::legacy::Client;
    /// use hyper_util::rt::TokioExecutor;
    ///
    /// let client = Client::builder(TokioExecutor::new())
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .http2_only(true)
    ///     .build_http();
    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
    /// # drop(infer);
    /// # }
    /// # fn main() {}
    /// ```
    pub fn builder<E>(executor: E) -> Builder
    where
        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
    {
        Builder::new(executor)
    }
}
134
135impl<C, B> Client<C, B>
136where
137    C: Connect + Clone + Send + Sync + 'static,
138    B: Body + Send + 'static + Unpin,
139    B::Data: Send,
140    B::Error: Into<Box<dyn StdError + Send + Sync>>,
141{
142    /// Send a `GET` request to the supplied `Uri`.
143    ///
144    /// # Note
145    ///
146    /// This requires that the `Body` type have a `Default` implementation.
147    /// It *should* return an "empty" version of itself, such that
148    /// `Body::is_end_stream` is `true`.
149    ///
150    /// # Example
151    ///
152    /// ```
153    /// # #[cfg(feature = "tokio")]
154    /// # fn run () {
155    /// use hyper::Uri;
156    /// use hyper_util::client::legacy::Client;
157    /// use hyper_util::rt::TokioExecutor;
158    /// use bytes::Bytes;
159    /// use http_body_util::Full;
160    ///
161    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
162    ///
163    /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
164    /// # }
165    /// # fn main() {}
166    /// ```
167    pub fn get(&self, uri: Uri) -> ResponseFuture
168    where
169        B: Default,
170    {
171        let body = B::default();
172        if !body.is_end_stream() {
173            warn!("default Body used for get() does not return true for is_end_stream");
174        }
175
176        let mut req = Request::new(body);
177        *req.uri_mut() = uri;
178        self.request(req)
179    }
180
181    /// Send a constructed `Request` using this `Client`.
182    ///
183    /// # Example
184    ///
185    /// ```
186    /// # #[cfg(feature = "tokio")]
187    /// # fn run () {
188    /// use hyper::{Method, Request};
189    /// use hyper_util::client::legacy::Client;
190    /// use http_body_util::Full;
191    /// use hyper_util::rt::TokioExecutor;
192    /// use bytes::Bytes;
193    ///
194    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
195    ///
196    /// let req: Request<Full<Bytes>> = Request::builder()
197    ///     .method(Method::POST)
198    ///     .uri("http://httpbin.org/post")
199    ///     .body(Full::from("Hallo!"))
200    ///     .expect("request builder");
201    ///
202    /// let future = client.request(req);
203    /// # }
204    /// # fn main() {}
205    /// ```
206    pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
207        let is_http_connect = req.method() == Method::CONNECT;
208        match req.version() {
209            Version::HTTP_11 => (),
210            Version::HTTP_10 => {
211                if is_http_connect {
212                    warn!("CONNECT is not allowed for HTTP/1.0");
213                    return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
214                }
215            }
216            Version::HTTP_2 => (),
217            // completely unsupported HTTP version (like HTTP/0.9)!
218            other => return ResponseFuture::error_version(other),
219        };
220
221        let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
222            Ok(s) => s,
223            Err(err) => {
224                return ResponseFuture::new(future::err(err));
225            }
226        };
227
228        ResponseFuture::new(self.clone().send_request(req, pool_key))
229    }
230
231    async fn send_request(
232        self,
233        mut req: Request<B>,
234        pool_key: PoolKey,
235    ) -> Result<Response<hyper::body::Incoming>, Error> {
236        let uri = req.uri().clone();
237
238        loop {
239            req = match self.try_send_request(req, pool_key.clone()).await {
240                Ok(resp) => return Ok(resp),
241                Err(TrySendError::Nope(err)) => return Err(err),
242                Err(TrySendError::Retryable { mut req, error }) => {
243                    if !self.config.retry_canceled_requests {
244                        // if client disabled, don't retry
245                        // a fresh connection means we definitely can't retry
246                        return Err(error);
247                    }
248
249                    trace!(
250                        "unstarted request canceled, trying again (reason={:?})",
251                        error
252                    );
253                    *req.uri_mut() = uri.clone();
254                    req
255                }
256            }
257        }
258    }
259
260    async fn try_send_request(
261        &self,
262        mut req: Request<B>,
263        pool_key: PoolKey,
264    ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
265        let mut pooled = self
266            .connection_for(pool_key)
267            .await
268            // `connection_for` already retries checkout errors, so if
269            // it returns an error, there's not much else to retry
270            .map_err(TrySendError::Nope)?;
271
272        if pooled.is_http1() {
273            if req.version() == Version::HTTP_2 {
274                warn!("Connection is HTTP/1, but request requires HTTP/2");
275                return Err(TrySendError::Nope(e!(UserUnsupportedVersion)));
276            }
277
278            if self.config.set_host {
279                let uri = req.uri().clone();
280                req.headers_mut().entry(HOST).or_insert_with(|| {
281                    let hostname = uri.host().expect("authority implies host");
282                    if let Some(port) = get_non_default_port(&uri) {
283                        let s = format!("{}:{}", hostname, port);
284                        HeaderValue::from_str(&s)
285                    } else {
286                        HeaderValue::from_str(hostname)
287                    }
288                    .expect("uri host is valid header value")
289                });
290            }
291
292            // CONNECT always sends authority-form, so check it first...
293            if req.method() == Method::CONNECT {
294                authority_form(req.uri_mut());
295            } else if pooled.conn_info.is_proxied {
296                absolute_form(req.uri_mut());
297            } else {
298                origin_form(req.uri_mut());
299            }
300        } else if req.method() == Method::CONNECT {
301            authority_form(req.uri_mut());
302        }
303
304        let mut res = match pooled.try_send_request(req).await {
305            Ok(res) => res,
306            Err(mut err) => {
307                return if let Some(req) = err.take_message() {
308                    Err(TrySendError::Retryable {
309                        error: e!(Canceled, err.into_error()),
310                        req,
311                    })
312                } else {
313                    Err(TrySendError::Nope(e!(SendRequest, err.into_error())))
314                }
315            }
316        };
317        //.send_request_retryable(req)
318        //.map_err(ClientError::map_with_reused(pooled.is_reused()));
319
320        // If the Connector included 'extra' info, add to Response...
321        if let Some(extra) = &pooled.conn_info.extra {
322            extra.set(res.extensions_mut());
323        }
324
325        // As of futures@0.1.21, there is a race condition in the mpsc
326        // channel, such that sending when the receiver is closing can
327        // result in the message being stuck inside the queue. It won't
328        // ever notify until the Sender side is dropped.
329        //
330        // To counteract this, we must check if our senders 'want' channel
331        // has been closed after having tried to send. If so, error out...
332        if pooled.is_closed() {
333            return Ok(res);
334        }
335
336        // If pooled is HTTP/2, we can toss this reference immediately.
337        //
338        // when pooled is dropped, it will try to insert back into the
339        // pool. To delay that, spawn a future that completes once the
340        // sender is ready again.
341        //
342        // This *should* only be once the related `Connection` has polled
343        // for a new request to start.
344        //
345        // It won't be ready if there is a body to stream.
346        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
347            drop(pooled);
348        } else if !res.body().is_end_stream() {
349            //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
350            //res.body_mut().delayed_eof(delayed_rx);
351            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
352                // At this point, `pooled` is dropped, and had a chance
353                // to insert into the pool (if conn was idle)
354                //drop(delayed_tx);
355            });
356
357            self.exec.execute(on_idle);
358        } else {
359            // There's no body to delay, but the connection isn't
360            // ready yet. Only re-insert when it's ready
361            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
362
363            self.exec.execute(on_idle);
364        }
365
366        Ok(res)
367    }
368
369    async fn connection_for(
370        &self,
371        pool_key: PoolKey,
372    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
373        loop {
374            match self.one_connection_for(pool_key.clone()).await {
375                Ok(pooled) => return Ok(pooled),
376                Err(ClientConnectError::Normal(err)) => return Err(err),
377                Err(ClientConnectError::CheckoutIsClosed(reason)) => {
378                    if !self.config.retry_canceled_requests {
379                        return Err(e!(Connect, reason));
380                    }
381
382                    trace!(
383                        "unstarted request canceled, trying again (reason={:?})",
384                        reason,
385                    );
386                    continue;
387                }
388            };
389        }
390    }
391
392    async fn one_connection_for(
393        &self,
394        pool_key: PoolKey,
395    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
396        // Return a single connection if pooling is not enabled
397        if !self.pool.is_enabled() {
398            return self
399                .connect_to(pool_key)
400                .await
401                .map_err(ClientConnectError::Normal);
402        }
403
404        // This actually races 2 different futures to try to get a ready
405        // connection the fastest, and to reduce connection churn.
406        //
407        // - If the pool has an idle connection waiting, that's used
408        //   immediately.
409        // - Otherwise, the Connector is asked to start connecting to
410        //   the destination Uri.
411        // - Meanwhile, the pool Checkout is watching to see if any other
412        //   request finishes and tries to insert an idle connection.
413        // - If a new connection is started, but the Checkout wins after
414        //   (an idle connection became available first), the started
415        //   connection future is spawned into the runtime to complete,
416        //   and then be inserted into the pool as an idle connection.
417        let checkout = self.pool.checkout(pool_key.clone());
418        let connect = self.connect_to(pool_key);
419        let is_ver_h2 = self.config.ver == Ver::Http2;
420
421        // The order of the `select` is depended on below...
422
423        match future::select(checkout, connect).await {
424            // Checkout won, connect future may have been started or not.
425            //
426            // If it has, let it finish and insert back into the pool,
427            // so as to not waste the socket...
428            Either::Left((Ok(checked_out), connecting)) => {
429                // This depends on the `select` above having the correct
430                // order, such that if the checkout future were ready
431                // immediately, the connect future will never have been
432                // started.
433                //
434                // If it *wasn't* ready yet, then the connect future will
435                // have been started...
436                if connecting.started() {
437                    let bg = connecting
438                        .map_err(|err| {
439                            trace!("background connect error: {}", err);
440                        })
441                        .map(|_pooled| {
442                            // dropping here should just place it in
443                            // the Pool for us...
444                        });
445                    // An execute error here isn't important, we're just trying
446                    // to prevent a waste of a socket...
447                    self.exec.execute(bg);
448                }
449                Ok(checked_out)
450            }
451            // Connect won, checkout can just be dropped.
452            Either::Right((Ok(connected), _checkout)) => Ok(connected),
453            // Either checkout or connect could get canceled:
454            //
455            // 1. Connect is canceled if this is HTTP/2 and there is
456            //    an outstanding HTTP/2 connecting task.
457            // 2. Checkout is canceled if the pool cannot deliver an
458            //    idle connection reliably.
459            //
460            // In both cases, we should just wait for the other future.
461            Either::Left((Err(err), connecting)) => {
462                if err.is_canceled() {
463                    connecting.await.map_err(ClientConnectError::Normal)
464                } else {
465                    Err(ClientConnectError::Normal(e!(Connect, err)))
466                }
467            }
468            Either::Right((Err(err), checkout)) => {
469                if err.is_canceled() {
470                    checkout.await.map_err(move |err| {
471                        if is_ver_h2 && err.is_canceled() {
472                            ClientConnectError::CheckoutIsClosed(err)
473                        } else {
474                            ClientConnectError::Normal(e!(Connect, err))
475                        }
476                    })
477                } else {
478                    Err(ClientConnectError::Normal(err))
479                }
480            }
481        }
482    }
483
    // Builds a lazy future that establishes a brand-new connection for
    // `pool_key` and wraps it as a pooled client.
    //
    // Nothing happens until the returned future is first polled. At that
    // point it:
    // 1. takes a "connecting lock" from the pool (resolving to a `Canceled`
    //    error if none is available),
    // 2. asks the connector for an IO object for the destination,
    // 3. switches the lock to HTTP/2 if ALPN negotiated h2 and the client is
    //    not already HTTP/2-only,
    // 4. performs the HTTP/1 or HTTP/2 handshake, spawns the connection's
    //    dispatcher on the executor, and waits for the sender to be ready,
    // 5. registers the resulting `PoolClient` with the pool.
    #[cfg(any(feature = "http1", feature = "http2"))]
    fn connect_to(
        &self,
        pool_key: PoolKey,
    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
    {
        // Clone everything the lazy closure needs, so the returned future
        // does not borrow from `self`.
        let executor = self.exec.clone();
        let pool = self.pool.clone();
        #[cfg(feature = "http1")]
        let h1_builder = self.h1_builder.clone();
        #[cfg(feature = "http2")]
        let h2_builder = self.h2_builder.clone();
        let ver = self.config.ver;
        let is_ver_h2 = ver == Ver::Http2;
        let connector = self.connector.clone();
        let dst = domain_as_uri(pool_key.clone());
        hyper_lazy(move || {
            // Try to take a "connecting lock".
            //
            // If the pool_key is for HTTP/2, and there is already a
            // connection being established, then this can't take a
            // second lock. The "connect_to" future is Canceled.
            let connecting = match pool.connecting(&pool_key, ver) {
                Some(lock) => lock,
                None => {
                    let canceled = e!(Canceled);
                    // TODO
                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
                    return Either::Right(future::err(canceled));
                }
            };
            Either::Left(
                connector
                    .connect(super::connect::sealed::Internal, dst)
                    .map_err(|src| e!(Connect, src))
                    .and_then(move |io| {
                        let connected = io.connected();
                        // If ALPN is h2 and we aren't http2_only already,
                        // then we need to convert our pool checkout into
                        // a single HTTP2 one.
                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
                            match connecting.alpn_h2(&pool) {
                                Some(lock) => {
                                    trace!("ALPN negotiated h2, updating pool");
                                    lock
                                }
                                None => {
                                    // Another connection has already upgraded,
                                    // the pool checkout should finish up for us.
                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
                                    return Either::Right(future::err(canceled));
                                }
                            }
                        } else {
                            connecting
                        };

                        // HTTP/2 is used when the client is HTTP/2-only or
                        // when ALPN selected it for this connection.
                        #[cfg_attr(not(feature = "http2"), allow(unused))]
                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;

                        Either::Left(Box::pin(async move {
                            let tx = if is_h2 {
                                #[cfg(feature = "http2")] {
                                    let (mut tx, conn) =
                                        h2_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http2 handshake complete, spawning background dispatcher task"
                                    );
                                    // The connection future is driven on the
                                    // executor; its errors are only logged.
                                    executor.execute(
                                        conn.map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http2(tx)
                                }
                                // Without the feature there is no HTTP/2
                                // implementation to hand the IO to.
                                #[cfg(not(feature = "http2"))]
                                panic!("http2 feature is not enabled");
                            } else {
                                #[cfg(feature = "http1")] {
                                    let (mut tx, conn) =
                                        h1_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http1 handshake complete, spawning background dispatcher task"
                                    );
                                    // Same as above, with upgrade support
                                    // enabled on the HTTP/1 connection.
                                    executor.execute(
                                        conn.with_upgrades()
                                            .map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http1(tx)
                                }
                                #[cfg(not(feature = "http1"))] {
                                    panic!("http1 feature is not enabled");
                                }
                            };

                            // Hand the new client to the pool, consuming the
                            // connecting lock taken above.
                            Ok(pool.pooled(
                                connecting,
                                PoolClient {
                                    conn_info: connected,
                                    tx,
                                },
                            ))
                        }))
                    }),
            )
        })
    }
601}
602
// `tower_service::Service` adapter: lets a `Client` be used as a service
// taking `Request<B>` and yielding `Response<Incoming>`.
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: Body + Send + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<hyper::body::Incoming>;
    type Error = Error;
    type Future = ResponseFuture;

    // Always reports ready; no readiness check is performed here.
    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Forwards to `Client::request`.
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
622
// Same adapter as for `Client` itself, but for a shared reference, so a
// borrowed client can be used as a `Service` without cloning it first.
impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: Body + Send + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<hyper::body::Incoming>;
    type Error = Error;
    type Future = ResponseFuture;

    // Always reports ready; no readiness check is performed here.
    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Forwards to `Client::request`.
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
642
643impl<C: Clone, B> Clone for Client<C, B> {
644    fn clone(&self) -> Client<C, B> {
645        Client {
646            config: self.config,
647            exec: self.exec.clone(),
648            #[cfg(feature = "http1")]
649            h1_builder: self.h1_builder.clone(),
650            #[cfg(feature = "http2")]
651            h2_builder: self.h2_builder.clone(),
652            connector: self.connector.clone(),
653            pool: self.pool.clone(),
654        }
655    }
656}
657
// Opaque `Debug`: only the type name is printed and no fields are shown,
// so neither `C` nor `B` needs to implement `Debug`.
impl<C, B> fmt::Debug for Client<C, B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Client").finish()
    }
}
663
664// ===== impl ResponseFuture =====
665
666impl ResponseFuture {
667    fn new<F>(value: F) -> Self
668    where
669        F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
670    {
671        Self {
672            inner: SyncWrapper::new(Box::pin(value)),
673        }
674    }
675
676    fn error_version(ver: Version) -> Self {
677        warn!("Request has unsupported version \"{:?}\"", ver);
678        ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
679    }
680}
681
// Opaque `Debug`: the boxed inner future cannot be inspected, so a fixed
// label is written (via `pad`, which honors width/alignment flags).
impl fmt::Debug for ResponseFuture {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad("Future<Response>")
    }
}
687
impl Future for ResponseFuture {
    type Output = Result<Response<hyper::body::Incoming>, Error>;

    // Delegates to the boxed inner future. `get_mut` reaches through the
    // `SyncWrapper`, and `as_mut` re-borrows the `Pin<Box<_>>` for polling.
    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
        self.inner.get_mut().as_mut().poll(cx)
    }
}
695
696// ===== impl PoolClient =====
697
// The value stored in the connection pool: a request sender plus the
// metadata of the connection it belongs to.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    // Connection metadata reported by the connector (ALPN result, proxy
    // flag, optional extras).
    conn_info: Connected,
    // Protocol-specific handle used to send requests on the connection.
    tx: PoolTx<B>,
}
704
// Request sender for a pooled connection, one variant per protocol.
// Each variant only exists when the matching cargo feature is enabled.
enum PoolTx<B> {
    // Sender for an HTTP/1 connection.
    #[cfg(feature = "http1")]
    Http1(hyper::client::conn::http1::SendRequest<B>),
    // Sender for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(hyper::client::conn::http2::SendRequest<B>),
}
711
// Protocol-independent queries on a pooled client. Every method dispatches
// on the sender variant; arms are compiled only for enabled features.
impl<B> PoolClient<B> {
    // Polls until the connection can accept another request.
    //
    // HTTP/1 defers to the sender's own `poll_ready`, mapping its error
    // through `Error::closed`. HTTP/2 is reported as ready unconditionally.
    fn poll_ready(
        &mut self,
        // Unused when only the HTTP/2 arm is compiled in.
        #[allow(unused_variables)] cx: &mut task::Context<'_>,
    ) -> Poll<Result<(), Error>> {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
            #[cfg(feature = "http2")]
            PoolTx::Http2(_) => Poll::Ready(Ok(())),
        }
    }

    // `true` for every connection that is not HTTP/2.
    fn is_http1(&self) -> bool {
        !self.is_http2()
    }

    // `true` when the sender is the HTTP/2 variant.
    fn is_http2(&self) -> bool {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(_) => false,
            #[cfg(feature = "http2")]
            PoolTx::Http2(_) => true,
        }
    }

    // Whether the underlying sender currently reports itself as ready.
    fn is_ready(&self) -> bool {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref tx) => tx.is_ready(),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref tx) => tx.is_ready(),
        }
    }

    // Whether the underlying sender reports itself as closed.
    fn is_closed(&self) -> bool {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref tx) => tx.is_closed(),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref tx) => tx.is_closed(),
        }
    }
}
756
impl<B: Body + 'static> PoolClient<B> {
    // Sends `req` on the underlying connection via the sender's
    // `try_send_request`.
    //
    // Exactly one of the three `return` statements below is compiled,
    // depending on which protocol features are enabled. With both enabled,
    // the two differently-typed futures are unified through `Either`; with
    // a single feature the sender's future is returned directly.
    fn try_send_request(
        &mut self,
        req: Request<B>,
    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
    where
        B: Send,
    {
        // Both protocols available.
        #[cfg(all(feature = "http1", feature = "http2"))]
        return match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
        };

        // HTTP/1 only.
        #[cfg(feature = "http1")]
        #[cfg(not(feature = "http2"))]
        return match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
        };

        // HTTP/2 only.
        #[cfg(not(feature = "http1"))]
        #[cfg(feature = "http2")]
        return match self.tx {
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
        };
    }
    /*
    //TODO: can we re-introduce this somehow? Or must people use tower::retry?
    fn send_request_retryable(
        &mut self,
        req: Request<B>,
    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, (Error, Option<Request<B>>)>>
    where
        B: Send,
    {
        match self.tx {
            #[cfg(not(feature = "http2"))]
            PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
            #[cfg(feature = "http1")]
            PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)),
            #[cfg(feature = "http2")]
            PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)),
        }
    }
    */
}
807
// Integrates `PoolClient` with the connection pool.
impl<B> pool::Poolable for PoolClient<B>
where
    B: Send + 'static,
{
    // A client counts as open exactly when its sender reports ready.
    fn is_open(&self) -> bool {
        self.is_ready()
    }

    // Converts the client into a pool reservation.
    //
    // An HTTP/1 client is reserved as `Unique`. An HTTP/2 client is reserved
    // as `Shared`, built from two clients that each hold a clone of the same
    // sender and connection info.
    fn reserve(self) -> pool::Reservation<Self> {
        match self.tx {
            #[cfg(feature = "http1")]
            PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
                conn_info: self.conn_info,
                tx: PoolTx::Http1(tx),
            }),
            #[cfg(feature = "http2")]
            PoolTx::Http2(tx) => {
                // `b` is built first from clones, so `a` can take ownership
                // of the original `conn_info` and `tx` afterwards.
                let b = PoolClient {
                    conn_info: self.conn_info.clone(),
                    tx: PoolTx::Http2(tx.clone()),
                };
                let a = PoolClient {
                    conn_info: self.conn_info,
                    tx: PoolTx::Http2(tx),
                };
                pool::Reservation::Shared(a, b)
            }
        }
    }

    // Only HTTP/2 clients can be shared.
    fn can_share(&self) -> bool {
        self.is_http2()
    }
}
842
/// Internal error type separating ordinary client errors from pool errors.
enum ClientConnectError {
    /// An ordinary client `Error`.
    Normal(Error),
    /// Wraps a `pool::Error`.
    // NOTE(review): presumably produced when a pool checkout finds the pool
    // closed — confirm against the call sites.
    CheckoutIsClosed(pool::Error),
}
847
848fn origin_form(uri: &mut Uri) {
849    let path = match uri.path_and_query() {
850        Some(path) if path.as_str() != "/" => {
851            let mut parts = ::http::uri::Parts::default();
852            parts.path_and_query = Some(path.clone());
853            Uri::from_parts(parts).expect("path is valid uri")
854        }
855        _none_or_just_slash => {
856            debug_assert!(Uri::default() == "/");
857            Uri::default()
858        }
859    };
860    *uri = path
861}
862
863fn absolute_form(uri: &mut Uri) {
864    debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
865    debug_assert!(
866        uri.authority().is_some(),
867        "absolute_form needs an authority"
868    );
869    // If the URI is to HTTPS, and the connector claimed to be a proxy,
870    // then it *should* have tunneled, and so we don't want to send
871    // absolute-form in that case.
872    if uri.scheme() == Some(&Scheme::HTTPS) {
873        origin_form(uri);
874    }
875}
876
877fn authority_form(uri: &mut Uri) {
878    if let Some(path) = uri.path_and_query() {
879        // `https://hyper.rs` would parse with `/` path, don't
880        // annoy people about that...
881        if path != "/" {
882            warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
883        }
884    }
885    *uri = match uri.authority() {
886        Some(auth) => {
887            let mut parts = ::http::uri::Parts::default();
888            parts.authority = Some(auth.clone());
889            Uri::from_parts(parts).expect("authority is valid")
890        }
891        None => {
892            unreachable!("authority_form with relative uri");
893        }
894    };
895}
896
897fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
898    let uri_clone = uri.clone();
899    match (uri_clone.scheme(), uri_clone.authority()) {
900        (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
901        (None, Some(auth)) if is_http_connect => {
902            let scheme = match auth.port_u16() {
903                Some(443) => {
904                    set_scheme(uri, Scheme::HTTPS);
905                    Scheme::HTTPS
906                }
907                _ => {
908                    set_scheme(uri, Scheme::HTTP);
909                    Scheme::HTTP
910                }
911            };
912            Ok((scheme, auth.clone()))
913        }
914        _ => {
915            debug!("Client requires absolute-form URIs, received: {:?}", uri);
916            Err(e!(UserAbsoluteUriRequired))
917        }
918    }
919}
920
921fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
922    http::uri::Builder::new()
923        .scheme(scheme)
924        .authority(auth)
925        .path_and_query("/")
926        .build()
927        .expect("domain is valid Uri")
928}
929
930fn set_scheme(uri: &mut Uri, scheme: Scheme) {
931    debug_assert!(
932        uri.scheme().is_none(),
933        "set_scheme expects no existing scheme"
934    );
935    let old = std::mem::take(uri);
936    let mut parts: ::http::uri::Parts = old.into();
937    parts.scheme = Some(scheme);
938    parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
939    *uri = Uri::from_parts(parts).expect("scheme is valid");
940}
941
942fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
943    match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
944        (Some(443), true) => None,
945        (Some(80), false) => None,
946        _ => uri.port(),
947    }
948}
949
950fn is_schema_secure(uri: &Uri) -> bool {
951    uri.scheme_str()
952        .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
953        .unwrap_or_default()
954}
955
/// A builder to configure a new [`Client`](Client).
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::TokioExecutor;
///
/// let client = Client::builder(TokioExecutor::new())
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
    /// Client-level options (request retry, `Host` header, protocol version).
    client_config: Config,
    /// Executor wrapper created from the executor passed to `Builder::new`.
    exec: Exec,
    /// hyper's HTTP/1 connection builder, configured by the `http1_*` setters.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    /// hyper's HTTP/2 connection builder, configured by the `http2_*` setters.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    /// Connection-pool options (idle timeout, max idle connections per host).
    pool_config: pool::Config,
    /// Timer handed to the pool; `None` until `pool_timer` is called.
    pool_timer: Option<timer::Timer>,
}
988
989impl Builder {
990    /// Construct a new Builder.
991    pub fn new<E>(executor: E) -> Self
992    where
993        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
994    {
995        let exec = Exec::new(executor);
996        Self {
997            client_config: Config {
998                retry_canceled_requests: true,
999                set_host: true,
1000                ver: Ver::Auto,
1001            },
1002            exec: exec.clone(),
1003            #[cfg(feature = "http1")]
1004            h1_builder: hyper::client::conn::http1::Builder::new(),
1005            #[cfg(feature = "http2")]
1006            h2_builder: hyper::client::conn::http2::Builder::new(exec),
1007            pool_config: pool::Config {
1008                idle_timeout: Some(Duration::from_secs(90)),
1009                max_idle_per_host: usize::MAX,
1010            },
1011            pool_timer: None,
1012        }
1013    }
1014    /// Set an optional timeout for idle sockets being kept-alive.
1015    /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1016    ///
1017    /// Pass `None` to disable timeout.
1018    ///
1019    /// Default is 90 seconds.
1020    ///
1021    /// # Example
1022    ///
1023    /// ```
1024    /// # #[cfg(feature = "tokio")]
1025    /// # fn run () {
1026    /// use std::time::Duration;
1027    /// use hyper_util::client::legacy::Client;
1028    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1029    ///
1030    /// let client = Client::builder(TokioExecutor::new())
1031    ///     .pool_idle_timeout(Duration::from_secs(30))
1032    ///     .pool_timer(TokioTimer::new())
1033    ///     .build_http();
1034    ///
1035    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1036    /// # }
1037    /// # fn main() {}
1038    /// ```
    pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
    where
        D: Into<Option<Duration>>,
    {
        // `Into<Option<Duration>>` lets callers pass a bare `Duration` as
        // well as an explicit `Option`.
        self.pool_config.idle_timeout = val.into();
        self
    }
1046
1047    #[doc(hidden)]
1048    #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1049    pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1050        self.pool_config.max_idle_per_host = max_idle;
1051        self
1052    }
1053
1054    /// Sets the maximum idle connection per host allowed in the pool.
1055    ///
1056    /// Default is `usize::MAX` (no limit).
    pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
        // Only recorded in the pool configuration here.
        self.pool_config.max_idle_per_host = max_idle;
        self
    }
1061
1062    // HTTP/1 options
1063
1064    /// Sets the exact size of the read buffer to *always* use.
1065    ///
1066    /// Note that setting this option unsets the `http1_max_buf_size` option.
1067    ///
1068    /// Default is an adaptive read buffer.
1069    #[cfg(feature = "http1")]
1070    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
        // The HTTP/1 builder is given an `Option`; this setter always
        // supplies a concrete size.
        self.h1_builder.read_buf_exact_size(Some(sz));
        self
    }
1075
    /// Set the maximum buffer size for the connection.
    ///
    /// Default is ~400kb.
    ///
    /// Note that setting this option unsets the `http1_read_buf_exact_size` option.
    ///
    /// # Panics
    ///
    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
        self.h1_builder.max_buf_size(max);
        self
    }
1091
1092    /// Set whether HTTP/1 connections will accept spaces between header names
1093    /// and the colon that follow them in responses.
1094    ///
1095    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1096    /// parsing.
1097    ///
1098    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1099    /// to say about it:
1100    ///
1101    /// > No whitespace is allowed between the header field-name and colon. In
1102    /// > the past, differences in the handling of such whitespace have led to
1103    /// > security vulnerabilities in request routing and response handling. A
1104    /// > server MUST reject any received request message that contains
1105    /// > whitespace between a header field-name and colon with a response code
1106    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1107    /// > response message before forwarding the message downstream.
1108    ///
1109    /// Note that this setting does not affect HTTP/2.
1110    ///
1111    /// Default is false.
1112    ///
1113    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1114    #[cfg(feature = "http1")]
1115    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
        // Forwarded to the HTTP/1 connection builder only.
        self.h1_builder
            .allow_spaces_after_header_name_in_responses(val);
        self
    }
1121
1122    /// Set whether HTTP/1 connections will accept obsolete line folding for
1123    /// header values.
1124    ///
1125    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1126    /// to say about it:
1127    ///
1128    /// > A server that receives an obs-fold in a request message that is not
1129    /// > within a message/http container MUST either reject the message by
1130    /// > sending a 400 (Bad Request), preferably with a representation
1131    /// > explaining that obsolete line folding is unacceptable, or replace
1132    /// > each received obs-fold with one or more SP octets prior to
1133    /// > interpreting the field value or forwarding the message downstream.
1134    ///
1135    /// > A proxy or gateway that receives an obs-fold in a response message
1136    /// > that is not within a message/http container MUST either discard the
1137    /// > message and replace it with a 502 (Bad Gateway) response, preferably
1138    /// > with a representation explaining that unacceptable line folding was
1139    /// > received, or replace each received obs-fold with one or more SP
1140    /// > octets prior to interpreting the field value or forwarding the
1141    /// > message downstream.
1142    ///
1143    /// > A user agent that receives an obs-fold in a response message that is
1144    /// > not within a message/http container MUST replace each received
1145    /// > obs-fold with one or more SP octets prior to interpreting the field
1146    /// > value.
1147    ///
1148    /// Note that this setting does not affect HTTP/2.
1149    ///
1150    /// Default is false.
1151    ///
1152    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1153    #[cfg(feature = "http1")]
1154    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
        // Forwarded to the HTTP/1 connection builder only.
        self.h1_builder
            .allow_obsolete_multiline_headers_in_responses(val);
        self
    }
1160
1161    /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1162    ///
1163    /// This mimics the behaviour of major browsers. You probably don't want this.
1164    /// You should only want this if you are implementing a proxy whose main
1165    /// purpose is to sit in front of browsers whose users access arbitrary content
1166    /// which may be malformed, and they expect everything that works without
1167    /// the proxy to keep working with the proxy.
1168    ///
1169    /// This option will prevent Hyper's client from returning an error encountered
1170    /// when parsing a header, except if the error was caused by the character NUL
1171    /// (ASCII code 0), as Chrome specifically always reject those.
1172    ///
1173    /// The ignorable errors are:
1174    /// * empty header names;
1175    /// * characters that are not allowed in header names, except for `\0` and `\r`;
1176    /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1177    ///   spaces and tabs between the header name and the colon;
1178    /// * missing colon between header name and colon;
1179    /// * characters that are not allowed in header values except for `\0` and `\r`.
1180    ///
1181    /// If an ignorable error is encountered, the parser tries to find the next
1182    /// line in the input to resume parsing the rest of the headers. An error
1183    /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1184    /// looking for the next line.
1185    #[cfg(feature = "http1")]
1186    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1187    pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1188        self.h1_builder.ignore_invalid_headers_in_responses(val);
1189        self
1190    }
1191
1192    /// Set whether HTTP/1 connections should try to use vectored writes,
1193    /// or always flatten into a single buffer.
1194    ///
1195    /// Note that setting this to false may mean more copies of body data,
1196    /// but may also improve performance when an IO transport doesn't
1197    /// support vectored writes well, such as most TLS implementations.
1198    ///
1199    /// Setting this to true will force hyper to use queued strategy
1200    /// which may eliminate unnecessary cloning on some TLS backends
1201    ///
1202    /// Default is `auto`. In this mode hyper will try to guess which
1203    /// mode to use
1204    #[cfg(feature = "http1")]
1205    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1206    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1207        self.h1_builder.writev(enabled);
1208        self
1209    }
1210
1211    /// Set whether HTTP/1 connections will write header names as title case at
1212    /// the socket level.
1213    ///
1214    /// Note that this setting does not affect HTTP/2.
1215    ///
1216    /// Default is false.
1217    #[cfg(feature = "http1")]
1218    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
        // Forwarded to the HTTP/1 connection builder only.
        self.h1_builder.title_case_headers(val);
        self
    }
1223
1224    /// Set whether to support preserving original header cases.
1225    ///
1226    /// Currently, this will record the original cases received, and store them
1227    /// in a private extension on the `Response`. It will also look for and use
1228    /// such an extension in any provided `Request`.
1229    ///
1230    /// Since the relevant extension is still private, there is no way to
1231    /// interact with the original cases. The only effect this can have now is
1232    /// to forward the cases in a proxy-like fashion.
1233    ///
1234    /// Note that this setting does not affect HTTP/2.
1235    ///
1236    /// Default is false.
1237    #[cfg(feature = "http1")]
1238    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
        // Forwarded to the HTTP/1 connection builder only.
        self.h1_builder.preserve_header_case(val);
        self
    }
1243
    /// Set the maximum number of headers.
    ///
    /// When a response is received, the parser will reserve a buffer to store headers for optimal
    /// performance.
    ///
    /// If the client receives more headers than the buffer size, the error "message header too
    /// large" is returned.
    ///
    /// Headers are allocated on the stack by default, which has higher performance. After
    /// setting this value, headers will be allocated in heap memory, that is, heap memory
    /// allocation will occur for each response, and there will be a performance drop of about 5%.
    ///
    /// Note that this setting does not affect HTTP/2.
    ///
    /// Default is 100.
    #[cfg(feature = "http1")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
        self.h1_builder.max_headers(val);
        self
    }
1265
1266    /// Set whether HTTP/0.9 responses should be tolerated.
1267    ///
1268    /// Default is false.
1269    #[cfg(feature = "http1")]
1270    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http09_responses(&mut self, val: bool) -> &mut Self {
        // Forwarded to the HTTP/1 connection builder only.
        self.h1_builder.http09_responses(val);
        self
    }
1275
    /// Set whether the connection **must** use HTTP/2.
    ///
    /// The destination must either allow HTTP2 Prior Knowledge, or the
    /// `Connect` should be configured to use ALPN to upgrade to `h2`
    /// as part of the connection process. This will not make the `Client`
    /// utilize ALPN by itself.
    ///
    /// Note that setting this to true prevents HTTP/1 from being allowed.
    ///
    /// Default is false.
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_only(&mut self, val: bool) -> &mut Self {
        // `false` resets the version to `Ver::Auto`.
        self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
        self
    }
1292
1293    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1294    ///
1295    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1296    /// As of v0.4.0, it is 20.
1297    ///
1298    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1299    #[cfg(feature = "http2")]
1300    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_pending_accept_reset_streams(
        &mut self,
        max: impl Into<Option<usize>>,
    ) -> &mut Self {
        // Converted to `Option<usize>` before reaching the HTTP/2 builder.
        self.h2_builder.max_pending_accept_reset_streams(max.into());
        self
    }
1308
1309    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1310    /// stream-level flow control.
1311    ///
1312    /// Passing `None` will do nothing.
1313    ///
1314    /// If not set, hyper will use a default.
1315    ///
1316    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1317    #[cfg(feature = "http2")]
1318    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        // Converted to `Option<u32>` before reaching the HTTP/2 builder.
        self.h2_builder.initial_stream_window_size(sz.into());
        self
    }
1323
1324    /// Sets the max connection-level flow control for HTTP2
1325    ///
1326    /// Passing `None` will do nothing.
1327    ///
1328    /// If not set, hyper will use a default.
1329    #[cfg(feature = "http2")]
1330    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_initial_connection_window_size(
        &mut self,
        sz: impl Into<Option<u32>>,
    ) -> &mut Self {
        // Converted to `Option<u32>` before reaching the HTTP/2 builder.
        self.h2_builder.initial_connection_window_size(sz.into());
        self
    }
1338
1339    /// Sets the initial maximum of locally initiated (send) streams.
1340    ///
1341    /// This value will be overwritten by the value included in the initial
1342    /// SETTINGS frame received from the peer as part of a [connection preface].
1343    ///
1344    /// Passing `None` will do nothing.
1345    ///
1346    /// If not set, hyper will use a default.
1347    ///
1348    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1349    #[cfg(feature = "http2")]
1350    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_initial_max_send_streams(
        &mut self,
        initial: impl Into<Option<usize>>,
    ) -> &mut Self {
        // Passed through unconverted; the HTTP/2 builder receives the
        // `impl Into<Option<usize>>` value itself.
        self.h2_builder.initial_max_send_streams(initial);
        self
    }
1358
1359    /// Sets whether to use an adaptive flow control.
1360    ///
1361    /// Enabling this will override the limits set in
1362    /// `http2_initial_stream_window_size` and
1363    /// `http2_initial_connection_window_size`.
1364    #[cfg(feature = "http2")]
1365    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
        // Forwarded to the HTTP/2 connection builder only.
        self.h2_builder.adaptive_window(enabled);
        self
    }
1370
1371    /// Sets the maximum frame size to use for HTTP2.
1372    ///
1373    /// Passing `None` will do nothing.
1374    ///
1375    /// If not set, hyper will use a default.
1376    #[cfg(feature = "http2")]
1377    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        // Passed through unconverted to the HTTP/2 connection builder.
        self.h2_builder.max_frame_size(sz);
        self
    }
1382
    /// Sets the interval at which HTTP2 Ping frames should be sent to keep a
    /// connection alive.
    ///
    /// Pass `None` to disable HTTP2 keep-alive.
    ///
    /// Default is currently disabled.
    ///
    /// # Cargo Feature
    ///
    /// Requires the `tokio` cargo feature to be enabled.
    #[cfg(feature = "tokio")]
    #[cfg(feature = "http2")]
    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_interval(
        &mut self,
        interval: impl Into<Option<Duration>>,
    ) -> &mut Self {
        self.h2_builder.keep_alive_interval(interval);
        self
    }
1403
1404    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1405    ///
1406    /// If the ping is not acknowledged within the timeout, the connection will
1407    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1408    ///
1409    /// Default is 20 seconds.
1410    ///
1411    /// # Cargo Feature
1412    ///
1413    /// Requires the `tokio` cargo feature to be enabled.
1414    #[cfg(feature = "tokio")]
1415    #[cfg(feature = "http2")]
1416    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        // Forwarded to the HTTP/2 connection builder only.
        self.h2_builder.keep_alive_timeout(timeout);
        self
    }
1421
1422    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1423    ///
1424    /// If disabled, keep-alive pings are only sent while there are open
1425    /// request/responses streams. If enabled, pings are also sent when no
1426    /// streams are active. Does nothing if `http2_keep_alive_interval` is
1427    /// disabled.
1428    ///
1429    /// Default is `false`.
1430    ///
1431    /// # Cargo Feature
1432    ///
1433    /// Requires the `tokio` cargo feature to be enabled.
1434    #[cfg(feature = "tokio")]
1435    #[cfg(feature = "http2")]
1436    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
        // Forwarded to the HTTP/2 connection builder only.
        self.h2_builder.keep_alive_while_idle(enabled);
        self
    }
1441
1442    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1443    ///
1444    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1445    /// details.
1446    ///
1447    /// The default value is determined by the `h2` crate.
1448    ///
1449    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1450    #[cfg(feature = "http2")]
1451    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        // Forwarded to the HTTP/2 connection builder only.
        self.h2_builder.max_concurrent_reset_streams(max);
        self
    }
1456
1457    /// Provide a timer to be used for h2
1458    ///
1459    /// See the documentation of [`h2::client::Builder::timer`] for more
1460    /// details.
1461    ///
1462    /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1463    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1464    where
1465        M: Timer + Send + Sync + 'static,
1466    {
1467        #[cfg(feature = "http2")]
1468        self.h2_builder.timer(timer);
1469        self
1470    }
1471
    /// Provide a timer to be used for timeouts and intervals in connection pools.
    ///
    /// Calling this installs a pool timer; without it `pool_timer` stays `None`.
    pub fn pool_timer<M>(&mut self, _timer: M) -> &mut Self
    where
        M: Timer + Clone + Send + Sync + 'static,
    {
        // NOTE(review): the supplied `_timer` is not stored; a fresh
        // `timer::Timer::new()` is installed instead. Presumably intentional
        // for this build of the crate — confirm.
        self.pool_timer = Some(timer::Timer::new());
        self
    }
1480
1481    /// Set the maximum write buffer size for each HTTP/2 stream.
1482    ///
1483    /// Default is currently 1MB, but may change.
1484    ///
1485    /// # Panics
1486    ///
1487    /// The value must be no larger than `u32::MAX`.
1488    #[cfg(feature = "http2")]
1489    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
        // Forwarded to the HTTP/2 connection builder only.
        self.h2_builder.max_send_buf_size(max);
        self
    }
1494
    /// Set whether to retry requests that get disrupted before ever starting
    /// to write.
    ///
    /// This means a request that is queued, and gets given an idle, reused
    /// connection, and then encounters an error immediately as the idle
    /// connection was found to be unusable.
    ///
    /// When this is set to `false`, the related `ResponseFuture` would instead
    /// resolve to a canceled `Error`.
    ///
    /// Default is `true`.
    #[inline]
    pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
        self.client_config.retry_canceled_requests = val;
        self
    }
1511
1512    /// Set whether to automatically add the `Host` header to requests.
1513    ///
1514    /// If true, and a request does not include a `Host` header, one will be
1515    /// added automatically, derived from the authority of the `Uri`.
1516    ///
1517    /// Default is `true`.
1518    #[inline]
    pub fn set_host(&mut self, val: bool) -> &mut Self {
        // Only recorded in the client configuration here.
        self.client_config.set_host = val;
        self
    }
1523
    /// Build a client with this configuration and the default `HttpConnector`.
    ///
    /// When the pool configuration reports itself enabled, the pool's idle
    /// timeout is also applied as the connector's keep-alive setting.
    #[cfg(feature = "tokio")]
    pub fn build_http<B>(&self) -> Client<HttpConnector, B>
    where
        B: Body + Send,
        B::Data: Send,
    {
        let mut connector = HttpConnector::new();
        if self.pool_config.is_enabled() {
            connector.set_keepalive(self.pool_config.idle_timeout);
        }
        self.build(connector)
    }
1537
1538    /// Combine the configuration of this builder with a connector to create a `Client`.
1539    pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1540    where
1541        C: Connect + Clone,
1542        B: Body + Send,
1543        B::Data: Send,
1544    {
1545        let exec = self.exec.clone();
1546        let timer = self.pool_timer.clone();
1547        Client {
1548            config: self.client_config,
1549            exec: exec.clone(),
1550            #[cfg(feature = "http1")]
1551            h1_builder: self.h1_builder.clone(),
1552            #[cfg(feature = "http2")]
1553            h2_builder: self.h2_builder.clone(),
1554            connector,
1555            pool: pool::Pool::new(self.pool_config, exec, timer),
1556        }
1557    }
1558}
1559
1560impl fmt::Debug for Builder {
1561    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1562        f.debug_struct("Builder")
1563            .field("client_config", &self.client_config)
1564            .field("pool_config", &self.pool_config)
1565            .finish()
1566    }
1567}
1568
1569// ==== impl Error ====
1570
1571impl fmt::Debug for Error {
1572    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1573        let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1574        f.field(&self.kind);
1575        if let Some(ref cause) = self.source {
1576            f.field(cause);
1577        }
1578        f.finish()
1579    }
1580}
1581
1582impl fmt::Display for Error {
1583    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1584        write!(f, "client error ({:?})", self.kind)
1585    }
1586}
1587
1588impl StdError for Error {
1589    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1590        self.source.as_ref().map(|e| &**e as _)
1591    }
1592}
1593
impl Error {
    /// Returns true if this was an error from `Connect`.
    pub fn is_connect(&self) -> bool {
        matches!(self.kind, ErrorKind::Connect)
    }

    /// Returns true if this error's kind is `ErrorKind::Canceled`.
    fn is_canceled(&self) -> bool {
        matches!(self.kind, ErrorKind::Canceled)
    }

    /// Builds a `SendRequest` error from `src` via the `e!` macro.
    fn tx(src: hyper::Error) -> Self {
        e!(SendRequest, src)
    }

    /// Builds a `ChannelClosed` error from `src` via the `e!` macro.
    fn closed(src: hyper::Error) -> Self {
        e!(ChannelClosed, src)
    }
}