use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use std::net::IpAddr;
use futures::{Async, Future, Stream};
use futures::future::{self, Either};
use futures::sync::{mpsc, oneshot};
use request::{Request, RequestBuilder};
use response::Response;
use {async_impl, header, Method, IntoUrl, Proxy, RedirectPolicy, wait};
#[cfg(feature = "tls")]
use {Certificate, Identity};
/// A blocking HTTP client.
///
/// Requests made through this type block the calling thread until a response
/// (or an error / timeout) is available; see `Client::execute`.
///
/// Cloning is cheap: the only state is a `ClientHandle`, which holds a copyable
/// timeout and a reference-counted pointer to the shared runtime-thread state.
#[derive(Clone)]
pub struct Client {
// Handle used to hand requests over to the background runtime thread.
inner: ClientHandle,
}
/// Builder used to configure and create a `Client`.
///
/// Every option except the timeout is forwarded to the wrapped
/// `async_impl::ClientBuilder`. The timeout is stored here because it is
/// applied by the blocking layer when waiting for a response.
pub struct ClientBuilder {
    inner: async_impl::ClientBuilder,
    timeout: Timeout,
}

impl ClientBuilder {
    /// Creates a builder with a fresh async builder and the default timeout
    /// (`Timeout::default()`).
    pub fn new() -> ClientBuilder {
        let inner = async_impl::ClientBuilder::new();
        let timeout = Timeout::default();
        ClientBuilder { inner, timeout }
    }

    /// Consumes the builder and returns the configured `Client`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ClientHandle::new` reports, e.g. when the
    /// runtime thread cannot be spawned or the async client fails to build.
    pub fn build(self) -> ::Result<Client> {
        let handle = ClientHandle::new(self)?;
        Ok(Client { inner: handle })
    }

    /// Forwards to the async builder's `use_default_tls`.
    #[cfg(feature = "default-tls")]
    pub fn use_default_tls(self) -> ClientBuilder {
        self.with_inner(|builder| builder.use_default_tls())
    }

    /// Forwards to the async builder's `use_rustls_tls`.
    #[cfg(feature = "rustls-tls")]
    pub fn use_rustls_tls(self) -> ClientBuilder {
        self.with_inner(|builder| builder.use_rustls_tls())
    }

    /// Passes `cert` to the async builder's `add_root_certificate`.
    #[cfg(feature = "tls")]
    pub fn add_root_certificate(self, cert: Certificate) -> ClientBuilder {
        self.with_inner(|builder| builder.add_root_certificate(cert))
    }

    /// Passes `identity` to the async builder's `identity` option.
    #[cfg(feature = "tls")]
    pub fn identity(self, identity: Identity) -> ClientBuilder {
        self.with_inner(|builder| builder.identity(identity))
    }

    /// Forwards the flag to the async builder's
    /// `danger_accept_invalid_hostnames`.
    ///
    /// As the `danger_` prefix indicates, enabling this weakens TLS
    /// verification; use with care.
    #[cfg(feature = "default-tls")]
    pub fn danger_accept_invalid_hostnames(self, accept_invalid_hostname: bool) -> ClientBuilder {
        self.with_inner(|builder| builder.danger_accept_invalid_hostnames(accept_invalid_hostname))
    }

    /// Forwards the flag to the async builder's `danger_accept_invalid_certs`.
    ///
    /// As the `danger_` prefix indicates, enabling this weakens TLS
    /// verification; use with care.
    #[cfg(feature = "tls")]
    pub fn danger_accept_invalid_certs(self, accept_invalid_certs: bool) -> ClientBuilder {
        self.with_inner(|builder| builder.danger_accept_invalid_certs(accept_invalid_certs))
    }

    /// Passes `headers` to the async builder's `default_headers` option.
    pub fn default_headers(self, headers: header::HeaderMap) -> ClientBuilder {
        self.with_inner(|builder| builder.default_headers(headers))
    }

    /// Forwards `enable` to the async builder's `gzip` option.
    pub fn gzip(self, enable: bool) -> ClientBuilder {
        self.with_inner(|builder| builder.gzip(enable))
    }

