// moonpool_sim/network/sim/stream.rs
use super::types::{ConnectionId, ListenerId};
use crate::TcpListenerTrait;
use crate::sim::state::CloseReason;
use crate::{Event, WeakSimWorld};
use async_trait::async_trait;
use std::{
    future::Future,
    io,
    pin::Pin,
    task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tracing::instrument;
14
/// Create the `io::Error` reported when the simulation is shutting down.
///
/// Used when the simulation world has been dropped (the weak handle no longer
/// upgrades) but stream or accept operations are still attempted.
///
/// The error kind is `BrokenPipe` and the message is `"simulation shutdown"`.
fn sim_shutdown_error() -> io::Error {
    io::Error::new(io::ErrorKind::BrokenPipe, "simulation shutdown")
}
21
/// Create the `io::Error` for a random connection failure (chaos injection).
///
/// Returned by read/write polls when the random-close roll asks for an explicit
/// failure. The error kind is `ConnectionReset`.
fn random_connection_failure_error() -> io::Error {
    io::Error::new(
        io::ErrorKind::ConnectionReset,
        "Random connection failure (explicit)",
    )
}
29
/// Create the `io::Error` for a half-open connection whose error time was reached.
///
/// Returned by read/write polls once the simulation reports that a half-open
/// connection should start failing. The error kind is `ConnectionReset`.
fn half_open_timeout_error() -> io::Error {
    io::Error::new(
        io::ErrorKind::ConnectionReset,
        "Connection reset (half-open timeout)",
    )
}
37
/// Create the `io::Error` for an aborted connection (RST).
///
/// Returned by read/write polls when the connection is closed and its close
/// reason is `CloseReason::Aborted`. The error kind is `ConnectionReset`.
fn connection_aborted_error() -> io::Error {
    io::Error::new(
        io::ErrorKind::ConnectionReset,
        "Connection was aborted (RST)",
    )
}
45
46/// Simulated TCP stream that implements async read/write operations.
47///
48/// `SimTcpStream` provides a realistic simulation of TCP socket behavior by implementing
49/// the `AsyncRead` and `AsyncWrite` traits. It interfaces with the simulation event system
50/// to provide ordered, reliable data delivery with configurable network delays.
51///
52/// ## Architecture Overview
53///
54/// Each `SimTcpStream` represents one endpoint of a TCP connection:
55///
56/// ```text
57/// Application Layer SimTcpStream Layer Simulation Layer
58/// ───────────────── ────────────────── ─────────────────
59///
60/// stream.write_all(data) ──► poll_write(data) ────────► buffer_send(data)
61/// └─► ProcessSendBuffer event
62/// └─► DataDelivery event
63/// └─► paired connection
64///
65/// stream.read(buf) ◄────── poll_read(buf) ◄──────────── receive_buffer
66/// │ └─► waker registration
67/// └─► Poll::Pending/Ready
68/// ```
69///
70/// ## TCP Semantics Implemented
71///
72/// This implementation provides the core TCP guarantees required for realistic simulation:
73///
74/// ### 1. Reliable Delivery
75/// - All written data will eventually be delivered to the paired connection
76/// - No data loss (unless explicitly simulated via fault injection)
77/// - Delivery confirmation through the event system
78///
79/// ### 2. Ordered Delivery (FIFO)
80/// - Messages written first will arrive first at the destination
81/// - Achieved through per-connection send buffering
82/// - Critical for protocols that depend on message ordering
83///
84/// ### 3. Flow Control Simulation
85/// - Read operations block (`Poll::Pending`) when no data is available
86/// - Write operations complete immediately (buffering model)
87/// - Backpressure handled at the application layer
88///
89/// ## Usage Examples
90///
91/// Provides async read/write operations for client and server connections.
92///
93/// ## Performance Characteristics
94///
95/// - **Write Latency**: O(1) - writes are buffered immediately
96/// - **Read Latency**: O(network_delay) - depends on simulation configuration
97/// - **Memory Usage**: O(buffered_data) - proportional to unread data
98/// - **CPU Overhead**: Minimal - leverages efficient event system
99///
100/// ## Connection Lifecycle
101///
102/// 1. **Creation**: Stream created with reference to simulation and connection ID
103/// 2. **Active Phase**: Read/write operations interact with simulation buffers
104/// 3. **Data Transfer**: Asynchronous event processing handles network simulation
105/// 4. **Termination**: Stream dropped when connection ends (automatic cleanup)
106///
107/// ## Thread Safety
108///
109/// `SimTcpStream` is designed for single-threaded simulation environments:
110/// - No `Send` or `Sync` bounds (uses `#[async_trait(?Send)]`)
111/// - Safe for use within single-threaded async runtimes
112/// - Eliminates synchronization overhead for deterministic simulation
113pub struct SimTcpStream {
114 /// Weak reference to the simulation world.
115 ///
116 /// Uses `WeakSimWorld` to avoid circular references while allowing the stream
117 /// to detect if the simulation has been dropped. Operations return errors
118 /// gracefully if the simulation is no longer available.
119 sim: WeakSimWorld,
120
121 /// Unique identifier for this connection within the simulation.
122 ///
123 /// This ID corresponds to a `ConnectionState` entry in the simulation's
124 /// connection table. Used to route read/write operations to the correct
125 /// connection buffers and waker registrations.
126 connection_id: ConnectionId,
127}
128
129impl SimTcpStream {
130 /// Create a new simulated TCP stream
131 pub(crate) fn new(sim: WeakSimWorld, connection_id: ConnectionId) -> Self {
132 Self { sim, connection_id }
133 }
134
135 /// Get the connection ID (for test introspection and chaos injection)
136 pub fn connection_id(&self) -> ConnectionId {
137 self.connection_id
138 }
139}
140
141impl Drop for SimTcpStream {
142 fn drop(&mut self) {
143 // Close the connection in the simulation when the stream is dropped
144 // This matches real TCP behavior where dropping a socket always closes it
145 if let Ok(sim) = self.sim.upgrade() {
146 tracing::debug!(
147 "SimTcpStream dropping, closing connection {}",
148 self.connection_id.0
149 );
150 sim.close_connection(self.connection_id);
151 }
152 }
153}
154
155impl AsyncRead for SimTcpStream {
156 #[instrument(skip(self, cx, buf))]
157 fn poll_read(
158 self: Pin<&mut Self>,
159 cx: &mut Context<'_>,
160 buf: &mut ReadBuf<'_>,
161 ) -> Poll<io::Result<()>> {
162 tracing::trace!(
163 "SimTcpStream::poll_read called on connection_id={}",
164 self.connection_id.0
165 );
166 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
167
168 // Random close chaos injection (FDB rollRandomClose pattern)
169 // Check at start of every read operation - sim2.actor.cpp:408
170 // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
171 if let Some(true) = sim.roll_random_close(self.connection_id) {
172 // 30% explicit exception - throw connection_failed immediately
173 return Poll::Ready(Err(random_connection_failure_error()));
174 // 70% silent case: connection already marked as closed, will return EOF below
175 }
176
177 // Check if receive side is closed (asymmetric closure)
178 if sim.is_recv_closed(self.connection_id) {
179 tracing::debug!(
180 "SimTcpStream::poll_read connection_id={} recv side closed, returning EOF",
181 self.connection_id.0
182 );
183 return Poll::Ready(Ok(())); // EOF
184 }
185
186 // Check for half-open connection (peer crashed)
187 if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
188 // Error time reached - return ECONNRESET
189 tracing::debug!(
190 "SimTcpStream::poll_read connection_id={} half-open error time reached, returning ECONNRESET",
191 self.connection_id.0
192 );
193 return Poll::Ready(Err(half_open_timeout_error()));
194 }
195 // Half-open but not yet error time - will block (Pending) below waiting for data
196 // that will never come, which is the correct half-open behavior
197
198 // Check for read clogging (symmetric with write clogging)
199 if sim.is_read_clogged(self.connection_id) {
200 // Already clogged, register waker and return Pending
201 sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
202 return Poll::Pending;
203 }
204
205 // Check if this read should be clogged
206 if sim.should_clog_read(self.connection_id) {
207 sim.clog_read(self.connection_id);
208 sim.register_read_clog_waker(self.connection_id, cx.waker().clone());
209 return Poll::Pending;
210 }
211
212 // Try to read from connection's receive buffer first
213 // We should be able to read buffered data even if connection is currently cut
214 let mut temp_buf = vec![0u8; buf.remaining()];
215 let bytes_read = sim
216 .read_from_connection(self.connection_id, &mut temp_buf)
217 .map_err(|e| io::Error::other(format!("read error: {}", e)))?;
218
219 tracing::trace!(
220 "SimTcpStream::poll_read connection_id={} read {} bytes",
221 self.connection_id.0,
222 bytes_read
223 );
224
225 if bytes_read > 0 {
226 let data_preview = String::from_utf8_lossy(&temp_buf[..std::cmp::min(bytes_read, 20)]);
227 tracing::trace!(
228 "SimTcpStream::poll_read connection_id={} returning data: '{}'",
229 self.connection_id.0,
230 data_preview
231 );
232 buf.put_slice(&temp_buf[..bytes_read]);
233 Poll::Ready(Ok(()))
234 } else {
235 // No data available - check if connection is closed or cut
236 if sim.is_connection_closed(self.connection_id) {
237 // Check how the connection was closed
238 match sim.get_close_reason(self.connection_id) {
239 CloseReason::Aborted => {
240 tracing::info!(
241 "SimTcpStream::poll_read connection_id={} was aborted (RST), returning ECONNRESET",
242 self.connection_id.0
243 );
244 return Poll::Ready(Err(connection_aborted_error()));
245 }
246 _ => {
247 tracing::info!(
248 "SimTcpStream::poll_read connection_id={} is closed gracefully (FIN), returning EOF (0 bytes)",
249 self.connection_id.0
250 );
251 // Connection closed gracefully (FIN) - return EOF (0 bytes read)
252 return Poll::Ready(Ok(()));
253 }
254 }
255 }
256
257 if sim.is_connection_cut(self.connection_id) {
258 // Connection is temporarily cut - register waker and wait for restoration
259 tracing::debug!(
260 "SimTcpStream::poll_read connection_id={} is cut, registering cut waker",
261 self.connection_id.0
262 );
263 sim.register_cut_waker(self.connection_id, cx.waker().clone());
264 return Poll::Pending;
265 }
266
267 // Register for notification when data arrives
268 tracing::trace!(
269 "SimTcpStream::poll_read connection_id={} no data, registering waker",
270 self.connection_id.0
271 );
272 sim.register_read_waker(self.connection_id, cx.waker().clone())
273 .map_err(|e| io::Error::other(format!("waker registration error: {}", e)))?;
274
275 // Double-check for data after registering waker to handle race conditions
276 // This prevents deadlocks where DataDelivery arrives between initial check and waker registration
277 let mut temp_buf_recheck = vec![0u8; buf.remaining()];
278 let bytes_read_recheck = sim
279 .read_from_connection(self.connection_id, &mut temp_buf_recheck)
280 .map_err(|e| io::Error::other(format!("recheck read error: {}", e)))?;
281
282 if bytes_read_recheck > 0 {
283 let data_preview = String::from_utf8_lossy(
284 &temp_buf_recheck[..std::cmp::min(bytes_read_recheck, 20)],
285 );
286 tracing::trace!(
287 "SimTcpStream::poll_read connection_id={} found data on recheck: '{}' (race condition avoided)",
288 self.connection_id.0,
289 data_preview
290 );
291 buf.put_slice(&temp_buf_recheck[..bytes_read_recheck]);
292 Poll::Ready(Ok(()))
293 } else {
294 // Final check - if connection is closed or cut and no data available
295 if sim.is_connection_closed(self.connection_id) {
296 match sim.get_close_reason(self.connection_id) {
297 CloseReason::Aborted => {
298 tracing::info!(
299 "SimTcpStream::poll_read connection_id={} was aborted on recheck (RST), returning ECONNRESET",
300 self.connection_id.0
301 );
302 Poll::Ready(Err(connection_aborted_error()))
303 }
304 _ => {
305 tracing::info!(
306 "SimTcpStream::poll_read connection_id={} is closed on recheck (FIN), returning EOF (0 bytes)",
307 self.connection_id.0
308 );
309 // Connection closed gracefully - return EOF (0 bytes read)
310 Poll::Ready(Ok(()))
311 }
312 }
313 } else if sim.is_connection_cut(self.connection_id) {
314 // Connection is temporarily cut - already registered waker above, just wait
315 tracing::debug!(
316 "SimTcpStream::poll_read connection_id={} is cut on recheck, waiting",
317 self.connection_id.0
318 );
319 Poll::Pending
320 } else {
321 Poll::Pending
322 }
323 }
324 }
325 }
326}
327
328impl AsyncWrite for SimTcpStream {
329 #[instrument(skip(self, cx, buf))]
330 fn poll_write(
331 self: Pin<&mut Self>,
332 cx: &mut Context<'_>,
333 buf: &[u8],
334 ) -> Poll<Result<usize, io::Error>> {
335 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
336
337 // Random close chaos injection (FDB rollRandomClose pattern)
338 // Check at start of every write operation - sim2.actor.cpp:423
339 // Returns Some(true) for explicit error, Some(false) for silent (connection marked closed)
340 if let Some(true) = sim.roll_random_close(self.connection_id) {
341 // 30% explicit exception - throw connection_failed immediately
342 return Poll::Ready(Err(random_connection_failure_error()));
343 // 70% silent case: connection already marked as closed, will fail below
344 }
345
346 // Check if send side is closed (asymmetric closure)
347 if sim.is_send_closed(self.connection_id) {
348 return Poll::Ready(Err(io::Error::new(
349 io::ErrorKind::BrokenPipe,
350 "Connection send side closed",
351 )));
352 }
353
354 // Check if connection is closed or cut
355 if sim.is_connection_closed(self.connection_id) {
356 // Check how the connection was closed
357 return match sim.get_close_reason(self.connection_id) {
358 CloseReason::Aborted => Poll::Ready(Err(connection_aborted_error())),
359 _ => Poll::Ready(Err(io::Error::new(
360 io::ErrorKind::BrokenPipe,
361 "Connection was closed (FIN)",
362 ))),
363 };
364 }
365
366 if sim.is_connection_cut(self.connection_id) {
367 // Connection is temporarily cut - register waker and wait for restoration
368 tracing::debug!(
369 "SimTcpStream::poll_write connection_id={} is cut, registering cut waker",
370 self.connection_id.0
371 );
372 sim.register_cut_waker(self.connection_id, cx.waker().clone());
373 tracing::debug!(
374 "SimTcpStream::poll_write connection_id={} registered waker for cut connection",
375 self.connection_id.0
376 );
377 return Poll::Pending;
378 }
379
380 // Check for half-open connection (peer crashed)
381 if sim.is_half_open(self.connection_id) && sim.should_half_open_error(self.connection_id) {
382 // Error time reached - return ECONNRESET
383 tracing::debug!(
384 "SimTcpStream::poll_write connection_id={} half-open error time reached, returning ECONNRESET",
385 self.connection_id.0
386 );
387 return Poll::Ready(Err(half_open_timeout_error()));
388 }
389 // Half-open but not yet error time - writes succeed but data goes nowhere
390 // (paired_connection is already None, so buffer_send will silently succeed)
391
392 // Check for send buffer space (backpressure)
393 let available_buffer = sim.available_send_buffer(self.connection_id);
394 if available_buffer < buf.len() {
395 // Not enough buffer space, register waker and return Pending
396 tracing::debug!(
397 "SimTcpStream::poll_write connection_id={} buffer full (available={}, needed={}), waiting",
398 self.connection_id.0,
399 available_buffer,
400 buf.len()
401 );
402 sim.register_send_buffer_waker(self.connection_id, cx.waker().clone());
403 return Poll::Pending;
404 }
405
406 // Phase 7: Check for write clogging
407 if sim.is_write_clogged(self.connection_id) {
408 // Already clogged, register waker and return Pending
409 sim.register_clog_waker(self.connection_id, cx.waker().clone());
410 return Poll::Pending;
411 }
412
413 // Check if this write should be clogged
414 if sim.should_clog_write(self.connection_id) {
415 sim.clog_write(self.connection_id);
416 sim.register_clog_waker(self.connection_id, cx.waker().clone());
417 return Poll::Pending;
418 }
419
420 // Use buffered send to maintain TCP ordering
421 let data_preview = String::from_utf8_lossy(&buf[..std::cmp::min(buf.len(), 20)]);
422 tracing::trace!(
423 "SimTcpStream::poll_write buffering {} bytes: '{}' for ordered delivery",
424 buf.len(),
425 data_preview
426 );
427
428 // Buffer the data for ordered processing instead of direct event scheduling
429 sim.buffer_send(self.connection_id, buf.to_vec())
430 .map_err(|e| io::Error::other(format!("buffer send error: {}", e)))?;
431
432 Poll::Ready(Ok(buf.len()))
433 }
434
435 fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
436 Poll::Ready(Ok(()))
437 }
438
439 fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
440 let sim = self.sim.upgrade().map_err(|_| sim_shutdown_error())?;
441
442 // Close the connection in the simulation when shutdown is called
443 tracing::debug!(
444 "SimTcpStream::poll_shutdown closing connection {}",
445 self.connection_id.0
446 );
447 sim.close_connection(self.connection_id);
448
449 Poll::Ready(Ok(()))
450 }
451}
452
453/// Future representing an accept operation
454pub struct AcceptFuture {
455 sim: WeakSimWorld,
456 local_addr: String,
457 #[allow(dead_code)] // May be used in future phases for more sophisticated listener tracking
458 listener_id: ListenerId,
459}
460
461impl Future for AcceptFuture {
462 type Output = io::Result<(SimTcpStream, String)>;
463
464 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
465 let sim = match self.sim.upgrade() {
466 Ok(sim) => sim,
467 Err(_) => return Poll::Ready(Err(sim_shutdown_error())),
468 };
469
470 match sim.get_pending_connection(&self.local_addr) {
471 Ok(Some(connection_id)) => {
472 // Get accept delay from network configuration
473 let delay = sim.with_network_config(|config| {
474 crate::network::sample_duration(&config.accept_latency)
475 });
476
477 // Schedule accept completion event to advance simulation time
478 sim.schedule_event(
479 Event::Connection {
480 id: connection_id.0,
481 state: crate::ConnectionStateChange::ConnectionReady,
482 },
483 delay,
484 );
485
486 // FDB Pattern (sim2.actor.cpp:1149-1175):
487 // Return the synthesized ephemeral peer address, not the client's real address.
488 // This simulates real TCP where servers see client ephemeral ports.
489 let peer_addr = sim
490 .get_connection_peer_address(connection_id)
491 .unwrap_or_else(|| "unknown:0".to_string());
492
493 let stream = SimTcpStream::new(self.sim.clone(), connection_id);
494 Poll::Ready(Ok((stream, peer_addr)))
495 }
496 Ok(None) => {
497 // No connection available yet - register waker for when connection becomes available
498 if let Err(e) = sim.register_accept_waker(&self.local_addr, cx.waker().clone()) {
499 Poll::Ready(Err(io::Error::other(format!(
500 "failed to register accept waker: {}",
501 e
502 ))))
503 } else {
504 Poll::Pending
505 }
506 }
507 Err(e) => Poll::Ready(Err(io::Error::other(format!(
508 "failed to get pending connection: {}",
509 e
510 )))),
511 }
512 }
513}
514
515/// Simulated TCP listener
516pub struct SimTcpListener {
517 sim: WeakSimWorld,
518 #[allow(dead_code)] // Will be used in future phases
519 listener_id: ListenerId,
520 local_addr: String,
521}
522
523impl SimTcpListener {
524 /// Create a new simulated TCP listener
525 pub(crate) fn new(sim: WeakSimWorld, listener_id: ListenerId, local_addr: String) -> Self {
526 Self {
527 sim,
528 listener_id,
529 local_addr,
530 }
531 }
532}
533
534#[async_trait(?Send)]
535impl TcpListenerTrait for SimTcpListener {
536 type TcpStream = SimTcpStream;
537
538 #[instrument(skip(self))]
539 async fn accept(&self) -> io::Result<(Self::TcpStream, String)> {
540 AcceptFuture {
541 sim: self.sim.clone(),
542 local_addr: self.local_addr.clone(),
543 listener_id: self.listener_id,
544 }
545 .await
546 }
547
548 fn local_addr(&self) -> io::Result<String> {
549 Ok(self.local_addr.clone())
550 }
551}