tokio-sync 0.2.0-alpha.6

Synchronization utilities.
Documentation
//! Thread-safe, asynchronous counting semaphore.
//!
//! A `Semaphore` instance holds a set of permits. Permits are used to
//! synchronize access to a shared resource.
//!
//! Before accessing the shared resource, callers acquire a permit from the
//! semaphore. Once the permit is acquired, the caller then enters the critical
//! section. If no permits are available, then acquiring the semaphore returns
//! `Pending`. The task is woken once a permit becomes available.

use crate::loom::{
    future::AtomicWaker,
    sync::{
        atomic::{AtomicPtr, AtomicUsize},
        CausalCell,
    },
    thread,
};

use std::fmt;
use std::ptr::{self, NonNull};
use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed, Release};
use std::sync::Arc;
use std::task::Poll::{Pending, Ready};
use std::task::{Context, Poll};
use std::usize;

/// Futures-aware semaphore.
pub struct Semaphore {
    /// Tracks both the waiter queue tail pointer and the number of remaining
    /// permits.
    state: AtomicUsize,

    /// waiter queue head pointer.
    head: CausalCell<NonNull<WaiterNode>>,

    /// Coordinates access to the queue head.
    rx_lock: AtomicUsize,

    /// Stub waiter node used as part of the MPSC channel algorithm.
    stub: Box<WaiterNode>,
}

/// A semaphore permit
///
/// Tracks the lifecycle of a semaphore permit.
///
/// An instance of `Permit` is intended to be used with a **single** instance of
/// `Semaphore`. Using a single instance of `Permit` with multiple semaphore
/// instances will result in unexpected behavior.
///
/// `Permit` does **not** release the permit back to the semaphore on drop. It
/// is the user's responsibility to ensure that `Permit::release` is called
/// before dropping the permit.
#[derive(Debug)]
pub struct Permit {
    waiter: Option<Arc<WaiterNode>>,
    state: PermitState,
}

/// Error returned by `Permit::poll_acquire`.
#[derive(Debug)]
pub struct AcquireError(());

/// Error returned by `Permit::try_acquire`.
#[derive(Debug)]
pub struct TryAcquireError {
    kind: ErrorKind,
}

#[derive(Debug)]
enum ErrorKind {
    Closed,
    NoPermits,
}

/// Node used to notify the semaphore waiter when permit is available.
#[derive(Debug)]
struct WaiterNode {
    /// Stores waiter state.
    ///
    /// See `NodeState` for more details.
    state: AtomicUsize,

    /// Task to wake when a permit is made available.
    waker: AtomicWaker,

    /// Next pointer in the queue of waiting senders.
    next: AtomicPtr<WaiterNode>,
}

/// Semaphore state
///
/// The 2 low bits track the modes.
///
/// - Closed
/// - Full
///
/// When not full, the rest of the `usize` tracks the total number of messages
/// in the channel. When full, the rest of the `usize` is a pointer to the tail
/// of the "waiting senders" queue.
#[derive(Copy, Clone)]
struct SemState(usize);

/// Permit state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
enum PermitState {
    /// The permit has not been requested.
    Idle,

    /// Currently waiting for a permit to be made available and assigned to the
    /// waiter.
    Waiting,

    /// The permit has been acquired.
    Acquired,
}

/// Waiter node state
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
#[repr(usize)]
enum NodeState {
    /// Not waiting for a permit and the node is not in the wait queue.
    ///
    /// This is the initial state.
    Idle = 0,

    /// Not waiting for a permit but the node is in the wait queue.
    ///
    /// This happens when the waiter has previously requested a permit, but has
    /// since canceled the request. The node cannot be removed by the waiter, so
    /// this state informs the receiver to skip the node when it pops it from
    /// the wait queue.
    Queued = 1,

    /// Waiting for a permit and the node is in the wait queue.
    QueuedWaiting = 2,

    /// The waiter has been assigned a permit and the node has been removed from
    /// the queue.
    Assigned = 3,

