tokio 0.2.12

An event-driven, non-blocking I/O platform for writing asynchronous I/O backed applications.
Documentation
#![cfg_attr(not(feature = "sync"), allow(dead_code, unreachable_pub))]

//! Thread-safe, asynchronous counting semaphore.
//!
//! A `Semaphore` instance holds a set of permits. Permits are used to
//! synchronize access to a shared resource.
//!
//! Before accessing the shared resource, callers acquire a permit from the
//! semaphore. Once the permit is acquired, the caller then enters the critical
//! section. If no permits are available, then acquiring the semaphore returns
//! `Pending`. The task is woken once a permit becomes available.

use crate::loom::cell::CausalCell;
use crate::loom::future::AtomicWaker;
use crate::loom::sync::atomic::{AtomicPtr, AtomicUsize};
use crate::loom::thread;

use std::cmp;
use std::fmt;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Futures-aware semaphore.
pub(crate) struct Semaphore {
    /// Tracks both the waiter queue tail pointer and the number of remaining
    /// permits.
    state: AtomicUsize,

    /// waiter queue head pointer.
    head: CausalCell<NonNull<Waiter>>,

    /// Coordinates access to the queue head.
    rx_lock: AtomicUsize,

    /// Stub waiter node used as part of the MPSC channel algorithm.
    stub: Box<Waiter>,
}

/// A semaphore permit
///
/// Tracks the lifecycle of a semaphore permit.
///
/// An instance of `Permit` is intended to be used with a **single** instance of
/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
/// instances will result in unexpected behavior.
///
/// `Permit` does **not** release the permit back to the semaphore on drop. It
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
pub(crate) struct Permit {
    waiter: Option<Box<Waiter>>,
    state: PermitState,
}

/// Error returned by `Permit::poll_acquire`.
#[derive(Debug)]
pub(crate) struct AcquireError(());

/// Error returned by `Permit::try_acquire`.
#[derive(Debug)]
pub(crate) enum TryAcquireError {
    Closed,
    NoPermits,
}

/// Node used to notify the semaphore waiter when permit is available.
#[derive(Debug)]
struct Waiter {
    /// Stores waiter state.
    ///
    /// See `WaiterState` for more details.
    state: AtomicUsize,

    /// Task to wake when a permit is made available.
    waker: AtomicWaker,

    /// Next pointer in the queue of waiting senders.
    next: AtomicPtr<Waiter>,
}

/// Semaphore state
///
/// The 2 low bits track the modes.
///
/// - Closed
/// - Full
///
/// When not full, the rest of the `usize` tracks the total number of messages
/// in the channel. When full, the rest of the `usize` is a pointer to the tail
/// of the "waiting senders" queue.
#[derive(Copy, Clone)]
struct SemState(usize);

/// Permit state
#[derive(Debug, Copy, Clone)]
enum PermitState {
    /// Currently waiting for permits to be made available and assigned to the
    /// waiter.
    Waiting(u16),

    /// The number of acquired permits
    Acquired(u16),
}

/// State for an individual waker node
#[derive(Debug, Copy, Clone)]
struct WaiterState(usize);

/// Waiter node is in the semaphore queue
const QUEUED: usize = 0b001;

/// Semaphore has been closed, no more permits will be issued.
const CLOSED: usize = 0b10;

/// The permit that owns the `Waiter` dropped.
const DROPPED: usize = 0b100;

/// Represents "one requested permit" in the waiter state
const PERMIT_ONE: usize = 0b1000;

/// Masks the waiter state to only contain bits tracking number of requested
/// permits.
const PERMIT_MASK: usize = usize::MAX - (PERMIT_ONE - 1);

/// How much to shift a permit count to pack it into the waker state
const PERMIT_SHIFT: u32 = PERMIT_ONE.trailing_zeros();

/// Flag differentiating between available permits and waiter pointers.
///
/// If we assume pointers are properly aligned, then the least significant bit
/// will always be zero. So, we use that bit to track if the value represents a
/// number.
const NUM_FLAG: usize = 0b01;

/// Signal the semaphore is closed
const CLOSED_FLAG: usize = 0b10;

/// Maximum number of permits a semaphore can manage
const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;

