http_stat/
request.rs

1// Copyright 2025 Tree xie.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15// This file implements HTTP request functionality with support for HTTP/1.1, HTTP/2, and HTTP/3
16// It includes features like DNS resolution, TLS handshake, and request/response handling
17
18use super::build_http_request;
19use super::decompress::decompress;
20use super::error::{Error, Result};
21use super::finish_with_error;
22use super::grpc::grpc_request;
23use super::net::{dns_resolve, quic_connect, tcp_connect, tls_handshake};
24use super::stats::{Certificate, HttpStat, ALPN_HTTP3};
25use super::HttpRequest;
26use bytes::{Buf, Bytes, BytesMut};
27use chrono::{Local, TimeZone};
28use futures::future;
29
30use http::Request;
31use http::Response;
32use http::Version;
33use http_body_util::{BodyExt, Full};
34use hyper::body::Incoming;
35use hyper_util::rt::TokioExecutor;
36use hyper_util::rt::TokioIo;
37use std::sync::Once;
38use std::time::Duration;
39use std::time::Instant;
40use tokio::net::TcpStream;
41use tokio::sync::oneshot;
42use tokio::time::timeout;
43use tokio_rustls::client::TlsStream;
44
45// Format timestamp to human-readable string
46fn format_time(timestamp_seconds: i64) -> String {
47    Local
48        .timestamp_nanos(timestamp_seconds * 1_000_000_000)
49        .to_string()
50}
51
52// Initialize crypto provider once
53static INIT: Once = Once::new();
54
55fn ensure_crypto_provider() {
56    INIT.call_once(|| {
57        let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
58    });
59}
60
61// Send HTTP/1.1 request
62async fn send_http_request(
63    req: Request<Full<Bytes>>,
64    tcp_stream: TcpStream,
65    request_timeout: Option<Duration>,
66    tx: oneshot::Sender<String>,
67    stat: &mut HttpStat,
68) -> Result<Response<Incoming>> {
69    let (mut sender, conn) = timeout(
70        request_timeout.unwrap_or(Duration::from_secs(30)),
71        hyper::client::conn::http1::handshake(TokioIo::new(tcp_stream)),
72    )
73    .await
74    .map_err(|e| Error::Timeout { source: e })?
75    .map_err(|e| Error::Hyper { source: e })?;
76
77    // Spawn connection task
78    tokio::spawn(async move {
79        if let Err(e) = conn.await {
80            let _ = tx.send(e.to_string());
81        }
82    });
83
84    let server_processing_start = Instant::now();
85    let resp = sender
86        .send_request(req)
87        .await
88        .map_err(|e| Error::Hyper { source: e })?;
89    stat.server_processing = Some(server_processing_start.elapsed());
90    Ok(resp)
91}
92
93// Send HTTPS request
94async fn send_https_request(
95    req: Request<Full<Bytes>>,
96    tls_stream: TlsStream<TcpStream>,
97    request_timeout: Option<Duration>,
98    tx: oneshot::Sender<String>,
99    stat: &mut HttpStat,
100) -> Result<Response<Incoming>> {
101    let (mut sender, conn) = timeout(
102        request_timeout.unwrap_or(Duration::from_secs(30)),
103        hyper::client::conn::http1::handshake(TokioIo::new(tls_stream)),
104    )
105    .await
106    .map_err(|e| Error::Timeout { source: e })?
107    .map_err(|e| Error::Hyper { source: e })?;
108
109    // Spawn connection task
110    tokio::spawn(async move {
111        if let Err(e) = conn.await {
112            let _ = tx.send(e.to_string());
113        }
114    });
115
116    let server_processing_start = Instant::now();
117    let resp = sender
118        .send_request(req)
119        .await
120        .map_err(|e| Error::Hyper { source: e })?;
121    stat.server_processing = Some(server_processing_start.elapsed());
122    Ok(resp)
123}
124
125// Send HTTP/2 request
126async fn send_https2_request(
127    req: Request<Full<Bytes>>,
128    tls_stream: TlsStream<TcpStream>,
129    tx: oneshot::Sender<String>,
130    stat: &mut HttpStat,
131) -> Result<Response<Incoming>> {
132    let (mut sender, conn) = timeout(
133        Duration::from_secs(30),
134        hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(tls_stream)),
135    )
136    .await
137    .map_err(|e| Error::Timeout { source: e })?
138    .map_err(|e| Error::Hyper { source: e })?;
139
140    // Spawn connection task
141    tokio::spawn(async move {
142        if let Err(e) = conn.await {
143            let _ = tx.send(e.to_string());
144        }
145    });
146
147    let mut req = req;
148    *req.version_mut() = hyper::Version::HTTP_2;
149    // Remove Host header for HTTP/2 as it's replaced by :authority
150    req.headers_mut().remove("Host");
151
152    let server_processing_start = Instant::now();
153    let resp = sender
154        .send_request(req)
155        .await
156        .map_err(|e| Error::Hyper { source: e })?;
157    stat.server_processing = Some(server_processing_start.elapsed());
158    Ok(resp)
159}
160
161// Handle HTTP/3 request
162async fn http3_request(http_req: HttpRequest) -> HttpStat {
163    let start = Instant::now();
164    let mut stat = HttpStat {
165        alpn: Some(ALPN_HTTP3.to_string()),
166        ..Default::default()
167    };
168
169    // DNS resolution
170    let dns_result = dns_resolve(&http_req, &mut stat).await;
171    let (addr, host) = match dns_result {
172        Ok(result) => result,
173        Err(e) => {
174            return finish_with_error(stat, e, start);
175        }
176    };
177
178    // Establish QUIC connection
179    let (client_endpoint, conn) = match timeout(
180        http_req.quic_timeout.unwrap_or(Duration::from_secs(30)),
181        quic_connect(host, addr, http_req.skip_verify, &mut stat),
182    )
183    .await
184    {
185        Ok(Ok(result)) => result,
186        Ok(Err(e)) => {
187            return finish_with_error(stat, e, start);
188        }
189        Err(e) => {
190            return finish_with_error(stat, e, start);
191        }
192    };
193
194    // Set TLS information
195    stat.tls = Some("tls 1.3".to_string()); // QUIC always uses TLS 1.3
196    stat.alpn = Some(ALPN_HTTP3.to_string()); // We always use HTTP/3 for QUIC
197
198    // Extract certificate information
199    let mut certificates = vec![];
200    if let Some(peer_identity) = conn.peer_identity() {
201        if let Ok(certs) = peer_identity.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
202            for (index, cert) in certs.iter().enumerate() {
203                if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
204                    let oid_str = match cert.signature_algorithm.algorithm.to_string().as_str() {
205                        "1.2.840.113549.1.1.11" => "AES_256_GCM_SHA384".to_string(),
206                        "1.2.840.113549.1.1.12" => "AES_128_GCM_SHA256".to_string(),
207                        "1.2.840.113549.1.1.13" => "CHACHA20_POLY1305_SHA256".to_string(),
208                        "1.2.840.10045.4.3.2" => "AES_256_GCM_SHA384".to_string(),
209                        "1.2.840.10045.4.3.3" => "AES_128_GCM_SHA256".to_string(),
210                        "1.2.840.10045.4.3.4" => "CHACHA20_POLY1305_SHA256".to_string(),
211                        "1.3.101.112" => "AES_256_GCM_SHA384".to_string(),
212                        "1.3.101.113" => "AES_128_GCM_SHA256".to_string(),
213                        _ => format!("{:?}", cert.signature_algorithm.algorithm),
214                    };
215                    let subject = cert.subject().to_string();
216                    let issuer = cert.issuer().to_string();
217                    let not_before = format_time(cert.validity().not_before.timestamp());
218                    let not_after = format_time(cert.validity().not_after.timestamp());
219                    if index == 0 {
220                        stat.cert_cipher = Some(oid_str);
221                        stat.cert_not_before = Some(not_before);
222                        stat.cert_not_after = Some(not_after);
223                        stat.subject = Some(subject);
224                        stat.issuer = Some(issuer);
225                        if let Ok(Some(sans)) = cert.subject_alternative_name() {
226                            let mut domains = vec![];
227                            for san in sans.value.general_names.iter() {
228                                if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
229                                    domains.push(domain.to_string());
230                                }
231                            }
232                            stat.cert_domains = Some(domains);
233                        };
234                        continue;
235                    }
236                    certificates.push(Certificate {
237                        subject,
238                        issuer,
239                        not_before,
240                        not_after,
241                    });
242                }
243            }
244        }
245    }
246    if !certificates.is_empty() {
247        stat.certificates = Some(certificates);
248    }
249
250    // Create HTTP/3 connection
251    let quinn_conn = h3_quinn::Connection::new(conn);
252
253    let (mut driver, mut send_request) = match timeout(
254        http_req.request_timeout.unwrap_or(Duration::from_secs(30)),
255        h3::client::new(quinn_conn),
256    )
257    .await
258    {
259        Ok(Ok(result)) => result,
260        Ok(Err(e)) => {
261            return finish_with_error(stat, e, start);
262        }
263        Err(e) => {
264            return finish_with_error(stat, e, start);
265        }
266    };
267
268    // Prepare request
269    let mut req = match http_req.builder(false).body(()) {
270        Ok(req) => req,
271        Err(e) => {
272            return finish_with_error(stat, e, start);
273        }
274    };
275    *req.version_mut() = Version::HTTP_3;
276    stat.request_headers = req.headers().clone();
277    let body = http_req.body.unwrap_or_default();
278
279    // Handle connection driver
280    let drive = async move {
281        Err::<(), h3::error::ConnectionError>(future::poll_fn(|cx| driver.poll_close(cx)).await)
282    };
283
284    // Send request and handle response
285    let request = async move {
286        let mut stream = send_request.send_request(req).await?;
287        stream.send_data(body).await?;
288
289        let mut sub_stat = HttpStat::default();
290
291        // Finish sending
292        stream.finish().await?;
293
294        let server_processing_start = Instant::now();
295
296        let resp = stream.recv_response().await?;
297        sub_stat.server_processing = Some(server_processing_start.elapsed());
298
299        sub_stat.status = Some(resp.status());
300        sub_stat.headers = Some(resp.headers().clone());
301
302        // Receive response body
303        let content_transfer_start = Instant::now();
304        let mut buf = BytesMut::new();
305        while let Some(chunk) = stream.recv_data().await? {
306            buf.extend(chunk.chunk());
307        }
308        sub_stat.content_transfer = Some(content_transfer_start.elapsed());
309        sub_stat.body = Some(Bytes::from(buf));
310        Ok::<HttpStat, h3::error::StreamError>(sub_stat)
311    };
312
313    // Execute request and handle results
314    let (req_res, drive_res) = tokio::join!(request, drive);
315    match req_res {
316        Ok(sub_stat) => {
317            stat.server_processing = sub_stat.server_processing;
318            stat.content_transfer = sub_stat.content_transfer;
319            stat.status = sub_stat.status;
320            stat.headers = sub_stat.headers;
321            stat.body = sub_stat.body;
322        }
323        Err(err) => {
324            if !err.is_h3_no_error() {
325                stat.error = Some(err.to_string());
326            }
327        }
328    }
329    if let Err(err) = drive_res {
330        if !err.is_h3_no_error() {
331            stat.error = Some(err.to_string());
332        }
333    }
334
335    stat.total = Some(start.elapsed());
336    // Close the connection immediately instead of waiting for idle
337    client_endpoint.close(0u32.into(), b"done");
338
339    stat
340}
341
342async fn http1_2_request(http_req: HttpRequest) -> HttpStat {
343    let start = Instant::now();
344    let mut stat = HttpStat::default();
345
346    // DNS resolution
347    let dns_result = dns_resolve(&http_req, &mut stat).await;
348    let (addr, host) = match dns_result {
349        Ok(result) => result,
350        Err(e) => {
351            return finish_with_error(stat, e, start);
352        }
353    };
354
355    let uri = &http_req.uri;
356    let is_https = uri.scheme() == Some(&http::uri::Scheme::HTTPS);
357
358    // TCP connection
359    let tcp_stream = match tcp_connect(addr, http_req.tcp_timeout, &mut stat).await {
360        Ok(stream) => stream,
361        Err(e) => {
362            return finish_with_error(stat, e, start);
363        }
364    };
365
366    // Create channel for connection errors
367    let (tx, mut rx) = oneshot::channel();
368
369    // Send request based on protocol
370    let resp = if is_https {
371        // TLS handshake
372        let tls_result = tls_handshake(
373            host.clone(),
374            tcp_stream,
375            http_req.tls_timeout,
376            http_req.alpn_protocols.clone(),
377            http_req.skip_verify,
378            &mut stat,
379        )
380        .await;
381        let (tls_stream, is_http2) = match tls_result {
382            Ok(result) => result,
383            Err(e) => {
384                return finish_with_error(stat, e, start);
385            }
386        };
387
388        // Send HTTPS request
389        if is_http2 {
390            let req = match build_http_request(&http_req, false) {
391                Ok(req) => req,
392                Err(e) => {
393                    return finish_with_error(stat, e, start);
394                }
395            };
396            stat.request_headers = req.headers().clone();
397            match send_https2_request(req, tls_stream, tx, &mut stat).await {
398                Ok(resp) => resp,
399                Err(e) => {
400                    return finish_with_error(stat, e, start);
401                }
402            }
403        } else {
404            let req = match build_http_request(&http_req, true) {
405                Ok(req) => req,
406                Err(e) => {
407                    return finish_with_error(stat, e, start);
408                }
409            };
410            stat.request_headers = req.headers().clone();
411            match send_https_request(req, tls_stream, http_req.request_timeout, tx, &mut stat).await
412            {
413                Ok(resp) => resp,
414                Err(e) => {
415                    return finish_with_error(stat, e, start);
416                }
417            }
418        }
419    } else {
420        let req = match build_http_request(&http_req, true) {
421            Ok(req) => req,
422            Err(e) => {
423                return finish_with_error(stat, e, start);
424            }
425        };
426        stat.request_headers = req.headers().clone();
427        // Send HTTP request
428        match send_http_request(req, tcp_stream, http_req.request_timeout, tx, &mut stat).await {
429            Ok(resp) => resp,
430            Err(e) => {
431                return finish_with_error(stat, e, start);
432            }
433        }
434    };
435
436    // Process response
437    stat.status = Some(resp.status());
438    stat.headers = Some(resp.headers().clone());
439
440    // Check for connection errors
441    if let Ok(error) = rx.try_recv() {
442        stat.error = Some(error);
443    }
444    // Read response body
445    let content_transfer_start = Instant::now();
446    let body_result = resp.collect().await;
447    let body = match body_result {
448        Ok(body) => body,
449        Err(e) => {
450            return finish_with_error(stat, format!("Failed to read response body: {e}"), start);
451        }
452    };
453
454    let body_bytes = body.to_bytes();
455    stat.body = Some(body_bytes);
456    stat.content_transfer = Some(content_transfer_start.elapsed());
457
458    stat.total = Some(start.elapsed());
459    stat
460}
461
462/// Performs an HTTP request and returns detailed statistics about the request lifecycle.
463///
464/// This function handles HTTP/1.1, HTTP/2, and HTTP/3 requests with the following features:
465/// - Automatic protocol selection based on ALPN negotiation
466/// - DNS resolution with support for custom IP mappings
467/// - TLS handshake with certificate verification
468/// - Response body handling with optional file output
469/// - Detailed timing statistics for each phase of the request
470///
471/// # Arguments
472///
473/// * `http_req` - An `HttpRequest` struct containing the request configuration including:
474///   - URI and HTTP method
475///   - ALPN protocols to negotiate
476///   - Custom DNS resolutions
477///   - Headers and request body
478///   - TLS verification settings
479///   - Output file path (optional)
480///
481/// # Returns
482///
483/// Returns an `HttpStat` struct containing:
484/// - DNS lookup time
485/// - QUIC connection time
486/// - TCP connection time
487/// - TLS handshake time (for HTTPS)
488/// - Server processing time
489/// - Content transfer time
490/// - Total request time
491/// - Response status and headers
492/// - Response body (if not written to file)
493/// - TLS and certificate information (for HTTPS)
494/// - Any errors that occurred during the request
495/// ```
496pub async fn request(http_req: HttpRequest) -> HttpStat {
497    ensure_crypto_provider();
498    let schema = if let Some(schema) = http_req.uri.scheme() {
499        schema.to_string()
500    } else {
501        "".to_string()
502    };
503
504    // Handle HTTP/3 request
505    let mut stat = if ["grpc", "grpcs"].contains(&schema.as_str()) {
506        grpc_request(http_req).await
507    } else if http_req.alpn_protocols.contains(&ALPN_HTTP3.to_string()) {
508        http3_request(http_req).await
509    } else {
510        http1_2_request(http_req).await
511    };
512    if let Some(body) = &stat.body {
513        stat.body_size = Some(body.len());
514    }
515    let encoding = if let Some(headers) = &stat.headers {
516        headers
517            .get("content-encoding")
518            .map(|v| v.to_str().unwrap_or_default())
519            .unwrap_or_default()
520    } else {
521        ""
522    };
523
524    if !encoding.is_empty() {
525        if let Some(body) = &stat.body {
526            match decompress(encoding, body) {
527                Ok(data) => {
528                    stat.body = Some(data);
529                }
530                Err(e) => {
531                    stat.error = Some(e.to_string());
532                }
533            }
534        }
535    }
536
537    stat
538}