    /// The semaphore has been closed. No more permits will be issued.
    Closed = 4,
}

// ===== impl Semaphore =====

impl Semaphore {
    /// Creates a new semaphore with the initial number of permits
    ///
    /// # Panics
    ///
    /// Panics if `permits` is zero.
    pub fn new(permits: usize) -> Semaphore {
        let stub = Box::new(WaiterNode::new());
        let ptr = NonNull::new(&*stub as *const _ as *mut _).unwrap();

        // Allocations are aligned
        debug_assert!(ptr.as_ptr() as usize & NUM_FLAG == 0);

        let state = SemState::new(permits, &stub);

        Semaphore {
            state: AtomicUsize::new(state.to_usize()),
            head: CausalCell::new(ptr),
            rx_lock: AtomicUsize::new(0),
            stub,
        }
    }

    /// Returns the current number of available permits
    pub fn available_permits(&self) -> usize {
        let curr = SemState::load(&self.state, Acquire);
        curr.available_permits()
    }

    /// Poll for a permit
    fn poll_permit(
        &self,
        mut permit: Option<(&mut Context<'_>, &mut Permit)>,
    ) -> Poll<Result<(), AcquireError>> {
        // Load the current state
        let mut curr = SemState::load(&self.state, Acquire);

        debug!(" + poll_permit; sem-state = {:?}", curr);

        // Tracks a *mut WaiterNode representing an Arc clone.
        //
        // This avoids having to bump the ref count unless required.
        let mut maybe_strong: Option<NonNull<WaiterNode>> = None;

        macro_rules! undo_strong {
            () => {
                if let Some(waiter) = maybe_strong {
                    // The waiter was cloned, but never got queued.
                    // Before entering `poll_permit`, the waiter was in the
                    // `Idle` state. We must transition the node back to the
                    // idle state.
                    let waiter = unsafe { Arc::from_raw(waiter.as_ptr()) };
                    waiter.revert_to_idle();
                }
            };
        }

        loop {
            let mut next = curr;

            if curr.is_closed() {
                undo_strong!();
                return Ready(Err(AcquireError::closed()));
            }

            if !next.acquire_permit(&self.stub) {
                debug!(" + poll_permit -- no permits");

                debug_assert!(curr.waiter().is_some());

                if maybe_strong.is_none() {
                    if let Some((ref mut cx, ref mut permit)) = permit {
                        // Get the Sender's waiter node, or initialize one
                        let waiter = permit
                            .waiter
                            .get_or_insert_with(|| Arc::new(WaiterNode::new()));

                        waiter.register(cx);

                        debug!(" + poll_permit -- to_queued_waiting");

                        if !waiter.to_queued_waiting() {
                            debug!(" + poll_permit; waiter already queued");
                            // The node is alrady queued, there is no further work
                            // to do.
                            return Pending;
                        }

                        maybe_strong = Some(WaiterNode::into_non_null(waiter.clone()));
                    } else {
                        // If no `waiter`, then the task is not registered and there
                        // is no further work to do.
                        return Pending;
                    }
                }

                next.set_waiter(maybe_strong.unwrap());
            }

            debug!(" + poll_permit -- pre-CAS; next = {:?}", next);

            debug_assert_ne!(curr.0, 0);
            debug_assert_ne!(next.0, 0);

            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
                Ok(_) => {
                    debug!(" + poll_permit -- CAS ok");
                    match curr.waiter() {
                        Some(prev_waiter) => {
                            let waiter = maybe_strong.unwrap();

                            // Finish pushing
                            unsafe {
                                prev_waiter.as_ref().next.store(waiter.as_ptr(), Release);
                            }

                            debug!(" + poll_permit -- waiter pushed");

                            return Pending;
                        }
                        None => {
                            debug!(" + poll_permit -- permit acquired");

                            undo_strong!();

                            return Ready(Ok(()));
                        }
                    }
                }
                Err(actual) => {
                    curr = actual;
                }
            }
        }
    }