/// When representing "numbers", the state has to be shifted this much (to get
/// rid of the flag bit).
const NUM_SHIFT: usize = 2;

// ===== impl Semaphore =====

impl Semaphore {
    /// Creates a new semaphore with the initial number of permits
    ///
    /// # Panics
    ///
    /// Panics if `permits` is zero.
    pub(crate) fn new(permits: usize) -> Semaphore {
        let stub = Box::new(Waiter::new());
        let ptr = NonNull::from(&*stub);

        // Allocations are aligned
        debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);

        let state = SemState::new(permits, &stub);

        Semaphore {
            state: AtomicUsize::new(state.to_usize()),
            head: CausalCell::new(ptr),
            rx_lock: AtomicUsize::new(0),
            stub,
        }
    }

    /// Returns the current number of available permits
    pub(crate) fn available_permits(&self) -> usize {
        let curr = SemState(self.state.load(Acquire));
        curr.available_permits()
    }

    /// Tries to acquire the requested number of permits, registering the waiter
    /// if not enough permits are available.
    fn poll_acquire(
        &self,
        cx: &mut Context<'_>,
        num_permits: u16,
        permit: &mut Permit,
    ) -> Poll<Result<(), AcquireError>> {
        self.poll_acquire2(num_permits, || {
            let waiter = permit.waiter.get_or_insert_with(|| Box::new(Waiter::new()));

            waiter.waker.register_by_ref(cx.waker());

            Some(NonNull::from(&**waiter))
        })
    }

    fn try_acquire(&self, num_permits: u16) -> Result<(), TryAcquireError> {
        match self.poll_acquire2(num_permits, || None) {
            Poll::Ready(res) => res.map_err(to_try_acquire),
            Poll::Pending => Err(TryAcquireError::NoPermits),
        }
    }

    /// Polls for a permit
    ///
    /// Tries to acquire available permits first. If unable to acquire a
    /// sufficient number of permits, the caller's waiter is pushed onto the
    /// semaphore's wait queue.
    fn poll_acquire2<F>(
        &self,
        num_permits: u16,
        mut get_waiter: F,
    ) -> Poll<Result<(), AcquireError>>
    where
        F: FnMut() -> Option<NonNull<Waiter>>,
    {
        let num_permits = num_permits as usize;

        // Load the current state
        let mut curr = SemState(self.state.load(Acquire));

        // Saves a ref to the waiter node
        let mut maybe_waiter: Option<NonNull<Waiter>> = None;

        /// Used in branches where we attempt to push the waiter into the wait
        /// queue but fail due to permits becoming available or the wait queue
        /// transitioning to "closed". In this case, the waiter must be
        /// transitioned back to the "idle" state.
        macro_rules! revert_to_idle {
            () => {
                if let Some(waiter) = maybe_waiter {
                    unsafe { waiter.as_ref() }.revert_to_idle();
                }
            };
        }

        loop {
            let mut next = curr;

            if curr.is_closed() {
                revert_to_idle!();
                return Ready(Err(AcquireError::closed()));
            }

            let acquired = next.acquire_permits(num_permits, &self.stub);

            if !acquired {
                // There are not enough available permits to satisfy the
                // request. The permit transitions to a waiting state.
                debug_assert!(curr.waiter().is_some() || curr.available_permits() < num_permits);

                if let Some(waiter) = maybe_waiter.as_ref() {
                    // Safety: the caller owns the waiter.
                    let w = unsafe { waiter.as_ref() };
                    w.set_permits_to_acquire(num_permits - curr.available_permits());
                } else {
                    // Get the waiter for the permit.
                    if let Some(waiter) = get_waiter() {
                        // Safety: the caller owns the waiter.
                        let w = unsafe { waiter.as_ref() };

                        // If there are any currently available permits, the
                        // waiter acquires those immediately and waits for the
                        // remaining permits to become available.
                        if !w.to_queued(num_permits - curr.available_permits()) {
                            // The node is alrady queued, there is no further work
                            // to do.
                            return Pending;
                        }

                        maybe_waiter = Some(waiter);
                    } else {
                        // No waiter, this indicates the caller does not wish to
                        // "wait", so there is nothing left to do.
                        return Pending;
                    }
                }

                next.set_waiter(maybe_waiter.unwrap());
            }

            debug_assert_ne!(curr.0, 0);
            debug_assert_ne!(next.0, 0);

            match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                Ok(_) => {
                    if acquired {
                        // Successfully acquire permits **without** queuing the
                        // waiter node. The waiter node is not currently in the
                        // queue.
                        revert_to_idle!();
                        return Ready(Ok(()));
                    } else {
                        // The node is pushed into the queue, the final step is
                        // to set the node's "next" pointer to return the wait
                        // queue into a consistent state.

                        let prev_waiter =
                            curr.waiter().unwrap_or_else(|| NonNull::from(&*self.stub));

                        let waiter = maybe_waiter.unwrap();

                        // Link the nodes.
                        //
                        // Safety: the mpsc algorithm guarantees the old tail of
                        // the queue is not removed from the queue during the
                        // push process.
                        unsafe {
                            prev_waiter.as_ref().store_next(waiter);
                        }

                        return Pending;
                    }
                }
                Err(actual) => {
                    curr = SemState(actual);
                }
            }
        }
    }

    /// Closes the semaphore. This prevents the semaphore from issuing new
    /// permits and notifies all pending waiters.
    pub(crate) fn close(&self) {
        // Acquire the `rx_lock`, setting the "closed" flag on the lock.
        let prev = self.rx_lock.fetch_or(1, AcqRel);

        if prev != 0 {
            // Another thread has the lock and will be responsible for notifying
            // pending waiters.
            return;
        }

        self.add_permits_locked(0, true);
    }

    /// Adds `n` new permits to the semaphore.
    pub(crate) fn add_permits(&self, n: usize) {
        if n == 0 {
            return;
        }

        // TODO: Handle overflow. A panic is not sufficient, the process must
        // abort.
        let prev = self.rx_lock.fetch_add(n << 1, AcqRel);

        if prev != 0 {
            // Another thread has the lock and will be responsible for notifying
            // pending waiters.
            return;
        }

        self.add_permits_locked(n, false);
    }

    fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
        while rem > 0 || closed {
            if closed {
                SemState::fetch_set_closed(&self.state, AcqRel);
            }

            // Release the permits and notify
            self.add_permits_locked2(rem, closed);

            let n = rem << 1;

            let actual = if closed {
                let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
                closed = false;
                actual
            } else {
                let actual = self.rx_lock.fetch_sub(n, AcqRel);
                closed = actual & 1 == 1;
                actual
            };

            rem = (actual >> 1) - rem;
        }
    }

    /// Releases a specific amount of permits to the semaphore
    ///
    /// This function is called by `add_permits` after the add lock has been
    /// acquired.
    fn add_permits_locked2(&self, mut n: usize, closed: bool) {
        // If closing the semaphore, we want to drain the entire queue. The
        // number of permits being assigned doesn't matter.
        if closed {
            n = usize::MAX;
        }

        'outer: while n > 0 {
            unsafe {
                let mut head = self.head.with(|head| *head);
                let mut next_ptr = head.as_ref().next.load(Acquire);

                let stub = self.stub();

                if head == stub {
                    // The stub node indicates an empty queue. Any remaining
                    // permits get assigned back to the semaphore.
                    let next = match NonNull::new(next_ptr) {
                        Some(next) => next,
                        None => {
                            // This loop is not part of the standard intrusive mpsc
                            // channel algorithm. This is where we atomically pop
                            // the last task and add `n` to the remaining capacity.
                            //
                            // This modification to the pop algorithm works because,
                            // at this point, we have not done any work (only done
                            // reading). We have a *pretty* good idea that there is
                            // no concurrent pusher.
                            //
                            // The capacity is then atomically added by doing an
                            // AcqRel CAS on `state`. The `state` cell is the
                            // linchpin of the algorithm.
                            //
                            // By successfully CASing `head` w/ AcqRel, we ensure
                            // that, if any thread was racing and entered a push, we
                            // see that and abort pop, retrying as it is
                            // "inconsistent".
                            let mut curr = SemState::load(&self.state, Acquire);

                            loop {
                                if curr.has_waiter(&self.stub) {
                                    // A waiter is being added concurrently.
                                    // This is the MPSC queue's "inconsistent"
                                    // state and we must loop and try again.
                                    thread::yield_now();
                                    continue 'outer;
                                }

                                // If closing, nothing more to do.
                                if closed {
                                    debug_assert!(curr.is_closed(), "state = {:?}", curr);
                                    return;
                                }

                                let mut next = curr;
                                next.release_permits(n, &self.stub);

                                match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                                    Ok(_) => return,
                                    Err(actual) => {
                                        curr = SemState(actual);
                                    }
                                }
                            }
                        }
                    };

                    self.head.with_mut(|head| *head = next);
                    head = next;
                    next_ptr = next.as_ref().next.load(Acquire);
                }

                // `head` points to a waiter assign permits to the waiter. If
                // all requested permits are satisfied, then we can continue,
                // otherwise the node stays in the wait queue.
                if !head.as_ref().assign_permits(&mut n, closed) {
                    assert_eq!(n, 0);
                    return;
                }

                if let Some(next) = NonNull::new(next_ptr) {
                    self.head.with_mut(|head| *head = next);

                    self.remove_queued(head, closed);
                    continue 'outer;
                }

                let state = SemState::load(&self.state, Acquire);

                // This must always be a pointer as the wait list is not empty.
                let tail = state.waiter().unwrap();

                if tail != head {
                    // Inconsistent
                    thread::yield_now();
                    continue 'outer;
                }

                self.push_stub(closed);

                next_ptr = head.as_ref().next.load(Acquire);

                if let Some(next) = NonNull::new(next_ptr) {
                    self.head.with_mut(|head| *head = next);

                    self.remove_queued(head, closed);
                    continue 'outer;
                }

                // Inconsistent state, loop
                thread::yield_now();
            }
        }
    }

    /// The wait node has had all of its permits assigned and has been removed
    /// from the wait queue.
    ///
    /// Attempt to remove the QUEUED bit from the node. If additional permits
    /// are concurrently requested, the node must be pushed back into the wait
    /// queued.
    fn remove_queued(&self, waiter: NonNull<Waiter>, closed: bool) {
        let mut curr = WaiterState(unsafe { waiter.as_ref() }.state.load(Acquire));

        loop {
            if curr.is_dropped() {
                // The Permit dropped, it is on us to release the memory
                let _ = unsafe { Box::from_raw(waiter.as_ptr()) };
                return;
            }

            // The node is removed from the queue. We attempt to unset the
            // queued bit, but concurrently the waiter has requested more
            // permits. When the waiter requested more permits, it saw the
            // queued bit set so took no further action. This requires us to
            // push the node back into the queue.
            if curr.permits_to_acquire() > 0 {
                // More permits are requested. The waiter must be re-queued
                unsafe {
                    self.push_waiter(waiter, closed);
                }
                return;
            }

            let mut next = curr;
            next.unset_queued();

            let w = unsafe { waiter.as_ref() };

            match w.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                Ok(_) => return,
                Err(actual) => {
                    curr = WaiterState(actual);
                }
            }
        }
    }

    unsafe fn push_stub(&self, closed: bool) {
        self.push_waiter(self.stub(), closed);
    }

    unsafe fn push_waiter(&self, waiter: NonNull<Waiter>, closed: bool) {
        // Set the next pointer. This does not require an atomic operation as
        // this node is not accessible. The write will be flushed with the next
        // operation
        waiter.as_ref().next.store(ptr::null_mut(), Relaxed);

        // Update the tail to point to the new node. We need to see the previous
        // node in order to update the next pointer as well as release `task`
        // to any other threads calling `push`.
        let next = SemState::new_ptr(waiter, closed);
        let prev = SemState(self.state.swap(next.0, AcqRel));

        debug_assert_eq!(closed, prev.is_closed());

        // This function is only called when there are pending tasks. Because of
        // this, the state must *always* be in pointer mode.
        let prev = prev.waiter().unwrap();

        // No cycles plz
        debug_assert_ne!(prev, waiter);

        // Release `task` to the consume end.
        prev.as_ref().next.store(waiter.as_ptr(), Release);
    }

    fn stub(&self) -> NonNull<Waiter> {
        unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
    }
}

