blocking_permit/permit/
intrusive.rs1use std::cell::Cell;
2use std::future::Future;
3use std::pin::Pin;
4use std::sync::{Mutex, TryLockError};
5use std::task::{Context, Poll};
6
7use futures_core::future::FusedFuture;
8
9use futures_intrusive::sync::{SemaphoreAcquireFuture, SemaphoreReleaser};
10
11pub use futures_intrusive::sync::Semaphore;
12
13use crate::{Canceled, Semaphorish};
14
15#[must_use = "must call `run` or `enter` before blocking"]
18#[derive(Debug)]
19pub struct BlockingPermit<'a> {
20 #[allow(unused)]
21 releaser: SemaphoreReleaser<'a>,
22 pub(crate) entered: Cell<bool>
23}
24
25#[must_use = "futures do nothing unless awaited or polled"]
27#[derive(Debug)]
28pub struct BlockingPermitFuture<'a> {
29 semaphore: &'a Semaphore,
30 acquire: Option<SemaphoreAcquireFuture<'a>>
31}
32
33impl Semaphorish for Semaphore {
34 fn default_new(permits: usize) -> Self {
35 Semaphore::new(true, permits)
36 }
37}
38
39impl<'a> BlockingPermitFuture<'a> {
40 pub fn new(semaphore: &Semaphore) -> BlockingPermitFuture<'_>
42 {
43 BlockingPermitFuture {
44 semaphore,
45 acquire: None
46 }
47 }
48
49 pub fn make_sync(self) -> SyncBlockingPermitFuture<'a> {
51 SyncBlockingPermitFuture {
52 futr: Mutex::new(self)
53 }
54 }
55}
56
57impl<'a> Future for BlockingPermitFuture<'a> {
58 type Output = Result<BlockingPermit<'a>, Canceled>;
59
60 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
64 -> Poll<Self::Output>
65 {
66 let this = unsafe { self.get_unchecked_mut() };
67 let acq = if let Some(ref mut af) = this.acquire {
68 af
69 } else {
70 this.acquire = Some(this.semaphore.acquire(1));
71 this.acquire.as_mut().unwrap()
72 };
73
74 let acq = unsafe { Pin::new_unchecked(acq) };
77 match acq.poll(cx) {
78 Poll::Pending => Poll::Pending,
79 Poll::Ready(releaser) => Poll::Ready(Ok(BlockingPermit {
80 releaser,
81 entered: Cell::new(false)
82 })),
83 }
84 }
85}
86
87impl<'a> FusedFuture for BlockingPermitFuture<'a> {
88 fn is_terminated(&self) -> bool {
89 if let Some(ref ff) = self.acquire {
90 ff.is_terminated()
91 } else {
92 false
93 }
94 }
95}
96
97#[must_use = "futures do nothing unless awaited or polled"]
99#[derive(Debug)]
100pub struct SyncBlockingPermitFuture<'a> {
101 futr: Mutex<BlockingPermitFuture<'a>>
102}
103
104impl<'a> SyncBlockingPermitFuture<'a> {
105 pub fn new(semaphore: &'a Semaphore) -> SyncBlockingPermitFuture<'a>
107 {
108 SyncBlockingPermitFuture {
109 futr: Mutex::new(BlockingPermitFuture::new(semaphore))
110 }
111 }
112}
113
114impl<'a> Future for SyncBlockingPermitFuture<'a> {
115 type Output = Result<BlockingPermit<'a>, Canceled>;
116
117 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>)
118 -> Poll<Self::Output>
119 {
120 match self.futr.try_lock() {
121 Ok(mut guard) => {
122 let futr = unsafe { Pin::new_unchecked(&mut *guard) };
123 futr.poll(cx)
124 }
125 Err(TryLockError::Poisoned(_)) => Poll::Ready(Err(Canceled)),
126 Err(TryLockError::WouldBlock) => {
127 cx.waker().wake_by_ref(); Poll::Pending
129 }
130 }
131 }
132}