1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
use super::stream::{SimTcpListener, SimTcpStream};
use crate::NetworkProvider;
use crate::buggify;
use crate::network::ConnectFailureMode;
use crate::sim::rng::sim_random;
use crate::{Event, WeakSimWorld};
use async_trait::async_trait;
use std::io;
use tracing::instrument;
/// Simulated networking implementation.
///
/// Implements [`NetworkProvider`] on top of a simulation world instead of an
/// operating-system network stack: listeners and connections are created
/// through the simulation handle stored in this struct.
#[derive(Debug, Clone)]
pub struct SimNetworkProvider {
/// Weak handle to the simulation world. `bind` and `connect` upgrade it on
/// every call and report "simulation shutdown" when the upgrade fails.
sim: WeakSimWorld,
}
impl SimNetworkProvider {
/// Create a new simulated network provider
pub fn new(sim: WeakSimWorld) -> Self {
Self { sim }
}
/// Sleep in simulation time.
///
/// This allows workloads to introduce delays for coordination purposes.
/// The sleep completes when the simulation processes the corresponding Wake event.
pub fn sleep(
&self,
duration: std::time::Duration,
) -> crate::SimulationResult<crate::SleepFuture> {
self.sim.sleep(duration)
}
}
#[async_trait(?Send)]
impl NetworkProvider for SimNetworkProvider {
    type TcpStream = SimTcpStream;
    type TcpListener = SimTcpListener;

    /// Bind a simulated listener to `addr`.
    ///
    /// Registers a listener for `addr` with the simulation and schedules a
    /// `BindComplete` connection event after a delay sampled from the
    /// configured `bind_latency`. The listener handle is returned without
    /// awaiting that event.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] when the simulation handle can no longer be
    /// upgraded ("simulation shutdown") or when the simulation refuses to
    /// create the listener.
    #[instrument(skip(self))]
    async fn bind(&self, addr: &str) -> io::Result<Self::TcpListener> {
        let sim = self
            .sim
            .upgrade()
            .map_err(|_| io::Error::other("simulation shutdown"))?;

        // Sample the bind delay from the network configuration. This is done
        // before the listener is created so the order of sampling calls stays
        // stable across runs.
        let delay =
            sim.with_network_config(|config| crate::network::sample_duration(&config.bind_latency));

        // Register the listener with the simulation.
        let listener_id = sim
            .create_listener(addr.to_string())
            .map_err(|e| io::Error::other(format!("Failed to create listener: {e}")))?;

        // Schedule the bind-completion event `delay` into the simulated future.
        sim.schedule_event(
            Event::Connection {
                id: listener_id.0,
                state: crate::ConnectionStateChange::BindComplete,
            },
            delay,
        );

        Ok(SimTcpListener::new(
            self.sim.clone(),
            listener_id,
            addr.to_string(),
        ))
    }

    /// Connect to a remote address.
    ///
    /// When chaos is enabled, connection establishment can fail or hang forever
    /// based on the `connect_failure_mode` setting (FDB ref: sim2.actor.cpp:1243-1250):
    /// - `Disabled`: normal operation (no failure injection).
    /// - `AlwaysFail`: fail with `ConnectionRefused` whenever `buggify!()` fires.
    /// - `Probabilistic`: whenever `buggify!()` fires, draw a random `f64`; if
    ///   the draw is greater than `connect_failure_probability` the call fails
    ///   with `ConnectionRefused`, otherwise it hangs forever (tests timeout
    ///   handling). The split is only 50/50 when that setting is `0.5`
    ///   (assuming the draw is uniform in `[0, 1)` — TODO confirm).
    ///
    /// On the normal path a client/server connection pair is created, the
    /// server side is stored as a pending connection for `addr`, and a
    /// `ConnectionReady` event is scheduled after a delay sampled from the
    /// configured `connect_latency`.
    ///
    /// # Errors
    ///
    /// Returns an [`io::Error`] when the simulation handle can no longer be
    /// upgraded ("simulation shutdown"), when a failure is injected
    /// (`ConnectionRefused`), or when the simulation fails to create or store
    /// the connection pair.
    #[instrument(skip(self))]
    async fn connect(&self, addr: &str) -> io::Result<Self::TcpStream> {
        let sim = self
            .sim
            .upgrade()
            .map_err(|_| io::Error::other("simulation shutdown"))?;

        // Check connect failure mode (FDB SIM_CONNECT_ERROR_MODE pattern)
        // FDB ref: sim2.actor.cpp:1243-1250
        let (failure_mode, failure_probability) = sim.with_network_config(|config| {
            (
                config.chaos.connect_failure_mode,
                config.chaos.connect_failure_probability,
            )
        });

        match failure_mode {
            // Normal operation: no failure injection.
            ConnectFailureMode::Disabled => {}
            ConnectFailureMode::AlwaysFail => {
                // Fail with ConnectionRefused whenever buggify fires.
                if buggify!() {
                    tracing::debug!(addr = %addr, "Connection establishment failed (AlwaysFail mode)");
                    return Err(io::Error::new(
                        io::ErrorKind::ConnectionRefused,
                        "Connection establishment failed (AlwaysFail mode)",
                    ));
                }
            }
            ConnectFailureMode::Probabilistic => {
                // `buggify!()` is evaluated before the random draw; keep this
                // order so the sequence of random draws is unchanged.
                if buggify!() {
                    // NOTE(review): a draw *above* `failure_probability` yields
                    // the error, so a larger setting makes hangs more likely and
                    // errors less likely. Confirm this matches the intended
                    // meaning of `connect_failure_probability`.
                    if sim_random::<f64>() > failure_probability {
                        tracing::debug!(addr = %addr, "Connection establishment failed (Probabilistic mode - error)");
                        return Err(io::Error::new(
                            io::ErrorKind::ConnectionRefused,
                            "Connection establishment failed (Probabilistic mode)",
                        ));
                    }

                    // Hang forever: this future never resolves, so the call only
                    // ends when the caller drops it (e.g. through a timeout).
                    // This exercises timeout handling in connection retry logic.
                    tracing::debug!(addr = %addr, "Connection hanging forever (Probabilistic mode - hang)");
                    return std::future::pending::<io::Result<SimTcpStream>>().await;
                }
            }
        }

        // Sample the connect delay from the network configuration.
        let delay = sim
            .with_network_config(|config| crate::network::sample_duration(&config.connect_latency));

        // Create a connection pair for bidirectional communication.
        // NOTE(review): the client side is registered under the fixed
        // placeholder string "client-addr" — confirm whether a real local
        // address should be supplied here.
        let (client_id, server_id) = sim
            .create_connection_pair("client-addr".to_string(), addr.to_string())
            .map_err(|e| io::Error::other(format!("Failed to create connection pair: {e}")))?;

        // Store the server side for accept() to pick up later.
        // NOTE(review): if this fails, the pair created above is not torn down
        // here — verify the simulation cleans it up.
        sim.store_pending_connection(addr, server_id)
            .map_err(|e| io::Error::other(format!("Failed to store pending connection: {e}")))?;

        // Schedule the connection-ready event `delay` into the simulated future.
        sim.schedule_event(
            Event::Connection {
                id: client_id.0,
                state: crate::ConnectionStateChange::ConnectionReady,
            },
            delay,
        );

        Ok(SimTcpStream::new(self.sim.clone(), client_id))
    }
}