procsem/lib.rs
1//! A simple process semaphore.
2//!
3//! (Note: The word _process_ should be read as a sequence of operations,
4//! rather than an operating system process).
5//!
6//! The [`ProcSem`] is intended to allow mutual exclusion of a chain of
7//! operations that may span over several threads/tasks.
8//!
9//! This is much like a `Mutex`, but it differs in that it holds no generic
10//! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is
11//! `Send`, because it is explicitly meant to be passed around between
12//! threads/tasks. It supports blocking, nonblocking and async lock
13//! acquisition.
14//!
15//! # Progress reporting
16//! When a `ProcSem` is created, it can optionally be handed an object that
17//! implements [`StateReporter`]. If a `ProcSem` has a `StateReporter`
//! associated with it, then an acquired `ProcCtx` can use
//! [`ProcCtx::action()`] and [`ProcCtx::progress()`] to pass progress
//! information to the `StateReporter` implementor.
21//!
22//! Once a `ProcCtx` is about to terminate, [`ProcCtx::end()`] can be used to
23//! signal a final message that can be retrieved by the `ProcSem` object.
24
25use std::{
26 future::Future,
27 num::NonZeroUsize,
28 pin::Pin,
29 sync::{
30 atomic::{AtomicUsize, Ordering},
31 Arc
32 },
33 task::{Context, Poll, Waker}
34};
35
36use parking_lot::{Condvar, Mutex};
37
38use indexmap::IndexMap;
39
40
/// Interface implemented by objects that are responsible for sending process
/// reports.
///
/// Every callback has an empty default implementation, so an implementor only
/// needs to override the ones it is interested in.
pub trait StateReporter: Send + Sync {
  /// Called when a [`ProcCtx`] has been acquired.
  fn begin(&self) {}

  /// Called when [`ProcCtx::action()`] has been used to set the current
  /// action; `s` is the text that was passed to it.
  fn action(&self, s: &str) {
    let _ = s;
  }

  /// Called when [`ProcCtx::progress()`] has been used to report progress;
  /// `percent` is the value that was passed to it.
  fn progress(&self, percent: Option<u8>) {
    let _ = percent;
  }

