volo_http/client/transport/
protocol.rs

1//! Protocol related implementations
2
3use std::{error::Error, str::FromStr, sync::LazyLock};
4
5use futures::{
6    FutureExt, TryFutureExt,
7    future::{self, Either},
8};
9use http::{
10    header,
11    uri::{Authority, Scheme, Uri},
12    version::Version,
13};
14use hyper::client::conn;
15use hyper_util::rt::TokioIo;
16use motore::{make::MakeConnection, service::Service};
17use volo::{context::Context, net::Address};
18
19use super::{
20    connector::{HttpMakeConnection, PeerInfo},
21    pool::{self, Connecting, Pool, Poolable, Pooled, Reservation},
22};
23use crate::{
24    body::Body,
25    context::ClientContext,
26    error::{
27        BoxError, ClientError,
28        client::{Result, connect_error, no_address, request_error, retry, tri},
29    },
30    request::Request,
31    response::Response,
32    utils::lazy::Started,
33};
34
35/// Configuration of HTTP/1
36#[derive(Default)]
37pub(crate) struct ClientConfig {
38    #[cfg(feature = "http1")]
39    pub h1: super::http1::Config,
40    #[cfg(feature = "http2")]
41    pub h2: super::http2::Config,
42}
43
44#[derive(Clone)]
45pub(crate) struct ClientTransportConfig {
46    pub stat_enable: bool,
47    #[cfg(feature = "__tls")]
48    #[cfg_attr(docsrs, doc(cfg(any(feature = "rustls", feature = "native-tls"))))]
49    pub disable_tls: bool,
50}
51
52impl Default for ClientTransportConfig {
53    fn default() -> Self {
54        Self::new()
55    }
56}
57
58impl ClientTransportConfig {
59    pub fn new() -> Self {
60        Self {
61            stat_enable: true,
62            #[cfg(feature = "__tls")]
63            disable_tls: false,
64        }
65    }
66}
67
68/// Transport service of HTTP Client.
69///
70/// This service will connect to the [`Address`] of callee's [`Endpoint`] in [`ClientContext`], then
71/// send a [`Request`] to the destination server, and return a [`Response`] the server response.
72///
73/// [`Endpoint`]: volo::context::Endpoint
74/// [`Request`]: http::request::Request
75/// [`Response`]: http::response::Response
76pub struct ClientTransport<B = Body> {
77    #[cfg(feature = "http1")]
78    h1_client: conn::http1::Builder,
79    #[cfg(feature = "http2")]
80    h2_client: conn::http2::Builder<hyper_util::rt::TokioExecutor>,
81    config: ClientTransportConfig,
82    connector: HttpMakeConnection,
83    pool: Pool<PoolKey, HttpConnection<B>>,
84}
85
86type PoolKey = (Scheme, Address);
87
88impl<B> ClientTransport<B> {
89    pub(crate) fn new(
90        http_config: ClientConfig,
91        transport_config: ClientTransportConfig,
92        pool_config: pool::Config,
93        #[cfg(feature = "__tls")] tls_connector: Option<volo::net::tls::TlsConnector>,
94    ) -> Self {
95        #[cfg(feature = "http1")]
96        let h1_client = super::http1::client(&http_config.h1);
97        #[cfg(feature = "http2")]
98        let h2_client = super::http2::client(&http_config.h2);
99
100        let builder = HttpMakeConnection::builder(&transport_config);
101        #[cfg(feature = "__tls")]
102        let builder = match tls_connector {
103            Some(connector) => builder.with_tls_connector(connector),
104            None => builder,
105        };
106        let connector = builder.build();
107
108        Self {
109            #[cfg(feature = "http1")]
110            h1_client,
111            #[cfg(feature = "http2")]
112            h2_client,
113            config: transport_config,
114            connector,
115            pool: Pool::new(pool_config),
116        }
117    }
118
119    fn connect_to(
120        &self,
121        ver: pool::Ver,
122        peer: PeerInfo,
123    ) -> impl Started<Output = Result<Pooled<PoolKey, HttpConnection<B>>>> + Send + 'static
124    where
125        B: http_body::Body + Unpin + Send + 'static,
126        B::Data: Send,
127        B::Error: Into<BoxError> + 'static,
128    {
129        let key = (peer.scheme.clone(), peer.address.clone());
130        let connector = self.connector.clone();
131        let pool = self.pool.clone();
132        #[cfg(feature = "http1")]
133        let h1_client = self.h1_client.clone();
134        #[cfg(feature = "http2")]
135        let h2_client = self.h2_client.clone();
136
137        crate::utils::lazy::lazy(move || {
138            let connecting = match pool.connecting(&key, ver) {
139                Some(lock) => lock,
140                None => return Either::Right(future::err(retry())),
141            };
142            Either::Left(Box::pin(connect_impl(
143                ver,
144                peer,
145                connector,
146                pool,
147                connecting,
148                #[cfg(feature = "http1")]
149                h1_client,
150                #[cfg(feature = "http2")]
151                h2_client,
152            )))
153        })
154    }
155
156    async fn pooled_connect(
157        &self,
158        ver: Version,
159        peer: PeerInfo,
160    ) -> Result<Pooled<PoolKey, HttpConnection<B>>>
161    where
162        B: http_body::Body + Unpin + Send + 'static,
163        B::Data: Send,
164        B::Error: Into<BoxError> + 'static,
165    {
166        let key = (peer.scheme.clone(), peer.address.clone());
167
168        let checkout = self.pool.checkout(key);
169        let connect = self.connect_to(ver.into(), peer);
170
171        // Well, `futures::future::select` is more suitable than `tokio::select!` in this case.
172        match future::select(checkout, connect).await {
173            Either::Left((Ok(checked_out), connecting)) => {
174                // Checkout is done while connecting is started
175                if connecting.started() {
176                    let conn_fut = connecting
177                        .map_err(|err| tracing::trace!("background connect error: {err}"))
178                        .map(|_pooled| {
179                            // Drop the `Pooled` and put it into pool in `Drop`
180                        });
181                    // Spawn it for finishing the connecting
182                    tokio::spawn(conn_fut);
183                }
184                Ok(checked_out)
185            }
186            Either::Right((Ok(connected), _checkout)) => Ok(connected),
187            Either::Left((Err(err), connecting)) => {
188                // The checked out connection was closed, just continue the connecting
189                if err.is_canceled() {
190                    connecting.await
191                } else {
192                    // unreachable?
193                    Err(connect_error(err))
194                }
195            }
196            Either::Right((Err(err), checkout)) => {
197                // The connection failed while acquiring the pool lock, and we should retry the
198                // checkout.
199                if err
200                    .source()
201                    .is_some_and(<dyn Error>::is::<crate::error::client::Retry>)
202                {
203                    checkout.await.map_err(connect_error)
204                } else {
205                    // Unexpected connect error
206                    Err(err)
207                }
208            }
209        }
210    }
211}
212
213async fn connect_impl<B>(
214    _ver: pool::Ver,
215    peer: PeerInfo,
216    connector: HttpMakeConnection,
217    pool: Pool<PoolKey, HttpConnection<B>>,
218    connecting: Connecting<PoolKey, HttpConnection<B>>,
219    #[cfg(feature = "http1")] h1_client: conn::http1::Builder,
220    #[cfg(feature = "http2")] h2_client: conn::http2::Builder<hyper_util::rt::TokioExecutor>,
221) -> Result<Pooled<PoolKey, HttpConnection<B>>>
222where
223    B: http_body::Body + Unpin + Send + 'static,
224    B::Data: Send,
225    B::Error: Into<BoxError> + 'static,
226{
227    let conn = match connector.make_connection(peer).await {
228        Ok(conn) => conn,
229        Err(err) => {
230            tracing::warn!("[Volo-HTTP] failed to make connection: {err}");
231            return Err(err);
232        }
233    };
234
235    #[cfg(feature = "http2")]
236    let use_h2 = conn_use_h2(_ver, &conn);
237    #[cfg(not(feature = "http2"))]
238    let use_h2 = false;
239
240    let conn = TokioIo::new(conn);
241    if use_h2 {
242        #[cfg(feature = "http2")]
243        {
244            let connecting = if _ver == pool::Ver::Auto {
245                tri!(connecting.alpn_h2(&pool).ok_or_else(retry))
246            } else {
247                connecting
248            };
249            let (mut sender, conn) = tri!(h2_client.handshake(conn).await.map_err(connect_error));
250            tokio::spawn(conn);
251            // Wait for `conn` to ready up before we declare self sender as usable.
252            tri!(sender.ready().await.map_err(connect_error));
253            Ok(pool.pooled(connecting, HttpConnection::H2(sender)))
254        }
255        #[cfg(not(feature = "http2"))]
256        Err(crate::error::client::bad_version())
257    } else {
258        #[cfg(feature = "http1")]
259        {
260            let (mut sender, conn) = tri!(h1_client.handshake(conn).await.map_err(connect_error));
261            tokio::spawn(conn);
262            // Wait for `conn` to ready up before we declare self sender as usable.
263            tri!(sender.ready().await.map_err(connect_error));
264            Ok(pool.pooled(connecting, HttpConnection::H1(sender)))
265        }
266        #[cfg(not(feature = "http1"))]
267        Err(crate::error::client::bad_version())
268    }
269}
270
271#[cfg(feature = "http2")]
272fn conn_use_h2(ver: pool::Ver, _conn: &volo::net::conn::Conn) -> bool {
273    #[cfg(feature = "__tls")]
274    let use_h2 = match _conn.stream.negotiated_alpn().as_deref() {
275        Some(alpn) => {
276            // ALPN negotiated to use H2
277            if alpn == b"h2" {
278                return true;
279            }
280            // ALPN negotiated not to use H2
281            false
282        }
283        // Use H2 by default
284        None => true,
285    };
286    #[cfg(not(feature = "__tls"))]
287    let use_h2 = true;
288
289    // H2 is specified or H1 is disabled
290    if use_h2 && (ver == pool::Ver::Http2 || cfg!(not(feature = "http1"))) {
291        return true;
292    }
293
294    false
295}
296
297impl<B> Service<ClientContext, Request<B>> for ClientTransport<B>
298where
299    B: http_body::Body + Unpin + Send + 'static,
300    B::Data: Send,
301    B::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
302{
303    type Response = Response;
304    type Error = ClientError;
305
306    async fn call(
307        &self,
308        cx: &mut ClientContext,
309        mut req: Request<B>,
310    ) -> Result<Self::Response, Self::Error> {
311        rewrite_uri(cx, &mut req);
312
313        let callee = cx.rpc_info().callee();
314        let address = callee.address().ok_or_else(no_address)?;
315
316        let ver = req.version();
317        let peer = PeerInfo {
318            scheme: cx.target().scheme().cloned().unwrap_or(Scheme::HTTP),
319            address,
320            #[cfg(feature = "__tls")]
321            name: callee.service_name(),
322        };
323
324        let stat_enabled = self.config.stat_enable;
325        if stat_enabled {
326            cx.stats.record_transport_start_at();
327        }
328
329        let mut conn = tri!(self.pooled_connect(ver, peer).await);
330        let res = conn.send_request(req).await;
331
332        if stat_enabled {
333            cx.stats.record_transport_end_at();
334        }
335
336        res
337    }
338}
339
340enum HttpConnection<B> {
341    #[cfg(feature = "http1")]
342    H1(conn::http1::SendRequest<B>),
343    #[cfg(feature = "http2")]
344    H2(conn::http2::SendRequest<B>),
345}
346
347impl<B> Poolable for HttpConnection<B>
348where
349    B: Send + 'static,
350{
351    fn is_open(&self) -> bool {
352        match &self {
353            #[cfg(feature = "http1")]
354            Self::H1(h1) => h1.is_ready(),
355            #[cfg(feature = "http2")]
356            Self::H2(h2) => h2.is_ready(),
357        }
358    }
359
360    fn reserve(self) -> Reservation<Self> {
361        match self {
362            #[cfg(feature = "http1")]
363            Self::H1(h1) => Reservation::Unique(Self::H1(h1)),
364            #[cfg(feature = "http2")]
365            Self::H2(h2) => Reservation::Shared(Self::H2(h2.clone()), Self::H2(h2)),
366        }
367    }
368
369    fn can_share(&self) -> bool {
370        match self {
371            #[cfg(feature = "http1")]
372            Self::H1(_) => false,
373            #[cfg(feature = "http2")]
374            Self::H2(_) => true,
375        }
376    }
377}
378
379impl<B> HttpConnection<B>
380where
381    B: http_body::Body + Send + 'static,
382    B::Data: Send,
383    B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
384{
385    pub async fn send_request(&mut self, req: Request<B>) -> Result<Response> {
386        let res = match self {
387            #[cfg(feature = "http1")]
388            Self::H1(h1) => h1.send_request(req).await,
389            #[cfg(feature = "http2")]
390            Self::H2(h2) => h2.send_request(req).await,
391        };
392        match res {
393            Ok(resp) => Ok(resp.map(Body::from_incoming)),
394            Err(err) => Err(request_error(err)),
395        }
396    }
397}
398
399static PLACEHOLDER: LazyLock<Authority> =
400    LazyLock::new(|| Authority::from_static("volo-http.placeholder"));
401
402fn gen_authority<B>(req: &Request<B>) -> Authority {
403    let Some(host) = req.headers().get(header::HOST) else {
404        return PLACEHOLDER.to_owned();
405    };
406    let Ok(host) = host.to_str() else {
407        return PLACEHOLDER.to_owned();
408    };
409    let Ok(authority) = Authority::from_str(host) else {
410        return PLACEHOLDER.to_owned();
411    };
412    authority
413}
414
415// We use this function for HTTP/2 only because
416//
417// 1. header of http2 request has a field `:scheme`, hyper demands that uri of h2 request MUST have
418//    FULL uri, althrough scheme in `Uri` is optional, but authority is required.
419//
420//    If authority exists, hyper will set `:scheme` to HTTP if there is no scheme in `Uri`. But if
421//    there is no authority, hyper will throw an error `MissingUriSchemeAndAuthority`.
422//
423// 2. For http2 request, hyper will ignore `Host` in `HeaderMap` and take authority as its `Host` in
424//    HEADERS frame. So we must take our `Host` and set it as authority of `Uri`.
425fn rewrite_uri<B>(cx: &ClientContext, req: &mut Request<B>) {
426    if req.version() != Version::HTTP_2 {
427        return;
428    }
429    let scheme = cx.target().scheme().cloned().unwrap_or(Scheme::HTTP);
430    let authority = gen_authority(req);
431    let mut parts = req.uri().to_owned().into_parts();
432    parts.scheme = Some(scheme);
433    parts.authority = Some(authority);
434    let Ok(uri) = Uri::from_parts(parts) else {
435        return;
436    };
437    *req.uri_mut() = uri;
438}