http_stat/
request.rs

1// Copyright 2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// Copyright 2025 Tree xie.
16//
17// Licensed under the Apache License, Version 2.0 (the "License");
18// you may not use this file except in compliance with the License.
19// You may obtain a copy of the License at
20//
21// http://www.apache.org/licenses/LICENSE-2.0
22//
23// Unless required by applicable law or agreed to in writing, software
24// distributed under the License is distributed on an "AS IS" BASIS,
25// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
26// See the License for the specific language governing permissions and
27// limitations under the License.
28
29use super::error::{Error, Result};
30use super::stats::{HttpStat, ALPN_HTTP1, ALPN_HTTP2, ALPN_HTTP3};
31use super::SkipVerifier;
32use bytes::{Buf, Bytes, BytesMut};
33use futures::future;
34use hickory_resolver::config::LookupIpStrategy;
35use hickory_resolver::name_server::TokioConnectionProvider;
36use hickory_resolver::TokioResolver;
37use http::request::Builder;
38use http::HeaderValue;
39use http::Request;
40use http::Response;
41use http::Uri;
42use http::{HeaderMap, Method};
43use http_body_util::{BodyExt, Full};
44use hyper::body::Incoming;
45use hyper_util::rt::TokioExecutor;
46use hyper_util::rt::TokioIo;
47use std::collections::HashMap;
48use std::net::IpAddr;
49use std::net::SocketAddr;
50use std::sync::Arc;
51use std::sync::Once;
52use std::time::Duration;
53use std::time::Instant;
54use tokio::fs;
55use tokio::net::TcpStream;
56use tokio::sync::oneshot;
57use tokio::time::timeout;
58use tokio_rustls::client::TlsStream;
59use tokio_rustls::rustls::{ClientConfig, RootCertStore};
60use tokio_rustls::TlsConnector;
61
62const VERSION: &str = env!("CARGO_PKG_VERSION");
63
64fn format_tls_protocol(protocol: &str) -> String {
65    match protocol {
66        "TLSv1_3" => "tls v1.3".to_string(),
67        "TLSv1_2" => "tls v1.2".to_string(),
68        "TLSv1_1" => "tls v1.1".to_string(),
69        _ => protocol.to_string(),
70    }
71}
72
73#[derive(Default, Debug, Clone)]
74pub struct HttpRequest {
75    pub uri: Uri,
76    pub method: Option<Method>,
77    pub alpn_protocols: Vec<String>,
78    pub resolves: Option<HashMap<String, IpAddr>>,
79    pub headers: Option<HeaderMap<HeaderValue>>,
80    pub ip_version: Option<i32>, // 4 for IPv4, 6 for IPv6
81    pub skip_verify: bool,
82    pub output: Option<String>,
83    pub body: Option<Bytes>,
84}
85
86impl HttpRequest {
87    fn builder(&self) -> Builder {
88        let uri = &self.uri;
89        let mut builder = Request::builder()
90            .uri(uri)
91            .method(self.method.clone().unwrap_or(Method::GET));
92        let mut set_host = false;
93        let mut set_user_agent = false;
94        if let Some(headers) = &self.headers {
95            for (key, value) in headers.iter() {
96                builder = builder.header(key, value);
97                match key.to_string().to_lowercase().as_str() {
98                    "host" => set_host = true,
99                    "user-agent" => set_user_agent = true,
100                    _ => {}
101                }
102            }
103        }
104        if !set_host {
105            if let Some(host) = uri.host() {
106                builder = builder.header("Host", host);
107            }
108        }
109        if !set_user_agent {
110            builder = builder.header("User-Agent", format!("httpstat.rs/{}", VERSION));
111        }
112        builder
113    }
114}
115
116impl TryFrom<&str> for HttpRequest {
117    type Error = Error;
118
119    fn try_from(url: &str) -> Result<Self> {
120        let uri = url.parse::<Uri>().map_err(|e| Error::Uri { source: e })?;
121        Ok(Self {
122            uri,
123            method: None,
124            alpn_protocols: vec![ALPN_HTTP2.to_string(), ALPN_HTTP1.to_string()],
125            resolves: None,
126            headers: None,
127            ip_version: None,
128            skip_verify: false,
129            output: None,
130            body: None,
131        })
132    }
133}
134
135impl TryFrom<&HttpRequest> for Request<Full<Bytes>> {
136    type Error = Error;
137    fn try_from(req: &HttpRequest) -> Result<Self> {
138        req.builder()
139            .body(Full::new(req.body.clone().unwrap_or_default()))
140            .map_err(|e| Error::Http { source: e })
141    }
142}
143
144static INIT: Once = Once::new();
145
146fn ensure_crypto_provider() {
147    INIT.call_once(|| {
148        let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
149    });
150}
151
152async fn dns_resolve(req: &HttpRequest, stat: &mut HttpStat) -> Result<(SocketAddr, String)> {
153    let host = req
154        .uri
155        .host()
156        .ok_or(Error::Common {
157            category: "http".to_string(),
158            message: "host is required".to_string(),
159        })?
160        .to_string();
161    let default_port = if req.uri.scheme() == Some(&http::uri::Scheme::HTTPS) {
162        443
163    } else {
164        80
165    };
166    let port = req.uri.port_u16().unwrap_or(default_port);
167
168    // Check if we have a resolve entry for this host:port
169    if let Some(resolves) = &req.resolves {
170        let host_port = format!("{}:{}", host, port);
171        if let Some(ip) = resolves.get(&host_port) {
172            let addr = SocketAddr::new(*ip, port);
173            stat.addr = Some(addr.to_string());
174            return Ok((addr, host));
175        }
176    }
177
178    let provider = TokioConnectionProvider::default();
179    let mut builder = TokioResolver::builder(provider).map_err(|e| Error::Resolve { source: e })?;
180    if let Some(ip_version) = req.ip_version {
181        match ip_version {
182            4 => builder.options_mut().ip_strategy = LookupIpStrategy::Ipv4Only,
183            6 => builder.options_mut().ip_strategy = LookupIpStrategy::Ipv6Only,
184            _ => {}
185        }
186    }
187
188    let resolver = builder.build();
189    let dns_start = Instant::now();
190    let addr = resolver
191        .lookup_ip(&host)
192        .await
193        .map_err(|e| Error::Resolve { source: e })?;
194    stat.dns_lookup = Some(dns_start.elapsed());
195    let addr = addr.into_iter().next().ok_or(Error::Common {
196        category: "http".to_string(),
197        message: "dns lookup failed".to_string(),
198    })?;
199    let addr = SocketAddr::new(addr, port);
200    stat.addr = Some(addr.to_string());
201
202    Ok((addr, host))
203}
204
205async fn tcp_connect(addr: SocketAddr, stat: &mut HttpStat) -> Result<TcpStream> {
206    let tcp_start = Instant::now();
207    let tcp_stream = timeout(Duration::from_secs(10), TcpStream::connect(addr))
208        .await
209        .map_err(|e| Error::Timeout { source: e })?
210        .map_err(|e| Error::Io { source: e })?;
211    stat.tcp_connect = Some(tcp_start.elapsed());
212    Ok(tcp_stream)
213}
214
215async fn tls_handshake(
216    host: String,
217    tcp_stream: TcpStream,
218    alpn_protocols: Vec<String>,
219    skip_verify: bool,
220    stat: &mut HttpStat,
221) -> Result<(TlsStream<TcpStream>, bool)> {
222    let tls_start = Instant::now();
223    let mut root_store = RootCertStore::empty();
224    let certs = rustls_native_certs::load_native_certs().certs;
225
226    for cert in certs {
227        root_store
228            .add(cert)
229            .map_err(|e| Error::Rustls { source: e })?;
230    }
231    let mut config = ClientConfig::builder()
232        .with_root_certificates(root_store)
233        .with_no_client_auth();
234
235    // Skip certificate verification if requested
236    if skip_verify {
237        config
238            .dangerous()
239            .set_certificate_verifier(Arc::new(SkipVerifier));
240    }
241
242    config.alpn_protocols = alpn_protocols
243        .iter()
244        .map(|s| s.as_bytes().to_vec())
245        .collect();
246
247    let connector = TlsConnector::from(Arc::new(config));
248
249    // Perform TLS handshake
250    let tls_stream = timeout(
251        Duration::from_secs(30),
252        connector.connect(
253            host.clone()
254                .try_into()
255                .map_err(|e| Error::InvalidDnsName { source: e })?,
256            tcp_stream,
257        ),
258    )
259    .await
260    .map_err(|e| Error::Timeout { source: e })?
261    .map_err(|e| Error::Io { source: e })?;
262    stat.tls_handshake = Some(tls_start.elapsed());
263
264    let (_, session) = tls_stream.get_ref();
265
266    stat.tls = session
267        .protocol_version()
268        .map(|v| format_tls_protocol(v.as_str().unwrap_or_default()));
269
270    if let Some(certs) = session.peer_certificates() {
271        if let Some(cert) = certs.first() {
272            if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
273                stat.cert_not_before = Some(cert.validity().not_before.to_string());
274                stat.cert_not_after = Some(cert.validity().not_after.to_string());
275                if let Ok(Some(sans)) = cert.subject_alternative_name() {
276                    let mut domains = Vec::new();
277                    for san in sans.value.general_names.iter() {
278                        if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
279                            domains.push(domain.to_string());
280                        }
281                    }
282                    stat.cert_domains = Some(domains);
283                };
284            }
285        }
286    }
287    if let Some(cipher) = session.negotiated_cipher_suite() {
288        let cipher = format!("{:?}", cipher);
289        if let Some((_, cipher)) = cipher.split_once("_") {
290            stat.cert_cipher = Some(cipher.to_string());
291        } else {
292            stat.cert_cipher = Some(cipher);
293        }
294    }
295    let mut is_http2 = false;
296    if let Some(protocol) = session.alpn_protocol() {
297        let alpn = String::from_utf8_lossy(protocol).to_string();
298        is_http2 = alpn == ALPN_HTTP2;
299        stat.alpn = Some(alpn);
300    }
301    Ok((tls_stream, is_http2))
302}
303
304async fn send_http_request(
305    req: Request<Full<Bytes>>,
306    tcp_stream: TcpStream,
307    tx: oneshot::Sender<String>,
308    stat: &mut HttpStat,
309) -> Result<Response<Incoming>> {
310    let (mut sender, conn) = timeout(
311        Duration::from_secs(30),
312        hyper::client::conn::http1::handshake(TokioIo::new(tcp_stream)),
313    )
314    .await
315    .map_err(|e| Error::Timeout { source: e })?
316    .map_err(|e| Error::Hyper { source: e })?;
317    // Spawn the connection task
318    tokio::spawn(async move {
319        if let Err(e) = conn.await {
320            let _ = tx.send(e.to_string());
321        }
322    });
323
324    let server_processing_start = Instant::now();
325    let resp = sender
326        .send_request(req)
327        .await
328        .map_err(|e| Error::Hyper { source: e })?;
329    stat.server_processing = Some(server_processing_start.elapsed());
330    Ok(resp)
331}
332
333async fn send_https_request(
334    req: Request<Full<Bytes>>,
335    tls_stream: TlsStream<TcpStream>,
336    tx: oneshot::Sender<String>,
337    stat: &mut HttpStat,
338) -> Result<Response<Incoming>> {
339    let (mut sender, conn) = timeout(
340        Duration::from_secs(30),
341        hyper::client::conn::http1::handshake(TokioIo::new(tls_stream)),
342    )
343    .await
344    .map_err(|e| Error::Timeout { source: e })?
345    .map_err(|e| Error::Hyper { source: e })?;
346    // Spawn the connection task
347    tokio::spawn(async move {
348        if let Err(e) = conn.await {
349            let _ = tx.send(e.to_string());
350        }
351    });
352
353    let server_processing_start = Instant::now();
354    let resp = sender
355        .send_request(req)
356        .await
357        .map_err(|e| Error::Hyper { source: e })?;
358    stat.server_processing = Some(server_processing_start.elapsed());
359    Ok(resp)
360}
361
362async fn send_https2_request(
363    req: Request<Full<Bytes>>,
364    tls_stream: TlsStream<TcpStream>,
365    tx: oneshot::Sender<String>,
366    stat: &mut HttpStat,
367) -> Result<Response<Incoming>> {
368    let (mut sender, conn) = timeout(
369        Duration::from_secs(30),
370        hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(tls_stream)),
371    )
372    .await
373    .map_err(|e| Error::Timeout { source: e })?
374    .map_err(|e| Error::Hyper { source: e })?;
375    // Spawn the connection task
376    tokio::spawn(async move {
377        if let Err(e) = conn.await {
378            let _ = tx.send(e.to_string());
379        }
380    });
381
382    let mut req = req;
383    *req.version_mut() = hyper::Version::HTTP_2;
384    // Remove Host header for HTTP/2 as it's replaced by :authority
385    req.headers_mut().remove("Host");
386
387    let server_processing_start = Instant::now();
388    let resp = sender
389        .send_request(req)
390        .await
391        .map_err(|e| Error::Hyper { source: e })?;
392    stat.server_processing = Some(server_processing_start.elapsed());
393    Ok(resp)
394}
395
396fn finish_with_error(mut stat: HttpStat, error: impl ToString, start: Instant) -> HttpStat {
397    stat.error = Some(error.to_string());
398    stat.total = Some(start.elapsed());
399    stat
400}
401
402async fn quic_connect(
403    host: String,
404    addr: SocketAddr,
405    skip_verify: bool,
406    stat: &mut HttpStat,
407) -> Result<(quinn::Endpoint, quinn::Connection)> {
408    let quic_start = Instant::now();
409    let mut root_store = RootCertStore::empty();
410    let certs = rustls_native_certs::load_native_certs().certs;
411
412    for cert in certs {
413        root_store
414            .add(cert)
415            .map_err(|e| Error::Rustls { source: e })?;
416    }
417    let mut config = ClientConfig::builder()
418        .with_root_certificates(root_store)
419        .with_no_client_auth();
420    config.enable_early_data = true;
421    config.alpn_protocols = vec![ALPN_HTTP3.as_bytes().to_vec()];
422    // Skip certificate verification if requested
423    if skip_verify {
424        config
425            .dangerous()
426            .set_certificate_verifier(Arc::new(SkipVerifier));
427    }
428
429    let mut client_endpoint =
430        h3_quinn::quinn::Endpoint::client("[::]:0".parse().map_err(|_| Error::Common {
431            category: "parse".to_string(),
432            message: "failed to parse address".to_string(),
433        })?)
434        .map_err(|e| Error::Io { source: e })?;
435
436    let h3_config =
437        quinn::crypto::rustls::QuicClientConfig::try_from(config).map_err(|e| Error::Common {
438            category: "quic".to_string(),
439            message: e.to_string(),
440        })?;
441
442    let client_config = quinn::ClientConfig::new(Arc::new(h3_config));
443    client_endpoint.set_default_client_config(client_config);
444
445    let conn = client_endpoint
446        .connect(addr, &host)
447        .map_err(|e| Error::QuicConnect { source: e })?
448        .await
449        .map_err(|e| Error::QuicConnection { source: e })?;
450
451    stat.quic_connect = Some(quic_start.elapsed());
452    Ok((client_endpoint, conn))
453}
454
455async fn http3_request(http_req: HttpRequest) -> HttpStat {
456    let start = Instant::now();
457    let mut stat = HttpStat {
458        alpn: Some(ALPN_HTTP3.to_string()),
459        ..Default::default()
460    };
461
462    // DNS resolution
463    let dns_result = dns_resolve(&http_req, &mut stat).await;
464    let (addr, host) = match dns_result {
465        Ok(result) => result,
466        Err(e) => {
467            return finish_with_error(stat, e, start);
468        }
469    };
470
471    let (client_endpoint, conn) =
472        match quic_connect(host, addr, http_req.skip_verify, &mut stat).await {
473            Ok(result) => result,
474            Err(e) => {
475                return finish_with_error(stat, e, start);
476            }
477        };
478
479    // Get TLS information from the connection
480    stat.tls = Some("tls 1.3".to_string()); // QUIC always uses TLS 1.3
481    stat.alpn = Some(ALPN_HTTP3.to_string()); // We always use HTTP/3 for QUIC
482
483    // Get certificate information from the connection
484    if let Some(peer_identity) = conn.peer_identity() {
485        if let Ok(certs) = peer_identity.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
486            if let Some(cert) = certs.first() {
487                if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
488                    let oid_str = match cert.signature_algorithm.algorithm.to_string().as_str() {
489                        "1.2.840.113549.1.1.11" => "AES_256_GCM_SHA384".to_string(),
490                        "1.2.840.113549.1.1.12" => "AES_128_GCM_SHA256".to_string(),
491                        "1.2.840.113549.1.1.13" => "CHACHA20_POLY1305_SHA256".to_string(),
492                        "1.2.840.10045.4.3.2" => "AES_256_GCM_SHA384".to_string(),
493                        "1.2.840.10045.4.3.3" => "AES_128_GCM_SHA256".to_string(),
494                        "1.2.840.10045.4.3.4" => "CHACHA20_POLY1305_SHA256".to_string(),
495                        "1.3.101.112" => "AES_256_GCM_SHA384".to_string(),
496                        "1.3.101.113" => "AES_128_GCM_SHA256".to_string(),
497                        _ => format!("{:?}", cert.signature_algorithm.algorithm),
498                    };
499                    stat.cert_cipher = Some(oid_str);
500                    stat.cert_not_before = Some(cert.validity().not_before.to_string());
501                    stat.cert_not_after = Some(cert.validity().not_after.to_string());
502                    if let Ok(Some(sans)) = cert.subject_alternative_name() {
503                        let mut domains = Vec::new();
504                        for san in sans.value.general_names.iter() {
505                            if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
506                                domains.push(domain.to_string());
507                            }
508                        }
509                        stat.cert_domains = Some(domains);
510                    };
511                }
512            }
513        }
514    }
515
516    let quinn_conn = h3_quinn::Connection::new(conn);
517
518    let (mut driver, mut send_request) = match h3::client::new(quinn_conn)
519        .await
520        .map_err(|e| Error::H3ConnectionError { source: e })
521    {
522        Ok(result) => result,
523        Err(e) => {
524            return finish_with_error(stat, e, start);
525        }
526    };
527
528    let req = match http_req.builder().body(()) {
529        Ok(req) => req,
530        Err(e) => {
531            return finish_with_error(stat, e, start);
532        }
533    };
534    let body = http_req.body.unwrap_or_default();
535
536    let drive = async move {
537        Err::<(), h3::error::ConnectionError>(future::poll_fn(|cx| driver.poll_close(cx)).await)
538    };
539
540    let request = async move {
541        let mut stream = send_request.send_request(req).await?;
542        stream.send_data(body).await?;
543
544        let mut sub_stat = HttpStat::default();
545
546        // finish on the sending side
547        stream.finish().await?;
548
549        let server_processing_start = Instant::now();
550
551        let resp = stream.recv_response().await?;
552        sub_stat.server_processing = Some(server_processing_start.elapsed());
553
554        sub_stat.status = Some(resp.status());
555        sub_stat.headers = Some(resp.headers().clone());
556
557        let content_transfer_start = Instant::now();
558        let mut buf = BytesMut::new();
559        while let Some(chunk) = stream.recv_data().await? {
560            buf.extend(chunk.chunk());
561        }
562        sub_stat.content_transfer = Some(content_transfer_start.elapsed());
563        sub_stat.body = Some(Bytes::from(buf));
564
565        Ok::<HttpStat, h3::error::StreamError>(sub_stat)
566    };
567
568    let (req_res, drive_res) = tokio::join!(request, drive);
569    match req_res {
570        Ok(sub_stat) => {
571            stat.server_processing = sub_stat.server_processing;
572            stat.content_transfer = sub_stat.content_transfer;
573            stat.status = sub_stat.status;
574            stat.headers = sub_stat.headers;
575            stat.body = sub_stat.body;
576        }
577        Err(err) => {
578            if !err.is_h3_no_error() {
579                stat.error = Some(err.to_string());
580            }
581        }
582    }
583    if let Err(err) = drive_res {
584        if !err.is_h3_no_error() {
585            stat.error = Some(err.to_string());
586        }
587    }
588
589    stat.total = Some(start.elapsed());
590    client_endpoint.wait_idle().await;
591
592    stat
593}
594
595pub async fn request(http_req: HttpRequest) -> HttpStat {
596    ensure_crypto_provider();
597
598    if http_req.alpn_protocols.contains(&ALPN_HTTP3.to_string()) {
599        return http3_request(http_req).await;
600    }
601
602    let start = Instant::now();
603    let mut stat = HttpStat::default();
604
605    // DNS resolution
606    let dns_result = dns_resolve(&http_req, &mut stat).await;
607    let (addr, host) = match dns_result {
608        Ok(result) => result,
609        Err(e) => {
610            return finish_with_error(stat, e, start);
611        }
612    };
613
614    let uri = &http_req.uri;
615    let is_https = uri.scheme() == Some(&http::uri::Scheme::HTTPS);
616
617    let req: Request<Full<Bytes>> = match (&http_req).try_into() {
618        Ok(req) => req,
619        Err(e) => {
620            return finish_with_error(stat, e, start);
621        }
622    };
623
624    // TCP connection
625    let tcp_stream = match tcp_connect(addr, &mut stat).await {
626        Ok(stream) => stream,
627        Err(e) => {
628            return finish_with_error(stat, e, start);
629        }
630    };
631
632    // Create a channel to receive connection errors
633    let (tx, mut rx) = oneshot::channel();
634
635    // Send the request based on protocol
636    let resp = if is_https {
637        // TLS handshake
638        let tls_result = tls_handshake(
639            host.clone(),
640            tcp_stream,
641            http_req.alpn_protocols,
642            http_req.skip_verify,
643            &mut stat,
644        )
645        .await;
646        let (tls_stream, is_http2) = match tls_result {
647            Ok(result) => result,
648            Err(e) => {
649                return finish_with_error(stat, e, start);
650            }
651        };
652
653        // Send HTTPS request
654        if is_http2 {
655            match send_https2_request(req, tls_stream, tx, &mut stat).await {
656                Ok(resp) => resp,
657                Err(e) => {
658                    return finish_with_error(stat, e, start);
659                }
660            }
661        } else {
662            match send_https_request(req, tls_stream, tx, &mut stat).await {
663                Ok(resp) => resp,
664                Err(e) => {
665                    return finish_with_error(stat, e, start);
666                }
667            }
668        }
669    } else {
670        // Send HTTP request
671        match send_http_request(req, tcp_stream, tx, &mut stat).await {
672            Ok(resp) => resp,
673            Err(e) => {
674                return finish_with_error(stat, e, start);
675            }
676        }
677    };
678
679    stat.status = Some(resp.status());
680    stat.headers = Some(resp.headers().clone());
681
682    // Read the response body
683    let content_transfer_start = Instant::now();
684    let body_result = resp.collect().await;
685    let body = match body_result {
686        Ok(body) => body,
687        Err(e) => {
688            return finish_with_error(stat, format!("Failed to read response body: {}", e), start);
689        }
690    };
691
692    let body_bytes = body.to_bytes();
693    if let Some(output) = http_req.output {
694        match fs::write(output, body_bytes).await {
695            Ok(_) => {}
696            Err(e) => {
697                return finish_with_error(
698                    stat,
699                    format!("Failed to write response body to file: {}", e),
700                    start,
701                );
702            }
703        }
704    } else {
705        stat.body = Some(body_bytes);
706    }
707    stat.content_transfer = Some(content_transfer_start.elapsed());
708
709    // Check for connection errors
710    if let Ok(error) = rx.try_recv() {
711        stat.error = Some(error);
712    }
713
714    stat.total = Some(start.elapsed());
715    stat
716}