simple_hyper_client/blocking/
client.rs1use super::body::Body;
8use super::Response;
9use crate::async_client::{ClientBuilder as AsyncClientBuilder, RequestDetails};
10use crate::connector::NetworkConnector;
11use crate::error::Error;
12use crate::shared_body::SharedBody;
13
14use futures_executor::block_on;
15use headers::{Header, HeaderMap, HeaderMapExt};
16use hyper::{Method, Uri};
17use tokio::runtime;
18use tokio::sync::{mpsc, oneshot};
19
20use std::convert::{TryFrom, TryInto};
21use std::sync::Arc;
22use std::thread::{self, JoinHandle};
23use std::time::Duration;
24
25#[derive(Clone)]
36pub struct Client {
37 inner: Arc<ClientInner>,
38}
39
40type ResponseSender = oneshot::Sender<Result<Response, Error>>;
41
42struct ClientInner {
43 tx: Option<mpsc::UnboundedSender<(RequestDetails, ResponseSender)>>,
44 thread: Option<JoinHandle<()>>,
45}
46
47impl Drop for ClientInner {
48 fn drop(&mut self) {
49 self.tx.take();
51 self.thread.take().map(|h| h.join());
52 }
53}
54
// Generates a convenience method on `Client` (e.g. `get`, `post`) that
// forwards to `Client::request` with a fixed HTTP method.
macro_rules! define_method_fn {
    // Internal arm: receives the method name already turned into a string
    // expression so that it can be spliced into the generated documentation.
    (@internal $fn_name:ident, $verb:ident, $verb_str:expr) => {
        #[doc = "Initiate a "]
        #[doc = $verb_str]
        #[doc = " request with the specified URI."]
        pub fn $fn_name<U>(&self, uri: U) -> Result<RequestBuilder, Error>
        where
            Uri: TryFrom<U>,
            <Uri as TryFrom<U>>::Error: Into<http::Error>,
        {
            self.request(Method::$verb, uri)
        }
    };

    // Entry point: `define_method_fn!(get, GET)` stringifies the method
    // identifier and delegates to the internal arm above.
    ($fn_name:ident, $verb:ident) => {
        define_method_fn!(@internal $fn_name, $verb, stringify!($verb));
    };
}
75
76impl Client {
77 pub fn builder() -> ClientBuilder {
78 ClientBuilder::new()
79 }
80
81 pub fn with_connector<C: NetworkConnector>(connector: C) -> Self {
82 ClientBuilder::new().build(connector)
83 }
84
85 pub fn request<U>(&self, method: Method, uri: U) -> Result<RequestBuilder, Error>
89 where
90 Uri: TryFrom<U>,
91 <Uri as TryFrom<U>>::Error: Into<http::Error>,
92 {
93 let uri = uri.try_into().map_err(Into::into).map_err(Error::Http)?;
94 Ok(RequestBuilder {
95 client: self,
96 details: RequestDetails::new(method, uri),
97 })
98 }
99
100 define_method_fn!(get, GET);
101 define_method_fn!(head, HEAD);
102 define_method_fn!(post, POST);
103 define_method_fn!(patch, PATCH);
104 define_method_fn!(put, PUT);
105 define_method_fn!(delete, DELETE);
106}
107
108#[derive(Clone)]
112pub struct ClientBuilder(AsyncClientBuilder);
113
114impl ClientBuilder {
115 fn new() -> Self {
116 ClientBuilder(AsyncClientBuilder::new())
117 }
118
119 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
123 self.0.pool_max_idle_per_host(max_idle);
124 self
125 }
126
127 pub fn pool_idle_timeout(&mut self, val: Option<Duration>) -> &mut Self {
133 self.0.pool_idle_timeout(val);
134 self
135 }
136
137 pub fn build<C: NetworkConnector>(&self, connector: C) -> Client {
140 let async_client = self.0.build(connector);
141 let (tx, mut rx) = mpsc::unbounded_channel::<(RequestDetails, ResponseSender)>();
142
143 let thread = thread::spawn(move || {
144 let rt = runtime::Builder::new_current_thread()
145 .enable_all()
146 .build()
147 .unwrap(); rt.block_on(async move {
150 while let Some((req_details, resp_tx)) = rx.recv().await {
151 let async_client = async_client.clone();
152 tokio::spawn(async move {
153 match req_details.send(&async_client).await {
154 Ok(resp) => {
155 let (parts, hyper_body) = resp.into_parts();
156 let (fut, body) = Body::new(hyper_body);
157 let _ = resp_tx.send(Ok(Response::from_parts(parts, body)));
158 fut.await;
159 }
160 Err(e) => {
161 let _: Result<_, _> = resp_tx.send(Err(e));
162 }
163 }
164 });
165 }
166 })
167 });
168
169 Client {
170 inner: Arc::new(ClientInner {
171 tx: Some(tx),
172 thread: Some(thread),
173 }),
174 }
175 }
176}
177
178pub struct RequestBuilder<'a> {
187 client: &'a Client,
188 details: RequestDetails,
189}
190
191impl<'a> RequestBuilder<'a> {
192 pub fn body<B: Into<SharedBody>>(mut self, body: B) -> Self {
194 self.details.body = Some(body.into());
195 self
196 }
197
198 pub fn headers(mut self, headers: HeaderMap) -> Self {
200 self.details.headers = headers;
201 self
202 }
203
204 pub fn header<H: Header>(mut self, header: H) -> Self {
208 self.details.headers.typed_insert(header);
209 self
210 }
211
212 pub fn send(self) -> Result<Response, Error> {
217 let RequestBuilder { client, details } = self;
218 let (tx, rx) = oneshot::channel();
219 client
220 .inner
221 .tx
222 .as_ref()
223 .expect("runtime thread exited early")
224 .send((details, tx))
225 .expect("runtime thread panicked");
226
227 block_on(async move {
229 match rx.await {
230 Ok(res) => res,
231 Err(_) => panic!("event loop panicked"),
232 }
233 })
234 .map(|mut resp| {
235 resp.body_mut().keep_client_alive = KeepClientAlive(Some(client.inner.clone()));
236 resp
237 })
238 }
239}
240
241pub(super) struct KeepClientAlive(#[allow(unused)] Option<Arc<ClientInner>>);
242
243impl KeepClientAlive {
244 pub fn empty() -> Self {
245 KeepClientAlive(None)
246 }
247}
248
#[cfg(test)]
mod tests {
    use super::*;
    use crate::connector::HttpConnector;
    use headers::ContentType;
    use hyper::StatusCode;
    use std::io::{Read, Write};
    use std::net::{SocketAddr, TcpListener};
    use std::thread;

    const RESPONSE_OK: &str = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, world!\r\n";
    const RESPONSE_404: &str =
        "HTTP/1.1 404 Not Found\r\nContent-Length: 23\r\n\r\nResource was not found.\r\n";

    /// Starts a one-shot HTTP server on an ephemeral local port.
    ///
    /// The server accepts a single connection, reads the incoming request up
    /// to the end of its headers, answers with the canned `resp` and closes
    /// the connection. Returns the address the server listens on.
    fn test_http_server(resp: &'static str) -> SocketAddr {
        let listener = TcpListener::bind("127.0.0.1:0").unwrap();
        let addr = listener.local_addr().unwrap();
        thread::spawn(move || {
            let (mut stream, _) = listener.accept().unwrap();

            // Read into a real buffer: `read` on an empty buffer returns
            // `Ok(0)` immediately without consuming anything, which would
            // make the server reply before the request has arrived.
            let mut request = Vec::new();
            let mut chunk = [0u8; 1024];
            loop {
                let n = stream.read(&mut chunk).unwrap();
                if n == 0 {
                    // Peer closed the connection before finishing the headers.
                    break;
                }
                request.extend_from_slice(&chunk[..n]);
                if request.windows(4).any(|w| w == b"\r\n\r\n") {
                    break;
                }
            }

            stream.write_all(resp.as_bytes()).unwrap();
        });
        addr
    }

    /// A POST with a typed header and a body gets the canned 200 response.
    #[test]
    fn http_client_ok() {
        let addr = test_http_server(RESPONSE_OK);
        let url = format!("http://{}/", addr);

        let connector = HttpConnector::new();
        let client = Client::with_connector(connector);
        let mut response = client
            .request(Method::POST, url)
            .unwrap()
            .header(ContentType::json())
            .body(r#"{"key":"value"}"#)
            .send()
            .unwrap();

        assert_eq!(response.status(), StatusCode::OK);
        let mut body = String::new();
        response.body_mut().read_to_string(&mut body).unwrap();
        assert_eq!(body, "Hello, world!");
    }

    /// The response body stays readable after the `Client` handle that
    /// produced it has been dropped.
    #[test]
    fn drop_client_before_response() {
        let addr = test_http_server(RESPONSE_404);
        let url = format!("http://{}/", addr);

        let connector = HttpConnector::new();
        let client = Client::with_connector(connector);
        let mut response = client.get(url).unwrap().send().unwrap();
        drop(client);

        assert_eq!(response.status(), StatusCode::NOT_FOUND);
        assert_eq!(response.headers().len(), 1);
        let mut body = String::new();
        response.body_mut().read_to_string(&mut body).unwrap();
        assert_eq!(body, "Resource was not found.");
    }
}