    /// Close the semaphore. This prevents the semaphore from issuing new
    /// permits and notifies all pending waiters.
    pub fn close(&self) {
        debug!("+ Semaphore::close");

        // Acquire the `rx_lock`, setting the "closed" flag on the lock.
        let prev = self.rx_lock.fetch_or(1, AcqRel);
        debug!(" + close -- rx_lock.fetch_add(1)");

        if prev != 0 {
            debug!("+ close -- locked; prev = {}", prev);
            // Another thread has the lock and will be responsible for notifying
            // pending waiters.
            return;
        }

        self.add_permits_locked(0, true);
    }

    /// Add `n` new permits to the semaphore.
    pub fn add_permits(&self, n: usize) {
        debug!(" + add_permits; n = {}", n);

        if n == 0 {
            return;
        }

        // TODO: Handle overflow. A panic is not sufficient, the process must
        // abort.
        let prev = self.rx_lock.fetch_add(n << 1, AcqRel);
        debug!(" + add_permits; rx_lock.fetch_add(n << 1); n = {}", n);

        if prev != 0 {
            debug!(" + add_permits -- locked; prev = {}", prev);
            // Another thread has the lock and will be responsible for notifying
            // pending waiters.
            return;
        }

        self.add_permits_locked(n, false);
    }

    fn add_permits_locked(&self, mut rem: usize, mut closed: bool) {
        while rem > 0 || closed {
            debug!(
                " + add_permits_locked -- iter; rem = {}; closed = {:?}",
                rem, closed
            );

            if closed {
                SemState::fetch_set_closed(&self.state, AcqRel);
            }

            // Release the permits and notify
            self.add_permits_locked2(rem, closed);

            let n = rem << 1;

            let actual = if closed {
                let actual = self.rx_lock.fetch_sub(n | 1, AcqRel);
                debug!(
                    " + add_permits_locked; rx_lock.fetch_sub(n | 1); n = {}; actual={}",
                    n, actual
                );

                closed = false;
                actual
            } else {
                let actual = self.rx_lock.fetch_sub(n, AcqRel);
                debug!(
                    " + add_permits_locked; rx_lock.fetch_sub(n); n = {}; actual={}",
                    n, actual
                );

                closed = actual & 1 == 1;
                actual
            };

            rem = (actual >> 1) - rem;
        }

        debug!(" + add_permits; done");
    }

    /// Release a specific amount of permits to the semaphore
    ///
    /// This function is called by `add_permits` after the add lock has been
    /// acquired.
    fn add_permits_locked2(&self, mut n: usize, closed: bool) {
        while n > 0 || closed {
            let waiter = match self.pop(n, closed) {
                Some(waiter) => waiter,
                None => {
                    return;
                }
            };

            debug!(" + release_n -- notify");

            if waiter.notify(closed) {
                n = n.saturating_sub(1);
                debug!(" + release_n -- dec");
            }
        }
    }

