mundy/stream_utils/
dedup.rs

1use futures_lite::ready;
2use futures_lite::stream::Stream;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7pin_project! {
8    pub struct Dedup<S: Stream> {
9        #[pin]
10        stream: S,
11        current: Option<<S as Stream>::Item>,
12    }
13}
14
15impl<S: Stream> Dedup<S> {
16    pub(crate) fn new(stream: S) -> Self {
17        Dedup {
18            stream,
19            current: None,
20        }
21    }
22}
23
24impl<S> Stream for Dedup<S>
25where
26    S: Stream,
27    S::Item: Clone,
28    S::Item: PartialEq<S::Item>,
29{
30    type Item = S::Item;
31
32    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33        let this = self.project();
34        let next = ready!(this.stream.poll_next(cx));
35
36        match next {
37            Some(v) if this.current.as_ref() != Some(&v) => {
38                *this.current = Some(v.clone());
39                Poll::Ready(Some(v))
40            }
41            Some(_) => {
42                cx.waker().wake_by_ref();
43                Poll::Pending
44            }
45            None => Poll::Ready(None),
46        }
47    }
48}