tauri_store_utils/time/
throttle.rs1use super::debounce::{Message, OptionalSender};
2use crate::manager::ManagerExt;
3use crate::task::{OptionalAbortHandle, RemoteCallable};
4use std::future::Future;
5use std::sync::atomic::AtomicBool;
6use std::sync::atomic::Ordering::Relaxed;
7use std::sync::{Arc, Weak};
8use std::time::Duration;
9use tauri::async_runtime::spawn;
10use tauri::{AppHandle, Runtime};
11use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
12use tokio::task::AbortHandle;
13use tokio::time::sleep;
14
15type ThrottledFn<R, Fut> = dyn Fn(AppHandle<R>) -> Fut + Send + Sync + 'static;
16
17pub struct Throttle<R, T, Fut>
19where
20 R: Runtime,
21 T: Send + 'static,
22 Fut: Future<Output = T> + Send + 'static,
23{
24 inner: Arc<ThrottledFn<R, Fut>>,
25 waiting: Arc<AtomicBool>,
26 sender: Arc<OptionalSender>,
27 abort_handle: Arc<OptionalAbortHandle>,
28 duration: Duration,
29}
30
31impl<R, T, Fut> Throttle<R, T, Fut>
32where
33 R: Runtime,
34 T: Send + 'static,
35 Fut: Future<Output = T> + Send + 'static,
36{
37 pub fn new<F>(duration: Duration, f: F) -> Self
38 where
39 F: Fn(AppHandle<R>) -> Fut + Send + Sync + 'static,
40 {
41 Self {
42 inner: Arc::new(f),
43 waiting: Arc::new(AtomicBool::new(false)),
44 sender: Arc::new(OptionalSender::default()),
45 abort_handle: Arc::new(OptionalAbortHandle::default()),
46 duration,
47 }
48 }
49
50 pub fn call(&self, app: &AppHandle<R>) {
51 if self.sender.send() || self.waiting.load(Relaxed) {
52 return;
53 }
54
55 let (tx, rx) = unbounded_channel();
56 let actor = Actor {
57 function: Arc::downgrade(&self.inner),
58 waiting: Arc::downgrade(&self.waiting),
59 receiver: rx,
60 duration: self.duration,
61 };
62
63 let _ = tx.send(Message::Call);
64
65 self.sender.inner.lock().replace(tx);
66 self.abort_handle.replace(actor.run(app));
67 }
68
69 pub fn abort(&self) {
70 self.sender.inner.lock().take();
71 self.abort_handle.abort();
72 self.waiting.store(false, Relaxed);
73 }
74}
75
76impl<R, T, Fut> RemoteCallable<AppHandle<R>> for Throttle<R, T, Fut>
77where
78 R: Runtime,
79 T: Send + 'static,
80 Fut: Future<Output = T> + Send + 'static,
81{
82 fn call(&self, app: &AppHandle<R>) {
83 self.call(app);
84 }
85
86 fn abort(&self) {
87 self.abort();
88 }
89}
90
91impl<R, T, Fut> Drop for Throttle<R, T, Fut>
92where
93 R: Runtime,
94 T: Send + 'static,
95 Fut: Future<Output = T> + Send + 'static,
96{
97 fn drop(&mut self) {
98 self.abort();
99 }
100}
101
102struct Actor<R, T, Fut>
103where
104 R: Runtime,
105 T: Send + 'static,
106 Fut: Future<Output = T> + Send + 'static,
107{
108 function: Weak<ThrottledFn<R, Fut>>,
109 waiting: Weak<AtomicBool>,
110 receiver: UnboundedReceiver<Message>,
111 duration: Duration,
112}
113
114impl<R, T, Fut> Actor<R, T, Fut>
115where
116 R: Runtime,
117 T: Send + 'static,
118 Fut: Future<Output = T> + Send + 'static,
119{
120 fn run(mut self, app: &AppHandle<R>) -> AbortHandle {
121 app.spawn(move |app| async move {
122 loop {
123 if (self.receiver.recv().await).is_some() {
124 let Some(waiting) = self.waiting.upgrade() else { break };
125 let Some(function) = self.function.upgrade() else { break };
126
127 if compare_exchange(&waiting, false, true) {
128 (function)(app.clone()).await;
129 spawn(async move {
130 sleep(self.duration).await;
131 waiting.store(false, Relaxed);
132 });
133 }
134 } else {
135 self.receiver.close();
136 self
137 .waiting
138 .upgrade()
139 .inspect(|it| it.store(false, Relaxed));
140
141 break;
142 }
143 }
144 })
145 }
146}
147
/// Atomically replaces the value of `waiting` with `new` if it currently equals
/// `expected`, using relaxed ordering for both the success and failure cases.
///
/// Returns `true` when the value was replaced and `false` when the current value
/// did not match `expected` (in which case nothing is written).
///
/// Takes a plain `&AtomicBool`; a `&Arc<AtomicBool>` argument still works through
/// deref coercion.
fn compare_exchange(waiting: &AtomicBool, expected: bool, new: bool) -> bool {
    waiting
        .compare_exchange(expected, new, Relaxed, Relaxed)
        .is_ok()
}