    /// Pop a waiter
    ///
    /// `rem` represents the remaining number of times the caller will pop. If
    /// there are no more waiters to pop, `rem` is used to set the available
    /// permits.
    fn pop(&self, rem: usize, closed: bool) -> Option<Arc<WaiterNode>> {
        debug!(" + pop; rem = {}", rem);

        'outer: loop {
            unsafe {
                let mut head = self.head.with(|head| *head);
                let mut next_ptr = head.as_ref().next.load(Acquire);

                let stub = self.stub();

                if head == stub {
                    debug!(" + pop; head == stub");

                    let next = match NonNull::new(next_ptr) {
                        Some(next) => next,
                        None => {
                            // This loop is not part of the standard intrusive mpsc
                            // channel algorithm. This is where we atomically pop
                            // the last task and add `rem` to the remaining capacity.
                            //
                            // This modification to the pop algorithm works because,
                            // at this point, we have not done any work (only done
                            // reading). We have a *pretty* good idea that there is
                            // no concurrent pusher.
                            //
                            // The capacity is then atomically added by doing an
                            // AcqRel CAS on `state`. The `state` cell is the
                            // linchpin of the algorithm.
                            //
                            // By successfully CASing `head` w/ AcqRel, we ensure
                            // that, if any thread was racing and entered a push, we
                            // see that and abort pop, retrying as it is
                            // "inconsistent".
                            let mut curr = SemState::load(&self.state, Acquire);

                            loop {
                                if curr.has_waiter(&self.stub) {
                                    // Inconsistent
                                    debug!(" + pop; inconsistent 1");
                                    thread::yield_now();
                                    continue 'outer;
                                }

                                // When closing the semaphore, nodes are popped
                                // with `rem == 0`. In this case, we are not
                                // adding permits, but notifying waiters of the
                                // semaphore's closed state.
                                if rem == 0 {
                                    debug_assert!(curr.is_closed(), "state = {:?}", curr);
                                    return None;
                                }

                                let mut next = curr;
                                next.release_permits(rem, &self.stub);

                                match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
                                    Ok(_) => return None,
                                    Err(actual) => {
                                        curr = actual;
                                    }
                                }
                            }
                        }
                    };

                    debug!(" + pop; got next waiter");

                    self.head.with_mut(|head| *head = next);
                    head = next;
                    next_ptr = next.as_ref().next.load(Acquire);
                }

                if let Some(next) = NonNull::new(next_ptr) {
                    self.head.with_mut(|head| *head = next);

                    return Some(Arc::from_raw(head.as_ptr()));
                }

                let state = SemState::load(&self.state, Acquire);

                // This must always be a pointer as the wait list is not empty.
                let tail = state.waiter().unwrap();

                if tail != head {
                    // Inconsistent
                    debug!(" + pop; inconsistent 2");
                    thread::yield_now();
                    continue 'outer;
                }

                self.push_stub(closed);

                next_ptr = head.as_ref().next.load(Acquire);

                if let Some(next) = NonNull::new(next_ptr) {
                    self.head.with_mut(|head| *head = next);

                    return Some(Arc::from_raw(head.as_ptr()));
                }

                // Inconsistent state, loop
                debug!(" + pop; inconsistent 3");
                thread::yield_now();
            }
        }
    }

    unsafe fn push_stub(&self, closed: bool) {
        let stub = self.stub();

        // Set the next pointer. This does not require an atomic operation as
        // this node is not accessible. The write will be flushed with the next
        // operation
        stub.as_ref().next.store(ptr::null_mut(), Relaxed);

        // Update the tail to point to the new node. We need to see the previous
        // node in order to update the next pointer as well as release `task`
        // to any other threads calling `push`.
        let prev = SemState::new_ptr(stub, closed).swap(&self.state, AcqRel);

        debug_assert_eq!(closed, prev.is_closed());

        // The stub is only pushed when there are pending tasks. Because of
        // this, the state must *always* be in pointer mode.
        let prev = prev.waiter().unwrap();

        // We don't want the *existing* pointer to be a stub.
        debug_assert_ne!(prev, stub);

        // Release `task` to the consume end.
        prev.as_ref().next.store(stub.as_ptr(), Release);
    }

    fn stub(&self) -> NonNull<WaiterNode> {
        unsafe { NonNull::new_unchecked(&*self.stub as *const _ as *mut _) }
    }
}

impl fmt::Debug for Semaphore {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("Semaphore")
            .field("state", &SemState::load(&self.state, Relaxed))
            .field("head", &self.head.with(|ptr| ptr))
            .field("rx_lock", &self.rx_lock.load(Relaxed))
            .field("stub", &self.stub)
            .finish()
    }
}

unsafe impl Send for Semaphore {}
unsafe impl Sync for Semaphore {}

// ===== impl Permit =====

