blocking_permit/permit.rs
1use tao_log::{warn, trace};
2
3#[cfg(feature = "tokio-semaphore")]
4mod tokio_semaphore;
5
6#[cfg(feature = "tokio-semaphore")]
7pub use tokio_semaphore::{
8 BlockingPermit,
9 BlockingPermitFuture,
10 Semaphore,
11 SyncBlockingPermitFuture,
12};
13
14#[cfg(not(feature = "tokio-semaphore"))]
15#[cfg(feature = "futures-intrusive")]
16mod intrusive;
17
18#[cfg(not(feature = "tokio-semaphore"))]
19#[cfg(feature = "futures-intrusive")]
20pub use intrusive::{
21 BlockingPermit,
22 BlockingPermitFuture,
23 Semaphore,
24 SyncBlockingPermitFuture,
25};
26
/// Uniform constructor for whichever [`Semaphore`] type this module
/// re-exports.
///
/// Implementing this as an extension trait lets callers build a semaphore the
/// same way regardless of which backing implementation was selected by
/// feature flags.
pub trait Semaphorish {
    /// Create a new instance holding `permits` permits.
    ///
    /// Implementations select _fair_ scheduling when the underlying
    /// semaphore offers that choice.
    fn default_new(permits: usize) -> Self;
}
35
36// Note: these methods are common to both above struct definitions
37
38impl<'a> BlockingPermit<'a> {
39 /// Enter the blocking section of code on the current thread.
40 ///
41 /// This is a secondary step from completion of the
42 /// [`BlockingPermitFuture`] as it must be called on the same thread,
43 /// immediately before the blocking section. The blocking permit should
44 /// then be dropped at the end of the blocking section. If the
45 /// _tokio-threaded_ feature is or might be used, `run` should be
46 /// used instead.
47 ///
48 /// ## Panics
49 ///
50 /// Panics if this `BlockingPermit` has already been entered.
51 pub fn enter(&self) {
52 if self.entered.replace(true) {
53 panic!("BlockingPermit::enter (or run) called twice!");
54 }
55 }
56
57 /// Enter and run the blocking closure.
58 ///
59 /// When the *tokio-threaded* feature is enabled, this wraps the
60 /// `tokio::task::block_in_place` call, as a secondary step from completion
61 /// of the [`BlockingPermitFuture`] that must be called on the same
62 /// thread.
63 ///
64 /// In any case, the permit is passed by value and will be dropped on
65 /// termination of this call.
66 ///
67 /// ## Panics
68 ///
69 /// Panics if this `BlockingPermit` has already been entered.
70 pub fn run<F, T>(self, f: F) -> T
71 where F: FnOnce() -> T
72 {
73 if self.entered.replace(true) {
74 panic!("BlockingPermit::run (or enter) called twice!");
75 }
76
77 #[cfg(feature="tokio-threaded")] {
78 tokio::task::block_in_place(f)
79 }
80
81 #[cfg(not(feature="tokio-threaded"))] {
82 f()
83 }
84 }
85}
86
87impl<'a> Drop for BlockingPermit<'a> {
88 fn drop(&mut self) {
89 if self.entered.get() {
90 trace!("Dropped BlockingPermit (semaphore)");
91 } else {
92 warn!("Dropped BlockingPermit (semaphore) was never entered")
93 }
94 }
95}
96
97/// Request a permit to perform a blocking operation on the current thread.
98///
99/// The returned future attempts to obtain a permit from the provided
100/// `Semaphore` and outputs a `BlockingPermit` which can then be
101/// [`run`](BlockingPermit::run) to allow blocking or "long running"
102/// operation, while the `BlockingPermit` remains in scope. If no permits are
103/// immediately available, then the current task context will be notified when
104/// one becomes available.
105pub fn blocking_permit_future(semaphore: &Semaphore)
106 -> BlockingPermitFuture<'_>
107{
108 BlockingPermitFuture::new(semaphore)
109}