1use crate::connector::{ConnectorAdapter, NetworkConnector};
8use crate::error::Error;
9use crate::shared_body::SharedBody;
10use crate::Response;
11
12use headers::{ContentLength, Header, HeaderMap, HeaderMapExt};
13use hyper::{Client as HyperClient, Method, Request, Uri};
14
15use std::convert::{TryFrom, TryInto};
16use std::fmt;
17use std::future::Future;
18use std::sync::Arc;
19use std::time::Duration;
20
21#[derive(Clone)]
32pub struct Client {
33 inner: Arc<HyperClient<ConnectorAdapter, SharedBody>>,
34}
35
36macro_rules! define_method_fn {
37 (@internal $name:ident, $method:ident, $method_str:expr) => {
38 #[doc = "Initiate a "]
39 #[doc = $method_str]
40 #[doc = " request with the specified URI."]
41 pub fn $name<U>(&self, uri: U) -> Result<RequestBuilder<'_>, Error>
44 where
45 Uri: TryFrom<U>,
46 <Uri as TryFrom<U>>::Error: Into<http::Error>,
47 {
48 self.request(Method::$method, uri)
49 }
50 };
51
52 ($name:ident, $method:ident) => {
53 define_method_fn!(@internal $name, $method, stringify!($method));
54 };
55}
56
57impl Client {
58 pub fn builder() -> ClientBuilder {
59 ClientBuilder::new()
60 }
61
62 pub fn with_connector<C: NetworkConnector>(connector: C) -> Self {
64 ClientBuilder::new().build(connector)
65 }
66
67 pub async fn send(&self, request: Request<SharedBody>) -> Result<Response, Error> {
70 Ok(self.inner.request(request).await?)
71 }
72
73 pub fn request<U>(&self, method: Method, uri: U) -> Result<RequestBuilder<'_>, Error>
77 where
78 Uri: TryFrom<U>,
79 <Uri as TryFrom<U>>::Error: Into<http::Error>,
80 {
81 let uri = uri.try_into().map_err(Into::into).map_err(Error::Http)?;
82 Ok(RequestBuilder {
83 client: self,
84 details: RequestDetails::new(method, uri),
85 })
86 }
87
88 define_method_fn!(get, GET);
89 define_method_fn!(head, HEAD);
90 define_method_fn!(post, POST);
91 define_method_fn!(patch, PATCH);
92 define_method_fn!(put, PUT);
93 define_method_fn!(delete, DELETE);
94}
95
96#[derive(Clone)]
102pub struct ClientBuilder {
103 max_idle_per_host: usize,
104 idle_timeout: Option<Duration>,
105}
106
107impl ClientBuilder {
108 pub(crate) fn new() -> Self {
109 ClientBuilder {
110 max_idle_per_host: usize::MAX,
111 idle_timeout: Some(Duration::from_secs(90)),
112 }
113 }
114
115 pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
119 self.max_idle_per_host = max_idle;
120 self
121 }
122
123 pub fn pool_idle_timeout(&mut self, val: Option<Duration>) -> &mut Self {
129 self.idle_timeout = val;
130 self
131 }
132
133 pub fn build<C: NetworkConnector>(&self, connector: C) -> Client {
136 Client {
137 inner: Arc::new(
138 HyperClient::builder()
139 .pool_max_idle_per_host(self.max_idle_per_host)
140 .pool_idle_timeout(self.idle_timeout)
141 .executor(TokioExecutor)
142 .build(ConnectorAdapter::new(connector)),
143 ),
144 }
145 }
146}
147
148pub(crate) struct RequestDetails {
149 pub(crate) method: Method,
150 pub(crate) uri: Uri,
151 pub(crate) headers: HeaderMap,
152 pub(crate) body: Option<SharedBody>,
153}
154
155impl fmt::Debug for RequestDetails {
156 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
157 f.debug_struct("RequestDetails")
158 .field("method", &self.method)
159 .field("uri", &self.uri)
160 .field("headers", &self.headers.len())
161 .field("body", &self.body.as_ref().map_or("None", |_| "Some(...)"))
162 .finish()
163 }
164}
165
166impl RequestDetails {
167 pub fn new(method: Method, uri: Uri) -> Self {
168 RequestDetails {
169 method,
170 uri,
171 headers: HeaderMap::new(),
172 body: None,
173 }
174 }
175
176 pub async fn send(self, client: &Client) -> Result<Response, Error> {
177 let req = self.into_request()?;
178 Ok(client.inner.request(req).await?)
179 }
180
181 pub fn into_request(mut self) -> Result<Request<SharedBody>, Error> {
182 let can_have_body = match self.method {
183 Method::GET | Method::HEAD | Method::DELETE => false,
185 _ => true,
186 };
187 let body = match can_have_body {
188 true => {
189 let body = self.body.unwrap_or_else(|| SharedBody::empty());
190 self.headers.typed_insert(ContentLength(body.len() as u64));
195 body
196 }
197 false if self.body.is_some() => return Err(Error::BodyNotAllowed(self.method)),
198 false => SharedBody::empty(),
199 };
200 let mut req = Request::builder().method(self.method).uri(self.uri);
201 match req.headers_mut() {
202 Some(headers) => {
203 *headers = self.headers;
204 }
205 None => match req.body(SharedBody::empty()) {
207 Err(e) => return Err(e.into()),
208 Ok(_) => {
209 panic!("request builder must have errors if `fn headers_mut()` returns None")
210 }
211 },
212 }
213
214 Ok(req.body(body)?)
215 }
216}
217
218pub struct RequestBuilder<'a> {
229 client: &'a Client,
230 details: RequestDetails,
231}
232
233impl<'a> RequestBuilder<'a> {
234 pub fn body<B: Into<SharedBody>>(mut self, body: B) -> Self {
236 self.details.body = Some(body.into());
237 self
238 }
239
240 pub fn headers(mut self, headers: HeaderMap) -> Self {
242 self.details.headers = headers;
243 self
244 }
245
246 pub fn header<H: Header>(mut self, header: H) -> Self {
250 self.details.headers.typed_insert(header);
251 self
252 }
253
254 pub fn build(self) -> Result<Request<SharedBody>, Error> {
259 self.details.into_request()
260 }
261
262 pub async fn send(self) -> Result<Response, Error> {
267 self.details.send(&self.client).await
268 }
269}
270
271#[derive(Copy, Clone)]
272pub(crate) struct TokioExecutor;
273
274impl<F> hyper::rt::Executor<F> for TokioExecutor
275where
276 F: Future + Send + 'static,
277 F::Output: Send + 'static,
278{
279 fn execute(&self, fut: F) {
280 tokio::spawn(fut);
281 }
282}
283
284#[cfg(test)]
285mod tests {
286 use super::*;
287 use crate::connector::HttpConnector;
288 use headers::ContentType;
289 use hyper::body::to_bytes;
290 use hyper::StatusCode;
291 use std::net::SocketAddr;
292 use tokio::io::{AsyncReadExt, AsyncWriteExt};
293 use tokio::net::TcpListener;
294
295 const RESPONSE_OK: &str = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, world!\r\n";
296 const RESPONSE_404: &str =
297 "HTTP/1.1 404 Not Found\r\nContent-Length: 23\r\n\r\nResource was not found.\r\n";
298
299 async fn test_http_server(resp: &'static str) -> SocketAddr {
300 let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
301 let addr = listener.local_addr().unwrap();
302 tokio::spawn(async move {
303 let (mut stream, _) = listener.accept().await.unwrap();
304 let mut input = Vec::new();
305 stream.read(&mut input).await.unwrap();
306 stream.write_all(resp.as_bytes()).await.unwrap();
307 });
308 addr
309 }
310
311 #[tokio::test]
312 async fn http_client() {
313 let addr = test_http_server(RESPONSE_OK).await;
314 let url = format!("http://{}/", addr);
315
316 let connector = HttpConnector::new();
317 let client = Client::with_connector(connector);
318 let response = client
319 .post(url)
320 .unwrap()
321 .header(ContentType::json())
322 .body(r#"{"key":"value"}"#)
323 .send()
324 .await
325 .unwrap();
326
327 assert_eq!(response.status(), StatusCode::OK);
328 let body = to_bytes(response).await.unwrap();
329 assert_eq!(body, "Hello, world!".as_bytes());
330 }
331
332 #[tokio::test]
333 async fn drop_client_before_response() {
334 let addr = test_http_server(RESPONSE_404).await;
335 let url = format!("http://{}/", addr);
336
337 let connector = HttpConnector::new();
338 let client = Client::with_connector(connector);
339 let response = client.get(url).unwrap().send().await.unwrap();
340 drop(client);
341
342 assert_eq!(response.status(), StatusCode::NOT_FOUND);
343 assert_eq!(response.headers().len(), 1);
344 let body = to_bytes(response).await.unwrap();
345 assert_eq!(body, "Resource was not found.");
346 }
347
348 #[tokio::test]
349 async fn http_connector_connect_timeout() {
350 let url = "http://192.0.2.1/";
352 let connector = HttpConnector::new().connect_timeout(Some(Duration::from_millis(100)));
353 let client = Client::with_connector(connector);
354 let err = client.get(url).unwrap().send().await.unwrap_err();
355 assert_eq!(
356 err.to_string(),
357 "error trying to connect: I/O error: connection timed out"
358 );
359 }
360}