tokio_stream_ext/
distinct_until_changed.rs1use core::fmt;
2use core::pin::Pin;
3use core::task::{Context, Poll};
4use futures::{ready, Stream};
5use pin_project_lite::pin_project;
6
7pin_project! {
8 #[must_use = "streams do nothing unless polled"]
10 pub struct DistinctUntilChanged<St, Item >
11 where
12 St: Stream<Item = Item>,
13 Item: PartialEq,
14 {
15 #[pin]
16 stream: St,
17 last: Option<Item>,
18 }
19}
20
21impl<St, Item> fmt::Debug for DistinctUntilChanged<St, Item>
22where
23 St: Stream<Item = Item> + fmt::Debug,
24 Item: PartialEq,
25{
26 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
27 f.debug_struct("DistinctUntilChanged")
28 .field("stream", &self.stream)
29 .finish()
30 }
31}
32
33impl<St, Item> DistinctUntilChanged<St, Item>
34where
35 St: Stream<Item = Item>,
36 Item: PartialEq,
37{
38 #[allow(dead_code)]
39 pub(super) fn new(stream: St) -> Self {
40 DistinctUntilChanged { stream, last: None }
41 }
42}
43
44impl<St, Item> Stream for DistinctUntilChanged<St, Item>
45where
46 St: Stream<Item = Item>,
47 Item: Clone + PartialEq,
48{
49 type Item = St::Item;
50
51 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
52 let me = self.as_mut().project();
53 let next = ready!(me.stream.poll_next(cx));
54 match next {
55 Some(v) if me.last.is_none() => {
56 *me.last = Some(v.clone());
57 Poll::Ready(Some(v))
58 }
59 value @ Some(_) if me.last.is_some() && value != *me.last => {
60 *me.last = value.clone();
61 Poll::Ready(value)
62 }
63 Some(_) => Poll::Pending,
64 None => Poll::Ready(None),
65 }
66 }
67
68 fn size_hint(&self) -> (usize, Option<usize>) {
69 self.stream.size_hint()
70 }
71}