moonpool-sim 0.6.0

Simulation engine for the moonpool framework
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
use super::types::{ConnectionId, ListenerId};
use crate::TcpListenerTrait;
use crate::sim::state::CloseReason;
use crate::{Event, WeakSimWorld};
use async_trait::async_trait;
use std::{
    future::Future,
    io,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::instrument;

/// Builds the error reported when an operation outlives the simulation world.
///
/// Streams and accept futures hold only a weak handle to the world; once that
/// handle can no longer be upgraded, they fail with this `BrokenPipe` error.
fn sim_shutdown_error() -> io::Error {
    let kind = io::ErrorKind::BrokenPipe;
    io::Error::new(kind, "simulation shutdown")
}

/// Builds the error surfaced when chaos injection randomly fails a connection.
///
/// Reported with kind `ConnectionReset`.
fn random_connection_failure_error() -> io::Error {
    const MESSAGE: &str = "Random connection failure (explicit)";
    io::Error::new(io::ErrorKind::ConnectionReset, MESSAGE)
}

/// Builds the error returned once a half-open connection (crashed peer) has
/// reached its error deadline.
///
/// Reported with kind `ConnectionReset`.
fn half_open_timeout_error() -> io::Error {
    let message = "Connection reset (half-open timeout)";
    io::Error::new(io::ErrorKind::ConnectionReset, message)
}

/// Builds the error returned when I/O hits a connection that was aborted (RST).
///
/// Reported with kind `ConnectionReset`.
fn connection_aborted_error() -> io::Error {
    use io::ErrorKind::ConnectionReset;
    io::Error::new(ConnectionReset, "Connection was aborted (RST)")
}

/// Simulated TCP stream that implements async read/write operations.
///
/// `SimTcpStream` models one endpoint of a TCP connection by implementing the
/// `AsyncRead` and `AsyncWrite` traits on top of the simulation world. Written
/// bytes are handed to the simulation's per-connection send buffer and delivered
/// to the paired connection through simulation events, subject to the configured
/// network delays. It is used for both client and server connections.
///
/// ## Architecture Overview
///
/// Each `SimTcpStream` represents one endpoint of a TCP connection:
///
/// ```text
/// Application Layer          SimTcpStream Layer          Simulation Layer
/// ─────────────────          ──────────────────          ─────────────────
///
/// stream.write_all(data) ──► poll_write(data) ────────► buffer_send(data)
///                                                        └─► ProcessSendBuffer event
///                                                            └─► DataDelivery event
///                                                                └─► paired connection
///
/// stream.read(buf) ◄────── poll_read(buf) ◄──────────── receive_buffer
///                          │                           └─► waker registration
///                          └─► Poll::Pending/Ready
/// ```
///
/// ## TCP Semantics Modeled
///
/// ### 1. Reliable Delivery
/// - Written data is delivered to the paired connection unless fault injection
///   interrupts it (random close, abort/RST, or a half-open peer, where writes
///   are accepted but the data goes nowhere).
///
/// ### 2. Ordered Delivery (FIFO)
/// - Writes are queued through a per-connection send buffer (`buffer_send`), so
///   bytes arrive in the order they were written.
///
/// ### 3. Blocking and Backpressure
/// - Reads return `Poll::Pending` while no data is buffered and the connection
///   is still open, or while a read is clogged by chaos injection.
/// - Writes return `Poll::Pending` when the simulated send buffer lacks space,
///   when the connection is cut, or when a write is clogged by chaos injection;
///   otherwise the data is accepted into the send buffer right away.
///
/// ## Cost
///
/// - Writing copies the written bytes into the simulation (O(len) per write).
/// - Read latency depends on the simulated network configuration.
/// - Memory use is proportional to data that is buffered but not yet read.
///
/// ## Connection Lifecycle
///
/// 1. **Creation**: built inside the crate from a weak simulation handle and a
///    [`ConnectionId`] (for example by the listener's accept future).
/// 2. **Active phase**: reads and writes operate on the simulation's connection
///    buffers and waker registrations.
/// 3. **Termination**: calling `poll_shutdown` or dropping the stream closes the
///    connection in the simulation.
/// 4. **World gone**: if the simulation has been dropped, operations fail with
///    `BrokenPipe`.
///
/// ## Threading
///
/// Intended for single-threaded, deterministic simulation. The listener trait is
/// implemented with `#[async_trait(?Send)]`, so accept futures are not required
/// to be `Send`. Whether the stream itself is `Send` depends on `WeakSimWorld`
/// (NOTE(review): confirm).
pub struct SimTcpStream {
    /// Weak reference to the simulation world.
    ///
    /// Using `WeakSimWorld` avoids circular references while still letting the
    /// stream detect that the simulation has been dropped. Operations then return
    /// errors instead of panicking.
    sim: WeakSimWorld,

    /// Unique identifier for this connection within the simulation.
    ///
    /// Used to route read/write operations to the right connection buffers and
    /// waker registrations in the simulation.
    connection_id: ConnectionId,
}

impl SimTcpStream {
    /// Create a new simulated TCP stream
    pub(crate) fn new(sim: WeakSimWorld, connection_id: ConnectionId) -> Self {
        Self { sim, connection_id }
    }

    /// Get the connection ID (for test introspection and chaos injection)
    pub fn connection_id(&self) -> ConnectionId {
        self.connection_id
    }
}

impl Drop for SimTcpStream {
    /// Closes the simulated connection when the stream is dropped.
    ///
    /// This mirrors a real socket, where dropping the handle closes it. If the
    /// simulation world is already gone there is nothing to close, so this does
    /// nothing.
    fn drop(&mut self) {
        let Ok(world) = self.sim.upgrade() else {
            return;
        };
        let id = self.connection_id;
        tracing::debug!("SimTcpStream dropping, closing connection {}", id.0);
        world.close_connection(id);
    }
}

impl AsyncRead for SimTcpStream {
    /// Reads buffered bytes from the simulated connection into `buf`.
    ///
    /// Checks run in this order:
    /// 1. random-close chaos (may fail with `ConnectionReset`);
    /// 2. receive side closed → EOF;
    /// 3. half-open error deadline reached → `ConnectionReset`;
    /// 4. read clogging (chaos) → `Pending`;
    /// 5. copy whatever is in the receive buffer (even while the connection is cut);
    /// 6. with nothing buffered: remote FIN → EOF; local close → EOF, or
    ///    `ConnectionReset` if aborted; cut → `Pending`; otherwise register a read
    ///    waker and re-check once before returning `Pending`.
    ///
    /// A call whose `buf` has no remaining capacity returns `Ready(Ok(()))` right
    /// after the chaos checks, like a zero-length socket read, instead of parking.
    ///
    /// # Errors
    /// - `BrokenPipe` if the simulation has been dropped.
    /// - `ConnectionReset` for random chaos failures, half-open timeouts and
    ///   aborted connections.
    /// - `Other` if the simulation rejects the read or the waker registration.
    #[instrument(skip(self, cx, buf))]
    fn poll_read(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut ReadBuf<'_>,
    ) -> Poll<io::Result<()>> {
        tracing::trace!(
            "SimTcpStream::poll_read called on connection_id={}",
            self.connection_id.0
        );
        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
        let id = self.connection_id;

        // Random close chaos injection (FDB rollRandomClose pattern), checked at the
        // start of every read (sim2.actor.cpp:408). `Some(true)` = explicit failure
        // (the 30% case). `Some(false)` = the connection was silently marked closed
        // (the 70% case), and the closed-state checks below then report EOF.
        if let Some(true) = sim.roll_random_close(id) {
            return Poll::Ready(Err(random_connection_failure_error()));
        }

        // Asymmetric closure: our receive half is shut.
        if sim.is_recv_closed(id) {
            tracing::debug!(
                "SimTcpStream::poll_read connection_id={} recv side closed, returning EOF",
                id.0
            );
            return Poll::Ready(Ok(())); // EOF
        }

        // Half-open connection (peer crashed). Once the error deadline passes, reset.
        // Before that, fall through and block waiting for data that will never
        // arrive, which is the intended half-open behavior.
        if sim.is_half_open(id) && sim.should_half_open_error(id) {
            tracing::debug!(
                "SimTcpStream::poll_read connection_id={} half-open error time reached, returning ECONNRESET",
                id.0
            );
            return Poll::Ready(Err(half_open_timeout_error()));
        }

        // Read clogging (symmetric with write clogging): an existing clog, or a new one.
        if sim.is_read_clogged(id) {
            sim.register_read_clog_waker(id, cx.waker().clone());
            return Poll::Pending;
        }
        if sim.should_clog_read(id) {
            sim.clog_read(id);
            sim.register_read_clog_waker(id, cx.waker().clone());
            return Poll::Pending;
        }

        // A zero-capacity read cannot make progress. Without this guard it would read
        // 0 bytes, treat that as "no data", and park forever even with data buffered.
        // The guard sits after the chaos checks so they run exactly as before.
        if buf.remaining() == 0 {
            return Poll::Ready(Ok(()));
        }

        // Read straight into the caller's buffer; buffered data stays readable even
        // while the connection is cut.
        let dst = buf.initialize_unfilled();
        let bytes_read = sim
            .read_from_connection(id, dst)
            .map_err(|e| io::Error::other(format!("read error: {}", e)))?;

        tracing::trace!(
            "SimTcpStream::poll_read connection_id={} read {} bytes",
            id.0,
            bytes_read
        );

        if bytes_read > 0 {
            // Preview is built inside the macro so it costs nothing when tracing is off.
            tracing::trace!(
                "SimTcpStream::poll_read connection_id={} returning data: '{}'",
                id.0,
                String::from_utf8_lossy(&dst[..bytes_read.min(20)])
            );
            buf.advance(bytes_read);
            return Poll::Ready(Ok(()));
        }

        // No data available: decide between EOF, reset, or waiting.

        // Remote FIN (graceful close; all data was delivered via the FinDelivery event).
        if sim.is_remote_fin_received(id) {
            tracing::info!(
                "SimTcpStream::poll_read connection_id={} remote FIN received, returning EOF (0 bytes)",
                id.0
            );
            return Poll::Ready(Ok(()));
        }

        // Local side was closed or the connection was aborted.
        if sim.is_connection_closed(id) {
            return match sim.close_reason(id) {
                CloseReason::Aborted => {
                    tracing::info!(
                        "SimTcpStream::poll_read connection_id={} was aborted (RST), returning ECONNRESET",
                        id.0
                    );
                    Poll::Ready(Err(connection_aborted_error()))
                }
                _ => {
                    tracing::info!(
                        "SimTcpStream::poll_read connection_id={} is closed gracefully (FIN), returning EOF (0 bytes)",
                        id.0
                    );
                    Poll::Ready(Ok(()))
                }
            };
        }

        // Temporarily cut: wait for restoration.
        if sim.is_connection_cut(id) {
            tracing::debug!(
                "SimTcpStream::poll_read connection_id={} is cut, registering cut waker",
                id.0
            );
            sim.register_cut_waker(id, cx.waker().clone());
            return Poll::Pending;
        }

        tracing::trace!(
            "SimTcpStream::poll_read connection_id={} no data, registering waker",
            id.0
        );
        sim.register_read_waker(id, cx.waker().clone())
            .map_err(|e| io::Error::other(format!("waker registration error: {}", e)))?;

        // Re-check after registering the waker, so data delivered between the first
        // check and the registration is not missed.
        let dst = buf.initialize_unfilled();
        let bytes_read_recheck = sim
            .read_from_connection(id, dst)
            .map_err(|e| io::Error::other(format!("recheck read error: {}", e)))?;

        if bytes_read_recheck > 0 {
            tracing::trace!(
                "SimTcpStream::poll_read connection_id={} found data on recheck: '{}' (race condition avoided)",
                id.0,
                String::from_utf8_lossy(&dst[..bytes_read_recheck.min(20)])
            );
            buf.advance(bytes_read_recheck);
            return Poll::Ready(Ok(()));
        }

        if sim.is_remote_fin_received(id) {
            tracing::info!(
                "SimTcpStream::poll_read connection_id={} remote FIN received on recheck, returning EOF (0 bytes)",
                id.0
            );
            return Poll::Ready(Ok(()));
        }

        if sim.is_connection_closed(id) {
            return match sim.close_reason(id) {
                CloseReason::Aborted => {
                    tracing::info!(
                        "SimTcpStream::poll_read connection_id={} was aborted on recheck (RST), returning ECONNRESET",
                        id.0
                    );
                    Poll::Ready(Err(connection_aborted_error()))
                }
                _ => {
                    tracing::info!(
                        "SimTcpStream::poll_read connection_id={} is closed on recheck (FIN), returning EOF (0 bytes)",
                        id.0
                    );
                    Poll::Ready(Ok(()))
                }
            };
        }

        if sim.is_connection_cut(id) {
            // Cut after the first check: the read waker registered above is what we
            // wait on.
            tracing::debug!(
                "SimTcpStream::poll_read connection_id={} is cut on recheck, waiting",
                id.0
            );
        }
        Poll::Pending
    }
}

impl AsyncWrite for SimTcpStream {
    /// Accepts bytes into the connection's simulated send buffer.
    ///
    /// Checks run in this order:
    /// 1. random-close chaos (may fail with `ConnectionReset`);
    /// 2. send side closed → `BrokenPipe`;
    /// 3. connection closed → `ConnectionReset` if aborted, else `BrokenPipe`;
    /// 4. connection cut → `Pending` until restored;
    /// 5. half-open error deadline reached → `ConnectionReset`;
    /// 6. empty `buf` → `Ok(0)`;
    /// 7. no send-buffer space → `Pending` until space frees up;
    /// 8. write clogging (chaos) → `Pending`.
    ///
    /// Like a real non-blocking socket, a write may be partial. When the send
    /// buffer has room for only part of `buf`, that prefix is queued and its length
    /// is returned. Callers such as `write_all` loop on short writes.
    ///
    /// # Errors
    /// - `BrokenPipe` if the simulation has been dropped or the connection is closed.
    /// - `ConnectionReset` for chaos failures, half-open timeouts and aborts.
    /// - `Other` if the simulation rejects the buffered send.
    #[instrument(skip(self, cx, buf))]
    fn poll_write(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, io::Error>> {
        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
        let id = self.connection_id;

        // Random close chaos injection (FDB rollRandomClose pattern), checked at the
        // start of every write (sim2.actor.cpp:423). `Some(true)` = explicit failure
        // (the 30% case). `Some(false)` = the connection was silently marked closed
        // (the 70% case), and the closed-state check below fails the write.
        if let Some(true) = sim.roll_random_close(id) {
            return Poll::Ready(Err(random_connection_failure_error()));
        }

        // Asymmetric closure: our send half is shut.
        if sim.is_send_closed(id) {
            return Poll::Ready(Err(io::Error::new(
                io::ErrorKind::BrokenPipe,
                "Connection send side closed",
            )));
        }

        if sim.is_connection_closed(id) {
            return match sim.close_reason(id) {
                CloseReason::Aborted => Poll::Ready(Err(connection_aborted_error())),
                _ => Poll::Ready(Err(io::Error::new(
                    io::ErrorKind::BrokenPipe,
                    "Connection was closed (FIN)",
                ))),
            };
        }

        if sim.is_connection_cut(id) {
            // Temporarily cut: wait for restoration.
            tracing::debug!(
                "SimTcpStream::poll_write connection_id={} is cut, registering cut waker",
                id.0
            );
            sim.register_cut_waker(id, cx.waker().clone());
            tracing::debug!(
                "SimTcpStream::poll_write connection_id={} registered waker for cut connection",
                id.0
            );
            return Poll::Pending;
        }

        // Half-open connection (peer crashed). Before the error deadline, writes
        // succeed but the data goes nowhere (the pairing is already gone).
        if sim.is_half_open(id) && sim.should_half_open_error(id) {
            tracing::debug!(
                "SimTcpStream::poll_write connection_id={} half-open error time reached, returning ECONNRESET",
                id.0
            );
            return Poll::Ready(Err(half_open_timeout_error()));
        }

        // Nothing to send: succeed without queueing an empty chunk or parking on a
        // full buffer.
        if buf.is_empty() {
            return Poll::Ready(Ok(0));
        }

        // Backpressure. Park only when there is no room at all. Requiring the whole
        // slice to fit would deadlock any write larger than the buffer's capacity.
        let available_buffer = sim.available_send_buffer(id);
        if available_buffer == 0 {
            tracing::debug!(
                "SimTcpStream::poll_write connection_id={} buffer full (available={}, needed={}), waiting",
                id.0,
                available_buffer,
                buf.len()
            );
            sim.register_send_buffer_waker(id, cx.waker().clone());
            return Poll::Pending;
        }

        // Write clogging: an existing clog, or a new one.
        if sim.is_write_clogged(id) {
            sim.register_clog_waker(id, cx.waker().clone());
            return Poll::Pending;
        }
        if sim.should_clog_write(id) {
            sim.clog_write(id);
            sim.register_clog_waker(id, cx.waker().clone());
            return Poll::Pending;
        }

        // Accept only what fits. The send buffer keeps chunks in order, so partial
        // writes preserve the byte stream.
        let chunk = &buf[..buf.len().min(available_buffer)];
        tracing::trace!(
            "SimTcpStream::poll_write buffering {} bytes: '{}' for ordered delivery",
            chunk.len(),
            String::from_utf8_lossy(&chunk[..chunk.len().min(20)])
        );

        sim.buffer_send(id, chunk.to_vec())
            .map_err(|e| io::Error::other(format!("buffer send error: {}", e)))?;

        Poll::Ready(Ok(chunk.len()))
    }

    /// Nothing to flush: data is handed to the simulation in `poll_write`.
    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        Poll::Ready(Ok(()))
    }

    /// Closes the connection in the simulation.
    ///
    /// NOTE(review): this closes the whole connection, not just the write half as a
    /// real TCP `shutdown(Write)` would. Confirm callers rely on this.
    ///
    /// # Errors
    /// `BrokenPipe` if the simulation has been dropped.
    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;

        tracing::debug!(
            "SimTcpStream::poll_shutdown closing connection {}",
            self.connection_id.0
        );
        sim.close_connection(self.connection_id);

        Poll::Ready(Ok(()))
    }
}