impl Permit {
    /// Create a new `Permit`.
    ///
    /// The permit begins in the "unacquired" state.
    ///
    /// # Examples
    ///
    /// ```
    /// use tokio_sync::semaphore::Permit;
    ///
    /// let permit = Permit::new();
    /// assert!(!permit.is_acquired());
    /// ```
    pub fn new() -> Permit {
        Permit {
            waiter: None,
            state: PermitState::Idle,
        }
    }

    /// Returns true if the permit has been acquired
    pub fn is_acquired(&self) -> bool {
        self.state == PermitState::Acquired
    }

    /// Try to acquire the permit. If no permits are available, the current task
    /// is notified once a new permit becomes available.
    pub fn poll_acquire(
        &mut self,
        cx: &mut Context<'_>,
        semaphore: &Semaphore,
    ) -> Poll<Result<(), AcquireError>> {
        match self.state {
            PermitState::Idle => {}
            PermitState::Waiting => {
                let waiter = self.waiter.as_ref().unwrap();

                if waiter.acquire(cx)? {
                    self.state = PermitState::Acquired;
                    return Ready(Ok(()));
                } else {
                    return Pending;
                }
            }
            PermitState::Acquired => {
                return Ready(Ok(()));
            }
        }

        match semaphore.poll_permit(Some((cx, self)))? {
            Ready(()) => {
                self.state = PermitState::Acquired;
                Ready(Ok(()))
            }
            Pending => {
                self.state = PermitState::Waiting;
                Pending
            }
        }
    }

    /// Try to acquire the permit.
    pub fn try_acquire(&mut self, semaphore: &Semaphore) -> Result<(), TryAcquireError> {
        match self.state {
            PermitState::Idle => {}
            PermitState::Waiting => {
                let waiter = self.waiter.as_ref().unwrap();

                if waiter.acquire2().map_err(to_try_acquire)? {
                    self.state = PermitState::Acquired;
                    return Ok(());
                } else {
                    return Err(TryAcquireError::no_permits());
                }
            }
            PermitState::Acquired => {
                return Ok(());
            }
        }

        match semaphore.poll_permit(None).map_err(to_try_acquire)? {
            Ready(()) => {
                self.state = PermitState::Acquired;
                Ok(())
            }
            Pending => Err(TryAcquireError::no_permits()),
        }
    }

    /// Release a permit back to the semaphore
    pub fn release(&mut self, semaphore: &Semaphore) {
        if self.forget2() {
            semaphore.add_permits(1);
        }
    }

    /// Forget the permit **without** releasing it back to the semaphore.
    ///
    /// After calling `forget`, `poll_acquire` is able to acquire new permit
    /// from the sempahore.
    ///
    /// Repeatedly calling `forget` without associated calls to `add_permit`
    /// will result in the semaphore losing all permits.
    pub fn forget(&mut self) {
        self.forget2();
    }

    /// Returns `true` if the permit was acquired
    fn forget2(&mut self) -> bool {
        match self.state {
            PermitState::Idle => false,
            PermitState::Waiting => {
                let ret = self.waiter.as_ref().unwrap().cancel_interest();
                self.state = PermitState::Idle;
                ret
            }
            PermitState::Acquired => {
                self.state = PermitState::Idle;
                true
            }
        }
    }
}

impl Default for Permit {
    fn default() -> Self {
        Self::new()
    }
}

// ===== impl AcquireError ====

impl AcquireError {
    fn closed() -> AcquireError {
        AcquireError(())
    }
}

fn to_try_acquire(_: AcquireError) -> TryAcquireError {
    TryAcquireError::closed()
}

impl fmt::Display for AcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(fmt, "semaphore closed")
    }
}

impl ::std::error::Error for AcquireError {}

// ===== impl TryAcquireError =====

impl TryAcquireError {
    fn closed() -> TryAcquireError {
        TryAcquireError {
            kind: ErrorKind::Closed,
        }
    }

    fn no_permits() -> TryAcquireError {
        TryAcquireError {
            kind: ErrorKind::NoPermits,
        }
    }

