use std::sync::Arc;
use std::borrow::Cow;
use std::sync::atomic::{AtomicUsize, AtomicBool, Ordering};
use std::str::from_utf8;
use std::ascii::AsciiExt;
use futures::{Future, Async, Poll};
use tokio_core::io::Io;
use httparse;
use tk_bufstream::{ReadBuf, Buf};
use enums::Version;
use client::client::{BodyKind};
use client::errors::ErrorEnum;
use client::recv_mode::Mode;
use headers;
use chunked;
use body_parser::BodyProgress;
use client::encoder::RequestState;
use client::{Codec, Error, Head};
const MIN_HEADERS: usize = 16;
const MAX_HEADERS: usize = 1024;
#[derive(Debug, Clone)]
enum State {
Headers {
request_state: Arc<AtomicUsize>,
close_signal: Arc<AtomicBool>,
},
Body {
mode: Mode,
progress: BodyProgress,
},
}
pub struct Parser<S: Io, C: Codec<S>> {
io: Option<ReadBuf<S>>,
codec: C,
close: bool,
state: State,
}
fn scan_headers<'x>(is_head: bool, code: u16, headers: &'x [httparse::Header])
-> Result<(BodyKind, Option<Cow<'x, str>>, bool), ErrorEnum>
{
use client::client::BodyKind::*;
use client::errors::ErrorEnum::ConnectionInvalid;
let mut has_content_length = false;
let mut connection = None::<Cow<_>>;
let mut close = false;
if is_head || (code > 100 && code < 200) || code == 204 || code == 304 {
for header in headers.iter() {
if header.name.eq_ignore_ascii_case("Connection") {
let strconn = from_utf8(header.value)
.map_err(|_| ConnectionInvalid)?.trim();
connection = match connection {
Some(x) => Some(x + ", " + strconn),
None => Some(strconn.into()),
};
if header.value.split(|&x| x == b',').any(headers::is_close) {
close = true;
}
}
}
return Ok((Fixed(0), connection, close))
}
let mut result = BodyKind::Eof;
for header in headers.iter() {
if header.name.eq_ignore_ascii_case("Transfer-Encoding") {
if let Some(enc) = header.value.split(|&x| x == b',').last() {
if headers::is_chunked(enc) {
if has_content_length {
close = true;
}
result = Chunked;
}
}
} else if header.name.eq_ignore_ascii_case("Content-Length") {
if has_content_length {
return Err(ErrorEnum::DuplicateContentLength);
}
has_content_length = true;
if result != Chunked {
let s = from_utf8(header.value)
.map_err(|_| ErrorEnum::BadContentLength)?;
let len = s.parse()
.map_err(|_| ErrorEnum::BadContentLength)?;
result = Fixed(len);
} else {
close = true;
}
} else if header.name.eq_ignore_ascii_case("Connection") {
let strconn = from_utf8(header.value)
.map_err(|_| ConnectionInvalid)?.trim();
connection = match connection {
Some(x) => Some(x + ", " + strconn),
None => Some(strconn.into()),
};
if header.value.split(|&x| x == b',').any(headers::is_close) {
close = true;
}
}
}
Ok((result, connection, close))
}
fn new_body(mode: BodyKind, recv_mode: Mode)
-> Result<BodyProgress, ErrorEnum>
{
use super::client::BodyKind as B;
use super::recv_mode::Mode as M;
use client::errors::ErrorEnum::*;
use body_parser::BodyProgress as P;
match (mode, recv_mode) {
(B::Fixed(x), M::Buffered(b)) if x > b as u64 => {
Err(ResponseBodyTooLong)
}
(B::Fixed(x), _) => Ok(P::Fixed(x as usize)),
(B::Chunked, _) => Ok(P::Chunked(chunked::State::new())),
(B::Eof, _) => Ok(P::Eof),
}
}
fn parse_headers<S: Io, C: Codec<S>>(
buffer: &mut Buf, codec: &mut C, is_head: bool)
-> Result<Option<(State, bool)>, Error>
{
let (mode, body, close, bytes) = {
let mut vec;
let mut headers = [httparse::EMPTY_HEADER; MIN_HEADERS];
let (ver, code, reason, headers, bytes) = {
let mut raw = httparse::Response::new(&mut headers);
let mut result = raw.parse(&buffer[..]);
if matches!(result, Err(httparse::Error::TooManyHeaders)) {
vec = vec![httparse::EMPTY_HEADER; MAX_HEADERS];
raw = httparse::Response::new(&mut vec);
result = raw.parse(&buffer[..]);
}
match result.map_err(ErrorEnum::Header)? {
httparse::Status::Complete(bytes) => {
let ver = raw.version.unwrap();
let code = raw.code.unwrap();
(ver, code, raw.reason.unwrap(), raw.headers, bytes)
}
_ => return Ok(None),
}
};
let (body, conn, close) = try!(scan_headers(is_head, code, &headers));
let head = Head {
version: if ver == 1
{ Version::Http11 } else { Version::Http10 },
code: code,
reason: reason,
headers: headers,
body_kind: body,
connection_header: conn,
connection_close: close || ver == 0,
};
let mode = codec.headers_received(&head)?;
(mode, body, close, bytes)
};
buffer.consume(bytes);
Ok(Some((
State::Body {
mode: mode.mode,
progress: new_body(body, mode.mode)?,
},
close,
)))
}
impl<S: Io, C: Codec<S>> Parser<S, C> {
pub fn new(io: ReadBuf<S>, codec: C,
request_state: Arc<AtomicUsize>, close_signal: Arc<AtomicBool>)
-> Parser<S, C>
{
Parser {
io: Some(io),
codec: codec,
close: false,
state: State::Headers {
request_state: request_state,
close_signal: close_signal,
},
}
}
fn read_and_parse(&mut self) -> Poll<(), Error> {
use self::State::*;
use client::recv_mode::Mode::*;
let mut io = self.io.as_mut().expect("buffer is still here");
self.state = if let Headers {
ref request_state,
ref close_signal,
} = self.state
{
if io.read().map_err(ErrorEnum::Io)? == 0 {
if io.done() {
return Err(ErrorEnum::ResetOnResponseHeaders.into());
} else {
return Ok(Async::NotReady);
}
}
let reqs = request_state.load(Ordering::SeqCst);
if reqs == RequestState::Empty as usize {
return Err(ErrorEnum::PrematureResponseHeaders.into());
}
let is_head = reqs == RequestState::StartedHead as usize;
match parse_headers(&mut io.in_buf, &mut self.codec, is_head)? {
None => {
return Ok(Async::NotReady);
}
Some((body, close)) => {
if close {
close_signal.store(true, Ordering::SeqCst);
self.close = true;
}
body
},
}
} else {
self.state.clone()
};
loop {
match self.state {
Headers {..} => unreachable!(),
Body { ref mode, ref mut progress } => {
progress.parse(&mut io).map_err(ErrorEnum::ChunkSize)?;
let (bytes, done) = progress.check_buf(&io);
let operation = if done {
Some(self.codec.data_received(
&io.in_buf[..bytes], true)?)
} else if io.done() {
return Err(ErrorEnum::ResetOnResponseBody.into());
} else if matches!(*mode, Progressive(x) if x <= bytes) {
Some(self.codec.data_received(
&io.in_buf[..bytes], false)?)
} else {
None
};
match operation {
Some(Async::Ready(consumed)) => {
progress.consume(&mut io, consumed);
if done && consumed == bytes {
return Ok(Async::Ready(()));
}
}
Some(Async::NotReady) => {
if matches!(*mode, Progressive(x) if x > bytes) {
return Ok(Async::NotReady);
}
}
None => {} }
}
}
if io.read().map_err(ErrorEnum::Io)? == 0 {
if io.done() {
continue;
} else {
return Ok(Async::NotReady);
}
}
}
}
}
impl<S: Io, C: Codec<S>> Future for Parser<S, C> {
type Item = Option<ReadBuf<S>>;
type Error = Error;
fn poll(&mut self) -> Poll<Option<ReadBuf<S>>, Error> {
match self.read_and_parse()? {
Async::Ready(()) => {
let io = self.io.take().expect("buffer still here");
if self.close {
Ok(Async::Ready(None))
} else {
Ok(Async::Ready(Some(io)))
}
}
Async::NotReady => Ok(Async::NotReady),
}
}
}