impl Drop for Semaphore {
    fn drop(&mut self) {
        self.close();
    }
}

impl fmt::Debug for Semaphore {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Semaphore")
            .field("state", &SemState::load(&self.state, Relaxed))
            .field("head", &self.head.with(|ptr| ptr))
            .field("rx_lock", &self.rx_lock.load(Relaxed))
            .field("stub", &self.stub)
            .finish()
    }
}

unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}

// ===== impl Permit =====

impl Permit {
    /// Creates a new `Permit`.
    ///
    /// The permit begins in the "unacquired" state.
    pub(crate) fn new() -> Permit {
        use PermitState::Acquired;

        Permit {
            waiter: None,
            state: Acquired(0),
        }
    }

    /// Returns `true` if the permit has been acquired
    pub(crate) fn is_acquired(&self) -> bool {
        match self.state {
            PermitState::Acquired(num) if num > 0 => true,
            _ => false,
        }
    }

    /// Tries to acquire the permit. If no permits are available, the current task
    /// is notified once a new permit becomes available.
    pub(crate) fn poll_acquire(
        &mut self,
        cx: &mut Context<'_>,
        num_permits: u16,
        semaphore: &Semaphore,
    ) -> Poll<Result<(), AcquireError>> {
        use std::cmp::Ordering::*;
        use PermitState::*;

        match self.state {
            Waiting(requested) => {
                // There must be a waiter
                let waiter = self.waiter.as_ref().unwrap();

                match requested.cmp(&num_permits) {
                    Less => {
                        let delta = num_permits - requested;

                        // Request additional permits. If the waiter has been
                        // dequeued, it must be re-queued.
                        if !waiter.try_inc_permits_to_acquire(delta as usize) {
                            let waiter = NonNull::from(&**waiter);

                            // Ignore the result. The check for
                            // `permits_to_acquire()` will converge the state as
                            // needed
                            let _ = semaphore.poll_acquire2(delta, || Some(waiter))?;
                        }

                        self.state = Waiting(num_permits);
                    }
                    Greater => {
                        let delta = requested - num_permits;
                        let to_release = waiter.try_dec_permits_to_acquire(delta as usize);

                        semaphore.add_permits(to_release);
                        self.state = Waiting(num_permits);
                    }
                    Equal => {}
                }

                if waiter.permits_to_acquire()? == 0 {
                    self.state = Acquired(requested);
                    return Ready(Ok(()));
                }

                waiter.waker.register_by_ref(cx.waker());

                if waiter.permits_to_acquire()? == 0 {
                    self.state = Acquired(requested);
                    return Ready(Ok(()));
                }

                Pending
            }
            Acquired(acquired) => {
                if acquired >= num_permits {
                    Ready(Ok(()))
                } else {
                    match semaphore.poll_acquire(cx, num_permits - acquired, self)? {
                        Ready(()) => {
                            self.state = Acquired(num_permits);
                            Ready(Ok(()))
                        }
                        Pending => {
                            self.state = Waiting(num_permits);
                            Pending
                        }
                    }
                }
            }
        }
    }

