futures_rx/stream_ext/
distinct_until_changed.rs

1use std::{
2    hash::{DefaultHasher, Hash, Hasher},
3    pin::Pin,
4    task::{Context, Poll},
5};
6
7use futures::{
8    stream::{Fuse, FusedStream},
9    Stream, StreamExt,
10};
11use pin_project_lite::pin_project;
12
13pin_project! {
14    /// Stream for the [`pairwise`](RxStreamExt::pairwise) method.
15    #[must_use = "streams do nothing unless polled"]
16    pub struct DistinctUntilChanged<S: Stream>
17     {
18        #[pin]
19        stream: Fuse<S>,
20        #[pin]
21        previous: Option<u64>,
22    }
23}
24
25impl<S: Stream> DistinctUntilChanged<S> {
26    pub(crate) fn new(stream: S) -> Self {
27        Self {
28            stream: stream.fuse(),
29            previous: None,
30        }
31    }
32}
33
34impl<S> FusedStream for DistinctUntilChanged<S>
35where
36    S: FusedStream,
37    S::Item: Hash,
38{
39    fn is_terminated(&self) -> bool {
40        self.stream.is_terminated()
41    }
42}
43
44impl<S> Stream for DistinctUntilChanged<S>
45where
46    S: Stream,
47    S::Item: Hash,
48{
49    type Item = S::Item;
50
51    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52        let mut this = self.project();
53
54        match this.stream.poll_next(cx) {
55            Poll::Ready(Some(event)) => {
56                let mut hasher = DefaultHasher::new();
57
58                event.hash(&mut hasher);
59
60                let hash = hasher.finish();
61                let should_emit = match this.previous.as_ref().get_ref() {
62                    Some(it) => *it != hash,
63                    None => true,
64                };
65
66                if should_emit {
67                    this.previous.set(Some(hasher.finish()));
68
69                    Poll::Ready(Some(event))
70                } else {
71                    cx.waker().wake_by_ref();
72
73                    Poll::Pending
74                }
75            }
76            Poll::Ready(None) => Poll::Ready(None),
77            Poll::Pending => Poll::Pending,
78        }
79    }
80
81    fn size_hint(&self) -> (usize, Option<usize>) {
82        let (lower, upper) = self.stream.size_hint();
83        let lower = if lower > 0 { 1 } else { 0 };
84
85        (lower, upper)
86    }
87}
88
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::RxExt;

    /// Runs of equal consecutive values collapse to one event each, in order.
    #[test]
    fn smoke() {
        let source = stream::iter([1, 1, 2, 3, 3, 3, 4, 5]);
        let collected = block_on(source.distinct_until_changed().collect::<Vec<_>>());

        assert_eq!(collected, [1, 2, 3, 4, 5]);
    }
}