event-listener 4.0.3

Notify async tasks or threads
Documentation
//! Implementation of `event-listener` built exclusively on atomics.
//!
//! On `no_std`, we don't have access to `Mutex`, so we can't use intrusive linked lists like the `std`
//! implementation. Normally, we would use a concurrent atomic queue to store listeners, but benchmarks
//! show that using queues in this way is very slow, especially for the single threaded use-case.
//!
//! We've found that it's easier to assume that the `Event` won't be under high contention in most use
//! cases. Therefore, we use a spinlock that protects a linked list of listeners, and fall back to an
//! atomic queue if the lock is contended. Benchmarks show that this is about 20% slower than the std
//! implementation, but still much faster than using a queue.

#[path = "no_std/node.rs"]
mod node;

use node::{Node, NothingProducer, TaskWaiting};

use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};

use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::num::NonZeroUsize;
use core::ops;
use core::pin::Pin;

use alloc::vec::Vec;

impl<T> crate::Inner<T> {
    /// Locks the list.
    fn try_lock(&self) -> Option<ListGuard<'_, T>> {
        self.list.inner.try_lock().map(|guard| ListGuard {
            inner: self,
            guard: Some(guard),
            tasks: alloc::vec![],
        })
    }

    /// Force a queue update.
    fn queue_update(&self) {
        // Locking and unlocking the mutex will drain the queue if there is no contention.
        drop(self.try_lock());
    }

    pub(crate) fn needs_notification(&self, _limit: usize) -> bool {
        // TODO: Figure out a stable way to do this optimization.
        true
    }

    /// Add a new listener to the list.
    ///
    /// Does nothing if the list is already registered.
    pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
        if listener.as_ref().as_pin_ref().is_some() {
            // Already inserted.
            return;
        }

        match self.try_lock() {
            Some(mut lock) => {
                let key = lock.insert(State::Created);
                *listener = Some(Listener::HasNode(key));
            }

            None => {
                // Push it to the queue.
                let (node, task_waiting) = Node::listener();
                self.list.queue.push(node).unwrap();
                *listener = Some(Listener::Queued(task_waiting));

                // Force a queue update.
                self.queue_update();
            }
        }
    }

    /// Remove a listener from the list.
    pub(crate) fn remove(
        &self,
        mut listener: Pin<&mut Option<Listener<T>>>,
        propagate: bool,
    ) -> Option<State<T>> {
        loop {
            let state = match listener.as_mut().take() {
                Some(Listener::HasNode(key)) => {
                    match self.try_lock() {
                        Some(mut list) => {
                            // Fast path removal.
                            list.remove(key, propagate)
                        }

                        None => {
                            // Slow path removal.
                            // This is why intrusive lists don't work on no_std.
                            let node = Node::RemoveListener {
                                listener: key,
                                propagate,
                            };

                            self.list.queue.push(node).unwrap();

                            // Force a queue update.
                            self.queue_update();

                            None
                        }
                    }
                }

                Some(Listener::Queued(tw)) => {
                    // Make sure it's not added after the queue is drained.
                    if let Some(key) = tw.cancel() {
                        // If it was already added, set up our listener and try again.
                        *listener = Some(Listener::HasNode(key));
                        continue;
                    }

                    None
                }

                None => None,

                _ => unreachable!(),
            };

            return state;
        }
    }

    /// Notifies a number of entries.
    #[cold]
    pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
        match self.try_lock() {
            Some(mut guard) => {
                // Notify the listeners.
                guard.notify(notify)
            }

            None => {
                // Push it to the queue.
                let node = Node::Notify(GenericNotify::new(
                    notify.count(Internal::new()),
                    notify.is_additional(Internal::new()),
                    NothingProducer::default(),
                ));

                self.list.queue.push(node).unwrap();

                // Force a queue update.
                self.queue_update();

                // We haven't notified anyone yet.
                0
            }
        }
    }

    /// Register a task to be notified when the event is triggered.
    ///
    /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
    /// isn't inserted, returns `None`.
    pub(crate) fn register(
        &self,
        mut listener: Pin<&mut Option<Listener<T>>>,
        task: TaskRef<'_>,
    ) -> RegisterResult<T> {
        loop {
            match listener.as_mut().take() {
                Some(Listener::HasNode(key)) => {
                    *listener = Some(Listener::HasNode(key));
                    match self.try_lock() {
                        Some(mut guard) => {
                            // Fast path registration.
                            return guard.register(listener, task);
                        }

                        None => {
                            // Wait for the lock.
                            let node = Node::Waiting(task.into_task());
                            self.list.queue.push(node).unwrap();

                            // Force a queue update.
                            self.queue_update();

                            return RegisterResult::Registered;
                        }
                    }
                }

                Some(Listener::Queued(task_waiting)) => {
                    // Force a queue update.
                    self.queue_update();

                    // Are we done yet?
                    match task_waiting.status() {
                        Some(key) => {
                            assert!(key.get() != usize::MAX);

                            // We're inserted now, adjust state.
                            *listener = Some(Listener::HasNode(key));
                        }

                        None => {
                            // We're still queued, so register the task.
                            task_waiting.register(task.into_task());
                            *listener = Some(Listener::Queued(task_waiting));

                            // Force a queue update.
                            self.queue_update();

                            return RegisterResult::Registered;
                        }
                    }
                }

                None => return RegisterResult::NeverInserted,

                _ => unreachable!(),
            }
        }
    }
}

