tokio_stream_ext/
debounce.rs1use core::pin::Pin;
2use core::task::{Context, Poll};
3use futures::{Future, Stream};
4use pin_project_lite::pin_project;
5use std::time::Duration;
6use tokio::time::{Instant, Sleep};
7
8pin_project! {
9 #[must_use = "streams do nothing unless polled"]
11 pub struct Debounce<St: Stream> {
12 #[pin]
13 value: St,
14 #[pin]
15 delay: Sleep,
16 #[pin]
17 debounce_time: Duration,
18 #[pin]
19 last_state: Option<St::Item>
20 }
21}
22
23impl<St> Debounce<St>
24where
25 St: Stream + Unpin,
26{
27 #[allow(dead_code)]
28 pub(super) fn new(stream: St, debounce_time: Duration) -> Debounce<St> {
29 Debounce {
30 value: stream,
31 delay: tokio::time::sleep(debounce_time),
32 debounce_time,
33 last_state: None,
34 }
35 }
36}
37
38impl<St, Item> Stream for Debounce<St>
39where
40 St: Stream<Item = Item>,
41 Item: Clone + Unpin,
42{
43 type Item = St::Item;
44 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
45 let mut me = self.project();
46
47 match me.value.poll_next(cx) {
49 Poll::Ready(Some(v)) => {
50 let d = (*me.debounce_time).clone();
51 me.delay.as_mut().reset(Instant::now() + (d)); *me.last_state = Some(v);
53 }
54 Poll::Ready(None) => {
55 let l = (*me.last_state).clone();
56 *me.last_state = None;
57 return Poll::Ready(l);
58 }
59 _ => (),
60 }
61
62 match me.delay.poll(cx) {
64 Poll::Ready(()) => {
65 if let Some(l) = (*me.last_state).clone() {
66 *me.last_state = None;
67 return Poll::Ready(Some(l));
68 } else {
69 Poll::Pending
70 }
71 }
72 Poll::Pending => Poll::Pending,
73 }
74 }
75
76 fn size_hint(&self) -> (usize, Option<usize>) {
77 self.value.size_hint()
78 }
79}
80
81#[cfg(test)]
82mod test {
83 use crate::StreamOpsExt;
84 use std::time::Duration;
85 use tokio::{sync::mpsc, time::sleep};
86 use tokio_stream::{wrappers::ReceiverStream, StreamExt};
87
88 #[tokio::test]
89 async fn debounce_test() {
90 let (tx, rx) = mpsc::channel(5);
91 let j = tokio::spawn(async move {
92 for i in 1..4 {
93 sleep(Duration::from_millis(100 * i)).await;
94 tx.send(i).await.unwrap();
95 }
96 });
97
98 let mut stream = Box::pin(ReceiverStream::new(rx).debounce(Duration::from_millis(250)));
99
100 assert_eq!(stream.next().await, Some(2));
101 assert_eq!(stream.next().await, Some(3));
102 assert_eq!(stream.next().await, None);
103 assert!(j.await.is_ok());
104 }
105}