blocking_permit/permit/
intrusive.rs

1use std::cell::Cell;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Mutex, TryLockError};
5use std::task::{Context, Poll};
6
7use futures_core::future::FusedFuture;
8
9use futures_intrusive::sync::{SemaphoreAcquireFuture, SemaphoreReleaser};
10
11pub use futures_intrusive::sync::Semaphore;
12
13use crate::{Canceled, Semaphorish};
14
15/// A scoped permit for blocking operations. When dropped (out of scope or
16/// manually), the permit is released.
17#[must_use = "must call `run` or `enter` before blocking"]
18#[derive(Debug)]
19pub struct BlockingPermit<'a> {
20    #[allow(unused)]
21    releaser: SemaphoreReleaser<'a>,
22    pub(crate) entered: Cell<bool>
23}
24
25/// A future which resolves to a [`BlockingPermit`].
26#[must_use = "futures do nothing unless awaited or polled"]
27#[derive(Debug)]
28pub struct BlockingPermitFuture<'a> {
29    semaphore: &'a Semaphore,
30    acquire: Option<SemaphoreAcquireFuture<'a>>
31}
32
33impl Semaphorish for Semaphore {
34    fn default_new(permits: usize) -> Self {
35        Semaphore::new(true, permits)
36    }
37}
38
39impl<'a> BlockingPermitFuture<'a> {
40    /// Construct given `Semaphore` reference.
41    pub fn new(semaphore: &Semaphore) -> BlockingPermitFuture<'_>
42    {
43        BlockingPermitFuture {
44            semaphore,
45            acquire: None
46        }
47    }
48
49    /// Make a `Sync` version of this future by wrapping with a `Mutex`.
50    pub fn make_sync(self) -> SyncBlockingPermitFuture<'a> {
51        SyncBlockingPermitFuture {
52            futr: Mutex::new(self)
53        }
54    }
55}
56
57impl<'a> Future for BlockingPermitFuture<'a> {
58    type Output = Result<BlockingPermit<'a>, Canceled>;
59
60    // Note that with this implementation, `Canceled` is never returned. For
61    // maximum future flexibilty, however, we keep the error type in place.
62
63    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
64        -> Poll<Self::Output>
65    {
66        let this = unsafe { self.get_unchecked_mut() };
67        let acq = if let Some(ref mut af) = this.acquire {
68            af
69        } else {
70            this.acquire = Some(this.semaphore.acquire(1));
71            this.acquire.as_mut().unwrap()
72        };
73
74        // Safety: In this projection we Pin the underlying Future for the
75        // duration of `poll` and it is not further moved.
76        let acq = unsafe { Pin::new_unchecked(acq) };
77        match acq.poll(cx) {
78            Poll::Pending => Poll::Pending,
79            Poll::Ready(releaser) => Poll::Ready(Ok(BlockingPermit {
80                releaser,
81                entered: Cell::new(false)
82            })),
83       }
84    }
85}
86
87impl<'a> FusedFuture for BlockingPermitFuture<'a> {
88    fn is_terminated(&self) -> bool {
89        if let Some(ref ff) = self.acquire {
90            ff.is_terminated()
91        } else {
92            false
93        }
94    }
95}
96
97/// A `Sync` wrapper available via [`BlockingPermitFuture::make_sync`].
98#[must_use = "futures do nothing unless awaited or polled"]
99#[derive(Debug)]
100pub struct SyncBlockingPermitFuture<'a> {
101    futr: Mutex<BlockingPermitFuture<'a>>
102}
103
104impl<'a> SyncBlockingPermitFuture<'a> {
105    /// Construct given `Semaphore` reference.
106    pub fn new(semaphore: &'a Semaphore) -> SyncBlockingPermitFuture<'a>
107    {
108        SyncBlockingPermitFuture {
109            futr: Mutex::new(BlockingPermitFuture::new(semaphore))
110        }
111    }
112}
113
114impl<'a> Future for SyncBlockingPermitFuture<'a> {
115    type Output = Result<BlockingPermit<'a>, Canceled>;
116
117    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
118        -> Poll<Self::Output>
119    {
120        match self.futr.try_lock() {
121            Ok(mut guard) => {
122                let futr = unsafe { Pin::new_unchecked(&mut *guard) };
123                futr.poll(cx)
124            }
125            Err(TryLockError::Poisoned(_)) => Poll::Ready(Err(Canceled)),
126            Err(TryLockError::WouldBlock) => {
127                cx.waker().wake_by_ref(); //any spin should be brief
128                Poll::Pending
129            }
130        }
131    }
132}