hyper_util/client/legacy/
client.rs

//! The legacy HTTP Client from 0.14.x
//!
//! This `Client` will eventually be deconstructed into more composable parts.
//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22use super::connect::capture::CaptureConnectionExtension;
23#[cfg(feature = "tokio")]
24use super::connect::HttpConnector;
25use super::connect::{Alpn, Connect, Connected, Connection};
26use super::pool::{self, Ver};
27
28use crate::common::future::poll_fn;
29use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
30
31type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
32
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B> {
    // Behaviour flags; `Config` is `Copy`, so every clone carries its own copy.
    config: Config,
    // Connector asked to open a new connection whenever the pool cannot
    // supply one.
    connector: C,
    // Executor used to spawn background work: connection dispatcher tasks,
    // background connects, and delayed re-insertion into the pool.
    exec: Exec,
    // Builder used to perform the HTTP/1 handshake on a new connection.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // Builder used to perform the HTTP/2 handshake on a new connection.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Connection pool, keyed by `PoolKey` (scheme + authority).
    pool: pool::Pool<PoolClient<B>, PoolKey>,
}
48
// Per-client behaviour switches consulted while sending a request.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When `true`, a request that was canceled before it was started on a
    // reused connection is sent again, and a closed pool checkout is retried.
    retry_canceled_requests: bool,
    // When `true`, HTTP/1 requests lacking a `Host` header get one derived
    // from the request URI.
    set_host: bool,
    // Protocol version setting; compared against `Ver::Http2` to decide
    // whether new connections speak HTTP/2 from the start.
    ver: Ver,
}
55
/// Client errors
pub struct Error {
    // Broad category of the failure.
    kind: ErrorKind,
    // Underlying cause, when one is available.
    source: Option<Box<dyn StdError + Send + Sync>>,
    // Details of the connection the error is associated with; attached via
    // `with_connect_info` once a connection has been obtained.
    #[cfg(any(feature = "http1", feature = "http2"))]
    connect_info: Option<Connected>,
}
63
// Categories of `Error`.
#[derive(Debug)]
enum ErrorKind {
    // The operation was canceled before completing: either a connect attempt
    // lost out to another in-progress connection, or a request was handed
    // back unsent by the connection.
    Canceled,
    // NOTE(review): not constructed in the visible part of this file.
    ChannelClosed,
    // Establishing (or checking out) a connection failed.
    Connect,
    // The request method is not usable here (e.g. `CONNECT` over HTTP/1.0).
    UserUnsupportedRequestMethod,
    // The request's HTTP version is unsupported, or does not match the
    // protocol of the connection that was obtained.
    UserUnsupportedVersion,
    // NOTE(review): not constructed in the visible part of this file;
    // presumably raised while extracting the pool key from the URI.
    UserAbsoluteUriRequired,
    // Sending the request on an established connection failed.
    SendRequest,
}
74
// Shorthand for constructing an `Error`.
//
// - `e!(Kind)` builds an error of `ErrorKind::Kind` with no source.
// - `e!(Kind, src)` additionally records `src` (converted with `.into()`)
//   as the error's source.
//
// In both forms `connect_info` starts out as `None`.
macro_rules! e {
    ($kind:ident) => {
        Error {
            kind: ErrorKind::$kind,
            source: None,
            connect_info: None,
        }
    };
    ($kind:ident, $src:expr) => {
        Error {
            kind: ErrorKind::$kind,
            source: Some($src.into()),
            connect_info: None,
        }
    };
}
91
// Key under which connections are stored in the pool: the scheme and
// authority of the destination. The exact shape of this key is an
// implementation detail and may change.
type PoolKey = (http::uri::Scheme, http::uri::Authority);
94
// Outcome of a failed single send attempt.
enum TrySendError<B> {
    // The request was handed back unsent, so the caller may try again.
    Retryable {
        // Why the attempt failed.
        error: Error,
        // The original request, returned so it can be re-sent.
        req: Request<B>,
        // Whether the connection had been used before; a failure on a fresh
        // connection is not retried by `send_request`.
        connection_reused: bool,
    },
    // The failure is final and must be reported to the caller.
    Nope(Error),
}
103
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // Type-erased future producing the response. The boxed future is only
    // required to be `Send`; it is held inside a `SyncWrapper`.
    inner: SyncWrapper<
        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
    >,
}
113
114// ===== impl Client =====
115
impl Client<(), ()> {
    /// Create a builder to configure a new `Client`.
    ///
    /// The supplied `executor` is handed to the [`Builder`]; the client uses
    /// its executor to spawn background tasks such as connection dispatchers.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "tokio")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper_util::client::legacy::Client;
    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
    ///
    /// let client = Client::builder(TokioExecutor::new())
    ///     .pool_timer(TokioTimer::new())
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .http2_only(true)
    ///     .build_http();
    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
    /// # drop(infer);
    /// # }
    /// # fn main() {}
    /// ```
    pub fn builder<E>(executor: E) -> Builder
    where
        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
    {
        Builder::new(executor)
    }
}
145
146impl<C, B> Client<C, B>
147where
148    C: Connect + Clone + Send + Sync + 'static,
149    B: Body + Send + 'static + Unpin,
150    B::Data: Send,
151    B::Error: Into<Box<dyn StdError + Send + Sync>>,
152{
153    /// Send a `GET` request to the supplied `Uri`.
154    ///
155    /// # Note
156    ///
157    /// This requires that the `Body` type have a `Default` implementation.
158    /// It *should* return an "empty" version of itself, such that
159    /// `Body::is_end_stream` is `true`.
160    ///
161    /// # Example
162    ///
163    /// ```
164    /// # #[cfg(feature = "tokio")]
165    /// # fn run () {
166    /// use hyper::Uri;
167    /// use hyper_util::client::legacy::Client;
168    /// use hyper_util::rt::TokioExecutor;
169    /// use bytes::Bytes;
170    /// use http_body_util::Full;
171    ///
172    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
173    ///
174    /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
175    /// # }
176    /// # fn main() {}
177    /// ```
178    pub fn get(&self, uri: Uri) -> ResponseFuture
179    where
180        B: Default,
181    {
182        let body = B::default();
183        if !body.is_end_stream() {
184            warn!("default Body used for get() does not return true for is_end_stream");
185        }
186
187        let mut req = Request::new(body);
188        *req.uri_mut() = uri;
189        self.request(req)
190    }
191
192    /// Send a constructed `Request` using this `Client`.
193    ///
194    /// # Example
195    ///
196    /// ```
197    /// # #[cfg(feature = "tokio")]
198    /// # fn run () {
199    /// use hyper::{Method, Request};
200    /// use hyper_util::client::legacy::Client;
201    /// use http_body_util::Full;
202    /// use hyper_util::rt::TokioExecutor;
203    /// use bytes::Bytes;
204    ///
205    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
206    ///
207    /// let req: Request<Full<Bytes>> = Request::builder()
208    ///     .method(Method::POST)
209    ///     .uri("http://httpbin.org/post")
210    ///     .body(Full::from("Hallo!"))
211    ///     .expect("request builder");
212    ///
213    /// let future = client.request(req);
214    /// # }
215    /// # fn main() {}
216    /// ```
217    pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
218        let is_http_connect = req.method() == Method::CONNECT;
219        match req.version() {
220            Version::HTTP_11 => (),
221            Version::HTTP_10 => {
222                if is_http_connect {
223                    warn!("CONNECT is not allowed for HTTP/1.0");
224                    return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
225                }
226            }
227            Version::HTTP_2 => (),
228            // completely unsupported HTTP version (like HTTP/0.9)!
229            other => return ResponseFuture::error_version(other),
230        };
231
232        let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
233            Ok(s) => s,
234            Err(err) => {
235                return ResponseFuture::new(future::err(err));
236            }
237        };
238
239        ResponseFuture::new(self.clone().send_request(req, pool_key))
240    }
241
242    async fn send_request(
243        self,
244        mut req: Request<B>,
245        pool_key: PoolKey,
246    ) -> Result<Response<hyper::body::Incoming>, Error> {
247        let uri = req.uri().clone();
248
249        loop {
250            req = match self.try_send_request(req, pool_key.clone()).await {
251                Ok(resp) => return Ok(resp),
252                Err(TrySendError::Nope(err)) => return Err(err),
253                Err(TrySendError::Retryable {
254                    mut req,
255                    error,
256                    connection_reused,
257                }) => {
258                    if !self.config.retry_canceled_requests || !connection_reused {
259                        // if client disabled, don't retry
260                        // a fresh connection means we definitely can't retry
261                        return Err(error);
262                    }
263
264                    trace!(
265                        "unstarted request canceled, trying again (reason={:?})",
266                        error
267                    );
268                    *req.uri_mut() = uri.clone();
269                    req
270                }
271            }
272        }
273    }
274
275    async fn try_send_request(
276        &self,
277        mut req: Request<B>,
278        pool_key: PoolKey,
279    ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
280        let mut pooled = self
281            .connection_for(pool_key)
282            .await
283            // `connection_for` already retries checkout errors, so if
284            // it returns an error, there's not much else to retry
285            .map_err(TrySendError::Nope)?;
286
287        if let Some(conn) = req.extensions_mut().get_mut::<CaptureConnectionExtension>() {
288            conn.set(&pooled.conn_info);
289        }
290
291        if pooled.is_http1() {
292            if req.version() == Version::HTTP_2 {
293                warn!("Connection is HTTP/1, but request requires HTTP/2");
294                return Err(TrySendError::Nope(
295                    e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
296                ));
297            }
298
299            if self.config.set_host {
300                let uri = req.uri().clone();
301                req.headers_mut().entry(HOST).or_insert_with(|| {
302                    let hostname = uri.host().expect("authority implies host");
303                    if let Some(port) = get_non_default_port(&uri) {
304                        let s = format!("{hostname}:{port}");
305                        HeaderValue::from_str(&s)
306                    } else {
307                        HeaderValue::from_str(hostname)
308                    }
309                    .expect("uri host is valid header value")
310                });
311            }
312
313            // CONNECT always sends authority-form, so check it first...
314            if req.method() == Method::CONNECT {
315                authority_form(req.uri_mut());
316            } else if pooled.conn_info.is_proxied {
317                absolute_form(req.uri_mut());
318            } else {
319                origin_form(req.uri_mut());
320            }
321        } else if req.method() == Method::CONNECT && !pooled.is_http2() {
322            authority_form(req.uri_mut());
323        }
324
325        let mut res = match pooled.try_send_request(req).await {
326            Ok(res) => res,
327            Err(mut err) => {
328                return if let Some(req) = err.take_message() {
329                    Err(TrySendError::Retryable {
330                        connection_reused: pooled.is_reused(),
331                        error: e!(Canceled, err.into_error())
332                            .with_connect_info(pooled.conn_info.clone()),
333                        req,
334                    })
335                } else {
336                    Err(TrySendError::Nope(
337                        e!(SendRequest, err.into_error())
338                            .with_connect_info(pooled.conn_info.clone()),
339                    ))
340                }
341            }
342        };
343
344        // If the Connector included 'extra' info, add to Response...
345        if let Some(extra) = &pooled.conn_info.extra {
346            extra.set(res.extensions_mut());
347        }
348
349        // If pooled is HTTP/2, we can toss this reference immediately.
350        //
351        // when pooled is dropped, it will try to insert back into the
352        // pool. To delay that, spawn a future that completes once the
353        // sender is ready again.
354        //
355        // This *should* only be once the related `Connection` has polled
356        // for a new request to start.
357        //
358        // It won't be ready if there is a body to stream.
359        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
360            drop(pooled);
361        } else if !res.body().is_end_stream() {
362            //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
363            //res.body_mut().delayed_eof(delayed_rx);
364            let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
365                // At this point, `pooled` is dropped, and had a chance
366                // to insert into the pool (if conn was idle)
367                //drop(delayed_tx);
368            });
369
370            self.exec.execute(on_idle);
371        } else {
372            // There's no body to delay, but the connection isn't
373            // ready yet. Only re-insert when it's ready
374            let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
375
376            self.exec.execute(on_idle);
377        }
378
379        Ok(res)
380    }
381
382    async fn connection_for(
383        &self,
384        pool_key: PoolKey,
385    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
386        loop {
387            match self.one_connection_for(pool_key.clone()).await {
388                Ok(pooled) => return Ok(pooled),
389                Err(ClientConnectError::Normal(err)) => return Err(err),
390                Err(ClientConnectError::CheckoutIsClosed(reason)) => {
391                    if !self.config.retry_canceled_requests {
392                        return Err(e!(Connect, reason));
393                    }
394
395                    trace!(
396                        "unstarted request canceled, trying again (reason={:?})",
397                        reason,
398                    );
399                    continue;
400                }
401            };
402        }
403    }
404
405    async fn one_connection_for(
406        &self,
407        pool_key: PoolKey,
408    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
409        // Return a single connection if pooling is not enabled
410        if !self.pool.is_enabled() {
411            return self
412                .connect_to(pool_key)
413                .await
414                .map_err(ClientConnectError::Normal);
415        }
416
417        // This actually races 2 different futures to try to get a ready
418        // connection the fastest, and to reduce connection churn.
419        //
420        // - If the pool has an idle connection waiting, that's used
421        //   immediately.
422        // - Otherwise, the Connector is asked to start connecting to
423        //   the destination Uri.
424        // - Meanwhile, the pool Checkout is watching to see if any other
425        //   request finishes and tries to insert an idle connection.
426        // - If a new connection is started, but the Checkout wins after
427        //   (an idle connection became available first), the started
428        //   connection future is spawned into the runtime to complete,
429        //   and then be inserted into the pool as an idle connection.
430        let checkout = self.pool.checkout(pool_key.clone());
431        let connect = self.connect_to(pool_key);
432        let is_ver_h2 = self.config.ver == Ver::Http2;
433
434        // The order of the `select` is depended on below...
435
436        match future::select(checkout, connect).await {
437            // Checkout won, connect future may have been started or not.
438            //
439            // If it has, let it finish and insert back into the pool,
440            // so as to not waste the socket...
441            Either::Left((Ok(checked_out), connecting)) => {
442                // This depends on the `select` above having the correct
443                // order, such that if the checkout future were ready
444                // immediately, the connect future will never have been
445                // started.
446                //
447                // If it *wasn't* ready yet, then the connect future will
448                // have been started...
449                if connecting.started() {
450                    let bg = connecting
451                        .map_err(|err| {
452                            trace!("background connect error: {}", err);
453                        })
454                        .map(|_pooled| {
455                            // dropping here should just place it in
456                            // the Pool for us...
457                        });
458                    // An execute error here isn't important, we're just trying
459                    // to prevent a waste of a socket...
460                    self.exec.execute(bg);
461                }
462                Ok(checked_out)
463            }
464            // Connect won, checkout can just be dropped.
465            Either::Right((Ok(connected), _checkout)) => Ok(connected),
466            // Either checkout or connect could get canceled:
467            //
468            // 1. Connect is canceled if this is HTTP/2 and there is
469            //    an outstanding HTTP/2 connecting task.
470            // 2. Checkout is canceled if the pool cannot deliver an
471            //    idle connection reliably.
472            //
473            // In both cases, we should just wait for the other future.
474            Either::Left((Err(err), connecting)) => {
475                if err.is_canceled() {
476                    connecting.await.map_err(ClientConnectError::Normal)
477                } else {
478                    Err(ClientConnectError::Normal(e!(Connect, err)))
479                }
480            }
481            Either::Right((Err(err), checkout)) => {
482                if err.is_canceled() {
483                    checkout.await.map_err(move |err| {
484                        if is_ver_h2 && err.is_canceled() {
485                            ClientConnectError::CheckoutIsClosed(err)
486                        } else {
487                            ClientConnectError::Normal(e!(Connect, err))
488                        }
489                    })
490                } else {
491                    Err(ClientConnectError::Normal(err))
492                }
493            }
494        }
495    }
496
497    #[cfg(any(feature = "http1", feature = "http2"))]
498    fn connect_to(
499        &self,
500        pool_key: PoolKey,
501    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
502    {
503        let executor = self.exec.clone();
504        let pool = self.pool.clone();
505        #[cfg(feature = "http1")]
506        let h1_builder = self.h1_builder.clone();
507        #[cfg(feature = "http2")]
508        let h2_builder = self.h2_builder.clone();
509        let ver = self.config.ver;
510        let is_ver_h2 = ver == Ver::Http2;
511        let connector = self.connector.clone();
512        let dst = domain_as_uri(pool_key.clone());
513        hyper_lazy(move || {
514            // Try to take a "connecting lock".
515            //
516            // If the pool_key is for HTTP/2, and there is already a
517            // connection being established, then this can't take a
518            // second lock. The "connect_to" future is Canceled.
519            let connecting = match pool.connecting(&pool_key, ver) {
520                Some(lock) => lock,
521                None => {
522                    let canceled = e!(Canceled);
523                    // TODO
524                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
525                    return Either::Right(future::err(canceled));
526                }
527            };
528            Either::Left(
529                connector
530                    .connect(super::connect::sealed::Internal, dst)
531                    .map_err(|src| e!(Connect, src))
532                    .and_then(move |io| {
533                        let connected = io.connected();
534                        // If ALPN is h2 and we aren't http2_only already,
535                        // then we need to convert our pool checkout into
536                        // a single HTTP2 one.
537                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
538                            match connecting.alpn_h2(&pool) {
539                                Some(lock) => {
540                                    trace!("ALPN negotiated h2, updating pool");
541                                    lock
542                                }
543                                None => {
544                                    // Another connection has already upgraded,
545                                    // the pool checkout should finish up for us.
546                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
547                                    return Either::Right(future::err(canceled));
548                                }
549                            }
550                        } else {
551                            connecting
552                        };
553
554                        #[cfg_attr(not(feature = "http2"), allow(unused))]
555                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
556
557                        Either::Left(Box::pin(async move {
558                            let tx = if is_h2 {
559                                #[cfg(feature = "http2")] {
560                                    let (mut tx, conn) =
561                                        h2_builder.handshake(io).await.map_err(Error::tx)?;
562
563                                    trace!(
564                                        "http2 handshake complete, spawning background dispatcher task"
565                                    );
566                                    executor.execute(
567                                        conn.map_err(|e| debug!("client connection error: {}", e))
568                                            .map(|_| ()),
569                                    );
570
571                                    // Wait for 'conn' to ready up before we
572                                    // declare this tx as usable
573                                    tx.ready().await.map_err(Error::tx)?;
574                                    PoolTx::Http2(tx)
575                                }
576                                #[cfg(not(feature = "http2"))]
577                                panic!("http2 feature is not enabled");
578                            } else {
579                                #[cfg(feature = "http1")] {
580                                    // Perform the HTTP/1.1 handshake on the provided I/O stream.
581                                    // Uses the h1_builder to establish a connection, returning a sender (tx) for requests
582                                    // and a connection task (conn) that manages the connection lifecycle.
583                                    let (mut tx, conn) =
584                                        h1_builder.handshake(io).await.map_err(crate::client::legacy::client::Error::tx)?;
585
586                                    // Log that the HTTP/1.1 handshake has completed successfully.
587                                    // This indicates the connection is established and ready for request processing.
588                                    trace!(
589                                        "http1 handshake complete, spawning background dispatcher task"
590                                    );
591                                    // Create a oneshot channel to communicate errors from the connection task.
592                                    // err_tx sends errors from the connection task, and err_rx receives them
593                                    // to correlate connection failures with request readiness errors.
594                                    let (err_tx, err_rx) = tokio::sync::oneshot::channel();
595                                    // Spawn the connection task in the background using the executor.
596                                    // The task manages the HTTP/1.1 connection, including upgrades (e.g., WebSocket).
597                                    // Errors are sent via err_tx to ensure they can be checked if the sender (tx) fails.
598                                    executor.execute(
599                                        conn.with_upgrades()
600                                            .map_err(|e| {
601                                                // Log the connection error at debug level for diagnostic purposes.
602                                                debug!("client connection error: {:?}", e);
603                                                // Log that the error is being sent to the error channel.
604                                                trace!("sending connection error to error channel");
605                                                // Send the error via the oneshot channel, ignoring send failures
606                                                // (e.g., if the receiver is dropped, which is handled later).
607                                                let _ =err_tx.send(e);
608                                            })
609                                            .map(|_| ()),
610                                    );
611                                    // Log that the client is waiting for the connection to be ready.
612                                    // Readiness indicates the sender (tx) can accept a request without blocking.
613                                    trace!("waiting for connection to be ready");
614                                    // Check if the sender is ready to accept a request.
615                                    // This ensures the connection is fully established before proceeding.
616                                    // aka:
617                                    // Wait for 'conn' to ready up before we
618                                    // declare this tx as usable
619                                    match tx.ready().await {
620                                        // If ready, the connection is usable for sending requests.
621                                        Ok(_) => {
622                                            // Log that the connection is ready for use.
623                                            trace!("connection is ready");
624                                            // Drop the error receiver, as it’s no longer needed since the sender is ready.
625                                            // This prevents waiting for errors that won’t occur in a successful case.
626                                            drop(err_rx);
627                                            // Wrap the sender in PoolTx::Http1 for use in the connection pool.
628                                            PoolTx::Http1(tx)
629                                        }
630                                        // If the sender fails with a closed channel error, check for a specific connection error.
631                                        // This distinguishes between a vague ChannelClosed error and an actual connection failure.
632                                        Err(e) if e.is_closed() => {
633                                            // Log that the channel is closed, indicating a potential connection issue.
634                                            trace!("connection channel closed, checking for connection error");
635                                            // Check the oneshot channel for a specific error from the connection task.
636                                            match err_rx.await {
637                                                // If an error was received, it’s a specific connection failure.
638                                                Ok(err) => {
639                                                     // Log the specific connection error for diagnostics.
640                                                    trace!("received connection error: {:?}", err);
641                                                    // Return the error wrapped in Error::tx to propagate it.
642                                                    return Err(crate::client::legacy::client::Error::tx(err));
643                                                }
644                                                // If the error channel is closed, no specific error was sent.
645                                                // Fall back to the vague ChannelClosed error.
646                                                Err(_) => {
647                                                    // Log that the error channel is closed, indicating no specific error.
648                                                    trace!("error channel closed, returning the vague ChannelClosed error");
649                                                    // Return the original error wrapped in Error::tx.
650                                                    return Err(crate::client::legacy::client::Error::tx(e));
651                                                }
652                                            }
653                                        }
654                                        // For other errors (e.g., timeout, I/O issues), propagate them directly.
655                                        // These are not ChannelClosed errors and don’t require error channel checks.
656                                        Err(e) => {
657                                            // Log the specific readiness failure for diagnostics.
658                                            trace!("connection readiness failed: {:?}", e);
659                                            // Return the error wrapped in Error::tx to propagate it.
660                                            return Err(crate::client::legacy::client::Error::tx(e));
661                                        }
662                                    }
663                                }
664                                #[cfg(not(feature = "http1"))] {
665                                    panic!("http1 feature is not enabled");
666                                }
667                            };
668
669                            Ok(pool.pooled(
670                                connecting,
671                                PoolClient {
672                                    conn_info: connected,
673                                    tx,
674                                },
675                            ))
676                        }))
677                    }),
678            )
679        })
680    }
681}
682
// `tower_service::Service` implementation that forwards to `Client::request`.
impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
where
    C: Connect + Clone + Send + Sync + 'static,
    B: Body + Send + 'static + Unpin,
    B::Data: Send,
    B::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    type Response = Response<hyper::body::Incoming>;
    type Error = Error;
    type Future = ResponseFuture;

    // Always reports ready; no readiness state is tracked here.
    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    // Delegates to `Client::request`.
    fn call(&mut self, req: Request<B>) -> Self::Future {
        self.request(req)
    }
}
702
703impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
704where
705    C: Connect + Clone + Send + Sync + 'static,
706    B: Body + Send + 'static + Unpin,
707    B::Data: Send,
708    B::Error: Into<Box<dyn StdError + Send + Sync>>,
709{
710    type Response = Response<hyper::body::Incoming>;
711    type Error = Error;
712    type Future = ResponseFuture;
713
714    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
715        Poll::Ready(Ok(()))
716    }
717
718    fn call(&mut self, req: Request<B>) -> Self::Future {
719        self.request(req)
720    }
721}
722
723impl<C: Clone, B> Clone for Client<C, B> {
724    fn clone(&self) -> Client<C, B> {
725        Client {
726            config: self.config,
727            exec: self.exec.clone(),
728            #[cfg(feature = "http1")]
729            h1_builder: self.h1_builder.clone(),
730            #[cfg(feature = "http2")]
731            h2_builder: self.h2_builder.clone(),
732            connector: self.connector.clone(),
733            pool: self.pool.clone(),
734        }
735    }
736}
737
738impl<C, B> fmt::Debug for Client<C, B> {
739    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
740        f.debug_struct("Client").finish()
741    }
742}
743
744// ===== impl ResponseFuture =====
745
746impl ResponseFuture {
747    fn new<F>(value: F) -> Self
748    where
749        F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
750    {
751        Self {
752            inner: SyncWrapper::new(Box::pin(value)),
753        }
754    }
755
756    fn error_version(ver: Version) -> Self {
757        warn!("Request has unsupported version \"{:?}\"", ver);
758        ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
759    }
760}
761
762impl fmt::Debug for ResponseFuture {
763    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
764        f.pad("Future<Response>")
765    }
766}
767
768impl Future for ResponseFuture {
769    type Output = Result<Response<hyper::body::Incoming>, Error>;
770
771    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
772        self.inner.get_mut().as_mut().poll(cx)
773    }
774}
775
776// ===== impl PoolClient =====
777
// FIXME: allow() required due to `impl Trait` leaking types to this lint
/// The value stored in the connection pool: a request sender paired with
/// the `Connected` metadata of the connection it belongs to.
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    /// Connection metadata; its `poisoned` flag is what `is_poisoned` reads.
    conn_info: Connected,
    /// Protocol-specific sender used to dispatch requests on this connection.
    tx: PoolTx<B>,
}
784
/// The request sender of a pooled connection, one variant per protocol.
///
/// Each variant exists only when the matching cargo feature is enabled.
enum PoolTx<B> {
    /// Sender for an HTTP/1 connection.
    #[cfg(feature = "http1")]
    Http1(hyper::client::conn::http1::SendRequest<B>),
    /// Sender for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(hyper::client::conn::http2::SendRequest<B>),
}
791
792impl<B> PoolClient<B> {
793    fn poll_ready(
794        &mut self,
795        #[allow(unused_variables)] cx: &mut task::Context<'_>,
796    ) -> Poll<Result<(), Error>> {
797        match self.tx {
798            #[cfg(feature = "http1")]
799            PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
800            #[cfg(feature = "http2")]
801            PoolTx::Http2(_) => Poll::Ready(Ok(())),
802        }
803    }
804
805    fn is_http1(&self) -> bool {
806        !self.is_http2()
807    }
808
809    fn is_http2(&self) -> bool {
810        match self.tx {
811            #[cfg(feature = "http1")]
812            PoolTx::Http1(_) => false,
813            #[cfg(feature = "http2")]
814            PoolTx::Http2(_) => true,
815        }
816    }
817
818    fn is_poisoned(&self) -> bool {
819        self.conn_info.poisoned.poisoned()
820    }
821
822    fn is_ready(&self) -> bool {
823        match self.tx {
824            #[cfg(feature = "http1")]
825            PoolTx::Http1(ref tx) => tx.is_ready(),
826            #[cfg(feature = "http2")]
827            PoolTx::Http2(ref tx) => tx.is_ready(),
828        }
829    }
830}
831
832impl<B: Body + 'static> PoolClient<B> {
833    fn try_send_request(
834        &mut self,
835        req: Request<B>,
836    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
837    where
838        B: Send,
839    {
840        #[cfg(all(feature = "http1", feature = "http2"))]
841        return match self.tx {
842            #[cfg(feature = "http1")]
843            PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
844            #[cfg(feature = "http2")]
845            PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
846        };
847
848        #[cfg(feature = "http1")]
849        #[cfg(not(feature = "http2"))]
850        return match self.tx {
851            #[cfg(feature = "http1")]
852            PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
853        };
854
855        #[cfg(not(feature = "http1"))]
856        #[cfg(feature = "http2")]
857        return match self.tx {
858            #[cfg(feature = "http2")]
859            PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
860        };
861    }
862}
863
864impl<B> pool::Poolable for PoolClient<B>
865where
866    B: Send + 'static,
867{
868    fn is_open(&self) -> bool {
869        !self.is_poisoned() && self.is_ready()
870    }
871
872    fn reserve(self) -> pool::Reservation<Self> {
873        match self.tx {
874            #[cfg(feature = "http1")]
875            PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
876                conn_info: self.conn_info,
877                tx: PoolTx::Http1(tx),
878            }),
879            #[cfg(feature = "http2")]
880            PoolTx::Http2(tx) => {
881                let b = PoolClient {
882                    conn_info: self.conn_info.clone(),
883                    tx: PoolTx::Http2(tx.clone()),
884                };
885                let a = PoolClient {
886                    conn_info: self.conn_info,
887                    tx: PoolTx::Http2(tx),
888                };
889                pool::Reservation::Shared(a, b)
890            }
891        }
892    }
893
894    fn can_share(&self) -> bool {
895        self.is_http2()
896    }
897}
898
/// Internal error type that keeps ordinary client errors apart from pool errors.
///
/// NOTE(review): presumably produced while obtaining a connection for a
/// request; the producing code is outside this section, confirm there.
enum ClientConnectError {
    /// A regular client `Error`.
    Normal(Error),
    /// A `pool::Error`; going by the variant name, the pool checkout was closed.
    CheckoutIsClosed(pool::Error),
}
903
904fn origin_form(uri: &mut Uri) {
905    let path = match uri.path_and_query() {
906        Some(path) if path.as_str() != "/" => {
907            let mut parts = ::http::uri::Parts::default();
908            parts.path_and_query = Some(path.clone());
909            Uri::from_parts(parts).expect("path is valid uri")
910        }
911        _none_or_just_slash => {
912            debug_assert!(Uri::default() == "/");
913            Uri::default()
914        }
915    };
916    *uri = path
917}
918
919fn absolute_form(uri: &mut Uri) {
920    debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
921    debug_assert!(
922        uri.authority().is_some(),
923        "absolute_form needs an authority"
924    );
925    // If the URI is to HTTPS, and the connector claimed to be a proxy,
926    // then it *should* have tunneled, and so we don't want to send
927    // absolute-form in that case.
928    if uri.scheme() == Some(&Scheme::HTTPS) {
929        origin_form(uri);
930    }
931}
932
933fn authority_form(uri: &mut Uri) {
934    if let Some(path) = uri.path_and_query() {
935        // `https://hyper.rs` would parse with `/` path, don't
936        // annoy people about that...
937        if path != "/" {
938            warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
939        }
940    }
941    *uri = match uri.authority() {
942        Some(auth) => {
943            let mut parts = ::http::uri::Parts::default();
944            parts.authority = Some(auth.clone());
945            Uri::from_parts(parts).expect("authority is valid")
946        }
947        None => {
948            unreachable!("authority_form with relative uri");
949        }
950    };
951}
952
953fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
954    let uri_clone = uri.clone();
955    match (uri_clone.scheme(), uri_clone.authority()) {
956        (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
957        (None, Some(auth)) if is_http_connect => {
958            let scheme = match auth.port_u16() {
959                Some(443) => {
960                    set_scheme(uri, Scheme::HTTPS);
961                    Scheme::HTTPS
962                }
963                _ => {
964                    set_scheme(uri, Scheme::HTTP);
965                    Scheme::HTTP
966                }
967            };
968            Ok((scheme, auth.clone()))
969        }
970        _ => {
971            debug!("Client requires absolute-form URIs, received: {:?}", uri);
972            Err(e!(UserAbsoluteUriRequired))
973        }
974    }
975}
976
977fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
978    http::uri::Builder::new()
979        .scheme(scheme)
980        .authority(auth)
981        .path_and_query("/")
982        .build()
983        .expect("domain is valid Uri")
984}
985
986fn set_scheme(uri: &mut Uri, scheme: Scheme) {
987    debug_assert!(
988        uri.scheme().is_none(),
989        "set_scheme expects no existing scheme"
990    );
991    let old = std::mem::take(uri);
992    let mut parts: ::http::uri::Parts = old.into();
993    parts.scheme = Some(scheme);
994    parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
995    *uri = Uri::from_parts(parts).expect("scheme is valid");
996}
997
998fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
999    match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
1000        (Some(443), true) => None,
1001        (Some(80), false) => None,
1002        _ => uri.port(),
1003    }
1004}
1005
1006fn is_schema_secure(uri: &Uri) -> bool {
1007    uri.scheme_str()
1008        .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
1009        .unwrap_or_default()
1010}
1011
/// A builder to configure a new [`Client`](Client).
///
/// Setter methods take `&mut self` and return `&mut Self`, so calls can be
/// chained as shown below.
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::TokioExecutor;
///
/// let client = Client::builder(TokioExecutor::new())
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
    // Client-level options: request retry, automatic `Host` header, protocol version.
    client_config: Config,
    // Executor wrapper created from the executor passed to `Builder::new`.
    exec: Exec,
    // hyper's HTTP/1 connection builder; the `http1_*` setters forward to it.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // hyper's HTTP/2 connection builder; the `http2_*` setters forward to it.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Pool settings: idle timeout and maximum idle connections per host.
    pool_config: pool::Config,
    // Timer for the connection pool; `None` until `pool_timer` is called.
    pool_timer: Option<timer::Timer>,
}
1044
1045impl Builder {
1046    /// Construct a new Builder.
1047    pub fn new<E>(executor: E) -> Self
1048    where
1049        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1050    {
1051        let exec = Exec::new(executor);
1052        Self {
1053            client_config: Config {
1054                retry_canceled_requests: true,
1055                set_host: true,
1056                ver: Ver::Auto,
1057            },
1058            exec: exec.clone(),
1059            #[cfg(feature = "http1")]
1060            h1_builder: hyper::client::conn::http1::Builder::new(),
1061            #[cfg(feature = "http2")]
1062            h2_builder: hyper::client::conn::http2::Builder::new(exec),
1063            pool_config: pool::Config {
1064                idle_timeout: Some(Duration::from_secs(90)),
1065                max_idle_per_host: usize::MAX,
1066            },
1067            pool_timer: None,
1068        }
1069    }
1070    /// Set an optional timeout for idle sockets being kept-alive.
    /// A `Timer` is required for this to take effect. See `Builder::pool_timer`.
1072    ///
1073    /// Pass `None` to disable timeout.
1074    ///
1075    /// Default is 90 seconds.
1076    ///
1077    /// # Example
1078    ///
1079    /// ```
1080    /// # #[cfg(feature = "tokio")]
1081    /// # fn run () {
1082    /// use std::time::Duration;
1083    /// use hyper_util::client::legacy::Client;
1084    /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1085    ///
1086    /// let client = Client::builder(TokioExecutor::new())
1087    ///     .pool_idle_timeout(Duration::from_secs(30))
1088    ///     .pool_timer(TokioTimer::new())
1089    ///     .build_http();
1090    ///
1091    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1092    /// # }
1093    /// # fn main() {}
1094    /// ```
1095    pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1096    where
1097        D: Into<Option<Duration>>,
1098    {
1099        self.pool_config.idle_timeout = val.into();
1100        self
1101    }
1102
1103    #[doc(hidden)]
1104    #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1105    pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1106        self.pool_config.max_idle_per_host = max_idle;
1107        self
1108    }
1109
1110    /// Sets the maximum idle connection per host allowed in the pool.
1111    ///
1112    /// Default is `usize::MAX` (no limit).
    pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
        // Recorded in the pool configuration; `self` is returned for chaining.
        self.pool_config.max_idle_per_host = max_idle;
        self
    }
