mockforge_chaos/protocols/
websocket.rs1use crate::{
4 config::ChaosConfig, fault::FaultInjector, latency::LatencyInjector, rate_limit::RateLimiter,
5 traffic_shaping::TrafficShaper, ChaosError, Result,
6};
7use std::sync::Arc;
8use tracing::{debug, warn};
9
/// Kinds of fault that can be simulated on a WebSocket connection.
///
/// NOTE(review): nothing in this part of the file constructs or matches on
/// these variants, so the per-variant descriptions below are read off the
/// variant names only — confirm against the call sites.
#[derive(Debug, Clone)]
pub enum WebSocketFault {
    /// The connection is dropped.
    ConnectionDrop,
    /// A close frame is sent; the payload is presumably the WebSocket close
    /// status code to use (see the `close_code` constants) — TODO confirm.
    CloseFrame(u16),
    /// A message is corrupted.
    MessageCorruption,
    /// A message is delayed.
    MessageDelay,
}
22
23#[derive(Clone)]
25pub struct WebSocketChaos {
26 latency_injector: Arc<LatencyInjector>,
27 fault_injector: Arc<FaultInjector>,
28 rate_limiter: Arc<RateLimiter>,
29 traffic_shaper: Arc<TrafficShaper>,
30 config: Arc<ChaosConfig>,
31}
32
33impl WebSocketChaos {
34 pub fn new(config: ChaosConfig) -> Self {
36 let latency_injector =
37 Arc::new(LatencyInjector::new(config.latency.clone().unwrap_or_default()));
38
39 let fault_injector =
40 Arc::new(FaultInjector::new(config.fault_injection.clone().unwrap_or_default()));
41
42 let rate_limiter =
43 Arc::new(RateLimiter::new(config.rate_limit.clone().unwrap_or_default()));
44
45 let traffic_shaper =
46 Arc::new(TrafficShaper::new(config.traffic_shaping.clone().unwrap_or_default()));
47
48 Self {
49 latency_injector,
50 fault_injector,
51 rate_limiter,
52 traffic_shaper,
53 config: Arc::new(config),
54 }
55 }
56
57 pub async fn apply_connection(&self, path: &str, client_ip: Option<&str>) -> Result<()> {
59 if !self.config.enabled {
60 return Ok(());
61 }
62
63 debug!("Applying WebSocket chaos for connection: {}", path);
64
65 if let Err(e) = self.rate_limiter.check(client_ip, Some(path)) {
67 warn!("WebSocket rate limit exceeded: {}", path);
68 return Err(e);
69 }
70
71 if !self.traffic_shaper.check_connection_limit() {
73 warn!("WebSocket connection limit exceeded");
74 return Err(ChaosError::ConnectionThrottled);
75 }
76
77 self.latency_injector.inject().await;
79
80 self.fault_injector.inject()?;
82
83 Ok(())
84 }
85
86 pub async fn apply_message(&self, message_size: usize, direction: &str) -> Result<()> {
88 if !self.config.enabled {
89 return Ok(());
90 }
91
92 debug!("Applying WebSocket chaos for {} message: {} bytes", direction, message_size);
93
94 self.latency_injector.inject().await;
96
97 self.traffic_shaper.throttle_bandwidth(message_size).await;
99
100 if self.traffic_shaper.should_drop_packet() {
102 warn!("Simulating WebSocket message drop");
103 return Err(ChaosError::InjectedFault("Message dropped".to_string()));
104 }
105
106 self.fault_injector.inject()?;
108
109 Ok(())
110 }
111
112 pub fn should_drop_connection(&self) -> bool {
114 self.traffic_shaper.should_drop_packet()
115 }
116
117 pub fn should_corrupt_message(&self) -> bool {
119 self.fault_injector.should_truncate_response()
120 }
121
122 pub fn get_close_code(&self) -> Option<u16> {
124 self.fault_injector.get_http_error_status().map(|http_code| match http_code {
125 400 => 1002, 408 => 1001, 429 => 1008, 500 => 1011, 503 => 1001, _ => 1011, })
132 }
133
134 pub fn traffic_shaper(&self) -> &Arc<TrafficShaper> {
136 &self.traffic_shaper
137 }
138}
139
/// WebSocket close status codes as named `u16` constants.
pub mod close_code {
    /// 1000 — normal closure.
    pub const NORMAL: u16 = 1000;
    /// 1001 — endpoint is going away.
    pub const GOING_AWAY: u16 = 1001;
    /// 1002 — protocol error.
    pub const PROTOCOL_ERROR: u16 = 1002;
    /// 1003 — unsupported data type received.
    pub const UNSUPPORTED_DATA: u16 = 1003;

