use super::body::Body;
use super::Response;
use crate::async_client::{ClientBuilder as AsyncClientBuilder, RequestDetails};
use crate::connector::NetworkConnector;
use crate::error::Error;
use crate::shared_body::SharedBody;
use futures_executor::block_on;
use headers::{Header, HeaderMap, HeaderMapExt};
use hyper::{Method, Uri};
use tokio::runtime;
use tokio::sync::{mpsc, oneshot};
use std::convert::{TryFrom, TryInto};
use std::sync::Arc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientInner>,
}
type ResponseSender = oneshot::Sender<Result<Response, Error>>;
struct ClientInner {
tx: Option<mpsc::UnboundedSender<(RequestDetails, ResponseSender)>>,
thread: Option<JoinHandle<()>>,
}
impl Drop for ClientInner {
fn drop(&mut self) {
self.tx.take();
self.thread.take().map(|h| h.join());
}
}
macro_rules! define_method_fn {
(@internal $name:ident, $method:ident, $method_str:expr) => {
#[doc = "Initiate a "]
#[doc = $method_str]
#[doc = " request with the specified URI."]
pub fn $name<U>(&self, uri: U) -> Result<RequestBuilder, Error>
where
Uri: TryFrom<U>,
<Uri as TryFrom<U>>::Error: Into<http::Error>,
{
self.request(Method::$method, uri)
}
};
($name:ident, $method:ident) => {
define_method_fn!(@internal $name, $method, stringify!($method));
};
}
impl Client {
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
pub fn with_connector<C: NetworkConnector>(connector: C) -> Self {
ClientBuilder::new().build(connector)
}
pub fn request<U>(&self, method: Method, uri: U) -> Result<RequestBuilder, Error>
where
Uri: TryFrom<U>,
<Uri as TryFrom<U>>::Error: Into<http::Error>,
{
let uri = uri.try_into().map_err(Into::into).map_err(Error::Http)?;
Ok(RequestBuilder {
client: self,
details: RequestDetails::new(method, uri),
})
}
define_method_fn!(get, GET);
define_method_fn!(head, HEAD);
define_method_fn!(post, POST);
define_method_fn!(patch, PATCH);
define_method_fn!(put, PUT);
define_method_fn!(delete, DELETE);
}
#[derive(Clone)]
pub struct ClientBuilder(AsyncClientBuilder);
impl ClientBuilder {
fn new() -> Self {
ClientBuilder(AsyncClientBuilder::new())
}
pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self {
self.0.pool_max_idle_per_host(max_idle);
self
}
pub fn pool_idle_timeout(&mut self, val: Option<Duration>) -> &mut Self {
self.0.pool_idle_timeout(val);
self
}
pub fn build<C: NetworkConnector>(&self, connector: C) -> Client {
let async_client = self.0.build(connector);
let (tx, mut rx) = mpsc::unbounded_channel::<(RequestDetails, ResponseSender)>();
let thread = thread::spawn(move || {
let rt = runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async move {
while let Some((req_details, resp_tx)) = rx.recv().await {
let async_client = async_client.clone();
tokio::spawn(async move {
match req_details.send(&async_client).await {
Ok(resp) => {
let (parts, hyper_body) = resp.into_parts();
let (fut, body) = Body::new(hyper_body);
let _ = resp_tx.send(Ok(Response::from_parts(parts, body)));
fut.await;
}
Err(e) => {
let _: Result<_, _> = resp_tx.send(Err(e));
}
}
});
}
})
});
Client {
inner: Arc::new(ClientInner {
tx: Some(tx),
thread: Some(thread),
}),
}
}
}
pub struct RequestBuilder<'a> {
client: &'a Client,
details: RequestDetails,
}
impl<'a> RequestBuilder<'a> {
pub fn body<B: Into<SharedBody>>(mut self, body: B) -> Self {
self.details.body = Some(body.into());
self
}
pub fn headers(mut self, headers: HeaderMap) -> Self {
self.details.headers = headers;
self
}
pub fn header<H: Header>(mut self, header: H) -> Self {
self.details.headers.typed_insert(header);
self
}
pub fn send(self) -> Result<Response, Error> {
let RequestBuilder { client, details } = self;
let (tx, rx) = oneshot::channel();
client
.inner
.tx
.as_ref()
.expect("runtime thread exited early")
.send((details, tx))
.expect("runtime thread panicked");
block_on(async move {
match rx.await {
Ok(res) => res,
Err(_) => panic!("event loop panicked"),
}
})
.map(|mut resp| {
resp.body_mut().keep_client_alive = KeepClientAlive(Some(client.inner.clone()));
resp
})
}
}
pub(super) struct KeepClientAlive(#[allow(unused)] Option<Arc<ClientInner>>);
impl KeepClientAlive {
pub fn empty() -> Self {
KeepClientAlive(None)
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::connector::HttpConnector;
use headers::ContentType;
use hyper::StatusCode;
use std::io::{Read, Write};
use std::net::{SocketAddr, TcpListener};
use std::thread;
const RESPONSE_OK: &str = "HTTP/1.1 200 OK\r\nContent-Length: 13\r\n\r\nHello, world!\r\n";
const RESPONSE_404: &str =
"HTTP/1.1 404 Not Found\r\nContent-Length: 23\r\n\r\nResource was not found.\r\n";
fn test_http_server(resp: &'static str) -> SocketAddr {
let listener = TcpListener::bind("127.0.0.1:0").unwrap();
let addr = listener.local_addr().unwrap();
thread::spawn(move || {
let (mut stream, _) = listener.accept().unwrap();
let mut input = Vec::new();
stream.read(&mut input).unwrap();
stream.write_all(resp.as_bytes()).unwrap();
});
addr
}
#[test]
fn http_client_ok() {
let addr = test_http_server(RESPONSE_OK);
let url = format!("http://{}/", addr);
let connector = HttpConnector::new();
let client = Client::with_connector(connector);
let mut response = client
.request(Method::POST, url)
.unwrap()
.header(ContentType::json())
.body(r#"{"key":"value"}"#)
.send()
.unwrap();
assert_eq!(response.status(), StatusCode::OK);
let mut body = String::new();
response.body_mut().read_to_string(&mut body).unwrap();
assert_eq!(body, "Hello, world!");
}
#[test]
fn drop_client_before_response() {
let addr = test_http_server(RESPONSE_404);
let url = format!("http://{}/", addr);
let connector = HttpConnector::new();
let client = Client::with_connector(connector);
let mut response = client.get(url).unwrap().send().unwrap();
drop(client);
assert_eq!(response.status(), StatusCode::NOT_FOUND);
assert_eq!(response.headers().len(), 1);
let mut body = String::new();
response.body_mut().read_to_string(&mut body).unwrap();
assert_eq!(body, "Resource was not found.");
}
}