use std::io;
use futures::try_ready;
use bytes::{BufMut, BytesMut};
use tokio::prelude::*;
use super::request;
/// Adapts a byte source `S` into a stream of parsed `request::Request` values.
///
/// Bytes read from `socket` are accumulated in `buffer` and handed to
/// `request::Request::parse`; see the `Stream` implementation for the
/// polling logic.
pub struct HttpReader<S> {
/// Underlying source the raw bytes are read from.
socket: S,
/// Set once a read from `socket` returns zero bytes.
closed: bool,
/// Bytes read from `socket` that are passed to the request parser.
buffer: BytesMut,
}
impl<S> HttpReader<S>
where
    S: AsyncRead,
{
    /// Wraps `socket` in a reader that starts out open, with an empty
    /// read buffer pre-allocated to 1024 bytes.
    pub fn new(socket: S) -> Self {
        let buffer = BytesMut::with_capacity(1024);
        Self {
            socket,
            closed: false,
            buffer,
        }
    }
}
impl<S: AsyncRead> Stream for HttpReader<S> {
    type Item = request::Request;
    type Error = io::Error;

    /// Yields the next request parsed from the socket.
    ///
    /// Returns:
    /// * `Ok(Async::Ready(Some(req)))` when a request was parsed from the
    ///   buffered bytes,
    /// * `Ok(Async::NotReady)` when more bytes are needed and the socket has
    ///   none available yet,
    /// * `Ok(Async::Ready(None))` once the socket has reported end-of-stream
    ///   (a zero-byte read) and no further request can be parsed from the
    ///   bytes still buffered.
    ///
    /// # Errors
    ///
    /// Propagates I/O errors from the socket and errors returned by
    /// `request::Request::parse`.
    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        loop {
            // Drain already-buffered data before touching the socket. A single
            // read may deliver more than one request; without this, the extra
            // ones would wait on a socket that may never become readable
            // again, or be dropped entirely once `closed` is set.
            // NOTE(review): relies on `parse` consuming the bytes of a request
            // it returns — confirm against `request::Request::parse`.
            if !self.buffer.is_empty() {
                if let Some(req) = request::Request::parse(&mut self.buffer)? {
                    return Ok(Async::Ready(Some(req)));
                }
            }

            if self.closed {
                return Ok(Async::Ready(None));
            }

            // Grow only when the buffer is full. `read_buf` reports 0 bytes
            // for a buffer without spare capacity, which would otherwise be
            // mistaken for end-of-stream below.
            // NOTE(review): the buffer grows without an upper bound while
            // `parse` keeps returning `Ok(None)`; consider a size limit.
            if !self.buffer.has_remaining_mut() {
                self.buffer.reserve(1024);
            }

            let n = try_ready!(self.socket.read_buf(&mut self.buffer));
            if n == 0 {
                // End-of-stream: the next iteration gives the parser one last
                // chance at the leftover bytes and then ends the stream.
                self.closed = true;
            }
        }
    }
}