moonpool_sim/sim/state.rs
//! Network and storage state management for simulation.
//!
//! This module provides internal state types for managing connections,
//! listeners, partitions, and clogs, as well as simulated files and their
//! pending storage operations, in the simulation environment.
5
6use std::{
7 collections::{BTreeMap, HashSet, VecDeque},
8 net::IpAddr,
9 time::Duration,
10};
11
12use moonpool_core::OpenOptions;
13
14use crate::{
15 network::{
16 NetworkConfiguration,
17 sim::{ConnectionId, ListenerId},
18 },
19 storage::{InMemoryStorage, StorageConfiguration},
20};
21
/// Minimal clog bookkeeping: the only thing recorded is when the clog ends.
///
/// The same type backs both write clogs and read clogs.
#[derive(Debug)]
pub struct ClogState {
    /// Simulation time at which the clog expires and the blocked I/O can resume.
    pub expires_at: Duration,
}
28
/// Why a connection was closed, mirroring TCP's FIN vs RST distinction.
///
/// In real TCP:
/// - FIN (graceful close): the peer gets EOF on read; writes may still work briefly
/// - RST (aborted close): the peer gets ECONNRESET immediately on both read and write
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum CloseReason {
    /// The connection has not been closed (this is the default value).
    #[default]
    None,
    /// Graceful close (FIN): the peer will get EOF on read.
    Graceful,
    /// Aborted close (RST): the peer will get ECONNRESET.
    Aborted,
}
44
/// State of a network partition between two IP addresses.
#[derive(Debug, Clone)]
pub struct PartitionState {
    /// Simulation time at which the partition expires and connectivity is restored.
    pub expires_at: Duration,
}
51
52/// Internal connection state for simulation
53#[derive(Debug, Clone)]
54pub struct ConnectionState {
55 /// Unique identifier for this connection within the simulation.
56 #[allow(dead_code)]
57 pub id: ConnectionId,
58
59 /// Network address this connection is associated with.
60 #[allow(dead_code)]
61 pub addr: String,
62
63 /// Local IP address for this connection
64 pub local_ip: Option<IpAddr>,
65
66 /// Remote IP address for this connection
67 pub remote_ip: Option<IpAddr>,
68
69 /// Peer address as seen by this connection.
70 ///
71 /// FDB Pattern (sim2.actor.cpp:1149-1175):
72 /// - For client-side connections: the destination address (server's listening address)
73 /// - For server-side connections: synthesized ephemeral address (random IP + port 40000-60000)
74 ///
75 /// This simulates real TCP behavior where servers see client ephemeral ports,
76 /// not the client's identity address. As FDB notes: "In the case of an incoming
77 /// connection, this may not be an address we can connect to!"
78 pub peer_address: String,
79
80 /// FIFO buffer for incoming data that hasn't been read by the application yet.
81 pub receive_buffer: VecDeque<u8>,
82
83 /// Reference to the other end of this bidirectional TCP connection.
84 pub paired_connection: Option<ConnectionId>,
85
86 /// FIFO buffer for outgoing data waiting to be sent over the network.
87 pub send_buffer: VecDeque<Vec<u8>>,
88
89 /// Flag indicating whether a `ProcessSendBuffer` event is currently scheduled.
90 pub send_in_progress: bool,
91
92 /// Next available time for sending messages from this connection.
93 pub next_send_time: Duration,
94
95 /// Whether this connection has been permanently closed by one of the endpoints
96 pub is_closed: bool,
97
98 /// Whether the send side is closed (writes will fail) - for asymmetric closure
99 /// FDB: closeInternal() on self closes send capability
100 pub send_closed: bool,
101
102 /// Whether the receive side is closed (reads return EOF) - for asymmetric closure
103 /// FDB: closeInternal() on peer closes recv capability
104 pub recv_closed: bool,
105
106 /// Whether this connection is temporarily cut (will be restored).
107 /// Unlike `is_closed`, a cut connection can be restored after a duration.
108 /// This simulates temporary network outages where the connection is not
109 /// permanently severed but temporarily unavailable.
110 pub is_cut: bool,
111
112 /// When the cut expires and the connection is restored (in simulation time).
113 /// Only meaningful when `is_cut` is true.
114 pub cut_expiry: Option<Duration>,
115
116 /// Reason for connection closure - distinguishes FIN vs RST semantics.
117 /// When `is_closed` is true, this indicates whether it was graceful or aborted.
118 pub close_reason: CloseReason,
119
120 /// Send buffer capacity in bytes.
121 /// When the send buffer reaches this limit, poll_write returns Pending.
122 /// Calculated from BDP (Bandwidth-Delay Product): latency × bandwidth.
123 pub send_buffer_capacity: usize,
124
125 /// Per-connection send delay override (asymmetric latency).
126 /// If set, this delay is applied when sending data from this connection.
127 /// If None, the global write_latency from NetworkConfiguration is used.
128 pub send_delay: Option<Duration>,
129
130 /// Per-connection receive delay override (asymmetric latency).
131 /// If set, this delay is applied when receiving data on this connection.
132 /// If None, the global read_latency from NetworkConfiguration is used.
133 pub recv_delay: Option<Duration>,
134
135 /// Whether this connection is in a half-open state (peer crashed).
136 /// In this state:
137 /// - Local side still thinks it's connected
138 /// - Writes succeed but data is silently discarded
139 /// - Reads block waiting for data that will never come
140 /// - After `half_open_error_at`, errors start manifesting
141 pub is_half_open: bool,
142
143 /// When a half-open connection starts returning errors (in simulation time).
144 /// Before this time: writes succeed (data dropped), reads block.
145 /// After this time: both read and write return ECONNRESET.
146 pub half_open_error_at: Option<Duration>,
147
148 /// Whether this connection is stable (exempt from chaos).
149 ///
150 /// FDB ref: sim2.actor.cpp:357-362, 427, 440, 581-582 (stableConnection flag)
151 ///
152 /// Stable connections are exempt from:
153 /// - Random close (`roll_random_close`)
154 /// - Write clogging
155 /// - Read clogging
156 /// - Bit flip corruption
157 /// - Partial write truncation
158 ///
159 /// This is used for parent-child process connections or supervision channels
160 /// that should remain reliable even during chaos testing.
161 pub is_stable: bool,
162
163 /// Whether a graceful close (FIN) is pending delivery to the peer.
164 /// Set when close_connection is called while the send pipeline still has data.
165 /// The FIN is delivered after the last DataDelivery event.
166 pub graceful_close_pending: bool,
167
168 /// Time of the most recently scheduled DataDelivery event from this connection.
169 /// Used to ensure FinDelivery is scheduled after the last data arrives at the peer.
170 pub last_data_delivery_scheduled_at: Option<Duration>,
171
172 /// Whether a FIN has been received from the remote peer (graceful close completed).
173 /// Distinct from `recv_closed` which is for chaos/asymmetric closure (immediate EOF).
174 /// This flag allows the reader to drain the receive buffer before seeing EOF.
175 pub remote_fin_received: bool,
176}
177
178/// Internal listener state for simulation
179#[derive(Debug)]
180pub struct ListenerState {
181 /// Unique identifier for this listener.
182 #[allow(dead_code)]
183 pub id: ListenerId,
184 /// Network address this listener is bound to.
185 #[allow(dead_code)]
186 pub addr: String,
187 /// Queue of pending connections waiting to be accepted.
188 #[allow(dead_code)]
189 pub pending_connections: VecDeque<ConnectionId>,
190}
191
192/// Network-related state management
193#[derive(Debug)]
194pub struct NetworkState {
195 /// Counter for generating unique connection IDs.
196 pub next_connection_id: u64,
197 /// Counter for generating unique listener IDs.
198 pub next_listener_id: u64,
199 /// Network configuration for this simulation.
200 pub config: NetworkConfiguration,
201 /// Active connections indexed by their ID.
202 pub connections: BTreeMap<ConnectionId, ConnectionState>,
203 /// Active listeners indexed by their ID.
204 pub listeners: BTreeMap<ListenerId, ListenerState>,
205 /// Connections pending acceptance, indexed by address.
206 pub pending_connections: BTreeMap<String, ConnectionId>,
207
208 /// Write clog state (temporary write blocking).
209 pub connection_clogs: BTreeMap<ConnectionId, ClogState>,
210
211 /// Read clog state (temporary read blocking, symmetric with write clogging).
212 pub read_clogs: BTreeMap<ConnectionId, ClogState>,
213
214 /// Partitions between specific IP pairs (from, to) -> partition state
215 pub ip_partitions: BTreeMap<(IpAddr, IpAddr), PartitionState>,
216 /// Send partitions - IP cannot send to anyone
217 pub send_partitions: BTreeMap<IpAddr, Duration>,
218 /// Receive partitions - IP cannot receive from anyone
219 pub recv_partitions: BTreeMap<IpAddr, Duration>,
220
221 /// Last time a random close was triggered (global cooldown tracking)
222 /// FDB: g_simulator->lastConnectionFailure - see sim2.actor.cpp:583
223 pub last_random_close_time: Duration,
224
225 /// Per-IP-pair base latencies for consistent connection behavior.
226 /// Once set on first connection, all subsequent connections between the same
227 /// IP pair will use this base latency (with optional jitter on top).
228 pub pair_latencies: BTreeMap<(IpAddr, IpAddr), Duration>,
229}
230
231impl NetworkState {
232 /// Create a new network state with the given configuration.
233 pub fn new(config: NetworkConfiguration) -> Self {
234 Self {
235 next_connection_id: 0,
236 next_listener_id: 0,
237 config,
238 connections: BTreeMap::new(),
239 listeners: BTreeMap::new(),
240 pending_connections: BTreeMap::new(),
241 connection_clogs: BTreeMap::new(),
242 read_clogs: BTreeMap::new(),
243 ip_partitions: BTreeMap::new(),
244 send_partitions: BTreeMap::new(),
245 recv_partitions: BTreeMap::new(),
246 last_random_close_time: Duration::ZERO,
247 pair_latencies: BTreeMap::new(),
248 }
249 }
250
251 /// Extract IP address from a network address string.
252 /// Supports formats like "127.0.0.1:8080", "\[::1\]:8080", etc.
253 pub fn parse_ip_from_addr(addr: &str) -> Option<IpAddr> {
254 // Handle IPv6 addresses in brackets
255 if addr.starts_with('[')
256 && let Some(bracket_end) = addr.find(']')
257 {
258 return addr[1..bracket_end].parse().ok();
259 }
260
261 // Handle IPv4 addresses and unbracketed IPv6
262 if let Some(colon_pos) = addr.rfind(':') {
263 addr[..colon_pos].parse().ok()
264 } else {
265 addr.parse().ok()
266 }
267 }
268
269 /// Check if communication from source IP to destination IP is partitioned
270 pub fn is_partitioned(&self, from_ip: IpAddr, to_ip: IpAddr, current_time: Duration) -> bool {
271 // Check IP pair partition
272 if let Some(partition) = self.ip_partitions.get(&(from_ip, to_ip))
273 && current_time < partition.expires_at
274 {
275 return true;
276 }
277
278 // Check send partition
279 if let Some(&partition_until) = self.send_partitions.get(&from_ip)
280 && current_time < partition_until
281 {
282 return true;
283 }
284
285 // Check receive partition
286 if let Some(&partition_until) = self.recv_partitions.get(&to_ip)
287 && current_time < partition_until
288 {
289 return true;
290 }
291
292 false
293 }
294
295 /// Check if a connection is partitioned (cannot send messages)
296 pub fn is_connection_partitioned(
297 &self,
298 connection_id: ConnectionId,
299 current_time: Duration,
300 ) -> bool {
301 if let Some(conn) = self.connections.get(&connection_id)
302 && let (Some(local_ip), Some(remote_ip)) = (conn.local_ip, conn.remote_ip)
303 {
304 return self.is_partitioned(local_ip, remote_ip, current_time);
305 }
306 false
307 }
308}
309
310// =============================================================================
311// Storage State Types
312// =============================================================================
313
/// Identifier that uniquely names a simulated file within the simulation.
///
/// A thin newtype over `u64`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct FileId(pub u64);
317
/// Kind of a pending storage operation.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PendingOpType {
    /// A read.
    Read,
    /// A write.
    Write,
    /// A sync/flush.
    Sync,
    /// A change of the file length.
    SetLen,
    /// A file open.
    Open,
}
332
333/// A pending storage operation awaiting completion.
334#[derive(Debug, Clone)]
335pub struct PendingStorageOp {
336 /// Type of the operation
337 pub op_type: PendingOpType,
338 /// Offset within the file (for read/write)
339 pub offset: u64,
340 /// Length of the operation in bytes
341 pub len: usize,
342 /// Data for write operations
343 pub data: Option<Vec<u8>>,
344}
345
346/// State of an individual simulated file.
347#[derive(Debug)]
348pub struct StorageFileState {
349 /// Unique identifier for this file
350 pub id: FileId,
351 /// Path this file was opened with
352 pub path: String,
353 /// Current file position for sequential operations
354 pub position: u64,
355 /// Options the file was opened with
356 pub options: OpenOptions,
357 /// In-memory storage backing this file
358 pub storage: InMemoryStorage,
359 /// Whether the file has been closed
360 pub is_closed: bool,
361 /// Pending operations keyed by sequence number
362 pub pending_ops: BTreeMap<u64, PendingStorageOp>,
363 /// Next sequence number for operations on this file
364 pub next_op_seq: u64,
365}
366
367impl StorageFileState {
368 /// Create a new storage file state.
369 pub fn new(id: FileId, path: String, options: OpenOptions, storage: InMemoryStorage) -> Self {
370 Self {
371 id,
372 path,
373 position: 0,
374 options,
375 storage,
376 is_closed: false,
377 pending_ops: BTreeMap::new(),
378 next_op_seq: 0,
379 }
380 }
381}
382
383/// Storage-related state management for the simulation.
384#[derive(Debug)]
385pub struct StorageState {
386 /// Counter for generating unique file IDs
387 pub next_file_id: u64,
388 /// Storage configuration for latencies and fault injection
389 pub config: StorageConfiguration,
390 /// Active files indexed by their ID
391 pub files: BTreeMap<FileId, StorageFileState>,
392 /// Mapping from path to file ID for quick lookup
393 pub path_to_file: BTreeMap<String, FileId>,
394 /// Set of paths that have been deleted (for create_new semantics)
395 pub deleted_paths: HashSet<String>,
396 /// Set of (file_id, op_seq) pairs for sync operations that failed
397 pub sync_failures: HashSet<(FileId, u64)>,
398}
399
400impl StorageState {
401 /// Create a new storage state with the given configuration.
402 pub fn new(config: StorageConfiguration) -> Self {
403 Self {
404 next_file_id: 0,
405 config,
406 files: BTreeMap::new(),
407 path_to_file: BTreeMap::new(),
408 deleted_paths: HashSet::new(),
409 sync_failures: HashSet::new(),
410 }
411 }
412}
413
414impl Default for StorageState {
415 fn default() -> Self {
416 Self::new(StorageConfiguration::default())
417 }
418}