use super::{Event, Interest, Reactor, Source, Token};
use crate::lab::chaos::{ChaosConfig, ChaosRng, ChaosStats};
use crate::tracing_compat::debug;
use crate::types::Time;
use parking_lot::Mutex;
use std::collections::{BTreeSet, BinaryHeap, HashMap};
use std::io;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;
fn duration_to_nanos_saturating(duration: Duration) -> u64 {
duration.as_nanos().min(u128::from(u64::MAX)) as u64
}
#[derive(Debug, PartialEq, Eq)]
struct TimedEvent {
time: Time,
sequence: u64,
event: Event,
delayed: bool,
}
impl PartialOrd for TimedEvent {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl Ord for TimedEvent {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
other
.time
.cmp(&self.time)
.then_with(|| other.sequence.cmp(&self.sequence))
}
}
#[derive(Debug, Clone, Default)]
pub struct FaultConfig {
pub pending_error: Option<io::ErrorKind>,
pub closed: bool,
pub partitioned: bool,
pub error_probability: f64,
pub error_kinds: Vec<io::ErrorKind>,
}
impl FaultConfig {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn with_error_probability(mut self, prob: f64) -> Self {
self.error_probability = prob.clamp(0.0, 1.0);
self
}
#[must_use]
pub fn with_error_kinds(mut self, kinds: Vec<io::ErrorKind>) -> Self {
self.error_kinds = kinds;
self
}
#[must_use]
pub fn with_partitioned(mut self, partitioned: bool) -> Self {
self.partitioned = partitioned;
self
}
#[must_use]
pub fn with_pending_error(mut self, kind: io::ErrorKind) -> Self {
self.pending_error = Some(kind);
self
}
#[must_use]
pub fn with_closed(mut self, closed: bool) -> Self {
self.closed = closed;
self
}
}
#[derive(Debug)]
struct FaultState {
config: FaultConfig,
rng: ChaosRng,
last_error_kind: Option<io::ErrorKind>,
injected_error_count: u64,
injected_close_count: u64,
dropped_event_count: u64,
}
impl FaultState {
fn new(token: Token, config: FaultConfig) -> Self {
let seed = token.0 as u64;
Self {
config,
rng: ChaosRng::new(seed),
last_error_kind: None,
injected_error_count: 0,
injected_close_count: 0,
dropped_event_count: 0,
}
}
fn should_inject_random_error(&mut self) -> bool {
let prob = self.config.error_probability;
if prob <= 0.0 || self.config.error_kinds.is_empty() {
return false;
}
if prob >= 1.0 {
return true;
}
self.rng.next_f64() < prob
}
fn next_error_kind(&mut self) -> Option<io::ErrorKind> {
if self.config.error_kinds.is_empty() {
return None;
}
let idx = (self.rng.next_u64() as usize) % self.config.error_kinds.len();
Some(self.config.error_kinds[idx])
}
}
#[derive(Debug)]
struct VirtualSocket {
interest: Interest,
fault: Option<FaultState>,
}
#[derive(Debug)]
struct LabChaos {
config: ChaosConfig,
rng: ChaosRng,
stats: ChaosStats,
last_io_error_kind: Option<io::ErrorKind>,
}
impl LabChaos {
fn new(config: ChaosConfig) -> Self {
Self {
rng: ChaosRng::from_config(&config),
stats: ChaosStats::new(),
last_io_error_kind: None,
config,
}
}
}
#[derive(Debug)]
pub struct LabReactor {
inner: Mutex<LabInner>,
woken: AtomicBool,
}
#[derive(Debug)]
struct LabInner {
sockets: HashMap<Token, VirtualSocket>,
pending: BinaryHeap<TimedEvent>,
time: Time,
next_sequence: u64,
chaos: Option<LabChaos>,
}
impl LabReactor {
#[must_use]
pub fn new() -> Self {
Self {
inner: Mutex::new(LabInner {
sockets: HashMap::new(),
pending: BinaryHeap::new(),
time: Time::ZERO,
next_sequence: 0,
chaos: None,
}),
woken: AtomicBool::new(false),
}
}
#[must_use]
pub fn with_chaos(config: ChaosConfig) -> Self {
Self {
inner: Mutex::new(LabInner {
sockets: HashMap::new(),
pending: BinaryHeap::new(),
time: Time::ZERO,
next_sequence: 0,
chaos: Some(LabChaos::new(config)),
}),
woken: AtomicBool::new(false),
}
}
pub fn inject_event(&self, token: Token, mut event: Event, delay: Duration) {
let mut inner = self.inner.lock();
let time = inner
.time
.saturating_add_nanos(duration_to_nanos_saturating(delay));
let sequence = inner.next_sequence;
inner.next_sequence = inner
.next_sequence
.checked_add(1)
.expect("lab reactor sequence counter exhausted");
event.token = token;
inner.pending.push(TimedEvent {
time,
sequence,
event,
delayed: false,
});
}
pub fn schedule_event(&self, token: Token, event: Event, delay: Duration) {
self.inject_event(token, event, delay);
}
pub fn set_ready(&self, token: Token, event: Event) {
self.inject_event(token, event, Duration::ZERO);
}
#[must_use]
pub fn now(&self) -> Time {
self.inner.lock().time
}
#[must_use]
pub fn next_event_time(&self) -> Option<Time> {
let inner = self.inner.lock();
inner.pending.peek().map(|event| event.time)
}
pub fn advance_time(&self, duration: Duration) {
let mut inner = self.inner.lock();
inner.time = inner
.time
.saturating_add_nanos(duration_to_nanos_saturating(duration));
}
pub fn advance_time_to(&self, target: Time) {
let mut inner = self.inner.lock();
if target > inner.time {
inner.time = target;
}
}
#[must_use]
pub fn chaos_stats(&self) -> ChaosStats {
self.inner
.lock()
.chaos
.as_ref()
.map_or_else(ChaosStats::new, |chaos| chaos.stats.clone())
}
#[must_use]
pub fn last_io_error_kind(&self) -> Option<io::ErrorKind> {
self.inner
.lock()
.chaos
.as_ref()
.and_then(|chaos| chaos.last_io_error_kind)
}
pub fn check_and_clear_wake(&self) -> bool {
self.woken.swap(false, Ordering::AcqRel)
}
pub fn set_fault_config(&self, token: Token, config: FaultConfig) -> io::Result<()> {
let mut inner = self.inner.lock();
match inner.sockets.get_mut(&token) {
Some(socket) => {
socket.fault = Some(FaultState::new(token, config));
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
)),
}
}
pub fn clear_fault_config(&self, token: Token) -> io::Result<()> {
let mut inner = self.inner.lock();
match inner.sockets.get_mut(&token) {
Some(socket) => {
socket.fault = None;
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
)),
}
}
pub fn inject_error(&self, token: Token, kind: io::ErrorKind) -> io::Result<()> {
let mut inner = self.inner.lock();
match inner.sockets.get_mut(&token) {
Some(socket) => {
if let Some(ref mut fault) = socket.fault {
fault.config.pending_error = Some(kind);
} else {
let mut config = FaultConfig::new();
config.pending_error = Some(kind);
socket.fault = Some(FaultState::new(token, config));
}
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
)),
}
}
pub fn inject_close(&self, token: Token) -> io::Result<()> {
let mut inner = self.inner.lock();
if !inner.sockets.contains_key(&token) {
return Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
));
}
if let Some(socket) = inner.sockets.get_mut(&token) {
if let Some(ref mut fault) = socket.fault {
fault.config.closed = true;
fault.injected_close_count += 1;
} else {
let config = FaultConfig::new().with_closed(true);
let mut fault_state = FaultState::new(token, config);
fault_state.injected_close_count = 1;
socket.fault = Some(fault_state);
}
}
debug!(
target: "fault",
token = token.0,
injection = "close",
"injected connection close"
);
drop(inner);
Ok(())
}
pub fn partition(&self, token: Token, partitioned: bool) -> io::Result<()> {
let mut inner = self.inner.lock();
match inner.sockets.get_mut(&token) {
Some(socket) => {
if let Some(ref mut fault) = socket.fault {
fault.config.partitioned = partitioned;
} else if partitioned {
let config = FaultConfig::new().with_partitioned(true);
socket.fault = Some(FaultState::new(token, config));
}
debug!(
target: "fault",
token = token.0,
partitioned = partitioned,
"partition state changed"
);
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
)),
}
}
pub fn last_injected_error(&self, token: Token) -> Option<io::ErrorKind> {
let inner = self.inner.lock();
inner
.sockets
.get(&token)
.and_then(|s| s.fault.as_ref())
.and_then(|f| f.last_error_kind)
}
pub fn fault_stats(&self, token: Token) -> Option<(u64, u64, u64)> {
let inner = self.inner.lock();
inner.sockets.get(&token).and_then(|s| {
s.fault.as_ref().map(|f| {
(
f.injected_error_count,
f.injected_close_count,
f.dropped_event_count,
)
})
})
}
}
impl Default for LabReactor {
fn default() -> Self {
Self::new()
}
}
impl Reactor for LabReactor {
fn register(&self, _source: &dyn Source, token: Token, interest: Interest) -> io::Result<()> {
let mut inner = self.inner.lock();
if inner.sockets.contains_key(&token) {
return Err(io::Error::new(
io::ErrorKind::AlreadyExists,
"token already registered",
));
}
inner.sockets.insert(
token,
VirtualSocket {
interest,
fault: None,
},
);
drop(inner);
Ok(())
}
fn modify(&self, token: Token, interest: Interest) -> io::Result<()> {
let mut inner = self.inner.lock();
match inner.sockets.get_mut(&token) {
Some(socket) => {
socket.interest = interest;
Ok(())
}
None => Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
)),
}
}
fn deregister(&self, token: Token) -> io::Result<()> {
let mut inner = self.inner.lock();
if inner.sockets.remove(&token).is_none() {
drop(inner);
return Err(io::Error::new(
io::ErrorKind::NotFound,
"token not registered",
));
}
let old_pending = std::mem::take(&mut inner.pending);
inner.pending = old_pending
.into_iter()
.filter(|te| te.event.token != token)
.collect();
drop(inner);
Ok(())
}
#[allow(clippy::too_many_lines, clippy::significant_drop_tightening)]
fn poll(&self, events: &mut super::Events, timeout: Option<Duration>) -> io::Result<usize> {
let was_woken = self.woken.swap(false, Ordering::AcqRel);
events.clear();
let delivered_events = {
let mut inner = self.inner.lock();
let current_time = inner.time;
let timeout_deadline = timeout.map(|duration| {
current_time.saturating_add_nanos(duration_to_nanos_saturating(duration))
});
let next_event_time = inner.pending.peek().map(|timed| timed.time);
let target_time = if was_woken {
current_time
} else {
match (timeout_deadline, next_event_time) {
(Some(deadline), Some(next)) => deadline.min(next),
(Some(deadline), None) => deadline,
(None, Some(next)) => next,
(None, None) => current_time,
}
};
if target_time > inner.time {
inner.time = target_time;
}
let mut ready_events = Vec::new();
let mut delivered_events = Vec::new();
while let Some(te) = inner.pending.peek() {
if te.time <= inner.time {
let te = inner.pending.pop().expect("pending timer array is empty");
if inner.sockets.contains_key(&te.event.token) {
ready_events.push(te);
}
} else {
break;
}
}
{
let LabInner {
sockets,
pending,
next_sequence,
chaos,
time: _,
} = &mut *inner;
let mut closed_tokens_emitted = BTreeSet::new();
for timed in ready_events {
let event = timed.event;
let token = event.token;
let Some(socket) = sockets.get_mut(&token) else {
continue;
};
let registered_interest = socket.interest;
if let Some(ref mut fault) = socket.fault {
if fault.config.partitioned {
fault.dropped_event_count += 1;
debug!(
target: "fault",
token = token.0,
injection = "partition_drop",
"event dropped due to partition"
);
continue;
}
if fault.config.closed {
if closed_tokens_emitted.insert(token) {
delivered_events.push(Event::hangup(token));
}
continue;
}
let mut injected_error = fault.config.pending_error.take();
if let Some(kind) = injected_error {
fault.last_error_kind = Some(kind);
fault.injected_error_count += 1;
debug!(
target: "fault",
token = token.0,
injection = "pending_error",
error_kind = ?kind,
"injected pending error"
);
}
if injected_error.is_none() && fault.should_inject_random_error() {
if let Some(kind) = fault.next_error_kind() {
fault.last_error_kind = Some(kind);
fault.injected_error_count += 1;
debug!(
target: "fault",
token = token.0,
injection = "random_error",
error_kind = ?kind,
"injected random error"
);
injected_error = Some(kind);
}
}
if injected_error.is_some() {
delivered_events.push(Event::errored(token));
continue;
}
}
let delivered = if let Some(chaos) = chaos.as_mut() {
let config = &chaos.config;
if !timed.delayed && chaos.rng.should_inject_delay(config) {
let delay = chaos.rng.next_delay(config);
if !delay.is_zero() {
let sequence = *next_sequence;
*next_sequence = next_sequence
.checked_add(1)
.expect("lab reactor sequence counter exhausted");
let delayed_time = timed
.time
.saturating_add_nanos(duration_to_nanos_saturating(delay));
pending.push(TimedEvent {
time: delayed_time,
sequence,
event,
delayed: true,
});
chaos.stats.record_delay(delay);
debug!(
target: "chaos",
token = token.0,
injection = "io_delay",
delay_ns = duration_to_nanos_saturating(delay)
);
continue;
}
}
let mut injected = false;
let mut delivered_event = event;
if chaos.rng.should_inject_io_error(config) {
if let Some(kind) = chaos.rng.next_io_error_kind(config) {
delivered_event = Event::errored(token);
chaos.last_io_error_kind = Some(kind);
chaos.stats.record_io_error();
debug!(
target: "chaos",
token = token.0,
injection = "io_error",
error_kind = ?kind
);
injected = true;
}
}
if !injected {
chaos.stats.record_no_injection();
}
Some(delivered_event)
} else {
Some(event)
};
if let Some(delivered_event) = delivered {
let mut ready = delivered_event.ready & registered_interest;
if delivered_event.is_error() {
ready = ready.add(Interest::ERROR);
}
if delivered_event.is_hangup() {
ready = ready.add(Interest::HUP);
}
if !ready.is_empty() {
delivered_events.push(Event::new(token, ready));
}
}
}
let mut closed_hup_tokens: Vec<Token> = sockets
.iter()
.filter_map(|(&token, socket)| {
let fault = socket.fault.as_ref()?;
if fault.config.partitioned || !fault.config.closed {
return None;
}
Some(token)
})
.collect();
closed_hup_tokens.sort();
for token in closed_hup_tokens {
if closed_tokens_emitted.insert(token) {
delivered_events.push(Event::hangup(token));
}
}
}
delivered_events
};
for event in delivered_events {
events.push(event);
}
Ok(events.len())
}
fn wake(&self) -> io::Result<()> {
self.woken.store(true, Ordering::Release);
Ok(())
}
fn registration_count(&self) -> usize {
self.inner.lock().sockets.len()
}
}
#[cfg(all(test, unix))]
mod tests {
use super::*;
use crate::test_utils::init_test_logging;
struct TestFdSource;
impl std::os::fd::AsRawFd for TestFdSource {
fn as_raw_fd(&self) -> std::os::fd::RawFd {
0
}
}
fn init_test(name: &str) {
init_test_logging();
crate::test_phase!(name);
}
#[test]
fn delivers_injected_event() {
init_test("delivers_injected_event");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::readable())
.unwrap();
reactor.inject_event(token, Event::readable(token), Duration::from_millis(10));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(5)))
.unwrap();
crate::assert_with_log!(
events.is_empty(),
"events empty before time",
true,
events.is_empty()
);
reactor
.poll(&mut events, Some(Duration::from_millis(10)))
.unwrap();
let count = events.iter().count();
crate::assert_with_log!(count == 1, "event delivered", 1usize, count);
crate::test_complete!("delivers_injected_event");
}
#[test]
fn modify_interest() {
init_test("modify_interest");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration count",
1usize,
reactor.registration_count()
);
reactor.modify(token, Interest::WRITABLE).unwrap();
let result = reactor.modify(Token::new(999), Interest::READABLE);
crate::assert_with_log!(
result.is_err(),
"modify missing fails",
true,
result.is_err()
);
crate::test_complete!("modify_interest");
}
#[test]
fn deregister_by_token() {
init_test("deregister_by_token");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration count",
1usize,
reactor.registration_count()
);
reactor.deregister(token).unwrap();
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration count",
0usize,
reactor.registration_count()
);
let result = reactor.deregister(token);
crate::assert_with_log!(
result.is_err(),
"deregister missing fails",
true,
result.is_err()
);
crate::test_complete!("deregister_by_token");
}
#[test]
fn duplicate_register_fails() {
init_test("duplicate_register_fails");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
let result = reactor.register(&source, token, Interest::WRITABLE);
crate::assert_with_log!(result.is_err(), "duplicate fails", true, result.is_err());
crate::test_complete!("duplicate_register_fails");
}
#[test]
fn wake_sets_flag() {
init_test("wake_sets_flag");
let reactor = LabReactor::new();
let was_set = reactor.check_and_clear_wake();
crate::assert_with_log!(!was_set, "wake flag initially false", false, was_set);
reactor.wake().unwrap();
let now_set = reactor.check_and_clear_wake();
crate::assert_with_log!(now_set, "wake flag set", true, now_set);
let cleared = reactor.check_and_clear_wake();
crate::assert_with_log!(!cleared, "wake flag cleared", false, cleared);
crate::test_complete!("wake_sets_flag");
}
#[test]
fn wake_interrupts_timed_poll_without_advancing_virtual_time() {
init_test("wake_interrupts_timed_poll_without_advancing_virtual_time");
let reactor = LabReactor::new();
let mut events = crate::runtime::reactor::Events::with_capacity(4);
reactor.wake().unwrap();
let count = reactor
.poll(&mut events, Some(Duration::from_millis(50)))
.unwrap();
crate::assert_with_log!(count == 0, "no synthetic events", 0usize, count);
crate::assert_with_log!(
events.is_empty(),
"event buffer empty",
true,
events.is_empty()
);
crate::assert_with_log!(
reactor.now() == Time::ZERO,
"wake does not fast-forward virtual time",
Time::ZERO,
reactor.now()
);
crate::test_complete!("wake_interrupts_timed_poll_without_advancing_virtual_time");
}
#[test]
fn registration_count_and_is_empty() {
init_test("registration_count_and_is_empty");
let reactor = LabReactor::new();
let source = TestFdSource;
crate::assert_with_log!(
reactor.is_empty(),
"reactor empty",
true,
reactor.is_empty()
);
crate::assert_with_log!(
reactor.registration_count() == 0,
"registration count",
0usize,
reactor.registration_count()
);
reactor
.register(&source, Token::new(1), Interest::READABLE)
.unwrap();
crate::assert_with_log!(
!reactor.is_empty(),
"reactor not empty",
false,
reactor.is_empty()
);
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration count",
1usize,
reactor.registration_count()
);
reactor
.register(&source, Token::new(2), Interest::WRITABLE)
.unwrap();
crate::assert_with_log!(
reactor.registration_count() == 2,
"registration count",
2usize,
reactor.registration_count()
);
reactor.deregister(Token::new(1)).unwrap();
crate::assert_with_log!(
reactor.registration_count() == 1,
"registration count",
1usize,
reactor.registration_count()
);
reactor.deregister(Token::new(2)).unwrap();
crate::assert_with_log!(
reactor.is_empty(),
"reactor empty",
true,
reactor.is_empty()
);
crate::test_complete!("registration_count_and_is_empty");
}
#[test]
fn virtual_time_advances() {
init_test("virtual_time_advances");
let reactor = LabReactor::new();
crate::assert_with_log!(
reactor.now() == Time::ZERO,
"initial time",
Time::ZERO,
reactor.now()
);
reactor.advance_time(Duration::from_secs(1));
crate::assert_with_log!(
reactor.now().as_nanos() == 1_000_000_000,
"time after advance",
1_000_000_000u64,
reactor.now().as_nanos()
);
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(500)))
.unwrap();
crate::assert_with_log!(
reactor.now().as_nanos() == 1_500_000_000,
"time after poll",
1_500_000_000u64,
reactor.now().as_nanos()
);
crate::test_complete!("virtual_time_advances");
}
#[test]
fn duration_to_nanos_saturates_max_duration() {
init_test("duration_to_nanos_saturates_max_duration");
let nanos = duration_to_nanos_saturating(Duration::MAX);
crate::assert_with_log!(nanos == u64::MAX, "nanos", u64::MAX, nanos);
crate::test_complete!("duration_to_nanos_saturates_max_duration");
}
#[test]
fn inject_event_with_max_duration_saturates_to_time_max() {
init_test("inject_event_with_max_duration_saturates_to_time_max");
let reactor = LabReactor::new();
let token = Token::new(1);
reactor.inject_event(token, Event::readable(token), Duration::MAX);
let next = reactor.next_event_time();
crate::assert_with_log!(
next == Some(Time::MAX),
"next event time",
Some(Time::MAX),
next
);
crate::test_complete!("inject_event_with_max_duration_saturates_to_time_max");
}
#[test]
fn poll_timeout_with_max_duration_saturates_time() {
init_test("poll_timeout_with_max_duration_saturates_time");
let reactor = LabReactor::new();
let mut events = crate::runtime::reactor::Events::with_capacity(1);
let count = reactor
.poll(&mut events, Some(Duration::MAX))
.expect("poll should succeed");
crate::assert_with_log!(count == 0, "count", 0usize, count);
let now = reactor.now();
crate::assert_with_log!(now == Time::MAX, "now", Time::MAX, now);
crate::test_complete!("poll_timeout_with_max_duration_saturates_time");
}
#[test]
fn advance_time_to_target() {
init_test("advance_time_to_target");
let reactor = LabReactor::new();
crate::assert_with_log!(
reactor.now() == Time::ZERO,
"initial time",
Time::ZERO,
reactor.now()
);
reactor.advance_time_to(Time::from_nanos(1_000_000_000));
crate::assert_with_log!(
reactor.now().as_nanos() == 1_000_000_000,
"time after advance",
1_000_000_000u64,
reactor.now().as_nanos()
);
reactor.advance_time_to(Time::from_nanos(500_000_000));
crate::assert_with_log!(
reactor.now().as_nanos() == 1_000_000_000,
"time unchanged",
1_000_000_000u64,
reactor.now().as_nanos()
);
reactor.advance_time_to(Time::from_nanos(2_000_000_000));
crate::assert_with_log!(
reactor.now().as_nanos() == 2_000_000_000,
"time advanced",
2_000_000_000u64,
reactor.now().as_nanos()
);
crate::test_complete!("advance_time_to_target");
}
#[test]
fn set_ready_delivers_immediately() {
init_test("set_ready_delivers_immediately");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let count = events.iter().count();
crate::assert_with_log!(count == 1, "event delivered", 1usize, count);
crate::test_complete!("set_ready_delivers_immediately");
}
#[test]
fn poll_clears_existing_events_before_next_poll() {
init_test("poll_clears_existing_events_before_next_poll");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
let first_count = reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(first_count == 1, "first count", 1usize, first_count);
crate::assert_with_log!(
events.iter().count() == 1,
"first len",
1usize,
events.len()
);
let second_count = reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(second_count == 0, "second count", 0usize, second_count);
crate::assert_with_log!(
events.is_empty(),
"events cleared on second poll",
true,
events.is_empty()
);
crate::test_complete!("poll_clears_existing_events_before_next_poll");
}
#[test]
fn poll_returns_stored_count_when_capacity_saturates() {
init_test("poll_returns_stored_count_when_capacity_saturates");
let reactor = LabReactor::new();
let source = TestFdSource;
let token1 = Token::new(1);
let token2 = Token::new(2);
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor.set_ready(token1, Event::readable(token1));
reactor.set_ready(token2, Event::readable(token2));
let mut events = crate::runtime::reactor::Events::with_capacity(1);
let count = reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(count == 2, "stored count", 2usize, count);
crate::assert_with_log!(
events.iter().count() == 2,
"stored len",
2usize,
events.len()
);
crate::test_complete!("poll_returns_stored_count_when_capacity_saturates");
}
#[test]
fn same_time_events_delivered_in_order() {
init_test("same_time_events_delivered_in_order");
let reactor = LabReactor::new();
let source = TestFdSource;
let token1 = Token::new(1);
let token2 = Token::new(2);
let token3 = Token::new(3);
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor
.register(&source, token3, Interest::READABLE)
.unwrap();
reactor.schedule_event(token1, Event::readable(token1), Duration::from_millis(10));
reactor.schedule_event(token2, Event::readable(token2), Duration::from_millis(10));
reactor.schedule_event(token3, Event::readable(token3), Duration::from_millis(10));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(15)))
.unwrap();
let collected: Vec<_> = events.iter().collect();
crate::assert_with_log!(collected.len() == 3, "event count", 3usize, collected.len());
crate::assert_with_log!(
collected[0].token == token1,
"first token",
token1,
collected[0].token
);
crate::assert_with_log!(
collected[1].token == token2,
"second token",
token2,
collected[1].token
);
crate::assert_with_log!(
collected[2].token == token3,
"third token",
token3,
collected[2].token
);
crate::test_complete!("same_time_events_delivered_in_order");
}
#[test]
fn different_time_events_delivered_one_poll_per_due_deadline() {
init_test("different_time_events_delivered_one_poll_per_due_deadline");
let reactor = LabReactor::new();
let source = TestFdSource;
let token1 = Token::new(1);
let token2 = Token::new(2);
let token3 = Token::new(3);
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor
.register(&source, token3, Interest::READABLE)
.unwrap();
reactor.schedule_event(token3, Event::readable(token3), Duration::from_millis(5));
reactor.schedule_event(token1, Event::readable(token1), Duration::from_millis(10));
reactor.schedule_event(token2, Event::readable(token2), Duration::from_millis(15));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(20)))
.unwrap();
let collected: Vec<_> = events.iter().collect();
crate::assert_with_log!(collected.len() == 1, "event count", 1usize, collected.len());
crate::assert_with_log!(
collected[0].token == token3,
"first token",
token3,
collected[0].token
);
crate::assert_with_log!(
reactor.now() == Time::from_millis(5),
"virtual time stops at earliest due event",
Time::from_millis(5),
reactor.now()
);
events.clear();
reactor
.poll(&mut events, Some(Duration::from_millis(20)))
.unwrap();
let collected: Vec<_> = events.iter().collect();
crate::assert_with_log!(
collected.len() == 1,
"second poll count",
1usize,
collected.len()
);
crate::assert_with_log!(
collected[0].token == token1,
"second poll token",
token1,
collected[0].token
);
crate::assert_with_log!(
reactor.now() == Time::from_millis(10),
"virtual time advances to second due event",
Time::from_millis(10),
reactor.now()
);
events.clear();
reactor
.poll(&mut events, Some(Duration::from_millis(20)))
.unwrap();
let collected: Vec<_> = events.iter().collect();
crate::assert_with_log!(
collected.len() == 1,
"third poll count",
1usize,
collected.len()
);
crate::assert_with_log!(
collected[0].token == token2,
"third poll token",
token2,
collected[0].token
);
crate::assert_with_log!(
reactor.now() == Time::from_millis(15),
"virtual time advances to final due event",
Time::from_millis(15),
reactor.now()
);
crate::test_complete!("different_time_events_delivered_one_poll_per_due_deadline");
}
#[test]
fn schedule_event_alias_works() {
init_test("schedule_event_alias_works");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.schedule_event(token, Event::readable(token), Duration::from_millis(10));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(15)))
.unwrap();
let count = events.iter().count();
crate::assert_with_log!(count == 1, "event delivered", 1usize, count);
crate::test_complete!("schedule_event_alias_works");
}
#[test]
fn events_before_current_time_delivered_immediately() {
init_test("events_before_current_time_delivered_immediately");
let reactor = LabReactor::new();
let source = TestFdSource;
let token = Token::new(1);
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.advance_time(Duration::from_millis(100));
reactor.schedule_event(token, Event::readable(token), Duration::ZERO);
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let count = events.iter().count();
crate::assert_with_log!(count == 1, "event delivered", 1usize, count);
crate::test_complete!("events_before_current_time_delivered_immediately");
}
#[test]
fn deregister_cleans_up_scheduled_events() {
init_test("deregister_cleans_up_scheduled_events");
let reactor = LabReactor::new();
let source = TestFdSource;
let token1 = Token::new(1);
let token2 = Token::new(2);
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor.schedule_event(token1, Event::readable(token1), Duration::from_millis(10));
reactor.schedule_event(token2, Event::readable(token2), Duration::from_millis(10));
reactor.schedule_event(token1, Event::readable(token1), Duration::from_millis(20));
reactor.deregister(token1).unwrap();
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(25)))
.unwrap();
let collected: Vec<_> = events.iter().collect();
crate::assert_with_log!(collected.len() == 1, "event count", 1usize, collected.len());
crate::assert_with_log!(
collected[0].token == token2,
"remaining token",
token2,
collected[0].token
);
crate::test_complete!("deregister_cleans_up_scheduled_events");
}
#[test]
fn io_chaos_injects_error_events() {
init_test("io_chaos_injects_error_events");
let config = ChaosConfig::new(7)
.with_io_error_probability(1.0)
.with_io_error_kinds(vec![io::ErrorKind::TimedOut]);
let reactor = LabReactor::with_chaos(config);
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let event = events.iter().next().expect("event");
crate::assert_with_log!(event.is_error(), "event is error", true, event.is_error());
let stats = reactor.chaos_stats();
crate::assert_with_log!(
stats.io_errors == 1,
"io error count",
1u64,
stats.io_errors
);
crate::assert_with_log!(
stats.decision_points == 1,
"decision points",
1u64,
stats.decision_points
);
let last_kind = reactor.last_io_error_kind();
crate::assert_with_log!(
last_kind == Some(io::ErrorKind::TimedOut),
"last error kind",
Some(io::ErrorKind::TimedOut),
last_kind
);
crate::test_complete!("io_chaos_injects_error_events");
}
#[test]
fn io_chaos_delays_events() {
init_test("io_chaos_delays_events");
let config = ChaosConfig::new(11)
.with_delay_probability(1.0)
.with_delay_range(Duration::from_millis(5)..Duration::from_millis(6));
let reactor = LabReactor::with_chaos(config);
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(
events.is_empty(),
"initial poll empty",
true,
events.is_empty()
);
let delayed_at = reactor
.inner
.lock()
.pending
.peek()
.map(|te| te.time)
.expect("delayed event");
let delayed_stats = reactor.chaos_stats();
crate::assert_with_log!(
delayed_stats.delays == 1,
"delay count",
1u64,
delayed_stats.delays
);
crate::assert_with_log!(
delayed_stats.decision_points == 1,
"decision points after delay",
1u64,
delayed_stats.decision_points
);
reactor.advance_time_to(delayed_at);
events.clear();
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let count = events.iter().count();
crate::assert_with_log!(count == 1, "delayed event delivered", 1usize, count);
let final_stats = reactor.chaos_stats();
crate::assert_with_log!(
final_stats.delays == 1,
"final delay count",
1u64,
final_stats.delays
);
crate::assert_with_log!(
final_stats.decision_points == 2,
"decision points after delivery",
2u64,
final_stats.decision_points
);
crate::test_complete!("io_chaos_delays_events");
}
#[test]
fn io_chaos_delay_is_based_on_due_time_not_full_poll_timeout() {
init_test("io_chaos_delay_is_based_on_due_time_not_full_poll_timeout");
let config = ChaosConfig::new(11)
.with_delay_probability(1.0)
.with_delay_range(Duration::from_millis(5)..Duration::from_millis(6));
let reactor = LabReactor::with_chaos(config);
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.inject_event(token, Event::readable(token), Duration::from_millis(10));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor
.poll(&mut events, Some(Duration::from_millis(50)))
.unwrap();
crate::assert_with_log!(
events.is_empty(),
"delayed event not delivered on first poll",
true,
events.is_empty()
);
crate::assert_with_log!(
reactor.now() == Time::from_millis(10),
"poll stops at original due time",
Time::from_millis(10),
reactor.now()
);
let delayed_at = reactor
.inner
.lock()
.pending
.peek()
.map(|timed| timed.time)
.expect("delayed event");
let min_delayed_at = Time::from_millis(15);
let max_delayed_at = Time::from_millis(16);
crate::assert_with_log!(
delayed_at >= min_delayed_at && delayed_at < max_delayed_at,
"delay rebased from event due time",
format!("[{min_delayed_at:?}, {max_delayed_at:?})"),
delayed_at
);
crate::test_complete!("io_chaos_delay_is_based_on_due_time_not_full_poll_timeout");
}
#[test]
fn io_chaos_is_deterministic_with_same_seed() {
init_test("io_chaos_is_deterministic_with_same_seed");
let config = ChaosConfig::new(123)
.with_io_error_probability(0.5)
.with_io_error_kinds(vec![io::ErrorKind::WouldBlock, io::ErrorKind::TimedOut]);
let reactor_a = LabReactor::with_chaos(config.clone());
let reactor_b = LabReactor::with_chaos(config);
let token_a = Token::new(1);
let token_b = Token::new(1);
let source = TestFdSource;
reactor_a
.register(&source, token_a, Interest::READABLE)
.unwrap();
reactor_b
.register(&source, token_b, Interest::READABLE)
.unwrap();
reactor_a.set_ready(token_a, Event::readable(token_a));
reactor_b.set_ready(token_b, Event::readable(token_b));
let mut events_a = crate::runtime::reactor::Events::with_capacity(10);
let mut events_b = crate::runtime::reactor::Events::with_capacity(10);
reactor_a.poll(&mut events_a, Some(Duration::ZERO)).unwrap();
reactor_b.poll(&mut events_b, Some(Duration::ZERO)).unwrap();
let ready_a = events_a.iter().next().expect("event").ready;
let ready_b = events_b.iter().next().expect("event").ready;
crate::assert_with_log!(ready_a == ready_b, "ready matches", ready_b, ready_a);
let last_a = reactor_a.last_io_error_kind();
let last_b = reactor_b.last_io_error_kind();
crate::assert_with_log!(last_a == last_b, "last error kind", last_b, last_a);
crate::test_complete!("io_chaos_is_deterministic_with_same_seed");
}
mod io_driver_integration {
use super::*;
use crate::runtime::io_driver::IoDriver;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
use std::task::{Wake, Waker};
struct FlagWaker {
flag: AtomicBool,
count: AtomicUsize,
}
impl Wake for FlagWaker {
fn wake(self: Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
self.count.fetch_add(1, Ordering::SeqCst);
}
fn wake_by_ref(self: &Arc<Self>) {
self.flag.store(true, Ordering::SeqCst);
self.count.fetch_add(1, Ordering::SeqCst);
}
}
fn create_test_waker() -> (Waker, Arc<FlagWaker>) {
let state = Arc::new(FlagWaker {
flag: AtomicBool::new(false),
count: AtomicUsize::new(0),
});
(Waker::from(state.clone()), state)
}
#[test]
fn io_driver_with_lab_reactor_dispatches_wakers() {
super::init_test("io_driver_with_lab_reactor_dispatches_wakers");
let reactor = Arc::new(LabReactor::new());
let mut driver = IoDriver::new(reactor.clone());
let source = TestFdSource;
let (waker, waker_state) = create_test_waker();
let token = driver
.register(&source, Interest::READABLE, waker)
.expect("register");
let initial = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(!initial, "waker not yet woken", false, initial);
reactor.inject_event(token, Event::readable(token), Duration::ZERO);
let count = driver.turn(Some(Duration::from_millis(10))).expect("turn");
crate::assert_with_log!(count >= 1, "events dispatched", true, count >= 1);
let flag = waker_state.flag.load(Ordering::SeqCst);
crate::assert_with_log!(flag, "waker fired", true, flag);
let wake_count = waker_state.count.load(Ordering::SeqCst);
crate::assert_with_log!(wake_count == 1, "wake count", 1usize, wake_count);
crate::test_complete!("io_driver_with_lab_reactor_dispatches_wakers");
}
#[test]
fn io_driver_with_lab_reactor_multiple_wakers() {
super::init_test("io_driver_with_lab_reactor_multiple_wakers");
let reactor = Arc::new(LabReactor::new());
let mut driver = IoDriver::new(reactor.clone());
let source = TestFdSource;
let (waker1, state1) = create_test_waker();
let (waker2, state2) = create_test_waker();
let (waker3, state3) = create_test_waker();
let token1 = driver
.register(&source, Interest::READABLE, waker1)
.unwrap();
let _token2 = driver
.register(&source, Interest::READABLE, waker2)
.unwrap();
let token3 = driver
.register(&source, Interest::READABLE, waker3)
.unwrap();
reactor.inject_event(token1, Event::readable(token1), Duration::ZERO);
reactor.inject_event(token3, Event::readable(token3), Duration::ZERO);
let count = driver.turn(Some(Duration::from_millis(10))).unwrap();
crate::assert_with_log!(count == 2, "dispatch count", 2usize, count);
let flag1 = state1.flag.load(Ordering::SeqCst);
let flag2 = state2.flag.load(Ordering::SeqCst);
let flag3 = state3.flag.load(Ordering::SeqCst);
crate::assert_with_log!(flag1, "waker1 fired", true, flag1);
crate::assert_with_log!(!flag2, "waker2 not fired", false, flag2);
crate::assert_with_log!(flag3, "waker3 fired", true, flag3);
crate::test_complete!("io_driver_with_lab_reactor_multiple_wakers");
}
}
mod fault_injection {
use super::*;
fn approx_eq(lhs: f64, rhs: f64) -> bool {
(lhs - rhs).abs() < f64::EPSILON
}
#[test]
fn fault_config_builder() {
super::init_test("fault_config_builder");
let config = FaultConfig::new()
.with_error_probability(0.5)
.with_error_kinds(vec![
io::ErrorKind::BrokenPipe,
io::ErrorKind::ConnectionReset,
])
.with_partitioned(true)
.with_closed(true)
.with_pending_error(io::ErrorKind::TimedOut);
let approx = approx_eq(config.error_probability, 0.5);
crate::assert_with_log!(
approx,
"error_probability",
0.5f64,
config.error_probability
);
crate::assert_with_log!(
config.error_kinds.len() == 2,
"error_kinds len",
2usize,
config.error_kinds.len()
);
crate::assert_with_log!(config.partitioned, "partitioned", true, config.partitioned);
crate::assert_with_log!(config.closed, "closed", true, config.closed);
crate::assert_with_log!(
config.pending_error == Some(io::ErrorKind::TimedOut),
"pending_error",
Some(io::ErrorKind::TimedOut),
config.pending_error
);
crate::test_complete!("fault_config_builder");
}
#[test]
fn fault_config_probability_clamped() {
super::init_test("fault_config_probability_clamped");
let config_low = FaultConfig::new().with_error_probability(-0.5);
let config_high = FaultConfig::new().with_error_probability(1.5);
let low_ok = approx_eq(config_low.error_probability, 0.0);
crate::assert_with_log!(low_ok, "clamped to 0", 0.0f64, config_low.error_probability);
let high_ok = approx_eq(config_high.error_probability, 1.0);
crate::assert_with_log!(
high_ok,
"clamped to 1",
1.0f64,
config_high.error_probability
);
crate::test_complete!("fault_config_probability_clamped");
}
#[test]
fn set_and_clear_fault_config() {
super::init_test("set_and_clear_fault_config");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
let config = FaultConfig::new().with_partitioned(true);
reactor.set_fault_config(token, config).unwrap();
let has_fault = reactor
.inner
.lock()
.sockets
.get(&token)
.and_then(|s| s.fault.as_ref())
.is_some();
crate::assert_with_log!(has_fault, "fault config set", true, has_fault);
reactor.clear_fault_config(token).unwrap();
let has_fault = reactor
.inner
.lock()
.sockets
.get(&token)
.and_then(|s| s.fault.as_ref())
.is_some();
crate::assert_with_log!(!has_fault, "fault config cleared", false, has_fault);
crate::test_complete!("set_and_clear_fault_config");
}
#[test]
fn set_fault_config_unregistered_token_fails() {
super::init_test("set_fault_config_unregistered_token_fails");
let reactor = LabReactor::new();
let token = Token::new(999);
let result = reactor.set_fault_config(token, FaultConfig::new());
crate::assert_with_log!(result.is_err(), "unregistered fails", true, result.is_err());
crate::test_complete!("set_fault_config_unregistered_token_fails");
}
#[test]
fn inject_error_one_shot() {
super::init_test("inject_error_one_shot");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor
.inject_error(token, io::ErrorKind::BrokenPipe)
.unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let event = events.iter().next().expect("event");
crate::assert_with_log!(
event.is_error(),
"first event is error",
true,
event.is_error()
);
let last_error = reactor.last_injected_error(token);
crate::assert_with_log!(
last_error == Some(io::ErrorKind::BrokenPipe),
"last error recorded",
Some(io::ErrorKind::BrokenPipe),
last_error
);
events.clear();
reactor.set_ready(token, Event::readable(token));
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let event = events.iter().next().expect("event");
crate::assert_with_log!(
event.is_readable(),
"second event is readable",
true,
event.is_readable()
);
crate::test_complete!("inject_error_one_shot");
}
#[test]
fn inject_close_delivers_hup() {
super::init_test("inject_close_delivers_hup");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.inject_close(token).unwrap();
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let event = events.iter().next().expect("event");
crate::assert_with_log!(event.is_hangup(), "received HUP", true, event.is_hangup());
let stats = reactor.fault_stats(token);
crate::assert_with_log!(stats.is_some(), "has stats", true, stats.is_some());
let (errors, closes, dropped) = stats.unwrap();
crate::assert_with_log!(closes == 1, "close count", 1u64, closes);
crate::assert_with_log!(errors == 0, "error count", 0u64, errors);
crate::assert_with_log!(dropped == 0, "dropped count", 0u64, dropped);
crate::test_complete!("inject_close_delivers_hup");
}
#[test]
fn closed_fault_state_forces_hup_until_cleared() {
super::init_test("closed_fault_state_forces_hup_until_cleared");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE | Interest::WRITABLE)
.unwrap();
reactor
.set_fault_config(token, FaultConfig::new().with_closed(true))
.unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.len() == 1, "single HUP", 1usize, events.len());
let first = events.iter().next().expect("event");
crate::assert_with_log!(
first.is_hangup(),
"first event is HUP",
true,
first.is_hangup()
);
reactor.set_ready(token, Event::readable(token));
reactor.set_ready(token, Event::writable(token));
events.clear();
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.len() == 1, "collapsed HUP", 1usize, events.len());
let second = events.iter().next().expect("event");
crate::assert_with_log!(
second.is_hangup(),
"second event remains HUP",
true,
second.is_hangup()
);
crate::test_complete!("closed_fault_state_forces_hup_until_cleared");
}
#[test]
fn closed_fault_state_delivers_hup_on_idle_poll() {
super::init_test("closed_fault_state_delivers_hup_on_idle_poll");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor
.set_fault_config(token, FaultConfig::new().with_closed(true))
.unwrap();
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.len() == 1, "single idle HUP", 1usize, events.len());
let event = events.iter().next().expect("event");
crate::assert_with_log!(
event.is_hangup(),
"idle poll reports HUP for closed socket",
true,
event.is_hangup()
);
crate::test_complete!("closed_fault_state_delivers_hup_on_idle_poll");
}
#[test]
fn clear_fault_config_suppresses_injected_close_before_poll() {
super::init_test("clear_fault_config_suppresses_injected_close_before_poll");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.inject_close(token).unwrap();
reactor.clear_fault_config(token).unwrap();
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(
events.is_empty(),
"clearing fault config before poll suppresses injected close",
true,
events.is_empty()
);
crate::test_complete!("clear_fault_config_suppresses_injected_close_before_poll");
}
#[test]
fn partition_drops_events() {
super::init_test("partition_drops_events");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor.partition(token, true).unwrap();
reactor.set_ready(token, Event::readable(token));
reactor.set_ready(token, Event::writable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.is_empty(), "events dropped", true, events.is_empty());
let stats = reactor.fault_stats(token);
let (_, _, dropped) = stats.unwrap();
crate::assert_with_log!(dropped == 2, "dropped count", 2u64, dropped);
reactor.partition(token, false).unwrap();
reactor.set_ready(token, Event::readable(token));
events.clear();
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.len() == 1, "event delivered", 1usize, events.len());
crate::test_complete!("partition_drops_events");
}
#[test]
fn random_error_injection() {
super::init_test("random_error_injection");
let reactor = LabReactor::new();
let token = Token::new(42); let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
let config = FaultConfig::new()
.with_error_probability(1.0)
.with_error_kinds(vec![io::ErrorKind::ConnectionReset]);
reactor.set_fault_config(token, config).unwrap();
reactor.set_ready(token, Event::readable(token));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
let event = events.iter().next().expect("event");
crate::assert_with_log!(event.is_error(), "error injected", true, event.is_error());
let last_error = reactor.last_injected_error(token);
crate::assert_with_log!(
last_error == Some(io::ErrorKind::ConnectionReset),
"error kind",
Some(io::ErrorKind::ConnectionReset),
last_error
);
crate::test_complete!("random_error_injection");
}
#[test]
fn per_token_fault_isolated() {
super::init_test("per_token_fault_isolated");
let reactor = LabReactor::new();
let token1 = Token::new(1);
let token2 = Token::new(2);
let source = TestFdSource;
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor.partition(token1, true).unwrap();
reactor.set_ready(token1, Event::readable(token1));
reactor.set_ready(token2, Event::readable(token2));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.len() == 1, "one event", 1usize, events.len());
let event = events.iter().next().expect("event");
crate::assert_with_log!(
event.token == token2,
"token2 delivered",
token2,
event.token
);
crate::test_complete!("per_token_fault_isolated");
}
#[test]
fn fault_injection_deterministic_with_same_token() {
super::init_test("fault_injection_deterministic_with_same_token");
let reactor_a = LabReactor::new();
let reactor_b = LabReactor::new();
let token = Token::new(123); let source = TestFdSource;
reactor_a
.register(&source, token, Interest::READABLE)
.unwrap();
reactor_b
.register(&source, token, Interest::READABLE)
.unwrap();
let config = FaultConfig::new()
.with_error_probability(0.5)
.with_error_kinds(vec![
io::ErrorKind::WouldBlock,
io::ErrorKind::TimedOut,
io::ErrorKind::ConnectionReset,
]);
reactor_a.set_fault_config(token, config.clone()).unwrap();
reactor_b.set_fault_config(token, config).unwrap();
let mut results_a = Vec::new();
let mut results_b = Vec::new();
for _ in 0..10 {
reactor_a.set_ready(token, Event::readable(token));
reactor_b.set_ready(token, Event::readable(token));
let mut events_a = crate::runtime::reactor::Events::with_capacity(10);
let mut events_b = crate::runtime::reactor::Events::with_capacity(10);
reactor_a.poll(&mut events_a, Some(Duration::ZERO)).unwrap();
reactor_b.poll(&mut events_b, Some(Duration::ZERO)).unwrap();
results_a.push(events_a.iter().next().map(|e| e.ready));
results_b.push(events_b.iter().next().map(|e| e.ready));
}
crate::assert_with_log!(
results_a == results_b,
"deterministic results",
results_b,
results_a
);
crate::test_complete!("fault_injection_deterministic_with_same_token");
}
#[test]
fn inject_error_creates_fault_state_if_missing() {
super::init_test("inject_error_creates_fault_state_if_missing");
let reactor = LabReactor::new();
let token = Token::new(1);
let source = TestFdSource;
reactor
.register(&source, token, Interest::READABLE)
.unwrap();
reactor
.inject_error(token, io::ErrorKind::TimedOut)
.unwrap();
let has_fault = reactor
.inner
.lock()
.sockets
.get(&token)
.and_then(|s| s.fault.as_ref())
.is_some();
crate::assert_with_log!(has_fault, "fault state created", true, has_fault);
crate::test_complete!("inject_error_creates_fault_state_if_missing");
}
#[test]
fn per_token_fault_with_global_chaos() {
super::init_test("per_token_fault_with_global_chaos");
let config = ChaosConfig::new(42)
.with_io_error_probability(1.0)
.with_io_error_kinds(vec![io::ErrorKind::TimedOut]);
let reactor = LabReactor::with_chaos(config);
let token1 = Token::new(1);
let token2 = Token::new(2);
let source = TestFdSource;
reactor
.register(&source, token1, Interest::READABLE)
.unwrap();
reactor
.register(&source, token2, Interest::READABLE)
.unwrap();
reactor.partition(token1, true).unwrap();
reactor.set_ready(token1, Event::readable(token1));
reactor.set_ready(token2, Event::readable(token2));
let mut events = crate::runtime::reactor::Events::with_capacity(10);
reactor.poll(&mut events, Some(Duration::ZERO)).unwrap();
crate::assert_with_log!(events.len() == 1, "one event", 1usize, events.len());
let event = events.iter().next().expect("event");
crate::assert_with_log!(
event.token == token2,
"token2 delivered",
token2,
event.token
);
crate::assert_with_log!(
event.is_error(),
"token2 has error (global chaos)",
true,
event.is_error()
);
crate::test_complete!("per_token_fault_with_global_chaos");
}
#[test]
fn inject_close_unregistered_token_fails() {
super::init_test("inject_close_unregistered_token_fails");
let reactor = LabReactor::new();
let token = Token::new(999);
let result = reactor.inject_close(token);
crate::assert_with_log!(result.is_err(), "unregistered fails", true, result.is_err());
crate::test_complete!("inject_close_unregistered_token_fails");
}
#[test]
fn partition_unregistered_token_fails() {
super::init_test("partition_unregistered_token_fails");
let reactor = LabReactor::new();
let token = Token::new(999);
let result = reactor.partition(token, true);
crate::assert_with_log!(result.is_err(), "unregistered fails", true, result.is_err());
crate::test_complete!("partition_unregistered_token_fails");
}
#[test]
fn fault_config_debug_clone_default() {
let cfg = FaultConfig::default();
assert!(cfg.pending_error.is_none());
assert!(!cfg.closed);
assert!(!cfg.partitioned);
let cloned = cfg.clone();
assert!(
(cloned.error_probability).abs() < f64::EPSILON,
"expected 0.0, got {}",
cloned.error_probability
);
let dbg = format!("{cfg:?}");
assert!(dbg.contains("FaultConfig"));
}
}
}