blocking_permit/
permit.rs

1use tao_log::{warn, trace};
2
3#[cfg(feature = "tokio-semaphore")]
4mod tokio_semaphore;
5
6#[cfg(feature = "tokio-semaphore")]
7pub use tokio_semaphore::{
8    BlockingPermit,
9    BlockingPermitFuture,
10    Semaphore,
11    SyncBlockingPermitFuture,
12};
13
14#[cfg(not(feature = "tokio-semaphore"))]
15#[cfg(feature = "futures-intrusive")]
16mod intrusive;
17
18#[cfg(not(feature = "tokio-semaphore"))]
19#[cfg(feature = "futures-intrusive")]
20pub use intrusive::{
21    BlockingPermit,
22    BlockingPermitFuture,
23    Semaphore,
24    SyncBlockingPermitFuture,
25};
26
27/// Extension trait for uniform construction of the re-exported [`Semaphore`]
28/// types.
29pub trait Semaphorish {
30    /// Construct given number of permits.
31    ///
32    /// This chooses _fair_ scheduling, if that is a supported option.
33    fn default_new(permits: usize) -> Self;
34}
35
36// Note: these methods are common to both above struct definitions
37
38impl<'a> BlockingPermit<'a> {
39    /// Enter the blocking section of code on the current thread.
40    ///
41    /// This is a secondary step from completion of the
42    /// [`BlockingPermitFuture`] as it must be called on the same thread,
43    /// immediately before the blocking section.  The blocking permit should
44    /// then be dropped at the end of the blocking section. If the
45    /// _tokio-threaded_ feature is or might be used, `run` should be
46    /// used instead.
47    ///
48    /// ## Panics
49    ///
50    /// Panics if this `BlockingPermit` has already been entered.
51    pub fn enter(&self) {
52        if self.entered.replace(true) {
53            panic!("BlockingPermit::enter (or run) called twice!");
54        }
55    }
56
57    /// Enter and run the blocking closure.
58    ///
59    /// When the *tokio-threaded* feature is enabled, this wraps the
60    /// `tokio::task::block_in_place` call, as a secondary step from completion
61    /// of the [`BlockingPermitFuture`] that must be called on the same
62    /// thread.
63    ///
64    /// In any case, the permit is passed by value and will be dropped on
65    /// termination of this call.
66    ///
67    /// ## Panics
68    ///
69    /// Panics if this `BlockingPermit` has already been entered.
70    pub fn run<F, T>(self, f: F) -> T
71        where F: FnOnce() -> T
72    {
73        if self.entered.replace(true) {
74            panic!("BlockingPermit::run (or enter) called twice!");
75        }
76
77        #[cfg(feature="tokio-threaded")] {
78            tokio::task::block_in_place(f)
79        }
80
81        #[cfg(not(feature="tokio-threaded"))] {
82            f()
83        }
84    }
85}
86
87impl<'a> Drop for BlockingPermit<'a> {
88    fn drop(&mut self) {
89        if self.entered.get() {
90            trace!("Dropped BlockingPermit (semaphore)");
91        } else {
92            warn!("Dropped BlockingPermit (semaphore) was never entered")
93        }
94    }
95}
96
97/// Request a permit to perform a blocking operation on the current thread.
98///
99/// The returned future attempts to obtain a permit from the provided
100/// `Semaphore` and outputs a `BlockingPermit` which can then be
101/// [`run`](BlockingPermit::run) to allow blocking or "long running"
102/// operation, while the `BlockingPermit` remains in scope. If no permits are
103/// immediately available, then the current task context will be notified when
104/// one becomes available.
105pub fn blocking_permit_future(semaphore: &Semaphore)
106    -> BlockingPermitFuture<'_>
107{
108    BlockingPermitFuture::new(semaphore)
109}