mockforge_chaos/
middleware.rs1use crate::{
4 fault::FaultInjector,
5 latency::LatencyInjector,
6 rate_limit::RateLimiter,
7 resilience::{Bulkhead, CircuitBreaker},
8 traffic_shaping::TrafficShaper,
9 ChaosConfig,
10};
11use axum::{
12 body::Body,
13 extract::{ConnectInfo, Request},
14 http::StatusCode,
15 middleware::Next,
16 response::{IntoResponse, Response},
17};
18use http_body_util::BodyExt;
19use std::{net::SocketAddr, sync::Arc};
20use tracing::{debug, warn};
21
22#[derive(Clone)]
24pub struct ChaosMiddleware {
25 latency_injector: Arc<LatencyInjector>,
26 fault_injector: Arc<FaultInjector>,
27 rate_limiter: Arc<RateLimiter>,
28 traffic_shaper: Arc<TrafficShaper>,
29 circuit_breaker: Arc<CircuitBreaker>,
30 bulkhead: Arc<Bulkhead>,
31}
32
33impl ChaosMiddleware {
34 pub fn new(config: ChaosConfig) -> Self {
36 let latency_injector =
37 Arc::new(LatencyInjector::new(config.latency.clone().unwrap_or_default()));
38
39 let fault_injector =
40 Arc::new(FaultInjector::new(config.fault_injection.clone().unwrap_or_default()));
41
42 let rate_limiter =
43 Arc::new(RateLimiter::new(config.rate_limit.clone().unwrap_or_default()));
44
45 let traffic_shaper =
46 Arc::new(TrafficShaper::new(config.traffic_shaping.clone().unwrap_or_default()));
47
48 let circuit_breaker =
49 Arc::new(CircuitBreaker::new(config.circuit_breaker.clone().unwrap_or_default()));
50
51 let bulkhead = Arc::new(Bulkhead::new(config.bulkhead.clone().unwrap_or_default()));
52
53 Self {
54 latency_injector,
55 fault_injector,
56 rate_limiter,
57 traffic_shaper,
58 circuit_breaker,
59 bulkhead,
60 }
61 }
62
63 pub fn latency_injector(&self) -> &Arc<LatencyInjector> {
65 &self.latency_injector
66 }
67
68 pub fn fault_injector(&self) -> &Arc<FaultInjector> {
70 &self.fault_injector
71 }
72
73 pub fn rate_limiter(&self) -> &Arc<RateLimiter> {
75 &self.rate_limiter
76 }
77
78 pub fn traffic_shaper(&self) -> &Arc<TrafficShaper> {
80 &self.traffic_shaper
81 }
82
83 pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
85 &self.circuit_breaker
86 }
87
88 pub fn bulkhead(&self) -> &Arc<Bulkhead> {
90 &self.bulkhead
91 }
92}
93
94pub async fn chaos_middleware(
96 chaos: Arc<ChaosMiddleware>,
97 ConnectInfo(addr): ConnectInfo<SocketAddr>,
98 req: Request,
99 next: Next,
100) -> Response {
101 let path = req.uri().path().to_string();
102 let ip = addr.ip().to_string();
103
104 debug!("Chaos middleware processing: {} {}", req.method(), path);
105
106 if !chaos.circuit_breaker.allow_request().await {
108 warn!("Circuit breaker open, rejecting request: {}", path);
109 return (
110 StatusCode::SERVICE_UNAVAILABLE,
111 "Service temporarily unavailable (circuit breaker open)",
112 )
113 .into_response();
114 }
115
116 let _bulkhead_guard = match chaos.bulkhead.try_acquire().await {
118 Ok(guard) => guard,
119 Err(e) => {
120 warn!("Bulkhead rejected request: {} - {:?}", path, e);
121 return (StatusCode::SERVICE_UNAVAILABLE, format!("Service overloaded: {}", e))
122 .into_response();
123 }
124 };
125
126 if let Err(_e) = chaos.rate_limiter.check(Some(&ip), Some(&path)) {
128 warn!("Rate limit exceeded: {} - {}", ip, path);
129 return (StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded").into_response();
130 }
131
132 if !chaos.traffic_shaper.check_connection_limit() {
134 warn!("Connection limit exceeded");
135 return (StatusCode::SERVICE_UNAVAILABLE, "Connection limit exceeded").into_response();
136 }
137
138 let _connection_guard =
140 crate::traffic_shaping::ConnectionGuard::new(chaos.traffic_shaper.as_ref().clone());
141
142 if chaos.traffic_shaper.should_drop_packet() {
144 warn!("Simulating packet loss for: {}", path);
145 return (StatusCode::REQUEST_TIMEOUT, "Connection dropped").into_response();
146 }
147
148 chaos.latency_injector.inject().await;
150
151 if let Some(status_code) = chaos.fault_injector.get_http_error_status() {
153 warn!("Injecting HTTP error: {}", status_code);
154 return (
155 StatusCode::from_u16(status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
156 format!("Injected error: {}", status_code),
157 )
158 .into_response();
159 }
160
161 let (parts, body) = req.into_parts();
163 let body_bytes = match body.collect().await {
164 Ok(collected) => collected.to_bytes(),
165 Err(e) => {
166 warn!("Failed to read request body: {}", e);
167 return (StatusCode::BAD_REQUEST, "Failed to read request body").into_response();
168 }
169 };
170
171 let request_size = body_bytes.len();
172
173 chaos.traffic_shaper.throttle_bandwidth(request_size).await;
175
176 let req = Request::from_parts(parts, Body::from(body_bytes));
178
179 let response = next.run(req).await;
181
182 let status = response.status();
184 if status.is_server_error() || status == StatusCode::SERVICE_UNAVAILABLE {
185 chaos.circuit_breaker.record_failure().await;
186 } else if status.is_success() {
187 chaos.circuit_breaker.record_success().await;
188 }
189
190 let (parts, body) = response.into_parts();
192 let response_body_bytes = match body.collect().await {
193 Ok(collected) => collected.to_bytes(),
194 Err(e) => {
195 warn!("Failed to read response body: {}", e);
196 return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read response body")
197 .into_response();
198 }
199 };
200
201 let response_size = response_body_bytes.len();
202
203 let final_body = if chaos.fault_injector.should_truncate_response() {
205 warn!("Injecting partial response");
206 let truncate_at = response_size / 2;
207 Body::from(response_body_bytes.slice(0..truncate_at))
208 } else {
209 Body::from(response_body_bytes)
210 };
211
212 chaos.traffic_shaper.throttle_bandwidth(response_size).await;
214
215 Response::from_parts(parts, final_body)
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{LatencyConfig, RateLimitConfig};

    /// A latency section marked as enabled produces an enabled latency injector.
    #[tokio::test]
    async fn test_middleware_creation() {
        let latency = LatencyConfig {
            enabled: true,
            fixed_delay_ms: Some(10),
            ..Default::default()
        };
        let config = ChaosConfig {
            enabled: true,
            latency: Some(latency),
            ..Default::default()
        };

        let middleware = ChaosMiddleware::new(config);

        assert!(middleware.latency_injector.is_enabled());
    }

    /// With a burst size of two, the first two checks pass and the third is rejected.
    #[tokio::test]
    async fn test_rate_limiting() {
        let rate_limit = RateLimitConfig {
            enabled: true,
            requests_per_second: 1,
            burst_size: 2,
            ..Default::default()
        };
        let config = ChaosConfig {
            enabled: true,
            rate_limit: Some(rate_limit),
            ..Default::default()
        };

        let middleware = Arc::new(ChaosMiddleware::new(config));
        let limiter = &middleware.rate_limiter;

        // The first two requests are expected to pass.
        for _ in 0..2 {
            assert!(limiter.check(Some("127.0.0.1"), Some("/test")).is_ok());
        }

        // The third request is expected to be rejected.
        assert!(limiter.check(Some("127.0.0.1"), Some("/test")).is_err());
    }
}