gnostr_asyncgit/asyncjob/
mod.rs

1//! provides `AsyncJob` trait and `AsyncSingleJob` struct
2
3#![deny(clippy::expect_used)]
4
5use std::sync::{Arc, Mutex, RwLock};
6
7use crossbeam_channel::Sender;
8
9use crate::error::Result;
10
11/// Passed to `AsyncJob::run` allowing sending intermediate progress
12/// notifications
13pub struct RunParams<
14	T: Copy + Send,
15	P: Clone + Send + Sync + PartialEq,
16> {
17	sender: Sender<T>,
18	progress: Arc<RwLock<P>>,
19}
20
21impl<T: Copy + Send, P: Clone + Send + Sync + PartialEq>
22	RunParams<T, P>
23{
24	/// send an intermediate update notification.
25	/// do not confuse this with the return value of `run`.
26	/// `send` should only be used about progress notifications
27	/// and not for the final notification indicating the end of the
28	/// async job. see `run` for more info
29	pub fn send(&self, notification: T) -> Result<()> {
30		self.sender.send(notification)?;
31		Ok(())
32	}
33
34	/// set the current progress
35	pub fn set_progress(&self, p: P) -> Result<bool> {
36		Ok(if *self.progress.read()? == p {
37			false
38		} else {
39			*(self.progress.write()?) = p;
40			true
41		})
42	}
43}
44
45/// trait that defines an async task we can run on a threadpool
46pub trait AsyncJob: Send + Sync + Clone {
47	/// defines what notification type is used to communicate outside
48	type Notification: Copy + Send;
49	/// type of progress
50	type Progress: Clone + Default + Send + Sync + PartialEq;
51
52	/// can run a synchronous time intensive task.
53	/// the returned notification is used to tell interested parties
54	/// that the job finished and the job can be access via
55	/// `take_last`. prior to this final notification it is not safe
56	/// to assume `take_last` will already return the correct job
57	fn run(
58		&mut self,
59		params: RunParams<Self::Notification, Self::Progress>,
60	) -> Result<Self::Notification>;
61
62	/// allows observers to get intermediate progress status if the
63	/// job customizes it by default this will be returning
64	/// `Self::Progress::default()`
65	fn get_progress(&self) -> Self::Progress {
66		Self::Progress::default()
67	}
68}
69
70/// Abstraction for a FIFO task queue that will only queue up **one**
71/// `next` job. It keeps overwriting the next job until it is actually
72/// taken to be processed
73#[derive(Debug, Clone)]
74pub struct AsyncSingleJob<J: AsyncJob> {
75	next: Arc<Mutex<Option<J>>>,
76	last: Arc<Mutex<Option<J>>>,
77	progress: Arc<RwLock<J::Progress>>,
78	sender: Sender<J::Notification>,
79	pending: Arc<Mutex<()>>,
80}
81
82impl<J: 'static + AsyncJob> AsyncSingleJob<J> {
83	///
84	pub fn new(sender: Sender<J::Notification>) -> Self {
85		Self {
86			next: Arc::new(Mutex::new(None)),
87			last: Arc::new(Mutex::new(None)),
88			pending: Arc::new(Mutex::new(())),
89			progress: Arc::new(RwLock::new(J::Progress::default())),
90			sender,
91		}
92	}
93
94	///
95	pub fn is_pending(&self) -> bool {
96		self.pending.try_lock().is_err()
97	}
98
99	/// makes sure `next` is cleared and returns `true` if it actually
100	/// canceled something
101	pub fn cancel(&mut self) -> bool {
102		if let Ok(mut next) = self.next.lock() {
103			if next.is_some() {
104				*next = None;
105				return true;
106			}
107		}
108
109		false
110	}
111
112	/// take out last finished job
113	pub fn take_last(&self) -> Option<J> {
114		self.last.lock().map_or(None, |mut last| last.take())
115	}
116
117	/// spawns `task` if nothing is running currently,
118	/// otherwise schedules as `next` overwriting if `next` was set
119	/// before. return `true` if the new task gets started right
120	/// away.
121	pub fn spawn(&mut self, task: J) -> bool {
122		self.schedule_next(task);
123		self.check_for_job()
124	}
125
126	///
127	pub fn progress(&self) -> Option<J::Progress> {
128		self.progress.read().ok().map(|d| (*d).clone())
129	}
130
131	fn check_for_job(&self) -> bool {
132		if self.is_pending() {
133			return false;
134		}
135
136		if let Some(task) = self.take_next() {
137			let self_clone = (*self).clone();
138			rayon_core::spawn(move || {
139				if let Err(e) = self_clone.run_job(task) {
140					log::error!("async job error: {}", e);
141				}
142			});
143
144			return true;
145		}
146
147		false
148	}
149
150	fn run_job(&self, mut task: J) -> Result<()> {
151		//limit the pending scope
152		{
153			let _pending = self.pending.lock()?;
154
155			let notification = task.run(RunParams {
156				progress: self.progress.clone(),
157				sender: self.sender.clone(),
158			})?;
159
160			if let Ok(mut last) = self.last.lock() {
161				*last = Some(task);
162			}
163
164			self.sender.send(notification)?;
165		}
166
167		self.check_for_job();
168
169		Ok(())
170	}
171
172	fn schedule_next(&mut self, task: J) {
173		if let Ok(mut next) = self.next.lock() {
174			*next = Some(task);
175		}
176	}
177
178	fn take_next(&self) -> Option<J> {
179		self.next.lock().map_or(None, |mut next| next.take())
180	}
181}
182
183#[cfg(test)]
184mod test {
185	use std::{
186		sync::atomic::{AtomicBool, AtomicU32, Ordering},
187		thread,
188		time::Duration,
189	};
190
191	use crossbeam_channel::unbounded;
192	use pretty_assertions::assert_eq;
193
194	use super::*;
195
196	#[derive(Clone)]
197	struct TestJob {
198		v: Arc<AtomicU32>,
199		finish: Arc<AtomicBool>,
200		value_to_add: u32,
201	}
202
203	type TestNotification = ();
204
205	impl AsyncJob for TestJob {
206		type Notification = TestNotification;
207		type Progress = ();
208
209		fn run(
210			&mut self,
211			_params: RunParams<Self::Notification, Self::Progress>,
212		) -> Result<Self::Notification> {
213			println!("[job] wait");
214
215			while !self.finish.load(Ordering::SeqCst) {
216				std::thread::yield_now();
217			}
218
219			println!("[job] sleep");
220
221			thread::sleep(Duration::from_millis(100));
222
223			println!("[job] done sleeping");
224
225			let res =
226				self.v.fetch_add(self.value_to_add, Ordering::SeqCst);
227
228			println!("[job] value: {res}");
229
230			Ok(())
231		}
232	}
233
234	#[test]
235	fn test_overwrite() {
236		let (sender, receiver) = unbounded();
237
238		let mut job: AsyncSingleJob<TestJob> =
239			AsyncSingleJob::new(sender);
240
241		let task = TestJob {
242			v: Arc::new(AtomicU32::new(1)),
243			finish: Arc::new(AtomicBool::new(false)),
244			value_to_add: 1,
245		};
246
247		assert!(job.spawn(task.clone()));
248		task.finish.store(true, Ordering::SeqCst);
249		thread::sleep(Duration::from_millis(10));
250
251		for _ in 0..5 {
252			println!("spawn");
253			assert!(!job.spawn(task.clone()));
254		}
255
256		println!("recv");
257		receiver.recv().unwrap();
258		receiver.recv().unwrap();
259		assert!(receiver.is_empty());
260
261		assert_eq!(
262			task.v.load(std::sync::atomic::Ordering::SeqCst),
263			3
264		);
265	}
266
267	fn wait_for_job(job: &AsyncSingleJob<TestJob>) {
268		while job.is_pending() {
269			thread::sleep(Duration::from_millis(10));
270		}
271	}
272
273	#[test]
274	fn test_cancel() {
275		let (sender, receiver) = unbounded();
276
277		let mut job: AsyncSingleJob<TestJob> =
278			AsyncSingleJob::new(sender);
279
280		let task = TestJob {
281			v: Arc::new(AtomicU32::new(1)),
282			finish: Arc::new(AtomicBool::new(false)),
283			value_to_add: 1,
284		};
285
286		assert!(job.spawn(task.clone()));
287		task.finish.store(true, Ordering::SeqCst);
288		thread::sleep(Duration::from_millis(10));
289
290		for _ in 0..5 {
291			println!("spawn");
292			assert!(!job.spawn(task.clone()));
293		}
294
295		println!("cancel");
296		assert!(job.cancel());
297
298		task.finish.store(true, Ordering::SeqCst);
299
300		wait_for_job(&job);
301
302		println!("recv");
303		receiver.recv().unwrap();
304		println!("received");
305
306		assert_eq!(
307			task.v.load(std::sync::atomic::Ordering::SeqCst),
308			2
309		);
310	}
311}