    /// Returns true if the error was caused by a closed semaphore.
    pub fn is_closed(&self) -> bool {
        match self.kind {
            ErrorKind::Closed => true,
            _ => false,
        }
    }

    /// Returns true if the error was caused by calling `try_acquire` on a
    /// semaphore with no available permits.
    pub fn is_no_permits(&self) -> bool {
        match self.kind {
            ErrorKind::NoPermits => true,
            _ => false,
        }
    }
}

impl fmt::Display for TryAcquireError {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let descr = match self.kind {
            ErrorKind::Closed => "semaphore closed",
            ErrorKind::NoPermits => "no permits available",
        };
        write!(fmt, "{}", descr)
    }
}

impl ::std::error::Error for TryAcquireError {}

// ===== impl WaiterNode =====

impl WaiterNode {
    fn new() -> WaiterNode {
        WaiterNode {
            state: AtomicUsize::new(NodeState::new().to_usize()),
            waker: AtomicWaker::new(),
            next: AtomicPtr::new(ptr::null_mut()),
        }
    }

    fn acquire(&self, cx: &mut Context<'_>) -> Result<bool, AcquireError> {
        if self.acquire2()? {
            return Ok(true);
        }

        self.waker.register_by_ref(cx.waker());

        self.acquire2()
    }

    fn acquire2(&self) -> Result<bool, AcquireError> {
        use self::NodeState::*;

        match Idle.compare_exchange(&self.state, Assigned, AcqRel, Acquire) {
            Ok(_) => Ok(true),
            Err(Closed) => Err(AcquireError::closed()),
            Err(_) => Ok(false),
        }
    }

    fn register(&self, cx: &mut Context<'_>) {
        self.waker.register_by_ref(cx.waker())
    }

    /// Returns `true` if the permit has been acquired
    fn cancel_interest(&self) -> bool {
        use self::NodeState::*;

        match Queued.compare_exchange(&self.state, QueuedWaiting, AcqRel, Acquire) {
            // Successfully removed interest from the queued node. The permit
            // has not been assigned to the node.
            Ok(_) => false,
            // The semaphore has been closed, there is no further action to
            // take.
            Err(Closed) => false,
            // The permit has been assigned. It must be acquired in order to
            // be released back to the semaphore.
            Err(Assigned) => {
                match self.acquire2() {
                    Ok(true) => true,
                    // Not a reachable state
                    Ok(false) => panic!(),
                    // The semaphore has been closed, no further action to take.
                    Err(_) => false,
                }
            }
            Err(state) => panic!("unexpected state = {:?}", state),
        }
    }

    /// Transition the state to `QueuedWaiting`.
    ///
    /// This step can only happen from `Queued` or from `Idle`.
    ///
    /// Returns `true` if transitioning into a queued state.
    fn to_queued_waiting(&self) -> bool {
        use self::NodeState::*;

        let mut curr = NodeState::load(&self.state, Acquire);

        loop {
            debug_assert!(curr == Idle || curr == Queued, "actual = {:?}", curr);
            let next = QueuedWaiting;

            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
                Ok(_) => {
                    if curr.is_queued() {
                        return false;
                    } else {
                        // Transitioned to queued, reset next pointer
                        self.next.store(ptr::null_mut(), Relaxed);
                        return true;
                    }
                }
                Err(actual) => {
                    curr = actual;
                }
            }
        }
    }

    /// Notify the waiter
    ///
    /// Returns `true` if the waiter accepts the notification
    fn notify(&self, closed: bool) -> bool {
        use self::NodeState::*;

        // Assume QueuedWaiting state
        let mut curr = QueuedWaiting;

        loop {
            let next = match curr {
                Queued => Idle,
                QueuedWaiting => {
                    if closed {
                        Closed
                    } else {
                        Assigned
                    }
                }
                actual => panic!("actual = {:?}", actual),
            };

            match next.compare_exchange(&self.state, curr, AcqRel, Acquire) {
                Ok(_) => match curr {
                    QueuedWaiting => {
                        debug!(" + notify -- task notified");
                        self.waker.wake();
                        return true;
                    }
                    other => {
                        debug!(" + notify -- not notified; state = {:?}", other);
                        return false;
                    }
                },
                Err(actual) => curr = actual,
            }
        }
    }

    fn revert_to_idle(&self) {
        use self::NodeState::Idle;

        // There are no other handles to the node
        NodeState::store(&self.state, Idle, Relaxed);
    }

    #[allow(clippy::wrong_self_convention)] // https://github.com/rust-lang/rust-clippy/issues/4293
    fn into_non_null(self: Arc<WaiterNode>) -> NonNull<WaiterNode> {
        let ptr = Arc::into_raw(self);
        unsafe { NonNull::new_unchecked(ptr as *mut _) }
    }
}

