futures_rx/stream_ext/
distinct.rs1use std::{
2 collections::HashSet,
3 hash::{DefaultHasher, Hash, Hasher},
4 pin::Pin,
5 task::{Context, Poll},
6};
7
8use futures::{
9 stream::{Fuse, FusedStream},
10 Stream, StreamExt,
11};
12use pin_project_lite::pin_project;
13
14pin_project! {
15 #[must_use = "streams do nothing unless polled"]
17 pub struct Distinct<S: Stream>
18 {
19 #[pin]
20 stream: Fuse<S>,
21 #[pin]
22 seen: HashSet<u64>,
23 }
24}
25
26impl<S: Stream> Distinct<S> {
27 pub(crate) fn new(stream: S) -> Self {
28 Self {
29 stream: stream.fuse(),
30 seen: HashSet::new(),
31 }
32 }
33}
34
35impl<S> FusedStream for Distinct<S>
36where
37 S: FusedStream,
38 S::Item: Hash,
39{
40 fn is_terminated(&self) -> bool {
41 self.stream.is_terminated()
42 }
43}
44
45impl<S> Stream for Distinct<S>
46where
47 S: Stream,
48 S::Item: Hash,
49{
50 type Item = S::Item;
51
52 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
53 let mut this = self.project();
54
55 match this.stream.poll_next(cx) {
56 Poll::Ready(Some(event)) => {
57 let mut hasher = DefaultHasher::new();
58
59 event.hash(&mut hasher);
60
61 let should_emit = this.seen.as_mut().get_mut().insert(hasher.finish());
62
63 if should_emit {
64 Poll::Ready(Some(event))
65 } else {
66 cx.waker().wake_by_ref();
67
68 Poll::Pending
69 }
70 }
71 Poll::Ready(None) => Poll::Ready(None),
72 Poll::Pending => Poll::Pending,
73 }
74 }
75
76 fn size_hint(&self) -> (usize, Option<usize>) {
77 let (lower, upper) = self.stream.size_hint();
78 let lower = if lower > 0 { 1 } else { 0 };
79
80 (lower, upper)
81 }
82}
83
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::RxExt;

    /// Repeated values are dropped wherever they occur; first occurrences keep their order.
    #[test]
    fn smoke() {
        let source = stream::iter([1, 1, 2, 1, 3, 2, 4, 5]);
        let emitted: Vec<_> = block_on(source.distinct().collect());

        assert_eq!(emitted, [1, 2, 3, 4, 5]);
    }
}