1117
1118    // HTTP/1 options
1119
1120    /// Sets the exact size of the read buffer to *always* use.
1121    ///
1122    /// Note that setting this option unsets the `http1_max_buf_size` option.
1123    ///
1124    /// Default is an adaptive read buffer.
1125    #[cfg(feature = "http1")]
1126    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
        // The hyper HTTP/1 builder takes an `Option`; this setter always passes `Some(sz)`.
        self.h1_builder.read_buf_exact_size(Some(sz));
        self
    }
1131
1132    /// Set the maximum buffer size for the connection.
1133    ///
1134    /// Default is ~400kb.
1135    ///
    /// Note that setting this option unsets the `http1_read_buf_exact_size` option.
1137    ///
1138    /// # Panics
1139    ///
1140    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1141    #[cfg(feature = "http1")]
1142    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
        // Forwarded unchanged to the hyper HTTP/1 connection builder.
        // NOTE(review): the documented minimum-size panic presumably originates there.
        self.h1_builder.max_buf_size(max);
        self
    }
1147
1148    /// Set whether HTTP/1 connections will accept spaces between header names
1149    /// and the colon that follow them in responses.
1150    ///
1151    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1152    /// parsing.
1153    ///
1154    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1155    /// to say about it:
1156    ///
1157    /// > No whitespace is allowed between the header field-name and colon. In
1158    /// > the past, differences in the handling of such whitespace have led to
1159    /// > security vulnerabilities in request routing and response handling. A
1160    /// > server MUST reject any received request message that contains
1161    /// > whitespace between a header field-name and colon with a response code
1162    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1163    /// > response message before forwarding the message downstream.
1164    ///
1165    /// Note that this setting does not affect HTTP/2.
1166    ///
1167    /// Default is false.
1168    ///
1169    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1170    #[cfg(feature = "http1")]
1171    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/1 connection builder's option of the same name.
        self.h1_builder
            .allow_spaces_after_header_name_in_responses(val);
        self
    }
