#![deny(dead_code, unused_imports)]
#![warn(rust_2018_idioms)]
use std::error::Error as StdError;
use std::fmt;
use std::io;
use std::mem;
use std::time::Duration;
use bytes::Bytes;
use futures::{future, Future, Stream};
use futures::future::Either;
use http;
use hyper;
use hyper_tls;
use hyperx::header::{ContentLength, TypedHeaders};
use tao_log::warn;
use tokio;
use tokio::timer::timeout;
use tokio::util::FutureExt;
use body_image::{
BodyImage, BodySink, BodyError, Encoding,
Epilog, Prolog, Dialog, RequestRecorded, Tunables,
};
pub type Flaw = Box<dyn StdError + Send + Sync + 'static>;
mod decode;
pub use decode::{decode_res_body, find_encodings, find_chunked};
mod image;
pub use image::AsyncBodyImage;
mod sink;
pub use sink::AsyncBodySink;
#[cfg(feature = "mmap")] mod mem_map_buf;
#[cfg(feature = "mmap")] use mem_map_buf::MemMapBuf;
#[cfg(feature = "mmap")] mod uni_image;
#[cfg(feature = "mmap")] pub use uni_image::{UniBodyImage, UniBodyBuf};
#[cfg(feature = "mmap")] mod uni_sink;
#[cfg(feature = "mmap")] pub use uni_sink::UniBodySink;
pub static VERSION: &str = env!("CARGO_PKG_VERSION");
#[cfg(feature = "brotli")]
pub static ACCEPT_ENCODINGS: &str = "br, gzip, deflate";
#[cfg(not(feature = "brotli"))]
pub static ACCEPT_ENCODINGS: &str = "gzip, deflate";
pub static BROWSE_ACCEPT: &str =
"text/html, application/xhtml+xml, \
application/xml;q=0.9, \
*/*;q=0.8";
#[derive(Debug)]
pub enum FutioError {
Body(BodyError),
ResponseTimeout(Duration),
BodyTimeout(Duration),
ContentLengthTooLong(u64),
Http(http::Error),
Hyper(hyper::Error),
UnsupportedEncoding(Encoding),
Other(Flaw),
_FutureProof
}
impl fmt::Display for FutioError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
FutioError::Body(ref be) =>
write!(f, "With body: {}", be),
FutioError::ResponseTimeout(d) =>
write!(f, "Timeout before initial response ({:?})", d),
FutioError::BodyTimeout(d) =>
write!(f, "Timeout before streaming body complete ({:?})", d),
FutioError::ContentLengthTooLong(l) =>
write!(f, "Response Content-Length too long: {}", l),
FutioError::Http(ref e) =>
write!(f, "Http error: {}", e),
FutioError::Hyper(ref e) =>
write!(f, "Hyper error: {}", e),
FutioError::UnsupportedEncoding(e) =>
write!(f, "Unsupported encoding: {}", e),
FutioError::Other(ref flaw) =>
write!(f, "Other error: {}", flaw),
FutioError::_FutureProof =>
unreachable!("Don't abuse the _FutureProof!")
}
}
}
impl StdError for FutioError {
fn source(&self) -> Option<&(dyn StdError + 'static)> {
match *self {
FutioError::Body(ref be) => Some(be),
FutioError::Http(ref ht) => Some(ht),
FutioError::Hyper(ref he) => Some(he),
FutioError::Other(ref flaw) => Some(flaw.as_ref()),
_ => None
}
}
}
impl From<BodyError> for FutioError {
fn from(err: BodyError) -> FutioError {
FutioError::Body(err)
}
}
impl From<http::Error> for FutioError {
fn from(err: http::Error) -> FutioError {
FutioError::Http(err)
}
}
impl From<hyper::Error> for FutioError {
fn from(err: hyper::Error) -> FutioError {
FutioError::Hyper(err)
}
}
impl From<io::Error> for FutioError {
fn from(err: io::Error) -> FutioError {
FutioError::Body(BodyError::Io(err))
}
}
pub fn fetch<B>(rr: RequestRecord<B>, tune: &Tunables)
-> Result<Dialog, FutioError>
where B: hyper::body::Payload + Send
{
let mut rt = tokio::runtime::Builder::new()
.name_prefix("tpool-")
.core_threads(2)
.blocking_threads(2)
.build()
.unwrap();
let connector = hyper_tls::HttpsConnector::new(1 )
.map_err(|e| FutioError::Other(Box::new(e)))?;
let client = hyper::Client::builder().build(connector);
rt.block_on(request_dialog(&client, rr, tune))
}
pub fn request_dialog<CN, B>(
client: &hyper::Client<CN, B>,
rr: RequestRecord<B>,
tune: &Tunables)
-> impl Future<Item=Dialog, Error=FutioError> + Send
where CN: hyper::client::connect::Connect + Sync + 'static,
B: hyper::body::Payload + Send
{
let prolog = rr.prolog;
let tune = tune.clone();
let res_timeout = tune.res_timeout();
let body_timeout = tune.body_timeout();
let futr = client
.request(rr.request)
.from_err::<FutioError>()
.map(|response| Monolog { prolog, response });
let futr = if let Some(t) = res_timeout {
Either::A(futr
.timeout(t)
.map_err(move |te| {
map_timeout(te, || FutioError::ResponseTimeout(t))
})
)
} else {
Either::B(futr)
};
let futr = futr.and_then(|monolog| resp_future(monolog, tune));
let futr = if let Some(t) = body_timeout {
Either::A(futr
.timeout(t)
.map_err(move |te| {
map_timeout(te, || FutioError::BodyTimeout(t))
})
)
} else {
Either::B(futr)
};
futr.and_then(InDialog::prepare)
}
fn map_timeout<F>(te: timeout::Error<FutioError>, on_elapsed: F) -> FutioError
where F: FnOnce() -> FutioError
{
if te.is_elapsed() {
on_elapsed()
} else if te.is_timer() {
FutioError::Other(te.into_timer().unwrap().into())
} else {
te.into_inner().expect("inner")
}
}
pub fn user_agent() -> String {
format!("Mozilla/5.0 (compatible; body-image {}; \
+https://crates.io/crates/body-image)",
VERSION)
}
fn resp_future(monolog: Monolog, tune: Tunables)
-> impl Future<Item=InDialog, Error=FutioError> + Send
{
let (resp_parts, body) = monolog.response.into_parts();
let bsink = match resp_parts.headers.try_decode::<ContentLength>() {
Some(Ok(ContentLength(l))) => {
if l > tune.max_body() {
Err(FutioError::ContentLengthTooLong(l))
} else if l > tune.max_body_ram() {
BodySink::with_fs(tune.temp_dir()).map_err(FutioError::from)
} else {
Ok(BodySink::with_ram(l))
}
},
Some(Err(e)) => Err(FutioError::Other(Box::new(e))),
None => Ok(BodySink::with_ram(tune.max_body_ram()))
};
let bsink = match bsink {
Ok(b) => b,
Err(e) => { return Either::A(future::err(e)); }
};
let async_body = AsyncBodySink::new(bsink, tune);
let mut in_dialog = InDialog {
prolog: monolog.prolog,
version: resp_parts.version,
status: resp_parts.status,
res_headers: resp_parts.headers,
res_body: BodySink::empty()
};
Either::B(
body.from_err::<FutioError>()
.forward(async_body)
.and_then(|(_strm, mut async_body)| {
mem::swap(async_body.body_mut(), &mut in_dialog.res_body);
Ok(in_dialog)
})
)
}
#[derive(Debug)]
pub struct RequestRecord<B> {
request: http::Request<B>,
prolog: Prolog,
}
impl<B> RequestRecord<B> {
pub fn method(&self) -> &http::Method { &self.prolog.method }
pub fn url(&self) -> &http::Uri { &self.prolog.url }
pub fn request(&self) -> &http::Request<B> { &self.request }
}
impl<B> RequestRecorded for RequestRecord<B> {
fn req_headers(&self) -> &http::HeaderMap { &self.prolog.req_headers }
fn req_body(&self) -> &BodyImage { &self.prolog.req_body }
}
#[derive(Debug)]
struct Monolog {
prolog: Prolog,
response: http::Response<hyper::Body>,
}
#[derive(Debug)]
struct InDialog {
prolog: Prolog,
version: http::Version,
status: http::StatusCode,
res_headers: http::HeaderMap,
res_body: BodySink,
}
impl InDialog {
fn prepare(self) -> Result<Dialog, FutioError> {
let res_decoded = if find_chunked(&self.res_headers) {
vec![Encoding::Chunked]
} else {
Vec::with_capacity(0)
};
Ok(Dialog::new(
self.prolog,
Epilog {
version: self.version,
status: self.status,
res_headers: self.res_headers,
res_body: self.res_body.prepare()?,
res_decoded,
}
))
}
}
pub trait RequestRecorder<B>
where B: hyper::body::Payload + Send
{
fn record(&mut self) -> Result<RequestRecord<B>, http::Error>;
fn record_body<BB>(&mut self, body: BB)
-> Result<RequestRecord<B>, http::Error>
where BB: Into<Bytes>;
fn record_body_image(&mut self, body: BodyImage, tune: &Tunables)
-> Result<RequestRecord<B>, http::Error>;
}
impl RequestRecorder<hyper::Body> for http::request::Builder {
fn record(&mut self) -> Result<RequestRecord<hyper::Body>, http::Error> {
let request = self.body(hyper::Body::empty())?;
let method = request.method().clone();
let url = request.uri().clone();
let req_headers = request.headers().clone();
let req_body = BodyImage::empty();
Ok(RequestRecord {
request,
prolog: Prolog { method, url, req_headers, req_body }
})
}
fn record_body<BB>(&mut self, body: BB)
-> Result<RequestRecord<hyper::Body>, http::Error>
where BB: Into<Bytes>
{
let buf: Bytes = body.into();
let buf_copy: Bytes = buf.clone();
let request = self.body(buf.into())?;
let method = request.method().clone();
let url = request.uri().clone();
let req_headers = request.headers().clone();
let req_body = if buf_copy.is_empty() {
BodyImage::empty()
} else {
BodyImage::from_slice(buf_copy)
};
Ok(RequestRecord {
request,
prolog: Prolog { method, url, req_headers, req_body } })
}
fn record_body_image(&mut self, body: BodyImage, tune: &Tunables)
-> Result<RequestRecord<hyper::Body>, http::Error>
{
let request = if !body.is_empty() {
let stream = AsyncBodyImage::new(body.clone(), tune);
self.body(hyper::Body::wrap_stream(stream))?
} else {
self.body(hyper::Body::empty())?
};
let method = request.method().clone();
let url = request.uri().clone();
let req_headers = request.headers().clone();
Ok(RequestRecord {
request,
prolog: Prolog { method, url, req_headers, req_body: body } })
}
}
#[cfg(test)]
mod logger;
#[cfg(test)]
mod futio_tests {
#[cfg(feature = "mmap")] mod futures;
mod server;
#[cfg(feature = "may_fail")] mod live;
use tao_log::{debug, debugv};
use super::{FutioError, Flaw};
use crate::logger::test_logger;
use std::mem::size_of;
fn is_flaw(f: Flaw) -> bool {
debug!("Flaw Debug: {:?}, Display: \"{}\"", f, f);
true
}
#[test]
fn test_error_as_flaw() {
assert!(test_logger());
assert!(is_flaw(FutioError::ContentLengthTooLong(42).into()));
assert!(is_flaw(FutioError::Other("one off".into()).into()));
}
#[test]
fn test_error_size() {
assert!(test_logger());
assert!(debugv!(size_of::<FutioError>()) <= 32);
}
#[test]
#[should_panic]
fn test_error_future_proof() {
assert!(!FutioError::_FutureProof.to_string().is_empty(),
"should have panic'd before, unreachable")
}
}