1use super::build_http_request;
19use super::decompress::decompress;
20use super::error::{Error, Result};
21use super::finish_with_error;
22use super::grpc::grpc_request;
23use super::net::{dns_resolve, parse_certificates, quic_connect, tcp_connect, tls_handshake};
24use super::proxy::{http_connect, socks5_connect, ProxyConfig, ProxyKind};
25use super::stats::{HttpStat, ALPN_HTTP3};
26use super::HttpRequest;
27use bytes::{Buf, Bytes, BytesMut};
28use futures::future;
29
30use http::Request;
31use http::Response;
32use http::Version;
33use http_body_util::{BodyExt, Full};
34use hyper::body::Incoming;
35use hyper_util::rt::TokioExecutor;
36use hyper_util::rt::TokioIo;
37use std::sync::Once;
38use std::time::Duration;
39use std::time::Instant;
40use tokio::net::TcpStream;
41use tokio::sync::oneshot;
42use tokio::time::timeout;
43use tokio_rustls::client::TlsStream;
44
// One-time guard used by `ensure_crypto_provider` so the rustls crypto provider
// installation is attempted at most once per process.
static INIT: Once = Once::new();
47
48fn ensure_crypto_provider() {
49 INIT.call_once(|| {
50 let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
51 });
52}
53
54async fn send_http1_request<S>(
56 req: Request<Full<Bytes>>,
57 stream: S,
58 request_timeout: Option<Duration>,
59 tx: oneshot::Sender<String>,
60 stat: &mut HttpStat,
61) -> Result<Response<Incoming>>
62where
63 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
64{
65 let (mut sender, conn) = timeout(
66 request_timeout.unwrap_or(Duration::from_secs(30)),
67 hyper::client::conn::http1::handshake(TokioIo::new(stream)),
68 )
69 .await
70 .map_err(|e| Error::Timeout { source: e })?
71 .map_err(|e| Error::Hyper { source: e })?;
72
73 tokio::spawn(async move {
75 if let Err(e) = conn.await {
76 let _ = tx.send(e.to_string());
77 }
78 });
79
80 let server_processing_start = Instant::now();
81 let resp = sender
82 .send_request(req)
83 .await
84 .map_err(|e| Error::Hyper { source: e })?;
85 stat.server_processing = Some(server_processing_start.elapsed());
86 Ok(resp)
87}
88
89async fn send_https2_request(
91 req: Request<Full<Bytes>>,
92 tls_stream: TlsStream<TcpStream>,
93 request_timeout: Option<Duration>,
94 tx: oneshot::Sender<String>,
95 stat: &mut HttpStat,
96) -> Result<Response<Incoming>> {
97 let (mut sender, conn) = timeout(
98 request_timeout.unwrap_or(Duration::from_secs(30)),
99 hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(tls_stream)),
100 )
101 .await
102 .map_err(|e| Error::Timeout { source: e })?
103 .map_err(|e| Error::Hyper { source: e })?;
104
105 tokio::spawn(async move {
107 if let Err(e) = conn.await {
108 let _ = tx.send(e.to_string());
109 }
110 });
111
112 let mut req = req;
113 *req.version_mut() = hyper::Version::HTTP_2;
114 req.headers_mut().remove("Host");
116
117 let server_processing_start = Instant::now();
118 let resp = sender
119 .send_request(req)
120 .await
121 .map_err(|e| Error::Hyper { source: e })?;
122 stat.server_processing = Some(server_processing_start.elapsed());
123 Ok(resp)
124}
125
126async fn http3_request(http_req: HttpRequest) -> HttpStat {
128 let start = Instant::now();
129 let mut stat = HttpStat {
130 alpn: Some(ALPN_HTTP3.to_string()),
131 ..Default::default()
132 };
133
134 let dns_result = dns_resolve(&http_req, &mut stat).await;
136 let (addr, host) = match dns_result {
137 Ok(result) => result,
138 Err(e) => {
139 return finish_with_error(stat, e, start);
140 }
141 };
142
143 let (client_endpoint, conn) = match timeout(
145 http_req.quic_timeout.unwrap_or(Duration::from_secs(30)),
146 quic_connect(
147 host,
148 addr,
149 http_req.skip_verify,
150 http_req.client_cert.as_deref(),
151 http_req.client_key.as_deref(),
152 http_req.bind_addr,
153 &mut stat,
154 ),
155 )
156 .await
157 {
158 Ok(Ok(result)) => result,
159 Ok(Err(e)) => {
160 return finish_with_error(stat, e, start);
161 }
162 Err(e) => {
163 return finish_with_error(stat, e, start);
164 }
165 };
166
167 stat.tls = Some("tls 1.3".to_string()); stat.alpn = Some(ALPN_HTTP3.to_string()); if let Some(peer_identity) = conn.peer_identity() {
173 if let Ok(certs) = peer_identity.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
174 if let Some(first_cert) = certs.first() {
176 if let Ok((_, cert)) = x509_parser::parse_x509_certificate(first_cert.as_ref()) {
177 let oid_str = match cert.signature_algorithm.algorithm.to_string().as_str() {
178 "1.2.840.113549.1.1.11" => "AES_256_GCM_SHA384".to_string(),
179 "1.2.840.113549.1.1.12" => "AES_128_GCM_SHA256".to_string(),
180 "1.2.840.113549.1.1.13" => "CHACHA20_POLY1305_SHA256".to_string(),
181 "1.2.840.10045.4.3.2" => "AES_256_GCM_SHA384".to_string(),
182 "1.2.840.10045.4.3.3" => "AES_128_GCM_SHA256".to_string(),
183 "1.2.840.10045.4.3.4" => "CHACHA20_POLY1305_SHA256".to_string(),
184 "1.3.101.112" => "AES_256_GCM_SHA384".to_string(),
185 "1.3.101.113" => "AES_128_GCM_SHA256".to_string(),
186 _ => format!("{:?}", cert.signature_algorithm.algorithm),
187 };
188 stat.cert_cipher = Some(oid_str);
189 }
190 }
191 parse_certificates(&certs, &mut stat);
192 }
193 }
194
195 let quinn_conn = h3_quinn::Connection::new(conn);
197
198 let (mut driver, mut send_request) = match timeout(
199 http_req.request_timeout.unwrap_or(Duration::from_secs(30)),
200 h3::client::new(quinn_conn),
201 )
202 .await
203 {
204 Ok(Ok(result)) => result,
205 Ok(Err(e)) => {
206 return finish_with_error(stat, e, start);
207 }
208 Err(e) => {
209 return finish_with_error(stat, e, start);
210 }
211 };
212
213 let mut req = match http_req.builder(false).body(()) {
215 Ok(req) => req,
216 Err(e) => {
217 return finish_with_error(stat, e, start);
218 }
219 };
220 *req.version_mut() = Version::HTTP_3;
221 stat.request_headers = req.headers().clone();
222 let body = http_req.body.unwrap_or_default();
223
224 let drive = async move {
226 Err::<(), h3::error::ConnectionError>(future::poll_fn(|cx| driver.poll_close(cx)).await)
227 };
228
229 let request = async move {
231 let mut stream = send_request.send_request(req).await?;
232 stream.send_data(body).await?;
233
234 let mut sub_stat = HttpStat::default();
235
236 stream.finish().await?;
238
239 let server_processing_start = Instant::now();
240
241 let resp = stream.recv_response().await?;
242 sub_stat.server_processing = Some(server_processing_start.elapsed());
243
244 sub_stat.status = Some(resp.status());
245 sub_stat.headers = Some(resp.headers().clone());
246
247 let content_transfer_start = Instant::now();
249 let mut buf = BytesMut::new();
250 while let Some(chunk) = stream.recv_data().await? {
251 buf.extend(chunk.chunk());
252 }
253 sub_stat.content_transfer = Some(content_transfer_start.elapsed());
254 sub_stat.body = Some(Bytes::from(buf));
255 Ok::<HttpStat, h3::error::StreamError>(sub_stat)
256 };
257
258 let (req_res, drive_res) = tokio::join!(request, drive);
260 match req_res {
261 Ok(sub_stat) => {
262 stat.server_processing = sub_stat.server_processing;
263 stat.content_transfer = sub_stat.content_transfer;
264 stat.status = sub_stat.status;
265 stat.headers = sub_stat.headers;
266 stat.body = sub_stat.body;
267 }
268 Err(err) => {
269 if !err.is_h3_no_error() {
270 stat.error = Some(err.to_string());
271 }
272 }
273 }
274 if let Err(err) = drive_res {
275 if !err.is_h3_no_error() {
276 stat.error = Some(err.to_string());
277 }
278 }
279
280 stat.total = Some(start.elapsed());
281 client_endpoint.close(0u32.into(), b"done");
283
284 stat
285}
286
287async fn tcp_via_proxy(
292 http_req: &HttpRequest,
293 stat: &mut HttpStat,
294) -> Result<(TcpStream, String, bool)> {
295 let uri = &http_req.uri;
296 let is_https = uri.scheme() == Some(&http::uri::Scheme::HTTPS);
297 let target_host = uri.host().unwrap_or_default().to_string();
298 let target_port = http_req.get_port();
299
300 if let Some(proxy) = http_req.proxy.as_deref().and_then(ProxyConfig::parse) {
301 let proxy_addr = format!("{}:{}", proxy.host, proxy.port);
302 let tcp_start = Instant::now();
303 let proxy_stream = timeout(
304 http_req.tcp_timeout.unwrap_or(Duration::from_secs(5)),
305 TcpStream::connect(&proxy_addr),
306 )
307 .await
308 .map_err(|e| Error::Timeout { source: e })?
309 .map_err(|e| Error::Io { source: e })?;
310
311 if let Ok(peer) = proxy_stream.peer_addr() {
312 stat.addr = Some(peer.to_string());
313 }
314
315 let is_http_forward = !is_https && matches!(proxy.kind, ProxyKind::Http);
317 let stream = if is_http_forward {
318 proxy_stream
319 } else {
320 match proxy.kind {
321 ProxyKind::Socks5 => {
322 socks5_connect(proxy_stream, &target_host, target_port).await?
323 }
324 ProxyKind::Http => http_connect(proxy_stream, &target_host, target_port).await?,
325 }
326 };
327 stat.tcp_connect = Some(tcp_start.elapsed());
328 Ok((stream, target_host, is_http_forward))
329 } else {
330 let (addr, host) = dns_resolve(http_req, stat).await?;
331 let stream = tcp_connect(addr, http_req.tcp_timeout, http_req.bind_addr, stat).await?;
332 Ok((stream, host, false))
333 }
334}
335
336async fn http1_2_request(mut http_req: HttpRequest) -> HttpStat {
337 let start = Instant::now();
338 let mut stat = HttpStat::default();
339
340 let is_https = http_req.uri.scheme() == Some(&http::uri::Scheme::HTTPS);
341
342 let (tcp_stream, host, is_http_forward) = match tcp_via_proxy(&http_req, &mut stat).await {
344 Ok(r) => r,
345 Err(e) => return finish_with_error(stat, e, start),
346 };
347
348 if is_http_forward {
350 http_req.use_absolute_uri = true;
351 }
352
353 let (tx, mut rx) = oneshot::channel();
355
356 let resp = if is_https {
358 let tls_result = tls_handshake(host.clone(), tcp_stream, &http_req, &mut stat).await;
360 let (tls_stream, is_http2) = match tls_result {
361 Ok(result) => result,
362 Err(e) => {
363 return finish_with_error(stat, e, start);
364 }
365 };
366
367 if is_http2 {
369 let req = match build_http_request(&http_req, false) {
370 Ok(req) => req,
371 Err(e) => {
372 return finish_with_error(stat, e, start);
373 }
374 };
375 stat.request_headers = req.headers().clone();
376 match send_https2_request(req, tls_stream, http_req.request_timeout, tx, &mut stat)
377 .await
378 {
379 Ok(resp) => resp,
380 Err(e) => {
381 return finish_with_error(stat, e, start);
382 }
383 }
384 } else {
385 let req = match build_http_request(&http_req, true) {
386 Ok(req) => req,
387 Err(e) => {
388 return finish_with_error(stat, e, start);
389 }
390 };
391 stat.request_headers = req.headers().clone();
392 match send_http1_request(req, tls_stream, http_req.request_timeout, tx, &mut stat).await
393 {
394 Ok(resp) => resp,
395 Err(e) => {
396 return finish_with_error(stat, e, start);
397 }
398 }
399 }
400 } else {
401 let req = match build_http_request(&http_req, true) {
402 Ok(req) => req,
403 Err(e) => {
404 return finish_with_error(stat, e, start);
405 }
406 };
407 stat.request_headers = req.headers().clone();
408 match send_http1_request(req, tcp_stream, http_req.request_timeout, tx, &mut stat).await {
410 Ok(resp) => resp,
411 Err(e) => {
412 return finish_with_error(stat, e, start);
413 }
414 }
415 };
416
417 stat.status = Some(resp.status());
419 stat.headers = Some(resp.headers().clone());
420
421 if let Ok(error) = rx.try_recv() {
423 stat.error = Some(error);
424 }
425 let content_transfer_start = Instant::now();
427 let body_result = resp.collect().await;
428 let body = match body_result {
429 Ok(body) => body,
430 Err(e) => {
431 return finish_with_error(stat, format!("Failed to read response body: {e}"), start);
432 }
433 };
434
435 let body_bytes = body.to_bytes();
436 stat.body = Some(body_bytes);
437 stat.content_transfer = Some(content_transfer_start.elapsed());
438
439 stat.total = Some(start.elapsed());
440 stat
441}
442
443pub async fn request(http_req: HttpRequest) -> HttpStat {
478 ensure_crypto_provider();
479 let schema = if let Some(schema) = http_req.uri.scheme() {
480 schema.to_string()
481 } else {
482 "".to_string()
483 };
484
485 let mut stat = if ["grpc", "grpcs"].contains(&schema.as_str()) {
487 grpc_request(http_req).await
488 } else if http_req.alpn_protocols.contains(&ALPN_HTTP3.to_string()) {
489 http3_request(http_req).await
490 } else {
491 http1_2_request(http_req).await
492 };
493 if let Some(body) = &stat.body {
494 stat.body_size = Some(body.len());
495 }
496 let encoding = if let Some(headers) = &stat.headers {
497 headers
498 .get("content-encoding")
499 .map(|v| v.to_str().unwrap_or_default())
500 .unwrap_or_default()
501 } else {
502 ""
503 };
504
505 if !encoding.is_empty() {
506 if let Some(body) = &stat.body {
507 match decompress(encoding, body) {
508 Ok(data) => {
509 stat.body = Some(data);
510 }
511 Err(e) => {
512 stat.error = Some(e.to_string());
513 }
514 }
515 }
516 }
517
518 stat
519}
520
/// The protocol-specific hyper request sender held by an `HttpConnection`.
enum ConnectionSender {
    /// Sender obtained from the hyper HTTP/1 handshake.
    Http1(hyper::client::conn::http1::SendRequest<Full<Bytes>>),
    /// Sender obtained from the hyper HTTP/2 handshake.
    Http2(hyper::client::conn::http2::SendRequest<Full<Bytes>>),
}
527
/// An established HTTP connection, created by `connect`, on which requests
/// are issued through `HttpConnection::send`.
pub struct HttpConnection {
    // Protocol-specific sender used to issue requests.
    sender: ConnectionSender,
    // `true` when `sender` is the HTTP/2 variant; `send` uses it to choose how
    // the request is built.
    is_http2: bool,
}
533
534async fn establish_http1<S>(
535 stream: S,
536 handshake_timeout: Duration,
537 mut stat: HttpStat,
538 start: Instant,
539) -> (HttpStat, Option<HttpConnection>)
540where
541 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
542{
543 match timeout(
544 handshake_timeout,
545 hyper::client::conn::http1::handshake(TokioIo::new(stream)),
546 )
547 .await
548 {
549 Ok(Ok((sender, conn))) => {
550 tokio::spawn(async move {
551 let _ = conn.await;
552 });
553 stat.total = Some(start.elapsed());
554 (
555 stat,
556 Some(HttpConnection {
557 sender: ConnectionSender::Http1(sender),
558 is_http2: false,
559 }),
560 )
561 }
562 Ok(Err(e)) => (
563 finish_with_error(stat, Error::Hyper { source: e }, start),
564 None,
565 ),
566 Err(e) => (
567 finish_with_error(stat, Error::Timeout { source: e }, start),
568 None,
569 ),
570 }
571}
572
573async fn establish_http2<S>(
574 stream: S,
575 handshake_timeout: Duration,
576 mut stat: HttpStat,
577 start: Instant,
578) -> (HttpStat, Option<HttpConnection>)
579where
580 S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static,
581{
582 match timeout(
583 handshake_timeout,
584 hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(stream)),
585 )
586 .await
587 {
588 Ok(Ok((sender, conn))) => {
589 tokio::spawn(async move {
590 let _ = conn.await;
591 });
592 stat.total = Some(start.elapsed());
593 (
594 stat,
595 Some(HttpConnection {
596 sender: ConnectionSender::Http2(sender),
597 is_http2: true,
598 }),
599 )
600 }
601 Ok(Err(e)) => (
602 finish_with_error(stat, Error::Hyper { source: e }, start),
603 None,
604 ),
605 Err(e) => (
606 finish_with_error(stat, Error::Timeout { source: e }, start),
607 None,
608 ),
609 }
610}
611
612pub async fn connect(http_req: &HttpRequest) -> (HttpStat, Option<HttpConnection>) {
617 ensure_crypto_provider();
618 let start = Instant::now();
619 let mut stat = HttpStat::default();
620
621 let is_https = http_req.uri.scheme() == Some(&http::uri::Scheme::HTTPS);
622
623 let (tcp_stream, host, _is_http_forward) = match tcp_via_proxy(http_req, &mut stat).await {
624 Ok(r) => r,
625 Err(e) => return (finish_with_error(stat, e, start), None),
626 };
627
628 let handshake_timeout = http_req.request_timeout.unwrap_or(Duration::from_secs(30));
629
630 if is_https {
631 let (tls_stream, is_h2) = match tls_handshake(host, tcp_stream, http_req, &mut stat).await {
632 Ok(r) => r,
633 Err(e) => return (finish_with_error(stat, e, start), None),
634 };
635
636 if is_h2 {
637 establish_http2(tls_stream, handshake_timeout, stat, start).await
638 } else {
639 establish_http1(tls_stream, handshake_timeout, stat, start).await
640 }
641 } else {
642 establish_http1(tcp_stream, handshake_timeout, stat, start).await
643 }
644}
645
646impl HttpConnection {
647 pub async fn send(&mut self, http_req: &HttpRequest) -> HttpStat {
649 let start = Instant::now();
650 let mut stat = HttpStat::default();
651
652 let is_http1 = !self.is_http2;
653 let req = match build_http_request(http_req, is_http1) {
654 Ok(req) => req,
655 Err(e) => return finish_with_error(stat, e, start),
656 };
657 stat.request_headers = req.headers().clone();
658
659 match &mut self.sender {
661 ConnectionSender::Http1(sender) => {
662 if let Err(e) = sender.ready().await {
663 return finish_with_error(stat, Error::Hyper { source: e }, start);
664 }
665 }
666 ConnectionSender::Http2(sender) => {
667 if let Err(e) = sender.ready().await {
668 return finish_with_error(stat, Error::Hyper { source: e }, start);
669 }
670 }
671 }
672
673 let server_processing_start = Instant::now();
674 let resp = match &mut self.sender {
675 ConnectionSender::Http1(sender) => sender.send_request(req).await,
676 ConnectionSender::Http2(sender) => {
677 let mut req = req;
678 *req.version_mut() = Version::HTTP_2;
679 req.headers_mut().remove("Host");
680 sender.send_request(req).await
681 }
682 };
683
684 let resp = match resp {
685 Ok(resp) => resp,
686 Err(e) => return finish_with_error(stat, Error::Hyper { source: e }, start),
687 };
688 stat.server_processing = Some(server_processing_start.elapsed());
689 stat.status = Some(resp.status());
690 stat.headers = Some(resp.headers().clone());
691
692 let content_transfer_start = Instant::now();
694 match resp.collect().await {
695 Ok(body) => {
696 let body_bytes = body.to_bytes();
697 stat.body = Some(body_bytes);
698 stat.content_transfer = Some(content_transfer_start.elapsed());
699 }
700 Err(e) => {
701 return finish_with_error(
702 stat,
703 format!("Failed to read response body: {e}"),
704 start,
705 );
706 }
707 }
708
709 stat.total = Some(start.elapsed());
710
711 if let Some(body) = &stat.body {
713 stat.body_size = Some(body.len());
714 }
715 let encoding = stat
716 .headers
717 .as_ref()
718 .and_then(|h| h.get("content-encoding"))
719 .and_then(|v| v.to_str().ok())
720 .unwrap_or_default();
721 if !encoding.is_empty() {
722 if let Some(body) = &stat.body {
723 match decompress(encoding, body) {
724 Ok(data) => stat.body = Some(data),
725 Err(e) => stat.error = Some(e.to_string()),
726 }
727 }
728 }
729
730 stat
731 }
732}