local-sync 0.0.5

Non-threadsafe data structure for async usage
Documentation
//! Semaphore borrowed from tokio.

#![allow(unused)]

use core::future::Future;
use std::{
    cell::{RefCell, UnsafeCell},
    cmp, fmt,
    marker::PhantomPinned,
    pin::Pin,
    ptr::NonNull,
    task::{Context, Poll, Waker},
};

use crate::{
    linked_list::{self, LinkedList},
    wake_list::WakeList,
};

/// Low level semaphore.
///
/// Both fields live in `RefCell`s, so every operation works through `&self`.
pub(crate) struct Inner {
    /// Queue of pending acquirers together with the `closed` flag.
    waiters: RefCell<Waitlist>,
    /// The current number of available permits in the semaphore.
    ///
    /// The count is stored shifted left by `PERMIT_SHIFT`; the lowest bit is
    /// the `CLOSED` flag.
    permits: RefCell<usize>,
}

/// Wait queue of the semaphore plus a flag recording that it was closed.
struct Waitlist {
    /// Intrusive list of pending waiters. `poll_acquire` pushes new waiters at
    /// the front; permits are handed out starting from the back.
    queue: LinkedList<Waiter, <Waiter as linked_list::Link>::Target>,
    /// Set to `true` by `Inner::close`.
    closed: bool,
}

/// Error returned from the [`Semaphore::try_acquire`] function.
///
/// [`Semaphore::try_acquire`]: Semaphore::try_acquire
#[derive(Debug, PartialEq)]
pub enum TryAcquireError {
    /// The semaphore has been [closed] and cannot issue new permits.
    ///
    /// [closed]: Semaphore::close
    Closed,

    /// The semaphore has no available permits.
    NoPermits,
}

/// Error returned from the [`Semaphore::acquire`] function.
///
/// An `acquire` operation can only fail if the semaphore has been
/// [closed].
///
/// The private unit field keeps the type from being constructed outside this
/// module.
///
/// [closed]: Semaphore::close
/// [`Semaphore::acquire`]: Semaphore::acquire
#[derive(Debug)]
pub struct AcquireError(());

/// Future returned by `Inner::acquire`.
///
/// Resolves to `Ok(())` once the requested permits have been obtained, or to
/// an `AcquireError` when the semaphore is closed.
pub(crate) struct Acquire<'a> {
    /// Wait-queue entry for this request; linked into the semaphore's queue
    /// by `poll_acquire` while the request is pending.
    node: Waiter,
    /// Semaphore the permits are requested from.
    semaphore: &'a Inner,
    /// Total number of permits requested.
    num_permits: u32,
    /// Set to `true` when a poll returns `Pending` and back to `false` on
    /// success; `Drop` uses it to decide whether cleanup is required.
    queued: bool,
}

/// Entry in the semaphore's intrusive wait queue.
struct Waiter {
    /// The current state of the waiter.
    ///
    /// This is the number of permits the waiter still needs: it starts at the
    /// requested count and is decremented by `assign_permits` until it
    /// reaches zero.
    state: RefCell<usize>,

    /// The waker to notify the task awaiting permits.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    waker: UnsafeCell<Option<Waker>>,

    /// Intrusive linked-list pointers.
    ///
    /// # Safety
    ///
    /// This may only be accessed while the wait queue is locked.
    pointers: linked_list::Pointers<Waiter>,

    /// Should not be `Unpin`.
    ///
    /// The queue stores raw pointers to this struct, so it must not move
    /// while it is linked.
    _p: PhantomPinned,
}

impl Waiter {
    /// Builds an unlinked waiter that still needs `num_permits` permits and
    /// has no waker registered yet.
    fn new(num_permits: u32) -> Self {
        let outstanding = num_permits as usize;
        Self {
            waker: UnsafeCell::new(None),
            state: RefCell::new(outstanding),
            pointers: linked_list::Pointers::new(),
            _p: PhantomPinned,
        }
    }

    /// Hands over as many permits as possible from `n` to this waiter.
    ///
    /// `n` is reduced by the amount transferred. Returns `true` once the
    /// waiter needs no further permits, i.e. it should be removed from the
    /// queue.
    fn assign_permits(&self, n: &mut usize) -> bool {
        let mut outstanding = self.state.borrow_mut();
        // Never hand out more than is needed or more than is on offer.
        let granted = (*outstanding).min(*n);
        *outstanding -= granted;
        *n -= granted;

        *outstanding == 0
    }
}

/// Glue that lets `Waiter` be stored in the intrusive `LinkedList`.
///
/// Handles are plain `NonNull<Waiter>` pointers, so the conversions below are
/// identity functions.
unsafe impl linked_list::Link for Waiter {
    // XXX: ideally, we would be able to use `Pin` here, to enforce the
    // invariant that list entries may not move while in the list. However, we
    // can't do this currently, as using `Pin<&'a mut Waiter>` as the `Handle`
    // type would require `Semaphore` to be generic over a lifetime. We can't
    // use `Pin<*mut Waiter>`, as raw pointers are `Unpin` regardless of whether
    // or not they dereference to an `!Unpin` target.
    type Handle = NonNull<Waiter>;
    type Target = Waiter;