1177
1178    /// Set whether HTTP/1 connections will accept obsolete line folding for
1179    /// header values.
1180    ///
1181    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1182    /// to say about it:
1183    ///
1184    /// > A server that receives an obs-fold in a request message that is not
1185    /// > within a message/http container MUST either reject the message by
1186    /// > sending a 400 (Bad Request), preferably with a representation
1187    /// > explaining that obsolete line folding is unacceptable, or replace
1188    /// > each received obs-fold with one or more SP octets prior to
1189    /// > interpreting the field value or forwarding the message downstream.
1190    ///
1191    /// > A proxy or gateway that receives an obs-fold in a response message
1192    /// > that is not within a message/http container MUST either discard the
1193    /// > message and replace it with a 502 (Bad Gateway) response, preferably
1194    /// > with a representation explaining that unacceptable line folding was
1195    /// > received, or replace each received obs-fold with one or more SP
1196    /// > octets prior to interpreting the field value or forwarding the
1197    /// > message downstream.
1198    ///
1199    /// > A user agent that receives an obs-fold in a response message that is
1200    /// > not within a message/http container MUST replace each received
1201    /// > obs-fold with one or more SP octets prior to interpreting the field
1202    /// > value.
1203    ///
1204    /// Note that this setting does not affect HTTP/2.
1205    ///
1206    /// Default is false.
1207    ///
1208    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1209    #[cfg(feature = "http1")]
1210    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/1 connection builder's option of the same name.
        self.h1_builder
            .allow_obsolete_multiline_headers_in_responses(val);
        self
    }
