stream_operators/
dinstinct_until_changed.rs1use crate::state::State;
2use pin_project_lite::pin_project;
3use std::{
4 pin::Pin,
5 task::{ready, Context, Poll},
6};
7use tokio_stream::Stream;
8
9pin_project! {
10 #[derive(Debug)]
11 pub struct DistinctUntilChanged<S: Stream> {
12 #[pin]
13 stream: S,
14 prev: Option<S::Item>,
15 state: State,
16 }
17}
18
19impl<S: Stream> DistinctUntilChanged<S> {
20 pub fn new(stream: S) -> Self {
21 Self {
22 stream,
23 prev: None,
24 state: State::HasNext,
25 }
26 }
27}
28
29impl<S> Stream for DistinctUntilChanged<S>
30where
31 S: Stream,
32 S::Item: PartialEq,
33{
34 type Item = S::Item;
35
36 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37 let this = self.project();
38 if *this.state == State::HasNext {
39 let next = ready!(this.stream.poll_next(cx));
40 match (next, this.prev.take()) {
41 (Some(next), Some(prev)) => {
42 if next == prev {
43 *this.prev = Some(next);
44 } else {
45 *this.prev = Some(next);
46 return Poll::Ready(Some(prev));
47 }
48 }
49 (Some(next), None) => {
50 *this.prev = Some(next);
51 }
52 (None, Some(prev)) => {
53 *this.state = State::HasNone;
54 return Poll::Ready(Some(prev));
55 }
56 (None, None) => {
57 *this.state = State::HasNone;
58 }
59 }
60 }
61
62 match this.state {
63 State::HasNext => {
64 cx.waker().wake_by_ref();
65 Poll::Pending
66 }
67 State::HasNone => {
68 *this.state = State::Done;
69 Poll::Ready(None)
70 }
71 State::Done => panic!("poll_next called after completion"),
72 }
73 }
74}
75
76#[cfg(test)]
77mod tests {
78 use std::time::Duration;
79
80 use crate::{test_utils::interval_value, StreamOps};
81 use tokio_stream::{iter, StreamExt};
82
83 #[tokio::test]
84 async fn distinct_until_changed_should_work() {
85 let mut stream = iter(vec![1, 1, 2, 3, 3, 3, 4, 4, 4, 4, 5]).distinct_until_changed();
86 assert_eq!(stream.next().await, Some(1));
87 assert_eq!(stream.next().await, Some(2));
88 assert_eq!(stream.next().await, Some(3));
89 assert_eq!(stream.next().await, Some(4));
90 assert_eq!(stream.next().await, Some(5));
91 assert_eq!(stream.next().await, None);
92 }
93
94 #[tokio::test]
95 async fn distinct_until_changed_empty_should_work() {
96 let mut stream = iter(1..1).distinct_until_changed();
97 assert_eq!(stream.next().await, None);
98 }
99
100 #[tokio::test]
101 async fn distinct_until_changed_one_should_work() {
102 let mut stream = iter(vec![1]).distinct_until_changed();
103 assert_eq!(stream.next().await, Some(1));
104 assert_eq!(stream.next().await, None);
105 }
106
107 #[tokio::test]
108 async fn distinct_until_changed_with_interval_should_work() {
109 let mut stream = interval_value(Duration::from_millis(1), 1, 0)
110 .take(30)
111 .distinct_until_changed();
112 assert_eq!(stream.next().await, Some(1));
113 assert_eq!(stream.next().await, None);
114 }
115}