// ===== impl State =====

/// Flag differentiating between available permits and waiter pointers.
///
/// If we assume pointers are properly aligned, then the least significant bit
/// will always be zero. So, we use that bit to track if the value represents a
/// number.
const NUM_FLAG: usize = 0b01;

const CLOSED_FLAG: usize = 0b10;

const MAX_PERMITS: usize = usize::MAX >> NUM_SHIFT;

/// When representing "numbers", the state has to be shifted this much (to get
/// rid of the flag bit).
const NUM_SHIFT: usize = 2;

impl SemState {
    /// Returns a new default `State` value.
    fn new(permits: usize, stub: &WaiterNode) -> SemState {
        assert!(permits <= MAX_PERMITS);

        if permits > 0 {
            SemState((permits << NUM_SHIFT) | NUM_FLAG)
        } else {
            SemState(stub as *const _ as usize)
        }
    }

    /// Returns a `State` tracking `ptr` as the tail of the queue.
    fn new_ptr(tail: NonNull<WaiterNode>, closed: bool) -> SemState {
        let mut val = tail.as_ptr() as usize;

        if closed {
            val |= CLOSED_FLAG;
        }

        SemState(val)
    }

    /// Returns the amount of remaining capacity
    fn available_permits(self) -> usize {
        if !self.has_available_permits() {
            return 0;
        }

        self.0 >> NUM_SHIFT
    }

    /// Returns true if the state has permits that can be claimed by a waiter.
    fn has_available_permits(self) -> bool {
        self.0 & NUM_FLAG == NUM_FLAG
    }

    fn has_waiter(self, stub: &WaiterNode) -> bool {
        !self.has_available_permits() && !self.is_stub(stub)
    }

    /// Try to acquire a permit
    ///
    /// # Return
    ///
    /// Returns `true` if the permit was acquired, `false` otherwise. If `false`
    /// is returned, it can be assumed that `State` represents the head pointer
    /// in the mpsc channel.
    fn acquire_permit(&mut self, stub: &WaiterNode) -> bool {
        if !self.has_available_permits() {
            return false;
        }

        debug_assert!(self.waiter().is_none());

        self.0 -= 1 << NUM_SHIFT;

        if self.0 == NUM_FLAG {
            // Set the state to the stub pointer.
            self.0 = stub as *const _ as usize;
        }

        true
    }

    /// Release permits
    ///
    /// Returns `true` if the permits were accepted.
    fn release_permits(&mut self, permits: usize, stub: &WaiterNode) {
        debug_assert!(permits > 0);

        if self.is_stub(stub) {
            self.0 = (permits << NUM_SHIFT) | NUM_FLAG | (self.0 & CLOSED_FLAG);
            return;
        }

        debug_assert!(self.has_available_permits());

        self.0 += permits << NUM_SHIFT;
    }

    fn is_waiter(self) -> bool {
        self.0 & NUM_FLAG == 0
    }

    /// Returns the waiter, if one is set.
    fn waiter(self) -> Option<NonNull<WaiterNode>> {
        if self.is_waiter() {
            let waiter = NonNull::new(self.as_ptr()).expect("null pointer stored");

            Some(waiter)
        } else {
            None
        }
    }

