tokio_stream_ext/
debounce.rs

1use core::pin::Pin;
2use core::task::{Context, Poll};
3use futures::{Future, Stream};
4use pin_project_lite::pin_project;
5use std::time::Duration;
6use tokio::time::{Instant, Sleep};
7
8pin_project! {
9    /// Stream for the [`distinctUntilChanged`](super::StreamOpsExt::distinctUntilChanged) method.
10    #[must_use = "streams do nothing unless polled"]
11    pub struct Debounce<St: Stream> {
12        #[pin]
13        value: St,
14        #[pin]
15        delay: Sleep,
16        #[pin]
17        debounce_time: Duration,
18        #[pin]
19        last_state: Option<St::Item>
20    }
21}
22
23impl<St> Debounce<St>
24where
25    St: Stream + Unpin,
26{
27    #[allow(dead_code)]
28    pub(super) fn new(stream: St, debounce_time: Duration) -> Debounce<St> {
29        Debounce {
30            value: stream,
31            delay: tokio::time::sleep(debounce_time),
32            debounce_time,
33            last_state: None,
34        }
35    }
36}
37
38impl<St, Item> Stream for Debounce<St>
39where
40    St: Stream<Item = Item>,
41    Item: Clone + Unpin,
42{
43    type Item = St::Item;
44    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45        let mut me = self.project();
46
47        // First, try polling the stream
48        match me.value.poll_next(cx) {
49            Poll::Ready(Some(v)) => {
50                let d = (*me.debounce_time).clone();
51                me.delay.as_mut().reset(Instant::now() + (d)); // FixMe doubleing issue
52                *me.last_state = Some(v);
53            }
54            Poll::Ready(None) => {
55                let l = (*me.last_state).clone();
56                *me.last_state = None;
57                return Poll::Ready(l);
58            }
59            _ => (),
60        }
61
62        // Now check the timer
63        match me.delay.poll(cx) {
64            Poll::Ready(()) => {
65                if let Some(l) = (*me.last_state).clone() {
66                    *me.last_state = None;
67                    return Poll::Ready(Some(l));
68                } else {
69                    Poll::Pending
70                }
71            }
72            Poll::Pending => Poll::Pending,
73        }
74    }
75
76    fn size_hint(&self) -> (usize, Option<usize>) {
77        self.value.size_hint()
78    }
79}
80
#[cfg(test)]
mod test {
    use crate::StreamOpsExt;
    use std::time::Duration;
    use tokio::{sync::mpsc, time::sleep};
    use tokio_stream::{wrappers::ReceiverStream, StreamExt};

    /// A producer task sends 1, 2 and 3 after pauses of 100 ms, 200 ms and
    /// 300 ms respectively, then drops the sender. With a 250 ms debounce the
    /// stream is expected to yield 2, then 3, then end.
    #[tokio::test]
    async fn debounce_test() {
        let (sender, receiver) = mpsc::channel(5);

        let producer = tokio::spawn(async move {
            for value in 1..=3 {
                // The pause grows with each value: 100 ms, 200 ms, 300 ms.
                let pause = Duration::from_millis(100 * value);
                sleep(pause).await;
                sender.send(value).await.unwrap();
            }
        });

        let quiet_period = Duration::from_millis(250);
        let debounced = ReceiverStream::new(receiver).debounce(quiet_period);
        // Pinned on the heap so that `next()` can be called on it.
        let mut debounced = Box::pin(debounced);

        assert_eq!(debounced.next().await, Some(2));
        assert_eq!(debounced.next().await, Some(3));
        assert_eq!(debounced.next().await, None);

        // The producer must have finished without panicking.
        assert!(producer.await.is_ok());
    }
}
105}