use std::fmt;
use std::sync::Arc;
use std::time::Duration;
use bytes::Bytes;
use futures::{Async, Future, Poll};
use hyper::client::FutureResponse;
use hyper::header::{Headers, Location, Referer, UserAgent, Accept, Encoding,
AcceptEncoding, Range, qitem};
use native_tls::{TlsConnector, TlsConnectorBuilder};
use tokio_core::reactor::Handle;
use super::body;
use super::request::{self, Request, RequestBuilder};
use super::response::{self, Response};
use connect::Connector;
use into_url::to_uri;
use redirect::{self, RedirectPolicy, check_redirect, remove_sensitive_headers};
use {Certificate, Identity, IntoUrl, Method, proxy, Proxy, StatusCode, Url};
static DEFAULT_USER_AGENT: &'static str =
concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"));
#[derive(Clone)]
pub struct Client {
inner: Arc<ClientRef>,
}
pub struct ClientBuilder {
config: Option<Config>,
err: Option<::Error>,
}
struct Config {
gzip: bool,
headers: Headers,
hostname_verification: bool,
proxies: Vec<Proxy>,
redirect_policy: RedirectPolicy,
referer: bool,
timeout: Option<Duration>,
tls: TlsConnectorBuilder,
dns_threads: usize,
}
impl ClientBuilder {
pub fn new() -> ClientBuilder {
match TlsConnector::builder() {
Ok(tls_connector_builder) => {
let mut headers = Headers::with_capacity(2);
headers.set(UserAgent::new(DEFAULT_USER_AGENT));
headers.set(Accept::star());
ClientBuilder {
config: Some(Config {
gzip: true,
headers: headers,
hostname_verification: true,
proxies: Vec::new(),
redirect_policy: RedirectPolicy::default(),
referer: true,
timeout: None,
tls: tls_connector_builder,
dns_threads: 4,
}),
err: None,
}
},
Err(e) => ClientBuilder {
config: None,
err: Some(::error::from(e)),
}
}
}
pub fn build(&mut self, handle: &Handle) -> ::Result<Client> {
if let Some(err) = self.err.take() {
return Err(err);
}
let config = self.config
.take()
.expect("ClientBuilder cannot be reused after building a Client");
let tls = try_!(config.tls.build());
let proxies = Arc::new(config.proxies);
let mut connector = Connector::new(config.dns_threads, tls, proxies.clone(), handle);
if !config.hostname_verification {
connector.danger_disable_hostname_verification();
}
let hyper_client = ::hyper::Client::configure()
.connector(connector)
.build(handle);
Ok(Client {
inner: Arc::new(ClientRef {
gzip: config.gzip,
hyper: hyper_client,
headers: config.headers,
proxies: proxies,
redirect_policy: config.redirect_policy,
referer: config.referer,
}),
})
}
pub fn add_root_certificate(&mut self, cert: Certificate) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
let cert = ::tls::cert(cert);
if let Err(e) = config.tls.add_root_certificate(cert) {
self.err = Some(::error::from(e));
}
}
self
}
pub fn identity(&mut self, identity: Identity) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
let pkcs12 = ::tls::pkcs12(identity);
if let Err(e) = config.tls.identity(pkcs12) {
self.err = Some(::error::from(e));
}
}
self
}
#[inline]
pub fn danger_disable_hostname_verification(&mut self) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.hostname_verification = false;
}
self
}
#[inline]
pub fn enable_hostname_verification(&mut self) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.hostname_verification = true;
}
self
}
#[inline]
pub fn default_headers(&mut self, headers: Headers) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.headers.extend(headers.iter());
}
self
}
#[inline]
pub fn gzip(&mut self, enable: bool) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.gzip = enable;
}
self
}
#[inline]
pub fn proxy(&mut self, proxy: Proxy) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.proxies.push(proxy);
}
self
}
#[inline]
pub fn redirect(&mut self, policy: RedirectPolicy) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.redirect_policy = policy;
}
self
}
#[inline]
pub fn referer(&mut self, enable: bool) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.referer = enable;
}
self
}
#[inline]
pub fn timeout(&mut self, timeout: Duration) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.timeout = Some(timeout);
}
self
}
#[inline]
pub fn dns_threads(&mut self, threads: usize) -> &mut ClientBuilder {
if let Some(config) = config_mut(&mut self.config, &self.err) {
config.dns_threads = threads;
}
self
}
}
fn config_mut<'a>(config: &'a mut Option<Config>, err: &Option<::Error>) -> Option<&'a mut Config> {
if err.is_some() {
None
} else {
config.as_mut()
}
}
type HyperClient = ::hyper::Client<Connector>;
impl Client {
#[inline]
pub fn new(handle: &Handle) -> Client {
ClientBuilder::new()
.build(handle)
.expect("TLS failed to initialize")
}
#[inline]
pub fn builder() -> ClientBuilder {
ClientBuilder::new()
}
pub fn get<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Get, url)
}
pub fn post<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Post, url)
}
pub fn put<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Put, url)
}
pub fn patch<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Patch, url)
}
pub fn delete<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Delete, url)
}
pub fn head<U: IntoUrl>(&self, url: U) -> RequestBuilder {
self.request(Method::Head, url)
}
pub fn request<U: IntoUrl>(&self, method: Method, url: U) -> RequestBuilder {
let req = match url.into_url() {
Ok(url) => Ok(Request::new(method, url)),
Err(err) => Err(::error::from(err)),
};
request::builder(self.clone(), req)
}
pub fn execute(&self, request: Request) -> Pending {
self.execute_request(request)
}
fn execute_request(&self, req: Request) -> Pending {
let (
method,
url,
user_headers,
body
) = request::pieces(req);
let mut headers = self.inner.headers.clone(); headers.extend(user_headers.iter());
if self.inner.gzip &&
!headers.has::<AcceptEncoding>() &&
!headers.has::<Range>() {
headers.set(AcceptEncoding(vec![qitem(Encoding::Gzip)]));
}
let uri = to_uri(&url);
let mut req = ::hyper::Request::new(method.clone(), uri.clone());
*req.headers_mut() = headers.clone();
let body = body.map(|body| {
let (reusable, body) = body::into_hyper(body);
req.set_body(body);
reusable
});
if proxy::is_proxied(&self.inner.proxies, &url) {
if uri.scheme() == Some("http") {
req.set_proxy(true);
}
}
let in_flight = self.inner.hyper.request(req);
Pending {
inner: PendingInner::Request(PendingRequest {
method: method,
url: url,
headers: headers,
body: body,
urls: Vec::new(),
client: self.inner.clone(),
in_flight: in_flight,
}),
}
}
}
impl fmt::Debug for Client {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Client")
.field("gzip", &self.inner.gzip)
.field("redirect_policy", &self.inner.redirect_policy)
.field("referer", &self.inner.referer)
.finish()
}
}
impl fmt::Debug for ClientBuilder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("ClientBuilder")
.finish()
}
}
struct ClientRef {
gzip: bool,
headers: Headers,
hyper: HyperClient,
proxies: Arc<Vec<Proxy>>,
redirect_policy: RedirectPolicy,
referer: bool,
}
pub struct Pending {
inner: PendingInner,
}
enum PendingInner {
Request(PendingRequest),
Error(Option<::Error>),
}
pub struct PendingRequest {
method: Method,
url: Url,
headers: Headers,
body: Option<Option<Bytes>>,
urls: Vec<Url>,
client: Arc<ClientRef>,
in_flight: FutureResponse,
}
impl Future for Pending {
type Item = Response;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.inner {
PendingInner::Request(ref mut req) => req.poll(),
PendingInner::Error(ref mut err) => Err(err.take().expect("Pending error polled more than once")),
}
}
}
impl Future for PendingRequest {
type Item = Response;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
let res = match try_!(self.in_flight.poll(), &self.url) {
Async::Ready(res) => res,
Async::NotReady => return Ok(Async::NotReady),
};
let should_redirect = match res.status() {
StatusCode::MovedPermanently |
StatusCode::Found |
StatusCode::SeeOther => {
self.body = None;
match self.method {
Method::Get | Method::Head => {},
_ => {
self.method = Method::Get;
}
}
true
},
StatusCode::TemporaryRedirect |
StatusCode::PermanentRedirect => match self.body {
Some(Some(_)) | None => true,
Some(None) => false,
},
_ => false,
};
if should_redirect {
let loc = res.headers()
.get::<Location>()
.map(|loc| self.url.join(loc));
if let Some(Ok(loc)) = loc {
if self.client.referer {
if let Some(referer) = make_referer(&loc, &self.url) {
self.headers.set(referer);
}
}
self.urls.push(self.url.clone());
let action = check_redirect(
&self.client.redirect_policy,
res.status(),
&loc,
&self.urls,
);
match action {
redirect::Action::Follow => {
self.url = loc;
remove_sensitive_headers(&mut self.headers, &self.url, &self.urls);
debug!("redirecting to {:?} '{}'", self.method, self.url);
let uri = to_uri(&self.url);
let mut req = ::hyper::Request::new(
self.method.clone(),
uri.clone()
);
*req.headers_mut() = self.headers.clone();
if let Some(Some(ref body)) = self.body {
req.set_body(body.clone());
}
if proxy::is_proxied(&self.client.proxies, &self.url) {
if uri.scheme() == Some("http") {
req.set_proxy(true);
}
}
self.in_flight = self.client.hyper.request(req);
continue;
},
redirect::Action::Stop => {
debug!("redirect_policy disallowed redirection to '{}'", loc);
},
redirect::Action::LoopDetected => {
return Err(::error::loop_detected(self.url.clone()));
},
redirect::Action::TooManyRedirects => {
return Err(::error::too_many_redirects(self.url.clone()));
}
}
} else if let Some(Err(e)) = loc {
debug!("Location header had invalid URI: {:?}", e);
}
}
let res = response::new(res, self.url.clone(), self.client.gzip);
return Ok(Async::Ready(res));
}
}
}
impl fmt::Debug for Pending {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self.inner {
PendingInner::Request(ref req) => {
f.debug_struct("Pending")
.field("method", &req.method)
.field("url", &req.url)
.finish()
},
PendingInner::Error(ref err) => {
f.debug_struct("Pending")
.field("error", err)
.finish()
}
}
}
}
fn make_referer(next: &Url, previous: &Url) -> Option<Referer> {
if next.scheme() == "http" && previous.scheme() == "https" {
return None;
}
let mut referer = previous.clone();
let _ = referer.set_username("");
let _ = referer.set_password(None);
referer.set_fragment(None);
Some(Referer::new(referer.into_string()))
}
pub fn take_builder(builder: &mut ClientBuilder) -> ClientBuilder {
use std::mem;
mem::replace(builder, ClientBuilder { config: None, err: None })
}
pub fn pending_err(err: ::Error) -> Pending {
Pending {
inner: PendingInner::Error(Some(err)),
}
}