#[derive(Debug)]
pub(crate) struct List<T> {
    /// The inner list.
    inner: Mutex<ListenerSlab<T>>,

    /// The queue of pending operations.
    queue: concurrent_queue::ConcurrentQueue<Node<T>>,
}

impl<T> List<T> {
    pub(super) fn new() -> List<T> {
        List {
            inner: Mutex::new(ListenerSlab::new()),
            queue: concurrent_queue::ConcurrentQueue::unbounded(),
        }
    }
    pub fn total_listeners(&self) -> Result<usize, &str> {
        self.inner
            .try_lock()
            .map(|lock| Ok(lock.listeners.len()))
            .unwrap_or(Err("<locked>"))
    }
}

/// The guard returned by [`Inner::lock`].
pub(crate) struct ListGuard<'a, T> {
    /// Reference to the inner state.
    pub(crate) inner: &'a crate::Inner<T>,

    /// The locked list.
    pub(crate) guard: Option<MutexGuard<'a, ListenerSlab<T>>>,

    /// Tasks to wake up once this guard is dropped.
    tasks: Vec<Task>,
}

impl<T> ListGuard<'_, T> {
    #[cold]
    fn process_nodes_slow(&mut self, start_node: Node<T>) {
        let guard = self.guard.as_mut().unwrap();

        // Process the start node.
        self.tasks.extend(start_node.apply(guard));

        // Process all remaining nodes.
        while let Ok(node) = self.inner.list.queue.pop() {
            self.tasks.extend(node.apply(guard));
        }
    }

    #[inline]
    fn process_nodes(&mut self) {
        // Process every node left in the queue.
        if let Ok(start_node) = self.inner.list.queue.pop() {
            self.process_nodes_slow(start_node);
        }
    }
}

impl<T> ops::Deref for ListGuard<'_, T> {
    type Target = ListenerSlab<T>;

    fn deref(&self) -> &Self::Target {
        self.guard.as_ref().unwrap()
    }
}

impl<T> ops::DerefMut for ListGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut Self::Target {
        self.guard.as_mut().unwrap()
    }
}

