// moonpool_sim/network/sim/stream.rs

1use super::types::{ConnectionId, ListenerId};
2use crate::TcpListenerTrait;
3use crate::sim::state::CloseReason;
4use crate::{Event, WeakSimWorld};
5use async_trait::async_trait;
6use std::{
7    future::Future,
8    io,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13use tracing::instrument;
14
/// Build the `io::Error` reported when the simulation world no longer exists.
///
/// Stream and listener operations return this when they are attempted after the
/// simulation has been dropped.
fn sim_shutdown_error() -> io::Error {
    const MESSAGE: &str = "simulation shutdown";
    io::Error::new(io::ErrorKind::BrokenPipe, MESSAGE)
}
21
/// Build the `io::Error` surfaced when chaos injection explicitly fails a connection.
fn random_connection_failure_error() -> io::Error {
    const MESSAGE: &str = "Random connection failure (explicit)";
    io::Error::new(io::ErrorKind::ConnectionReset, MESSAGE)
}
29
/// Build the `io::Error` returned once a half-open connection reaches its error time.
fn half_open_timeout_error() -> io::Error {
    const MESSAGE: &str = "Connection reset (half-open timeout)";
    io::Error::new(io::ErrorKind::ConnectionReset, MESSAGE)
}
37
/// Build the `io::Error` returned for a connection that was aborted (RST).
fn connection_aborted_error() -> io::Error {
    const MESSAGE: &str = "Connection was aborted (RST)";
    io::Error::new(io::ErrorKind::ConnectionReset, MESSAGE)
}
45
/// Simulated TCP stream that implements async read/write operations.
///
/// `SimTcpStream` provides a simulation of TCP socket behavior by implementing
/// the `AsyncRead` and `AsyncWrite` traits. It interfaces with the simulation event system
/// to provide ordered, reliable data delivery with configurable network delays.
///
/// ## Architecture Overview
///
/// Each `SimTcpStream` represents one endpoint of a TCP connection:
///
/// ```text
/// Application Layer          SimTcpStream Layer          Simulation Layer
/// ─────────────────          ──────────────────          ─────────────────
///
/// stream.write_all(data) ──► poll_write(data) ────────► buffer_send(data)
///                                                        └─► ProcessSendBuffer event
///                                                            └─► DataDelivery event
///                                                                └─► paired connection
///
/// stream.read(buf) ◄────── poll_read(buf) ◄──────────── receive_buffer
///                          │                           └─► waker registration
///                          └─► Poll::Pending/Ready
/// ```
///
/// ## TCP Semantics Implemented
///
/// This implementation provides the core TCP guarantees required for realistic simulation:
///
/// ### 1. Reliable Delivery
/// - All written data will eventually be delivered to the paired connection
/// - No data loss (unless explicitly simulated via fault injection)
/// - Delivery confirmation through the event system
///
/// ### 2. Ordered Delivery (FIFO)
/// - Messages written first will arrive first at the destination
/// - Achieved through per-connection send buffering
/// - Critical for protocols that depend on message ordering
///
/// ### 3. Flow Control Simulation
/// - Read operations return `Poll::Pending` when no data is available, while the read
///   side is clogged, or while the connection is cut with nothing buffered
/// - Write operations return `Poll::Pending` while the send buffer has less free space
///   than the whole write needs, while the connection is cut, or while the write side
///   is clogged; otherwise the data is buffered and the write completes immediately
/// - A write is accepted in full or not at all; `poll_write` never reports a partial write
///
/// ## Fault Injection
///
/// Every `poll_read` and `poll_write` first rolls the simulation's random-close chaos
/// check. Reads and writes also observe closed, aborted, cut and half-open connection
/// state and translate it into EOF, `BrokenPipe`, `ConnectionReset` or `Poll::Pending`.
///
/// ## Usage
///
/// Provides async read/write operations for client and server connections. The stream
/// is driven through tokio's `AsyncRead` / `AsyncWrite` traits.
///
/// ## Performance Characteristics
///
/// - **Write Latency**: an accepted write copies the data into the send buffer and
///   returns without waiting for delivery
/// - **Read Latency**: O(network_delay) - depends on simulation configuration
/// - **Memory Usage**: O(buffered_data) - proportional to unread data
/// - **CPU Overhead**: each `poll_read` allocates a scratch buffer the size of the
///   caller's remaining capacity
///
/// ## Connection Lifecycle
///
/// 1. **Creation**: Stream created with reference to simulation and connection ID
/// 2. **Active Phase**: Read/write operations interact with simulation buffers
/// 3. **Data Transfer**: Asynchronous event processing handles network simulation
/// 4. **Termination**: `poll_shutdown` and dropping the stream both close the connection
///    in the simulation; the drop path is skipped if the simulation is already gone
///
/// ## Thread Safety
///
/// `SimTcpStream` is designed for single-threaded simulation environments:
/// - The listener trait implementation in this file uses `#[async_trait(?Send)]`,
///   so accept futures carry no `Send` bound
/// - Safe for use within single-threaded async runtimes
/// - Eliminates synchronization overhead for deterministic simulation
pub struct SimTcpStream {
    /// Weak reference to the simulation world.
    ///
    /// Uses `WeakSimWorld` to avoid circular references while allowing the stream
    /// to detect if the simulation has been dropped. Read, write and shutdown
    /// operations return a `BrokenPipe` "simulation shutdown" error when the
    /// reference can no longer be upgraded.
    sim: WeakSimWorld,

    /// Unique identifier for this connection within the simulation.
    ///
    /// Passed to every simulation call made by this stream so that reads, writes,
    /// waker registrations and the final close are routed to this connection.
    connection_id: ConnectionId,
}
128
129impl SimTcpStream {
130    /// Create a new simulated TCP stream
131    pub(crate) fn new(sim: WeakSimWorld, connection_id: ConnectionId) -> Self {
132        Self { sim, connection_id }
133    }
134
135    /// Get the connection ID (for test introspection and chaos injection)
136    pub fn connection_id(&self) -> ConnectionId {
137        self.connection_id
138    }
139}
140
141impl Drop for SimTcpStream {
142    fn drop(&mut self) {
143        // Close the connection in the simulation when the stream is dropped
144        // This matches real TCP behavior where dropping a socket always closes it
145        if let Ok(sim) = self.sim.upgrade() {
146            tracing::debug!(
147                "SimTcpStream dropping, closing connection {}",
148                self.connection_id.0
149            );
150            sim.close_connection(self.connection_id);
151        }
152    }
153}
154
155impl AsyncRead for SimTcpStream {
156    #[instrument(skip(self, cx, buf))]
157    fn poll_read(
158        self: Pin<&mut Self>,
159        cx: &mut Context<'_>,
160        buf: &mut ReadBuf<'_>,
161    ) -> Poll<io::Result<()>> {
162        tracing::trace!(
163            "SimTcpStream::poll_read called on connection_id={}",
164            self.connection_id.0
165        );
166        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
167
168        // Random close chaos injection (FDB rollRandomClose pattern)
169        // Check at start of every read operation - sim2.actor.cpp:408
170        // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
171        if let Some(true) = sim.roll_random_close(self.connection_id) {
172            // 30% explicit exception - throw connection_failed immediately
173            return Poll::Ready(Err(random_connection_failure_error()));
174            // 70% silent case: connection already marked as closed, will return EOF below
175        }
176
177        // Check if receive side is closed (asymmetric closure)
178        if sim.is_recv_closed(self.connection_id) {
179            tracing::debug!(
180                "SimTcpStream::poll_read connection_id={} recv side closed, returning EOF",
181                self.connection_id.0
182            );
183            return Poll::Ready(Ok(())); // EOF
184        }
185
186        // Check for half-open connection (peer crashed)
187        if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
188            // Error time reached - return ECONNRESET
189            tracing::debug!(
190                "SimTcpStream::poll_read connection_id={} half-open error time reached, returning ECONNRESET",
191                self.connection_id.0
192            );
193            return Poll::Ready(Err(half_open_timeout_error()));
194        }
195        // Half-open but not yet error time - will block (Pending) below waiting for data
196        // that will never come, which is the correct half-open behavior
197
198        // Check for read clogging (symmetric with write clogging)
199        if sim.is_read_clogged(self.connection_id) {
200            // Already clogged, register waker and return Pending
201            sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
202            return Poll::Pending;
203        }
204
205        // Check if this read should be clogged
206        if sim.should_clog_read(self.connection_id) {
207            sim.clog_read(self.connection_id);
208            sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
209            return Poll::Pending;
210        }
211
212        // Try to read from connection's receive buffer first
213        // We should be able to read buffered data even if connection is currently cut
214        let mut temp_buf = vec![0u8; buf.remaining()];
215        let bytes_read = sim
216            .read_from_connection(self.connection_id, &mut temp_buf)
217            .map_err(|e| io::Error::other(format!("read error: {}", e)))?;
218
219        tracing::trace!(
220            "SimTcpStream::poll_read connection_id={} read {} bytes",
221            self.connection_id.0,
222            bytes_read
223        );
224
225        if bytes_read > 0 {
226            let data_preview = String::from_utf8_lossy(&temp_buf[..std::cmp::min(bytes_read, 20)]);
227            tracing::trace!(
228                "SimTcpStream::poll_read connection_id={} returning data: '{}'",
229                self.connection_id.0,
230                data_preview
231            );
232            buf.put_slice(&temp_buf[..bytes_read]);
233            Poll::Ready(Ok(()))
234        } else {
235            // No data available - check if connection is closed or cut
236            if sim.is_connection_closed(self.connection_id) {
237                // Check how the connection was closed
238                match sim.get_close_reason(self.connection_id) {
239                    CloseReason::Aborted => {
240                        tracing::info!(
241                            "SimTcpStream::poll_read connection_id={} was aborted (RST), returning ECONNRESET",
242                            self.connection_id.0
243                        );
244                        return Poll::Ready(Err(connection_aborted_error()));
245                    }
246                    _ => {
247                        tracing::info!(
248                            "SimTcpStream::poll_read connection_id={} is closed gracefully (FIN), returning EOF (0 bytes)",
249                            self.connection_id.0
250                        );
251                        // Connection closed gracefully (FIN) - return EOF (0 bytes read)
252                        return Poll::Ready(Ok(()));
253                    }
254                }
255            }
256
257            if sim.is_connection_cut(self.connection_id) {
258                // Connection is temporarily cut - register waker and wait for restoration
259                tracing::debug!(
260                    "SimTcpStream::poll_read connection_id={} is cut, registering cut waker",
261                    self.connection_id.0
262                );
263                sim.register_cut_waker(self.connection_id, cx.waker().clone());
264                return Poll::Pending;
265            }
266
267            // Register for notification when data arrives
268            tracing::trace!(
269                "SimTcpStream::poll_read connection_id={} no data, registering waker",
270                self.connection_id.0
271            );
272            sim.register_read_waker(self.connection_id, cx.waker().clone())
273                .map_err(|e| io::Error::other(format!("waker registration error: {}", e)))?;
274
275            // Double-check for data after registering waker to handle race conditions
276            // This prevents deadlocks where DataDelivery arrives between initial check and waker registration
277            let mut temp_buf_recheck = vec![0u8; buf.remaining()];
278            let bytes_read_recheck = sim
279                .read_from_connection(self.connection_id, &mut temp_buf_recheck)
280                .map_err(|e| io::Error::other(format!("recheck read error: {}", e)))?;
281
282            if bytes_read_recheck > 0 {
283                let data_preview = String::from_utf8_lossy(
284                    &temp_buf_recheck[..std::cmp::min(bytes_read_recheck, 20)],
285                );
286                tracing::trace!(
287                    "SimTcpStream::poll_read connection_id={} found data on recheck: '{}' (race condition avoided)",
288                    self.connection_id.0,
289                    data_preview
290                );
291                buf.put_slice(&temp_buf_recheck[..bytes_read_recheck]);
292                Poll::Ready(Ok(()))
293            } else {
294                // Final check - if connection is closed or cut and no data available
295                if sim.is_connection_closed(self.connection_id) {
296                    match sim.get_close_reason(self.connection_id) {
297                        CloseReason::Aborted => {
298                            tracing::info!(
299                                "SimTcpStream::poll_read connection_id={} was aborted on recheck (RST), returning ECONNRESET",
300                                self.connection_id.0
301                            );
302                            Poll::Ready(Err(connection_aborted_error()))
303                        }
304                        _ => {
305                            tracing::info!(
306                                "SimTcpStream::poll_read connection_id={} is closed on recheck (FIN), returning EOF (0 bytes)",
307                                self.connection_id.0
308                            );
309                            // Connection closed gracefully - return EOF (0 bytes read)
310                            Poll::Ready(Ok(()))
311                        }
312                    }
313                } else if sim.is_connection_cut(self.connection_id) {
314                    // Connection is temporarily cut - already registered waker above, just wait
315                    tracing::debug!(
316                        "SimTcpStream::poll_read connection_id={} is cut on recheck, waiting",
317                        self.connection_id.0
318                    );
319                    Poll::Pending
320                } else {
321                    Poll::Pending
322                }
323            }
324        }
325    }
326}
327
328impl AsyncWrite for SimTcpStream {
329    #[instrument(skip(self, cx, buf))]
330    fn poll_write(
331        self: Pin<&mut Self>,
332        cx: &mut Context<'_>,
333        buf: &[u8],
334    ) -> Poll<Result<usize, io::Error>> {
335        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
336
337        // Random close chaos injection (FDB rollRandomClose pattern)
338        // Check at start of every write operation - sim2.actor.cpp:423
339        // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
340        if let Some(true) = sim.roll_random_close(self.connection_id) {
341            // 30% explicit exception - throw connection_failed immediately
342            return Poll::Ready(Err(random_connection_failure_error()));
343            // 70% silent case: connection already marked as closed, will fail below
344        }
345
346        // Check if send side is closed (asymmetric closure)
347        if sim.is_send_closed(self.connection_id) {
348            return Poll::Ready(Err(io::Error::new(
349                io::ErrorKind::BrokenPipe,
350                "Connection send side closed",
351            )));
352        }
353
354        // Check if connection is closed or cut
355        if sim.is_connection_closed(self.connection_id) {
356            // Check how the connection was closed
357            return match sim.get_close_reason(self.connection_id) {
358                CloseReason::Aborted => Poll::Ready(Err(connection_aborted_error())),
359                _ => Poll::Ready(Err(io::Error::new(
360                    io::ErrorKind::BrokenPipe,
361                    "Connection was closed (FIN)",
362                ))),
363            };
364        }
365
366        if sim.is_connection_cut(self.connection_id) {
367            // Connection is temporarily cut - register waker and wait for restoration
368            tracing::debug!(
369                "SimTcpStream::poll_write connection_id={} is cut, registering cut waker",
370                self.connection_id.0
371            );
372            sim.register_cut_waker(self.connection_id, cx.waker().clone());
373            tracing::debug!(
374                "SimTcpStream::poll_write connection_id={} registered waker for cut connection",
375                self.connection_id.0
376            );
377            return Poll::Pending;
378        }
379
380        // Check for half-open connection (peer crashed)
381        if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
382            // Error time reached - return ECONNRESET
383            tracing::debug!(
384                "SimTcpStream::poll_write connection_id={} half-open error time reached, returning ECONNRESET",
385                self.connection_id.0
386            );
387            return Poll::Ready(Err(half_open_timeout_error()));
388        }
389        // Half-open but not yet error time - writes succeed but data goes nowhere
390        // (paired_connection is already None, so buffer_send will silently succeed)
391
392        // Check for send buffer space (backpressure)
393        let available_buffer = sim.available_send_buffer(self.connection_id);
394        if available_buffer < buf.len() {
395            // Not enough buffer space, register waker and return Pending
396            tracing::debug!(
397                "SimTcpStream::poll_write connection_id={} buffer full (available={}, needed={}), waiting",
398                self.connection_id.0,
399                available_buffer,
400                buf.len()
401            );
402            sim.register_send_buffer_waker(self.connection_id, cx.waker().clone());
403            return Poll::Pending;
404        }
405
406        // Phase 7: Check for write clogging
407        if sim.is_write_clogged(self.connection_id) {
408            // Already clogged, register waker and return Pending
409            sim.register_clog_waker(self.connection_id, cx.waker().clone());
410            return Poll::Pending;
411        }
412
413        // Check if this write should be clogged
414        if sim.should_clog_write(self.connection_id) {
415            sim.clog_write(self.connection_id);
416            sim.register_clog_waker(self.connection_id, cx.waker().clone());
417            return Poll::Pending;
418        }
419
420        // Use buffered send to maintain TCP ordering
421        let data_preview = String::from_utf8_lossy(&buf[..std::cmp::min(buf.len(), 20)]);
422        tracing::trace!(
423            "SimTcpStream::poll_write buffering {} bytes: '{}' for ordered delivery",
424            buf.len(),
425            data_preview
426        );
427
428        // Buffer the data for ordered processing instead of direct event scheduling
429        sim.buffer_send(self.connection_id, buf.to_vec())
430            .map_err(|e| io::Error::other(format!("buffer send error: {}", e)))?;
431
432        Poll::Ready(Ok(buf.len()))
433    }
434
435    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
436        Poll::Ready(Ok(()))
437    }
438
439    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
440        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
441
442        // Close the connection in the simulation when shutdown is called
443        tracing::debug!(
444            "SimTcpStream::poll_shutdown closing connection {}",
445            self.connection_id.0
446        );
447        sim.close_connection(self.connection_id);
448
449        Poll::Ready(Ok(()))
450    }
451}
452
/// Future representing an accept operation.
///
/// Resolves to the accepted `SimTcpStream` together with the peer address string, or to
/// an `io::Error` if the simulation is gone or a simulation call fails.
pub struct AcceptFuture {
    /// Weak handle to the simulation world; polling returns the simulation-shutdown
    /// error once it can no longer be upgraded.
    sim: WeakSimWorld,
    /// Listener address used to look up pending connections and to register the
    /// accept waker.
    local_addr: String,
    /// Identifier of the listener that created this future; not read while polling.
    #[allow(dead_code)] // May be used in future phases for more sophisticated listener tracking
    listener_id: ListenerId,
}
460
461impl Future for AcceptFuture {
462    type Output = io::Result<(SimTcpStream, String)>;
463
464    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
465        let sim = match self.sim.upgrade() {
466            Ok(sim) => sim,
467            Err(_) => return Poll::Ready(Err(sim_shutdown_error())),
468        };
469
470        match sim.get_pending_connection(&self.local_addr) {
471            Ok(Some(connection_id)) => {
472                // Get accept delay from network configuration
473                let delay = sim.with_network_config(|config| {
474                    crate::network::sample_duration(&config.accept_latency)
475                });
476
477                // Schedule accept completion event to advance simulation time
478                sim.schedule_event(
479                    Event::Connection {
480                        id: connection_id.0,
481                        state: crate::ConnectionStateChange::ConnectionReady,
482                    },
483                    delay,
484                );
485
486                // FDB Pattern (sim2.actor.cpp:1149-1175):
487                // Return the synthesized ephemeral peer address, not the client's real address.
488                // This simulates real TCP where servers see client ephemeral ports.
489                let peer_addr = sim
490                    .get_connection_peer_address(connection_id)
491                    .unwrap_or_else(|| "unknown:0".to_string());
492
493                let stream = SimTcpStream::new(self.sim.clone(), connection_id);
494                Poll::Ready(Ok((stream, peer_addr)))
495            }
496            Ok(None) => {
497                // No connection available yet - register waker for when connection becomes available
498                if let Err(e) = sim.register_accept_waker(&self.local_addr, cx.waker().clone()) {
499                    Poll::Ready(Err(io::Error::other(format!(
500                        "failed to register accept waker: {}",
501                        e
502                    ))))
503                } else {
504                    Poll::Pending
505                }
506            }
507            Err(e) => Poll::Ready(Err(io::Error::other(format!(
508                "failed to get pending connection: {}",
509                e
510            )))),
511        }
512    }
513}
514
/// Simulated TCP listener.
///
/// Each call to `accept` builds an `AcceptFuture` from clones of these fields.
pub struct SimTcpListener {
    /// Weak handle to the simulation world, cloned into every accept future.
    sim: WeakSimWorld,
    /// Identifier of this listener, copied into every accept future.
    #[allow(dead_code)] // Will be used in future phases
    listener_id: ListenerId,
    /// Address this listener reports from `local_addr` and accepts connections on.
    local_addr: String,
}
522
523impl SimTcpListener {
524    /// Create a new simulated TCP listener
525    pub(crate) fn new(sim: WeakSimWorld, listener_id: ListenerId, local_addr: String) -> Self {
526        Self {
527            sim,
528            listener_id,
529            local_addr,
530        }
531    }
532}
533
534#[async_trait(?Send)]
535impl TcpListenerTrait for SimTcpListener {
536    type TcpStream = SimTcpStream;
537
538    #[instrument(skip(self))]
539    async fn accept(&self) -> io::Result<(Self::TcpStream, String)> {
540        AcceptFuture {
541            sim: self.sim.clone(),
542            local_addr: self.local_addr.clone(),
543            listener_id: self.listener_id,
544        }
545        .await
546    }
547
548    fn local_addr(&self) -> io::Result<String> {
549        Ok(self.local_addr.clone())
550    }
551}