rust_integration_services/http/client/
http_client.rs1use std::sync::Arc;
2
3use hyper::{Request, Version};
4use hyper_util::rt::TokioIo;
5use tokio::net::TcpStream;
6use tokio_rustls::TlsConnector;
7
8use crate::http::{client::http_client_config::HttpClientConfig, executor::Executor, http_request::HttpRequest, http_response::HttpResponse};
9
10pub struct HttpClient {
11 config: Arc<HttpClientConfig>,
12}
13
14impl HttpClient {
15 pub fn new() -> Self {
16 Self {
17 config: Arc::new(HttpClientConfig::new()),
18 }
19 }
20
21 pub async fn send(self, request: HttpRequest) -> anyhow::Result<HttpResponse> {
25
26 let scheme = match request.scheme() {
27 Some(scheme) => scheme,
28 None => return Err(anyhow::anyhow!("URL is missing a scheme.")),
29 };
30
31 match scheme {
32 "http" => self.send_tcp(request).await,
33 "https" => self.send_tls(request).await,
34 _ => Err(anyhow::anyhow!("Unsupported scheme: {}", scheme)),
35 }
36 }
37
38
39 async fn send_tcp(self, request: HttpRequest) -> anyhow::Result<HttpResponse> {
40 let host = match request.host() {
41 Some(host) => host,
42 None => return Err(anyhow::anyhow!("Invalid URL.")),
43 };
44
45 let port = request.port().unwrap_or(80);
46
47 let stream = TcpStream::connect((host, port)).await?;
48 let io = TokioIo::new(stream);
49
50 let (mut sender, connection) = hyper::client::conn::http1::handshake(io).await?;
51
52 tokio::spawn(async move {
53 connection.await
54 });
55
56 let res = sender.send_request(Request::from(request)).await?;
57 Ok(HttpResponse::from(res))
58 }
59
60 async fn send_tls(self, request: HttpRequest) -> anyhow::Result<HttpResponse> {
61 let host = match request.host() {
62 Some(host) => host,
63 None => return Err(anyhow::anyhow!("Invalid URL.")),
64 };
65
66 let port = request.port().unwrap_or(443);
67 let domain = rustls::pki_types::ServerName::try_from(host.to_string())?;
68
69 let tls_config = self.config.tls_config.clone();
70 let tcp_stream = TcpStream::connect((host, port)).await?;
71 let tls_connector = TlsConnector::from(Arc::new(tls_config));
72 let tls_stream = tls_connector.connect(domain, tcp_stream).await?;
73
74 let protocol = tls_stream.get_ref().1.alpn_protocol();
75 let version = match protocol.as_deref() {
76 Some(b"h2") => Version::HTTP_2,
77 _ => Version::HTTP_11,
78 };
79
80 match version {
81 Version::HTTP_2 => {
82 let io = TokioIo::new(tls_stream);
83 let (mut sender, connection) = hyper::client::conn::http2::Builder::new(Executor).handshake(io).await?;
84
85 tokio::spawn(async move {
86 connection.await
87 });
88
89 let mut hyper_request = Request::from(request);
90 *hyper_request.version_mut() = version;
91 let res = sender.send_request(hyper_request).await?;
92 Ok(HttpResponse::from(res))
93 }
94 Version::HTTP_11 => {
95 let io = TokioIo::new(tls_stream);
96 let (mut sender, connection) = hyper::client::conn::http1::handshake(io).await?;
97
98 tokio::spawn(async move {
99 connection.await
100 });
101
102 let mut hyper_request = Request::from(request);
103 *hyper_request.version_mut() = version;
104 let res = sender.send_request(hyper_request).await?;
105 Ok(HttpResponse::from(res))
106 }
107 _ => {
108 Err(anyhow::anyhow!("Unsupported HTTP version"))
109 }
110 }
111 }
112}