#![deny(dead_code, unused_imports)]
#![warn(rust_2018_idioms)]
use std::mem;
#[cfg(feature = "brotli")] use brotli;
use bytes::Bytes;
use failure::{
bail,
Error as Flare,
format_err
};
use flate2::read::{DeflateDecoder, GzDecoder};
use futures::{future, Future, Stream};
use futures::future::Either;
use http;
use hyper;
use hyper_tls;
use hyperx::header::{
ContentEncoding, ContentLength, Encoding as HyEncoding,
Header, TransferEncoding
};
use log::{debug, warn};
use tokio;
use tokio::timer::timeout;
use tokio::util::FutureExt;
use body_image::{
BodyImage, BodySink, BodyError, Encoding,
Epilog, Prolog, Dialog, Recorded, RequestRecorded, Tunables,
};
mod image;
pub use self::image::AsyncBodyImage;
mod sink;
pub use self::sink::AsyncBodySink;
#[cfg(feature = "mmap")] mod mem_map_buf;
#[cfg(feature = "mmap")] use self::mem_map_buf::MemMapBuf;
#[cfg(feature = "mmap")] mod uni_image;
#[cfg(feature = "mmap")] pub use self::uni_image::{UniBodyImage, UniBodyBuf};
#[cfg(feature = "mmap")] mod uni_sink;
#[cfg(feature = "mmap")] pub use self::uni_sink::UniBodySink;
/// The crate version, captured at compile time from the
/// `CARGO_PKG_VERSION` environment variable. Used by `user_agent`.
pub static VERSION: &str = env!("CARGO_PKG_VERSION");
/// A comma-separated list of encoding names. The `br` entry is only
/// included when the _brotli_ feature is enabled, matching the
/// feature-gated Brotli arm in `decompress`.
// NOTE(review): presumably intended as an `Accept-Encoding` request
// header value — confirm against callers.
#[cfg(feature = "brotli")]
pub static ACCEPT_ENCODINGS: &str = "br, gzip, deflate";
/// A comma-separated list of encoding names, for builds without the
/// _brotli_ feature.
#[cfg(not(feature = "brotli"))]
pub static ACCEPT_ENCODINGS: &str = "gzip, deflate";
/// A media-type preference list favoring HTML and XHTML, then XML
/// (`q=0.9`), then anything else (`q=0.8`).
// NOTE(review): presumably intended as a browser-like `Accept` request
// header value — confirm against callers.
pub static BROWSE_ACCEPT: &str = concat!(
    "text/html, application/xhtml+xml, ",
    "application/xml;q=0.9, ",
    "*/*;q=0.8"
);
/// Run an HTTP request to completion, returning the full `Dialog`.
///
/// This blocking convenience function builds its own small tokio
/// runtime (2 core threads, 2 blocking threads, thread names prefixed
/// with "tpool-") and a TLS-capable hyper client, then blocks on
/// `request_dialog` until the dialog is complete or an error occurs.
///
/// # Errors
///
/// Returns an error if the runtime or the HTTPS connector cannot be
/// constructed, or if `request_dialog` itself fails (including the
/// timeouts configured via `tune`).
pub fn fetch<B>(rr: RequestRecord<B>, tune: &Tunables)
    -> Result<Dialog, Flare>
    where B: hyper::body::Payload + Send
{
    // Building the runtime can fail with an I/O error (e.g. threads
    // can't be started). Since this function already returns `Result`,
    // propagate that failure instead of panicking.
    let mut rt = tokio::runtime::Builder::new()
        .name_prefix("tpool-")
        .core_threads(2)
        .blocking_threads(2)
        .build()?;

    // The connector argument is the number of DNS resolver threads.
    let connector = hyper_tls::HttpsConnector::new(1)?;
    let client = hyper::Client::builder().build(connector);

    rt.block_on(request_dialog(&client, rr, tune))
}
pub fn request_dialog<CN, B>(
client: &hyper::Client<CN, B>,
rr: RequestRecord<B>,
tune: &Tunables)
-> impl Future<Item=Dialog, Error=Flare> + Send
where CN: hyper::client::connect::Connect + Sync + 'static,
B: hyper::body::Payload + Send
{
let prolog = rr.prolog;
let tune = tune.clone();
let res_timeout = tune.res_timeout();
let body_timeout = tune.body_timeout();
let futr = client
.request(rr.request)
.from_err::<Flare>()
.map(|response| Monolog { prolog, response });
let futr = if let Some(t) = res_timeout {
Either::A(futr
.timeout(t)
.map_err(move |te| {
timeout_to_flare(te, || {
format_err!("timeout before initial response ({:?})", t)
})
})
)
} else {
Either::B(futr)
};
let futr = futr.and_then(|monolog| resp_future(monolog, tune));
let futr = if let Some(t) = body_timeout {
Either::A(futr
.timeout(t)
.map_err(move |te| {
timeout_to_flare(te, || {
format_err!(
"timeout before streaming body complete ({:?})",
t
)
})
})
)
} else {
Either::B(futr)
};
futr.and_then(InDialog::prepare)
}
/// Convert a tokio timeout error into a `Flare`.
///
/// * If the deadline elapsed, the error is produced by `on_elapsed`.
/// * If the timer itself failed, the timer error is converted.
/// * Otherwise the wrapped (inner) error is returned unchanged.
fn timeout_to_flare<F>(te: timeout::Error<Flare>, on_elapsed: F) -> Flare
    where F: FnOnce() -> Flare
{
    if te.is_elapsed() {
        return on_elapsed();
    }

    if te.is_timer() {
        // `is_timer` was just checked, so a timer error is expected here.
        let timer_err = te.into_timer().unwrap();
        return Flare::from(timer_err);
    }

    // Neither elapsed nor a timer failure: expected to carry the inner
    // error.
    te.into_inner().expect("inner")
}
/// Return the encodings found in the `Transfer-Encoding` and
/// `Content-Encoding` headers, in that order.
///
/// Values of both headers are parsed with the `ContentEncoding` header
/// parser. `Identity` is ignored and `Chunked` is recorded without
/// stopping. The first compression encoding found (deflate, gzip or
/// Brotli) is recorded and ends the search. An unrecognized encoding
/// is logged as a warning and also ends the search, without being
/// recorded. Header values that fail to parse are skipped.
pub fn find_encodings(headers: &http::HeaderMap) -> Vec<Encoding> {
    let transfer = headers.get_all(http::header::TRANSFER_ENCODING);
    let content = headers.get_all(http::header::CONTENT_ENCODING);

    let mut found = Vec::with_capacity(2);

    'headers: for value in transfer.iter().chain(content.iter()) {
        let parsed = match ContentEncoding::parse_header(&value) {
            Ok(p) => p,
            Err(_) => continue,
        };

        for enc in parsed.iter() {
            let compression = match *enc {
                HyEncoding::Identity => continue,
                HyEncoding::Chunked => {
                    found.push(Encoding::Chunked);
                    continue;
                }
                HyEncoding::Deflate => Encoding::Deflate,
                HyEncoding::Gzip => Encoding::Gzip,
                HyEncoding::Brotli => Encoding::Brotli,
                _ => {
                    warn!("Found unknown encoding: {:?}", enc);
                    break 'headers;
                }
            };

            // Only the first compression encoding is recorded.
            found.push(compression);
            break 'headers;
        }
    }

    found
}
/// Return true if the chunked encoding is found in the
/// `Transfer-Encoding` headers.
///
/// `Identity` entries are skipped. Any other encoding seen before
/// `Chunked` ends the search with `false`. Header values that fail to
/// parse are skipped.
pub fn find_chunked(headers: &http::HeaderMap) -> bool {
    for value in headers.get_all(http::header::TRANSFER_ENCODING) {
        let parsed = match TransferEncoding::parse_header(&value) {
            Ok(p) => p,
            Err(_) => continue,
        };

        for enc in parsed.iter() {
            match *enc {
                HyEncoding::Chunked => return true,
                HyEncoding::Identity => {}
                // Any other encoding stops the search entirely.
                _ => return false,
            }
        }
    }

    false
}
/// Decode the response body of the given `Dialog` in place, according
/// to the encodings found in its response headers.
///
/// Only the last encoding reported by `find_encodings` is considered,
/// and only if it is not `Chunked`. Returns `Ok(true)` if the body was
/// decompressed and replaced, or `Ok(false)` if there was nothing to
/// decode or the encoding is unsupported (logged as a warning).
///
/// # Errors
///
/// Returns a `BodyError` if decompression fails.
pub fn decode_res_body(dialog: &mut Dialog, tune: &Tunables)
    -> Result<bool, BodyError>
{
    let encodings = find_encodings(dialog.res_headers());

    let comp = match encodings.last() {
        Some(e) if *e != Encoding::Chunked => *e,
        _ => return Ok(false),
    };

    debug!("Body to {:?} decode: {:?}", comp, dialog.res_body());

    match decompress(dialog.res_body(), comp, tune)? {
        Some(body) => {
            dialog.set_res_body_decoded(body, encodings);
            debug!("Body update: {:?}", dialog.res_body());
            Ok(true)
        }
        None => {
            warn!("Unsupported encoding: {:?} not decoded", comp);
            Ok(false)
        }
    }
}
/// Decompress the provided body with the given compression encoding,
/// returning a new `BodyImage`.
///
/// The expected decoded size is estimated as the compressed length
/// multiplied by the per-encoding size estimate from `tune`. Returns
/// `Ok(None)` if the encoding is not supported; this includes
/// `Brotli` when the _brotli_ feature is not enabled.
///
/// # Errors
///
/// Returns a `BodyError` if reading the decoded body fails.
pub fn decompress(body: &BodyImage, compression: Encoding, tune: &Tunables)
    -> Result<Option<BodyImage>, BodyError>
{
    let mut reader = body.reader();

    let decoded = match compression {
        Encoding::Gzip => {
            let mut gz = GzDecoder::new(reader.as_read());
            let estimate = body.len() * u64::from(tune.size_estimate_gzip());
            BodyImage::read_from(&mut gz, estimate, tune)?
        }
        Encoding::Deflate => {
            let mut inflater = DeflateDecoder::new(reader.as_read());
            let estimate =
                body.len() * u64::from(tune.size_estimate_deflate());
            BodyImage::read_from(&mut inflater, estimate, tune)?
        }
        #[cfg(feature = "brotli")]
        Encoding::Brotli => {
            let mut unbr = brotli::Decompressor::new(
                reader.as_read(),
                tune.buffer_size_ram());
            let estimate =
                body.len() * u64::from(tune.size_estimate_brotli());
            BodyImage::read_from(&mut unbr, estimate, tune)?
        }
        // Anything else is not supported here.
        _ => return Ok(None),
    };

    Ok(Some(decoded))
}
/// Return a generic `User-Agent`-style string which embeds this crate's
/// name, its `VERSION`, and a link to its crates.io page.
pub fn user_agent() -> String {
    format!(
        "Mozilla/5.0 (compatible; body-image {version}; +https://crates.io/crates/body-image)",
        version = VERSION
    )
}
fn resp_future(monolog: Monolog, tune: Tunables)
-> impl Future<Item=InDialog, Error=Flare> + Send
{
let (resp_parts, body) = monolog.response.into_parts();
let bsink = match resp_parts.headers.get(http::header::CONTENT_LENGTH) {
Some(v) => check_length(v, tune.max_body()).and_then(|cl| {
if cl > tune.max_body_ram() {
BodySink::with_fs(tune.temp_dir()).map_err(Flare::from)
} else {
Ok(BodySink::with_ram(cl))
}
}),
None => Ok(BodySink::with_ram(tune.max_body_ram()))
};
let bsink = match bsink {
Ok(b) => b,
Err(e) => { return Either::A(future::err(e)); }
};
let async_body = AsyncBodySink::new(bsink, tune);
let mut in_dialog = InDialog {
prolog: monolog.prolog,
version: resp_parts.version,
status: resp_parts.status,
res_headers: resp_parts.headers,
res_body: BodySink::empty()
};
Either::B(
body.from_err::<Flare>()
.forward(async_body)
.and_then(|(_strm, mut async_body)| {
mem::swap(async_body.body_mut(), &mut in_dialog.res_body);
Ok(in_dialog)
})
)
}
/// Parse a `Content-Length` header value and check it against `max`.
///
/// Returns the parsed length if it does not exceed `max`.
///
/// # Errors
///
/// Fails if the value cannot be parsed as a content length, or if the
/// parsed length is greater than `max`.
fn check_length(v: &http::header::HeaderValue, max: u64)
    -> Result<u64, Flare>
{
    let declared = *ContentLength::parse_header(&v)?;

    if declared <= max {
        return Ok(declared);
    }

    bail!("Response Content-Length too long: {}", declared);
}
/// An `http::Request` paired with a `Prolog` recording of it.
///
/// Instances are produced by the `RequestRecorder` implementation in
/// this file and consumed by `fetch` and `request_dialog`.
#[derive(Debug)]
pub struct RequestRecord<B> {
// The request to be sent, with body type `B`.
request: http::Request<B>,
// The recorded method, url, request headers and request body.
prolog: Prolog,
}
impl<B> RequestRecord<B> {
/// The HTTP method, as recorded in the prolog.
pub fn method(&self) -> &http::Method { &self.prolog.method }
/// The request URL, as recorded in the prolog.
pub fn url(&self) -> &http::Uri { &self.prolog.url }
/// A reference to the underlying `http::Request`.
pub fn request(&self) -> &http::Request<B> { &self.request }
}
// Expose the recorded request headers and body through the
// `RequestRecorded` trait, reading both from the prolog.
impl<B> RequestRecorded for RequestRecord<B> {
// The request headers, as recorded in the prolog.
fn req_headers(&self) -> &http::HeaderMap { &self.prolog.req_headers }
// The request body, as recorded in the prolog.
fn req_body(&self) -> &BodyImage { &self.prolog.req_body }
}
/// Intermediate state: the `Prolog` and the initial
/// `http::Response`, whose `hyper::Body` has not yet been streamed.
/// Created in `request_dialog` and consumed by `resp_future`.
#[derive(Debug)]
struct Monolog {
prolog: Prolog,
response: http::Response<hyper::Body>,
}
/// Intermediate state: the `Prolog`, the response version, status and
/// headers, and the `BodySink` that receives the response body.
/// Produced by `resp_future` and converted to a `Dialog` by
/// `InDialog::prepare`.
#[derive(Debug)]
struct InDialog {
prolog: Prolog,
version: http::Version,
status: http::StatusCode,
res_headers: http::HeaderMap,
// The response body, still in sink (write) form.
res_body: BodySink,
}
impl InDialog {
fn prepare(self) -> Result<Dialog, Flare> {
let res_decoded = if find_chunked(&self.res_headers) {
vec![Encoding::Chunked]
} else {
Vec::with_capacity(0)
};
Ok(Dialog::new(
self.prolog,
Epilog {
version: self.version,
status: self.status,
res_headers: self.res_headers,
res_body: self.res_body.prepare()?,
res_decoded,
}
))
}
}
/// Extension trait for producing a `RequestRecord`, i.e. a request
/// of body type `B` paired with a recording of it. Implemented in this
/// file for `http::request::Builder` with `hyper::Body`.
pub trait RequestRecorder<B>
where B: hyper::body::Payload + Send
{
/// Produce a `RequestRecord` without a caller-supplied body. The
/// implementation in this file uses an empty request body.
fn record(&mut self) -> Result<RequestRecord<B>, Flare>;
/// Produce a `RequestRecord` using the given `body`, which is
/// convertible to `Bytes`.
fn record_body<BB>(&mut self, body: BB)
-> Result<RequestRecord<B>, Flare>
where BB: Into<Bytes>;
/// Produce a `RequestRecord` using the given `BodyImage` as the
/// request body. The implementation in this file passes `tune` to
/// `AsyncBodyImage::new` when the body is not empty.
fn record_body_image(&mut self, body: BodyImage, tune: &Tunables)
-> Result<RequestRecord<B>, Flare>;
}
impl RequestRecorder<hyper::Body> for http::request::Builder {
    /// Complete the builder with an empty body, recording an empty
    /// `BodyImage` as the request body.
    fn record(&mut self) -> Result<RequestRecord<hyper::Body>, Flare> {
        let request = self.body(hyper::Body::empty())?;
        Ok(assemble_record(request, BodyImage::empty()))
    }