impl<T> Drop for ListGuard<'_, T> {
    fn drop(&mut self) {
        while self.guard.is_some() {
            // Process every node left in the queue.
            self.process_nodes();

            // Update the atomic `notified` counter.
            let list = self.guard.take().unwrap();
            let notified = if list.notified < list.len {
                list.notified
            } else {
                core::usize::MAX
            };

            self.inner.notified.store(notified, Ordering::Release);

            // Drop the actual lock.
            drop(list);

            // Wakeup all tasks.
            for task in self.tasks.drain(..) {
                task.wake();
            }

            // There is a deadlock where a node is pushed to the end of the queue after we've finished
            // process_nodes() but before we've finished dropping the lock. This can lead to some
            // notifications not being properly delivered, or listeners not being added to the list.
            // Therefore check before we finish dropping if there is anything left in the queue, and
            // if so, lock it again and force a queue update.
            if !self.inner.list.queue.is_empty() {
                self.guard = self.inner.list.inner.try_lock();
            }
        }
    }
}

/// An entry representing a registered listener.
enum Entry<T> {
    /// Contains the listener state.
    Listener {
        /// The state of the listener.
        state: Cell<State<T>>,

        /// The previous listener in the list.
        prev: Cell<Option<NonZeroUsize>>,

        /// The next listener in the list.
        next: Cell<Option<NonZeroUsize>>,
    },

    /// An empty slot that contains the index of the next empty slot.
    Empty(NonZeroUsize),

    /// Sentinel value.
    Sentinel,
}

struct TakenState<'a, T> {
    slot: &'a Cell<State<T>>,
    state: State<T>,
}

impl<T> Drop for TakenState<'_, T> {
    fn drop(&mut self) {
        self.slot
            .set(mem::replace(&mut self.state, State::NotifiedTaken));
    }
}

impl<T> fmt::Debug for TakenState<'_, T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt::Debug::fmt(&self.state, f)
    }
}

impl<T: PartialEq> PartialEq for TakenState<'_, T> {
    fn eq(&self, other: &Self) -> bool {
        self.state == other.state
    }
}

impl<'a, T> TakenState<'a, T> {
    fn new(slot: &'a Cell<State<T>>) -> Self {
        let state = slot.replace(State::NotifiedTaken);
        Self { slot, state }
    }
}

impl<T> fmt::Debug for Entry<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Entry::Listener { state, next, prev } => f
                .debug_struct("Listener")
                .field("state", &TakenState::new(state))
                .field("prev", prev)
                .field("next", next)
                .finish(),
            Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(),
            Entry::Sentinel => f.debug_tuple("Sentinel").finish(),
        }
    }
}

impl<T: PartialEq> PartialEq for Entry<T> {
    fn eq(&self, other: &Entry<T>) -> bool {
        match (self, other) {
            (
                Self::Listener {
                    state: state1,
                    prev: prev1,
                    next: next1,
                },
                Self::Listener {
                    state: state2,
                    prev: prev2,
                    next: next2,
                },
            ) => {
                if TakenState::new(state1) != TakenState::new(state2) {
                    return false;
                }

                prev1.get() == prev2.get() && next1.get() == next2.get()
            }
            (Self::Empty(next1), Self::Empty(next2)) => next1 == next2,
            (Self::Sentinel, Self::Sentinel) => true,
            _ => false,
        }
    }
}

impl<T> Entry<T> {
    fn state(&self) -> &Cell<State<T>> {
        match self {
            Entry::Listener { state, .. } => state,
            _ => unreachable!(),
        }
    }

    fn prev(&self) -> &Cell<Option<NonZeroUsize>> {
        match self {
            Entry::Listener { prev, .. } => prev,
            _ => unreachable!(),
        }
    }

    fn next(&self) -> &Cell<Option<NonZeroUsize>> {
        match self {
            Entry::Listener { next, .. } => next,
            _ => unreachable!(),
        }
    }
}

/// A linked list of entries.
pub(crate) struct ListenerSlab<T> {
    /// The raw list of entries.
    listeners: Vec<Entry<T>>,

    /// First entry in the list.
    head: Option<NonZeroUsize>,

    /// Last entry in the list.
    tail: Option<NonZeroUsize>,

    /// The first unnotified entry in the list.
    start: Option<NonZeroUsize>,

