hyper_util_wasm/client/legacy/client.rs
1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22#[cfg(feature = "tokio")]
23use super::connect::HttpConnector;
24use super::connect::{Alpn, Connect, Connected, Connection};
25use super::pool::{self, Ver};
26
27use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
28
29type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
30
/// A Client to make outgoing HTTP requests.
///
/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
/// underlying connection pool will be reused.
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
pub struct Client<C, B> {
    // Request-level settings (retry behavior, `Host` header handling,
    // protocol version preference).
    config: Config,
    // Connector asked by `connect_to` to establish new connections.
    connector: C,
    // Executor used to spawn connection dispatcher tasks and other
    // background futures.
    exec: Exec,
    // Handshake configuration for new HTTP/1 connections.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // Handshake configuration for new HTTP/2 connections.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Pool of established connections, keyed by `PoolKey`.
    pool: pool::Pool<PoolClient<B>, PoolKey>,
}
46
// Request-level settings carried by every `Client`.
#[derive(Clone, Copy, Debug)]
struct Config {
    // When false, `send_request` and `connection_for` return the error
    // instead of trying again after a retryable failure.
    retry_canceled_requests: bool,
    // When true, HTTP/1 requests without a `Host` header get one derived
    // from the request URI.
    set_host: bool,
    // Protocol version preference; compared against `Ver::Http2` when
    // connecting and when checking out of the pool.
    ver: Ver,
}
53
/// Client errors
pub struct Error {
    // Coarse classification of what went wrong.
    kind: ErrorKind,
    // Underlying cause, if one was supplied when the error was built
    // (see the `e!` macro).
    source: Option<Box<dyn StdError + Send + Sync>>,
}
59
// Classification of client errors. The notes below describe where each
// variant is produced within this part of the file.
#[derive(Debug)]
enum ErrorKind {
    // A request was handed back unsent by the connection, or a connect
    // attempt was abandoned (e.g. another HTTP/2 connect is in progress).
    Canceled,
    // NOTE(review): not constructed in the visible part of the file;
    // presumably produced by `Error::closed` -- confirm.
    ChannelClosed,
    // Obtaining a connection (connector or pool checkout) failed.
    Connect,
    // `CONNECT` was requested with HTTP/1.0.
    UserUnsupportedRequestMethod,
    // The request's HTTP version is unsupported, or requires HTTP/2 on an
    // HTTP/1 connection.
    UserUnsupportedVersion,
    // The request URI lacked the scheme/authority needed to pick a host.
    UserAbsoluteUriRequired,
    // Sending failed and the request could not be recovered for a retry.
    SendRequest,
}
70
// Builds an `Error` of the named `ErrorKind` variant.
//
// `e!(Kind)` leaves `source` empty; `e!(Kind, cause)` stores `cause`
// (converted with `.into()`) as the error's source.
macro_rules! e {
    ($variant:ident) => {
        Error {
            kind: ErrorKind::$variant,
            source: None,
        }
    };
    ($variant:ident, $cause:expr) => {
        Error {
            kind: ErrorKind::$variant,
            source: Some($cause.into()),
        }
    };
}
85
// Key under which connections are pooled: the (scheme, authority) pair
// extracted from the request URI by `extract_domain`.
//
// We might change this... :shrug:
type PoolKey = (http::uri::Scheme, http::uri::Authority);
88
// Outcome of a failed single send attempt (`Client::try_send_request`).
enum TrySendError<B> {
    // The request was recovered unsent and is handed back so the caller
    // may try again on another connection.
    Retryable { error: Error, req: Request<B> },
    // The failure is final; the request is gone.
    Nope(Error),
}
93
/// A `Future` that will resolve to an HTTP Response.
///
/// This is returned by `Client::request` (and `Client::get`).
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
    // The boxed, type-erased request future. It is only ever reached through
    // `SyncWrapper::get_mut` when polled.
    // NOTE(review): the wrapper presumably exists to make this type `Sync`
    // even though the boxed future is only `Send` -- confirm in `common`.
    inner: SyncWrapper<
        Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
    >,
}
103
104// ===== impl Client =====
105
impl Client<(), ()> {
    /// Create a builder to configure a new `Client`.
    ///
    /// This is a shorthand for [`Builder::new`] with the same `executor`.
    ///
    /// # Example
    ///
    /// ```
    /// # #[cfg(feature = "tokio")]
    /// # fn run () {
    /// use std::time::Duration;
    /// use hyper_util::client::legacy::Client;
    /// use hyper_util::rt::TokioExecutor;
    ///
    /// let client = Client::builder(TokioExecutor::new())
    ///     .pool_idle_timeout(Duration::from_secs(30))
    ///     .http2_only(true)
    ///     .build_http();
    /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
    /// # drop(infer);
    /// # }
    /// # fn main() {}
    /// ```
    pub fn builder<E>(executor: E) -> Builder
    where
        E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
    {
        Builder::new(executor)
    }
}
134
135impl<C, B> Client<C, B>
136where
137 C: Connect + Clone + Send + Sync + 'static,
138 B: Body + Send + 'static + Unpin,
139 B::Data: Send,
140 B::Error: Into<Box<dyn StdError + Send + Sync>>,
141{
142 /// Send a `GET` request to the supplied `Uri`.
143 ///
144 /// # Note
145 ///
146 /// This requires that the `Body` type have a `Default` implementation.
147 /// It *should* return an "empty" version of itself, such that
148 /// `Body::is_end_stream` is `true`.
149 ///
150 /// # Example
151 ///
152 /// ```
153 /// # #[cfg(feature = "tokio")]
154 /// # fn run () {
155 /// use hyper::Uri;
156 /// use hyper_util::client::legacy::Client;
157 /// use hyper_util::rt::TokioExecutor;
158 /// use bytes::Bytes;
159 /// use http_body_util::Full;
160 ///
161 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
162 ///
163 /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
164 /// # }
165 /// # fn main() {}
166 /// ```
167 pub fn get(&self, uri: Uri) -> ResponseFuture
168 where
169 B: Default,
170 {
171 let body = B::default();
172 if !body.is_end_stream() {
173 warn!("default Body used for get() does not return true for is_end_stream");
174 }
175
176 let mut req = Request::new(body);
177 *req.uri_mut() = uri;
178 self.request(req)
179 }
180
181 /// Send a constructed `Request` using this `Client`.
182 ///
183 /// # Example
184 ///
185 /// ```
186 /// # #[cfg(feature = "tokio")]
187 /// # fn run () {
188 /// use hyper::{Method, Request};
189 /// use hyper_util::client::legacy::Client;
190 /// use http_body_util::Full;
191 /// use hyper_util::rt::TokioExecutor;
192 /// use bytes::Bytes;
193 ///
194 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
195 ///
196 /// let req: Request<Full<Bytes>> = Request::builder()
197 /// .method(Method::POST)
198 /// .uri("http://httpbin.org/post")
199 /// .body(Full::from("Hallo!"))
200 /// .expect("request builder");
201 ///
202 /// let future = client.request(req);
203 /// # }
204 /// # fn main() {}
205 /// ```
206 pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
207 let is_http_connect = req.method() == Method::CONNECT;
208 match req.version() {
209 Version::HTTP_11 => (),
210 Version::HTTP_10 => {
211 if is_http_connect {
212 warn!("CONNECT is not allowed for HTTP/1.0");
213 return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
214 }
215 }
216 Version::HTTP_2 => (),
217 // completely unsupported HTTP version (like HTTP/0.9)!
218 other => return ResponseFuture::error_version(other),
219 };
220
221 let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
222 Ok(s) => s,
223 Err(err) => {
224 return ResponseFuture::new(future::err(err));
225 }
226 };
227
228 ResponseFuture::new(self.clone().send_request(req, pool_key))
229 }
230
231 async fn send_request(
232 self,
233 mut req: Request<B>,
234 pool_key: PoolKey,
235 ) -> Result<Response<hyper::body::Incoming>, Error> {
236 let uri = req.uri().clone();
237
238 loop {
239 req = match self.try_send_request(req, pool_key.clone()).await {
240 Ok(resp) => return Ok(resp),
241 Err(TrySendError::Nope(err)) => return Err(err),
242 Err(TrySendError::Retryable { mut req, error }) => {
243 if !self.config.retry_canceled_requests {
244 // if client disabled, don't retry
245 // a fresh connection means we definitely can't retry
246 return Err(error);
247 }
248
249 trace!(
250 "unstarted request canceled, trying again (reason={:?})",
251 error
252 );
253 *req.uri_mut() = uri.clone();
254 req
255 }
256 }
257 }
258 }
259
    /// Makes a single attempt to send `req` on a connection obtained for
    /// `pool_key`.
    ///
    /// Before sending, the request is adapted to the connection:
    ///
    /// - On HTTP/1 connections a request that demands HTTP/2 is rejected,
    ///   a `Host` header is inserted when `set_host` is enabled and none is
    ///   present, and the URI is rewritten via `authority_form` (CONNECT),
    ///   `absolute_form` (proxied connection) or `origin_form` (otherwise).
    /// - On HTTP/2 connections only CONNECT requests have their URI
    ///   rewritten (to authority-form).
    ///
    /// If the send error still carries the request (`take_message`), the
    /// request is handed back as `TrySendError::Retryable`; every other
    /// failure is `TrySendError::Nope`.
    async fn try_send_request(
        &self,
        mut req: Request<B>,
        pool_key: PoolKey,
    ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
        let mut pooled = self
            .connection_for(pool_key)
            .await
            // `connection_for` already retries checkout errors, so if
            // it returns an error, there's not much else to retry
            .map_err(TrySendError::Nope)?;

        if pooled.is_http1() {
            if req.version() == Version::HTTP_2 {
                warn!("Connection is HTTP/1, but request requires HTTP/2");
                return Err(TrySendError::Nope(e!(UserUnsupportedVersion)));
            }

            if self.config.set_host {
                // Cloned so the closure below can read the URI while the
                // header map is mutably borrowed.
                let uri = req.uri().clone();
                req.headers_mut().entry(HOST).or_insert_with(|| {
                    let hostname = uri.host().expect("authority implies host");
                    // The port is only included when it is not the default
                    // for the scheme.
                    if let Some(port) = get_non_default_port(&uri) {
                        let s = format!("{}:{}", hostname, port);
                        HeaderValue::from_str(&s)
                    } else {
                        HeaderValue::from_str(hostname)
                    }
                    .expect("uri host is valid header value")
                });
            }

            // CONNECT always sends authority-form, so check it first...
            if req.method() == Method::CONNECT {
                authority_form(req.uri_mut());
            } else if pooled.conn_info.is_proxied {
                absolute_form(req.uri_mut());
            } else {
                origin_form(req.uri_mut());
            }
        } else if req.method() == Method::CONNECT {
            authority_form(req.uri_mut());
        }

        let mut res = match pooled.try_send_request(req).await {
            Ok(res) => res,
            Err(mut err) => {
                // A recoverable message means the request was never sent,
                // so the caller may retry it.
                return if let Some(req) = err.take_message() {
                    Err(TrySendError::Retryable {
                        error: e!(Canceled, err.into_error()),
                        req,
                    })
                } else {
                    Err(TrySendError::Nope(e!(SendRequest, err.into_error())))
                }
            }
        };
        //.send_request_retryable(req)
        //.map_err(ClientError::map_with_reused(pooled.is_reused()));

        // If the Connector included 'extra' info, add to Response...
        if let Some(extra) = &pooled.conn_info.extra {
            extra.set(res.extensions_mut());
        }

        // As of futures@0.1.21, there is a race condition in the mpsc
        // channel, such that sending when the receiver is closing can
        // result in the message being stuck inside the queue. It won't
        // ever notify until the Sender side is dropped.
        //
        // To counteract this, we must check if our senders 'want' channel
        // has been closed after having tried to send.
        //
        // NOTE(review): despite the historical "error out" wording, the
        // response is returned as-is here; the early return only skips the
        // idle-tracking logic below for a connection that is already closed.
        if pooled.is_closed() {
            return Ok(res);
        }

        // If pooled is HTTP/2, we can toss this reference immediately.
        //
        // when pooled is dropped, it will try to insert back into the
        // pool. To delay that, spawn a future that completes once the
        // sender is ready again.
        //
        // This *should* only be once the related `Connection` has polled
        // for a new request to start.
        //
        // It won't be ready if there is a body to stream.
        if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
            drop(pooled);
        } else if !res.body().is_end_stream() {
            //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
            //res.body_mut().delayed_eof(delayed_rx);
            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
                // At this point, `pooled` is dropped, and had a chance
                // to insert into the pool (if conn was idle)
                //drop(delayed_tx);
            });

            self.exec.execute(on_idle);
        } else {
            // There's no body to delay, but the connection isn't
            // ready yet. Only re-insert when it's ready
            let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());

            self.exec.execute(on_idle);
        }

        Ok(res)
    }
