miku_hyper_util/client/legacy/client.rs
1//! The legacy HTTP Client from 0.14.x
2//!
3//! This `Client` will eventually be deconstructed into more composable parts.
4//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists
5//! in much the same way it did in hyper 0.14.
6
7use std::error::Error as StdError;
8use std::fmt;
9use std::future::Future;
10use std::pin::Pin;
11use std::task::{self, Poll};
12use std::time::Duration;
13
14use futures_util::future::{self, Either, FutureExt, TryFutureExt};
15use http::uri::Scheme;
16use hyper::client::conn::TrySendError as ConnTrySendError;
17#[cfg(feature = "http2")]
18use hyper::ext::{FramePriority, FrameStreamDependency, PseudoType};
19use hyper::header::{HeaderValue, HOST};
20use hyper::rt::Timer;
21use hyper::{body::Body, Method, Request, Response, Uri, Version};
22use tracing::{debug, trace, warn};
23
24use super::connect::capture::CaptureConnectionExtension;
25#[cfg(feature = "tokio")]
26use super::connect::HttpConnector;
27use super::connect::{Alpn, Connect, Connected, Connection};
28use super::pool::{self, Ver};
29
30use crate::common::{lazy as hyper_lazy, timer, Exec, Lazy, SyncWrapper};
31
32type BoxSendFuture = Pin<Box<dyn Future<Output = ()> + Send>>;
33
34/// A Client to make outgoing HTTP requests.
35///
36/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The
37/// underlying connection pool will be reused.
38#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
39pub struct Client<C, B> {
40 config: Config,
41 connector: C,
42 exec: Exec,
43 #[cfg(feature = "http1")]
44 h1_builder: hyper::client::conn::http1::Builder,
45 #[cfg(feature = "http2")]
46 h2_builder: hyper::client::conn::http2::Builder<Exec>,
47 pool: pool::Pool<PoolClient<B>, PoolKey>,
48}
49
50#[derive(Clone, Copy, Debug)]
51struct Config {
52 retry_canceled_requests: bool,
53 set_host: bool,
54 ver: Ver,
55}
56
57/// Client errors
58pub struct Error {
59 kind: ErrorKind,
60 source: Option<Box<dyn StdError + Send + Sync>>,
61 #[cfg(any(feature = "http1", feature = "http2"))]
62 connect_info: Option<Connected>,
63}
64
65#[derive(Debug)]
66enum ErrorKind {
67 Canceled,
68 ChannelClosed,
69 Connect,
70 UserUnsupportedRequestMethod,
71 UserUnsupportedVersion,
72 UserAbsoluteUriRequired,
73 SendRequest,
74}
75
76macro_rules! e {
77 ($kind:ident) => {
78 Error {
79 kind: ErrorKind::$kind,
80 source: None,
81 connect_info: None,
82 }
83 };
84 ($kind:ident, $src:expr) => {
85 Error {
86 kind: ErrorKind::$kind,
87 source: Some($src.into()),
88 connect_info: None,
89 }
90 };
91}
92
93// We might change this... :shrug:
94type PoolKey = (http::uri::Scheme, http::uri::Authority);
95
96enum TrySendError<B> {
97 Retryable {
98 error: Error,
99 req: Request<B>,
100 connection_reused: bool,
101 },
102 Nope(Error),
103}
104
105/// A `Future` that will resolve to an HTTP Response.
106///
107/// This is returned by `Client::request` (and `Client::get`).
108#[must_use = "futures do nothing unless polled"]
109pub struct ResponseFuture {
110 inner: SyncWrapper<
111 Pin<Box<dyn Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send>>,
112 >,
113}
114
115// ===== impl Client =====
116
117impl Client<(), ()> {
118 /// Create a builder to configure a new `Client`.
119 ///
120 /// # Example
121 ///
122 /// ```
123 /// # #[cfg(feature = "tokio")]
124 /// # fn run () {
125 /// use std::time::Duration;
126 /// use miku_hyper_util::client::legacy::Client;
127 /// use miku_hyper_util::rt::TokioExecutor;
128 ///
129 /// let client = Client::builder(TokioExecutor::new())
130 /// .pool_idle_timeout(Duration::from_secs(30))
131 /// .http2_only(true)
132 /// .build_http();
133 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
134 /// # drop(infer);
135 /// # }
136 /// # fn main() {}
137 /// ```
138 pub fn builder<E>(executor: E) -> Builder
139 where
140 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
141 {
142 Builder::new(executor)
143 }
144}
145
146impl<C, B> Client<C, B>
147where
148 C: Connect + Clone + Send + Sync + 'static,
149 B: Body + Send + 'static + Unpin,
150 B::Data: Send,
151 B::Error: Into<Box<dyn StdError + Send + Sync>>,
152{
153 /// Send a `GET` request to the supplied `Uri`.
154 ///
155 /// # Note
156 ///
157 /// This requires that the `Body` type have a `Default` implementation.
158 /// It *should* return an "empty" version of itself, such that
159 /// `Body::is_end_stream` is `true`.
160 ///
161 /// # Example
162 ///
163 /// ```
164 /// # #[cfg(feature = "tokio")]
165 /// # fn run () {
166 /// use hyper::Uri;
167 /// use miku_hyper_util::client::legacy::Client;
168 /// use miku_hyper_util::rt::TokioExecutor;
169 /// use bytes::Bytes;
170 /// use http_body_util::Full;
171 ///
172 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
173 ///
174 /// let future = client.get(Uri::from_static("http://httpbin.org/ip"));
175 /// # }
176 /// # fn main() {}
177 /// ```
178 pub fn get(&self, uri: Uri) -> ResponseFuture
179 where
180 B: Default,
181 {
182 let body = B::default();
183 if !body.is_end_stream() {
184 warn!("default Body used for get() does not return true for is_end_stream");
185 }
186
187 let mut req = Request::new(body);
188 *req.uri_mut() = uri;
189 self.request(req)
190 }
191
192 /// Send a constructed `Request` using this `Client`.
193 ///
194 /// # Example
195 ///
196 /// ```
197 /// # #[cfg(feature = "tokio")]
198 /// # fn run () {
199 /// use hyper::{Method, Request};
200 /// use miku_hyper_util::client::legacy::Client;
201 /// use http_body_util::Full;
202 /// use miku_hyper_util::rt::TokioExecutor;
203 /// use bytes::Bytes;
204 ///
205 /// let client: Client<_, Full<Bytes>> = Client::builder(TokioExecutor::new()).build_http();
206 ///
207 /// let req: Request<Full<Bytes>> = Request::builder()
208 /// .method(Method::POST)
209 /// .uri("http://httpbin.org/post")
210 /// .body(Full::from("Hallo!"))
211 /// .expect("request builder");
212 ///
213 /// let future = client.request(req);
214 /// # }
215 /// # fn main() {}
216 /// ```
217 pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
218 let is_http_connect = req.method() == Method::CONNECT;
219 match req.version() {
220 Version::HTTP_11 => (),
221 Version::HTTP_10 => {
222 if is_http_connect {
223 warn!("CONNECT is not allowed for HTTP/1.0");
224 return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod)));
225 }
226 }
227 Version::HTTP_2 => (),
228 // completely unsupported HTTP version (like HTTP/0.9)!
229 other => return ResponseFuture::error_version(other),
230 };
231
232 let pool_key = match extract_domain(req.uri_mut(), is_http_connect) {
233 Ok(s) => s,
234 Err(err) => {
235 return ResponseFuture::new(future::err(err));
236 }
237 };
238
239 ResponseFuture::new(self.clone().send_request(req, pool_key))
240 }
241
242 async fn send_request(
243 self,
244 mut req: Request<B>,
245 pool_key: PoolKey,
246 ) -> Result<Response<hyper::body::Incoming>, Error> {
247 let uri = req.uri().clone();
248
249 loop {
250 req = match self.try_send_request(req, pool_key.clone()).await {
251 Ok(resp) => return Ok(resp),
252 Err(TrySendError::Nope(err)) => return Err(err),
253 Err(TrySendError::Retryable {
254 mut req,
255 error,
256 connection_reused,
257 }) => {
258 if !self.config.retry_canceled_requests || !connection_reused {
259 // if client disabled, don't retry
260 // a fresh connection means we definitely can't retry
261 return Err(error);
262 }
263
264 trace!(
265 "unstarted request canceled, trying again (reason={:?})",
266 error
267 );
268 *req.uri_mut() = uri.clone();
269 req
270 }
271 }
272 }
273 }
274
275 async fn try_send_request(
276 &self,
277 mut req: Request<B>,
278 pool_key: PoolKey,
279 ) -> Result<Response<hyper::body::Incoming>, TrySendError<B>> {
280 let mut pooled = self
281 .connection_for(pool_key)
282 .await
283 // `connection_for` already retries checkout errors, so if
284 // it returns an error, there's not much else to retry
285 .map_err(TrySendError::Nope)?;
286
287 req.extensions_mut()
288 .get_mut::<CaptureConnectionExtension>()
289 .map(|conn| conn.set(&pooled.conn_info));
290
291 if pooled.is_http1() {
292 if req.version() == Version::HTTP_2 {
293 warn!("Connection is HTTP/1, but request requires HTTP/2");
294 return Err(TrySendError::Nope(
295 e!(UserUnsupportedVersion).with_connect_info(pooled.conn_info.clone()),
296 ));
297 }
298
299 if self.config.set_host {
300 let uri = req.uri().clone();
301 req.headers_mut().entry(HOST).or_insert_with(|| {
302 let hostname = uri.host().expect("authority implies host");
303 if let Some(port) = get_non_default_port(&uri) {
304 let s = format!("{}:{}", hostname, port);
305 HeaderValue::from_str(&s)
306 } else {
307 HeaderValue::from_str(hostname)
308 }
309 .expect("uri host is valid header value")
310 });
311 }
312
313 // CONNECT always sends authority-form, so check it first...
314 if req.method() == Method::CONNECT {
315 authority_form(req.uri_mut());
316 } else if pooled.conn_info.is_proxied {
317 absolute_form(req.uri_mut());
318 } else {
319 origin_form(req.uri_mut());
320 }
321 } else if req.method() == Method::CONNECT {
322 authority_form(req.uri_mut());
323 }
324
325 let mut res = match pooled.try_send_request(req).await {
326 Ok(res) => res,
327 Err(mut err) => {
328 return if let Some(req) = err.take_message() {
329 Err(TrySendError::Retryable {
330 connection_reused: pooled.is_reused(),
331 error: e!(Canceled, err.into_error())
332 .with_connect_info(pooled.conn_info.clone()),
333 req,
334 })
335 } else {
336 Err(TrySendError::Nope(
337 e!(SendRequest, err.into_error())
338 .with_connect_info(pooled.conn_info.clone()),
339 ))
340 }
341 }
342 };
343
344 // If the Connector included 'extra' info, add to Response...
345 if let Some(extra) = &pooled.conn_info.extra {
346 extra.set(res.extensions_mut());
347 }
348
349 // As of futures@0.1.21, there is a race condition in the mpsc
350 // channel, such that sending when the receiver is closing can
351 // result in the message being stuck inside the queue. It won't
352 // ever notify until the Sender side is dropped.
353 //
354 // To counteract this, we must check if our senders 'want' channel
355 // has been closed after having tried to send. If so, error out...
356 if pooled.is_closed() {
357 return Ok(res);
358 }
359
360 // If pooled is HTTP/2, we can toss this reference immediately.
361 //
362 // when pooled is dropped, it will try to insert back into the
363 // pool. To delay that, spawn a future that completes once the
364 // sender is ready again.
365 //
366 // This *should* only be once the related `Connection` has polled
367 // for a new request to start.
368 //
369 // It won't be ready if there is a body to stream.
370 if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() {
371 drop(pooled);
372 } else if !res.body().is_end_stream() {
373 //let (delayed_tx, delayed_rx) = oneshot::channel::<()>();
374 //res.body_mut().delayed_eof(delayed_rx);
375 let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| {
376 // At this point, `pooled` is dropped, and had a chance
377 // to insert into the pool (if conn was idle)
378 //drop(delayed_tx);
379 });
380
381 self.exec.execute(on_idle);
382 } else {
383 // There's no body to delay, but the connection isn't
384 // ready yet. Only re-insert when it's ready
385 let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ());
386
387 self.exec.execute(on_idle);
388 }
389
390 Ok(res)
391 }
392
393 async fn connection_for(
394 &self,
395 pool_key: PoolKey,
396 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, Error> {
397 loop {
398 match self.one_connection_for(pool_key.clone()).await {
399 Ok(pooled) => return Ok(pooled),
400 Err(ClientConnectError::Normal(err)) => return Err(err),
401 Err(ClientConnectError::CheckoutIsClosed(reason)) => {
402 if !self.config.retry_canceled_requests {
403 return Err(e!(Connect, reason));
404 }
405
406 trace!(
407 "unstarted request canceled, trying again (reason={:?})",
408 reason,
409 );
410 continue;
411 }
412 };
413 }
414 }
415
416 async fn one_connection_for(
417 &self,
418 pool_key: PoolKey,
419 ) -> Result<pool::Pooled<PoolClient<B>, PoolKey>, ClientConnectError> {
420 // Return a single connection if pooling is not enabled
421 if !self.pool.is_enabled() {
422 return self
423 .connect_to(pool_key)
424 .await
425 .map_err(ClientConnectError::Normal);
426 }
427
428 // This actually races 2 different futures to try to get a ready
429 // connection the fastest, and to reduce connection churn.
430 //
431 // - If the pool has an idle connection waiting, that's used
432 // immediately.
433 // - Otherwise, the Connector is asked to start connecting to
434 // the destination Uri.
435 // - Meanwhile, the pool Checkout is watching to see if any other
436 // request finishes and tries to insert an idle connection.
437 // - If a new connection is started, but the Checkout wins after
438 // (an idle connection became available first), the started
439 // connection future is spawned into the runtime to complete,
440 // and then be inserted into the pool as an idle connection.
441 let checkout = self.pool.checkout(pool_key.clone());
442 let connect = self.connect_to(pool_key);
443 let is_ver_h2 = self.config.ver == Ver::Http2;
444
445 // The order of the `select` is depended on below...
446
447 match future::select(checkout, connect).await {
448 // Checkout won, connect future may have been started or not.
449 //
450 // If it has, let it finish and insert back into the pool,
451 // so as to not waste the socket...
452 Either::Left((Ok(checked_out), connecting)) => {
453 // This depends on the `select` above having the correct
454 // order, such that if the checkout future were ready
455 // immediately, the connect future will never have been
456 // started.
457 //
458 // If it *wasn't* ready yet, then the connect future will
459 // have been started...
460 if connecting.started() {
461 let bg = connecting
462 .map_err(|err| {
463 trace!("background connect error: {}", err);
464 })
465 .map(|_pooled| {
466 // dropping here should just place it in
467 // the Pool for us...
468 });
469 // An execute error here isn't important, we're just trying
470 // to prevent a waste of a socket...
471 self.exec.execute(bg);
472 }
473 Ok(checked_out)
474 }
475 // Connect won, checkout can just be dropped.
476 Either::Right((Ok(connected), _checkout)) => Ok(connected),
477 // Either checkout or connect could get canceled:
478 //
479 // 1. Connect is canceled if this is HTTP/2 and there is
480 // an outstanding HTTP/2 connecting task.
481 // 2. Checkout is canceled if the pool cannot deliver an
482 // idle connection reliably.
483 //
484 // In both cases, we should just wait for the other future.
485 Either::Left((Err(err), connecting)) => {
486 if err.is_canceled() {
487 connecting.await.map_err(ClientConnectError::Normal)
488 } else {
489 Err(ClientConnectError::Normal(e!(Connect, err)))
490 }
491 }
492 Either::Right((Err(err), checkout)) => {
493 if err.is_canceled() {
494 checkout.await.map_err(move |err| {
495 if is_ver_h2 && err.is_canceled() {
496 ClientConnectError::CheckoutIsClosed(err)
497 } else {
498 ClientConnectError::Normal(e!(Connect, err))
499 }
500 })
501 } else {
502 Err(ClientConnectError::Normal(err))
503 }
504 }
505 }
506 }
507
508 #[cfg(any(feature = "http1", feature = "http2"))]
509 fn connect_to(
510 &self,
511 pool_key: PoolKey,
512 ) -> impl Lazy<Output = Result<pool::Pooled<PoolClient<B>, PoolKey>, Error>> + Send + Unpin
513 {
514 let executor = self.exec.clone();
515 let pool = self.pool.clone();
516 #[cfg(feature = "http1")]
517 let h1_builder = self.h1_builder.clone();
518 #[cfg(feature = "http2")]
519 let h2_builder = self.h2_builder.clone();
520 let ver = self.config.ver;
521 let is_ver_h2 = ver == Ver::Http2;
522 let connector = self.connector.clone();
523 let dst = domain_as_uri(pool_key.clone());
524 hyper_lazy(move || {
525 // Try to take a "connecting lock".
526 //
527 // If the pool_key is for HTTP/2, and there is already a
528 // connection being established, then this can't take a
529 // second lock. The "connect_to" future is Canceled.
530 let connecting = match pool.connecting(&pool_key, ver) {
531 Some(lock) => lock,
532 None => {
533 let canceled = e!(Canceled);
534 // TODO
535 //crate::Error::new_canceled().with("HTTP/2 connection in progress");
536 return Either::Right(future::err(canceled));
537 }
538 };
539 Either::Left(
540 connector
541 .connect(super::connect::sealed::Internal, dst)
542 .map_err(|src| e!(Connect, src))
543 .and_then(move |io| {
544 let connected = io.connected();
545 // If ALPN is h2 and we aren't http2_only already,
546 // then we need to convert our pool checkout into
547 // a single HTTP2 one.
548 let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 {
549 match connecting.alpn_h2(&pool) {
550 Some(lock) => {
551 trace!("ALPN negotiated h2, updating pool");
552 lock
553 }
554 None => {
555 // Another connection has already upgraded,
556 // the pool checkout should finish up for us.
557 let canceled = e!(Canceled, "ALPN upgraded to HTTP/2");
558 return Either::Right(future::err(canceled));
559 }
560 }
561 } else {
562 connecting
563 };
564
565 #[cfg_attr(not(feature = "http2"), allow(unused))]
566 let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2;
567
568 Either::Left(Box::pin(async move {
569 let tx = if is_h2 {
570 #[cfg(feature = "http2")] {
571 let (mut tx, conn) =
572 h2_builder.handshake(io).await.map_err(Error::tx)?;
573
574 trace!(
575 "http2 handshake complete, spawning background dispatcher task"
576 );
577 executor.execute(
578 conn.map_err(|e| debug!("client connection error: {}", e))
579 .map(|_| ()),
580 );
581
582 // Wait for 'conn' to ready up before we
583 // declare this tx as usable
584 tx.ready().await.map_err(Error::tx)?;
585 PoolTx::Http2(tx)
586 }
587 #[cfg(not(feature = "http2"))]
588 panic!("http2 feature is not enabled");
589 } else {
590 #[cfg(feature = "http1")] {
591 let (mut tx, conn) =
592 h1_builder.handshake(io).await.map_err(Error::tx)?;
593
594 trace!(
595 "http1 handshake complete, spawning background dispatcher task"
596 );
597 executor.execute(
598 conn.with_upgrades()
599 .map_err(|e| debug!("client connection error: {}", e))
600 .map(|_| ()),
601 );
602
603 // Wait for 'conn' to ready up before we
604 // declare this tx as usable
605 tx.ready().await.map_err(Error::tx)?;
606 PoolTx::Http1(tx)
607 }
608 #[cfg(not(feature = "http1"))] {
609 panic!("http1 feature is not enabled");
610 }
611 };
612
613 Ok(pool.pooled(
614 connecting,
615 PoolClient {
616 conn_info: connected,
617 tx,
618 },
619 ))
620 }))
621 }),
622 )
623 })
624 }
625}
626
627impl<C, B> tower_service::Service<Request<B>> for Client<C, B>
628where
629 C: Connect + Clone + Send + Sync + 'static,
630 B: Body + Send + 'static + Unpin,
631 B::Data: Send,
632 B::Error: Into<Box<dyn StdError + Send + Sync>>,
633{
634 type Response = Response<hyper::body::Incoming>;
635 type Error = Error;
636 type Future = ResponseFuture;
637
638 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
639 Poll::Ready(Ok(()))
640 }
641
642 fn call(&mut self, req: Request<B>) -> Self::Future {
643 self.request(req)
644 }
645}
646
647impl<C, B> tower_service::Service<Request<B>> for &'_ Client<C, B>
648where
649 C: Connect + Clone + Send + Sync + 'static,
650 B: Body + Send + 'static + Unpin,
651 B::Data: Send,
652 B::Error: Into<Box<dyn StdError + Send + Sync>>,
653{
654 type Response = Response<hyper::body::Incoming>;
655 type Error = Error;
656 type Future = ResponseFuture;
657
658 fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
659 Poll::Ready(Ok(()))
660 }
661
662 fn call(&mut self, req: Request<B>) -> Self::Future {
663 self.request(req)
664 }
665}
666
667impl<C: Clone, B> Clone for Client<C, B> {
668 fn clone(&self) -> Client<C, B> {
669 Client {
670 config: self.config,
671 exec: self.exec.clone(),
672 #[cfg(feature = "http1")]
673 h1_builder: self.h1_builder.clone(),
674 #[cfg(feature = "http2")]
675 h2_builder: self.h2_builder.clone(),
676 connector: self.connector.clone(),
677 pool: self.pool.clone(),
678 }
679 }
680}
681
682impl<C, B> fmt::Debug for Client<C, B> {
683 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
684 f.debug_struct("Client").finish()
685 }
686}
687
688// ===== impl ResponseFuture =====
689
690impl ResponseFuture {
691 fn new<F>(value: F) -> Self
692 where
693 F: Future<Output = Result<Response<hyper::body::Incoming>, Error>> + Send + 'static,
694 {
695 Self {
696 inner: SyncWrapper::new(Box::pin(value)),
697 }
698 }
699
700 fn error_version(ver: Version) -> Self {
701 warn!("Request has unsupported version \"{:?}\"", ver);
702 ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion))))
703 }
704}
705
706impl fmt::Debug for ResponseFuture {
707 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
708 f.pad("Future<Response>")
709 }
710}
711
712impl Future for ResponseFuture {
713 type Output = Result<Response<hyper::body::Incoming>, Error>;
714
715 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
716 self.inner.get_mut().as_mut().poll(cx)
717 }
718}
719
720// ===== impl PoolClient =====
721
722// FIXME: allow() required due to `impl Trait` leaking types to this lint
723#[allow(missing_debug_implementations)]
724struct PoolClient<B> {
725 conn_info: Connected,
726 tx: PoolTx<B>,
727}
728
729enum PoolTx<B> {
730 #[cfg(feature = "http1")]
731 Http1(hyper::client::conn::http1::SendRequest<B>),
732 #[cfg(feature = "http2")]
733 Http2(hyper::client::conn::http2::SendRequest<B>),
734}
735
736impl<B> PoolClient<B> {
737 fn poll_ready(
738 &mut self,
739 #[allow(unused_variables)] cx: &mut task::Context<'_>,
740 ) -> Poll<Result<(), Error>> {
741 match self.tx {
742 #[cfg(feature = "http1")]
743 PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(Error::closed),
744 #[cfg(feature = "http2")]
745 PoolTx::Http2(_) => Poll::Ready(Ok(())),
746 }
747 }
748
749 fn is_http1(&self) -> bool {
750 !self.is_http2()
751 }
752
753 fn is_http2(&self) -> bool {
754 match self.tx {
755 #[cfg(feature = "http1")]
756 PoolTx::Http1(_) => false,
757 #[cfg(feature = "http2")]
758 PoolTx::Http2(_) => true,
759 }
760 }
761
762 fn is_poisoned(&self) -> bool {
763 self.conn_info.poisoned.poisoned()
764 }
765
766 fn is_ready(&self) -> bool {
767 match self.tx {
768 #[cfg(feature = "http1")]
769 PoolTx::Http1(ref tx) => tx.is_ready(),
770 #[cfg(feature = "http2")]
771 PoolTx::Http2(ref tx) => tx.is_ready(),
772 }
773 }
774
775 fn is_closed(&self) -> bool {
776 match self.tx {
777 #[cfg(feature = "http1")]
778 PoolTx::Http1(ref tx) => tx.is_closed(),
779 #[cfg(feature = "http2")]
780 PoolTx::Http2(ref tx) => tx.is_closed(),
781 }
782 }
783}
784
785impl<B: Body + 'static> PoolClient<B> {
786 fn try_send_request(
787 &mut self,
788 req: Request<B>,
789 ) -> impl Future<Output = Result<Response<hyper::body::Incoming>, ConnTrySendError<Request<B>>>>
790 where
791 B: Send,
792 {
793 #[cfg(all(feature = "http1", feature = "http2"))]
794 return match self.tx {
795 #[cfg(feature = "http1")]
796 PoolTx::Http1(ref mut tx) => Either::Left(tx.try_send_request(req)),
797 #[cfg(feature = "http2")]
798 PoolTx::Http2(ref mut tx) => Either::Right(tx.try_send_request(req)),
799 };
800
801 #[cfg(feature = "http1")]
802 #[cfg(not(feature = "http2"))]
803 return match self.tx {
804 #[cfg(feature = "http1")]
805 PoolTx::Http1(ref mut tx) => tx.try_send_request(req),
806 };
807
808 #[cfg(not(feature = "http1"))]
809 #[cfg(feature = "http2")]
810 return match self.tx {
811 #[cfg(feature = "http2")]
812 PoolTx::Http2(ref mut tx) => tx.try_send_request(req),
813 };
814 }
815}
816
817impl<B> pool::Poolable for PoolClient<B>
818where
819 B: Send + 'static,
820{
821 fn is_open(&self) -> bool {
822 !self.is_poisoned() && self.is_ready()
823 }
824
825 fn reserve(self) -> pool::Reservation<Self> {
826 match self.tx {
827 #[cfg(feature = "http1")]
828 PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient {
829 conn_info: self.conn_info,
830 tx: PoolTx::Http1(tx),
831 }),
832 #[cfg(feature = "http2")]
833 PoolTx::Http2(tx) => {
834 let b = PoolClient {
835 conn_info: self.conn_info.clone(),
836 tx: PoolTx::Http2(tx.clone()),
837 };
838 let a = PoolClient {
839 conn_info: self.conn_info,
840 tx: PoolTx::Http2(tx),
841 };
842 pool::Reservation::Shared(a, b)
843 }
844 }
845 }
846
847 fn can_share(&self) -> bool {
848 self.is_http2()
849 }
850}
851
852enum ClientConnectError {
853 Normal(Error),
854 CheckoutIsClosed(pool::Error),
855}
856
857fn origin_form(uri: &mut Uri) {
858 let path = match uri.path_and_query() {
859 Some(path) if path.as_str() != "/" => {
860 let mut parts = ::http::uri::Parts::default();
861 parts.path_and_query = Some(path.clone());
862 Uri::from_parts(parts).expect("path is valid uri")
863 }
864 _none_or_just_slash => {
865 debug_assert!(Uri::default() == "/");
866 Uri::default()
867 }
868 };
869 *uri = path
870}
871
872fn absolute_form(uri: &mut Uri) {
873 debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme");
874 debug_assert!(
875 uri.authority().is_some(),
876 "absolute_form needs an authority"
877 );
878 // If the URI is to HTTPS, and the connector claimed to be a proxy,
879 // then it *should* have tunneled, and so we don't want to send
880 // absolute-form in that case.
881 if uri.scheme() == Some(&Scheme::HTTPS) {
882 origin_form(uri);
883 }
884}
885
886fn authority_form(uri: &mut Uri) {
887 if let Some(path) = uri.path_and_query() {
888 // `https://hyper.rs` would parse with `/` path, don't
889 // annoy people about that...
890 if path != "/" {
891 warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path);
892 }
893 }
894 *uri = match uri.authority() {
895 Some(auth) => {
896 let mut parts = ::http::uri::Parts::default();
897 parts.authority = Some(auth.clone());
898 Uri::from_parts(parts).expect("authority is valid")
899 }
900 None => {
901 unreachable!("authority_form with relative uri");
902 }
903 };
904}
905
906fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result<PoolKey, Error> {
907 let uri_clone = uri.clone();
908 match (uri_clone.scheme(), uri_clone.authority()) {
909 (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())),
910 (None, Some(auth)) if is_http_connect => {
911 let scheme = match auth.port_u16() {
912 Some(443) => {
913 set_scheme(uri, Scheme::HTTPS);
914 Scheme::HTTPS
915 }
916 _ => {
917 set_scheme(uri, Scheme::HTTP);
918 Scheme::HTTP
919 }
920 };
921 Ok((scheme, auth.clone()))
922 }
923 _ => {
924 debug!("Client requires absolute-form URIs, received: {:?}", uri);
925 Err(e!(UserAbsoluteUriRequired))
926 }
927 }
928}
929
930fn domain_as_uri((scheme, auth): PoolKey) -> Uri {
931 http::uri::Builder::new()
932 .scheme(scheme)
933 .authority(auth)
934 .path_and_query("/")
935 .build()
936 .expect("domain is valid Uri")
937}
938
939fn set_scheme(uri: &mut Uri, scheme: Scheme) {
940 debug_assert!(
941 uri.scheme().is_none(),
942 "set_scheme expects no existing scheme"
943 );
944 let old = std::mem::take(uri);
945 let mut parts: ::http::uri::Parts = old.into();
946 parts.scheme = Some(scheme);
947 parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
948 *uri = Uri::from_parts(parts).expect("scheme is valid");
949}
950
951fn get_non_default_port(uri: &Uri) -> Option<http::uri::Port<&str>> {
952 match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) {
953 (Some(443), true) => None,
954 (Some(80), false) => None,
955 _ => uri.port(),
956 }
957}
958
959fn is_schema_secure(uri: &Uri) -> bool {
960 uri.scheme_str()
961 .map(|scheme_str| matches!(scheme_str, "wss" | "https"))
962 .unwrap_or_default()
963}
964
965/// A builder to configure a new [`Client`](Client).
966///
967/// # Example
968///
969/// ```
970/// # #[cfg(feature = "tokio")]
971/// # fn run () {
972/// use std::time::Duration;
973/// use miku_hyper_util::client::legacy::Client;
974/// use miku_hyper_util::rt::TokioExecutor;
975///
976/// let client = Client::builder(TokioExecutor::new())
977/// .pool_idle_timeout(Duration::from_secs(30))
978/// .http2_only(true)
979/// .build_http();
980/// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
981/// # drop(infer);
982/// # }
983/// # fn main() {}
984/// ```
985#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))]
986#[derive(Clone)]
987pub struct Builder {
988 client_config: Config,
989 exec: Exec,
990 #[cfg(feature = "http1")]
991 h1_builder: hyper::client::conn::http1::Builder,
992 #[cfg(feature = "http2")]
993 h2_builder: hyper::client::conn::http2::Builder<Exec>,
994 pool_config: pool::Config,
995 pool_timer: Option<timer::Timer>,
996}
997
998impl Builder {
999 /// Construct a new Builder.
1000 pub fn new<E>(executor: E) -> Self
1001 where
1002 E: hyper::rt::Executor<BoxSendFuture> + Send + Sync + Clone + 'static,
1003 {
1004 let exec = Exec::new(executor);
1005 Self {
1006 client_config: Config {
1007 retry_canceled_requests: true,
1008 set_host: true,
1009 ver: Ver::Auto,
1010 },
1011 exec: exec.clone(),
1012 #[cfg(feature = "http1")]
1013 h1_builder: hyper::client::conn::http1::Builder::new(),
1014 #[cfg(feature = "http2")]
1015 h2_builder: hyper::client::conn::http2::Builder::new(exec),
1016 pool_config: pool::Config {
1017 idle_timeout: Some(Duration::from_secs(90)),
1018 max_idle_per_host: usize::MAX,
1019 },
1020 pool_timer: None,
1021 }
1022 }
1023 /// Set an optional timeout for idle sockets being kept-alive.
1024 /// A `Timer` is required for this to take effect. See `Builder::pool_timer`
1025 ///
1026 /// Pass `None` to disable timeout.
1027 ///
1028 /// Default is 90 seconds.
1029 ///
1030 /// # Example
1031 ///
1032 /// ```
1033 /// # #[cfg(feature = "tokio")]
1034 /// # fn run () {
1035 /// use std::time::Duration;
1036 /// use miku_hyper_util::client::legacy::Client;
1037 /// use miku_hyper_util::rt::{TokioExecutor, TokioTimer};
1038 ///
1039 /// let client = Client::builder(TokioExecutor::new())
1040 /// .pool_idle_timeout(Duration::from_secs(30))
1041 /// .pool_timer(TokioTimer::new())
1042 /// .build_http();
1043 ///
1044 /// # let infer: Client<_, http_body_util::Full<bytes::Bytes>> = client;
1045 /// # }
1046 /// # fn main() {}
1047 /// ```
1048 pub fn pool_idle_timeout<D>(&mut self, val: D) -> &mut Self
1049 where
1050 D: Into<Option<Duration>>,
1051 {
1052 self.pool_config.idle_timeout = val.into();
1053 self
1054 }
1055
1056 #[doc(hidden)]
1057 #[deprecated(note = "renamed to `pool_max_idle_per_host`")]
1058 pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1059 self.pool_config.max_idle_per_host = max_idle;
1060 self
1061 }
1062
1063 /// Sets the maximum idle connection per host allowed in the pool.
1064 ///
1065 /// Default is `usize::MAX` (no limit).
1066 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
1067 self.pool_config.max_idle_per_host = max_idle;
1068 self
1069 }
1070
1071 // HTTP/1 options
1072
1073 /// Sets the exact size of the read buffer to *always* use.
1074 ///
1075 /// Note that setting this option unsets the `http1_max_buf_size` option.
1076 ///
1077 /// Default is an adaptive read buffer.
1078 #[cfg(feature = "http1")]
1079 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1080 pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self {
1081 self.h1_builder.read_buf_exact_size(Some(sz));
1082 self
1083 }
1084
1085 /// Set the maximum buffer size for the connection.
1086 ///
1087 /// Default is ~400kb.
1088 ///
1089 /// Note that setting this option unsets the `http1_read_exact_buf_size` option.
1090 ///
1091 /// # Panics
1092 ///
1093 /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum.
1094 #[cfg(feature = "http1")]
1095 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1096 pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self {
1097 self.h1_builder.max_buf_size(max);
1098 self
1099 }
1100
1101 /// Set whether HTTP/1 connections will accept spaces between header names
1102 /// and the colon that follow them in responses.
1103 ///
1104 /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when
1105 /// parsing.
1106 ///
1107 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1108 /// to say about it:
1109 ///
1110 /// > No whitespace is allowed between the header field-name and colon. In
1111 /// > the past, differences in the handling of such whitespace have led to
1112 /// > security vulnerabilities in request routing and response handling. A
1113 /// > server MUST reject any received request message that contains
1114 /// > whitespace between a header field-name and colon with a response code
1115 /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a
1116 /// > response message before forwarding the message downstream.
1117 ///
1118 /// Note that this setting does not affect HTTP/2.
1119 ///
1120 /// Default is false.
1121 ///
1122 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1123 #[cfg(feature = "http1")]
1124 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1125 pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self {
1126 self.h1_builder
1127 .allow_spaces_after_header_name_in_responses(val);
1128 self
1129 }
1130
1131 /// Set whether HTTP/1 connections will accept obsolete line folding for
1132 /// header values.
1133 ///
1134 /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has
1135 /// to say about it:
1136 ///
1137 /// > A server that receives an obs-fold in a request message that is not
1138 /// > within a message/http container MUST either reject the message by
1139 /// > sending a 400 (Bad Request), preferably with a representation
1140 /// > explaining that obsolete line folding is unacceptable, or replace
1141 /// > each received obs-fold with one or more SP octets prior to
1142 /// > interpreting the field value or forwarding the message downstream.
1143 ///
1144 /// > A proxy or gateway that receives an obs-fold in a response message
1145 /// > that is not within a message/http container MUST either discard the
1146 /// > message and replace it with a 502 (Bad Gateway) response, preferably
1147 /// > with a representation explaining that unacceptable line folding was
1148 /// > received, or replace each received obs-fold with one or more SP
1149 /// > octets prior to interpreting the field value or forwarding the
1150 /// > message downstream.
1151 ///
1152 /// > A user agent that receives an obs-fold in a response message that is
1153 /// > not within a message/http container MUST replace each received
1154 /// > obs-fold with one or more SP octets prior to interpreting the field
1155 /// > value.
1156 ///
1157 /// Note that this setting does not affect HTTP/2.
1158 ///
1159 /// Default is false.
1160 ///
1161 /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4
1162 #[cfg(feature = "http1")]
1163 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1164 pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self {
1165 self.h1_builder
1166 .allow_obsolete_multiline_headers_in_responses(val);
1167 self
1168 }
1169
1170 /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses.
1171 ///
1172 /// This mimics the behaviour of major browsers. You probably don't want this.
1173 /// You should only want this if you are implementing a proxy whose main
1174 /// purpose is to sit in front of browsers whose users access arbitrary content
1175 /// which may be malformed, and they expect everything that works without
1176 /// the proxy to keep working with the proxy.
1177 ///
1178 /// This option will prevent Hyper's client from returning an error encountered
1179 /// when parsing a header, except if the error was caused by the character NUL
1180 /// (ASCII code 0), as Chrome specifically always reject those.
1181 ///
1182 /// The ignorable errors are:
1183 /// * empty header names;
1184 /// * characters that are not allowed in header names, except for `\0` and `\r`;
1185 /// * when `allow_spaces_after_header_name_in_responses` is not enabled,
1186 /// spaces and tabs between the header name and the colon;
1187 /// * missing colon between header name and colon;
1188 /// * characters that are not allowed in header values except for `\0` and `\r`.
1189 ///
1190 /// If an ignorable error is encountered, the parser tries to find the next
1191 /// line in the input to resume parsing the rest of the headers. An error
1192 /// will be emitted nonetheless if it finds `\0` or a lone `\r` while
1193 /// looking for the next line.
1194 #[cfg(feature = "http1")]
1195 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1196 pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder {
1197 self.h1_builder.ignore_invalid_headers_in_responses(val);
1198 self
1199 }
1200
1201 /// Set whether HTTP/1 connections should try to use vectored writes,
1202 /// or always flatten into a single buffer.
1203 ///
1204 /// Note that setting this to false may mean more copies of body data,
1205 /// but may also improve performance when an IO transport doesn't
1206 /// support vectored writes well, such as most TLS implementations.
1207 ///
1208 /// Setting this to true will force hyper to use queued strategy
1209 /// which may eliminate unnecessary cloning on some TLS backends
1210 ///
1211 /// Default is `auto`. In this mode hyper will try to guess which
1212 /// mode to use
1213 #[cfg(feature = "http1")]
1214 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1215 pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder {
1216 self.h1_builder.writev(enabled);
1217 self
1218 }
1219
1220 /// Set whether HTTP/1 connections will write header names as title case at
1221 /// the socket level.
1222 ///
1223 /// Note that this setting does not affect HTTP/2.
1224 ///
1225 /// Default is false.
1226 #[cfg(feature = "http1")]
1227 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1228 pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
1229 self.h1_builder.title_case_headers(val);
1230 self
1231 }
1232
1233 /// Set whether to support preserving original header cases.
1234 ///
1235 /// Currently, this will record the original cases received, and store them
1236 /// in a private extension on the `Response`. It will also look for and use
1237 /// such an extension in any provided `Request`.
1238 ///
1239 /// Since the relevant extension is still private, there is no way to
1240 /// interact with the original cases. The only effect this can have now is
1241 /// to forward the cases in a proxy-like fashion.
1242 ///
1243 /// Note that this setting does not affect HTTP/2.
1244 ///
1245 /// Default is false.
1246 #[cfg(feature = "http1")]
1247 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1248 pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self {
1249 self.h1_builder.preserve_header_case(val);
1250 self
1251 }
1252
1253 /// Set the maximum number of headers.
1254 ///
1255 /// When a response is received, the parser will reserve a buffer to store headers for optimal
1256 /// performance.
1257 ///
1258 /// If client receives more headers than the buffer size, the error "message header too large"
1259 /// is returned.
1260 ///
1261 /// The headers is allocated on the stack by default, which has higher performance. After
1262 /// setting this value, headers will be allocated in heap memory, that is, heap memory
1263 /// allocation will occur for each response, and there will be a performance drop of about 5%.
1264 ///
1265 /// Note that this setting does not affect HTTP/2.
1266 ///
1267 /// Default is 100.
1268 #[cfg(feature = "http1")]
1269 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1270 pub fn http1_max_headers(&mut self, val: usize) -> &mut Self {
1271 self.h1_builder.max_headers(val);
1272 self
1273 }
1274
1275 /// Set whether HTTP/0.9 responses should be tolerated.
1276 ///
1277 /// Default is false.
1278 #[cfg(feature = "http1")]
1279 #[cfg_attr(docsrs, doc(cfg(feature = "http1")))]
1280 pub fn http09_responses(&mut self, val: bool) -> &mut Self {
1281 self.h1_builder.http09_responses(val);
1282 self
1283 }
1284
1285 /// Set whether the connection **must** use HTTP/2.
1286 ///
1287 /// The destination must either allow HTTP2 Prior Knowledge, or the
1288 /// `Connect` should be configured to do use ALPN to upgrade to `h2`
1289 /// as part of the connection process. This will not make the `Client`
1290 /// utilize ALPN by itself.
1291 ///
1292 /// Note that setting this to true prevents HTTP/1 from being allowed.
1293 ///
1294 /// Default is false.
1295 #[cfg(feature = "http2")]
1296 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1297 pub fn http2_only(&mut self, val: bool) -> &mut Self {
1298 self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto };
1299 self
1300 }
1301
1302 /// Configures the maximum number of pending reset streams allowed before a GOAWAY will be sent.
1303 ///
1304 /// This will default to the default value set by the [`h2` crate](https://crates.io/crates/h2).
1305 /// As of v0.4.0, it is 20.
1306 ///
1307 /// See <https://github.com/hyperium/hyper/issues/2877> for more information.
1308 #[cfg(feature = "http2")]
1309 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1310 pub fn http2_max_pending_accept_reset_streams(
1311 &mut self,
1312 max: impl Into<Option<usize>>,
1313 ) -> &mut Self {
1314 self.h2_builder.max_pending_accept_reset_streams(max.into());
1315 self
1316 }
1317
1318 /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2
1319 /// stream-level flow control.
1320 ///
1321 /// Passing `None` will do nothing.
1322 ///
1323 /// If not set, hyper will use a default.
1324 ///
1325 /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE
1326 #[cfg(feature = "http2")]
1327 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1328 pub fn http2_initial_stream_window_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1329 self.h2_builder.initial_stream_window_size(sz.into());
1330 self
1331 }
1332
1333 /// Sets the max connection-level flow control for HTTP2
1334 ///
1335 /// Passing `None` will do nothing.
1336 ///
1337 /// If not set, hyper will use a default.
1338 #[cfg(feature = "http2")]
1339 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1340 pub fn http2_initial_connection_window_size(
1341 &mut self,
1342 sz: impl Into<Option<u32>>,
1343 ) -> &mut Self {
1344 self.h2_builder.initial_connection_window_size(sz.into());
1345 self
1346 }
1347
1348 /// Sets the initial maximum of locally initiated (send) streams.
1349 ///
1350 /// This value will be overwritten by the value included in the initial
1351 /// SETTINGS frame received from the peer as part of a [connection preface].
1352 ///
1353 /// Passing `None` will do nothing.
1354 ///
1355 /// If not set, hyper will use a default.
1356 ///
1357 /// [connection preface]: https://httpwg.org/specs/rfc9113.html#preface
1358 #[cfg(feature = "http2")]
1359 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1360 pub fn http2_initial_max_send_streams(
1361 &mut self,
1362 initial: impl Into<Option<usize>>,
1363 ) -> &mut Self {
1364 self.h2_builder.initial_max_send_streams(initial);
1365 self
1366 }
1367
1368 /// Sets whether to use an adaptive flow control.
1369 ///
1370 /// Enabling this will override the limits set in
1371 /// `http2_initial_stream_window_size` and
1372 /// `http2_initial_connection_window_size`.
1373 #[cfg(feature = "http2")]
1374 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1375 pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self {
1376 self.h2_builder.adaptive_window(enabled);
1377 self
1378 }
1379
1380 /// Sets the maximum frame size to use for HTTP2.
1381 ///
1382 /// Passing `None` will do nothing.
1383 ///
1384 /// If not set, hyper will use a default.
1385 #[cfg(feature = "http2")]
1386 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1387 pub fn http2_max_frame_size(&mut self, sz: impl Into<Option<u32>>) -> &mut Self {
1388 self.h2_builder.max_frame_size(sz);
1389 self
1390 }
1391
1392 /// Sets the max size of received header frames for HTTP2.
1393 ///
1394 /// Default is currently 16KB, but can change.
1395 #[cfg(feature = "http2")]
1396 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1397 pub fn http2_max_header_list_size(&mut self, max: u32) -> &mut Self {
1398 self.h2_builder.max_header_list_size(max);
1399 self
1400 }
1401
1402 /// Sets an interval for HTTP2 Ping frames should be sent to keep a
1403 /// connection alive.
1404 ///
1405 /// Pass `None` to disable HTTP2 keep-alive.
1406 ///
1407 /// Default is currently disabled.
1408 ///
1409 /// # Cargo Feature
1410 ///
1411 /// Requires the `tokio` cargo feature to be enabled.
1412 #[cfg(feature = "tokio")]
1413 #[cfg(feature = "http2")]
1414 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1415 pub fn http2_keep_alive_interval(
1416 &mut self,
1417 interval: impl Into<Option<Duration>>,
1418 ) -> &mut Self {
1419 self.h2_builder.keep_alive_interval(interval);
1420 self
1421 }
1422
1423 /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
1424 ///
1425 /// If the ping is not acknowledged within the timeout, the connection will
1426 /// be closed. Does nothing if `http2_keep_alive_interval` is disabled.
1427 ///
1428 /// Default is 20 seconds.
1429 ///
1430 /// # Cargo Feature
1431 ///
1432 /// Requires the `tokio` cargo feature to be enabled.
1433 #[cfg(feature = "tokio")]
1434 #[cfg(feature = "http2")]
1435 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1436 pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self {
1437 self.h2_builder.keep_alive_timeout(timeout);
1438 self
1439 }
1440
1441 /// Sets whether HTTP2 keep-alive should apply while the connection is idle.
1442 ///
1443 /// If disabled, keep-alive pings are only sent while there are open
1444 /// request/responses streams. If enabled, pings are also sent when no
1445 /// streams are active. Does nothing if `http2_keep_alive_interval` is
1446 /// disabled.
1447 ///
1448 /// Default is `false`.
1449 ///
1450 /// # Cargo Feature
1451 ///
1452 /// Requires the `tokio` cargo feature to be enabled.
1453 #[cfg(feature = "tokio")]
1454 #[cfg(feature = "http2")]
1455 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1456 pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self {
1457 self.h2_builder.keep_alive_while_idle(enabled);
1458 self
1459 }
1460
1461 /// Sets the maximum number of HTTP2 concurrent locally reset streams.
1462 ///
1463 /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more
1464 /// details.
1465 ///
1466 /// The default value is determined by the `h2` crate.
1467 ///
1468 /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams
1469 #[cfg(feature = "http2")]
1470 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1471 pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self {
1472 self.h2_builder.max_concurrent_reset_streams(max);
1473 self
1474 }
1475
1476 /// Sets the header table size.
1477 ///
1478 /// This setting informs the peer of the maximum size of the header compression
1479 /// table used to encode header blocks, in octets. The encoder may select any value
1480 /// equal to or less than the header table size specified by the sender.
1481 ///
1482 /// The default value of crate `h2` is 4,096.
1483 #[cfg(feature = "http2")]
1484 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1485 pub fn http2_header_table_size(&mut self, size: impl Into<Option<u32>>) -> &mut Self {
1486 self.h2_builder.header_table_size(size);
1487 self
1488 }
1489
1490 /// Enables or disables server push promises.
1491 ///
1492 /// This value is included in the initial SETTINGS handshake.
1493 /// Setting this value to value to
1494 /// false in the initial SETTINGS handshake guarantees that the remote server
1495 /// will never send a push promise.
1496 ///
1497 /// This setting can be changed during the life of a single HTTP/2
1498 /// connection by sending another settings frame updating the value.
1499 ///
1500 /// Default value of crate `h2`: `true`.
1501 #[cfg(feature = "http2")]
1502 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1503 pub fn http2_enable_push(&mut self, enabled: bool) -> &mut Self {
1504 self.h2_builder.enable_push(enabled);
1505 self
1506 }
1507
1508 /// Sets the maximum number of concurrent streams.
1509 ///
1510 /// The maximum concurrent streams setting only controls the maximum number
1511 /// of streams that can be initiated by the remote peer. In other words,
1512 /// when this setting is set to 100, this does not limit the number of
1513 /// concurrent streams that can be created by the caller.
1514 ///
1515 /// It is recommended that this value be no smaller than 100, so as to not
1516 /// unnecessarily limit parallelism. However, any value is legal, including
1517 /// 0. If `max` is set to 0, then the remote will not be permitted to
1518 /// initiate streams.
1519 ///
1520 /// Note that streams in the reserved state, i.e., push promises that have
1521 /// been reserved but the stream has not started, do not count against this
1522 /// setting.
1523 ///
1524 /// Also note that if the remote *does* exceed the value set here, it is not
1525 /// a protocol level error. Instead, the `h2` library will immediately reset
1526 /// the stream.
1527 ///
1528 /// See [Section 5.1.2] in the HTTP/2 spec for more details.
1529 ///
1530 /// [Section 5.1.2]: https://http2.github.io/http2-spec/#rfc.section.5.1.2
1531 #[cfg(feature = "http2")]
1532 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1533 pub fn http2_max_concurrent_streams(&mut self, max: impl Into<Option<u32>>) -> &mut Self {
1534 self.h2_builder.max_concurrent_streams(max);
1535 self
1536 }
1537
1538 /// Sets the `Headers` frame pseudo order.
1539 ///
1540 /// The pseudo header order setting controls the order of pseudo headers in the
1541 /// serialized HTTP/2 headers. The default value is `None`, which means that
1542 /// we let the `miku-h2` crate decide the order.
1543 #[cfg(feature = "http2")]
1544 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1545 pub fn http2_headers_frame_pseudo_order(
1546 &mut self,
1547 order: impl Into<Option<&'static [PseudoType; 4]>>,
1548 ) -> &mut Self {
1549 self.h2_builder.headers_frame_pseudo_order(order);
1550 self
1551 }
1552
1553 /// Sets the `Headers` frame priority.
1554 #[cfg(feature = "http2")]
1555 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1556 pub fn headers_frame_priority(
1557 &mut self,
1558 priority: impl Into<Option<FrameStreamDependency>>,
1559 ) -> &mut Self {
1560 self.h2_builder.headers_frame_priority(priority);
1561 self
1562 }
1563
1564 /// Sets the `Priority` frames (settings) for virtual streams.
1565 #[cfg(feature = "http2")]
1566 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1567 pub fn virtual_streams_priorities(
1568 &mut self,
1569 priorities: impl Into<Option<&'static [FramePriority]>>,
1570 ) -> &mut Self {
1571 self.h2_builder.virtual_streams_priorities(priorities);
1572 self
1573 }
1574
1575 /// Provide a timer to be used for h2
1576 ///
1577 /// See the documentation of [`h2::client::Builder::timer`] for more
1578 /// details.
1579 ///
1580 /// [`h2::client::Builder::timer`]: https://docs.rs/h2/client/struct.Builder.html#method.timer
1581 pub fn timer<M>(&mut self, timer: M) -> &mut Self
1582 where
1583 M: Timer + Send + Sync + 'static,
1584 {
1585 #[cfg(feature = "http2")]
1586 self.h2_builder.timer(timer);
1587 self
1588 }
1589
1590 /// Provide a timer to be used for timeouts and intervals in connection pools.
1591 pub fn pool_timer<M>(&mut self, timer: M) -> &mut Self
1592 where
1593 M: Timer + Clone + Send + Sync + 'static,
1594 {
1595 self.pool_timer = Some(timer::Timer::new(timer.clone()));
1596 self
1597 }
1598
1599 /// Set the maximum write buffer size for each HTTP/2 stream.
1600 ///
1601 /// Default is currently 1MB, but may change.
1602 ///
1603 /// # Panics
1604 ///
1605 /// The value must be no larger than `u32::MAX`.
1606 #[cfg(feature = "http2")]
1607 #[cfg_attr(docsrs, doc(cfg(feature = "http2")))]
1608 pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self {
1609 self.h2_builder.max_send_buf_size(max);
1610 self
1611 }
1612
1613 /// Set whether to retry requests that get disrupted before ever starting
1614 /// to write.
1615 ///
1616 /// This means a request that is queued, and gets given an idle, reused
1617 /// connection, and then encounters an error immediately as the idle
1618 /// connection was found to be unusable.
1619 ///
1620 /// When this is set to `false`, the related `ResponseFuture` would instead
1621 /// resolve to an `Error::Cancel`.
1622 ///
1623 /// Default is `true`.
1624 #[inline]
1625 pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
1626 self.client_config.retry_canceled_requests = val;
1627 self
1628 }
1629
1630 /// Set whether to automatically add the `Host` header to requests.
1631 ///
1632 /// If true, and a request does not include a `Host` header, one will be
1633 /// added automatically, derived from the authority of the `Uri`.
1634 ///
1635 /// Default is `true`.
1636 #[inline]
1637 pub fn set_host(&mut self, val: bool) -> &mut Self {
1638 self.client_config.set_host = val;
1639 self
1640 }
1641
1642 /// Build a client with this configuration and the default `HttpConnector`.
1643 #[cfg(feature = "tokio")]
1644 pub fn build_http<B>(&self) -> Client<HttpConnector, B>
1645 where
1646 B: Body + Send,
1647 B::Data: Send,
1648 {
1649 let mut connector = HttpConnector::new();
1650 if self.pool_config.is_enabled() {
1651 connector.set_keepalive(self.pool_config.idle_timeout);
1652 }
1653 self.build(connector)
1654 }
1655
1656 /// Combine the configuration of this builder with a connector to create a `Client`.
1657 pub fn build<C, B>(&self, connector: C) -> Client<C, B>
1658 where
1659 C: Connect + Clone,
1660 B: Body + Send,
1661 B::Data: Send,
1662 {
1663 let exec = self.exec.clone();
1664 let timer = self.pool_timer.clone();
1665 Client {
1666 config: self.client_config,
1667 exec: exec.clone(),
1668 #[cfg(feature = "http1")]
1669 h1_builder: self.h1_builder.clone(),
1670 #[cfg(feature = "http2")]
1671 h2_builder: self.h2_builder.clone(),
1672 connector,
1673 pool: pool::Pool::new(self.pool_config, exec, timer),
1674 }
1675 }
1676}
1677
1678impl fmt::Debug for Builder {
1679 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1680 f.debug_struct("Builder")
1681 .field("client_config", &self.client_config)
1682 .field("pool_config", &self.pool_config)
1683 .finish()
1684 }
1685}
1686
1687// ==== impl Error ====
1688
1689impl fmt::Debug for Error {
1690 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1691 let mut f = f.debug_tuple("hyper_util::client::legacy::Error");
1692 f.field(&self.kind);
1693 if let Some(ref cause) = self.source {
1694 f.field(cause);
1695 }
1696 f.finish()
1697 }
1698}
1699
1700impl fmt::Display for Error {
1701 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1702 write!(f, "client error ({:?})", self.kind)
1703 }
1704}
1705
1706impl StdError for Error {
1707 fn source(&self) -> Option<&(dyn StdError + 'static)> {
1708 self.source.as_ref().map(|e| &**e as _)
1709 }
1710}
1711
1712impl Error {
1713 /// Returns true if this was an error from `Connect`.
1714 pub fn is_connect(&self) -> bool {
1715 matches!(self.kind, ErrorKind::Connect)
1716 }
1717
1718 /// Returns the info of the client connection on which this error occurred.
1719 #[cfg(any(feature = "http1", feature = "http2"))]
1720 pub fn connect_info(&self) -> Option<&Connected> {
1721 self.connect_info.as_ref()
1722 }
1723
1724 #[cfg(any(feature = "http1", feature = "http2"))]
1725 fn with_connect_info(self, connect_info: Connected) -> Self {
1726 Self {
1727 connect_info: Some(connect_info),
1728 ..self
1729 }
1730 }
1731 fn is_canceled(&self) -> bool {
1732 matches!(self.kind, ErrorKind::Canceled)
1733 }
1734
1735 fn tx(src: hyper::Error) -> Self {
1736 e!(SendRequest, src)
1737 }
1738
1739 fn closed(src: hyper::Error) -> Self {
1740 e!(ChannelClosed, src)
1741 }
1742}