mockforge_chaos/protocols/
websocket.rs

1//! WebSocket chaos engineering
2
3use crate::{
4    config::ChaosConfig, fault::FaultInjector, latency::LatencyInjector, rate_limit::RateLimiter,
5    traffic_shaping::TrafficShaper, ChaosError, Result,
6};
7use std::sync::Arc;
8use tracing::{debug, warn};
9
10/// WebSocket-specific fault types
11#[derive(Debug, Clone)]
12pub enum WebSocketFault {
13    /// Connection drop
14    ConnectionDrop,
15    /// Close frame with code
16    CloseFrame(u16), // 1000=Normal, 1001=GoingAway, etc.
17    /// Message corruption
18    MessageCorruption,
19    /// Delayed message
20    MessageDelay,
21}
22
23/// WebSocket chaos handler
24#[derive(Clone)]
25pub struct WebSocketChaos {
26    latency_injector: Arc<LatencyInjector>,
27    fault_injector: Arc<FaultInjector>,
28    rate_limiter: Arc<RateLimiter>,
29    traffic_shaper: Arc<TrafficShaper>,
30    config: Arc<ChaosConfig>,
31}
32
33impl WebSocketChaos {
34    /// Create new WebSocket chaos handler
35    pub fn new(config: ChaosConfig) -> Self {
36        let latency_injector =
37            Arc::new(LatencyInjector::new(config.latency.clone().unwrap_or_default()));
38
39        let fault_injector =
40            Arc::new(FaultInjector::new(config.fault_injection.clone().unwrap_or_default()));
41
42        let rate_limiter =
43            Arc::new(RateLimiter::new(config.rate_limit.clone().unwrap_or_default()));
44
45        let traffic_shaper =
46            Arc::new(TrafficShaper::new(config.traffic_shaping.clone().unwrap_or_default()));
47
48        Self {
49            latency_injector,
50            fault_injector,
51            rate_limiter,
52            traffic_shaper,
53            config: Arc::new(config),
54        }
55    }
56
57    /// Apply chaos before WebSocket connection
58    pub async fn apply_connection(&self, path: &str, client_ip: Option<&str>) -> Result<()> {
59        if !self.config.enabled {
60            return Ok(());
61        }
62
63        debug!("Applying WebSocket chaos for connection: {}", path);
64
65        // Check rate limits
66        if let Err(e) = self.rate_limiter.check(client_ip, Some(path)) {
67            warn!("WebSocket rate limit exceeded: {}", path);
68            return Err(e);
69        }
70
71        // Check connection limits
72        if !self.traffic_shaper.check_connection_limit() {
73            warn!("WebSocket connection limit exceeded");
74            return Err(ChaosError::ConnectionThrottled);
75        }
76
77        // Inject connection latency
78        self.latency_injector.inject().await;
79
80        // Check for connection errors
81        self.fault_injector.inject()?;
82
83        Ok(())
84    }
85
86    /// Apply chaos before sending/receiving a message
87    pub async fn apply_message(&self, message_size: usize, direction: &str) -> Result<()> {
88        if !self.config.enabled {
89            return Ok(());
90        }
91
92        debug!("Applying WebSocket chaos for {} message: {} bytes", direction, message_size);
93
94        // Inject message latency
95        self.latency_injector.inject().await;
96
97        // Throttle bandwidth
98        self.traffic_shaper.throttle_bandwidth(message_size).await;
99
100        // Check for packet loss (message drop)
101        if self.traffic_shaper.should_drop_packet() {
102            warn!("Simulating WebSocket message drop");
103            return Err(ChaosError::InjectedFault("Message dropped".to_string()));
104        }
105
106        // Check for fault injection
107        self.fault_injector.inject()?;
108
109        Ok(())
110    }
111
112    /// Check if should drop connection
113    pub fn should_drop_connection(&self) -> bool {
114        self.traffic_shaper.should_drop_packet()
115    }
116
117    /// Check if should corrupt message
118    pub fn should_corrupt_message(&self) -> bool {
119        self.fault_injector.should_truncate_response()
120    }
121
122    /// Get WebSocket close code for fault injection
123    pub fn get_close_code(&self) -> Option<u16> {
124        self.fault_injector.get_http_error_status().map(|http_code| match http_code {
125            400 => 1002, // Protocol error
126            408 => 1001, // Going away (timeout)
127            429 => 1008, // Policy violation (rate limit)
128            500 => 1011, // Server error
129            503 => 1001, // Going away (unavailable)
130            _ => 1011,   // Server error
131        })
132    }
133
134    /// Get traffic shaper for connection management
135    pub fn traffic_shaper(&self) -> &Arc<TrafficShaper> {
136        &self.traffic_shaper
137    }
138}
139
140/// WebSocket close codes
141pub mod close_code {
142    pub const NORMAL: u16 = 1000;
143    pub const GOING_AWAY: u16 = 1001;
144    pub const PROTOCOL_ERROR: u16 = 1002;
145    pub const UNSUPPORTED_DATA: u16 = 1003;
146    pub const NO_STATUS_RECEIVED: u16 = 1005;
147    pub const ABNORMAL_CLOSURE: u16 = 1006;
148    pub const INVALID_FRAME_PAYLOAD: u16 = 1007;
149    pub const POLICY_VIOLATION: u16 = 1008;
150    pub const MESSAGE_TOO_BIG: u16 = 1009;
151    pub const MANDATORY_EXTENSION: u16 = 1010;
152    pub const INTERNAL_ERROR: u16 = 1011;
153    pub const SERVICE_RESTART: u16 = 1012;
154    pub const TRY_AGAIN_LATER: u16 = 1013;
155    pub const BAD_GATEWAY: u16 = 1014;
156    pub const TLS_HANDSHAKE: u16 = 1015;
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162    use crate::config::{FaultInjectionConfig, LatencyConfig};
163
164    #[tokio::test]
165    async fn test_websocket_chaos_creation() {
166        let config = ChaosConfig {
167            enabled: true,
168            latency: Some(LatencyConfig {
169                enabled: true,
170                fixed_delay_ms: Some(10),
171                random_delay_range_ms: None,
172                jitter_percent: 0.0,
173                probability: 1.0,
174            }),
175            ..Default::default()
176        };
177
178        let chaos = WebSocketChaos::new(config);
179        assert!(chaos.config.enabled);
180    }
181
182    #[tokio::test]
183    async fn test_websocket_close_code_mapping() {
184        let config = ChaosConfig {
185            enabled: true,
186            fault_injection: Some(FaultInjectionConfig {
187                enabled: true,
188                http_errors: vec![500],
189                http_error_probability: 1.0,
190                ..Default::default()
191            }),
192            ..Default::default()
193        };
194
195        let chaos = WebSocketChaos::new(config);
196        let close_code = chaos.get_close_code();
197
198        // Should map 500 to WebSocket INTERNAL_ERROR (1011)
199        if let Some(code) = close_code {
200            assert_eq!(code, 1011);
201        }
202    }
203
204    #[tokio::test]
205    async fn test_apply_message_latency() {
206        let config = ChaosConfig {
207            enabled: true,
208            latency: Some(LatencyConfig {
209                enabled: true,
210                fixed_delay_ms: Some(10),
211                random_delay_range_ms: None,
212                jitter_percent: 0.0,
213                probability: 1.0,
214            }),
215            ..Default::default()
216        };
217
218        let chaos = WebSocketChaos::new(config);
219        let start = std::time::Instant::now();
220
221        chaos.apply_message(1024, "inbound").await.unwrap();
222
223        let elapsed = start.elapsed();
224        assert!(elapsed >= std::time::Duration::from_millis(10));
225    }
226}