    /// Returns the raw pointer behind `handle` (the handle already is one).
    fn as_raw(handle: &Self::Handle) -> NonNull<Waiter> {
        *handle
    }

    /// Turns a raw pointer back into a handle (identity).
    unsafe fn from_raw(ptr: NonNull<Waiter>) -> NonNull<Waiter> {
        ptr
    }

    /// Returns a pointer to the list pointers embedded in `target`.
    ///
    /// Dereferences `target`, so the caller must pass a pointer to a live
    /// `Waiter`.
    unsafe fn pointers(mut target: NonNull<Waiter>) -> NonNull<linked_list::Pointers<Waiter>> {
        NonNull::from(&mut target.as_mut().pointers)
    }
}

impl Inner {
    /// The maximum number of permits which a semaphore can hold.
    ///
    /// Note that this reserves three bits of flags in the permit counter, but
    /// we only actually use one of them. However, the previous semaphore
    /// implementation used three bits, so we will continue to reserve them to
    /// avoid a breaking change if additional flags need to be added in the
    /// future.
    pub(crate) const MAX_PERMITS: usize = std::usize::MAX >> 3;
    /// Bit mask of the "closed" flag inside the raw `permits` value.
    const CLOSED: usize = 1;
    // The least-significant bit in the number of permits is reserved to use
    // as a flag indicating that the semaphore has been closed. Consequently
    // PERMIT_SHIFT is used to leave that bit for that purpose.
    /// Number of bits the permit count is shifted left inside `permits`.
    const PERMIT_SHIFT: usize = 1;

    /// Builds an open semaphore holding `permits` permits and an empty wait
    /// queue.
    ///
    /// Counts above `MAX_PERMITS` are not rejected: the value is masked with
    /// `MAX_PERMITS`, so the excess high bits are discarded.
    pub(crate) const fn new(permits: usize) -> Self {
        // Mask first, then shift to leave room for the closed flag bit.
        let initial = (permits & Self::MAX_PERMITS) << Self::PERMIT_SHIFT;

        Self {
            permits: RefCell::new(initial),
            waiters: RefCell::new(Waitlist {
                closed: false,
                queue: LinkedList::new(),
            }),
        }
    }

    /// Reports how many permits can currently be acquired.
    ///
    /// The closed flag bit is shifted out of the stored value first.
    pub(crate) fn available_permits(&self) -> usize {
        let raw = *self.permits.borrow();
        raw >> Self::PERMIT_SHIFT
    }

    /// Returns `added` permits to the semaphore.
    ///
    /// Queued waiters are served first; whatever is left becomes available
    /// again. Releasing zero permits does nothing.
    ///
    /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
    pub(crate) fn release(&self, added: usize) {
        if added != 0 {
            // Hand the permits to the wait queue.
            self.add_permits(added);
        }
    }

    /// Closes the semaphore. This prevents the semaphore from issuing new
    /// permits and notifies all pending waiters.
    ///
    /// Every queued waiter is unlinked and its waker (if any) is invoked. The
    /// wait list is only borrowed while a waiter is being popped, never while
    /// a waker runs: a waker that re-enters the semaphore (for example by
    /// polling or dropping an `Acquire` inline) would otherwise hit a
    /// `RefCell` double-borrow panic.
    pub(crate) fn close(&self) {
        *self.permits.borrow_mut() |= Self::CLOSED;
        self.waiters.borrow_mut().closed = true;

        loop {
            let waker = {
                let mut waiters = self.waiters.borrow_mut();
                let popped = waiters.queue.pop_back();
                match popped {
                    // SAFETY: the pointer was just unlinked from the queue and
                    // the wait list is still borrowed, so nothing else is
                    // accessing this waiter's waker slot.
                    Some(mut waiter) => unsafe { (*waiter.as_mut().waker.get()).take() },
                    None => break,
                }
                // `waiters` is released here, before the waker is invoked.
            };

            if let Some(waker) = waker {
                waker.wake();
            }
        }
    }

    /// Tells whether `close` has been called, by testing the closed flag bit
    /// of the raw permit value.
    pub(crate) fn is_closed(&self) -> bool {
        let raw = *self.permits.borrow();
        (raw & Self::CLOSED) == Self::CLOSED
    }

    /// Attempts to take `num_permits` permits without waiting.
    ///
    /// Returns `TryAcquireError::Closed` when the semaphore is closed and
    /// `TryAcquireError::NoPermits` when fewer than `num_permits` permits are
    /// available. On success the permits are deducted immediately.
    ///
    /// # Panics
    ///
    /// Panics if `num_permits` exceeds `MAX_PERMITS`.
    pub(crate) fn try_acquire(&self, num_permits: u32) -> Result<(), TryAcquireError> {
        let requested = num_permits as usize;
        assert!(
            requested <= Self::MAX_PERMITS,
            "a semaphore may not have more than MAX_PERMITS permits ({})",
            Self::MAX_PERMITS
        );

        // Work on the raw (shifted) representation throughout.
        let wanted = requested << Self::PERMIT_SHIFT;
        let mut available = self.permits.borrow_mut();

        // A closed semaphore never hands out permits.
        if *available & Self::CLOSED != 0 {
            return Err(TryAcquireError::Closed);
        }

        // `checked_sub` fails exactly when there are too few permits left.
        match available.checked_sub(wanted) {
            Some(left) => {
                *available = left;
                Ok(())
            }
            None => Err(TryAcquireError::NoPermits),
        }
    }

