use super::stream_support::{RawStreamReader, StreamOps, StreamVtable};
use std::boxed::Box;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
/// Adapter that exposes a [`RawStreamReader`] through the
/// `futures::stream::Stream` trait, yielding one `O::Payload` per item.
///
/// Build one with [`RawStreamReaderStream::new`] or
/// `RawStreamReader::into_stream`.
pub struct RawStreamReaderStream<O: StreamOps + 'static> {
// Where the adapter currently is: holding the reader, awaiting a read, or done.
state: StreamAdapterState<O>,
}
// `poll_next` swaps the state out through `Pin<&mut Self>` with
// `std::mem::replace`, which is only possible when `Self: Unpin`. The in-flight
// read future is stored as `Pin<Box<..>>`, so relocating the adapter itself
// never moves that future.
impl<O: StreamOps + 'static> Unpin for RawStreamReaderStream<O> {}
/// [`RawStreamReaderStream`] specialised to a `&'static StreamVtable<T>` as its
/// `StreamOps` parameter.
pub type StreamReaderStream<T> = RawStreamReaderStream<&'static StreamVtable<T>>;
// Heap-pinned, type-erased future for a single read. It owns the reader while
// the read is pending and resolves to the reader (handed back for reuse)
// paired with the payload the read produced, if any.
type ReadNextFut<O> =
Pin<Box<dyn Future<Output = (RawStreamReader<O>, Option<<O as StreamOps>::Payload>)>>>;
// Internal state machine behind `RawStreamReaderStream`.
enum StreamAdapterState<O: StreamOps + 'static> {
// No read in flight; the adapter owns the reader directly.
Idle(RawStreamReader<O>),
// A read is in flight; the reader lives inside the future until it resolves.
Reading(ReadNextFut<O>),
// A read produced `None`; every later poll reports end-of-stream.
// Also used as the placeholder while `poll_next` has the state moved out.
Complete,
}
impl<O> RawStreamReaderStream<O>
where
    O: StreamOps + 'static,
{
    /// Wraps `reader` in a stream adapter that starts out idle, i.e. with no
    /// read in flight.
    pub fn new(reader: RawStreamReader<O>) -> Self {
        let state = StreamAdapterState::Idle(reader);
        Self { state }
    }

    /// Consumes the adapter and returns the underlying reader when it is
    /// still directly available.
    ///
    /// Returns `Some(reader)` only while the adapter is idle. If a read is in
    /// flight (the reader is owned by that pending future) or the stream has
    /// already completed, the result is `None`.
    pub fn into_inner(self) -> Option<RawStreamReader<O>> {
        if let StreamAdapterState::Idle(reader) = self.state {
            Some(reader)
        } else {
            None
        }
    }
}
impl<O> futures::stream::Stream for RawStreamReaderStream<O>
where
    O: StreamOps + 'static,
{
    type Item = O::Payload;

    /// Polls for the next payload from the wrapped reader.
    ///
    /// * Idle: a new read future is created (taking ownership of the reader)
    ///   and polled immediately.
    /// * Reading: the stored read future is polled again.
    /// * Complete: `Poll::Ready(None)` is returned without touching anything.
    ///
    /// When a read yields `Some(item)` the reader is put back and the adapter
    /// becomes idle again; when it yields `None` the adapter stays complete.
    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Move the state out, leaving `Complete` behind. Every path below that
        // should not end the stream overwrites it before returning.
        let previous = std::mem::replace(&mut self.state, StreamAdapterState::Complete);

        // Obtain the future to drive: either the one already in flight or a
        // freshly started read.
        let mut in_flight: ReadNextFut<O> = match previous {
            StreamAdapterState::Complete => return Poll::Ready(None),
            StreamAdapterState::Reading(pending) => pending,
            StreamAdapterState::Idle(mut source) => Box::pin(async move {
                let payload = source.next().await;
                (source, payload)
            }),
        };

        let outcome = in_flight.as_mut().poll(cx);
        match outcome {
            Poll::Ready((source, Some(payload))) => {
                // Reader is handed back; the next poll starts a new read.
                self.state = StreamAdapterState::Idle(source);
                Poll::Ready(Some(payload))
            }
            // End of stream: the state is already `Complete`.
            Poll::Ready((_source, None)) => Poll::Ready(None),
            Poll::Pending => {
                // Keep the future so the next poll resumes the same read.
                self.state = StreamAdapterState::Reading(in_flight);
                Poll::Pending
            }
        }
    }
}
impl<O> RawStreamReader<O>
where
    O: StreamOps + 'static,
{
    /// Converts this reader into a [`RawStreamReaderStream`], an adapter that
    /// implements `futures::stream::Stream` over the reader's payloads.
    pub fn into_stream(self) -> RawStreamReaderStream<O> {
        RawStreamReaderStream::<O>::new(self)
    }
}