use super::UserData;
use io_uring::squeue::SubmissionQueue;
#[cfg(not(feature = "loom"))]
use io_uring::{opcode::PollAdd, types::Fd};
#[cfg(feature = "loom")]
use loom::sync::{
atomic::{AtomicU32, AtomicU64, Ordering},
Arc, Condvar, Mutex,
};
use std::time::{Duration, Instant};
#[cfg(not(feature = "loom"))]
use std::{
mem::size_of,
os::fd::{AsRawFd, FromRawFd, OwnedFd},
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
};
#[cfg(not(feature = "loom"))]
use tracing::warn;
/// User-data value attached to the wake poll entry built by `Waker::reinstall`.
pub const WAKE_USER_DATA: UserData = UserData::MAX;
/// Number of low bits of the packed state word used for flags; the submission
/// sequence lives in the bits above them.
const STATE_BITS: u32 = 3;
/// Flag set by `Waker::park_idle` while it is armed on the futex path.
const WAITING_ON_FUTEX_BIT: u32 = 1;
/// Flag set by `Waker::arm` while an `ArmGuard` is alive.
const WAITING_ON_EVENTFD_BIT: u32 = 1 << 1;
/// Flag latched by `Waker::wake`; cleared together with the waiting bits by
/// `Waker::clear_wait`.
const WAKE_SIGNALLED_BIT: u32 = 1 << 2;
/// All three flag bits.
const STATE_MASK: u32 = WAITING_ON_FUTEX_BIT | WAITING_ON_EVENTFD_BIT | WAKE_SIGNALLED_BIT;
/// The two "a waiter is armed" bits.
const WAITING_MASK: u32 = WAITING_ON_FUTEX_BIT | WAITING_ON_EVENTFD_BIT;
/// Value added to the state word by each `Waker::publish`: one step of the
/// sequence stored above the flag bits.
const SUBMISSION_INCREMENT: u32 = 1 << STATE_BITS;
/// Mask of valid sequence values once the state word has been shifted right
/// by `STATE_BITS`.
pub const SUBMISSION_SEQ_MASK: u32 = u32::MAX >> STATE_BITS;
/// Half of the sequence domain (rounded up). `Waker::pending` only reports
/// work when the published sequence is ahead by less than this amount.
pub const HALF_SUBMISSION_SEQUENCE_DOMAIN: u32 = SUBMISSION_SEQ_MASK.div_ceil(2);
/// Guard returned by [`Waker::arm`]; dropping it calls `clear_wait` on the
/// waker it borrows.
pub struct ArmGuard<'a> {
/// Waker whose flag bits are cleared when this guard is dropped.
waker: &'a Waker,
/// Snapshot taken by `arm`: no wake was latched and the published sequence
/// equalled the caller's processed sequence.
still_idle: bool,
/// Snapshot taken by `arm`: `WAKE_SIGNALLED_BIT` was already set.
wake_latched: bool,
}
impl ArmGuard<'_> {
/// Returns whether, at the moment `arm` ran, no wake was latched and the
/// published sequence still matched the processed sequence.
pub const fn still_idle(&self) -> bool {
self.still_idle
}
/// Returns whether a wake was already latched at the moment `arm` ran.
pub const fn wake_latched(&self) -> bool {
self.wake_latched
}
}
/// Disarms the waker when the guard goes out of scope.
impl Drop for ArmGuard<'_> {
fn drop(&mut self) {
// Clears both waiting bits and the wake latch (see `Waker::clear_wait`).
self.waker.clear_wait();
}
}
/// Shared waker state for regular (non-loom) builds.
#[cfg(not(feature = "loom"))]
struct WakerInner {
/// Eventfd written by `eventfd_wake`, drained by `acknowledge`, and polled
/// by the entry that `reinstall` pushes.
wake_fd: OwnedFd,
/// Packed word: flag bits in the low `STATE_BITS` bits, submission
/// sequence above them. Also used as the futex word.
state: AtomicU32,
}
/// Shared waker state for loom builds; the futex and the eventfd are modelled
/// with loom mutex/condvar pairs and an atomic counter.
#[cfg(feature = "loom")]
struct WakerInner {
/// Packed word: flag bits in the low `STATE_BITS` bits, submission
/// sequence above them.
state: AtomicU32,
/// Mutex held around futex-model waits and notifications.
futex_bucket: Mutex<()>,
/// Condvar notified by `futex_wake` and waited on by `futex_wait`.
futex_waiters: Condvar,
/// Stand-in for the eventfd counter: incremented by `eventfd_wake`, reset
/// to zero by `acknowledge`.
eventfd_counter: AtomicU64,
/// Mutex held around eventfd-model notifications.
eventfd_readiness: Mutex<()>,
/// Condvar notified by `eventfd_wake`.
eventfd_waiters: Condvar,
}
/// Cloneable handle to the wake state; every clone shares the same
/// `WakerInner` through an `Arc`.
#[derive(Clone)]
pub struct Waker {
/// Shared state word and wake primitives.
inner: Arc<WakerInner>,
}
impl Waker {
/// Creates a waker backed by a fresh close-on-exec, non-blocking eventfd and
/// a zeroed state word.
///
/// # Errors
/// Returns the OS error reported when `eventfd` fails.
#[cfg(not(feature = "loom"))]
pub fn new() -> Result<Self, std::io::Error> {
    let flags = libc::EFD_CLOEXEC | libc::EFD_NONBLOCK;
    // SAFETY: `eventfd` takes plain integer arguments and has no memory
    // preconditions.
    let raw = unsafe { libc::eventfd(0, flags) };
    if raw < 0 {
        return Err(std::io::Error::last_os_error());
    }
    // SAFETY: `raw` was just returned by a successful `eventfd` call and is
    // not owned by anything else yet.
    let wake_fd = unsafe { OwnedFd::from_raw_fd(raw) };
    let inner = WakerInner {
        wake_fd,
        state: AtomicU32::new(0),
    };
    Ok(Self {
        inner: Arc::new(inner),
    })
}
/// Creates a waker for loom builds with a zeroed state word, a zeroed eventfd
/// counter, and fresh mutex/condvar pairs. Always returns `Ok`; the `Result`
/// mirrors the non-loom signature.
#[cfg(feature = "loom")]
pub fn new() -> Result<Self, std::io::Error> {
Ok(Self {
inner: Arc::new(WakerInner {
state: AtomicU32::new(0),
futex_bucket: Mutex::new(()),
futex_waiters: Condvar::new(),
eventfd_counter: AtomicU64::new(0),
eventfd_readiness: Mutex::new(()),
eventfd_waiters: Condvar::new(),
}),
})
}
/// Latches `WAKE_SIGNALLED_BIT` and, if this call is the one that set it,
/// notifies whichever wait target was armed at that moment.
///
/// Returns without notifying when the bit was already set. When no waiting
/// bit is set, the signal is only recorded in the state word.
///
/// # Panics
/// Panics if both waiting bits were set at once.
pub fn wake(&self) {
let prev = self
.inner
.state
.fetch_or(WAKE_SIGNALLED_BIT, Ordering::Release);
// The latch was already set: skip the duplicate notification.
if (prev & WAKE_SIGNALLED_BIT) != 0 {
return;
}
let waiting = prev & WAITING_MASK;
assert_ne!(
waiting, WAITING_MASK,
"iouring wake state cannot wait on futex and eventfd simultaneously"
);
match waiting {
// Nobody is armed; the latched bit is observed by the next `arm`/`park_idle`.
0 => {}
WAITING_ON_FUTEX_BIT => self.futex_wake(),
WAITING_ON_EVENTFD_BIT => self.eventfd_wake(),
_ => unreachable!("unexpected iouring wake target"),
}
}
/// Advances the published sequence by one and calls `wake` only when the
/// previous state had a waiter armed and no wake latched.
#[inline]
pub fn publish(&self) {
// Adding `SUBMISSION_INCREMENT` bumps the sequence stored above the flag
// bits and leaves the flag bits themselves untouched.
let prev = self
.inner
.state
.fetch_add(SUBMISSION_INCREMENT, Ordering::Release);
let waiting = prev & WAITING_MASK;
// Nothing to notify if nobody is armed or a wake is already latched.
if waiting == 0 || (prev & WAKE_SIGNALLED_BIT) != 0 {
return;
}
self.wake();
}
/// Reports whether the published sequence is ahead of `processed_seq`.
///
/// The difference is taken modulo the sequence domain. It counts as "ahead"
/// only when it is non-zero and smaller than
/// `HALF_SUBMISSION_SEQUENCE_DOMAIN`.
#[inline]
pub fn pending(&self, processed_seq: u32) -> bool {
    let state = self.inner.state.load(Ordering::Acquire);
    let published_seq = (state >> STATE_BITS) & SUBMISSION_SEQ_MASK;
    let ahead_by = published_seq.wrapping_sub(processed_seq) & SUBMISSION_SEQ_MASK;
    // Equivalent to `ahead_by != 0 && ahead_by < HALF_SUBMISSION_SEQUENCE_DOMAIN`.
    (1..HALF_SUBMISSION_SEQUENCE_DOMAIN).contains(&ahead_by)
}
/// Arms the futex waiting bit and blocks in `futex_wait` only if no wake is
/// latched and the published sequence still equals `processed_seq`.
///
/// The flag bits are cleared before returning on every path. Returns
/// `Some(elapsed)` when `futex_wait` reports that it slept, otherwise `None`.
///
/// # Panics
/// Panics if a waiting bit was already set when this was called.
pub fn park_idle(&self, processed_seq: u32) -> Option<Duration> {
let prev = self
.inner
.state
.fetch_or(WAITING_ON_FUTEX_BIT, Ordering::Relaxed);
assert_eq!(
prev & WAITING_MASK,
0,
"iouring wait target should be disarmed before re-arming"
);
// State word as it stood right after arming; used as the futex compare value.
let snapshot = prev | WAITING_ON_FUTEX_BIT;
if (snapshot & WAKE_SIGNALLED_BIT) == 0
&& ((snapshot >> STATE_BITS) & SUBMISSION_SEQ_MASK) == processed_seq
{
let before = Instant::now();
let slept = self.futex_wait(snapshot);
self.clear_wait();
slept.then(|| before.elapsed())
} else {
// A wake or publish already happened; disarm without sleeping.
self.clear_wait();
None
}
}
/// Sets the eventfd waiting bit and returns a guard that records what the
/// state word looked like at that moment.
///
/// `wake_latched` is true when `WAKE_SIGNALLED_BIT` was already set.
/// `still_idle` is true when it was not set and the published sequence
/// equalled `processed_seq`. Dropping the guard clears the flag bits.
///
/// # Panics
/// Panics if a waiting bit was already set when this was called.
pub fn arm(&self, processed_seq: u32) -> ArmGuard<'_> {
let prev = self
.inner
.state
.fetch_or(WAITING_ON_EVENTFD_BIT, Ordering::Relaxed);
assert_eq!(
prev & WAITING_MASK,
0,
"iouring wait target should be disarmed before re-arming"
);
let snapshot = prev | WAITING_ON_EVENTFD_BIT;
let wake_latched = (snapshot & WAKE_SIGNALLED_BIT) != 0;
let still_idle =
!wake_latched && ((snapshot >> STATE_BITS) & SUBMISSION_SEQ_MASK) == processed_seq;
ArmGuard {
waker: self,
still_idle,
wake_latched,
}
}
#[cfg(not(feature = "loom"))]
pub fn acknowledge(&self) {
let mut value: u64 = 0;
loop {
let ret = unsafe {
libc::read(
self.inner.wake_fd.as_raw_fd(),
&mut value as *mut u64 as *mut libc::c_void,
size_of::<u64>(),
)
};
if ret == size_of::<u64>() as isize {
return;
}
assert_eq!(
ret, -1,
"eventfd read returned unexpected byte count: {ret}"
);
match std::io::Error::last_os_error().raw_os_error() {
Some(libc::EINTR) => continue,
Some(libc::EAGAIN) => return,
_ => {
tracing::warn!("eventfd read failed");
return;
}
}
}
}
/// Loom model of draining the eventfd: resets the counter to zero.
#[cfg(feature = "loom")]
pub fn acknowledge(&self) {
self.inner.eventfd_counter.swap(0, Ordering::AcqRel);
}
/// Pushes a multishot `POLLIN` poll on the wake eventfd, tagged with
/// `WAKE_USER_DATA`, onto `submission_queue`.
///
/// Returns `false` without pushing anything when the queue is full, and
/// `true` once the entry has been queued.
#[cfg(not(feature = "loom"))]
pub fn reinstall(&self, submission_queue: &mut SubmissionQueue<'_>) -> bool {
    let has_capacity = !submission_queue.is_full();
    if has_capacity {
        let wake_fd = Fd(self.inner.wake_fd.as_raw_fd());
        let entry = PollAdd::new(wake_fd, libc::POLLIN as u32)
            .multi(true)
            .build()
            .user_data(WAKE_USER_DATA);
        // SAFETY: the entry refers to the eventfd only by descriptor number.
        // NOTE(review): assumes the waker outlives the queued poll - confirm
        // with callers.
        unsafe {
            submission_queue
                .push(&entry)
                .expect("checked wake poll SQE capacity");
        }
    }
    has_capacity
}
/// Loom stub: pushes nothing and always reports success.
#[cfg(feature = "loom")]
pub const fn reinstall(&self, _submission_queue: &mut SubmissionQueue<'_>) -> bool {
true
}
/// Clears every flag bit (both waiting bits and the wake latch) while leaving
/// the sequence bits untouched.
#[inline]
fn clear_wait(&self) {
self.inner.state.fetch_and(!STATE_MASK, Ordering::Acquire);
}
/// Adds 1 to the wake eventfd counter.
///
/// Retries on `EINTR` and returns quietly on `EAGAIN`. Any other error is
/// logged, together with the OS error, and otherwise ignored.
///
/// # Panics
/// Panics if `write` returns a byte count other than 8 or -1.
#[cfg(not(feature = "loom"))]
fn eventfd_wake(&self) {
    let value: u64 = 1;
    loop {
        // SAFETY: `value` is a live `u64` and the length passed matches its
        // size.
        let ret = unsafe {
            libc::write(
                self.inner.wake_fd.as_raw_fd(),
                &value as *const u64 as *const libc::c_void,
                size_of::<u64>(),
            )
        };
        if ret == size_of::<u64>() as isize {
            return;
        }
        assert_eq!(
            ret, -1,
            "eventfd write returned unexpected byte count: {ret}"
        );
        let err = std::io::Error::last_os_error();
        match err.raw_os_error() {
            Some(libc::EINTR) => continue,
            Some(libc::EAGAIN) => return,
            _ => {
                // Include the OS error so the failure can be diagnosed.
                warn!("eventfd write failed: {err}");
                return;
            }
        }
    }
}
/// Loom model of an eventfd write: bumps the counter, then notifies one
/// waiter while holding the readiness mutex.
#[cfg(feature = "loom")]
fn eventfd_wake(&self) {
self.inner.eventfd_counter.fetch_add(1, Ordering::Release);
let _guard = self.inner.eventfd_readiness.lock().unwrap();
self.inner.eventfd_waiters.notify_one();
}
/// Issues a process-private `FUTEX_WAKE` on the state word for at most one
/// waiter, retrying on `EINTR`.
///
/// # Panics
/// Panics on any syscall error other than `EINTR`.
#[cfg(not(feature = "loom"))]
fn futex_wake(&self) {
loop {
let ret = unsafe {
libc::syscall(
libc::SYS_futex,
self.inner.state.as_ptr(),
libc::FUTEX_WAKE | libc::FUTEX_PRIVATE_FLAG,
// Maximum number of waiters to wake.
1u32,
)
};
if ret >= 0 {
return;
}
let err = std::io::Error::last_os_error();
match err.raw_os_error() {
Some(libc::EINTR) => continue,
_ => {
panic!("futex wake failed: {err}");
}
}
}
}
/// Loom model of `FUTEX_WAKE`: notifies one condvar waiter while holding the
/// bucket mutex.
#[cfg(feature = "loom")]
fn futex_wake(&self) {
let _guard = self.inner.futex_bucket.lock().unwrap();
self.inner.futex_waiters.notify_one();
}
/// Blocks in a process-private `FUTEX_WAIT` on the state word, with no
/// timeout, for as long as the word equals `snapshot`.
///
/// Returns `true` when the syscall returns 0. Returns `false` when the state
/// already differs from `snapshot`, on `EAGAIN`, or on any other error (which
/// is logged). `EINTR` restarts the loop.
#[cfg(not(feature = "loom"))]
fn futex_wait(&self, snapshot: u32) -> bool {
loop {
// Cheap user-space check before entering the kernel.
if self.inner.state.load(Ordering::Relaxed) != snapshot {
return false;
}
let ret = unsafe {
libc::syscall(
libc::SYS_futex,
self.inner.state.as_ptr(),
libc::FUTEX_WAIT | libc::FUTEX_PRIVATE_FLAG,
snapshot,
// Null timeout.
std::ptr::null::<libc::timespec>(),
)
};
if ret == 0 {
return true;
}
let err = std::io::Error::last_os_error();
match err.raw_os_error() {
Some(libc::EINTR) => continue,
Some(libc::EAGAIN) => return false,
_ => {
warn!("futex wait failed: {err}");
return false;
}
}
}
}
/// Loom model of `FUTEX_WAIT`: waits on the condvar while the state word
/// equals `snapshot`. Returns whether it waited at least once.
#[cfg(feature = "loom")]
fn futex_wait(&self, snapshot: u32) -> bool {
let mut guard = self.inner.futex_bucket.lock().unwrap();
let mut slept = false;
// Re-check after every wakeup; leave once the state has moved on.
while self.inner.state.load(Ordering::Acquire) == snapshot {
slept = true;
guard = self.inner.futex_waiters.wait(guard).unwrap();
}
slept
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use io_uring::IoUring;
#[cfg(not(feature = "loom"))]
use std::{
mem::size_of,
os::fd::{AsRawFd, FromRawFd},
};
/// Spins until `WAITING_ON_FUTEX_BIT` is observed in the waker's state word.
pub fn wait_until_futex_armed(waker: &Waker) {
    loop {
        let state = waker.inner.state.load(Ordering::Relaxed);
        if state & WAITING_ON_FUTEX_BIT != 0 {
            break;
        }
        std::hint::spin_loop();
    }
}
/// Spins until `WAITING_ON_EVENTFD_BIT` is observed in the waker's state word.
pub fn wait_until_eventfd_armed(waker: &Waker) {
    loop {
        let state = waker.inner.state.load(Ordering::Relaxed);
        if state & WAITING_ON_EVENTFD_BIT != 0 {
            break;
        }
        std::hint::spin_loop();
    }
}
/// Returns only the flag bits of the waker's state word.
pub fn state_bits(waker: &Waker) -> u32 {
waker.inner.state.load(Ordering::Relaxed) & STATE_MASK
}
/// Returns the published submission sequence stored above the flag bits.
pub fn submitted_seq(waker: &Waker) -> u32 {
(waker.inner.state.load(Ordering::Relaxed) >> STATE_BITS) & SUBMISSION_SEQ_MASK
}
/// Returns the current eventfd counter value.
///
/// In non-loom builds this performs a real `read` on the eventfd and returns
/// 0 on `EAGAIN`. Per eventfd(2), a successful read resets the counter, so
/// the helper is destructive there. In loom builds it loads the modelled
/// counter without changing it.
pub fn eventfd_count(waker: &Waker) -> u64 {
#[cfg(not(feature = "loom"))]
{
let mut value = 0u64;
let ret = unsafe {
libc::read(
waker.inner.wake_fd.as_raw_fd(),
&mut value as *mut u64 as *mut libc::c_void,
size_of::<u64>(),
)
};
// Nothing queued on the non-blocking eventfd.
if ret == -1 && std::io::Error::last_os_error().raw_os_error() == Some(libc::EAGAIN) {
return 0;
}
assert_eq!(ret, size_of::<u64>() as isize);
value
}
#[cfg(feature = "loom")]
{
waker.inner.eventfd_counter.load(Ordering::Relaxed)
}
}
/// Walks through publish, arm, publish-while-armed, acknowledge and drop,
/// checking the sequence and flag bits at each step.
#[test]
fn test_publish_arm_guard_and_submitted() {
let waker = Waker::new().expect("eventfd creation should succeed");
assert_eq!(submitted_seq(&waker), 0);
waker.publish();
assert_eq!(submitted_seq(&waker), 1);
let arm = waker.arm(1);
assert!(arm.still_idle());
assert!(!arm.wake_latched());
// Publishing while armed bumps the sequence; acknowledge and drop must not.
waker.publish();
assert_eq!(submitted_seq(&waker), 2);
waker.acknowledge();
assert_eq!(submitted_seq(&waker), 2);
drop(arm);
assert_eq!(submitted_seq(&waker), 2);
assert_eq!(state_bits(&waker), 0);
// Re-arming at the new sequence is idle again.
let arm = waker.arm(2);
assert!(arm.still_idle());
assert!(!arm.wake_latched());
drop(arm);
}
/// Checks that `pending` is true only when the published sequence is ahead of
/// the processed one by less than half the sequence domain, including across
/// wraparound.
#[test]
fn test_pending_uses_directional_half_range_compare() {
let waker = Waker::new().expect("eventfd creation should succeed");
waker.inner.state.store(1 << STATE_BITS, Ordering::Relaxed);
assert!(waker.pending(0));
assert!(!waker.pending(1));
// Published behind processed: not pending.
waker.inner.state.store(0, Ordering::Relaxed);
assert!(!waker.pending(1));
// Exactly half the domain ahead: not pending.
waker.inner.state.store(
HALF_SUBMISSION_SEQUENCE_DOMAIN << STATE_BITS,
Ordering::Relaxed,
);
assert!(!waker.pending(0));
// Published wrapped to 0 while processed is at the maximum: pending.
waker.inner.state.store(0, Ordering::Relaxed);
assert!(waker.pending(SUBMISSION_SEQ_MASK));
}
/// Races `park_idle` against a notifier thread that calls either `wake` or
/// `publish` once it sees the futex bit armed. Afterwards the sequence must
/// reflect only publishes and the flag bits must be clear.
#[test]
fn test_park_idle_handles_concurrent_publish_and_wake_races() {
    #[derive(Clone, Copy, Debug)]
    enum Notifier {
        Wake,
        Publish,
    }
    for notifier in [Notifier::Wake, Notifier::Publish] {
        for _ in 0..64 {
            let waker = Waker::new().expect("eventfd creation should succeed");
            let before = submitted_seq(&waker);
            let notifier_waker = waker.clone();
            let handle = std::thread::spawn(move || {
                // Only notify after the parking thread has armed the futex
                // bit, so the notification races with the actual wait.
                wait_until_futex_armed(&notifier_waker);
                match notifier {
                    Notifier::Wake => notifier_waker.wake(),
                    Notifier::Publish => notifier_waker.publish(),
                }
            });
            let _ = waker.park_idle(before);
            handle.join().expect("idle notifier thread panicked");
            let expected = match notifier {
                Notifier::Wake => before,
                Notifier::Publish => before.wrapping_add(1) & SUBMISSION_SEQ_MASK,
            };
            assert_eq!(submitted_seq(&waker), expected, "{notifier:?}");
            assert_eq!(state_bits(&waker), 0, "{notifier:?}");
        }
    }
}
/// A `wake` with nobody armed must not change the submission sequence.
#[test]
fn test_wake_without_idle_wait_keeps_sequence_stable() {
let waker = Waker::new().expect("eventfd creation should succeed");
let before = submitted_seq(&waker);
waker.wake();
assert_eq!(submitted_seq(&waker), before);
}
/// A wake latched before `park_idle` makes it return `None` without sleeping
/// and leaves the flag bits clear.
#[test]
fn test_wake_before_park_idle_skips_sleep() {
let waker = Waker::new().expect("eventfd creation should succeed");
let before = submitted_seq(&waker);
waker.wake();
let duration = waker.park_idle(before);
assert!(duration.is_none(), "should not have slept");
assert_eq!(submitted_seq(&waker), before);
assert_eq!(state_bits(&waker), 0);
}
/// A publish that happened before `park_idle` (stale `processed_seq`) makes
/// it return `None`, and no eventfd write occurs.
#[test]
fn test_publish_before_park_idle_skips_sleep() {
let waker = Waker::new().expect("eventfd creation should succeed");
waker.publish();
assert!(waker.park_idle(0).is_none());
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
}
/// Arms the futex bit by hand, publishes, and checks that `futex_wait` then
/// refuses to sleep on the now-stale snapshot.
#[test]
fn test_publish_after_futex_arm_rejects_stale_snapshot() {
let waker = Waker::new().expect("eventfd creation should succeed");
let before = submitted_seq(&waker);
// Reproduce the arming step of `park_idle` without the wait.
let prev = waker
.inner
.state
.fetch_or(WAITING_ON_FUTEX_BIT, Ordering::Relaxed);
assert_eq!(prev & WAITING_MASK, 0);
let snapshot = prev | WAITING_ON_FUTEX_BIT;
waker.publish();
assert_eq!(
submitted_seq(&waker),
before.wrapping_add(1) & SUBMISSION_SEQ_MASK
);
// The state word no longer matches the snapshot, so no sleep happens.
assert!(!waker.futex_wait(snapshot));
waker.clear_wait();
assert_eq!(
submitted_seq(&waker),
before.wrapping_add(1) & SUBMISSION_SEQ_MASK
);
assert_eq!(state_bits(&waker), 0);
}
/// Four threads publish at once while the eventfd target is armed. All four
/// publishes must be counted, but the eventfd must be written only once.
#[test]
fn test_publish_deduplicates_eventfd_wakes() {
    let waker = Waker::new().expect("eventfd creation should succeed");
    // One barrier slot per publisher plus one for this thread.
    let barrier = Arc::new(std::sync::Barrier::new(5));
    let arm = waker.arm(0);
    assert!(arm.still_idle());
    assert!(!arm.wake_latched());
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let publisher = waker.clone();
            let barrier = barrier.clone();
            std::thread::spawn(move || {
                barrier.wait();
                publisher.publish();
            })
        })
        .collect();
    barrier.wait();
    for publisher_thread in handles {
        publisher_thread.join().expect("publish thread panicked");
    }
    assert_eq!(submitted_seq(&waker), 4);
    assert_eq!(eventfd_count(&waker), 1);
    drop(arm);
}
/// A wake latched before `arm` is reported through the guard as
/// `wake_latched` and not `still_idle`; dropping the guard clears the flags.
#[test]
fn test_arm_after_sticky_wake_skips_blocking() {
let waker = Waker::new().expect("eventfd creation should succeed");
waker.wake();
let arm = waker.arm(0);
assert!(!arm.still_idle());
assert!(arm.wake_latched());
drop(arm);
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
}
/// Repeats wake-then-arm cycles (the last one preceded by a publish) and
/// checks that each unarmed wake is latched anew after the previous guard was
/// dropped, without ever writing the eventfd.
#[test]
fn test_unarmed_wakes_rearm_across_epochs() {
let waker = Waker::new().expect("eventfd creation should succeed");
// First cycle.
waker.wake();
let arm = waker.arm(0);
assert!(!arm.still_idle());
assert!(arm.wake_latched());
drop(arm);
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
// Second cycle: the latch was cleared by the drop, so it can be set again.
waker.wake();
let arm = waker.arm(0);
assert!(!arm.still_idle());
assert!(arm.wake_latched());
drop(arm);
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
// Third cycle: publish and wake while unarmed.
waker.publish();
waker.wake();
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(eventfd_count(&waker), 0);
let arm = waker.arm(1);
assert!(!arm.still_idle());
assert!(arm.wake_latched());
drop(arm);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
}
/// Arming with a stale `processed_seq` after a publish reports neither
/// `still_idle` nor `wake_latched`.
#[test]
fn test_arm_after_publish_skips_blocking() {
let waker = Waker::new().expect("eventfd creation should succeed");
waker.publish();
let arm = waker.arm(0);
assert!(!arm.still_idle());
assert!(!arm.wake_latched());
drop(arm);
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(state_bits(&waker), 0);
}
/// Four threads call `wake` at once while the eventfd target is armed. The
/// sequence must stay at zero and the eventfd must be written only once.
#[test]
fn test_wake_deduplicates_eventfd_wakes() {
    let waker = Waker::new().expect("eventfd creation should succeed");
    // One barrier slot per notifier plus one for this thread.
    let barrier = Arc::new(std::sync::Barrier::new(5));
    let arm = waker.arm(0);
    assert!(arm.still_idle());
    assert!(!arm.wake_latched());
    let handles: Vec<_> = (0..4)
        .map(|_| {
            let notifier = waker.clone();
            let barrier = barrier.clone();
            std::thread::spawn(move || {
                barrier.wait();
                notifier.wake();
            })
        })
        .collect();
    barrier.wait();
    for notifier_thread in handles {
        notifier_thread.join().expect("wake thread panicked");
    }
    assert_eq!(submitted_seq(&waker), 0);
    assert_eq!(eventfd_count(&waker), 1);
    drop(arm);
}
/// A direct eventfd wake followed by two acknowledges (the second on an empty
/// eventfd) must leave the submission sequence unchanged.
#[test]
fn test_eventfd_wake_and_acknowledge_empty_paths_keep_sequence_stable() {
let waker = Waker::new().expect("eventfd creation should succeed");
let before = submitted_seq(&waker);
waker.eventfd_wake();
waker.acknowledge();
waker.acknowledge();
assert_eq!(submitted_seq(&waker), before);
}
/// `reinstall` must push exactly one entry when the submission queue has
/// room, and push nothing (returning `false`) when the queue is full.
// NOTE(review): this test is not gated on `not(feature = "loom")`, yet the
// loom `reinstall` stub pushes nothing and always returns true - confirm it
// is excluded from loom runs.
#[test]
fn test_reinstall_pushes_wake_poll() {
let waker = Waker::new().expect("eventfd creation should succeed");
let mut ring = IoUring::new(8).expect("io_uring creation should succeed");
let mut sq = ring.submission();
let before = sq.len();
assert!(waker.reinstall(&mut sq));
assert_eq!(sq.len(), before + 1);
// Fill the remaining slots with no-ops.
while !sq.is_full() {
let nop = io_uring::opcode::Nop::new().build().user_data(0);
unsafe {
sq.push(&nop).expect("unable to fill submission queue");
}
}
let before = sq.len();
assert!(!waker.reinstall(&mut sq));
assert_eq!(sq.len(), before);
}
/// Drives `eventfd_wake` and `acknowledge` through their non-success paths:
/// first with the counter preloaded to `u64::MAX - 1`, then with the
/// descriptor closed. The submission sequence must be unaffected.
#[cfg(not(feature = "loom"))]
#[test]
fn test_eventfd_wake_and_acknowledge_error_branches() {
let mut waker = Waker::new().expect("eventfd creation should succeed");
let before = submitted_seq(&waker);
let fd = waker.inner.wake_fd.as_raw_fd();
// Preload the counter so the next write of 1 cannot be accepted
// (presumably yielding EAGAIN on the non-blocking eventfd; see eventfd(2)).
let value = u64::MAX - 1;
let wrote = unsafe {
libc::write(
fd,
&value as *const u64 as *const libc::c_void,
size_of::<u64>(),
)
};
assert_eq!(wrote, size_of::<u64>() as isize);
waker.eventfd_wake();
waker.acknowledge();
// Close the descriptor behind the `OwnedFd` so both calls hit the generic
// error branch.
// NOTE(review): between this close and the `dup` below, another test thread
// could be handed the same descriptor number - confirm this is acceptable
// when tests run in parallel.
let closed = unsafe { libc::close(fd) };
assert_eq!(closed, 0);
waker.eventfd_wake();
waker.acknowledge();
// Install a valid descriptor so dropping the waker closes something real,
// and forget the old `OwnedFd` so the already-closed fd is not closed again.
let replacement = unsafe { libc::dup(libc::STDIN_FILENO) };
assert!(replacement >= 0);
let old = {
let inner = std::sync::Arc::get_mut(&mut waker.inner).expect("unique waker in test");
std::mem::replace(&mut inner.wake_fd, unsafe {
std::os::fd::OwnedFd::from_raw_fd(replacement)
})
};
std::mem::forget(old);
assert_eq!(submitted_seq(&waker), before);
}
}
#[cfg(all(test, feature = "loom"))]
mod loom_tests {
use super::{
tests::{eventfd_count, state_bits, submitted_seq},
*,
};
use commonware_utils::test_rng;
use loom::{
sync::{
atomic::{AtomicU32, Ordering},
Arc,
},
thread,
};
use rand::Rng;
/// Single-slot payload, accessed only with relaxed atomics, that the loom
/// tests write before notifying and read after being notified.
struct QueuedRequest {
/// Payload value; 0 means nothing has been enqueued yet.
value: AtomicU32,
}
impl QueuedRequest {
/// Creates a slot holding 0.
fn empty() -> Self {
Self {
value: AtomicU32::new(0),
}
}
/// Stores `value` with relaxed ordering.
fn enqueue(&self, value: u32) {
self.value.store(value, Ordering::Relaxed);
}
/// Loads the stored value with relaxed ordering.
fn read(&self) -> u32 {
self.value.load(Ordering::Relaxed)
}
}
/// Blocks on the eventfd-model condvar until the modelled counter is
/// non-zero.
fn wait_for_eventfd_readiness(waker: &Waker) {
let mut guard = waker.inner.eventfd_readiness.lock().unwrap();
while waker.inner.eventfd_counter.load(Ordering::Acquire) == 0 {
guard = waker.inner.eventfd_waiters.wait(guard).unwrap();
}
}
/// Yields to the loom scheduler until `WAKE_SIGNALLED_BIT` is observed.
fn wait_for_wake_signal(waker: &Waker) {
while state_bits(waker) & WAKE_SIGNALLED_BIT == 0 {
thread::yield_now();
}
}
/// End-of-test cleanup: asserts nothing is armed, consumes a still-latched
/// wake through an arm/drop cycle, drains the eventfd model, and checks that
/// both the flag bits and the counter end at zero.
fn finish_leftover_wake(waker: &Waker) {
assert_eq!(state_bits(waker) & WAITING_MASK, 0);
if (state_bits(waker) & WAKE_SIGNALLED_BIT) != 0 {
let guard = waker.arm(submitted_seq(waker));
assert!(guard.wake_latched());
// Dropping the guard clears the latch.
drop(guard);
}
waker.acknowledge();
assert_eq!(state_bits(waker), 0);
assert_eq!(eventfd_count(waker), 0);
}
/// Models a consumer loop on the eventfd path: consumes one sequence step
/// whenever `pending` is true, otherwise arms, waits for eventfd readiness if
/// still idle, disarms and acknowledges. Returns once `processed` equals
/// `target`.
fn simulate_eventfd_loop_until(waker: &Waker, mut processed: u32, target: u32) -> u32 {
while processed != target {
if waker.pending(processed) {
processed = processed.wrapping_add(1) & SUBMISSION_SEQ_MASK;
continue;
}
let guard = waker.arm(processed);
// Block only when the arm snapshot saw no wake and no new publish.
if guard.still_idle() {
wait_for_eventfd_readiness(waker);
assert!(
eventfd_count(waker) > 0,
"blocking eventfd wait must observe queued readiness before cleanup",
);
}
drop(guard);
waker.acknowledge();
}
processed
}
/// Models a consumer loop on the futex path: consumes one sequence step
/// whenever `pending` is true, otherwise parks via `park_idle`. Returns once
/// `processed` equals `target`.
fn simulate_futex_loop_until(waker: &Waker, mut processed: u32, target: u32) -> u32 {
    loop {
        if processed == target {
            return processed;
        }
        if !waker.pending(processed) {
            // Nothing new yet; park until woken (the result is ignored).
            let _ = waker.park_idle(processed);
            continue;
        }
        processed = processed.wrapping_add(1) & SUBMISSION_SEQ_MASK;
    }
}
/// One step of a generated producer program.
#[derive(Clone, Copy, Debug)]
enum ProducerOp {
/// Call `Waker::publish` and count it.
Publish,
/// Call `Waker::wake`.
Wake,
}
impl ProducerOp {
    /// Builds a program of `len` operations, drawing one fair coin flip from
    /// `rng` per operation (`true` yields `Publish`, `false` yields `Wake`).
    fn generate_program(rng: &mut impl Rng, len: usize) -> Vec<Self> {
        let mut program = Vec::with_capacity(len);
        for _ in 0..len {
            let op = match rng.gen_bool(0.5) {
                true => Self::Publish,
                false => Self::Wake,
            };
            program.push(op);
        }
        program
    }
    /// Runs this operation against `waker`; a publish also bumps `publishes`
    /// by one after the call.
    fn execute(self, waker: &Waker, publishes: &AtomicU32) {
        match self {
            Self::Wake => waker.wake(),
            Self::Publish => {
                waker.publish();
                publishes.fetch_add(1, Ordering::Relaxed);
            }
        }
    }
}
/// A payload written with relaxed ordering before `publish` must be visible
/// to a thread that has observed `pending(0)`.
#[test]
fn publish_pending_pairing() {
loom::model(|| {
let waker = Waker::new().unwrap();
let queued = Arc::new(QueuedRequest::empty());
let producer = thread::spawn({
let waker = waker.clone();
let queued = queued.clone();
move || {
queued.enqueue(42);
waker.publish();
}
});
while !waker.pending(0) {
thread::yield_now();
}
assert_eq!(queued.read(), 42);
producer.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
});
}
/// A payload written before an unarmed `wake` must be visible after the
/// consumer has armed, seen the latch, and dropped the guard. No eventfd
/// write may occur.
#[test]
fn wake_clear_wait_pairing() {
loom::model(|| {
let waker = Waker::new().unwrap();
let queued = Arc::new(QueuedRequest::empty());
let notifier = thread::spawn({
let waker = waker.clone();
let queued = queued.clone();
move || {
queued.enqueue(42);
waker.wake();
}
});
wait_for_wake_signal(&waker);
assert_eq!(eventfd_count(&waker), 0);
let guard = waker.arm(0);
assert!(guard.wake_latched());
// The drop runs `clear_wait`, after which the payload is read.
drop(guard);
assert_eq!(queued.read(), 42);
assert_eq!(eventfd_count(&waker), 0);
notifier.join().unwrap();
});
}
/// Two concurrent wakes with nobody armed collapse into a single latched
/// signal and never touch the eventfd model.
#[test]
fn concurrent_unarmed_wakes_coalesce() {
loom::model(|| {
let waker = Waker::new().unwrap();
let a = thread::spawn({
let waker = waker.clone();
move || waker.wake()
});
let b = thread::spawn({
let waker = waker.clone();
move || waker.wake()
});
a.join().unwrap();
b.join().unwrap();
assert_eq!(eventfd_count(&waker), 0);
let guard = waker.arm(0);
assert!(!guard.still_idle());
assert!(guard.wake_latched());
drop(guard);
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// Races `arm` against a concurrent `publish`. Whether or not the consumer
/// ends up waiting on the eventfd model, the final state must be clean.
#[test]
fn arm_and_recheck_eventfd_race() {
loom::model(|| {
let waker = Waker::new().unwrap();
let producer = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let guard = waker.arm(0);
if guard.still_idle() {
wait_for_eventfd_readiness(&waker);
}
drop(guard);
waker.acknowledge();
producer.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// With the eventfd target armed first, a payload written before `publish`
/// must be visible after the guard is dropped, and exactly one eventfd wake
/// must have been queued.
#[test]
fn publish_clear_wait_pairing_when_armed() {
loom::model(|| {
let waker = Waker::new().unwrap();
let queued = Arc::new(QueuedRequest::empty());
let guard = waker.arm(0);
assert!(guard.still_idle());
let producer = thread::spawn({
let waker = waker.clone();
let queued = queued.clone();
move || {
queued.enqueue(42);
waker.publish();
}
});
wait_for_wake_signal(&waker);
drop(guard);
assert_eq!(queued.read(), 42);
producer.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 1);
waker.acknowledge();
assert_eq!(eventfd_count(&waker), 0);
});
}
/// With the eventfd target armed first, a payload written before `wake` must
/// be visible after the guard is dropped. Exactly one eventfd wake must have
/// been queued and the sequence must stay at zero.
#[test]
fn wake_clear_wait_pairing_when_armed() {
loom::model(|| {
let waker = Waker::new().unwrap();
let queued = Arc::new(QueuedRequest::empty());
let guard = waker.arm(0);
assert!(guard.still_idle());
let notifier = thread::spawn({
let waker = waker.clone();
let queued = queued.clone();
move || {
queued.enqueue(42);
waker.wake();
}
});
wait_for_wake_signal(&waker);
drop(guard);
assert_eq!(queued.read(), 42);
notifier.join().unwrap();
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 1);
waker.acknowledge();
assert_eq!(eventfd_count(&waker), 0);
});
}
/// Races `park_idle` against a concurrent `publish`. The parker must always
/// return and the publish must be counted.
#[test]
fn arm_and_recheck_futex_race() {
loom::model(|| {
let waker = Waker::new().unwrap();
let producer = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let _ = waker.park_idle(0);
producer.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
finish_leftover_wake(&waker);
});
}
/// Two concurrent publishers against an armed eventfd target queue exactly
/// one eventfd wake between them.
#[test]
fn publishers_dedup_eventfd_wake() {
loom::model(|| {
let waker = Waker::new().unwrap();
let guard = waker.arm(0);
assert!(guard.still_idle());
let a = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let b = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
a.join().unwrap();
b.join().unwrap();
assert_eq!(submitted_seq(&waker), 2);
assert_eq!(eventfd_count(&waker), 1);
drop(guard);
waker.acknowledge();
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// A concurrent `publish` and `wake` against an armed eventfd target queue
/// exactly one eventfd wake between them.
#[test]
fn mixed_publish_and_wake_dedup() {
loom::model(|| {
let waker = Waker::new().unwrap();
let guard = waker.arm(0);
assert!(guard.still_idle());
let publisher = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let notifier = thread::spawn({
let waker = waker.clone();
move || waker.wake()
});
publisher.join().unwrap();
notifier.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(eventfd_count(&waker), 1);
drop(guard);
waker.acknowledge();
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// With the futex bit armed by hand, a concurrent `publish` and `wake` must
/// latch the wake without touching the eventfd model, and `futex_wait` must
/// then refuse to sleep on the stale snapshot.
#[test]
fn mixed_publish_and_wake_futex_arm() {
loom::model(|| {
let waker = Waker::new().unwrap();
// Reproduce the arming step of `park_idle` without the wait.
let prev = waker
.inner
.state
.fetch_or(WAITING_ON_FUTEX_BIT, Ordering::Relaxed);
assert_eq!(prev & WAITING_MASK, 0);
let snapshot = prev | WAITING_ON_FUTEX_BIT;
let publisher = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let notifier = thread::spawn({
let waker = waker.clone();
move || waker.wake()
});
publisher.join().unwrap();
notifier.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
assert_eq!(eventfd_count(&waker), 0);
assert_eq!(
state_bits(&waker),
WAITING_ON_FUTEX_BIT | WAKE_SIGNALLED_BIT
);
assert!(!waker.futex_wait(snapshot));
waker.clear_wait();
assert_eq!(state_bits(&waker), 0);
});
}
/// Races `arm` against a concurrent `wake`. After disarming and
/// acknowledging, no flag bits or eventfd readiness may remain.
#[test]
fn drop_wake() {
loom::model(|| {
let waker = Waker::new().unwrap();
let notifier = thread::spawn({
let waker = waker.clone();
move || waker.wake()
});
let guard = waker.arm(0);
if guard.still_idle() {
wait_for_eventfd_readiness(&waker);
}
drop(guard);
waker.acknowledge();
notifier.join().unwrap();
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// Starts the sequence at its maximum value and checks that two publishes
/// wrap it to 1 while the eventfd consumer loop follows across the wrap.
#[test]
fn sequence_wraparound() {
loom::model(|| {
let waker = Waker::new().unwrap();
// Sequence = SUBMISSION_SEQ_MASK, all flag bits clear.
waker
.inner
.state
.store(SUBMISSION_SEQ_MASK << STATE_BITS, Ordering::Relaxed);
let producer = thread::spawn({
let waker = waker.clone();
move || {
waker.publish();
waker.publish();
}
});
assert_eq!(
simulate_eventfd_loop_until(&waker, SUBMISSION_SEQ_MASK, 1),
1
);
producer.join().unwrap();
assert_eq!(submitted_seq(&waker), 1);
finish_leftover_wake(&waker);
});
}
/// Two producers interleave publishes and wakes with no consumer. Every
/// publish must be counted, nothing may be armed, and the eventfd model must
/// stay untouched.
#[test]
fn two_producers_mixed_ops() {
loom::model(|| {
let waker = Waker::new().unwrap();
let publishes = Arc::new(AtomicU32::new(0));
let a = thread::spawn({
let waker = waker.clone();
let publishes = publishes.clone();
move || {
waker.publish();
publishes.fetch_add(1, Ordering::Relaxed);
waker.wake();
waker.publish();
publishes.fetch_add(1, Ordering::Relaxed);
}
});
let b = thread::spawn({
let waker = waker.clone();
let publishes = publishes.clone();
move || {
waker.wake();
waker.publish();
publishes.fetch_add(1, Ordering::Relaxed);
}
});
a.join().unwrap();
b.join().unwrap();
assert_eq!(submitted_seq(&waker), publishes.load(Ordering::Relaxed));
assert_eq!(state_bits(&waker) & WAITING_MASK, 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// One producer publishes twice while the eventfd consumer loop drains; the
/// loop must reach sequence 2.
#[test]
fn producer_with_draining_loop() {
loom::model(|| {
let waker = Waker::new().unwrap();
let producer = thread::spawn({
let waker = waker.clone();
move || {
waker.publish();
waker.publish();
}
});
let processed = simulate_eventfd_loop_until(&waker, 0, 2);
producer.join().unwrap();
assert_eq!(processed, 2);
assert_eq!(submitted_seq(&waker), 2);
finish_leftover_wake(&waker);
});
}
/// Races `park_idle` against a concurrent `wake`. The parker must return and
/// leave the state fully clear.
#[test]
fn park_idle_with_concurrent_wake() {
loom::model(|| {
let waker = Waker::new().unwrap();
let notifier = thread::spawn({
let waker = waker.clone();
move || waker.wake()
});
let _ = waker.park_idle(0);
notifier.join().unwrap();
assert_eq!(submitted_seq(&waker), 0);
assert_eq!(state_bits(&waker), 0);
assert_eq!(eventfd_count(&waker), 0);
});
}
/// A producer runs publish, wake, publish while the eventfd consumer loop
/// drains; the extra wake must not stop the loop from reaching sequence 2.
#[test]
fn two_cycle_drain_with_interleaved_wake() {
loom::model(|| {
let waker = Waker::new().unwrap();
let producer = thread::spawn({
let waker = waker.clone();
move || {
waker.publish();
waker.wake();
waker.publish();
}
});
let processed = simulate_eventfd_loop_until(&waker, 0, 2);
producer.join().unwrap();
assert_eq!(processed, 2);
assert_eq!(submitted_seq(&waker), 2);
finish_leftover_wake(&waker);
});
}
/// One producer publishes twice while the futex consumer loop drains; the
/// loop must reach sequence 2.
#[test]
fn multiple_park_idle_cycles() {
loom::model(|| {
let waker = Waker::new().unwrap();
let producer = thread::spawn({
let waker = waker.clone();
move || {
waker.publish();
waker.publish();
}
});
let processed = simulate_futex_loop_until(&waker, 0, 2);
producer.join().unwrap();
assert_eq!(processed, 2);
assert_eq!(submitted_seq(&waker), 2);
finish_leftover_wake(&waker);
});
}
/// Two single-publish producers run against the eventfd consumer loop on the
/// main thread; the loop must reach sequence 2.
#[test]
fn three_thread_stress() {
loom::model(|| {
let waker = Waker::new().unwrap();
let a = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let b = thread::spawn({
let waker = waker.clone();
move || waker.publish()
});
let processed = simulate_eventfd_loop_until(&waker, 0, 2);
a.join().unwrap();
b.join().unwrap();
assert_eq!(processed, 2);
assert_eq!(submitted_seq(&waker), 2);
finish_leftover_wake(&waker);
});
}
/// Generates `CASES` pairs of random `OPS_PER_PROGRAM`-step producer
/// programs and runs each pair on two threads under loom with no consumer.
/// Every publish must be counted, nothing may end up armed, and the eventfd
/// model must stay untouched.
#[test]
fn generated_producer_only_programs() {
const CASES: usize = 24;
const OPS_PER_PROGRAM: usize = 5;
let mut rng = test_rng();
// Generate all programs up front, outside the loom model.
let programs = (0..CASES)
.map(|_| {
[
ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM),
ProducerOp::generate_program(&mut rng, OPS_PER_PROGRAM),
]
})
.collect::<Vec<_>>();
for (iter, programs) in programs.into_iter().enumerate() {
loom::model(move || {
let waker = Waker::new().unwrap();
let publishes = Arc::new(AtomicU32::new(0));
// One thread per program in the pair.
let handles = programs
.iter()
.map(|program| {
let program = program.clone();
let waker = waker.clone();
let publishes = publishes.clone();
thread::spawn(move || {
for &op in program.iter() {
op.execute(&waker, &publishes);
}
})
})
.collect::<Vec<_>>();
for handle in handles {
handle.join().unwrap();
}
let expected = publishes.load(Ordering::Relaxed);
let got = submitted_seq(&waker);
assert_eq!(
got, expected,
"publish conservation failed: iter={iter} programs={programs:?}",
);
assert_eq!(
state_bits(&waker) & WAITING_MASK,
0,
"wait target remained armed: iter={iter} programs={programs:?}",
);
assert_eq!(
eventfd_count(&waker),
0,
"eventfd readiness queued while unarmed: iter={iter} programs={programs:?}",
);
});
}
}
/// Shared driver for the generated consumer-loop tests.
///
/// Generates `cases` random programs of `ops_per_program` steps. For each one
/// it runs a single producer thread against `simulate_loop_until` under loom,
/// with the program's publish count as the target. It then checks loop
/// progress, the published sequence, the producer-side publish counter, and a
/// clean final state.
fn generated_loop_programs(
cases: usize,
ops_per_program: usize,
simulate_loop_until: fn(&Waker, u32, u32) -> u32,
) {
let mut rng = test_rng();
let programs = (0..cases)
.map(|_| ProducerOp::generate_program(&mut rng, ops_per_program))
.collect::<Vec<_>>();
for (iter, program) in programs.into_iter().enumerate() {
// The number of publishes is the sequence the consumer loop must reach.
let publish_count = program
.iter()
.filter(|op| matches!(op, ProducerOp::Publish))
.count() as u32;
loom::model(move || {
let waker = Waker::new().unwrap();
let publishes = Arc::new(AtomicU32::new(0));
let producer = thread::spawn({
let program = program.clone();
let waker = waker.clone();
let publishes = publishes.clone();
move || {
for &op in program.iter() {
op.execute(&waker, &publishes);
}
}
});
let processed = simulate_loop_until(&waker, 0, publish_count);
producer.join().unwrap();
assert_eq!(
processed, publish_count,
"loop progress failed: iter={iter} program={program:?}",
);
assert_eq!(
submitted_seq(&waker),
publish_count,
"publish conservation failed: iter={iter} program={program:?}",
);
assert_eq!(
publishes.load(Ordering::Relaxed),
publish_count,
"producer accounting failed: iter={iter} program={program:?}",
);
finish_leftover_wake(&waker);
});
}
}
/// Runs 96 generated 3-step programs against the eventfd consumer loop.
#[test]
fn generated_eventfd_loop_programs() {
const CASES: usize = 96;
const OPS_PER_PROGRAM: usize = 3;
generated_loop_programs(CASES, OPS_PER_PROGRAM, simulate_eventfd_loop_until);
}
/// Runs 16 generated 3-step programs against the futex consumer loop.
#[test]
fn generated_futex_loop_programs() {
const CASES: usize = 16;
const OPS_PER_PROGRAM: usize = 3;
generated_loop_programs(CASES, OPS_PER_PROGRAM, simulate_futex_loop_until);
}
}