    /// Tries to acquire the permit.
    pub(crate) fn try_acquire(
        &mut self,
        num_permits: u16,
        semaphore: &Semaphore,
    ) -> Result<(), TryAcquireError> {
        use PermitState::*;

        match self.state {
            Waiting(requested) => {
                // There must be a waiter
                let waiter = self.waiter.as_ref().unwrap();

                if requested > num_permits {
                    let delta = requested - num_permits;
                    let to_release = waiter.try_dec_permits_to_acquire(delta as usize);

                    semaphore.add_permits(to_release);
                    self.state = Waiting(num_permits);
                }

                let res = waiter.permits_to_acquire().map_err(to_try_acquire)?;

                if res == 0 {
                    if requested < num_permits {
                        // Try to acquire the additional permits
                        semaphore.try_acquire(num_permits - requested)?;
                    }

                    self.state = Acquired(num_permits);
                    Ok(())
                } else {
                    Err(TryAcquireError::NoPermits)
                }
            }
            Acquired(acquired) => {
                if acquired < num_permits {
                    semaphore.try_acquire(num_permits - acquired)?;
                    self.state = Acquired(num_permits);
                }

                Ok(())
            }
        }
    }

    /// Releases a permit back to the semaphore
    pub(crate) fn release(&mut self, n: u16, semaphore: &Semaphore) {
        let n = self.forget(n);
        semaphore.add_permits(n as usize);
    }

