use std::pin::Pin;
use cfg_if::cfg_if;
use crate::future::Future;
use crate::task::{Context, Poll};
cfg_if! {
if #[cfg(feature = "docs")] {
#[doc(hidden)]
pub struct ImplFuture<'a, T>(std::marker::PhantomData<&'a T>);
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => (ImplFuture<$a, $o>);
}
} else {
macro_rules! ret {
($a:lifetime, $f:tt, $o:ty) => ($f<$a, Self>);
}
}
}
pub trait Stream {
type Item;
fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option<Self::Item>)
where
Self: Unpin;
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take {
stream: self,
remaining: n,
}
}
}
impl<T: futures::Stream + Unpin + ?Sized> Stream for T {
type Item = <Self as futures::Stream>::Item;
fn next<'a>(&'a mut self) -> ret!('a, NextFuture, Option<Self::Item>)
where
Self: Unpin,
{
NextFuture { stream: self }
}
}
#[doc(hidden)]
#[allow(missing_debug_implementations)]
pub struct NextFuture<'a, T: Unpin + ?Sized> {
stream: &'a mut T,
}
impl<T: futures::Stream + Unpin + ?Sized> Future for NextFuture<'_, T> {
type Output = Option<T::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
}
}
#[derive(Clone, Debug)]
pub struct Take<S> {
stream: S,
remaining: usize,
}
impl<S: Unpin> Unpin for Take<S> {}
impl<S: futures::Stream> Take<S> {
pin_utils::unsafe_pinned!(stream: S);
pin_utils::unsafe_unpinned!(remaining: usize);
}
impl<S: futures::Stream> futures::Stream for Take<S> {
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
if self.remaining == 0 {
Poll::Ready(None)
} else {
let next = futures::ready!(self.as_mut().stream().poll_next(cx));
match next {
Some(_) => *self.as_mut().remaining() -= 1,
None => *self.as_mut().remaining() = 0,
}
Poll::Ready(next)
}
}
}