1216
1217    /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1218    ///
1219    /// This mimics the behaviour of major browsers. You probably don't want this.
1220    /// You should only want this if you are implementing a proxy whose main
1221    /// purpose is to sit in front of browsers whose users access arbitrary content
1222    /// which may be malformed, and they expect everything that works without
1223    /// the proxy to keep working with the proxy.
1224    ///
1225    /// This option will prevent Hyper's client from returning an error encountered
1226    /// when parsing a header, except if the error was caused by the character NUL
    /// (ASCII code 0), as Chrome specifically always rejects those.
1228    ///
1229    /// The ignorable errors are:
1230    /// * empty header names;
1231    /// * characters that are not allowed in header names, except for `\0` and `\r`;
1232    /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1233    ///   spaces and tabs between the header name and the colon;
    /// * missing colon between header name and value;
1235    /// * characters that are not allowed in header values except for `\0` and `\r`.
1236    ///
1237    /// If an ignorable error is encountered, the parser tries to find the next
1238    /// line in the input to resume parsing the rest of the headers. An error
1239    /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1240    /// looking for the next line.
1241    #[cfg(feature = "http1")]
1242    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1243    pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1244        self.h1_builder.ignore_invalid_headers_in_responses(val);
1245        self
1246    }
1247
1248    /// Set whether HTTP/1 connections should try to use vectored writes,
1249    /// or always flatten into a single buffer.
1250    ///
1251    /// Note that setting this to false may mean more copies of body data,
1252    /// but may also improve performance when an IO transport doesn't
1253    /// support vectored writes well, such as most TLS implementations.
1254    ///
1255    /// Setting this to true will force hyper to use queued strategy
1256    /// which may eliminate unnecessary cloning on some TLS backends
1257    ///
1258    /// Default is `auto`. In this mode hyper will try to guess which
1259    /// mode to use
1260    #[cfg(feature = "http1")]
1261    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1262    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1263        self.h1_builder.writev(enabled);
1264        self
1265    }
1266
1267    /// Set whether HTTP/1 connections will write header names as title case at
1268    /// the socket level.
1269    ///
1270    /// Note that this setting does not affect HTTP/2.
1271    ///
1272    /// Default is false.
1273    #[cfg(feature = "http1")]
1274    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/1 connection builder; `self` is returned for chaining.
        self.h1_builder.title_case_headers(val);
        self
    }