    /// Returns a future that resolves once `num_permits` permits have been
    /// acquired, or with an error if the semaphore is closed.
    ///
    /// Nothing is taken from the semaphore until the future is polled.
    pub(crate) fn acquire(&self, num_permits: u32) -> Acquire<'_> {
        Acquire::new(self, num_permits)
    }

    /// Release `rem` permits to the semaphore's wait list, starting from the
    /// end of the queue.
    ///
    /// If `rem` exceeds the number of permits needed by the wait list, the
    /// remainder are assigned back to the semaphore.
    ///
    /// Wakers of satisfied waiters are collected in batches and invoked only
    /// after the wait-list borrow has been released, so a waker that re-enters
    /// the semaphore cannot trigger a `RefCell` double-borrow panic.
    ///
    /// # Panics
    ///
    /// Panics if the number of available permits would exceed `MAX_PERMITS`
    /// after the leftover permits are added.
    fn add_permits(&self, mut rem: usize) {
        let mut wakers = WakeList::new();
        let mut is_empty = false;
        while rem > 0 {
            {
                let mut waiters = self.waiters.borrow_mut();
                'inner: while wakers.can_push() {
                    // Was the waiter assigned enough permits to wake it?
                    match waiters.queue.last() {
                        Some(waiter) => {
                            if !waiter.assign_permits(&mut rem) {
                                // The waiter still needs more, which means
                                // `rem` has been used up.
                                break 'inner;
                            }
                        }
                        None => {
                            is_empty = true;
                            // If we assigned permits to all the waiters in the queue, and there are
                            // still permits left over, assign them back to the semaphore.
                            break 'inner;
                        }
                    }
                    // `last()` just returned this entry, so the queue is not empty.
                    let mut waiter = waiters.queue.pop_back().unwrap();
                    // SAFETY: the waiter was just unlinked while the wait list
                    // is borrowed, so nothing else accesses its waker slot.
                    if let Some(waker) = unsafe { (*waiter.as_mut().waker.get()).take() } {
                        wakers.push(waker);
                    }
                }

                if rem > 0 && is_empty {
                    let mut curr = self.permits.borrow_mut();
                    // Check the resulting total, not just the amount added.
                    let total = (*curr >> Self::PERMIT_SHIFT).checked_add(rem);
                    assert!(
                        matches!(total, Some(total) if total <= Self::MAX_PERMITS),
                        "cannot add more than MAX_PERMITS permits ({})",
                        Self::MAX_PERMITS
                    );
                    *curr += rem << Self::PERMIT_SHIFT;
                    rem = 0;
                }
                // The wait-list borrow ends here, before any waker runs.
            }

            wakers.wake_all();
        }

        assert_eq!(rem, 0);
    }

    /// Polls an acquire request on behalf of `Acquire`.
    ///
    /// * `num_permits` - total permits requested; used only while the waiter
    ///   is not yet queued.
    /// * `node` - the request's wait-queue entry; its `state` tracks how many
    ///   permits are still missing.
    /// * `queued` - whether a previous poll already returned `Pending`.
    ///
    /// Returns `Ready(Ok(()))` once all permits are assigned,
    /// `Ready(Err(_))` if the semaphore is closed, and `Pending` after
    /// registering the task's waker (and enqueueing the node on first use).
    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        num_permits: u32,
        node: Pin<&mut Waiter>,
        queued: bool,
    ) -> Poll<Result<(), AcquireError>> {
        // Amount still required, in the raw (shifted) representation.
        let needed = if queued {
            *node.state.borrow() << Self::PERMIT_SHIFT
        } else {
            (num_permits as usize) << Self::PERMIT_SHIFT
        };

        let mut curr = self.permits.borrow_mut();

        // If closed, return error immediately.
        if *curr & Self::CLOSED > 0 {
            return Poll::Ready(Err(AcquireError::closed()));
        }
        // If the current permits is enough and not queued, assign permit
        // and return ok immediately.
        if *curr >= needed && !queued {
            *curr -= needed;
            return Poll::Ready(Ok(()));
        }

        // Clear permits and assign it.
        let mut permits = *curr >> Self::PERMIT_SHIFT;
        *curr = 0;
        // Release the borrow: `add_permits` below borrows `permits` again.
        drop(curr);
        if node.assign_permits(&mut permits) {
            // TODO: may never be here?
            // NOTE(review): this looks reachable for a queued waiter whose
            // remaining count already reached zero via `add_permits` — confirm.
            // Any permits not consumed by this waiter are handed back.
            self.add_permits(permits);
            return Poll::Ready(Ok(()));
        }

        // Replace waker if needed.
        let waker = unsafe { &mut *node.waker.get() };
        // Do we need to register the new waker?
        if waker
            .as_ref()
            .map(|waker| !waker.will_wake(cx.waker()))
            .unwrap_or(true)
        {
            *waker = Some(cx.waker().clone());
        }

        // If the waiter is not already in the wait queue, enqueue it.
        if !queued {
            // The queue stores a raw pointer to the pinned node; `Acquire`'s
            // `Drop` unlinks it again before the node is freed.
            let node = unsafe {
                let node = Pin::into_inner_unchecked(node) as *mut _;
                NonNull::new_unchecked(node)
            };

            self.waiters.borrow_mut().queue.push_front(node);
        }

        Poll::Pending
    }
}