    /// Complete the builder with the given bytes as the body. The same
    /// bytes are recorded as the request body, or an empty `BodyImage`
    /// if there are none.
    fn record_body<BB>(&mut self, body: BB)
        -> Result<RequestRecord<hyper::Body>, Flare>
        where BB: Into<Bytes>
    {
        let payload: Bytes = body.into();

        // One handle goes to the request, the other to the recording.
        let request = self.body(hyper::Body::from(payload.clone()))?;

        let req_body = if payload.is_empty() {
            BodyImage::empty()
        } else {
            BodyImage::from_slice(payload)
        };

        Ok(assemble_record(request, req_body))
    }

    /// Complete the builder with the given `BodyImage` as the body,
    /// streamed via `AsyncBodyImage` unless it is empty. The image
    /// itself is recorded as the request body.
    fn record_body_image(&mut self, body: BodyImage, tune: &Tunables)
        -> Result<RequestRecord<hyper::Body>, Flare>
    {
        let request = if body.is_empty() {
            self.body(hyper::Body::empty())?
        } else {
            let stream = AsyncBodyImage::new(body.clone(), tune);
            self.body(hyper::Body::wrap_stream(stream))?
        };

        Ok(assemble_record(request, body))
    }
}

/// Pair `request` with a `Prolog` holding copies of its method, URI and
/// headers, plus the given recorded request body.
fn assemble_record<B>(request: http::Request<B>, req_body: BodyImage)
    -> RequestRecord<B>
{
    let prolog = Prolog {
        method: request.method().clone(),
        url: request.uri().clone(),
        req_headers: request.headers().clone(),
        req_body,
    };
    RequestRecord { request, prolog }
}
// Module compiled only for test builds.
#[cfg(test)]
mod logger;
// Test-only module tree. The `futures` tests require the _mmap_ feature
// and the `live` tests require the _may_fail_ feature.
#[cfg(test)]
mod futio_tests {
#[cfg(feature = "mmap")] mod futures;
mod server;
#[cfg(feature = "may_fail")] mod live;
}