Skip to main content

flows_http/
request.rs

1// This is free and unencumbered software released into the public domain.
2
3use super::Result;
4use alloc::boxed::Box;
5use async_flow::{Inputs, Outputs};
6use core::error::Error as StdError;
7use hyper::body::{Body, Incoming};
8
9/// A block that outputs HTTP responses corresponding to input HTTP requests.
10pub async fn request<T>(
11    mut requests: Inputs<http::Request<T>>,
12    responses: Outputs<Result<http::Response<Incoming>>>,
13) -> Result<(), async_flow::Error>
14where
15    T: Body + Send + 'static + Unpin,
16    T::Data: Send,
17    T::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
18{
19    while let Some(request) = requests.recv().await? {
20        let response = execute(request).await;
21        responses.send(response).await?;
22    }
23    Ok(())
24}
25
/// Sends `request` over a fresh plain-TCP HTTP/1 connection and returns the
/// response.
///
/// This variant is compiled for `std` + `http1` builds that enable neither
/// `http2` nor `tls`. The target host is taken from the request URI; the port
/// is the URI's explicit port, or 443 for the `https` scheme and 80 otherwise.
/// No TLS layer is added in this configuration, so an `https` scheme only
/// influences the default port.
///
/// # Errors
///
/// * `Error::MissingUrlScheme` / `Error::MissingUrlHost` when the URI lacks
///   the respective component.
/// * `Error::TcpConnectFailed` when the TCP connection cannot be opened.
/// * `Error::HttpHandshakeFailed` when the HTTP/1 handshake fails.
/// * Whatever `send_request` reports, converted through `?`.
#[cfg(all(
    feature = "std",
    all(feature = "http1", not(feature = "http2")),
    not(feature = "tls")
))]
async fn execute<T>(request: http::Request<T>) -> Result<http::Response<Incoming>>
where
    T: Body + Send + 'static,
    T::Data: Send,
    T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    use super::Error;
    use hyper::client::conn::http1;
    use hyper_util::rt::TokioIo;
    use tokio::net::TcpStream;

    // Work out where to connect, validating scheme before host.
    let uri = request.uri();
    let scheme = uri.scheme().ok_or(Error::MissingUrlScheme)?;
    let host = uri.host().ok_or(Error::MissingUrlHost)?;
    let port = match uri.port_u16() {
        Some(explicit) => explicit,
        None if *scheme == http::uri::Scheme::HTTPS => 443,
        None => 80,
    };

    let stream = TcpStream::connect((host, port))
        .await
        .map_err(Error::TcpConnectFailed)?;

    let (mut sender, connection) = http1::handshake(TokioIo::new(stream))
        .await
        .map_err(Error::HttpHandshakeFailed)?;

    // The connection future must be polled for the request to make progress,
    // so it is driven on its own task.
    tokio::task::spawn(async move {
        if let Err(error) = connection.await {
            std::eprintln!("Connection failed: {:?}", error); // FIXME
        }
    });

    let response = sender.send_request(request).await?;
    Ok(response)
}
72
/// Sends `request` through a TLS-capable client and returns the response.
///
/// This variant is compiled for `std` + `tls` builds with at least one of
/// `http1` / `http2` enabled. A new TLS configuration (using the platform
/// certificate verifier), connector and client are built on every call.
///
/// Protocol selection follows the enabled features:
/// * `http1` only: HTTP/1, `https` or `http` URLs.
/// * `http2` only: HTTP/2, `https` URLs only.
/// * both: HTTP/1 and HTTP/2, `https` or `http` URLs.
///
/// # Errors
///
/// * `Error::MissingUrlScheme` / `Error::MissingUrlHost` when the URI lacks
///   the respective component.
/// * `Error::Other` wrapping the client error when the request fails.
#[cfg(all(
    feature = "std",
    any(feature = "http1", feature = "http2"),
    feature = "tls"
))]
async fn execute<T>(request: http::Request<T>) -> Result<http::Response<Incoming>>
where
    T: Body + Send + 'static + Unpin,
    T::Data: Send,
    T::Error: Into<Box<dyn StdError + Send + Sync>>,
{
    use super::Error;
    use hyper_util::{client::legacy::Client, rt::TokioExecutor};
    use rustls::ClientConfig;
    use rustls_platform_verifier::ConfigVerifierExt;

    // Only presence is checked here; the client reads the URI itself.
    let uri = request.uri();
    uri.scheme().ok_or(Error::MissingUrlScheme)?;
    uri.host().ok_or(Error::MissingUrlHost)?;

    let tls = ClientConfig::with_platform_verifier();
    let builder = hyper_rustls::HttpsConnectorBuilder::new().with_tls_config(tls);

    // Exactly one of the following is compiled in: the function-level cfg
    // guarantees that at least one of `http1` / `http2` is enabled.
    #[cfg(all(feature = "http1", not(feature = "http2")))]
    let connector = builder.https_or_http().enable_http1().build();

    #[cfg(all(feature = "http2", not(feature = "http1")))]
    let connector = builder.https_only().enable_http2().build();

    #[cfg(all(feature = "http1", feature = "http2"))]
    let connector = builder
        .https_or_http()
        .enable_http1()
        .enable_http2()
        .build();

    let client: Client<_, T> = Client::builder(TokioExecutor::new()).build(connector);

    let response = client
        .request(request)
        .await
        .map_err(|error| Error::Other(Box::new(error)))?;
    Ok(response)
}
113
114#[cfg(any(not(feature = "std"), not(any(feature = "http1", feature = "http2"))))]
115async fn execute<T>(_request: http::Request<T>) -> Result<http::Response<Incoming>>
116where
117    T: Body + Send + 'static,
118    T::Data: Send,
119    T::Error: Into<Box<dyn core::error::Error + Send + Sync>>,
120{
121    #[allow(unreachable_code)]
122    return Err(unimplemented!());
123}
124
#[cfg(test)]
mod tests {
    use super::*;
    use alloc::{boxed::Box, string::String};
    use async_flow::{Channel, InputPort};
    use core::error::Error;

    /// Feeds a single request through the `request` block and checks that
    /// exactly one output is produced.
    ///
    /// The target is a plain-HTTP URL for `http1`-only builds and an HTTPS
    /// URL whenever `http2` is enabled. The test contacts those hosts, so it
    /// needs network access.
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[tokio::test]
    async fn test_request() -> Result<(), Box<dyn Error>> {
        use hyper::header::HOST;

        #[cfg(all(feature = "http1", not(feature = "http2")))]
        let targets = ["http://httpbin.org/ip"];

        #[cfg(feature = "http2")]
        let targets = ["https://ar.to/robots.txt"];

        let mut input = Channel::bounded(1);
        let mut output = Channel::bounded(1);

        let worker = tokio::spawn(request(input.rx, output.tx));

        for target in targets {
            let uri: http::Uri = target.parse().expect("the input URL should be valid");
            // Cloned because the URI itself is moved into the request builder.
            let authority = uri
                .authority()
                .cloned()
                .expect("the input URL should have an authority segment");
            let outgoing = http::Request::builder()
                .uri(uri)
                .header(HOST, authority.as_str())
                .body(String::new())
                .expect("the HTTP request should be constructed");

            input.tx.send(outgoing).await.unwrap();
        }
        // Closing the input lets the block's receive loop terminate.
        input.tx.close();

        // The worker's own result is intentionally not inspected.
        let _ = worker.await;

        let results = output.rx.recv_all().await.unwrap();
        #[cfg(feature = "std")]
        std::eprintln!("{:?}", results); // DEBUG
        assert_eq!(results.len(), 1);

        Ok(())
    }
}