use core::fmt;
use core::future::Future;
use core::marker::PhantomData;
use core::mem;
use core::pin::Pin;
use core::task::{Context, Poll};
#[doc(no_inline)]
pub use futures_core::stream::Stream;
use pin_project_lite::pin_project;
#[cfg(feature = "std")]
use crate::future;
use crate::ready;
#[cfg(not(feature = "std"))]
extern crate alloc;
#[cfg(not(feature = "std"))]
use alloc::boxed::Box;
pub fn block_on<S: Stream + Unpin>(stream: S) -> BlockOn<S> {
BlockOn(stream)
}
#[derive(Debug)]
pub struct BlockOn<S>(S);
#[cfg(feature = "std")]
impl<S: Stream + Unpin> Iterator for BlockOn<S> {
type Item = S::Item;
fn next(&mut self) -> Option<Self::Item> {
future::block_on(self.0.next())
}
}
pub fn empty<T>() -> Empty<T> {
Empty {
_marker: PhantomData,
}
}
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Empty<T> {
_marker: PhantomData<T>,
}
impl<T> Unpin for Empty<T> {}
impl<T> Stream for Empty<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(None)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
pub fn iter<I: IntoIterator>(iter: I) -> Iter<I::IntoIter> {
Iter {
iter: iter.into_iter(),
}
}
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Iter<I> {
iter: I,
}
impl<I> Unpin for Iter<I> {}
impl<I: Iterator> Stream for Iter<I> {
type Item = I::Item;
fn poll_next(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(self.iter.next())
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.iter.size_hint()
}
}
pub fn once<T>(t: T) -> Once<T> {
Once { value: Some(t) }
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Once<T> {
value: Option<T>,
}
}
impl<T> Stream for Once<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Ready(self.project().value.take())
}
fn size_hint(&self) -> (usize, Option<usize>) {
if self.value.is_some() {
(1, Some(1))
} else {
(0, Some(0))
}
}
}
pub fn pending<T>() -> Pending<T> {
Pending {
_marker: PhantomData,
}
}
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Pending<T> {
_marker: PhantomData<T>,
}
impl<T> Unpin for Pending<T> {}
impl<T> Stream for Pending<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<T>> {
Poll::Pending
}
fn size_hint(&self) -> (usize, Option<usize>) {
(0, Some(0))
}
}
pub fn poll_fn<T, F>(f: F) -> PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
PollFn { f }
}
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct PollFn<F> {
f: F,
}
impl<F> Unpin for PollFn<F> {}
impl<F> fmt::Debug for PollFn<F> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("PollFn").finish()
}
}
impl<T, F> Stream for PollFn<F>
where
F: FnMut(&mut Context<'_>) -> Poll<Option<T>>,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
(&mut self.f)(cx)
}
}
pub fn repeat<T: Clone>(item: T) -> Repeat<T> {
Repeat { item }
}
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Repeat<T> {
item: T,
}
impl<T> Unpin for Repeat<T> {}
impl<T: Clone> Stream for Repeat<T> {
type Item = T;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Poll::Ready(Some(self.item.clone()))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(usize::max_value(), None)
}
}
pub fn repeat_with<T, F>(repeater: F) -> RepeatWith<F>
where
F: FnMut() -> T,
{
RepeatWith { f: repeater }
}
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct RepeatWith<F> {
f: F,
}
impl<F> Unpin for RepeatWith<F> {}
impl<T, F> Stream for RepeatWith<F>
where
F: FnMut() -> T,
{
type Item = T;
fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let item = (&mut self.f)();
Poll::Ready(Some(item))
}
fn size_hint(&self) -> (usize, Option<usize>) {
(usize::max_value(), None)
}
}
pub fn unfold<T, F, Fut, Item>(seed: T, f: F) -> Unfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
Unfold {
f,
state: Some(seed),
fut: None,
}
}
pin_project! {
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct Unfold<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}
}
impl<T, F, Fut> fmt::Debug for Unfold<T, F, Fut>
where
T: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("Unfold")
.field("state", &self.state)
.field("fut", &self.fut)
.finish()
}
}
impl<T, F, Fut, Item> Stream for Unfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Option<(Item, T)>>,
{
type Item = Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if let Some(state) = this.state.take() {
this.fut.set(Some((this.f)(state)));
}
let step = ready!(this
.fut
.as_mut()
.as_pin_mut()
.expect("`Unfold` must not be polled after it returned `Poll::Ready(None)`")
.poll(cx));
this.fut.set(None);
if let Some((item, next_state)) = step {
*this.state = Some(next_state);
Poll::Ready(Some(item))
} else {
Poll::Ready(None)
}
}
}
pub fn try_unfold<T, E, F, Fut, Item>(init: T, f: F) -> TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Result<Option<(Item, T)>, E>>,
{
TryUnfold {
f,
state: Some(init),
fut: None,
}
}
pin_project! {
#[derive(Clone)]
#[must_use = "streams do nothing unless polled"]
pub struct TryUnfold<T, F, Fut> {
f: F,
state: Option<T>,
#[pin]
fut: Option<Fut>,
}
}
impl<T, F, Fut> fmt::Debug for TryUnfold<T, F, Fut>
where
T: fmt::Debug,
Fut: fmt::Debug,
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TryUnfold")
.field("state", &self.state)
.field("fut", &self.fut)
.finish()
}
}
impl<T, E, F, Fut, Item> Stream for TryUnfold<T, F, Fut>
where
F: FnMut(T) -> Fut,
Fut: Future<Output = Result<Option<(Item, T)>, E>>,
{
type Item = Result<Item, E>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if let Some(state) = this.state.take() {
this.fut.set(Some((this.f)(state)));
}
match this.fut.as_mut().as_pin_mut() {
None => {
Poll::Ready(None)
}
Some(future) => {
let step = ready!(future.poll(cx));
this.fut.set(None);
match step {
Ok(Some((item, next_state))) => {
*this.state = Some(next_state);
Poll::Ready(Some(Ok(item)))
}
Ok(None) => Poll::Ready(None),
Err(e) => Poll::Ready(Some(Err(e))),
}
}
}
}
}
pub trait StreamExt: Stream {
fn next(&mut self) -> NextFuture<'_, Self>
where
Self: Unpin,
{
NextFuture { stream: self }
}
fn count(self) -> CountFuture<Self>
where
Self: Sized,
{
CountFuture {
stream: self,
count: 0,
}
}
fn map<T, F>(self, f: F) -> Map<Self, F>
where
Self: Sized,
F: FnMut(Self::Item) -> T,
{
Map { stream: self, f }
}
fn filter<P>(self, predicate: P) -> Filter<Self, P>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
Filter {
stream: self,
predicate,
}
}
fn filter_map<T, F>(self, f: F) -> FilterMap<Self, F>
where
Self: Sized,
F: FnMut(Self::Item) -> Option<T>,
{
FilterMap { stream: self, f }
}
fn take(self, n: usize) -> Take<Self>
where
Self: Sized,
{
Take { stream: self, n }
}
fn take_while<P>(self, predicate: P) -> TakeWhile<Self, P>
where
Self: Sized,
P: FnMut(&Self::Item) -> bool,
{
TakeWhile {
stream: self,
predicate,
}
}
fn step_by(self, step: usize) -> StepBy<Self>
where
Self: Sized,
{
assert!(step > 0, "`step` must be greater than zero");
StepBy {
stream: self,
step,
i: 0,
}
}
fn chain<U>(self, other: U) -> Chain<Self, U>
where
Self: Sized,
U: Stream<Item = Self::Item> + Sized,
{
Chain {
first: self.fuse(),
second: other.fuse(),
}
}
fn cloned<'a, T>(self) -> Cloned<Self>
where
Self: Sized + Stream<Item = &'a T>,
T: Clone + 'a,
{
Cloned { stream: self }
}
fn copied<'a, T>(self) -> Copied<Self>
where
Self: Sized + Stream<Item = &'a T>,
T: Copy + 'a,
{
Copied { stream: self }
}
fn collect<C: Default + Extend<Self::Item>>(self) -> CollectFuture<Self, C>
where
Self: Sized,
{
CollectFuture {
stream: self,
collection: Default::default(),
}
}
fn try_collect<T, C: Default + Extend<T>>(self) -> TryCollectFuture<Self, C>
where
Self: Sized,
Self::Item: try_hack::Result<Ok = T>,
{
TryCollectFuture {
stream: self,
items: Default::default(),
}
}
fn fold<T, F>(self, init: T, f: F) -> FoldFuture<Self, F, T>
where
Self: Sized,
F: FnMut(T, Self::Item) -> T,
{
FoldFuture {
stream: self,
f,
acc: Some(init),
}
}
fn try_fold<T, E, F, B>(&mut self, init: B, f: F) -> TryFoldFuture<'_, Self, F, B>
where
Self: Unpin + Sized,
Self::Item: try_hack::Result<Ok = T, Err = E>,
F: FnMut(B, T) -> Result<B, E>,
{
TryFoldFuture {
stream: self,
f,
acc: Some(init),
}
}
fn fuse(self) -> Fuse<Self>
where
Self: Sized,
{
Fuse {
stream: self,
done: false,
}
}
fn cycle(self) -> Cycle<Self>
where
Self: Clone + Sized,
{
Cycle {
orig: self.clone(),
stream: self,
}
}
fn enumerate(self) -> Enumerate<Self>
where
Self: Sized,
{
Enumerate { stream: self, i: 0 }
}
fn inspect<F>(self, f: F) -> Inspect<Self, F>
where
Self: Sized,
F: FnMut(&Self::Item),
{
Inspect { stream: self, f }
}
fn boxed<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + Send + 'a>>
where
Self: Sized + Send + 'a,
{
Box::pin(self)
}
fn boxed_local<'a>(self) -> Pin<Box<dyn Stream<Item = Self::Item> + 'a>>
where
Self: Sized + 'a,
{
Box::pin(self)
}
}
impl<S: ?Sized> StreamExt for S where S: Stream {}
mod try_hack {
pub trait Result {
type Ok;
type Err;
fn into_result(self) -> core::result::Result<Self::Ok, Self::Err>;
}
impl<T, E> Result for core::result::Result<T, E> {
type Ok = T;
type Err = E;
fn into_result(self) -> core::result::Result<T, E> {
self
}
}
}
pub type Boxed<T> = Pin<Box<dyn Stream<Item = T> + Send>>;
pub type BoxedLocal<T> = Pin<Box<dyn Stream<Item = T>>>;
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct NextFuture<'a, T: ?Sized> {
stream: &'a mut T,
}
impl<S: ?Sized + Unpin> Unpin for NextFuture<'_, S> {}
impl<S: Stream + Unpin + ?Sized> Future for NextFuture<'_, S> {
type Output = Option<S::Item>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut *self.stream).poll_next(cx)
}
}
pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CountFuture<S: ?Sized> {
count: usize,
#[pin]
stream: S,
}
}
impl<S: Stream + ?Sized> Future for CountFuture<S> {
type Output = usize;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(self.as_mut().project().stream.poll_next(cx)) {
None => return Poll::Ready(self.count),
Some(_) => *self.as_mut().project().count += 1,
}
}
}
}
pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct CollectFuture<S, C> {
#[pin]
stream: S,
collection: C,
}
}
impl<S, C> Future for CollectFuture<S, C>
where
S: Stream,
C: Default + Extend<S::Item>,
{
type Output = C;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<C> {
let mut this = self.as_mut().project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(e) => this.collection.extend(Some(e)),
None => {
return Poll::Ready(mem::replace(self.project().collection, Default::default()))
}
}
}
}
}
pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryCollectFuture<S, C> {
#[pin]
stream: S,
items: C,
}
}
impl<T, E, S, C> Future for TryCollectFuture<S, C>
where
S: Stream<Item = Result<T, E>>,
C: Default + Extend<T>,
{
type Output = Result<C, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
Poll::Ready(Ok(loop {
match ready!(this.stream.as_mut().poll_next(cx)?) {
Some(x) => this.items.extend(Some(x)),
None => break mem::replace(this.items, Default::default()),
}
}))
}
}
pin_project! {
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct FoldFuture<S, F, T> {
#[pin]
stream: S,
f: F,
acc: Option<T>,
}
}
impl<S, F, T> Future for FoldFuture<S, F, T>
where
S: Stream + Sized,
F: FnMut(T, S::Item) -> T,
{
type Output = T;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(v) => {
let old = this.acc.take().unwrap();
let new = (this.f)(old, v);
*this.acc = Some(new);
}
None => return Poll::Ready(this.acc.take().unwrap()),
}
}
}
}
#[derive(Debug)]
#[must_use = "futures do nothing unless you `.await` or poll them"]
pub struct TryFoldFuture<'a, S, F, B> {
stream: &'a mut S,
f: F,
acc: Option<B>,
}
impl<'a, S, F, B> Unpin for TryFoldFuture<'a, S, F, B> {}
impl<'a, T, E, S, F, B> Future for TryFoldFuture<'a, S, F, B>
where
S: Stream + Unpin,
S::Item: try_hack::Result<Ok = T, Err = E>,
F: FnMut(B, T) -> Result<B, E>,
{
type Output = Result<B, E>;
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
loop {
match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
Some(res) => {
use try_hack::Result as _;
match res.into_result() {
Err(e) => return Poll::Ready(Err(e)),
Ok(t) => {
let old = self.acc.take().unwrap();
let new = (&mut self.f)(old, t);
match new {
Ok(t) => self.acc = Some(t),
Err(e) => return Poll::Ready(Err(e)),
}
}
}
}
None => return Poll::Ready(Ok(self.acc.take().unwrap())),
}
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Fuse<S> {
#[pin]
stream: S,
done: bool,
}
}
impl<S: Stream> Stream for Fuse<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
if *this.done {
Poll::Ready(None)
} else {
let next = ready!(this.stream.poll_next(cx));
if next.is_none() {
*this.done = true;
}
Poll::Ready(next)
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Map<S, F> {
#[pin]
stream: S,
f: F,
}
}
impl<S, F, T> Stream for Map<S, F>
where
S: Stream,
F: FnMut(S::Item) -> T,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
Poll::Ready(next.map(this.f))
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Filter<S, P> {
#[pin]
stream: S,
predicate: P,
}
}
impl<S, P> Stream for Filter<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(v) if (this.predicate)(&v) => return Poll::Ready(Some(v)),
Some(_) => {}
}
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct FilterMap<S, F> {
#[pin]
stream: S,
f: F,
}
}
impl<S, F, T> Stream for FilterMap<S, F>
where
S: Stream,
F: FnMut(S::Item) -> Option<T>,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
None => return Poll::Ready(None),
Some(v) => {
if let Some(t) = (this.f)(v) {
return Poll::Ready(Some(t));
}
}
}
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Take<S> {
#[pin]
stream: S,
n: usize,
}
}
impl<S: Stream> Stream for Take<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<S::Item>> {
let this = self.project();
if *this.n == 0 {
Poll::Ready(None)
} else {
let next = ready!(this.stream.poll_next(cx));
match next {
Some(_) => *this.n -= 1,
None => *this.n = 0,
}
Poll::Ready(next)
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct TakeWhile<S, P> {
#[pin]
stream: S,
predicate: P,
}
}
impl<S, P> Stream for TakeWhile<S, P>
where
S: Stream,
P: FnMut(&S::Item) -> bool,
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match ready!(this.stream.poll_next(cx)) {
Some(v) => {
if (this.predicate)(&v) {
Poll::Ready(Some(v))
} else {
Poll::Ready(None)
}
}
None => Poll::Ready(None),
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct StepBy<S> {
#[pin]
stream: S,
step: usize,
i: usize,
}
}
impl<S: Stream> Stream for StepBy<S> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
loop {
match ready!(this.stream.as_mut().poll_next(cx)) {
Some(v) => {
if *this.i == 0 {
*this.i = *this.step - 1;
return Poll::Ready(Some(v));
} else {
*this.i -= 1;
}
}
None => return Poll::Ready(None),
}
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Chain<S, U> {
#[pin]
first: Fuse<S>,
#[pin]
second: Fuse<U>,
}
}
impl<S: Stream, U: Stream<Item = S::Item>> Stream for Chain<S, U> {
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
if !this.first.done {
let next = ready!(this.first.as_mut().poll_next(cx));
if let Some(next) = next {
return Poll::Ready(Some(next));
}
}
if !this.second.done {
let next = ready!(this.second.as_mut().poll_next(cx));
if let Some(next) = next {
return Poll::Ready(Some(next));
}
}
if this.first.done && this.second.done {
Poll::Ready(None)
} else {
Poll::Pending
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Cloned<S> {
#[pin]
stream: S,
}
}
impl<'a, S, T: 'a> Stream for Cloned<S>
where
S: Stream<Item = &'a T>,
T: Clone,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
Poll::Ready(next.cloned())
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Copied<S> {
#[pin]
stream: S,
}
}
impl<'a, S, T: 'a> Stream for Copied<S>
where
S: Stream<Item = &'a T>,
T: Copy,
{
type Item = T;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
let next = ready!(this.stream.poll_next(cx));
Poll::Ready(next.copied())
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Cycle<S> {
orig: S,
#[pin]
stream: S,
}
}
impl<S> Stream for Cycle<S>
where
S: Stream + Clone,
{
type Item = S::Item;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
match ready!(self.as_mut().project().stream.as_mut().poll_next(cx)) {
Some(item) => Poll::Ready(Some(item)),
None => {
let new = self.as_mut().orig.clone();
self.as_mut().project().stream.set(new);
self.project().stream.poll_next(cx)
}
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Enumerate<S> {
#[pin]
stream: S,
i: usize,
}
}
impl<S> Stream for Enumerate<S>
where
S: Stream,
{
type Item = (usize, S::Item);
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
match ready!(this.stream.poll_next(cx)) {
Some(v) => {
let ret = (*this.i, v);
*this.i += 1;
Poll::Ready(Some(ret))
}
None => Poll::Ready(None),
}
}
}
pin_project! {
#[derive(Clone, Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct Inspect<S, F> {
#[pin]
stream: S,
f: F,
}
}
impl<S, F> Stream for Inspect<S, F>
where
S: Stream,
F: FnMut(&S::Item),
{
type Item = S::Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut this = self.project();
let next = ready!(this.stream.as_mut().poll_next(cx));
if let Some(x) = &next {
(this.f)(x);
}
Poll::Ready(next)
}
}