1use super::error::{Error, Result};
30use super::stats::{HttpStat, ALPN_HTTP1, ALPN_HTTP2, ALPN_HTTP3};
31use super::SkipVerifier;
32use bytes::{Buf, Bytes, BytesMut};
33use futures::future;
34use hickory_resolver::config::LookupIpStrategy;
35use hickory_resolver::name_server::TokioConnectionProvider;
36use hickory_resolver::TokioResolver;
37use http::request::Builder;
38use http::HeaderValue;
39use http::Request;
40use http::Response;
41use http::Uri;
42use http::{HeaderMap, Method};
43use http_body_util::{BodyExt, Full};
44use hyper::body::Incoming;
45use hyper_util::rt::TokioExecutor;
46use hyper_util::rt::TokioIo;
47use std::collections::HashMap;
48use std::net::IpAddr;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Once;
52use std::time::Duration;
53use std::time::Instant;
54use tokio::fs;
55use tokio::net::TcpStream;
56use tokio::sync::oneshot;
57use tokio::time::timeout;
58use tokio_rustls::client::TlsStream;
59use tokio_rustls::rustls::{ClientConfig, RootCertStore};
60use tokio_rustls::TlsConnector;
61
/// Crate version captured from `CARGO_PKG_VERSION` at compile time; it is
/// interpolated into the default `User-Agent` header built by `HttpRequest::builder`.
const VERSION: &str = env!("CARGO_PKG_VERSION");
63
/// Maps a TLS protocol identifier such as `"TLSv1_3"` to its display label
/// (`"tls v1.3"`). Identifiers without a known mapping are returned unchanged.
fn format_tls_protocol(protocol: &str) -> String {
    let label = match protocol {
        "TLSv1_3" => "tls v1.3",
        "TLSv1_2" => "tls v1.2",
        "TLSv1_1" => "tls v1.1",
        unknown => unknown,
    };
    label.to_owned()
}
72
/// Description of a single request to issue and measure.
#[derive(Default, Debug, Clone)]
pub struct HttpRequest {
    /// Target URI; its scheme, host and port drive DNS resolution and the
    /// choice between plain TCP and TLS.
    pub uri: Uri,
    /// HTTP method; `GET` is used when `None`.
    pub method: Option<Method>,
    /// ALPN protocol identifiers offered during the TLS handshake. If the list
    /// contains the HTTP/3 identifier, `request` takes the HTTP/3 path.
    pub alpn_protocols: Vec<String>,
    /// Static address overrides keyed by `"host:port"`; a matching entry
    /// bypasses the DNS lookup.
    pub resolves: Option<HashMap<String, IpAddr>>,
    /// Extra request headers. `Host` and `User-Agent` get default values when
    /// they are not supplied here.
    pub headers: Option<HeaderMap<HeaderValue>>,
    /// `4` restricts DNS lookups to IPv4, `6` to IPv6; any other value leaves
    /// the resolver's strategy untouched.
    pub ip_version: Option<i32>,
    /// When `true`, the `SkipVerifier` certificate verifier is installed on
    /// the TLS client configuration.
    pub skip_verify: bool,
    /// Destination file for the response body (see `request`).
    pub output: Option<String>,
    /// Request body; an empty body is sent when `None`.
    pub body: Option<Bytes>,
}
85
86impl HttpRequest {
87 fn builder(&self) -> Builder {
88 let uri = &self.uri;
89 let mut builder = Request::builder()
90 .uri(uri)
91 .method(self.method.clone().unwrap_or(Method::GET));
92 let mut set_host = false;
93 let mut set_user_agent = false;
94 if let Some(headers) = &self.headers {
95 for (key, value) in headers.iter() {
96 builder = builder.header(key, value);
97 match key.to_string().to_lowercase().as_str() {
98 "host" => set_host = true,
99 "user-agent" => set_user_agent = true,
100 _ => {}
101 }
102 }
103 }
104 if !set_host {
105 if let Some(host) = uri.host() {
106 builder = builder.header("Host", host);
107 }
108 }
109 if !set_user_agent {
110 builder = builder.header("User-Agent", format!("httpstat.rs/{}", VERSION));
111 }
112 builder
113 }
114}
115
116impl TryFrom<&str> for HttpRequest {
117 type Error = Error;
118
119 fn try_from(url: &str) -> Result<Self> {
120 let uri = url.parse::<Uri>().map_err(|e| Error::Uri { source: e })?;
121 Ok(Self {
122 uri,
123 method: None,
124 alpn_protocols: vec![ALPN_HTTP2.to_string(), ALPN_HTTP1.to_string()],
125 resolves: None,
126 headers: None,
127 ip_version: None,
128 skip_verify: false,
129 output: None,
130 body: None,
131 })
132 }
133}
134
135impl TryFrom<&HttpRequest> for Request<Full<Bytes>> {
136 type Error = Error;
137 fn try_from(req: &HttpRequest) -> Result<Self> {
138 req.builder()
139 .body(Full::new(req.body.clone().unwrap_or_default()))
140 .map_err(|e| Error::Http { source: e })
141 }
142}
143
144static INIT: Once = Once::new();
145
146fn ensure_crypto_provider() {
147 INIT.call_once(|| {
148 let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
149 });
150}
151
152async fn dns_resolve(req: &HttpRequest, stat: &mut HttpStat) -> Result<(SocketAddr, String)> {
153 let host = req
154 .uri
155 .host()
156 .ok_or(Error::Common {
157 category: "http".to_string(),
158 message: "host is required".to_string(),
159 })?
160 .to_string();
161 let default_port = if req.uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
162 443
163 } else {
164 80
165 };
166 let port = req.uri.port_u16().unwrap_or(default_port);
167
168 if let Some(resolves) = &req.resolves {
170 let host_port = format!("{}:{}", host, port);
171 if let Some(ip) = resolves.get(&host_port) {
172 let addr = SocketAddr::new(*ip, port);
173 stat.addr = Some(addr.to_string());
174 return Ok((addr, host));
175 }
176 }
177
178 let provider = TokioConnectionProvider::default();
179 let mut builder = TokioResolver::builder(provider).map_err(|e| Error::Resolve { source: e })?;
180 if let Some(ip_version) = req.ip_version {
181 match ip_version {
182 4 => builder.options_mut().ip_strategy = LookupIpStrategy::Ipv4Only,
183 6 => builder.options_mut().ip_strategy = LookupIpStrategy::Ipv6Only,
184 _ => {}
185 }
186 }
187
188 let resolver = builder.build();
189 let dns_start = Instant::now();
190 let addr = resolver
191 .lookup_ip(&host)
192 .await
193 .map_err(|e| Error::Resolve { source: e })?;
194 stat.dns_lookup = Some(dns_start.elapsed());
195 let addr = addr.into_iter().next().ok_or(Error::Common {
196 category: "http".to_string(),
197 message: "dns lookup failed".to_string(),
198 })?;
199 let addr = SocketAddr::new(addr, port);
200 stat.addr = Some(addr.to_string());
201
202 Ok((addr, host))
203}
204
205async fn tcp_connect(addr: SocketAddr, stat: &mut HttpStat) -> Result<TcpStream> {
206 let tcp_start = Instant::now();
207 let tcp_stream = timeout(Duration::from_secs(10), TcpStream::connect(addr))
208 .await
209 .map_err(|e| Error::Timeout { source: e })?
210 .map_err(|e| Error::Io { source: e })?;
211 stat.tcp_connect = Some(tcp_start.elapsed());
212 Ok(tcp_stream)
213}
214
215async fn tls_handshake(
216 host: String,
217 tcp_stream: TcpStream,
218 alpn_protocols: Vec<String>,
219 skip_verify: bool,
220 stat: &mut HttpStat,
221) -> Result<(TlsStream<TcpStream>, bool)> {
222 let tls_start = Instant::now();
223 let mut root_store = RootCertStore::empty();
224 let certs = rustls_native_certs::load_native_certs().certs;
225
226 for cert in certs {
227 root_store
228 .add(cert)
229 .map_err(|e| Error::Rustls { source: e })?;
230 }
231 let mut config = ClientConfig::builder()
232 .with_root_certificates(root_store)
233 .with_no_client_auth();
234
235 if skip_verify {
237 config
238 .dangerous()
239 .set_certificate_verifier(Arc::new(SkipVerifier));
240 }
241
242 config.alpn_protocols = alpn_protocols
243 .iter()
244 .map(|s| s.as_bytes().to_vec())
245 .collect();
246
247 let connector = TlsConnector::from(Arc::new(config));
248
249 let tls_stream = timeout(
251 Duration::from_secs(30),
252 connector.connect(
253 host.clone()
254 .try_into()
255 .map_err(|e| Error::InvalidDnsName { source: e })?,
256 tcp_stream,
257 ),
258 )
259 .await
260 .map_err(|e| Error::Timeout { source: e })?
261 .map_err(|e| Error::Io { source: e })?;
262 stat.tls_handshake = Some(tls_start.elapsed());
263
264 let (_, session) = tls_stream.get_ref();
265
266 stat.tls = session
267 .protocol_version()
268 .map(|v| format_tls_protocol(v.as_str().unwrap_or_default()));
269
270 if let Some(certs) = session.peer_certificates() {
271 if let Some(cert) = certs.first() {
272 if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
273 stat.cert_not_before = Some(cert.validity().not_before.to_string());
274 stat.cert_not_after = Some(cert.validity().not_after.to_string());
275 if let Ok(Some(sans)) = cert.subject_alternative_name() {
276 let mut domains = Vec::new();
277 for san in sans.value.general_names.iter() {
278 if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
279 domains.push(domain.to_string());
280 }
281 }
282 stat.cert_domains = Some(domains);
283 };
284 }
285 }
286 }
287 if let Some(cipher) = session.negotiated_cipher_suite() {
288 let cipher = format!("{:?}", cipher);
289 if let Some((_, cipher)) = cipher.split_once("_") {
290 stat.cert_cipher = Some(cipher.to_string());
291 } else {
292 stat.cert_cipher = Some(cipher);
293 }
294 }
295 let mut is_http2 = false;
296 if let Some(protocol) = session.alpn_protocol() {
297 let alpn = String::from_utf8_lossy(protocol).to_string();
298 is_http2 = alpn == ALPN_HTTP2;
299 stat.alpn = Some(alpn);
300 }
301 Ok((tls_stream, is_http2))
302}
303
304async fn send_http_request(
305 req: Request<Full<Bytes>>,
306 tcp_stream: TcpStream,
307 tx: oneshot::Sender<String>,
308 stat: &mut HttpStat,
309) -> Result<Response<Incoming>> {
310 let (mut sender, conn) = timeout(
311 Duration::from_secs(30),
312 hyper::client::conn::http1::handshake(TokioIo::new(tcp_stream)),
313 )
314 .await
315 .map_err(|e| Error::Timeout { source: e })?
316 .map_err(|e| Error::Hyper { source: e })?;
317 tokio::spawn(async move {
319 if let Err(e) = conn.await {
320 let _ = tx.send(e.to_string());
321 }
322 });
323
324 let server_processing_start = Instant::now();
325 let resp = sender
326 .send_request(req)
327 .await
328 .map_err(|e| Error::Hyper { source: e })?;
329 stat.server_processing = Some(server_processing_start.elapsed());
330 Ok(resp)
331}
332
333async fn send_https_request(
334 req: Request<Full<Bytes>>,
335 tls_stream: TlsStream<TcpStream>,
336 tx: oneshot::Sender<String>,
337 stat: &mut HttpStat,
338) -> Result<Response<Incoming>> {
339 let (mut sender, conn) = timeout(
340 Duration::from_secs(30),
341 hyper::client::conn::http1::handshake(TokioIo::new(tls_stream)),
342 )
343 .await
344 .map_err(|e| Error::Timeout { source: e })?
345 .map_err(|e| Error::Hyper { source: e })?;
346 tokio::spawn(async move {
348 if let Err(e) = conn.await {
349 let _ = tx.send(e.to_string());
350 }
351 });
352
353 let server_processing_start = Instant::now();
354 let resp = sender
355 .send_request(req)
356 .await
357 .map_err(|e| Error::Hyper { source: e })?;
358 stat.server_processing = Some(server_processing_start.elapsed());
359 Ok(resp)
360}
361
362async fn send_https2_request(
363 req: Request<Full<Bytes>>,
364 tls_stream: TlsStream<TcpStream>,
365 tx: oneshot::Sender<String>,
366 stat: &mut HttpStat,
367) -> Result<Response<Incoming>> {
368 let (mut sender, conn) = timeout(
369 Duration::from_secs(30),
370 hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(tls_stream)),
371 )
372 .await
373 .map_err(|e| Error::Timeout { source: e })?
374 .map_err(|e| Error::Hyper { source: e })?;
375 tokio::spawn(async move {
377 if let Err(e) = conn.await {
378 let _ = tx.send(e.to_string());
379 }
380 });
381
382 let mut req = req;
383 *req.version_mut() = hyper::Version::HTTP_2;
384 req.headers_mut().remove("Host");
386
387 let server_processing_start = Instant::now();
388 let resp = sender
389 .send_request(req)
390 .await
391 .map_err(|e| Error::Hyper { source: e })?;
392 stat.server_processing = Some(server_processing_start.elapsed());
393 Ok(resp)
394}
395
396fn finish_with_error(mut stat: HttpStat, error: impl ToString, start: Instant) -> HttpStat {
397 stat.error = Some(error.to_string());
398 stat.total = Some(start.elapsed());
399 stat
400}
401
402async fn quic_connect(
403 host: String,
404 addr: SocketAddr,
405 skip_verify: bool,
406 stat: &mut HttpStat,
407) -> Result<(quinn::Endpoint, quinn::Connection)> {
408 let quic_start = Instant::now();
409 let mut root_store = RootCertStore::empty();
410 let certs = rustls_native_certs::load_native_certs().certs;
411
412 for cert in certs {
413 root_store
414 .add(cert)
415 .map_err(|e| Error::Rustls { source: e })?;
416 }
417 let mut config = ClientConfig::builder()
418 .with_root_certificates(root_store)
419 .with_no_client_auth();
420 config.enable_early_data = true;
421 config.alpn_protocols = vec![ALPN_HTTP3.as_bytes().to_vec()];
422 if skip_verify {
424 config
425 .dangerous()
426 .set_certificate_verifier(Arc::new(SkipVerifier));
427 }
428
429 let mut client_endpoint =
430 h3_quinn::quinn::Endpoint::client("[::]:0".parse().map_err(|_| Error::Common {
431 category: "parse".to_string(),
432 message: "failed to parse address".to_string(),
433 })?)
434 .map_err(|e| Error::Io { source: e })?;
435
436 let h3_config =
437 quinn::crypto::rustls::QuicClientConfig::try_from(config).map_err(|e| Error::Common {
438 category: "quic".to_string(),
439 message: e.to_string(),
440 })?;
441
442 let client_config = quinn::ClientConfig::new(Arc::new(h3_config));
443 client_endpoint.set_default_client_config(client_config);
444
445 let conn = client_endpoint
446 .connect(addr, &host)
447 .map_err(|e| Error::QuicConnect { source: e })?
448 .await
449 .map_err(|e| Error::QuicConnection { source: e })?;
450
451 stat.quic_connect = Some(quic_start.elapsed());
452 Ok((client_endpoint, conn))
453}
454
455async fn http3_request(http_req: HttpRequest) -> HttpStat {
456 let start = Instant::now();
457 let mut stat = HttpStat {
458 alpn: Some(ALPN_HTTP3.to_string()),
459 ..Default::default()
460 };
461
462 let dns_result = dns_resolve(&http_req, &mut stat).await;
464 let (addr, host) = match dns_result {
465 Ok(result) => result,
466 Err(e) => {
467 return finish_with_error(stat, e, start);
468 }
469 };
470
471 let (client_endpoint, conn) =
472 match quic_connect(host, addr, http_req.skip_verify, &mut stat).await {
473 Ok(result) => result,
474 Err(e) => {
475 return finish_with_error(stat, e, start);
476 }
477 };
478
479 stat.tls = Some("tls 1.3".to_string()); stat.alpn = Some(ALPN_HTTP3.to_string()); if let Some(peer_identity) = conn.peer_identity() {
485 if let Ok(certs) = peer_identity.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
486 if let Some(cert) = certs.first() {
487 if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
488 let oid_str = match cert.signature_algorithm.algorithm.to_string().as_str() {
489 "1.2.840.113549.1.1.11" => "AES_256_GCM_SHA384".to_string(),
490 "1.2.840.113549.1.1.12" => "AES_128_GCM_SHA256".to_string(),
491 "1.2.840.113549.1.1.13" => "CHACHA20_POLY1305_SHA256".to_string(),
492 "1.2.840.10045.4.3.2" => "AES_256_GCM_SHA384".to_string(),
493 "1.2.840.10045.4.3.3" => "AES_128_GCM_SHA256".to_string(),
494 "1.2.840.10045.4.3.4" => "CHACHA20_POLY1305_SHA256".to_string(),
495 "1.3.101.112" => "AES_256_GCM_SHA384".to_string(),
496 "1.3.101.113" => "AES_128_GCM_SHA256".to_string(),
497 _ => format!("{:?}", cert.signature_algorithm.algorithm),
498 };
499 stat.cert_cipher = Some(oid_str);
500 stat.cert_not_before = Some(cert.validity().not_before.to_string());
501 stat.cert_not_after = Some(cert.validity().not_after.to_string());
502 if let Ok(Some(sans)) = cert.subject_alternative_name() {
503 let mut domains = Vec::new();
504 for san in sans.value.general_names.iter() {
505 if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
506 domains.push(domain.to_string());
507 }
508 }
509 stat.cert_domains = Some(domains);
510 };
511 }
512 }
513 }
514 }
515
516 let quinn_conn = h3_quinn::Connection::new(conn);
517
518 let (mut driver, mut send_request) = match h3::client::new(quinn_conn)
519 .await
520 .map_err(|e| Error::H3ConnectionError { source: e })
521 {
522 Ok(result) => result,
523 Err(e) => {
524 return finish_with_error(stat, e, start);
525 }
526 };
527
528 let req = match http_req.builder().body(()) {
529 Ok(req) => req,
530 Err(e) => {
531 return finish_with_error(stat, e, start);
532 }
533 };
534 let body = http_req.body.unwrap_or_default();
535
536 let drive = async move {
537 Err::<(), h3::error::ConnectionError>(future::poll_fn(|cx| driver.poll_close(cx)).await)
538 };
539
540 let request = async move {
541 let mut stream = send_request.send_request(req).await?;
542 stream.send_data(body).await?;
543
544 let mut sub_stat = HttpStat::default();
545
546 stream.finish().await?;
548
549 let server_processing_start = Instant::now();
550
551 let resp = stream.recv_response().await?;
552 sub_stat.server_processing = Some(server_processing_start.elapsed());
553
554 sub_stat.status = Some(resp.status());
555 sub_stat.headers = Some(resp.headers().clone());
556
557 let content_transfer_start = Instant::now();
558 let mut buf = BytesMut::new();
559 while let Some(chunk) = stream.recv_data().await? {
560 buf.extend(chunk.chunk());
561 }
562 sub_stat.content_transfer = Some(content_transfer_start.elapsed());
563 sub_stat.body = Some(Bytes::from(buf));
564
565 Ok::<HttpStat, h3::error::StreamError>(sub_stat)
566 };
567
568 let (req_res, drive_res) = tokio::join!(request, drive);
569 match req_res {
570 Ok(sub_stat) => {
571 stat.server_processing = sub_stat.server_processing;
572 stat.content_transfer = sub_stat.content_transfer;
573 stat.status = sub_stat.status;
574 stat.headers = sub_stat.headers;
575 stat.body = sub_stat.body;
576 }
577 Err(err) => {
578 if !err.is_h3_no_error() {
579 stat.error = Some(err.to_string());
580 }
581 }
582 }
583 if let Err(err) = drive_res {
584 if !err.is_h3_no_error() {
585 stat.error = Some(err.to_string());
586 }
587 }
588
589 stat.total = Some(start.elapsed());
590 client_endpoint.wait_idle().await;
591
592 stat
593}
594
595pub async fn request(http_req: HttpRequest) -> HttpStat {
596 ensure_crypto_provider();
597
598 if http_req.alpn_protocols.contains(&ALPN_HTTP3.to_string()) {
599 return http3_request(http_req).await;
600 }
601
602 let start = Instant::now();
603 let mut stat = HttpStat::default();
604
605 let dns_result = dns_resolve(&http_req, &mut stat).await;
607 let (addr, host) = match dns_result {
608 Ok(result) => result,
609 Err(e) => {
610 return finish_with_error(stat, e, start);
611 }
612 };
613
614 let uri = &http_req.uri;
615 let is_https = uri.scheme() == Some(&http::uri::Scheme::HTTPS);
616
617 let req: Request<Full<Bytes>> = match (&http_req).try_into() {
618 Ok(req) => req,
619 Err(e) => {
620 return finish_with_error(stat, e, start);
621 }
622 };
623
624 let tcp_stream = match tcp_connect(addr, &mut stat).await {
626 Ok(stream) => stream,
627 Err(e) => {
628 return finish_with_error(stat, e, start);
629 }
630 };
631
632 let (tx, mut rx) = oneshot::channel();
634
635 let resp = if is_https {
637 let tls_result = tls_handshake(
639 host.clone(),
640 tcp_stream,
641 http_req.alpn_protocols,
642 http_req.skip_verify,
643 &mut stat,
644 )
645 .await;
646 let (tls_stream, is_http2) = match tls_result {
647 Ok(result) => result,
648 Err(e) => {
649 return finish_with_error(stat, e, start);
650 }
651 };
652
653 if is_http2 {
655 match send_https2_request(req, tls_stream, tx, &mut stat).await {
656 Ok(resp) => resp,
657 Err(e) => {
658 return finish_with_error(stat, e, start);
659 }
660 }
661 } else {
662 match send_https_request(req, tls_stream, tx, &mut stat).await {
663 Ok(resp) => resp,
664 Err(e) => {
665 return finish_with_error(stat, e, start);
666 }
667 }
668 }
669 } else {
670 match send_http_request(req, tcp_stream, tx, &mut stat).await {
672 Ok(resp) => resp,
673 Err(e) => {
674 return finish_with_error(stat, e, start);
675 }
676 }
677 };
678
679 stat.status = Some(resp.status());
680 stat.headers = Some(resp.headers().clone());
681
682 let content_transfer_start = Instant::now();
684 let body_result = resp.collect().await;
685 let body = match body_result {
686 Ok(body) => body,
687 Err(e) => {
688 return finish_with_error(stat, format!("Failed to read response body: {}", e), start);
689 }
690 };
691
692 let body_bytes = body.to_bytes();
693 if let Some(output) = http_req.output {
694 match fs::write(output, body_bytes).await {
695 Ok(_) => {}
696 Err(e) => {
697 return finish_with_error(
698 stat,
699 format!("Failed to write response body to file: {}", e),
700 start,
701 );
702 }
703 }
704 } else {
705 stat.body = Some(body_bytes);
706 }
707 stat.content_transfer = Some(content_transfer_start.elapsed());
708
709 if let Ok(error) = rx.try_recv() {
711 stat.error = Some(error);
712 }
713
714 stat.total = Some(start.elapsed());
715 stat
716}