use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use std::thread;
use futures::{Future, Stream};
use futures::sync::{mpsc, oneshot};
use request::{self, Request, RequestBuilder};
use response::{self, Response};
use {async_impl, header, Certificate, Identity, Method, IntoUrl, Proxy, RedirectPolicy, wait};
#[derive(Clone)]
pub struct Client {
inner: ClientHandle,
}
pub struct ClientBuilder {
inner: async_impl::ClientBuilder,
timeout: Timeout,
}
impl ClientBuilder {
pub fn new() -> ClientBuilder {
ClientBuilder {
inner: async_impl::ClientBuilder::new(),
timeout: Timeout::default(),
}
}
pub fn build(&mut self) -> ::Result<Client> {
ClientHandle::new(self).map(|handle| Client {
inner: handle,
})
}
pub fn add_root_certificate(&mut self, cert: Certificate) -> &mut ClientBuilder {
self.inner.add_root_certificate(cert);
self
}
pub fn identity(&mut self, identity: Identity) -> &mut ClientBuilder {
self.inner.identity(identity);
self
}
#[inline]
pub fn danger_disable_hostname_verification(&mut self) -> &mut ClientBuilder {
self.inner.danger_disable_hostname_verification();
self
}
#[inline]
pub fn enable_hostname_verification(&mut self) -> &mut ClientBuilder {
self.inner.enable_hostname_verification();
self
}
#[inline]
pub fn default_headers(&mut self, headers: header::Headers) -> &mut ClientBuilder {
self.inner.default_headers(headers);
self
}
#[inline]
pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder {
self.inner.gzip(enable);
self
}
#[inline]
pub fn proxy(&mut self, proxy: Proxy) -> &mut ClientBuilder {
self.inner.proxy(proxy);
self
}
#[inline]
pub fn redirect(&mut self, policy: RedirectPolicy) -> &mut ClientBuilder {
self.inner.redirect(policy);
self
}
#[inline]
pub fn referer(&mut self, enable: bool) -> &mut ClientBuilder {
self.inner.referer(enable);
self
}
#[inline]
pub fn timeout<T>(&mut self, timeout: T) -> &mut ClientBuilder
where T: Into<Option<Duration>>,
{
self.timeout = Timeout(timeout.into());
self
}
}
impl Client {
#[inline]
pub fn new() -> Client {
ClientBuilder::new()
.build()
.expect("Client failed to initialize")
}
#[inline]
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Get, url)
}
pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Post, url)
}
pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Put, url)
}
pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Patch, url)
}
pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Delete, url)
}
pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Head, url)
}
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
let req = match url.into_url() {
Ok(url) => Ok(Request::new(method, url)),
Err(err) => Err(::error::from(err)),
};
request::builder(self.clone(), req)
}
pub fn execute(&self, request: Request) -> ::Result<Response> {
self.inner.execute_request(request)
}
}
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Client")
.finish()
}
}
impl fmt::Debug for ClientBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ClientBuilder")
.finish()
}
}
#[derive(Clone)]
struct ClientHandle {
timeout: Timeout,
inner: Arc<InnerClientHandle>
}
type ThreadSender = mpsc::UnboundedSender<(async_impl::Request, oneshot::Sender<::Result<async_impl::Response>>)>;
struct InnerClientHandle {
tx: Option<ThreadSender>,
thread: Option<thread::JoinHandle<()>>
}
impl Drop for InnerClientHandle {
fn drop(&mut self) {
self.tx.take();
self.thread.take().map(|h| h.join());
}
}
impl ClientHandle {
fn new(builder: &mut ClientBuilder) -> ::Result<ClientHandle> {
let timeout = builder.timeout;
let mut builder = async_impl::client::take_builder(&mut builder.inner);
let (tx, rx) = mpsc::unbounded();
let (spawn_tx, spawn_rx) = oneshot::channel::<::Result<()>>();
let handle = try_!(thread::Builder::new().name("reqwest-internal-sync-core".into()).spawn(move || {
use tokio_core::reactor::Core;
let built = (|| {
let core = try_!(Core::new());
let handle = core.handle();
let client = builder.build(&handle)?;
Ok((core, handle, client))
})();
let (mut core, handle, client) = match built {
Ok((a, b, c)) => {
if let Err(_) = spawn_tx.send(Ok(())) {
return;
}
(a, b, c)
},
Err(e) => {
let _ = spawn_tx.send(Err(e));
return;
}
};
let work = rx.for_each(|(req, tx)| {
let tx: oneshot::Sender<::Result<async_impl::Response>> = tx;
let task = client.execute(req)
.then(move |x| tx.send(x).map_err(|_| ()));
handle.spawn(task);
Ok(())
});
let _ = core.run(work);
}));
wait::timeout(spawn_rx, timeout.0).expect("core thread cancelled")?;
let inner_handle = Arc::new(InnerClientHandle {
tx: Some(tx),
thread: Some(handle)
});
Ok(ClientHandle {
timeout: timeout,
inner: inner_handle,
})
}
fn execute_request(&self, req: Request) -> ::Result<Response> {
let (tx, rx) = oneshot::channel();
let (req, body) = request::async(req);
let url = req.url().clone();
self.inner.tx
.as_ref()
.expect("core thread exited early")
.unbounded_send((req, tx))
.expect("core thread panicked");
if let Some(body) = body {
try_!(body.send(), &url);
}
let res = match wait::timeout(rx, self.timeout.0) {
Ok(res) => res,
Err(wait::Waited::TimedOut) => return Err(::error::timedout(Some(url))),
Err(wait::Waited::Err(_canceled)) => {
panic!("core thread panicked");
}
};
res.map(|res| {
response::new(res, self.timeout.0, KeepCoreThreadAlive(self.inner.clone()))
})
}
}
#[derive(Clone, Copy)]
struct Timeout(Option<Duration>);
impl Default for Timeout {
#[inline]
fn default() -> Timeout {
Timeout(Some(Duration::from_secs(30)))
}
}
pub struct KeepCoreThreadAlive(Arc<InnerClientHandle>);