    /// Assumes `self` represents a pointer
    fn as_ptr(self) -> *mut WaiterNode {
        (self.0 & !CLOSED_FLAG) as *mut WaiterNode
    }

    /// Set to a pointer to a waiter.
    ///
    /// This can only be done from the full state.
    fn set_waiter(&mut self, waiter: NonNull<WaiterNode>) {
        let waiter = waiter.as_ptr() as usize;
        debug_assert!(waiter & NUM_FLAG == 0);
        debug_assert!(!self.is_closed());

        self.0 = waiter;
    }

    fn is_stub(self, stub: &WaiterNode) -> bool {
        self.as_ptr() as usize == stub as *const _ as usize
    }

    /// Load the state from an AtomicUsize.
    fn load(cell: &AtomicUsize, ordering: Ordering) -> SemState {
        let value = cell.load(ordering);
        debug!(" + SemState::load; value = {}", value);
        SemState(value)
    }

    /// Swap the values
    fn swap(self, cell: &AtomicUsize, ordering: Ordering) -> SemState {
        let prev = SemState(cell.swap(self.to_usize(), ordering));
        debug_assert_eq!(prev.is_closed(), self.is_closed());
        prev
    }

    /// Compare and exchange the current value into the provided cell
    fn compare_exchange(
        self,
        cell: &AtomicUsize,
        prev: SemState,
        success: Ordering,
        failure: Ordering,
    ) -> Result<SemState, SemState> {
        debug_assert_eq!(prev.is_closed(), self.is_closed());

        let res = cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure);

        debug!(
            " + SemState::compare_exchange; prev = {}; next = {}; result = {:?}",
            prev.to_usize(),
            self.to_usize(),
            res
        );

        res.map(SemState).map_err(SemState)
    }

    fn fetch_set_closed(cell: &AtomicUsize, ordering: Ordering) -> SemState {
        let value = cell.fetch_or(CLOSED_FLAG, ordering);
        SemState(value)
    }

    fn is_closed(self) -> bool {
        self.0 & CLOSED_FLAG == CLOSED_FLAG
    }

    /// Converts the state into a `usize` representation.
    fn to_usize(self) -> usize {
        self.0
    }
}

impl fmt::Debug for SemState {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut fmt = fmt.debug_struct("SemState");

        if self.is_waiter() {
            fmt.field("state", &"<waiter>");
        } else {
            fmt.field("permits", &self.available_permits());
        }

        fmt.finish()
    }
}

// ===== impl NodeState =====

impl NodeState {
    fn new() -> NodeState {
        NodeState::Idle
    }

    fn from_usize(value: usize) -> NodeState {
        use self::NodeState::*;

        match value {
            0 => Idle,
            1 => Queued,
            2 => QueuedWaiting,
            3 => Assigned,
            4 => Closed,
            _ => panic!(),
        }
    }

    fn load(cell: &AtomicUsize, ordering: Ordering) -> NodeState {
        NodeState::from_usize(cell.load(ordering))
    }

    /// Store a value
    fn store(cell: &AtomicUsize, value: NodeState, ordering: Ordering) {
        cell.store(value.to_usize(), ordering);
    }

    fn compare_exchange(
        self,
        cell: &AtomicUsize,
        prev: NodeState,
        success: Ordering,
        failure: Ordering,
    ) -> Result<NodeState, NodeState> {
        cell.compare_exchange(prev.to_usize(), self.to_usize(), success, failure)
            .map(NodeState::from_usize)
            .map_err(NodeState::from_usize)
    }

    /// Returns `true` if `self` represents a queued state.
    fn is_queued(self) -> bool {
        use self::NodeState::*;

        match self {
            Queued | QueuedWaiting => true,
            _ => false,
        }
    }

    fn to_usize(self) -> usize {
        self as usize
    }
}