gnostr_asyncgit/asyncjob/
mod.rs1#![deny(clippy::expect_used)]
4
5use std::sync::{Arc, Mutex, RwLock};
6
7use crossbeam_channel::Sender;
8
9use crate::error::Result;
10
11pub struct RunParams<
14 T: Copy + Send,
15 P: Clone + Send + Sync + PartialEq,
16> {
17 sender: Sender<T>,
18 progress: Arc<RwLock<P>>,
19}
20
21impl<T: Copy + Send, P: Clone + Send + Sync + PartialEq>
22 RunParams<T, P>
23{
24 pub fn send(&self, notification: T) -> Result<()> {
30 self.sender.send(notification)?;
31 Ok(())
32 }
33
34 pub fn set_progress(&self, p: P) -> Result<bool> {
36 Ok(if *self.progress.read()? == p {
37 false
38 } else {
39 *(self.progress.write()?) = p;
40 true
41 })
42 }
43}
44
45pub trait AsyncJob: Send + Sync + Clone {
47 type Notification: Copy + Send;
49 type Progress: Clone + Default + Send + Sync + PartialEq;
51
52 fn run(
58 &mut self,
59 params: RunParams<Self::Notification, Self::Progress>,
60 ) -> Result<Self::Notification>;
61
62 fn get_progress(&self) -> Self::Progress {
66 Self::Progress::default()
67 }
68}
69
70#[derive(Debug, Clone)]
74pub struct AsyncSingleJob<J: AsyncJob> {
75 next: Arc<Mutex<Option<J>>>,
76 last: Arc<Mutex<Option<J>>>,
77 progress: Arc<RwLock<J::Progress>>,
78 sender: Sender<J::Notification>,
79 pending: Arc<Mutex<()>>,
80}
81
82impl<J: 'static + AsyncJob> AsyncSingleJob<J> {
83 pub fn new(sender: Sender<J::Notification>) -> Self {
85 Self {
86 next: Arc::new(Mutex::new(None)),
87 last: Arc::new(Mutex::new(None)),
88 pending: Arc::new(Mutex::new(())),
89 progress: Arc::new(RwLock::new(J::Progress::default())),
90 sender,
91 }
92 }
93
94 pub fn is_pending(&self) -> bool {
96 self.pending.try_lock().is_err()
97 }
98
99 pub fn cancel(&mut self) -> bool {
102 if let Ok(mut next) = self.next.lock() {
103 if next.is_some() {
104 *next = None;
105 return true;
106 }
107 }
108
109 false
110 }
111
112 pub fn take_last(&self) -> Option<J> {
114 self.last.lock().map_or(None, |mut last| last.take())
115 }
116
117 pub fn spawn(&mut self, task: J) -> bool {
122 self.schedule_next(task);
123 self.check_for_job()
124 }
125
126 pub fn progress(&self) -> Option<J::Progress> {
128 self.progress.read().ok().map(|d| (*d).clone())
129 }
130
131 fn check_for_job(&self) -> bool {
132 if self.is_pending() {
133 return false;
134 }
135
136 if let Some(task) = self.take_next() {
137 let self_clone = (*self).clone();
138 rayon_core::spawn(move || {
139 if let Err(e) = self_clone.run_job(task) {
140 log::error!("async job error: {}", e);
141 }
142 });
143
144 return true;
145 }
146
147 false
148 }
149
150 fn run_job(&self, mut task: J) -> Result<()> {
151 {
153 let _pending = self.pending.lock()?;
154
155 let notification = task.run(RunParams {
156 progress: self.progress.clone(),
157 sender: self.sender.clone(),
158 })?;
159
160 if let Ok(mut last) = self.last.lock() {
161 *last = Some(task);
162 }
163
164 self.sender.send(notification)?;
165 }
166
167 self.check_for_job();
168
169 Ok(())
170 }
171
172 fn schedule_next(&mut self, task: J) {
173 if let Ok(mut next) = self.next.lock() {
174 *next = Some(task);
175 }
176 }
177
178 fn take_next(&self) -> Option<J> {
179 self.next.lock().map_or(None, |mut next| next.take())
180 }
181}
182
#[cfg(test)]
mod test {
    use std::{
        sync::atomic::{AtomicBool, AtomicU32, Ordering},
        thread,
        time::Duration,
    };

