moonpool_sim/network/sim/stream.rs
1use super::types::{ConnectionId, ListenerId};
2use crate::TcpListenerTrait;
3use crate::sim::state::CloseReason;
4use crate::{Event, WeakSimWorld};
5use async_trait::async_trait;
6use std::{
7 future::Future,
8 io,
9 pin::Pin,
10 task::{Context, Poll},
11};
12use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
13use tracing::instrument;
14
/// Build the `io::Error` reported when the simulation is gone.
///
/// Stream and listener operations map a failed `WeakSimWorld::upgrade()` to this
/// error, so callers observe `BrokenPipe` once the simulation world has been dropped.
fn sim_shutdown_error() -> io::Error {
    let kind = io::ErrorKind::BrokenPipe;
    io::Error::new(kind, "simulation shutdown")
}
21
/// Build the `io::Error` used for an explicit random connection failure (chaos injection).
///
/// Returned by the read/write paths when the random-close roll asks for an
/// immediate, visible failure; surfaces as `ConnectionReset`.
fn random_connection_failure_error() -> io::Error {
    const MESSAGE: &str = "Random connection failure (explicit)";
    io::Error::new(io::ErrorKind::ConnectionReset, MESSAGE)
}
29
/// Build the `io::Error` reported once a half-open connection reaches its error time.
///
/// Surfaces as `ConnectionReset`, like the other reset-style failures in this module.
fn half_open_timeout_error() -> io::Error {
    let message = "Connection reset (half-open timeout)";
    io::Error::new(io::ErrorKind::ConnectionReset, message)
}
37
/// Build the `io::Error` reported for a connection that was aborted (RST).
///
/// Used when the close reason recorded for a connection is an abort rather
/// than a graceful close; surfaces as `ConnectionReset`.
fn connection_aborted_error() -> io::Error {
    use io::ErrorKind::ConnectionReset;

    io::Error::new(ConnectionReset, "Connection was aborted (RST)")
}
45
/// Simulated TCP stream that implements async read/write operations.
///
/// `SimTcpStream` is one endpoint of a simulated TCP connection. It implements
/// the `AsyncRead` and `AsyncWrite` traits by delegating every operation to the
/// simulation world, addressed by this stream's connection ID.
///
/// ## Architecture Overview
///
/// Each `SimTcpStream` represents one endpoint of a TCP connection:
///
/// ```text
/// Application Layer           SimTcpStream Layer         Simulation Layer
/// ─────────────────           ──────────────────         ────────────────
///
/// stream.write_all(data) ──►  poll_write(data) ───────►  buffer_send(data)
///                                                         └─► ProcessSendBuffer event
///                                                             └─► DataDelivery event
///                                                                 └─► paired connection
///
/// stream.read(buf) ◄────────  poll_read(buf) ◄─────────  receive_buffer
///                             │                          └─► waker registration
///                             └─► Poll::Pending/Ready
/// ```
///
/// ## TCP Semantics Implemented
///
/// ### 1. Reliable Delivery
/// - Written data is handed to the simulation's per-connection send buffer
/// - No data loss is introduced by this type itself; failures come from the
///   fault-injection checks (random close, half-open, abort, cut, clogging)
///
/// ### 2. Ordered Delivery (FIFO)
/// - Writes go through per-connection send buffering rather than direct event
///   scheduling, so messages written first are delivered first
/// - Critical for protocols that depend on message ordering
///
/// ### 3. Flow Control Simulation
/// - Read operations return `Poll::Pending` when no data is available, when the
///   read side is clogged, or while the connection is cut with nothing buffered
/// - Write operations return `Poll::Pending` while the connection is cut, while
///   the send buffer lacks space, or while the write side is clogged; otherwise
///   the data is buffered and the write completes immediately
///
/// ## Usage
///
/// Server-side streams are produced by `SimTcpListener::accept`. Once obtained,
/// a stream is driven purely through the `AsyncRead`/`AsyncWrite` traits.
///
/// ## Performance Characteristics
///
/// - **Write path**: accepted bytes are copied into the send buffer (`to_vec`)
/// - **Read path**: each `poll_read` uses a temporary buffer sized to the
///   caller's remaining capacity before copying into the `ReadBuf`
/// - **Read latency**: depends on the simulation's network configuration
///
/// ## Connection Lifecycle
///
/// 1. **Creation**: Stream created with reference to simulation and connection ID
/// 2. **Active Phase**: Read/write operations interact with simulation buffers
/// 3. **Data Transfer**: Asynchronous event processing handles network simulation
/// 4. **Termination**: `poll_shutdown` and `Drop` both call `close_connection`
///    on the simulation (skipped on drop if the simulation is already gone)
///
/// ## Thread Safety
///
/// `SimTcpStream` is designed for single-threaded simulation environments:
/// - The listener trait impl in this file uses `#[async_trait(?Send)]`
/// - Safe for use within single-threaded async runtimes
/// - Eliminates synchronization overhead for deterministic simulation
pub struct SimTcpStream {
    /// Weak reference to the simulation world.
    ///
    /// Uses `WeakSimWorld` to avoid circular references while allowing the stream
    /// to detect if the simulation has been dropped. Operations return a
    /// `BrokenPipe` "simulation shutdown" error if the upgrade fails.
    sim: WeakSimWorld,

