moonpool-sim 0.6.0

Simulation engine for the moonpool framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
//! Network state management for simulation.
//!
//! This module provides internal state types for managing connections,
//! listeners, partitions, and clogs in the simulation environment.

use std::{
    collections::{BTreeMap, HashMap, HashSet, VecDeque},
    net::IpAddr,
    time::Duration,
};

use moonpool_core::OpenOptions;

use crate::{
    network::{
        NetworkConfiguration,
        sim::{ConnectionId, ListenerId},
    },
    storage::{InMemoryStorage, StorageConfiguration},
};

/// A temporary I/O clog; the only thing tracked is when it lifts.
///
/// Used for both write clogs and read clogs.
#[derive(Debug)]
pub struct ClogState {
    /// Simulation time at which the clog expires and blocked I/O may proceed.
    pub expires_at: Duration,
}

/// How a connection was closed: graceful FIN versus aborting RST.
///
/// Real TCP behaves as follows:
/// - FIN (graceful close): the peer reads EOF; its writes may still succeed briefly.
/// - RST (aborted close): the peer sees ECONNRESET right away on both read and write.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub enum CloseReason {
    /// The connection has not been closed.
    #[default]
    None,
    /// Closed gracefully with a FIN; the peer will read EOF.
    Graceful,
    /// Closed abortively with an RST; the peer will get ECONNRESET.
    Aborted,
}

/// An active network partition between a pair of IP addresses.
#[derive(Clone, Debug)]
pub struct PartitionState {
    /// Simulation time at which the partition heals and connectivity returns.
    pub expires_at: Duration,
}

/// Simulated state of one endpoint of a TCP connection.
///
/// Each endpoint of a bidirectional connection carries its own
/// `ConnectionState`; the opposite endpoint is referenced via
/// `paired_connection`.
#[derive(Clone, Debug)]
pub struct ConnectionState {
    /// Identifier of this connection, unique within the simulation.
    #[allow(dead_code)]
    pub id: ConnectionId,

    /// Network address associated with this connection.
    #[allow(dead_code)]
    pub addr: String,

    /// IP address of the local end, if known.
    pub local_ip: Option<IpAddr>,

    /// IP address of the remote end, if known.
    pub remote_ip: Option<IpAddr>,

    /// Address of the peer as observed from this end of the connection.
    ///
    /// Mirrors FDB (sim2.actor.cpp:1149-1175):
    /// - client side: the destination, i.e. the server's listening address;
    /// - server side: a synthesized ephemeral address (random IP, port 40000-60000).
    ///
    /// Real TCP servers see the client's ephemeral port rather than its identity
    /// address. In FDB's words: "In the case of an incoming connection, this may
    /// not be an address we can connect to!"
    pub peer_address: String,

    /// Bytes that have arrived but not yet been read by the application, oldest first.
    pub receive_buffer: VecDeque<u8>,

    /// The opposite endpoint of this bidirectional TCP connection, if any.
    pub paired_connection: Option<ConnectionId>,

    /// Outgoing chunks waiting to be sent over the network, oldest first.
    pub send_buffer: VecDeque<Vec<u8>>,

    /// `true` while a `ProcessSendBuffer` event is scheduled for this connection.
    pub send_in_progress: bool,

    /// Earliest simulation time at which this connection may send its next message.
    pub next_send_time: Duration,

    /// `true` once either endpoint has permanently closed the connection.
    pub is_closed: bool,

    /// Send half is closed, so writes fail (asymmetric closure).
    /// FDB equivalent: `closeInternal()` invoked on this endpoint.
    pub send_closed: bool,

    /// Receive half is closed, so reads return EOF (asymmetric closure).
    /// FDB equivalent: `closeInternal()` invoked on the peer.
    pub recv_closed: bool,

    /// `true` while the connection is temporarily cut.
    ///
    /// In contrast to `is_closed`, a cut is undone once it expires, modelling a
    /// transient network outage rather than a permanently severed connection.
    pub is_cut: bool,

    /// Simulation time at which the cut ends; only meaningful while `is_cut` is set.
    pub cut_expiry: Option<Duration>,

