1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
use log::{warn, trace};

#[cfg(feature = "tokio-semaphore")]
mod tokio_semaphore;

#[cfg(feature = "tokio-semaphore")]
pub use tokio_semaphore::{
    BlockingPermit,
    BlockingPermitFuture,
    Semaphore,
    SyncBlockingPermitFuture,
};

#[cfg(not(feature = "tokio-semaphore"))]
#[cfg(feature = "futures-intrusive")]
mod intrusive;

#[cfg(not(feature = "tokio-semaphore"))]
#[cfg(feature = "futures-intrusive")]
pub use intrusive::{
    BlockingPermit,
    BlockingPermitFuture,
    Semaphore,
    SyncBlockingPermitFuture,
};

/// Extension trait for uniform construction of the re-exported [`Semaphore`]
/// types.
pub trait Semaphorish {
    /// Construct given number of permits.
    ///
    /// This chooses _fair_ scheduling, if that is a supported option.
    fn default_new(permits: usize) -> Self;
}

// Note: these methods are common to both above struct definitions

impl<'a> BlockingPermit<'a> {
    /// Enter the blocking section of code on the current thread.
    ///
    /// This is a secondary step from completion of the
    /// [`BlockingPermitFuture`] as it must be called on the same thread,
    /// immediately before the blocking section.  The blocking permit should
    /// then be dropped at the end of the blocking section. If the
    /// _tokio-threaded_ feature is or might be used, `run` should be
    /// used instead.
    ///
    /// ## Panics
    ///
    /// Panics if this `BlockingPermit` has already been entered.
    pub fn enter(&self) {
        if self.entered.replace(true) {
            panic!("BlockingPermit::enter (or run) called twice!");
        }
    }

    /// Enter and run the blocking closure.
    ///
    /// When the *tokio-threaded* feature is enabled, this wraps the
    /// `tokio::task::block_in_place` call, as a secondary step from completion
    /// of the [`BlockingPermitFuture`] that must be called on the same
    /// thread.
    ///
    /// In any case, the permit is passed by value and will be dropped on
    /// termination of this call.
    ///
    /// ## Panics
    ///
    /// Panics if this `BlockingPermit` has already been entered.
    pub fn run<F, T>(self, f: F) -> T
        where F: FnOnce() -> T
    {
        if self.entered.replace(true) {
            panic!("BlockingPermit::run (or enter) called twice!");
        }

        #[cfg(feature="tokio-threaded")] {
            tokio::task::block_in_place(f)
        }

        #[cfg(not(feature="tokio-threaded"))] {
            f()
        }
    }
}

impl<'a> Drop for BlockingPermit<'a> {
    fn drop(&mut self) {
        if self.entered.get() {
            trace!("Dropped BlockingPermit (semaphore)");
        } else {
            warn!("Dropped BlockingPermit (semaphore) was never entered")
        }
    }
}

/// Request a permit to perform a blocking operation on the current thread.
///
/// The returned future attempts to obtain a permit from the provided
/// `Semaphore` and outputs a `BlockingPermit` which can then be
/// [`run`](BlockingPermit::run) to allow blocking or "long running"
/// operation, while the `BlockingPermit` remains in scope. If no permits are
/// immediately available, then the current task context will be notified when
/// one becomes available.
pub fn blocking_permit_future(semaphore: &Semaphore)
    -> BlockingPermitFuture<'_>
{
    BlockingPermitFuture::new(semaphore)
}