1use super::Result;
4use alloc::boxed::Box;
5use async_flow::{Inputs, Outputs};
6use core::error::Error as StdError;
7use hyper::body::{Body, Incoming};
8
9pub async fn request<T>(
11 mut requests: Inputs<http::Request<T>>,
12 responses: Outputs<Result<http::Response<Incoming>>>,
13) -> Result<(), async_flow::Error>
14where
15 T: Body + Send + 'static + Unpin,
16 T::Data: Send,
17 T::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
18{
19 while let Some(request) = requests.recv().await? {
20 let response = execute(request).await;
21 responses.send(response).await?;
22 }
23 Ok(())
24}
25
/// Executes a single request over a freshly opened, plain-TCP HTTP/1
/// connection.
///
/// This variant is compiled only when `std` and `http1` are enabled and both
/// `http2` and `tls` are disabled. Every call opens its own TCP connection,
/// performs the HTTP/1 handshake, spawns a Tokio task that drives the
/// connection, and then sends `request` on it.
///
/// The target address is taken from the request URI: the host is required,
/// and the port defaults to 443 for the `https` scheme and to 80 otherwise.
///
/// # Errors
///
/// - `Error::MissingUrlScheme` if the URI has no scheme.
/// - `Error::MissingUrlHost` if the URI has no host.
/// - `Error::TcpConnectFailed` if the TCP connection cannot be established.
/// - `Error::HttpHandshakeFailed` if the HTTP/1 handshake fails.
/// - The error from `send_request`, converted by `?`, if sending fails.
#[cfg(all(
    feature = "std",
    all(feature = "http1", not(feature = "http2")),
    not(feature = "tls")
))]
async fn execute<T>(request: http::Request<T>) -> Result<http::Response<Incoming>>
where
    T: Body + Send + 'static,
    T::Data: Send,
    T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    use super::Error;
    use hyper::client::conn::http1;
    use hyper_util::rt::TokioIo;
    use tokio::net::TcpStream;

    let uri = request.uri();
    let scheme = uri.scheme().ok_or(Error::MissingUrlScheme)?;
    let host = uri.host().ok_or(Error::MissingUrlHost)?;

    // NOTE(review): an `https` scheme only changes the default port here; the
    // connection below is still a plain `TcpStream` with no TLS layer —
    // confirm that this is the intended behavior for `https` URLs.
    let default_port = if *scheme == http::uri::Scheme::HTTPS {
        443
    } else {
        80
    };
    let port = uri.port_u16().unwrap_or(default_port);

    let stream = TcpStream::connect((host, port))
        .await
        .map_err(Error::TcpConnectFailed)?;

    let (mut sender, connection) = http1::handshake(TokioIo::new(stream))
        .await
        .map_err(Error::HttpHandshakeFailed)?;

    // The connection future is awaited on its own task; a failure there is
    // only reported on stderr.
    tokio::task::spawn(async move {
        if let Err(error) = connection.await {
            #[cfg(feature = "std")]
            std::eprintln!("Connection failed: {:?}", error);
        }
    });

    let response = sender.send_request(request).await?;
    Ok(response)
}
72
/// Executes a single request through a `hyper_util` legacy client backed by
/// a `hyper_rustls` connector.
///
/// This variant is compiled when `std` and `tls` are enabled together with at
/// least one of `http1` / `http2`. A new TLS configuration, connector and
/// client are built on every call.
///
/// Connector setup depends on the enabled protocol features:
/// - `http1` only: `https_or_http()` with `enable_http1()`;
/// - `http2` only: `https_only()` with `enable_http2()`;
/// - both: `https_or_http()` with `enable_http1()` and `enable_http2()`.
///
/// # Errors
///
/// - `Error::MissingUrlScheme` if the request URI has no scheme.
/// - `Error::MissingUrlHost` if the request URI has no host.
/// - `Error::Other` wrapping the client error if the request itself fails.
#[cfg(all(
    feature = "std",
    any(feature = "http1", feature = "http2"),
    feature = "tls"
))]
async fn execute<T>(request: http::Request<T>) -> Result<http::Response<Incoming>>
where
    T: Body + Send + 'static + Unpin,
    T::Data: Send,
    T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    use super::Error;
    use hyper_util::{client::legacy::Client, rt::TokioExecutor};
    use rustls::ClientConfig;
    use rustls_platform_verifier::ConfigVerifierExt;

    // Only validate that the URI carries a scheme and a host; the values
    // themselves are not needed here.
    let uri = request.uri();
    uri.scheme().ok_or(Error::MissingUrlScheme)?;
    uri.host().ok_or(Error::MissingUrlHost)?;

    let connector = hyper_rustls::HttpsConnectorBuilder::new()
        .with_tls_config(ClientConfig::with_platform_verifier());

    // Exactly one of the following three bindings is compiled in.
    #[cfg(all(feature = "http1", not(feature = "http2")))]
    let connector = connector.https_or_http().enable_http1();

    #[cfg(all(feature = "http2", not(feature = "http1")))]
    let connector = connector.https_only().enable_http2();

    #[cfg(all(feature = "http1", feature = "http2"))]
    let connector = connector.https_or_http().enable_http1().enable_http2();

    let client: Client<_, T> = Client::builder(TokioExecutor::new()).build(connector.build());

    let response = client
        .request(request)
        .await
        .map_err(|error| Error::Other(Box::new(error)))?;
    Ok(response)
}
113
114#[cfg(any(not(feature = "std"), not(any(feature = "http1", feature = "http2"))))]
115async fn execute<T>(_request: http::Request<T>) -> Result<http::Response<Incoming>>
116where
117 T: Body + Send + 'static,
118 T::Data: Send,
119 T::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
120{
121 #[allow(unreachable_code)]
122 return Err(unimplemented!());
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::{boxed::Box, string::String};
    use async_flow::{Channel, InputPort};
    use core::error::Error;

    /// Builds a request for `url` with an empty `String` body and a `Host`
    /// header set to the URL's authority.
    #[cfg(any(feature = "http1", feature = "http2"))]
    fn request_for(url: &str) -> http::Request<String> {
        use hyper::header::HOST;

        let uri = url
            .parse::<http::Uri>()
            .expect("the input URL should be valid");
        let authority = uri
            .authority()
            .expect("the input URL should have an authority segment")
            .clone();

        http::Request::builder()
            .uri(uri)
            .header(HOST, authority.as_str())
            .body(String::new())
            .expect("the HTTP request should be constructed")
    }

    /// Feeds one URL through `request` and checks that exactly one output
    /// (successful or not) is produced. Performs a real network request.
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[tokio::test]
    async fn test_request() -> Result<(), Box<dyn Error>> {
        let mut requests = Channel::bounded(1);
        let mut responses = Channel::bounded(1);

        let worker = tokio::spawn(request(requests.rx, responses.tx));

        #[cfg(all(feature = "http1", not(feature = "http2")))]
        let urls = ["http://httpbin.org/ip"];

        #[cfg(feature = "http2")]
        let urls = ["https://ar.to/robots.txt"];

        for url in urls {
            requests.tx.send(request_for(url)).await.unwrap();
        }
        requests.tx.close();

        // The worker's own result is deliberately ignored; only the collected
        // outputs are checked below.
        let _ = worker.await;

        let outputs = responses.rx.recv_all().await.unwrap();
        #[cfg(feature = "std")]
        std::eprintln!("{:?}", outputs);
        assert_eq!(outputs.len(), 1);

        Ok(())
    }
}