use crate::runtime::reactor::{
Event, Events, Interest, Reactor, SlabToken, Source, Token, TokenSlab,
};
use parking_lot::Mutex;
use std::collections::HashMap;
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Weak};
use std::task::Waker;
use std::time::Duration;
const DEFAULT_EVENTS_CAPACITY: usize = 1024;
const DEFAULT_INTERESTS_CAPACITY: usize = 64;
#[inline]
const fn interest_map_capacity(events_capacity: usize) -> usize {
if events_capacity > DEFAULT_INTERESTS_CAPACITY {
events_capacity
} else {
DEFAULT_INTERESTS_CAPACITY
}
}
#[derive(Debug, Clone, Default)]
pub struct IoStats {
pub polls: u64,
pub events_received: u64,
pub wakers_dispatched: u64,
pub unknown_tokens: u64,
pub registrations: u64,
pub deregistrations: u64,
}
pub struct IoDriver {
reactor: Arc<dyn Reactor>,
wakers: TokenSlab,
interests: HashMap<Token, Interest>,
events: Events,
waker_buf: Vec<Waker>,
stats: IoStats,
}
impl IoDriver {
#[must_use]
pub fn new(reactor: Arc<dyn Reactor>) -> Self {
Self {
reactor,
wakers: TokenSlab::new(),
interests: HashMap::with_capacity(interest_map_capacity(DEFAULT_EVENTS_CAPACITY)),
events: Events::with_capacity(DEFAULT_EVENTS_CAPACITY),
waker_buf: Vec::with_capacity(64),
stats: IoStats::default(),
}
}
#[must_use]
pub fn with_capacity(reactor: Arc<dyn Reactor>, events_capacity: usize) -> Self {
Self {
reactor,
wakers: TokenSlab::new(),
interests: HashMap::with_capacity(interest_map_capacity(events_capacity)),
events: Events::with_capacity(events_capacity),
waker_buf: Vec::with_capacity(events_capacity.min(256)),
stats: IoStats::default(),
}
}
#[must_use]
pub fn reactor(&self) -> &Arc<dyn Reactor> {
&self.reactor
}
pub fn register(
&mut self,
source: &dyn Source,
interest: Interest,
waker: Waker,
) -> io::Result<Token> {
let slab_key = self.wakers.insert(waker);
let io_token = Token::new(slab_key.to_usize());
match self.reactor.register(source, io_token, interest) {
Ok(()) => {
self.interests.insert(io_token, interest);
self.stats.registrations += 1;
Ok(io_token)
}
Err(e) => {
let _ = self.wakers.remove(slab_key);
Err(e)
}
}
}
pub fn register_waker(&mut self, waker: Waker) -> Token {
let slab_key = self.wakers.insert(waker);
self.stats.registrations += 1;
Token::new(slab_key.to_usize())
}
pub fn update_waker(&mut self, token: Token, waker: Waker) -> bool {
let slab_key = SlabToken::from_usize(token.0);
self.wakers.get_mut(slab_key).is_some_and(|slot| {
if !slot.will_wake(&waker) {
*slot = waker;
}
true
})
}
pub fn modify_interest(&mut self, token: Token, interest: Interest) -> io::Result<()> {
self.reactor.modify(token, interest)?;
self.interests.insert(token, interest);
Ok(())
}
pub fn deregister(&mut self, token: Token) -> io::Result<()> {
let result = self.reactor.deregister(token);
let slab_key = SlabToken::from_usize(token.0);
if self.wakers.remove(slab_key).is_some() {
self.stats.deregistrations += 1;
}
self.interests.remove(&token);
match result {
Ok(()) => Ok(()),
Err(err) if err.kind() == io::ErrorKind::NotFound => Ok(()),
Err(err) => Err(err),
}
}
pub fn deregister_waker(&mut self, token: Token) {
let slab_key = SlabToken::from_usize(token.0);
if self.wakers.remove(slab_key).is_some() {
self.stats.deregistrations += 1;
}
}
pub fn turn(&mut self, timeout: Option<Duration>) -> io::Result<usize> {
self.turn_with(timeout, |_, _| {})
}
pub fn turn_with<F>(&mut self, timeout: Option<Duration>, mut on_event: F) -> io::Result<usize>
where
F: FnMut(&Event, Option<Interest>),
{
self.events.clear();
let n = self.reactor.poll(&mut self.events, timeout)?;
self.stats.polls += 1;
self.stats.events_received += n as u64;
self.waker_buf.clear();
let mut seen_tokens = smallvec::SmallVec::<[Token; 64]>::new();
for event in &self.events {
let interest = self.interests.get(&event.token).copied();
on_event(event, interest);
if seen_tokens.contains(&event.token) {
continue;
}
let slab_key = SlabToken::from_usize(event.token.0);
if let Some(waker) = self.wakers.get(slab_key) {
self.waker_buf.push(waker.clone());
seen_tokens.push(event.token);
self.stats.wakers_dispatched += 1;
} else {
self.stats.unknown_tokens += 1;
}
}
for waker in self.waker_buf.drain(..) {
waker.wake();
}
Ok(n)
}
pub(crate) fn take_events(&mut self) -> Events {
std::mem::take(&mut self.events)
}
pub(crate) fn restore_and_extract_wakers<F>(
&mut self,
events: Events,
mut on_event: F,
) -> smallvec::SmallVec<[Waker; 64]>
where
F: FnMut(&Event, Option<Interest>),
{
struct Restorer<'a> {
driver: &'a mut IoDriver,
events: Option<Events>,
}
impl Drop for Restorer<'_> {
fn drop(&mut self) {
if let Some(mut events) = self.events.take() {
events.clear();
self.driver.events = events;
}
}
}
let restorer = Restorer {
driver: self,
events: Some(events),
};
let events_ref = restorer
.events
.as_ref()
.expect("events should be Some during restore");
let mut wakers = smallvec::SmallVec::with_capacity(events_ref.len());
let mut seen_tokens = smallvec::SmallVec::<[Token; 64]>::new();
for event in events_ref {
let interest = restorer.driver.interests.get(&event.token).copied();
on_event(event, interest);
if seen_tokens.contains(&event.token) {
continue;
}
let slab_key = SlabToken::from_usize(event.token.0);
if let Some(waker) = restorer.driver.wakers.get(slab_key) {
wakers.push(waker.clone());
seen_tokens.push(event.token);
restorer.driver.stats.wakers_dispatched += 1;
} else {
restorer.driver.stats.unknown_tokens += 1;
}
}
wakers
}
pub(crate) fn restore_events_only(&mut self, mut events: Events) {
events.clear();
self.events = events;
}
pub fn wake(&self) -> io::Result<()> {
self.reactor.wake()
}
#[inline]
#[must_use]
pub fn stats(&self) -> &IoStats {
&self.stats
}
#[inline]
#[must_use]
pub fn waker_count(&self) -> usize {
self.wakers.len()
}
#[inline]
#[must_use]
pub fn is_empty(&self) -> bool {
self.wakers.is_empty()
}
}
#[derive(Clone)]
pub struct IoDriverHandle {
inner: Arc<Mutex<IoDriver>>,
reactor: Arc<dyn Reactor>,
is_polling: Arc<AtomicBool>,
}
struct PollingGuard<'a> {
handle: &'a IoDriverHandle,
events: Option<Events>,
clear_poll_flag: bool,
}
impl<'a> PollingGuard<'a> {
fn new(handle: &'a IoDriverHandle, events: Events, clear_poll_flag: bool) -> Self {
Self {
handle,
events: Some(events),
clear_poll_flag,
}
}
fn events_mut(&mut self) -> &mut Events {
self.events
.as_mut()
.expect("polling guard events must exist while polling")
}
fn take_events(&mut self) -> Events {
self.events
.take()
.expect("polling guard events must be present")
}
}
impl Drop for PollingGuard<'_> {
fn drop(&mut self) {
if let Some(events) = self.events.take() {
let mut driver = self.handle.inner.lock();
driver.restore_events_only(events);
}
if self.clear_poll_flag {
self.handle.is_polling.store(false, Ordering::Release);
}
}
}
impl std::fmt::Debug for IoDriverHandle {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IoDriverHandle")
.field("inner", &self.inner)
.field("reactor", &"<dyn Reactor>")
.field("is_polling", &self.is_polling.load(Ordering::Relaxed))
.finish()
}
}
impl IoDriverHandle {
#[must_use]
pub fn new(reactor: Arc<dyn Reactor>) -> Self {
Self {
inner: Arc::new(Mutex::new(IoDriver::new(reactor.clone()))),
reactor,
is_polling: Arc::new(AtomicBool::new(false)),
}
}
#[must_use]
pub fn with_capacity(reactor: Arc<dyn Reactor>, events_capacity: usize) -> Self {
Self {
inner: Arc::new(Mutex::new(IoDriver::with_capacity(
reactor.clone(),
events_capacity,
))),
reactor,
is_polling: Arc::new(AtomicBool::new(false)),
}
}
pub fn register(
&self,
source: &dyn Source,
interest: Interest,
waker: Waker,
) -> io::Result<IoRegistration> {
let _ = self.reactor.wake();
let token = {
let mut driver = self.inner.lock();
driver.register(source, interest, waker)?
};
Ok(IoRegistration::new(
token,
Arc::downgrade(&self.inner),
interest,
self.reactor.clone(),
))
}
#[must_use]
pub fn update_waker(&self, token: Token, waker: Waker) -> bool {
let mut driver = self.inner.lock();
driver.update_waker(token, waker)
}
#[must_use]
pub fn is_empty(&self) -> bool {
let driver = self.inner.lock();
driver.is_empty()
}
#[must_use]
pub fn waker_count(&self) -> usize {
let driver = self.inner.lock();
driver.waker_count()
}
pub fn wake(&self) -> io::Result<()> {
self.reactor.wake()
}
#[must_use]
pub fn stats(&self) -> IoStats {
let driver = self.inner.lock();
driver.stats().clone()
}
pub fn turn_with<F>(&self, timeout: Option<Duration>, on_event: F) -> io::Result<usize>
where
F: FnMut(&Event, Option<Interest>),
{
self.try_turn_with(timeout, on_event)
.map(|polled| polled.unwrap_or(0))
}
pub fn try_turn_with<F>(
&self,
timeout: Option<Duration>,
on_event: F,
) -> io::Result<Option<usize>>
where
F: FnMut(&Event, Option<Interest>),
{
if self
.is_polling
.compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
.is_ok()
{
let events = {
let mut driver = self.inner.lock();
driver.take_events()
};
let mut guard = PollingGuard::new(self, events, true);
let poll_result = self.reactor.poll(guard.events_mut(), timeout);
let wakers = {
let mut driver = self.inner.lock();
let events = guard.take_events();
if let Ok(n) = &poll_result {
driver.stats.polls += 1;
driver.stats.events_received += *n as u64;
let wakers = driver.restore_and_extract_wakers(events, on_event);
drop(driver);
wakers
} else {
driver.restore_events_only(events);
drop(driver);
smallvec::SmallVec::new()
}
};
drop(guard);
for waker in wakers {
waker.wake();
}
poll_result.map(Some)
} else {
Ok(None)
}
}
pub fn lock(&self) -> parking_lot::MutexGuard<'_, IoDriver> {
self.inner.lock()
}
#[must_use]
pub fn try_lock(&self) -> Option<parking_lot::MutexGuard<'_, IoDriver>> {
self.inner.try_lock()
}
}
pub struct IoRegistration {
token: Token,
interest: Interest,
driver: Weak<Mutex<IoDriver>>,
reactor: Arc<dyn Reactor>,
cached_waker: Option<Waker>,
deregistered: bool,
}
impl IoRegistration {
fn new(
token: Token,
driver: Weak<Mutex<IoDriver>>,
interest: Interest,
reactor: Arc<dyn Reactor>,
) -> Self {
Self {
token,
interest,
driver,
reactor,
cached_waker: None,
deregistered: false,
}
}
fn wake_polling_reactor(&self) {
let _ = self.reactor.wake();
}
#[inline]
#[must_use]
pub fn token(&self) -> Token {
self.token
}
#[inline]
#[must_use]
pub fn interest(&self) -> Interest {
self.interest
}
#[inline]
#[must_use]
pub fn is_active(&self) -> bool {
self.driver.strong_count() > 0
}
pub fn set_interest(&mut self, interest: Interest) -> io::Result<()> {
self.wake_polling_reactor();
let Some(driver) = self.driver.upgrade() else {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"I/O driver has been dropped",
));
};
{
let mut guard = driver.lock();
guard.modify_interest(self.token, interest)?;
}
self.interest = interest;
Ok(())
}
#[must_use]
pub fn update_waker(&self, waker: Waker) -> bool {
self.driver.upgrade().is_some_and(|driver| {
let mut guard = driver.lock();
guard.update_waker(self.token, waker)
})
}
pub fn rearm(&mut self, interest: Interest, waker: &Waker) -> io::Result<bool> {
self.wake_polling_reactor();
let Some(driver) = self.driver.upgrade() else {
return Err(io::Error::new(
io::ErrorKind::NotConnected,
"I/O driver has been dropped",
));
};
let mut guard = driver.lock();
guard.modify_interest(self.token, interest)?;
self.interest = interest;
if self
.cached_waker
.as_ref()
.is_none_or(|w| !w.will_wake(waker))
{
let slab_key = SlabToken::from_usize(self.token.0);
if let Some(slot) = guard.wakers.get_mut(slab_key) {
slot.clone_from(waker);
self.cached_waker = Some(waker.clone());
} else {
return Ok(false);
}
}
Ok(true)
}
pub fn deregister(mut self) -> io::Result<()> {
self.wake_polling_reactor();
if let Some(driver) = self.driver.upgrade() {
let first = {
let mut guard = driver.lock();
guard.deregister(self.token)
};
match first {
Ok(()) => {
self.deregistered = true;
Ok(())
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {
self.deregistered = true;
Ok(())
}
Err(first_err) => {
let second = {
let mut guard = driver.lock();
guard.deregister(self.token)
};
match second {
Ok(()) => {
self.deregistered = true;
Ok(())
}
Err(err) if err.kind() == io::ErrorKind::NotFound => {
self.deregistered = true;
Ok(())
}
Err(_second_err) => Err(first_err),
}
}
}
} else {
self.deregistered = true;
Ok(())
}
}
}
impl Drop for IoRegistration {
fn drop(&mut self) {
if self.deregistered {
return;
}
self.wake_polling_reactor();
if let Some(driver) = self.driver.upgrade() {
let first = {
let mut guard = driver.lock();
guard.deregister(self.token)
};
if first
.as_ref()
.is_err_and(|err| err.kind() != io::ErrorKind::NotFound)
{
let mut guard = driver.lock();
let _ = guard.deregister(self.token);
}
}
}
}
impl std::fmt::Debug for IoRegistration {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IoRegistration")
.field("token", &self.token)
.field("interest", &self.interest)
.field("active", &self.is_active())
.field("deregistered", &self.deregistered)
.finish_non_exhaustive()
}
}
impl std::fmt::Debug for IoDriver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("IoDriver")
.field("waker_count", &self.wakers.len())
.field("events_capacity", &self.events.capacity())
.field("stats", &self.stats)
.finish_non_exhaustive()
}
}
#[cfg(all(test, unix))]
mod tests {
use super::*;
use crate::runtime::reactor::{Event, Interest, LabReactor, Token};
use crate::test_utils::init_test_logging;
use std::collections::HashSet;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Condvar, Mutex as StdMutex};
use std::task::Wake;
struct FlagWaker {
flag: AtomicBool,
count: AtomicUsize,
}
impl Wake for FlagWaker {
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
self.count.fetch_add(1, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
self.count.fetch_add(1, Ordering::SeqCst);
}
}
fn create_test_waker() -> (Waker, Arc<FlagWaker>) {
let waker_state = Arc::new(FlagWaker {
flag: AtomicBool::new(false),
count: AtomicUsize::new(0),
});
let waker = Waker::from(waker_state.clone());
(waker, waker_state)
}
struct TestFdSource;
impl std::os::fd::AsRawFd for TestFdSource {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
0
}
}
struct NotFoundReactor;
impl Reactor for NotFoundReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
Err(io::Error::new(io::ErrorKind::NotFound, "not registered"))
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
Ok(0)
}
fn wake(&self) -> io::Result<()> {
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
struct FlakyReactor {
deregister_calls: AtomicUsize,
}
impl FlakyReactor {
fn new() -> Self {
Self {
deregister_calls: AtomicUsize::new(0),
}
}
fn deregister_calls(&self) -> usize {
self.deregister_calls.load(Ordering::SeqCst)
}
}
impl Reactor for FlakyReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
let call = self.deregister_calls.fetch_add(1, Ordering::SeqCst);
if call == 0 {
Err(io::Error::other("injected failure"))
} else {
Ok(())
}
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
Ok(0)
}
fn wake(&self) -> io::Result<()> {
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
struct AlwaysFailReactor {
deregister_calls: AtomicUsize,
}
impl AlwaysFailReactor {
fn new() -> Self {
Self {
deregister_calls: AtomicUsize::new(0),
}
}
fn deregister_calls(&self) -> usize {
self.deregister_calls.load(Ordering::SeqCst)
}
}
impl Reactor for AlwaysFailReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
self.deregister_calls.fetch_add(1, Ordering::SeqCst);
Err(io::Error::other("persistent failure"))
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
Ok(0)
}
fn wake(&self) -> io::Result<()> {
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
struct ThirdTryReactor {
deregister_calls: AtomicUsize,
deregistered: AtomicBool,
}
impl ThirdTryReactor {
fn new() -> Self {
Self {
deregister_calls: AtomicUsize::new(0),
deregistered: AtomicBool::new(false),
}
}
fn deregister_calls(&self) -> usize {
self.deregister_calls.load(Ordering::SeqCst)
}
fn was_deregistered(&self) -> bool {
self.deregistered.load(Ordering::SeqCst)
}
}
impl Reactor for ThirdTryReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
let call = self.deregister_calls.fetch_add(1, Ordering::SeqCst);
if call < 2 {
Err(io::Error::other("persistent failure"))
} else {
self.deregistered.store(true, Ordering::SeqCst);
Ok(())
}
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
Ok(0)
}
fn wake(&self) -> io::Result<()> {
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
struct PollErrorWithEventReactor {
emit_token: Mutex<Option<Token>>,
}
impl PollErrorWithEventReactor {
fn new() -> Self {
Self {
emit_token: Mutex::new(None),
}
}
fn set_emit_token(&self, token: Token) {
let mut slot = self.emit_token.lock();
*slot = Some(token);
}
}
impl Reactor for PollErrorWithEventReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
Ok(())
}
fn poll(&self, events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
let emit_token = {
let guard = self.emit_token.lock();
*guard
};
if let Some(token) = emit_token {
events.push(Event::readable(token));
}
Err(io::Error::other("injected poll failure"))
}
fn wake(&self) -> io::Result<()> {
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
struct BlockingFirstPollReactor {
poll_calls: AtomicUsize,
started: StdMutex<bool>,
started_cv: Condvar,
release_first_poll: AtomicBool,
}
impl BlockingFirstPollReactor {
fn new() -> Self {
Self {
poll_calls: AtomicUsize::new(0),
started: StdMutex::new(false),
started_cv: Condvar::new(),
release_first_poll: AtomicBool::new(false),
}
}
fn wait_until_first_poll_started(&self) {
let mut started_guard = self
.started
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
while !*started_guard {
started_guard = self.started_cv.wait(started_guard).expect("started wait");
}
drop(started_guard);
}
fn release_first_poll(&self) {
self.release_first_poll.store(true, Ordering::Release);
}
fn poll_calls(&self) -> usize {
self.poll_calls.load(Ordering::SeqCst)
}
}
impl Reactor for BlockingFirstPollReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
Ok(())
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
let call = self.poll_calls.fetch_add(1, Ordering::SeqCst);
if call == 0 {
{
let mut started = self
.started
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*started = true;
}
self.started_cv.notify_all();
while !self.release_first_poll.load(Ordering::Acquire) {
std::thread::yield_now();
}
}
Ok(0)
}
fn wake(&self) -> io::Result<()> {
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
struct WakeTrackingBlockingReactor {
started: StdMutex<bool>,
started_cv: Condvar,
release_poll: AtomicBool,
wake_calls: AtomicUsize,
}
impl WakeTrackingBlockingReactor {
fn new() -> Self {
Self {
started: StdMutex::new(false),
started_cv: Condvar::new(),
release_poll: AtomicBool::new(false),
wake_calls: AtomicUsize::new(0),
}
}
fn wait_until_poll_started(&self) {
let mut started_guard = self
.started
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
while !*started_guard {
started_guard = self.started_cv.wait(started_guard).expect("started wait");
}
drop(started_guard);
}
fn release_poll(&self) {
self.release_poll.store(true, Ordering::Release);
}
fn wake_calls(&self) -> usize {
self.wake_calls.load(Ordering::SeqCst)
}
}
impl Reactor for WakeTrackingBlockingReactor {
fn register(
&self,
_source: &dyn Source,
_token: Token,
_interest: Interest,
) -> io::Result<()> {
Ok(())
}
fn modify(&self, _token: Token, _interest: Interest) -> io::Result<()> {
Ok(())
}
fn deregister(&self, _token: Token) -> io::Result<()> {
Ok(())
}
fn poll(&self, _events: &mut Events, _timeout: Option<Duration>) -> io::Result<usize> {
{
let mut started = self
.started
.lock()
.unwrap_or_else(std::sync::PoisonError::into_inner);
*started = true;
}
self.started_cv.notify_all();
while !self.release_poll.load(Ordering::Acquire)
&& self.wake_calls.load(Ordering::Acquire) == 0
{
std::thread::yield_now();
}
Ok(0)
}
fn wake(&self) -> io::Result<()> {
self.wake_calls.fetch_add(1, Ordering::SeqCst);
Ok(())
}
fn registration_count(&self) -> usize {
0
}
}
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
fn render_interest_snapshot(interest: Option<Interest>) -> &'static str {
match interest {
Some(interest) if interest.is_readable() && interest.is_writable() => {
"readable|writable"
}
Some(interest) if interest.is_readable() => "readable",
Some(interest) if interest.is_writable() => "writable",
Some(_) => "empty",
None => "untracked",
}
}
fn render_ready_snapshot(event: &Event) -> &'static str {
match (event.is_readable(), event.is_writable()) {
(true, true) => "readable|writable",
(true, false) => "readable",
(false, true) => "writable",
(false, false) => "empty",
}
}
fn render_io_driver_metrics_snapshot(
stats: &IoStats,
waker_count: usize,
captured_events: &[String],
) -> String {
let mut snapshot = format!(
"polls: {}\n\
events_received: {}\n\
wakers_dispatched: {}\n\
unknown_tokens: {}\n\
registrations: {}\n\
deregistrations: {}\n\
live_wakers: {}\n",
stats.polls,
stats.events_received,
stats.wakers_dispatched,
stats.unknown_tokens,
stats.registrations,
stats.deregistrations,
waker_count
);
if !captured_events.is_empty() {
snapshot.push_str("events:\n");
for event in captured_events {
snapshot.push_str("- ");
snapshot.push_str(event);
snapshot.push('\n');
}
}
snapshot
}
#[test]
fn io_driver_new() {
init_test("io_driver_new");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriver::new(reactor);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::assert_with_log!(
driver.waker_count() == 0,
"waker count",
0usize,
driver.waker_count()
);
crate::assert_with_log!(
driver.stats().polls == 0,
"polls",
0usize,
driver.stats().polls
);
crate::test_complete!("io_driver_new");
}
#[test]
fn io_driver_with_capacity() {
init_test("io_driver_with_capacity");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriver::with_capacity(reactor, 256);
crate::assert_with_log!(
driver.events.capacity() == 256,
"events capacity",
256usize,
driver.events.capacity()
);
crate::assert_with_log!(
driver.interests.capacity() >= 256,
"interest map capacity",
true,
driver.interests.capacity() >= 256
);
crate::test_complete!("io_driver_with_capacity");
}
#[test]
fn io_driver_register_full_flow() {
init_test("io_driver_register_full_flow");
let reactor = Arc::new(LabReactor::new());
let mut driver = IoDriver::new(reactor);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let token = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
crate::assert_with_log!(
driver.waker_count() == 1,
"waker count",
1usize,
driver.waker_count()
);
crate::assert_with_log!(
!driver.is_empty(),
"driver not empty",
false,
driver.is_empty()
);
crate::assert_with_log!(
driver.stats().registrations == 1,
"registrations",
1usize,
driver.stats().registrations
);
crate::assert_with_log!(token.0 == 0, "token id", 0usize, token.0);
crate::test_complete!("io_driver_register_full_flow");
}
#[test]
fn io_driver_deregister() {
init_test("io_driver_deregister");
let reactor = Arc::new(LabReactor::new());
let mut driver = IoDriver::new(reactor);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let token = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
crate::assert_with_log!(
driver.waker_count() == 1,
"waker count",
1usize,
driver.waker_count()
);
driver.deregister(token).expect("deregister should succeed");
crate::assert_with_log!(
driver.waker_count() == 0,
"waker count",
0usize,
driver.waker_count()
);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::assert_with_log!(
driver.stats().deregistrations == 1,
"deregistrations",
1usize,
driver.stats().deregistrations
);
crate::test_complete!("io_driver_deregister");
}
#[test]
fn io_driver_deregister_not_found_cleans_waker() {
init_test("io_driver_deregister_not_found_cleans_waker");
let reactor = Arc::new(NotFoundReactor);
let mut driver = IoDriver::new(reactor);
let (waker, _) = create_test_waker();
let token = driver.register_waker(waker);
crate::assert_with_log!(
driver.waker_count() == 1,
"waker count",
1usize,
driver.waker_count()
);
let result = driver.deregister(token);
crate::assert_with_log!(result.is_ok(), "deregister ok", true, result.is_ok());
crate::assert_with_log!(
driver.waker_count() == 0,
"waker count",
0usize,
driver.waker_count()
);
crate::assert_with_log!(
driver.stats().deregistrations == 1,
"deregistrations",
1usize,
driver.stats().deregistrations
);
crate::test_complete!("io_driver_deregister_not_found_cleans_waker");
}
#[test]
fn io_driver_update_waker() {
init_test("io_driver_update_waker");
let reactor = Arc::new(LabReactor::new());
let mut driver = IoDriver::new(reactor);
let (waker1, _) = create_test_waker();
let (waker2, _) = create_test_waker();
let token = driver.register_waker(waker1);
let updated = driver.update_waker(token, waker2.clone());
crate::assert_with_log!(updated, "update succeeds", true, updated);
let updated_missing = driver.update_waker(Token::new(999), waker2);
crate::assert_with_log!(
!updated_missing,
"update missing fails",
false,
updated_missing
);
crate::test_complete!("io_driver_update_waker");
}
#[test]
fn io_registration_deregister_transient_error_returns_ok_after_retry() {
init_test("io_registration_deregister_transient_error_returns_ok_after_retry");
let reactor = Arc::new(FlakyReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let result = reg.deregister();
crate::assert_with_log!(result.is_ok(), "deregister succeeds", true, result.is_ok());
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
let calls = reactor.deregister_calls();
crate::assert_with_log!(calls == 2, "deregister retried", 2usize, calls);
crate::test_complete!("io_registration_deregister_transient_error_returns_ok_after_retry");
}
#[test]
fn io_registration_drop_retries_transient_deregister_error() {
init_test("io_registration_drop_retries_transient_deregister_error");
let reactor = Arc::new(FlakyReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
{
let reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
drop(reg);
}
let calls = reactor.deregister_calls();
crate::assert_with_log!(
calls == 2,
"drop path retries transient deregister failure",
2usize,
calls
);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::test_complete!("io_registration_drop_retries_transient_deregister_error");
}
#[test]
fn io_registration_deregister_persistent_error_returns_err() {
init_test("io_registration_deregister_persistent_error_returns_err");
let reactor = Arc::new(AlwaysFailReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let result = reg.deregister();
crate::assert_with_log!(
result.is_err(),
"persistent deregister failure surfaces error",
true,
result.is_err()
);
let calls = reactor.deregister_calls();
crate::assert_with_log!(calls == 4, "four total deregister attempts", 4usize, calls);
crate::assert_with_log!(
driver.is_empty(),
"driver cleans up local registration after persistent failure",
true,
driver.is_empty()
);
crate::test_complete!("io_registration_deregister_persistent_error_returns_err");
}
#[test]
fn io_registration_deregister_error_still_allows_drop_cleanup_success() {
init_test("io_registration_deregister_error_still_allows_drop_cleanup_success");
let reactor = Arc::new(ThirdTryReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let result = reg.deregister();
crate::assert_with_log!(
result.is_err(),
"explicit deregister still reports the two-attempt failure",
true,
result.is_err()
);
let was = reactor.was_deregistered();
crate::assert_with_log!(
was,
"drop cleanup gets a final successful deregister attempt",
true,
was
);
let calls = reactor.deregister_calls();
crate::assert_with_log!(
calls == 3,
"two explicit attempts plus one drop cleanup attempt",
3usize,
calls
);
crate::assert_with_log!(driver.is_empty(), "driver empty", true, driver.is_empty());
crate::test_complete!("io_registration_deregister_error_still_allows_drop_cleanup_success");
}
#[test]
fn io_driver_turn_dispatches_wakers() {
init_test("io_driver_turn_dispatches_wakers");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let (waker, waker_state) = create_test_waker();
let mut driver = IoDriver::new(reactor.clone());
let token = driver.register_waker(waker);
reactor
.register(&source, token, Interest::READABLE)
.expect("register should succeed");
reactor.inject_event(token, Event::readable(token), Duration::ZERO);
let initial = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(!initial, "waker not yet woken", false, initial);
let count = driver
.turn(Some(Duration::from_millis(10)))
.expect("turn should succeed");
crate::assert_with_log!(count == 1, "event count", 1usize, count);
let flag = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(flag, "waker fired", true, flag);
let wake_count = waker_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(wake_count == 1, "wake count", 1usize, wake_count);
crate::assert_with_log!(
driver.stats().polls == 1,
"polls",
1usize,
driver.stats().polls
);
crate::assert_with_log!(
driver.stats().events_received == 1,
"events received",
1usize,
driver.stats().events_received
);
crate::assert_with_log!(
driver.stats().wakers_dispatched == 1,
"wakers dispatched",
1usize,
driver.stats().wakers_dispatched
);
crate::assert_with_log!(
driver.stats().unknown_tokens == 0,
"unknown tokens",
0usize,
driver.stats().unknown_tokens
);
crate::test_complete!("io_driver_turn_dispatches_wakers");
}
#[test]
fn io_driver_turn_handles_unknown_tokens() {
init_test("io_driver_turn_handles_unknown_tokens");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let io_token = Token::new(999);
reactor
.register(&source, io_token, Interest::READABLE)
.expect("register should succeed");
reactor.inject_event(io_token, Event::readable(io_token), Duration::ZERO);
let mut driver = IoDriver::new(reactor);
let count = driver
.turn(Some(Duration::from_millis(10)))
.expect("turn should succeed");
crate::assert_with_log!(count == 1, "event count", 1usize, count);
crate::assert_with_log!(
driver.stats().events_received == 1,
"events received",
1usize,
driver.stats().events_received
);
crate::assert_with_log!(
driver.stats().wakers_dispatched == 0,
"wakers dispatched",
0usize,
driver.stats().wakers_dispatched
);
crate::assert_with_log!(
driver.stats().unknown_tokens == 1,
"unknown tokens",
1usize,
driver.stats().unknown_tokens
);
crate::test_complete!("io_driver_turn_handles_unknown_tokens");
}
#[test]
fn io_driver_stale_token_does_not_wake_new_waker() {
init_test("io_driver_stale_token_does_not_wake_new_waker");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let mut driver = IoDriver::new(reactor.clone());
let (waker1, _) = create_test_waker();
let token1 = driver.register_waker(waker1);
driver.deregister_waker(token1);
let (waker2, state2) = create_test_waker();
let token2 = driver.register_waker(waker2);
crate::assert_with_log!(token1 != token2, "token rotates", true, token1 != token2);
reactor
.register(&source, token1, Interest::READABLE)
.expect("register should succeed");
reactor.inject_event(token1, Event::readable(token1), Duration::ZERO);
let count = driver
.turn(Some(Duration::from_millis(10)))
.expect("turn should succeed");
crate::assert_with_log!(count == 1, "event count", 1usize, count);
let flag2 = state2.flag.load(Ordering::SeqCst);
crate::assert_with_log!(!flag2, "new waker not fired", false, flag2);
crate::assert_with_log!(
driver.stats().unknown_tokens == 1,
"unknown tokens",
1usize,
driver.stats().unknown_tokens
);
crate::test_complete!("io_driver_stale_token_does_not_wake_new_waker");
}
#[test]
fn metamorphic_cancelled_registration_handoff_releases_old_token() {
init_test("metamorphic_cancelled_registration_handoff_releases_old_token");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let mut driver = IoDriver::new(reactor.clone());
let (stale_waker, stale_state) = create_test_waker();
let stale_token = driver
.register(&source, Interest::READABLE, stale_waker)
.expect("stale register should succeed");
driver
.deregister(stale_token)
.expect("cancelling stale registration should succeed");
reactor
.register(&source, stale_token, Interest::READABLE)
.expect("manual stale-token registration should succeed");
reactor.inject_event(stale_token, Event::readable(stale_token), Duration::ZERO);
let (fresh_waker, fresh_state) = create_test_waker();
let fresh_token = driver
.register(&source, Interest::READABLE, fresh_waker)
.expect("fresh register should succeed");
reactor.inject_event(fresh_token, Event::readable(fresh_token), Duration::ZERO);
let count = driver
.turn(Some(Duration::from_millis(10)))
.expect("turn should succeed");
crate::assert_with_log!(
stale_token != fresh_token,
"fresh registration must receive a distinct token after cancellation",
true,
stale_token != fresh_token
);
crate::assert_with_log!(count == 2, "event count", 2usize, count);
let stale_wakes = stale_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(stale_wakes == 0, "stale wake count", 0usize, stale_wakes);
let fresh_wakes = fresh_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(fresh_wakes == 1, "fresh wake count", 1usize, fresh_wakes);
crate::assert_with_log!(
driver.stats().unknown_tokens == 1,
"stale event becomes unknown after cancellation",
1usize,
driver.stats().unknown_tokens
);
crate::assert_with_log!(
driver.waker_count() == 1,
"only the fresh registration remains live",
1usize,
driver.waker_count()
);
crate::test_complete!("metamorphic_cancelled_registration_handoff_releases_old_token");
}
#[test]
fn metamorphic_recycled_tokens_preserve_live_registration_bookkeeping() {
init_test("metamorphic_recycled_tokens_preserve_live_registration_bookkeeping");
let source = TestFdSource;
let control_reactor = Arc::new(LabReactor::new());
let mut control_driver = IoDriver::new(control_reactor.clone());
let (control_waker, control_state) = create_test_waker();
let control_token = control_driver
.register(&source, Interest::READABLE, control_waker)
.expect("control register should succeed");
control_reactor.inject_event(
control_token,
Event::readable(control_token),
Duration::ZERO,
);
let control_count = control_driver
.turn(Some(Duration::ZERO))
.expect("control turn should succeed");
let recycled_reactor = Arc::new(LabReactor::new());
let mut recycled_driver = IoDriver::new(recycled_reactor.clone());
let mut stale_tokens = Vec::new();
for cycle in 0..3 {
let (stale_waker, stale_state) = create_test_waker();
let stale_token = recycled_driver
.register(&source, Interest::READABLE, stale_waker)
.expect("stale register should succeed");
recycled_driver
.deregister(stale_token)
.expect("stale deregister should succeed");
crate::assert_with_log!(
stale_state.count.load(Ordering::SeqCst) == 0,
"recycled registration never wakes before removal",
0usize,
stale_state.count.load(Ordering::SeqCst)
);
crate::assert_with_log!(
recycled_driver.is_empty(),
"recycled driver empties after each deregister",
true,
recycled_driver.is_empty()
);
crate::assert_with_log!(
recycled_reactor.registration_count() == 0,
"reactor registrations return to baseline after recycle",
0usize,
recycled_reactor.registration_count()
);
crate::assert_with_log!(
!stale_tokens.contains(&stale_token),
"recycled token generation stays unique",
true,
!stale_tokens.contains(&stale_token)
);
stale_tokens.push(stale_token);
crate::assert_with_log!(
stale_tokens.len() == cycle + 1,
"recycled token tracked",
cycle + 1,
stale_tokens.len()
);
}
let (recycled_waker, recycled_state) = create_test_waker();
let recycled_live_token = recycled_driver
.register(&source, Interest::READABLE, recycled_waker)
.expect("recycled live register should succeed");
for stale_token in &stale_tokens {
recycled_reactor.inject_event(
*stale_token,
Event::readable(*stale_token),
Duration::ZERO,
);
}
recycled_reactor.inject_event(
recycled_live_token,
Event::readable(recycled_live_token),
Duration::ZERO,
);
let recycled_count = recycled_driver
.turn(Some(Duration::ZERO))
.expect("recycled turn should succeed");
let control_wakes = control_state.count.load(Ordering::SeqCst);
let recycled_wakes = recycled_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(
control_count == 1,
"control event count",
1usize,
control_count
);
crate::assert_with_log!(
recycled_count == stale_tokens.len() + 1,
"recycled turn sees stale tokens plus the live one",
stale_tokens.len() + 1,
recycled_count
);
crate::assert_with_log!(
recycled_live_token != control_token,
"recycled live token differs from the fresh control token",
true,
recycled_live_token != control_token
);
crate::assert_with_log!(
control_wakes == recycled_wakes,
"recycled stale history preserves live wake count",
control_wakes,
recycled_wakes
);
crate::assert_with_log!(
control_wakes == 1,
"live registration still wakes exactly once",
1usize,
control_wakes
);
crate::assert_with_log!(
recycled_driver.waker_count() == control_driver.waker_count(),
"recycled bookkeeping preserves live waker count",
control_driver.waker_count(),
recycled_driver.waker_count()
);
crate::assert_with_log!(
recycled_reactor.registration_count() == control_reactor.registration_count(),
"reactor bookkeeping matches a fresh live registration",
control_reactor.registration_count(),
recycled_reactor.registration_count()
);
crate::assert_with_log!(
recycled_driver.stats().wakers_dispatched == control_driver.stats().wakers_dispatched,
"recycled bookkeeping preserves live dispatch count",
control_driver.stats().wakers_dispatched,
recycled_driver.stats().wakers_dispatched
);
crate::assert_with_log!(
recycled_driver.stats().unknown_tokens == stale_tokens.len() as u64,
"recycled stale tokens degrade to unknown events only",
stale_tokens.len() as u64,
recycled_driver.stats().unknown_tokens
);
crate::test_complete!("metamorphic_recycled_tokens_preserve_live_registration_bookkeeping");
}
#[test]
fn io_driver_wake() {
init_test("io_driver_wake");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriver::new(reactor.clone());
driver.wake().expect("wake should succeed");
let woke = reactor.check_and_clear_wake();
crate::assert_with_log!(woke, "reactor woke", true, woke);
crate::test_complete!("io_driver_wake");
}
#[test]
fn io_driver_multiple_wakers() {
init_test("io_driver_multiple_wakers");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let mut driver = IoDriver::new(reactor.clone());
let (waker1, state1) = create_test_waker();
let (waker2, state2) = create_test_waker();
let (waker3, state3) = create_test_waker();
let token1 = driver.register_waker(waker1);
let token2 = driver.register_waker(waker2);
let token3 = driver.register_waker(waker3);
crate::assert_with_log!(
driver.waker_count() == 3,
"waker count",
3usize,
driver.waker_count()
);
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor
.register(&source, token3, Interest::READABLE)
.unwrap();
reactor.inject_event(token1, Event::readable(token1), Duration::ZERO);
reactor.inject_event(token3, Event::readable(token3), Duration::ZERO);
let count = driver
.turn(Some(Duration::from_millis(10)))
.expect("turn should succeed");
crate::assert_with_log!(count == 2, "event count", 2usize, count);
let flag1 = state1.flag.load(Ordering::SeqCst);
let flag2 = state2.flag.load(Ordering::SeqCst);
let flag3 = state3.flag.load(Ordering::SeqCst);
crate::assert_with_log!(flag1, "waker1 fired", true, flag1);
crate::assert_with_log!(!flag2, "waker2 not fired", false, flag2);
crate::assert_with_log!(flag3, "waker3 fired", true, flag3);
crate::assert_with_log!(
driver.stats().wakers_dispatched == 2,
"wakers dispatched",
2usize,
driver.stats().wakers_dispatched
);
crate::test_complete!("io_driver_multiple_wakers");
}
#[test]
fn metamorphic_duplicate_events_wake_once_per_registration() {
init_test("metamorphic_duplicate_events_wake_once_per_registration");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let mut driver = IoDriver::new(reactor.clone());
let (waker, state) = create_test_waker();
let token = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
reactor.inject_event(token, Event::readable(token), Duration::ZERO);
reactor.inject_event(token, Event::readable(token), Duration::ZERO);
let count = driver
.turn(Some(Duration::from_millis(10)))
.expect("turn should succeed");
crate::assert_with_log!(count == 2, "event count", 2usize, count);
let wake_count = state.count.load(Ordering::SeqCst);
crate::assert_with_log!(
wake_count == 1,
"duplicate events wake once per registration",
1usize,
wake_count
);
crate::assert_with_log!(
driver.stats().wakers_dispatched == 1,
"wakers dispatched",
1usize,
driver.stats().wakers_dispatched
);
crate::test_complete!("metamorphic_duplicate_events_wake_once_per_registration");
}
#[test]
fn io_driver_reactor_metrics_snapshot() {
init_test("io_driver_reactor_metrics_snapshot");
let reactor = Arc::new(LabReactor::new());
let source = TestFdSource;
let mut driver = IoDriver::new(reactor.clone());
let (readable_waker, readable_state) = create_test_waker();
let readable_token = driver
.register(&source, Interest::READABLE, readable_waker)
.expect("readable register should succeed");
let (writable_waker, writable_state) = create_test_waker();
let writable_token = driver
.register(&source, Interest::WRITABLE, writable_waker)
.expect("writable register should succeed");
let unknown_token = Token::new(77);
reactor
.register(&source, unknown_token, Interest::READABLE)
.expect("manual unknown-token register should succeed");
reactor.inject_event(
readable_token,
Event::readable(readable_token),
Duration::ZERO,
);
reactor.inject_event(
writable_token,
Event::writable(writable_token),
Duration::ZERO,
);
reactor.inject_event(
unknown_token,
Event::readable(unknown_token),
Duration::ZERO,
);
let mut captured_events = Vec::new();
let first_turn = driver
.turn_with(Some(Duration::ZERO), |event, interest| {
captured_events.push(format!(
"turn=1 token={} ready={} interest={}",
event.token.0,
render_ready_snapshot(event),
render_interest_snapshot(interest)
));
})
.expect("first turn should succeed");
crate::assert_with_log!(first_turn == 3, "first turn count", 3usize, first_turn);
driver
.deregister(writable_token)
.expect("writable deregister should succeed");
reactor.inject_event(
readable_token,
Event::readable(readable_token),
Duration::ZERO,
);
reactor.inject_event(
readable_token,
Event::readable(readable_token),
Duration::ZERO,
);
let second_turn = driver
.turn_with(Some(Duration::ZERO), |event, interest| {
captured_events.push(format!(
"turn=2 token={} ready={} interest={}",
event.token.0,
render_ready_snapshot(event),
render_interest_snapshot(interest)
));
})
.expect("second turn should succeed");
crate::assert_with_log!(second_turn == 2, "second turn count", 2usize, second_turn);
let readable_wakes = readable_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(
readable_wakes == 2,
"readable waker fires once per turn",
2usize,
readable_wakes
);
let writable_wakes = writable_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(
writable_wakes == 1,
"writable waker fires once before deregister",
1usize,
writable_wakes
);
insta::assert_snapshot!(
"io_driver_reactor_metrics_snapshot",
render_io_driver_metrics_snapshot(
driver.stats(),
driver.waker_count(),
&captured_events
)
);
}
#[test]
fn io_driver_handle_turn_with_poll_error_does_not_dispatch() {
init_test("io_driver_handle_turn_with_poll_error_does_not_dispatch");
let reactor = Arc::new(PollErrorWithEventReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let (waker, waker_state) = create_test_waker();
let token = {
let mut guard = driver.lock();
guard.register_waker(waker)
};
reactor.set_emit_token(token);
let result = driver.turn_with(Some(Duration::ZERO), |_event, _interest| {});
crate::assert_with_log!(
result.is_err(),
"turn_with propagates poll error",
true,
result.is_err()
);
let fired = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(!fired, "waker not fired", false, fired);
let wake_count = waker_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(wake_count == 0, "wake count", 0usize, wake_count);
let stats = driver.stats();
crate::assert_with_log!(stats.polls == 0, "polls", 0usize, stats.polls);
crate::assert_with_log!(
stats.events_received == 0,
"events received",
0usize,
stats.events_received
);
crate::assert_with_log!(
stats.wakers_dispatched == 0,
"wakers dispatched",
0usize,
stats.wakers_dispatched
);
crate::assert_with_log!(
stats.unknown_tokens == 0,
"unknown tokens",
0usize,
stats.unknown_tokens
);
crate::assert_with_log!(
driver.waker_count() == 1,
"waker remains registered",
1usize,
driver.waker_count()
);
crate::test_complete!("io_driver_handle_turn_with_poll_error_does_not_dispatch");
}
#[test]
fn io_driver_handle_turn_with_skips_concurrent_poll() {
init_test("io_driver_handle_turn_with_skips_concurrent_poll");
let reactor = Arc::new(BlockingFirstPollReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let driver_clone = driver.clone();
let join = std::thread::spawn(move || {
let result = driver_clone.try_turn_with(Some(Duration::ZERO), |_event, _interest| {});
crate::assert_with_log!(
matches!(result, Ok(Some(0))),
"leader poll completes",
true,
matches!(result, Ok(Some(0)))
);
});
reactor.wait_until_first_poll_started();
let busy_turn = driver
.turn_with(Some(Duration::ZERO), |_event, _interest| {})
.expect("turn_with should return Ok when busy");
crate::assert_with_log!(busy_turn == 0, "busy turn returns zero", 0usize, busy_turn);
crate::assert_with_log!(
reactor.poll_calls() == 1,
"no second concurrent poll",
1usize,
reactor.poll_calls()
);
reactor.release_first_poll();
join.join().expect("poll thread should join");
crate::test_complete!("io_driver_handle_turn_with_skips_concurrent_poll");
}
#[test]
fn io_driver_handle_register_wakes_inflight_poll() {
init_test("io_driver_handle_register_wakes_inflight_poll");
let reactor = Arc::new(WakeTrackingBlockingReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let driver_clone = driver.clone();
let join = std::thread::spawn(move || {
let result = driver_clone.try_turn_with(Some(Duration::ZERO), |_event, _interest| {});
crate::assert_with_log!(
matches!(result, Ok(Some(0))),
"leader poll completes",
true,
matches!(result, Ok(Some(0)))
);
});
reactor.wait_until_poll_started();
let source = TestFdSource;
let (waker, _) = create_test_waker();
let reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let wake_calls = reactor.wake_calls();
crate::assert_with_log!(
wake_calls >= 1,
"register wakes in-flight poll",
true,
wake_calls >= 1
);
reactor.release_poll();
join.join().expect("poll thread should join");
drop(reg);
crate::test_complete!("io_driver_handle_register_wakes_inflight_poll");
}
#[test]
fn io_driver_handle_register_preemptively_wakes_reactor() {
init_test("io_driver_handle_register_preemptively_wakes_reactor");
let reactor = Arc::new(WakeTrackingBlockingReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let wake_calls = reactor.wake_calls();
crate::assert_with_log!(
wake_calls >= 1,
"register preemptively wakes reactor",
true,
wake_calls >= 1
);
drop(reg);
crate::test_complete!("io_driver_handle_register_preemptively_wakes_reactor");
}
#[test]
fn io_registration_rearm_wakes_inflight_poll() {
init_test("io_registration_rearm_wakes_inflight_poll");
let reactor = Arc::new(WakeTrackingBlockingReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let mut reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let driver_clone = driver.clone();
let join = std::thread::spawn(move || {
let result = driver_clone.try_turn_with(Some(Duration::ZERO), |_event, _interest| {});
crate::assert_with_log!(
matches!(result, Ok(Some(0))),
"leader poll completes",
true,
matches!(result, Ok(Some(0)))
);
});
reactor.wait_until_poll_started();
let (new_waker, _) = create_test_waker();
reg.rearm(Interest::READABLE, &new_waker)
.expect("rearm should succeed");
let wake_calls = reactor.wake_calls();
crate::assert_with_log!(
wake_calls >= 1,
"rearm wakes in-flight poll",
true,
wake_calls >= 1
);
reactor.release_poll();
join.join().expect("poll thread should join");
drop(reg);
crate::assert_with_log!(
driver.is_empty(),
"registration cleaned up after rearm test",
true,
driver.is_empty()
);
crate::test_complete!("io_registration_rearm_wakes_inflight_poll");
}
#[test]
fn io_registration_rearm_preemptively_wakes_reactor() {
init_test("io_registration_rearm_preemptively_wakes_reactor");
let reactor = Arc::new(WakeTrackingBlockingReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = IoDriverHandle::new(reactor_handle);
let source = TestFdSource;
let (waker, _) = create_test_waker();
let mut reg = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
let baseline = reactor.wake_calls();
let (new_waker, _) = create_test_waker();
reg.rearm(Interest::READABLE, &new_waker)
.expect("rearm should succeed");
let wake_calls = reactor.wake_calls();
crate::assert_with_log!(
wake_calls > baseline,
"rearm preemptively wakes reactor",
true,
wake_calls > baseline
);
crate::test_complete!("io_registration_rearm_preemptively_wakes_reactor");
}
#[test]
fn metamorphic_concurrent_registrations_preserve_token_uniqueness() {
init_test("metamorphic_concurrent_registrations_preserve_token_uniqueness");
let reactor = Arc::new(LabReactor::new());
let reactor_handle: Arc<dyn Reactor> = reactor.clone();
let driver = Arc::new(IoDriverHandle::new(reactor_handle));
let registrations = 12usize;
let start = Arc::new(Barrier::new(registrations + 1));
let release = Arc::new(Barrier::new(registrations + 1));
let (tx, rx) = std::sync::mpsc::channel();
let handles: Vec<_> = (0..registrations)
.map(|_| {
let driver = Arc::clone(&driver);
let start = Arc::clone(&start);
let release = Arc::clone(&release);
let tx = tx.clone();
std::thread::spawn(move || {
let source = TestFdSource;
let (waker, _) = create_test_waker();
start.wait();
let registration = driver
.register(&source, Interest::READABLE, waker)
.expect("register should succeed");
tx.send(registration.token())
.expect("token send should succeed");
release.wait();
drop(registration);
})
})
.collect();
drop(tx);
start.wait();
let tokens: Vec<_> = rx.iter().take(registrations).collect();
let unique: HashSet<_> = tokens.iter().copied().collect();
crate::assert_with_log!(
tokens.len() == registrations,
"registration count",
registrations,
tokens.len()
);
crate::assert_with_log!(
unique.len() == registrations,
"concurrent registrations keep tokens unique",
registrations,
unique.len()
);
crate::assert_with_log!(
driver.waker_count() == registrations,
"all concurrent registrations stay live until release",
registrations,
driver.waker_count()
);
release.wait();
for handle in handles {
handle.join().expect("registration thread should join");
}
crate::assert_with_log!(
driver.is_empty(),
"dropping concurrent registrations should clean up all tokens",
true,
driver.is_empty()
);
crate::test_complete!("metamorphic_concurrent_registrations_preserve_token_uniqueness");
}
#[test]
fn metamorphic_concurrent_register_drop_returns_to_zero_bookkeeping() {
init_test("metamorphic_concurrent_register_drop_returns_to_zero_bookkeeping");
let registrations = 8usize;
let source = TestFdSource;
let control_reactor = Arc::new(LabReactor::new());
let control_handle: Arc<dyn Reactor> = control_reactor.clone();
let control_driver = IoDriverHandle::new(control_handle);
for _ in 0..registrations {
let (waker, _) = create_test_waker();
let registration = control_driver
.register(&source, Interest::READABLE, waker)
.expect("sequential register should succeed");
drop(registration);
}
let control_stats = control_driver.stats();
let concurrent_reactor = Arc::new(LabReactor::new());
let concurrent_handle: Arc<dyn Reactor> = concurrent_reactor.clone();
let concurrent_driver = Arc::new(IoDriverHandle::new(concurrent_handle));
let start = Arc::new(Barrier::new(registrations + 1));
let release = Arc::new(Barrier::new(registrations + 1));
let handles: Vec<_> = (0..registrations)
.map(|_| {
let driver = Arc::clone(&concurrent_driver);
let start = Arc::clone(&start);
let release = Arc::clone(&release);
std::thread::spawn(move || {
let source = TestFdSource;
let (waker, _) = create_test_waker();
start.wait();
let registration = driver
.register(&source, Interest::READABLE, waker)
.expect("concurrent register should succeed");
release.wait();
drop(registration);
})
})
.collect();
start.wait();
release.wait();
for handle in handles {
handle.join().expect("registration thread should join");
}
let concurrent_stats = concurrent_driver.stats();
crate::assert_with_log!(
control_driver.is_empty(),
"sequential control returns to zero live registrations",
true,
control_driver.is_empty()
);
crate::assert_with_log!(
concurrent_driver.is_empty(),
"concurrent register/drop returns to zero live registrations",
true,
concurrent_driver.is_empty()
);
crate::assert_with_log!(
control_reactor.registration_count() == 0,
"sequential control returns reactor bookkeeping to zero",
0usize,
control_reactor.registration_count()
);
crate::assert_with_log!(
concurrent_reactor.registration_count() == 0,
"concurrent register/drop returns reactor bookkeeping to zero",
0usize,
concurrent_reactor.registration_count()
);
crate::assert_with_log!(
concurrent_stats.registrations == control_stats.registrations,
"concurrent register/drop preserves total registration count",
control_stats.registrations,
concurrent_stats.registrations
);
crate::assert_with_log!(
concurrent_stats.deregistrations == control_stats.deregistrations,
"concurrent register/drop preserves total deregistration count",
control_stats.deregistrations,
concurrent_stats.deregistrations
);
crate::assert_with_log!(
concurrent_stats.registrations == registrations as u64,
"all concurrent registrations are tracked",
registrations as u64,
concurrent_stats.registrations
);
crate::assert_with_log!(
concurrent_stats.deregistrations == registrations as u64,
"all concurrent drops are tracked",
registrations as u64,
concurrent_stats.deregistrations
);
crate::test_complete!("metamorphic_concurrent_register_drop_returns_to_zero_bookkeeping");
}
#[test]
fn io_driver_debug() {
init_test("io_driver_debug");
let reactor = Arc::new(LabReactor::new());
let driver = IoDriver::new(reactor);
let debug_text = format!("{driver:?}");
crate::assert_with_log!(
debug_text.contains("IoDriver"),
"debug contains type",
true,
debug_text.contains("IoDriver")
);
crate::assert_with_log!(
debug_text.contains("waker_count"),
"debug contains waker_count",
true,
debug_text.contains("waker_count")
);
crate::test_complete!("io_driver_debug");
}
#[test]
fn io_stats_default() {
init_test("io_stats_default");
let stats = IoStats::default();
crate::assert_with_log!(stats.polls == 0, "polls", 0usize, stats.polls);
crate::assert_with_log!(
stats.events_received == 0,
"events received",
0usize,
stats.events_received
);
crate::assert_with_log!(
stats.wakers_dispatched == 0,
"wakers dispatched",
0usize,
stats.wakers_dispatched
);
crate::assert_with_log!(
stats.unknown_tokens == 0,
"unknown tokens",
0usize,
stats.unknown_tokens
);
crate::assert_with_log!(
stats.registrations == 0,
"registrations",
0usize,
stats.registrations
);
crate::assert_with_log!(
stats.deregistrations == 0,
"deregistrations",
0usize,
stats.deregistrations
);
crate::test_complete!("io_stats_default");
}
#[cfg(target_os = "linux")]
mod epoll_integration {
use super::*;
use crate::runtime::reactor::EpollReactor;
use std::io::Write;
use std::os::unix::net::UnixStream;
#[test]
fn io_driver_with_epoll_reactor_dispatches_waker() {
super::init_test("io_driver_with_epoll_reactor_dispatches_waker");
let reactor = Arc::new(EpollReactor::new().expect("create reactor"));
let mut driver = IoDriver::new(reactor);
let (sock_read, mut sock_write) = UnixStream::pair().expect("create socket pair");
let (waker, waker_state) = create_test_waker();
let token = driver
.register(&sock_read, Interest::READABLE, waker)
.expect("register should succeed");
let initial = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(!initial, "waker not yet woken", false, initial);
sock_write.write_all(b"hello").expect("write failed");
let count = driver
.turn(Some(Duration::from_millis(100)))
.expect("turn should succeed");
crate::assert_with_log!(count >= 1, "event count", true, count >= 1);
let flag = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(flag, "waker fired", true, flag);
let wake_count = waker_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(wake_count == 1, "wake count", 1usize, wake_count);
driver.deregister(token).expect("deregister should succeed");
crate::test_complete!("io_driver_with_epoll_reactor_dispatches_waker");
}
#[test]
fn io_driver_with_epoll_reactor_writable() {
super::init_test("io_driver_with_epoll_reactor_writable");
let reactor = Arc::new(EpollReactor::new().expect("create reactor"));
let mut driver = IoDriver::new(reactor);
let (sock1, _sock2) = UnixStream::pair().expect("create socket pair");
let (waker, waker_state) = create_test_waker();
let token = driver
.register(&sock1, Interest::WRITABLE, waker)
.expect("register should succeed");
let count = driver
.turn(Some(Duration::from_millis(100)))
.expect("turn should succeed");
crate::assert_with_log!(count >= 1, "event count", true, count >= 1);
let flag = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(flag, "waker fired", true, flag);
driver.deregister(token).expect("deregister should succeed");
crate::test_complete!("io_driver_with_epoll_reactor_writable");
}
}
#[test]
fn io_stats_debug_clone_default() {
let s = IoStats::default();
let dbg = format!("{s:?}");
assert!(dbg.contains("IoStats"));
let s2 = s;
assert_eq!(s2.polls, 0);
assert_eq!(s2.events_received, 0);
assert_eq!(s2.registrations, 0);
}
}