procsem/
lib.rs

1//! A simple process semaphore.
2//!
3//! (Note: The word _process_ should be read as a sequence of operations,
4//! rather than an operating system process).
5//!
6//! The [`ProcSem`] is intended to allow mutual exclusion of a chain of
7//! operations that may span over several threads/tasks.
8//!
9//! This is much like a `Mutex`, but it differs in that it holds no generic
10//! parameter and the `ProcCtx` (the equivalent of `Mutex`'s `MutexGuard`) is
11//! `Send`, because it is explicitly meant to be passed around between
12//! threads/tasks.  It supports blocking, nonblocking and async lock
13//! acquisition.
14//!
15//! # Progress reporting
16//! When a `ProcSem` is created, it can optionally be handed an object that
17//! implements [`StateReporter`].  If a `ProcSem` has a `StateReporter`
18//! associated with it, then an acquired `ProcCtx` can use
19//! [`ProcCtx::action()`], [`ProcCtx::progress()`] to pass progress information
20//! to the `StateReporter` implementor.
21//!
22//! Once a `ProcCtx` is about to terminate, [`ProcCtx::end()`] can be used to
23//! signal a final message that can be retrieved by the `ProcSem` object.
24
25use std::{
26  future::Future,
27  num::NonZeroUsize,
28  pin::Pin,
29  sync::{
30    atomic::{AtomicUsize, Ordering},
31    Arc
32  },
33  task::{Context, Poll, Waker}
34};
35
36use parking_lot::{Condvar, Mutex};
37
38use indexmap::IndexMap;
39
40
41/// Interface used to implement objects that are reponsible for sending process
42/// reports.
43#[allow(unused_variables)]
44pub trait StateReporter: Send + Sync {
45  /// Called when a [`ProcCtx`] has been acquired.
46  fn begin(&self) {}
47
48  /// Called when [`ProcCtx::action()`] has been called to set a current
49  /// action.
50  fn action(&self, s: &str) {}
51
52  /// The [`ProcCtx`] has progress has been made.
53  fn progress(&self, percent: Option<u8>) {}
54
55  /// Called when the [`ProcCtx`] is being dropped.
56  ///
57  /// This method will be called while the process ownership context still
58  /// exists.  If the `ProcCtx` is used as a resource lock, the application
59  /// can rely on mutual exclusive access to the resource within this callback.
60  fn end(&self, msg: Option<String>) {}
61}
62
63
64/// Shared data that must be protected behind mutually exclusive access.
65#[derive(Default)]
66struct Inner {
67  /// Flag denoting whether the semaphore is currently owned or not.
68  ///
69  /// If this is `true` it means there's a `ProcCtx` object.  If this is
70  /// `false` there's no `ProcCtx` object for this semaphore.
71  owned: bool,
72
73  /// Collection of async task wakers waiting to acquire this lock.
74  wakers: IndexMap<usize, Waker>,
75
76  /// The last final message.
77  endmsg: Option<String>
78}
79
80#[derive(Default)]
81struct Shared {
82  signal: Condvar,
83  inner: Mutex<Inner>,
84  idgen: AtomicUsize,
85  reporter: Option<Box<dyn StateReporter>>
86}
87
88
89/// Representation of a process that may be active or inactive.
90#[repr(transparent)]
91#[derive(Default)]
92pub struct ProcSem(Arc<Shared>);
93
94impl ProcSem {
95  /// Create a new process semaphore object.
96  #[must_use]
97  pub fn new() -> Self {
98    Self::default()
99  }
100
101  /// Create a new process semaphore with a reporter object.
102  pub fn with_reporter(reporter: impl StateReporter + 'static) -> Self {
103    Self(Arc::new(Shared {
104      reporter: Some(Box::new(reporter)),
105      ..Default::default()
106    }))
107  }
108
109  /// Return a `Future` that will yield a [`ProcCtx`] when possible.
110  ///
111  /// On successful acquire the internal endmsg will be cleared.
112  #[must_use]
113  pub fn acquire_async(&self) -> AcquireFuture {
114    AcquireFuture {
115      shared: Arc::clone(&self.0),
116      id: None
117    }
118  }
119
120  /// Acquire a [`ProcCtx`], blocking until successful.
121  ///
122  /// On successful acquire the internal endmsg will be cleared.
123  #[must_use]
124  #[allow(clippy::significant_drop_tightening)]
125  pub fn acquire_blocking(&self) -> ProcCtx {
126    let mut g = self.0.inner.lock();
127    loop {
128      if g.owned {
129        // There's a Lock for this ProcSem.
130        // Wait until the condvar is triggered so we can check again.
131        self.0.signal.wait(&mut g);
132      } else {
133        // Lock and return a Lock object
134        g.owned = true;
135        g.endmsg = None;
136        if let Some(ref r) = self.0.reporter {
137          r.begin();
138        }
139        break ProcCtx(Arc::clone(&self.0));
140      }
141    }
142  }
143
144  /// Attempt to acquire [`ProcCtx`], returning immediately.
145  ///
146  /// Returns `Some(ProcCtx)` if the lock was acquired successfully.  Returns
147  /// `None` if the lock is already owned by another lock.
148  ///
149  /// On successful acquire the internal endmsg will be cleared.
150  #[must_use]
151  #[allow(clippy::significant_drop_tightening)]
152  pub fn try_acquire(&self) -> Option<ProcCtx> {
153    let mut g = self.0.inner.lock();
154    if g.owned {
155      // Lock already acquired
156      None
157    } else {
158      g.owned = true;
159      g.endmsg = None;
160      if let Some(ref r) = self.0.reporter {
161        r.begin();
162      }
163      Some(ProcCtx(Arc::clone(&self.0)))
164    }
165  }
166
167  /// Get a clone of the internal end message.
168  #[must_use]
169  pub fn endmsg(&self) -> Option<String> {
170    self.0.inner.lock().endmsg.clone()
171  }
172}
173
174
175/// Future used to wait to acquire a [`ProcCtx`] in an async environment.
176pub struct AcquireFuture {
177  shared: Arc<Shared>,
178
179  id: Option<NonZeroUsize>
180}
181
182impl Future for AcquireFuture {
183  type Output = ProcCtx;
184
185  fn poll(
186    mut self: Pin<&mut Self>,
187    ctx: &mut Context<'_>
188  ) -> Poll<Self::Output> {
189    let mut inner = self.shared.inner.lock();
190    if inner.owned {
191      //
192      // Must wait for lock to be released
193      //
194
195      // Generate a unique identifier for this waker
196      let id = loop {
197        let id = self.shared.idgen.fetch_add(1, Ordering::SeqCst);
198        // Make sure id is non-zero and unique
199        if id == 0 || inner.wakers.contains_key(&id) {
200          continue;
201        }
202        break id;
203      };
204      inner.wakers.insert(id, ctx.waker().clone());
205      drop(inner);
206
207      // SAFETY: The loop above will guarantee that the id is non-zero
208      self.id = Some(unsafe { NonZeroUsize::new_unchecked(id) });
209
210      Poll::Pending
211    } else {
212      //
213      // Mark lock as owned and return a Lock object.
214      //
215      inner.owned = true;
216      inner.endmsg = None;
217      if let Some(ref r) = self.shared.reporter {
218        r.begin();
219      }
220
221      drop(inner);
222      Poll::Ready(ProcCtx(Arc::clone(&self.shared)))
223    }
224  }
225}
226
227impl Drop for AcquireFuture {
228  /// If this future has generated a `Waker`, then remove it.
229  fn drop(&mut self) {
230    if let Some(id) = self.id {
231      let mut inner = self.shared.inner.lock();
232      // Remove this future's waker
233      let _ = inner.wakers.swap_remove(&id.get());
234    }
235  }
236}
237
238
239/// Object that denotes ownership of a [`ProcSem`] semaphore.
240///
241/// When the process is complete this object must be dropped so the semaphore
242/// can be acquired again.
243#[repr(transparent)]
244pub struct ProcCtx(Arc<Shared>);
245
246
247impl ProcCtx {
248  /// Provide a textual describe the current stage of the process.
249  pub fn action(&self, text: &str) {
250    if let Some(ref r) = self.0.reporter {
251      r.action(text);
252    }
253  }
254
255  /// Set the progress (in percent).
256  pub fn progress(&self, percent: Option<u8>) {
257    if let Some(ref r) = self.0.reporter {
258      r.progress(percent);
259    }
260  }
261
262  /// Set an end-state message.
263  ///
264  /// The purpose of the "end message" is to be able to leave a final "result"
265  /// message that can be read by the `ProcSem`.
266  ///
267  /// Calling this function will not cause the [`StateReporter::end()`] to be
268  /// called.  However, if an end message has been set, a clone if it will be
269  /// passed to the `StateReporter::end()` callback.
270  ///
271  /// The message will be stored in the internal [`ProcSem`] storage
272  /// and can be retrieved using [`ProcSem::endmsg()`].
273  pub fn end(&self, text: &str) {
274    let mut g = self.0.inner.lock();
275    g.endmsg = Some(text.to_string());
276  }
277}
278
279impl Drop for ProcCtx {
280  #[allow(clippy::significant_drop_tightening)]
281  fn drop(&mut self) {
282    if let Some(ref r) = self.0.reporter {
283      let g = self.0.inner.lock();
284      let endmsg = g.endmsg.clone();
285      drop(g);
286      r.end(endmsg);
287    }
288
289    let mut g = self.0.inner.lock();
290    g.owned = false;
291
292    // Wake up all waiting futures.
293    //
294    // In an ideal world only one would be woken up, but there are edge cases
295    // when using multithreaded executors when an await is performed in a
296    // select!{} in which a doomed future might be picked.  In such a case the
297    // "wake" would be wasted on a future that will not be able to process the
298    // wake request.
299    //
300    // To avoid this, _all_ wakers are woken up and they'll have to reenter
301    // pending state if applicable.
302    for (_, w) in g.wakers.drain(..) {
303      w.wake();
304    }
305
306    // Wake a single blocking thread (blocked threads do not suffer from the
307    // same issue).
308    self.0.signal.notify_one();
309  }
310}
311
312// vim: set ft=rust et sw=2 ts=2 sts=2 cinoptions=2 tw=79 :