moonpool_sim/sim/state.rs
1//! Network state management for simulation.
2//!
3//! This module provides internal state types for managing connections,
4//! listeners, partitions, and clogs in the simulation environment.
5
6use std::{
7 collections::{HashMap, VecDeque},
8 net::IpAddr,
9 time::Duration,
10};
11
12use crate::network::{
13 NetworkConfiguration,
14 sim::{ConnectionId, ListenerId},
15};
16
17/// Simple clog state - just tracks when it expires
18#[derive(Debug)]
19pub struct ClogState {
20 /// When the clog expires and writes can resume (in simulation time)
21 pub expires_at: Duration,
22}
23
24/// Reason for connection closure - distinguishes FIN vs RST semantics.
25///
26/// In real TCP:
27/// - FIN (graceful close): Peer gets EOF on read, writes may still work briefly
28/// - RST (aborted close): Peer gets ECONNRESET immediately on both read and write
29#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
30pub enum CloseReason {
31 /// Connection is not closed
32 #[default]
33 None,
34 /// Graceful close (FIN) - peer will get EOF on read
35 Graceful,
36 /// Aborted close (RST) - peer will get ECONNRESET
37 Aborted,
38}
39
40/// Network partition state between two IP addresses
41#[derive(Debug, Clone)]
42pub struct PartitionState {
43 /// When the partition expires and connectivity is restored (in simulation time)
44 pub expires_at: Duration,
45}
46
47/// Internal connection state for simulation
48#[derive(Debug, Clone)]
49pub struct ConnectionState {
50 /// Unique identifier for this connection within the simulation.
51 #[allow(dead_code)]
52 pub id: ConnectionId,
53
54 /// Network address this connection is associated with.
55 #[allow(dead_code)]
56 pub addr: String,
57
58 /// Local IP address for this connection
59 pub local_ip: Option<IpAddr>,
60
61 /// Remote IP address for this connection
62 pub remote_ip: Option<IpAddr>,
63
64 /// Peer address as seen by this connection.
65 ///
66 /// FDB Pattern (sim2.actor.cpp:1149-1175):
67 /// - For client-side connections: the destination address (server's listening address)
68 /// - For server-side connections: synthesized ephemeral address (random IP + port 40000-60000)
69 ///
70 /// This simulates real TCP behavior where servers see client ephemeral ports,
71 /// not the client's identity address. As FDB notes: "In the case of an incoming
72 /// connection, this may not be an address we can connect to!"
73 pub peer_address: String,
74
75 /// FIFO buffer for incoming data that hasn't been read by the application yet.
76 pub receive_buffer: VecDeque<u8>,
77
78 /// Reference to the other end of this bidirectional TCP connection.
79 pub paired_connection: Option<ConnectionId>,
80
81 /// FIFO buffer for outgoing data waiting to be sent over the network.
82 pub send_buffer: VecDeque<Vec<u8>>,
83
84 /// Flag indicating whether a `ProcessSendBuffer` event is currently scheduled.
85 pub send_in_progress: bool,
86
87 /// Next available time for sending messages from this connection.
88 pub next_send_time: Duration,
89
90 /// Whether this connection has been permanently closed by one of the endpoints
91 pub is_closed: bool,
92
93 /// Whether the send side is closed (writes will fail) - for asymmetric closure
94 /// FDB: closeInternal() on self closes send capability
95 pub send_closed: bool,
96
97 /// Whether the receive side is closed (reads return EOF) - for asymmetric closure
98 /// FDB: closeInternal() on peer closes recv capability
99 pub recv_closed: bool,
100
101 /// Whether this connection is temporarily cut (will be restored).
102 /// Unlike `is_closed`, a cut connection can be restored after a duration.
103 /// This simulates temporary network outages where the connection is not
104 /// permanently severed but temporarily unavailable.
105 pub is_cut: bool,
106
107 /// When the cut expires and the connection is restored (in simulation time).
108 /// Only meaningful when `is_cut` is true.
109 pub cut_expiry: Option<Duration>,
110
111 /// Reason for connection closure - distinguishes FIN vs RST semantics.
112 /// When `is_closed` is true, this indicates whether it was graceful or aborted.
113 pub close_reason: CloseReason,
114
115 /// Send buffer capacity in bytes.
116 /// When the send buffer reaches this limit, poll_write returns Pending.
117 /// Calculated from BDP (Bandwidth-Delay Product): latency × bandwidth.
118 pub send_buffer_capacity: usize,
119
120 /// Per-connection send delay override (asymmetric latency).
121 /// If set, this delay is applied when sending data from this connection.
122 /// If None, the global write_latency from NetworkConfiguration is used.
123 pub send_delay: Option<Duration>,
124
125 /// Per-connection receive delay override (asymmetric latency).
126 /// If set, this delay is applied when receiving data on this connection.
127 /// If None, the global read_latency from NetworkConfiguration is used.
128 pub recv_delay: Option<Duration>,
129
130 /// Whether this connection is in a half-open state (peer crashed).
131 /// In this state:
132 /// - Local side still thinks it's connected
133 /// - Writes succeed but data is silently discarded
134 /// - Reads block waiting for data that will never come
135 /// - After `half_open_error_at`, errors start manifesting
136 pub is_half_open: bool,
137
138 /// When a half-open connection starts returning errors (in simulation time).
139 /// Before this time: writes succeed (data dropped), reads block.
140 /// After this time: both read and write return ECONNRESET.
141 pub half_open_error_at: Option<Duration>,
142}
143
144/// Internal listener state for simulation
145#[derive(Debug)]
146pub struct ListenerState {
147 /// Unique identifier for this listener.
148 #[allow(dead_code)]
149 pub id: ListenerId,
150 /// Network address this listener is bound to.
151 #[allow(dead_code)]
152 pub addr: String,
153 /// Queue of pending connections waiting to be accepted.
154 #[allow(dead_code)]
155 pub pending_connections: VecDeque<ConnectionId>,
156}
157
158/// Network-related state management
159#[derive(Debug)]
160pub struct NetworkState {
161 /// Counter for generating unique connection IDs.
162 pub next_connection_id: u64,
163 /// Counter for generating unique listener IDs.
164 pub next_listener_id: u64,
165 /// Network configuration for this simulation.
166 pub config: NetworkConfiguration,
167 /// Active connections indexed by their ID.
168 pub connections: HashMap<ConnectionId, ConnectionState>,
169 /// Active listeners indexed by their ID.
170 pub listeners: HashMap<ListenerId, ListenerState>,
171 /// Connections pending acceptance, indexed by address.
172 pub pending_connections: HashMap<String, ConnectionId>,
173
174 /// Write clog state (temporary write blocking).
175 pub connection_clogs: HashMap<ConnectionId, ClogState>,
176
177 /// Read clog state (temporary read blocking, symmetric with write clogging).
178 pub read_clogs: HashMap<ConnectionId, ClogState>,
179
180 /// Partitions between specific IP pairs (from, to) -> partition state
181 pub ip_partitions: HashMap<(IpAddr, IpAddr), PartitionState>,
182 /// Send partitions - IP cannot send to anyone
183 pub send_partitions: HashMap<IpAddr, Duration>,
184 /// Receive partitions - IP cannot receive from anyone
185 pub recv_partitions: HashMap<IpAddr, Duration>,
186
187 /// Last time a random close was triggered (global cooldown tracking)
188 /// FDB: g_simulator->lastConnectionFailure - see sim2.actor.cpp:583
189 pub last_random_close_time: Duration,
190
191 /// Per-IP-pair base latencies for consistent connection behavior.
192 /// Once set on first connection, all subsequent connections between the same
193 /// IP pair will use this base latency (with optional jitter on top).
194 pub pair_latencies: HashMap<(IpAddr, IpAddr), Duration>,
195}
196
197impl NetworkState {
198 /// Create a new network state with the given configuration.
199 pub fn new(config: NetworkConfiguration) -> Self {
200 Self {
201 next_connection_id: 0,
202 next_listener_id: 0,
203 config,
204 connections: HashMap::new(),
205 listeners: HashMap::new(),
206 pending_connections: HashMap::new(),
207 connection_clogs: HashMap::new(),
208 read_clogs: HashMap::new(),
209 ip_partitions: HashMap::new(),
210 send_partitions: HashMap::new(),
211 recv_partitions: HashMap::new(),
212 last_random_close_time: Duration::ZERO,
213 pair_latencies: HashMap::new(),
214 }
215 }
216
217 /// Extract IP address from a network address string.
218 /// Supports formats like "127.0.0.1:8080", "\[::1\]:8080", etc.
219 pub fn parse_ip_from_addr(addr: &str) -> Option<IpAddr> {
220 // Handle IPv6 addresses in brackets
221 if addr.starts_with('[')
222 && let Some(bracket_end) = addr.find(']')
223 {
224 return addr[1..bracket_end].parse().ok();
225 }
226
227 // Handle IPv4 addresses and unbracketed IPv6
228 if let Some(colon_pos) = addr.rfind(':') {
229 addr[..colon_pos].parse().ok()
230 } else {
231 addr.parse().ok()
232 }
233 }
234
235 /// Check if communication from source IP to destination IP is partitioned
236 pub fn is_partitioned(&self, from_ip: IpAddr, to_ip: IpAddr, current_time: Duration) -> bool {
237 // Check IP pair partition
238 if let Some(partition) = self.ip_partitions.get(&(from_ip, to_ip))
239 && current_time < partition.expires_at
240 {
241 return true;
242 }
243
244 // Check send partition
245 if let Some(&partition_until) = self.send_partitions.get(&from_ip)
246 && current_time < partition_until
247 {
248 return true;
249 }
250
251 // Check receive partition
252 if let Some(&partition_until) = self.recv_partitions.get(&to_ip)
253 && current_time < partition_until
254 {
255 return true;
256 }
257
258 false
259 }
260
261 /// Check if a connection is partitioned (cannot send messages)
262 pub fn is_connection_partitioned(
263 &self,
264 connection_id: ConnectionId,
265 current_time: Duration,
266 ) -> bool {
267 if let Some(conn) = self.connections.get(&connection_id)
268 && let (Some(local_ip), Some(remote_ip)) = (conn.local_ip, conn.remote_ip)
269 {
270 return self.is_partitioned(local_ip, remote_ip, current_time);
271 }
272 false
273 }
274}