impl fmt::Debug for Inner {
    /// Formats as `Semaphore { permits: N }`, where `N` is the number of
    /// currently available permits.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let permits = self.available_permits();
        let mut out = fmt.debug_struct("Semaphore");
        out.field("permits", &permits);
        out.finish()
    }
}

impl Future for Acquire<'_> {
    type Output = Result<(), AcquireError>;

    /// Drives the request through `Inner::poll_acquire` and keeps the
    /// `queued` flag in sync with the outcome.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let (node, semaphore, needed, queued) = self.project();
        let was_queued = *queued;

        match semaphore.poll_acquire(cx, needed, node, was_queued) {
            // Still waiting: remember that cleanup is needed on drop.
            Poll::Pending => {
                *queued = true;
                Poll::Pending
            }
            // All permits obtained: nothing is left to clean up on drop.
            Poll::Ready(Ok(())) => {
                *queued = false;
                Poll::Ready(Ok(()))
            }
            // Semaphore closed: the flag is deliberately left untouched.
            Poll::Ready(Err(closed)) => Poll::Ready(Err(closed)),
        }
    }
}

impl<'a> Acquire<'a> {
    /// Creates a not-yet-queued request for `num_permits` permits from
    /// `semaphore`.
    fn new(semaphore: &'a Inner, num_permits: u32) -> Self {
        Self {
            node: Waiter::new(num_permits),
            semaphore,
            num_permits,
            queued: false,
        }
    }

    /// Splits the pinned future into its parts.
    ///
    /// The waiter node stays pinned; the semaphore reference, the requested
    /// permit count and the `queued` flag are handed out unpinned.
    fn project(self: Pin<&mut Self>) -> (Pin<&mut Waiter>, &Inner, u32, &mut bool) {
        // Compile-time check that the unpinned parts really are `Unpin`.
        fn is_unpin<T: Unpin>() {}
        unsafe {
            // Safety: all fields other than `node` are `Unpin`

            is_unpin::<&Inner>();
            is_unpin::<&mut bool>();
            is_unpin::<u32>();

            let this = self.get_unchecked_mut();
            (
                // `node` is never moved out of `this`, so re-pinning it is sound.
                Pin::new_unchecked(&mut this.node),
                this.semaphore,
                this.num_permits,
                &mut this.queued,
            )
        }
    }
}

impl Drop for Acquire<'_> {
    /// Unlinks the waiter from the wait queue and hands back any permits that
    /// were already assigned to it.
    ///
    /// The wait-list borrow is confined to the unlink step. `add_permits`
    /// borrows the wait list itself, so calling it while the borrow is still
    /// alive would panic with a `RefCell` double-borrow error.
    fn drop(&mut self) {
        // If the future is completed, there is no node in the wait list, so we
        // can skip acquiring the lock.
        if !self.queued {
            return;
        }

        // This is where we ensure safety. The future is being dropped,
        // which means we must ensure that the waiter entry is no longer stored
        // in the linked list.
        {
            let mut waiters = self.semaphore.waiters.borrow_mut();

            // remove the entry from the list
            let node = NonNull::from(&mut self.node);
            // Safety: we have locked the wait list.
            // NOTE(review): the node may already have been popped by
            // `add_permits` or `close`; this assumes `LinkedList::remove`
            // tolerates a node that is in no list — confirm.
            unsafe { waiters.queue.remove(node) };
        } // the wait-list borrow ends here

        // Permits already assigned to this waiter must not be lost.
        let acquired_permits = self.num_permits as usize - *self.node.state.borrow();
        if acquired_permits > 0 {
            self.semaphore.add_permits(acquired_permits);
        }
    }
}

impl AcquireError {
    /// Creates the error reported when acquiring from a closed semaphore.
    fn closed() -> AcquireError {
        AcquireError(())
    }
}

impl fmt::Display for AcquireError {
    /// Writes the fixed message `semaphore closed`.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.write_str("semaphore closed")
    }
}

// Marker impl: `AcquireError` uses the default `Error` methods.
impl std::error::Error for AcquireError {}

impl TryAcquireError {
    /// Reports whether the failure was caused by the semaphore being closed.
    #[allow(dead_code)] // may be used later!
    pub(crate) fn is_closed(&self) -> bool {
        match self {
            TryAcquireError::Closed => true,
            TryAcquireError::NoPermits => false,
        }
    }

    /// Reports whether the failure was caused by `try_acquire` finding no
    /// available permits.
    #[allow(dead_code)] // may be used later!
    pub(crate) fn is_no_permits(&self) -> bool {
        match self {
            TryAcquireError::NoPermits => true,
            TryAcquireError::Closed => false,
        }
    }
}

