futures_rx/stream_ext/
distinct.rs

1use std::{
2    collections::HashSet,
3    hash::{DefaultHasher, Hash, Hasher},
4    pin::Pin,
5    task::{Context, Poll},
6};
7
8use futures::{
9    stream::{Fuse, FusedStream},
10    Stream, StreamExt,
11};
12use pin_project_lite::pin_project;
13
14pin_project! {
15    /// Stream for the [`pairwise`](RxStreamExt::pairwise) method.
16    #[must_use = "streams do nothing unless polled"]
17    pub struct Distinct<S: Stream>
18     {
19        #[pin]
20        stream: Fuse<S>,
21        #[pin]
22        seen: HashSet<u64>,
23    }
24}
25
26impl<S: Stream> Distinct<S> {
27    pub(crate) fn new(stream: S) -> Self {
28        Self {
29            stream: stream.fuse(),
30            seen: HashSet::new(),
31        }
32    }
33}
34
35impl<S> FusedStream for Distinct<S>
36where
37    S: FusedStream,
38    S::Item: Hash,
39{
40    fn is_terminated(&self) -> bool {
41        self.stream.is_terminated()
42    }
43}
44
45impl<S> Stream for Distinct<S>
46where
47    S: Stream,
48    S::Item: Hash,
49{
50    type Item = S::Item;
51
52    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53        let mut this = self.project();
54
55        match this.stream.poll_next(cx) {
56            Poll::Ready(Some(event)) => {
57                let mut hasher = DefaultHasher::new();
58
59                event.hash(&mut hasher);
60
61                let should_emit = this.seen.as_mut().get_mut().insert(hasher.finish());
62
63                if should_emit {
64                    Poll::Ready(Some(event))
65                } else {
66                    cx.waker().wake_by_ref();
67
68                    Poll::Pending
69                }
70            }
71            Poll::Ready(None) => Poll::Ready(None),
72            Poll::Pending => Poll::Pending,
73        }
74    }
75
76    fn size_hint(&self) -> (usize, Option<usize>) {
77        let (lower, upper) = self.stream.size_hint();
78        let lower = if lower > 0 { 1 } else { 0 };
79
80        (lower, upper)
81    }
82}
83
84#[cfg(test)]
85mod test {
86    use futures::{executor::block_on, stream, StreamExt};
87
88    use crate::RxExt;
89
90    #[test]
91    fn smoke() {
92        block_on(async {
93            let stream = stream::iter([1, 1, 2, 1, 3, 2, 4, 5]);
94            let all_events = stream.distinct().collect::<Vec<_>>().await;
95
96            assert_eq!(all_events, [1, 2, 3, 4, 5]);
97        });
98    }
99}