simple_hyper_client/blocking/
client.rs

1/* Copyright (c) Fortanix, Inc.
2 *
3 * This Source Code Form is subject to the terms of the Mozilla Public
4 * License, v. 2.0. If a copy of the MPL was not distributed with this
5 * file, You can obtain one at http://mozilla.org/MPL/2.0/. */
6
7use super::body::Body;
8use super::Response;
9use crate::async_client::{ClientBuilder as AsyncClientBuilder, RequestDetails};
10use crate::connector::NetworkConnector;
11use crate::error::Error;
12use crate::shared_body::SharedBody;
13
14use futures_executor::block_on;
15use headers::{Header, HeaderMap, HeaderMapExt};
16use hyper::{Method, Uri};
17use tokio::runtime;
18use tokio::sync::{mpsc, oneshot};
19
20use std::convert::{TryFrom, TryInto};
21use std::sync::Arc;
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25/// A wrapper for [hyper's `Client` type] providing a blocking interface
26///
27/// Example usage:
28/// ```ignore
29/// let connector = HttpConnector::new();
30/// let client = Client::with_connector(connector);
31/// let response = client.get("http://example.com/")?.send()?;
32/// ```
33///
34/// [hyper's `Client` type]: https://docs.rs/hyper/latest/hyper/client/struct.Client.html
35#[derive(Clone)]
36pub struct Client {
37    inner: Arc<ClientInner>,
38}
39
40type ResponseSender = oneshot::Sender<Result<Response, Error>>;
41
42struct ClientInner {
43    tx: Option<mpsc::UnboundedSender<(RequestDetails, ResponseSender)>>,
44    thread: Option<JoinHandle<()>>,
45}
46
47impl Drop for ClientInner {
48    fn drop(&mut self) {
49        // signal shutdown to the thread
50        self.tx.take();
51        self.thread.take().map(|h| h.join());
52    }
53}
54
55macro_rules! define_method_fn {
56    (@internal $name:ident, $method:ident, $method_str:expr) => {
57        #[doc = "Initiate a "]
58        #[doc = $method_str]
59        #[doc = " request with the specified URI."]
60        ///
61        /// Returns an error if `uri` is invalid.
62        pub fn $name<U>(&self, uri: U) -> Result<RequestBuilder, Error>
63        where
64            Uri: TryFrom<U>,
65            <Uri as TryFrom<U>>::Error: Into<http::Error>,
66        {
67            self.request(Method::$method, uri)
68        }
69    };
70
71    ($name:ident, $method:ident) => {
72        define_method_fn!(@internal $name, $method, stringify!($method));
73    };
74}
75
76impl Client {
77    pub fn builder() -> ClientBuilder {
78        ClientBuilder::new()
79    }
80
81    pub fn with_connector<C: NetworkConnector>(connector: C) -> Self {
82        ClientBuilder::new().build(connector)
83    }
84
85    /// Initiate a request with the specified method and URI.
86    ///
87    /// Returns an error if `uri` is invalid.
88    pub fn request<U>(&self, method: Method, uri: U) -> Result<RequestBuilder, Error>
89    where
90        Uri: TryFrom<U>,
91        <Uri as TryFrom<U>>::Error: Into<http::Error>,
92    {
93        let uri = uri.try_into().map_err(Into::into).map_err(Error::Http)?;
94        Ok(RequestBuilder {
95            client: self,
96            details: RequestDetails::new(method, uri),
97        })
98    }
99
100    define_method_fn!(get, GET);
101    define_method_fn!(head, HEAD);
102    define_method_fn!(post, POST);
103    define_method_fn!(patch, PATCH);
104    define_method_fn!(put, PUT);
105    define_method_fn!(delete, DELETE);
106}
107
108/// A builder for [`Client`].
109///
110/// [`Client`]: struct.Client.html
111#[derive(Clone)]
112pub struct ClientBuilder(AsyncClientBuilder);
113
114impl ClientBuilder {
115    fn new() -> Self {
116        ClientBuilder(AsyncClientBuilder::new())
117    }
118
119    /// Sets the maximum idle connection per host allowed in the pool.
120    ///
121    /// Default is usize::MAX (no limit).
122    pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
123        self.0.pool_max_idle_per_host(max_idle);
124        self
125    }
126
127    /// Set an optional timeout for idle sockets being kept-alive.
128    ///
129    /// Pass `None` to disable timeout.
130    ///
131    /// Default is 90 seconds.
132    pub fn pool_idle_timeout(&mut self, val: Option<Duration>) -> &mut Self {
133        self.0.pool_idle_timeout(val);
134        self
135    }
136
137    /// Combine the configuration of this builder with a connector to create a
138    /// `Client`.
139    pub fn build<C: NetworkConnector>(&self, connector: C) -> Client {
140        let async_client = self.0.build(connector);
141        let (tx, mut rx) = mpsc::unbounded_channel::<(RequestDetails, ResponseSender)>();
142
143        let thread = thread::spawn(move || {
144            let rt = runtime::Builder::new_current_thread()
145                .enable_all()
146                .build()
147                .unwrap(); // TODO: send back an error through a oneshot channel
148
149            rt.block_on(async move {
150                while let Some((req_details, resp_tx)) = rx.recv().await {
151                    let async_client = async_client.clone();
152                    tokio::spawn(async move {
153                        match req_details.send(&async_client).await {
154                            Ok(resp) => {
155                                let (parts, hyper_body) = resp.into_parts();
156                                let (fut, body) = Body::new(hyper_body);
157                                let _ = resp_tx.send(Ok(Response::from_parts(parts, body)));
158                                fut.await;
159                            }
160                            Err(e) => {
161                                let _: Result<_, _> = resp_tx.send(Err(e));
162                            }
163                        }
164                    });
165                }
166            })
167        });
168
169        Client {
170            inner: Arc::new(ClientInner {
171                tx: Some(tx),
172                thread: Some(thread),
173            }),
174        }
175    }
176}
177
178/// An HTTP request builder
179///
180/// This is created through [`Client::get()`], [`Client::post()`] etc.
181/// You need to call [`send()`] to actually send the request over the network.
182///
183/// [`Client::get()`]: struct.Client.html#method.get
184/// [`Client::post()`]: struct.Client.html#method.post
185/// [`send()`]: struct.RequestBuilder.html#method.send
186pub struct RequestBuilder<'a> {
187    client: &'a Client,
188    details: RequestDetails,
189}
190
191impl<'a> RequestBuilder<'a> {
192    /// Set the request body.
193    pub fn body<B: Into<SharedBody>>(mut self, body: B) -> Self {
194        self.details.body = Some(body.into());
195        self
196    }
197
198    /// Set the request headers.
199    pub fn headers(mut self, headers: HeaderMap) -> Self {
200        self.details.headers = headers;
201        self
202    }
203
204    /// Set a single header using [`HeaderMapExt::typed_insert()`].
205    ///
206    /// [`HeaderMapExt::typed_insert()`]: https://docs.rs/headers/0.3.5/headers/trait.HeaderMapExt.html#tymethod.typed_insert
207    pub fn header<H: Header>(mut self, header: H) -> Self {
208        self.details.headers.typed_insert(header);
209        self
210    }
211
212    /// Send the request over the network.
213    ///
214    /// Returns an error before sending the request if there is something wrong
215    /// with the request parameters (method, uri, etc.).
216    pub fn send(self) -> Result<Response, Error> {
217        let RequestBuilder { client, details } = self;
218        let (tx, rx) = oneshot::channel();
219        client
220            .inner
221            .tx
222            .as_ref()
223            .expect("runtime thread exited early")
224            .send((details, tx))
225            .expect("runtime thread panicked");
226
227        // TODO: replace `block_on` with `rx.blocking_recv()` once we move to tokio 1.16+
228        block_on(async move {
229            match rx.await {
230                Ok(res) => res,
231                Err(_) => panic!("event loop panicked"),
232            }
233        })
234        .map(|mut resp| {
235            resp.body_mut().keep_client_alive = KeepClientAlive(Some(client.inner.clone()));
236            resp
237        })
238    }
239}
240
241pub(super) struct KeepClientAlive(#[allow(unused)] Option<Arc<ClientInner>>);
242
243impl KeepClientAlive {
244    pub fn empty() -> Self {
245        KeepClientAlive(None)
246    }
247}
248
249#[cfg(test)]
250mod tests {
251    use super::*;
252    use crate::connector::HttpConnector;
253    use headers::ContentType;
254    use hyper::StatusCode;
255    use std::io::{Read, Write};
256    use std::net::{SocketAddr, TcpListener};
257    use std::thread;
258
259    const RESPONSE_OK: &str = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, world!\r\n";
260    const RESPONSE_404: &str =
261        "HTTP/1.1 404 Not Found\r\nContent-Length: 23\r\n\r\nResource was not found.\r\n";
262
263    fn test_http_server(resp: &'static str) -> SocketAddr {
264        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
265        let addr = listener.local_addr().unwrap();
266        thread::spawn(move || {
267            let (mut stream, _) = listener.accept().unwrap();
268            let mut input = Vec::new();
269            stream.read(&mut input).unwrap();
270            stream.write_all(resp.as_bytes()).unwrap();
271        });
272        addr
273    }
274
275    #[test]
276    fn http_client_ok() {
277        let addr = test_http_server(RESPONSE_OK);
278        let url = format!("http://{}/", addr);
279
280        let connector = HttpConnector::new();
281        let client = Client::with_connector(connector);
282        let mut response = client
283            .request(Method::POST, url)
284            .unwrap()
285            .header(ContentType::json())
286            .body(r#"{"key":"value"}"#)
287            .send()
288            .unwrap();
289
290        assert_eq!(response.status(), StatusCode::OK);
291        let mut body = String::new();
292        response.body_mut().read_to_string(&mut body).unwrap();
293        assert_eq!(body, "Hello, world!");
294    }
295
296    #[test]
297    fn drop_client_before_response() {
298        let addr = test_http_server(RESPONSE_404);
299        let url = format!("http://{}/", addr);
300
301        let connector = HttpConnector::new();
302        let client = Client::with_connector(connector);
303        let mut response = client.get(url).unwrap().send().unwrap();
304        drop(client);
305
306        assert_eq!(response.status(), StatusCode::NOT_FOUND);
307        assert_eq!(response.headers().len(), 1);
308        let mut body = String::new();
309        response.body_mut().read_to_string(&mut body).unwrap();
310        assert_eq!(body, "Resource was not found.");
311    }
312}