1279
1280    /// Set whether to support preserving original header cases.
1281    ///
1282    /// Currently, this will record the original cases received, and store them
1283    /// in a private extension on the `Response`. It will also look for and use
1284    /// such an extension in any provided `Request`.
1285    ///
1286    /// Since the relevant extension is still private, there is no way to
1287    /// interact with the original cases. The only effect this can have now is
1288    /// to forward the cases in a proxy-like fashion.
1289    ///
1290    /// Note that this setting does not affect HTTP/2.
1291    ///
1292    /// Default is false.
1293    #[cfg(feature = "http1")]
1294    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/1 connection builder; `self` is returned for chaining.
        self.h1_builder.preserve_header_case(val);
        self
    }
1299
1300    /// Set the maximum number of headers.
1301    ///
1302    /// When a response is received, the parser will reserve a buffer to store headers for optimal
1303    /// performance.
1304    ///
1305    /// If client receives more headers than the buffer size, the error "message header too large"
1306    /// is returned.
1307    ///
    /// The headers are allocated on the stack by default, which has higher performance. After
1309    /// setting this value, headers will be allocated in heap memory, that is, heap memory
1310    /// allocation will occur for each response, and there will be a performance drop of about 5%.
1311    ///
1312    /// Note that this setting does not affect HTTP/2.
1313    ///
1314    /// Default is 100.
1315    #[cfg(feature = "http1")]
1316    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
        // Forwarded to the hyper HTTP/1 connection builder; `self` is returned for chaining.
        self.h1_builder.max_headers(val);
        self
    }
