1use super::build_http_request;
19use super::decompress::decompress;
20use super::error::{Error, Result};
21use super::finish_with_error;
22use super::grpc::grpc_request;
23use super::net::{dns_resolve, quic_connect, tcp_connect, tls_handshake};
24use super::stats::{Certificate, HttpStat, ALPN_HTTP3};
25use super::HttpRequest;
26use bytes::{Buf, Bytes, BytesMut};
27use chrono::{Local, TimeZone};
28use futures::future;
29
30use http::Request;
31use http::Response;
32use http::Version;
33use http_body_util::{BodyExt, Full};
34use hyper::body::Incoming;
35use hyper_util::rt::TokioExecutor;
36use hyper_util::rt::TokioIo;
37use std::sync::Once;
38use std::time::Duration;
39use std::time::Instant;
40use tokio::net::TcpStream;
41use tokio::sync::oneshot;
42use tokio::time::timeout;
43use tokio_rustls::client::TlsStream;
44
45fn format_time(timestamp_seconds: i64) -> String {
47 Local
48 .timestamp_nanos(timestamp_seconds * 1_000_000_000)
49 .to_string()
50}
51
52static INIT: Once = Once::new();
54
55fn ensure_crypto_provider() {
56 INIT.call_once(|| {
57 let _ = tokio_rustls::rustls::crypto::ring::default_provider().install_default();
58 });
59}
60
61async fn send_http_request(
63 req: Request<Full<Bytes>>,
64 tcp_stream: TcpStream,
65 request_timeout: Option<Duration>,
66 tx: oneshot::Sender<String>,
67 stat: &mut HttpStat,
68) -> Result<Response<Incoming>> {
69 let (mut sender, conn) = timeout(
70 request_timeout.unwrap_or(Duration::from_secs(30)),
71 hyper::client::conn::http1::handshake(TokioIo::new(tcp_stream)),
72 )
73 .await
74 .map_err(|e| Error::Timeout { source: e })?
75 .map_err(|e| Error::Hyper { source: e })?;
76
77 tokio::spawn(async move {
79 if let Err(e) = conn.await {
80 let _ = tx.send(e.to_string());
81 }
82 });
83
84 let server_processing_start = Instant::now();
85 let resp = sender
86 .send_request(req)
87 .await
88 .map_err(|e| Error::Hyper { source: e })?;
89 stat.server_processing = Some(server_processing_start.elapsed());
90 Ok(resp)
91}
92
93async fn send_https_request(
95 req: Request<Full<Bytes>>,
96 tls_stream: TlsStream<TcpStream>,
97 request_timeout: Option<Duration>,
98 tx: oneshot::Sender<String>,
99 stat: &mut HttpStat,
100) -> Result<Response<Incoming>> {
101 let (mut sender, conn) = timeout(
102 request_timeout.unwrap_or(Duration::from_secs(30)),
103 hyper::client::conn::http1::handshake(TokioIo::new(tls_stream)),
104 )
105 .await
106 .map_err(|e| Error::Timeout { source: e })?
107 .map_err(|e| Error::Hyper { source: e })?;
108
109 tokio::spawn(async move {
111 if let Err(e) = conn.await {
112 let _ = tx.send(e.to_string());
113 }
114 });
115
116 let server_processing_start = Instant::now();
117 let resp = sender
118 .send_request(req)
119 .await
120 .map_err(|e| Error::Hyper { source: e })?;
121 stat.server_processing = Some(server_processing_start.elapsed());
122 Ok(resp)
123}
124
125async fn send_https2_request(
127 req: Request<Full<Bytes>>,
128 tls_stream: TlsStream<TcpStream>,
129 tx: oneshot::Sender<String>,
130 stat: &mut HttpStat,
131) -> Result<Response<Incoming>> {
132 let (mut sender, conn) = timeout(
133 Duration::from_secs(30),
134 hyper::client::conn::http2::handshake(TokioExecutor::new(), TokioIo::new(tls_stream)),
135 )
136 .await
137 .map_err(|e| Error::Timeout { source: e })?
138 .map_err(|e| Error::Hyper { source: e })?;
139
140 tokio::spawn(async move {
142 if let Err(e) = conn.await {
143 let _ = tx.send(e.to_string());
144 }
145 });
146
147 let mut req = req;
148 *req.version_mut() = hyper::Version::HTTP_2;
149 req.headers_mut().remove("Host");
151
152 let server_processing_start = Instant::now();
153 let resp = sender
154 .send_request(req)
155 .await
156 .map_err(|e| Error::Hyper { source: e })?;
157 stat.server_processing = Some(server_processing_start.elapsed());
158 Ok(resp)
159}
160
161async fn http3_request(http_req: HttpRequest) -> HttpStat {
163 let start = Instant::now();
164 let mut stat = HttpStat {
165 alpn: Some(ALPN_HTTP3.to_string()),
166 ..Default::default()
167 };
168
169 let dns_result = dns_resolve(&http_req, &mut stat).await;
171 let (addr, host) = match dns_result {
172 Ok(result) => result,
173 Err(e) => {
174 return finish_with_error(stat, e, start);
175 }
176 };
177
178 let (client_endpoint, conn) = match timeout(
180 http_req.quic_timeout.unwrap_or(Duration::from_secs(30)),
181 quic_connect(host, addr, http_req.skip_verify, &mut stat),
182 )
183 .await
184 {
185 Ok(Ok(result)) => result,
186 Ok(Err(e)) => {
187 return finish_with_error(stat, e, start);
188 }
189 Err(e) => {
190 return finish_with_error(stat, e, start);
191 }
192 };
193
194 stat.tls = Some("tls 1.3".to_string()); stat.alpn = Some(ALPN_HTTP3.to_string()); let mut certificates = vec![];
200 if let Some(peer_identity) = conn.peer_identity() {
201 if let Ok(certs) = peer_identity.downcast::<Vec<rustls::pki_types::CertificateDer>>() {
202 for (index, cert) in certs.iter().enumerate() {
203 if let Ok((_, cert)) = x509_parser::parse_x509_certificate(cert.as_ref()) {
204 let oid_str = match cert.signature_algorithm.algorithm.to_string().as_str() {
205 "1.2.840.113549.1.1.11" => "AES_256_GCM_SHA384".to_string(),
206 "1.2.840.113549.1.1.12" => "AES_128_GCM_SHA256".to_string(),
207 "1.2.840.113549.1.1.13" => "CHACHA20_POLY1305_SHA256".to_string(),
208 "1.2.840.10045.4.3.2" => "AES_256_GCM_SHA384".to_string(),
209 "1.2.840.10045.4.3.3" => "AES_128_GCM_SHA256".to_string(),
210 "1.2.840.10045.4.3.4" => "CHACHA20_POLY1305_SHA256".to_string(),
211 "1.3.101.112" => "AES_256_GCM_SHA384".to_string(),
212 "1.3.101.113" => "AES_128_GCM_SHA256".to_string(),
213 _ => format!("{:?}", cert.signature_algorithm.algorithm),
214 };
215 let subject = cert.subject().to_string();
216 let issuer = cert.issuer().to_string();
217 let not_before = format_time(cert.validity().not_before.timestamp());
218 let not_after = format_time(cert.validity().not_after.timestamp());
219 if index == 0 {
220 stat.cert_cipher = Some(oid_str);
221 stat.cert_not_before = Some(not_before);
222 stat.cert_not_after = Some(not_after);
223 stat.subject = Some(subject);
224 stat.issuer = Some(issuer);
225 if let Ok(Some(sans)) = cert.subject_alternative_name() {
226 let mut domains = vec![];
227 for san in sans.value.general_names.iter() {
228 if let x509_parser::extensions::GeneralName::DNSName(domain) = san {
229 domains.push(domain.to_string());
230 }
231 }
232 stat.cert_domains = Some(domains);
233 };
234 continue;
235 }
236 certificates.push(Certificate {
237 subject,
238 issuer,
239 not_before,
240 not_after,
241 });
242 }
243 }
244 }
245 }
246 if !certificates.is_empty() {
247 stat.certificates = Some(certificates);
248 }
249
250 let quinn_conn = h3_quinn::Connection::new(conn);
252
253 let (mut driver, mut send_request) = match timeout(
254 http_req.request_timeout.unwrap_or(Duration::from_secs(30)),
255 h3::client::new(quinn_conn),
256 )
257 .await
258 {
259 Ok(Ok(result)) => result,
260 Ok(Err(e)) => {
261 return finish_with_error(stat, e, start);
262 }
263 Err(e) => {
264 return finish_with_error(stat, e, start);
265 }
266 };
267
268 let mut req = match http_req.builder(false).body(()) {
270 Ok(req) => req,
271 Err(e) => {
272 return finish_with_error(stat, e, start);
273 }
274 };
275 *req.version_mut() = Version::HTTP_3;
276 stat.request_headers = req.headers().clone();
277 let body = http_req.body.unwrap_or_default();
278
279 let drive = async move {
281 Err::<(), h3::error::ConnectionError>(future::poll_fn(|cx| driver.poll_close(cx)).await)
282 };
283
284 let request = async move {
286 let mut stream = send_request.send_request(req).await?;
287 stream.send_data(body).await?;
288
289 let mut sub_stat = HttpStat::default();
290
291 stream.finish().await?;
293
294 let server_processing_start = Instant::now();
295
296 let resp = stream.recv_response().await?;
297 sub_stat.server_processing = Some(server_processing_start.elapsed());
298
299 sub_stat.status = Some(resp.status());
300 sub_stat.headers = Some(resp.headers().clone());
301
302 let content_transfer_start = Instant::now();
304 let mut buf = BytesMut::new();
305 while let Some(chunk) = stream.recv_data().await? {
306 buf.extend(chunk.chunk());
307 }
308 sub_stat.content_transfer = Some(content_transfer_start.elapsed());
309 sub_stat.body = Some(Bytes::from(buf));
310 Ok::<HttpStat, h3::error::StreamError>(sub_stat)
311 };
312
313 let (req_res, drive_res) = tokio::join!(request, drive);
315 match req_res {
316 Ok(sub_stat) => {
317 stat.server_processing = sub_stat.server_processing;
318 stat.content_transfer = sub_stat.content_transfer;
319 stat.status = sub_stat.status;
320 stat.headers = sub_stat.headers;
321 stat.body = sub_stat.body;
322 }
323 Err(err) => {
324 if !err.is_h3_no_error() {
325 stat.error = Some(err.to_string());
326 }
327 }
328 }
329 if let Err(err) = drive_res {
330 if !err.is_h3_no_error() {
331 stat.error = Some(err.to_string());
332 }
333 }
334
335 stat.total = Some(start.elapsed());
336 client_endpoint.close(0u32.into(), b"done");
338
339 stat
340}
341
342async fn http1_2_request(http_req: HttpRequest) -> HttpStat {
343 let start = Instant::now();
344 let mut stat = HttpStat::default();
345
346 let dns_result = dns_resolve(&http_req, &mut stat).await;
348 let (addr, host) = match dns_result {
349 Ok(result) => result,
350 Err(e) => {
351 return finish_with_error(stat, e, start);
352 }
353 };
354
355 let uri = &http_req.uri;
356 let is_https = uri.scheme() == Some(&http::uri::Scheme::HTTPS);
357
358 let tcp_stream = match tcp_connect(addr, http_req.tcp_timeout, &mut stat).await {
360 Ok(stream) => stream,
361 Err(e) => {
362 return finish_with_error(stat, e, start);
363 }
364 };
365
366 let (tx, mut rx) = oneshot::channel();
368
369 let resp = if is_https {
371 let tls_result = tls_handshake(
373 host.clone(),
374 tcp_stream,
375 http_req.tls_timeout,
376 http_req.alpn_protocols.clone(),
377 http_req.skip_verify,
378 &mut stat,
379 )
380 .await;
381 let (tls_stream, is_http2) = match tls_result {
382 Ok(result) => result,
383 Err(e) => {
384 return finish_with_error(stat, e, start);
385 }
386 };
387
388 if is_http2 {
390 let req = match build_http_request(&http_req, false) {
391 Ok(req) => req,
392 Err(e) => {
393 return finish_with_error(stat, e, start);
394 }
395 };
396 stat.request_headers = req.headers().clone();
397 match send_https2_request(req, tls_stream, tx, &mut stat).await {
398 Ok(resp) => resp,
399 Err(e) => {
400 return finish_with_error(stat, e, start);
401 }
402 }
403 } else {
404 let req = match build_http_request(&http_req, true) {
405 Ok(req) => req,
406 Err(e) => {
407 return finish_with_error(stat, e, start);
408 }
409 };
410 stat.request_headers = req.headers().clone();
411 match send_https_request(req, tls_stream, http_req.request_timeout, tx, &mut stat).await
412 {
413 Ok(resp) => resp,
414 Err(e) => {
415 return finish_with_error(stat, e, start);
416 }
417 }
418 }
419 } else {
420 let req = match build_http_request(&http_req, true) {
421 Ok(req) => req,
422 Err(e) => {
423 return finish_with_error(stat, e, start);
424 }
425 };
426 stat.request_headers = req.headers().clone();
427 match send_http_request(req, tcp_stream, http_req.request_timeout, tx, &mut stat).await {
429 Ok(resp) => resp,
430 Err(e) => {
431 return finish_with_error(stat, e, start);
432 }
433 }
434 };
435
436 stat.status = Some(resp.status());
438 stat.headers = Some(resp.headers().clone());
439
440 if let Ok(error) = rx.try_recv() {
442 stat.error = Some(error);
443 }
444 let content_transfer_start = Instant::now();
446 let body_result = resp.collect().await;
447 let body = match body_result {
448 Ok(body) => body,
449 Err(e) => {
450 return finish_with_error(stat, format!("Failed to read response body: {e}"), start);
451 }
452 };
453
454 let body_bytes = body.to_bytes();
455 stat.body = Some(body_bytes);
456 stat.content_transfer = Some(content_transfer_start.elapsed());
457
458 stat.total = Some(start.elapsed());
459 stat
460}
461
462pub async fn request(http_req: HttpRequest) -> HttpStat {
497 ensure_crypto_provider();
498 let schema = if let Some(schema) = http_req.uri.scheme() {
499 schema.to_string()
500 } else {
501 "".to_string()
502 };
503
504 let mut stat = if ["grpc", "grpcs"].contains(&schema.as_str()) {
506 grpc_request(http_req).await
507 } else if http_req.alpn_protocols.contains(&ALPN_HTTP3.to_string()) {
508 http3_request(http_req).await
509 } else {
510 http1_2_request(http_req).await
511 };
512 if let Some(body) = &stat.body {
513 stat.body_size = Some(body.len());
514 }
515 let encoding = if let Some(headers) = &stat.headers {
516 headers
517 .get("content-encoding")
518 .map(|v| v.to_str().unwrap_or_default())
519 .unwrap_or_default()
520 } else {
521 ""
522 };
523
524 if !encoding.is_empty() {
525 if let Some(body) = &stat.body {
526 match decompress(encoding, body) {
527 Ok(data) => {
528 stat.body = Some(data);
529 }
530 Err(e) => {
531 stat.error = Some(e.to_string());
532 }
533 }
534 }
535 }
536
537 stat
538}