/// Future that resolves once a connection is pending on `local_addr`.
///
/// Created by [`SimTcpListener`]'s `accept`. It holds only a weak handle to the
/// simulation, so polling fails with `BrokenPipe` once the world is dropped.
pub struct AcceptFuture {
    /// Address the listener is bound to; the key used to look up pending connections.
    local_addr: String,
    /// Weak handle to the simulation world.
    sim: WeakSimWorld,
    /// Identity of the owning listener; stored but not yet read.
    #[allow(dead_code)] // May be used in future phases for more sophisticated listener tracking
    listener_id: ListenerId,
}

impl Future for AcceptFuture {
    type Output = io::Result<(SimTcpStream, String)>;

    /// Polls for a pending connection on `local_addr`.
    ///
    /// When one is queued, this schedules a `ConnectionReady` event after the
    /// configured accept latency. It then resolves with a new stream and the peer
    /// address the simulation recorded for it (`"unknown:0"` if none is recorded).
    /// Otherwise it registers the task's waker for the listener address and stays
    /// pending.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let Ok(sim) = self.sim.upgrade() else {
            return Poll::Ready(Err(sim_shutdown_error()));
        };

        let pending = match sim.pending_connection(&self.local_addr) {
            Ok(pending) => pending,
            Err(e) => {
                return Poll::Ready(Err(io::Error::other(format!(
                    "failed to get pending connection: {}",
                    e
                ))));
            }
        };

        let Some(connection_id) = pending else {
            // Nothing queued yet: ask to be woken when a connection arrives.
            return match sim.register_accept_waker(&self.local_addr, cx.waker().clone()) {
                Ok(_) => Poll::Pending,
                Err(e) => Poll::Ready(Err(io::Error::other(format!(
                    "failed to register accept waker: {}",
                    e
                )))),
            };
        };

        // Advance simulated time by the configured accept latency.
        let accept_delay = sim
            .with_network_config(|cfg| crate::network::sample_duration(&cfg.accept_latency));
        sim.schedule_event(
            Event::Connection {
                id: connection_id.0,
                state: crate::ConnectionStateChange::ConnectionReady,
            },
            accept_delay,
        );

        // FDB pattern (sim2.actor.cpp:1149-1175): report the synthesized ephemeral
        // peer address rather than the client's real one, as a real server would see.
        let peer_addr = sim
            .connection_peer_address(connection_id)
            .unwrap_or_else(|| String::from("unknown:0"));

        let accepted = SimTcpStream::new(self.sim.clone(), connection_id);
        Poll::Ready(Ok((accepted, peer_addr)))
    }
}