1321
1322    /// Set whether HTTP/0.9 responses should be tolerated.
1323    ///
1324    /// Default is false.
1325    #[cfg(feature = "http1")]
1326    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
    pub fn http09_responses(&mut self, val: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/1 connection builder; `self` is returned for chaining.
        self.h1_builder.http09_responses(val);
        self
    }
1331
1332    /// Set whether the connection **must** use HTTP/2.
1333    ///
1334    /// The destination must either allow HTTP2 Prior Knowledge, or the
    /// `Connect` should be configured to use ALPN to upgrade to `h2`
1336    /// as part of the connection process. This will not make the `Client`
1337    /// utilize ALPN by itself.
1338    ///
1339    /// Note that setting this to true prevents HTTP/1 from being allowed.
1340    ///
1341    /// Default is false.
1342    #[cfg(feature = "http2")]
1343    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1344    pub fn http2_only(&mut self, val: bool) -> &mut Self {
1345        self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1346        self
1347    }
1348
1349    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1350    ///
1351    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1352    /// As of v0.4.0, it is 20.
1353    ///
1354    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1355    #[cfg(feature = "http2")]
1356    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1357    pub fn http2_max_pending_accept_reset_streams(
1358        &mut self,
1359        max: impl Into<Option<usize>>,
1360    ) -> &mut Self {
1361        self.h2_builder.max_pending_accept_reset_streams(max.into());
1362        self
1363    }
1364
1365    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1366    /// stream-level flow control.
1367    ///
1368    /// Passing `None` will do nothing.
1369    ///
1370    /// If not set, hyper will use a default.
1371    ///
1372    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1373    #[cfg(feature = "http2")]
1374    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1375    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1376        self.h2_builder.initial_stream_window_size(sz.into());
1377        self
1378    }
1379
1380    /// Sets the max connection-level flow control for HTTP2
1381    ///
1382    /// Passing `None` will do nothing.
1383    ///
1384    /// If not set, hyper will use a default.
1385    #[cfg(feature = "http2")]
1386    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1387    pub fn http2_initial_connection_window_size(
1388        &mut self,
1389        sz: impl Into<Option<u32>>,
1390    ) -> &mut Self {
1391        self.h2_builder.initial_connection_window_size(sz.into());
1392        self
1393    }
1394
1395    /// Sets the initial maximum of locally initiated (send) streams.
1396    ///
1397    /// This value will be overwritten by the value included in the initial
1398    /// SETTINGS frame received from the peer as part of a [connection preface].
1399    ///
1400    /// Passing `None` will do nothing.
1401    ///
1402    /// If not set, hyper will use a default.
1403    ///
1404    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1405    #[cfg(feature = "http2")]
1406    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_initial_max_send_streams(
        &mut self,
        initial: impl Into<Option<usize>>,
    ) -> &mut Self {
        // Passed through unconverted; the hyper HTTP/2 builder receives the `Into` value as-is.
        self.h2_builder.initial_max_send_streams(initial);
        self
    }
