genmeta_proxy/
h3_forward.rs1use dhttp::{
2 endpoint::Endpoint,
3 h3x::dhttp::message::{MessageStreamError, MessageWriter},
4};
5use http::uri::{self, Uri};
6use hyper::{
7 Request, Response,
8 body::{Body, Incoming},
9 header,
10};
11use snafu::ResultExt;
12
13use crate::Error;
14
15const HOP_BY_HOP_HEADERS: &[header::HeaderName] = &[
18 header::CONNECTION,
19 header::TRANSFER_ENCODING,
20 header::TE,
21 header::UPGRADE,
22 header::HOST,
23 ];
25
26fn rewrite_request_for_h3(mut req: Request<Incoming>) -> Request<Incoming> {
31 let uri = req.uri().clone();
33 let mut parts = uri.into_parts();
34 parts.scheme = Some(uri::Scheme::HTTPS);
35 if let Ok(new_uri) = Uri::from_parts(parts) {
36 *req.uri_mut() = new_uri;
37 }
38
39 let headers = req.headers_mut();
41 for name in HOP_BY_HOP_HEADERS {
42 headers.remove(name);
43 }
44 headers.remove("proxy-connection");
45 headers.remove("keep-alive");
46
47 req
48}
49
50async fn close_write_stream(mut write_stream: MessageWriter) {
54 if let Err(e) = write_stream.close().await {
55 tracing::warn!(error = %snafu::Report::from_error(&e), "failed to close h3 request stream");
56 }
57}
58
59pub async fn forward_h3(
61 req: Request<Incoming>,
62 client: &Endpoint,
63) -> Result<Response<impl Body<Data = bytes::Bytes, Error = MessageStreamError> + use<>>, Error> {
64 let authority = req
65 .uri()
66 .authority()
67 .ok_or_else(|| {
68 <Error as snafu::FromString>::without_source(
69 "missing authority in dhttp/3 request uri".to_string(),
70 )
71 })?
72 .clone();
73
74 let connection = client
75 .connect(authority.clone())
76 .await
77 .whatever_context::<_, Error>(format!(
78 "failed to connect to dhttp/3 server `{authority}`"
79 ))?;
80
81 let (mut read_stream, mut write_stream) = connection
82 .initial_message_stream()
83 .await
84 .whatever_context::<_, Error>("failed to open h3 message stream")?;
85
86 let req = rewrite_request_for_h3(req);
87
88 write_stream
89 .send_hyper_request(req)
90 .await
91 .whatever_context::<_, Error>("failed to send h3 request")?;
92
93 let (response_result, _) = tokio::join!(
96 async {
97 let mut parts = read_stream
98 .read_hyper_response_parts()
99 .await
100 .whatever_context::<_, Error>("failed to read h3 response")?;
101 while parts.status.is_informational() {
102 tracing::debug!(status = %parts.status, "skipping informational response");
103 parts = read_stream
104 .read_hyper_response_parts()
105 .await
106 .whatever_context::<_, Error>("failed to read h3 response")?;
107 }
108 Ok::<_, Error>(parts)
109 },
110 close_write_stream(write_stream),
111 );
112
113 let response_parts = response_result?;
114 let body = read_stream.into_hyper_body();
115 let mut resp = Response::from_parts(response_parts, body);
116 *resp.version_mut() = http::Version::HTTP_11;
119 Ok(resp)
120}