impl fmt::Display for TryAcquireError {
    /// Writes a short, fixed description of the failure.
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let message = match self {
            TryAcquireError::Closed => "semaphore closed",
            TryAcquireError::NoPermits => "no permits available",
        };
        fmt.write_str(message)
    }
}

// Marker impl: `TryAcquireError` uses the default `Error` methods.
impl std::error::Error for TryAcquireError {}

/// Counting semaphore performing asynchronous permit acquisition.
///
/// A semaphore maintains a set of permits. Permits are used to synchronize
/// access to a shared resource. A semaphore differs from a mutex in that it
/// can allow more than one concurrent caller to access the shared resource at a
/// time.
///
/// When `acquire` is called and the semaphore has remaining permits, the
/// function immediately returns a permit. However, if no remaining permits are
/// available, `acquire` (asynchronously) waits until an outstanding permit is
/// dropped. At this point, the freed permit is assigned to the caller.
///
/// This `Semaphore` is fair, which means that permits are given out in the order
/// they were requested. This fairness is also applied when `acquire_many` gets
/// involved, so if a call to `acquire_many` at the front of the queue requests
/// more permits than currently available, this can prevent a call to `acquire`
/// from completing, even if the semaphore has enough permits to complete the call
/// to `acquire`.
///
/// To use the `Semaphore` in a poll function, you can use the [`PollSemaphore`]
/// utility.
///
/// # Examples
///
/// Basic usage:
///
/// ```
/// use local_sync::semaphore::{Semaphore, TryAcquireError};
///
/// #[monoio::main]
/// async fn main() {
///     let semaphore = Semaphore::new(3);
///
///     let a_permit = semaphore.acquire().await.unwrap();
///     let two_permits = semaphore.acquire_many(2).await.unwrap();
///
///     assert_eq!(semaphore.available_permits(), 0);
///
///     let permit_attempt = semaphore.try_acquire();
///     assert_eq!(permit_attempt.err(), Some(TryAcquireError::NoPermits));
/// }
/// ```
///
/// Use [`Semaphore::acquire_owned`] to move permits across tasks:
///
/// ```
/// use std::rc::Rc;
/// use local_sync::semaphore::Semaphore;
///
/// #[monoio::main]
/// async fn main() {
///     let semaphore = Rc::new(Semaphore::new(3));
///     let mut join_handles = Vec::new();
///
///     for _ in 0..5 {
///         let permit = semaphore.clone().acquire_owned().await.unwrap();
///         join_handles.push(monoio::spawn(async move {
///             // perform task...
///             // explicitly own `permit` in the task
///             drop(permit);
///         }));
///     }
///
///     for handle in join_handles {
///         handle.await;
///     }
/// }
/// ```
///
/// [`PollSemaphore`]: https://docs.rs/tokio-util/0.6/tokio_util/sync/struct.PollSemaphore.html
/// [`Semaphore::acquire_owned`]: Semaphore::acquire_owned
// Public wrapper around the crate-private `Inner`, which holds the actual
// permit counter and wait queue.
#[derive(Debug)]
pub struct Semaphore(Inner);

/// A permit from the semaphore.
///
/// This type is created by the [`acquire`] method. Dropping it returns its
/// permits to the semaphore.
///
/// [`acquire`]: Semaphore::acquire()
#[must_use]
#[derive(Debug)]
pub struct SemaphorePermit<'a> {
    /// Semaphore the permits are returned to on drop.
    sem: &'a Semaphore,
    /// Number of permits held.
    permits: u32,
}

/// An owned permit from the semaphore.
///
/// This type is created by the [`acquire_owned`] method. Dropping it returns
/// its permits to the semaphore.
///
/// [`acquire_owned`]: Semaphore::acquire_owned()
#[must_use]
#[derive(Debug)]
pub struct OwnedSemaphorePermit {
    /// Shared handle to the semaphore the permits are returned to on drop.
    sem: std::rc::Rc<Semaphore>,
    /// Number of permits held.
    permits: u32,
}

/// Future returned by [`Semaphore::acquire`] and [`Semaphore::acquire_many`].
///
/// Resolves to a [`SemaphorePermit`] once the permits have been obtained, or
/// to an [`AcquireError`] if the semaphore is closed.
///
/// The fields are, in order: the underlying acquire future, the semaphore the
/// resulting permit refers to, and the number of permits requested.
pub struct AcquireResult<'a>(Acquire<'a>, &'a Semaphore, u32);

impl<'a> Future for AcquireResult<'a> {
    type Output = Result<SemaphorePermit<'a>, AcquireError>;

    /// Polls the inner acquire future and wraps a successful result in a
    /// `SemaphorePermit`.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Copy the plain fields out before `self` is consumed by the projection.
        let sem = self.1;
        let permits = self.2;
        // Pin projection onto the inner future; field 0 is never moved out of `self`.
        let inner = unsafe { self.map_unchecked_mut(|me| &mut me.0) };
        // Returns early on `Pending`; `?` forwards the closed error.
        futures_util::ready!(inner.poll(cx))?;
        Poll::Ready(Ok(SemaphorePermit { sem, permits }))
    }
}

impl Semaphore {
    /// Creates a new semaphore with the initial number of permits.
    ///
    /// The count is masked with `usize::MAX >> 3`; bits above that limit are
    /// discarded rather than reported as an error.
    pub const fn new(permits: usize) -> Self {
        Self(Inner::new(permits))
    }