1414
1415    /// Sets whether to use an adaptive flow control.
1416    ///
1417    /// Enabling this will override the limits set in
1418    /// `http2_initial_stream_window_size` and
1419    /// `http2_initial_connection_window_size`.
1420    #[cfg(feature = "http2")]
1421    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/2 connection builder; `self` is returned for chaining.
        self.h2_builder.adaptive_window(enabled);
        self
    }
1426
1427    /// Sets the maximum frame size to use for HTTP2.
1428    ///
1429    /// Passing `None` will do nothing.
1430    ///
1431    /// If not set, hyper will use a default.
1432    #[cfg(feature = "http2")]
1433    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
        // Passed through unconverted; the hyper HTTP/2 builder receives the `Into` value as-is.
        self.h2_builder.max_frame_size(sz);
        self
    }
1438
1439    /// Sets the max size of received header frames for HTTP2.
1440    ///
1441    /// Default is currently 16KB, but can change.
1442    #[cfg(feature = "http2")]
1443    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
        // Forwarded to the hyper HTTP/2 connection builder; `self` is returned for chaining.
        self.h2_builder.max_header_list_size(max);
        self
    }
1448
    /// Sets the interval at which HTTP2 Ping frames should be sent to keep a
1450    /// connection alive.
1451    ///
1452    /// Pass `None` to disable HTTP2 keep-alive.
1453    ///
1454    /// Default is currently disabled.
1455    ///
1456    /// # Cargo Feature
1457    ///
1458    /// Requires the `tokio` cargo feature to be enabled.
1459    #[cfg(feature = "tokio")]
1460    #[cfg(feature = "http2")]
1461    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_interval(
        &mut self,
        interval: impl Into<Option<Duration>>,
    ) -> &mut Self {
        // Passed through unconverted; the hyper HTTP/2 builder receives the `Into` value as-is.
        self.h2_builder.keep_alive_interval(interval);
        self
    }
