1use super::decompress::decompress;
19use super::error::{Error, Result};
20use super::stats::{HttpStat, ALPN_HTTP1, ALPN_HTTP2, ALPN_HTTP3};
21use super::SkipVerifier;
22use bytes::{Buf, Bytes, BytesMut};
23use chrono::{Local, TimeZone};
24use futures::future;
25use hickory_resolver::config::{LookupIpStrategy, NameServerConfigGroup, ResolverConfig};
26use hickory_resolver::name_server::TokioConnectionProvider;
27use hickory_resolver::TokioResolver;
28use http::request::Builder;
29use http::HeaderValue;
30use http::Request;
31use http::Response;
32use http::Uri;
33use http::{HeaderMap, Method};
34use http_body_util::{BodyExt, Full};
35use hyper::body::Incoming;
36use hyper_util::rt::TokioExecutor;
37use hyper_util::rt::TokioIo;
38use std::net::IpAddr;
39use std::net::SocketAddr;
40use std::sync::Arc;
41use std::sync::Once;
42use std::time::Duration;
43use std::time::Instant;
44use tokio::net::TcpStream;
45use tokio::sync::oneshot;
46use tokio::time::timeout;
47use tokio_rustls::client::TlsStream;
48use tokio_rustls::rustls::{ClientConfig, RootCertStore};
49use tokio_rustls::TlsConnector;
50
/// Crate version taken from the `CARGO_PKG_VERSION` environment variable at
/// compile time; it is embedded in the default `User-Agent` header.
const VERSION: &str = env!("CARGO_PKG_VERSION");
53
/// Maps a TLS protocol identifier to its display label.
///
/// `"TLSv1_3"`, `"TLSv1_2"` and `"TLSv1_1"` become `"tls v1.3"`, `"tls v1.2"`
/// and `"tls v1.1"`; any other input is returned unchanged.
fn format_tls_protocol(protocol: &str) -> String {
    let label = match protocol {
        "TLSv1_3" => "tls v1.3",
        "TLSv1_2" => "tls v1.2",
        "TLSv1_1" => "tls v1.1",
        // Unknown identifiers are passed through verbatim.
        other => other,
    };
    label.to_owned()
}
63
64fn format_time(timestamp_seconds: i64) -> String {
66 Local
67 .timestamp_nanos(timestamp_seconds * 1_000_000_000)
68 .to_string()
69}
70
71#[derive(Default, Debug, Clone)]
73pub struct HttpRequest {
74 pub uri: Uri, pub method: Option<Method>, pub alpn_protocols: Vec<String>, pub resolve: Option<IpAddr>, pub headers: Option<HeaderMap<HeaderValue>>, pub ip_version: Option<i32>, pub skip_verify: bool, pub body: Option<Bytes>, pub silent: bool, pub dns_servers: Option<Vec<String>>, }
85
86impl HttpRequest {
87 pub fn get_port(&self) -> u16 {
88 let default_port = if self.uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
89 443
90 } else {
91 80
92 };
93 self.uri.port_u16().unwrap_or(default_port)
94 }
95 fn builder(&self) -> Builder {
97 let uri = &self.uri;
98 let mut builder = Request::builder()
99 .uri(uri)
100 .method(self.method.clone().unwrap_or(Method::GET));
101 let mut set_host = false;
102 let mut set_user_agent = false;
103
104 if let Some(headers) = &self.headers {
106 for (key, value) in headers.iter() {
107 builder = builder.header(key, value);
108 match key.to_string().to_lowercase().as_str() {
109 "host" => set_host = true,
110 "user-agent" => set_user_agent = true,
111 _ => {}
112 }
113 }
114 }
115
116 if !set_host {
118 if let Some(host) = uri.host() {
119 builder = builder.header("Host", host);
120 }
121 }
122
123 if !set_user_agent {
125 builder = builder.header("User-Agent", format!("httpstat.rs/{}", VERSION));
126 }
127 builder
128 }
129}
130
131impl TryFrom<&str> for HttpRequest {
133 type Error = Error;
134
135 fn try_from(url: &str) -> Result<Self> {
136 let uri = url.parse::<Uri>().map_err(|e| Error::Uri { source: e })?;
137 Ok(Self {
138 uri,
139 alpn_protocols: vec![ALPN_HTTP2.to_string(), ALPN_HTTP1.to_string()],
140 ..Default::default()
141 })
142 }
143}
144
145impl TryFrom<&HttpRequest> for Request<Full<Bytes>> {
147 type Error = Error;
148 fn try_from(req: &HttpRequest) -> Result<Self> {
149 req.builder()
150 .body(Full::new(req.body.clone().unwrap_or_default()))
151 .map_err(|e| Error::Http { source: e })
152 }
153}
154
155static INIT: Once = Once::new();
157
158fn ensure_crypto_provider() {
159 INIT.call_once(|| {
160 let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
161 });
162}
163
164async fn dns_resolve(req: &HttpRequest, stat: &mut HttpStat) -> Result<(SocketAddr, String)> {
166 let host = req
167 .uri
168 .host()
169 .ok_or(Error::Common {
170 category: "http".to_string(),
171 message: "host is required".to_string(),
172 })?
173 .to_string();
174 let port = req.get_port();
175
176 if let Some(resolve) = &req.resolve {
178 let addr = SocketAddr::new(*resolve, port);
179 stat.addr = Some(addr.to_string());
180 return Ok((addr, host));
181 }
182
183 let provider = TokioConnectionProvider::default();
185
186 let mut builder = if let Some(dns_servers) = &req.dns_servers {
187 let servers: Vec<_> = dns_servers
188 .iter()
189 .flat_map(|server| server.parse::<IpAddr>().ok())
190 .collect();
191
192 let mut config = ResolverConfig::new();
193 for server in NameServerConfigGroup::from_ips_clear(&servers, 53, true).into_inner() {
194 config.add_name_server(server);
195 }
196 TokioResolver::builder_with_config(config, provider)
197 } else {
198 TokioResolver::builder(provider).map_err(|e| Error::Resolve { source: e })?
199 };
200
201 if let Some(ip_version) = req.ip_version {
202 match ip_version {
203 4 => builder.options_mut().ip_strategy = LookupIpStrategy::Ipv4Only,
204 6 => builder.options_mut().ip_strategy = LookupIpStrategy::Ipv6Only,
205 _ => {}
206 }
207 }
208
209 let resolver = builder.build();
211 let dns_start = Instant::now();
212 let addr = timeout(Duration::from_secs(10), resolver.lookup_ip(&host))
213 .await
214 .map_err(|e| Error::Timeout { source: e })?
215 .map_err(|e| Error::Resolve { source: e })?;
216 stat.dns_lookup = Some(dns_start.elapsed());
217 let addr = addr.into_iter().next().ok_or(Error::Common {
218 category: "http".to_string(),
219 message: "dns lookup failed".to_string(),
220 })?;
221 let addr = SocketAddr::new(addr, port);
222 stat.addr = Some(addr.to_string());
223
224 Ok((addr, host))
225}
226
227async fn tcp_connect(addr: SocketAddr, stat: &mut HttpStat) -> Result<TcpStream> {
229 let tcp_start = Instant::now();
230 let tcp_stream = timeout(Duration::from_secs(10), TcpStream::connect(addr))
231 .await
232 .map_err(|e| Error::Timeout { source: e })?
233 .map_err(|e| Error::Io { source: e })?;
234 stat.tcp_connect = Some(tcp_start.elapsed());
235 Ok(tcp_stream)
236}
237
238async fn tls_handshake(
240 host: String,
241 tcp_stream: TcpStream,
242 alpn_protocols: Vec<String>,
243 skip_verify: bool,
244 stat: &mut HttpStat,
245) -> Result<(TlsStream<TcpStream>, bool)> {
246 let tls_start = Instant::now();
247 let mut root_store = RootCertStore::empty();
248 let certs = rustls_native_certs::load_native_certs().certs;
249
250 for cert in certs {
252 root_store
253 .add(cert)
254 .map_err(|e| Error::Rustls { source: e })?;
255 }
256
257 let mut config = ClientConfig::builder()
259 .with_root_certificates(root_store)
260 .with_no_client_auth();
261
262 if skip_verify {
264 config
265 .dangerous()
266 .set_certificate_verifier(Arc::new(SkipVerifier));
267 }
268
269 config.alpn_protocols = alpn_protocols
271 .iter()
272 .map(|s| s.as_bytes().to_vec())
273 .collect();
274
275 let connector = TlsConnector::from(Arc::new(config));
276
277 let tls_stream = timeout(
279 Duration::from_secs(30),
280 connector.connect(
281 host.clone()
282 .try_into()
283 .map_err(|e| Error::InvalidDnsName { source: e })?,
284 tcp_stream,
285 ),
286 )
287 .await
288 .map_err(|e| Error::Timeout { source: e })?
289 .map_err(|e| Error::Io { source: e })?;
290 stat.tls_handshake = Some(tls_start.elapsed());
291
292 let (_, session) = tls_stream.get_ref();
294
295 stat.tls = session
296 .protocol_version()
297 .map(|v| format_tls_protocol(v.as_str().unwrap_or_default()));
298
299 if let Some(certs) = session.peer_certificates() {
301 if let Some(cert) = certs.first() {
302 if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
303 stat.subject = Some(cert.subject().to_string());
304 stat.cert_not_before = Some(format_time(cert.validity().not_before.timestamp()));
305 stat.cert_not_after = Some(format_time(cert.validity().not_after.timestamp()));
306 stat.issuer = Some(cert.issuer().to_string());
307 if let Ok(Some(sans)) = cert.subject_alternative_name() {
308 let mut domains = Vec::new();
309 for san in sans.value.general_names.iter() {
310 if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
311 domains.push(domain.to_string());
312 }
313 }
314 stat.cert_domains = Some(domains);
315 };
316 }
317 }
318 }
319
320 if let Some(cipher) = session.negotiated_cipher_suite() {
322 let cipher = format!("{:?}", cipher);
323 if let Some((_, cipher)) = cipher.split_once("_") {
324 stat.cert_cipher = Some(cipher.to_string());
325 } else {
326 stat.cert_cipher = Some(cipher);
327 }
328 }
329
330 let mut is_http2 = false;
332 if let Some(protocol) = session.alpn_protocol() {
333 let alpn = String::from_utf8_lossy(protocol).to_string();
334 is_http2 = alpn == ALPN_HTTP2;
335 stat.alpn = Some(alpn);
336 }
337 Ok((tls_stream, is_http2))
338}
339
340async fn send_http_request(
342 req: Request<Full<Bytes>>,
343 tcp_stream: TcpStream,
344 tx: oneshot::Sender<String>,
345 stat: &mut HttpStat,
346) -> Result<Response<Incoming>> {
347 let (mut sender, conn) = timeout(
348 Duration::from_secs(30),
349 hyper::client::conn::http1::handshake(TokioIo::new(tcp_stream)),
350 )
351 .await
352 .map_err(|e| Error::Timeout { source: e })?
353 .map_err(|e| Error::Hyper { source: e })?;
354
355 tokio::spawn(async move {
357 if let Err(e) = conn.await {
358 let _ = tx.send(e.to_string());
359 }
360 });
361
362 let server_processing_start = Instant::now();
363 let resp = sender
364 .send_request(req)
365 .await
366 .map_err(|e| Error::Hyper { source: e })?;
367 stat.server_processing = Some(server_processing_start.elapsed());
368 Ok(resp)
369}
370
371async fn send_https_request(
373 req: Request<Full<Bytes>>,
374 tls_stream: TlsStream<TcpStream>,
375 tx: oneshot::Sender<String>,
376 stat: &mut HttpStat,
377) -> Result<Response<Incoming>> {
378 let (mut sender, conn) = timeout(
379 Duration::from_secs(30),
380 hyper::client::conn::http1::handshake(TokioIo::new(tls_stream)),
381 )
382 .await
383 .map_err(|e| Error::Timeout { source: e })?
384 .map_err(|e| Error::Hyper { source: e })?;
385
386 tokio::spawn(async move {
388 if let Err(e) = conn.await {
389 let _ = tx.send(e.to_string());
390 }
391 });
392
393 let server_processing_start = Instant::now();
394 let resp = sender
395 .send_request(req)
396 .await
397 .map_err(|e| Error::Hyper { source: e })?;
398 stat.server_processing = Some(server_processing_start.elapsed());
399 Ok(resp)
400}
401
402async fn send_https2_request(
404 req: Request<Full<Bytes>>,
405 tls_stream: TlsStream<TcpStream>,
406 tx: oneshot::Sender<String>,
407 stat: &mut HttpStat,
408) -> Result<Response<Incoming>> {
409 let (mut sender, conn) = timeout(
410 Duration::from_secs(30),
411 hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(tls_stream)),
412 )
413 .await
414 .map_err(|e| Error::Timeout { source: e })?
415 .map_err(|e| Error::Hyper { source: e })?;
416
417 tokio::spawn(async move {
419 if let Err(e) = conn.await {
420 let _ = tx.send(e.to_string());
421 }
422 });
423
424 let mut req = req;
425 *req.version_mut() = hyper::Version::HTTP_2;
426 req.headers_mut().remove("Host");
428
429 let server_processing_start = Instant::now();
430 let resp = sender
431 .send_request(req)
432 .await
433 .map_err(|e| Error::Hyper { source: e })?;
434 stat.server_processing = Some(server_processing_start.elapsed());
435 Ok(resp)
436}
437
438fn finish_with_error(mut stat: HttpStat, error: impl ToString, start: Instant) -> HttpStat {
440 stat.error = Some(error.to_string());
441 stat.total = Some(start.elapsed());
442 stat
443}
444
445async fn quic_connect(
447 host: String,
448 addr: SocketAddr,
449 skip_verify: bool,
450 stat: &mut HttpStat,
451) -> Result<(quinn::Endpoint, quinn::Connection)> {
452 let quic_start = Instant::now();
453 let mut root_store = RootCertStore::empty();
454 let certs = rustls_native_certs::load_native_certs().certs;
455
456 for cert in certs {
458 root_store
459 .add(cert)
460 .map_err(|e| Error::Rustls { source: e })?;
461 }
462
463 let mut config = ClientConfig::builder()
465 .with_root_certificates(root_store)
466 .with_no_client_auth();
467 config.enable_early_data = true;
468 config.alpn_protocols = vec![ALPN_HTTP3.as_bytes().to_vec()];
469
470 if skip_verify {
472 config
473 .dangerous()
474 .set_certificate_verifier(Arc::new(SkipVerifier));
475 }
476
477 let mut client_endpoint =
479 h3_quinn::quinn::Endpoint::client("[::]:0".parse().map_err(|_| Error::Common {
480 category: "parse".to_string(),
481 message: "failed to parse address".to_string(),
482 })?)
483 .map_err(|e| Error::Io { source: e })?;
484
485 let h3_config =
486 quinn::crypto::rustls::QuicClientConfig::try_from(config).map_err(|e| Error::Common {
487 category: "quic".to_string(),
488 message: e.to_string(),
489 })?;
490
491 let client_config = quinn::ClientConfig::new(Arc::new(h3_config));
492 client_endpoint.set_default_client_config(client_config);
493
494 let conn = client_endpoint
496 .connect(addr, &host)
497 .map_err(|e| Error::QuicConnect { source: e })?
498 .await
499 .map_err(|e| Error::QuicConnection { source: e })?;
500
501 stat.quic_connect = Some(quic_start.elapsed());
502 Ok((client_endpoint, conn))
503}
504
505async fn http3_request(http_req: HttpRequest) -> HttpStat {
507 let start = Instant::now();
508 let mut stat = HttpStat {
509 alpn: Some(ALPN_HTTP3.to_string()),
510 ..Default::default()
511 };
512
513 let dns_result = dns_resolve(&http_req, &mut stat).await;
515 let (addr, host) = match dns_result {
516 Ok(result) => result,
517 Err(e) => {
518 return finish_with_error(stat, e, start);
519 }
520 };
521
522 let (client_endpoint, conn) = match timeout(
524 Duration::from_secs(30),
525 quic_connect(host, addr, http_req.skip_verify, &mut stat),
526 )
527 .await
528 {
529 Ok(Ok(result)) => result,
530 Ok(Err(e)) => {
531 return finish_with_error(stat, e, start);
532 }
533 Err(e) => {
534 return finish_with_error(stat, e, start);
535 }
536 };
537
538 stat.tls = Some("tls 1.3".to_string()); stat.alpn = Some(ALPN_HTTP3.to_string()); if let Some(peer_identity) = conn.peer_identity() {
544 if let Ok(certs) = peer_identity.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
545 if let Some(cert) = certs.first() {
546 if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
547 let oid_str = match cert.signature_algorithm.algorithm.to_string().as_str() {
548 "1.2.840.113549.1.1.11" => "AES_256_GCM_SHA384".to_string(),
549 "1.2.840.113549.1.1.12" => "AES_128_GCM_SHA256".to_string(),
550 "1.2.840.113549.1.1.13" => "CHACHA20_POLY1305_SHA256".to_string(),
551 "1.2.840.10045.4.3.2" => "AES_256_GCM_SHA384".to_string(),
552 "1.2.840.10045.4.3.3" => "AES_128_GCM_SHA256".to_string(),
553 "1.2.840.10045.4.3.4" => "CHACHA20_POLY1305_SHA256".to_string(),
554 "1.3.101.112" => "AES_256_GCM_SHA384".to_string(),
555 "1.3.101.113" => "AES_128_GCM_SHA256".to_string(),
556 _ => format!("{:?}", cert.signature_algorithm.algorithm),
557 };
558 stat.subject = Some(cert.subject().to_string());
559 stat.issuer = Some(cert.issuer().to_string());
560 stat.cert_cipher = Some(oid_str);
561 stat.cert_not_before =
562 Some(format_time(cert.validity().not_before.timestamp()));
563 stat.cert_not_after = Some(format_time(cert.validity().not_after.timestamp()));
564 if let Ok(Some(sans)) = cert.subject_alternative_name() {
565 let mut domains = Vec::new();
566 for san in sans.value.general_names.iter() {
567 if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
568 domains.push(domain.to_string());
569 }
570 }
571 stat.cert_domains = Some(domains);
572 };
573 }
574 }
575 }
576 }
577
578 let quinn_conn = h3_quinn::Connection::new(conn);
580
581 let (mut driver, mut send_request) = match h3::client::new(quinn_conn)
582 .await
583 .map_err(|e| Error::H3ConnectionError { source: e })
584 {
585 Ok(result) => result,
586 Err(e) => {
587 return finish_with_error(stat, e, start);
588 }
589 };
590
591 let req = match http_req.builder().body(()) {
593 Ok(req) => req,
594 Err(e) => {
595 return finish_with_error(stat, e, start);
596 }
597 };
598 let body = http_req.body.unwrap_or_default();
599
600 let drive = async move {
602 Err::<(), h3::error::ConnectionError>(future::poll_fn(|cx| driver.poll_close(cx)).await)
603 };
604
605 let request = async move {
607 let mut stream = send_request.send_request(req).await?;
608 stream.send_data(body).await?;
609
610 let mut sub_stat = HttpStat::default();
611
612 stream.finish().await?;
614
615 let server_processing_start = Instant::now();
616
617 let resp = stream.recv_response().await?;
618 sub_stat.server_processing = Some(server_processing_start.elapsed());
619
620 sub_stat.status = Some(resp.status());
621 sub_stat.headers = Some(resp.headers().clone());
622
623 let content_transfer_start = Instant::now();
625 let mut buf = BytesMut::new();
626 while let Some(chunk) = stream.recv_data().await? {
627 buf.extend(chunk.chunk());
628 }
629 sub_stat.content_transfer = Some(content_transfer_start.elapsed());
630 sub_stat.body = Some(Bytes::from(buf));
631 Ok::<HttpStat, h3::error::StreamError>(sub_stat)
632 };
633
634 let (req_res, drive_res) = tokio::join!(request, drive);
636 match req_res {
637 Ok(sub_stat) => {
638 stat.server_processing = sub_stat.server_processing;
639 stat.content_transfer = sub_stat.content_transfer;
640 stat.status = sub_stat.status;
641 stat.headers = sub_stat.headers;
642 stat.body = sub_stat.body;
643 }
644 Err(err) => {
645 if !err.is_h3_no_error() {
646 stat.error = Some(err.to_string());
647 }
648 }
649 }
650 if let Err(err) = drive_res {
651 if !err.is_h3_no_error() {
652 stat.error = Some(err.to_string());
653 }
654 }
655
656 stat.total = Some(start.elapsed());
657 client_endpoint.close(0u32.into(), b"done");
659
660 stat
661}
662
663async fn http1_2_request(http_req: HttpRequest) -> HttpStat {
664 let start = Instant::now();
665 let mut stat = HttpStat::default();
666
667 let dns_result = dns_resolve(&http_req, &mut stat).await;
669 let (addr, host) = match dns_result {
670 Ok(result) => result,
671 Err(e) => {
672 return finish_with_error(stat, e, start);
673 }
674 };
675
676 let uri = &http_req.uri;
677 let is_https = uri.scheme() == Some(&http::uri::Scheme::HTTPS);
678
679 let req: Request<Full<Bytes>> = match (&http_req).try_into() {
681 Ok(req) => req,
682 Err(e) => {
683 return finish_with_error(stat, e, start);
684 }
685 };
686
687 let tcp_stream = match tcp_connect(addr, &mut stat).await {
689 Ok(stream) => stream,
690 Err(e) => {
691 return finish_with_error(stat, e, start);
692 }
693 };
694
695 let (tx, mut rx) = oneshot::channel();
697
698 let resp = if is_https {
700 let tls_result = tls_handshake(
702 host.clone(),
703 tcp_stream,
704 http_req.alpn_protocols,
705 http_req.skip_verify,
706 &mut stat,
707 )
708 .await;
709 let (tls_stream, is_http2) = match tls_result {
710 Ok(result) => result,
711 Err(e) => {
712 return finish_with_error(stat, e, start);
713 }
714 };
715
716 if is_http2 {
718 match send_https2_request(req, tls_stream, tx, &mut stat).await {
719 Ok(resp) => resp,
720 Err(e) => {
721 return finish_with_error(stat, e, start);
722 }
723 }
724 } else {
725 match send_https_request(req, tls_stream, tx, &mut stat).await {
726 Ok(resp) => resp,
727 Err(e) => {
728 return finish_with_error(stat, e, start);
729 }
730 }
731 }
732 } else {
733 match send_http_request(req, tcp_stream, tx, &mut stat).await {
735 Ok(resp) => resp,
736 Err(e) => {
737 return finish_with_error(stat, e, start);
738 }
739 }
740 };
741
742 stat.status = Some(resp.status());
744 stat.headers = Some(resp.headers().clone());
745
746 let content_transfer_start = Instant::now();
748 let body_result = resp.collect().await;
749 let body = match body_result {
750 Ok(body) => body,
751 Err(e) => {
752 return finish_with_error(stat, format!("Failed to read response body: {}", e), start);
753 }
754 };
755
756 let body_bytes = body.to_bytes();
757 stat.body = Some(body_bytes);
758 stat.content_transfer = Some(content_transfer_start.elapsed());
759
760 if let Ok(error) = rx.try_recv() {
762 stat.error = Some(error);
763 }
764
765 stat.total = Some(start.elapsed());
766 stat
767}
768
769pub async fn request(http_req: HttpRequest) -> HttpStat {
813 ensure_crypto_provider();
814 let silent = http_req.silent;
815
816 let mut stat = if http_req.alpn_protocols.contains(&ALPN_HTTP3.to_string()) {
818 http3_request(http_req).await
819 } else {
820 http1_2_request(http_req).await
821 };
822 if let Some(body) = &stat.body {
823 stat.body_size = Some(body.len());
824 }
825 let encoding = if let Some(headers) = &stat.headers {
826 headers
827 .get("content-encoding")
828 .map(|v| v.to_str().unwrap_or_default())
829 .unwrap_or_default()
830 } else {
831 ""
832 };
833
834 if !encoding.is_empty() {
835 if let Some(body) = &stat.body {
836 match decompress(encoding, body) {
837 Ok(data) => {
838 stat.body = Some(data);
839 }
840 Err(e) => {
841 stat.error = Some(e.to_string());
842 }
843 }
844 }
845 }
846
847 stat.silent = silent;
848
849 stat
850}