permit/lib.rs
1//! [](https://crates.io/crates/permit)
2//! [](https://gitlab.com/leonhard-llc/ops/-/raw/main/permit/LICENSE)
3//! [](https://github.com/rust-secure-code/safety-dance/)
4//! [](https://gitlab.com/leonhard-llc/ops/-/pipelines)
5//!
6//! [`permit::Permit`](https://docs.rs/permit/latest/permit/struct.Permit.html)
7//! is a struct for cancelling operations.
8//!
9//! # Use Cases
10//! - Graceful server shutdown
11//! - Cancel operations that take too long
12//! - Stop in-flight operations when revoking authorization
13//!
14//! # Features
15//! - Subordinate permits.
16//! Revoking a permit also revokes its subordinates, recursively.
17//! - Drop a permit to revoke its subordinates, recursively.
18//! - Wait for all subordinate permits to drop.
19//! - Implements `Future`. You can `await` a permit and return when it is revoked.
20//! - Similar to Golang's [`context`](https://golang.org/pkg/context/)
21//! - Depends only on `std`.
22//! - `forbid(unsafe_code)`
23//! - 100% test coverage
24//!
25//! # Limitations
26//! - Does not hold data values
27//! - Allocates. Uses [`alloc::sync::Arc`](https://doc.rust-lang.org/alloc/sync/struct.Arc.html).
28//!
29//! # Alternatives
30//! - [`async_ctx`](https://crates.io/crates/async_ctx)
31//! - Good API
32//! - Async only
33//! - [`stopper`](https://crates.io/crates/stopper/)
34//! - Async only
35//! - [`io-context`](https://crates.io/crates/io-context)
36//! - Holds [Any](https://doc.rust-lang.org/core/any/trait.Any.html) values
37//! - Unmaintained
38//! - [`ctx`](https://crates.io/crates/ctx)
39//! - Holds an [Any](https://doc.rust-lang.org/core/any/trait.Any.html) value
40//! - API is a direct copy of Golang's
41//! [`context`](https://golang.org/pkg/context/),
42//! even where that doesn't make sense for Rust.
43//! For example, to cancel, one must copy the context and call
44//! a returned `Box<Fn>`.
45//! - Unmaintained
46//!
47//! # Related Crates
48//!
49//! # Example
50//!
51//! Graceful shutdown:
52//! ```
53//! # use core::time::Duration;
54//! # fn wait_for_shutdown_signal() { () }
55//! let top_permit = permit::Permit::new();
56//! // Start some worker threads.
57//! for _ in 0..5 {
58//! let permit = top_permit.new_sub();
59//! std::thread::spawn(move || {
60//! while !permit.is_revoked() {
61//! // ...
62//! # std::thread::sleep(Duration::from_millis(1));
63//! }
64//! });
65//! }
66//! wait_for_shutdown_signal();
67//! // Revoke all thread permits and wait for them to
68//! // finish and drop their permits.
69//! top_permit
70//! .revoke()
71//! .wait_subs_timeout(Duration::from_secs(3))
72//! .unwrap();
73//! ```
74//!
75//! # Cargo Geiger Safety Report
76//! # Changelog
77//! - v0.2.1 - Fix bug where `sleep` and `sleep_until` would sometimes not return early.
78//! - v0.2.0
79//! - Rename `try_wait_for` to `wait_subs_timeout`
80//! - Rename `try_wait_until` to `wait_subs_deadline`
81//! - Replace spinlock with Condvar in `wait*` methods
82//! - Remove `wait`
83//! - Add `sleep` and `sleep_until`
84//! - v0.1.5 - Implement `Debug`
85//! - v0.1.4 - Fix [bug](https://gitlab.com/leonhard-llc/ops/-/issues/2)
86//! where `revoke()` and then `wait()` would not wait.
87//! - v0.1.3
88//! - Don't keep or wake stale
89//! [`std::task::Waker`](https://doc.rust-lang.org/std/task/struct.Waker.html) structs.
90//! - Eliminate race that causes unnecessary wake.
91//! - v0.1.2 - Implement `Future`
92//! - v0.1.1 - Make `revoke` return `&Self`
93//! - v0.1.0 - Initial version
94#![forbid(unsafe_code)]
95use core::fmt::{Debug, Formatter};
96use core::hash::{Hash, Hasher};
97use core::pin::Pin;
98use core::sync::atomic::AtomicBool;
99use core::task::{Context, Poll, Waker};
100use core::time::Duration;
101use std::collections::HashSet;
102use std::future::Future;
103use std::sync::{Arc, Condvar, Mutex, Weak};
104use std::time::Instant;
105
106// This code was beautiful before implementing `Future`:
107// https://gitlab.com/leonhard-llc/ops/-/blob/26adc04aec12ac083fda358f176f0ef5130cda60/permit/src/lib.rs
108//
109// How can we simplify it?
110
111#[derive(Clone)]
112struct ArcNode(Arc<Node>);
113impl PartialEq for ArcNode {
114 fn eq(&self, other: &Self) -> bool {
115 Arc::as_ptr(&self.0).eq(&Arc::as_ptr(&other.0))
116 }
117}
118impl Eq for ArcNode {}
119impl Hash for ArcNode {
120 fn hash<H: Hasher>(&self, state: &mut H) {
121 Arc::as_ptr(&self.0).hash(state);
122 }
123}
124// impl Debug for ArcNode {
125// fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
126// write!(f, "ArcNode({:?})", Arc::as_ptr(&self.0))
127// }
128// }
129
130// #[derive(Debug)]
131struct Inner {
132 revoked: bool,
133 opt_waker: Option<Waker>,
134 subs: HashSet<ArcNode>,
135}
136impl Inner {
137 #[must_use]
138 pub fn new(revoked: bool) -> Self {
139 Inner {
140 revoked,
141 opt_waker: None,
142 subs: HashSet::new(),
143 }
144 }
145
146 pub fn add_sub(&mut self, node: &Arc<Node>) {
147 if !self.revoked {
148 self.subs.insert(ArcNode(Arc::clone(node)));
149 }
150 }
151
152 pub fn remove_sub(&mut self, node: &Arc<Node>) {
153 let arc_node = ArcNode(Arc::clone(node));
154 self.subs.remove(&arc_node);
155 }
156
157 pub fn poll(&mut self, cx: &mut Context<'_>) -> Poll<()> {
158 if self.revoked {
159 Poll::Ready(())
160 } else {
161 self.opt_waker = Some(cx.waker().clone());
162 Poll::Pending
163 }
164 }
165
166 pub fn revoke(&mut self) -> (Option<Waker>, Vec<ArcNode>) {
167 self.revoked = true;
168 (self.opt_waker.take(), self.subs.iter().cloned().collect())
169 }
170}
171
172// #[derive(Debug)]
173struct Node {
174 superior: Weak<Node>,
175 atomic_revoked: AtomicBool,
176 inner: Mutex<Inner>,
177 condvar: Condvar,
178}
179impl Node {
180 #[must_use]
181 pub fn new(revoked: bool, superior: Weak<Self>) -> Self {
182 Node {
183 superior,
184 atomic_revoked: AtomicBool::new(revoked),
185 inner: Mutex::new(Inner::new(revoked)),
186 condvar: Condvar::new(),
187 }
188 }
189
190 #[must_use]
191 pub fn new_apex() -> Self {
192 Self::new(false, Weak::new())
193 }
194
195 #[must_use]
196 pub fn new_sub(self: &Arc<Self>) -> Arc<Self> {
197 let node = Arc::new(Self::new(self.is_revoked(), Arc::downgrade(self)));
198 self.inner.lock().unwrap().add_sub(&node);
199 node
200 }
201
202 #[must_use]
203 pub fn new_clone(self: &Arc<Self>) -> Arc<Self> {
204 let node = Arc::new(Self::new(self.is_revoked(), Weak::clone(&self.superior)));
205 if let Some(superior) = self.superior.upgrade() {
206 superior.add_sub(&node);
207 }
208 node
209 }
210
211 pub fn add_sub(self: &Arc<Self>, node: &Arc<Node>) {
212 self.inner.lock().unwrap().add_sub(node);
213 }
214
215 fn remove_sub(&self, node: &Arc<Node>) {
216 self.inner.lock().unwrap().remove_sub(node);
217 self.condvar.notify_all();
218 }
219
220 #[must_use]
221 pub fn is_revoked(&self) -> bool {
222 self.atomic_revoked
223 .load(std::sync::atomic::Ordering::Relaxed)
224 }
225
226 pub fn poll(self: &Arc<Self>, cx: &mut Context<'_>) -> Poll<()> {
227 self.inner.lock().unwrap().poll(cx)
228 }
229
230 fn revoke(self: &Arc<Self>, wake: bool) {
231 self.atomic_revoked
232 .store(true, std::sync::atomic::Ordering::Relaxed);
233 let (opt_waker, subs) = self.inner.lock().unwrap().revoke();
234 self.condvar.notify_all();
235 if wake {
236 if let Some(waker) = opt_waker {
237 waker.wake();
238 }
239 }
240 for sub in subs {
241 sub.0.revoke(true);
242 }
243 }
244
245 pub fn revoke_and_remove_from_superior(self: &Arc<Self>) {
246 if let Some(superior) = self.superior.upgrade() {
247 superior.remove_sub(self);
248 }
249 self.revoke(false);
250 }
251}
252
253#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
254pub struct DeadlineExceeded;
255impl core::fmt::Display for DeadlineExceeded {
256 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
257 write!(f, "DeadlineExceeded")
258 }
259}
260impl std::error::Error for DeadlineExceeded {}
261
262#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash)]
263pub struct PermitRevoked;
264impl core::fmt::Display for PermitRevoked {
265 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
266 write!(f, "PermitRevoked")
267 }
268}
269impl std::error::Error for PermitRevoked {}
270
271/// A struct for cancelling operations.
272///
273/// Use [`new_sub()`](#method.new_sub) to make a subordinate permit.
274/// Call [`revoke()`](#method.revoke) to revoke a permit
275/// and its subordinate permits, recursively.
276///
277/// # Example
278///
279/// Graceful shutdown:
280/// ```
281/// # fn wait_for_shutdown_signal() { () }
282/// let top_permit = permit::Permit::new();
283/// // Start some worker threads.
284/// for _ in 0..5 {
285/// let permit = top_permit.new_sub();
286/// std::thread::spawn(move || {
287/// while !permit.is_revoked() {
288/// // ...
289/// # std::thread::sleep(core::time::Duration::from_millis(1));
290/// }
291/// });
292/// }
293/// wait_for_shutdown_signal();
294/// // Revoke all thread permits and wait for them to
295/// // finish and drop their permits.
296/// top_permit
297/// .revoke()
298/// .wait_subs_timeout(core::time::Duration::from_secs(3))
299/// .unwrap();
300/// ```
301pub struct Permit {
302 node: Arc<Node>,
303}
304impl Permit {
305 /// Makes a new permit.
306 ///
307 /// This permit is not subordinate to any other permit.
308 /// It has no superior.
309 ///
310 /// Dropping the permit revokes it and any subordinate permits.
311 #[must_use]
312 pub fn new() -> Self {
313 Self {
314 node: Arc::new(Node::new_apex()),
315 }
316 }
317
318 /// Make a new permit that is subordinate to this permit.
319 ///
320 /// Call [`revoke()`](#method.revoke) to revoke a permit
321 /// and its subordinate permits, recursively.
322 ///
323 /// Dropping the permit revokes it and any subordinate permits.
324 #[must_use]
325 pub fn new_sub(&self) -> Self {
326 Self {
327 node: self.node.new_sub(),
328 }
329 }
330
331 /// Returns `true` if [`revoke()`](#method.revoke) has previously been called
332 /// on this permit or any of its superiors.
333 #[must_use]
334 pub fn is_revoked(&self) -> bool {
335 self.node.is_revoked()
336 }
337
338 /// Returns `Some(())` if [`revoke()`](#method.revoke) has not been called
339 /// on this permit or any of its superiors.
340 #[must_use]
341 pub fn ok(&self) -> Option<()> {
342 if self.node.is_revoked() {
343 None
344 } else {
345 Some(())
346 }
347 }
348
349 /// Revokes this permit and all subordinate permits.
350 #[allow(clippy::must_use_candidate)]
351 pub fn revoke(&self) -> &Self {
352 self.node.revoke_and_remove_from_superior();
353 self
354 }
355
356 /// Returns `true` if this permit has any subordinate permits that have not
357 /// been dropped.
358 ///
359 /// This includes direct subordinates and their subordinates, recursively.
360 #[allow(clippy::missing_panics_doc)]
361 #[must_use]
362 pub fn has_subs(&self) -> bool {
363 !self.node.inner.lock().unwrap().subs.is_empty()
364 }
365
366 /// Waits until `duration` time passes or the permit is revoked.
367 ///
368 /// # Errors
369 /// Returns `Err` when the permit is revoked.
370 #[allow(clippy::missing_panics_doc)]
371 pub fn sleep(&self, duration: Duration) -> Result<(), PermitRevoked> {
372 let inner_guard = self.node.inner.lock().unwrap();
373 let (_guard, wait_result) = self
374 .node
375 .condvar
376 .wait_timeout_while(inner_guard, duration, |inner_guard| !inner_guard.revoked)
377 .unwrap();
378 if wait_result.timed_out() {
379 Ok(())
380 } else {
381 Err(PermitRevoked {})
382 }
383 }
384
385 /// Waits until `deadline` or the permit is revoked.
386 ///
387 /// # Errors
388 /// Returns `Err` when the permit is revoked.
389 pub fn sleep_until(&self, deadline: Instant) -> Result<(), PermitRevoked> {
390 let duration = deadline.saturating_duration_since(Instant::now());
391 self.sleep(duration)
392 }
393
394 /// Waits for all direct subordinate permits to drop.
395 ///
396 /// # Errors
397 /// Returns [`DeadlineExceeded`](struct.DeadlineExceeded.html) when `duration` passes
398 /// and the permit has a subordinate permit.
399 #[allow(clippy::missing_panics_doc)]
400 pub fn wait_subs_timeout(&self, duration: Duration) -> Result<(), DeadlineExceeded> {
401 let guard = self.node.inner.lock().unwrap();
402 let (_guard, wait_result) = self
403 .node
404 .condvar
405 .wait_timeout_while(guard, duration, |guard| !guard.subs.is_empty())
406 .unwrap();
407 if wait_result.timed_out() {
408 Err(DeadlineExceeded {})
409 } else {
410 Ok(())
411 }
412 }
413
414 /// Waits for all direct subordinate permits to drop.
415 ///
416 /// # Errors
417 /// Returns [`DeadlineExceeded`](struct.DeadlineExceeded.html) when `deadline` passes
418 /// and the permit has a subordinate permit.
419 pub fn wait_subs_deadline(&self, deadline: Instant) -> Result<(), DeadlineExceeded> {
420 let duration = deadline.saturating_duration_since(Instant::now());
421 self.wait_subs_timeout(duration)
422 }
423}
424impl Drop for Permit {
425 fn drop(&mut self) {
426 self.node.revoke_and_remove_from_superior();
427 }
428}
429impl Clone for Permit {
430 fn clone(&self) -> Self {
431 Self {
432 node: self.node.new_clone(),
433 }
434 }
435}
436impl Debug for Permit {
437 fn fmt(&self, f: &mut Formatter<'_>) -> Result<(), core::fmt::Error> {
438 write!(
439 f,
440 "Permit{{revoked={},num_subs={}}}",
441 self.is_revoked(),
442 Arc::weak_count(&self.node)
443 )
444 }
445}
446impl Default for Permit {
447 fn default() -> Self {
448 Self::new()
449 }
450}
451impl Future for Permit {
452 type Output = ();
453
454 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
455 self.node.poll(cx)
456 }
457}