    /// Whether closure was graceful (FIN) or aborted (RST); relevant once `is_closed` is set.
    pub close_reason: CloseReason,

    /// Maximum number of bytes the send buffer may hold.
    ///
    /// Once reached, `poll_write` returns `Pending`. The value is derived from
    /// the bandwidth-delay product (latency × bandwidth).
    pub send_buffer_capacity: usize,

    /// Per-connection send latency override (for asymmetric latency).
    /// `None` falls back to the global `write_latency` in `NetworkConfiguration`.
    pub send_delay: Option<Duration>,

    /// Per-connection receive latency override (for asymmetric latency).
    /// `None` falls back to the global `read_latency` in `NetworkConfiguration`.
    pub recv_delay: Option<Duration>,

    /// `true` when the peer has crashed but this end has not noticed yet (half-open).
    ///
    /// While half-open:
    /// - this end still believes it is connected;
    /// - writes succeed, but the data is silently dropped;
    /// - reads wait for data that never arrives;
    /// - from `half_open_error_at` onwards, errors surface.
    pub is_half_open: bool,

    /// Simulation time from which a half-open connection starts failing.
    /// Before it: writes succeed (data dropped) and reads block.
    /// From then on: reads and writes both return ECONNRESET.
    pub half_open_error_at: Option<Duration>,

    /// `true` if this connection is exempt from chaos injection.
    ///
    /// FDB ref: sim2.actor.cpp:357-362, 427, 440, 581-582 (`stableConnection`).
    ///
    /// Stable connections are never subjected to:
    /// - random close (`roll_random_close`)
    /// - write clogging
    /// - read clogging
    /// - bit-flip corruption
    /// - partial-write truncation
    ///
    /// Intended for channels that must stay reliable under chaos testing, such
    /// as parent-child process links or supervision channels.
    pub is_stable: bool,

    /// `true` while a graceful close (FIN) still has to be delivered to the peer.
    /// Set when `close_connection` runs with data still in the send pipeline;
    /// the FIN follows the final `DataDelivery` event.
    pub graceful_close_pending: bool,

    /// Time of the latest `DataDelivery` event scheduled from this connection,
    /// used so that `FinDelivery` is scheduled after the last data reaches the peer.
    pub last_data_delivery_scheduled_at: Option<Duration>,

    /// `true` once the remote peer's FIN has arrived (graceful close finished).
    ///
    /// Unlike `recv_closed` (chaos/asymmetric closure, immediate EOF), this lets
    /// the reader drain `receive_buffer` before observing EOF.
    pub remote_fin_received: bool,
}

/// Simulated state of one listening socket.
#[derive(Debug)]
pub struct ListenerState {
    /// Identifier of this listener.
    #[allow(dead_code)]
    pub id: ListenerId,
    /// Address the listener is bound to.
    #[allow(dead_code)]
    pub addr: String,
    /// Incoming connections not yet accepted, oldest first.
    #[allow(dead_code)]
    pub pending_connections: VecDeque<ConnectionId>,
}

/// All mutable network-side state of a simulation.
#[derive(Debug)]
pub struct NetworkState {
    /// Source of fresh connection IDs.
    pub next_connection_id: u64,
    /// Source of fresh listener IDs.
    pub next_listener_id: u64,
    /// Network configuration in effect for this simulation.
    pub config: NetworkConfiguration,
    /// Live connections, keyed by ID.
    pub connections: BTreeMap<ConnectionId, ConnectionState>,
    /// Live listeners, keyed by ID.
    pub listeners: BTreeMap<ListenerId, ListenerState>,
    /// Connections waiting to be accepted, keyed by address.
    pub pending_connections: BTreeMap<String, ConnectionId>,

    /// Active write clogs (writes temporarily blocked), keyed by connection.
    pub connection_clogs: BTreeMap<ConnectionId, ClogState>,

    /// Active read clogs (reads temporarily blocked; the counterpart of write clogs).
    pub read_clogs: BTreeMap<ConnectionId, ClogState>,

    /// Directional partitions between IP pairs, keyed by `(from, to)`.
    pub ip_partitions: BTreeMap<(IpAddr, IpAddr), PartitionState>,
    /// Send partitions: the IP cannot send to anyone until the stored time.
    pub send_partitions: BTreeMap<IpAddr, Duration>,
    /// Receive partitions: the IP cannot receive from anyone until the stored time.
    pub recv_partitions: BTreeMap<IpAddr, Duration>,

    /// When a random close last fired; drives the global cooldown.
    /// FDB: `g_simulator->lastConnectionFailure` (sim2.actor.cpp:583).
    pub last_random_close_time: Duration,

    /// Base latency per IP pair, fixed by the first connection between the pair
    /// so later connections behave consistently (jitter may still be added on top).
    pub pair_latencies: BTreeMap<(IpAddr, IpAddr), Duration>,
}

impl NetworkState {
    /// Create a new network state with the given configuration.
    pub fn new(config: NetworkConfiguration) -> Self {
        Self {
            next_connection_id: 0,
            next_listener_id: 0,
            config,
            connections: BTreeMap::new(),
            listeners: BTreeMap::new(),
            pending_connections: BTreeMap::new(),
            connection_clogs: BTreeMap::new(),
            read_clogs: BTreeMap::new(),
            ip_partitions: BTreeMap::new(),
            send_partitions: BTreeMap::new(),
            recv_partitions: BTreeMap::new(),
            last_random_close_time: Duration::ZERO,
            pair_latencies: BTreeMap::new(),
        }
    }

    /// Extract IP address from a network address string.
    /// Supports formats like "127.0.0.1:8080", "\[::1\]:8080", etc.
    pub fn parse_ip_from_addr(addr: &str) -> Option<IpAddr> {
        // Handle IPv6 addresses in brackets
        if addr.starts_with('[')
            && let Some(bracket_end) = addr.find(']')
        {
            return addr[1..bracket_end].parse().ok();
        }

        // Handle IPv4 addresses and unbracketed IPv6
        if let Some(colon_pos) = addr.rfind(':') {
            addr[..colon_pos].parse().ok()
        } else {
            addr.parse().ok()
        }
    }

    /// Check if communication from source IP to destination IP is partitioned
    pub fn is_partitioned(&self, from_ip: IpAddr, to_ip: IpAddr, current_time: Duration) -> bool {
        // Check IP pair partition
        if let Some(partition) = self.ip_partitions.get(&(from_ip, to_ip))
            && current_time < partition.expires_at
        {
            return true;
        }

        // Check send partition
        if let Some(&partition_until) = self.send_partitions.get(&from_ip)
            && current_time < partition_until
        {
            return true;
        }

        // Check receive partition
        if let Some(&partition_until) = self.recv_partitions.get(&to_ip)
            && current_time < partition_until
        {
            return true;
        }

        false
    }

    /// Check if a connection is partitioned (cannot send messages)
    pub fn is_connection_partitioned(
        &self,
        connection_id: ConnectionId,
        current_time: Duration,
    ) -> bool {
        if let Some(conn) = self.connections.get(&connection_id)
            && let (Some(local_ip), Some(remote_ip)) = (conn.local_ip, conn.remote_ip)
        {
            return self.is_partitioned(local_ip, remote_ip, current_time);
        }
        false
    }
}

// =============================================================================
// Storage State Types
// =============================================================================

/// Opaque handle naming one simulated file; the wrapped `u64` is unique
/// within a single simulation.
#[derive(Clone, Copy, Debug, Eq, Hash, Ord, PartialEq, PartialOrd)]
pub struct FileId(pub u64);

/// Kind of storage operation that is waiting to complete.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum PendingOpType {
    /// Reading bytes from the file.
    Read,
    /// Writing bytes to the file.
    Write,
    /// Syncing/flushing the file.
    Sync,
    /// Changing the file's length.
    SetLen,
    /// Opening the file.
    Open,
}

