stream_operators/
debounce_time.rs1use crate::state::State;
2use pin_project_lite::pin_project;
3use std::{
4 pin::Pin,
5 task::{ready, Context, Poll},
6 time::Duration,
7};
8use tokio::time::interval;
9use tokio_stream::{wrappers::IntervalStream, Stream};
10
pin_project! {
    /// Stream adapter that buffers the most recent item received from the
    /// wrapped `stream` and releases it when the internal `interval` yields.
    ///
    /// When the wrapped stream ends, an item that is still buffered is
    /// emitted before the adapter reports the end of the stream.
    #[derive(Debug)]
    pub struct DebounceTime<S: Stream> {
        // Upstream source of items.
        #[pin]
        stream: S,
        // Timer stream; while the upstream is live, a buffered item is only
        // released when this yields.
        #[pin]
        interval: IntervalStream,
        // Most recent upstream item that has not been emitted yet; a newer
        // item overwrites an older one.
        item: Option<S::Item>,
        // Where the adapter is in its lifecycle: upstream still live
        // (`HasNext`), upstream ended (`HasNone`), or fully finished (`Done`).
        state: State,
    }
}
22
23impl<S: Stream> DebounceTime<S> {
24 pub fn new(stream: S, timeout: Duration) -> Self {
25 Self {
26 stream,
27 interval: IntervalStream::new(interval(timeout)),
28 item: None,
29 state: State::HasNext,
30 }
31 }
32}
33
34impl<S: Stream> Stream for DebounceTime<S> {
35 type Item = S::Item;
36
37 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
38 let this = self.project();
39 if *this.state == State::HasNext {
40 match this.stream.poll_next(cx) {
41 Poll::Ready(Some(item)) => {
42 *this.item = Some(item);
43 }
44 Poll::Ready(None) => {
45 *this.state = State::HasNone;
46 }
47 Poll::Pending => {}
48 }
49 }
50
51 match this.state {
52 State::HasNext => {
53 ready!(this.interval.poll_next(cx));
54 if let Some(item) = this.item.take() {
55 Poll::Ready(Some(item))
56 } else {
57 Poll::Pending
58 }
59 }
60 State::HasNone => {
61 if let Some(item) = this.item.take() {
62 cx.waker().wake_by_ref();
63 Poll::Ready(Some(item))
64 } else {
65 *this.state = State::Done;
66 Poll::Ready(None)
67 }
68 }
69 State::Done => panic!("poll_next called after completion"),
70 }
71 }
72}
73
#[cfg(test)]
mod tests {
    use crate::{test_utils::interval_value, StreamOps};
    use std::time::Duration;
    use tokio::time::interval;
    use tokio_stream::{wrappers::IntervalStream, StreamExt};

    // Source built with a 10ms period, limited to 30 items, debounced with a
    // 100ms period: expects 1, 11, 21, then the trailing item 30, then the end
    // of the stream.
    #[tokio::test]
    async fn debounce_time_should_work() {
        let source = interval_value(Duration::from_millis(10), 1, 1).take(30);
        let mut stream = source.debounce_time(Duration::from_millis(100));

        for expected in [1, 11, 21, 30] {
            assert_eq!(stream.next().await, Some(expected));
        }

        assert_eq!(stream.next().await, None);
    }

    // An upstream that yields nothing must make the adapter finish without
    // emitting any item.
    #[tokio::test]
    async fn debounce_time_should_work_with_empty_stream() {
        let ticks = IntervalStream::new(interval(Duration::from_millis(1)));
        let mut stream = ticks.take(0).debounce_time(Duration::from_millis(10));

        assert_eq!(stream.next().await, None);
    }
}