use super::super::super::super::std::{error::*, immutable::*};
use {
futures::{Stream, StreamExt},
std::{cmp::*, io, pin::*, result::Result, task::*},
tokio::io::*,
};
const REMAINDER_INITIAL_CAPACITY: usize = 8 * 1_024;
pub struct AsyncBytesStreamReader<StreamT, ErrorT>
where
StreamT: Stream<Item = Result<ImmutableBytes, ErrorT>> + Unpin,
{
stream: StreamT,
pub remainder: BytesMut,
}
impl<StreamT, ErrorT> AsyncBytesStreamReader<StreamT, ErrorT>
where
StreamT: Stream<Item = Result<ImmutableBytes, ErrorT>> + Unpin,
ErrorT: Into<CapturedError>,
{
pub fn new(stream: StreamT) -> Self {
Self { stream, remainder: BytesMut::with_capacity(0) }
}
pub fn into_inner(self) -> (StreamT, BytesMut) {
(self.stream, self.remainder)
}
fn validate_remainder_capacity(&mut self) {
let capacity = self.remainder.capacity();
if capacity < REMAINDER_INITIAL_CAPACITY {
self.remainder.reserve(REMAINDER_INITIAL_CAPACITY - capacity);
}
}
}
impl<StreamT, ErrorT> AsyncRead for AsyncBytesStreamReader<StreamT, ErrorT>
where
StreamT: Stream<Item = Result<ImmutableBytes, ErrorT>> + Unpin,
ErrorT: Into<CapturedError>,
{
fn poll_read(mut self: Pin<&mut Self>, context: &mut Context, buffer: &mut ReadBuf) -> Poll<io::Result<()>> {
if self.remainder.has_remaining() {
let size = min(buffer.remaining_mut(), self.remainder.remaining());
if size != 0 {
let bytes = self.remainder.copy_to_bytes(size);
buffer.put(bytes);
if !buffer.has_remaining_mut() {
return Poll::Ready(Ok(()));
}
}
}
Poll::Ready(match ready!(self.stream.poll_next_unpin(context)) {
Some(result) => {
let mut bytes = result.map_err(io::Error::other)?;
let size = min(buffer.remaining_mut(), bytes.remaining());
if size != 0 {
let bytes = bytes.copy_to_bytes(size);
buffer.put(bytes);
}
if bytes.has_remaining() {
self.validate_remainder_capacity();
self.remainder.put(bytes);
}
Ok(())
}
None => Ok(()),
})
}
}