    /// Forgets the permit **without** releasing it back to the semaphore.
    ///
    /// After calling `forget`, `poll_acquire` is able to acquire new permit
    /// from the sempahore.
    ///
    /// Repeatedly calling `forget` without associated calls to `add_permit`
    /// will result in the semaphore losing all permits.
    ///
    /// Will forget **at most** the number of acquired permits. This number is
    /// returned.
    pub(crate) fn forget(&mut self, n: u16) -> u16 {
        use PermitState::*;

        match self.state {
            Waiting(requested) => {
                let n = cmp::min(n, requested);

                // Decrement
                let acquired = self
                    .waiter
                    .as_ref()
                    .unwrap()
                    .try_dec_permits_to_acquire(n as usize) as u16;

                if n == requested {
                    self.state = Acquired(0);
                } else if acquired == requested - n {
                    self.state = Waiting(acquired);
                } else {
                    self.state = Waiting(requested - n);
                }

                acquired
            }
            Acquired(acquired) => {
                let n = cmp::min(n, acquired);
                self.state = Acquired(acquired - n);
                n
            }
        }
    }
}

impl Default for Permit {
    fn default() -> Self {
        Self::new()
    }
}

impl Drop for Permit {
    fn drop(&mut self) {
        if let Some(waiter) = self.waiter.take() {
            // Set the dropped flag
            let state = WaiterState(waiter.state.fetch_or(DROPPED, AcqRel));

            if state.is_queued() {
                // The waiter is stored in the queue. The semaphore will drop it
                std::mem::forget(waiter);
            }
        }
    }
}

