local_sync/semaphore.rs
1//! Semaphore borrowed from tokio.
2
3#![allow(unused)]
4
5use core::future::Future;
6use std::{
7 cell::{RefCell, UnsafeCell},
8 cmp, fmt,
9 marker::PhantomPinned,
10 pin::Pin,
11 ptr::NonNull,
12 task::{Context, Poll, Waker},
13};
14
15use crate::{
16 linked_list::{self, LinkedList},
17 wake_list::WakeList,
18};
19
20/// Low level semaphore.
21pub(crate) struct Inner {
22 waiters: RefCell<Waitlist>,
23 /// The current number of available permits in the semaphore.
24 permits: RefCell<usize>,
25}
26
27struct Waitlist {
28 queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
29 closed: bool,
30}
31
32/// Error returned from the [`Semaphore::try_acquire`] function.
33///
34/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
35#[derive(Debug, PartialEq)]
36pub enum TryAcquireError {
37 /// The semaphore has been [closed] and cannot issue new permits.
38 ///
39 /// [closed]: crate::sync::Semaphore::close
40 Closed,
41
42 /// The semaphore has no available permits.
43 NoPermits,
44}
45
46/// Error returned from the [`Semaphore::acquire`] function.
47///
48/// An `acquire` operation can only fail if the semaphore has been
49/// [closed].
50///
51/// [closed]: crate::sync::Semaphore::close
52/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
53#[derive(Debug)]
54pub struct AcquireError(());
55
56pub(crate) struct Acquire<'a> {
57 node: Waiter,
58 semaphore: &'a Inner,
59 num_permits: u32,
60 queued: bool,
61}
62
63struct Waiter {
64 /// The current state of the waiter.
65 ///
66 /// This is either the number of remaining permits required by
67 /// the waiter, or a flag indicating that the waiter is not yet queued.
68 state: RefCell<usize>,
69
70 /// The waker to notify the task awaiting permits.
71 ///
72 /// # Safety
73 ///
74 /// This may only be accessed while the wait queue is locked.
75 waker: UnsafeCell<Option<Waker>>,
76
77 /// Intrusive linked-list pointers.
78 ///
79 /// # Safety
80 ///
81 /// This may only be accessed while the wait queue is locked.
82 pointers: linked_list::Pointers<Waiter>,
83
84 /// Should not be `Unpin`.
85 _p: PhantomPinned,
86}
87
88impl Waiter {
89 fn new(num_permits: u32) -> Self {
90 Waiter {
91 waker: UnsafeCell::new(None),
92 state: RefCell::new(num_permits as usize),
93 pointers: linked_list::Pointers::new(),
94 _p: PhantomPinned,
95 }
96 }
97
98 /// Assign permits to the waiter.
99 ///
100 /// Returns `true` if the waiter should be removed from the queue
101 fn assign_permits(&self, n: &mut usize) -> bool {
102 let mut curr = self.state.borrow_mut();
103 let assign = cmp::min(*curr, *n);
104 *curr -= assign;
105 *n -= assign;
106
107 *curr == 0
108 }
109}
110
111unsafe impl linked_list::Link for Waiter {
112 // XXX: ideally, we would be able to use `Pin` here, to enforce the
113 // invariant that list entries may not move while in the list. However, we
114 // can't do this currently, as using `Pin<&'a mut Waiter>` as the `Handle`
115 // type would require `Semaphore` to be generic over a lifetime. We can't
116 // use `Pin<*mut Waiter>`, as raw pointers are `Unpin` regardless of whether
117 // or not they dereference to an `!Unpin` target.
118 type Handle = NonNull<Waiter>;
119 type Target = Waiter;
120
121 fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
122 *handle
123 }
124
125 unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
126 ptr
127 }
128
129 unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
130 NonNull::from(&mut target.as_mut().pointers)
131 }
132}
133
134impl Inner {
135 /// The maximum number of permits which a semaphore can hold.
136 ///
137 /// Note that this reserves three bits of flags in the permit counter, but
138 /// we only actually use one of them. However, the previous semaphore
139 /// implementation used three bits, so we will continue to reserve them to
140 /// avoid a breaking change if additional flags need to be added in the
141 /// future.
142 pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
143 const CLOSED: usize = 1;
144 // The least-significant bit in the number of permits is reserved to use
145 // as a flag indicating that the semaphore has been closed. Consequently
146 // PERMIT_SHIFT is used to leave that bit for that purpose.
147 const PERMIT_SHIFT: usize = 1;
148
149 /// Creates a new semaphore with the initial number of permits
150 ///
151 /// Maximum number of permits on 32-bit platforms is `1<<29`.
152 pub(crate) const fn new(mut permits: usize) -> Self {
153 permits &= Self::MAX_PERMITS;
154
155 Self {
156 permits: RefCell::new(permits << Self::PERMIT_SHIFT),
157 waiters: RefCell::new(Waitlist {
158 queue: LinkedList::new(),
159 closed: false,
160 }),
161 }
162 }
163
164 /// Returns the current number of available permits
165 pub(crate) fn available_permits(&self) -> usize {
166 *self.permits.borrow() >> Self::PERMIT_SHIFT
167 }
168
169 /// Adds `added` new permits to the semaphore.
170 ///
171 /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
172 pub(crate) fn release(&self, added: usize) {
173 if added == 0 {
174 return;
175 }
176
177 // Assign permits to the wait queue
178 self.add_permits(added);
179 }
180
181 /// Closes the semaphore. This prevents the semaphore from issuing new
182 /// permits and notifies all pending waiters.
183 pub(crate) fn close(&self) {
184 *self.permits.borrow_mut() |= Self::CLOSED;
185 (*self.waiters.borrow_mut()).closed = true;
186
187 let mut waiters = self.waiters.borrow_mut();
188
189 while let Some(mut waiter) = waiters.queue.pop_back() {
190 let waker = unsafe { (*waiter.as_mut().waker.get()).take() };
191 if let Some(waker) = waker {
192 waker.wake();
193 }
194 }
195 }
196
197 /// Returns true if the semaphore is closed
198 pub(crate) fn is_closed(&self) -> bool {
199 *self.permits.borrow() & Self::CLOSED != 0
200 }
201
202 pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
203 assert!(
204 num_permits as usize <= Self::MAX_PERMITS,
205 "a semaphore may not have more than MAX_PERMITS permits ({})",
206 Self::MAX_PERMITS
207 );
208 let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
209 let mut curr = self.permits.borrow_mut();
210
211 // Has the semaphore closed?
212 if *curr & Self::CLOSED == Self::CLOSED {
213 return Err(TryAcquireError::Closed);
214 }
215
216 // Are there enough permits remaining?
217 if *curr < num_permits {
218 return Err(TryAcquireError::NoPermits);
219 }
220
221 *curr -= num_permits;
222 Ok(())
223 }
224
225 pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
226 Acquire::new(self, num_permits)
227 }
228
229 /// Release `rem` permits to the semaphore's wait list, starting from the
230 /// end of the queue.
231 ///
232 /// If `rem` exceeds the number of permits needed by the wait list, the
233 /// remainder are assigned back to the semaphore.
234 fn add_permits(&self, mut rem: usize) {
235 let mut waiters = self.waiters.borrow_mut();
236 let mut wakers = WakeList::new();
237 let mut is_empty = false;
238 while rem > 0 {
239 'inner: while wakers.can_push() {
240 // Was the waiter assigned enough permits to wake it?
241 match waiters.queue.last() {
242 Some(waiter) => {
243 if !waiter.assign_permits(&mut rem) {
244 break 'inner;
245 }
246 }
247 None => {
248 is_empty = true;
249 // If we assigned permits to all the waiters in the queue, and there are
250 // still permits left over, assign them back to the semaphore.
251 break 'inner;
252 }
253 };
254 let mut waiter = waiters.queue.pop_back().unwrap();
255 if let Some(waker) = unsafe { (*waiter.as_mut().waker.get()).take() } {
256 wakers.push(waker);
257 }
258 }
259
260 if rem > 0 && is_empty {
261 let permits = rem;
262 assert!(
263 permits <= Self::MAX_PERMITS,
264 "cannot add more than MAX_PERMITS permits ({})",
265 Self::MAX_PERMITS
266 );
267 *self.permits.borrow_mut() += rem << Self::PERMIT_SHIFT;
268 rem = 0;
269 }
270
271 wakers.wake_all();
272 }
273
274 assert_eq!(rem, 0);
275 }
276
277 fn poll_acquire(
278 &self,
279 cx: &mut Context<'_>,
280 num_permits: u32,
281 node: Pin<&mut Waiter>,
282 queued: bool,
283 ) -> Poll<Result<(), AcquireError>> {
284 let needed = if queued {
285 *node.state.borrow() << Self::PERMIT_SHIFT
286 } else {
287 (num_permits as usize) << Self::PERMIT_SHIFT
288 };
289
290 let mut curr = self.permits.borrow_mut();
291
292 // If closed, return error immediately.
293 if *curr & Self::CLOSED > 0 {
294 return Poll::Ready(Err(AcquireError::closed()));
295 }
296 // If the current permits is enough and not queued, assign permit
297 // and return ok immediately.
298 if *curr >= needed && !queued {
299 *curr -= needed;
300 return Poll::Ready(Ok(()));
301 }
302
303 // Clear permits and assign it.
304 let mut permits = *curr >> Self::PERMIT_SHIFT;
305 *curr = 0;
306 drop(curr);
307 if node.assign_permits(&mut permits) {
308 // TODO: may never be here?
309 self.add_permits(permits);
310 return Poll::Ready(Ok(()));
311 }
312
313 // Replace waker if needed.
314 let waker = unsafe { &mut *node.waker.get() };
315 // Do we need to register the new waker?
316 if waker
317 .as_ref()
318 .map(|waker| !waker.will_wake(cx.waker()))
319 .unwrap_or(true)
320 {
321 *waker = Some(cx.waker().clone());
322 }
323
324 // If the waiter is not already in the wait queue, enqueue it.
325 if !queued {
326 let node = unsafe {
327 let node = Pin::into_inner_unchecked(node) as *mut _;
328 NonNull::new_unchecked(node)
329 };
330
331 self.waiters.borrow_mut().queue.push_front(node);
332 }
333
334 Poll::Pending
335 }
336}
337
338impl fmt::Debug for Inner {
339 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
340 fmt.debug_struct("Semaphore")
341 .field("permits", &self.available_permits())
342 .finish()
343 }
344}
345
346impl Future for Acquire<'_> {
347 type Output = Result<(), AcquireError>;
348
349 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350 let (node, semaphore, needed, queued) = self.project();
351
352 match semaphore.poll_acquire(cx, needed, node, *queued) {
353 Poll::Pending => {
354 *queued = true;
355 Poll::Pending
356 }
357 Poll::Ready(r) => {
358 r?;
359 *queued = false;
360 Poll::Ready(Ok(()))
361 }
362 }
363 }
364}
365
366impl<'a> Acquire<'a> {
367 fn new(semaphore: &'a Inner, num_permits: u32) -> Self {
368 Self {
369 node: Waiter::new(num_permits),
370 semaphore,
371 num_permits,
372 queued: false,
373 }
374 }
375
376 fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Inner, u32, &mut bool) {
377 fn is_unpin<T: Unpin>() {}
378 unsafe {
379 // Safety: all fields other than `node` are `Unpin`
380
381 is_unpin::<&Inner>();
382 is_unpin::<&mut bool>();
383 is_unpin::<u32>();
384
385 let this = self.get_unchecked_mut();
386 (
387 Pin::new_unchecked(&mut this.node),
388 this.semaphore,
389 this.num_permits,
390 &mut this.queued,
391 )
392 }
393 }
394}
395
396impl Drop for Acquire<'_> {
397 fn drop(&mut self) {
398 // If the future is completed, there is no node in the wait list, so we
399 // can skip acquiring the lock.
400 if !self.queued {
401 return;
402 }
403
404 // This is where we ensure safety. The future is being dropped,
405 // which means we must ensure that the waiter entry is no longer stored
406 // in the linked list.
407 let mut waiters = self.semaphore.waiters.borrow_mut();
408
409 // remove the entry from the list
410 let node = NonNull::from(&mut self.node);
411 // Safety: we have locked the wait list.
412 unsafe { waiters.queue.remove(node) };
413
414 let acquired_permits = self.num_permits as usize - *self.node.state.borrow();
415 if acquired_permits > 0 {
416 self.semaphore.add_permits(acquired_permits);
417 }
418 }
419}
420
421impl AcquireError {
422 fn closed() -> AcquireError {
423 AcquireError(())
424 }
425}
426
427impl fmt::Display for AcquireError {
428 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
429 write!(fmt, "semaphore closed")
430 }
431}
432
433impl std::error::Error for AcquireError {}
434
435impl TryAcquireError {
436 /// Returns `true` if the error was caused by a closed semaphore.
437 #[allow(dead_code)] // may be used later!
438 pub(crate) fn is_closed(&self) -> bool {
439 matches!(self, TryAcquireError::Closed)
440 }
441
442 /// Returns `true` if the error was caused by calling `try_acquire` on a
443 /// semaphore with no available permits.
444 #[allow(dead_code)] // may be used later!
445 pub(crate) fn is_no_permits(&self) -> bool {
446 matches!(self, TryAcquireError::NoPermits)
447 }
448}
449
450impl fmt::Display for TryAcquireError {
451 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
452 match self {
453 TryAcquireError::Closed => write!(fmt, "semaphore closed"),
454 TryAcquireError::NoPermits => write!(fmt, "no permits available"),
455 }
456 }
457}
458
459impl std::error::Error for TryAcquireError {}
460
461/// Counting semaphore performing asynchronous permit acquisition.
462///
463/// A semaphore maintains a set of permits. Permits are used to synchronize
464/// access to a shared resource. A semaphore differs from a mutex in that it
465/// can allow more than one concurrent caller to access the shared resource at a
466/// time.
467///
468/// When `acquire` is called and the semaphore has remaining permits, the
469/// function immediately returns a permit. However, if no remaining permits are
470/// available, `acquire` (asynchronously) waits until an outstanding permit is
471/// dropped. At this point, the freed permit is assigned to the caller.
472///
473/// This `Semaphore` is fair, which means that permits are given out in the order
474/// they were requested. This fairness is also applied when `acquire_many` gets
475/// involved, so if a call to `acquire_many` at the front of the queue requests
476/// more permits than currently available, this can prevent a call to `acquire`
477/// from completing, even if the semaphore has enough permits complete the call
478/// to `acquire`.
479///
480/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
481/// utility.
482///
483/// # Examples
484///
485/// Basic usage:
486///
487/// ```
488/// use local_sync::semaphore::{Semaphore, TryAcquireError};
489///
490/// #[monoio::main]
491/// async fn main() {
492/// let semaphore = Semaphore::new(3);
493///
494/// let a_permit = semaphore.acquire().await.unwrap();
495/// let two_permits = semaphore.acquire_many(2).await.unwrap();
496///
497/// assert_eq!(semaphore.available_permits(), 0);
498///
499/// let permit_attempt = semaphore.try_acquire();
500/// assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
501/// }
502/// ```
503///
504/// Use [`Semaphore::acquire_owned`] to move permits across tasks:
505///
506/// ```
507/// use std::rc::Rc;
508/// use local_sync::semaphore::Semaphore;
509///
510/// #[monoio::main]
511/// async fn main() {
512/// let semaphore = Rc::new(Semaphore::new(3));
513/// let mut join_handles = Vec::new();
514///
515/// for _ in 0..5 {
516/// let permit = semaphore.clone().acquire_owned().await.unwrap();
517/// join_handles.push(monoio::spawn(async move {
518/// // perform task...
519/// // explicitly own `permit` in the task
520/// drop(permit);
521/// }));
522/// }
523///
524/// for handle in join_handles {
525/// handle.await;
526/// }
527/// }
528/// ```
529///
530/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
531/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
532#[derive(Debug)]
533pub struct Semaphore(Inner);
534
535/// A permit from the semaphore.
536///
537/// This type is created by the [`acquire`] method.
538///
539/// [`acquire`]: crate::sync::Semaphore::acquire()
540#[must_use]
541#[derive(Debug)]
542pub struct SemaphorePermit<'a> {
543 sem: &'a Semaphore,
544 permits: u32,
545}
546
547/// An owned permit from the semaphore.
548///
549/// This type is created by the [`acquire_owned`] method.
550///
551/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
552#[must_use]
553#[derive(Debug)]
554pub struct OwnedSemaphorePermit {
555 sem: std::rc::Rc<Semaphore>,
556 permits: u32,
557}
558
559pub struct AcquireResult<'a>(Acquire<'a>, &'a Semaphore, u32);
560
561impl<'a> Future for AcquireResult<'a> {
562 type Output = Result<SemaphorePermit<'a>, AcquireError>;
563
564 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
565 let sem = self.1;
566 let permits = self.2;
567 let inner = unsafe { self.map_unchecked_mut(|me| &mut me.0) };
568 futures_util::ready!(inner.poll(cx))?;
569 Poll::Ready(Ok(SemaphorePermit { sem, permits }))
570 }
571}
572
573impl Semaphore {
574 /// Creates a new semaphore with the initial number of permits.
575 pub const fn new(permits: usize) -> Self {
576 Self(Inner::new(permits))
577 }
578
579 /// Returns the current number of available permits.
580 pub fn available_permits(&self) -> usize {
581 self.0.available_permits()
582 }
583
584 /// Adds `n` new permits to the semaphore.
585 ///
586 /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
587 pub fn add_permits(&self, n: usize) {
588 self.0.release(n);
589 }
590
591 /// Acquires a permit from the semaphore.
592 ///
593 /// If the semaphore has been closed, this returns an [`AcquireError`].
594 /// Otherwise, this returns a [`SemaphorePermit`] representing the
595 /// acquired permit.
596 ///
597 /// # Cancel safety
598 ///
599 /// This method uses a queue to fairly distribute permits in the order they
600 /// were requested. Cancelling a call to `acquire` makes you lose your place
601 /// in the queue.
602 ///
603 /// # Examples
604 ///
605 /// ```
606 /// use local_sync::semaphore::Semaphore;
607 ///
608 /// #[monoio::main]
609 /// async fn main() {
610 /// let semaphore = Semaphore::new(2);
611 ///
612 /// let permit_1 = semaphore.acquire().await.unwrap();
613 /// assert_eq!(semaphore.available_permits(), 1);
614 ///
615 /// let permit_2 = semaphore.acquire().await.unwrap();
616 /// assert_eq!(semaphore.available_permits(), 0);
617 ///
618 /// drop(permit_1);
619 /// assert_eq!(semaphore.available_permits(), 1);
620 /// }
621 /// ```
622 ///
623 /// [`AcquireError`]: crate::sync::AcquireError
624 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
625 pub fn acquire(&self) -> AcquireResult<'_> {
626 let acq = self.0.acquire(1);
627 AcquireResult(acq, self, 1)
628 }
629
630 /// Acquires `n` permits from the semaphore.
631 ///
632 /// If the semaphore has been closed, this returns an [`AcquireError`].
633 /// Otherwise, this returns a [`SemaphorePermit`] representing the
634 /// acquired permits.
635 ///
636 /// # Cancel safety
637 ///
638 /// This method uses a queue to fairly distribute permits in the order they
639 /// were requested. Cancelling a call to `acquire_many` makes you lose your
640 /// place in the queue.
641 ///
642 /// # Examples
643 ///
644 /// ```
645 /// use local_sync::semaphore::Semaphore;
646 ///
647 /// #[monoio::main]
648 /// async fn main() {
649 /// let semaphore = Semaphore::new(5);
650 ///
651 /// let permit = semaphore.acquire_many(3).await.unwrap();
652 /// assert_eq!(semaphore.available_permits(), 2);
653 /// }
654 /// ```
655 ///
656 /// [`AcquireError`]: crate::sync::AcquireError
657 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
658 pub fn acquire_many(&self, n: u32) -> AcquireResult<'_> {
659 let acq = self.0.acquire(n);
660 AcquireResult(acq, self, n)
661 }
662
663 /// Tries to acquire a permit from the semaphore.
664 ///
665 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
666 /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
667 /// this returns a [`SemaphorePermit`] representing the acquired permits.
668 ///
669 /// # Examples
670 ///
671 /// ```
672 /// use local_sync::semaphore::{Semaphore, TryAcquireError};
673 ///
674 /// # fn main() {
675 /// let semaphore = Semaphore::new(2);
676 ///
677 /// let permit_1 = semaphore.try_acquire().unwrap();
678 /// assert_eq!(semaphore.available_permits(), 1);
679 ///
680 /// let permit_2 = semaphore.try_acquire().unwrap();
681 /// assert_eq!(semaphore.available_permits(), 0);
682 ///
683 /// let permit_3 = semaphore.try_acquire();
684 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
685 /// # }
686 /// ```
687 ///
688 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
689 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
690 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
691 pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
692 match self.0.try_acquire(1) {
693 Ok(_) => Ok(SemaphorePermit {
694 sem: self,
695 permits: 1,
696 }),
697 Err(e) => Err(e),
698 }
699 }
700
701 /// Tries to acquire `n` permits from the semaphore.
702 ///
703 /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
704 /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
705 /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
706 ///
707 /// # Examples
708 ///
709 /// ```
710 /// use local_sync::semaphore::{Semaphore, TryAcquireError};
711 ///
712 /// # fn main() {
713 /// let semaphore = Semaphore::new(4);
714 ///
715 /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
716 /// assert_eq!(semaphore.available_permits(), 1);
717 ///
718 /// let permit_2 = semaphore.try_acquire_many(2);
719 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
720 /// # }
721 /// ```
722 ///
723 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
724 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
725 /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
726 pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
727 match self.0.try_acquire(n) {
728 Ok(_) => Ok(SemaphorePermit {
729 sem: self,
730 permits: n,
731 }),
732 Err(e) => Err(e),
733 }
734 }
735
736 /// Acquires a permit from the semaphore.
737 ///
738 /// The semaphore must be wrapped in an [`Rc`] to call this method.
739 /// If the semaphore has been closed, this returns an [`AcquireError`].
740 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
741 /// acquired permit.
742 ///
743 /// # Cancel safety
744 ///
745 /// This method uses a queue to fairly distribute permits in the order they
746 /// were requested. Cancelling a call to `acquire_owned` makes you lose your
747 /// place in the queue.
748 ///
749 /// # Examples
750 ///
751 /// ```
752 /// use std::rc::Rc;
753 /// use local_sync::semaphore::Semaphore;
754 ///
755 /// #[monoio::main]
756 /// async fn main() {
757 /// let semaphore = Rc::new(Semaphore::new(3));
758 /// let mut join_handles = Vec::new();
759 ///
760 /// for _ in 0..5 {
761 /// let permit = semaphore.clone().acquire_owned().await.unwrap();
762 /// join_handles.push(monoio::spawn(async move {
763 /// // perform task...
764 /// // explicitly own `permit` in the task
765 /// drop(permit);
766 /// }));
767 /// }
768 ///
769 /// for handle in join_handles {
770 /// handle.await;
771 /// }
772 /// }
773 /// ```
774 ///
775 /// [`Rc`]: std::sync::Rc
776 /// [`AcquireError`]: crate::sync::AcquireError
777 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
778 pub async fn acquire_owned(
779 self: std::rc::Rc<Self>,
780 ) -> Result<OwnedSemaphorePermit, AcquireError> {
781 self.0.acquire(1).await?;
782 Ok(OwnedSemaphorePermit {
783 sem: self,
784 permits: 1,
785 })
786 }
787
788 /// Acquires `n` permits from the semaphore.
789 ///
790 /// The semaphore must be wrapped in an [`Rc`] to call this method.
791 /// If the semaphore has been closed, this returns an [`AcquireError`].
792 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
793 /// acquired permit.
794 ///
795 /// # Cancel safety
796 ///
797 /// This method uses a queue to fairly distribute permits in the order they
798 /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
799 /// your place in the queue.
800 ///
801 /// # Examples
802 ///
803 /// ```
804 /// use std::rc::Rc;
805 /// use local_sync::semaphore::Semaphore;
806 ///
807 /// #[monoio::main]
808 /// async fn main() {
809 /// let semaphore = Rc::new(Semaphore::new(10));
810 /// let mut join_handles = Vec::new();
811 ///
812 /// for _ in 0..5 {
813 /// let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
814 /// join_handles.push(monoio::spawn(async move {
815 /// // perform task...
816 /// // explicitly own `permit` in the task
817 /// drop(permit);
818 /// }));
819 /// }
820 ///
821 /// for handle in join_handles {
822 /// handle.await;
823 /// }
824 /// }
825 /// ```
826 ///
827 /// [`Rc`]: std::sync::Rc
828 /// [`AcquireError`]: crate::sync::AcquireError
829 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
830 pub async fn acquire_many_owned(
831 self: std::rc::Rc<Self>,
832 n: u32,
833 ) -> Result<OwnedSemaphorePermit, AcquireError> {
834 self.0.acquire(n).await?;
835 Ok(OwnedSemaphorePermit {
836 sem: self,
837 permits: n,
838 })
839 }
840
841 /// Tries to acquire a permit from the semaphore.
842 ///
843 /// The semaphore must be wrapped in an [`Rc`] to call this method. If
844 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
845 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
846 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
847 /// acquired permit.
848 ///
849 /// # Examples
850 ///
851 /// ```
852 /// use std::rc::Rc;
853 /// use local_sync::semaphore::{Semaphore, TryAcquireError};
854 ///
855 /// # fn main() {
856 /// let semaphore = Rc::new(Semaphore::new(2));
857 ///
858 /// let permit_1 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
859 /// assert_eq!(semaphore.available_permits(), 1);
860 ///
861 /// let permit_2 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
862 /// assert_eq!(semaphore.available_permits(), 0);
863 ///
864 /// let permit_3 = semaphore.try_acquire_owned();
865 /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
866 /// # }
867 /// ```
868 ///
869 /// [`Rc`]: std::sync::Rc
870 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
871 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
872 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
873 pub fn try_acquire_owned(
874 self: std::rc::Rc<Self>,
875 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
876 match self.0.try_acquire(1) {
877 Ok(_) => Ok(OwnedSemaphorePermit {
878 sem: self,
879 permits: 1,
880 }),
881 Err(e) => Err(e),
882 }
883 }
884
885 /// Tries to acquire `n` permits from the semaphore.
886 ///
887 /// The semaphore must be wrapped in an [`Rc`] to call this method. If
888 /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
889 /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
890 /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
891 /// acquired permit.
892 ///
893 /// # Examples
894 ///
895 /// ```
896 /// use std::rc::Rc;
897 /// use local_sync::semaphore::{Semaphore, TryAcquireError};
898 ///
899 /// # fn main() {
900 /// let semaphore = Rc::new(Semaphore::new(4));
901 ///
902 /// let permit_1 = Rc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
903 /// assert_eq!(semaphore.available_permits(), 1);
904 ///
905 /// let permit_2 = semaphore.try_acquire_many_owned(2);
906 /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
907 /// # }
908 /// ```
909 ///
910 /// [`Rc`]: std::sync::Rc
911 /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
912 /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
913 /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
914 pub fn try_acquire_many_owned(
915 self: std::rc::Rc<Self>,
916 n: u32,
917 ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
918 match self.0.try_acquire(n) {
919 Ok(_) => Ok(OwnedSemaphorePermit {
920 sem: self,
921 permits: n,
922 }),
923 Err(e) => Err(e),
924 }
925 }
926
927 /// Closes the semaphore.
928 ///
929 /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
930 ///
931 /// # Examples
932 ///
933 /// ```
934 /// use local_sync::semaphore::{Semaphore, TryAcquireError};
935 /// use std::rc::Rc;
936 ///
937 /// #[monoio::main]
938 /// async fn main() {
939 /// let semaphore = Rc::new(Semaphore::new(1));
940 /// let semaphore2 = semaphore.clone();
941 ///
942 /// monoio::spawn(async move {
943 /// let permit = semaphore.acquire_many(2).await;
944 /// assert!(permit.is_err());
945 /// println!("waiter received error");
946 /// });
947 ///
948 /// println!("closing semaphore");
949 /// semaphore2.close();
950 ///
951 /// // Cannot obtain more permits
952 /// assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
953 /// }
954 /// ```
955 pub fn close(&self) {
956 self.0.close();
957 }
958
959 /// Returns true if the semaphore is closed
960 pub fn is_closed(&self) -> bool {
961 self.0.is_closed()
962 }
963}
964
965impl<'a> SemaphorePermit<'a> {
966 /// Forgets the permit **without** releasing it back to the semaphore.
967 /// This can be used to reduce the amount of permits available from a
968 /// semaphore.
969 pub fn forget(mut self) {
970 self.permits = 0;
971 }
972}
973
974impl OwnedSemaphorePermit {
975 /// Forgets the permit **without** releasing it back to the semaphore.
976 /// This can be used to reduce the amount of permits available from a
977 /// semaphore.
978 pub fn forget(mut self) {
979 self.permits = 0;
980 }
981}
982
983impl<'a> Drop for SemaphorePermit<'_> {
984 fn drop(&mut self) {
985 self.sem.add_permits(self.permits as usize);
986 }
987}
988
989impl Drop for OwnedSemaphorePermit {
990 fn drop(&mut self) {
991 self.sem.add_permits(self.permits as usize);
992 }
993}
994
995#[cfg(test)]
996mod tests {
997 use super::{Inner, Semaphore};
998
999 #[monoio::test]
1000 async fn inner_works() {
1001 let s = Inner::new(10);
1002 for _ in 0..10 {
1003 s.acquire(1).await.unwrap();
1004 }
1005 }
1006
1007 #[monoio::test]
1008 async fn inner_release_after_acquire() {
1009 let s = std::rc::Rc::new(Inner::new(0));
1010
1011 let s_move = s.clone();
1012 let join = monoio::spawn(async move {
1013 let _ = s_move.acquire(1).await.unwrap();
1014 let _ = s_move.acquire(1).await.unwrap();
1015 });
1016 s.release(2);
1017 join.await;
1018 }
1019
1020 #[monoio::test]
1021 async fn it_works() {
1022 let s = Semaphore::new(0);
1023 s.add_permits(1);
1024 let _ = s.acquire().await.unwrap();
1025 }
1026}