    /// 1005 — no status code was present.
    pub const NO_STATUS_RECEIVED: u16 = 1005;
    /// 1006 — connection closed abnormally.
    pub const ABNORMAL_CLOSURE: u16 = 1006;
    /// 1007 — invalid frame payload data.
    pub const INVALID_FRAME_PAYLOAD: u16 = 1007;
    /// 1008 — policy violation.
    pub const POLICY_VIOLATION: u16 = 1008;
    /// 1009 — message too big.
    pub const MESSAGE_TOO_BIG: u16 = 1009;
    /// 1010 — a mandatory extension was not negotiated.
    pub const MANDATORY_EXTENSION: u16 = 1010;
    /// 1011 — internal error.
    pub const INTERNAL_ERROR: u16 = 1011;

    /// 1012 — service restart.
    pub const SERVICE_RESTART: u16 = 1012;
    /// 1013 — try again later.
    pub const TRY_AGAIN_LATER: u16 = 1013;
    /// 1014 — bad gateway.
    pub const BAD_GATEWAY: u16 = 1014;
    /// 1015 — TLS handshake failure.
    pub const TLS_HANDSHAKE: u16 = 1015;
}
158
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{FaultInjectionConfig, LatencyConfig};
    use std::time::{Duration, Instant};

    /// Chaos configuration with chaos enabled and a fixed 10 ms latency that
    /// always applies; every other section is left at its default.
    fn fixed_latency_config() -> ChaosConfig {
        ChaosConfig {
            enabled: true,
            latency: Some(LatencyConfig {
                enabled: true,
                fixed_delay_ms: Some(10),
                random_delay_range_ms: None,
                jitter_percent: 0.0,
                probability: 1.0,
            }),
            ..Default::default()
        }
    }

    /// Constructing the handler keeps the configuration's `enabled` flag.
    #[tokio::test]
    async fn test_websocket_chaos_creation() {
        let chaos = WebSocketChaos::new(fixed_latency_config());

        assert!(chaos.config.enabled);
    }

    /// An injected HTTP 500 is translated to close code 1011.
    #[tokio::test]
    async fn test_websocket_close_code_mapping() {
        let fault_injection = FaultInjectionConfig {
            enabled: true,
            http_errors: vec![500],
            http_error_probability: 1.0,
            ..Default::default()
        };
        let chaos = WebSocketChaos::new(ChaosConfig {
            enabled: true,
            fault_injection: Some(fault_injection),
            ..Default::default()
        });

        // NOTE(review): when `get_close_code` returns `None` nothing is
        // asserted and the test still passes — confirm whether a probability
        // of 1.0 guarantees `Some` and, if so, consider asserting on the
        // `Option` directly.
        if let Some(code) = chaos.get_close_code() {
            assert_eq!(code, 1011);
        }
    }

    /// With a fixed 10 ms delay configured, handling a message takes at
    /// least 10 ms.
    #[tokio::test]
    async fn test_apply_message_latency() {
        let chaos = WebSocketChaos::new(fixed_latency_config());

        let started = Instant::now();
        chaos.apply_message(1024, "inbound").await.unwrap();
        let elapsed = started.elapsed();

        assert!(elapsed >= Duration::from_millis(10));
    }
}