stream_operators/
debounce_time.rs

1use crate::state::State;
2use pin_project_lite::pin_project;
3use std::{
4    pin::Pin,
5    task::{ready, Context, Poll},
6    time::Duration,
7};
8use tokio::time::interval;
9use tokio_stream::{wrappers::IntervalStream, Stream};
10
11pin_project! {
12    #[derive(Debug)]
13    pub struct DebounceTime<S: Stream> {
14        #[pin]
15        stream: S,
16        #[pin]
17        interval: IntervalStream,
18        item: Option<S::Item>,
19        state: State,
20    }
21}
22
23impl<S: Stream> DebounceTime<S> {
24    pub fn new(stream: S, timeout: Duration) -> Self {
25        Self {
26            stream,
27            interval: IntervalStream::new(interval(timeout)),
28            item: None,
29            state: State::HasNext,
30        }
31    }
32}
33
34impl<S: Stream> Stream for DebounceTime<S> {
35    type Item = S::Item;
36
37    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38        let this = self.project();
39        if *this.state == State::HasNext {
40            match this.stream.poll_next(cx) {
41                Poll::Ready(Some(item)) => {
42                    *this.item = Some(item);
43                }
44                Poll::Ready(None) => {
45                    *this.state = State::HasNone;
46                }
47                Poll::Pending => {}
48            }
49        }
50
51        match this.state {
52            State::HasNext => {
53                ready!(this.interval.poll_next(cx));
54                if let Some(item) = this.item.take() {
55                    Poll::Ready(Some(item))
56                } else {
57                    Poll::Pending
58                }
59            }
60            State::HasNone => {
61                if let Some(item) = this.item.take() {
62                    cx.waker().wake_by_ref();
63                    Poll::Ready(Some(item))
64                } else {
65                    *this.state = State::Done;
66                    Poll::Ready(None)
67                }
68            }
69            State::Done => panic!("poll_next called after completion"),
70        }
71    }
72}
73
#[cfg(test)]
mod tests {
    use crate::{test_utils::interval_value, StreamOps};
    use std::time::Duration;
    use tokio::time::interval;
    use tokio_stream::{wrappers::IntervalStream, StreamExt};

    /// A source built with a 10 ms period and limited to 30 items, passed
    /// through a 100 ms `debounce_time`, must yield 1, 11, 21, 30 and then end.
    #[tokio::test]
    async fn debounce_time_should_work() {
        let source = interval_value(Duration::from_millis(10), 1, 1).take(30);
        let mut debounced = source.debounce_time(Duration::from_millis(100));

        for expected in [1, 11, 21, 30] {
            assert_eq!(debounced.next().await, Some(expected));
        }

        assert_eq!(debounced.next().await, None);
    }

    /// A source that yields nothing must make the adaptor end immediately.
    #[tokio::test]
    async fn debounce_time_should_work_with_empty_stream() {
        let ticker = IntervalStream::new(interval(Duration::from_millis(1)));
        let mut debounced = ticker.take(0).debounce_time(Duration::from_millis(10));

        assert_eq!(debounced.next().await, None);
    }
}