368
369 async fn connection_for(
370 &self,
371 pool_key: PoolKey,
372 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
373 loop {
374 match self.one_connection_for(pool_key.clone()).await {
375 Ok(pooled) => return Ok(pooled),
376 Err(ClientConnectError::Normal(err)) => return Err(err),
377 Err(ClientConnectError::CheckoutIsClosed(reason)) => {
378 if !self.config.retry_canceled_requests {
379 return Err(e!(Connect, reason));
380 }
381
382 trace!(
383 "unstarted request canceled, trying again (reason={:?})",
384 reason,
385 );
386 continue;
387 }
388 };
389 }
390 }
391
    /// Makes one attempt to obtain a connection for `pool_key`, either by
    /// checking one out of the pool or by establishing a new one.
    ///
    /// With pooling disabled this simply connects. Otherwise the pool
    /// checkout and a new connect are raced; whichever yields a connection
    /// first wins. If one side is canceled, the other side is awaited.
    ///
    /// Returns `ClientConnectError::CheckoutIsClosed` only when the connect
    /// was canceled, the subsequent checkout was canceled as well, and the
    /// client is configured for HTTP/2 only; all other failures are
    /// `ClientConnectError::Normal`.
    async fn one_connection_for(
        &self,
        pool_key: PoolKey,
    ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
        // Return a single connection if pooling is not enabled
        if !self.pool.is_enabled() {
            return self
                .connect_to(pool_key)
                .await
                .map_err(ClientConnectError::Normal);
        }

        // This actually races 2 different futures to try to get a ready
        // connection the fastest, and to reduce connection churn.
        //
        // - If the pool has an idle connection waiting, that's used
        //   immediately.
        // - Otherwise, the Connector is asked to start connecting to
        //   the destination Uri.
        // - Meanwhile, the pool Checkout is watching to see if any other
        //   request finishes and tries to insert an idle connection.
        // - If a new connection is started, but the Checkout wins after
        //   (an idle connection became available first), the started
        //   connection future is spawned into the runtime to complete,
        //   and then be inserted into the pool as an idle connection.
        let checkout = self.pool.checkout(pool_key.clone());
        let connect = self.connect_to(pool_key);
        let is_ver_h2 = self.config.ver == Ver::Http2;

        // The order of the `select` is depended on below...

        match future::select(checkout, connect).await {
            // Checkout won, connect future may have been started or not.
            //
            // If it has, let it finish and insert back into the pool,
            // so as to not waste the socket...
            Either::Left((Ok(checked_out), connecting)) => {
                // This depends on the `select` above having the correct
                // order, such that if the checkout future were ready
                // immediately, the connect future will never have been
                // started.
                //
                // If it *wasn't* ready yet, then the connect future will
                // have been started...
                if connecting.started() {
                    let bg = connecting
                        .map_err(|err| {
                            trace!("background connect error: {}", err);
                        })
                        .map(|_pooled| {
                            // dropping here should just place it in
                            // the Pool for us...
                        });
                    // An execute error here isn't important, we're just trying
                    // to prevent a waste of a socket...
                    self.exec.execute(bg);
                }
                Ok(checked_out)
            }
            // Connect won, checkout can just be dropped.
            Either::Right((Ok(connected), _checkout)) => Ok(connected),
            // Either checkout or connect could get canceled:
            //
            // 1. Connect is canceled if this is HTTP/2 and there is
            //    an outstanding HTTP/2 connecting task.
            // 2. Checkout is canceled if the pool cannot deliver an
            //    idle connection reliably.
            //
            // In both cases, we should just wait for the other future.
            Either::Left((Err(err), connecting)) => {
                // Checkout failed first: fall back to the connect future
                // only if the checkout was merely canceled.
                if err.is_canceled() {
                    connecting.await.map_err(ClientConnectError::Normal)
                } else {
                    Err(ClientConnectError::Normal(e!(Connect, err)))
                }
            }
            Either::Right((Err(err), checkout)) => {
                // Connect failed first: fall back to the checkout future
                // only if the connect was merely canceled.
                if err.is_canceled() {
                    checkout.await.map_err(move |err| {
                        // A canceled checkout on an HTTP/2-only client is
                        // reported separately so `connection_for` can retry.
                        if is_ver_h2 && err.is_canceled() {
                            ClientConnectError::CheckoutIsClosed(err)
                        } else {
                            ClientConnectError::Normal(e!(Connect, err))
                        }
                    })
                } else {
                    Err(ClientConnectError::Normal(err))
                }
            }
        }
    }