/// A storage operation that has been issued but has not completed yet.
#[derive(Clone, Debug)]
pub struct PendingStorageOp {
    /// What kind of operation this is.
    pub op_type: PendingOpType,
    /// Byte offset within the file (meaningful for reads and writes).
    pub offset: u64,
    /// Number of bytes the operation covers.
    pub len: usize,
    /// Payload for write operations.
    pub data: Option<Vec<u8>>,
}

/// Bookkeeping for one simulated file.
#[derive(Debug)]
pub struct StorageFileState {
    /// Identifier of this file.
    pub id: FileId,
    /// Path the file was opened under.
    pub path: String,
    /// Current cursor used by sequential operations.
    pub position: u64,
    /// Options supplied when the file was opened.
    pub options: OpenOptions,
    /// In-memory backing store for the file's contents.
    pub storage: InMemoryStorage,
    /// `true` once the file has been closed.
    pub is_closed: bool,
    /// Operations still in flight, keyed by their sequence number.
    pub pending_ops: BTreeMap<u64, PendingStorageOp>,
    /// Sequence number the next operation on this file will receive.
    pub next_op_seq: u64,
    /// IP address of the process that owns the file.
    pub owner_ip: IpAddr,
}

impl StorageFileState {
    /// Build the state for a freshly opened file.
    ///
    /// The cursor and the operation sequence counter both start at 0, the file
    /// starts open, and no operations are pending.
    pub fn new(
        id: FileId,
        path: String,
        options: OpenOptions,
        storage: InMemoryStorage,
        owner_ip: IpAddr,
    ) -> Self {
        let pending_ops = BTreeMap::new();
        Self {
            id,
            path,
            options,
            storage,
            owner_ip,
            position: 0,
            next_op_seq: 0,
            is_closed: false,
            pending_ops,
        }
    }
}

/// All storage-side state of a simulation.
///
/// NOTE(review): `per_process_configs`, `deleted_paths` and `sync_failures` use
/// std `HashMap`/`HashSet`, whose iteration order is randomized per process.
/// Point lookups are unaffected. If any caller iterates these collections, a
/// `BTree*` collection would keep simulation runs reproducible. TODO confirm.
#[derive(Debug)]
pub struct StorageState {
    /// Source of fresh file IDs.
    pub next_file_id: u64,
    /// Global default storage configuration, used when a process has no override.
    pub config: StorageConfiguration,
    /// Per-process storage configuration overrides, keyed by IP address.
    ///
    /// If a file's owner IP appears here, this entry replaces the global default
    /// for that file's fault injection and latency calculations.
    pub per_process_configs: HashMap<IpAddr, StorageConfiguration>,
    /// Open files, keyed by ID.
    pub files: BTreeMap<FileId, StorageFileState>,
    /// Path → file ID index for fast lookup.
    pub path_to_file: BTreeMap<String, FileId>,
    /// Paths that have been deleted (needed for `create_new` semantics).
    pub deleted_paths: HashSet<String>,
    /// `(file_id, op_seq)` pairs identifying sync operations that failed.
    pub sync_failures: HashSet<(FileId, u64)>,
}

impl StorageState {
    /// Build an empty storage state that uses `config` as the global default.
    ///
    /// No files, per-process overrides, deleted paths, or sync failures are
    /// recorded, and file IDs start at 0.
    pub fn new(config: StorageConfiguration) -> Self {
        Self {
            config,
            next_file_id: 0,
            files: BTreeMap::new(),
            path_to_file: BTreeMap::new(),
            per_process_configs: HashMap::new(),
            deleted_paths: HashSet::new(),
            sync_failures: HashSet::new(),
        }
    }

    /// Pick the storage configuration that applies to the process at `ip`.
    ///
    /// A per-process override wins; otherwise the global default is returned.
    pub fn config_for(&self, ip: IpAddr) -> &StorageConfiguration {
        match self.per_process_configs.get(&ip) {
            Some(per_process) => per_process,
            None => &self.config,
        }
    }
}

impl Default for StorageState {
    /// An empty storage state whose global configuration is
    /// `StorageConfiguration::default()`.
    fn default() -> Self {
        let config = StorageConfiguration::default();
        Self::new(config)
    }
}