    /// Returns the current number of available permits.
    ///
    /// Permits already assigned to a permit guard or to a queued waiter are
    /// not counted.
    pub fn available_permits(&self) -> usize {
        self.0.available_permits()
    }

    /// Adds `n` new permits to the semaphore.
    ///
    /// Queued waiters are served first; only permits left over afterwards
    /// become available. Adding zero permits does nothing.
    ///
    /// # Panics
    ///
    /// The maximum number of permits is `usize::MAX >> 3`, and this function will panic if the limit is exceeded.
    pub fn add_permits(&self, n: usize) {
        self.0.release(n);
    }

    /// Acquires a permit from the semaphore.
    ///
    /// If the semaphore has been closed, this returns an [`AcquireError`].
    /// Otherwise, this returns a [`SemaphorePermit`] representing the
    /// acquired permit.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute permits in the order they
    /// were requested. Cancelling a call to `acquire` makes you lose your place
    /// in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use local_sync::semaphore::Semaphore;
    ///
    /// #[monoio::main]
    /// async fn main() {
    ///     let semaphore = Semaphore::new(2);
    ///
    ///     let permit_1 = semaphore.acquire().await.unwrap();
    ///     assert_eq!(semaphore.available_permits(), 1);
    ///
    ///     let permit_2 = semaphore.acquire().await.unwrap();
    ///     assert_eq!(semaphore.available_permits(), 0);
    ///
    ///     drop(permit_1);
    ///     assert_eq!(semaphore.available_permits(), 1);
    /// }
    /// ```
    ///
    /// [`AcquireError`]: AcquireError
    /// [`SemaphorePermit`]: SemaphorePermit
    pub fn acquire(&self) -> AcquireResult<'_> {
        let acq = self.0.acquire(1);
        AcquireResult(acq, self, 1)
    }

    /// Acquires `n` permits from the semaphore.
    ///
    /// If the semaphore has been closed, this returns an [`AcquireError`].
    /// Otherwise, this returns a [`SemaphorePermit`] representing the
    /// acquired permits.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute permits in the order they
    /// were requested. Cancelling a call to `acquire_many` makes you lose your
    /// place in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use local_sync::semaphore::Semaphore;
    ///
    /// #[monoio::main]
    /// async fn main() {
    ///     let semaphore = Semaphore::new(5);
    ///
    ///     let permit = semaphore.acquire_many(3).await.unwrap();
    ///     assert_eq!(semaphore.available_permits(), 2);
    /// }
    /// ```
    ///
    /// [`AcquireError`]: AcquireError
    /// [`SemaphorePermit`]: SemaphorePermit
    pub fn acquire_many(&self, n: u32) -> AcquireResult<'_> {
        // Pair the low-level future with what is needed to build the permit.
        AcquireResult(self.0.acquire(n), self, n)
    }

    /// Tries to acquire a permit from the semaphore.
    ///
    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
    /// and a [`TryAcquireError::NoPermits`] if there are no permits left. Otherwise,
    /// this returns a [`SemaphorePermit`] representing the acquired permits.
    ///
    /// # Examples
    ///
    /// ```
    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
    ///
    /// # fn main() {
    /// let semaphore = Semaphore::new(2);
    ///
    /// let permit_1 = semaphore.try_acquire().unwrap();
    /// assert_eq!(semaphore.available_permits(), 1);
    ///
    /// let permit_2 = semaphore.try_acquire().unwrap();
    /// assert_eq!(semaphore.available_permits(), 0);
    ///
    /// let permit_3 = semaphore.try_acquire();
    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
    /// # }
    /// ```
    ///
    /// [`TryAcquireError::Closed`]: TryAcquireError::Closed
    /// [`TryAcquireError::NoPermits`]: TryAcquireError::NoPermits
    /// [`SemaphorePermit`]: SemaphorePermit
    pub fn try_acquire(&self) -> Result<SemaphorePermit<'_>, TryAcquireError> {
        // On success wrap the single permit in a guard; errors pass through.
        self.0.try_acquire(1).map(|()| SemaphorePermit {
            sem: self,
            permits: 1,
        })
    }

    /// Tries to acquire `n` permits from the semaphore.
    ///
    /// If the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
    /// and a [`TryAcquireError::NoPermits`] if there are not enough permits left.
    /// Otherwise, this returns a [`SemaphorePermit`] representing the acquired permits.
    ///
    /// # Examples
    ///
    /// ```
    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
    ///
    /// # fn main() {
    /// let semaphore = Semaphore::new(4);
    ///
    /// let permit_1 = semaphore.try_acquire_many(3).unwrap();
    /// assert_eq!(semaphore.available_permits(), 1);
    ///
    /// let permit_2 = semaphore.try_acquire_many(2);
    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
    /// # }
    /// ```
    ///
    /// [`TryAcquireError::Closed`]: TryAcquireError::Closed
    /// [`TryAcquireError::NoPermits`]: TryAcquireError::NoPermits
    /// [`SemaphorePermit`]: SemaphorePermit
    pub fn try_acquire_many(&self, n: u32) -> Result<SemaphorePermit<'_>, TryAcquireError> {
        // On success wrap the `n` permits in a guard; errors pass through.
        self.0.try_acquire(n).map(|()| SemaphorePermit {
            sem: self,
            permits: n,
        })
    }

    /// Acquires a permit from the semaphore.
    ///
    /// The semaphore must be wrapped in an [`Rc`] to call this method.
    /// If the semaphore has been closed, this returns an [`AcquireError`].
    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
    /// acquired permit.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute permits in the order they
    /// were requested. Cancelling a call to `acquire_owned` makes you lose your
    /// place in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::rc::Rc;
    /// use local_sync::semaphore::Semaphore;
    ///
    /// #[monoio::main]
    /// async fn main() {
    ///     let semaphore = Rc::new(Semaphore::new(3));
    ///     let mut join_handles = Vec::new();
    ///
    ///     for _ in 0..5 {
    ///         let permit = semaphore.clone().acquire_owned().await.unwrap();
    ///         join_handles.push(monoio::spawn(async move {
    ///             // perform task...
    ///             // explicitly own `permit` in the task
    ///             drop(permit);
    ///         }));
    ///     }
    ///
    ///     for handle in join_handles {
    ///         handle.await;
    ///     }
    /// }
    /// ```
    ///
    /// [`Rc`]: std::rc::Rc
    /// [`AcquireError`]: AcquireError
    /// [`OwnedSemaphorePermit`]: OwnedSemaphorePermit
    pub async fn acquire_owned(
        self: std::rc::Rc<Self>,
    ) -> Result<OwnedSemaphorePermit, AcquireError> {
        // Finish waiting first so the borrow of `self` ends before `self` is
        // moved into the permit.
        let outcome = self.0.acquire(1).await;
        outcome.map(|()| OwnedSemaphorePermit {
            sem: self,
            permits: 1,
        })
    }

    /// Acquires `n` permits from the semaphore.
    ///
    /// The semaphore must be wrapped in an [`Rc`] to call this method.
    /// If the semaphore has been closed, this returns an [`AcquireError`].
    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
    /// acquired permit.
    ///
    /// # Cancel safety
    ///
    /// This method uses a queue to fairly distribute permits in the order they
    /// were requested. Cancelling a call to `acquire_many_owned` makes you lose
    /// your place in the queue.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::rc::Rc;
    /// use local_sync::semaphore::Semaphore;
    ///
    /// #[monoio::main]
    /// async fn main() {
    ///     let semaphore = Rc::new(Semaphore::new(10));
    ///     let mut join_handles = Vec::new();
    ///
    ///     for _ in 0..5 {
    ///         let permit = semaphore.clone().acquire_many_owned(2).await.unwrap();
    ///         join_handles.push(monoio::spawn(async move {
    ///             // perform task...
    ///             // explicitly own `permit` in the task
    ///             drop(permit);
    ///         }));
    ///     }
    ///
    ///     for handle in join_handles {
    ///         handle.await;
    ///     }
    /// }
    /// ```
    ///
    /// [`Rc`]: std::rc::Rc
    /// [`AcquireError`]: AcquireError
    /// [`OwnedSemaphorePermit`]: OwnedSemaphorePermit
    pub async fn acquire_many_owned(
        self: std::rc::Rc<Self>,
        n: u32,
    ) -> Result<OwnedSemaphorePermit, AcquireError> {
        // Finish waiting first so the borrow of `self` ends before `self` is
        // moved into the permit.
        let outcome = self.0.acquire(n).await;
        outcome.map(|()| OwnedSemaphorePermit {
            sem: self,
            permits: n,
        })
    }

    /// Tries to acquire a permit from the semaphore.
    ///
    /// The semaphore must be wrapped in an [`Rc`] to call this method. If
    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
    /// acquired permit.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::rc::Rc;
    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
    ///
    /// # fn main() {
    /// let semaphore = Rc::new(Semaphore::new(2));
    ///
    /// let permit_1 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
    /// assert_eq!(semaphore.available_permits(), 1);
    ///
    /// let permit_2 = Rc::clone(&semaphore).try_acquire_owned().unwrap();
    /// assert_eq!(semaphore.available_permits(), 0);
    ///
    /// let permit_3 = semaphore.try_acquire_owned();
    /// assert_eq!(permit_3.err(), Some(TryAcquireError::NoPermits));
    /// # }
    /// ```
    ///
    /// [`Rc`]: std::rc::Rc
    /// [`TryAcquireError::Closed`]: TryAcquireError::Closed
    /// [`TryAcquireError::NoPermits`]: TryAcquireError::NoPermits
    /// [`OwnedSemaphorePermit`]: OwnedSemaphorePermit
    pub fn try_acquire_owned(
        self: std::rc::Rc<Self>,
    ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
        // The attempt only borrows `self`; on success `self` moves into the permit.
        let attempt = self.0.try_acquire(1);
        attempt.map(|()| OwnedSemaphorePermit {
            sem: self,
            permits: 1,
        })
    }

    /// Tries to acquire `n` permits from the semaphore.
    ///
    /// The semaphore must be wrapped in an [`Rc`] to call this method. If
    /// the semaphore has been closed, this returns a [`TryAcquireError::Closed`]
    /// and a [`TryAcquireError::NoPermits`] if there are no permits left.
    /// Otherwise, this returns a [`OwnedSemaphorePermit`] representing the
    /// acquired permit.
    ///
    /// # Examples
    ///
    /// ```
    /// use std::rc::Rc;
    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
    ///
    /// # fn main() {
    /// let semaphore = Rc::new(Semaphore::new(4));
    ///
    /// let permit_1 = Rc::clone(&semaphore).try_acquire_many_owned(3).unwrap();
    /// assert_eq!(semaphore.available_permits(), 1);
    ///
    /// let permit_2 = semaphore.try_acquire_many_owned(2);
    /// assert_eq!(permit_2.err(), Some(TryAcquireError::NoPermits));
    /// # }
    /// ```
    ///
    /// [`Rc`]: std::rc::Rc
    /// [`TryAcquireError::Closed`]: TryAcquireError::Closed
    /// [`TryAcquireError::NoPermits`]: TryAcquireError::NoPermits
    /// [`OwnedSemaphorePermit`]: OwnedSemaphorePermit
    pub fn try_acquire_many_owned(
        self: std::rc::Rc<Self>,
        n: u32,
    ) -> Result<OwnedSemaphorePermit, TryAcquireError> {
        // The attempt only borrows `self`; on success `self` moves into the permit.
        let attempt = self.0.try_acquire(n);
        attempt.map(|()| OwnedSemaphorePermit {
            sem: self,
            permits: n,
        })
    }

    /// Closes the semaphore.
    ///
    /// This prevents the semaphore from issuing new permits and notifies all pending waiters.
    ///
    /// # Examples
    ///
    /// ```
    /// use local_sync::semaphore::{Semaphore, TryAcquireError};
    /// use std::rc::Rc;
    ///
    /// #[monoio::main]
    /// async fn main() {
    ///     let semaphore = Rc::new(Semaphore::new(1));
    ///     let semaphore2 = semaphore.clone();
    ///
    ///     monoio::spawn(async move {
    ///         let permit = semaphore.acquire_many(2).await;
    ///         assert!(permit.is_err());
    ///         println!("waiter received error");
    ///     });
    ///
    ///     println!("closing semaphore");
    ///     semaphore2.close();
    ///
    ///     // Cannot obtain more permits
    ///     assert_eq!(semaphore2.try_acquire().err(), Some(TryAcquireError::Closed))
    /// }
    /// ```
    pub fn close(&self) {
        // Delegates to `Inner::close`, which sets the closed flag and wakes
        // every queued waiter.
        self.0.close();
    }

    /// Returns `true` if the semaphore is closed, i.e. [`Semaphore::close`]
    /// has been called on it.
    pub fn is_closed(&self) -> bool {
        self.0.is_closed()
    }
}

