// tauri_store_utils/time/throttle.rs

use super::debounce::{Message, OptionalSender};
use crate::manager::ManagerExt;
use crate::task::{OptionalAbortHandle, RemoteCallable};
use std::future::Future;
use std::sync::atomic::AtomicBool;
use std::sync::atomic::Ordering::Relaxed;
use std::sync::{Arc, Weak};
use std::time::Duration;
use tauri::async_runtime::spawn;
use tauri::{AppHandle, Runtime};
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
use tokio::task::AbortHandle;
use tokio::time::sleep;

15type ThrottledFn<R, Fut> = dyn Fn(AppHandle<R>) -> Fut + Send + Sync + 'static;
16
17/// Throttles a function call.
18pub struct Throttle<R, T, Fut>
19where
20  R: Runtime,
21  T: Send + 'static,
22  Fut: Future<Output = T> + Send + 'static,
23{
24  inner: Arc<ThrottledFn<R, Fut>>,
25  waiting: Arc<AtomicBool>,
26  sender: Arc<OptionalSender>,
27  abort_handle: Arc<OptionalAbortHandle>,
28  duration: Duration,
29}
30
31impl<R, T, Fut> Throttle<R, T, Fut>
32where
33  R: Runtime,
34  T: Send + 'static,
35  Fut: Future<Output = T> + Send + 'static,
36{
37  pub fn new<F>(duration: Duration, f: F) -> Self
38  where
39    F: Fn(AppHandle<R>) -> Fut + Send + Sync + 'static,
40  {
41    Self {
42      inner: Arc::new(f),
43      waiting: Arc::new(AtomicBool::new(false)),
44      sender: Arc::new(OptionalSender::default()),
45      abort_handle: Arc::new(OptionalAbortHandle::default()),
46      duration,
47    }
48  }
49
50  pub fn call(&self, app: &AppHandle<R>) {
51    if self.sender.send() || self.waiting.load(Relaxed) {
52      return;
53    }
54
55    let (tx, rx) = unbounded_channel();
56    let actor = Actor {
57      function: Arc::downgrade(&self.inner),
58      waiting: Arc::downgrade(&self.waiting),
59      receiver: rx,
60      duration: self.duration,
61    };
62
63    let _ = tx.send(Message::Call);
64
65    self.sender.inner.lock().replace(tx);
66    self.abort_handle.replace(actor.run(app));
67  }
68
69  pub fn abort(&self) {
70    self.sender.inner.lock().take();
71    self.abort_handle.abort();
72    self.waiting.store(false, Relaxed);
73  }
74}
75
76impl<R, T, Fut> RemoteCallable<AppHandle<R>> for Throttle<R, T, Fut>
77where
78  R: Runtime,
79  T: Send + 'static,
80  Fut: Future<Output = T> + Send + 'static,
81{
82  fn call(&self, app: &AppHandle<R>) {
83    self.call(app);
84  }
85
86  fn abort(&self) {
87    self.abort();
88  }
89}
90
91impl<R, T, Fut> Drop for Throttle<R, T, Fut>
92where
93  R: Runtime,
94  T: Send + 'static,
95  Fut: Future<Output = T> + Send + 'static,
96{
97  fn drop(&mut self) {
98    self.abort();
99  }
100}
101
102struct Actor<R, T, Fut>
103where
104  R: Runtime,
105  T: Send + 'static,
106  Fut: Future<Output = T> + Send + 'static,
107{
108  function: Weak<ThrottledFn<R, Fut>>,
109  waiting: Weak<AtomicBool>,
110  receiver: UnboundedReceiver<Message>,
111  duration: Duration,
112}
113
114impl<R, T, Fut> Actor<R, T, Fut>
115where
116  R: Runtime,
117  T: Send + 'static,
118  Fut: Future<Output = T> + Send + 'static,
119{
120  fn run(mut self, app: &AppHandle<R>) -> AbortHandle {
121    app.spawn(move |app| async move {
122      loop {
123        if (self.receiver.recv().await).is_some() {
124          let Some(waiting) = self.waiting.upgrade() else { break };
125          let Some(function) = self.function.upgrade() else { break };
126
127          if compare_exchange(&waiting, false, true) {
128            (function)(app.clone()).await;
129            spawn(async move {
130              sleep(self.duration).await;
131              waiting.store(false, Relaxed);
132            });
133          }
134        } else {
135          self.receiver.close();
136          self
137            .waiting
138            .upgrade()
139            .inspect(|it| it.store(false, Relaxed));
140
141          break;
142        }
143      }
144    })
145  }
146}
147
/// Atomically sets `waiting` to `new` if it currently holds `expected`.
///
/// Returns `true` when the value was swapped and `false` when the flag held a
/// different value, in which case it is left unchanged. Both the success and
/// the failure path use `Relaxed` ordering.
fn compare_exchange(waiting: &Arc<AtomicBool>, expected: bool, new: bool) -> bool {
  waiting
    .compare_exchange(expected, new, Relaxed, Relaxed)
    .is_ok()
}