  /// Called when the [`ProcCtx`] is being dropped.
  ///
  /// `msg` is a copy of the end message set through [`ProcCtx::end()`], or
  /// `None` if no end message was set.
  ///
  /// This method will be called while the process ownership context still
  /// exists.  If the `ProcCtx` is used as a resource lock, the application
  /// can rely on mutually exclusive access to the resource within this
  /// callback.
  fn end(&self, msg: Option<String>) {
    let _ = msg;
  }
}
62
63
64/// Shared data that must be protected behind mutually exclusive access.
65#[derive(Default)]
66struct Inner {
67 /// Flag denoting whether the semaphore is currently owned or not.
68 ///
69 /// If this is `true` it means there's a `ProcCtx` object. If this is
70 /// `false` there's no `ProcCtx` object for this semaphore.
71 owned: bool,
72
73 /// Collection of async task wakers waiting to acquire this lock.
74 wakers: IndexMap<usize, Waker>,
75
76 /// The last final message.
77 endmsg: Option<String>
78}
79
80#[derive(Default)]
81struct Shared {
82 signal: Condvar,
83 inner: Mutex<Inner>,
84 idgen: AtomicUsize,
85 reporter: Option<Box<dyn StateReporter>>
86}
87
88
89/// Representation of a process that may be active or inactive.
90#[repr(transparent)]
91#[derive(Default)]
92pub struct ProcSem(Arc<Shared>);
93
94impl ProcSem {
95 /// Create a new process semaphore object.
96 #[must_use]
97 pub fn new() -> Self {
98 Self::default()
99 }
100
101 /// Create a new process semaphore with a reporter object.
102 pub fn with_reporter(reporter: impl StateReporter + 'static) -> Self {
103 Self(Arc::new(Shared {
104 reporter: Some(Box::new(reporter)),
105 ..Default::default()
106 }))
107 }
108
109 /// Return a `Future` that will yield a [`ProcCtx`] when possible.
110 ///
111 /// On successful acquire the internal endmsg will be cleared.
112 #[must_use]
113 pub fn acquire_async(&self) -> AcquireFuture {
114 AcquireFuture {
115 shared: Arc::clone(&self.0),
116 id: None
117 }
118 }
119
120 /// Acquire a [`ProcCtx`], blocking until successful.
121 ///
122 /// On successful acquire the internal endmsg will be cleared.
123 #[must_use]
124 #[allow(clippy::significant_drop_tightening)]
125 pub fn acquire_blocking(&self) -> ProcCtx {
126 let mut g = self.0.inner.lock();
127 loop {
128 if g.owned {
129 // There's a Lock for this ProcSem.
130 // Wait until the condvar is triggered so we can check again.
131 self.0.signal.wait(&mut g);
132 } else {
133 // Lock and return a Lock object
134 g.owned = true;
135 g.endmsg = None;
136 if let Some(ref r) = self.0.reporter {
137 r.begin();
138 }
139 break ProcCtx(Arc::clone(&self.0));
140 }
141 }
142 }
143
144 /// Attempt to acquire [`ProcCtx`], returning immediately.
145 ///
146 /// Returns `Some(ProcCtx)` if the lock was acquired successfully. Returns
147 /// `None` if the lock is already owned by another lock.
148 ///
149 /// On successful acquire the internal endmsg will be cleared.
150 #[must_use]
151 #[allow(clippy::significant_drop_tightening)]
152 pub fn try_acquire(&self) -> Option<ProcCtx> {
153 let mut g = self.0.inner.lock();
154 if g.owned {
155 // Lock already acquired
156 None
157 } else {
158 g.owned = true;
159 g.endmsg = None;
160 if let Some(ref r) = self.0.reporter {
161 r.begin();
162 }
163 Some(ProcCtx(Arc::clone(&self.0)))
164 }
165 }
166
167 /// Get a clone of the internal end message.
168 #[must_use]
169 pub fn endmsg(&self) -> Option<String> {
170 self.0.inner.lock().endmsg.clone()
171 }
172}
173
174
175/// Future used to wait to acquire a [`ProcCtx`] in an async environment.
176pub struct AcquireFuture {
177 shared: Arc<Shared>,
178
179 id: Option<NonZeroUsize>
180}
181
182impl Future for AcquireFuture {
183 type Output = ProcCtx;
184
185 fn poll(
186 mut self: Pin<&mut Self>,
187 ctx: &mut Context<'_>
188 ) -> Poll<Self::Output> {
189 let mut inner = self.shared.inner.lock();
190 if inner.owned {
191 //
192 // Must wait for lock to be released
193 //
194
195 // Generate a unique identifier for this waker
196 let id = loop {
197 let id = self.shared.idgen.fetch_add(1, Ordering::SeqCst);
198 // Make sure id is non-zero and unique
199 if id == 0 || inner.wakers.contains_key(&id) {
200 continue;
201 }
202 break id;
203 };
204 inner.wakers.insert(id, ctx.waker().clone());
205 drop(inner);
206
207 // SAFETY: The loop above will guarantee that the id is non-zero
208 self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
209
210 Poll::Pending
211 } else {
212 //
213 // Mark lock as owned and return a Lock object.
214 //
215 inner.owned = true;
216 inner.endmsg = None;
217 if let Some(ref r) = self.shared.reporter {
218 r.begin();
219 }
220
221 drop(inner);
222 Poll::Ready(ProcCtx(Arc::clone(&self.shared)))
223 }
224 }
225}
226
227impl Drop for AcquireFuture {
228 /// If this future has generated a `Waker`, then remove it.
229 fn drop(&mut self) {
230 if let Some(id) = self.id {
231 let mut inner = self.shared.inner.lock();
232 // Remove this future's waker
233 let _ = inner.wakers.swap_remove(&id.get());
234 }
235 }
236}
237
238
239/// Object that denotes ownership of a [`ProcSem`] semaphore.
240///
241/// When the process is complete this object must be dropped so the semaphore
242/// can be acquired again.
243#[repr(transparent)]
244pub struct ProcCtx(Arc<Shared>);
245
246
247impl ProcCtx {
248 /// Provide a textual describe the current stage of the process.
249 pub fn action(&self, text: &str) {
250 if let Some(ref r) = self.0.reporter {
251 r.action(text);
252 }
253 }
254
255 /// Set the progress (in percent).
256 pub fn progress(&self, percent: Option<u8>) {
257 if let Some(ref r) = self.0.reporter {
258 r.progress(percent);
259 }
260 }
261
262 /// Set an end-state message.
263 ///
264 /// The purpose of the "end message" is to be able to leave a final "result"
265 /// message that can be read by the `ProcSem`.
266 ///
267 /// Calling this function will not cause the [`StateReporter::end()`] to be
268 /// called. However, if an end message has been set, a clone if it will be
269 /// passed to the `StateReporter::end()` callback.
270 ///
271 /// The message will be stored in the internal [`ProcSem`] storage
272 /// and can be retrieved using [`ProcSem::endmsg()`].
273 pub fn end(&self, text: &str) {
274 let mut g = self.0.inner.lock();
275 g.endmsg = Some(text.to_string());
276 }
277}
278
279impl Drop for ProcCtx {
280 #[allow(clippy::significant_drop_tightening)]
281 fn drop(&mut self) {
282 if let Some(ref r) = self.0.reporter {
283 let g = self.0.inner.lock();
284 let endmsg = g.endmsg.clone();
285 drop(g);
286 r.end(endmsg);
287 }
288
289 let mut g = self.0.inner.lock();
290 g.owned = false;
291
292 // Wake up all waiting futures.
293 //
294 // In an ideal world only one would be woken up, but there are edge cases
295 // when using multithreaded executors when an await is performed in a
296 // select!{} in which a doomed future might be picked. In such a case the
297 // "wake" would be wasted on a future that will not be able to process the
298 // wake request.
299 //
300 // To avoid this, _all_ wakers are woken up and they'll have to reenter
301 // pending state if applicable.
302 for (_, w) in g.wakers.drain(..) {
303 w.wake();
304 }
305
306 // Wake a single blocking thread (blocked threads do not suffer from the
307 // same issue).
308 self.0.signal.notify_one();
309 }
310}
311
312// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :