use crate::errors::Error;
use futures::{Async, Stream};
use hyper::Chunk;
use std::{
cmp,
io::{self, Read},
};
use tokio_io::AsyncRead;
/// Buffering state kept by `StreamReader` between calls to `read`.
enum ReadState {
/// A chunk has been pulled from the stream. The `usize` is the offset of the
/// first byte of that chunk which has not yet been copied out to a caller.
Ready(Chunk, usize),
/// No chunk is currently buffered; the next `read` has to poll the stream.
NotReady,
}
/// Adapter that wraps a stream of `Chunk`s so its bytes can be consumed through
/// `std::io::Read` (and `tokio_io::AsyncRead`).
pub struct StreamReader<S> {
/// Underlying source of chunks.
stream: S,
/// The chunk currently being drained (with its read offset), if any.
state: ReadState,
}
impl<S> StreamReader<S>
where
    S: Stream<Item = Chunk, Error = Error>,
{
    /// Wraps `stream` in a reader.
    ///
    /// The reader starts with no buffered chunk, so the stream is not polled
    /// until the first call to `read`.
    #[inline]
    pub fn new(stream: S) -> Self {
        let state = ReadState::NotReady;
        Self { stream, state }
    }
}
impl<S> Read for StreamReader<S>
where
    S: Stream<Item = Chunk, Error = Error>,
{
    /// Copies bytes of the currently buffered chunk into `buf`, polling the
    /// underlying stream for the next chunk whenever none is buffered.
    ///
    /// At most one chunk's worth of data is returned per call. `Ok(0)` is
    /// returned only when the stream has ended or when `buf` is empty;
    /// zero-length chunks yielded by the stream are skipped so that they are
    /// not mistaken for end-of-file by the caller.
    ///
    /// # Errors
    ///
    /// * `io::ErrorKind::WouldBlock` when the stream has no chunk ready yet.
    /// * `io::ErrorKind::Other`, carrying the stream error's message, when the
    ///   stream itself fails.
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        loop {
            let copied = match self.state {
                ReadState::Ready(ref chunk, ref mut pos) => {
                    let start = *pos;
                    let len = cmp::min(buf.len(), chunk.len() - start);
                    buf[..len].copy_from_slice(&chunk[start..start + len]);
                    *pos += len;
                    if *pos < chunk.len() {
                        // Part of the chunk is still unread; keep it buffered.
                        return Ok(len);
                    }
                    len
                }
                ReadState::NotReady => match self.stream.poll() {
                    Ok(Async::Ready(Some(chunk))) => {
                        self.state = ReadState::Ready(chunk, 0);
                        continue;
                    }
                    Ok(Async::Ready(None)) => return Ok(0),
                    Ok(Async::NotReady) => return Err(io::ErrorKind::WouldBlock.into()),
                    Err(e) => return Err(io::Error::new(io::ErrorKind::Other, e.to_string())),
                },
            };

            // The buffered chunk has been fully consumed; drop it.
            self.state = ReadState::NotReady;

            // Returning 0 for a non-empty `buf` would signal EOF, so an empty
            // chunk must not end the call: go around and poll for the next one.
            if copied > 0 || buf.is_empty() {
                return Ok(copied);
            }
        }
    }
}
impl<S> AsyncRead for StreamReader<S> where S: Stream<Item = Chunk, Error = Error> {}