moonpool_sim/network/sim/
provider.rs1use super::stream::{SimTcpListener, SimTcpStream};
2use crate::NetworkProvider;
3use crate::buggify;
4use crate::network::ConnectFailureMode;
5use crate::sim::rng::sim_random;
6use crate::{Event, WeakSimWorld};
7use async_trait::async_trait;
8use std::io;
9use tracing::instrument;
10
11#[derive(Debug, Clone)]
13pub struct SimNetworkProvider {
14 sim: WeakSimWorld,
15}
16
17impl SimNetworkProvider {
18 pub fn new(sim: WeakSimWorld) -> Self {
20 Self { sim }
21 }
22
23 pub fn sleep(
28 &self,
29 duration: std::time::Duration,
30 ) -> crate::SimulationResult<crate::SleepFuture> {
31 self.sim.sleep(duration)
32 }
33}
34
35#[async_trait(?Send)]
36impl NetworkProvider for SimNetworkProvider {
37 type TcpStream = SimTcpStream;
38 type TcpListener = SimTcpListener;
39
40 #[instrument(skip(self))]
41 async fn bind(&self, addr: &str) -> io::Result<Self::TcpListener> {
42 let sim = self
43 .sim
44 .upgrade()
45 .map_err(|_| io::Error::other("simulation shutdown"))?;
46
47 let delay =
49 sim.with_network_config(|config| crate::network::sample_duration(&config.bind_latency));
50
51 let listener_id = sim
53 .create_listener(addr.to_string())
54 .map_err(|e| io::Error::other(format!("Failed to create listener: {}", e)))?;
55
56 sim.schedule_event(
58 Event::Connection {
59 id: listener_id.0,
60 state: crate::ConnectionStateChange::BindComplete,
61 },
62 delay,
63 );
64
65 let listener = SimTcpListener::new(self.sim.clone(), listener_id, addr.to_string());
66 Ok(listener)
67 }
68
69 #[instrument(skip(self))]
77 async fn connect(&self, addr: &str) -> io::Result<Self::TcpStream> {
78 let sim = self
79 .sim
80 .upgrade()
81 .map_err(|_| io::Error::other("simulation shutdown"))?;
82
83 let (failure_mode, failure_probability) = sim.with_network_config(|config| {
86 (
87 config.chaos.connect_failure_mode,
88 config.chaos.connect_failure_probability,
89 )
90 });
91
92 match failure_mode {
93 ConnectFailureMode::Disabled => {} ConnectFailureMode::AlwaysFail => {
95 if buggify!() {
97 tracing::debug!(addr = %addr, "Connection establishment failed (AlwaysFail mode)");
98 return Err(io::Error::new(
99 io::ErrorKind::ConnectionRefused,
100 "Connection establishment failed (AlwaysFail mode)",
101 ));
102 }
103 }
104 ConnectFailureMode::Probabilistic => {
105 if buggify!() {
107 if sim_random::<f64>() > failure_probability {
108 tracing::debug!(addr = %addr, "Connection establishment failed (Probabilistic mode - error)");
110 return Err(io::Error::new(
111 io::ErrorKind::ConnectionRefused,
112 "Connection establishment failed (Probabilistic mode)",
113 ));
114 } else {
115 tracing::debug!(addr = %addr, "Connection hanging forever (Probabilistic mode - hang)");
118 std::future::pending::<()>().await;
119 unreachable!("pending() never resolves");
120 }
121 }
122 }
123 }
124
125 let delay = sim
127 .with_network_config(|config| crate::network::sample_duration(&config.connect_latency));
128
129 let (client_id, server_id) = sim
131 .create_connection_pair("client-addr".to_string(), addr.to_string())
132 .map_err(|e| io::Error::other(format!("Failed to create connection pair: {}", e)))?;
133
134 sim.store_pending_connection(addr, server_id)
136 .map_err(|e| io::Error::other(format!("Failed to store pending connection: {}", e)))?;
137
138 sim.schedule_event(
140 Event::Connection {
141 id: client_id.0,
142 state: crate::ConnectionStateChange::ConnectionReady,
143 },
144 delay,
145 );
146
147 let stream = SimTcpStream::new(self.sim.clone(), client_id);
148 Ok(stream)
149 }
150}