483
    /// Builds a future that establishes a new connection for `pool_key` and
    /// registers it with the pool.
    ///
    /// The work is wrapped with `hyper_lazy`; `one_connection_for` relies on
    /// the returned value's `started()` to tell whether connecting has begun.
    /// NOTE(review): presumably nothing inside the closure runs until the
    /// future is first polled -- confirm against `common::lazy`.
    ///
    /// Steps performed by the future:
    ///
    /// 1. Take the pool's "connecting" lock for the key; if it cannot be
    ///    taken, resolve to a `Canceled` error.
    /// 2. Ask the connector for an IO object for the destination.
    /// 3. If ALPN negotiated h2 on a client that is not HTTP/2-only,
    ///    upgrade the lock via `alpn_h2` (or resolve to `Canceled` if that
    ///    is refused).
    /// 4. Perform the HTTP/1 or HTTP/2 handshake, spawn the connection's
    ///    dispatcher on the executor, and wait for the sender to be ready.
    /// 5. Hand the sender to the pool and return the pooled connection.
    #[cfg(any(feature = "http1", feature = "http2"))]
    fn connect_to(
        &self,
        pool_key: PoolKey,
    ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
    {
        // Everything the closure needs is cloned up front so the returned
        // future does not borrow from `self`.
        let executor = self.exec.clone();
        let pool = self.pool.clone();
        #[cfg(feature = "http1")]
        let h1_builder = self.h1_builder.clone();
        #[cfg(feature = "http2")]
        let h2_builder = self.h2_builder.clone();
        let ver = self.config.ver;
        let is_ver_h2 = ver == Ver::Http2;
        let connector = self.connector.clone();
        let dst = domain_as_uri(pool_key.clone());
        hyper_lazy(move || {
            // Try to take a "connecting lock".
            //
            // If the pool_key is for HTTP/2, and there is already a
            // connection being established, then this can't take a
            // second lock. The "connect_to" future is Canceled.
            let connecting = match pool.connecting(&pool_key, ver) {
                Some(lock) => lock,
                None => {
                    let canceled = e!(Canceled);
                    // TODO
                    //crate::Error::new_canceled().with("HTTP/2 connection in progress");
                    return Either::Right(future::err(canceled));
                }
            };
            Either::Left(
                connector
                    .connect(super::connect::sealed::Internal, dst)
                    .map_err(|src| e!(Connect, src))
                    .and_then(move |io| {
                        let connected = io.connected();
                        // If ALPN is h2 and we aren't http2_only already,
                        // then we need to convert our pool checkout into
                        // a single HTTP2 one.
                        let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
                            match connecting.alpn_h2(&pool) {
                                Some(lock) => {
                                    trace!("ALPN negotiated h2, updating pool");
                                    lock
                                }
                                None => {
                                    // Another connection has already upgraded,
                                    // the pool checkout should finish up for us.
                                    let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
                                    return Either::Right(future::err(canceled));
                                }
                            }
                        } else {
                            connecting
                        };

                        // HTTP/2 is used when configured explicitly or
                        // when negotiated through ALPN.
                        #[cfg_attr(not(feature = "http2"), allow(unused))]
                        let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;

                        Either::Left(Box::pin(async move {
                            let tx = if is_h2 {
                                #[cfg(feature = "http2")] {
                                    let (mut tx, conn) =
                                        h2_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http2 handshake complete, spawning background dispatcher task"
                                    );
                                    executor.execute(
                                        conn.map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http2(tx)
                                }
                                #[cfg(not(feature = "http2"))]
                                panic!("http2 feature is not enabled");
                            } else {
                                #[cfg(feature = "http1")] {
                                    let (mut tx, conn) =
                                        h1_builder.handshake(io).await.map_err(Error::tx)?;

                                    trace!(
                                        "http1 handshake complete, spawning background dispatcher task"
                                    );
                                    executor.execute(
                                        conn.with_upgrades()
                                            .map_err(|e| debug!("client connection error: {}", e))
                                            .map(|_| ()),
                                    );

                                    // Wait for 'conn' to ready up before we
                                    // declare this tx as usable
                                    tx.ready().await.map_err(Error::tx)?;
                                    PoolTx::Http1(tx)
                                }
                                #[cfg(not(feature = "http1"))] {
                                    panic!("http1 feature is not enabled");
                                }
                            };

                            // Consumes the connecting lock and yields the
                            // pooled handle that callers send requests on.
                            Ok(pool.pooled(
                                connecting,
                                PoolClient {
                                    conn_info: connected,
                                    tx,
                                },
                            ))
                        }))
                    }),
            )
        })
    }
