use std::{
cell::RefCell,
collections::{BTreeMap, HashSet, VecDeque},
net::IpAddr,
rc::{Rc, Weak},
task::Waker,
time::Duration,
};
use tracing::instrument;
use crate::{
SimulationError, SimulationResult,
chaos::fault_events::{SIM_FAULT_TIMELINE, SimFaultEvent},
chaos::state_handle::StateHandle,
network::{
NetworkConfiguration, PartitionStrategy,
sim::{ConnectionId, ListenerId, SimNetworkProvider},
},
};
use super::{
events::{ConnectionStateChange, Event, EventQueue, NetworkOperation, ScheduledEvent},
rng::{reset_sim_rng, set_sim_seed, sim_random, sim_random_range},
sleep::SleepFuture,
state::{
ClogState, CloseReason, ConnectionState, ListenerState, NetworkState, PartitionState,
StorageState,
},
wakers::WakerRegistry,
};
/// Mutable state of a simulation, shared behind `Rc<RefCell<..>>` by [`SimWorld`].
#[derive(Debug)]
pub(crate) struct SimInner {
/// Simulated "now"; `SimWorld::step` sets it to the time of the event it pops.
pub(crate) current_time: Duration,
/// Drifting clock value maintained by `SimWorld::timer`; never behind `current_time` after a call.
pub(crate) timer_time: Duration,
/// Queue of scheduled events awaiting processing.
pub(crate) event_queue: EventQueue,
/// Sequence number handed to the next scheduled event; incremented on every schedule.
pub(crate) next_sequence: u64,
/// Network-side state (connections, listeners, clogs, partitions, configuration).
pub(crate) network: NetworkState,
/// Storage-side state, including its configuration.
pub(crate) storage: StorageState,
/// Wakers registered by futures that are waiting on simulation progress.
pub(crate) wakers: WakerRegistry,
/// Id handed to the next task created via `SimWorld::generate_task_id`.
pub(crate) next_task_id: u64,
/// Ids of tasks whose `Event::Timer` has been processed.
pub(crate) awakened_tasks: HashSet<u64>,
/// Number of events dispatched by `process_event_with_inner`.
pub(crate) events_processed: u64,
/// Simulated time of the most recent injected bit flip (used for the cooldown check).
pub(crate) last_bit_flip_time: Duration,
/// Copy of the event most recently processed by `step`; `None` when the queue was empty.
pub(crate) last_processed_event: Option<Event>,
/// Optional handle used by `emit_fault` to record fault events; `None` disables emission.
pub(crate) state: Option<StateHandle>,
}
impl SimInner {
pub(crate) fn new() -> Self {
Self {
current_time: Duration::ZERO,
timer_time: Duration::ZERO,
event_queue: EventQueue::new(),
next_sequence: 0,
network: NetworkState::new(NetworkConfiguration::default()),
storage: StorageState::default(),
wakers: WakerRegistry::default(),
next_task_id: 0,
awakened_tasks: HashSet::new(),
events_processed: 0,
last_bit_flip_time: Duration::ZERO,
last_processed_event: None,
state: None,
}
}
pub(crate) fn new_with_config(network_config: NetworkConfiguration) -> Self {
Self {
current_time: Duration::ZERO,
timer_time: Duration::ZERO,
event_queue: EventQueue::new(),
next_sequence: 0,
network: NetworkState::new(network_config),
storage: StorageState::default(),
wakers: WakerRegistry::default(),
next_task_id: 0,
awakened_tasks: HashSet::new(),
events_processed: 0,
last_bit_flip_time: Duration::ZERO,
last_processed_event: None,
state: None,
}
}
pub(crate) fn emit_fault(&self, event: SimFaultEvent) {
if let Some(ref state) = self.state {
let time_ms = self.current_time.as_millis() as u64;
state.emit_raw(SIM_FAULT_TIMELINE, event, time_ms, "sim");
}
}
pub(crate) fn calculate_flip_bit_count(random_value: u32, min_bits: u32, max_bits: u32) -> u32 {
if random_value == 0 {
return max_bits.min(32);
}
let bit_count = 1 + random_value.leading_zeros();
bit_count.clamp(min_bits, max_bits)
}
}
/// Handle to a simulation; all state lives in the shared [`SimInner`].
///
/// Weak handles to the same state are produced by `SimWorld::downgrade`.
#[derive(Debug)]
pub struct SimWorld {
/// Shared, interior-mutable simulation state.
pub(crate) inner: Rc<RefCell<SimInner>>,
}
impl SimWorld {
/// Common constructor behind all public `new*` functions.
///
/// Resets the simulation RNG, seeds it with `seed`, resets the recorded
/// assertion results, and then builds the inner state from `network_config`
/// (or the default configuration when `None`).
fn create(network_config: Option<NetworkConfiguration>, seed: u64) -> Self {
    // Global resets happen before any state is constructed, in this order.
    reset_sim_rng();
    set_sim_seed(seed);
    crate::chaos::assertions::reset_assertion_results();

    let state = network_config.map_or_else(SimInner::new, SimInner::new_with_config);
    Self {
        inner: Rc::new(RefCell::new(state)),
    }
}
/// Creates a simulation with the default network configuration and seed `0`.
pub fn new() -> Self {
    let default_seed = 0;
    Self::create(None, default_seed)
}
/// Creates a simulation with the default network configuration, seeding the
/// simulation RNG with `seed`.
pub fn new_with_seed(seed: u64) -> Self {
    let network_config = None;
    Self::create(network_config, seed)
}
/// Creates a simulation with the given network configuration and seed `0`.
pub fn new_with_network_config(network_config: NetworkConfiguration) -> Self {
    let default_seed = 0;
    Self::create(Some(network_config), default_seed)
}
/// Creates a simulation with the given network configuration, seeding the
/// simulation RNG with `seed`.
pub fn new_with_network_config_and_seed(
    network_config: NetworkConfiguration,
    seed: u64,
) -> Self {
    let config = Some(network_config);
    Self::create(config, seed)
}
/// Attaches `state`, replacing any previously attached handle.
///
/// Once set, `SimInner::emit_fault` records fault events through it.
pub fn set_state(&self, state: StateHandle) {
    let mut inner = self.inner.borrow_mut();
    inner.state = Some(state);
}
/// Processes the earliest scheduled event, if any.
///
/// Advances `current_time` to the event's time, clears expired write clogs,
/// gives random partitioning a chance to trigger, records the event as
/// `last_processed_event`, and dispatches it.
///
/// Returns `true` when events remain in the queue afterwards. When the queue
/// was already empty, `last_processed_event` is cleared and `false` is returned.
#[instrument(skip(self))]
pub fn step(&mut self) -> bool {
    let mut inner = self.inner.borrow_mut();

    let Some(next) = inner.event_queue.pop_earliest() else {
        inner.last_processed_event = None;
        return false;
    };

    inner.current_time = next.time();
    Self::clear_expired_clogs_with_inner(&mut inner);
    Self::randomly_trigger_partitions_with_inner(&mut inner);

    let event = next.into_event();
    inner.last_processed_event = Some(event.clone());
    Self::process_event_with_inner(&mut inner, event);

    let more_pending = !inner.event_queue.is_empty();
    more_pending
}
/// Repeatedly calls [`SimWorld::step`] until it reports no remaining events.
///
/// Whenever the processed-event counter is a multiple of 50, the queue is
/// inspected; if it holds only infrastructure events the loop stops early.
#[instrument(skip(self))]
pub fn run_until_empty(&mut self) {
    while self.step() {
        let at_checkpoint = self.inner.borrow().events_processed.is_multiple_of(50);
        if !at_checkpoint {
            continue;
        }

        let only_infrastructure = self
            .inner
            .borrow()
            .event_queue
            .has_only_infrastructure_events();
        if only_infrastructure {
            tracing::debug!(
                "Early termination: only infrastructure events remain in queue"
            );
            break;
        }
    }
}
/// Returns the current simulated time.
pub fn current_time(&self) -> Duration {
    let inner = self.inner.borrow();
    inner.current_time
}
/// Returns the current simulated time (same value as `current_time`).
pub fn now(&self) -> Duration {
    let inner = self.inner.borrow();
    inner.current_time
}
/// Returns a clock reading that may run ahead of the simulated time.
///
/// With clock drift disabled this is exactly `current_time`. Otherwise the
/// stored `timer_time` is advanced by a random fraction of half the gap to
/// `current_time + clock_drift_max`, then raised to at least `current_time`,
/// and returned. The stored value is updated, so successive calls never go
/// backwards.
pub fn timer(&self) -> Duration {
    let mut inner = self.inner.borrow_mut();
    let now = inner.current_time;

    let (drift_enabled, drift_max) = {
        let chaos = &inner.network.config.chaos;
        (chaos.clock_drift_enabled, chaos.clock_drift_max)
    };
    if !drift_enabled {
        return now;
    }

    let ceiling = now + drift_max;
    if inner.timer_time < ceiling {
        // Exactly one RNG draw per call that still has headroom.
        let fraction = sim_random::<f64>();
        let headroom = (ceiling - inner.timer_time).as_secs_f64();
        let advance = fraction * headroom / 2.0;
        inner.timer_time += Duration::from_secs_f64(advance);
    }

    // The drifting clock may lead the simulation clock but never lag it.
    let reading = inner.timer_time.max(now);
    inner.timer_time = reading;
    reading
}
/// Schedules `event` to fire `delay` after the current simulated time.
///
/// Each scheduled event consumes the next sequence number.
#[instrument(skip(self))]
pub fn schedule_event(&self, event: Event, delay: Duration) {
    let mut inner = self.inner.borrow_mut();

    let fire_at = inner.current_time + delay;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    inner
        .event_queue
        .schedule(ScheduledEvent::new(fire_at, event, sequence));
}
/// Schedules `event` at the absolute simulated time `time`.
///
/// `time` is used as given; it is not compared against the current time here.
pub fn schedule_event_at(&self, event: Event, time: Duration) {
    let mut inner = self.inner.borrow_mut();

    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    inner
        .event_queue
        .schedule(ScheduledEvent::new(time, event, sequence));
}
/// Creates a [`WeakSimWorld`] that refers to the same state without keeping
/// it alive.
pub fn downgrade(&self) -> WeakSimWorld {
    let inner = Rc::downgrade(&self.inner);
    WeakSimWorld { inner }
}
/// Returns `true` when at least one event is still queued.
pub fn has_pending_events(&self) -> bool {
    let inner = self.inner.borrow();
    !inner.event_queue.is_empty()
}
/// Returns the number of events currently queued.
pub fn pending_event_count(&self) -> usize {
    let inner = self.inner.borrow();
    inner.event_queue.len()
}
/// Creates a network provider backed by a weak handle to this simulation.
pub fn network_provider(&self) -> SimNetworkProvider {
    let world = self.downgrade();
    SimNetworkProvider::new(world)
}
/// Creates a time provider backed by a weak handle to this simulation.
pub fn time_provider(&self) -> crate::providers::SimTimeProvider {
    let world = self.downgrade();
    crate::providers::SimTimeProvider::new(world)
}
/// Returns the task provider value `crate::TokioTaskProvider`.
///
/// No simulation state is read or captured here.
pub fn task_provider(&self) -> crate::TokioTaskProvider {
crate::TokioTaskProvider
}
/// Creates a storage provider for `ip`, backed by a weak handle to this
/// simulation.
pub fn storage_provider(&self, ip: std::net::IpAddr) -> crate::storage::SimStorageProvider {
    let world = self.downgrade();
    crate::storage::SimStorageProvider::new(world, ip)
}
/// Replaces the storage configuration with `config`.
pub fn set_storage_config(&mut self, config: crate::storage::StorageConfiguration) {
    let mut inner = self.inner.borrow_mut();
    inner.storage.config = config;
}
/// Runs `f` with shared access to the network configuration and returns its
/// result.
///
/// The inner state stays immutably borrowed while `f` runs.
pub fn with_network_config<F, R>(&self, f: F) -> R
where
    F: FnOnce(&NetworkConfiguration) -> R,
{
    f(&self.inner.borrow().network.config)
}
/// Registers a listener for `addr` and returns its freshly allocated id.
///
/// Ids come from a monotonically increasing counter. This function always
/// returns `Ok`.
pub(crate) fn create_listener(&self, addr: String) -> SimulationResult<ListenerId> {
    let mut inner = self.inner.borrow_mut();
    let network = &mut inner.network;

    let listener_id = ListenerId(network.next_listener_id);
    network.next_listener_id += 1;

    let listener = ListenerState {
        id: listener_id,
        addr,
        pending_connections: VecDeque::new(),
    };
    network.listeners.insert(listener_id, listener);

    Ok(listener_id)
}
/// Moves up to `buf.len()` bytes from the connection's receive buffer into
/// `buf`, in FIFO order.
///
/// Returns the number of bytes copied, which is `0` when either `buf` or the
/// receive buffer is empty.
///
/// # Errors
///
/// Returns `SimulationError::InvalidState` when `connection_id` is unknown.
pub(crate) fn read_from_connection(
    &self,
    connection_id: ConnectionId,
    buf: &mut [u8],
) -> SimulationResult<usize> {
    let mut inner = self.inner.borrow_mut();
    let Some(connection) = inner.network.connections.get_mut(&connection_id) else {
        return Err(SimulationError::InvalidState(
            "connection not found".to_string(),
        ));
    };

    // Drain exactly as many bytes as fit, instead of popping one at a time.
    let bytes_read = buf.len().min(connection.receive_buffer.len());
    for (slot, byte) in buf
        .iter_mut()
        .zip(connection.receive_buffer.drain(..bytes_read))
    {
        *slot = byte;
    }
    Ok(bytes_read)
}
/// Appends `data` to the *receive* buffer of `connection_id`.
///
/// No event is scheduled and no waker is woken here; the bytes simply become
/// available to the next `read_from_connection` call on that connection.
///
/// # Errors
///
/// Returns `SimulationError::InvalidState` when `connection_id` is unknown.
pub(crate) fn write_to_connection(
    &self,
    connection_id: ConnectionId,
    data: &[u8],
) -> SimulationResult<()> {
    let mut inner = self.inner.borrow_mut();
    let Some(connection) = inner.network.connections.get_mut(&connection_id) else {
        return Err(SimulationError::InvalidState(
            "connection not found".to_string(),
        ));
    };

    // `extend` reserves once rather than growing per byte.
    connection.receive_buffer.extend(data.iter().copied());
    Ok(())
}
/// Queues `data` on the connection's send buffer.
///
/// If no send is currently in progress for the connection, marks one as in
/// progress and schedules a `ProcessSendBuffer` event at the current
/// simulated time. If a send is already in progress nothing is scheduled.
///
/// # Errors
///
/// Returns `SimulationError::InvalidState` when `connection_id` is unknown.
pub(crate) fn buffer_send(
    &self,
    connection_id: ConnectionId,
    data: Vec<u8>,
) -> SimulationResult<()> {
    tracing::debug!(
        "buffer_send called for connection_id={} with {} bytes",
        connection_id.0,
        data.len()
    );

    let mut inner = self.inner.borrow_mut();
    let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
        tracing::debug!(
            "buffer_send: connection_id={} not found in connections table",
            connection_id.0
        );
        return Err(SimulationError::InvalidState(
            "connection not found".to_string(),
        ));
    };

