mundy/stream_utils/
dedup.rs1use futures_lite::ready;
2use futures_lite::stream::Stream;
3use pin_project_lite::pin_project;
4use std::pin::Pin;
5use std::task::{Context, Poll};
6
7pin_project! {
8 pub struct Dedup<S: Stream> {
9 #[pin]
10 stream: S,
11 current: Option<<S as Stream>::Item>,
12 }
13}
14
15impl<S: Stream> Dedup<S> {
16 pub(crate) fn new(stream: S) -> Self {
17 Dedup {
18 stream,
19 current: None,
20 }
21 }
22}
23
24impl<S> Stream for Dedup<S>
25where
26 S: Stream,
27 S::Item: Clone,
28 S::Item: PartialEq<S::Item>,
29{
30 type Item = S::Item;
31
32 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33 let this = self.project();
34 let next = ready!(this.stream.poll_next(cx));
35
36 match next {
37 Some(v) if this.current.as_ref() != Some(&v) => {
38 *this.current = Some(v.clone());
39 Poll::Ready(Some(v))
40 }
41 Some(_) => {
42 cx.waker().wake_by_ref();
43 Poll::Pending
44 }
45 None => Poll::Ready(None),
46 }
47 }
48}