yazi_shared/
debounce.rs

1use std::{pin::Pin, task::{Context, Poll}, time::Duration};
2
3use futures::{FutureExt, Stream, StreamExt};
4use tokio::time::{Instant, Sleep, sleep};
5
6pub struct Debounce<S>
7where
8	S: Stream,
9{
10	stream:   S,
11	interval: Duration,
12
13	sleep: Sleep,
14	last:  Option<S::Item>,
15}
16
17impl<S> Debounce<S>
18where
19	S: Stream + Unpin,
20{
21	pub fn new(stream: S, interval: Duration) -> Debounce<S> {
22		Self { stream, interval, sleep: sleep(Duration::ZERO), last: None }
23	}
24}
25
26impl<S> Stream for Debounce<S>
27where
28	S: Stream + Unpin,
29{
30	type Item = S::Item;
31
32	fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
33		let (mut stream, interval, mut sleep, last) = unsafe {
34			let me = self.get_unchecked_mut();
35			(Pin::new(&mut me.stream), me.interval, Pin::new_unchecked(&mut me.sleep), &mut me.last)
36		};
37
38		if sleep.poll_unpin(cx).is_ready() {
39			if let Some(last) = last.take() {
40				return Poll::Ready(Some(last));
41			}
42		}
43
44		while let Poll::Ready(next) = stream.poll_next_unpin(cx) {
45			match next {
46				Some(next) => {
47					*last = Some(next);
48				}
49				None if last.is_none() => {
50					return Poll::Ready(None);
51				}
52				None => {
53					sleep.reset(Instant::now());
54					return Poll::Pending;
55				}
56			}
57		}
58
59		sleep.reset(Instant::now() + interval);
60		Poll::Pending
61	}
62}