    /// The number of notified entries in the list.
    notified: usize,

    /// The total number of listeners.
    len: usize,

    /// The index of the first `Empty` entry, or the length of the list plus one if there
    /// are no empty entries.
    first_empty: NonZeroUsize,
}

impl<T> fmt::Debug for ListenerSlab<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("ListenerSlab")
            .field("listeners", &self.listeners)
            .field("head", &self.head)
            .field("tail", &self.tail)
            .field("start", &self.start)
            .field("notified", &self.notified)
            .field("len", &self.len)
            .field("first_empty", &self.first_empty)
            .finish()
    }
}

impl<T> ListenerSlab<T> {
    /// Create a new, empty list.
    pub(crate) fn new() -> Self {
        Self {
            listeners: alloc::vec![Entry::Sentinel],
            head: None,
            tail: None,
            start: None,
            notified: 0,
            len: 0,
            first_empty: unsafe { NonZeroUsize::new_unchecked(1) },
        }
    }

    /// Inserts a new entry into the list.
    pub(crate) fn insert(&mut self, state: State<T>) -> NonZeroUsize {
        // Add the new entry into the list.
        let key = {
            let entry = Entry::Listener {
                state: Cell::new(state),
                prev: Cell::new(self.tail),
                next: Cell::new(None),
            };

            let key = self.first_empty;
            if self.first_empty.get() == self.listeners.len() {
                // No empty entries, so add a new entry.
                self.listeners.push(entry);

                // SAFETY: Guaranteed to not overflow, since the Vec would have panicked already.
                self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) };
            } else {
                // There is an empty entry, so replace it.
                let slot = &mut self.listeners[key.get()];
                let next = match mem::replace(slot, entry) {
                    Entry::Empty(next) => next,
                    _ => unreachable!(),
                };

                self.first_empty = next;
            }