1469
1470    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1471    ///
1472    /// If the ping is not acknowledged within the timeout, the connection will
1473    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1474    ///
1475    /// Default is 20 seconds.
1476    ///
1477    /// # Cargo Feature
1478    ///
1479    /// Requires the `tokio` cargo feature to be enabled.
1480    #[cfg(feature = "tokio")]
1481    #[cfg(feature = "http2")]
1482    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
        // Forwarded to the hyper HTTP/2 connection builder; `self` is returned for chaining.
        self.h2_builder.keep_alive_timeout(timeout);
        self
    }
1487
1488    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1489    ///
1490    /// If disabled, keep-alive pings are only sent while there are open
    /// request/response streams. If enabled, pings are also sent when no
1492    /// streams are active. Does nothing if `http2_keep_alive_interval` is
1493    /// disabled.
1494    ///
1495    /// Default is `false`.
1496    ///
1497    /// # Cargo Feature
1498    ///
1499    /// Requires the `tokio` cargo feature to be enabled.
1500    #[cfg(feature = "tokio")]
1501    #[cfg(feature = "http2")]
1502    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
        // Forwarded to the hyper HTTP/2 connection builder; `self` is returned for chaining.
        self.h2_builder.keep_alive_while_idle(enabled);
        self
    }
1507
1508    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1509    ///
1510    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1511    /// details.
1512    ///
1513    /// The default value is determined by the `h2` crate.
1514    ///
1515    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1516    #[cfg(feature = "http2")]
1517    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
        // Forwarded to the hyper HTTP/2 connection builder; `self` is returned for chaining.
        self.h2_builder.max_concurrent_reset_streams(max);
        self
    }
1522
1523    /// Provide a timer to be used for h2
1524    ///
1525    /// See the documentation of [`h2::client::Builder::timer`] for more
1526    /// details.
1527    ///
1528    /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1529    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1530    where
1531        M: Timer + Send + Sync + 'static,
1532    {
1533        #[cfg(feature = "http2")]
1534        self.h2_builder.timer(timer);
1535        self
1536    }
1537
1538    /// Provide a timer to be used for timeouts and intervals in connection pools.
1539    pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1540    where
1541        M: Timer + Clone + Send + Sync + 'static,
1542    {
1543        self.pool_timer = Some(timer::Timer::new(timer.clone()));
1544        self
1545    }
1546
1547    /// Set the maximum write buffer size for each HTTP/2 stream.
1548    ///
1549    /// Default is currently 1MB, but may change.
1550    ///
1551    /// # Panics
1552    ///
1553    /// The value must be no larger than `u32::MAX`.
1554    #[cfg(feature = "http2")]
1555    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1556    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1557        self.h2_builder.max_send_buf_size(max);
1558        self
1559    }
1560
1561    /// Set whether to retry requests that get disrupted before ever starting
1562    /// to write.
1563    ///
1564    /// This means a request that is queued, and gets given an idle, reused
1565    /// connection, and then encounters an error immediately as the idle
1566    /// connection was found to be unusable.
1567    ///
1568    /// When this is set to `false`, the related `ResponseFuture` would instead
1569    /// resolve to an `Error::Cancel`.
1570    ///
1571    /// Default is `true`.
1572    #[inline]
1573    pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1574        self.client_config.retry_canceled_requests = val;
1575        self
1576    }
1577
1578    /// Set whether to automatically add the `Host` header to requests.
1579    ///
1580    /// If true, and a request does not include a `Host` header, one will be
1581    /// added automatically, derived from the authority of the `Uri`.
1582    ///
1583    /// Default is `true`.
1584    #[inline]
1585    pub fn set_host(&mut self, val: bool) -> &mut Self {
1586        self.client_config.set_host = val;
1587        self
1588    }
1589
1590    /// Build a client with this configuration and the default `HttpConnector`.
1591    #[cfg(feature = "tokio")]
1592    pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1593    where
1594        B: Body + Send,
1595        B::Data: Send,
1596    {
1597        let mut connector = HttpConnector::new();
1598        if self.pool_config.is_enabled() {
1599            connector.set_keepalive(self.pool_config.idle_timeout);
1600        }
1601        self.build(connector)
1602    }
1603
1604    /// Combine the configuration of this builder with a connector to create a `Client`.
1605    pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1606    where
1607        C: Connect + Clone,
1608        B: Body + Send,
1609        B::Data: Send,
1610    {
1611        let exec = self.exec.clone();
1612        let timer = self.pool_timer.clone();
1613        Client {
1614            config: self.client_config,
1615            exec: exec.clone(),
1616            #[cfg(feature = "http1")]
1617            h1_builder: self.h1_builder.clone(),
1618            #[cfg(feature = "http2")]
1619            h2_builder: self.h2_builder.clone(),
1620            connector,
1621            pool: pool::Pool::new(self.pool_config, exec, timer),
1622        }
1623    }
1624}
1625
1626impl fmt::Debug for Builder {
1627    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1628        f.debug_struct("Builder")
1629            .field("client_config", &self.client_config)
1630            .field("pool_config", &self.pool_config)
1631            .finish()
1632    }
1633}
1634
1635// ==== impl Error ====
1636
1637impl fmt::Debug for Error {
1638    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1639        let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1640        f.field(&self.kind);
1641        if let Some(ref cause) = self.source {
1642            f.field(cause);
1643        }
1644        f.finish()
1645    }
1646}
1647
1648impl fmt::Display for Error {
1649    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1650        write!(f, "client error ({:?})", self.kind)
1651    }
1652}
1653
1654impl StdError for Error {
1655    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1656        self.source.as_ref().map(|e| &**e as _)
1657    }
1658}
1659
1660impl Error {
1661    /// Returns true if this was an error from `Connect`.
1662    pub fn is_connect(&self) -> bool {
1663        matches!(self.kind, ErrorKind::Connect)
1664    }
1665
1666    /// Returns the info of the client connection on which this error occurred.
1667    #[cfg(any(feature = "http1", feature = "http2"))]
1668    pub fn connect_info(&self) -> Option<&Connected> {
1669        self.connect_info.as_ref()
1670    }
1671
1672    #[cfg(any(feature = "http1", feature = "http2"))]
1673    fn with_connect_info(self, connect_info: Connected) -> Self {
1674        Self {
1675            connect_info: Some(connect_info),
1676            ..self
1677        }
1678    }
1679    fn is_canceled(&self) -> bool {
1680        matches!(self.kind, ErrorKind::Canceled)
1681    }
1682
1683    fn tx(src: hyper::Error) -> Self {
1684        e!(SendRequest, src)
1685    }
1686
1687    fn closed(src: hyper::Error) -> Self {
1688        e!(ChannelClosed, src)
1689    }
1690}