use parking_lot::Mutex;
use smallvec::SmallVec;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
use std::task::{Context, Poll, Waker};
#[derive(Debug)]
pub struct Notify {
generation: AtomicU64,
stored_notifications: AtomicUsize,
waiters: Mutex<WaiterSlab>,
}
#[derive(Debug)]
struct WaiterSlab {
entries: Vec<WaiterEntry>,
free_slots: SmallVec<[usize; 4]>,
active: usize,
scan_start: usize,
}
#[derive(Debug)]
struct WaiterEntry {
waker: Option<Waker>,
notified: bool,
generation: u64,
}
impl WaiterSlab {
#[inline]
fn new() -> Self {
Self {
entries: Vec::new(),
free_slots: SmallVec::new(),
active: 0,
scan_start: 0,
}
}
#[inline]
fn insert(&mut self, entry: WaiterEntry) -> usize {
let is_active = entry.waker.is_some();
let index = loop {
if let Some(idx) = self.free_slots.pop() {
if idx < self.entries.len() {
self.entries[idx] = entry;
break idx;
}
} else {
let idx = self.entries.len();
self.entries.push(entry);
break idx;
}
};
if is_active {
self.active += 1;
if index < self.scan_start {
self.scan_start = index;
}
}
index
}
#[inline]
fn remove(&mut self, index: usize) {
if index < self.entries.len() {
if self.entries[index].waker.is_some() {
self.active -= 1;
}
self.entries[index].waker = None;
self.entries[index].notified = false;
self.free_slots.push(index);
}
while self
.entries
.last()
.is_some_and(|e| e.waker.is_none() && !e.notified)
{
self.entries.pop();
}
}
#[inline]
fn active_count(&self) -> usize {
self.active
}
}
impl Notify {
#[inline]
#[must_use]
pub fn new() -> Self {
Self {
generation: AtomicU64::new(0),
stored_notifications: AtomicUsize::new(0),
waiters: Mutex::new(WaiterSlab::new()),
}
}
#[inline]
pub fn notified(&self) -> Notified<'_> {
Notified {
notify: self,
state: NotifiedState::Init,
waiter_index: None,
initial_generation: self.generation.load(Ordering::Acquire),
}
}
#[inline]
pub fn notify_one(&self) {
let waker_to_wake = {
let mut waiters = self.waiters.lock();
let mut found_waker = None;
let start = waiters.scan_start;
for i in start..waiters.entries.len() {
let entry = &mut waiters.entries[i];
if !entry.notified && entry.waker.is_some() {
entry.notified = true;
found_waker = entry.waker.take();
waiters.scan_start = i + 1;
break;
}
}
if found_waker.is_some() {
waiters.active -= 1;
drop(waiters);
found_waker
} else {
waiters.scan_start = waiters.entries.len();
self.stored_notifications.fetch_add(1, Ordering::Release);
drop(waiters);
None
}
};
if let Some(waker) = waker_to_wake {
waker.wake();
}
}
#[inline]
pub fn notify_waiters(&self) {
let new_generation = self.generation.fetch_add(1, Ordering::Release) + 1;
let wakers: SmallVec<[Waker; 8]> = {
let mut waiters = self.waiters.lock();
let wakers: SmallVec<[Waker; 8]> = waiters
.entries
.iter_mut()
.filter_map(|entry| {
if entry.generation < new_generation && entry.waker.is_some() {
entry.generation = new_generation;
entry.notified = true;
return entry.waker.take();
}
None
})
.collect();
waiters.active -= wakers.len();
wakers
};
for waker in wakers {
waker.wake();
}
}
#[inline]
#[must_use]
pub fn waiter_count(&self) -> usize {
let waiters = self.waiters.lock();
waiters.active_count()
}
fn pass_baton(&self, mut waiters: parking_lot::MutexGuard<'_, WaiterSlab>) {
let start = waiters.scan_start;
for i in start..waiters.entries.len() {
let entry = &mut waiters.entries[i];
if !entry.notified && entry.waker.is_some() {
entry.notified = true;
if let Some(waker) = entry.waker.take() {
waiters.active -= 1;
waiters.scan_start = i + 1;
drop(waiters);
waker.wake();
return;
}
}
}
waiters.scan_start = waiters.entries.len();
self.stored_notifications.fetch_add(1, Ordering::Release);
}
#[inline]
fn pass_baton_if_waiter_exists(mut waiters: parking_lot::MutexGuard<'_, WaiterSlab>) {
let start = waiters.scan_start;
for i in start..waiters.entries.len() {
let entry = &mut waiters.entries[i];
if !entry.notified && entry.waker.is_some() {
entry.notified = true;
if let Some(waker) = entry.waker.take() {
waiters.active -= 1;
waiters.scan_start = i + 1;
drop(waiters);
waker.wake();
return;
}
}
}
waiters.scan_start = waiters.entries.len();
}
}
impl Default for Notify {
#[inline]
fn default() -> Self {
Self::new()
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum NotifiedState {
Init,
Waiting,
Done,
}
#[derive(Debug)]
pub struct Notified<'a> {
notify: &'a Notify,
state: NotifiedState,
waiter_index: Option<usize>,
initial_generation: u64,
}
impl Notified<'_> {
#[inline]
fn mark_done(&mut self) -> Poll<()> {
self.state = NotifiedState::Done;
Poll::Ready(())
}
#[inline]
fn try_consume_stored_notification(&self) -> bool {
let mut stored = self.notify.stored_notifications.load(Ordering::Acquire);
while stored > 0 {
match self.notify.stored_notifications.compare_exchange_weak(
stored,
stored - 1,
Ordering::Acquire,
Ordering::Relaxed,
) {
Ok(_) => return true,
Err(actual) => stored = actual,
}
}
false
}
#[inline]
fn poll_init(&mut self, cx: &Context<'_>) -> Poll<()> {
let observed_generation = self.notify.generation.load(Ordering::Acquire);
self.initial_generation = observed_generation;
if self.try_consume_stored_notification() {
return self.mark_done();
}
let mut waiters = self.notify.waiters.lock();
let current_gen = self.notify.generation.load(Ordering::Acquire);
if current_gen != observed_generation {
drop(waiters);
return self.mark_done();
}
if self.try_consume_stored_notification() {
drop(waiters);
return self.mark_done();
}
let index = waiters.insert(WaiterEntry {
waker: Some(cx.waker().clone()),
notified: false,
generation: observed_generation,
});
self.waiter_index = Some(index);
self.state = NotifiedState::Waiting;
drop(waiters);
Poll::Pending
}
#[inline]
fn poll_waiting(&mut self, cx: &Context<'_>) -> Poll<()> {
let current_gen = self.notify.generation.load(Ordering::Acquire);
let gen_changed = current_gen != self.initial_generation;
if let Some(index) = self.waiter_index {
let mut waiters = self.notify.waiters.lock();
let is_gen_changed = gen_changed || {
let new_gen = self.notify.generation.load(Ordering::Acquire);
new_gen != self.initial_generation
};
if index < waiters.entries.len() {
let entry_notified = waiters.entries[index].notified;
if is_gen_changed {
waiters.remove(index);
self.waiter_index = None;
drop(waiters);
return self.mark_done();
}
if entry_notified {
waiters.remove(index);
drop(waiters);
self.waiter_index = None;
return self.mark_done();
}
match &mut waiters.entries[index].waker {
Some(existing) if existing.will_wake(cx.waker()) => {}
Some(existing) => existing.clone_from(cx.waker()),
None => {
unreachable!(
"waker is never None while notified is false for a live Notified future"
);
}
}
} else {
unreachable!("waiter entry missing before removal");
}
} else if gen_changed {
return self.mark_done();
}
Poll::Pending
}
}
impl Future for Notified<'_> {
type Output = ();
#[inline]
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
match self.state {
NotifiedState::Init => self.poll_init(cx),
NotifiedState::Waiting => self.poll_waiting(cx),
NotifiedState::Done => panic!("Notified polled after completion"),
}
}
}
impl Drop for Notified<'_> {
fn drop(&mut self) {
if self.state == NotifiedState::Waiting {
if let Some(index) = self.waiter_index.take() {
let mut waiters = self.notify.waiters.lock();
let generation_advanced =
self.notify.generation.load(Ordering::Acquire) != self.initial_generation;
let (was_notified, notified_generation) = if index < waiters.entries.len() {
let entry = &waiters.entries[index];
(entry.notified, entry.generation)
} else {
(false, self.initial_generation)
};
waiters.remove(index);
if was_notified {
let was_broadcast_notify = notified_generation != self.initial_generation;
if was_broadcast_notify {
return;
}
if generation_advanced {
Notify::pass_baton_if_waiter_exists(waiters);
} else {
self.notify.pass_baton(waiters);
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::test_utils::init_test_logging;
use std::sync::Arc;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn noop_waker() -> Waker {
std::task::Waker::noop().clone()
}
fn poll_once<F>(fut: &mut F) -> Poll<F::Output>
where
F: Future + Unpin,
{
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
Pin::new(fut).poll(&mut cx)
}
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
fn broadcast_with_middle_hole_signature(
broadcasts: usize,
) -> ([bool; 2], usize, usize, usize, bool) {
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
let mut fut3 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
assert!(poll_once(&mut fut3).is_pending());
drop(fut2);
for _ in 0..broadcasts {
notify.notify_waiters();
}
let ready_pair = [
poll_once(&mut fut1).is_ready(),
poll_once(&mut fut3).is_ready(),
];
drop(fut1);
drop(fut3);
let waiter_count = notify.waiter_count();
let entries_len = notify.waiters.lock().entries.len();
let stored = notify.stored_notifications.load(Ordering::Acquire);
let mut late = notify.notified();
let late_pending = poll_once(&mut late).is_pending();
drop(late);
(ready_pair, waiter_count, entries_len, stored, late_pending)
}
fn broadcast_then_notify_one_signature(broadcasts: usize) -> ([bool; 2], usize, bool, bool) {
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
for _ in 0..broadcasts {
notify.notify_waiters();
}
let ready_pair = [
poll_once(&mut fut1).is_ready(),
poll_once(&mut fut2).is_ready(),
];
drop(fut1);
drop(fut2);
notify.notify_one();
let stored_before_consume = notify.stored_notifications.load(Ordering::Acquire);
let mut stored_consumer = notify.notified();
let stored_consumer_ready = poll_once(&mut stored_consumer).is_ready();
drop(stored_consumer);
let mut trailing_waiter = notify.notified();
let trailing_waiter_pending = poll_once(&mut trailing_waiter).is_pending();
drop(trailing_waiter);
(
ready_pair,
stored_before_consume,
stored_consumer_ready,
trailing_waiter_pending,
)
}
fn repoll_then_notify_one_signature(extra_repolls: usize) -> ([bool; 3], usize) {
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
let mut fut3 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
for _ in 0..extra_repolls {
assert!(poll_once(&mut fut1).is_pending());
}
assert!(poll_once(&mut fut2).is_pending());
assert!(poll_once(&mut fut3).is_pending());
notify.notify_one();
let ready = [
poll_once(&mut fut1).is_ready(),
poll_once(&mut fut2).is_ready(),
poll_once(&mut fut3).is_ready(),
];
drop(fut1);
drop(fut2);
drop(fut3);
let stored = notify.stored_notifications.load(Ordering::Acquire);
(ready, stored)
}
fn notify_one_with_middle_cancel_signature(
cancel_before_first_notify: bool,
) -> ([bool; 2], usize, bool) {
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
let mut fut3 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
assert!(poll_once(&mut fut3).is_pending());
if cancel_before_first_notify {
drop(fut2);
notify.notify_one();
notify.notify_one();
} else {
notify.notify_one();
drop(fut2);
notify.notify_one();
}
let ready_pair = [
poll_once(&mut fut1).is_ready(),
poll_once(&mut fut3).is_ready(),
];
drop(fut1);
drop(fut3);
let stored = notify.stored_notifications.load(Ordering::Acquire);
let mut late = notify.notified();
let late_pending = poll_once(&mut late).is_pending();
drop(late);
(ready_pair, stored, late_pending)
}
#[test]
fn notify_one_wakes_waiter() {
init_test("notify_one_wakes_waiter");
let notify = Arc::new(Notify::new());
let notify2 = Arc::clone(¬ify);
let handle = thread::spawn(move || {
thread::sleep(Duration::from_millis(50));
notify2.notify_one();
});
let mut fut = notify.notified();
let pending = poll_once(&mut fut).is_pending();
crate::assert_with_log!(pending, "first poll pending", true, pending);
handle.join().expect("thread panicked");
let ready = poll_once(&mut fut).is_ready();
crate::assert_with_log!(ready, "ready after notify", true, ready);
crate::test_complete!("notify_one_wakes_waiter");
}
#[test]
fn notified_repoll_panics_after_notify_one_completion() {
init_test("notified_repoll_panics_after_notify_one_completion");
let notify = Notify::new();
let mut fut = notify.notified();
assert!(poll_once(&mut fut).is_pending());
notify.notify_one();
assert!(poll_once(&mut fut).is_ready());
let repoll = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = poll_once(&mut fut);
}));
crate::assert_with_log!(repoll.is_err(), "repoll panics", true, repoll.is_err());
crate::test_complete!("notified_repoll_panics_after_notify_one_completion");
}
#[test]
fn notify_before_wait_is_consumed() {
init_test("notify_before_wait_is_consumed");
let notify = Notify::new();
notify.notify_one();
let mut fut = notify.notified();
let ready = poll_once(&mut fut).is_ready();
crate::assert_with_log!(ready, "ready immediately", true, ready);
crate::test_complete!("notify_before_wait_is_consumed");
}
#[test]
fn notified_repoll_panics_after_stored_notify_completion() {
init_test("notified_repoll_panics_after_stored_notify_completion");
let notify = Notify::new();
notify.notify_one();
let mut fut = notify.notified();
assert!(poll_once(&mut fut).is_ready());
let repoll = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
let _ = poll_once(&mut fut);
}));
crate::assert_with_log!(repoll.is_err(), "repoll panics", true, repoll.is_err());
crate::test_complete!("notified_repoll_panics_after_stored_notify_completion");
}
#[test]
fn notify_one_lost_if_followed_by_broadcast_and_cancel() {
init_test("notify_one_lost_if_followed_by_broadcast_and_cancel");
let notify = Notify::new();
let mut waiter_a = notify.notified();
let mut waiter_b = notify.notified();
assert!(poll_once(&mut waiter_a).is_pending());
assert!(poll_once(&mut waiter_b).is_pending());
notify.notify_one();
notify.notify_waiters();
let mut waiter_c = notify.notified();
assert!(poll_once(&mut waiter_c).is_pending());
drop(waiter_a);
assert!(
poll_once(&mut waiter_c).is_ready(),
"Waiter C should be woken by the passed baton!"
);
crate::test_complete!("notify_one_lost_if_followed_by_broadcast_and_cancel");
}
#[test]
fn notify_one_lost_if_followed_by_broadcast_and_poll() {
init_test("notify_one_lost_if_followed_by_broadcast_and_poll");
let notify = Notify::new();
let mut waiter_a = notify.notified();
let mut waiter_b = notify.notified();
assert!(poll_once(&mut waiter_a).is_pending());
assert!(poll_once(&mut waiter_b).is_pending());
notify.notify_one();
notify.notify_waiters();
let mut waiter_c = notify.notified();
assert!(poll_once(&mut waiter_c).is_pending());
assert!(poll_once(&mut waiter_a).is_ready());
assert!(poll_once(&mut waiter_b).is_ready());
assert!(
poll_once(&mut waiter_c).is_pending(),
"Waiter C should remain pending since A consumed the notify_one baton"
);
crate::test_complete!("notify_one_lost_if_followed_by_broadcast_and_poll");
}
#[test]
fn notify_waiters_wakes_all() {
init_test("notify_waiters_wakes_all");
let notify = Arc::new(Notify::new());
let completed = Arc::new(std::sync::atomic::AtomicUsize::new(0));
let mut handles = Vec::new();
for _ in 0..3 {
let notify = Arc::clone(¬ify);
let completed = Arc::clone(&completed);
handles.push(thread::spawn(move || {
let mut fut = notify.notified();
loop {
if poll_once(&mut fut).is_ready() {
completed.fetch_add(1, Ordering::SeqCst);
return;
}
thread::sleep(Duration::from_millis(10));
}
}));
}
thread::sleep(Duration::from_millis(100));
notify.notify_waiters();
for handle in handles {
handle.join().expect("thread panicked");
}
let count = completed.load(Ordering::SeqCst);
crate::assert_with_log!(count == 3, "completed count", 3usize, count);
crate::test_complete!("notify_waiters_wakes_all");
}
#[test]
fn test_notify_no_waiters() {
init_test("test_notify_no_waiters");
let notify = Notify::new();
notify.notify_one();
notify.notify_waiters();
let mut fut = notify.notified();
let ready = poll_once(&mut fut).is_ready();
crate::assert_with_log!(ready, "stored notify consumed", true, ready);
crate::test_complete!("test_notify_no_waiters");
}
#[test]
fn test_notify_waiter_count() {
init_test("test_notify_waiter_count");
let notify = Notify::new();
let count0 = notify.waiter_count();
crate::assert_with_log!(count0 == 0, "initial count", 0usize, count0);
let mut fut = notify.notified();
let pending = poll_once(&mut fut).is_pending();
crate::assert_with_log!(pending, "should be pending", true, pending);
let count1 = notify.waiter_count();
crate::assert_with_log!(count1 == 1, "one waiter", 1usize, count1);
notify.notify_one();
let ready = poll_once(&mut fut).is_ready();
crate::assert_with_log!(ready, "should be ready", true, ready);
drop(fut);
let count2 = notify.waiter_count();
crate::assert_with_log!(count2 == 0, "no waiters after", 0usize, count2);
crate::test_complete!("test_notify_waiter_count");
}
#[test]
fn test_notify_drop_cleanup() {
init_test("test_notify_drop_cleanup");
let notify = Notify::new();
{
let mut fut = notify.notified();
let _ = poll_once(&mut fut);
}
let count = notify.waiter_count();
crate::assert_with_log!(count == 0, "cleaned up", 0usize, count);
crate::test_complete!("test_notify_drop_cleanup");
}
#[test]
fn test_notify_multiple_stored() {
init_test("test_notify_multiple_stored");
let notify = Notify::new();
notify.notify_one();
notify.notify_one();
let mut fut1 = notify.notified();
let ready1 = poll_once(&mut fut1).is_ready();
crate::assert_with_log!(ready1, "first ready", true, ready1);
let mut fut2 = notify.notified();
let ready2 = poll_once(&mut fut2).is_ready();
crate::assert_with_log!(ready2, "second ready", true, ready2);
let mut fut3 = notify.notified();
let pending = poll_once(&mut fut3).is_pending();
crate::assert_with_log!(pending, "third pending", true, pending);
crate::test_complete!("test_notify_multiple_stored");
}
#[test]
fn test_cancelled_middle_waiter_no_leak() {
init_test("test_cancelled_middle_waiter_no_leak");
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
let mut fut3 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
assert!(poll_once(&mut fut3).is_pending());
let count = notify.waiter_count();
crate::assert_with_log!(count == 3, "three waiters", 3usize, count);
drop(fut2);
let count = notify.waiter_count();
crate::assert_with_log!(count == 2, "two waiters after middle drop", 2usize, count);
let entries_len = notify.waiters.lock().entries.len();
crate::assert_with_log!(entries_len <= 3, "entries bounded", true, entries_len <= 3);
drop(fut1);
drop(fut3);
let count = notify.waiter_count();
crate::assert_with_log!(count == 0, "no waiters after all drops", 0usize, count);
let entries_len = notify.waiters.lock().entries.len();
crate::assert_with_log!(entries_len == 0, "entries empty", 0usize, entries_len);
let mut fut_a = notify.notified();
assert!(poll_once(&mut fut_a).is_pending());
let entries_len = notify.waiters.lock().entries.len();
crate::assert_with_log!(entries_len == 1, "reused slot", 1usize, entries_len);
drop(fut_a);
crate::test_complete!("test_cancelled_middle_waiter_no_leak");
}
#[test]
fn test_repeated_cancel_no_growth() {
init_test("test_repeated_cancel_no_growth");
let notify = Notify::new();
for _ in 0..100 {
let mut fut = notify.notified();
assert!(poll_once(&mut fut).is_pending());
drop(fut);
}
let entries_len = notify.waiters.lock().entries.len();
crate::assert_with_log!(entries_len == 0, "no growth", 0usize, entries_len);
crate::test_complete!("test_repeated_cancel_no_growth");
}
#[test]
fn notify_one_does_not_lose_wakeup_during_registration_race() {
init_test("notify_one_does_not_lose_wakeup_during_registration_race");
let notify = Arc::new(Notify::new());
let gate = notify.waiters.lock();
let notify_for_notifier = Arc::clone(¬ify);
let notifier = thread::spawn(move || {
notify_for_notifier.notify_one();
});
thread::sleep(Duration::from_millis(10));
let (tx_ready, rx_ready) = mpsc::channel::<bool>();
let (tx_poll, rx_poll) = mpsc::channel::<()>();
let notify_for_poller = Arc::clone(¬ify);
let poller = thread::spawn(move || {
let mut fut = notify_for_poller.notified();
let first_ready = poll_once(&mut fut).is_ready();
tx_ready.send(first_ready).expect("send first_ready");
rx_poll.recv().expect("recv poll signal");
let second_ready = if first_ready {
true
} else {
poll_once(&mut fut).is_ready()
};
tx_ready.send(second_ready).expect("send second_ready");
});
drop(gate);
notifier.join().expect("notifier thread panicked");
let first_ready = rx_ready.recv().expect("recv first_ready");
tx_poll.send(()).expect("send poll signal");
let second_ready = rx_ready.recv().expect("recv second_ready");
poller.join().expect("poller thread panicked");
crate::assert_with_log!(
first_ready || second_ready,
"notify_one eventually makes notified() ready",
true,
first_ready || second_ready
);
crate::test_complete!("notify_one_does_not_lose_wakeup_during_registration_race");
}
#[test]
fn notify_waiters_preserves_slab_shrinking_with_middle_hole() {
init_test("notify_waiters_preserves_slab_shrinking_with_middle_hole");
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
let mut fut3 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
assert!(poll_once(&mut fut3).is_pending());
drop(fut2);
notify.notify_waiters();
assert!(poll_once(&mut fut1).is_ready());
assert!(poll_once(&mut fut3).is_ready());
drop(fut1);
drop(fut3);
let count = notify.waiter_count();
crate::assert_with_log!(count == 0, "no waiters remain", 0usize, count);
let entries_len = notify.waiters.lock().entries.len();
crate::assert_with_log!(
entries_len == 0,
"slab tail fully shrinks after broadcast",
0usize,
entries_len
);
crate::test_complete!("notify_waiters_preserves_slab_shrinking_with_middle_hole");
}
#[test]
fn dropped_broadcast_waiter_does_not_leak_stored_notification() {
init_test("dropped_broadcast_waiter_does_not_leak_stored_notification");
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
notify.notify_waiters();
drop(fut1);
assert!(poll_once(&mut fut2).is_ready());
drop(fut2);
let stored = notify.stored_notifications.load(Ordering::Acquire);
crate::assert_with_log!(
stored == 0,
"broadcast drop should not create stored token",
0usize,
stored
);
let mut fut3 = notify.notified();
let pending = poll_once(&mut fut3).is_pending();
crate::assert_with_log!(
pending,
"post-broadcast waiter should remain pending",
true,
pending
);
drop(fut3);
crate::test_complete!("dropped_broadcast_waiter_does_not_leak_stored_notification");
}
#[test]
fn dropped_notify_one_waiter_covered_by_broadcast_does_not_restore_token() {
init_test("dropped_notify_one_waiter_covered_by_broadcast_does_not_restore_token");
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
notify.notify_one();
notify.notify_waiters();
drop(fut1);
assert!(poll_once(&mut fut2).is_ready());
drop(fut2);
let stored = notify.stored_notifications.load(Ordering::Acquire);
crate::assert_with_log!(
stored == 0,
"broadcast-covered notify_one drop should not restore token",
0usize,
stored
);
let mut fut3 = notify.notified();
let pending = poll_once(&mut fut3).is_pending();
crate::assert_with_log!(
pending,
"new waiter should remain pending after broadcast-covered drop",
true,
pending
);
drop(fut3);
crate::test_complete!(
"dropped_notify_one_waiter_covered_by_broadcast_does_not_restore_token"
);
}
#[test]
fn polled_notify_one_waiter_covered_by_broadcast_does_not_restore_token() {
init_test("polled_notify_one_waiter_covered_by_broadcast_does_not_restore_token");
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
notify.notify_one();
notify.notify_waiters();
assert!(poll_once(&mut fut1).is_ready());
assert!(poll_once(&mut fut2).is_ready());
drop(fut1);
drop(fut2);
let stored = notify.stored_notifications.load(Ordering::Acquire);
crate::assert_with_log!(
stored == 0,
"broadcast-covered notify_one poll should not restore token",
0usize,
stored
);
let mut fut3 = notify.notified();
let pending = poll_once(&mut fut3).is_pending();
crate::assert_with_log!(
pending,
"new waiter should remain pending after broadcast-covered poll",
true,
pending
);
drop(fut3);
crate::test_complete!(
"polled_notify_one_waiter_covered_by_broadcast_does_not_restore_token"
);
}
#[test]
fn notify_one_baton_pass_to_next_waiter_on_drop() {
init_test("notify_one_baton_pass_to_next_waiter_on_drop");
let notify = Notify::new();
let mut fut1 = notify.notified();
let mut fut2 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
assert!(poll_once(&mut fut2).is_pending());
notify.notify_one();
drop(fut1);
let ready = poll_once(&mut fut2).is_ready();
crate::assert_with_log!(ready, "baton passed to second waiter", true, ready);
crate::test_complete!("notify_one_baton_pass_to_next_waiter_on_drop");
}
#[test]
fn notify_one_re_stores_when_no_other_waiter() {
init_test("notify_one_re_stores_when_no_other_waiter");
let notify = Notify::new();
let mut fut = notify.notified();
assert!(poll_once(&mut fut).is_pending());
notify.notify_one();
drop(fut);
let stored = notify.stored_notifications.load(Ordering::Acquire);
crate::assert_with_log!(stored == 1, "notification re-stored", 1usize, stored);
let mut fut2 = notify.notified();
let ready = poll_once(&mut fut2).is_ready();
crate::assert_with_log!(
ready,
"re-stored notification consumed by next waiter",
true,
ready
);
crate::test_complete!("notify_one_re_stores_when_no_other_waiter");
}
#[test]
fn notify_waiters_does_not_store_token_when_no_waiters() {
init_test("notify_waiters_does_not_store_token_when_no_waiters");
let notify = Notify::new();
notify.notify_waiters();
let stored = notify.stored_notifications.load(Ordering::Acquire);
crate::assert_with_log!(
stored == 0,
"no stored token from broadcast",
0usize,
stored
);
let mut fut = notify.notified();
let pending = poll_once(&mut fut).is_pending();
crate::assert_with_log!(
pending,
"waiter remains pending after no-op broadcast",
true,
pending
);
crate::test_complete!("notify_waiters_does_not_store_token_when_no_waiters");
}
#[test]
fn notify_waiters_does_not_wake_unpolled_future_created_before_broadcast() {
init_test("notify_waiters_does_not_wake_unpolled_future_created_before_broadcast");
let notify = Notify::new();
let mut fut = notify.notified();
notify.notify_waiters();
let pending = poll_once(&mut fut).is_pending();
crate::assert_with_log!(
pending,
"broadcast must not wake an unpolled future",
true,
pending
);
drop(fut);
crate::test_complete!(
"notify_waiters_does_not_wake_unpolled_future_created_before_broadcast"
);
}
#[test]
fn metamorphic_redundant_notify_waiters_preserves_middle_hole_cleanup() {
init_test("metamorphic_redundant_notify_waiters_preserves_middle_hole_cleanup");
let single = broadcast_with_middle_hole_signature(1);
let redundant = broadcast_with_middle_hole_signature(3);
crate::assert_with_log!(
redundant == single,
"repeating notify_waiters over the same waiter set preserves cleanup and late-waiter behavior",
format!("{single:?}"),
format!("{redundant:?}")
);
crate::assert_with_log!(
single.0 == [true, true],
"remaining waiters are both readied after broadcast",
[true, true],
single.0
);
crate::assert_with_log!(
single.1 == 0,
"no active waiters remain after draining the broadcasted set",
0usize,
single.1
);
crate::assert_with_log!(
single.2 == 0,
"slab shrinks fully after draining broadcasted waiters",
0usize,
single.2
);
crate::assert_with_log!(
single.3 == 0,
"redundant broadcasts do not mint stored tokens",
0usize,
single.3
);
crate::assert_with_log!(
single.4,
"a late waiter still remains pending after repeated broadcasts",
true,
single.4
);
crate::test_complete!("metamorphic_redundant_notify_waiters_preserves_middle_hole_cleanup");
}
#[test]
fn metamorphic_redundant_broadcasts_preserve_single_followup_notify_one_token() {
init_test("metamorphic_redundant_broadcasts_preserve_single_followup_notify_one_token");
let single = broadcast_then_notify_one_signature(1);
let redundant = broadcast_then_notify_one_signature(4);
crate::assert_with_log!(
redundant == single,
"redundant broadcasts do not amplify a later stored notify_one token",
format!("{single:?}"),
format!("{redundant:?}")
);
crate::assert_with_log!(
single.0 == [true, true],
"both original waiters are readied by the broadcast",
[true, true],
single.0
);
crate::assert_with_log!(
single.1 == 1,
"exactly one stored token remains for the follow-up notify_one",
1usize,
single.1
);
crate::assert_with_log!(
single.2,
"the next waiter consumes the single stored token immediately",
true,
single.2
);
crate::assert_with_log!(
single.3,
"the waiter after that remains pending because no extra token leaked",
true,
single.3
);
crate::test_complete!(
"metamorphic_redundant_broadcasts_preserve_single_followup_notify_one_token"
);
}
#[test]
fn metamorphic_extra_repolls_preserve_single_notify_one_consumer() {
init_test("metamorphic_extra_repolls_preserve_single_notify_one_consumer");
let single = repoll_then_notify_one_signature(0);
let repolled = repoll_then_notify_one_signature(5);
crate::assert_with_log!(
repolled == single,
"re-polling the front waiter with the same waker does not change single notify_one delivery",
format!("{single:?}"),
format!("{repolled:?}")
);
crate::assert_with_log!(
single.0 == [true, false, false],
"single notify_one still wakes only the first registered waiter",
[true, false, false],
single.0
);
crate::assert_with_log!(
single.1 == 0,
"single notify_one does not leak a stored token when a waiter consumes it",
0usize,
single.1
);
crate::test_complete!("metamorphic_extra_repolls_preserve_single_notify_one_consumer");
}
#[test]
fn metamorphic_middle_cancel_timing_preserves_notify_one_ready_prefix() {
init_test("metamorphic_middle_cancel_timing_preserves_notify_one_ready_prefix");
let cancelled_before = notify_one_with_middle_cancel_signature(true);
let cancelled_between = notify_one_with_middle_cancel_signature(false);
crate::assert_with_log!(
cancelled_between == cancelled_before,
"cancelling the middle waiter before or between notify_one calls preserves the ready prefix",
format!("{cancelled_before:?}"),
format!("{cancelled_between:?}")
);
crate::assert_with_log!(
cancelled_before.0 == [true, true],
"two notify_one calls still wake the surviving front and tail waiters in order",
[true, true],
cancelled_before.0
);
crate::assert_with_log!(
cancelled_before.1 == 0,
"no stored token remains after the surviving waiters consume both notify_one calls",
0usize,
cancelled_before.1
);
crate::assert_with_log!(
cancelled_before.2,
"a late waiter remains pending because cancellation timing did not mint an extra token",
true,
cancelled_before.2
);
crate::test_complete!("metamorphic_middle_cancel_timing_preserves_notify_one_ready_prefix");
}
#[test]
fn test_spurious_wakeup_bug() {
let notify = Notify::new();
let mut fut1 = notify.notified();
assert!(poll_once(&mut fut1).is_pending());
notify.notify_waiters();
let mut fut2 = notify.notified();
assert!(poll_once(&mut fut2).is_pending());
drop(fut1);
let is_ready = poll_once(&mut fut2).is_ready();
assert!(!is_ready, "Spurious wakeup detected!");
}
}