livekit_protocol/
debouncer.rs1use std::time::Duration;
16
17use futures_util::Future;
18use thiserror::Error;
19use tokio::sync::{mpsc, oneshot};
20
21#[derive(Debug, Error)]
22pub enum DebounceError {
23 #[error("function already executed")]
24 AlreadyExecuted,
25}
26
27pub struct Debouncer {
28 cancel_tx: Option<oneshot::Sender<()>>,
29 tx: mpsc::UnboundedSender<()>,
30}
31
32pub fn debounce<F>(duration: Duration, future: F) -> Debouncer
33where
34 F: Future + Send + 'static,
35{
36 let (tx, rx) = mpsc::unbounded_channel();
37 let (cancel_tx, cancel_rx) = oneshot::channel();
38 livekit_runtime::spawn(debounce_task(duration, future, rx, cancel_rx));
39 Debouncer { tx, cancel_tx: Some(cancel_tx) }
40}
41
42async fn debounce_task<F>(
43 duration: Duration,
44 future: F,
45 mut rx: mpsc::UnboundedReceiver<()>,
46 mut cancel_rx: oneshot::Receiver<()>,
47) where
48 F: Future + Send + 'static,
49{
50 loop {
51 tokio::select! {
52 _ = &mut cancel_rx => break,
53 _ = rx.recv() => continue,
54 _ = livekit_runtime::sleep(duration) => {
55 future.await;
56 break;
57 }
58 }
59 }
60}
61
62impl Debouncer {
63 pub fn call(&self) -> Result<(), mpsc::error::SendError<()>> {
64 self.tx.send(())
65 }
66}
67
68impl Drop for Debouncer {
69 fn drop(&mut self) {
70 let _ = self.cancel_tx.take().unwrap().send(());
71 }
72}