// ===== impl AcquireError ====

impl AcquireError {
    fn closed() -> AcquireError {
        AcquireError(())
    }
}

fn to_try_acquire(_: AcquireError) -> TryAcquireError {
    TryAcquireError::Closed
}

impl fmt::Display for AcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "semaphore closed")
    }
}

impl std::error::Error for AcquireError {}

// ===== impl TryAcquireError =====

impl TryAcquireError {
    /// Returns `true` if the error was caused by a closed semaphore.
    pub(crate) fn is_closed(&self) -> bool {
        match self {
            TryAcquireError::Closed => true,
            _ => false,
        }
    }

    /// Returns `true` if the error was caused by calling `try_acquire` on a
    /// semaphore with no available permits.
    pub(crate) fn is_no_permits(&self) -> bool {
        match self {
            TryAcquireError::NoPermits => true,
            _ => false,
        }
    }
}

impl fmt::Display for TryAcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            TryAcquireError::Closed => write!(fmt, "{}", "semaphore closed"),
            TryAcquireError::NoPermits => write!(fmt, "{}", "no permits available"),
        }
    }
}

impl std::error::Error for TryAcquireError {}

// ===== impl Waiter =====

impl Waiter {
    fn new() -> Waiter {
        Waiter {
            state: AtomicUsize::new(0),
            waker: AtomicWaker::new(),
            next: AtomicPtr::new(ptr::null_mut()),
        }
    }

    fn permits_to_acquire(&self) -> Result<usize, AcquireError> {
        let state = WaiterState(self.state.load(Acquire));

        if state.is_closed() {
            Err(AcquireError(()))
        } else {
            Ok(state.permits_to_acquire())
        }
    }

