futures_rx/stream_ext/
distinct_until_changed.rs1use std::{
2 hash::{DefaultHasher, Hash, Hasher},
3 pin::Pin,
4 task::{Context, Poll},
5};
6
7use futures::{
8 stream::{Fuse, FusedStream},
9 Stream, StreamExt,
10};
11use pin_project_lite::pin_project;
12
13pin_project! {
14 #[must_use = "streams do nothing unless polled"]
16 pub struct DistinctUntilChanged<S: Stream>
17 {
18 #[pin]
19 stream: Fuse<S>,
20 #[pin]
21 previous: Option<u64>,
22 }
23}
24
25impl<S: Stream> DistinctUntilChanged<S> {
26 pub(crate) fn new(stream: S) -> Self {
27 Self {
28 stream: stream.fuse(),
29 previous: None,
30 }
31 }
32}
33
34impl<S> FusedStream for DistinctUntilChanged<S>
35where
36 S: FusedStream,
37 S::Item: Hash,
38{
39 fn is_terminated(&self) -> bool {
40 self.stream.is_terminated()
41 }
42}
43
44impl<S> Stream for DistinctUntilChanged<S>
45where
46 S: Stream,
47 S::Item: Hash,
48{
49 type Item = S::Item;
50
51 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52 let mut this = self.project();
53
54 match this.stream.poll_next(cx) {
55 Poll::Ready(Some(event)) => {
56 let mut hasher = DefaultHasher::new();
57
58 event.hash(&mut hasher);
59
60 let hash = hasher.finish();
61 let should_emit = match this.previous.as_ref().get_ref() {
62 Some(it) => *it != hash,
63 None => true,
64 };
65
66 if should_emit {
67 this.previous.set(Some(hasher.finish()));
68
69 Poll::Ready(Some(event))
70 } else {
71 cx.waker().wake_by_ref();
72
73 Poll::Pending
74 }
75 }
76 Poll::Ready(None) => Poll::Ready(None),
77 Poll::Pending => Poll::Pending,
78 }
79 }
80
81 fn size_hint(&self) -> (usize, Option<usize>) {
82 let (lower, upper) = self.stream.size_hint();
83 let lower = if lower > 0 { 1 } else { 0 };
84
85 (lower, upper)
86 }
87}
88
#[cfg(test)]
mod test {
    use futures::{executor::block_on, stream, StreamExt};

    use crate::RxExt;

    /// Runs of equal consecutive values collapse to a single event each.
    #[test]
    fn smoke() {
        let source = stream::iter([1, 1, 2, 3, 3, 3, 4, 5]);
        let emitted: Vec<_> = block_on(source.distinct_until_changed().collect());

        assert_eq!(emitted, [1, 2, 3, 4, 5]);
    }
}