    conn.send_buffer.push_back(data);
    tracing::debug!(
        "buffer_send: added data to send_buffer, new length: {}",
        conn.send_buffer.len()
    );

    if conn.send_in_progress {
        tracing::debug!(
            "buffer_send: sender already in progress, not scheduling new event"
        );
        return Ok(());
    }

    tracing::debug!(
        "buffer_send: sender not in progress, scheduling ProcessSendBuffer event"
    );
    conn.send_in_progress = true;

    // Zero delay: the send buffer is processed at the current simulated time.
    let scheduled_time = inner.current_time;
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;
    let process_event = Event::Network {
        connection_id: connection_id.0,
        operation: NetworkOperation::ProcessSendBuffer,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(scheduled_time, process_event, sequence));
    tracing::debug!(
        "buffer_send: scheduled ProcessSendBuffer event with sequence {}",
        sequence
    );

    Ok(())
}
/// Creates the two endpoints of a connection and returns
/// `(client_id, server_id)`.
///
/// Two consecutive connection ids are allocated. The client endpoint sees
/// `server_addr` as its peer address. The server endpoint sees a randomized
/// ephemeral address instead of `client_addr`: the last octet (IPv4) or last
/// segment (IPv6) of the client IP is offset by a random value in `0..256`
/// with wrapping, and the port is drawn from `40000..60000`. When the client
/// IP cannot be parsed, the peer address is `unknown:<port>`.
///
/// Both endpoints start open, unclogged, uncut, not stable, with empty
/// buffers and a 64 KiB send-buffer capacity. This function always returns `Ok`.
pub(crate) fn create_connection_pair(
    &self,
    client_addr: String,
    server_addr: String,
) -> SimulationResult<(ConnectionId, ConnectionId)> {
    const DEFAULT_SEND_BUFFER_CAPACITY: usize = 64 * 1024;

    let mut inner = self.inner.borrow_mut();

    let client_id = ConnectionId(inner.network.next_connection_id);
    inner.network.next_connection_id += 1;
    let server_id = ConnectionId(inner.network.next_connection_id);
    inner.network.next_connection_id += 1;

    let current_time = inner.current_time;
    let client_ip = NetworkState::parse_ip_from_addr(&client_addr);
    let server_ip = NetworkState::parse_ip_from_addr(&server_addr);

    // RNG draws happen in the order: address offset, then port.
    let ephemeral_peer_addr = match client_ip {
        Some(std::net::IpAddr::V4(ipv4)) => {
            let [a, b, c, last] = ipv4.octets();
            let ip_offset = sim_random_range(0u32..256) as u8;
            let ephemeral_ip = std::net::Ipv4Addr::new(a, b, c, last.wrapping_add(ip_offset));
            let ephemeral_port = sim_random_range(40000u16..60000);
            format!("{}:{}", ephemeral_ip, ephemeral_port)
        }
        Some(std::net::IpAddr::V6(ipv6)) => {
            let mut segments = ipv6.segments();
            let ip_offset = sim_random_range(0u16..256);
            segments[7] = segments[7].wrapping_add(ip_offset);
            let ephemeral_ip = std::net::Ipv6Addr::from(segments);
            let ephemeral_port = sim_random_range(40000u16..60000);
            format!("[{}]:{}", ephemeral_ip, ephemeral_port)
        }
        None => {
            let ephemeral_port = sim_random_range(40000u16..60000);
            format!("unknown:{}", ephemeral_port)
        }
    };

    // Both endpoints share every initial value except identity and addressing.
    let fresh_endpoint = |id, addr, local_ip, remote_ip, peer_address, paired_id| ConnectionState {
        id,
        addr,
        local_ip,
        remote_ip,
        peer_address,
        receive_buffer: VecDeque::new(),
        paired_connection: Some(paired_id),
        send_buffer: VecDeque::new(),
        send_in_progress: false,
        next_send_time: current_time,
        is_closed: false,
        send_closed: false,
        recv_closed: false,
        is_cut: false,
        cut_expiry: None,
        close_reason: CloseReason::None,
        send_buffer_capacity: DEFAULT_SEND_BUFFER_CAPACITY,
        send_delay: None,
        recv_delay: None,
        is_half_open: false,
        half_open_error_at: None,
        is_stable: false,
        graceful_close_pending: false,
        last_data_delivery_scheduled_at: None,
        remote_fin_received: false,
    };

    let client_state = fresh_endpoint(
        client_id,
        client_addr,
        client_ip,
        server_ip,
        server_addr.clone(),
        server_id,
    );
    let server_state = fresh_endpoint(
        server_id,
        server_addr,
        server_ip,
        client_ip,
        ephemeral_peer_addr,
        client_id,
    );

    inner.network.connections.insert(client_id, client_state);
    inner.network.connections.insert(server_id, server_state);

    Ok((client_id, server_id))
}
/// Stores `waker` as the read waker for `connection_id`, replacing any
/// previously registered one.
///
/// This function always returns `Ok`.
pub(crate) fn register_read_waker(
    &self,
    connection_id: ConnectionId,
    waker: Waker,
) -> SimulationResult<()> {
    let mut inner = self.inner.borrow_mut();

    // `insert` hands back the previous waker, which tells us whether this
    // registration replaced an existing one.
    let is_replacement = inner
        .wakers
        .read_wakers
        .insert(connection_id, waker)
        .is_some();

    tracing::debug!(
        "register_read_waker: connection_id={}, replacement={}, total_wakers={}",
        connection_id.0,
        is_replacement,
        inner.wakers.read_wakers.len()
    );
    Ok(())
}
/// Registers `waker` to be woken when a connection is stored for `addr`.
///
/// The waker is keyed by a `ListenerId` derived from hashing `addr` with
/// `DefaultHasher`, the same derivation `store_pending_connection` uses.
/// This function always returns `Ok`.
pub(crate) fn register_accept_waker(&self, addr: &str, waker: Waker) -> SimulationResult<()> {
    use std::hash::{Hash, Hasher};

    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    addr.hash(&mut hasher);
    let listener_key = ListenerId(hasher.finish());

    let mut inner = self.inner.borrow_mut();
    inner.wakers.listener_wakers.insert(listener_key, waker);
    Ok(())
}
/// Records `connection_id` as the pending connection for `addr` and wakes the
/// accept waker registered for that address, if any.
///
/// A previously stored pending connection for the same address is overwritten.
/// The waker key is the `DefaultHasher` hash of `addr`, matching
/// `register_accept_waker`. This function always returns `Ok`.
pub(crate) fn store_pending_connection(
    &self,
    addr: &str,
    connection_id: ConnectionId,
) -> SimulationResult<()> {
    use std::hash::{Hash, Hasher};

    let mut inner = self.inner.borrow_mut();
    inner
        .network
        .pending_connections
        .insert(addr.to_owned(), connection_id);

    let mut hasher = std::collections::hash_map::DefaultHasher::new();
    addr.hash(&mut hasher);
    let listener_key = ListenerId(hasher.finish());

    let accept_waker = inner.wakers.listener_wakers.remove(&listener_key);
    if let Some(waker) = accept_waker {
        waker.wake();
    }
    Ok(())
}
/// Removes and returns the pending connection stored for `addr`, if any.
///
/// This function always returns `Ok`.
pub(crate) fn pending_connection(&self, addr: &str) -> SimulationResult<Option<ConnectionId>> {
    let taken = self
        .inner
        .borrow_mut()
        .network
        .pending_connections
        .remove(addr);
    Ok(taken)
}
/// Returns a copy of the peer address recorded for `connection_id`, or `None`
/// when the connection is unknown.
pub(crate) fn connection_peer_address(&self, connection_id: ConnectionId) -> Option<String> {
    let inner = self.inner.borrow();
    let conn = inner.network.connections.get(&connection_id)?;
    Some(conn.peer_address.clone())
}
/// Returns a future tied to a timer event scheduled `duration` from now.
///
/// A new task id is allocated, the duration may be lengthened by
/// `apply_buggified_delay`, and an `Event::Timer` for the task is scheduled
/// after the resulting delay.
#[instrument(skip(self))]
pub fn sleep(&self, duration: Duration) -> SleepFuture {
    // The task id is allocated before any randomness is consumed.
    let task_id = self.generate_task_id();
    let delay = self.apply_buggified_delay(duration);

    let wake_event = Event::Timer { task_id };
    self.schedule_event(wake_event, delay);

    SleepFuture::new(self.downgrade(), task_id)
}
/// Possibly lengthens `duration` with a chaos-injected extra delay.
///
/// Returns `duration` unchanged when buggified delays are disabled or the
/// configured maximum is zero (no randomness is consumed in that case).
/// Otherwise one random draw decides, against `buggified_delay_probability`,
/// whether to add a delay; if so a second draw, raised to the 1000th power,
/// scales `buggified_delay_max`, which makes large extra delays rare.
fn apply_buggified_delay(&self, duration: Duration) -> Duration {
    let inner = self.inner.borrow();
    let chaos = &inner.network.config.chaos;

    let delay_possible =
        chaos.buggified_delay_enabled && chaos.buggified_delay_max != Duration::ZERO;
    if !delay_possible {
        return duration;
    }

    let triggered = sim_random::<f64>() < chaos.buggified_delay_probability;
    if !triggered {
        return duration;
    }

    let random_factor = sim_random::<f64>().powf(1000.0);
    let extra_delay = chaos.buggified_delay_max.mul_f64(random_factor);
    tracing::trace!(
        extra_delay_ms = extra_delay.as_millis(),
        "Buggified delay applied"
    );
    duration + extra_delay
}
/// Allocates and returns the next task id from a monotonically increasing
/// counter.
fn generate_task_id(&self) -> u64 {
    let mut inner = self.inner.borrow_mut();
    let allocated = inner.next_task_id;
    inner.next_task_id = allocated + 1;
    allocated
}
/// Removes every waker registered under `connection_id` from `wakers` and
/// wakes each of them in registration order.
///
/// Does nothing when no wakers are registered for the connection.
fn wake_all(wakers: &mut BTreeMap<ConnectionId, Vec<Waker>>, connection_id: ConnectionId) {
    let Some(pending) = wakers.remove(&connection_id) else {
        return;
    };
    pending.into_iter().for_each(Waker::wake);
}
/// Reports whether the timer event for `task_id` has already been processed.
///
/// This function always returns `Ok`.
pub(crate) fn is_task_awake(&self, task_id: u64) -> SimulationResult<bool> {
    let awake = self.inner.borrow().awakened_tasks.contains(&task_id);
    Ok(awake)
}
/// Stores `waker` for `task_id`, replacing any waker already registered for
/// that task.
///
/// This function always returns `Ok`.
pub(crate) fn register_task_waker(&self, task_id: u64, waker: Waker) -> SimulationResult<()> {
    self.inner
        .borrow_mut()
        .wakers
        .task_wakers
        .insert(task_id, waker);
    Ok(())
}
/// Removes every write clog whose expiry is at or before the current
/// simulated time and wakes the clog wakers of each affected connection.
///
/// Read clogs are not touched here.
fn clear_expired_clogs_with_inner(inner: &mut SimInner) {
    let now = inner.current_time;

    // Collect first: the map cannot be mutated while it is being iterated.
    let expired: Vec<ConnectionId> = inner
        .network
        .connection_clogs
        .iter()
        .filter(|(_, clog)| now >= clog.expires_at)
        .map(|(id, _)| *id)
        .collect();

    for connection_id in expired {
        inner.network.connection_clogs.remove(&connection_id);
        Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
    }
}
/// Counts `event` as processed and dispatches it to the matching handler.
///
/// Process lifecycle events (`ProcessRestart`, `ProcessGracefulShutdown`,
/// `ProcessForceKill`) are only logged here.
#[instrument(skip(inner))]
fn process_event_with_inner(inner: &mut SimInner, event: Event) {
    inner.events_processed += 1;

    match event {
        Event::Shutdown => Self::handle_shutdown_event(inner),
        Event::Timer { task_id } => Self::handle_timer_event(inner, task_id),
        Event::Connection { id, state } => Self::handle_connection_event(inner, id, state),
        Event::Network {
            connection_id,
            operation,
        } => Self::handle_network_event(inner, connection_id, operation),
        Event::Storage { file_id, operation } => {
            super::storage_ops::handle_storage_event(inner, file_id, operation)
        }
        Event::ProcessRestart { ip }
        | Event::ProcessGracefulShutdown { ip, .. }
        | Event::ProcessForceKill { ip, .. } => {
            tracing::debug!("Process lifecycle event for IP {}", ip);
        }
    }
}
/// Marks `task_id` as awakened and wakes its registered waker, if any.
///
/// The task is recorded as awakened even when no waker is registered.
fn handle_timer_event(inner: &mut SimInner, task_id: u64) {
    inner.awakened_tasks.insert(task_id);

    let Some(waker) = inner.wakers.task_wakers.remove(&task_id) else {
        return;
    };
    waker.wake();
}
/// Applies a connection state change for the connection with raw id `id`.
///
/// - `BindComplete` / `ConnectionReady`: no action.
/// - `ClogClear` / `ReadClogClear`: drop the respective clog entry and wake
///   the wakers waiting on it.
/// - `PartitionRestore` / `SendPartitionClear` / `RecvPartitionClear`: sweep
///   the corresponding expired partitions.
/// - `CutRestore`: if the connection exists and is cut, restore it, emit a
///   `CutRestored` fault event and wake the cut wakers.
/// - `HalfOpenError`: emit a `HalfOpenError` fault event and wake the read
///   waker and send-buffer wakers of the connection.
fn handle_connection_event(inner: &mut SimInner, id: u64, state: ConnectionStateChange) {
    let connection_id = ConnectionId(id);

    match state {
        ConnectionStateChange::BindComplete | ConnectionStateChange::ConnectionReady => {}
        ConnectionStateChange::PartitionRestore => Self::clear_expired_partitions(inner),
        ConnectionStateChange::SendPartitionClear => Self::clear_expired_send_partitions(inner),
        ConnectionStateChange::RecvPartitionClear => Self::clear_expired_recv_partitions(inner),
        ConnectionStateChange::ClogClear => {
            inner.network.connection_clogs.remove(&connection_id);
            Self::wake_all(&mut inner.wakers.clog_wakers, connection_id);
        }
        ConnectionStateChange::ReadClogClear => {
            inner.network.read_clogs.remove(&connection_id);
            Self::wake_all(&mut inner.wakers.read_clog_wakers, connection_id);
        }
        ConnectionStateChange::CutRestore => {
            // Only act when the connection is still known and still cut.
            if let Some(conn) = inner
                .network
                .connections
                .get_mut(&connection_id)
                .filter(|conn| conn.is_cut)
            {
                conn.is_cut = false;
                conn.cut_expiry = None;
                inner.emit_fault(SimFaultEvent::CutRestored { connection_id: id });
                tracing::debug!("Connection {} restored via scheduled event", id);
                Self::wake_all(&mut inner.wakers.cut_wakers, connection_id);
            }
        }
        ConnectionStateChange::HalfOpenError => {
            inner.emit_fault(SimFaultEvent::HalfOpenError { connection_id: id });
            tracing::debug!("Connection {} half-open error time reached", id);
            let read_waker = inner.wakers.read_wakers.remove(&connection_id);
            if let Some(waker) = read_waker {
                waker.wake();
            }
            Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
        }
    }
}
/// Removes every IP-pair partition whose expiry is at or before the current
/// simulated time, logging each restored pair.
fn clear_expired_partitions(inner: &mut SimInner) {
    let now = inner.current_time;

    let expired: Vec<_> = inner
        .network
        .ip_partitions
        .iter()
        .filter(|(_, partition)| now >= partition.expires_at)
        .map(|(pair, _)| *pair)
        .collect();

    for pair in expired {
        inner.network.ip_partitions.remove(&pair);
        tracing::debug!("Restored IP partition {} -> {}", pair.0, pair.1);
    }
}
/// Removes every send partition whose expiry is at or before the current
/// simulated time, logging each cleared IP.
fn clear_expired_send_partitions(inner: &mut SimInner) {
    let now = inner.current_time;

    let expired: Vec<_> = inner
        .network
        .send_partitions
        .iter()
        .filter(|&(_, &expires_at)| now >= expires_at)
        .map(|(ip, _)| *ip)
        .collect();

    for ip in expired {
        inner.network.send_partitions.remove(&ip);
        tracing::debug!("Cleared send partition for {}", ip);
    }
}
/// Removes every receive partition whose expiry is at or before the current
/// simulated time, logging each cleared IP.
fn clear_expired_recv_partitions(inner: &mut SimInner) {
    let now = inner.current_time;

    let expired: Vec<_> = inner
        .network
        .recv_partitions
        .iter()
        .filter(|&(_, &expires_at)| now >= expires_at)
        .map(|(ip, _)| *ip)
        .collect();

    for ip in expired {
        inner.network.recv_partitions.remove(&ip);
        tracing::debug!("Cleared receive partition for {}", ip);
    }
}
/// Dispatches a network operation for the connection with raw id `conn_id`
/// to its handler.
fn handle_network_event(inner: &mut SimInner, conn_id: u64, operation: NetworkOperation) {
    let target = ConnectionId(conn_id);
    match operation {
        NetworkOperation::ProcessSendBuffer => Self::handle_process_send_buffer(inner, target),
        NetworkOperation::FinDelivery => Self::handle_fin_delivery(inner, target),
        NetworkOperation::DataDelivery { data } => {
            Self::handle_data_delivery(inner, target, data);
        }
    }
}
/// Delivers `data` into the receive buffer of `connection_id` and wakes its
/// read waker.
///
/// Unknown connections are logged with a warning and the data is dropped.
/// For connections not marked stable the payload first passes through
/// `maybe_corrupt_data`, which may flip bits; stable connections receive the
/// payload untouched.
fn handle_data_delivery(inner: &mut SimInner, connection_id: ConnectionId, data: Vec<u8>) {
    tracing::trace!(
        "DataDelivery: {} bytes to connection {}",
        data.len(),
        connection_id.0
    );

    // One lookup answers both "does it exist?" and "is it stable?".
    let Some(is_stable) = inner
        .network
        .connections
        .get(&connection_id)
        .map(|conn| conn.is_stable)
    else {
        tracing::warn!("DataDelivery: connection {} not found", connection_id.0);
        return;
    };

    let data_to_deliver = if is_stable {
        data
    } else {
        Self::maybe_corrupt_data(inner, connection_id, &data)
    };

    // Re-borrow mutably: the corruption step needed `inner` as a whole.
    let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
        return;
    };
    conn.receive_buffer.extend(data_to_deliver);

    if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
        waker.wake();
    }
}
/// Records that the remote side's FIN reached `connection_id` and wakes its
/// read waker.
///
/// A FIN for a connection that is already closed is ignored entirely (no
/// waker is woken). For an unknown connection no flag is set, but a read
/// waker registered under that id is still woken.
fn handle_fin_delivery(inner: &mut SimInner, connection_id: ConnectionId) {
    tracing::debug!(
        "FinDelivery: FIN received on connection {}",
        connection_id.0
    );

    match inner.network.connections.get_mut(&connection_id) {
        Some(conn) if conn.is_closed => {
            tracing::debug!(
                "FinDelivery: connection {} already closed, ignoring stale FIN",
                connection_id.0
            );
            return;
        }
        Some(conn) => conn.remote_fin_received = true,
        None => {}
    }

    let read_waker = inner.wakers.read_wakers.remove(&connection_id);
    if let Some(waker) = read_waker {
        waker.wake();
    }
}
/// Schedules a `FinDelivery` event for the peer connection, if there is one.
///
/// The FIN fires one nanosecond after `last_delivery_time` when that time is
/// not in the past, otherwise one nanosecond after the current simulated
/// time. Does nothing when `paired_id` is `None`.
fn schedule_fin_delivery(
    inner: &mut SimInner,
    paired_id: Option<ConnectionId>,
    last_delivery_time: Option<Duration>,
) {
    let Some(peer_id) = paired_id else {
        return;
    };

    // Anchor on the last scheduled delivery only if it is not in the past.
    let now = inner.current_time;
    let anchor = last_delivery_time.filter(|t| *t >= now).unwrap_or(now);
    let fin_time = anchor + Duration::from_nanos(1);

    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    tracing::debug!(
        "Scheduling FinDelivery to connection {} at {:?}",
        peer_id.0,
        fin_time
    );

    let fin_event = Event::Network {
        connection_id: peer_id.0,
        operation: NetworkOperation::FinDelivery,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(fin_time, fin_event, sequence));
}
/// Returns a copy of `data`, possibly with some bits flipped.
///
/// Corruption is skipped (and an unmodified copy returned) when `data` is
/// empty, when the bit-flip cooldown since the last injection has not
/// elapsed, or when the probabilistic `buggify_with_prob!` check fails.
/// Otherwise a flip count is derived from one random `u32`, and for each flip
/// a random (byte, bit) position is drawn; a position drawn more than once is
/// flipped only the first time. The injection time is recorded, the event is
/// logged, and a `BitFlip` fault event carrying the number of unique
/// positions is emitted.
fn maybe_corrupt_data(
    inner: &mut SimInner,
    connection_id: ConnectionId,
    data: &[u8],
) -> Vec<u8> {
    let mut payload = data.to_vec();
    if payload.is_empty() {
        return payload;
    }

    let chaos = &inner.network.config.chaos;
    let now = inner.current_time;
    let cooldown_elapsed =
        now.saturating_sub(inner.last_bit_flip_time) >= chaos.bit_flip_cooldown;
    // Short-circuit keeps the RNG untouched while the cooldown is active.
    if !cooldown_elapsed || !crate::buggify_with_prob!(chaos.bit_flip_probability) {
        return payload;
    }

    let random_value = sim_random::<u32>();
    let flip_count = SimInner::calculate_flip_bit_count(
        random_value,
        chaos.bit_flip_min_bits,
        chaos.bit_flip_max_bits,
    );

    let mut flipped_positions = std::collections::HashSet::new();
    for _ in 0..flip_count {
        let byte_idx = (sim_random::<u64>() as usize) % payload.len();
        let bit_idx = (sim_random::<u64>() as usize) % 8;
        // `insert` returns false for a repeated position, which is skipped.
        if flipped_positions.insert((byte_idx, bit_idx)) {
            payload[byte_idx] ^= 1 << bit_idx;
        }
    }

    inner.last_bit_flip_time = now;
    tracing::info!(
        "BitFlipInjected: bytes={} bits_flipped={} unique_positions={}",
        data.len(),
        flip_count,
        flipped_positions.len()
    );
    inner.emit_fault(SimFaultEvent::BitFlip {
        connection_id: connection_id.0,
        flip_count: flipped_positions.len(),
    });

    payload
}
/// Processes the send buffer of `connection_id`, choosing the partitioned or
/// normal path based on whether the connection is partitioned at the current
/// simulated time.
fn handle_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
    let now = inner.current_time;
    if inner.network.is_connection_partitioned(connection_id, now) {
        Self::handle_partitioned_send(inner, connection_id);
    } else {
        Self::handle_normal_send(inner, connection_id);
    }
}
/// Handles one send-buffer step for a partitioned connection.
///
/// The front chunk of the send buffer, if any, is discarded and the
/// send-buffer wakers are woken. If more chunks remain, another
/// `ProcessSendBuffer` event is scheduled. Otherwise the send is marked as no
/// longer in progress and a pending graceful close schedules the FIN to the
/// peer. Unknown connections are ignored.
fn handle_partitioned_send(inner: &mut SimInner, connection_id: ConnectionId) {
    let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
        return;
    };

    if let Some(data) = conn.send_buffer.pop_front() {
        tracing::debug!(
            "Connection {} partitioned, failing send of {} bytes",
            connection_id.0,
            data.len()
        );
        Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);

        if !conn.send_buffer.is_empty() {
            // More to discard: keep the send loop running.
            Self::schedule_process_send_buffer(inner, connection_id);
            return;
        }
    }

    // Buffer fully consumed (or was already empty): finish the send.
    conn.send_in_progress = false;
    if conn.graceful_close_pending {
        conn.graceful_close_pending = false;
        let peer_id = conn.paired_connection;
        let last_time = conn.last_data_delivery_scheduled_at;
        Self::schedule_fin_delivery(inner, peer_id, last_time);
    }
}
/// Handles one send-buffer step for a connection that is not partitioned.
///
/// Pops the front chunk of the send buffer, optionally truncates it to
/// simulate a partial write, and schedules its `DataDelivery` to the paired
/// connection. If chunks remain, another `ProcessSendBuffer` event is
/// scheduled; otherwise the send is marked finished and a pending graceful
/// close schedules the FIN. Unknown connections are ignored.
fn handle_normal_send(inner: &mut SimInner, connection_id: ConnectionId) {
// Snapshot the fields needed later so the shared borrow can end before mutation.
let Some(conn) = inner.network.connections.get(&connection_id) else {
return;
};
let paired_id = conn.paired_connection;
let send_delay = conn.send_delay;
let next_send_time = conn.next_send_time;
let has_data = !conn.send_buffer.is_empty();
let is_stable = conn.is_stable;
// Receive-side delay configured on the peer, if there is a peer and it has one.
let recv_delay = paired_id.and_then(|pid| {
inner
.network
.connections
.get(&pid)
.and_then(|c| c.recv_delay)
});
// Nothing queued: finish the send and, if requested, deliver the FIN.
if !has_data {
if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
conn.send_in_progress = false;
if conn.graceful_close_pending {
conn.graceful_close_pending = false;
let peer_id = conn.paired_connection;
let last_time = conn.last_data_delivery_scheduled_at;
Self::schedule_fin_delivery(inner, peer_id, last_time);
}
}
return;
}
let Some(conn) = inner.network.connections.get_mut(&connection_id) else {
return;
};
let Some(mut data) = conn.send_buffer.pop_front() else {
conn.send_in_progress = false;
return;
};
// A chunk left the buffer, so tasks waiting on the send buffer are woken.
Self::wake_all(&mut inner.wakers.send_buffer_wakers, connection_id);
// Chaos: on non-stable connections, occasionally send only a prefix of the
// chunk. `buggify!()` is evaluated only when the connection is not stable.
if !is_stable && crate::buggify!() && !data.is_empty() {
let max_send = std::cmp::min(
data.len(),
inner.network.config.chaos.partial_write_max_bytes,
);
// Prefix length is drawn from 0..=max_send; 0 sends an empty chunk.
let truncate_to = sim_random_range(0..max_send + 1);
if truncate_to < data.len() {
// The unsent tail goes back to the front so byte order is preserved.
let remainder = data.split_off(truncate_to);
conn.send_buffer.push_front(remainder);
tracing::debug!(
"BUGGIFY: Partial write on connection {} - sending {} bytes",
connection_id.0,
data.len()
);
}
}
// With more chunks queued use a 1 ns delay; otherwise use the per-connection
// override or sample the configured write latency.
let has_more = !conn.send_buffer.is_empty();
let base_delay = if has_more {
Duration::from_nanos(1)
} else {
send_delay.unwrap_or_else(|| {
crate::network::sample_duration(&inner.network.config.write_latency)
})
};
// Never earlier than `next_send_time`; the next send must then be at least
// 1 ns later — presumably to keep deliveries in send order (TODO confirm).
let earliest_time = std::cmp::max(inner.current_time + base_delay, next_send_time);
conn.next_send_time = earliest_time + Duration::from_nanos(1);
if let Some(paired_id) = paired_id {
// Delivery lands on the peer after its receive delay, if any.
let scheduled_time = earliest_time + recv_delay.unwrap_or(Duration::ZERO);
let sequence = inner.next_sequence;
inner.next_sequence += 1;
inner.event_queue.schedule(ScheduledEvent::new(
scheduled_time,
Event::Network {
connection_id: paired_id.0,
operation: NetworkOperation::DataDelivery { data },
},
sequence,
));
// Remembered so that a later FIN can be scheduled after this delivery.
conn.last_data_delivery_scheduled_at = Some(scheduled_time);
}
// Keep draining if chunks remain; otherwise finish and handle graceful close.
if !conn.send_buffer.is_empty() {
Self::schedule_process_send_buffer(inner, connection_id);
} else {
conn.send_in_progress = false;
if conn.graceful_close_pending {
conn.graceful_close_pending = false;
let peer_id = conn.paired_connection;
let last_time = conn.last_data_delivery_scheduled_at;
Self::schedule_fin_delivery(inner, peer_id, last_time);
}
}
}
/// Schedules a `ProcessSendBuffer` event for `connection_id` at the current
/// simulated time.
fn schedule_process_send_buffer(inner: &mut SimInner, connection_id: ConnectionId) {
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;

    let process_event = Event::Network {
        connection_id: connection_id.0,
        operation: NetworkOperation::ProcessSendBuffer,
    };
    let scheduled = ScheduledEvent::new(inner.current_time, process_event, sequence);
    inner.event_queue.schedule(scheduled);
}
/// Handles `Event::Shutdown` by waking every registered task waker and then
/// every registered read waker, leaving both registries empty.
fn handle_shutdown_event(inner: &mut SimInner) {
    tracing::debug!("Processing Shutdown event - waking all pending tasks");

    let task_wakers = std::mem::take(&mut inner.wakers.task_wakers);
    for (task_id, waker) in task_wakers {
        tracing::trace!("Waking task {}", task_id);
        waker.wake();
    }

    let read_wakers = std::mem::take(&mut inner.wakers.read_wakers);
    read_wakers
        .into_iter()
        .for_each(|(_conn_id, waker)| waker.wake());

    tracing::debug!("Shutdown event processed");
}
/// Returns the recorded assertion statistics, keyed by `String`.
///
/// Forwards to `crate::chaos::assertion_results`; no state of this
/// `SimWorld` is read.
pub fn assertion_results(
&self,
) -> std::collections::HashMap<String, crate::chaos::AssertionStats> {
crate::chaos::assertion_results()
}
/// Resets the recorded assertion results.
///
/// Forwards to `crate::chaos::reset_assertion_results`; no state of this
/// `SimWorld` is touched.
pub fn reset_assertion_results(&self) {
crate::chaos::reset_assertion_results();
}
/// Aborts every connection whose local or remote IP equals `ip`.
///
/// Matching ids are collected first, under a shared borrow that is released
/// before `close_connection_abort` is called for each of them.
pub fn abort_all_connections_for_ip(&self, ip: std::net::IpAddr) {
    let affected: Vec<ConnectionId> = {
        let inner = self.inner.borrow();
        inner
            .network
            .connections
            .iter()
            .filter(|(_, conn)| conn.local_ip == Some(ip) || conn.remote_ip == Some(ip))
            .map(|(id, _)| *id)
            .collect()
    };

    let count = affected.len();
    for conn_id in affected {
        self.close_connection_abort(conn_id);
    }

    if count > 0 {
        tracing::debug!("Aborted {} connections for rebooted IP {}", count, ip);
    }
}
/// Schedules an `Event::ProcessRestart` for `ip`, `recovery_delay` after the
/// current simulated time.
pub fn schedule_process_restart(
    &self,
    ip: std::net::IpAddr,
    recovery_delay: std::time::Duration,
) {
    let restart = Event::ProcessRestart { ip };
    self.schedule_event(restart, recovery_delay);

    tracing::debug!(
        "Scheduled process restart for IP {} in {:?}",
        ip,
        recovery_delay
    );
}
/// Returns a clone of the event most recently processed by `step`, or `None`
/// when the last `step` call found the queue empty (or none has run yet).
pub fn last_processed_event(&self) -> Option<Event> {
    let inner = self.inner.borrow();
    inner.last_processed_event.clone()
}
/// Builds a metrics snapshot from the current simulation state.
///
/// `wall_time` is always reported as zero here; only the simulated time and
/// the processed-event count are filled in from the simulation.
pub fn extract_metrics(&self) -> crate::runner::SimulationMetrics {
    let inner = self.inner.borrow();
    let simulated_time = inner.current_time;
    let events_processed = inner.events_processed;

    crate::runner::SimulationMetrics {
        wall_time: std::time::Duration::ZERO,
        simulated_time,
        events_processed,
    }
}
/// Decides whether a write on `connection_id` should be treated as clogged.
///
/// Stable connections are never clogged. If a write-clog entry exists, the
/// answer is whether it has not yet expired. Otherwise a random draw is made
/// against `clog_probability`; no randomness is consumed when that
/// probability is not greater than zero.
pub fn should_clog_write(&self, connection_id: ConnectionId) -> bool {
    let inner = self.inner.borrow();
    let network = &inner.network;

    let is_stable = network
        .connections
        .get(&connection_id)
        .is_some_and(|conn| conn.is_stable);
    if is_stable {
        return false;
    }

    match network.connection_clogs.get(&connection_id) {
        Some(clog_state) => inner.current_time < clog_state.expires_at,
        None => {
            let probability = network.config.chaos.clog_probability;
            probability > 0.0 && sim_random::<f64>() < probability
        }
    }
}
/// Clogs writes on `connection_id` for a duration sampled from the configured
/// clog duration, and schedules a `ClogClear` event at the expiry time.
///
/// An existing write-clog entry for the connection is overwritten.
pub fn clog_write(&self, connection_id: ConnectionId) {
    let mut inner = self.inner.borrow_mut();

    let clog_duration =
        crate::network::sample_duration(&inner.network.config.chaos.clog_duration);
    let expires_at = inner.current_time + clog_duration;
    inner
        .network
        .connection_clogs
        .insert(connection_id, ClogState { expires_at });

    let sequence = inner.next_sequence;
    inner.next_sequence += 1;
    let clear_event = Event::Connection {
        id: connection_id.0,
        state: ConnectionStateChange::ClogClear,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
}
/// Returns `true` when `connection_id` has a write-clog entry that has not
/// yet expired at the current simulated time.
pub fn is_write_clogged(&self, connection_id: ConnectionId) -> bool {
    let inner = self.inner.borrow();
    inner
        .network
        .connection_clogs
        .get(&connection_id)
        .is_some_and(|clog_state| inner.current_time < clog_state.expires_at)
}
/// Adds `waker` to the list woken when the write clog on `connection_id` is
/// cleared.
pub fn register_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
    let mut inner = self.inner.borrow_mut();
    let waiting = inner.wakers.clog_wakers.entry(connection_id).or_default();
    waiting.push(waker);
}
/// Decides whether a read on `connection_id` should be treated as clogged.
///
/// Stable connections are never clogged. If a read-clog entry exists, the
/// answer is whether it has not yet expired. Otherwise a random draw is made
/// against `clog_probability`; no randomness is consumed when that
/// probability is not greater than zero.
pub fn should_clog_read(&self, connection_id: ConnectionId) -> bool {
    let inner = self.inner.borrow();
    let network = &inner.network;

    let is_stable = network
        .connections
        .get(&connection_id)
        .is_some_and(|conn| conn.is_stable);
    if is_stable {
        return false;
    }

    match network.read_clogs.get(&connection_id) {
        Some(clog_state) => inner.current_time < clog_state.expires_at,
        None => {
            let probability = network.config.chaos.clog_probability;
            probability > 0.0 && sim_random::<f64>() < probability
        }
    }
}
/// Clogs reads on `connection_id` for a duration sampled from the configured
/// clog duration, and schedules a `ReadClogClear` event at the expiry time.
///
/// An existing read-clog entry for the connection is overwritten.
pub fn clog_read(&self, connection_id: ConnectionId) {
    let mut inner = self.inner.borrow_mut();

    let clog_duration =
        crate::network::sample_duration(&inner.network.config.chaos.clog_duration);
    let expires_at = inner.current_time + clog_duration;
    inner
        .network
        .read_clogs
        .insert(connection_id, ClogState { expires_at });

    let sequence = inner.next_sequence;
    inner.next_sequence += 1;
    let clear_event = Event::Connection {
        id: connection_id.0,
        state: ConnectionStateChange::ReadClogClear,
    };
    inner
        .event_queue
        .schedule(ScheduledEvent::new(expires_at, clear_event, sequence));
}
/// Returns `true` when `connection_id` has a read-clog entry that has not yet
/// expired at the current simulated time.
pub fn is_read_clogged(&self, connection_id: ConnectionId) -> bool {
    let inner = self.inner.borrow();
    inner
        .network
        .read_clogs
        .get(&connection_id)
        .is_some_and(|clog_state| inner.current_time < clog_state.expires_at)
}
/// Appends `waker` to the `read_clog_wakers` list kept for `connection_id`,
/// creating the list if none exists yet.
pub fn register_read_clog_waker(&self, connection_id: ConnectionId, waker: Waker) {
    let mut world = self.inner.borrow_mut();
    let pending = world
        .wakers
        .read_clog_wakers
        .entry(connection_id)
        .or_default();
    pending.push(waker);
}
/// Removes every entry of `connection_clogs` whose expiry has been reached and
/// hands the matching `clog_wakers` to `Self::wake_all`.
///
/// Only the `connection_clogs` table is scanned here; `read_clogs` is not
/// touched by this method.
pub fn clear_expired_clogs(&self) {
    let mut world = self.inner.borrow_mut();
    let now = world.current_time;

    // Collect first: the map cannot be mutated while it is being iterated.
    let mut expired: Vec<ConnectionId> = Vec::new();
    for (id, clog) in world.network.connection_clogs.iter() {
        if now >= clog.expires_at {
            expired.push(*id);
        }
    }

    for id in expired {
        world.network.connection_clogs.remove(&id);
        Self::wake_all(&mut world.wakers.clog_wakers, id);
    }
}
/// Cuts `connection_id` for `duration` of simulated time.
///
/// When the connection exists it is flagged as cut with an expiry of
/// `current_time + duration`, a `ConnectionCut` fault is emitted, and a
/// `CutRestore` connection event is scheduled for the expiry time. Unknown
/// connection ids are ignored (nothing is emitted or scheduled).
pub fn cut_connection(&self, connection_id: ConnectionId, duration: Duration) {
    let mut inner = self.inner.borrow_mut();
    let expires_at = inner.current_time + duration;
    if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
        conn.is_cut = true;
        conn.cut_expiry = Some(expires_at);
        // `as_millis` yields a u128; saturate instead of silently wrapping
        // when the value does not fit into the u64 carried by the fault event.
        let duration_ms = u64::try_from(duration.as_millis()).unwrap_or(u64::MAX);
        inner.emit_fault(SimFaultEvent::ConnectionCut {
            connection_id: connection_id.0,
            duration_ms,
        });
        tracing::debug!("Connection {} cut until {:?}", connection_id.0, expires_at);
        let restore_event = Event::Connection {
            id: connection_id.0,
            state: ConnectionStateChange::CutRestore,
        };
        let sequence = inner.next_sequence;
        inner.next_sequence += 1;
        inner
            .event_queue
            .schedule(ScheduledEvent::new(expires_at, restore_event, sequence));
    }
}
/// Reports whether `connection_id` is currently cut.
///
/// True only when the connection exists, its `is_cut` flag is set, and a
/// `cut_expiry` is recorded that lies after the current simulation time.
pub fn is_connection_cut(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    let Some(conn) = world.network.connections.get(&connection_id) else {
        return false;
    };
    if !conn.is_cut {
        return false;
    }
    match conn.cut_expiry {
        Some(expiry) => world.current_time < expiry,
        None => false,
    }
}
/// Lifts a cut on `connection_id`.
///
/// Does nothing when the connection is unknown or not cut. Otherwise the cut
/// flag and expiry are cleared and the connection's `cut_wakers` are passed to
/// `Self::wake_all`.
pub fn restore_connection(&self, connection_id: ConnectionId) {
    let mut world = self.inner.borrow_mut();
    let Some(conn) = world.network.connections.get_mut(&connection_id) else {
        return;
    };
    if !conn.is_cut {
        return;
    }
    conn.is_cut = false;
    conn.cut_expiry = None;
    tracing::debug!("Connection {} restored", connection_id.0);
    Self::wake_all(&mut world.wakers.cut_wakers, connection_id);
}
/// Appends `waker` to the `cut_wakers` list kept for `connection_id`,
/// creating the list if none exists yet.
pub fn register_cut_waker(&self, connection_id: ConnectionId, waker: Waker) {
    let mut world = self.inner.borrow_mut();
    let pending = world.wakers.cut_wakers.entry(connection_id).or_default();
    pending.push(waker);
}
/// Returns the `send_buffer_capacity` recorded for `connection_id`, or `0`
/// when the connection is unknown.
pub fn send_buffer_capacity(&self, connection_id: ConnectionId) -> usize {
    let world = self.inner.borrow();
    match world.network.connections.get(&connection_id) {
        Some(conn) => conn.send_buffer_capacity,
        None => 0,
    }
}
/// Returns the summed length of all entries queued in the connection's
/// `send_buffer`, or `0` when the connection is unknown.
pub fn send_buffer_used(&self, connection_id: ConnectionId) -> usize {
    let world = self.inner.borrow();
    let Some(conn) = world.network.connections.get(&connection_id) else {
        return 0;
    };
    conn.send_buffer.iter().map(|chunk| chunk.len()).sum()
}
/// Returns the remaining send-buffer space for `connection_id`:
/// capacity minus used length, clamped at zero.
pub fn available_send_buffer(&self, connection_id: ConnectionId) -> usize {
    self.send_buffer_capacity(connection_id)
        .saturating_sub(self.send_buffer_used(connection_id))
}
/// Appends `waker` to the `send_buffer_wakers` list kept for `connection_id`,
/// creating the list if none exists yet.
pub fn register_send_buffer_waker(&self, connection_id: ConnectionId, waker: Waker) {
    let mut world = self.inner.borrow_mut();
    let pending = world
        .wakers
        .send_buffer_wakers
        .entry(connection_id)
        .or_default();
    pending.push(waker);
}
/// Hands the `send_buffer_wakers` table to `Self::wake_all` for
/// `connection_id`.
#[allow(dead_code)]
fn wake_send_buffer_waiters(&self, connection_id: ConnectionId) {
    let mut world = self.inner.borrow_mut();
    let registry = &mut world.wakers.send_buffer_wakers;
    Self::wake_all(registry, connection_id);
}
/// Looks up the latency stored for the ordered IP pair `(src, dst)`.
///
/// Returns `None` when no latency has been recorded for that pair.
pub fn pair_latency(&self, src: IpAddr, dst: IpAddr) -> Option<Duration> {
    let world = self.inner.borrow();
    let key = (src, dst);
    world.network.pair_latencies.get(&key).copied()
}
/// Stores `latency` for the ordered IP pair `(src, dst)` unless a value is
/// already present, and returns whichever value ends up stored.
///
/// A debug message is logged only when the value is newly inserted.
pub fn set_pair_latency_if_not_set(
    &self,
    src: IpAddr,
    dst: IpAddr,
    latency: Duration,
) -> Duration {
    let mut world = self.inner.borrow_mut();
    let slot = world.network.pair_latencies.entry((src, dst));
    let stored = slot.or_insert_with(|| {
        tracing::debug!(
            "Setting base latency for IP pair {} -> {} to {:?}",
            src,
            dst,
            latency
        );
        latency
    });
    *stored
}
/// Returns the base latency for the IP pair used by `connection_id`.
///
/// The pair is `(local_ip, remote_ip)` of the connection; if the connection
/// is unknown or either address is missing, the unspecified IPv4 address is
/// used for both ends. An already recorded pair latency is returned as is;
/// otherwise a value is sampled from the `write_latency` configuration and
/// stored through `set_pair_latency_if_not_set`.
pub fn connection_base_latency(&self, connection_id: ConnectionId) -> Duration {
    let world = self.inner.borrow();
    let endpoints = world
        .network
        .connections
        .get(&connection_id)
        .and_then(|conn| conn.local_ip.zip(conn.remote_ip));
    // Release the borrow before calling helpers that borrow `inner` again.
    drop(world);

    let unspecified = IpAddr::V4(std::net::Ipv4Addr::UNSPECIFIED);
    let (local_ip, remote_ip) = endpoints.unwrap_or((unspecified, unspecified));

    match self.pair_latency(local_ip, remote_ip) {
        Some(known) => known,
        None => {
            let sampled = self.with_network_config(|config| {
                crate::network::sample_duration(&config.write_latency)
            });
            self.set_pair_latency_if_not_set(local_ip, remote_ip, sampled)
        }
    }
}
/// Returns the `send_delay` configured on `connection_id`, or `None` when the
/// connection is unknown or has no send delay set.
pub fn send_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
    let world = self.inner.borrow();
    let conn = world.network.connections.get(&connection_id)?;
    conn.send_delay
}
/// Returns the `recv_delay` configured on `connection_id`, or `None` when the
/// connection is unknown or has no receive delay set.
pub fn recv_delay(&self, connection_id: ConnectionId) -> Option<Duration> {
    let world = self.inner.borrow();
    let conn = world.network.connections.get(&connection_id)?;
    conn.recv_delay
}
/// Overwrites both the send and receive delay of `connection_id`.
///
/// Passing `None` clears the respective delay. Unknown connection ids are
/// ignored.
pub fn set_asymmetric_delays(
    &self,
    connection_id: ConnectionId,
    send_delay: Option<Duration>,
    recv_delay: Option<Duration>,
) {
    let mut world = self.inner.borrow_mut();
    let Some(conn) = world.network.connections.get_mut(&connection_id) else {
        return;
    };
    conn.send_delay = send_delay;
    conn.recv_delay = recv_delay;
    tracing::debug!(
        "Connection {} asymmetric delays set: send={:?}, recv={:?}",
        connection_id.0,
        send_delay,
        recv_delay
    );
}
/// Reports whether `connection_id` exists and has its `is_closed` flag set.
pub fn is_connection_closed(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    matches!(
        world.network.connections.get(&connection_id),
        Some(conn) if conn.is_closed
    )
}
/// Closes `connection_id` with `CloseReason::Graceful`.
pub fn close_connection(&self, connection_id: ConnectionId) {
    let reason = CloseReason::Graceful;
    self.close_connection_with_reason(connection_id, reason);
}
/// Closes `connection_id` with `CloseReason::Aborted`.
pub fn close_connection_abort(&self, connection_id: ConnectionId) {
    let reason = CloseReason::Aborted;
    self.close_connection_with_reason(connection_id, reason);
}
/// Returns the `close_reason` recorded on `connection_id`, or
/// `CloseReason::None` when the connection is unknown.
pub fn close_reason(&self, connection_id: ConnectionId) -> CloseReason {
    let world = self.inner.borrow();
    match world.network.connections.get(&connection_id) {
        Some(conn) => conn.close_reason,
        None => CloseReason::None,
    }
}
/// Dispatches a close request to the handler matching `reason`.
///
/// `CloseReason::None` is a no-op.
fn close_connection_with_reason(&self, connection_id: ConnectionId, reason: CloseReason) {
    match reason {
        CloseReason::None => {}
        CloseReason::Aborted => {
            self.close_connection_aborted(connection_id);
        }
        CloseReason::Graceful => {
            self.close_connection_graceful(connection_id);
        }
    }
}
/// Performs a graceful close of `connection_id`.
///
/// Unknown connections, and connections that are already closed or already
/// have their send side closed, are left untouched. Otherwise the connection
/// is marked closed / send-closed with reason `Graceful`, a pending read waker
/// for it is woken, and the FIN is either deferred (while a send is in
/// progress or the send buffer still holds data) or scheduled right away via
/// `Self::schedule_fin_delivery`.
fn close_connection_graceful(&self, connection_id: ConnectionId) {
let mut inner = self.inner.borrow_mut();
// Snapshot everything needed up front so the shared borrow of the
// connection ends before the mutable accesses below.
let conn_info = inner.network.connections.get(&connection_id).map(|conn| {
(
conn.paired_connection,
conn.send_closed,
conn.is_closed,
conn.send_in_progress,
conn.send_buffer.is_empty(),
conn.last_data_delivery_scheduled_at,
)
});
let Some((
paired_id,
was_send_closed,
was_closed,
send_in_progress,
send_buffer_empty,
last_delivery_time,
)) = conn_info
else {
// Unknown connection: nothing to close.
return;
};
if was_closed || was_send_closed {
tracing::debug!(
"Connection {} already closed/send_closed, skipping graceful close",
connection_id.0
);
return;
}
if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
conn.is_closed = true;
conn.close_reason = CloseReason::Graceful;
conn.send_closed = true;
tracing::debug!(
"Connection {} graceful close (FIN) - local write shut down",
connection_id.0
);
}
// Wake a reader parked on this connection so it can observe the close.
if let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
waker.wake();
}
if send_in_progress || !send_buffer_empty {
// Data is still in the send pipeline: only flag the close as pending.
// NOTE(review): presumably the send path delivers the FIN once the
// pipeline drains - confirm where `graceful_close_pending` is consumed.
if let Some(conn) = inner.network.connections.get_mut(&connection_id) {
conn.graceful_close_pending = true;
tracing::debug!(
"Connection {} graceful close deferred (send pipeline active)",
connection_id.0
);
}
} else {
// Nothing left to send: hand the peer id and the last delivery time to
// the FIN scheduler immediately.
Self::schedule_fin_delivery(&mut inner, paired_id, last_delivery_time);
}
}
/// Aborts `connection_id` and its paired connection.
///
/// Each side that is not already closed is marked closed with reason
/// `Aborted` (the local side also drops any pending graceful close). After
/// that, a registered read waker of the local side and then of the paired
/// side is removed and woken.
fn close_connection_aborted(&self, connection_id: ConnectionId) {
    let mut world = self.inner.borrow_mut();
    let peer_id = world
        .network
        .connections
        .get(&connection_id)
        .and_then(|conn| conn.paired_connection);

    // Local side.
    match world.network.connections.get_mut(&connection_id) {
        Some(conn) if !conn.is_closed => {
            conn.is_closed = true;
            conn.close_reason = CloseReason::Aborted;
            conn.graceful_close_pending = false;
            tracing::debug!(
                "Connection {} closed permanently (reason: Aborted)",
                connection_id.0
            );
        }
        _ => {}
    }

    // Paired side, if there is one.
    if let Some(paired_id) = peer_id {
        match world.network.connections.get_mut(&paired_id) {
            Some(peer) if !peer.is_closed => {
                peer.is_closed = true;
                peer.close_reason = CloseReason::Aborted;
                tracing::debug!(
                    "Paired connection {} also closed (reason: Aborted)",
                    paired_id.0
                );
            }
            _ => {}
        }
    }

    // Wake readers: local first, then the peer.
    if let Some(waker) = world.wakers.read_wakers.remove(&connection_id) {
        tracing::debug!(
            "Waking read waker for aborted connection {}",
            connection_id.0
        );
        waker.wake();
    }
    if let Some(paired_id) = peer_id {
        let peer_waker = world.wakers.read_wakers.remove(&paired_id);
        if let Some(waker) = peer_waker {
            tracing::debug!(
                "Waking read waker for paired aborted connection {}",
                paired_id.0
            );
            waker.wake();
        }
    }
}
/// Closes one or both directions of `connection_id` independently.
///
/// * `close_send`: marks the connection's send side closed, discards its send
///   buffer, and wakes its read waker.
/// * `close_recv`: marks the *paired* connection's receive side closed and
///   wakes the paired connection's read waker.
///
/// State changes are applied first (send, then recv), wake-ups afterwards in
/// the same order.
pub fn close_connection_asymmetric(
    &self,
    connection_id: ConnectionId,
    close_send: bool,
    close_recv: bool,
) {
    let mut world = self.inner.borrow_mut();
    let paired = world
        .network
        .connections
        .get(&connection_id)
        .and_then(|conn| conn.paired_connection);
    // The peer is only relevant when the receive direction is being closed.
    let recv_target = paired.filter(|_| close_recv);

    let local_conn = if close_send {
        world.network.connections.get_mut(&connection_id)
    } else {
        None
    };
    if let Some(conn) = local_conn {
        conn.send_closed = true;
        conn.send_buffer.clear();
        tracing::debug!(
            "Connection {} send side closed (asymmetric)",
            connection_id.0
        );
    }

    if let Some(peer) = recv_target
        && let Some(peer_conn) = world.network.connections.get_mut(&peer)
    {
        peer_conn.recv_closed = true;
        tracing::debug!(
            "Connection {} recv side closed (asymmetric via peer)",
            peer.0
        );
    }

    let local_waker = if close_send {
        world.wakers.read_wakers.remove(&connection_id)
    } else {
        None
    };
    if let Some(waker) = local_waker {
        waker.wake();
    }

    let peer_waker = recv_target.and_then(|peer| world.wakers.read_wakers.remove(&peer));
    if let Some(waker) = peer_waker {
        waker.wake();
    }
}
/// Rolls the dice for a chaos-injected random close on `connection_id`.
///
/// Returns `None` (and changes nothing) when the connection is stable, the
/// configured probability is not positive, the cooldown since the last random
/// close has not elapsed, or the probability roll fails. Otherwise the send
/// and/or receive direction is closed as in an asymmetric close and
/// `Some(explicit)` is returned, where `explicit` is a second random roll
/// against `random_close_explicit_ratio`.
/// NOTE(review): how callers interpret `explicit` is not visible here - confirm.
pub fn roll_random_close(&self, connection_id: ConnectionId) -> Option<bool> {
let mut inner = self.inner.borrow_mut();
let config = &inner.network.config;
// Stable connections are exempt from random closes.
if inner
.network
.connections
.get(&connection_id)
.is_some_and(|conn| conn.is_stable)
{
return None;
}
if config.chaos.random_close_probability <= 0.0 {
return None;
}
// Enforce the cooldown measured from the last random close.
let current_time = inner.current_time;
let time_since_last = current_time.saturating_sub(inner.network.last_random_close_time);
if time_since_last < config.chaos.random_close_cooldown {
return None;
}
if !crate::buggify_with_prob!(config.chaos.random_close_probability) {
return None;
}
// The close is happening: record its time and emit the fault.
inner.network.last_random_close_time = current_time;
inner.emit_fault(SimFaultEvent::RandomClose {
connection_id: connection_id.0,
});
let paired_id = inner
.network
.connections
.get(&connection_id)
.and_then(|conn| conn.paired_connection);
// One draw picks the directions: a < 0.66 closes recv, a > 0.33 closes
// send, so values strictly between 0.33 and 0.66 close both.
let a = super::rng::sim_random_f64();
let close_recv = a < 0.66;
let close_send = a > 0.33;
tracing::info!(
"Random connection failure triggered on connection {} (send={}, recv={}, a={:.3})",
connection_id.0,
close_send,
close_recv,
a
);
if close_send && let Some(conn) = inner.network.connections.get_mut(&connection_id) {
conn.send_closed = true;
conn.send_buffer.clear();
}
// The receive direction is closed on the paired connection.
if close_recv
&& let Some(paired) = paired_id
&& let Some(paired_conn) = inner.network.connections.get_mut(&paired)
{
paired_conn.recv_closed = true;
}
// Wake readers after all state changes have been applied.
if close_send && let Some(waker) = inner.wakers.read_wakers.remove(&connection_id) {
waker.wake();
}
if close_recv
&& let Some(paired) = paired_id
&& let Some(waker) = inner.wakers.read_wakers.remove(&paired)
{
waker.wake();
}
// Second draw decides the returned `explicit` flag.
let b = super::rng::sim_random_f64();
let explicit = b < inner.network.config.chaos.random_close_explicit_ratio;
tracing::debug!(
"Random close explicit={} (b={:.3}, ratio={:.2})",
explicit,
b,
inner.network.config.chaos.random_close_explicit_ratio
);
Some(explicit)
}
/// Reports whether `connection_id` exists and is either send-closed or fully
/// closed.
pub fn is_send_closed(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    match world.network.connections.get(&connection_id) {
        Some(conn) => conn.send_closed || conn.is_closed,
        None => false,
    }
}
/// Reports whether `connection_id` exists and is either receive-closed or
/// fully closed.
pub fn is_recv_closed(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    match world.network.connections.get(&connection_id) {
        Some(conn) => conn.recv_closed || conn.is_closed,
        None => false,
    }
}
/// Reports whether `connection_id` exists and has its `remote_fin_received`
/// flag set.
pub fn is_remote_fin_received(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    matches!(
        world.network.connections.get(&connection_id),
        Some(conn) if conn.remote_fin_received
    )
}
/// Simulates the peer of `connection_id` crashing.
///
/// When the connection exists it becomes half-open, its pairing is removed,
/// errors are set to manifest at `current_time + error_delay`, and a
/// `PeerCrash` fault is emitted. A `HalfOpenError` connection event for that
/// time is scheduled in every case, including for unknown connection ids.
pub fn simulate_peer_crash(&self, connection_id: ConnectionId, error_delay: Duration) {
    let mut world = self.inner.borrow_mut();
    let error_at = world.current_time + error_delay;

    if let Some(conn) = world.network.connections.get_mut(&connection_id) {
        conn.is_half_open = true;
        conn.half_open_error_at = Some(error_at);
        conn.paired_connection = None;
        world.emit_fault(SimFaultEvent::PeerCrash {
            connection_id: connection_id.0,
        });
        tracing::info!(
            "Connection {} now half-open, errors manifest at {:?}",
            connection_id.0,
            error_at
        );
    }

    let sequence = world.next_sequence;
    world.next_sequence += 1;
    let scheduled = ScheduledEvent::new(
        error_at,
        Event::Connection {
            id: connection_id.0,
            state: ConnectionStateChange::HalfOpenError,
        },
        sequence,
    );
    world.event_queue.schedule(scheduled);
}
/// Reports whether `connection_id` exists and is flagged half-open.
pub fn is_half_open(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    matches!(
        world.network.connections.get(&connection_id),
        Some(conn) if conn.is_half_open
    )
}
/// Reports whether a half-open connection should start returning errors.
///
/// True only when the connection exists, is half-open, and its recorded
/// `half_open_error_at` time has been reached by the simulation clock.
pub fn should_half_open_error(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    let now = world.current_time;
    match world.network.connections.get(&connection_id) {
        Some(conn) if conn.is_half_open => {
            matches!(conn.half_open_error_at, Some(error_at) if now >= error_at)
        }
        _ => false,
    }
}
/// Marks `connection_id` as stable and, if it has a paired connection that
/// exists, marks that one stable too. Unknown ids are ignored.
pub fn mark_connection_stable(&self, connection_id: ConnectionId) {
    let mut world = self.inner.borrow_mut();
    let Some(conn) = world.network.connections.get_mut(&connection_id) else {
        return;
    };
    conn.is_stable = true;
    let peer_id = conn.paired_connection;
    tracing::debug!("Connection {} marked as stable", connection_id.0);

    let Some(paired_id) = peer_id else {
        return;
    };
    if let Some(peer) = world.network.connections.get_mut(&paired_id) {
        peer.is_stable = true;
        tracing::debug!("Paired connection {} also marked as stable", paired_id.0);
    }
}
/// Reports whether `connection_id` exists and is flagged stable.
pub fn is_connection_stable(&self, connection_id: ConnectionId) -> bool {
    let world = self.inner.borrow();
    matches!(
        world.network.connections.get(&connection_id),
        Some(conn) if conn.is_stable
    )
}
/// Partitions traffic for the ordered pair `from_ip -> to_ip` for `duration`.
///
/// Records the partition under the key `(from_ip, to_ip)`, schedules a
/// `PartitionRestore` event for its expiry, and emits a `PartitionCreated`
/// fault. Always returns `Ok(())`.
pub fn partition_pair(
    &self,
    from_ip: std::net::IpAddr,
    to_ip: std::net::IpAddr,
    duration: Duration,
) -> SimulationResult<()> {
    let mut world = self.inner.borrow_mut();
    let expires_at = world.current_time + duration;
    let link = (from_ip, to_ip);
    world
        .network
        .ip_partitions
        .insert(link, PartitionState { expires_at });

    let sequence = world.next_sequence;
    world.next_sequence += 1;
    let scheduled = ScheduledEvent::new(
        expires_at,
        Event::Connection {
            id: 0,
            state: ConnectionStateChange::PartitionRestore,
        },
        sequence,
    );
    world.event_queue.schedule(scheduled);

    world.emit_fault(SimFaultEvent::PartitionCreated {
        from: from_ip.to_string(),
        to: to_ip.to_string(),
    });
    tracing::debug!(
        "Partitioned {} -> {} until {:?}",
        from_ip,
        to_ip,
        expires_at
    );
    Ok(())
}
/// Partitions all sends originating from `ip` for `duration`.
///
/// Records the expiry in `send_partitions`, schedules a `SendPartitionClear`
/// event for it, and emits a `SendPartitionCreated` fault. Always returns
/// `Ok(())`.
pub fn partition_send_from(
    &self,
    ip: std::net::IpAddr,
    duration: Duration,
) -> SimulationResult<()> {
    let mut world = self.inner.borrow_mut();
    let expires_at = world.current_time + duration;
    world.network.send_partitions.insert(ip, expires_at);

    let sequence = world.next_sequence;
    world.next_sequence += 1;
    let scheduled = ScheduledEvent::new(
        expires_at,
        Event::Connection {
            id: 0,
            state: ConnectionStateChange::SendPartitionClear,
        },
        sequence,
    );
    world.event_queue.schedule(scheduled);

    world.emit_fault(SimFaultEvent::SendPartitionCreated { ip: ip.to_string() });
    tracing::debug!("Partitioned sends from {} until {:?}", ip, expires_at);
    Ok(())
}
/// Partitions all receives destined for `ip` for `duration`.
///
/// Records the expiry in `recv_partitions`, schedules a `RecvPartitionClear`
/// event for it, and emits a `RecvPartitionCreated` fault. Always returns
/// `Ok(())`.
pub fn partition_recv_to(
    &self,
    ip: std::net::IpAddr,
    duration: Duration,
) -> SimulationResult<()> {
    let mut world = self.inner.borrow_mut();
    let expires_at = world.current_time + duration;
    world.network.recv_partitions.insert(ip, expires_at);

    let sequence = world.next_sequence;
    world.next_sequence += 1;
    let scheduled = ScheduledEvent::new(
        expires_at,
        Event::Connection {
            id: 0,
            state: ConnectionStateChange::RecvPartitionClear,
        },
        sequence,
    );
    world.event_queue.schedule(scheduled);

    world.emit_fault(SimFaultEvent::RecvPartitionCreated { ip: ip.to_string() });
    tracing::debug!("Partitioned receives to {} until {:?}", ip, expires_at);
    Ok(())
}
/// Removes the partition recorded for the ordered pair `from_ip -> to_ip`.
///
/// A `PartitionHealed` fault is emitted whether or not an entry was present.
/// Always returns `Ok(())`.
pub fn restore_partition(
    &self,
    from_ip: std::net::IpAddr,
    to_ip: std::net::IpAddr,
) -> SimulationResult<()> {
    let mut world = self.inner.borrow_mut();
    let link = (from_ip, to_ip);
    world.network.ip_partitions.remove(&link);

    let healed = SimFaultEvent::PartitionHealed {
        from: from_ip.to_string(),
        to: to_ip.to_string(),
    };
    world.emit_fault(healed);
    tracing::debug!("Restored partition {} -> {}", from_ip, to_ip);
    Ok(())
}
/// Asks the network state whether `from_ip -> to_ip` is partitioned at the
/// current simulation time. Always returns `Ok`.
pub fn is_partitioned(
    &self,
    from_ip: std::net::IpAddr,
    to_ip: std::net::IpAddr,
) -> SimulationResult<bool> {
    let world = self.inner.borrow();
    let now = world.current_time;
    let blocked = world.network.is_partitioned(from_ip, to_ip, now);
    Ok(blocked)
}
/// Randomly creates a network partition according to the chaos configuration.
///
/// Nothing happens when `partition_probability` is zero, when the random roll
/// fails, or when fewer than two distinct local IPs are in use. Otherwise a
/// subset of IPs is chosen by the configured `PartitionStrategy`, every pair
/// between the subset and the remaining IPs is partitioned in both directions
/// (pairs already partitioned are skipped), and one `PartitionRestore` event
/// is scheduled for the common expiry time.
fn randomly_trigger_partitions_with_inner(inner: &mut SimInner) {
    let partition_config = &inner.network.config;
    if partition_config.chaos.partition_probability == 0.0 {
        return;
    }
    if sim_random::<f64>() >= partition_config.chaos.partition_probability {
        return;
    }

    // Build the list of distinct local IPs in sorted order. Going through a
    // `HashSet` would yield an order that depends on the per-process hash
    // seed, so the random picks below would not be reproducible for a given
    // simulation seed.
    let mut ip_list: Vec<IpAddr> = inner
        .network
        .connections
        .values()
        .filter_map(|conn| conn.local_ip)
        .collect();
    ip_list.sort_unstable();
    ip_list.dedup();
    if ip_list.len() < 2 {
        return;
    }

    let partition_duration =
        crate::network::sample_duration(&partition_config.chaos.partition_duration);
    let expires_at = inner.current_time + partition_duration;

    let partitioned_ips: Vec<IpAddr> = match partition_config.chaos.partition_strategy {
        // Each IP joins the partitioned side with probability 0.5.
        PartitionStrategy::Random => ip_list
            .iter()
            .filter(|_| sim_random::<f64>() < 0.5)
            .copied()
            .collect(),
        // Pick a size in 1..len, then take that many IPs from a shuffle.
        PartitionStrategy::UniformSize => {
            let partition_size = sim_random_range(1..ip_list.len());
            let mut shuffled = ip_list.clone();
            for i in (1..shuffled.len()).rev() {
                let j = sim_random_range(0..i + 1);
                shuffled.swap(i, j);
            }
            shuffled.into_iter().take(partition_size).collect()
        }
        // Isolate exactly one randomly chosen IP.
        PartitionStrategy::IsolateSingle => {
            let idx = sim_random_range(0..ip_list.len());
            vec![ip_list[idx]]
        }
    };

    // A partition needs at least one IP on each side.
    if partitioned_ips.is_empty() || partitioned_ips.len() == ip_list.len() {
        return;
    }
    let non_partitioned: Vec<IpAddr> = ip_list
        .iter()
        .filter(|ip| !partitioned_ips.contains(ip))
        .copied()
        .collect();

    for &from_ip in &partitioned_ips {
        for &to_ip in &non_partitioned {
            if inner
                .network
                .is_partitioned(from_ip, to_ip, inner.current_time)
            {
                continue;
            }
            inner
                .network
                .ip_partitions
                .insert((from_ip, to_ip), PartitionState { expires_at });
            inner
                .network
                .ip_partitions
                .insert((to_ip, from_ip), PartitionState { expires_at });
            inner.emit_fault(SimFaultEvent::PartitionCreated {
                from: from_ip.to_string(),
                to: to_ip.to_string(),
            });
            tracing::debug!(
                "Partition triggered: {} <-> {} until {:?} (strategy: {:?})",
                from_ip,
                to_ip,
                expires_at,
                partition_config.chaos.partition_strategy
            );
        }
    }

    let restore_event = Event::Connection {
        id: 0,
        state: ConnectionStateChange::PartitionRestore,
    };
    let sequence = inner.next_sequence;
    inner.next_sequence += 1;
    let scheduled_event = ScheduledEvent::new(expires_at, restore_event, sequence);
    inner.event_queue.schedule(scheduled_event);
}
}
impl Default for SimWorld {
fn default() -> Self {
Self::new()
}
}
/// Handle to a simulation that holds only a `Weak` reference to the shared
/// `SimInner`, so holding it does not keep the simulation state alive.
#[derive(Debug)]
pub struct WeakSimWorld {
// Weak counterpart of the `Rc<RefCell<SimInner>>` shared simulation state.
pub(crate) inner: Weak<RefCell<SimInner>>,
}
// Generates a `WeakSimWorld` method that forwards to the `SimWorld` method of
// the same name. Every generated method calls `self.upgrade()?` first, so an
// upgrade error is returned to the caller before anything is forwarded.
//
// Three forwarding modes are selected by the leading keyword:
//   wrap - the target returns a plain `$ret`; the result is wrapped in `Ok`.
//   pass - the target's result is returned unchanged as `SimulationResult<$ret>`.
//   unit - the target is called as a statement and `Ok(())` is returned.
macro_rules! weak_forward {
// wrap: Ok(<target result>)
(wrap $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
$(#[$meta])*
pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
Ok(self.upgrade()?.$method($($arg),*))
}
};
// pass: target result returned as is
(pass $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*) -> $ret:ty) => {
$(#[$meta])*
pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<$ret> {
self.upgrade()?.$method($($arg),*)
}
};
// unit: call the target, then Ok(())
(unit $(#[$meta:meta])* $method:ident(&self $(, $arg:ident : $arg_ty:ty)*)) => {
$(#[$meta])*
pub fn $method(&self $(, $arg: $arg_ty)*) -> SimulationResult<()> {
self.upgrade()?.$method($($arg),*);
Ok(())
}
};
}
impl WeakSimWorld {
    /// Tries to obtain a strong `SimWorld` handle.
    ///
    /// # Errors
    ///
    /// Returns `SimulationError::SimulationShutdown` when the underlying
    /// simulation state has already been dropped.
    pub fn upgrade(&self) -> SimulationResult<SimWorld> {
        match self.inner.upgrade() {
            Some(inner) => Ok(SimWorld { inner }),
            None => Err(SimulationError::SimulationShutdown),
        }
    }

    // Time accessors.
    weak_forward!(wrap #[doc = "Returns the current simulation time."] current_time(&self) -> Duration);
    weak_forward!(wrap #[doc = "Returns the exact simulation time (equivalent to FDB's now())."] now(&self) -> Duration);
    weak_forward!(wrap #[doc = "Returns the drifted timer time (equivalent to FDB's timer())."] timer(&self) -> Duration);

    // Event scheduling.
    weak_forward!(unit #[doc = "Schedules an event to execute after the specified delay."] schedule_event(&self, event: Event, delay: Duration));
    weak_forward!(unit #[doc = "Schedules an event to execute at the specified absolute time."] schedule_event_at(&self, event: Event, time: Duration));

    // Connection I/O.
    weak_forward!(pass #[doc = "Read data from connection's receive buffer."] read_from_connection(&self, connection_id: ConnectionId, buf: &mut [u8]) -> usize);
    weak_forward!(pass #[doc = "Write data to connection's receive buffer."] write_to_connection(&self, connection_id: ConnectionId, data: &[u8]) -> ());
    weak_forward!(pass #[doc = "Buffer data for ordered sending on a connection."] buffer_send(&self, connection_id: ConnectionId, data: Vec<u8>) -> ());

    // Providers and sleeping.
    weak_forward!(wrap #[doc = "Get a network provider for the simulation."] network_provider(&self) -> SimNetworkProvider);
    weak_forward!(wrap #[doc = "Get a time provider for the simulation."] time_provider(&self) -> crate::providers::SimTimeProvider);
    weak_forward!(wrap #[doc = "Sleep for the specified duration in simulation time."] sleep(&self, duration: Duration) -> SleepFuture);

    /// Runs `f` against the simulation's network configuration and returns
    /// its result.
    ///
    /// # Errors
    ///
    /// Propagates the error from [`WeakSimWorld::upgrade`].
    pub fn with_network_config<F, R>(&self, f: F) -> SimulationResult<R>
    where
        F: FnOnce(&NetworkConfiguration) -> R,
    {
        let world = self.upgrade()?;
        Ok(world.with_network_config(f))
    }
}
impl Clone for WeakSimWorld {
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
}
}
}
// Unit tests for event stepping, weak handles and fault-event emission.
#[cfg(test)]
mod tests {
use super::*;
// A fresh world starts at time zero with no events; stepping the single
// scheduled timer advances the clock to its deadline and empties the queue.
#[test]
fn sim_world_basic_lifecycle() {
let mut sim = SimWorld::new();
assert_eq!(sim.current_time(), Duration::ZERO);
assert!(!sim.has_pending_events());
assert_eq!(sim.pending_event_count(), 0);
sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
assert!(sim.has_pending_events());
assert_eq!(sim.pending_event_count(), 1);
// Scheduling alone must not advance time.
assert_eq!(sim.current_time(), Duration::ZERO);
let has_more = sim.step();
assert!(!has_more);
assert_eq!(sim.current_time(), Duration::from_millis(100));
assert!(!sim.has_pending_events());
assert_eq!(sim.pending_event_count(), 0);
}
// Events scheduled out of order are processed in time order; `step` reports
// whether events remain afterwards.
#[test]
fn sim_world_multiple_events() {
let mut sim = SimWorld::new();
sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
assert_eq!(sim.pending_event_count(), 3);
assert!(sim.step());
assert_eq!(sim.current_time(), Duration::from_millis(100));
assert_eq!(sim.pending_event_count(), 2);
assert!(sim.step());
assert_eq!(sim.current_time(), Duration::from_millis(200));
assert_eq!(sim.pending_event_count(), 1);
assert!(!sim.step());
assert_eq!(sim.current_time(), Duration::from_millis(300));
assert_eq!(sim.pending_event_count(), 0);
}
// `run_until_empty` drains the queue and leaves the clock at the last event.
#[test]
fn sim_world_run_until_empty() {
let mut sim = SimWorld::new();
sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200));
sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(300));
sim.run_until_empty();
assert_eq!(sim.current_time(), Duration::from_millis(300));
assert!(!sim.has_pending_events());
}
// An event scheduled at an absolute time moves the clock exactly there.
#[test]
fn sim_world_schedule_at_specific_time() {
let mut sim = SimWorld::new();
sim.schedule_event_at(Event::Timer { task_id: 1 }, Duration::from_millis(500));
assert_eq!(sim.current_time(), Duration::ZERO);
sim.step();
assert_eq!(sim.current_time(), Duration::from_millis(500));
}
// A weak handle works while the world is alive and reports
// `SimulationShutdown` once the world has been dropped.
#[test]
fn weak_sim_world_lifecycle() {
let sim = SimWorld::new();
let weak = sim.downgrade();
assert_eq!(
weak.current_time().expect("should get time"),
Duration::ZERO
);
weak.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100))
.expect("should schedule event");
assert!(sim.has_pending_events());
drop(sim);
assert_eq!(
weak.current_time(),
Err(SimulationError::SimulationShutdown)
);
assert_eq!(
weak.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(200)),
Err(SimulationError::SimulationShutdown)
);
}
// Three events with the same deadline are processed one per step while the
// clock stays at that deadline.
#[test]
fn deterministic_event_ordering() {
let mut sim = SimWorld::new();
sim.schedule_event(Event::Timer { task_id: 2 }, Duration::from_millis(100));
sim.schedule_event(Event::Timer { task_id: 1 }, Duration::from_millis(100));
sim.schedule_event(Event::Timer { task_id: 3 }, Duration::from_millis(100));
assert!(sim.step());
assert_eq!(sim.current_time(), Duration::from_millis(100));
assert!(sim.step());
assert_eq!(sim.current_time(), Duration::from_millis(100));
assert!(!sim.step());
assert_eq!(sim.current_time(), Duration::from_millis(100));
}
// Emitting a fault without an attached state handle must simply not panic.
#[test]
fn emit_fault_without_state_is_noop() {
let inner = SimInner::new();
assert!(inner.state.is_none());
inner.emit_fault(SimFaultEvent::StorageCrash {
ip: "10.0.1.1".to_string(),
});
}
// With a state handle attached, the fault lands in the fault timeline with
// the current time in milliseconds and the source "sim".
#[test]
fn emit_fault_with_state_writes_to_timeline() {
let mut inner = SimInner::new();
let state = StateHandle::new();
inner.state = Some(state.clone());
inner.current_time = Duration::from_millis(500);
inner.emit_fault(SimFaultEvent::StorageCrash {
ip: "10.0.1.1".to_string(),
});
let tl = state
.timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
.expect("timeline should exist");
assert_eq!(tl.len(), 1);
let entry = tl.last().expect("should have entry");
assert_eq!(entry.time_ms, 500);
assert_eq!(entry.source, "sim");
assert!(matches!(entry.event, SimFaultEvent::StorageCrash { .. }));
}
// `partition_pair` records exactly one `PartitionCreated` entry carrying the
// two IPs as strings.
#[test]
fn partition_pair_emits_fault_event() {
let sim = SimWorld::new();
let state = StateHandle::new();
sim.set_state(state.clone());
let from: std::net::IpAddr = "10.0.1.1".parse().expect("valid ip");
let to: std::net::IpAddr = "10.0.1.2".parse().expect("valid ip");
sim.partition_pair(from, to, Duration::from_secs(10))
.expect("partition should succeed");
let tl = state
.timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
.expect("timeline should exist");
assert_eq!(tl.len(), 1);
assert!(matches!(
&tl.all()[0].event,
SimFaultEvent::PartitionCreated { from, to }
if from == "10.0.1.1" && to == "10.0.1.2"
));
}
// Partition followed by restore yields `PartitionCreated` then
// `PartitionHealed`, in that order.
#[test]
fn restore_partition_emits_fault_event() {
let sim = SimWorld::new();
let state = StateHandle::new();
sim.set_state(state.clone());
let from: std::net::IpAddr = "10.0.1.1".parse().expect("valid ip");
let to: std::net::IpAddr = "10.0.1.2".parse().expect("valid ip");
sim.partition_pair(from, to, Duration::from_secs(10))
.expect("partition");
sim.restore_partition(from, to).expect("restore");
let tl = state
.timeline::<SimFaultEvent>(SIM_FAULT_TIMELINE)
.expect("timeline should exist");
assert_eq!(tl.len(), 2);
assert!(matches!(
&tl.all()[0].event,
SimFaultEvent::PartitionCreated { .. }
));
assert!(matches!(
&tl.all()[1].event,
SimFaultEvent::PartitionHealed { .. }
));
}
}