volo_http/client/transport/
protocol.rs1use std::{error::Error, str::FromStr, sync::LazyLock};
4
5use futures::{
6 FutureExt, TryFutureExt,
7 future::{self, Either},
8};
9use http::{
10 header,
11 uri::{Authority, Scheme, Uri},
12 version::Version,
13};
14use hyper::client::conn;
15use hyper_util::rt::TokioIo;
16use motore::{make::MakeConnection, service::Service};
17use volo::{context::Context, net::Address};
18
19use super::{
20 connector::{HttpMakeConnection, PeerInfo},
21 pool::{self, Connecting, Pool, Poolable, Pooled, Reservation},
22};
23use crate::{
24 body::Body,
25 context::ClientContext,
26 error::{
27 BoxError, ClientError,
28 client::{Result, connect_error, no_address, request_error, retry, tri},
29 },
30 request::Request,
31 response::Response,
32 utils::lazy::Started,
33};
34
35#[derive(Default)]
37pub(crate) struct ClientConfig {
38 #[cfg(feature = "http1")]
39 pub h1: super::http1::Config,
40 #[cfg(feature = "http2")]
41 pub h2: super::http2::Config,
42}
43
44#[derive(Clone)]
45pub(crate) struct ClientTransportConfig {
46 pub stat_enable: bool,
47 #[cfg(feature = "__tls")]
48 #[cfg_attr(docsrs, doc(cfg(any(feature = "rustls", feature = "native-tls"))))]
49 pub disable_tls: bool,
50}
51
52impl Default for ClientTransportConfig {
53 fn default() -> Self {
54 Self::new()
55 }
56}
57
58impl ClientTransportConfig {
59 pub fn new() -> Self {
60 Self {
61 stat_enable: true,
62 #[cfg(feature = "__tls")]
63 disable_tls: false,
64 }
65 }
66}
67
68pub struct ClientTransport<B = Body> {
77 #[cfg(feature = "http1")]
78 h1_client: conn::http1::Builder,
79 #[cfg(feature = "http2")]
80 h2_client: conn::http2::Builder<hyper_util::rt::TokioExecutor>,
81 config: ClientTransportConfig,
82 connector: HttpMakeConnection,
83 pool: Pool<PoolKey, HttpConnection<B>>,
84}
85
86type PoolKey = (Scheme, Address);
87
88impl<B> ClientTransport<B> {
89 pub(crate) fn new(
90 http_config: ClientConfig,
91 transport_config: ClientTransportConfig,
92 pool_config: pool::Config,
93 #[cfg(feature = "__tls")] tls_connector: Option<volo::net::tls::TlsConnector>,
94 ) -> Self {
95 #[cfg(feature = "http1")]
96 let h1_client = super::http1::client(&http_config.h1);
97 #[cfg(feature = "http2")]
98 let h2_client = super::http2::client(&http_config.h2);
99
100 let builder = HttpMakeConnection::builder(&transport_config);
101 #[cfg(feature = "__tls")]
102 let builder = match tls_connector {
103 Some(connector) => builder.with_tls_connector(connector),
104 None => builder,
105 };
106 let connector = builder.build();
107
108 Self {
109 #[cfg(feature = "http1")]
110 h1_client,
111 #[cfg(feature = "http2")]
112 h2_client,
113 config: transport_config,
114 connector,
115 pool: Pool::new(pool_config),
116 }
117 }
118
119 fn connect_to(
120 &self,
121 ver: pool::Ver,
122 peer: PeerInfo,
123 ) -> impl Started<Output = Result<Pooled<PoolKey, HttpConnection<B>>>> + Send + 'static
124 where
125 B: http_body::Body + Unpin + Send + 'static,
126 B::Data: Send,
127 B::Error: Into<BoxError> + 'static,
128 {
129 let key = (peer.scheme.clone(), peer.address.clone());
130 let connector = self.connector.clone();
131 let pool = self.pool.clone();
132 #[cfg(feature = "http1")]
133 let h1_client = self.h1_client.clone();
134 #[cfg(feature = "http2")]
135 let h2_client = self.h2_client.clone();
136
137 crate::utils::lazy::lazy(move || {
138 let connecting = match pool.connecting(&key, ver) {
139 Some(lock) => lock,
140 None => return Either::Right(future::err(retry())),
141 };
142 Either::Left(Box::pin(connect_impl(
143 ver,
144 peer,
145 connector,
146 pool,
147 connecting,
148 #[cfg(feature = "http1")]
149 h1_client,
150 #[cfg(feature = "http2")]
151 h2_client,
152 )))
153 })
154 }
155
156 async fn pooled_connect(
157 &self,
158 ver: Version,
159 peer: PeerInfo,
160 ) -> Result<Pooled<PoolKey, HttpConnection<B>>>
161 where
162 B: http_body::Body + Unpin + Send + 'static,
163 B::Data: Send,
164 B::Error: Into<BoxError> + 'static,
165 {
166 let key = (peer.scheme.clone(), peer.address.clone());
167
168 let checkout = self.pool.checkout(key);
169 let connect = self.connect_to(ver.into(), peer);
170
171 match future::select(checkout, connect).await {
173 Either::Left((Ok(checked_out), connecting)) => {
174 if connecting.started() {
176 let conn_fut = connecting
177 .map_err(|err| tracing::trace!("background connect error: {err}"))
178 .map(|_pooled| {
179 });
181 tokio::spawn(conn_fut);
183 }
184 Ok(checked_out)
185 }
186 Either::Right((Ok(connected), _checkout)) => Ok(connected),
187 Either::Left((Err(err), connecting)) => {
188 if err.is_canceled() {
190 connecting.await
191 } else {
192 Err(connect_error(err))
194 }
195 }
196 Either::Right((Err(err), checkout)) => {
197 if err
200 .source()
201 .is_some_and(<dyn Error>::is::<crate::error::client::Retry>)
202 {
203 checkout.await.map_err(connect_error)
204 } else {
205 Err(err)
207 }
208 }
209 }
210 }
211}
212
213async fn connect_impl<B>(
214 _ver: pool::Ver,
215 peer: PeerInfo,
216 connector: HttpMakeConnection,
217 pool: Pool<PoolKey, HttpConnection<B>>,
218 connecting: Connecting<PoolKey, HttpConnection<B>>,
219 #[cfg(feature = "http1")] h1_client: conn::http1::Builder,
220 #[cfg(feature = "http2")] h2_client: conn::http2::Builder<hyper_util::rt::TokioExecutor>,
221) -> Result<Pooled<PoolKey, HttpConnection<B>>>
222where
223 B: http_body::Body + Unpin + Send + 'static,
224 B::Data: Send,
225 B::Error: Into<BoxError> + 'static,
226{
227 let conn = match connector.make_connection(peer).await {
228 Ok(conn) => conn,
229 Err(err) => {
230 tracing::warn!("[Volo-HTTP] failed to make connection: {err}");
231 return Err(err);
232 }
233 };
234
235 #[cfg(feature = "http2")]
236 let use_h2 = conn_use_h2(_ver, &conn);
237 #[cfg(not(feature = "http2"))]
238 let use_h2 = false;
239
240 let conn = TokioIo::new(conn);
241 if use_h2 {
242 #[cfg(feature = "http2")]
243 {
244 let connecting = if _ver == pool::Ver::Auto {
245 tri!(connecting.alpn_h2(&pool).ok_or_else(retry))
246 } else {
247 connecting
248 };
249 let (mut sender, conn) = tri!(h2_client.handshake(conn).await.map_err(connect_error));
250 tokio::spawn(conn);
251 tri!(sender.ready().await.map_err(connect_error));
253 Ok(pool.pooled(connecting, HttpConnection::H2(sender)))
254 }
255 #[cfg(not(feature = "http2"))]
256 Err(crate::error::client::bad_version())
257 } else {
258 #[cfg(feature = "http1")]
259 {
260 let (mut sender, conn) = tri!(h1_client.handshake(conn).await.map_err(connect_error));
261 tokio::spawn(conn);
262 tri!(sender.ready().await.map_err(connect_error));
264 Ok(pool.pooled(connecting, HttpConnection::H1(sender)))
265 }
266 #[cfg(not(feature = "http1"))]
267 Err(crate::error::client::bad_version())
268 }
269}
270
271#[cfg(feature = "http2")]
272fn conn_use_h2(ver: pool::Ver, _conn: &volo::net::conn::Conn) -> bool {
273 #[cfg(feature = "__tls")]
274 let use_h2 = match _conn.stream.negotiated_alpn().as_deref() {
275 Some(alpn) => {
276 if alpn == b"h2" {
278 return true;
279 }
280 false
282 }
283 None => true,
285 };
286 #[cfg(not(feature = "__tls"))]
287 let use_h2 = true;
288
289 if use_h2 && (ver == pool::Ver::Http2 || cfg!(not(feature = "http1"))) {
291 return true;
292 }
293
294 false
295}
296
297impl<B> Service<ClientContext, Request<B>> for ClientTransport<B>
298where
299 B: http_body::Body + Unpin + Send + 'static,
300 B::Data: Send,
301 B::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
302{
303 type Response = Response;
304 type Error = ClientError;
305
306 async fn call(
307 &self,
308 cx: &mut ClientContext,
309 mut req: Request<B>,
310 ) -> Result<Self::Response, Self::Error> {
311 rewrite_uri(cx, &mut req);
312
313 let callee = cx.rpc_info().callee();
314 let address = callee.address().ok_or_else(no_address)?;
315
316 let ver = req.version();
317 let peer = PeerInfo {
318 scheme: cx.target().scheme().cloned().unwrap_or(Scheme::HTTP),
319 address,
320 #[cfg(feature = "__tls")]
321 name: callee.service_name(),
322 };
323
324 let stat_enabled = self.config.stat_enable;
325 if stat_enabled {
326 cx.stats.record_transport_start_at();
327 }
328
329 let mut conn = tri!(self.pooled_connect(ver, peer).await);
330 let res = conn.send_request(req).await;
331
332 if stat_enabled {
333 cx.stats.record_transport_end_at();
334 }
335
336 res
337 }
338}
339
340enum HttpConnection<B> {
341 #[cfg(feature = "http1")]
342 H1(conn::http1::SendRequest<B>),
343 #[cfg(feature = "http2")]
344 H2(conn::http2::SendRequest<B>),
345}
346
347impl<B> Poolable for HttpConnection<B>
348where
349 B: Send + 'static,
350{
351 fn is_open(&self) -> bool {
352 match &self {
353 #[cfg(feature = "http1")]
354 Self::H1(h1) => h1.is_ready(),
355 #[cfg(feature = "http2")]
356 Self::H2(h2) => h2.is_ready(),
357 }
358 }
359
360 fn reserve(self) -> Reservation<Self> {
361 match self {
362 #[cfg(feature = "http1")]
363 Self::H1(h1) => Reservation::Unique(Self::H1(h1)),
364 #[cfg(feature = "http2")]
365 Self::H2(h2) => Reservation::Shared(Self::H2(h2.clone()), Self::H2(h2)),
366 }
367 }
368
369 fn can_share(&self) -> bool {
370 match self {
371 #[cfg(feature = "http1")]
372 Self::H1(_) => false,
373 #[cfg(feature = "http2")]
374 Self::H2(_) => true,
375 }
376 }
377}
378
379impl<B> HttpConnection<B>
380where
381 B: http_body::Body + Send + 'static,
382 B::Data: Send,
383 B::Error: Into<Box<dyn std::error::Error + Send + Sync>> + 'static,
384{
385 pub async fn send_request(&mut self, req: Request<B>) -> Result<Response> {
386 let res = match self {
387 #[cfg(feature = "http1")]
388 Self::H1(h1) => h1.send_request(req).await,
389 #[cfg(feature = "http2")]
390 Self::H2(h2) => h2.send_request(req).await,
391 };
392 match res {
393 Ok(resp) => Ok(resp.map(Body::from_incoming)),
394 Err(err) => Err(request_error(err)),
395 }
396 }
397}
398
399static PLACEHOLDER: LazyLock<Authority> =
400 LazyLock::new(|| Authority::from_static("volo-http.placeholder"));
401
402fn gen_authority<B>(req: &Request<B>) -> Authority {
403 let Some(host) = req.headers().get(header::HOST) else {
404 return PLACEHOLDER.to_owned();
405 };
406 let Ok(host) = host.to_str() else {
407 return PLACEHOLDER.to_owned();
408 };
409 let Ok(authority) = Authority::from_str(host) else {
410 return PLACEHOLDER.to_owned();
411 };
412 authority
413}
414
415fn rewrite_uri<B>(cx: &ClientContext, req: &mut Request<B>) {
426 if req.version() != Version::HTTP_2 {
427 return;
428 }
429 let scheme = cx.target().scheme().cloned().unwrap_or(Scheme::HTTP);
430 let authority = gen_authority(req);
431 let mut parts = req.uri().to_owned().into_parts();
432 parts.scheme = Some(scheme);
433 parts.authority = Some(authority);
434 let Ok(uri) = Uri::from_parts(parts) else {
435 return;
436 };
437 *req.uri_mut() = uri;
438}