Skip to main content

moonpool_sim/network/sim/
stream.rs

1use super::types::{ConnectionId, ListenerId};
2use crate::TcpListenerTrait;
3use crate::sim::state::CloseReason;
4use crate::{Event, WeakSimWorld};
5use async_trait::async_trait;
6use std::{
7    future::Future,
8    io,
9    pin::Pin,
10    task::{Context, Poll},
11};
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13use tracing::instrument;
14
/// Build the `io::Error` returned once the simulation world is gone.
///
/// Stream and listener operations use this when upgrading their weak simulation
/// handle fails, i.e. the simulation was dropped while I/O was still being attempted.
/// The error kind is `BrokenPipe` and the message is `"simulation shutdown"`.
fn sim_shutdown_error() -> io::Error {
    const MESSAGE: &str = "simulation shutdown";
    let kind = io::ErrorKind::BrokenPipe;
    io::Error::new(kind, MESSAGE)
}
21
/// Build the `io::Error` used when chaos injection fails a connection explicitly.
///
/// The error kind is `ConnectionReset`; the message marks the failure as the
/// "explicit" flavour of a random close.
fn random_connection_failure_error() -> io::Error {
    const MESSAGE: &str = "Random connection failure (explicit)";
    let kind = io::ErrorKind::ConnectionReset;
    io::Error::new(kind, MESSAGE)
}
29
/// Build the `io::Error` reported when a half-open connection reaches its error time.
///
/// The error kind is `ConnectionReset`, matching the ECONNRESET a caller would see
/// once the dead peer is finally detected.
fn half_open_timeout_error() -> io::Error {
    const MESSAGE: &str = "Connection reset (half-open timeout)";
    let kind = io::ErrorKind::ConnectionReset;
    io::Error::new(kind, MESSAGE)
}
37
/// Build the `io::Error` reported for a connection that was aborted (RST).
///
/// The error kind is `ConnectionReset`.
fn connection_aborted_error() -> io::Error {
    const MESSAGE: &str = "Connection was aborted (RST)";
    let kind = io::ErrorKind::ConnectionReset;
    io::Error::new(kind, MESSAGE)
}
45
/// Simulated TCP stream that implements async read/write operations.
///
/// `SimTcpStream` simulates TCP socket behavior by implementing the `AsyncRead` and
/// `AsyncWrite` traits. It interfaces with the simulation event system to provide
/// ordered, reliable data delivery with configurable network delays.
///
/// ## Architecture Overview
///
/// Each `SimTcpStream` represents one endpoint of a TCP connection:
///
/// ```text
/// Application Layer          SimTcpStream Layer          Simulation Layer
/// ─────────────────          ──────────────────          ─────────────────
///
/// stream.write_all(data) ──► poll_write(data) ────────► buffer_send(data)
///                                                        └─► ProcessSendBuffer event
///                                                            └─► DataDelivery event
///                                                                └─► paired connection
///
/// stream.read(buf) ◄────── poll_read(buf) ◄──────────── receive_buffer
///                          │                           └─► waker registration
///                          └─► Poll::Pending/Ready
/// ```
///
/// ## TCP Semantics Implemented
///
/// This implementation provides the core TCP guarantees required for realistic simulation:
///
/// ### 1. Reliable Delivery
/// - All written data will eventually be delivered to the paired connection
/// - No data loss (unless explicitly simulated via fault injection)
/// - Delivery confirmation through the event system
///
/// ### 2. Ordered Delivery (FIFO)
/// - Messages written first will arrive first at the destination
/// - Achieved through per-connection send buffering
/// - Critical for protocols that depend on message ordering
///
/// ### 3. Flow Control Simulation
/// - Read operations return `Poll::Pending` when no data is available
/// - Write operations are buffered and complete immediately when the send buffer has
///   room for the whole write; otherwise `poll_write` registers a send-buffer waker
///   and returns `Poll::Pending`
/// - Reads and writes can additionally stall (`Poll::Pending`) while the connection
///   is cut or clogged by fault injection
///
/// ### 4. Connection Failures
/// - Reads report EOF (`Ok` with no bytes added) when the receive side is closed, or,
///   once no buffered data remains, when the remote FIN was received or the
///   connection was closed gracefully
/// - Reads and writes fail with `ConnectionReset` when the connection was aborted
///   (RST), when a half-open connection reaches its error time, or when random-close
///   chaos injection triggers an explicit failure
/// - Writes fail with `BrokenPipe` when the send side is closed or the connection
///   was closed gracefully
/// - Reads, writes and shutdown fail with `BrokenPipe` ("simulation shutdown") if the
///   simulation world has been dropped
///
/// ## Usage
///
/// Provides async read/write operations for client and server connections. Use the
/// stream through the `AsyncRead`/`AsyncWrite` interfaces (for example `write_all` and
/// `read`, as in the diagram above). Server-side streams are produced by
/// `SimTcpListener::accept`.
///
/// ## Performance Characteristics
///
/// - **Write Latency**: O(1) - writes are copied into the send buffer and return
///   without waiting for delivery (unless stalled as described under Flow Control)
/// - **Read Latency**: O(network_delay) - depends on simulation configuration
/// - **Memory Usage**: O(buffered_data) - proportional to unread data
/// - **CPU Overhead**: Minimal - leverages efficient event system
///
/// ## Connection Lifecycle
///
/// 1. **Creation**: Stream created with reference to simulation and connection ID
/// 2. **Active Phase**: Read/write operations interact with simulation buffers
/// 3. **Data Transfer**: Asynchronous event processing handles network simulation
/// 4. **Termination**: Dropping the stream or calling `poll_shutdown` closes the
///    connection in the simulation (automatic cleanup)
///
/// ## Thread Safety
///
/// `SimTcpStream` is designed for single-threaded simulation environments:
/// - No `Send` or `Sync` bounds (the listener trait impl uses `#[async_trait(?Send)]`)
/// - Safe for use within single-threaded async runtimes
/// - Eliminates synchronization overhead for deterministic simulation
pub struct SimTcpStream {
    /// Weak reference to the simulation world.
    ///
    /// Uses `WeakSimWorld` to avoid circular references while allowing the stream
    /// to detect if the simulation has been dropped. Operations return errors
    /// gracefully if the simulation is no longer available.
    sim: WeakSimWorld,

