Skip to main content

moonpool_sim/sim/
state.rs

//! Network and storage state management for simulation.
//!
//! This module provides internal state types for managing network connections,
//! listeners, partitions, and clogs, as well as simulated storage files and their
//! pending operations, in the simulation environment.
5
6use std::{
7    collections::{BTreeMap, HashSet, VecDeque},
8    net::IpAddr,
9    time::Duration,
10};
11
12use moonpool_core::OpenOptions;
13
14use crate::{
15    network::{
16        NetworkConfiguration,
17        sim::{ConnectionId, ListenerId},
18    },
19    storage::{InMemoryStorage, StorageConfiguration},
20};
21
/// Clog record for a single connection, used for both write and read clogs.
///
/// A clog carries no payload of its own; it only remembers the simulation
/// time at which it stops applying.
#[derive(Debug)]
pub struct ClogState {
    /// Simulation time at which the clog lifts and I/O may resume.
    pub expires_at: Duration,
}
28
/// Why a connection was closed; separates FIN from RST semantics.
///
/// Mirrors real TCP:
/// - FIN (graceful close): the peer reads EOF, and writes may keep working briefly.
/// - RST (aborted close): the peer sees ECONNRESET right away on both reads and writes.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CloseReason {
    /// The connection has not been closed (this is the default).
    #[default]
    None,
    /// FIN-style close: the peer observes EOF on read.
    Graceful,
    /// RST-style close: the peer observes ECONNRESET.
    Aborted,
}
44
/// Partition record for one directed `(from, to)` pair of IP addresses.
///
/// While the partition is active, traffic from the first IP to the second is
/// treated as blocked.
#[derive(Clone, Debug)]
pub struct PartitionState {
    /// Simulation time at which connectivity is restored.
    pub expires_at: Duration,
}
51
52/// Internal connection state for simulation
53#[derive(Debug, Clone)]
54pub struct ConnectionState {
55    /// Unique identifier for this connection within the simulation.
56    #[allow(dead_code)]
57    pub id: ConnectionId,
58
59    /// Network address this connection is associated with.
60    #[allow(dead_code)]
61    pub addr: String,
62
63    /// Local IP address for this connection
64    pub local_ip: Option<IpAddr>,
65
66    /// Remote IP address for this connection
67    pub remote_ip: Option<IpAddr>,
68
69    /// Peer address as seen by this connection.
70    ///
71    /// FDB Pattern (sim2.actor.cpp:1149-1175):
72    /// - For client-side connections: the destination address (server's listening address)
73    /// - For server-side connections: synthesized ephemeral address (random IP + port 40000-60000)
74    ///
75    /// This simulates real TCP behavior where servers see client ephemeral ports,
76    /// not the client's identity address. As FDB notes: "In the case of an incoming
77    /// connection, this may not be an address we can connect to!"
78    pub peer_address: String,
79
80    /// FIFO buffer for incoming data that hasn't been read by the application yet.
81    pub receive_buffer: VecDeque<u8>,
82
83    /// Reference to the other end of this bidirectional TCP connection.
84    pub paired_connection: Option<ConnectionId>,
85
86    /// FIFO buffer for outgoing data waiting to be sent over the network.
87    pub send_buffer: VecDeque<Vec<u8>>,
88
89    /// Flag indicating whether a `ProcessSendBuffer` event is currently scheduled.
90    pub send_in_progress: bool,
91
92    /// Next available time for sending messages from this connection.
93    pub next_send_time: Duration,
94
95    /// Whether this connection has been permanently closed by one of the endpoints
96    pub is_closed: bool,
97
98    /// Whether the send side is closed (writes will fail) - for asymmetric closure
99    /// FDB: closeInternal() on self closes send capability
100    pub send_closed: bool,
101
102    /// Whether the receive side is closed (reads return EOF) - for asymmetric closure
103    /// FDB: closeInternal() on peer closes recv capability
104    pub recv_closed: bool,
105
106    /// Whether this connection is temporarily cut (will be restored).
107    /// Unlike `is_closed`, a cut connection can be restored after a duration.
108    /// This simulates temporary network outages where the connection is not
109    /// permanently severed but temporarily unavailable.
110    pub is_cut: bool,
111
112    /// When the cut expires and the connection is restored (in simulation time).
113    /// Only meaningful when `is_cut` is true.
114    pub cut_expiry: Option<Duration>,
115
116    /// Reason for connection closure - distinguishes FIN vs RST semantics.
117    /// When `is_closed` is true, this indicates whether it was graceful or aborted.
118    pub close_reason: CloseReason,
119
120    /// Send buffer capacity in bytes.
121    /// When the send buffer reaches this limit, poll_write returns Pending.
122    /// Calculated from BDP (Bandwidth-Delay Product): latency × bandwidth.
123    pub send_buffer_capacity: usize,
124
125    /// Per-connection send delay override (asymmetric latency).
126    /// If set, this delay is applied when sending data from this connection.
127    /// If None, the global write_latency from NetworkConfiguration is used.
128    pub send_delay: Option<Duration>,
129
130    /// Per-connection receive delay override (asymmetric latency).
131    /// If set, this delay is applied when receiving data on this connection.
132    /// If None, the global read_latency from NetworkConfiguration is used.
133    pub recv_delay: Option<Duration>,
134
135    /// Whether this connection is in a half-open state (peer crashed).
136    /// In this state:
137    /// - Local side still thinks it's connected
138    /// - Writes succeed but data is silently discarded
139    /// - Reads block waiting for data that will never come
140    /// - After `half_open_error_at`, errors start manifesting
141    pub is_half_open: bool,
142
143    /// When a half-open connection starts returning errors (in simulation time).
144    /// Before this time: writes succeed (data dropped), reads block.
145    /// After this time: both read and write return ECONNRESET.
146    pub half_open_error_at: Option<Duration>,
147
148    /// Whether this connection is stable (exempt from chaos).
149    ///
150    /// FDB ref: sim2.actor.cpp:357-362, 427, 440, 581-582 (stableConnection flag)
151    ///
152    /// Stable connections are exempt from:
153    /// - Random close (`roll_random_close`)
154    /// - Write clogging
155    /// - Read clogging
156    /// - Bit flip corruption
157    /// - Partial write truncation
158    ///
159    /// This is used for parent-child process connections or supervision channels
160    /// that should remain reliable even during chaos testing.
161    pub is_stable: bool,
162
163    /// Whether a graceful close (FIN) is pending delivery to the peer.
164    /// Set when close_connection is called while the send pipeline still has data.
165    /// The FIN is delivered after the last DataDelivery event.
166    pub graceful_close_pending: bool,
167
168    /// Time of the most recently scheduled DataDelivery event from this connection.
169    /// Used to ensure FinDelivery is scheduled after the last data arrives at the peer.
170    pub last_data_delivery_scheduled_at: Option<Duration>,
171
172    /// Whether a FIN has been received from the remote peer (graceful close completed).
173    /// Distinct from `recv_closed` which is for chaos/asymmetric closure (immediate EOF).
174    /// This flag allows the reader to drain the receive buffer before seeing EOF.
175    pub remote_fin_received: bool,
176}
177
178/// Internal listener state for simulation
179#[derive(Debug)]
180pub struct ListenerState {
181    /// Unique identifier for this listener.
182    #[allow(dead_code)]
183    pub id: ListenerId,
184    /// Network address this listener is bound to.
185    #[allow(dead_code)]
186    pub addr: String,
187    /// Queue of pending connections waiting to be accepted.
188    #[allow(dead_code)]
189    pub pending_connections: VecDeque<ConnectionId>,
190}
191
192/// Network-related state management
193#[derive(Debug)]
194pub struct NetworkState {
195    /// Counter for generating unique connection IDs.
196    pub next_connection_id: u64,
197    /// Counter for generating unique listener IDs.
198    pub next_listener_id: u64,
199    /// Network configuration for this simulation.
200    pub config: NetworkConfiguration,
201    /// Active connections indexed by their ID.
202    pub connections: BTreeMap<ConnectionId, ConnectionState>,
203    /// Active listeners indexed by their ID.
204    pub listeners: BTreeMap<ListenerId, ListenerState>,
205    /// Connections pending acceptance, indexed by address.
206    pub pending_connections: BTreeMap<String, ConnectionId>,
207
208    /// Write clog state (temporary write blocking).
209    pub connection_clogs: BTreeMap<ConnectionId, ClogState>,
210
211    /// Read clog state (temporary read blocking, symmetric with write clogging).
212    pub read_clogs: BTreeMap<ConnectionId, ClogState>,
213
214    /// Partitions between specific IP pairs (from, to) -> partition state
215    pub ip_partitions: BTreeMap<(IpAddr, IpAddr), PartitionState>,
216    /// Send partitions - IP cannot send to anyone
217    pub send_partitions: BTreeMap<IpAddr, Duration>,
218    /// Receive partitions - IP cannot receive from anyone
219    pub recv_partitions: BTreeMap<IpAddr, Duration>,
220
221    /// Last time a random close was triggered (global cooldown tracking)
222    /// FDB: g_simulator->lastConnectionFailure - see sim2.actor.cpp:583
223    pub last_random_close_time: Duration,
224
225    /// Per-IP-pair base latencies for consistent connection behavior.
226    /// Once set on first connection, all subsequent connections between the same
227    /// IP pair will use this base latency (with optional jitter on top).
228    pub pair_latencies: BTreeMap<(IpAddr, IpAddr), Duration>,
229}
230
231impl NetworkState {
232    /// Create a new network state with the given configuration.
233    pub fn new(config: NetworkConfiguration) -> Self {
234        Self {
235            next_connection_id: 0,
236            next_listener_id: 0,
237            config,
238            connections: BTreeMap::new(),
239            listeners: BTreeMap::new(),
240            pending_connections: BTreeMap::new(),
241            connection_clogs: BTreeMap::new(),
242            read_clogs: BTreeMap::new(),
243            ip_partitions: BTreeMap::new(),
244            send_partitions: BTreeMap::new(),
245            recv_partitions: BTreeMap::new(),
246            last_random_close_time: Duration::ZERO,
247            pair_latencies: BTreeMap::new(),
248        }
249    }
250
251    /// Extract IP address from a network address string.
252    /// Supports formats like "127.0.0.1:8080", "\[::1\]:8080", etc.
253    pub fn parse_ip_from_addr(addr: &str) -> Option<IpAddr> {
254        // Handle IPv6 addresses in brackets
255        if addr.starts_with('[')
256            && let Some(bracket_end) = addr.find(']')
257        {
258            return addr[1..bracket_end].parse().ok();
259        }
260
261        // Handle IPv4 addresses and unbracketed IPv6
262        if let Some(colon_pos) = addr.rfind(':') {
263            addr[..colon_pos].parse().ok()
264        } else {
265            addr.parse().ok()
266        }
267    }
268
269    /// Check if communication from source IP to destination IP is partitioned
270    pub fn is_partitioned(&self, from_ip: IpAddr, to_ip: IpAddr, current_time: Duration) -> bool {
271        // Check IP pair partition
272        if let Some(partition) = self.ip_partitions.get(&(from_ip, to_ip))
273            && current_time < partition.expires_at
274        {
275            return true;
276        }
277
278        // Check send partition
279        if let Some(&partition_until) = self.send_partitions.get(&from_ip)
280            && current_time < partition_until
281        {
282            return true;
283        }
284
285        // Check receive partition
286        if let Some(&partition_until) = self.recv_partitions.get(&to_ip)
287            && current_time < partition_until
288        {
289            return true;
290        }
291
292        false
293    }
294
295    /// Check if a connection is partitioned (cannot send messages)
296    pub fn is_connection_partitioned(
297        &self,
298        connection_id: ConnectionId,
299        current_time: Duration,
300    ) -> bool {
301        if let Some(conn) = self.connections.get(&connection_id)
302            && let (Some(local_ip), Some(remote_ip)) = (conn.local_ip, conn.remote_ip)
303        {
304            return self.is_partitioned(local_ip, remote_ip, current_time);
305        }
306        false
307    }
308}
309
310// =============================================================================
311// Storage State Types
312// =============================================================================
313
/// Identifier of a simulated file, unique within one simulation.
///
/// The type is totally ordered and hashable, so it can key both ordered maps
/// and hash sets.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FileId(pub u64);
317
/// Kind of a pending (not yet completed) storage operation.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PendingOpType {
    /// Read from the file.
    Read,
    /// Write to the file.
    Write,
    /// Sync (flush) the file.
    Sync,
    /// Set the file's length.
    SetLen,
    /// Open the file.
    Open,
}
332
333/// A pending storage operation awaiting completion.
334#[derive(Debug, Clone)]
335pub struct PendingStorageOp {
336    /// Type of the operation
337    pub op_type: PendingOpType,
338    /// Offset within the file (for read/write)
339    pub offset: u64,
340    /// Length of the operation in bytes
341    pub len: usize,
342    /// Data for write operations
343    pub data: Option<Vec<u8>>,
344}
345
346/// State of an individual simulated file.
347#[derive(Debug)]
348pub struct StorageFileState {
349    /// Unique identifier for this file
350    pub id: FileId,
351    /// Path this file was opened with
352    pub path: String,
353    /// Current file position for sequential operations
354    pub position: u64,
355    /// Options the file was opened with
356    pub options: OpenOptions,
357    /// In-memory storage backing this file
358    pub storage: InMemoryStorage,
359    /// Whether the file has been closed
360    pub is_closed: bool,
361    /// Pending operations keyed by sequence number
362    pub pending_ops: BTreeMap<u64, PendingStorageOp>,
363    /// Next sequence number for operations on this file
364    pub next_op_seq: u64,
365}
366
367impl StorageFileState {
368    /// Create a new storage file state.
369    pub fn new(id: FileId, path: String, options: OpenOptions, storage: InMemoryStorage) -> Self {
370        Self {
371            id,
372            path,
373            position: 0,
374            options,
375            storage,
376            is_closed: false,
377            pending_ops: BTreeMap::new(),
378            next_op_seq: 0,
379        }
380    }
381}
382
383/// Storage-related state management for the simulation.
384#[derive(Debug)]
385pub struct StorageState {
386    /// Counter for generating unique file IDs
387    pub next_file_id: u64,
388    /// Storage configuration for latencies and fault injection
389    pub config: StorageConfiguration,
390    /// Active files indexed by their ID
391    pub files: BTreeMap<FileId, StorageFileState>,
392    /// Mapping from path to file ID for quick lookup
393    pub path_to_file: BTreeMap<String, FileId>,
394    /// Set of paths that have been deleted (for create_new semantics)
395    pub deleted_paths: HashSet<String>,
396    /// Set of (file_id, op_seq) pairs for sync operations that failed
397    pub sync_failures: HashSet<(FileId, u64)>,
398}
399
400impl StorageState {
401    /// Create a new storage state with the given configuration.
402    pub fn new(config: StorageConfiguration) -> Self {
403        Self {
404            next_file_id: 0,
405            config,
406            files: BTreeMap::new(),
407            path_to_file: BTreeMap::new(),
408            deleted_paths: HashSet::new(),
409            sync_failures: HashSet::new(),
410        }
411    }
412}
413
414impl Default for StorageState {
415    fn default() -> Self {
416        Self::new(StorageConfiguration::default())
417    }
418}