moonpool_sim/network/sim/
provider.rs

1use super::stream::{SimTcpListener, SimTcpStream};
2use crate::NetworkProvider;
3use crate::buggify;
4use crate::network::ConnectFailureMode;
5use crate::sim::rng::sim_random;
6use crate::{Event, WeakSimWorld};
7use async_trait::async_trait;
8use std::io;
9use tracing::instrument;
10
11/// Simulated networking implementation
12#[derive(Debug, Clone)]
13pub struct SimNetworkProvider {
14    sim: WeakSimWorld,
15}
16
17impl SimNetworkProvider {
18    /// Create a new simulated network provider
19    pub fn new(sim: WeakSimWorld) -> Self {
20        Self { sim }
21    }
22
23    /// Sleep in simulation time.
24    ///
25    /// This allows workloads to introduce delays for coordination purposes.
26    /// The sleep completes when the simulation processes the corresponding Wake event.
27    pub fn sleep(
28        &self,
29        duration: std::time::Duration,
30    ) -> crate::SimulationResult<crate::SleepFuture> {
31        self.sim.sleep(duration)
32    }
33}
34
35#[async_trait(?Send)]
36impl NetworkProvider for SimNetworkProvider {
37    type TcpStream = SimTcpStream;
38    type TcpListener = SimTcpListener;
39
40    #[instrument(skip(self))]
41    async fn bind(&self, addr: &str) -> io::Result<Self::TcpListener> {
42        let sim = self
43            .sim
44            .upgrade()
45            .map_err(|_| io::Error::other("simulation shutdown"))?;
46
47        // Get bind delay from network configuration and schedule bind completion event
48        let delay =
49            sim.with_network_config(|config| crate::network::sample_duration(&config.bind_latency));
50
51        // Schedule bind completion event to advance simulation time
52        let listener_id = sim
53            .create_listener(addr.to_string())
54            .map_err(|e| io::Error::other(format!("Failed to create listener: {}", e)))?;
55
56        // Schedule an event to simulate the bind delay - this advances simulation time
57        sim.schedule_event(
58            Event::Connection {
59                id: listener_id.0,
60                state: crate::ConnectionStateChange::BindComplete,
61            },
62            delay,
63        );
64
65        let listener = SimTcpListener::new(self.sim.clone(), listener_id, addr.to_string());
66        Ok(listener)
67    }
68
69    /// Connect to a remote address.
70    ///
71    /// When chaos is enabled, connection establishment can fail or hang forever
72    /// based on the connect_failure_mode setting (FDB ref: sim2.actor.cpp:1243-1250):
73    /// - Disabled: Normal operation (no failure injection)
74    /// - AlwaysFail: Always fail with ConnectionRefused when buggified
75    /// - Probabilistic: 50% fail with error, 50% hang forever (tests timeout handling)
76    #[instrument(skip(self))]
77    async fn connect(&self, addr: &str) -> io::Result<Self::TcpStream> {
78        let sim = self
79            .sim
80            .upgrade()
81            .map_err(|_| io::Error::other("simulation shutdown"))?;
82
83        // Check connect failure mode (FDB SIM_CONNECT_ERROR_MODE pattern)
84        // FDB ref: sim2.actor.cpp:1243-1250
85        let (failure_mode, failure_probability) = sim.with_network_config(|config| {
86            (
87                config.chaos.connect_failure_mode,
88                config.chaos.connect_failure_probability,
89            )
90        });
91
92        match failure_mode {
93            ConnectFailureMode::Disabled => {} // Normal operation
94            ConnectFailureMode::AlwaysFail => {
95                // Always fail with connection_failed when buggified
96                if buggify!() {
97                    tracing::debug!(addr = %addr, "Connection establishment failed (AlwaysFail mode)");
98                    return Err(io::Error::new(
99                        io::ErrorKind::ConnectionRefused,
100                        "Connection establishment failed (AlwaysFail mode)",
101                    ));
102                }
103            }
104            ConnectFailureMode::Probabilistic => {
105                // Probabilistic - fail or hang forever
106                if buggify!() {
107                    if sim_random::<f64>() > failure_probability {
108                        // Throw connection_failed error
109                        tracing::debug!(addr = %addr, "Connection establishment failed (Probabilistic mode - error)");
110                        return Err(io::Error::new(
111                            io::ErrorKind::ConnectionRefused,
112                            "Connection establishment failed (Probabilistic mode)",
113                        ));
114                    } else {
115                        // Hang forever - create a future that never completes
116                        // This tests timeout handling in connection retry logic
117                        tracing::debug!(addr = %addr, "Connection hanging forever (Probabilistic mode - hang)");
118                        std::future::pending::<()>().await;
119                        unreachable!("pending() never resolves");
120                    }
121                }
122            }
123        }
124
125        // Get connect delay from network configuration and schedule connection event
126        let delay = sim
127            .with_network_config(|config| crate::network::sample_duration(&config.connect_latency));
128
129        // Create a connection pair for bidirectional communication
130        let (client_id, server_id) = sim
131            .create_connection_pair("client-addr".to_string(), addr.to_string())
132            .map_err(|e| io::Error::other(format!("Failed to create connection pair: {}", e)))?;
133
134        // Store the server side for accept() to pick up later
135        sim.store_pending_connection(addr, server_id)
136            .map_err(|e| io::Error::other(format!("Failed to store pending connection: {}", e)))?;
137
138        // Schedule connection ready event to advance simulation time
139        sim.schedule_event(
140            Event::Connection {
141                id: client_id.0,
142                state: crate::ConnectionStateChange::ConnectionReady,
143            },
144            delay,
145        );
146
147        let stream = SimTcpStream::new(self.sim.clone(), client_id);
148        Ok(stream)
149    }
150}