miku_hyper_util/client/legacy/
client.rs

1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17#[cfg(feature = "http2")]
18use hyper::ext::{FramePriority, FrameStreamDependency, PseudoType};
19use hyper::header::{HeaderValue, HOST};
20use hyper::rt::Timer;
21use hyper::{body::Body, Method, Request, Response, Uri, Version};
22use tracing::{debug, trace, warn};
23
24use super::connect::capture::CaptureConnectionExtension;
25#[cfg(feature = "tokio")]
26use super::connect::HttpConnector;
27use super::connect::{Alpn, Connect, Connected, Connection};
28use super::pool::{self, Ver};
29
30use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
31
32type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
33
34/// A Client to make outgoing HTTP requests.
35///
36/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
37/// underlying connection pool will be reused.
38#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
39pub struct Client<C, B> {
40    config: Config,
41    connector: C,
42    exec: Exec,
43    #[cfg(feature = "http1")]
44    h1_builder: hyper::client::conn::http1::Builder,
45    #[cfg(feature = "http2")]
46    h2_builder: hyper::client::conn::http2::Builder<Exec>,
47    pool: pool::Pool<PoolClient<B>, PoolKey>,
48}
49
50#[derive(Clone, Copy, Debug)]
51struct Config {
52    retry_canceled_requests: bool,
53    set_host: bool,
54    ver: Ver,
55}
56
57/// Client errors
58pub struct Error {
59    kind: ErrorKind,
60    source: Option<Box<dyn StdError + Send + Sync>>,
61    #[cfg(any(feature = "http1", feature = "http2"))]
62    connect_info: Option<Connected>,
63}
64
65#[derive(Debug)]
66enum ErrorKind {
67    Canceled,
68    ChannelClosed,
69    Connect,
70    UserUnsupportedRequestMethod,
71    UserUnsupportedVersion,
72    UserAbsoluteUriRequired,
73    SendRequest,
74}
75
76macro_rules! e {
77    ($kind:ident) => {
78        Error {
79            kind: ErrorKind::$kind,
80            source: None,
81            connect_info: None,
82        }
83    };
84    ($kind:ident, $src:expr) => {
85        Error {
86            kind: ErrorKind::$kind,
87            source: Some($src.into()),
88            connect_info: None,
89        }
90    };
91}
92
93// We might change this... :shrug:
94type PoolKey = (http::uri::Scheme, http::uri::Authority);
95
96enum TrySendError<B> {
97    Retryable {
98        error: Error,
99        req: Request<B>,
100        connection_reused: bool,
101    },
102    Nope(Error),
103}
104
105/// A `Future` that will resolve to an HTTP Response.
106///
107/// This is returned by `Client::request` (and `Client::get`).
108#[must_use = "futures do nothing unless polled"]
109pub struct ResponseFuture {
110    inner: SyncWrapper<
111        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
112    >,
113}
114
115// ===== impl Client =====
116
117impl Client<(), ()> {
118    /// Create a builder to configure a new `Client`.
119    ///
120    /// # Example
121    ///
122    /// ```
123    /// # #[cfg(feature = "tokio")]
124    /// # fn run () {
125    /// use std::time::Duration;
126    /// use miku_hyper_util::client::legacy::Client;
127    /// use miku_hyper_util::rt::TokioExecutor;
128    ///
129    /// let client = Client::builder(TokioExecutor::new())
130    ///     .pool_idle_timeout(Duration::from_secs(30))
131    ///     .http2_only(true)
132    ///     .build_http();
133    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
134    /// # drop(infer);
135    /// # }
136    /// # fn main() {}
137    /// ```
138    pub fn builder<E>(executor: E) -> Builder
139    where
140        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
141    {
142        Builder::new(executor)
143    }
144}
145
146impl<C, B> Client<C, B>
147where
148    C: Connect + Clone + Send + Sync + 'static,
149    B: Body + Send + 'static + Unpin,
150    B::Data: Send,
151    B::Error: Into<Box<dyn StdError + Send + Sync>>,
152{
153    /// Send a `GET` request to the supplied `Uri`.
154    ///
155    /// # Note
156    ///
157    /// This requires that the `Body` type have a `Default` implementation.
158    /// It *should* return an "empty" version of itself, such that
159    /// `Body::is_end_stream` is `true`.
160    ///
161    /// # Example
162    ///
163    /// ```
164    /// # #[cfg(feature = "tokio")]
165    /// # fn run () {
166    /// use hyper::Uri;
167    /// use miku_hyper_util::client::legacy::Client;
168    /// use miku_hyper_util::rt::TokioExecutor;
169    /// use bytes::Bytes;
170    /// use http_body_util::Full;
171    ///
172    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
173    ///
174    /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
175    /// # }
176    /// # fn main() {}
177    /// ```
178    pub fn get(&self, uri: Uri) -> ResponseFuture
179    where
180        B: Default,
181    {
182        let body = B::default();
183        if !body.is_end_stream() {
184            warn!("default Body used for get() does not return true for is_end_stream");
185        }
186
187        let mut req = Request::new(body);
188        *req.uri_mut() = uri;
189        self.request(req)
190    }
191
192    /// Send a constructed `Request` using this `Client`.
193    ///
194    /// # Example
195    ///
196    /// ```
197    /// # #[cfg(feature = "tokio")]
198    /// # fn run () {
199    /// use hyper::{Method, Request};
200    /// use miku_hyper_util::client::legacy::Client;
201    /// use http_body_util::Full;
202    /// use miku_hyper_util::rt::TokioExecutor;
203    /// use bytes::Bytes;
204    ///
205    /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
206    ///
207    /// let req: Request<Full<Bytes>> = Request::builder()
208    ///     .method(Method::POST)
209    ///     .uri("http://httpbin.org/post")
210    ///     .body(Full::from("Hallo!"))
211    ///     .expect("request builder");
212    ///
213    /// let future = client.request(req);
214    /// # }
215    /// # fn main() {}
216    /// ```
217    pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
218        let is_http_connect = req.method() == Method::CONNECT;
219        match req.version() {
220            Version::HTTP_11 => (),
221            Version::HTTP_10 => {
222                if is_http_connect {
223                    warn!("CONNECT is not allowed for HTTP/1.0");
224                    return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
225                }
226            }
227            Version::HTTP_2 => (),
228            // completely unsupported HTTP version (like HTTP/0.9)!
229            other => return ResponseFuture::error_version(other),
230        };
231
232        let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
233            Ok(s) => s,
234            Err(err) => {
235                return ResponseFuture::new(future::err(err));
236            }
237        };
238
239        ResponseFuture::new(self.clone().send_request(req, pool_key))
240    }
241
242    async fn send_request(
243        self,
244        mut req: Request<B>,
245        pool_key: PoolKey,
246    ) -> Result<Response<hyper::body::Incoming>, Error> {
247        let uri = req.uri().clone();
248
249        loop {
250            req = match self.try_send_request(req, pool_key.clone()).await {
251                Ok(resp) => return Ok(resp),
252                Err(TrySendError::Nope(err)) => return Err(err),
253                Err(TrySendError::Retryable {
254                    mut req,
255                    error,
256                    connection_reused,
257                }) => {
258                    if !self.config.retry_canceled_requests || !connection_reused {
259                        // if client disabled, don't retry
260                        // a fresh connection means we definitely can't retry
261                        return Err(error);
262                    }
263
264                    trace!(
265                        "unstarted request canceled, trying again (reason={:?})",
266                        error
267                    );
268                    *req.uri_mut() = uri.clone();
269                    req
270                }
271            }
272        }
273    }
274
275    async fn try_send_request(
276        &self,
277        mut req: Request<B>,
278        pool_key: PoolKey,
279    ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
280        let mut pooled = self
281            .connection_for(pool_key)
282            .await
283            // `connection_for` already retries checkout errors, so if
284            // it returns an error, there's not much else to retry
285            .map_err(TrySendError::Nope)?;
286
287        req.extensions_mut()
288            .get_mut::<CaptureConnectionExtension>()
289            .map(|conn| conn.set(&pooled.conn_info));
290
291        if pooled.is_http1() {
292            if req.version() == Version::HTTP_2 {
293                warn!("Connection is HTTP/1, but request requires HTTP/2");
294                return Err(TrySendError::Nope(
295                    e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
296                ));
297            }
298
299            if self.config.set_host {
300                let uri = req.uri().clone();
301                req.headers_mut().entry(HOST).or_insert_with(|| {
302                    let hostname = uri.host().expect("authority implies host");
303                    if let Some(port) = get_non_default_port(&uri) {
304                        let s = format!("{}:{}", hostname, port);
305                        HeaderValue::from_str(&s)
306                    } else {
307                        HeaderValue::from_str(hostname)
308                    }
309                    .expect("uri host is valid header value")
310                });
311            }
312
313            // CONNECT always sends authority-form, so check it first...
314            if req.method() == Method::CONNECT {
315                authority_form(req.uri_mut());
316            } else if pooled.conn_info.is_proxied {
317                absolute_form(req.uri_mut());
318            } else {
319                origin_form(req.uri_mut());
320            }
321        } else if req.method() == Method::CONNECT {
322            authority_form(req.uri_mut());
323        }
324
325        let mut res = match pooled.try_send_request(req).await {
326            Ok(res) => res,
327            Err(mut err) => {
328                return if let Some(req) = err.take_message() {
329                    Err(TrySendError::Retryable {
330                        connection_reused: pooled.is_reused(),
331                        error: e!(Canceled, err.into_error())
332                            .with_connect_info(pooled.conn_info.clone()),
333                        req,
334                    })
335                } else {
336                    Err(TrySendError::Nope(
337                        e!(SendRequest, err.into_error())
338                            .with_connect_info(pooled.conn_info.clone()),
339                    ))
340                }
341            }
342        };
343
344        // If the Connector included 'extra' info, add to Response...
345        if let Some(extra) = &pooled.conn_info.extra {
346            extra.set(res.extensions_mut());
347        }
348
349        // As of futures@0.1.21, there is a race condition in the mpsc
350        // channel, such that sending when the receiver is closing can
351        // result in the message being stuck inside the queue. It won't
352        // ever notify until the Sender side is dropped.
353        //
354        // To counteract this, we must check if our senders 'want' channel
355        // has been closed after having tried to send. If so, error out...
356        if pooled.is_closed() {
357            return Ok(res);
358        }
359
360        // If pooled is HTTP/2, we can toss this reference immediately.
361        //
362        // when pooled is dropped, it will try to insert back into the
363        // pool. To delay that, spawn a future that completes once the
364        // sender is ready again.
365        //
366        // This *should* only be once the related `Connection` has polled
367        // for a new request to start.
368        //
369        // It won't be ready if there is a body to stream.
370        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
371            drop(pooled);
372        } else if !res.body().is_end_stream() {
373            //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
374            //res.body_mut().delayed_eof(delayed_rx);
375            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
376                // At this point, `pooled` is dropped, and had a chance
377                // to insert into the pool (if conn was idle)
378                //drop(delayed_tx);
379            });
380
381            self.exec.execute(on_idle);
382        } else {
383            // There's no body to delay, but the connection isn't
384            // ready yet. Only re-insert when it's ready
385            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
386
387            self.exec.execute(on_idle);
388        }
389
390        Ok(res)
391    }
392
393    async fn connection_for(
394        &self,
395        pool_key: PoolKey,
396    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
397        loop {
398            match self.one_connection_for(pool_key.clone()).await {
399                Ok(pooled) => return Ok(pooled),
400                Err(ClientConnectError::Normal(err)) => return Err(err),
401                Err(ClientConnectError::CheckoutIsClosed(reason)) => {
402                    if !self.config.retry_canceled_requests {
403                        return Err(e!(Connect, reason));
404                    }
405
406                    trace!(
407                        "unstarted request canceled, trying again (reason={:?})",
408                        reason,
409                    );
410                    continue;
411                }
412            };
413        }
414    }
415
416    async fn one_connection_for(
417        &self,
418        pool_key: PoolKey,
419    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
420        // Return a single connection if pooling is not enabled
421        if !self.pool.is_enabled() {
422            return self
423                .connect_to(pool_key)
424                .await
425                .map_err(ClientConnectError::Normal);
426        }
427
428        // This actually races 2 different futures to try to get a ready
429        // connection the fastest, and to reduce connection churn.
430        //
431        // - If the pool has an idle connection waiting, that's used
432        //   immediately.
433        // - Otherwise, the Connector is asked to start connecting to
434        //   the destination Uri.
435        // - Meanwhile, the pool Checkout is watching to see if any other
436        //   request finishes and tries to insert an idle connection.
437        // - If a new connection is started, but the Checkout wins after
438        //   (an idle connection became available first), the started
439        //   connection future is spawned into the runtime to complete,
440        //   and then be inserted into the pool as an idle connection.
441        let checkout = self.pool.checkout(pool_key.clone());
442        let connect = self.connect_to(pool_key);
443        let is_ver_h2 = self.config.ver == Ver::Http2;
444
445        // The order of the `select` is depended on below...
446
447        match future::select(checkout, connect).await {
448            // Checkout won, connect future may have been started or not.
449            //
450            // If it has, let it finish and insert back into the pool,
451            // so as to not waste the socket...
452            Either::Left((Ok(checked_out), connecting)) => {
453                // This depends on the `select` above having the correct
454                // order, such that if the checkout future were ready
455                // immediately, the connect future will never have been
456                // started.
457                //
458                // If it *wasn't* ready yet, then the connect future will
459                // have been started...
460                if connecting.started() {
461                    let bg = connecting
462                        .map_err(|err| {
463                            trace!("background connect error: {}", err);
464                        })
465                        .map(|_pooled| {
466                            // dropping here should just place it in
467                            // the Pool for us...
468                        });
469                    // An execute error here isn't important, we're just trying
470                    // to prevent a waste of a socket...
471                    self.exec.execute(bg);
472                }
473                Ok(checked_out)
474            }
475            // Connect won, checkout can just be dropped.
476            Either::Right((Ok(connected), _checkout)) => Ok(connected),
477            // Either checkout or connect could get canceled:
478            //
479            // 1. Connect is canceled if this is HTTP/2 and there is
480            //    an outstanding HTTP/2 connecting task.
481            // 2. Checkout is canceled if the pool cannot deliver an
482            //    idle connection reliably.
483            //
484            // In both cases, we should just wait for the other future.
485            Either::Left((Err(err), connecting)) => {
486                if err.is_canceled() {
487                    connecting.await.map_err(ClientConnectError::Normal)
488                } else {
489                    Err(ClientConnectError::Normal(e!(Connect, err)))
490                }
491            }
492            Either::Right((Err(err), checkout)) => {
493                if err.is_canceled() {
494                    checkout.await.map_err(move |err| {
495                        if is_ver_h2 && err.is_canceled() {
496                            ClientConnectError::CheckoutIsClosed(err)
497                        } else {
498                            ClientConnectError::Normal(e!(Connect, err))
499                        }
500                    })
501                } else {
502                    Err(ClientConnectError::Normal(err))
503                }
504            }
505        }
506    }
507
508    #[cfg(any(feature = "http1", feature = "http2"))]
509    fn connect_to(
510        &self,
511        pool_key: PoolKey,
512    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
513    {
514        let executor = self.exec.clone();
515        let pool = self.pool.clone();
516        #[cfg(feature = "http1")]
517        let h1_builder = self.h1_builder.clone();
518        #[cfg(feature = "http2")]
519        let h2_builder = self.h2_builder.clone();
520        let ver = self.config.ver;
521        let is_ver_h2 = ver == Ver::Http2;
522        let connector = self.connector.clone();
523        let dst = domain_as_uri(pool_key.clone());
524        hyper_lazy(move || {
525            // Try to take a "connecting lock".
526            //
527            // If the pool_key is for HTTP/2, and there is already a
528            // connection being established, then this can't take a
529            // second lock. The "connect_to" future is Canceled.
530            let connecting = match pool.connecting(&pool_key, ver) {
531                Some(lock) => lock,
532                None => {
533                    let canceled = e!(Canceled);
534                    // TODO
535                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
536                    return Either::Right(future::err(canceled));
537                }
538            };
539            Either::Left(
540                connector
541                    .connect(super::connect::sealed::Internal, dst)
542                    .map_err(|src| e!(Connect, src))
543                    .and_then(move |io| {
544                        let connected = io.connected();
545                        // If ALPN is h2 and we aren't http2_only already,
546                        // then we need to convert our pool checkout into
547                        // a single HTTP2 one.
548                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
549                            match connecting.alpn_h2(&pool) {
550                                Some(lock) => {
551                                    trace!("ALPN negotiated h2, updating pool");
552                                    lock
553                                }
554                                None => {
555                                    // Another connection has already upgraded,
556                                    // the pool checkout should finish up for us.
557                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
558                                    return Either::Right(future::err(canceled));
559                                }
560                            }
561                        } else {
562                            connecting
563                        };
564
565                        #[cfg_attr(not(feature = "http2"), allow(unused))]
566                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
567
568                        Either::Left(Box::pin(async move {
569                            let tx = if is_h2 {
570                                #[cfg(feature = "http2")] {
571                                    let (mut tx, conn) =
572                                        h2_builder.handshake(io).await.map_err(Error::tx)?;
573
574                                    trace!(
575                                        "http2 handshake complete, spawning background dispatcher task"
576                                    );
577                                    executor.execute(
578                                        conn.map_err(|e| debug!("client connection error: {}", e))
579                                            .map(|_| ()),
580                                    );
581
582                                    // Wait for 'conn' to ready up before we
583                                    // declare this tx as usable
584                                    tx.ready().await.map_err(Error::tx)?;
585                                    PoolTx::Http2(tx)
586                                }
587                                #[cfg(not(feature = "http2"))]
588                                panic!("http2 feature is not enabled");
589                            } else {
590                                #[cfg(feature = "http1")] {
591                                    let (mut tx, conn) =
592                                        h1_builder.handshake(io).await.map_err(Error::tx)?;
593
594                                    trace!(
595                                        "http1 handshake complete, spawning background dispatcher task"
596                                    );
597                                    executor.execute(
598                                        conn.with_upgrades()
599                                            .map_err(|e| debug!("client connection error: {}", e))
600                                            .map(|_| ()),
601                                    );
602
603                                    // Wait for 'conn' to ready up before we
604                                    // declare this tx as usable
605                                    tx.ready().await.map_err(Error::tx)?;
606                                    PoolTx::Http1(tx)
607                                }
608                                #[cfg(not(feature = "http1"))] {
609                                    panic!("http1 feature is not enabled");
610                                }
611                            };
612
613                            Ok(pool.pooled(
614                                connecting,
615                                PoolClient {
616                                    conn_info: connected,
617                                    tx,
618                                },
619                            ))
620                        }))
621                    }),
622            )
623        })
624    }
625}
626
627impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
628where
629    C: Connect + Clone + Send + Sync + 'static,
630    B: Body + Send + 'static + Unpin,
631    B::Data: Send,
632    B::Error: Into<Box<dyn StdError + Send + Sync>>,
633{
634    type Response = Response<hyper::body::Incoming>;
635    type Error = Error;
636    type Future = ResponseFuture;
637
638    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
639        Poll::Ready(Ok(()))
640    }
641
642    fn call(&mut self, req: Request<B>) -> Self::Future {
643        self.request(req)
644    }
645}
646
647impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
648where
649    C: Connect + Clone + Send + Sync + 'static,
650    B: Body + Send + 'static + Unpin,
651    B::Data: Send,
652    B::Error: Into<Box<dyn StdError + Send + Sync>>,
653{
654    type Response = Response<hyper::body::Incoming>;
655    type Error = Error;
656    type Future = ResponseFuture;
657
658    fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
659        Poll::Ready(Ok(()))
660    }
661
662    fn call(&mut self, req: Request<B>) -> Self::Future {
663        self.request(req)
664    }
665}
666
667impl<C: Clone, B> Clone for Client<C, B> {
668    fn clone(&self) -> Client<C, B> {
669        Client {
670            config: self.config,
671            exec: self.exec.clone(),
672            #[cfg(feature = "http1")]
673            h1_builder: self.h1_builder.clone(),
674            #[cfg(feature = "http2")]
675            h2_builder: self.h2_builder.clone(),
676            connector: self.connector.clone(),
677            pool: self.pool.clone(),
678        }
679    }
680}
681
682impl<C, B> fmt::Debug for Client<C, B> {
683    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684        f.debug_struct("Client").finish()
685    }
686}
687
688// ===== impl ResponseFuture =====
689
690impl ResponseFuture {
691    fn new<F>(value: F) -> Self
692    where
693        F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
694    {
695        Self {
696            inner: SyncWrapper::new(Box::pin(value)),
697        }
698    }
699
700    fn error_version(ver: Version) -> Self {
701        warn!("Request has unsupported version \"{:?}\"", ver);
702        ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
703    }
704}
705
706impl fmt::Debug for ResponseFuture {
707    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708        f.pad("Future<Response>")
709    }
710}
711
712impl Future for ResponseFuture {
713    type Output = Result<Response<hyper::body::Incoming>, Error>;
714
715    fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
716        self.inner.get_mut().as_mut().poll(cx)
717    }
718}
719
720// ===== impl PoolClient =====
721
722// FIXME: allow() required due to `impl Trait` leaking types to this lint
723#[allow(missing_debug_implementations)]
724struct PoolClient<B> {
725    conn_info: Connected,
726    tx: PoolTx<B>,
727}
728
729enum PoolTx<B> {
730    #[cfg(feature = "http1")]
731    Http1(hyper::client::conn::http1::SendRequest<B>),
732    #[cfg(feature = "http2")]
733    Http2(hyper::client::conn::http2::SendRequest<B>),
734}
735
736impl<B> PoolClient<B> {
737    fn poll_ready(
738        &mut self,
739        #[allow(unused_variables)] cx: &mut task::Context<'_>,
740    ) -> Poll<Result<(), Error>> {
741        match self.tx {
742            #[cfg(feature = "http1")]
743            PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
744            #[cfg(feature = "http2")]
745            PoolTx::Http2(_) => Poll::Ready(Ok(())),
746        }
747    }
748
749    fn is_http1(&self) -> bool {
750        !self.is_http2()
751    }
752
753    fn is_http2(&self) -> bool {
754        match self.tx {
755            #[cfg(feature = "http1")]
756            PoolTx::Http1(_) => false,
757            #[cfg(feature = "http2")]
758            PoolTx::Http2(_) => true,
759        }
760    }
761
762    fn is_poisoned(&self) -> bool {
763        self.conn_info.poisoned.poisoned()
764    }
765
766    fn is_ready(&self) -> bool {
767        match self.tx {
768            #[cfg(feature = "http1")]
769            PoolTx::Http1(ref tx) => tx.is_ready(),
770            #[cfg(feature = "http2")]
771            PoolTx::Http2(ref tx) => tx.is_ready(),
772        }
773    }
774
775    fn is_closed(&self) -> bool {
776        match self.tx {
777            #[cfg(feature = "http1")]
778            PoolTx::Http1(ref tx) => tx.is_closed(),
779            #[cfg(feature = "http2")]
780            PoolTx::Http2(ref tx) => tx.is_closed(),
781        }
782    }
783}
784
785impl<B: Body + 'static> PoolClient<B> {
786    fn try_send_request(
787        &mut self,
788        req: Request<B>,
789    ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
790    where
791        B: Send,
792    {
793        #[cfg(all(feature = "http1", feature = "http2"))]
794        return match self.tx {
795            #[cfg(feature = "http1")]
796            PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
797            #[cfg(feature = "http2")]
798            PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
799        };
800
801        #[cfg(feature = "http1")]
802        #[cfg(not(feature = "http2"))]
803        return match self.tx {
804            #[cfg(feature = "http1")]
805            PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
806        };
807
808        #[cfg(not(feature = "http1"))]
809        #[cfg(feature = "http2")]
810        return match self.tx {
811            #[cfg(feature = "http2")]
812            PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
813        };
814    }
815}
816
817impl<B> pool::Poolable for PoolClient<B>
818where
819    B: Send + 'static,
820{
821    fn is_open(&self) -> bool {
822        !self.is_poisoned() && self.is_ready()
823    }
824
825    fn reserve(self) -> pool::Reservation<Self> {
826        match self.tx {
827            #[cfg(feature = "http1")]
828            PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
829                conn_info: self.conn_info,
830                tx: PoolTx::Http1(tx),
831            }),
832            #[cfg(feature = "http2")]
833            PoolTx::Http2(tx) => {
834                let b = PoolClient {
835                    conn_info: self.conn_info.clone(),
836                    tx: PoolTx::Http2(tx.clone()),
837                };
838                let a = PoolClient {
839                    conn_info: self.conn_info,
840                    tx: PoolTx::Http2(tx),
841                };
842                pool::Reservation::Shared(a, b)
843            }
844        }
845    }
846
847    fn can_share(&self) -> bool {
848        self.is_http2()
849    }
850}
851
852enum ClientConnectError {
853    Normal(Error),
854    CheckoutIsClosed(pool::Error),
855}
856
857fn origin_form(uri: &mut Uri) {
858    let path = match uri.path_and_query() {
859        Some(path) if path.as_str() != "/" => {
860            let mut parts = ::http::uri::Parts::default();
861            parts.path_and_query = Some(path.clone());
862            Uri::from_parts(parts).expect("path is valid uri")
863        }
864        _none_or_just_slash => {
865            debug_assert!(Uri::default() == "/");
866            Uri::default()
867        }
868    };
869    *uri = path
870}
871
872fn absolute_form(uri: &mut Uri) {
873    debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
874    debug_assert!(
875        uri.authority().is_some(),
876        "absolute_form needs an authority"
877    );
878    // If the URI is to HTTPS, and the connector claimed to be a proxy,
879    // then it *should* have tunneled, and so we don't want to send
880    // absolute-form in that case.
881    if uri.scheme() == Some(&Scheme::HTTPS) {
882        origin_form(uri);
883    }
884}
885
886fn authority_form(uri: &mut Uri) {
887    if let Some(path) = uri.path_and_query() {
888        // `https://hyper.rs` would parse with `/` path, don't
889        // annoy people about that...
890        if path != "/" {
891            warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
892        }
893    }
894    *uri = match uri.authority() {
895        Some(auth) => {
896            let mut parts = ::http::uri::Parts::default();
897            parts.authority = Some(auth.clone());
898            Uri::from_parts(parts).expect("authority is valid")
899        }
900        None => {
901            unreachable!("authority_form with relative uri");
902        }
903    };
904}
905
906fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
907    let uri_clone = uri.clone();
908    match (uri_clone.scheme(), uri_clone.authority()) {
909        (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
910        (None, Some(auth)) if is_http_connect => {
911            let scheme = match auth.port_u16() {
912                Some(443) => {
913                    set_scheme(uri, Scheme::HTTPS);
914                    Scheme::HTTPS
915                }
916                _ => {
917                    set_scheme(uri, Scheme::HTTP);
918                    Scheme::HTTP
919                }
920            };
921            Ok((scheme, auth.clone()))
922        }
923        _ => {
924            debug!("Client requires absolute-form URIs, received: {:?}", uri);
925            Err(e!(UserAbsoluteUriRequired))
926        }
927    }
928}
929
930fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
931    http::uri::Builder::new()
932        .scheme(scheme)
933        .authority(auth)
934        .path_and_query("/")
935        .build()
936        .expect("domain is valid Uri")
937}
938
939fn set_scheme(uri: &mut Uri, scheme: Scheme) {
940    debug_assert!(
941        uri.scheme().is_none(),
942        "set_scheme expects no existing scheme"
943    );
944    let old = std::mem::take(uri);
945    let mut parts: ::http::uri::Parts = old.into();
946    parts.scheme = Some(scheme);
947    parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
948    *uri = Uri::from_parts(parts).expect("scheme is valid");
949}
950
951fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
952    match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
953        (Some(443), true) => None,
954        (Some(80), false) => None,
955        _ => uri.port(),
956    }
957}
958
959fn is_schema_secure(uri: &Uri) -> bool {
960    uri.scheme_str()
961        .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
962        .unwrap_or_default()
963}
964
965/// A builder to configure a new [`Client`](Client).
966///
967/// # Example
968///
969/// ```
970/// # #[cfg(feature = "tokio")]
971/// # fn run () {
972/// use std::time::Duration;
973/// use miku_hyper_util::client::legacy::Client;
974/// use miku_hyper_util::rt::TokioExecutor;
975///
976/// let client = Client::builder(TokioExecutor::new())
977///     .pool_idle_timeout(Duration::from_secs(30))
978///     .http2_only(true)
979///     .build_http();
980/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
981/// # drop(infer);
982/// # }
983/// # fn main() {}
984/// ```
985#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
986#[derive(Clone)]
987pub struct Builder {
988    client_config: Config,
989    exec: Exec,
990    #[cfg(feature = "http1")]
991    h1_builder: hyper::client::conn::http1::Builder,
992    #[cfg(feature = "http2")]
993    h2_builder: hyper::client::conn::http2::Builder<Exec>,
994    pool_config: pool::Config,
995    pool_timer: Option<timer::Timer>,
996}
997
998impl Builder {
999    /// Construct a new Builder.
1000    pub fn new<E>(executor: E) -> Self
1001    where
1002        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1003    {
1004        let exec = Exec::new(executor);
1005        Self {
1006            client_config: Config {
1007                retry_canceled_requests: true,
1008                set_host: true,
1009                ver: Ver::Auto,
1010            },
1011            exec: exec.clone(),
1012            #[cfg(feature = "http1")]
1013            h1_builder: hyper::client::conn::http1::Builder::new(),
1014            #[cfg(feature = "http2")]
1015            h2_builder: hyper::client::conn::http2::Builder::new(exec),
1016            pool_config: pool::Config {
1017                idle_timeout: Some(Duration::from_secs(90)),
1018                max_idle_per_host: usize::MAX,
1019            },
1020            pool_timer: None,
1021        }
1022    }
1023    /// Set an optional timeout for idle sockets being kept-alive.
1024    /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1025    ///
1026    /// Pass `None` to disable timeout.
1027    ///
1028    /// Default is 90 seconds.
1029    ///
1030    /// # Example
1031    ///
1032    /// ```
1033    /// # #[cfg(feature = "tokio")]
1034    /// # fn run () {
1035    /// use std::time::Duration;
1036    /// use miku_hyper_util::client::legacy::Client;
1037    /// use miku_hyper_util::rt::{TokioExecutor, TokioTimer};
1038    ///
1039    /// let client = Client::builder(TokioExecutor::new())
1040    ///     .pool_idle_timeout(Duration::from_secs(30))
1041    ///     .pool_timer(TokioTimer::new())
1042    ///     .build_http();
1043    ///
1044    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1045    /// # }
1046    /// # fn main() {}
1047    /// ```
1048    pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1049    where
1050        D: Into<Option<Duration>>,
1051    {
1052        self.pool_config.idle_timeout = val.into();
1053        self
1054    }
1055
1056    #[doc(hidden)]
1057    #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1058    pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1059        self.pool_config.max_idle_per_host = max_idle;
1060        self
1061    }
1062
1063    /// Sets the maximum idle connection per host allowed in the pool.
1064    ///
1065    /// Default is `usize::MAX` (no limit).
1066    pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1067        self.pool_config.max_idle_per_host = max_idle;
1068        self
1069    }
1070
1071    // HTTP/1 options
1072
1073    /// Sets the exact size of the read buffer to *always* use.
1074    ///
1075    /// Note that setting this option unsets the `http1_max_buf_size` option.
1076    ///
1077    /// Default is an adaptive read buffer.
1078    #[cfg(feature = "http1")]
1079    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1080    pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1081        self.h1_builder.read_buf_exact_size(Some(sz));
1082        self
1083    }
1084
1085    /// Set the maximum buffer size for the connection.
1086    ///
1087    /// Default is ~400kb.
1088    ///
1089    /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1090    ///
1091    /// # Panics
1092    ///
1093    /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1094    #[cfg(feature = "http1")]
1095    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1096    pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1097        self.h1_builder.max_buf_size(max);
1098        self
1099    }
1100
1101    /// Set whether HTTP/1 connections will accept spaces between header names
1102    /// and the colon that follow them in responses.
1103    ///
1104    /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1105    /// parsing.
1106    ///
1107    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1108    /// to say about it:
1109    ///
1110    /// > No whitespace is allowed between the header field-name and colon. In
1111    /// > the past, differences in the handling of such whitespace have led to
1112    /// > security vulnerabilities in request routing and response handling. A
1113    /// > server MUST reject any received request message that contains
1114    /// > whitespace between a header field-name and colon with a response code
1115    /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1116    /// > response message before forwarding the message downstream.
1117    ///
1118    /// Note that this setting does not affect HTTP/2.
1119    ///
1120    /// Default is false.
1121    ///
1122    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1123    #[cfg(feature = "http1")]
1124    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1125    pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1126        self.h1_builder
1127            .allow_spaces_after_header_name_in_responses(val);
1128        self
1129    }
1130
1131    /// Set whether HTTP/1 connections will accept obsolete line folding for
1132    /// header values.
1133    ///
1134    /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1135    /// to say about it:
1136    ///
1137    /// > A server that receives an obs-fold in a request message that is not
1138    /// > within a message/http container MUST either reject the message by
1139    /// > sending a 400 (Bad Request), preferably with a representation
1140    /// > explaining that obsolete line folding is unacceptable, or replace
1141    /// > each received obs-fold with one or more SP octets prior to
1142    /// > interpreting the field value or forwarding the message downstream.
1143    ///
1144    /// > A proxy or gateway that receives an obs-fold in a response message
1145    /// > that is not within a message/http container MUST either discard the
1146    /// > message and replace it with a 502 (Bad Gateway) response, preferably
1147    /// > with a representation explaining that unacceptable line folding was
1148    /// > received, or replace each received obs-fold with one or more SP
1149    /// > octets prior to interpreting the field value or forwarding the
1150    /// > message downstream.
1151    ///
1152    /// > A user agent that receives an obs-fold in a response message that is
1153    /// > not within a message/http container MUST replace each received
1154    /// > obs-fold with one or more SP octets prior to interpreting the field
1155    /// > value.
1156    ///
1157    /// Note that this setting does not affect HTTP/2.
1158    ///
1159    /// Default is false.
1160    ///
1161    /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1162    #[cfg(feature = "http1")]
1163    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1164    pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1165        self.h1_builder
1166            .allow_obsolete_multiline_headers_in_responses(val);
1167        self
1168    }
1169
1170    /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1171    ///
1172    /// This mimics the behaviour of major browsers. You probably don't want this.
1173    /// You should only want this if you are implementing a proxy whose main
1174    /// purpose is to sit in front of browsers whose users access arbitrary content
1175    /// which may be malformed, and they expect everything that works without
1176    /// the proxy to keep working with the proxy.
1177    ///
1178    /// This option will prevent Hyper's client from returning an error encountered
1179    /// when parsing a header, except if the error was caused by the character NUL
1180    /// (ASCII code 0), as Chrome specifically always reject those.
1181    ///
1182    /// The ignorable errors are:
1183    /// * empty header names;
1184    /// * characters that are not allowed in header names, except for `\0` and `\r`;
1185    /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1186    ///   spaces and tabs between the header name and the colon;
1187    /// * missing colon between header name and colon;
1188    /// * characters that are not allowed in header values except for `\0` and `\r`.
1189    ///
1190    /// If an ignorable error is encountered, the parser tries to find the next
1191    /// line in the input to resume parsing the rest of the headers. An error
1192    /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1193    /// looking for the next line.
1194    #[cfg(feature = "http1")]
1195    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1196    pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1197        self.h1_builder.ignore_invalid_headers_in_responses(val);
1198        self
1199    }
1200
1201    /// Set whether HTTP/1 connections should try to use vectored writes,
1202    /// or always flatten into a single buffer.
1203    ///
1204    /// Note that setting this to false may mean more copies of body data,
1205    /// but may also improve performance when an IO transport doesn't
1206    /// support vectored writes well, such as most TLS implementations.
1207    ///
1208    /// Setting this to true will force hyper to use queued strategy
1209    /// which may eliminate unnecessary cloning on some TLS backends
1210    ///
1211    /// Default is `auto`. In this mode hyper will try to guess which
1212    /// mode to use
1213    #[cfg(feature = "http1")]
1214    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1215    pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1216        self.h1_builder.writev(enabled);
1217        self
1218    }
1219
1220    /// Set whether HTTP/1 connections will write header names as title case at
1221    /// the socket level.
1222    ///
1223    /// Note that this setting does not affect HTTP/2.
1224    ///
1225    /// Default is false.
1226    #[cfg(feature = "http1")]
1227    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1228    pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1229        self.h1_builder.title_case_headers(val);
1230        self
1231    }
1232
1233    /// Set whether to support preserving original header cases.
1234    ///
1235    /// Currently, this will record the original cases received, and store them
1236    /// in a private extension on the `Response`. It will also look for and use
1237    /// such an extension in any provided `Request`.
1238    ///
1239    /// Since the relevant extension is still private, there is no way to
1240    /// interact with the original cases. The only effect this can have now is
1241    /// to forward the cases in a proxy-like fashion.
1242    ///
1243    /// Note that this setting does not affect HTTP/2.
1244    ///
1245    /// Default is false.
1246    #[cfg(feature = "http1")]
1247    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1248    pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1249        self.h1_builder.preserve_header_case(val);
1250        self
1251    }
1252
1253    /// Set the maximum number of headers.
1254    ///
1255    /// When a response is received, the parser will reserve a buffer to store headers for optimal
1256    /// performance.
1257    ///
1258    /// If client receives more headers than the buffer size, the error "message header too large"
1259    /// is returned.
1260    ///
1261    /// The headers is allocated on the stack by default, which has higher performance. After
1262    /// setting this value, headers will be allocated in heap memory, that is, heap memory
1263    /// allocation will occur for each response, and there will be a performance drop of about 5%.
1264    ///
1265    /// Note that this setting does not affect HTTP/2.
1266    ///
1267    /// Default is 100.
1268    #[cfg(feature = "http1")]
1269    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1270    pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1271        self.h1_builder.max_headers(val);
1272        self
1273    }
1274
1275    /// Set whether HTTP/0.9 responses should be tolerated.
1276    ///
1277    /// Default is false.
1278    #[cfg(feature = "http1")]
1279    #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1280    pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1281        self.h1_builder.http09_responses(val);
1282        self
1283    }
1284
1285    /// Set whether the connection **must** use HTTP/2.
1286    ///
1287    /// The destination must either allow HTTP2 Prior Knowledge, or the
1288    /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1289    /// as part of the connection process. This will not make the `Client`
1290    /// utilize ALPN by itself.
1291    ///
1292    /// Note that setting this to true prevents HTTP/1 from being allowed.
1293    ///
1294    /// Default is false.
1295    #[cfg(feature = "http2")]
1296    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1297    pub fn http2_only(&mut self, val: bool) -> &mut Self {
1298        self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1299        self
1300    }
1301
1302    /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1303    ///
1304    /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1305    /// As of v0.4.0, it is 20.
1306    ///
1307    /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1308    #[cfg(feature = "http2")]
1309    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1310    pub fn http2_max_pending_accept_reset_streams(
1311        &mut self,
1312        max: impl Into<Option<usize>>,
1313    ) -> &mut Self {
1314        self.h2_builder.max_pending_accept_reset_streams(max.into());
1315        self
1316    }
1317
1318    /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1319    /// stream-level flow control.
1320    ///
1321    /// Passing `None` will do nothing.
1322    ///
1323    /// If not set, hyper will use a default.
1324    ///
1325    /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1326    #[cfg(feature = "http2")]
1327    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1328    pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1329        self.h2_builder.initial_stream_window_size(sz.into());
1330        self
1331    }
1332
1333    /// Sets the max connection-level flow control for HTTP2
1334    ///
1335    /// Passing `None` will do nothing.
1336    ///
1337    /// If not set, hyper will use a default.
1338    #[cfg(feature = "http2")]
1339    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1340    pub fn http2_initial_connection_window_size(
1341        &mut self,
1342        sz: impl Into<Option<u32>>,
1343    ) -> &mut Self {
1344        self.h2_builder.initial_connection_window_size(sz.into());
1345        self
1346    }
1347
1348    /// Sets the initial maximum of locally initiated (send) streams.
1349    ///
1350    /// This value will be overwritten by the value included in the initial
1351    /// SETTINGS frame received from the peer as part of a [connection preface].
1352    ///
1353    /// Passing `None` will do nothing.
1354    ///
1355    /// If not set, hyper will use a default.
1356    ///
1357    /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1358    #[cfg(feature = "http2")]
1359    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1360    pub fn http2_initial_max_send_streams(
1361        &mut self,
1362        initial: impl Into<Option<usize>>,
1363    ) -> &mut Self {
1364        self.h2_builder.initial_max_send_streams(initial);
1365        self
1366    }
1367
1368    /// Sets whether to use an adaptive flow control.
1369    ///
1370    /// Enabling this will override the limits set in
1371    /// `http2_initial_stream_window_size` and
1372    /// `http2_initial_connection_window_size`.
1373    #[cfg(feature = "http2")]
1374    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1375    pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1376        self.h2_builder.adaptive_window(enabled);
1377        self
1378    }
1379
1380    /// Sets the maximum frame size to use for HTTP2.
1381    ///
1382    /// Passing `None` will do nothing.
1383    ///
1384    /// If not set, hyper will use a default.
1385    #[cfg(feature = "http2")]
1386    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1387    pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1388        self.h2_builder.max_frame_size(sz);
1389        self
1390    }
1391
1392    /// Sets the max size of received header frames for HTTP2.
1393    ///
1394    /// Default is currently 16KB, but can change.
1395    #[cfg(feature = "http2")]
1396    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1397    pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1398        self.h2_builder.max_header_list_size(max);
1399        self
1400    }
1401
1402    /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1403    /// connection alive.
1404    ///
1405    /// Pass `None` to disable HTTP2 keep-alive.
1406    ///
1407    /// Default is currently disabled.
1408    ///
1409    /// # Cargo Feature
1410    ///
1411    /// Requires the `tokio` cargo feature to be enabled.
1412    #[cfg(feature = "tokio")]
1413    #[cfg(feature = "http2")]
1414    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1415    pub fn http2_keep_alive_interval(
1416        &mut self,
1417        interval: impl Into<Option<Duration>>,
1418    ) -> &mut Self {
1419        self.h2_builder.keep_alive_interval(interval);
1420        self
1421    }
1422
1423    /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1424    ///
1425    /// If the ping is not acknowledged within the timeout, the connection will
1426    /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1427    ///
1428    /// Default is 20 seconds.
1429    ///
1430    /// # Cargo Feature
1431    ///
1432    /// Requires the `tokio` cargo feature to be enabled.
1433    #[cfg(feature = "tokio")]
1434    #[cfg(feature = "http2")]
1435    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1436    pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1437        self.h2_builder.keep_alive_timeout(timeout);
1438        self
1439    }
1440
1441    /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1442    ///
1443    /// If disabled, keep-alive pings are only sent while there are open
1444    /// request/responses streams. If enabled, pings are also sent when no
1445    /// streams are active. Does nothing if `http2_keep_alive_interval` is
1446    /// disabled.
1447    ///
1448    /// Default is `false`.
1449    ///
1450    /// # Cargo Feature
1451    ///
1452    /// Requires the `tokio` cargo feature to be enabled.
1453    #[cfg(feature = "tokio")]
1454    #[cfg(feature = "http2")]
1455    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1456    pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1457        self.h2_builder.keep_alive_while_idle(enabled);
1458        self
1459    }
1460
1461    /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1462    ///
1463    /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1464    /// details.
1465    ///
1466    /// The default value is determined by the `h2` crate.
1467    ///
1468    /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1469    #[cfg(feature = "http2")]
1470    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1471    pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1472        self.h2_builder.max_concurrent_reset_streams(max);
1473        self
1474    }
1475
1476    /// Sets the header table size.
1477    ///
1478    /// This setting informs the peer of the maximum size of the header compression
1479    /// table used to encode header blocks, in octets. The encoder may select any value
1480    /// equal to or less than the header table size specified by the sender.
1481    ///
1482    /// The default value of crate `h2` is 4,096.
1483    #[cfg(feature = "http2")]
1484    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1485    pub fn http2_header_table_size(&mut self, size: impl Into<Option<u32>>) -> &mut Self {
1486        self.h2_builder.header_table_size(size);
1487        self
1488    }
1489
1490    /// Enables or disables server push promises.
1491    ///
1492    /// This value is included in the initial SETTINGS handshake.
1493    /// Setting this value to value to
1494    /// false in the initial SETTINGS handshake guarantees that the remote server
1495    /// will never send a push promise.
1496    ///
1497    /// This setting can be changed during the life of a single HTTP/2
1498    /// connection by sending another settings frame updating the value.
1499    ///
1500    /// Default value of crate `h2`: `true`.
1501    #[cfg(feature = "http2")]
1502    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1503    pub fn http2_enable_push(&mut self, enabled: bool) -> &mut Self {
1504        self.h2_builder.enable_push(enabled);
1505        self
1506    }
1507
1508    /// Sets the maximum number of concurrent streams.
1509    ///
1510    /// The maximum concurrent streams setting only controls the maximum number
1511    /// of streams that can be initiated by the remote peer. In other words,
1512    /// when this setting is set to 100, this does not limit the number of
1513    /// concurrent streams that can be created by the caller.
1514    ///
1515    /// It is recommended that this value be no smaller than 100, so as to not
1516    /// unnecessarily limit parallelism. However, any value is legal, including
1517    /// 0. If `max` is set to 0, then the remote will not be permitted to
1518    /// initiate streams.
1519    ///
1520    /// Note that streams in the reserved state, i.e., push promises that have
1521    /// been reserved but the stream has not started, do not count against this
1522    /// setting.
1523    ///
1524    /// Also note that if the remote *does* exceed the value set here, it is not
1525    /// a protocol level error. Instead, the `h2` library will immediately reset
1526    /// the stream.
1527    ///
1528    /// See [Section 5.1.2] in the HTTP/2 spec for more details.
1529    ///
1530    /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
1531    #[cfg(feature = "http2")]
1532    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1533    pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
1534        self.h2_builder.max_concurrent_streams(max);
1535        self
1536    }
1537
1538    /// Sets the `Headers` frame pseudo order.
1539    ///
1540    /// The pseudo header order setting controls the order of pseudo headers in the
1541    /// serialized HTTP/2 headers. The default value is `None`, which means that
1542    /// we let the `miku-h2` crate decide the order.
1543    #[cfg(feature = "http2")]
1544    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1545    pub fn http2_headers_frame_pseudo_order(
1546        &mut self,
1547        order: impl Into<Option<&'static [PseudoType; 4]>>,
1548    ) -> &mut Self {
1549        self.h2_builder.headers_frame_pseudo_order(order);
1550        self
1551    }
1552
1553    /// Sets the `Headers` frame priority.
1554    #[cfg(feature = "http2")]
1555    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1556    pub fn headers_frame_priority(
1557        &mut self,
1558        priority: impl Into<Option<FrameStreamDependency>>,
1559    ) -> &mut Self {
1560        self.h2_builder.headers_frame_priority(priority);
1561        self
1562    }
1563
1564    /// Sets the `Priority` frames (settings) for virtual streams.
1565    #[cfg(feature = "http2")]
1566    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1567    pub fn virtual_streams_priorities(
1568        &mut self,
1569        priorities: impl Into<Option<&'static [FramePriority]>>,
1570    ) -> &mut Self {
1571        self.h2_builder.virtual_streams_priorities(priorities);
1572        self
1573    }
1574
1575    /// Provide a timer to be used for h2
1576    ///
1577    /// See the documentation of [`h2::client::Builder::timer`] for more
1578    /// details.
1579    ///
1580    /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1581    pub fn timer<M>(&mut self, timer: M) -> &mut Self
1582    where
1583        M: Timer + Send + Sync + 'static,
1584    {
1585        #[cfg(feature = "http2")]
1586        self.h2_builder.timer(timer);
1587        self
1588    }
1589
1590    /// Provide a timer to be used for timeouts and intervals in connection pools.
1591    pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1592    where
1593        M: Timer + Clone + Send + Sync + 'static,
1594    {
1595        self.pool_timer = Some(timer::Timer::new(timer.clone()));
1596        self
1597    }
1598
1599    /// Set the maximum write buffer size for each HTTP/2 stream.
1600    ///
1601    /// Default is currently 1MB, but may change.
1602    ///
1603    /// # Panics
1604    ///
1605    /// The value must be no larger than `u32::MAX`.
1606    #[cfg(feature = "http2")]
1607    #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1608    pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1609        self.h2_builder.max_send_buf_size(max);
1610        self
1611    }
1612
1613    /// Set whether to retry requests that get disrupted before ever starting
1614    /// to write.
1615    ///
1616    /// This means a request that is queued, and gets given an idle, reused
1617    /// connection, and then encounters an error immediately as the idle
1618    /// connection was found to be unusable.
1619    ///
1620    /// When this is set to `false`, the related `ResponseFuture` would instead
1621    /// resolve to an `Error::Cancel`.
1622    ///
1623    /// Default is `true`.
1624    #[inline]
1625    pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1626        self.client_config.retry_canceled_requests = val;
1627        self
1628    }
1629
1630    /// Set whether to automatically add the `Host` header to requests.
1631    ///
1632    /// If true, and a request does not include a `Host` header, one will be
1633    /// added automatically, derived from the authority of the `Uri`.
1634    ///
1635    /// Default is `true`.
1636    #[inline]
1637    pub fn set_host(&mut self, val: bool) -> &mut Self {
1638        self.client_config.set_host = val;
1639        self
1640    }
1641
1642    /// Build a client with this configuration and the default `HttpConnector`.
1643    #[cfg(feature = "tokio")]
1644    pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1645    where
1646        B: Body + Send,
1647        B::Data: Send,
1648    {
1649        let mut connector = HttpConnector::new();
1650        if self.pool_config.is_enabled() {
1651            connector.set_keepalive(self.pool_config.idle_timeout);
1652        }
1653        self.build(connector)
1654    }
1655
1656    /// Combine the configuration of this builder with a connector to create a `Client`.
1657    pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1658    where
1659        C: Connect + Clone,
1660        B: Body + Send,
1661        B::Data: Send,
1662    {
1663        let exec = self.exec.clone();
1664        let timer = self.pool_timer.clone();
1665        Client {
1666            config: self.client_config,
1667            exec: exec.clone(),
1668            #[cfg(feature = "http1")]
1669            h1_builder: self.h1_builder.clone(),
1670            #[cfg(feature = "http2")]
1671            h2_builder: self.h2_builder.clone(),
1672            connector,
1673            pool: pool::Pool::new(self.pool_config, exec, timer),
1674        }
1675    }
1676}
1677
1678impl fmt::Debug for Builder {
1679    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1680        f.debug_struct("Builder")
1681            .field("client_config", &self.client_config)
1682            .field("pool_config", &self.pool_config)
1683            .finish()
1684    }
1685}
1686
1687// ==== impl Error ====
1688
1689impl fmt::Debug for Error {
1690    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1691        let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1692        f.field(&self.kind);
1693        if let Some(ref cause) = self.source {
1694            f.field(cause);
1695        }
1696        f.finish()
1697    }
1698}
1699
1700impl fmt::Display for Error {
1701    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1702        write!(f, "client error ({:?})", self.kind)
1703    }
1704}
1705
1706impl StdError for Error {
1707    fn source(&self) -> Option<&(dyn StdError + 'static)> {
1708        self.source.as_ref().map(|e| &**e as _)
1709    }
1710}
1711
1712impl Error {
1713    /// Returns true if this was an error from `Connect`.
1714    pub fn is_connect(&self) -> bool {
1715        matches!(self.kind, ErrorKind::Connect)
1716    }
1717
1718    /// Returns the info of the client connection on which this error occurred.
1719    #[cfg(any(feature = "http1", feature = "http2"))]
1720    pub fn connect_info(&self) -> Option<&Connected> {
1721        self.connect_info.as_ref()
1722    }
1723
1724    #[cfg(any(feature = "http1", feature = "http2"))]
1725    fn with_connect_info(self, connect_info: Connected) -> Self {
1726        Self {
1727            connect_info: Some(connect_info),
1728            ..self
1729        }
1730    }
1731    fn is_canceled(&self) -> bool {
1732        matches!(self.kind, ErrorKind::Canceled)
1733    }
1734
1735    fn tx(src: hyper::Error) -> Self {
1736        e!(SendRequest, src)
1737    }
1738
1739    fn closed(src: hyper::Error) -> Self {
1740        e!(ChannelClosed, src)
1741    }
1742}