    /// Unique identifier for this connection within the simulation.
    ///
    /// Passed to every simulation call made by this stream so that reads,
    /// writes, and waker registrations are routed to the right connection.
    connection_id: ConnectionId,
}
128
129impl SimTcpStream {
130 /// Create a new simulated TCP stream
131 pub(crate) fn new(sim: WeakSimWorld, connection_id: ConnectionId) -> Self {
132 Self { sim, connection_id }
133 }
134
135 /// Get the connection ID (for test introspection and chaos injection)
136 pub fn connection_id(&self) -> ConnectionId {
137 self.connection_id
138 }
139}
140
141impl Drop for SimTcpStream {
142 fn drop(&mut self) {
143 // Close the connection in the simulation when the stream is dropped
144 // This matches real TCP behavior where dropping a socket always closes it
145 if let Ok(sim) = self.sim.upgrade() {
146 tracing::debug!(
147 "SimTcpStream dropping, closing connection {}",
148 self.connection_id.0
149 );
150 sim.close_connection(self.connection_id);
151 }
152 }
153}
154
155impl AsyncRead for SimTcpStream {
156 #[instrument(skip(self, cx, buf))]
157 fn poll_read(
158 self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 buf: &mut ReadBuf<'_>,
161 ) -> Poll<io::Result<()>> {
162 tracing::trace!(
163 "SimTcpStream::poll_read called on connection_id={}",
164 self.connection_id.0
165 );
166 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
167
168 // Random close chaos injection (FDB rollRandomClose pattern)
169 // Check at start of every read operation - sim2.actor.cpp:408
170 // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
171 if let Some(true) = sim.roll_random_close(self.connection_id) {
172 // 30% explicit exception - throw connection_failed immediately
173 return Poll::Ready(Err(random_connection_failure_error()));
174 // 70% silent case: connection already marked as closed, will return EOF below
175 }
176
177 // Check if receive side is closed (asymmetric closure)
178 if sim.is_recv_closed(self.connection_id) {
179 tracing::debug!(
180 "SimTcpStream::poll_read connection_id={} recv side closed, returning EOF",
181 self.connection_id.0
182 );
183 return Poll::Ready(Ok(())); // EOF
184 }
185
186 // Check for half-open connection (peer crashed)
187 if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
188 // Error time reached - return ECONNRESET
189 tracing::debug!(
190 "SimTcpStream::poll_read connection_id={} half-open error time reached, returning ECONNRESET",
191 self.connection_id.0
192 );
193 return Poll::Ready(Err(half_open_timeout_error()));
194 }
195 // Half-open but not yet error time - will block (Pending) below waiting for data
196 // that will never come, which is the correct half-open behavior
197
198 // Check for read clogging (symmetric with write clogging)
199 if sim.is_read_clogged(self.connection_id) {
200 // Already clogged, register waker and return Pending
201 sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
202 return Poll::Pending;
203 }
204
205 // Check if this read should be clogged
206 if sim.should_clog_read(self.connection_id) {
207 sim.clog_read(self.connection_id);
208 sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
209 return Poll::Pending;
210 }
211
212 // Try to read from connection's receive buffer first
213 // We should be able to read buffered data even if connection is currently cut
214 let mut temp_buf = vec![0u8; buf.remaining()];
215 let bytes_read = sim
216 .read_from_connection(self.connection_id, &mut temp_buf)
217 .map_err(|e| io::Error::other(format!("read error: {}", e)))?;
218
219 tracing::trace!(
220 "SimTcpStream::poll_read connection_id={} read {} bytes",
221 self.connection_id.0,
222 bytes_read
223 );
224
225 if bytes_read > 0 {
226 let data_preview = String::from_utf8_lossy(&temp_buf[..std::cmp::min(bytes_read, 20)]);
227 tracing::trace!(
228 "SimTcpStream::poll_read connection_id={} returning data: '{}'",
229 self.connection_id.0,
230 data_preview
231 );
232 buf.put_slice(&temp_buf[..bytes_read]);
233 Poll::Ready(Ok(()))
234 } else {
235 // No data available - check if connection has received FIN, is closed, or cut
236
237 // Check for remote FIN (graceful close, all data delivered via FinDelivery event)
238 if sim.is_remote_fin_received(self.connection_id) {
239 tracing::info!(
240 "SimTcpStream::poll_read connection_id={} remote FIN received, returning EOF (0 bytes)",
241 self.connection_id.0
242 );
243 return Poll::Ready(Ok(()));
244 }
245
246 if sim.is_connection_closed(self.connection_id) {
247 // Local side was closed or connection was aborted
248 match sim.get_close_reason(self.connection_id) {
249 CloseReason::Aborted => {
250 tracing::info!(
251 "SimTcpStream::poll_read connection_id={} was aborted (RST), returning ECONNRESET",
252 self.connection_id.0
253 );
254 return Poll::Ready(Err(connection_aborted_error()));
255 }
256 _ => {
257 tracing::info!(
258 "SimTcpStream::poll_read connection_id={} is closed gracefully (FIN), returning EOF (0 bytes)",
259 self.connection_id.0
260 );
261 return Poll::Ready(Ok(()));
262 }
263 }
264 }
265
266 if sim.is_connection_cut(self.connection_id) {
267 // Connection is temporarily cut - register waker and wait for restoration
268 tracing::debug!(
269 "SimTcpStream::poll_read connection_id={} is cut, registering cut waker",
270 self.connection_id.0
271 );
272 sim.register_cut_waker(self.connection_id, cx.waker().clone());
273 return Poll::Pending;
274 }
275
276 // Register for notification when data arrives
277 tracing::trace!(
278 "SimTcpStream::poll_read connection_id={} no data, registering waker",
279 self.connection_id.0
280 );
281 sim.register_read_waker(self.connection_id, cx.waker().clone())
282 .map_err(|e| io::Error::other(format!("waker registration error: {}", e)))?;
283
284 // Double-check for data after registering waker to handle race conditions
285 // This prevents deadlocks where DataDelivery arrives between initial check and waker registration
286 let mut temp_buf_recheck = vec![0u8; buf.remaining()];
287 let bytes_read_recheck = sim
288 .read_from_connection(self.connection_id, &mut temp_buf_recheck)
289 .map_err(|e| io::Error::other(format!("recheck read error: {}", e)))?;
290
291 if bytes_read_recheck > 0 {
292 let data_preview = String::from_utf8_lossy(
293 &temp_buf_recheck[..std::cmp::min(bytes_read_recheck, 20)],
294 );
295 tracing::trace!(
296 "SimTcpStream::poll_read connection_id={} found data on recheck: '{}' (race condition avoided)",
297 self.connection_id.0,
298 data_preview
299 );
300 buf.put_slice(&temp_buf_recheck[..bytes_read_recheck]);
301 Poll::Ready(Ok(()))
302 } else {
303 // Final check - if connection has received FIN, is closed, or cut and no data available
304
305 // Check for remote FIN (recheck after waker registration)
306 if sim.is_remote_fin_received(self.connection_id) {
307 tracing::info!(
308 "SimTcpStream::poll_read connection_id={} remote FIN received on recheck, returning EOF (0 bytes)",
309 self.connection_id.0
310 );
311 return Poll::Ready(Ok(()));
312 }
313
314 if sim.is_connection_closed(self.connection_id) {
315 match sim.get_close_reason(self.connection_id) {
316 CloseReason::Aborted => {
317 tracing::info!(
318 "SimTcpStream::poll_read connection_id={} was aborted on recheck (RST), returning ECONNRESET",
319 self.connection_id.0
320 );
321 Poll::Ready(Err(connection_aborted_error()))
322 }
323 _ => {
324 tracing::info!(
325 "SimTcpStream::poll_read connection_id={} is closed on recheck (FIN), returning EOF (0 bytes)",
326 self.connection_id.0
327 );
328 Poll::Ready(Ok(()))
329 }
330 }
331 } else if sim.is_connection_cut(self.connection_id) {
332 // Connection is temporarily cut - already registered waker above, just wait
333 tracing::debug!(
334 "SimTcpStream::poll_read connection_id={} is cut on recheck, waiting",
335 self.connection_id.0
336 );
337 Poll::Pending
338 } else {
339 Poll::Pending
340 }
341 }
342 }
343 }
344}
345
346impl AsyncWrite for SimTcpStream {
347 #[instrument(skip(self, cx, buf))]
348 fn poll_write(
349 self: Pin<&mut Self>,
350 cx: &mut Context<'_>,
351 buf: &[u8],
352 ) -> Poll<Result<usize, io::Error>> {
353 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
354
355 // Random close chaos injection (FDB rollRandomClose pattern)
356 // Check at start of every write operation - sim2.actor.cpp:423
357 // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
358 if let Some(true) = sim.roll_random_close(self.connection_id) {
359 // 30% explicit exception - throw connection_failed immediately
360 return Poll::Ready(Err(random_connection_failure_error()));
361 // 70% silent case: connection already marked as closed, will fail below
362 }
363
364 // Check if send side is closed (asymmetric closure)
365 if sim.is_send_closed(self.connection_id) {
366 return Poll::Ready(Err(io::Error::new(
367 io::ErrorKind::BrokenPipe,
368 "Connection send side closed",
369 )));
370 }
371
372 // Check if connection is closed or cut
373 if sim.is_connection_closed(self.connection_id) {
374 // Check how the connection was closed
375 return match sim.get_close_reason(self.connection_id) {
376 CloseReason::Aborted => Poll::Ready(Err(connection_aborted_error())),
377 _ => Poll::Ready(Err(io::Error::new(
378 io::ErrorKind::BrokenPipe,
379 "Connection was closed (FIN)",
380 ))),
381 };
382 }
383
384 if sim.is_connection_cut(self.connection_id) {
385 // Connection is temporarily cut - register waker and wait for restoration
386 tracing::debug!(
387 "SimTcpStream::poll_write connection_id={} is cut, registering cut waker",
388 self.connection_id.0
389 );
390 sim.register_cut_waker(self.connection_id, cx.waker().clone());
391 tracing::debug!(
392 "SimTcpStream::poll_write connection_id={} registered waker for cut connection",
393 self.connection_id.0
394 );
395 return Poll::Pending;
396 }
397
398 // Check for half-open connection (peer crashed)
399 if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
400 // Error time reached - return ECONNRESET
401 tracing::debug!(
402 "SimTcpStream::poll_write connection_id={} half-open error time reached, returning ECONNRESET",
403 self.connection_id.0
404 );
405 return Poll::Ready(Err(half_open_timeout_error()));
406 }
407 // Half-open but not yet error time - writes succeed but data goes nowhere
408 // (paired_connection is already None, so buffer_send will silently succeed)
409
410 // Check for send buffer space (backpressure)
411 let available_buffer = sim.available_send_buffer(self.connection_id);
412 if available_buffer < buf.len() {
413 // Not enough buffer space, register waker and return Pending
414 tracing::debug!(
415 "SimTcpStream::poll_write connection_id={} buffer full (available={}, needed={}), waiting",
416 self.connection_id.0,
417 available_buffer,
418 buf.len()
419 );
420 sim.register_send_buffer_waker(self.connection_id, cx.waker().clone());
421 return Poll::Pending;
422 }
423
424 // Phase 7: Check for write clogging
425 if sim.is_write_clogged(self.connection_id) {
426 // Already clogged, register waker and return Pending
427 sim.register_clog_waker(self.connection_id, cx.waker().clone());
428 return Poll::Pending;
429 }
430
431 // Check if this write should be clogged
432 if sim.should_clog_write(self.connection_id) {
433 sim.clog_write(self.connection_id);
434 sim.register_clog_waker(self.connection_id, cx.waker().clone());
435 return Poll::Pending;
436 }
437
438 // Use buffered send to maintain TCP ordering
439 let data_preview = String::from_utf8_lossy(&buf[..std::cmp::min(buf.len(), 20)]);
440 tracing::trace!(
441 "SimTcpStream::poll_write buffering {} bytes: '{}' for ordered delivery",
442 buf.len(),
443 data_preview
444 );
445
446 // Buffer the data for ordered processing instead of direct event scheduling
447 sim.buffer_send(self.connection_id, buf.to_vec())
448 .map_err(|e| io::Error::other(format!("buffer send error: {}", e)))?;
449
450 Poll::Ready(Ok(buf.len()))
451 }
452
453 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
454 Poll::Ready(Ok(()))
455 }
456
457 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
458 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
459
460 // Close the connection in the simulation when shutdown is called
461 tracing::debug!(
462 "SimTcpStream::poll_shutdown closing connection {}",
463 self.connection_id.0
464 );
465 sim.close_connection(self.connection_id);
466
467 Poll::Ready(Ok(()))
468 }
469}
470
/// Future representing an accept operation.
///
/// Resolves to a `SimTcpStream` plus the peer address string once the
/// simulation reports a pending connection for `local_addr`, or to an
/// `io::Error` if the simulation is gone or a lookup/registration fails.
pub struct AcceptFuture {
    /// Weak handle to the simulation world; polling yields a shutdown error
    /// when it can no longer be upgraded.
    sim: WeakSimWorld,
    /// Listener address used to look up pending connections and to register
    /// the accept waker.
    local_addr: String,
    /// Identifier of the listener that created this future (not read by `poll`).
    #[allow(dead_code)] // May be used in future phases for more sophisticated listener tracking
    listener_id: ListenerId,
}
478
479impl Future for AcceptFuture {
480 type Output = io::Result<(SimTcpStream, String)>;
481
482 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
483 let sim = match self.sim.upgrade() {
484 Ok(sim) => sim,
485 Err(_) => return Poll::Ready(Err(sim_shutdown_error())),
486 };
487
488 match sim.get_pending_connection(&self.local_addr) {
489 Ok(Some(connection_id)) => {
490 // Get accept delay from network configuration
491 let delay = sim.with_network_config(|config| {
492 crate::network::sample_duration(&config.accept_latency)
493 });
494
495 // Schedule accept completion event to advance simulation time
496 sim.schedule_event(
497 Event::Connection {
498 id: connection_id.0,
499 state: crate::ConnectionStateChange::ConnectionReady,
500 },
501 delay,
502 );
503
504 // FDB Pattern (sim2.actor.cpp:1149-1175):
505 // Return the synthesized ephemeral peer address, not the client's real address.
506 // This simulates real TCP where servers see client ephemeral ports.
507 let peer_addr = sim
508 .get_connection_peer_address(connection_id)
509 .unwrap_or_else(|| "unknown:0".to_string());
510
511 let stream = SimTcpStream::new(self.sim.clone(), connection_id);
512 Poll::Ready(Ok((stream, peer_addr)))
513 }
514 Ok(None) => {
515 // No connection available yet - register waker for when connection becomes available
516 if let Err(e) = sim.register_accept_waker(&self.local_addr, cx.waker().clone()) {
517 Poll::Ready(Err(io::Error::other(format!(
518 "failed to register accept waker: {}",
519 e
520 ))))
521 } else {
522 Poll::Pending
523 }
524 }
525 Err(e) => Poll::Ready(Err(io::Error::other(format!(
526 "failed to get pending connection: {}",
527 e
528 )))),
529 }
530 }
531}
532
/// Simulated TCP listener.
///
/// Hands out `SimTcpStream`s through its `TcpListenerTrait::accept`
/// implementation, which awaits an `AcceptFuture` built from these fields.
pub struct SimTcpListener {
    /// Weak handle to the simulation world, cloned into every accept future.
    sim: WeakSimWorld,
    /// Identifier for this listener; copied into each `AcceptFuture`.
    #[allow(dead_code)] // Will be used in future phases
    listener_id: ListenerId,
    /// Address returned by `local_addr()` and used to look up pending connections.
    local_addr: String,
}
540
541impl SimTcpListener {
542 /// Create a new simulated TCP listener
543 pub(crate) fn new(sim: WeakSimWorld, listener_id: ListenerId, local_addr: String) -> Self {
544 Self {
545 sim,
546 listener_id,
547 local_addr,
548 }
549 }
550}
551
552#[async_trait(?Send)]
553impl TcpListenerTrait for SimTcpListener {
554 type TcpStream = SimTcpStream;
555
556 #[instrument(skip(self))]
557 async fn accept(&self) -> io::Result<(Self::TcpStream, String)> {
558 AcceptFuture {
559 sim: self.sim.clone(),
560 local_addr: self.local_addr.clone(),
561 listener_id: self.listener_id,
562 }
563 .await
564 }
565
566 fn local_addr(&self) -> io::Result<String> {
567 Ok(self.local_addr.clone())
568 }
569}