// ruchei-itertools 0.0.0-a.3
//
// Async itertools, a subproject of ruchei.
// Documentation
use core::{
    pin::Pin,
    task::{Context, Poll},
};

use futures_util::{Stream, ready, stream::FusedStream};
use pin_project::pin_project;

/// Stream adapter that collapses runs of consecutive equal items.
///
/// The first item of each run is yielded as soon as the inner stream produces
/// it; subsequent items that compare equal to the last yielded item are skipped.
#[pin_project]
#[derive(Debug)]
#[must_use = "streams do nothing unless polled"]
pub struct DedupEager<S, T = <S as Stream>::Item> {
    /// Underlying stream whose items are being deduplicated.
    #[pin]
    inner: S,
    /// Clone of the most recently yielded item; `None` until the first item is
    /// yielded and again once the inner stream has reported its end.
    last: Option<T>,
}

impl<S, T> DedupEager<S, T> {
    /// Wraps `inner` with no remembered item, so the first item polled from it
    /// is always yielded.
    pub(crate) fn new(inner: S) -> Self {
        let last = None;
        Self { inner, last }
    }
}

impl<S: Stream<Item = T>, T: PartialEq + Clone> Stream for DedupEager<S, T> {
    type Item = T;

    /// Polls the inner stream until it produces an item that differs from the
    /// last yielded one, the inner stream is pending, or it ends.
    ///
    /// A clone of each yielded item is kept so the next item can be compared
    /// against it. When the inner stream ends, the remembered item is cleared.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();
        while let Some(item) = ready!(this.inner.as_mut().poll_next(cx)) {
            // Yield when nothing has been yielded yet or the item starts a new run.
            if this.last.as_ref().is_none_or(|last| *last != item) {
                *this.last = Some(item.clone());
                return Poll::Ready(Some(item));
            }
            // Otherwise the item repeats the previous one: drop it and keep polling.
        }
        *this.last = None;
        Poll::Ready(None)
    }

    /// Reports bounds on the number of remaining items.
    ///
    /// The upper bound is the inner stream's, since deduplication never adds
    /// items. The lower bound is 1 only when nothing is remembered yet and the
    /// inner stream guarantees at least one more item (that item is always
    /// yielded); otherwise every remaining item might be a repeat, so it is 0.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let (inner_lower, upper) = self.inner.size_hint();
        let lower = usize::from(inner_lower > 0 && self.last.is_none());
        (lower, upper)
    }
}

impl<S: FusedStream<Item: PartialEq + Clone>> FusedStream for DedupEager<S> {
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }
}

// Invokes the crate-internal `forward!` macro for `DedupEager` with its extra
// generic parameter `T`.
// NOTE(review): presumably this generates pass-through impls that delegate to
// the `inner` field — confirm against the definition in `crate::macros`.
crate::macros::forward!(DedupEager, T);

#[cfg(test)]
mod test {
    use crate::AsyncItertools;

    /// A stream without repeats passes through unchanged.
    #[test]
    fn unique() {
        let s = futures_util::stream::iter([1, 2, 3]);
        assert!(futures_executor::block_on_stream(s.dedup_eager()).eq([1, 2, 3]));
    }

    /// Every run of equal neighbours collapses to a single item.
    #[test]
    fn all_duplicates() {
        let s = futures_util::stream::iter([1, 1, 2, 2, 3, 3]);
        assert!(futures_executor::block_on_stream(s.dedup_eager()).eq([1, 2, 3]));
    }

    /// Only adjacent repeats are dropped; a value may reappear after a different one.
    #[test]
    fn non_adjacent_repeats_are_kept() {
        let s = futures_util::stream::iter([1, 2, 1, 1, 2]);
        assert!(futures_executor::block_on_stream(s.dedup_eager()).eq([1, 2, 1, 2]));
    }

    /// An empty inner stream yields nothing.
    #[test]
    fn empty() {
        let s = futures_util::stream::iter(core::iter::empty::<i32>());
        assert!(futures_executor::block_on_stream(s.dedup_eager()).eq(core::iter::empty::<i32>()));
    }
}