livekit_protocol/
debouncer.rs

1// Copyright 2023 LiveKit, Inc.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use futures_util::Future;
18use thiserror::Error;
19use tokio::sync::{mpsc, oneshot};
20
21#[derive(Debug, Error)]
22pub enum DebounceError {
23    #[error("function already executed")]
24    AlreadyExecuted,
25}
26
27pub struct Debouncer {
28    cancel_tx: Option<oneshot::Sender<()>>,
29    tx: mpsc::UnboundedSender<()>,
30}
31
32pub fn debounce<F>(duration: Duration, future: F) -> Debouncer
33where
34    F: Future + Send + 'static,
35{
36    let (tx, rx) = mpsc::unbounded_channel();
37    let (cancel_tx, cancel_rx) = oneshot::channel();
38    livekit_runtime::spawn(debounce_task(duration, future, rx, cancel_rx));
39    Debouncer { tx, cancel_tx: Some(cancel_tx) }
40}
41
42async fn debounce_task<F>(
43    duration: Duration,
44    future: F,
45    mut rx: mpsc::UnboundedReceiver<()>,
46    mut cancel_rx: oneshot::Receiver<()>,
47) where
48    F: Future + Send + 'static,
49{
50    loop {
51        tokio::select! {
52            _ = &mut cancel_rx => break,
53            _ = rx.recv() => continue,
54            _ = livekit_runtime::sleep(duration) => {
55                future.await;
56                break;
57            }
58        }
59    }
60}
61
62impl Debouncer {
63    pub fn call(&self) -> Result<(), mpsc::error::SendError<()>> {
64        self.tx.send(())
65    }
66}
67
68impl Drop for Debouncer {
69    fn drop(&mut self) {
70        let _ = self.cancel_tx.take().unwrap().send(());
71    }
72}