    /// Unique identifier for this connection within the simulation.
    ///
    /// This ID corresponds to a `ConnectionState` entry in the simulation's
    /// connection table. Used to route read/write operations to the correct
    /// connection buffers and waker registrations.
    connection_id: ConnectionId,
}
128
129impl SimTcpStream {
130    /// Create a new simulated TCP stream
131    pub(crate) fn new(sim: WeakSimWorld, connection_id: ConnectionId) -> Self {
132        Self { sim, connection_id }
133    }
134
135    /// Get the connection ID (for test introspection and chaos injection)
136    pub fn connection_id(&self) -> ConnectionId {
137        self.connection_id
138    }
139}
140
141impl Drop for SimTcpStream {
142    fn drop(&mut self) {
143        // Close the connection in the simulation when the stream is dropped
144        // This matches real TCP behavior where dropping a socket always closes it
145        if let Ok(sim) = self.sim.upgrade() {
146            tracing::debug!(
147                "SimTcpStream dropping, closing connection {}",
148                self.connection_id.0
149            );
150            sim.close_connection(self.connection_id);
151        }
152    }
153}
154
155impl AsyncRead for SimTcpStream {
156    #[instrument(skip(self, cx, buf))]
157    fn poll_read(
158        self: Pin<&mut Self>,
159        cx: &mut Context<'_>,
160        buf: &mut ReadBuf<'_>,
161    ) -> Poll<io::Result<()>> {
162        tracing::trace!(
163            "SimTcpStream::poll_read called on connection_id={}",
164            self.connection_id.0
165        );
166        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
167
168        // Random close chaos injection (FDB rollRandomClose pattern)
169        // Check at start of every read operation - sim2.actor.cpp:408
170        // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
171        if let Some(true) = sim.roll_random_close(self.connection_id) {
172            // 30% explicit exception - throw connection_failed immediately
173            return Poll::Ready(Err(random_connection_failure_error()));
174            // 70% silent case: connection already marked as closed, will return EOF below
175        }
176
177        // Check if receive side is closed (asymmetric closure)
178        if sim.is_recv_closed(self.connection_id) {
179            tracing::debug!(
180                "SimTcpStream::poll_read connection_id={} recv side closed, returning EOF",
181                self.connection_id.0
182            );
183            return Poll::Ready(Ok(())); // EOF
184        }
185
186        // Check for half-open connection (peer crashed)
187        if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
188            // Error time reached - return ECONNRESET
189            tracing::debug!(
190                "SimTcpStream::poll_read connection_id={} half-open error time reached, returning ECONNRESET",
191                self.connection_id.0
192            );
193            return Poll::Ready(Err(half_open_timeout_error()));
194        }
195        // Half-open but not yet error time - will block (Pending) below waiting for data
196        // that will never come, which is the correct half-open behavior
197
198        // Check for read clogging (symmetric with write clogging)
199        if sim.is_read_clogged(self.connection_id) {
200            // Already clogged, register waker and return Pending
201            sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
202            return Poll::Pending;
203        }
204
205        // Check if this read should be clogged
206        if sim.should_clog_read(self.connection_id) {
207            sim.clog_read(self.connection_id);
208            sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
209            return Poll::Pending;
210        }
211
212        // Try to read from connection's receive buffer first
213        // We should be able to read buffered data even if connection is currently cut
214        let mut temp_buf = vec![0u8; buf.remaining()];
215        let bytes_read = sim
216            .read_from_connection(self.connection_id, &mut temp_buf)
217            .map_err(|e| io::Error::other(format!("read error: {}", e)))?;
218
219        tracing::trace!(
220            "SimTcpStream::poll_read connection_id={} read {} bytes",
221            self.connection_id.0,
222            bytes_read
223        );
224
225        if bytes_read > 0 {
226            let data_preview = String::from_utf8_lossy(&temp_buf[..std::cmp::min(bytes_read, 20)]);
227            tracing::trace!(
228                "SimTcpStream::poll_read connection_id={} returning data: '{}'",
229                self.connection_id.0,
230                data_preview
231            );
232            buf.put_slice(&temp_buf[..bytes_read]);
233            Poll::Ready(Ok(()))
234        } else {
235            // No data available - check if connection has received FIN, is closed, or cut
236
237            // Check for remote FIN (graceful close, all data delivered via FinDelivery event)
238            if sim.is_remote_fin_received(self.connection_id) {
239                tracing::info!(
240                    "SimTcpStream::poll_read connection_id={} remote FIN received, returning EOF (0 bytes)",
241                    self.connection_id.0
242                );
243                return Poll::Ready(Ok(()));
244            }
245
246            if sim.is_connection_closed(self.connection_id) {
247                // Local side was closed or connection was aborted
248                match sim.get_close_reason(self.connection_id) {
249                    CloseReason::Aborted => {
250                        tracing::info!(
251                            "SimTcpStream::poll_read connection_id={} was aborted (RST), returning ECONNRESET",
252                            self.connection_id.0
253                        );
254                        return Poll::Ready(Err(connection_aborted_error()));
255                    }
256                    _ => {
257                        tracing::info!(
258                            "SimTcpStream::poll_read connection_id={} is closed gracefully (FIN), returning EOF (0 bytes)",
259                            self.connection_id.0
260                        );
261                        return Poll::Ready(Ok(()));
262                    }
263                }
264            }
265
266            if sim.is_connection_cut(self.connection_id) {
267                // Connection is temporarily cut - register waker and wait for restoration
268                tracing::debug!(
269                    "SimTcpStream::poll_read connection_id={} is cut, registering cut waker",
270                    self.connection_id.0
271                );
272                sim.register_cut_waker(self.connection_id, cx.waker().clone());
273                return Poll::Pending;
274            }
275
276            // Register for notification when data arrives
277            tracing::trace!(
278                "SimTcpStream::poll_read connection_id={} no data, registering waker",
279                self.connection_id.0
280            );
281            sim.register_read_waker(self.connection_id, cx.waker().clone())
282                .map_err(|e| io::Error::other(format!("waker registration error: {}", e)))?;
283
284            // Double-check for data after registering waker to handle race conditions
285            // This prevents deadlocks where DataDelivery arrives between initial check and waker registration
286            let mut temp_buf_recheck = vec![0u8; buf.remaining()];
287            let bytes_read_recheck = sim
288                .read_from_connection(self.connection_id, &mut temp_buf_recheck)
289                .map_err(|e| io::Error::other(format!("recheck read error: {}", e)))?;
290
291            if bytes_read_recheck > 0 {
292                let data_preview = String::from_utf8_lossy(
293                    &temp_buf_recheck[..std::cmp::min(bytes_read_recheck, 20)],
294                );
295                tracing::trace!(
296                    "SimTcpStream::poll_read connection_id={} found data on recheck: '{}' (race condition avoided)",
297                    self.connection_id.0,
298                    data_preview
299                );
300                buf.put_slice(&temp_buf_recheck[..bytes_read_recheck]);
301                Poll::Ready(Ok(()))
302            } else {
303                // Final check - if connection has received FIN, is closed, or cut and no data available
304
305                // Check for remote FIN (recheck after waker registration)
306                if sim.is_remote_fin_received(self.connection_id) {
307                    tracing::info!(
308                        "SimTcpStream::poll_read connection_id={} remote FIN received on recheck, returning EOF (0 bytes)",
309                        self.connection_id.0
310                    );
311                    return Poll::Ready(Ok(()));
312                }
313
314                if sim.is_connection_closed(self.connection_id) {
315                    match sim.get_close_reason(self.connection_id) {
316                        CloseReason::Aborted => {
317                            tracing::info!(
318                                "SimTcpStream::poll_read connection_id={} was aborted on recheck (RST), returning ECONNRESET",
319                                self.connection_id.0
320                            );
321                            Poll::Ready(Err(connection_aborted_error()))
322                        }
323                        _ => {
324                            tracing::info!(
325                                "SimTcpStream::poll_read connection_id={} is closed on recheck (FIN), returning EOF (0 bytes)",
326                                self.connection_id.0
327                            );
328                            Poll::Ready(Ok(()))
329                        }
330                    }
331                } else if sim.is_connection_cut(self.connection_id) {
332                    // Connection is temporarily cut - already registered waker above, just wait
333                    tracing::debug!(
334                        "SimTcpStream::poll_read connection_id={} is cut on recheck, waiting",
335                        self.connection_id.0
336                    );
337                    Poll::Pending
338                } else {
339                    Poll::Pending
340                }
341            }
342        }
343    }
344}
345
346impl AsyncWrite for SimTcpStream {
347    #[instrument(skip(self, cx, buf))]
348    fn poll_write(
349        self: Pin<&mut Self>,
350        cx: &mut Context<'_>,
351        buf: &[u8],
352    ) -> Poll<Result<usize, io::Error>> {
353        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
354
355        // Random close chaos injection (FDB rollRandomClose pattern)
356        // Check at start of every write operation - sim2.actor.cpp:423
357        // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
358        if let Some(true) = sim.roll_random_close(self.connection_id) {
359            // 30% explicit exception - throw connection_failed immediately
360            return Poll::Ready(Err(random_connection_failure_error()));
361            // 70% silent case: connection already marked as closed, will fail below
362        }
363
364        // Check if send side is closed (asymmetric closure)
365        if sim.is_send_closed(self.connection_id) {
366            return Poll::Ready(Err(io::Error::new(
367                io::ErrorKind::BrokenPipe,
368                "Connection send side closed",
369            )));
370        }
371
372        // Check if connection is closed or cut
373        if sim.is_connection_closed(self.connection_id) {
374            // Check how the connection was closed
375            return match sim.get_close_reason(self.connection_id) {
376                CloseReason::Aborted => Poll::Ready(Err(connection_aborted_error())),
377                _ => Poll::Ready(Err(io::Error::new(
378                    io::ErrorKind::BrokenPipe,
379                    "Connection was closed (FIN)",
380                ))),
381            };
382        }
383
384        if sim.is_connection_cut(self.connection_id) {
385            // Connection is temporarily cut - register waker and wait for restoration
386            tracing::debug!(
387                "SimTcpStream::poll_write connection_id={} is cut, registering cut waker",
388                self.connection_id.0
389            );
390            sim.register_cut_waker(self.connection_id, cx.waker().clone());
391            tracing::debug!(
392                "SimTcpStream::poll_write connection_id={} registered waker for cut connection",
393                self.connection_id.0
394            );
395            return Poll::Pending;
396        }
397
398        // Check for half-open connection (peer crashed)
399        if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
400            // Error time reached - return ECONNRESET
401            tracing::debug!(
402                "SimTcpStream::poll_write connection_id={} half-open error time reached, returning ECONNRESET",
403                self.connection_id.0
404            );
405            return Poll::Ready(Err(half_open_timeout_error()));
406        }
407        // Half-open but not yet error time - writes succeed but data goes nowhere
408        // (paired_connection is already None, so buffer_send will silently succeed)
409
410        // Check for send buffer space (backpressure)
411        let available_buffer = sim.available_send_buffer(self.connection_id);
412        if available_buffer < buf.len() {
413            // Not enough buffer space, register waker and return Pending
414            tracing::debug!(
415                "SimTcpStream::poll_write connection_id={} buffer full (available={}, needed={}), waiting",
416                self.connection_id.0,
417                available_buffer,
418                buf.len()
419            );
420            sim.register_send_buffer_waker(self.connection_id, cx.waker().clone());
421            return Poll::Pending;
422        }
423
424        // Phase 7: Check for write clogging
425        if sim.is_write_clogged(self.connection_id) {
426            // Already clogged, register waker and return Pending
427            sim.register_clog_waker(self.connection_id, cx.waker().clone());
428            return Poll::Pending;
429        }
430
431        // Check if this write should be clogged
432        if sim.should_clog_write(self.connection_id) {
433            sim.clog_write(self.connection_id);
434            sim.register_clog_waker(self.connection_id, cx.waker().clone());
435            return Poll::Pending;
436        }
437
438        // Use buffered send to maintain TCP ordering
439        let data_preview = String::from_utf8_lossy(&buf[..std::cmp::min(buf.len(), 20)]);
440        tracing::trace!(
441            "SimTcpStream::poll_write buffering {} bytes: '{}' for ordered delivery",
442            buf.len(),
443            data_preview
444        );
445
446        // Buffer the data for ordered processing instead of direct event scheduling
447        sim.buffer_send(self.connection_id, buf.to_vec())
448            .map_err(|e| io::Error::other(format!("buffer send error: {}", e)))?;
449
450        Poll::Ready(Ok(buf.len()))
451    }
452
453    fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
454        Poll::Ready(Ok(()))
455    }
456
457    fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
458        let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
459
460        // Close the connection in the simulation when shutdown is called
461        tracing::debug!(
462            "SimTcpStream::poll_shutdown closing connection {}",
463            self.connection_id.0
464        );
465        sim.close_connection(self.connection_id);
466
467        Poll::Ready(Ok(()))
468    }
469}
470
/// Future representing an accept operation.
///
/// Resolves to the accepted `SimTcpStream` together with the peer address string
/// once the simulation reports a pending connection for `local_addr`.
pub struct AcceptFuture {
    /// Weak handle to the simulation world; polling yields a shutdown error if the
    /// simulation has been dropped.
    sim: WeakSimWorld,
    /// Listener address used to look up pending connections and to register the
    /// accept waker.
    local_addr: String,
    /// Identifier of the listener that created this future. It is stored but not
    /// read anywhere in the code shown here, hence the lint suppression.
    #[allow(dead_code)] // May be used in future phases for more sophisticated listener tracking
    listener_id: ListenerId,
}
478
479impl Future for AcceptFuture {
480    type Output = io::Result<(SimTcpStream, String)>;
481
482    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
483        let sim = match self.sim.upgrade() {
484            Ok(sim) => sim,
485            Err(_) => return Poll::Ready(Err(sim_shutdown_error())),
486        };
487
488        match sim.get_pending_connection(&self.local_addr) {
489            Ok(Some(connection_id)) => {
490                // Get accept delay from network configuration
491                let delay = sim.with_network_config(|config| {
492                    crate::network::sample_duration(&config.accept_latency)
493                });
494
495                // Schedule accept completion event to advance simulation time
496                sim.schedule_event(
497                    Event::Connection {
498                        id: connection_id.0,
499                        state: crate::ConnectionStateChange::ConnectionReady,
500                    },
501                    delay,
502                );
503
504                // FDB Pattern (sim2.actor.cpp:1149-1175):
505                // Return the synthesized ephemeral peer address, not the client's real address.
506                // This simulates real TCP where servers see client ephemeral ports.
507                let peer_addr = sim
508                    .get_connection_peer_address(connection_id)
509                    .unwrap_or_else(|| "unknown:0".to_string());
510
511                let stream = SimTcpStream::new(self.sim.clone(), connection_id);
512                Poll::Ready(Ok((stream, peer_addr)))
513            }
514            Ok(None) => {
515                // No connection available yet - register waker for when connection becomes available
516                if let Err(e) = sim.register_accept_waker(&self.local_addr, cx.waker().clone()) {
517                    Poll::Ready(Err(io::Error::other(format!(
518                        "failed to register accept waker: {}",
519                        e
520                    ))))
521                } else {
522                    Poll::Pending
523                }
524            }
525            Err(e) => Poll::Ready(Err(io::Error::other(format!(
526                "failed to get pending connection: {}",
527                e
528            )))),
529        }
530    }
531}
532
533/// Simulated TCP listener
534pub struct SimTcpListener {
535    sim: WeakSimWorld,
536    #[allow(dead_code)] // Will be used in future phases
537    listener_id: ListenerId,
538    local_addr: String,
539}
540
541impl SimTcpListener {
542    /// Create a new simulated TCP listener
543    pub(crate) fn new(sim: WeakSimWorld, listener_id: ListenerId, local_addr: String) -> Self {
544        Self {
545            sim,
546            listener_id,
547            local_addr,
548        }
549    }
550}
551
552#[async_trait(?Send)]
553impl TcpListenerTrait for SimTcpListener {
554    type TcpStream = SimTcpStream;
555
556    #[instrument(skip(self))]
557    async fn accept(&self) -> io::Result<(Self::TcpStream, String)> {
558        AcceptFuture {
559            sim: self.sim.clone(),
560            local_addr: self.local_addr.clone(),
561            listener_id: self.listener_id,
562        }
563        .await
564    }
565
566    fn local_addr(&self) -> io::Result<String> {
567        Ok(self.local_addr.clone())
568    }
569}