#[path = "no_std/node.rs"]
mod node;
use node::{Node, NothingProducer, TaskWaiting};
use crate::notify::{GenericNotify, Internal, Notification};
use crate::sync::atomic::{AtomicBool, Ordering};
use crate::sync::cell::{Cell, UnsafeCell};
use crate::sync::Arc;
use crate::{RegisterResult, State, Task, TaskRef};
use core::fmt;
use core::marker::PhantomData;
use core::mem;
use core::num::NonZeroUsize;
use core::ops;
use core::pin::Pin;
use alloc::vec::Vec;
impl<T> crate::Inner<T> {
fn try_lock(&self) -> Option<ListGuard<'_, T>> {
self.list.inner.try_lock().map(|guard| ListGuard {
inner: self,
guard: Some(guard),
tasks: alloc::vec![],
})
}
fn queue_update(&self) {
drop(self.try_lock());
}
pub(crate) fn needs_notification(&self, _limit: usize) -> bool {
true
}
pub(crate) fn insert(&self, mut listener: Pin<&mut Option<Listener<T>>>) {
if listener.as_ref().as_pin_ref().is_some() {
return;
}
match self.try_lock() {
Some(mut lock) => {
let key = lock.insert(State::Created);
*listener = Some(Listener::HasNode(key));
}
None => {
let (node, task_waiting) = Node::listener();
self.list.queue.push(node).unwrap();
*listener = Some(Listener::Queued(task_waiting));
self.queue_update();
}
}
}
pub(crate) fn remove(
&self,
mut listener: Pin<&mut Option<Listener<T>>>,
propagate: bool,
) -> Option<State<T>> {
loop {
let state = match listener.as_mut().take() {
Some(Listener::HasNode(key)) => {
match self.try_lock() {
Some(mut list) => {
list.remove(key, propagate)
}
None => {
let node = Node::RemoveListener {
listener: key,
propagate,
};
self.list.queue.push(node).unwrap();
self.queue_update();
None
}
}
}
Some(Listener::Queued(tw)) => {
if let Some(key) = tw.cancel() {
*listener = Some(Listener::HasNode(key));
continue;
}
None
}
None => None,
_ => unreachable!(),
};
return state;
}
}
#[cold]
pub(crate) fn notify(&self, notify: impl Notification<Tag = T>) -> usize {
match self.try_lock() {
Some(mut guard) => {
guard.notify(notify)
}
None => {
let node = Node::Notify(GenericNotify::new(
notify.count(Internal::new()),
notify.is_additional(Internal::new()),
NothingProducer::default(),
));
self.list.queue.push(node).unwrap();
self.queue_update();
0
}
}
}
pub(crate) fn register(
&self,
mut listener: Pin<&mut Option<Listener<T>>>,
task: TaskRef<'_>,
) -> RegisterResult<T> {
loop {
match listener.as_mut().take() {
Some(Listener::HasNode(key)) => {
*listener = Some(Listener::HasNode(key));
match self.try_lock() {
Some(mut guard) => {
return guard.register(listener, task);
}
None => {
let node = Node::Waiting(task.into_task());
self.list.queue.push(node).unwrap();
self.queue_update();
return RegisterResult::Registered;
}
}
}
Some(Listener::Queued(task_waiting)) => {
self.queue_update();
match task_waiting.status() {
Some(key) => {
assert!(key.get() != usize::MAX);
*listener = Some(Listener::HasNode(key));
}
None => {
task_waiting.register(task.into_task());
*listener = Some(Listener::Queued(task_waiting));
self.queue_update();
return RegisterResult::Registered;
}
}
}
None => return RegisterResult::NeverInserted,
_ => unreachable!(),
}
}
}
}
#[derive(Debug)]
pub(crate) struct List<T> {
inner: Mutex<ListenerSlab<T>>,
queue: concurrent_queue::ConcurrentQueue<Node<T>>,
}
impl<T> List<T> {
pub(super) fn new() -> List<T> {
List {
inner: Mutex::new(ListenerSlab::new()),
queue: concurrent_queue::ConcurrentQueue::unbounded(),
}
}
pub fn total_listeners(&self) -> Result<usize, &str> {
self.inner
.try_lock()
.map(|lock| Ok(lock.listeners.len()))
.unwrap_or(Err("<locked>"))
}
}
pub(crate) struct ListGuard<'a, T> {
pub(crate) inner: &'a crate::Inner<T>,
pub(crate) guard: Option<MutexGuard<'a, ListenerSlab<T>>>,
tasks: Vec<Task>,
}
impl<T> ListGuard<'_, T> {
#[cold]
fn process_nodes_slow(&mut self, start_node: Node<T>) {
let guard = self.guard.as_mut().unwrap();
self.tasks.extend(start_node.apply(guard));
while let Ok(node) = self.inner.list.queue.pop() {
self.tasks.extend(node.apply(guard));
}
}
#[inline]
fn process_nodes(&mut self) {
if let Ok(start_node) = self.inner.list.queue.pop() {
self.process_nodes_slow(start_node);
}
}
}
impl<T> ops::Deref for ListGuard<'_, T> {
type Target = ListenerSlab<T>;
fn deref(&self) -> &Self::Target {
self.guard.as_ref().unwrap()
}
}
impl<T> ops::DerefMut for ListGuard<'_, T> {
fn deref_mut(&mut self) -> &mut Self::Target {
self.guard.as_mut().unwrap()
}
}
impl<T> Drop for ListGuard<'_, T> {
fn drop(&mut self) {
while self.guard.is_some() {
self.process_nodes();
let list = self.guard.take().unwrap();
let notified = if list.notified < list.len {
list.notified
} else {
core::usize::MAX
};
self.inner.notified.store(notified, Ordering::Release);
drop(list);
for task in self.tasks.drain(..) {
task.wake();
}
if !self.inner.list.queue.is_empty() {
self.guard = self.inner.list.inner.try_lock();
}
}
}
}
enum Entry<T> {
Listener {
state: Cell<State<T>>,
prev: Cell<Option<NonZeroUsize>>,
next: Cell<Option<NonZeroUsize>>,
},
Empty(NonZeroUsize),
Sentinel,
}
struct TakenState<'a, T> {
slot: &'a Cell<State<T>>,
state: State<T>,
}
impl<T> Drop for TakenState<'_, T> {
fn drop(&mut self) {
self.slot
.set(mem::replace(&mut self.state, State::NotifiedTaken));
}
}
impl<T> fmt::Debug for TakenState<'_, T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt::Debug::fmt(&self.state, f)
}
}
impl<T: PartialEq> PartialEq for TakenState<'_, T> {
fn eq(&self, other: &Self) -> bool {
self.state == other.state
}
}
impl<'a, T> TakenState<'a, T> {
fn new(slot: &'a Cell<State<T>>) -> Self {
let state = slot.replace(State::NotifiedTaken);
Self { slot, state }
}
}
impl<T> fmt::Debug for Entry<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Entry::Listener { state, next, prev } => f
.debug_struct("Listener")
.field("state", &TakenState::new(state))
.field("prev", prev)
.field("next", next)
.finish(),
Entry::Empty(next) => f.debug_tuple("Empty").field(next).finish(),
Entry::Sentinel => f.debug_tuple("Sentinel").finish(),
}
}
}
impl<T: PartialEq> PartialEq for Entry<T> {
fn eq(&self, other: &Entry<T>) -> bool {
match (self, other) {
(
Self::Listener {
state: state1,
prev: prev1,
next: next1,
},
Self::Listener {
state: state2,
prev: prev2,
next: next2,
},
) => {
if TakenState::new(state1) != TakenState::new(state2) {
return false;
}
prev1.get() == prev2.get() && next1.get() == next2.get()
}
(Self::Empty(next1), Self::Empty(next2)) => next1 == next2,
(Self::Sentinel, Self::Sentinel) => true,
_ => false,
}
}
}
impl<T> Entry<T> {
fn state(&self) -> &Cell<State<T>> {
match self {
Entry::Listener { state, .. } => state,
_ => unreachable!(),
}
}
fn prev(&self) -> &Cell<Option<NonZeroUsize>> {
match self {
Entry::Listener { prev, .. } => prev,
_ => unreachable!(),
}
}
fn next(&self) -> &Cell<Option<NonZeroUsize>> {
match self {
Entry::Listener { next, .. } => next,
_ => unreachable!(),
}
}
}
pub(crate) struct ListenerSlab<T> {
listeners: Vec<Entry<T>>,
head: Option<NonZeroUsize>,
tail: Option<NonZeroUsize>,
start: Option<NonZeroUsize>,
notified: usize,
len: usize,
first_empty: NonZeroUsize,
}
impl<T> fmt::Debug for ListenerSlab<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("ListenerSlab")
.field("listeners", &self.listeners)
.field("head", &self.head)
.field("tail", &self.tail)
.field("start", &self.start)
.field("notified", &self.notified)
.field("len", &self.len)
.field("first_empty", &self.first_empty)
.finish()
}
}
impl<T> ListenerSlab<T> {
pub(crate) fn new() -> Self {
Self {
listeners: alloc::vec![Entry::Sentinel],
head: None,
tail: None,
start: None,
notified: 0,
len: 0,
first_empty: unsafe { NonZeroUsize::new_unchecked(1) },
}
}
pub(crate) fn insert(&mut self, state: State<T>) -> NonZeroUsize {
let key = {
let entry = Entry::Listener {
state: Cell::new(state),
prev: Cell::new(self.tail),
next: Cell::new(None),
};
let key = self.first_empty;
if self.first_empty.get() == self.listeners.len() {
self.listeners.push(entry);
self.first_empty = unsafe { NonZeroUsize::new_unchecked(self.listeners.len()) };
} else {
let slot = &mut self.listeners[key.get()];
let next = match mem::replace(slot, entry) {
Entry::Empty(next) => next,
_ => unreachable!(),
};
self.first_empty = next;
}
key
};
match mem::replace(&mut self.tail, Some(key)) {
None => self.head = Some(key),
Some(tail) => {
let tail = &self.listeners[tail.get()];
tail.next().set(Some(key));
}
}
if self.start.is_none() {
self.start = Some(key);
}
self.len += 1;
key
}
pub(crate) fn remove(&mut self, key: NonZeroUsize, propagate: bool) -> Option<State<T>> {
let entry = &self.listeners[key.get()];
let prev = entry.prev().get();
let next = entry.next().get();
match prev {
None => self.head = next,
Some(p) => self.listeners[p.get()].next().set(next),
}
match next {
None => self.tail = prev,
Some(n) => self.listeners[n.get()].prev().set(prev),
}
if self.start == Some(key) {
self.start = next;
}
let entry = mem::replace(
&mut self.listeners[key.get()],
Entry::Empty(self.first_empty),
);
self.first_empty = key;
let mut state = match entry {
Entry::Listener { state, .. } => state.into_inner(),
_ => unreachable!(),
};
if state.is_notified() {
self.notified = self.notified.saturating_sub(1);
if propagate {
let state = mem::replace(&mut state, State::NotifiedTaken);
if let State::Notified { tag, additional } = state {
let tags = {
let mut tag = Some(tag);
move || tag.take().expect("called more than once")
};
self.notify(GenericNotify::new(1, additional, tags));
}
}
}
self.len -= 1;
Some(state)
}
#[cold]
pub(crate) fn notify(&mut self, mut notify: impl Notification<Tag = T>) -> usize {
let mut n = notify.count(Internal::new());
let is_additional = notify.is_additional(Internal::new());
if !is_additional {
if n <= self.notified {
return 0;
}
n -= self.notified;
}
let original_count = n;
while n > 0 {
n -= 1;
match self.start {
None => return original_count - n - 1,
Some(e) => {
let entry = &self.listeners[e.get()];
self.start = entry.next().get();
let tag = notify.next_tag(Internal::new());
if let State::Task(task) = entry.state().replace(State::Notified {
tag,
additional: is_additional,
}) {
task.wake();
}
self.notified += 1;
}
}
}
original_count - n
}
pub(crate) fn register(
&mut self,
mut listener: Pin<&mut Option<Listener<T>>>,
task: TaskRef<'_>,
) -> RegisterResult<T> {
let key = match *listener {
Some(Listener::HasNode(key)) => key,
_ => return RegisterResult::NeverInserted,
};
let entry = &self.listeners[key.get()];
match entry.state().replace(State::NotifiedTaken) {
State::Notified { tag, .. } => {
self.remove(key, false);
*listener = None;
RegisterResult::Notified(tag)
}
State::Task(other_task) => {
if task.will_wake(other_task.as_task_ref()) {
entry.state().set(State::Task(other_task));
} else {
entry.state().set(State::Task(task.into_task()));
}
RegisterResult::Registered
}
_ => {
entry.state().set(State::Task(task.into_task()));
RegisterResult::Registered
}
}
}
}
pub(crate) enum Listener<T> {
HasNode(NonZeroUsize),
Queued(Arc<TaskWaiting>),
_EatGenericType(PhantomData<T>),
}
impl<T> fmt::Debug for Listener<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
Self::HasNode(key) => f.debug_tuple("HasNode").field(key).finish(),
Self::Queued(tw) => f.debug_tuple("Queued").field(tw).finish(),
Self::_EatGenericType(_) => unreachable!(),
}
}
}
impl<T> Unpin for Listener<T> {}
impl<T> PartialEq for Listener<T> {
fn eq(&self, other: &Self) -> bool {
match (self, other) {
(Self::HasNode(a), Self::HasNode(b)) => a == b,
(Self::Queued(a), Self::Queued(b)) => Arc::ptr_eq(a, b),
_ => false,
}
}
}
pub(crate) struct Mutex<T> {
value: UnsafeCell<T>,
locked: AtomicBool,
}
impl<T: fmt::Debug> fmt::Debug for Mutex<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(lock) = self.try_lock() {
f.debug_tuple("Mutex").field(&*lock).finish()
} else {
f.write_str("Mutex { <locked> }")
}
}
}
impl<T> Mutex<T> {
pub(crate) fn new(value: T) -> Self {
Self {
value: UnsafeCell::new(value),
locked: AtomicBool::new(false),
}
}
pub(crate) fn try_lock(&self) -> Option<MutexGuard<'_, T>> {
if self
.locked
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
Some(MutexGuard { mutex: self })
} else {
self.try_lock_slow()
}
}
#[cold]
fn try_lock_slow(&self) -> Option<MutexGuard<'_, T>> {
let mut spins = 100u32;
loop {
if self
.locked
.compare_exchange_weak(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
return Some(MutexGuard { mutex: self });
}
while self.locked.load(Ordering::Relaxed) {
spins = spins.checked_sub(1)?;
}
}
}
}
pub(crate) struct MutexGuard<'a, T> {
mutex: &'a Mutex<T>,
}
impl<'a, T> Drop for MutexGuard<'a, T> {
fn drop(&mut self) {
self.mutex.locked.store(false, Ordering::Release);
}
}
impl<'a, T> ops::Deref for MutexGuard<'a, T> {
type Target = T;
fn deref(&self) -> &T {
unsafe { &*self.mutex.value.get() }
}
}
impl<'a, T> ops::DerefMut for MutexGuard<'a, T> {
fn deref_mut(&mut self) -> &mut T {
unsafe { &mut *self.mutex.value.get() }
}
}
unsafe impl<T: Send> Send for Mutex<T> {}
unsafe impl<T: Send> Sync for Mutex<T> {}
#[cfg(test)]
mod tests {
use super::*;
use crate::Task;
#[cfg(target_family = "wasm")]
use wasm_bindgen_test::wasm_bindgen_test as test;
#[test]
fn smoke_mutex() {
let mutex = Mutex::new(0);
{
let mut guard = mutex.try_lock().unwrap();
*guard += 1;
}
{
let mut guard = mutex.try_lock().unwrap();
*guard += 1;
}
let guard = mutex.try_lock().unwrap();
assert_eq!(*guard, 2);
}
#[test]
fn smoke_listener_slab() {
let mut listeners = ListenerSlab::<()>::new();
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
assert_eq!(listeners.remove(key2, false), Some(State::Created));
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key1)),
next: Cell::new(None),
}
);
}
#[test]
fn listener_slab_notify() {
let mut listeners = ListenerSlab::new();
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
listeners.notify(GenericNotify::new(1, true, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: true,
tag: ()
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
assert_eq!(
listeners.remove(key1, false),
Some(State::Notified {
additional: true,
tag: ()
})
);
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key2));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
}
#[test]
fn listener_slab_register() {
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn::waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::SeqCst)
});
let mut listeners = ListenerSlab::new();
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
assert_eq!(
listeners.register(
Pin::new(&mut Some(Listener::HasNode(key2))),
TaskRef::Waker(&waker)
),
RegisterResult::Registered
);
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
listeners.notify(GenericNotify::new(2, false, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 2);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key3));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
assert!(woken.load(Ordering::SeqCst));
assert_eq!(
listeners.register(
Pin::new(&mut Some(Listener::HasNode(key2))),
TaskRef::Waker(&waker)
),
RegisterResult::Notified(())
);
}
#[test]
fn listener_slab_notify_prop() {
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn::waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::SeqCst)
});
let mut listeners = ListenerSlab::new();
let key1 = listeners.insert(State::Created);
let key2 = listeners.insert(State::Created);
let key3 = listeners.insert(State::Created);
assert_eq!(
listeners.register(
Pin::new(&mut Some(Listener::HasNode(key2))),
TaskRef::Waker(&waker)
),
RegisterResult::Registered
);
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key1));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
listeners.notify(GenericNotify::new(1, false, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
listeners.notify(GenericNotify::new(1, false, || ()));
assert_eq!(listeners.len, 3);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key1));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(4).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key2)),
}
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Task(Task::Waker(waker.clone()))),
prev: Cell::new(Some(key1)),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
assert_eq!(
listeners.remove(key1, false),
Some(State::Notified {
additional: false,
tag: ()
})
);
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 0);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key2));
assert_eq!(listeners.start, Some(key2));
assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(*listeners.listeners[2].prev(), Cell::new(None));
assert_eq!(*listeners.listeners[2].next(), Cell::new(Some(key3)));
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
listeners.notify(GenericNotify::new(1, false, || ()));
assert!(woken.load(Ordering::SeqCst));
assert_eq!(listeners.len, 2);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key2));
assert_eq!(listeners.start, Some(key3));
assert_eq!(listeners.first_empty, NonZeroUsize::new(1).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[2],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(Some(key3)),
}
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Created),
prev: Cell::new(Some(key2)),
next: Cell::new(None),
}
);
assert_eq!(listeners.remove(key2, true), Some(State::NotifiedTaken));
assert_eq!(listeners.len, 1);
assert_eq!(listeners.notified, 1);
assert_eq!(listeners.tail, Some(key3));
assert_eq!(listeners.head, Some(key3));
assert_eq!(listeners.start, None);
assert_eq!(listeners.first_empty, NonZeroUsize::new(2).unwrap());
assert_eq!(listeners.listeners[0], Entry::Sentinel);
assert_eq!(
listeners.listeners[1],
Entry::Empty(NonZeroUsize::new(4).unwrap())
);
assert_eq!(
listeners.listeners[2],
Entry::Empty(NonZeroUsize::new(1).unwrap())
);
assert_eq!(
listeners.listeners[3],
Entry::Listener {
state: Cell::new(State::Notified {
additional: false,
tag: (),
}),
prev: Cell::new(None),
next: Cell::new(None),
}
);
assert_eq!(
listeners.remove(key3, false),
Some(State::Notified {
additional: false,
tag: ()
})
);
}
#[test]
fn uncontended_inner() {
let inner = crate::Inner::new();
let (mut listener1, mut listener2, mut listener3) = (None, None, None);
inner.insert(Pin::new(&mut listener1));
inner.insert(Pin::new(&mut listener2));
inner.insert(Pin::new(&mut listener3));
assert_eq!(
listener1,
Some(Listener::HasNode(NonZeroUsize::new(1).unwrap()))
);
assert_eq!(
listener2,
Some(Listener::HasNode(NonZeroUsize::new(2).unwrap()))
);
let woken = Arc::new(AtomicBool::new(false));
let waker = waker_fn::waker_fn({
let woken = woken.clone();
move || woken.store(true, Ordering::SeqCst)
});
assert_eq!(
inner.register(Pin::new(&mut listener2), TaskRef::Waker(&waker)),
RegisterResult::Registered
);
inner.notify(GenericNotify::new(1, false, || ()));
assert!(!woken.load(Ordering::SeqCst));
inner.notify(GenericNotify::new(1, false, || ()));
assert!(!woken.load(Ordering::SeqCst));
assert_eq!(
inner.register(Pin::new(&mut listener1), TaskRef::Waker(&waker)),
RegisterResult::Notified(())
);
assert!(listener1.is_none());
inner.notify(GenericNotify::new(1, false, || ()));
assert!(woken.load(Ordering::SeqCst));
assert_eq!(
inner.remove(Pin::new(&mut listener2), true),
Some(State::NotifiedTaken)
);
assert!(listener2.is_none());
assert_eq!(
inner.register(Pin::new(&mut listener3), TaskRef::Waker(&waker)),
RegisterResult::Notified(())
);
}
}