            key
        };

        // Replace the tail with the new entry.
        match mem::replace(&mut self.tail, Some(key)) {
            None => self.head = Some(key),
            Some(tail) => {
                let tail = &self.listeners[tail.get()];
                tail.next().set(Some(key));
            }
        }

        // If there are no listeners that have been notified, then the new listener is the next
        // listener to be notified.
        if self.start.is_none() {
            self.start = Some(key);
        }

        // Increment the length.
        self.len += 1;

        key
    }

    /// Removes an entry from the list and returns its state.
    pub(crate) fn remove(&mut self, key: NonZeroUsize, propagate: bool) -> Option<State<T>> {
        let entry = &self.listeners[key.get()];
        let prev = entry.prev().get();
        let next = entry.next().get();

        // Unlink from the previous entry.
        match prev {
            None => self.head = next,
            Some(p) => self.listeners[p.get()].next().set(next),
        }

        // Unlink from the next entry.
        match next {
            None => self.tail = prev,
            Some(n) => self.listeners[n.get()].prev().set(prev),
        }

        // If this was the first unnotified entry, move the pointer to the next one.
        if self.start == Some(key) {
            self.start = next;
        }

        // Extract the state.
        let entry = mem::replace(
            &mut self.listeners[key.get()],
            Entry::Empty(self.first_empty),
        );
        self.first_empty = key;

        let mut state = match entry {
            Entry::Listener { state, .. } => state.into_inner(),
            _ => unreachable!(),
        };

        // Update the counters.
        if state.is_notified() {
            self.notified = self.notified.saturating_sub(1);

            if propagate {
                // Propagate the notification to the next entry.
                let state = mem::replace(&mut state, State::NotifiedTaken);
                if let State::Notified { tag, additional } = state {
                    let tags = {
                        let mut tag = Some(tag);
                        move || tag.take().expect("called more than once")
                    };

                    self.notify(GenericNotify::new(1, additional, tags));
                }
            }
        }
        self.len -= 1;

        Some(state)
    }

    /// Notifies a number of listeners.
    #[cold]
    pub(crate) fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
        let mut n = notify.count(Internal::new());
        let is_additional = notify.is_additional(Internal::new());
        if !is_additional {
            // Make sure we're not notifying more than we have.
            if n <= self.notified {
                return 0;
            }
            n -= self.notified;
        }

        let original_count = n;
        while n > 0 {
            n -= 1;

            // Notify the next entry.
            match self.start {
                None => return original_count - n - 1,

                Some(e) => {
                    // Get the entry and move the pointer forwards.
                    let entry = &self.listeners[e.get()];
                    self.start = entry.next().get();

                    // Set the state to `Notified` and notify.
                    let tag = notify.next_tag(Internal::new());
                    if let State::Task(task) = entry.state().replace(State::Notified {
                        tag,
                        additional: is_additional,
                    }) {
                        task.wake();
                    }

                    // Bump the notified count.
                    self.notified += 1;
                }
            }
        }

        original_count - n
    }

    /// Register a task to be notified when the event is triggered.
    ///
    /// Returns `true` if the listener was already notified, and `false` otherwise. If the listener
    /// isn't inserted, returns `None`.
    pub(crate) fn register(
        &mut self,
        mut listener: Pin<&mut Option<Listener<T>>>,
        task: TaskRef<'_>,
    ) -> RegisterResult<T> {
        let key = match *listener {
            Some(Listener::HasNode(key)) => key,
            _ => return RegisterResult::NeverInserted,
        };

        let entry = &self.listeners[key.get()];

        // Take the state out and check it.
        match entry.state().replace(State::NotifiedTaken) {
            State::Notified { tag, .. } => {
                // The listener was already notified, so we don't need to do anything.
                self.remove(key, false);
                *listener = None;
                RegisterResult::Notified(tag)
            }

            State::Task(other_task) => {
                // Only replace the task if it's not the same as the one we're registering.
                if task.will_wake(other_task.as_task_ref()) {
                    entry.state().set(State::Task(other_task));
                } else {
                    entry.state().set(State::Task(task.into_task()));
                }

                RegisterResult::Registered
            }

            _ => {
                // Register the task.
                entry.state().set(State::Task(task.into_task()));
                RegisterResult::Registered
            }
        }
    }
}

pub(crate) enum Listener<T> {
    /// The listener has a node inside of the linked list.
    HasNode(NonZeroUsize),

    /// The listener has an entry in the queue that may or may not have a task waiting.
    Queued(Arc<TaskWaiting>),

    /// Eat the generic type for consistency.
    _EatGenericType(PhantomData<T>),
}

impl<T> fmt::Debug for Listener<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::HasNode(key) => f.debug_tuple("HasNode").field(key).finish(),
            Self::Queued(tw) => f.debug_tuple("Queued").field(tw).finish(),
            Self::_EatGenericType(_) => unreachable!(),
        }
    }
}

impl<T> Unpin for Listener<T> {}

impl<T> PartialEq for Listener<T> {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (Self::HasNode(a), Self::HasNode(b)) => a == b,
            (Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b),
            _ => false,
        }
    }
}

/// A simple mutex type that optimistically assumes that the lock is uncontended.
pub(crate) struct Mutex<T> {
    /// The inner value.
    value: UnsafeCell<T>,

    /// Whether the mutex is locked.
    locked: AtomicBool,
}

impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        if let Some(lock) = self.try_lock() {
            f.debug_tuple("Mutex").field(&*lock).finish()
        } else {
            f.write_str("Mutex { <locked> }")
        }
    }
}

impl<T> Mutex<T> {
    /// Create a new mutex.
    pub(crate) fn new(value: T) -> Self {
        Self {
            value: UnsafeCell::new(value),
            locked: AtomicBool::new(false),
        }
    }

