permit/
lib.rs

1//! [![crates.io version](https://img.shields.io/crates/v/permit.svg)](https://crates.io/crates/permit)
2//! [![license: Apache 2.0](https://gitlab.com/leonhard-llc/ops/-/raw/main/license-apache-2.0.svg)](https://gitlab.com/leonhard-llc/ops/-/raw/main/permit/LICENSE)
3//! [![unsafe forbidden](https://gitlab.com/leonhard-llc/ops/-/raw/main/unsafe-forbidden.svg)](https://github.com/rust-secure-code/safety-dance/)
4//! [![pipeline status](https://gitlab.com/leonhard-llc/ops/badges/main/pipeline.svg)](https://gitlab.com/leonhard-llc/ops/-/pipelines)
5//!
6//! [`permit::Permit`](https://docs.rs/permit/latest/permit/struct.Permit.html)
7//! is a struct for cancelling operations.
8//!
9//! # Use Cases
10//! - Graceful server shutdown
11//! - Cancel operations that take too long
12//! - Stop in-flight operations when revoking authorization
13//!
14//! # Features
15//! - Subordinate permits.
16//!   Revoking a permit also revokes its subordinates, recursively.
17//! - Drop a permit to revoke its subordinates, recursively.
18//! - Wait for all subordinate permits to drop.
19//! - Implements `Future`.  You can `await` a permit and return when it is revoked.
20//! - Similar to Golang's [`context`](https://golang.org/pkg/context/)
21//! - Depends only on `std`.
22//! - `forbid(unsafe_code)`
23//! - 100% test coverage
24//!
25//! # Limitations
26//! - Does not hold data values
27//! - Allocates.  Uses [`alloc::sync::Arc`](https://doc.rust-lang.org/alloc/sync/struct.Arc.html).
28//!
29//! # Alternatives
30//! - [`async_ctx`](https://crates.io/crates/async_ctx)
31//!   - Good API
32//!   - Async only
33//! - [`stopper`](https://crates.io/crates/stopper/)
34//!   - Async only
35//! - [`io-context`](https://crates.io/crates/io-context)
36//!   - Holds [Any](https://doc.rust-lang.org/core/any/trait.Any.html) values
37//!   - Unmaintained
38//! - [`ctx`](https://crates.io/crates/ctx)
39//!   - Holds an [Any](https://doc.rust-lang.org/core/any/trait.Any.html) value
40//!   - API is a direct copy of Golang's
41//!     [`context`](https://golang.org/pkg/context/),
42//!     even where that doesn't make sense for Rust.
43//!     For example, to cancel, one must copy the context and call
44//!     a returned `Box<Fn>`.
45//!   - Unmaintained
46//!
47//! # Related Crates
48//!
49//! # Example
50//!
51//! Graceful shutdown:
52//! ```
53//! # use core::time::Duration;
54//! # fn wait_for_shutdown_signal() { () }
55//! let top_permit = permit::Permit::new();
56//! // Start some worker threads.
57//! for _ in 0..5 {
58//!     let permit = top_permit.new_sub();
59//!     std::thread::spawn(move || {
60//!         while !permit.is_revoked() {
61//!             // ...
62//! #           std::thread::sleep(Duration::from_millis(1));
63//!         }
64//!     });
65//! }
66//! wait_for_shutdown_signal();
67//! // Revoke all thread permits and wait for them to
68//! // finish and drop their permits.
69//! top_permit
70//!     .revoke()
71//!     .wait_subs_timeout(Duration::from_secs(3))
72//!     .unwrap();
73//! ```
74//!
75//! # Cargo Geiger Safety Report
76//! # Changelog
77//! - v0.2.1 - Fix bug where `sleep` and `sleep_until` would sometimes not return early.
78//! - v0.2.0
79//!    - Rename `try_wait_for` to `wait_subs_timeout`
80//!    - Rename `try_wait_until` to `wait_subs_deadline`
81//!    - Replace spinlock with Condvar in `wait*` methods
82//!    - Remove `wait`
83//!    - Add `sleep` and `sleep_until`
84//! - v0.1.5 - Implement `Debug`
85//! - v0.1.4 - Fix [bug](https://gitlab.com/leonhard-llc/ops/-/issues/2)
86//!   where `revoke()` and then `wait()` would not wait.
87//! - v0.1.3
88//!   - Don't keep or wake stale
89//!     [`std::task::Waker`](https://doc.rust-lang.org/std/task/struct.Waker.html) structs.
90//!   - Eliminate race that causes unnecessary wake.
91//! - v0.1.2 - Implement `Future`
92//! - v0.1.1 - Make `revoke` return `&Self`
93//! - v0.1.0 - Initial version
94#![forbid(unsafe_code)]
95use core::fmt::{Debug, Formatter};
96use core::hash::{Hash, Hasher};
97use core::pin::Pin;
98use core::sync::atomic::AtomicBool;
99use core::task::{Context, Poll, Waker};
100use core::time::Duration;
101use std::collections::HashSet;
102use std::future::Future;
103use std::sync::{Arc, Condvar, Mutex, Weak};
104use std::time::Instant;
105
106// This code was beautiful before implementing `Future`:
107// https://gitlab.com/leonhard-llc/ops/-/blob/26adc04aec12ac083fda358f176f0ef5130cda60/permit/src/lib.rs
108//
109// How can we simplify it?
110
111#[derive(Clone)]
112struct ArcNode(Arc<Node>);
113impl PartialEq for ArcNode {
114    fn eq(&self, other: &Self) -> bool {
115        Arc::as_ptr(&self.0).eq(&Arc::as_ptr(&other.0))
116    }
117}
118impl Eq for ArcNode {}
119impl Hash for ArcNode {
120    fn hash<H: Hasher>(&self, state: &mut H) {
121        Arc::as_ptr(&self.0).hash(state);
122    }
123}
124// impl Debug for ArcNode {
125//     fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
126//         write!(f, "ArcNode({:?})", Arc::as_ptr(&self.0))
127//     }
128// }
129
130// #[derive(Debug)]
131struct Inner {
132    revoked: bool,
133    opt_waker: Option<Waker>,
134    subs: HashSet<ArcNode>,
135}
136impl Inner {
137    #[must_use]
138    pub fn new(revoked: bool) -> Self {
139        Inner {
140            revoked,
141            opt_waker: None,
142            subs: HashSet::new(),
143        }
144    }
145
146    pub fn add_sub(&mut self, node: &Arc<Node>) {
147        if !self.revoked {
148            self.subs.insert(ArcNode(Arc::clone(node)));
149        }
150    }
151
152    pub fn remove_sub(&mut self, node: &Arc<Node>) {
153        let arc_node = ArcNode(Arc::clone(node));
154        self.subs.remove(&arc_node);
155    }
156
157    pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
158        if self.revoked {
159            Poll::Ready(())
160        } else {
161            self.opt_waker = Some(cx.waker().clone());
162            Poll::Pending
163        }
164    }
165
166    pub fn revoke(&mut self) -> (Option<Waker>, Vec<ArcNode>) {
167        self.revoked = true;
168        (self.opt_waker.take(), self.subs.iter().cloned().collect())
169    }
170}
171
172// #[derive(Debug)]
173struct Node {
174    superior: Weak<Node>,
175    atomic_revoked: AtomicBool,
176    inner: Mutex<Inner>,
177    condvar: Condvar,
178}
179impl Node {
180    #[must_use]
181    pub fn new(revoked: bool, superior: Weak<Self>) -> Self {
182        Node {
183            superior,
184            atomic_revoked: AtomicBool::new(revoked),
185            inner: Mutex::new(Inner::new(revoked)),
186            condvar: Condvar::new(),
187        }
188    }
189
190    #[must_use]
191    pub fn new_apex() -> Self {
192        Self::new(false, Weak::new())
193    }
194
195    #[must_use]
196    pub fn new_sub(self: &Arc<Self>) -> Arc<Self> {
197        let node = Arc::new(Self::new(self.is_revoked(), Arc::downgrade(self)));
198        self.inner.lock().unwrap().add_sub(&node);
199        node
200    }
201
202    #[must_use]
203    pub fn new_clone(self: &Arc<Self>) -> Arc<Self> {
204        let node = Arc::new(Self::new(self.is_revoked(), Weak::clone(&self.superior)));
205        if let Some(superior) = self.superior.upgrade() {
206            superior.add_sub(&node);
207        }
208        node
209    }
210
211    pub fn add_sub(self: &Arc<Self>, node: &Arc<Node>) {
212        self.inner.lock().unwrap().add_sub(node);
213    }
214
215    fn remove_sub(&self, node: &Arc<Node>) {
216        self.inner.lock().unwrap().remove_sub(node);
217        self.condvar.notify_all();
218    }
219
220    #[must_use]
221    pub fn is_revoked(&self) -> bool {
222        self.atomic_revoked
223            .load(std::sync::atomic::Ordering::Relaxed)
224    }
225
226    pub fn poll(self: &Arc<Self>, cx: &mut Context<'_>) -> Poll<()> {
227        self.inner.lock().unwrap().poll(cx)
228    }
229
230    fn revoke(self: &Arc<Self>, wake: bool) {
231        self.atomic_revoked
232            .store(true, std::sync::atomic::Ordering::Relaxed);
233        let (opt_waker, subs) = self.inner.lock().unwrap().revoke();
234        self.condvar.notify_all();
235        if wake {
236            if let Some(waker) = opt_waker {
237                waker.wake();
238            }
239        }
240        for sub in subs {
241            sub.0.revoke(true);
242        }
243    }
244
245    pub fn revoke_and_remove_from_superior(self: &Arc<Self>) {
246        if let Some(superior) = self.superior.upgrade() {
247            superior.remove_sub(self);
248        }
249        self.revoke(false);
250    }
251}
252
/// Error returned when a wait ends because its time limit passed.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct DeadlineExceeded;
impl core::fmt::Display for DeadlineExceeded {
    /// Writes the fixed text `DeadlineExceeded`.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        f.write_str("DeadlineExceeded")
    }
}
impl std::error::Error for DeadlineExceeded {}
261
/// Error returned when an operation ends because the permit was revoked.
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PermitRevoked;
impl core::fmt::Display for PermitRevoked {
    /// Writes the fixed text `PermitRevoked`.
    fn fmt(&self, f: &mut Formatter<'_>) -> core::fmt::Result {
        f.write_str("PermitRevoked")
    }
}
impl std::error::Error for PermitRevoked {}
270
271/// A struct for cancelling operations.
272///
273/// Use [`new_sub()`](#method.new_sub) to make a subordinate permit.
274/// Call [`revoke()`](#method.revoke) to revoke a permit
275/// and its subordinate permits, recursively.
276///
277/// # Example
278///
279/// Graceful shutdown:
280/// ```
281/// # fn wait_for_shutdown_signal() { () }
282/// let top_permit = permit::Permit::new();
283/// // Start some worker threads.
284/// for _ in 0..5 {
285///     let permit = top_permit.new_sub();
286///     std::thread::spawn(move || {
287///         while !permit.is_revoked() {
288///             // ...
289/// #           std::thread::sleep(core::time::Duration::from_millis(1));
290///         }
291///     });
292/// }
293/// wait_for_shutdown_signal();
294/// // Revoke all thread permits and wait for them to
295/// // finish and drop their permits.
296/// top_permit
297///     .revoke()
298///     .wait_subs_timeout(core::time::Duration::from_secs(3))
299///     .unwrap();
300/// ```
301pub struct Permit {
302    node: Arc<Node>,
303}
304impl Permit {
305    /// Makes a new permit.
306    ///
307    /// This permit is not subordinate to any other permit.
308    /// It has no superior.
309    ///
310    /// Dropping the permit revokes it and any subordinate permits.
311    #[must_use]
312    pub fn new() -> Self {
313        Self {
314            node: Arc::new(Node::new_apex()),
315        }
316    }
317
318    /// Make a new permit that is subordinate to this permit.
319    ///
320    /// Call [`revoke()`](#method.revoke) to revoke a permit
321    /// and its subordinate permits, recursively.
322    ///
323    /// Dropping the permit revokes it and any subordinate permits.
324    #[must_use]
325    pub fn new_sub(&self) -> Self {
326        Self {
327            node: self.node.new_sub(),
328        }
329    }
330
331    /// Returns `true` if [`revoke()`](#method.revoke) has previously been called
332    /// on this permit or any of its superiors.
333    #[must_use]
334    pub fn is_revoked(&self) -> bool {
335        self.node.is_revoked()
336    }
337
338    /// Returns `Some(())` if [`revoke()`](#method.revoke) has not been called
339    /// on this permit or any of its superiors.
340    #[must_use]
341    pub fn ok(&self) -> Option<()> {
342        if self.node.is_revoked() {
343            None
344        } else {
345            Some(())
346        }
347    }
348
349    /// Revokes this permit and all subordinate permits.
350    #[allow(clippy::must_use_candidate)]
351    pub fn revoke(&self) -> &Self {
352        self.node.revoke_and_remove_from_superior();
353        self
354    }
355
356    /// Returns `true` if this permit has any subordinate permits that have not
357    /// been dropped.
358    ///
359    /// This includes direct subordinates and their subordinates, recursively.
360    #[allow(clippy::missing_panics_doc)]
361    #[must_use]
362    pub fn has_subs(&self) -> bool {
363        !self.node.inner.lock().unwrap().subs.is_empty()
364    }
365
366    /// Waits until `duration` time passes or the permit is revoked.
367    ///
368    /// # Errors
369    /// Returns `Err` when the permit is revoked.
370    #[allow(clippy::missing_panics_doc)]
371    pub fn sleep(&self, duration: Duration) -> Result<(), PermitRevoked> {
372        let inner_guard = self.node.inner.lock().unwrap();
373        let (_guard, wait_result) = self
374            .node
375            .condvar
376            .wait_timeout_while(inner_guard, duration, |inner_guard| !inner_guard.revoked)
377            .unwrap();
378        if wait_result.timed_out() {
379            Ok(())
380        } else {
381            Err(PermitRevoked {})
382        }
383    }
384
385    /// Waits until `deadline` or the permit is revoked.
386    ///
387    /// # Errors
388    /// Returns `Err` when the permit is revoked.
389    pub fn sleep_until(&self, deadline: Instant) -> Result<(), PermitRevoked> {
390        let duration = deadline.saturating_duration_since(Instant::now());
391        self.sleep(duration)
392    }
393
394    /// Waits for all direct subordinate permits to drop.
395    ///
396    /// # Errors
397    /// Returns [`DeadlineExceeded`](struct.DeadlineExceeded.html) when `duration` passes
398    /// and the permit has a subordinate permit.
399    #[allow(clippy::missing_panics_doc)]
400    pub fn wait_subs_timeout(&self, duration: Duration) -> Result<(), DeadlineExceeded> {
401        let guard = self.node.inner.lock().unwrap();
402        let (_guard, wait_result) = self
403            .node
404            .condvar
405            .wait_timeout_while(guard, duration, |guard| !guard.subs.is_empty())
406            .unwrap();
407        if wait_result.timed_out() {
408            Err(DeadlineExceeded {})
409        } else {
410            Ok(())
411        }
412    }
413
414    /// Waits for all direct subordinate permits to drop.
415    ///
416    /// # Errors
417    /// Returns [`DeadlineExceeded`](struct.DeadlineExceeded.html) when `deadline` passes
418    /// and the permit has a subordinate permit.
419    pub fn wait_subs_deadline(&self, deadline: Instant) -> Result<(), DeadlineExceeded> {
420        let duration = deadline.saturating_duration_since(Instant::now());
421        self.wait_subs_timeout(duration)
422    }
423}
424impl Drop for Permit {
425    fn drop(&mut self) {
426        self.node.revoke_and_remove_from_superior();
427    }
428}
429impl Clone for Permit {
430    fn clone(&self) -> Self {
431        Self {
432            node: self.node.new_clone(),
433        }
434    }
435}
436impl Debug for Permit {
437    fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
438        write!(
439            f,
440            "Permit{{revoked={},num_subs={}}}",
441            self.is_revoked(),
442            Arc::weak_count(&self.node)
443        )
444    }
445}
446impl Default for Permit {
447    fn default() -> Self {
448        Self::new()
449    }
450}
451impl Future for Permit {
452    type Output = ();
453
454    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
455        self.node.poll(cx)
456    }
457}