    /// Passes `proxy` to the async builder's `proxy` option.
    pub fn proxy(self, proxy: Proxy) -> ClientBuilder {
        self.with_inner(|builder| builder.proxy(proxy))
    }

    /// Passes `policy` to the async builder's `redirect` option.
    pub fn redirect(self, policy: RedirectPolicy) -> ClientBuilder {
        self.with_inner(|builder| builder.redirect(policy))
    }

    /// Forwards `enable` to the async builder's `referer` option.
    pub fn referer(self, enable: bool) -> ClientBuilder {
        self.with_inner(|builder| builder.referer(enable))
    }

    /// Sets the timeout used by the blocking layer when waiting for a response.
    ///
    /// Accepts either a `Duration` or an `Option<Duration>`. The value
    /// replaces the default and is stored as-is, so passing `None` stores
    /// "no timeout".
    pub fn timeout<T>(mut self, timeout: T) -> ClientBuilder
    where
        T: Into<Option<Duration>>,
    {
        let limit: Option<Duration> = timeout.into();
        self.timeout = Timeout(limit);
        self
    }

    /// Applies `func` to the wrapped async builder and stores the result back,
    /// leaving the timeout untouched.
    fn with_inner<F>(mut self, func: F) -> ClientBuilder
    where
        F: FnOnce(async_impl::ClientBuilder) -> async_impl::ClientBuilder,
    {
        let configured = func(self.inner);
        self.inner = configured;
        self
    }

    /// Forwards to the async builder's `h2_prior_knowledge`.
    pub fn h2_prior_knowledge(self) -> ClientBuilder {
        self.with_inner(|builder| builder.h2_prior_knowledge())
    }

    /// Passes `addr` (an `IpAddr` or `Option<IpAddr>`) to the async builder's
    /// `local_address` option.
    pub fn local_address<T>(self, addr: T) -> ClientBuilder
    where
        T: Into<Option<IpAddr>>,
    {
        self.with_inner(|builder| builder.local_address(addr))
    }
}
impl Client {
    /// Creates a client from a default `ClientBuilder`.
    ///
    /// # Panics
    ///
    /// Panics if the default builder fails to build.
    pub fn new() -> Client {
        let built = ClientBuilder::new().build();
        built.expect("Client::new()")
    }

    /// Returns a fresh `ClientBuilder` for configuring a client.
    pub fn builder() -> ClientBuilder {
        ClientBuilder::new()
    }

    /// Starts building a `GET` request to `url`.
    pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::GET, url)
    }

    /// Starts building a `POST` request to `url`.
    pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::POST, url)
    }

    /// Starts building a `PUT` request to `url`.
    pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::PUT, url)
    }

    /// Starts building a `PATCH` request to `url`.
    pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::PATCH, url)
    }

    /// Starts building a `DELETE` request to `url`.
    pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::DELETE, url)
    }

    /// Starts building a `HEAD` request to `url`.
    pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
        self.request(Method::HEAD, url)
    }

    /// Starts building a request with an arbitrary `method` to `url`.
    ///
    /// URL conversion is not reported here: the `Result` of `into_url` is
    /// handed to the `RequestBuilder` together with a clone of this client.
    pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
        let parsed = url.into_url();
        let pending = parsed.map(move |target| Request::new(method, target));
        RequestBuilder::new(self.clone(), pending)
    }

    /// Executes `request`, blocking the calling thread until the response is
    /// available.
    ///
    /// # Errors
    ///
    /// Returns the error produced by the underlying handle, including a
    /// timeout error when the configured timeout elapses.
    pub fn execute(&self, request: Request) -> ::Result<Response> {
        let handle = &self.inner;
        handle.execute_request(request)
    }
}
/// Opaque `Debug` output: only the type name is printed, no fields.
impl fmt::Debug for Client {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("Client");
        repr.finish()
    }
}
/// Opaque `Debug` output: only the type name is printed, no fields.
impl fmt::Debug for ClientBuilder {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        let mut repr = f.debug_struct("ClientBuilder");
        repr.finish()
    }
}
// Cloneable handle through which the blocking client talks to the background
// runtime thread.
#[derive(Clone)]
struct ClientHandle {
// Timeout applied while waiting for a response in `execute_request`.
timeout: Timeout,
// Shared state (request sender + thread join handle); dropping the last
// reference runs `InnerClientHandle`'s `Drop` impl.
inner: Arc<InnerClientHandle>
}
// Sending half of the request channel to the runtime thread. Each message is an
// async request paired with the oneshot sender on which its result is returned.
type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result<async_impl::Response>>)>;
/// State shared by every clone of a `ClientHandle` (and by responses that keep
/// the runtime thread alive).
///
/// Both fields are `Option`s so that `Drop` can move them out of `&mut self`.
struct InnerClientHandle {
    /// Sending half of the request channel to the runtime thread.
    tx: Option<ThreadSender>,
    /// Join handle of the runtime thread.
    thread: Option<thread::JoinHandle<()>>,
}