601}
602
603impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
604where
605 C: Connect + Clone + Send + Sync + 'static,
606 B: Body + Send + 'static + Unpin,
607 B::Data: Send,
608 B::Error: Into<Box<dyn StdError + Send + Sync>>,
609{
610 type Response = Response<hyper::body::Incoming>;
611 type Error = Error;
612 type Future = ResponseFuture;
613
614 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
615 Poll::Ready(Ok(()))
616 }
617
618 fn call(&mut self, req: Request<B>) -> Self::Future {
619 self.request(req)
620 }
621}
622
623impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
624where
625 C: Connect + Clone + Send + Sync + 'static,
626 B: Body + Send + 'static + Unpin,
627 B::Data: Send,
628 B::Error: Into<Box<dyn StdError + Send + Sync>>,
629{
630 type Response = Response<hyper::body::Incoming>;
631 type Error = Error;
632 type Future = ResponseFuture;
633
634 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
635 Poll::Ready(Ok(()))
636 }
637
638 fn call(&mut self, req: Request<B>) -> Self::Future {
639 self.request(req)
640 }
641}
642
643impl<C: Clone, B> Clone for Client<C, B> {
644 fn clone(&self) -> Client<C, B> {
645 Client {
646 config: self.config,
647 exec: self.exec.clone(),
648 #[cfg(feature = "http1")]
649 h1_builder: self.h1_builder.clone(),
650 #[cfg(feature = "http2")]
651 h2_builder: self.h2_builder.clone(),
652 connector: self.connector.clone(),
653 pool: self.pool.clone(),
654 }
655 }
656}
657
658impl<C, B> fmt::Debug for Client<C, B> {
659 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
660 f.debug_struct("Client").finish()
661 }
662}
663
664// ===== impl ResponseFuture =====
665
666impl ResponseFuture {
667 fn new<F>(value: F) -> Self
668 where
669 F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
670 {
671 Self {
672 inner: SyncWrapper::new(Box::pin(value)),
673 }
674 }
675
676 fn error_version(ver: Version) -> Self {
677 warn!("Request has unsupported version \"{:?}\"", ver);
678 ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
679 }
680}
681
682impl fmt::Debug for ResponseFuture {
683 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684 f.pad("Future<Response>")
685 }
686}
687
688impl Future for ResponseFuture {
689 type Output = Result<Response<hyper::body::Incoming>, Error>;
690
691 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
692 self.inner.get_mut().as_mut().poll(cx)
693 }
694}
695
696// ===== impl PoolClient =====
697
// A pooled connection: the request sender plus the metadata the connector
// reported for the underlying IO.
//
// FIXME: allow() required due to `impl Trait` leaking types to this lint
#[allow(missing_debug_implementations)]
struct PoolClient<B> {
    // Connection metadata (ALPN result, proxy flag, optional extras) as
    // returned by the IO object's `connected()`.
    conn_info: Connected,
    // Protocol-specific handle used to send requests on this connection.
    tx: PoolTx<B>,
}
704
// Request sender for a pooled connection, one variant per protocol.
// Each variant only exists when the matching cargo feature is enabled.
enum PoolTx<B> {
    // Sender for an HTTP/1 connection.
    #[cfg(feature = "http1")]
    Http1(hyper::client::conn::http1::SendRequest<B>),
    // Sender for an HTTP/2 connection.
    #[cfg(feature = "http2")]
    Http2(hyper::client::conn::http2::SendRequest<B>),
}
711
712impl<B> PoolClient<B> {
713 fn poll_ready(
714 &mut self,
715 #[allow(unused_variables)] cx: &mut task::Context<'_>,
716 ) -> Poll<Result<(), Error>> {
717 match self.tx {
718 #[cfg(feature = "http1")]
719 PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
720 #[cfg(feature = "http2")]
721 PoolTx::Http2(_) => Poll::Ready(Ok(())),
722 }
723 }
724
725 fn is_http1(&self) -> bool {
726 !self.is_http2()
727 }
728
729 fn is_http2(&self) -> bool {
730 match self.tx {
731 #[cfg(feature = "http1")]
732 PoolTx::Http1(_) => false,
733 #[cfg(feature = "http2")]
734 PoolTx::Http2(_) => true,
735 }
736 }
737
738 fn is_ready(&self) -> bool {
739 match self.tx {
740 #[cfg(feature = "http1")]
741 PoolTx::Http1(ref tx) => tx.is_ready(),
742 #[cfg(feature = "http2")]
743 PoolTx::Http2(ref tx) => tx.is_ready(),
744 }
745 }
746
747 fn is_closed(&self) -> bool {
748 match self.tx {
749 #[cfg(feature = "http1")]
750 PoolTx::Http1(ref tx) => tx.is_closed(),
751 #[cfg(feature = "http2")]
752 PoolTx::Http2(ref tx) => tx.is_closed(),
753 }
754 }
755}
756
757impl<B: Body + 'static> PoolClient<B> {
758 fn try_send_request(
759 &mut self,
760 req: Request<B>,
761 ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
762 where
763 B: Send,
764 {
765 #[cfg(all(feature = "http1", feature = "http2"))]
766 return match self.tx {
767 #[cfg(feature = "http1")]
768 PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
769 #[cfg(feature = "http2")]
770 PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
771 };
772
773 #[cfg(feature = "http1")]
774 #[cfg(not(feature = "http2"))]
775 return match self.tx {
776 #[cfg(feature = "http1")]
777 PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
778 };
779
780 #[cfg(not(feature = "http1"))]
781 #[cfg(feature = "http2")]
782 return match self.tx {
783 #[cfg(feature = "http2")]
784 PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
785 };
786 }
787 /*
788 //TODO: can we re-introduce this somehow? Or must people use tower::retry?
789 fn send_request_retryable(
790 &mut self,
791 req: Request<B>,
792 ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, (Error, Option<Request<B>>)>>
793 where
794 B: Send,
795 {
796 match self.tx {
797 #[cfg(not(feature = "http2"))]
798 PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
799 #[cfg(feature = "http1")]
800 PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)),
801 #[cfg(feature = "http2")]
802 PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)),
803 }
804 }
805 */
806}
807
808impl<B> pool::Poolable for PoolClient<B>
809where
810 B: Send + 'static,
811{
812 fn is_open(&self) -> bool {
813 self.is_ready()
814 }
815
816 fn reserve(self) -> pool::Reservation<Self> {
817 match self.tx {
818 #[cfg(feature = "http1")]
819 PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
820 conn_info: self.conn_info,
821 tx: PoolTx::Http1(tx),
822 }),
823 #[cfg(feature = "http2")]
824 PoolTx::Http2(tx) => {
825 let b = PoolClient {
826 conn_info: self.conn_info.clone(),
827 tx: PoolTx::Http2(tx.clone()),
828 };
829 let a = PoolClient {
830 conn_info: self.conn_info,
831 tx: PoolTx::Http2(tx),
832 };
833 pool::Reservation::Shared(a, b)
834 }
835 }
836 }
837
838 fn can_share(&self) -> bool {
839 self.is_http2()
840 }
841}
842
// Failure modes of `Client::one_connection_for`.
enum ClientConnectError {
    // A regular failure; `connection_for` returns the inner error as-is.
    Normal(Error),
    // Both the connect and the pool checkout were canceled on an
    // HTTP/2-only client; `connection_for` retries when retries are enabled.
    CheckoutIsClosed(pool::Error),
}
847
848fn origin_form(uri: &mut Uri) {
849 let path = match uri.path_and_query() {
850 Some(path) if path.as_str() != "/" => {
851 let mut parts = ::http::uri::Parts::default();
852 parts.path_and_query = Some(path.clone());
853 Uri::from_parts(parts).expect("path is valid uri")
854 }
855 _none_or_just_slash => {
856 debug_assert!(Uri::default() == "/");
857 Uri::default()
858 }
859 };
860 *uri = path
861}
862
863fn absolute_form(uri: &mut Uri) {
864 debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
865 debug_assert!(
866 uri.authority().is_some(),
867 "absolute_form needs an authority"
868 );
869 // If the URI is to HTTPS, and the connector claimed to be a proxy,
870 // then it *should* have tunneled, and so we don't want to send
871 // absolute-form in that case.
872 if uri.scheme() == Some(&Scheme::HTTPS) {
873 origin_form(uri);
874 }
875}
876
877fn authority_form(uri: &mut Uri) {
878 if let Some(path) = uri.path_and_query() {
879 // `https://hyper.rs` would parse with `/` path, don't
880 // annoy people about that...
881 if path != "/" {
882 warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
883 }
884 }
885 *uri = match uri.authority() {
886 Some(auth) => {
887 let mut parts = ::http::uri::Parts::default();
888 parts.authority = Some(auth.clone());
889 Uri::from_parts(parts).expect("authority is valid")
890 }
891 None => {
892 unreachable!("authority_form with relative uri");
893 }
894 };
895}
896
897fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
898 let uri_clone = uri.clone();
899 match (uri_clone.scheme(), uri_clone.authority()) {
900 (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
901 (None, Some(auth)) if is_http_connect => {
902 let scheme = match auth.port_u16() {
903 Some(443) => {
904 set_scheme(uri, Scheme::HTTPS);
905 Scheme::HTTPS
906 }
907 _ => {
908 set_scheme(uri, Scheme::HTTP);
909 Scheme::HTTP
910 }
911 };
912 Ok((scheme, auth.clone()))
913 }
914 _ => {
915 debug!("Client requires absolute-form URIs, received: {:?}", uri);
916 Err(e!(UserAbsoluteUriRequired))
917 }
918 }
919}
920
921fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
922 http::uri::Builder::new()
923 .scheme(scheme)
924 .authority(auth)
925 .path_and_query("/")
926 .build()
927 .expect("domain is valid Uri")
928}
929
930fn set_scheme(uri: &mut Uri, scheme: Scheme) {
931 debug_assert!(
932 uri.scheme().is_none(),
933 "set_scheme expects no existing scheme"
934 );
935 let old = std::mem::take(uri);
936 let mut parts: ::http::uri::Parts = old.into();
937 parts.scheme = Some(scheme);
938 parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
939 *uri = Uri::from_parts(parts).expect("scheme is valid");
940}
941
942fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
943 match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
944 (Some(443), true) => None,
945 (Some(80), false) => None,
946 _ => uri.port(),
947 }
948}
949
950fn is_schema_secure(uri: &Uri) -> bool {
951 uri.scheme_str()
952 .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
953 .unwrap_or_default()
954}
955
/// A builder to configure a new [`Client`](Client).
///
/// # Example
///
/// ```
/// # #[cfg(feature = "tokio")]
/// # fn run () {
/// use std::time::Duration;
/// use hyper_util::client::legacy::Client;
/// use hyper_util::rt::TokioExecutor;
///
/// let client = Client::builder(TokioExecutor::new())
///     .pool_idle_timeout(Duration::from_secs(30))
///     .http2_only(true)
///     .build_http();
/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
/// # drop(infer);
/// # }
/// # fn main() {}
/// ```
#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
#[derive(Clone)]
pub struct Builder {
    // Request-level settings (retry, `Host` header, version preference).
    client_config: Config,
    // Executor wrapper created from the user-supplied executor.
    exec: Exec,
    // HTTP/1 connection options, adjusted by the `http1_*` methods.
    #[cfg(feature = "http1")]
    h1_builder: hyper::client::conn::http1::Builder,
    // HTTP/2 connection options; constructed with its own copy of `exec`.
    #[cfg(feature = "http2")]
    h2_builder: hyper::client::conn::http2::Builder<Exec>,
    // Pool settings: idle timeout and maximum idle connections per host.
    pool_config: pool::Config,
    // Optional timer; `None` by default. The idle timeout documentation
    // states that a timer is required for the timeout to take effect.
    pool_timer: Option<timer::Timer>,
}
988
989impl Builder {
990 /// Construct a new Builder.
991 pub fn new<E>(executor: E) -> Self
992 where
993 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
994 {
995 let exec = Exec::new(executor);
996 Self {
997 client_config: Config {
998 retry_canceled_requests: true,
999 set_host: true,
1000 ver: Ver::Auto,
1001 },
1002 exec: exec.clone(),
1003 #[cfg(feature = "http1")]
1004 h1_builder: hyper::client::conn::http1::Builder::new(),
1005 #[cfg(feature = "http2")]
1006 h2_builder: hyper::client::conn::http2::Builder::new(exec),
1007 pool_config: pool::Config {
1008 idle_timeout: Some(Duration::from_secs(90)),
1009 max_idle_per_host: usize::MAX,
1010 },
1011 pool_timer: None,
1012 }
1013 }
1014 /// Set an optional timeout for idle sockets being kept-alive.
1015 /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1016 ///
1017 /// Pass `None` to disable timeout.
1018 ///
1019 /// Default is 90 seconds.
1020 ///
1021 /// # Example
1022 ///
1023 /// ```
1024 /// # #[cfg(feature = "tokio")]
1025 /// # fn run () {
1026 /// use std::time::Duration;
1027 /// use hyper_util::client::legacy::Client;
1028 /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1029 ///
1030 /// let client = Client::builder(TokioExecutor::new())
1031 /// .pool_idle_timeout(Duration::from_secs(30))
1032 /// .pool_timer(TokioTimer::new())
1033 /// .build_http();
1034 ///
1035 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1036 /// # }
1037 /// # fn main() {}
1038 /// ```
1039 pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1040 where
1041 D: Into<Option<Duration>>,
1042 {
1043 self.pool_config.idle_timeout = val.into();
1044 self
1045 }
1046
1047 #[doc(hidden)]
1048 #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1049 pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1050 self.pool_config.max_idle_per_host = max_idle;
1051 self
1052 }
1053
1054 /// Sets the maximum idle connection per host allowed in the pool.
1055 ///
1056 /// Default is `usize::MAX` (no limit).
1057 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1058 self.pool_config.max_idle_per_host = max_idle;
1059 self
1060 }
1061
1062 // HTTP/1 options
1063
1064 /// Sets the exact size of the read buffer to *always* use.
1065 ///
1066 /// Note that setting this option unsets the `http1_max_buf_size` option.
1067 ///
1068 /// Default is an adaptive read buffer.
1069 #[cfg(feature = "http1")]
1070 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1071 pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1072 self.h1_builder.read_buf_exact_size(Some(sz));
1073 self
1074 }
1075
1076 /// Set the maximum buffer size for the connection.
1077 ///
1078 /// Default is ~400kb.
1079 ///
1080 /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1081 ///
1082 /// # Panics
1083 ///
1084 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1085 #[cfg(feature = "http1")]
1086 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1087 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1088 self.h1_builder.max_buf_size(max);
1089 self
1090 }
1091
/// Controls whether HTTP/1 responses may contain spaces between a header
/// name and the colon that follows it.
///
/// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
/// parsing.
///
/// This is rarely needed. [RFC 7230 Section 3.2.4.] states:
///
/// > No whitespace is allowed between the header field-name and colon. In
/// > the past, differences in the handling of such whitespace have led to
/// > security vulnerabilities in request routing and response handling. A
/// > server MUST reject any received request message that contains
/// > whitespace between a header field-name and colon with a response code
/// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
/// > response message before forwarding the message downstream.
///
/// HTTP/2 is not affected by this setting.
///
/// Default is false.
///
/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
    let h1 = &mut self.h1_builder;
    h1.allow_spaces_after_header_name_in_responses(val);
    self
}
1121
/// Controls whether HTTP/1 responses may use obsolete line folding
/// (`obs-fold`) in header values.
///
/// This is rarely needed. [RFC 7230 Section 3.2.4.] states:
///
/// > A server that receives an obs-fold in a request message that is not
/// > within a message/http container MUST either reject the message by
/// > sending a 400 (Bad Request), preferably with a representation
/// > explaining that obsolete line folding is unacceptable, or replace
/// > each received obs-fold with one or more SP octets prior to
/// > interpreting the field value or forwarding the message downstream.
///
/// > A proxy or gateway that receives an obs-fold in a response message
/// > that is not within a message/http container MUST either discard the
/// > message and replace it with a 502 (Bad Gateway) response, preferably
/// > with a representation explaining that unacceptable line folding was
/// > received, or replace each received obs-fold with one or more SP
/// > octets prior to interpreting the field value or forwarding the
/// > message downstream.
///
/// > A user agent that receives an obs-fold in a response message that is
/// > not within a message/http container MUST replace each received
/// > obs-fold with one or more SP octets prior to interpreting the field
/// > value.
///
/// HTTP/2 is not affected by this setting.
///
/// Default is false.
///
/// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
    let h1 = &mut self.h1_builder;
    h1.allow_obsolete_multiline_headers_in_responses(val);
    self
}
1160
/// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
///
/// This mimics the behaviour of major browsers. You probably don't want this.
/// You should only want this if you are implementing a proxy whose main
/// purpose is to sit in front of browsers whose users access arbitrary content
/// which may be malformed, and they expect everything that works without
/// the proxy to keep working with the proxy.
///
/// This option will prevent Hyper's client from returning an error encountered
/// when parsing a header, except if the error was caused by the character NUL
/// (ASCII code 0), as Chrome specifically always rejects those.
///
/// The ignorable errors are:
/// * empty header names;
/// * characters that are not allowed in header names, except for `\0` and `\r`;
/// * when `allow_spaces_after_header_name_in_responses` is not enabled,
///   spaces and tabs between the header name and the colon;
/// * missing colon between header name and value;
/// * characters that are not allowed in header values except for `\0` and `\r`.
///
/// If an ignorable error is encountered, the parser tries to find the next
/// line in the input to resume parsing the rest of the headers. An error
/// will be emitted nonetheless if it finds `\0` or a lone `\r` while
/// looking for the next line.
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Self {
    // Returns `&mut Self` like every other setter on this builder; inside
    // this impl `Self` is `Builder`, so callers see the same type as before.
    self.h1_builder.ignore_invalid_headers_in_responses(val);
    self
}
1191
/// Set whether HTTP/1 connections should try to use vectored writes,
/// or always flatten into a single buffer.
///
/// Note that setting this to false may mean more copies of body data,
/// but may also improve performance when an IO transport doesn't
/// support vectored writes well, such as most TLS implementations.
///
/// Setting this to true will force hyper to use the queued strategy,
/// which may eliminate unnecessary cloning on some TLS backends.
///
/// Default is `auto`. In this mode hyper will try to guess which
/// mode to use.
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_writev(&mut self, enabled: bool) -> &mut Self {
    // Returns `&mut Self` like every other setter on this builder; inside
    // this impl `Self` is `Builder`, so callers see the same type as before.
    self.h1_builder.writev(enabled);
    self
}
1210
/// Controls whether HTTP/1 connections write header names in title case
/// at the socket level.
///
/// HTTP/2 is not affected by this setting.
///
/// Default is false.
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
    let h1 = &mut self.h1_builder;
    h1.title_case_headers(val);
    self
}
1223
/// Controls whether the original casing of header names is preserved.
///
/// At present this records the cases as received and stores them in a
/// private extension on the `Response`. The same extension is looked up
/// and honoured on any `Request` that is provided.
///
/// Because that extension is still private, the original cases cannot be
/// inspected or modified. The only practical effect right now is forwarding
/// the cases in a proxy-like fashion.
///
/// HTTP/2 is not affected by this setting.
///
/// Default is false.
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
    let h1 = &mut self.h1_builder;
    h1.preserve_header_case(val);
    self
}
1243
/// Sets the maximum number of headers accepted in a response.
///
/// When a response arrives, the parser reserves a buffer for its headers
/// for optimal performance.
///
/// If the client receives more headers than the buffer can hold, the error
/// "message header too large" is returned.
///
/// By default the headers are allocated on the stack, which is faster.
/// Once this value is set they are allocated on the heap instead, meaning
/// one heap allocation per response and a performance drop of about 5%.
///
/// HTTP/2 is not affected by this setting.
///
/// Default is 100.
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
    let h1 = &mut self.h1_builder;
    h1.max_headers(val);
    self
}
1265
/// Controls whether HTTP/0.9 responses are tolerated.
///
/// Default is false.
#[cfg(feature = "http1")]
#[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
pub fn http09_responses(&mut self, val: bool) -> &mut Self {
    let h1 = &mut self.h1_builder;
    h1.http09_responses(val);
    self
}
1275
/// Controls whether connections **must** use HTTP/2.
///
/// The destination must either allow HTTP2 Prior Knowledge, or the
/// `Connect` should be configured to use ALPN to upgrade to `h2`
/// as part of the connection process. This will not make the `Client`
/// utilize ALPN by itself.
///
/// Setting this to true prevents HTTP/1 from being allowed.
///
/// Default is false.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_only(&mut self, val: bool) -> &mut Self {
    // `true` pins the client to HTTP/2; `false` restores automatic selection.
    let version = match val {
        true => Ver::Http2,
        false => Ver::Auto,
    };
    self.client_config.ver = version;
    self
}
1292
/// Sets how many pending reset streams are allowed before a GOAWAY is sent.
///
/// Defaults to the value chosen by the [`h2` crate](https://crates.io/crates/h2).
/// As of v0.4.0, it is 20.
///
/// See <https://github.com/hyperium/hyper/issues/2877> for more information.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_pending_accept_reset_streams(
    &mut self,
    max: impl Into<Option<usize>>,
) -> &mut Self {
    let limit: Option<usize> = max.into();
    self.h2_builder.max_pending_accept_reset_streams(limit);
    self
}
1308
/// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option used for HTTP2
/// stream-level flow control.
///
/// Passing `None` will do nothing.
///
/// When left unset, hyper uses a default.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
    let window: Option<u32> = sz.into();
    self.h2_builder.initial_stream_window_size(window);
    self
}
1323
/// Sets the max connection-level flow control window for HTTP2.
///
/// Passing `None` will do nothing.
///
/// When left unset, hyper uses a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_connection_window_size(
    &mut self,
    sz: impl Into<Option<u32>>,
) -> &mut Self {
    let window: Option<u32> = sz.into();
    self.h2_builder.initial_connection_window_size(window);
    self
}
1338
/// Sets the initial maximum number of locally initiated (send) streams.
///
/// The value is overwritten by the one carried in the initial SETTINGS
/// frame received from the peer as part of a [connection preface].
///
/// Passing `None` will do nothing.
///
/// When left unset, hyper uses a default.
///
/// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_initial_max_send_streams(
    &mut self,
    initial: impl Into<Option<usize>>,
) -> &mut Self {
    // The value is handed over unconverted; the underlying builder accepts
    // the same `Into<Option<usize>>` argument.
    let h2 = &mut self.h2_builder;
    h2.initial_max_send_streams(initial);
    self
}
1358
/// Controls whether adaptive flow control is used for HTTP2.
///
/// Enabling it overrides the limits set through
/// `http2_initial_stream_window_size` and
/// `http2_initial_connection_window_size`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
    let h2 = &mut self.h2_builder;
    h2.adaptive_window(enabled);
    self
}
1370
/// Sets the maximum frame size used for HTTP2.
///
/// Passing `None` will do nothing.
///
/// When left unset, hyper uses a default.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
    // Forwarded unconverted; the underlying builder takes the same argument.
    let h2 = &mut self.h2_builder;
    h2.max_frame_size(sz);
    self
}
1382
/// Sets the interval at which HTTP2 Ping frames are sent to keep a
/// connection alive.
///
/// Pass `None` to disable HTTP2 keep-alive.
///
/// Keep-alive is currently disabled by default.
///
/// # Cargo Feature
///
/// Requires the `tokio` cargo feature to be enabled.
#[cfg(feature = "tokio")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_interval(
    &mut self,
    interval: impl Into<Option<Duration>>,
) -> &mut Self {
    // Forwarded unconverted; the underlying builder takes the same argument.
    let h2 = &mut self.h2_builder;
    h2.keep_alive_interval(interval);
    self
}
1403
/// Sets how long to wait for the acknowledgement of a keep-alive ping.
///
/// The connection is closed when a ping is not acknowledged within the
/// timeout. Does nothing if `http2_keep_alive_interval` is disabled.
///
/// Default is 20 seconds.
///
/// # Cargo Feature
///
/// Requires the `tokio` cargo feature to be enabled.
#[cfg(feature = "tokio")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
    let h2 = &mut self.h2_builder;
    h2.keep_alive_timeout(timeout);
    self
}
1421
/// Controls whether HTTP2 keep-alive also applies while the connection is idle.
///
/// When disabled, keep-alive pings are only sent while there are open
/// request/response streams. When enabled, pings are also sent when no
/// streams are active. Does nothing if `http2_keep_alive_interval` is
/// disabled.
///
/// Default is `false`.
///
/// # Cargo Feature
///
/// Requires the `tokio` cargo feature to be enabled.
#[cfg(feature = "tokio")]
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
    let h2 = &mut self.h2_builder;
    h2.keep_alive_while_idle(enabled);
    self
}
1441
/// Sets the maximum number of concurrent locally reset HTTP2 streams.
///
/// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
/// details.
///
/// The default value is determined by the `h2` crate.
///
/// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
    let h2 = &mut self.h2_builder;
    h2.max_concurrent_reset_streams(max);
    self
}
1456
1457 /// Provide a timer to be used for h2
1458 ///
1459 /// See the documentation of [`h2::client::Builder::timer`] for more
1460 /// details.
1461 ///
1462 /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1463 pub fn timer<M>(&mut self, timer: M) -> &mut Self
1464 where
1465 M: Timer + Send + Sync + 'static,
1466 {
1467 #[cfg(feature = "http2")]
1468 self.h2_builder.timer(timer);
1469 self
1470 }
1471
1472 /// Provide a timer to be used for timeouts and intervals in connection pools.
1473 pub fn pool_timer<M>(&mut self, _timer: M) -> &mut Self
1474 where
1475 M: Timer + Clone + Send + Sync + 'static,
1476 {
1477 self.pool_timer = Some(timer::Timer::new());
1478 self
1479 }
1480
/// Sets the maximum write buffer size for each HTTP/2 stream.
///
/// The default is currently 1MB, but may change.
///
/// # Panics
///
/// The value must be no larger than `u32::MAX`.
#[cfg(feature = "http2")]
#[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
    let h2 = &mut self.h2_builder;
    h2.max_send_buf_size(max);
    self
}
1494
1495 /// Set whether to retry requests that get disrupted before ever starting
1496 /// to write.
1497 ///
1498 /// This means a request that is queued, and gets given an idle, reused
1499 /// connection, and then encounters an error immediately as the idle
1500 /// connection was found to be unusable.
1501 ///
1502 /// When this is set to `false`, the related `ResponseFuture` would instead
1503 /// resolve to an `Error::Cancel`.
1504 ///
1505 /// Default is `true`.
1506 #[inline]
1507 pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1508 self.client_config.retry_canceled_requests = val;
1509 self
1510 }
1511
1512 /// Set whether to automatically add the `Host` header to requests.
1513 ///
1514 /// If true, and a request does not include a `Host` header, one will be
1515 /// added automatically, derived from the authority of the `Uri`.
1516 ///
1517 /// Default is `true`.
1518 #[inline]
1519 pub fn set_host(&mut self, val: bool) -> &mut Self {
1520 self.client_config.set_host = val;
1521 self
1522 }
1523
/// Builds a client from this configuration using the default `HttpConnector`.
///
/// When the pool is enabled, the pool's idle timeout is also passed to the
/// connector's `set_keepalive`.
#[cfg(feature = "tokio")]
pub fn build_http<B>(&self) -> Client<HttpConnector, B>
where
    B: Body + Send,
    B::Data: Send,
{
    let mut http = HttpConnector::new();
    let pool = &self.pool_config;
    if pool.is_enabled() {
        http.set_keepalive(pool.idle_timeout);
    }
    self.build(http)
}
1537
1538 /// Combine the configuration of this builder with a connector to create a `Client`.
1539 pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1540 where
1541 C: Connect + Clone,
1542 B: Body + Send,
1543 B::Data: Send,
1544 {
1545 let exec = self.exec.clone();
1546 let timer = self.pool_timer.clone();
1547 Client {
1548 config: self.client_config,
1549 exec: exec.clone(),
1550 #[cfg(feature = "http1")]
1551 h1_builder: self.h1_builder.clone(),
1552 #[cfg(feature = "http2")]
1553 h2_builder: self.h2_builder.clone(),
1554 connector,
1555 pool: pool::Pool::new(self.pool_config, exec, timer),
1556 }
1557 }
1558}
1559
1560impl fmt::Debug for Builder {
1561 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1562 f.debug_struct("Builder")
1563 .field("client_config", &self.client_config)
1564 .field("pool_config", &self.pool_config)
1565 .finish()
1566 }
1567}
1568
1569// ==== impl Error ====
1570
1571impl fmt::Debug for Error {
1572 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1573 let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1574 f.field(&self.kind);
1575 if let Some(ref cause) = self.source {
1576 f.field(cause);
1577 }
1578 f.finish()
1579 }
1580}
1581
1582impl fmt::Display for Error {
1583 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1584 write!(f, "client error ({:?})", self.kind)
1585 }
1586}
1587
1588impl StdError for Error {
1589 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1590 self.source.as_ref().map(|e| &**e as _)
1591 }
1592}
1593
1594impl Error {
1595 /// Returns true if this was an error from `Connect`.
1596 pub fn is_connect(&self) -> bool {
1597 matches!(self.kind, ErrorKind::Connect)
1598 }
1599
1600 fn is_canceled(&self) -> bool {
1601 matches!(self.kind, ErrorKind::Canceled)
1602 }
1603
1604 fn tx(src: hyper::Error) -> Self {
1605 e!(SendRequest, src)
1606 }
1607
1608 fn closed(src: hyper::Error) -> Self {
1609 e!(ChannelClosed, src)
1610 }
1611}