    /// Only increments the number of permits *if* the waiter is currently
    /// queued.
    ///
    /// # Returns
    ///
    /// `true` if the number of permits to acquire has been incremented. `false`
    /// otherwise. On `false`, the caller should use `Semaphore::poll_acquire`.
    fn try_inc_permits_to_acquire(&self, n: usize) -> bool {
        let mut curr = WaiterState(self.state.load(Acquire));

        loop {
            if !curr.is_queued() {
                assert_eq!(0, curr.permits_to_acquire());
                return false;
            }

            let mut next = curr;
            next.set_permits_to_acquire(n + curr.permits_to_acquire());

            match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                Ok(_) => return true,
                Err(actual) => curr = WaiterState(actual),
            }
        }
    }

    /// Try to decrement the number of permits to acquire. This returns the
    /// actual number of permits that were decremented. The delta betweeen `n`
    /// and the return has been assigned to the permit and the caller must
    /// assign these back to the semaphore.
    fn try_dec_permits_to_acquire(&self, n: usize) -> usize {
        let mut curr = WaiterState(self.state.load(Acquire));

        loop {
            if !curr.is_queued() {
                assert_eq!(0, curr.permits_to_acquire());
            }

            let delta = cmp::min(n, curr.permits_to_acquire());
            let rem = curr.permits_to_acquire() - delta;

            let mut next = curr;
            next.set_permits_to_acquire(rem);

            match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                Ok(_) => return n - delta,
                Err(actual) => curr = WaiterState(actual),
            }
        }
    }

    /// Store the number of remaining permits needed to satisfy the waiter and
    /// transition to the "QUEUED" state.
    ///
    /// # Returns
    ///
    /// `true` if the `QUEUED` bit was set as part of the transition.
    fn to_queued(&self, num_permits: usize) -> bool {
        let mut curr = WaiterState(self.state.load(Acquire));

        // The waiter should **not** be waiting for any permits.
        debug_assert_eq!(curr.permits_to_acquire(), 0);

        loop {
            let mut next = curr;
            next.set_permits_to_acquire(num_permits);
            next.set_queued();

            match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                Ok(_) => {
                    if curr.is_queued() {
                        return false;
                    } else {
                        // Make sure the next pointer is null
                        self.next.store(ptr::null_mut(), Relaxed);
                        return true;
                    }
                }
                Err(actual) => curr = WaiterState(actual),
            }
        }
    }

    /// Set the number of permits to acquire.
    ///
    /// This function is only called when the waiter is being inserted into the
    /// wait queue. Because of this, there are no concurrent threads that can
    /// modify the state and using `store` is safe.
    fn set_permits_to_acquire(&self, num_permits: usize) {
        debug_assert!(WaiterState(self.state.load(Acquire)).is_queued());

        let mut state = WaiterState(QUEUED);
        state.set_permits_to_acquire(num_permits);

        self.state.store(state.0, Release);
    }

    /// Assign permits to the waiter.
    ///
    /// Returns `true` if the waiter should be removed from the queue
    fn assign_permits(&self, n: &mut usize, closed: bool) -> bool {
        let mut curr = WaiterState(self.state.load(Acquire));

        loop {
            let mut next = curr;

            // Number of permits to assign to this waiter
            let assign = cmp::min(curr.permits_to_acquire(), *n);

            // Assign the permits
            next.set_permits_to_acquire(curr.permits_to_acquire() - assign);

            if closed {
                next.set_closed();
            }

            match self.state.compare_exchange(curr.0, next.0, AcqRel, Acquire) {
                Ok(_) => {
                    // Update `n`
                    *n -= assign;

                    if next.permits_to_acquire() == 0 {
                        if curr.permits_to_acquire() > 0 {
                            self.waker.wake();
                        }

                        return true;
                    } else {
                        return false;
                    }
                }
                Err(actual) => curr = WaiterState(actual),
            }
        }
    }

    fn revert_to_idle(&self) {
        // An idle node is not waiting on any permits
        self.state.store(0, Relaxed);
    }

    fn store_next(&self, next: NonNull<Waiter>) {
        self.next.store(next.as_ptr(), Release);
    }
}

// ===== impl SemState =====

impl SemState {
    /// Returns a new default `State` value.
    fn new(permits: usize, stub: &Waiter) -> SemState {
        assert!(permits <= MAX_PERMITS);

        if permits > 0 {
            SemState((permits << NUM_SHIFT) | NUM_FLAG)
        } else {
            SemState(stub as *const _ as usize)
        }
    }

    /// Returns a `State` tracking `ptr` as the tail of the queue.
    fn new_ptr(tail: NonNull<Waiter>, closed: bool) -> SemState {
        let mut val = tail.as_ptr() as usize;

        if closed {
            val |= CLOSED_FLAG;
        }

        SemState(val)
    }