impl Drop for InnerClientHandle {
    /// Shuts the runtime thread down and waits for it to exit.
    fn drop(&mut self) {
        // The sender must be released *before* joining: the runtime thread
        // runs until its request stream ends, which only happens once the
        // sending half is gone. Joining first would block forever.
        drop(self.tx.take());

        if let Some(handle) = self.thread.take() {
            // `join` only fails if the runtime thread panicked. Panicking
            // again inside `drop` would not help, so the outcome is
            // deliberately ignored.
            let _ = handle.join();
        }
    }
}
// Construction of, and request dispatch to, the background runtime thread.
impl ClientHandle {
/// Spawns the runtime thread, builds the async client on it, and returns a
/// handle once startup has been reported as successful.
///
/// Returns an error if the thread cannot be spawned or if startup on the
/// thread (runtime creation or async client build) fails. Panics via
/// `event_loop_panicked` if the thread goes away without reporting a startup
/// result.
fn new(builder: ClientBuilder) -> ::Result<ClientHandle> {
let timeout = builder.timeout;
let builder = builder.inner;
// Request channel: this side keeps `tx`, the runtime thread consumes `rx`.
let (tx, rx) = mpsc::unbounded();
// One-shot channel on which the runtime thread reports its startup result.
let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>();
let handle = try_!(thread::Builder::new().name("reqwest-internal-sync-runtime".into()).spawn(move || {
use tokio::runtime::current_thread::Runtime;
// Build runtime and async client inside an immediately-invoked closure so
// that either failure ends up in a single `Result`.
let built = (|| {
let rt = try_!(Runtime::new());
let client = builder.build()?;
Ok((rt, client))
})();
let (mut rt, client) = match built {
Ok((rt, c)) => {
// Report success; if the receiver is already gone nobody is waiting for
// this client, so the thread just exits.
if let Err(_) = spawn_tx.send(Ok(())) {
return;
}
(rt, c)
},
Err(e) => {
// Report the startup error (best effort) and let the thread exit.
let _ = spawn_tx.send(Err(e));
return;
}
};
// For every incoming (request, reply-sender) pair, spawn a task that drives
// the request and sends its result back.
let work = rx.for_each(move |(req, tx)| {
// Wrapped in an `Option` so the sender can be moved out of the `FnMut`
// closure below when the result is sent.
let mut tx_opt: Option<oneshot::Sender<::Result<async_impl::Response>>> = Some(tx);
let mut res_fut = client.execute(req);
let task = future::poll_fn(move || {
// Check first whether the reply receiver has been dropped; if so there is
// nobody to deliver to and the task completes without sending anything.
let canceled = tx_opt
.as_mut()
.expect("polled after complete")
.poll_cancel()
.expect("poll_cancel cannot error")
.is_ready();
if canceled {
trace!("response receiver is canceled");
Ok(Async::Ready(()))
} else {
// Drive the request; stay pending until it resolves either way.
let result = match res_fut.poll() {
Ok(Async::NotReady) => return Ok(Async::NotReady),
Ok(Async::Ready(res)) => Ok(res),
Err(err) => Err(err),
};
// Deliver the outcome; a failed send (receiver dropped meanwhile) is ignored.
let _ = tx_opt
.take()
.expect("polled after complete")
.send(result);
Ok(Async::Ready(()))
}
});
::tokio::spawn(task);
Ok(())
});
// Blocks this thread on the request stream; returns once the stream ends.
rt.block_on(work)
.expect("runtime unexpected error");
}));
// Block the calling thread until the runtime thread reports its startup result.
match spawn_rx.wait() {
Ok(Ok(())) => (),
Ok(Err(err)) => return Err(err),
// The sender was dropped without a message being sent.
Err(_canceled) => event_loop_panicked(),
}
let inner_handle = Arc::new(InnerClientHandle {
tx: Some(tx),
thread: Some(handle)
});
Ok(ClientHandle {
timeout: timeout,
inner: inner_handle,
})
}
/// Sends `req` to the runtime thread and blocks until the response arrives,
/// an error occurs, or the configured timeout elapses.
///
/// Panics if the request cannot be handed to the runtime thread, or (via
/// `event_loop_panicked`) if the reply channel is dropped without a result.
fn execute_request(&self, req: Request) -> ::Result<Response> {
// Reply channel for this single request.
let (tx, rx) = oneshot::channel();
let (req, body) = req.into_async();
// Kept so that timeout and wait errors can carry the request URL.
let url = req.url().clone();
self.inner.tx
.as_ref()
.expect("core thread exited early")
.unbounded_send((req, tx))
.expect("core thread panicked");
// If `into_async` produced a body, its `send()` future is driven alongside
// the wait for the response; otherwise an already-completed future is used.
let write = if let Some(body) = body {
Either::A(body.send())
} else {
Either::B(future::ok(()))
};
let rx = rx.map_err(|_canceled| event_loop_panicked());
let fut = write.join(rx).map(|((), res)| res);
// Block the current thread, bounded by the configured timeout.
let res = match wait::timeout(fut, self.timeout.0) {
Ok(res) => res,
Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))),
Err(wait::Waited::Err(err)) => {
return Err(err.with_url(url));
}
};
// `res` is the request's own `Result`; on success wrap it in a blocking
// `Response` that holds a reference to the shared handle state (see
// `KeepCoreThreadAlive`).
res.map(|res| {
Response::new(res, self.timeout.0, KeepCoreThreadAlive(Some(self.inner.clone())))
})
}
}
/// Optional time limit used by the blocking client when waiting on a request.
///
/// Wraps an `Option<Duration>`; `None` means no duration is configured.
#[derive(Clone, Copy)]
struct Timeout(Option<Duration>);

impl Default for Timeout {
    /// The default limit is 30 seconds.
    fn default() -> Timeout {
        let thirty_seconds = Duration::from_secs(30);
        Timeout(Some(thirty_seconds))
    }
}
/// Guard that optionally holds a reference to the shared runtime-thread state.
///
/// While a guard holding `Some(..)` exists, the `InnerClientHandle` it points
/// to cannot be dropped, so its `Drop` impl (which shuts the thread down) does
/// not run.
pub(crate) struct KeepCoreThreadAlive(Option<Arc<InnerClientHandle>>);

impl KeepCoreThreadAlive {
    /// Creates a guard that holds no handle and therefore keeps nothing alive.
    pub(crate) fn empty() -> KeepCoreThreadAlive {
        let no_handle = None;
        KeepCoreThreadAlive(no_handle)
    }
}
/// Aborts the current operation because the runtime thread is gone.
///
/// Called when a channel to the runtime thread is found closed without a
/// result having been delivered. Marked cold and never inlined so the panic
/// machinery stays out of the callers' hot paths.
///
/// # Panics
///
/// Always panics with the message `event loop thread panicked`.
#[cold]
#[inline(never)]
fn event_loop_panicked() -> ! {
    panic!("event loop thread panicked")
}