    /// Lock the mutex.
    pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
        // Try to lock the mutex.
        if self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_ok()
        {
            // We have successfully locked the mutex.
            Some(MutexGuard { mutex: self })
        } else {
            self.try_lock_slow()
        }
    }

    #[cold]
    fn try_lock_slow(&self) -> Option<MutexGuard<'_, T>> {
        // Assume that the contention is short-term.
        // Spin for a while to see if the mutex becomes unlocked.
        let mut spins = 100u32;

        loop {
            if self
                .locked
                .compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
                .is_ok()
            {
                // We have successfully locked the mutex.
                return Some(MutexGuard { mutex: self });
            }

            // Use atomic loads instead of compare-exchange.
            while self.locked.load(Ordering::Relaxed) {
                // Return None once we've exhausted the number of spins.
                spins = spins.checked_sub(1)?;
            }
        }
    }
}

pub(crate) struct MutexGuard<'a, T> {
    mutex: &'a Mutex<T>,
}

impl<'a, T> Drop for MutexGuard<'a, T> {
    fn drop(&mut self) {
        self.mutex.locked.store(false, Ordering::Release);
    }
}

impl<'a, T> ops::Deref for MutexGuard<'a, T> {
    type Target = T;

    fn deref(&self) -> &T {
        unsafe { &*self.mutex.value.get() }
    }
}

impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
    fn deref_mut(&mut self) -> &mut T {
        unsafe { &mut *self.mutex.value.get() }
    }
}

unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::Task;

    #[cfg(target_family = "wasm")]
    use wasm_bindgen_test::wasm_bindgen_test as test;

    #[test]
    fn smoke_mutex() {
        let mutex = Mutex::new(0);

        {
            let mut guard = mutex.try_lock().unwrap();
            *guard += 1;
        }

        {
            let mut guard = mutex.try_lock().unwrap();
            *guard += 1;
        }

        let guard = mutex.try_lock().unwrap();
        assert_eq!(*guard, 2);
    }

    #[test]
    fn smoke_listener_slab() {
        let mut listeners = ListenerSlab::<()>::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove one.
        assert_eq!(listeners.remove(key2, false), Some(State::Created));

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key1)),
                next: Cell::new(None),
            }
        );
    }

    #[test]
    fn listener_slab_notify() {
        let mut listeners = ListenerSlab::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        // Notify one.
        listeners.notify(GenericNotify::new(1, true, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: true,
                    tag: ()
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove the notified listener.
        assert_eq!(
            listeners.remove(key1, false),
            Some(State::Notified {
                additional: true,
                tag: ()
            })
        );

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key2));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );
    }

    #[test]
    fn listener_slab_register() {
        let woken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn::waker_fn({
            let woken = woken.clone();
            move || woken.store(true, Ordering::SeqCst)
        });

        let mut listeners = ListenerSlab::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        // Register one.
        assert_eq!(
            listeners.register(
                Pin::new(&mut Some(Listener::HasNode(key2))),
                TaskRef::Waker(&waker)
            ),
            RegisterResult::Registered
        );

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify the listener.
        listeners.notify(GenericNotify::new(2, false, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 2);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key3));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        assert!(woken.load(Ordering::SeqCst));
        assert_eq!(
            listeners.register(
                Pin::new(&mut Some(Listener::HasNode(key2))),
                TaskRef::Waker(&waker)
            ),
            RegisterResult::Notified(())
        );
    }

    #[test]
    fn listener_slab_notify_prop() {
        let woken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn::waker_fn({
            let woken = woken.clone();
            move || woken.store(true, Ordering::SeqCst)
        });

        let mut listeners = ListenerSlab::new();

        // Insert a few listeners.
        let key1 = listeners.insert(State::Created);
        let key2 = listeners.insert(State::Created);
        let key3 = listeners.insert(State::Created);

        // Register one.
        assert_eq!(
            listeners.register(
                Pin::new(&mut Some(Listener::HasNode(key2))),
                TaskRef::Waker(&waker)
            ),
            RegisterResult::Registered
        );

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key1));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify the first listener.
        listeners.notify(GenericNotify::new(1, false, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Calling notify again should not change anything.
        listeners.notify(GenericNotify::new(1, false, || ()));

        assert_eq!(listeners.len, 3);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key1));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key2)),
            }
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Task(Task::Waker(waker.clone()))),
                prev: Cell::new(Some(key1)),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove the first listener.
        assert_eq!(
            listeners.remove(key1, false),
            Some(State::Notified {
                additional: false,
                tag: ()
            })
        );

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 0);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key2));
        assert_eq!(listeners.start, Some(key2));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(*listeners.listeners[2].prev(), Cell::new(None));
        assert_eq!(*listeners.listeners[2].next(), Cell::new(Some(key3)));
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Notify the second listener.
        listeners.notify(GenericNotify::new(1, false, || ()));
        assert!(woken.load(Ordering::SeqCst));

        assert_eq!(listeners.len, 2);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key2));
        assert_eq!(listeners.start, Some(key3));
        assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(Some(key3)),
            }
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Created),
                prev: Cell::new(Some(key2)),
                next: Cell::new(None),
            }
        );

        // Remove and propagate the second listener.
        assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken));

        // The third listener should be notified.
        assert_eq!(listeners.len, 1);
        assert_eq!(listeners.notified, 1);
        assert_eq!(listeners.tail, Some(key3));
        assert_eq!(listeners.head, Some(key3));
        assert_eq!(listeners.start, None);
        assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
        assert_eq!(listeners.listeners[0], Entry::Sentinel);
        assert_eq!(
            listeners.listeners[1],
            Entry::Empty(NonZeroUsize::new(4).unwrap())
        );
        assert_eq!(
            listeners.listeners[2],
            Entry::Empty(NonZeroUsize::new(1).unwrap())
        );
        assert_eq!(
            listeners.listeners[3],
            Entry::Listener {
                state: Cell::new(State::Notified {
                    additional: false,
                    tag: (),
                }),
                prev: Cell::new(None),
                next: Cell::new(None),
            }
        );

        // Remove the third listener.
        assert_eq!(
            listeners.remove(key3, false),
            Some(State::Notified {
                additional: false,
                tag: ()
            })
        );
    }

    #[test]
    fn uncontended_inner() {
        let inner = crate::Inner::new();

        // Register two listeners.
        let (mut listener1, mut listener2, mut listener3) = (None, None, None);
        inner.insert(Pin::new(&mut listener1));
        inner.insert(Pin::new(&mut listener2));
        inner.insert(Pin::new(&mut listener3));

        assert_eq!(
            listener1,
            Some(Listener::HasNode(NonZeroUsize::new(1).unwrap()))
        );
        assert_eq!(
            listener2,
            Some(Listener::HasNode(NonZeroUsize::new(2).unwrap()))
        );

        // Register a waker in the second listener.
        let woken = Arc::new(AtomicBool::new(false));
        let waker = waker_fn::waker_fn({
            let woken = woken.clone();
            move || woken.store(true, Ordering::SeqCst)
        });
        assert_eq!(
            inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)),
            RegisterResult::Registered
        );

        // Notify the first listener.
        inner.notify(GenericNotify::new(1, false, || ()));
        assert!(!woken.load(Ordering::SeqCst));

        // Another notify should do nothing.
        inner.notify(GenericNotify::new(1, false, || ()));
        assert!(!woken.load(Ordering::SeqCst));

        // Receive the notification.
        assert_eq!(
            inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)),
            RegisterResult::Notified(())
        );

        // First listener is already removed.
        assert!(listener1.is_none());

        // Notify the second listener.
        inner.notify(GenericNotify::new(1, false, || ()));
        assert!(woken.load(Ordering::SeqCst));

        // Remove the second listener and propagate the notification.
        assert_eq!(
            inner.remove(Pin::new(&mut listener2), true),
            Some(State::NotifiedTaken)
        );

        // Second listener is already removed.
        assert!(listener2.is_none());

        // Third listener should be notified.
        assert_eq!(
            inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)),
            RegisterResult::Notified(())
        );
    }
}