    use crossbeam_channel::unbounded;
    use pretty_assertions::assert_eq;

    use super::*;

    /// Job that spins until `finish` is set, sleeps for a moment and then
    /// adds `value_to_add` to the shared counter `v`.
    #[derive(Clone)]
    struct TestJob {
        v: Arc<AtomicU32>,
        finish: Arc<AtomicBool>,
        value_to_add: u32,
    }

    type TestNotification = ();

    impl AsyncJob for TestJob {
        type Notification = TestNotification;
        type Progress = ();

        fn run(
            &mut self,
            _params: RunParams<Self::Notification, Self::Progress>,
        ) -> Result<Self::Notification> {
            println!("[job] wait");

            // Busy-wait until the test releases the job.
            loop {
                if self.finish.load(Ordering::SeqCst) {
                    break;
                }
                thread::yield_now();
            }

            println!("[job] sleep");

            thread::sleep(Duration::from_millis(100));

            println!("[job] done sleeping");

            let res = self.v.fetch_add(self.value_to_add, Ordering::SeqCst);

            println!("[job] value: {res}");

            Ok(())
        }
    }

    /// Builds a job whose counter starts at 1, adds 1 per run and is still
    /// blocked on its `finish` flag.
    fn blocked_task() -> TestJob {
        TestJob {
            v: Arc::new(AtomicU32::new(1)),
            finish: Arc::new(AtomicBool::new(false)),
            value_to_add: 1,
        }
    }

    /// Spawns five more clones of `task` and asserts that none of them is
    /// started immediately.
    fn assert_respawn_rejected(
        job: &mut AsyncSingleJob<TestJob>,
        task: &TestJob,
    ) {
        for _ in 0..5 {
            println!("spawn");
            assert!(!job.spawn(task.clone()));
        }
    }

    /// Polls until the runner no longer reports a pending job.
    fn wait_for_job(job: &AsyncSingleJob<TestJob>) {
        while job.is_pending() {
            thread::sleep(Duration::from_millis(10));
        }
    }

    /// Tasks spawned while a job is running overwrite each other, so only one
    /// follow-up runs after the first job.
    #[test]
    fn test_overwrite() {
        let (sender, receiver) = unbounded();

        let mut job: AsyncSingleJob<TestJob> = AsyncSingleJob::new(sender);

        let task = blocked_task();

        assert!(job.spawn(task.clone()));
        task.finish.store(true, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(10));

        assert_respawn_rejected(&mut job, &task);

        println!("recv");
        receiver.recv().unwrap();
        receiver.recv().unwrap();
        assert!(receiver.is_empty());

        assert_eq!(task.v.load(Ordering::SeqCst), 3);
    }

    /// Cancelling removes the queued follow-up, so only the first job runs.
    #[test]
    fn test_cancel() {
        let (sender, receiver) = unbounded();

        let mut job: AsyncSingleJob<TestJob> = AsyncSingleJob::new(sender);

        let task = blocked_task();

        assert!(job.spawn(task.clone()));
        task.finish.store(true, Ordering::SeqCst);
        thread::sleep(Duration::from_millis(10));

        assert_respawn_rejected(&mut job, &task);

        println!("cancel");
        assert!(job.cancel());

        task.finish.store(true, Ordering::SeqCst);

        wait_for_job(&job);

        println!("recv");
        receiver.recv().unwrap();
        println!("received");

        assert_eq!(task.v.load(Ordering::SeqCst), 2);
    }
}