    /// Returns the amount of remaining capacity
    fn available_permits(self) -> usize {
        if !self.has_available_permits() {
            return 0;
        }

        self.0 >> NUM_SHIFT
    }

    /// Returns `true` if the state has permits that can be claimed by a waiter.
    fn has_available_permits(self) -> bool {
        self.0 & NUM_FLAG == NUM_FLAG
    }

    fn has_waiter(self, stub: &Waiter) -> bool {
        !self.has_available_permits() && !self.is_stub(stub)
    }

    /// Tries to atomically acquire specified number of permits.
    ///
    /// # Return
    ///
    /// Returns `true` if the specified number of permits were acquired, `false`
    /// otherwise. Returning false does not mean that there are no more
    /// available permits.
    fn acquire_permits(&mut self, num: usize, stub: &Waiter) -> bool {
        debug_assert!(num > 0);

        if self.available_permits() < num {
            return false;
        }

        debug_assert!(self.waiter().is_none());

        self.0 -= num << NUM_SHIFT;

        if self.0 == NUM_FLAG {
            // Set the state to the stub pointer.
            self.0 = stub as *const _ as usize;
        }

        true
    }

    /// Releases permits
    ///
    /// Returns `true` if the permits were accepted.
    fn release_permits(&mut self, permits: usize, stub: &Waiter) {
        debug_assert!(permits > 0);

        if self.is_stub(stub) {
            self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG);
            return;
        }

        debug_assert!(self.has_available_permits());

        self.0 += permits << NUM_SHIFT;
    }

    fn is_waiter(self) -> bool {
        self.0 & NUM_FLAG == 0
    }

    /// Returns the waiter, if one is set.
    fn waiter(self) -> Option<NonNull<Waiter>> {
        if self.is_waiter() {
            let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored");

            Some(waiter)
        } else {
            None
        }
    }

    /// Assumes `self` represents a pointer
    fn as_ptr(self) -> *mut Waiter {
        (self.0 & !CLOSED_FLAG) as *mut Waiter
    }

    /// Sets to a pointer to a waiter.
    ///
    /// This can only be done from the full state.
    fn set_waiter(&mut self, waiter: NonNull<Waiter>) {
        let waiter = waiter.as_ptr() as usize;
        debug_assert!(!self.is_closed());

        self.0 = waiter;
    }

    fn is_stub(self, stub: &Waiter) -> bool {
        self.as_ptr() as usize == stub as *const _ as usize
    }

    /// Loads the state from an AtomicUsize.
    fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
        let value = cell.load(ordering);
        SemState(value)
    }

    fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState {
        let value = cell.fetch_or(CLOSED_FLAG, ordering);
        SemState(value)
    }

    fn is_closed(self) -> bool {
        self.0 & CLOSED_FLAG == CLOSED_FLAG
    }

    /// Converts the state into a `usize` representation.
    fn to_usize(self) -> usize {
        self.0
    }
}

impl fmt::Debug for SemState {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut fmt = fmt.debug_struct("SemState");

        if self.is_waiter() {
            fmt.field("state", &"<waiter>");
        } else {
            fmt.field("permits", &self.available_permits());
        }

        fmt.finish()
    }
}

// ===== impl WaiterState =====

impl WaiterState {
    fn permits_to_acquire(self) -> usize {
        self.0 >> PERMIT_SHIFT
    }

    fn set_permits_to_acquire(&mut self, val: usize) {
        self.0 = (val << PERMIT_SHIFT) | (self.0 & !PERMIT_MASK)
    }

    fn is_queued(self) -> bool {
        self.0 & QUEUED == QUEUED
    }

    fn set_queued(&mut self) {
        self.0 |= QUEUED;
    }

    fn is_closed(self) -> bool {
        self.0 & CLOSED == CLOSED
    }

    fn set_closed(&mut self) {
        self.0 |= CLOSED;
    }

    fn unset_queued(&mut self) {
        assert!(self.is_queued());
        self.0 -= QUEUED;
    }

    fn is_dropped(self) -> bool {
        self.0 & DROPPED == DROPPED
    }
}