hyper_util/client/legacy/client.rs
1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17use hyper::header::{HeaderValue, HOST};
18use hyper::rt::Timer;
19use hyper::{body::Body, Method, Request, Response, Uri, Version};
20use tracing::{debug, trace, warn};
21
22use super::connect::capture::CaptureConnectionExtension;
23#[cfg(feature = "tokio")]
24use super::connect::HttpConnector;
25use super::connect::{Alpn, Connect, Connected, Connection};
26use super::pool::{self, Ver};
27
28use crate::common::future::poll_fn;
29use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
30
31type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
32
33/// A Client to make outgoing HTTP requests.
34///
35/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
36/// underlying connection pool will be reused.
37#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
38pub struct Client<C, B> {
39 config: Config,
40 connector: C,
41 exec: Exec,
42 #[cfg(feature = "http1")]
43 h1_builder: hyper::client::conn::http1::Builder,
44 #[cfg(feature = "http2")]
45 h2_builder: hyper::client::conn::http2::Builder<Exec>,
46 pool: pool::Pool<PoolClient<B>, PoolKey>,
47}
48
49#[derive(Clone, Copy, Debug)]
50struct Config {
51 retry_canceled_requests: bool,
52 set_host: bool,
53 ver: Ver,
54}
55
56/// Client errors
57pub struct Error {
58 kind: ErrorKind,
59 source: Option<Box<dyn StdError + Send + Sync>>,
60 #[cfg(any(feature = "http1", feature = "http2"))]
61 connect_info: Option<Connected>,
62}
63
64#[derive(Debug)]
65enum ErrorKind {
66 Canceled,
67 ChannelClosed,
68 Connect,
69 UserUnsupportedRequestMethod,
70 UserUnsupportedVersion,
71 UserAbsoluteUriRequired,
72 SendRequest,
73}
74
75macro_rules! e {
76 ($kind:ident) => {
77 Error {
78 kind: ErrorKind::$kind,
79 source: None,
80 connect_info: None,
81 }
82 };
83 ($kind:ident, $src:expr) => {
84 Error {
85 kind: ErrorKind::$kind,
86 source: Some($src.into()),
87 connect_info: None,
88 }
89 };
90}
91
92// We might change this... :shrug:
93type PoolKey = (http::uri::Scheme, http::uri::Authority);
94
95enum TrySendError<B> {
96 Retryable {
97 error: Error,
98 req: Request<B>,
99 connection_reused: bool,
100 },
101 Nope(Error),
102}
103
104/// A `Future` that will resolve to an HTTP Response.
105///
106/// This is returned by `Client::request` (and `Client::get`).
107#[must_use = "futures do nothing unless polled"]
108pub struct ResponseFuture {
109 inner: SyncWrapper<
110 Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
111 >,
112}
113
114// ===== impl Client =====
115
116impl Client<(), ()> {
117 /// Create a builder to configure a new `Client`.
118 ///
119 /// # Example
120 ///
121 /// ```
122 /// # #[cfg(feature = "tokio")]
123 /// # fn run () {
124 /// use std::time::Duration;
125 /// use hyper_util::client::legacy::Client;
126 /// use hyper_util::rt::{TokioExecutor, TokioTimer};
127 ///
128 /// let client = Client::builder(TokioExecutor::new())
129 /// .pool_timer(TokioTimer::new())
130 /// .pool_idle_timeout(Duration::from_secs(30))
131 /// .http2_only(true)
132 /// .build_http();
133 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
134 /// # drop(infer);
135 /// # }
136 /// # fn main() {}
137 /// ```
138 pub fn builder<E>(executor: E) -> Builder
139 where
140 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
141 {
142 Builder::new(executor)
143 }
144}
145
146impl<C, B> Client<C, B>
147where
148 C: Connect + Clone + Send + Sync + 'static,
149 B: Body + Send + 'static + Unpin,
150 B::Data: Send,
151 B::Error: Into<Box<dyn StdError + Send + Sync>>,
152{
153 /// Send a `GET` request to the supplied `Uri`.
154 ///
155 /// # Note
156 ///
157 /// This requires that the `Body` type have a `Default` implementation.
158 /// It *should* return an "empty" version of itself, such that
159 /// `Body::is_end_stream` is `true`.
160 ///
161 /// # Example
162 ///
163 /// ```
164 /// # #[cfg(feature = "tokio")]
165 /// # fn run () {
166 /// use hyper::Uri;
167 /// use hyper_util::client::legacy::Client;
168 /// use hyper_util::rt::TokioExecutor;
169 /// use bytes::Bytes;
170 /// use http_body_util::Full;
171 ///
172 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
173 ///
174 /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
175 /// # }
176 /// # fn main() {}
177 /// ```
178 pub fn get(&self, uri: Uri) -> ResponseFuture
179 where
180 B: Default,
181 {
182 let body = B::default();
183 if !body.is_end_stream() {
184 warn!("default Body used for get() does not return true for is_end_stream");
185 }
186
187 let mut req = Request::new(body);
188 *req.uri_mut() = uri;
189 self.request(req)
190 }
191
192 /// Send a constructed `Request` using this `Client`.
193 ///
194 /// # Example
195 ///
196 /// ```
197 /// # #[cfg(feature = "tokio")]
198 /// # fn run () {
199 /// use hyper::{Method, Request};
200 /// use hyper_util::client::legacy::Client;
201 /// use http_body_util::Full;
202 /// use hyper_util::rt::TokioExecutor;
203 /// use bytes::Bytes;
204 ///
205 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
206 ///
207 /// let req: Request<Full<Bytes>> = Request::builder()
208 /// .method(Method::POST)
209 /// .uri("http://httpbin.org/post")
210 /// .body(Full::from("Hallo!"))
211 /// .expect("request builder");
212 ///
213 /// let future = client.request(req);
214 /// # }
215 /// # fn main() {}
216 /// ```
217 pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
218 let is_http_connect = req.method() == Method::CONNECT;
219 match req.version() {
220 Version::HTTP_11 => (),
221 Version::HTTP_10 => {
222 if is_http_connect {
223 warn!("CONNECT is not allowed for HTTP/1.0");
224 return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
225 }
226 }
227 Version::HTTP_2 => (),
228 // completely unsupported HTTP version (like HTTP/0.9)!
229 other => return ResponseFuture::error_version(other),
230 };
231
232 let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
233 Ok(s) => s,
234 Err(err) => {
235 return ResponseFuture::new(future::err(err));
236 }
237 };
238
239 ResponseFuture::new(self.clone().send_request(req, pool_key))
240 }
241
242 async fn send_request(
243 self,
244 mut req: Request<B>,
245 pool_key: PoolKey,
246 ) -> Result<Response<hyper::body::Incoming>, Error> {
247 let uri = req.uri().clone();
248
249 loop {
250 req = match self.try_send_request(req, pool_key.clone()).await {
251 Ok(resp) => return Ok(resp),
252 Err(TrySendError::Nope(err)) => return Err(err),
253 Err(TrySendError::Retryable {
254 mut req,
255 error,
256 connection_reused,
257 }) => {
258 if !self.config.retry_canceled_requests || !connection_reused {
259 // if client disabled, don't retry
260 // a fresh connection means we definitely can't retry
261 return Err(error);
262 }
263
264 trace!(
265 "unstarted request canceled, trying again (reason={:?})",
266 error
267 );
268 *req.uri_mut() = uri.clone();
269 req
270 }
271 }
272 }
273 }
274
275 async fn try_send_request(
276 &self,
277 mut req: Request<B>,
278 pool_key: PoolKey,
279 ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
280 let mut pooled = self
281 .connection_for(pool_key)
282 .await
283 // `connection_for` already retries checkout errors, so if
284 // it returns an error, there's not much else to retry
285 .map_err(TrySendError::Nope)?;
286
287 if let Some(conn) = req.extensions_mut().get_mut::<CaptureConnectionExtension>() {
288 conn.set(&pooled.conn_info);
289 }
290
291 if pooled.is_http1() {
292 if req.version() == Version::HTTP_2 {
293 warn!("Connection is HTTP/1, but request requires HTTP/2");
294 return Err(TrySendError::Nope(
295 e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
296 ));
297 }
298
299 if self.config.set_host {
300 let uri = req.uri().clone();
301 req.headers_mut().entry(HOST).or_insert_with(|| {
302 let hostname = uri.host().expect("authority implies host");
303 if let Some(port) = get_non_default_port(&uri) {
304 let s = format!("{hostname}:{port}");
305 HeaderValue::from_str(&s)
306 } else {
307 HeaderValue::from_str(hostname)
308 }
309 .expect("uri host is valid header value")
310 });
311 }
312
313 // CONNECT always sends authority-form, so check it first...
314 if req.method() == Method::CONNECT {
315 authority_form(req.uri_mut());
316 } else if pooled.conn_info.is_proxied {
317 absolute_form(req.uri_mut());
318 } else {
319 origin_form(req.uri_mut());
320 }
321 } else if req.method() == Method::CONNECT && !pooled.is_http2() {
322 authority_form(req.uri_mut());
323 }
324
325 let mut res = match pooled.try_send_request(req).await {
326 Ok(res) => res,
327 Err(mut err) => {
328 return if let Some(req) = err.take_message() {
329 Err(TrySendError::Retryable {
330 connection_reused: pooled.is_reused(),
331 error: e!(Canceled, err.into_error())
332 .with_connect_info(pooled.conn_info.clone()),
333 req,
334 })
335 } else {
336 Err(TrySendError::Nope(
337 e!(SendRequest, err.into_error())
338 .with_connect_info(pooled.conn_info.clone()),
339 ))
340 }
341 }
342 };
343
344 // If the Connector included 'extra' info, add to Response...
345 if let Some(extra) = &pooled.conn_info.extra {
346 extra.set(res.extensions_mut());
347 }
348
349 // If pooled is HTTP/2, we can toss this reference immediately.
350 //
351 // when pooled is dropped, it will try to insert back into the
352 // pool. To delay that, spawn a future that completes once the
353 // sender is ready again.
354 //
355 // This *should* only be once the related `Connection` has polled
356 // for a new request to start.
357 //
358 // It won't be ready if there is a body to stream.
359 if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
360 drop(pooled);
361 } else if !res.body().is_end_stream() {
362 //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
363 //res.body_mut().delayed_eof(delayed_rx);
364 let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
365 // At this point, `pooled` is dropped, and had a chance
366 // to insert into the pool (if conn was idle)
367 //drop(delayed_tx);
368 });
369
370 self.exec.execute(on_idle);
371 } else {
372 // There's no body to delay, but the connection isn't
373 // ready yet. Only re-insert when it's ready
374 let on_idle = poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
375
376 self.exec.execute(on_idle);
377 }
378
379 Ok(res)
380 }
381
382 async fn connection_for(
383 &self,
384 pool_key: PoolKey,
385 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
386 loop {
387 match self.one_connection_for(pool_key.clone()).await {
388 Ok(pooled) => return Ok(pooled),
389 Err(ClientConnectError::Normal(err)) => return Err(err),
390 Err(ClientConnectError::CheckoutIsClosed(reason)) => {
391 if !self.config.retry_canceled_requests {
392 return Err(e!(Connect, reason));
393 }
394
395 trace!(
396 "unstarted request canceled, trying again (reason={:?})",
397 reason,
398 );
399 continue;
400 }
401 };
402 }
403 }
404
405 async fn one_connection_for(
406 &self,
407 pool_key: PoolKey,
408 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
409 // Return a single connection if pooling is not enabled
410 if !self.pool.is_enabled() {
411 return self
412 .connect_to(pool_key)
413 .await
414 .map_err(ClientConnectError::Normal);
415 }
416
417 // This actually races 2 different futures to try to get a ready
418 // connection the fastest, and to reduce connection churn.
419 //
420 // - If the pool has an idle connection waiting, that's used
421 // immediately.
422 // - Otherwise, the Connector is asked to start connecting to
423 // the destination Uri.
424 // - Meanwhile, the pool Checkout is watching to see if any other
425 // request finishes and tries to insert an idle connection.
426 // - If a new connection is started, but the Checkout wins after
427 // (an idle connection became available first), the started
428 // connection future is spawned into the runtime to complete,
429 // and then be inserted into the pool as an idle connection.
430 let checkout = self.pool.checkout(pool_key.clone());
431 let connect = self.connect_to(pool_key);
432 let is_ver_h2 = self.config.ver == Ver::Http2;
433
434 // The order of the `select` is depended on below...
435
436 match future::select(checkout, connect).await {
437 // Checkout won, connect future may have been started or not.
438 //
439 // If it has, let it finish and insert back into the pool,
440 // so as to not waste the socket...
441 Either::Left((Ok(checked_out), connecting)) => {
442 // This depends on the `select` above having the correct
443 // order, such that if the checkout future were ready
444 // immediately, the connect future will never have been
445 // started.
446 //
447 // If it *wasn't* ready yet, then the connect future will
448 // have been started...
449 if connecting.started() {
450 let bg = connecting
451 .map_err(|err| {
452 trace!("background connect error: {}", err);
453 })
454 .map(|_pooled| {
455 // dropping here should just place it in
456 // the Pool for us...
457 });
458 // An execute error here isn't important, we're just trying
459 // to prevent a waste of a socket...
460 self.exec.execute(bg);
461 }
462 Ok(checked_out)
463 }
464 // Connect won, checkout can just be dropped.
465 Either::Right((Ok(connected), _checkout)) => Ok(connected),
466 // Either checkout or connect could get canceled:
467 //
468 // 1. Connect is canceled if this is HTTP/2 and there is
469 // an outstanding HTTP/2 connecting task.
470 // 2. Checkout is canceled if the pool cannot deliver an
471 // idle connection reliably.
472 //
473 // In both cases, we should just wait for the other future.
474 Either::Left((Err(err), connecting)) => {
475 if err.is_canceled() {
476 connecting.await.map_err(ClientConnectError::Normal)
477 } else {
478 Err(ClientConnectError::Normal(e!(Connect, err)))
479 }
480 }
481 Either::Right((Err(err), checkout)) => {
482 if err.is_canceled() {
483 checkout.await.map_err(move |err| {
484 if is_ver_h2 && err.is_canceled() {
485 ClientConnectError::CheckoutIsClosed(err)
486 } else {
487 ClientConnectError::Normal(e!(Connect, err))
488 }
489 })
490 } else {
491 Err(ClientConnectError::Normal(err))
492 }
493 }
494 }
495 }
496
497 #[cfg(any(feature = "http1", feature = "http2"))]
498 fn connect_to(
499 &self,
500 pool_key: PoolKey,
501 ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
502 {
503 let executor = self.exec.clone();
504 let pool = self.pool.clone();
505 #[cfg(feature = "http1")]
506 let h1_builder = self.h1_builder.clone();
507 #[cfg(feature = "http2")]
508 let h2_builder = self.h2_builder.clone();
509 let ver = self.config.ver;
510 let is_ver_h2 = ver == Ver::Http2;
511 let connector = self.connector.clone();
512 let dst = domain_as_uri(pool_key.clone());
513 hyper_lazy(move || {
514 // Try to take a "connecting lock".
515 //
516 // If the pool_key is for HTTP/2, and there is already a
517 // connection being established, then this can't take a
518 // second lock. The "connect_to" future is Canceled.
519 let connecting = match pool.connecting(&pool_key, ver) {
520 Some(lock) => lock,
521 None => {
522 let canceled = e!(Canceled);
523 // TODO
524 //crate::Error::new_canceled().with("HTTP/2 connection in progress");
525 return Either::Right(future::err(canceled));
526 }
527 };
528 Either::Left(
529 connector
530 .connect(super::connect::sealed::Internal, dst)
531 .map_err(|src| e!(Connect, src))
532 .and_then(move |io| {
533 let connected = io.connected();
534 // If ALPN is h2 and we aren't http2_only already,
535 // then we need to convert our pool checkout into
536 // a single HTTP2 one.
537 let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
538 match connecting.alpn_h2(&pool) {
539 Some(lock) => {
540 trace!("ALPN negotiated h2, updating pool");
541 lock
542 }
543 None => {
544 // Another connection has already upgraded,
545 // the pool checkout should finish up for us.
546 let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
547 return Either::Right(future::err(canceled));
548 }
549 }
550 } else {
551 connecting
552 };
553
554 #[cfg_attr(not(feature = "http2"), allow(unused))]
555 let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
556
557 Either::Left(Box::pin(async move {
558 let tx = if is_h2 {
559 #[cfg(feature = "http2")] {
560 let (mut tx, conn) =
561 h2_builder.handshake(io).await.map_err(Error::tx)?;
562
563 trace!(
564 "http2 handshake complete, spawning background dispatcher task"
565 );
566 executor.execute(
567 conn.map_err(|e| debug!("client connection error: {}", e))
568 .map(|_| ()),
569 );
570
571 // Wait for 'conn' to ready up before we
572 // declare this tx as usable
573 tx.ready().await.map_err(Error::tx)?;
574 PoolTx::Http2(tx)
575 }
576 #[cfg(not(feature = "http2"))]
577 panic!("http2 feature is not enabled");
578 } else {
579 #[cfg(feature = "http1")] {
580 // Perform the HTTP/1.1 handshake on the provided I/O stream.
581 // Uses the h1_builder to establish a connection, returning a sender (tx) for requests
582 // and a connection task (conn) that manages the connection lifecycle.
583 let (mut tx, conn) =
584 h1_builder.handshake(io).await.map_err(crate::client::legacy::client::Error::tx)?;
585
586 // Log that the HTTP/1.1 handshake has completed successfully.
587 // This indicates the connection is established and ready for request processing.
588 trace!(
589 "http1 handshake complete, spawning background dispatcher task"
590 );
591 // Create a oneshot channel to communicate errors from the connection task.
592 // err_tx sends errors from the connection task, and err_rx receives them
593 // to correlate connection failures with request readiness errors.
594 let (err_tx, err_rx) = tokio::sync::oneshot::channel();
595 // Spawn the connection task in the background using the executor.
596 // The task manages the HTTP/1.1 connection, including upgrades (e.g., WebSocket).
597 // Errors are sent via err_tx to ensure they can be checked if the sender (tx) fails.
598 executor.execute(
599 conn.with_upgrades()
600 .map_err(|e| {
601 // Log the connection error at debug level for diagnostic purposes.
602 debug!("client connection error: {:?}", e);
603 // Log that the error is being sent to the error channel.
604 trace!("sending connection error to error channel");
605 // Send the error via the oneshot channel, ignoring send failures
606 // (e.g., if the receiver is dropped, which is handled later).
607 let _ =err_tx.send(e);
608 })
609 .map(|_| ()),
610 );
611 // Log that the client is waiting for the connection to be ready.
612 // Readiness indicates the sender (tx) can accept a request without blocking.
613 trace!("waiting for connection to be ready");
614 // Check if the sender is ready to accept a request.
615 // This ensures the connection is fully established before proceeding.
616 // aka:
617 // Wait for 'conn' to ready up before we
618 // declare this tx as usable
619 match tx.ready().await {
620 // If ready, the connection is usable for sending requests.
621 Ok(_) => {
622 // Log that the connection is ready for use.
623 trace!("connection is ready");
624 // Drop the error receiver, as it’s no longer needed since the sender is ready.
625 // This prevents waiting for errors that won’t occur in a successful case.
626 drop(err_rx);
627 // Wrap the sender in PoolTx::Http1 for use in the connection pool.
628 PoolTx::Http1(tx)
629 }
630 // If the sender fails with a closed channel error, check for a specific connection error.
631 // This distinguishes between a vague ChannelClosed error and an actual connection failure.
632 Err(e) if e.is_closed() => {
633 // Log that the channel is closed, indicating a potential connection issue.
634 trace!("connection channel closed, checking for connection error");
635 // Check the oneshot channel for a specific error from the connection task.
636 match err_rx.await {
637 // If an error was received, it’s a specific connection failure.
638 Ok(err) => {
639 // Log the specific connection error for diagnostics.
640 trace!("received connection error: {:?}", err);
641 // Return the error wrapped in Error::tx to propagate it.
642 return Err(crate::client::legacy::client::Error::tx(err));
643 }
644 // If the error channel is closed, no specific error was sent.
645 // Fall back to the vague ChannelClosed error.
646 Err(_) => {
647 // Log that the error channel is closed, indicating no specific error.
648 trace!("error channel closed, returning the vague ChannelClosed error");
649 // Return the original error wrapped in Error::tx.
650 return Err(crate::client::legacy::client::Error::tx(e));
651 }
652 }
653 }
654 // For other errors (e.g., timeout, I/O issues), propagate them directly.
655 // These are not ChannelClosed errors and don’t require error channel checks.
656 Err(e) => {
657 // Log the specific readiness failure for diagnostics.
658 trace!("connection readiness failed: {:?}", e);
659 // Return the error wrapped in Error::tx to propagate it.
660 return Err(crate::client::legacy::client::Error::tx(e));
661 }
662 }
663 }
664 #[cfg(not(feature = "http1"))] {
665 panic!("http1 feature is not enabled");
666 }
667 };
668
669 Ok(pool.pooled(
670 connecting,
671 PoolClient {
672 conn_info: connected,
673 tx,
674 },
675 ))
676 }))
677 }),
678 )
679 })
680 }
681}
682
683impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
684where
685 C: Connect + Clone + Send + Sync + 'static,
686 B: Body + Send + 'static + Unpin,
687 B::Data: Send,
688 B::Error: Into<Box<dyn StdError + Send + Sync>>,
689{
690 type Response = Response<hyper::body::Incoming>;
691 type Error = Error;
692 type Future = ResponseFuture;
693
694 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
695 Poll::Ready(Ok(()))
696 }
697
698 fn call(&mut self, req: Request<B>) -> Self::Future {
699 self.request(req)
700 }
701}
702
703impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
704where
705 C: Connect + Clone + Send + Sync + 'static,
706 B: Body + Send + 'static + Unpin,
707 B::Data: Send,
708 B::Error: Into<Box<dyn StdError + Send + Sync>>,
709{
710 type Response = Response<hyper::body::Incoming>;
711 type Error = Error;
712 type Future = ResponseFuture;
713
714 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
715 Poll::Ready(Ok(()))
716 }
717
718 fn call(&mut self, req: Request<B>) -> Self::Future {
719 self.request(req)
720 }
721}
722
723impl<C: Clone, B> Clone for Client<C, B> {
724 fn clone(&self) -> Client<C, B> {
725 Client {
726 config: self.config,
727 exec: self.exec.clone(),
728 #[cfg(feature = "http1")]
729 h1_builder: self.h1_builder.clone(),
730 #[cfg(feature = "http2")]
731 h2_builder: self.h2_builder.clone(),
732 connector: self.connector.clone(),
733 pool: self.pool.clone(),
734 }
735 }
736}
737
738impl<C, B> fmt::Debug for Client<C, B> {
739 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
740 f.debug_struct("Client").finish()
741 }
742}
743
744// ===== impl ResponseFuture =====
745
746impl ResponseFuture {
747 fn new<F>(value: F) -> Self
748 where
749 F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
750 {
751 Self {
752 inner: SyncWrapper::new(Box::pin(value)),
753 }
754 }
755
756 fn error_version(ver: Version) -> Self {
757 warn!("Request has unsupported version \"{:?}\"", ver);
758 ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
759 }
760}
761
762impl fmt::Debug for ResponseFuture {
763 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
764 f.pad("Future<Response>")
765 }
766}
767
768impl Future for ResponseFuture {
769 type Output = Result<Response<hyper::body::Incoming>, Error>;
770
771 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
772 self.inner.get_mut().as_mut().poll(cx)
773 }
774}
775
776// ===== impl PoolClient =====
777
778// FIXME: allow() required due to `impl Trait` leaking types to this lint
779#[allow(missing_debug_implementations)]
780struct PoolClient<B> {
781 conn_info: Connected,
782 tx: PoolTx<B>,
783}
784
785enum PoolTx<B> {
786 #[cfg(feature = "http1")]
787 Http1(hyper::client::conn::http1::SendRequest<B>),
788 #[cfg(feature = "http2")]
789 Http2(hyper::client::conn::http2::SendRequest<B>),
790}
791
792impl<B> PoolClient<B> {
793 fn poll_ready(
794 &mut self,
795 #[allow(unused_variables)] cx: &mut task::Context<'_>,
796 ) -> Poll<Result<(), Error>> {
797 match self.tx {
798 #[cfg(feature = "http1")]
799 PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
800 #[cfg(feature = "http2")]
801 PoolTx::Http2(_) => Poll::Ready(Ok(())),
802 }
803 }
804
805 fn is_http1(&self) -> bool {
806 !self.is_http2()
807 }
808
809 fn is_http2(&self) -> bool {
810 match self.tx {
811 #[cfg(feature = "http1")]
812 PoolTx::Http1(_) => false,
813 #[cfg(feature = "http2")]
814 PoolTx::Http2(_) => true,
815 }
816 }
817
818 fn is_poisoned(&self) -> bool {
819 self.conn_info.poisoned.poisoned()
820 }
821
822 fn is_ready(&self) -> bool {
823 match self.tx {
824 #[cfg(feature = "http1")]
825 PoolTx::Http1(ref tx) => tx.is_ready(),
826 #[cfg(feature = "http2")]
827 PoolTx::Http2(ref tx) => tx.is_ready(),
828 }
829 }
830}
831
832impl<B: Body + 'static> PoolClient<B> {
833 fn try_send_request(
834 &mut self,
835 req: Request<B>,
836 ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
837 where
838 B: Send,
839 {
840 #[cfg(all(feature = "http1", feature = "http2"))]
841 return match self.tx {
842 #[cfg(feature = "http1")]
843 PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
844 #[cfg(feature = "http2")]
845 PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
846 };
847
848 #[cfg(feature = "http1")]
849 #[cfg(not(feature = "http2"))]
850 return match self.tx {
851 #[cfg(feature = "http1")]
852 PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
853 };
854
855 #[cfg(not(feature = "http1"))]
856 #[cfg(feature = "http2")]
857 return match self.tx {
858 #[cfg(feature = "http2")]
859 PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
860 };
861 }
862}
863
864impl<B> pool::Poolable for PoolClient<B>
865where
866 B: Send + 'static,
867{
868 fn is_open(&self) -> bool {
869 !self.is_poisoned() && self.is_ready()
870 }
871
872 fn reserve(self) -> pool::Reservation<Self> {
873 match self.tx {
874 #[cfg(feature = "http1")]
875 PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
876 conn_info: self.conn_info,
877 tx: PoolTx::Http1(tx),
878 }),
879 #[cfg(feature = "http2")]
880 PoolTx::Http2(tx) => {
881 let b = PoolClient {
882 conn_info: self.conn_info.clone(),
883 tx: PoolTx::Http2(tx.clone()),
884 };
885 let a = PoolClient {
886 conn_info: self.conn_info,
887 tx: PoolTx::Http2(tx),
888 };
889 pool::Reservation::Shared(a, b)
890 }
891 }
892 }
893
894 fn can_share(&self) -> bool {
895 self.is_http2()
896 }
897}
898
899enum ClientConnectError {
900 Normal(Error),
901 CheckoutIsClosed(pool::Error),
902}
903
904fn origin_form(uri: &mut Uri) {
905 let path = match uri.path_and_query() {
906 Some(path) if path.as_str() != "/" => {
907 let mut parts = ::http::uri::Parts::default();
908 parts.path_and_query = Some(path.clone());
909 Uri::from_parts(parts).expect("path is valid uri")
910 }
911 _none_or_just_slash => {
912 debug_assert!(Uri::default() == "/");
913 Uri::default()
914 }
915 };
916 *uri = path
917}
918
919fn absolute_form(uri: &mut Uri) {
920 debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
921 debug_assert!(
922 uri.authority().is_some(),
923 "absolute_form needs an authority"
924 );
925 // If the URI is to HTTPS, and the connector claimed to be a proxy,
926 // then it *should* have tunneled, and so we don't want to send
927 // absolute-form in that case.
928 if uri.scheme() == Some(&Scheme::HTTPS) {
929 origin_form(uri);
930 }
931}
932
933fn authority_form(uri: &mut Uri) {
934 if let Some(path) = uri.path_and_query() {
935 // `https://hyper.rs` would parse with `/` path, don't
936 // annoy people about that...
937 if path != "/" {
938 warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
939 }
940 }
941 *uri = match uri.authority() {
942 Some(auth) => {
943 let mut parts = ::http::uri::Parts::default();
944 parts.authority = Some(auth.clone());
945 Uri::from_parts(parts).expect("authority is valid")
946 }
947 None => {
948 unreachable!("authority_form with relative uri");
949 }
950 };
951}
952
953fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
954 let uri_clone = uri.clone();
955 match (uri_clone.scheme(), uri_clone.authority()) {
956 (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
957 (None, Some(auth)) if is_http_connect => {
958 let scheme = match auth.port_u16() {
959 Some(443) => {
960 set_scheme(uri, Scheme::HTTPS);
961 Scheme::HTTPS
962 }
963 _ => {
964 set_scheme(uri, Scheme::HTTP);
965 Scheme::HTTP
966 }
967 };
968 Ok((scheme, auth.clone()))
969 }
970 _ => {
971 debug!("Client requires absolute-form URIs, received: {:?}", uri);
972 Err(e!(UserAbsoluteUriRequired))
973 }
974 }
975}
976
977fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
978 http::uri::Builder::new()
979 .scheme(scheme)
980 .authority(auth)
981 .path_and_query("/")
982 .build()
983 .expect("domain is valid Uri")
984}
985
986fn set_scheme(uri: &mut Uri, scheme: Scheme) {
987 debug_assert!(
988 uri.scheme().is_none(),
989 "set_scheme expects no existing scheme"
990 );
991 let old = std::mem::take(uri);
992 let mut parts: ::http::uri::Parts = old.into();
993 parts.scheme = Some(scheme);
994 parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
995 *uri = Uri::from_parts(parts).expect("scheme is valid");
996}
997
998fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
999 match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
1000 (Some(443), true) => None,
1001 (Some(80), false) => None,
1002 _ => uri.port(),
1003 }
1004}
1005
1006fn is_schema_secure(uri: &Uri) -> bool {
1007 uri.scheme_str()
1008 .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
1009 .unwrap_or_default()
1010}
1011
1012/// A builder to configure a new [`Client`](Client).
1013///
1014/// # Example
1015///
1016/// ```
1017/// # #[cfg(feature = "tokio")]
1018/// # fn run () {
1019/// use std::time::Duration;
1020/// use hyper_util::client::legacy::Client;
1021/// use hyper_util::rt::TokioExecutor;
1022///
1023/// let client = Client::builder(TokioExecutor::new())
1024/// .pool_idle_timeout(Duration::from_secs(30))
1025/// .http2_only(true)
1026/// .build_http();
1027/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1028/// # drop(infer);
1029/// # }
1030/// # fn main() {}
1031/// ```
1032#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
1033#[derive(Clone)]
1034pub struct Builder {
1035 client_config: Config,
1036 exec: Exec,
1037 #[cfg(feature = "http1")]
1038 h1_builder: hyper::client::conn::http1::Builder,
1039 #[cfg(feature = "http2")]
1040 h2_builder: hyper::client::conn::http2::Builder<Exec>,
1041 pool_config: pool::Config,
1042 pool_timer: Option<timer::Timer>,
1043}
1044
1045impl Builder {
1046 /// Construct a new Builder.
1047 pub fn new<E>(executor: E) -> Self
1048 where
1049 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1050 {
1051 let exec = Exec::new(executor);
1052 Self {
1053 client_config: Config {
1054 retry_canceled_requests: true,
1055 set_host: true,
1056 ver: Ver::Auto,
1057 },
1058 exec: exec.clone(),
1059 #[cfg(feature = "http1")]
1060 h1_builder: hyper::client::conn::http1::Builder::new(),
1061 #[cfg(feature = "http2")]
1062 h2_builder: hyper::client::conn::http2::Builder::new(exec),
1063 pool_config: pool::Config {
1064 idle_timeout: Some(Duration::from_secs(90)),
1065 max_idle_per_host: usize::MAX,
1066 },
1067 pool_timer: None,
1068 }
1069 }
1070 /// Set an optional timeout for idle sockets being kept-alive.
1071 /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1072 ///
1073 /// Pass `None` to disable timeout.
1074 ///
1075 /// Default is 90 seconds.
1076 ///
1077 /// # Example
1078 ///
1079 /// ```
1080 /// # #[cfg(feature = "tokio")]
1081 /// # fn run () {
1082 /// use std::time::Duration;
1083 /// use hyper_util::client::legacy::Client;
1084 /// use hyper_util::rt::{TokioExecutor, TokioTimer};
1085 ///
1086 /// let client = Client::builder(TokioExecutor::new())
1087 /// .pool_idle_timeout(Duration::from_secs(30))
1088 /// .pool_timer(TokioTimer::new())
1089 /// .build_http();
1090 ///
1091 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1092 /// # }
1093 /// # fn main() {}
1094 /// ```
1095 pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1096 where
1097 D: Into<Option<Duration>>,
1098 {
1099 self.pool_config.idle_timeout = val.into();
1100 self
1101 }
1102
1103 #[doc(hidden)]
1104 #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1105 pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1106 self.pool_config.max_idle_per_host = max_idle;
1107 self
1108 }
1109
1110 /// Sets the maximum idle connection per host allowed in the pool.
1111 ///
1112 /// Default is `usize::MAX` (no limit).
1113 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1114 self.pool_config.max_idle_per_host = max_idle;
1115 self
1116 }
1117
1118 // HTTP/1 options
1119
1120 /// Sets the exact size of the read buffer to *always* use.
1121 ///
1122 /// Note that setting this option unsets the `http1_max_buf_size` option.
1123 ///
1124 /// Default is an adaptive read buffer.
1125 #[cfg(feature = "http1")]
1126 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1127 pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1128 self.h1_builder.read_buf_exact_size(Some(sz));
1129 self
1130 }
1131
1132 /// Set the maximum buffer size for the connection.
1133 ///
1134 /// Default is ~400kb.
1135 ///
1136 /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1137 ///
1138 /// # Panics
1139 ///
1140 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1141 #[cfg(feature = "http1")]
1142 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1143 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1144 self.h1_builder.max_buf_size(max);
1145 self
1146 }
1147
1148 /// Set whether HTTP/1 connections will accept spaces between header names
1149 /// and the colon that follow them in responses.
1150 ///
1151 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1152 /// parsing.
1153 ///
1154 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1155 /// to say about it:
1156 ///
1157 /// > No whitespace is allowed between the header field-name and colon. In
1158 /// > the past, differences in the handling of such whitespace have led to
1159 /// > security vulnerabilities in request routing and response handling. A
1160 /// > server MUST reject any received request message that contains
1161 /// > whitespace between a header field-name and colon with a response code
1162 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1163 /// > response message before forwarding the message downstream.
1164 ///
1165 /// Note that this setting does not affect HTTP/2.
1166 ///
1167 /// Default is false.
1168 ///
1169 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1170 #[cfg(feature = "http1")]
1171 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1172 pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1173 self.h1_builder
1174 .allow_spaces_after_header_name_in_responses(val);
1175 self
1176 }
1177
1178 /// Set whether HTTP/1 connections will accept obsolete line folding for
1179 /// header values.
1180 ///
1181 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1182 /// to say about it:
1183 ///
1184 /// > A server that receives an obs-fold in a request message that is not
1185 /// > within a message/http container MUST either reject the message by
1186 /// > sending a 400 (Bad Request), preferably with a representation
1187 /// > explaining that obsolete line folding is unacceptable, or replace
1188 /// > each received obs-fold with one or more SP octets prior to
1189 /// > interpreting the field value or forwarding the message downstream.
1190 ///
1191 /// > A proxy or gateway that receives an obs-fold in a response message
1192 /// > that is not within a message/http container MUST either discard the
1193 /// > message and replace it with a 502 (Bad Gateway) response, preferably
1194 /// > with a representation explaining that unacceptable line folding was
1195 /// > received, or replace each received obs-fold with one or more SP
1196 /// > octets prior to interpreting the field value or forwarding the
1197 /// > message downstream.
1198 ///
1199 /// > A user agent that receives an obs-fold in a response message that is
1200 /// > not within a message/http container MUST replace each received
1201 /// > obs-fold with one or more SP octets prior to interpreting the field
1202 /// > value.
1203 ///
1204 /// Note that this setting does not affect HTTP/2.
1205 ///
1206 /// Default is false.
1207 ///
1208 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1209 #[cfg(feature = "http1")]
1210 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1211 pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1212 self.h1_builder
1213 .allow_obsolete_multiline_headers_in_responses(val);
1214 self
1215 }
1216
1217 /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1218 ///
1219 /// This mimics the behaviour of major browsers. You probably don't want this.
1220 /// You should only want this if you are implementing a proxy whose main
1221 /// purpose is to sit in front of browsers whose users access arbitrary content
1222 /// which may be malformed, and they expect everything that works without
1223 /// the proxy to keep working with the proxy.
1224 ///
1225 /// This option will prevent Hyper's client from returning an error encountered
1226 /// when parsing a header, except if the error was caused by the character NUL
1227 /// (ASCII code 0), as Chrome specifically always reject those.
1228 ///
1229 /// The ignorable errors are:
1230 /// * empty header names;
1231 /// * characters that are not allowed in header names, except for `\0` and `\r`;
1232 /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1233 /// spaces and tabs between the header name and the colon;
1234 /// * missing colon between header name and colon;
1235 /// * characters that are not allowed in header values except for `\0` and `\r`.
1236 ///
1237 /// If an ignorable error is encountered, the parser tries to find the next
1238 /// line in the input to resume parsing the rest of the headers. An error
1239 /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1240 /// looking for the next line.
1241 #[cfg(feature = "http1")]
1242 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1243 pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1244 self.h1_builder.ignore_invalid_headers_in_responses(val);
1245 self
1246 }
1247
1248 /// Set whether HTTP/1 connections should try to use vectored writes,
1249 /// or always flatten into a single buffer.
1250 ///
1251 /// Note that setting this to false may mean more copies of body data,
1252 /// but may also improve performance when an IO transport doesn't
1253 /// support vectored writes well, such as most TLS implementations.
1254 ///
1255 /// Setting this to true will force hyper to use queued strategy
1256 /// which may eliminate unnecessary cloning on some TLS backends
1257 ///
1258 /// Default is `auto`. In this mode hyper will try to guess which
1259 /// mode to use
1260 #[cfg(feature = "http1")]
1261 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1262 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1263 self.h1_builder.writev(enabled);
1264 self
1265 }
1266
1267 /// Set whether HTTP/1 connections will write header names as title case at
1268 /// the socket level.
1269 ///
1270 /// Note that this setting does not affect HTTP/2.
1271 ///
1272 /// Default is false.
1273 #[cfg(feature = "http1")]
1274 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1275 pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1276 self.h1_builder.title_case_headers(val);
1277 self
1278 }
1279
1280 /// Set whether to support preserving original header cases.
1281 ///
1282 /// Currently, this will record the original cases received, and store them
1283 /// in a private extension on the `Response`. It will also look for and use
1284 /// such an extension in any provided `Request`.
1285 ///
1286 /// Since the relevant extension is still private, there is no way to
1287 /// interact with the original cases. The only effect this can have now is
1288 /// to forward the cases in a proxy-like fashion.
1289 ///
1290 /// Note that this setting does not affect HTTP/2.
1291 ///
1292 /// Default is false.
1293 #[cfg(feature = "http1")]
1294 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1295 pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1296 self.h1_builder.preserve_header_case(val);
1297 self
1298 }
1299
1300 /// Set the maximum number of headers.
1301 ///
1302 /// When a response is received, the parser will reserve a buffer to store headers for optimal
1303 /// performance.
1304 ///
1305 /// If client receives more headers than the buffer size, the error "message header too large"
1306 /// is returned.
1307 ///
1308 /// The headers is allocated on the stack by default, which has higher performance. After
1309 /// setting this value, headers will be allocated in heap memory, that is, heap memory
1310 /// allocation will occur for each response, and there will be a performance drop of about 5%.
1311 ///
1312 /// Note that this setting does not affect HTTP/2.
1313 ///
1314 /// Default is 100.
1315 #[cfg(feature = "http1")]
1316 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1317 pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1318 self.h1_builder.max_headers(val);
1319 self
1320 }
1321
1322 /// Set whether HTTP/0.9 responses should be tolerated.
1323 ///
1324 /// Default is false.
1325 #[cfg(feature = "http1")]
1326 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1327 pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1328 self.h1_builder.http09_responses(val);
1329 self
1330 }
1331
1332 /// Set whether the connection **must** use HTTP/2.
1333 ///
1334 /// The destination must either allow HTTP2 Prior Knowledge, or the
1335 /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1336 /// as part of the connection process. This will not make the `Client`
1337 /// utilize ALPN by itself.
1338 ///
1339 /// Note that setting this to true prevents HTTP/1 from being allowed.
1340 ///
1341 /// Default is false.
1342 #[cfg(feature = "http2")]
1343 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1344 pub fn http2_only(&mut self, val: bool) -> &mut Self {
1345 self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1346 self
1347 }
1348
1349 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1350 ///
1351 /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1352 /// As of v0.4.0, it is 20.
1353 ///
1354 /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1355 #[cfg(feature = "http2")]
1356 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1357 pub fn http2_max_pending_accept_reset_streams(
1358 &mut self,
1359 max: impl Into<Option<usize>>,
1360 ) -> &mut Self {
1361 self.h2_builder.max_pending_accept_reset_streams(max.into());
1362 self
1363 }
1364
1365 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1366 /// stream-level flow control.
1367 ///
1368 /// Passing `None` will do nothing.
1369 ///
1370 /// If not set, hyper will use a default.
1371 ///
1372 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1373 #[cfg(feature = "http2")]
1374 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1375 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1376 self.h2_builder.initial_stream_window_size(sz.into());
1377 self
1378 }
1379
1380 /// Sets the max connection-level flow control for HTTP2
1381 ///
1382 /// Passing `None` will do nothing.
1383 ///
1384 /// If not set, hyper will use a default.
1385 #[cfg(feature = "http2")]
1386 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1387 pub fn http2_initial_connection_window_size(
1388 &mut self,
1389 sz: impl Into<Option<u32>>,
1390 ) -> &mut Self {
1391 self.h2_builder.initial_connection_window_size(sz.into());
1392 self
1393 }
1394
1395 /// Sets the initial maximum of locally initiated (send) streams.
1396 ///
1397 /// This value will be overwritten by the value included in the initial
1398 /// SETTINGS frame received from the peer as part of a [connection preface].
1399 ///
1400 /// Passing `None` will do nothing.
1401 ///
1402 /// If not set, hyper will use a default.
1403 ///
1404 /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1405 #[cfg(feature = "http2")]
1406 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1407 pub fn http2_initial_max_send_streams(
1408 &mut self,
1409 initial: impl Into<Option<usize>>,
1410 ) -> &mut Self {
1411 self.h2_builder.initial_max_send_streams(initial);
1412 self
1413 }
1414
1415 /// Sets whether to use an adaptive flow control.
1416 ///
1417 /// Enabling this will override the limits set in
1418 /// `http2_initial_stream_window_size` and
1419 /// `http2_initial_connection_window_size`.
1420 #[cfg(feature = "http2")]
1421 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1422 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1423 self.h2_builder.adaptive_window(enabled);
1424 self
1425 }
1426
1427 /// Sets the maximum frame size to use for HTTP2.
1428 ///
1429 /// Passing `None` will do nothing.
1430 ///
1431 /// If not set, hyper will use a default.
1432 #[cfg(feature = "http2")]
1433 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1434 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1435 self.h2_builder.max_frame_size(sz);
1436 self
1437 }
1438
1439 /// Sets the max size of received header frames for HTTP2.
1440 ///
1441 /// Default is currently 16KB, but can change.
1442 #[cfg(feature = "http2")]
1443 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1444 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1445 self.h2_builder.max_header_list_size(max);
1446 self
1447 }
1448
1449 /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1450 /// connection alive.
1451 ///
1452 /// Pass `None` to disable HTTP2 keep-alive.
1453 ///
1454 /// Default is currently disabled.
1455 ///
1456 /// # Cargo Feature
1457 ///
1458 /// Requires the `tokio` cargo feature to be enabled.
1459 #[cfg(feature = "tokio")]
1460 #[cfg(feature = "http2")]
1461 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1462 pub fn http2_keep_alive_interval(
1463 &mut self,
1464 interval: impl Into<Option<Duration>>,
1465 ) -> &mut Self {
1466 self.h2_builder.keep_alive_interval(interval);
1467 self
1468 }
1469
1470 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1471 ///
1472 /// If the ping is not acknowledged within the timeout, the connection will
1473 /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1474 ///
1475 /// Default is 20 seconds.
1476 ///
1477 /// # Cargo Feature
1478 ///
1479 /// Requires the `tokio` cargo feature to be enabled.
1480 #[cfg(feature = "tokio")]
1481 #[cfg(feature = "http2")]
1482 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1483 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1484 self.h2_builder.keep_alive_timeout(timeout);
1485 self
1486 }
1487
1488 /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1489 ///
1490 /// If disabled, keep-alive pings are only sent while there are open
1491 /// request/responses streams. If enabled, pings are also sent when no
1492 /// streams are active. Does nothing if `http2_keep_alive_interval` is
1493 /// disabled.
1494 ///
1495 /// Default is `false`.
1496 ///
1497 /// # Cargo Feature
1498 ///
1499 /// Requires the `tokio` cargo feature to be enabled.
1500 #[cfg(feature = "tokio")]
1501 #[cfg(feature = "http2")]
1502 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1503 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1504 self.h2_builder.keep_alive_while_idle(enabled);
1505 self
1506 }
1507
1508 /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1509 ///
1510 /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1511 /// details.
1512 ///
1513 /// The default value is determined by the `h2` crate.
1514 ///
1515 /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1516 #[cfg(feature = "http2")]
1517 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1518 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1519 self.h2_builder.max_concurrent_reset_streams(max);
1520 self
1521 }
1522
1523 /// Provide a timer to be used for h2
1524 ///
1525 /// See the documentation of [`h2::client::Builder::timer`] for more
1526 /// details.
1527 ///
1528 /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1529 pub fn timer<M>(&mut self, timer: M) -> &mut Self
1530 where
1531 M: Timer + Send + Sync + 'static,
1532 {
1533 #[cfg(feature = "http2")]
1534 self.h2_builder.timer(timer);
1535 self
1536 }
1537
1538 /// Provide a timer to be used for timeouts and intervals in connection pools.
1539 pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1540 where
1541 M: Timer + Clone + Send + Sync + 'static,
1542 {
1543 self.pool_timer = Some(timer::Timer::new(timer.clone()));
1544 self
1545 }
1546
1547 /// Set the maximum write buffer size for each HTTP/2 stream.
1548 ///
1549 /// Default is currently 1MB, but may change.
1550 ///
1551 /// # Panics
1552 ///
1553 /// The value must be no larger than `u32::MAX`.
1554 #[cfg(feature = "http2")]
1555 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1556 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1557 self.h2_builder.max_send_buf_size(max);
1558 self
1559 }
1560
1561 /// Set whether to retry requests that get disrupted before ever starting
1562 /// to write.
1563 ///
1564 /// This means a request that is queued, and gets given an idle, reused
1565 /// connection, and then encounters an error immediately as the idle
1566 /// connection was found to be unusable.
1567 ///
1568 /// When this is set to `false`, the related `ResponseFuture` would instead
1569 /// resolve to an `Error::Cancel`.
1570 ///
1571 /// Default is `true`.
1572 #[inline]
1573 pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1574 self.client_config.retry_canceled_requests = val;
1575 self
1576 }
1577
1578 /// Set whether to automatically add the `Host` header to requests.
1579 ///
1580 /// If true, and a request does not include a `Host` header, one will be
1581 /// added automatically, derived from the authority of the `Uri`.
1582 ///
1583 /// Default is `true`.
1584 #[inline]
1585 pub fn set_host(&mut self, val: bool) -> &mut Self {
1586 self.client_config.set_host = val;
1587 self
1588 }
1589
1590 /// Build a client with this configuration and the default `HttpConnector`.
1591 #[cfg(feature = "tokio")]
1592 pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1593 where
1594 B: Body + Send,
1595 B::Data: Send,
1596 {
1597 let mut connector = HttpConnector::new();
1598 if self.pool_config.is_enabled() {
1599 connector.set_keepalive(self.pool_config.idle_timeout);
1600 }
1601 self.build(connector)
1602 }
1603
1604 /// Combine the configuration of this builder with a connector to create a `Client`.
1605 pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1606 where
1607 C: Connect + Clone,
1608 B: Body + Send,
1609 B::Data: Send,
1610 {
1611 let exec = self.exec.clone();
1612 let timer = self.pool_timer.clone();
1613 Client {
1614 config: self.client_config,
1615 exec: exec.clone(),
1616 #[cfg(feature = "http1")]
1617 h1_builder: self.h1_builder.clone(),
1618 #[cfg(feature = "http2")]
1619 h2_builder: self.h2_builder.clone(),
1620 connector,
1621 pool: pool::Pool::new(self.pool_config, exec, timer),
1622 }
1623 }
1624}
1625
1626impl fmt::Debug for Builder {
1627 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1628 f.debug_struct("Builder")
1629 .field("client_config", &self.client_config)
1630 .field("pool_config", &self.pool_config)
1631 .finish()
1632 }
1633}
1634
1635// ==== impl Error ====
1636
1637impl fmt::Debug for Error {
1638 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1639 let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1640 f.field(&self.kind);
1641 if let Some(ref cause) = self.source {
1642 f.field(cause);
1643 }
1644 f.finish()
1645 }
1646}
1647
1648impl fmt::Display for Error {
1649 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1650 write!(f, "client error ({:?})", self.kind)
1651 }
1652}
1653
1654impl StdError for Error {
1655 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1656 self.source.as_ref().map(|e| &**e as _)
1657 }
1658}
1659
1660impl Error {
1661 /// Returns true if this was an error from `Connect`.
1662 pub fn is_connect(&self) -> bool {
1663 matches!(self.kind, ErrorKind::Connect)
1664 }
1665
1666 /// Returns the info of the client connection on which this error occurred.
1667 #[cfg(any(feature = "http1", feature = "http2"))]
1668 pub fn connect_info(&self) -> Option<&Connected> {
1669 self.connect_info.as_ref()
1670 }
1671
1672 #[cfg(any(feature = "http1", feature = "http2"))]
1673 fn with_connect_info(self, connect_info: Connected) -> Self {
1674 Self {
1675 connect_info: Some(connect_info),
1676 ..self
1677 }
1678 }
1679 fn is_canceled(&self) -> bool {
1680 matches!(self.kind, ErrorKind::Canceled)
1681 }
1682
1683 fn tx(src: hyper::Error) -> Self {
1684 e!(SendRequest, src)
1685 }
1686
1687 fn closed(src: hyper::Error) -> Self {
1688 e!(ChannelClosed, src)
1689 }
1690}