use futures_core::Stream;
use std::io;
use std::pin::Pin;
use std::task::{Context, Poll};
use crate::enums::DecodeResult;
use crate::traits::byte_stream::GenByteStream;
use crate::traits::frame_decoder::FrameDecoder;
use crate::traits::stream_buffer::StreamBuffer;
pub struct FramedByteStream<S, D, B>
where
S: GenByteStream<B>,
D: FrameDecoder,
B: StreamBuffer,
{
pub(crate) inner: S,
decoder: D,
buf: B,
}
impl<S, D, B> FramedByteStream<S, D, B>
where
S: GenByteStream<B>,
D: FrameDecoder,
B: StreamBuffer,
{
pub fn new(stream: S, decoder: D, initial_capacity: usize) -> Self {
Self {
inner: stream,
decoder,
buf: B::with_capacity(initial_capacity),
}
}
}
impl<S, D, B> Stream for FramedByteStream<S, D, B>
where
S: GenByteStream<B>,
D: FrameDecoder + Unpin,
B: StreamBuffer + Unpin,
{
type Item = Result<D::Frame, io::Error>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let me = self.get_mut();
loop {
match me.decoder.decode(me.buf.as_ref()) {
Ok(DecodeResult::Frame { frame, consumed }) => {
me.buf.drain(0..consumed);
return Poll::Ready(Some(Ok(frame)));
}
Ok(DecodeResult::NeedMore) => {
}
Err(e) => {
me.buf = B::default();
return Poll::Ready(Some(Err(e)));
}
}
match Pin::new(&mut me.inner).poll_next(cx) {
Poll::Pending => return Poll::Pending,
Poll::Ready(Some(Ok(chunk))) => {
me.buf.extend_from_slice(chunk.as_ref());
continue; }
Poll::Ready(Some(Err(e))) => return Poll::Ready(Some(Err(e))),
Poll::Ready(None) => {
if me.buf.is_empty() {
return Poll::Ready(None);
} else {
return Poll::Ready(Some(Err(io::Error::new(
io::ErrorKind::UnexpectedEof,
"truncated frame at end of stream",
))));
}
}
}
}
}
}