tokio_stream_ext/
distinct_until_changed.rs

1use core::fmt;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use futures::{ready, Stream};
5use pin_project_lite::pin_project;
6
7pin_project! {
8    /// Stream for the [`distinctUntilChanged`](super::StreamOpsExt::distinctUntilChanged) method.
9    #[must_use = "streams do nothing unless polled"]
10    pub struct DistinctUntilChanged<St, Item >
11    where
12        St: Stream<Item = Item>,
13        Item: PartialEq,
14    {
15        #[pin]
16        stream: St,
17        last: Option<Item>,
18    }
19}
20
21impl<St, Item> fmt::Debug for DistinctUntilChanged<St, Item>
22where
23    St: Stream<Item = Item> + fmt::Debug,
24    Item: PartialEq,
25{
26    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27        f.debug_struct("DistinctUntilChanged")
28            .field("stream", &self.stream)
29            .finish()
30    }
31}
32
33impl<St, Item> DistinctUntilChanged<St, Item>
34where
35    St: Stream<Item = Item>,
36    Item: PartialEq,
37{
38    #[allow(dead_code)]
39    pub(super) fn new(stream: St) -> Self {
40        DistinctUntilChanged { stream, last: None }
41    }
42}
43
44impl<St, Item> Stream for DistinctUntilChanged<St, Item>
45where
46    St: Stream<Item = Item>,
47    Item: Clone + PartialEq,
48{
49    type Item = St::Item;
50
51    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52        let me = self.as_mut().project();
53        let next = ready!(me.stream.poll_next(cx));
54        match next {
55            Some(v) if me.last.is_none() => {
56                *me.last = Some(v.clone());
57                Poll::Ready(Some(v))
58            }
59            value @ Some(_) if me.last.is_some() && value != *me.last => {
60                *me.last = value.clone();
61                Poll::Ready(value)
62            }
63            Some(_) => Poll::Pending,
64            None => Poll::Ready(None),
65        }
66    }
67
68    fn size_hint(&self) -> (usize, Option<usize>) {
69        self.stream.size_hint()
70    }
71}