use std::{
future::Future,
marker::PhantomData,
ops::DerefMut,
pin::Pin,
task::{Context, Poll, ready},
};
use futures_core::FusedFuture;
#[cfg(feature = "compat")]
use crate::FuturesCompat;
use crate::{Deserialize, future::assert_future};
pub trait IoStream: Stream<Error = std::io::Error> {}
impl<T: ?Sized> IoStream for T where T: Stream<Error = std::io::Error> {}
#[must_use = "streams do nothing unless polled"]
pub trait Stream {
type Error;
fn poll_next<Item: Deserialize>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>>;
#[inline]
fn size_hint(&self) -> (usize, Option<usize>) {
(0, None)
}
}
impl<S: ?Sized + Stream + Unpin> Stream for &mut S {
type Error = S::Error;
fn poll_next<Item: Deserialize>(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
S::poll_next(Pin::new(&mut **self), cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
impl<P> Stream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: Stream,
{
type Error = <P::Target as Stream>::Error;
fn poll_next<Item: Deserialize>(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>> {
self.get_mut().as_mut().poll_next(cx)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(**self).size_hint()
}
}
pub trait FusedStream: Stream {
fn is_terminated(&self) -> bool;
}
impl<F: ?Sized + FusedStream + Unpin> FusedStream for &mut F {
fn is_terminated(&self) -> bool {
<F as FusedStream>::is_terminated(&**self)
}
}
impl<P> FusedStream for Pin<P>
where
P: DerefMut + Unpin,
P::Target: FusedStream,
{
fn is_terminated(&self) -> bool {
<P::Target as FusedStream>::is_terminated(&**self)
}
}
pub trait StreamExt: Stream {
fn next<Item: Deserialize>(&mut self) -> Next<'_, Self, Item>
where
Self: Unpin,
{
assert_future::<Option<Result<Item, Self::Error>>, _>(Next::new(self))
}
#[cfg(feature = "compat")]
fn compat_stream<Item: Deserialize>(self) -> FuturesCompat<Self, Item>
where
Self: Sized,
{
assert_futures_stream(FuturesCompat::new(self))
}
fn poll_next_unpin<Item: Deserialize>(
&mut self,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Item, Self::Error>>>
where
Self: Unpin,
{
Pin::new(self).poll_next(cx)
}
}
impl<S: Stream + ?Sized> StreamExt for S {}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct Next<'a, St: ?Sized, Item> {
stream: &'a mut St,
_pd: PhantomData<Item>,
}
impl<St: ?Sized + Unpin, Item> Unpin for Next<'_, St, Item> {}
impl<'a, St: ?Sized + Stream + Unpin, Item> Next<'a, St, Item> {
pub(super) fn new(stream: &'a mut St) -> Self {
Self {
stream,
_pd: PhantomData,
}
}
}
impl<St: ?Sized + FusedStream + Unpin, Item: Deserialize> FusedFuture for Next<'_, St, Item> {
fn is_terminated(&self) -> bool {
self.stream.is_terminated()
}
}
impl<St: ?Sized + Stream + Unpin, Item: Deserialize> Future for Next<'_, St, Item> {
type Output = Option<Result<Item, St::Error>>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
self.stream.poll_next_unpin(cx)
}
}
pub trait IoStreamExt: IoStream {
fn expect_next<Item: Deserialize>(&mut self) -> ExpectNext<'_, Self, Item>
where
Self: Unpin,
{
ExpectNext {
next: self.next(),
_pd: PhantomData,
}
}
}
impl<S: ?Sized> IoStreamExt for S where S: IoStream {}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct ExpectNext<'a, St: ?Sized, Item> {
next: Next<'a, St, Item>,
_pd: PhantomData<Item>,
}
impl<St: ?Sized + Unpin, Item> Unpin for ExpectNext<'_, St, Item> {}
impl<St: ?Sized + IoStream + Unpin, Item: Deserialize> Future for ExpectNext<'_, St, Item> {
type Output = Result<Item, St::Error>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
match ready!(Pin::new(&mut self.next).poll(cx)) {
Some(Ok(item)) => Poll::Ready(Ok(item)),
Some(Err(err)) => Poll::Ready(Err(err)),
None => Poll::Ready(Err(std::io::ErrorKind::UnexpectedEof.into())),
}
}
}
pub(crate) fn assert_futures_stream<S: futures_core::Stream>(stream: S) -> S {
stream
}