local_sync/
semaphore.rs

1//! Semaphore borrowed from tokio.
2
3#![allow(unused)]
4
5use core::future::Future;
6use std::{
7    cell::{RefCell, UnsafeCell},
8    cmp, fmt,
9    marker::PhantomPinned,
10    pin::Pin,
11    ptr::NonNull,
12    task::{Context, Poll, Waker},
13};
14
15use crate::{
16    linked_list::{self, LinkedList},
17    wake_list::WakeList,
18};
19
20/// Low level semaphore.
21pub(crate) struct Inner {
22    waiters: RefCell<Waitlist>,
23    /// The current number of available permits in the semaphore.
24    permits: RefCell<usize>,
25}
26
27struct Waitlist {
28    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
29    closed: bool,
30}
31
32/// Error returned from the [`Semaphore::try_acquire`] function.
33///
34/// [`Semaphore::try_acquire`]: crate::sync::Semaphore::try_acquire
35#[derive(Debug, PartialEq)]
36pub enum TryAcquireError {
37    /// The semaphore has been [closed] and cannot issue new permits.
38    ///
39    /// [closed]: crate::sync::Semaphore::close
40    Closed,
41
42    /// The semaphore has no available permits.
43    NoPermits,
44}
45
46/// Error returned from the [`Semaphore::acquire`] function.
47///
48/// An `acquire` operation can only fail if the semaphore has been
49/// [closed].
50///
51/// [closed]: crate::sync::Semaphore::close
52/// [`Semaphore::acquire`]: crate::sync::Semaphore::acquire
53#[derive(Debug)]
54pub struct AcquireError(());
55
56pub(crate) struct Acquire<'a> {
57    node: Waiter,
58    semaphore: &'a Inner,
59    num_permits: u32,
60    queued: bool,
61}
62
63struct Waiter {
64    /// The current state of the waiter.
65    ///
66    /// This is either the number of remaining permits required by
67    /// the waiter, or a flag indicating that the waiter is not yet queued.
68    state: RefCell<usize>,
69
70    /// The waker to notify the task awaiting permits.
71    ///
72    /// # Safety
73    ///
74    /// This may only be accessed while the wait queue is locked.
75    waker: UnsafeCell<Option<Waker>>,
76
77    /// Intrusive linked-list pointers.
78    ///
79    /// # Safety
80    ///
81    /// This may only be accessed while the wait queue is locked.
82    pointers: linked_list::Pointers<Waiter>,
83
84    /// Should not be `Unpin`.
85    _p: PhantomPinned,
86}
87
88impl Waiter {
89    fn new(num_permits: u32) -> Self {
90        Waiter {
91            waker: UnsafeCell::new(None),
92            state: RefCell::new(num_permits as usize),
93            pointers: linked_list::Pointers::new(),
94            _p: PhantomPinned,
95        }
96    }
97
98    /// Assign permits to the waiter.
99    ///
100    /// Returns `true` if the waiter should be removed from the queue
101    fn assign_permits(&self, n: &mut usize) -> bool {
102        let mut curr = self.state.borrow_mut();
103        let assign = cmp::min(*curr, *n);
104        *curr -= assign;
105        *n -= assign;
106
107        *curr == 0
108    }
109}
110
111unsafe impl linked_list::Link for Waiter {
112    // XXX: ideally, we would be able to use `Pin` here, to enforce the
113    // invariant that list entries may not move while in the list. However, we
114    // can't do this currently, as using `Pin<&'a mut Waiter>` as the `Handle`
115    // type would require `Semaphore` to be generic over a lifetime. We can't
116    // use `Pin<*mut Waiter>`, as raw pointers are `Unpin` regardless of whether
117    // or not they dereference to an `!Unpin` target.
118    type Handle = NonNull<Waiter>;
119    type Target = Waiter;
120
121    fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
122        *handle
123    }
124
125    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
126        ptr
127    }
128
129    unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
130        NonNull::from(&mut target.as_mut().pointers)
131    }
132}
133
134impl Inner {
135    /// The maximum number of permits which a semaphore can hold.
136    ///
137    /// Note that this reserves three bits of flags in the permit counter, but
138    /// we only actually use one of them. However, the previous semaphore
139    /// implementation used three bits, so we will continue to reserve them to
140    /// avoid a breaking change if additional flags need to be added in the
141    /// future.
142    pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
143    const CLOSED: usize = 1;
144    // The least-significant bit in the number of permits is reserved to use
145    // as a flag indicating that the semaphore has been closed. Consequently
146    // PERMIT_SHIFT is used to leave that bit for that purpose.
147    const PERMIT_SHIFT: usize = 1;
148
149    /// Creates a new semaphore with the initial number of permits
150    ///
151    /// Maximum number of permits on 32-bit platforms is `1<<29`.
152    pub(crate) const fn new(mut permits: usize) -> Self {
153        permits &= Self::MAX_PERMITS;
154
155        Self {
156            permits: RefCell::new(permits << Self::PERMIT_SHIFT),
157            waiters: RefCell::new(Waitlist {
158                queue: LinkedList::new(),
159                closed: false,
160            }),
161        }
162    }
163
164    /// Returns the current number of available permits
165    pub(crate) fn available_permits(&self) -> usize {
166        *self.permits.borrow() >> Self::PERMIT_SHIFT
167    }
168
169    /// Adds `added` new permits to the semaphore.
170    ///
171    /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
172    pub(crate) fn release(&self, added: usize) {
173        if added == 0 {
174            return;
175        }
176
177        // Assign permits to the wait queue
178        self.add_permits(added);
179    }
180
181    /// Closes the semaphore. This prevents the semaphore from issuing new
182    /// permits and notifies all pending waiters.
183    pub(crate) fn close(&self) {
184        *self.permits.borrow_mut() |= Self::CLOSED;
185        (*self.waiters.borrow_mut()).closed = true;
186
187        let mut waiters = self.waiters.borrow_mut();
188
189        while let Some(mut waiter) = waiters.queue.pop_back() {
190            let waker = unsafe { (*waiter.as_mut().waker.get()).take() };
191            if let Some(waker) = waker {
192                waker.wake();
193            }
194        }
195    }
196
197    /// Returns true if the semaphore is closed
198    pub(crate) fn is_closed(&self) -> bool {
199        *self.permits.borrow() & Self::CLOSED != 0
200    }
201
202    pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
203        assert!(
204            num_permits as usize <= Self::MAX_PERMITS,
205            "a semaphore may not have more than MAX_PERMITS permits ({})",
206            Self::MAX_PERMITS
207        );
208        let num_permits = (num_permits as usize) << Self::PERMIT_SHIFT;
209        let mut curr = self.permits.borrow_mut();
210
211        // Has the semaphore closed?
212        if *curr & Self::CLOSED == Self::CLOSED {
213            return Err(TryAcquireError::Closed);
214        }
215
216        // Are there enough permits remaining?
217        if *curr < num_permits {
218            return Err(TryAcquireError::NoPermits);
219        }
220
221        *curr -= num_permits;
222        Ok(())
223    }
224
225    pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
226        Acquire::new(self, num_permits)
227    }
228
229    /// Release `rem` permits to the semaphore's wait list, starting from the
230    /// end of the queue.
231    ///
232    /// If `rem` exceeds the number of permits needed by the wait list, the
233    /// remainder are assigned back to the semaphore.
234    fn add_permits(&self, mut rem: usize) {
235        let mut waiters = self.waiters.borrow_mut();
236        let mut wakers = WakeList::new();
237        let mut is_empty = false;
238        while rem > 0 {
239            'inner: while wakers.can_push() {
240                // Was the waiter assigned enough permits to wake it?
241                match waiters.queue.last() {
242                    Some(waiter) => {
243                        if !waiter.assign_permits(&mut rem) {
244                            break 'inner;
245                        }
246                    }
247                    None => {
248                        is_empty = true;
249                        // If we assigned permits to all the waiters in the queue, and there are
250                        // still permits left over, assign them back to the semaphore.
251                        break 'inner;
252                    }
253                };
254                let mut waiter = waiters.queue.pop_back().unwrap();
255                if let Some(waker) = unsafe { (*waiter.as_mut().waker.get()).take() } {
256                    wakers.push(waker);
257                }
258            }
259
260            if rem > 0 && is_empty {
261                let permits = rem;
262                assert!(
263                    permits <= Self::MAX_PERMITS,
264                    "cannot add more than MAX_PERMITS permits ({})",
265                    Self::MAX_PERMITS
266                );
267                *self.permits.borrow_mut() += rem << Self::PERMIT_SHIFT;
268                rem = 0;
269            }
270
271            wakers.wake_all();
272        }
273
274        assert_eq!(rem, 0);
275    }
276
277    fn poll_acquire(
278        &self,
279        cx: &mut Context<'_>,
280        num_permits: u32,
281        node: Pin<&mut Waiter>,
282        queued: bool,
283    ) -> Poll<Result<(), AcquireError>> {
284        let needed = if queued {
285            *node.state.borrow() << Self::PERMIT_SHIFT
286        } else {
287            (num_permits as usize) << Self::PERMIT_SHIFT
288        };
289
290        let mut curr = self.permits.borrow_mut();
291
292        // If closed, return error immediately.
293        if *curr & Self::CLOSED > 0 {
294            return Poll::Ready(Err(AcquireError::closed()));
295        }
296        // If the current permits is enough and not queued, assign permit
297        // and return ok immediately.
298        if *curr >= needed && !queued {
299            *curr -= needed;
300            return Poll::Ready(Ok(()));
301        }
302
303        // Clear permits and assign it.
304        let mut permits = *curr >> Self::PERMIT_SHIFT;
305        *curr = 0;
306        drop(curr);
307        if node.assign_permits(&mut permits) {
308            // TODO: may never be here?
309            self.add_permits(permits);
310            return Poll::Ready(Ok(()));
311        }
312
313        // Replace waker if needed.
314        let waker = unsafe { &mut *node.waker.get() };
315        // Do we need to register the new waker?
316        if waker
317            .as_ref()
318            .map(|waker| !waker.will_wake(cx.waker()))
319            .unwrap_or(true)
320        {
321            *waker = Some(cx.waker().clone());
322        }
323
324        // If the waiter is not already in the wait queue, enqueue it.
325        if !queued {
326            let node = unsafe {
327                let node = Pin::into_inner_unchecked(node) as *mut _;
328                NonNull::new_unchecked(node)
329            };
330
331            self.waiters.borrow_mut().queue.push_front(node);
332        }
333
334        Poll::Pending
335    }
336}
337
338impl fmt::Debug for Inner {
339    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
340        fmt.debug_struct("Semaphore")
341            .field("permits", &self.available_permits())
342            .finish()
343    }
344}
345
346impl Future for Acquire<'_> {
347    type Output = Result<(), AcquireError>;
348
349    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
350        let (node, semaphore, needed, queued) = self.project();
351
352        match semaphore.poll_acquire(cx, needed, node, *queued) {
353            Poll::Pending => {
354                *queued = true;
355                Poll::Pending
356            }
357            Poll::Ready(r) => {
358                r?;
359                *queued = false;
360                Poll::Ready(Ok(()))
361            }
362        }
363    }
364}
365
366impl<'a> Acquire<'a> {
367    fn new(semaphore: &'a Inner, num_permits: u32) -> Self {
368        Self {
369            node: Waiter::new(num_permits),
370            semaphore,
371            num_permits,
372            queued: false,
373        }
374    }
375
376    fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Inner, u32, &mut bool) {
377        fn is_unpin<T: Unpin>() {}
378        unsafe {
379            // Safety: all fields other than `node` are `Unpin`
380
381            is_unpin::<&Inner>();
382            is_unpin::<&mut bool>();
383            is_unpin::<u32>();
384
385            let this = self.get_unchecked_mut();
386            (
387                Pin::new_unchecked(&mut this.node),
388                this.semaphore,
389                this.num_permits,
390                &mut this.queued,
391            )
392        }
393    }
394}
395
396impl Drop for Acquire<'_> {
397    fn drop(&mut self) {
398        // If the future is completed, there is no node in the wait list, so we
399        // can skip acquiring the lock.
400        if !self.queued {
401            return;
402        }
403
404        // This is where we ensure safety. The future is being dropped,
405        // which means we must ensure that the waiter entry is no longer stored
406        // in the linked list.
407        let mut waiters = self.semaphore.waiters.borrow_mut();
408
409        // remove the entry from the list
410        let node = NonNull::from(&mut self.node);
411        // Safety: we have locked the wait list.
412        unsafe { waiters.queue.remove(node) };
413
414        let acquired_permits = self.num_permits as usize - *self.node.state.borrow();
415        if acquired_permits > 0 {
416            self.semaphore.add_permits(acquired_permits);
417        }
418    }
419}
420
421impl AcquireError {
422    fn closed() -> AcquireError {
423        AcquireError(())
424    }
425}
426
427impl fmt::Display for AcquireError {
428    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
429        write!(fmt, "semaphore closed")
430    }
431}
432
433impl std::error::Error for AcquireError {}
434
435impl TryAcquireError {
436    /// Returns `true` if the error was caused by a closed semaphore.
437    #[allow(dead_code)] // may be used later!
438    pub(crate) fn is_closed(&self) -> bool {
439        matches!(self, TryAcquireError::Closed)
440    }
441
442    /// Returns `true` if the error was caused by calling `try_acquire` on a
443    /// semaphore with no available permits.
444    #[allow(dead_code)] // may be used later!
445    pub(crate) fn is_no_permits(&self) -> bool {
446        matches!(self, TryAcquireError::NoPermits)
447    }
448}
449
450impl fmt::Display for TryAcquireError {
451    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
452        match self {
453            TryAcquireError::Closed => write!(fmt, "semaphore closed"),
454            TryAcquireError::NoPermits => write!(fmt, "no permits available"),
455        }
456    }
457}
458
459impl std::error::Error for TryAcquireError {}
460
461/// Counting semaphore performing asynchronous permit acquisition.
462///
463/// A semaphore maintains a set of permits. Permits are used to synchronize
464/// access to a shared resource. A semaphore differs from a mutex in that it
465/// can allow more than one concurrent caller to access the shared resource at a
466/// time.
467///
468/// When `acquire` is called and the semaphore has remaining permits, the
469/// function immediately returns a permit. However, if no remaining permits are
470/// available, `acquire` (asynchronously) waits until an outstanding permit is
471/// dropped. At this point, the freed permit is assigned to the caller.
472///
473/// This `Semaphore` is fair, which means that permits are given out in the order
474/// they were requested. This fairness is also applied when `acquire_many` gets
475/// involved, so if a call to `acquire_many` at the front of the queue requests
476/// more permits than currently available, this can prevent a call to `acquire`
477/// from completing, even if the semaphore has enough permits complete the call
478/// to `acquire`.
479///
480/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
481/// utility.
482///
483/// # Examples
484///
485/// Basic usage:
486///
487/// ```
488/// use local_sync::semaphore::{Semaphore, TryAcquireError};
489///
490/// #[monoio::main]
491/// async fn main() {
492///     let semaphore = Semaphore::new(3);
493///
494///     let a_permit = semaphore.acquire().await.unwrap();
495///     let two_permits = semaphore.acquire_many(2).await.unwrap();
496///
497///     assert_eq!(semaphore.available_permits(), 0);
498///
499///     let permit_attempt = semaphore.try_acquire();
500///     assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
501/// }
502/// ```
503///
504/// Use [`Semaphore::acquire_owned`] to move permits across tasks:
505///
506/// ```
507/// use std::rc::Rc;
508/// use local_sync::semaphore::Semaphore;
509///
510/// #[monoio::main]
511/// async fn main() {
512///     let semaphore = Rc::new(Semaphore::new(3));
513///     let mut join_handles = Vec::new();
514///
515///     for _ in 0..5 {
516///         let permit = semaphore.clone().acquire_owned().await.unwrap();
517///         join_handles.push(monoio::spawn(async move {
518///             // perform task...
519///             // explicitly own `permit` in the task
520///             drop(permit);
521///         }));
522///     }
523///
524///     for handle in join_handles {
525///         handle.await;
526///     }
527/// }
528/// ```
529///
530/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
531/// [`Semaphore::acquire_owned`]: crate::sync::Semaphore::acquire_owned
532#[derive(Debug)]
533pub struct Semaphore(Inner);
534
535/// A permit from the semaphore.
536///
537/// This type is created by the [`acquire`] method.
538///
539/// [`acquire`]: crate::sync::Semaphore::acquire()
540#[must_use]
541#[derive(Debug)]
542pub struct SemaphorePermit<'a> {
543    sem: &'a Semaphore,
544    permits: u32,
545}
546
547/// An owned permit from the semaphore.
548///
549/// This type is created by the [`acquire_owned`] method.
550///
551/// [`acquire_owned`]: crate::sync::Semaphore::acquire_owned()
552#[must_use]
553#[derive(Debug)]
554pub struct OwnedSemaphorePermit {
555    sem: std::rc::Rc<Semaphore>,
556    permits: u32,
557}
558
559pub struct AcquireResult<'a>(Acquire<'a>, &'a Semaphore, u32);
560
561impl<'a> Future for AcquireResult<'a> {
562    type Output = Result<SemaphorePermit<'a>, AcquireError>;
563
564    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
565        let sem = self.1;
566        let permits = self.2;
567        let inner = unsafe { self.map_unchecked_mut(|me| &mut me.0) };
568        futures_util::ready!(inner.poll(cx))?;
569        Poll::Ready(Ok(SemaphorePermit { sem, permits }))
570    }
571}
572
573impl Semaphore {
574    /// Creates a new semaphore with the initial number of permits.
575    pub const fn new(permits: usize) -> Self {
576        Self(Inner::new(permits))
577    }
578
579    /// Returns the current number of available permits.
580    pub fn available_permits(&self) -> usize {
581        self.0.available_permits()
582    }
583
584    /// Adds `n` new permits to the semaphore.
585    ///
586    /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
587    pub fn add_permits(&self, n: usize) {
588        self.0.release(n);
589    }
590
591    /// Acquires a permit from the semaphore.
592    ///
593    /// If the semaphore has been closed, this returns an [`AcquireError`].
594    /// Otherwise, this returns a [`SemaphorePermit`] representing the
595    /// acquired permit.
596    ///
597    /// # Cancel safety
598    ///
599    /// This method uses a queue to fairly distribute permits in the order they
600    /// were requested. Cancelling a call to `acquire` makes you lose your place
601    /// in the queue.
602    ///
603    /// # Examples
604    ///
605    /// ```
606    /// use local_sync::semaphore::Semaphore;
607    ///
608    /// #[monoio::main]
609    /// async fn main() {
610    ///     let semaphore = Semaphore::new(2);
611    ///
612    ///     let permit_1 = semaphore.acquire().await.unwrap();
613    ///     assert_eq!(semaphore.available_permits(), 1);
614    ///
615    ///     let permit_2 = semaphore.acquire().await.unwrap();
616    ///     assert_eq!(semaphore.available_permits(), 0);
617    ///
618    ///     drop(permit_1);
619    ///     assert_eq!(semaphore.available_permits(), 1);
620    /// }
621    /// ```
622    ///
623    /// [`AcquireError`]: crate::sync::AcquireError
624    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
625    pub fn acquire(&self) -> AcquireResult<'_> {
626        let acq = self.0.acquire(1);
627        AcquireResult(acq, self, 1)
628    }
629
630    /// Acquires `n` permits from the semaphore.
631    ///
632    /// If the semaphore has been closed, this returns an [`AcquireError`].
633    /// Otherwise, this returns a [`SemaphorePermit`] representing the
634    /// acquired permits.
635    ///
636    /// # Cancel safety
637    ///
638    /// This method uses a queue to fairly distribute permits in the order they
639    /// were requested. Cancelling a call to `acquire_many` makes you lose your
640    /// place in the queue.
641    ///
642    /// # Examples
643    ///
644    /// ```
645    /// use local_sync::semaphore::Semaphore;
646    ///
647    /// #[monoio::main]
648    /// async fn main() {
649    ///     let semaphore = Semaphore::new(5);
650    ///
651    ///     let permit = semaphore.acquire_many(3).await.unwrap();
652    ///     assert_eq!(semaphore.available_permits(), 2);
653    /// }
654    /// ```
655    ///
656    /// [`AcquireError`]: crate::sync::AcquireError
657    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
658    pub fn acquire_many(&self, n: u32) -> AcquireResult<'_> {
659        let acq = self.0.acquire(n);
660        AcquireResult(acq, self, n)
661    }
662
663    /// Tries to acquire a permit from the semaphore.
664    ///
665    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
666    /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
667    /// this returns a [`SemaphorePermit`] representing the acquired permits.
668    ///
669    /// # Examples
670    ///
671    /// ```
672    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
673    ///
674    /// # fn main() {
675    /// let semaphore = Semaphore::new(2);
676    ///
677    /// let permit_1 = semaphore.try_acquire().unwrap();
678    /// assert_eq!(semaphore.available_permits(), 1);
679    ///
680    /// let permit_2 = semaphore.try_acquire().unwrap();
681    /// assert_eq!(semaphore.available_permits(), 0);
682    ///
683    /// let permit_3 = semaphore.try_acquire();
684    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
685    /// # }
686    /// ```
687    ///
688    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
689    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
690    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
691    pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
692        match self.0.try_acquire(1) {
693            Ok(_) => Ok(SemaphorePermit {
694                sem: self,
695                permits: 1,
696            }),
697            Err(e) => Err(e),
698        }
699    }
700
701    /// Tries to acquire `n` permits from the semaphore.
702    ///
703    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
704    /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
705    /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
706    ///
707    /// # Examples
708    ///
709    /// ```
710    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
711    ///
712    /// # fn main() {
713    /// let semaphore = Semaphore::new(4);
714    ///
715    /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
716    /// assert_eq!(semaphore.available_permits(), 1);
717    ///
718    /// let permit_2 = semaphore.try_acquire_many(2);
719    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
720    /// # }
721    /// ```
722    ///
723    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
724    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
725    /// [`SemaphorePermit`]: crate::sync::SemaphorePermit
726    pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
727        match self.0.try_acquire(n) {
728            Ok(_) => Ok(SemaphorePermit {
729                sem: self,
730                permits: n,
731            }),
732            Err(e) => Err(e),
733        }
734    }
735
736    /// Acquires a permit from the semaphore.
737    ///
738    /// The semaphore must be wrapped in an [`Rc`] to call this method.
739    /// If the semaphore has been closed, this returns an [`AcquireError`].
740    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
741    /// acquired permit.
742    ///
743    /// # Cancel safety
744    ///
745    /// This method uses a queue to fairly distribute permits in the order they
746    /// were requested. Cancelling a call to `acquire_owned` makes you lose your
747    /// place in the queue.
748    ///
749    /// # Examples
750    ///
751    /// ```
752    /// use std::rc::Rc;
753    /// use local_sync::semaphore::Semaphore;
754    ///
755    /// #[monoio::main]
756    /// async fn main() {
757    ///     let semaphore = Rc::new(Semaphore::new(3));
758    ///     let mut join_handles = Vec::new();
759    ///
760    ///     for _ in 0..5 {
761    ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
762    ///         join_handles.push(monoio::spawn(async move {
763    ///             // perform task...
764    ///             // explicitly own `permit` in the task
765    ///             drop(permit);
766    ///         }));
767    ///     }
768    ///
769    ///     for handle in join_handles {
770    ///         handle.await;
771    ///     }
772    /// }
773    /// ```
774    ///
775    /// [`Rc`]: std::sync::Rc
776    /// [`AcquireError`]: crate::sync::AcquireError
777    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
778    pub async fn acquire_owned(
779        self: std::rc::Rc<Self>,
780    ) -> Result<OwnedSemaphorePermit, AcquireError> {
781        self.0.acquire(1).await?;
782        Ok(OwnedSemaphorePermit {
783            sem: self,
784            permits: 1,
785        })
786    }
787
788    /// Acquires `n` permits from the semaphore.
789    ///
790    /// The semaphore must be wrapped in an [`Rc`] to call this method.
791    /// If the semaphore has been closed, this returns an [`AcquireError`].
792    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
793    /// acquired permit.
794    ///
795    /// # Cancel safety
796    ///
797    /// This method uses a queue to fairly distribute permits in the order they
798    /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
799    /// your place in the queue.
800    ///
801    /// # Examples
802    ///
803    /// ```
804    /// use std::rc::Rc;
805    /// use local_sync::semaphore::Semaphore;
806    ///
807    /// #[monoio::main]
808    /// async fn main() {
809    ///     let semaphore = Rc::new(Semaphore::new(10));
810    ///     let mut join_handles = Vec::new();
811    ///
812    ///     for _ in 0..5 {
813    ///         let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
814    ///         join_handles.push(monoio::spawn(async move {
815    ///             // perform task...
816    ///             // explicitly own `permit` in the task
817    ///             drop(permit);
818    ///         }));
819    ///     }
820    ///
821    ///     for handle in join_handles {
822    ///         handle.await;
823    ///     }
824    /// }
825    /// ```
826    ///
827    /// [`Rc`]: std::sync::Rc
828    /// [`AcquireError`]: crate::sync::AcquireError
829    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
830    pub async fn acquire_many_owned(
831        self: std::rc::Rc<Self>,
832        n: u32,
833    ) -> Result<OwnedSemaphorePermit, AcquireError> {
834        self.0.acquire(n).await?;
835        Ok(OwnedSemaphorePermit {
836            sem: self,
837            permits: n,
838        })
839    }
840
841    /// Tries to acquire a permit from the semaphore.
842    ///
843    /// The semaphore must be wrapped in an [`Rc`] to call this method. If
844    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
845    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
846    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
847    /// acquired permit.
848    ///
849    /// # Examples
850    ///
851    /// ```
852    /// use std::rc::Rc;
853    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
854    ///
855    /// # fn main() {
856    /// let semaphore = Rc::new(Semaphore::new(2));
857    ///
858    /// let permit_1 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
859    /// assert_eq!(semaphore.available_permits(), 1);
860    ///
861    /// let permit_2 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
862    /// assert_eq!(semaphore.available_permits(), 0);
863    ///
864    /// let permit_3 = semaphore.try_acquire_owned();
865    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
866    /// # }
867    /// ```
868    ///
869    /// [`Rc`]: std::sync::Rc
870    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
871    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
872    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
873    pub fn try_acquire_owned(
874        self: std::rc::Rc<Self>,
875    ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
876        match self.0.try_acquire(1) {
877            Ok(_) => Ok(OwnedSemaphorePermit {
878                sem: self,
879                permits: 1,
880            }),
881            Err(e) => Err(e),
882        }
883    }
884
885    /// Tries to acquire `n` permits from the semaphore.
886    ///
887    /// The semaphore must be wrapped in an [`Rc`] to call this method. If
888    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
889    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
890    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
891    /// acquired permit.
892    ///
893    /// # Examples
894    ///
895    /// ```
896    /// use std::rc::Rc;
897    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
898    ///
899    /// # fn main() {
900    /// let semaphore = Rc::new(Semaphore::new(4));
901    ///
902    /// let permit_1 = Rc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
903    /// assert_eq!(semaphore.available_permits(), 1);
904    ///
905    /// let permit_2 = semaphore.try_acquire_many_owned(2);
906    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
907    /// # }
908    /// ```
909    ///
910    /// [`Rc`]: std::sync::Rc
911    /// [`TryAcquireError::Closed`]: crate::sync::TryAcquireError::Closed
912    /// [`TryAcquireError::NoPermits`]: crate::sync::TryAcquireError::NoPermits
913    /// [`OwnedSemaphorePermit`]: crate::sync::OwnedSemaphorePermit
914    pub fn try_acquire_many_owned(
915        self: std::rc::Rc<Self>,
916        n: u32,
917    ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
918        match self.0.try_acquire(n) {
919            Ok(_) => Ok(OwnedSemaphorePermit {
920                sem: self,
921                permits: n,
922            }),
923            Err(e) => Err(e),
924        }
925    }
926
927    /// Closes the semaphore.
928    ///
929    /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
930    ///
931    /// # Examples
932    ///
933    /// ```
934    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
935    /// use std::rc::Rc;
936    ///
937    /// #[monoio::main]
938    /// async fn main() {
939    ///     let semaphore = Rc::new(Semaphore::new(1));
940    ///     let semaphore2 = semaphore.clone();
941    ///
942    ///     monoio::spawn(async move {
943    ///         let permit = semaphore.acquire_many(2).await;
944    ///         assert!(permit.is_err());
945    ///         println!("waiter received error");
946    ///     });
947    ///
948    ///     println!("closing semaphore");
949    ///     semaphore2.close();
950    ///
951    ///     // Cannot obtain more permits
952    ///     assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
953    /// }
954    /// ```
955    pub fn close(&self) {
956        self.0.close();
957    }
958
959    /// Returns true if the semaphore is closed
960    pub fn is_closed(&self) -> bool {
961        self.0.is_closed()
962    }
963}
964
965impl<'a> SemaphorePermit<'a> {
966    /// Forgets the permit **without** releasing it back to the semaphore.
967    /// This can be used to reduce the amount of permits available from a
968    /// semaphore.
969    pub fn forget(mut self) {
970        self.permits = 0;
971    }
972}
973
974impl OwnedSemaphorePermit {
975    /// Forgets the permit **without** releasing it back to the semaphore.
976    /// This can be used to reduce the amount of permits available from a
977    /// semaphore.
978    pub fn forget(mut self) {
979        self.permits = 0;
980    }
981}
982
983impl<'a> Drop for SemaphorePermit<'_> {
984    fn drop(&mut self) {
985        self.sem.add_permits(self.permits as usize);
986    }
987}
988
989impl Drop for OwnedSemaphorePermit {
990    fn drop(&mut self) {
991        self.sem.add_permits(self.permits as usize);
992    }
993}
994
995#[cfg(test)]
996mod tests {
997    use super::{Inner, Semaphore};
998
999    #[monoio::test]
1000    async fn inner_works() {
1001        let s = Inner::new(10);
1002        for _ in 0..10 {
1003            s.acquire(1).await.unwrap();
1004        }
1005    }
1006
1007    #[monoio::test]
1008    async fn inner_release_after_acquire() {
1009        let s = std::rc::Rc::new(Inner::new(0));
1010
1011        let s_move = s.clone();
1012        let join = monoio::spawn(async move {
1013            let _ = s_move.acquire(1).await.unwrap();
1014            let _ = s_move.acquire(1).await.unwrap();
1015        });
1016        s.release(2);
1017        join.await;
1018    }
1019
1020    #[monoio::test]
1021    async fn it_works() {
1022        let s = Semaphore::new(0);
1023        s.add_permits(1);
1024        let _ = s.acquire().await.unwrap();
1025    }
1026}