1use std::{pin::Pin, task::{Context, Poll}, time::Duration};
2
3use futures::{FutureExt, Stream, StreamExt};
4use tokio::time::{Instant, Sleep, sleep};
5
6pub struct Debounce<S>
7where
8 S: Stream,
9{
10 stream: S,
11 interval: Duration,
12
13 sleep: Sleep,
14 last: Option<S::Item>,
15}
16
17impl<S> Debounce<S>
18where
19 S: Stream + Unpin,
20{
21 pub fn new(stream: S, interval: Duration) -> Debounce<S> {
22 Self { stream, interval, sleep: sleep(Duration::ZERO), last: None }
23 }
24}
25
26impl<S> Stream for Debounce<S>
27where
28 S: Stream + Unpin,
29{
30 type Item = S::Item;
31
32 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33 let (mut stream, interval, mut sleep, last) = unsafe {
34 let me = self.get_unchecked_mut();
35 (Pin::new(&mut me.stream), me.interval, Pin::new_unchecked(&mut me.sleep), &mut me.last)
36 };
37
38 if sleep.poll_unpin(cx).is_ready() {
39 if let Some(last) = last.take() {
40 return Poll::Ready(Some(last));
41 }
42 }
43
44 while let Poll::Ready(next) = stream.poll_next_unpin(cx) {
45 match next {
46 Some(next) => {
47 *last = Some(next);
48 }
49 None if last.is_none() => {
50 return Poll::Ready(None);
51 }
52 None => {
53 sleep.reset(Instant::now());
54 return Poll::Pending;
55 }
56 }
57 }
58
59 sleep.reset(Instant::now() + interval);
60 Poll::Pending
61 }
62}