/// Simulated TCP listener bound to a string address inside the simulation.
///
/// Each `accept` call produces an [`AcceptFuture`] keyed on `local_addr`.
pub struct SimTcpListener {
    /// Address this listener is bound to.
    local_addr: String,
    /// Weak handle to the simulation world.
    sim: WeakSimWorld,
    /// Simulation-assigned listener identity; currently only passed through to accept futures.
    #[allow(dead_code)] // Will be used in future phases
    listener_id: ListenerId,
}

impl SimTcpListener {
    /// Create a new simulated TCP listener
    pub(crate) fn new(sim: WeakSimWorld, listener_id: ListenerId, local_addr: String) -> Self {
        Self {
            sim,
            listener_id,
            local_addr,
        }
    }
}

#[async_trait(?Send)]
impl TcpListenerTrait for SimTcpListener {
    type TcpStream = SimTcpStream;

    /// Waits for the next incoming simulated connection.
    ///
    /// Resolves to the accepted stream and the peer address reported by the
    /// simulation.
    #[instrument(skip(self))]
    async fn accept(&self) -> io::Result<(Self::TcpStream, String)> {
        let next_connection = AcceptFuture {
            listener_id: self.listener_id,
            local_addr: self.local_addr.clone(),
            sim: self.sim.clone(),
        };
        next_connection.await
    }

    /// Returns the address this listener was bound to. Never fails.
    fn local_addr(&self) -> io::Result<String> {
        let addr = self.local_addr.clone();
        Ok(addr)
    }
}