impl<'a> SemaphorePermit<'a> {
    /// Forgets the permit **without** releasing it back to the semaphore.
    /// This can be used to reduce the amount of permits available from a
    /// semaphore.
    pub fn forget(mut self) {
        // `self` is dropped when this function returns; with the count zeroed
        // the `Drop` impl hands zero permits back.
        self.permits = 0;
    }
}

impl OwnedSemaphorePermit {
    /// Forgets the permit **without** releasing it back to the semaphore.
    /// This can be used to reduce the amount of permits available from a
    /// semaphore.
    pub fn forget(mut self) {
        // `self` is dropped when this function returns; with the count zeroed
        // the `Drop` impl hands zero permits back.
        self.permits = 0;
    }
}

impl Drop for SemaphorePermit<'_> {
    /// Returns the held permits to the semaphore they were acquired from.
    ///
    /// A permit that was passed to `forget` holds zero permits, so nothing is
    /// returned for it.
    fn drop(&mut self) {
        self.sem.add_permits(self.permits as usize);
    }
}

impl Drop for OwnedSemaphorePermit {
    /// Returns the held permits to the semaphore they were acquired from.
    ///
    /// A permit that was passed to `forget` holds zero permits, so nothing is
    /// returned for it.
    fn drop(&mut self) {
        self.sem.add_permits(self.permits as usize);
    }
}

#[cfg(test)]
mod tests {
    use super::{Inner, Semaphore};

    /// Ten single-permit acquisitions succeed on a semaphore created with
    /// ten permits.
    #[monoio::test]
    async fn inner_works() {
        let s = Inner::new(10);
        for _ in 0..10 {
            s.acquire(1).await.unwrap();
        }
    }

    /// A spawned task acquires one permit twice from an initially empty
    /// semaphore; releasing two permits lets it run to completion.
    #[monoio::test]
    async fn inner_release_after_acquire() {
        let s = std::rc::Rc::new(Inner::new(0));

        let s_move = s.clone();
        let join = monoio::spawn(async move {
            let _ = s_move.acquire(1).await.unwrap();
            let _ = s_move.acquire(1).await.unwrap();
        });
        s.release(2);
        join.await;
    }

    /// The public wrapper hands out a permit that was added after
    /// construction.
    #[monoio::test]
    async fn it_works() {
        let s = Semaphore::new(0);
        s.add_permits(1);
        let _ = s.acquire().await.unwrap();
    }
}