// stream_operators/dinstinct_until_changed.rs

1use crate::state::State;
2use pin_project_lite::pin_project;
3use std::{
4    pin::Pin,
5    task::{ready, Context, Poll},
6};
7use tokio_stream::Stream;
8
9pin_project! {
10    #[derive(Debug)]
11    pub struct DistinctUntilChanged<S: Stream> {
12        #[pin]
13        stream: S,
14        prev: Option<S::Item>,
15        state: State,
16    }
17}
18
19impl<S: Stream> DistinctUntilChanged<S> {
20    pub fn new(stream: S) -> Self {
21        Self {
22            stream,
23            prev: None,
24            state: State::HasNext,
25        }
26    }
27}
28
29impl<S> Stream for DistinctUntilChanged<S>
30where
31    S: Stream,
32    S::Item: PartialEq,
33{
34    type Item = S::Item;
35
36    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
37        let this = self.project();
38        if *this.state == State::HasNext {
39            let next = ready!(this.stream.poll_next(cx));
40            match (next, this.prev.take()) {
41                (Some(next), Some(prev)) => {
42                    if next == prev {
43                        *this.prev = Some(next);
44                    } else {
45                        *this.prev = Some(next);
46                        return Poll::Ready(Some(prev));
47                    }
48                }
49                (Some(next), None) => {
50                    *this.prev = Some(next);
51                }
52                (None, Some(prev)) => {
53                    *this.state = State::HasNone;
54                    return Poll::Ready(Some(prev));
55                }
56                (None, None) => {
57                    *this.state = State::HasNone;
58                }
59            }
60        }
61
62        match this.state {
63            State::HasNext => {
64                cx.waker().wake_by_ref();
65                Poll::Pending
66            }
67            State::HasNone => {
68                *this.state = State::Done;
69                Poll::Ready(None)
70            }
71            State::Done => panic!("poll_next called after completion"),
72        }
73    }
74}
75
#[cfg(test)]
mod tests {
    use std::time::Duration;

    use crate::{test_utils::interval_value, StreamOps};
    use tokio_stream::{iter, StreamExt};

    /// Runs of equal values are reduced to a single value each, in source order.
    #[tokio::test]
    async fn distinct_until_changed_should_work() {
        let input = vec![1, 1, 2, 3, 3, 3, 4, 4, 4, 4, 5];
        let mut stream = iter(input).distinct_until_changed();
        for expected in 1..=5 {
            assert_eq!(stream.next().await, Some(expected));
        }
        assert_eq!(stream.next().await, None);
    }

    /// An empty source produces an empty output.
    #[tokio::test]
    async fn distinct_until_changed_empty_should_work() {
        let empty = 1..1;
        let mut stream = iter(empty).distinct_until_changed();
        assert_eq!(stream.next().await, None);
    }

    /// A single item is passed through and followed by the end of the stream.
    #[tokio::test]
    async fn distinct_until_changed_one_should_work() {
        let single = vec![1];
        let mut stream = iter(single).distinct_until_changed();
        assert_eq!(stream.next().await, Some(1));
        assert_eq!(stream.next().await, None);
    }

    /// Thirty items taken from a timer-driven source collapse into a single `1`.
    #[tokio::test]
    async fn distinct_until_changed_with_interval_should_work() {
        let period = Duration::from_millis(1);
        let source = interval_value(period, 1, 0).take(30);
        let mut stream = source.distinct_until_changed();
        assert_eq!(stream.next().await, Some(1));
        assert_eq!(stream.next().await, None);
    }
}