1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109
use log::{warn, trace}; #[cfg(feature = "tokio-semaphore")] mod tokio_semaphore; #[cfg(feature = "tokio-semaphore")] pub use tokio_semaphore::{ BlockingPermit, BlockingPermitFuture, Semaphore, SyncBlockingPermitFuture, }; #[cfg(not(feature = "tokio-semaphore"))] #[cfg(feature = "futures-intrusive")] mod intrusive; #[cfg(not(feature = "tokio-semaphore"))] #[cfg(feature = "futures-intrusive")] pub use intrusive::{ BlockingPermit, BlockingPermitFuture, Semaphore, SyncBlockingPermitFuture, }; /// Extension trait for uniform construction of the re-exported [`Semaphore`] /// types. pub trait Semaphorish { /// Construct given number of permits. /// /// This chooses _fair_ scheduling, if that is a supported option. fn default_new(permits: usize) -> Self; } // Note: these methods are common to both above struct definitions impl<'a> BlockingPermit<'a> { /// Enter the blocking section of code on the current thread. /// /// This is a secondary step from completion of the /// [`BlockingPermitFuture`] as it must be called on the same thread, /// immediately before the blocking section. The blocking permit should /// then be dropped at the end of the blocking section. If the /// _tokio-threaded_ feature is or might be used, `run` should be /// used instead. /// /// ## Panics /// /// Panics if this `BlockingPermit` has already been entered. pub fn enter(&self) { if self.entered.replace(true) { panic!("BlockingPermit::enter (or run) called twice!"); } } /// Enter and run the blocking closure. /// /// When the *tokio-threaded* feature is enabled, this wraps the /// `tokio::task::block_in_place` call, as a secondary step from completion /// of the [`BlockingPermitFuture`] that must be called on the same /// thread. /// /// In any case, the permit is passed by value and will be dropped on /// termination of this call. /// /// ## Panics /// /// Panics if this `BlockingPermit` has already been entered. pub fn run<F, T>(self, f: F) -> T where F: FnOnce() -> T { if self.entered.replace(true) { panic!("BlockingPermit::run (or enter) called twice!"); } #[cfg(feature="tokio-threaded")] { tokio::task::block_in_place(f) } #[cfg(not(feature="tokio-threaded"))] { f() } } } impl<'a> Drop for BlockingPermit<'a> { fn drop(&mut self) { if self.entered.get() { trace!("Dropped BlockingPermit (semaphore)"); } else { warn!("Dropped BlockingPermit (semaphore) was never entered") } } } /// Request a permit to perform a blocking operation on the current thread. /// /// The returned future attempts to obtain a permit from the provided /// `Semaphore` and outputs a `BlockingPermit` which can then be /// [`run`](BlockingPermit::run) to allow blocking or "long running" /// operation, while the `BlockingPermit` remains in scope. If no permits are /// immediately available, then the current task context will be notified when /// one becomes available. pub fn blocking_permit_future(semaphore: &Semaphore) -> BlockingPermitFuture<'_> { BlockingPermitFuture::new(semaphore) }