use std::fmt;
use std::mem;
use std::sync::Arc;
use std::time::Duration;
use futures::{Async, Future, Poll};
use futures::future::{self, Either, Executor};
use futures::sync::oneshot;
use http::{Method, Request, Response, Uri, Version};
use http::header::{Entry, HeaderValue, HOST};
use http::uri::Scheme;
use body::{Body, Payload};
use common::Exec;
use self::connect::{Connect, Destination};
use self::pool::{Pool, Poolable, Reservation};
#[cfg(feature = "runtime")] pub use self::connect::HttpConnector;
pub mod conn;
pub mod connect;
pub(crate) mod dispatch;
#[cfg(feature = "runtime")] mod dns;
mod pool;
#[cfg(test)]
mod tests;
pub struct Client<C, B = Body> {
connector: Arc<C>,
executor: Exec,
h1_writev: bool,
h1_title_case_headers: bool,
pool: Pool<PoolClient<B>>,
retry_canceled_requests: bool,
set_host: bool,
ver: Ver,
}
#[cfg(feature = "runtime")]
impl Client<HttpConnector, Body> {
#[inline]
pub fn new() -> Client<HttpConnector, Body> {
Builder::default().build_http()
}
}
#[cfg(feature = "runtime")]
impl Default for Client<HttpConnector, Body> {
fn default() -> Client<HttpConnector, Body> {
Client::new()
}
}
impl Client<(), Body> {
#[inline]
pub fn builder() -> Builder {
Builder::default()
}
}
impl<C, B> Client<C, B>
where C: Connect + Sync + 'static,
C::Transport: 'static,
C::Future: 'static,
B: Payload + Send + 'static,
B::Data: Send,
{
pub fn get(&self, uri: Uri) -> ResponseFuture
where
B: Default,
{
let body = B::default();
if !body.is_end_stream() {
warn!("default Payload used for get() does not return true for is_end_stream");
}
let mut req = Request::new(body);
*req.uri_mut() = uri;
self.request(req)
}
pub fn request(&self, mut req: Request<B>) -> ResponseFuture {
let is_http_11 = self.ver == Ver::Http1 && match req.version() {
Version::HTTP_11 => true,
Version::HTTP_10 => false,
other => {
error!("Request has unsupported version \"{:?}\"", other);
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_version())));
}
};
let is_http_connect = req.method() == &Method::CONNECT;
if !is_http_11 && is_http_connect {
debug!("client does not support CONNECT requests for {:?}", req.version());
return ResponseFuture::new(Box::new(future::err(::Error::new_user_unsupported_request_method())));
}
let uri = req.uri().clone();
let domain = match (uri.scheme_part(), uri.authority_part()) {
(Some(scheme), Some(auth)) => {
format!("{}://{}", scheme, auth)
}
(None, Some(auth)) if is_http_connect => {
let scheme = match auth.port() {
Some(443) => {
set_scheme(req.uri_mut(), Scheme::HTTPS);
"https"
},
_ => {
set_scheme(req.uri_mut(), Scheme::HTTP);
"http"
},
};
format!("{}://{}", scheme, auth)
},
_ => {
debug!("Client requires absolute-form URIs, received: {:?}", uri);
return ResponseFuture::new(Box::new(future::err(::Error::new_user_absolute_uri_required())))
}
};
if self.set_host && self.ver == Ver::Http1 {
if let Entry::Vacant(entry) = req.headers_mut().entry(HOST).expect("HOST is always valid header name") {
let hostname = uri.host().expect("authority implies host");
let host = if let Some(port) = uri.port() {
let s = format!("{}:{}", hostname, port);
HeaderValue::from_str(&s)
} else {
HeaderValue::from_str(hostname)
}.expect("uri host is valid header value");
entry.insert(host);
}
}
let client = self.clone();
let uri = req.uri().clone();
let fut = RetryableSendRequest {
client: client,
future: self.send_request(req, &domain),
domain: domain,
uri: uri,
};
ResponseFuture::new(Box::new(fut))
}
fn send_request(&self, mut req: Request<B>, domain: &str) -> Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send> {
let url = req.uri().clone();
let ver = self.ver;
let pool_key = (Arc::new(domain.to_string()), self.ver);
let checkout = self.pool.checkout(pool_key.clone());
let connect = {
let executor = self.executor.clone();
let pool = self.pool.clone();
let h1_writev = self.h1_writev;
let h1_title_case_headers = self.h1_title_case_headers;
let connector = self.connector.clone();
let dst = Destination {
uri: url,
};
future::lazy(move || {
if let Some(connecting) = pool.connecting(&pool_key) {
Either::A(connector.connect(dst)
.map_err(::Error::new_connect)
.and_then(move |(io, connected)| {
conn::Builder::new()
.exec(executor.clone())
.h1_writev(h1_writev)
.h1_title_case_headers(h1_title_case_headers)
.http2_only(pool_key.1 == Ver::Http2)
.handshake(io)
.and_then(move |(tx, conn)| {
let bg = executor.execute(conn.map_err(|e| {
debug!("client connection error: {}", e)
}));
if let Err(err) = bg {
warn!("error spawning critical client task: {}", err);
return Either::A(future::err(err));
}
Either::B(tx.when_ready())
})
.map(move |tx| {
pool.pooled(connecting, PoolClient {
is_proxied: connected.is_proxied,
tx: match ver {
Ver::Http1 => PoolTx::Http1(tx),
Ver::Http2 => PoolTx::Http2(tx.into_http2()),
},
})
})
}))
} else {
let canceled = ::Error::new_canceled(Some("HTTP/2 connection in progress"));
Either::B(future::err(canceled))
}
})
};
let race = checkout.select(connect)
.map(|(pooled, _work)| pooled)
.or_else(|(e, other)| {
if e.is_canceled() {
Either::A(other.map_err(ClientError::Normal))
} else {
Either::B(future::err(ClientError::Normal(e)))
}
});
let executor = self.executor.clone();
let resp = race.and_then(move |mut pooled| {
let conn_reused = pooled.is_reused();
if ver == Ver::Http1 {
if req.method() == &Method::CONNECT {
authority_form(req.uri_mut());
} else if pooled.is_proxied {
absolute_form(req.uri_mut());
} else {
origin_form(req.uri_mut());
};
} else {
debug_assert!(
req.method() != &Method::CONNECT,
"Client should have returned Error for HTTP2 CONNECT"
);
}
let fut = pooled.send_request_retryable(req);
if pooled.is_closed() {
drop(pooled);
let fut = fut
.map_err(move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
connection_reused: conn_reused,
reason: err,
req,
}
} else {
ClientError::Normal(err)
}
});
Either::A(fut)
} else {
let fut = fut
.map_err(move |(err, orig_req)| {
if let Some(req) = orig_req {
ClientError::Canceled {
connection_reused: conn_reused,
reason: err,
req,
}
} else {
ClientError::Normal(err)
}
})
.and_then(move |mut res| {
if ver == Ver::Http2 || !pooled.is_pool_enabled() || pooled.is_ready() {
drop(pooled);
} else if !res.body().is_end_stream() {
let (delayed_tx, delayed_rx) = oneshot::channel();
res.body_mut().delayed_eof(delayed_rx);
let on_idle = future::poll_fn(move || {
pooled.poll_ready()
})
.then(move |_| {
drop(delayed_tx);
Ok(())
});
if let Err(err) = executor.execute(on_idle) {
warn!("error spawning task to insert idle connection: {}", err);
}
} else {
let on_idle = future::poll_fn(move || {
pooled.poll_ready()
})
.then(|_| Ok(()));
if let Err(err) = executor.execute(on_idle) {
warn!("error spawning task to insert idle connection: {}", err);
}
}
Ok(res)
});
Either::B(fut)
}
});
Box::new(resp)
}
}
impl<C, B> Clone for Client<C, B> {
fn clone(&self) -> Client<C, B> {
Client {
connector: self.connector.clone(),
executor: self.executor.clone(),
h1_writev: self.h1_writev,
h1_title_case_headers: self.h1_title_case_headers,
pool: self.pool.clone(),
retry_canceled_requests: self.retry_canceled_requests,
set_host: self.set_host,
ver: self.ver,
}
}
}
impl<C, B> fmt::Debug for Client<C, B> {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Client")
.finish()
}
}
#[must_use = "futures do nothing unless polled"]
pub struct ResponseFuture {
inner: Box<Future<Item=Response<Body>, Error=::Error> + Send>,
}
impl ResponseFuture {
fn new(fut: Box<Future<Item=Response<Body>, Error=::Error> + Send>) -> Self {
Self {
inner: fut,
}
}
}
impl fmt::Debug for ResponseFuture {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.pad("Future<Response>")
}
}
impl Future for ResponseFuture {
type Item = Response<Body>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
self.inner.poll()
}
}
struct RetryableSendRequest<C, B> {
client: Client<C, B>,
domain: String,
future: Box<Future<Item=Response<Body>, Error=ClientError<B>> + Send>,
uri: Uri,
}
impl<C, B> Future for RetryableSendRequest<C, B>
where
C: Connect + 'static,
C::Future: 'static,
B: Payload + Send + 'static,
B::Data: Send,
{
type Item = Response<Body>;
type Error = ::Error;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.future.poll() {
Ok(Async::Ready(resp)) => return Ok(Async::Ready(resp)),
Ok(Async::NotReady) => return Ok(Async::NotReady),
Err(ClientError::Normal(err)) => return Err(err),
Err(ClientError::Canceled {
connection_reused,
mut req,
reason,
}) => {
if !self.client.retry_canceled_requests || !connection_reused {
return Err(reason);
}
trace!("unstarted request canceled, trying again (reason={:?})", reason);
*req.uri_mut() = self.uri.clone();
self.future = self.client.send_request(req, &self.domain);
}
}
}
}
}
struct PoolClient<B> {
is_proxied: bool,
tx: PoolTx<B>,
}
enum PoolTx<B> {
Http1(conn::SendRequest<B>),
Http2(conn::Http2SendRequest<B>),
}
impl<B> PoolClient<B> {
fn poll_ready(&mut self) -> Poll<(), ::Error> {
match self.tx {
PoolTx::Http1(ref mut tx) => tx.poll_ready(),
PoolTx::Http2(_) => Ok(Async::Ready(())),
}
}
fn is_ready(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
PoolTx::Http2(ref tx) => tx.is_ready(),
}
}
fn is_closed(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_closed(),
PoolTx::Http2(ref tx) => tx.is_closed(),
}
}
}
impl<B: Payload + 'static> PoolClient<B> {
fn send_request_retryable(&mut self, req: Request<B>) -> Box<Future<Item=Response<Body>, Error=(::Error, Option<Request<B>>)> + Send>
where
B: Send,
{
match self.tx {
PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req),
PoolTx::Http2(ref mut tx) => tx.send_request_retryable(req),
}
}
}
impl<B> Poolable for PoolClient<B>
where
B: Send + 'static,
{
fn is_open(&self) -> bool {
match self.tx {
PoolTx::Http1(ref tx) => tx.is_ready(),
PoolTx::Http2(ref tx) => tx.is_ready(),
}
}
fn reserve(self) -> Reservation<Self> {
match self.tx {
PoolTx::Http1(tx) => {
Reservation::Unique(PoolClient {
is_proxied: self.is_proxied,
tx: PoolTx::Http1(tx),
})
},
PoolTx::Http2(tx) => {
let b = PoolClient {
is_proxied: self.is_proxied,
tx: PoolTx::Http2(tx.clone()),
};
let a = PoolClient {
is_proxied: self.is_proxied,
tx: PoolTx::Http2(tx),
};
Reservation::Shared(a, b)
}
}
}
}
enum ClientError<B> {
Normal(::Error),
Canceled {
connection_reused: bool,
req: Request<B>,
reason: ::Error,
}
}
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Ver {
Http1,
Http2,
}
fn origin_form(uri: &mut Uri) {
let path = match uri.path_and_query() {
Some(path) if path.as_str() != "/" => {
let mut parts = ::http::uri::Parts::default();
parts.path_and_query = Some(path.clone());
Uri::from_parts(parts).expect("path is valid uri")
},
_none_or_just_slash => {
debug_assert!(Uri::default() == "/");
Uri::default()
}
};
*uri = path
}
fn absolute_form(uri: &mut Uri) {
debug_assert!(uri.scheme_part().is_some(), "absolute_form needs a scheme");
debug_assert!(uri.authority_part().is_some(), "absolute_form needs an authority");
if uri.scheme_part() == Some(&Scheme::HTTPS) {
origin_form(uri);
}
}
fn authority_form(uri: &mut Uri) {
if log_enabled!(::log::Level::Warn) {
if let Some(path) = uri.path_and_query() {
if path != "/" {
warn!(
"HTTP/1.1 CONNECT request stripping path: {:?}",
path
);
}
}
}
*uri = match uri.authority_part() {
Some(auth) => {
let mut parts = ::http::uri::Parts::default();
parts.authority = Some(auth.clone());
Uri::from_parts(parts).expect("authority is valid")
},
None => {
unreachable!("authority_form with relative uri");
}
};
}
fn set_scheme(uri: &mut Uri, scheme: Scheme) {
debug_assert!(uri.scheme_part().is_none(), "set_scheme expects no existing scheme");
let old = mem::replace(uri, Uri::default());
let mut parts: ::http::uri::Parts = old.into();
parts.scheme = Some(scheme);
parts.path_and_query = Some("/".parse().expect("slash is a valid path"));
*uri = Uri::from_parts(parts).expect("scheme is valid");
}
#[derive(Clone)]
pub struct Builder {
exec: Exec,
keep_alive: bool,
keep_alive_timeout: Option<Duration>,
h1_writev: bool,
h1_title_case_headers: bool,
max_idle: usize,
retry_canceled_requests: bool,
set_host: bool,
ver: Ver,
}
impl Default for Builder {
fn default() -> Self {
Self {
exec: Exec::Default,
keep_alive: true,
keep_alive_timeout: Some(Duration::from_secs(90)),
h1_writev: true,
h1_title_case_headers: false,
max_idle: 5,
retry_canceled_requests: true,
set_host: true,
ver: Ver::Http1,
}
}
}
impl Builder {
#[inline]
pub fn keep_alive(&mut self, val: bool) -> &mut Self {
self.keep_alive = val;
self
}
#[inline]
pub fn keep_alive_timeout<D>(&mut self, val: D) -> &mut Self
where
D: Into<Option<Duration>>,
{
self.keep_alive_timeout = val.into();
self
}
#[inline]
pub fn http1_writev(&mut self, val: bool) -> &mut Self {
self.h1_writev = val;
self
}
pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self {
self.h1_title_case_headers = val;
self
}
pub fn http2_only(&mut self, val: bool) -> &mut Self {
self.ver = if val {
Ver::Http2
} else {
Ver::Http1
};
self
}
#[inline]
pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self {
self.retry_canceled_requests = val;
self
}
#[inline]
pub fn set_host(&mut self, val: bool) -> &mut Self {
self.set_host = val;
self
}
pub fn executor<E>(&mut self, exec: E) -> &mut Self
where
E: Executor<Box<Future<Item=(), Error=()> + Send>> + Send + Sync + 'static,
{
self.exec = Exec::Executor(Arc::new(exec));
self
}
#[cfg(feature = "runtime")]
pub fn build_http<B>(&self) -> Client<HttpConnector, B>
where
B: Payload + Send,
B::Data: Send,
{
let mut connector = HttpConnector::new(4);
if self.keep_alive {
connector.set_keepalive(self.keep_alive_timeout);
}
self.build(connector)
}
pub fn build<C, B>(&self, connector: C) -> Client<C, B>
where
C: Connect,
C::Transport: 'static,
C::Future: 'static,
B: Payload + Send,
B::Data: Send,
{
Client {
connector: Arc::new(connector),
executor: self.exec.clone(),
h1_writev: self.h1_writev,
h1_title_case_headers: self.h1_title_case_headers,
pool: Pool::new(self.keep_alive, self.keep_alive_timeout, &self.exec),
retry_canceled_requests: self.retry_canceled_requests,
set_host: self.set_host,
ver: self.ver,
}
}
}
impl fmt::Debug for Builder {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Builder")
.field("keep_alive", &self.keep_alive)
.field("keep_alive_timeout", &self.keep_alive_timeout)
.field("http1_writev", &self.h1_writev)
.field("max_idle", &self.max_idle)
.field("set_host", &self.set_host)
.field("version", &self.ver)
.finish()
}
}
#[cfg(test)]
mod unit_tests {
use super::*;
#[test]
fn set_relative_uri_with_implicit_path() {
let mut uri = "http://hyper.rs".parse().unwrap();
origin_form(&mut uri);
assert_eq!(uri.to_string(), "/");
}
#[test]
fn test_origin_form() {
let mut uri = "http://hyper.rs/guides".parse().unwrap();
origin_form(&mut uri);
assert_eq!(uri.to_string(), "/guides");
let mut uri = "http://hyper.rs/guides?foo=bar".parse().unwrap();
origin_form(&mut uri);
assert_eq!(uri.to_string(), "/guides?foo=bar");
}
#[test]
fn test_absolute_form() {
let mut uri = "http://hyper.rs/guides".parse().unwrap();
absolute_form(&mut uri);
assert_eq!(uri.to_string(), "http://hyper.rs/guides");
let mut uri = "https://hyper.rs/guides".parse().unwrap();
absolute_form(&mut uri);
assert_eq!(uri.to_string(), "/guides");
}
#[test]
fn test_authority_form() {
extern crate pretty_env_logger;
let _ = pretty_env_logger::try_init();
let mut uri = "http://hyper.rs".parse().unwrap();
authority_form(&mut uri);
assert_eq!(uri.to_string(), "hyper.rs");
let mut uri = "hyper.rs".parse().unwrap();
authority_form(&mut uri);
assert_eq!(uri.to_string(), "hyper.rs");
}
}