mockforge_chaos/
middleware.rs

1//! Chaos engineering middleware for HTTP
2
3use crate::{
4    fault::FaultInjector,
5    latency::LatencyInjector,
6    rate_limit::RateLimiter,
7    resilience::{Bulkhead, CircuitBreaker},
8    traffic_shaping::TrafficShaper,
9    ChaosConfig,
10};
11use axum::{
12    body::Body,
13    extract::{ConnectInfo, Request},
14    http::StatusCode,
15    middleware::Next,
16    response::{IntoResponse, Response},
17};
18use http_body_util::BodyExt;
19use std::{net::SocketAddr, sync::Arc};
20use tracing::{debug, warn};
21
/// Chaos middleware state
///
/// Holds one reference-counted handle to each chaos component consulted by
/// [`chaos_middleware`]. Every field is an [`Arc`], so the derived `Clone`
/// only copies the handles: all clones refer to the same underlying
/// component instances.
#[derive(Clone)]
pub struct ChaosMiddleware {
    /// Awaited once per request (`inject`) to add artificial delay.
    latency_injector: Arc<LatencyInjector>,
    /// Decides whether to answer with an injected HTTP error status and
    /// whether to truncate the response body.
    fault_injector: Arc<FaultInjector>,
    /// Checked per request with the client IP and the request path.
    rate_limiter: Arc<RateLimiter>,
    /// Provides the connection-limit check, the packet-loss decision and
    /// bandwidth throttling for request and response bodies.
    traffic_shaper: Arc<TrafficShaper>,
    /// Gates requests up front and is fed the outcome of each handled request.
    circuit_breaker: Arc<CircuitBreaker>,
    /// Hands out a guard per admitted request; requests are rejected when
    /// acquisition fails.
    bulkhead: Arc<Bulkhead>,
}
32
33impl ChaosMiddleware {
34    /// Create new chaos middleware from config
35    pub fn new(config: ChaosConfig) -> Self {
36        let latency_injector =
37            Arc::new(LatencyInjector::new(config.latency.clone().unwrap_or_default()));
38
39        let fault_injector =
40            Arc::new(FaultInjector::new(config.fault_injection.clone().unwrap_or_default()));
41
42        let rate_limiter =
43            Arc::new(RateLimiter::new(config.rate_limit.clone().unwrap_or_default()));
44
45        let traffic_shaper =
46            Arc::new(TrafficShaper::new(config.traffic_shaping.clone().unwrap_or_default()));
47
48        let circuit_breaker =
49            Arc::new(CircuitBreaker::new(config.circuit_breaker.clone().unwrap_or_default()));
50
51        let bulkhead = Arc::new(Bulkhead::new(config.bulkhead.clone().unwrap_or_default()));
52
53        Self {
54            latency_injector,
55            fault_injector,
56            rate_limiter,
57            traffic_shaper,
58            circuit_breaker,
59            bulkhead,
60        }
61    }
62
63    /// Get latency injector
64    pub fn latency_injector(&self) -> &Arc<LatencyInjector> {
65        &self.latency_injector
66    }
67
68    /// Get fault injector
69    pub fn fault_injector(&self) -> &Arc<FaultInjector> {
70        &self.fault_injector
71    }
72
73    /// Get rate limiter
74    pub fn rate_limiter(&self) -> &Arc<RateLimiter> {
75        &self.rate_limiter
76    }
77
78    /// Get traffic shaper
79    pub fn traffic_shaper(&self) -> &Arc<TrafficShaper> {
80        &self.traffic_shaper
81    }
82
83    /// Get circuit breaker
84    pub fn circuit_breaker(&self) -> &Arc<CircuitBreaker> {
85        &self.circuit_breaker
86    }
87
88    /// Get bulkhead
89    pub fn bulkhead(&self) -> &Arc<Bulkhead> {
90        &self.bulkhead
91    }
92}
93
94/// Chaos middleware handler
95pub async fn chaos_middleware(
96    chaos: Arc<ChaosMiddleware>,
97    ConnectInfo(addr): ConnectInfo<SocketAddr>,
98    req: Request,
99    next: Next,
100) -> Response {
101    let path = req.uri().path().to_string();
102    let ip = addr.ip().to_string();
103
104    debug!("Chaos middleware processing: {} {}", req.method(), path);
105
106    // Check circuit breaker
107    if !chaos.circuit_breaker.allow_request().await {
108        warn!("Circuit breaker open, rejecting request: {}", path);
109        return (
110            StatusCode::SERVICE_UNAVAILABLE,
111            "Service temporarily unavailable (circuit breaker open)",
112        )
113            .into_response();
114    }
115
116    // Try to acquire bulkhead slot
117    let _bulkhead_guard = match chaos.bulkhead.try_acquire().await {
118        Ok(guard) => guard,
119        Err(e) => {
120            warn!("Bulkhead rejected request: {} - {:?}", path, e);
121            return (StatusCode::SERVICE_UNAVAILABLE, format!("Service overloaded: {}", e))
122                .into_response();
123        }
124    };
125
126    // Check rate limits
127    if let Err(_e) = chaos.rate_limiter.check(Some(&ip), Some(&path)) {
128        warn!("Rate limit exceeded: {} - {}", ip, path);
129        return (StatusCode::TOO_MANY_REQUESTS, "Rate limit exceeded").into_response();
130    }
131
132    // Check connection limits
133    if !chaos.traffic_shaper.check_connection_limit() {
134        warn!("Connection limit exceeded");
135        return (StatusCode::SERVICE_UNAVAILABLE, "Connection limit exceeded").into_response();
136    }
137
138    // Always release connection on scope exit
139    let _connection_guard =
140        crate::traffic_shaping::ConnectionGuard::new(chaos.traffic_shaper.as_ref().clone());
141
142    // Check for packet loss (simulate dropped connection)
143    if chaos.traffic_shaper.should_drop_packet() {
144        warn!("Simulating packet loss for: {}", path);
145        return (StatusCode::REQUEST_TIMEOUT, "Connection dropped").into_response();
146    }
147
148    // Inject latency
149    chaos.latency_injector.inject().await;
150
151    // Check for fault injection
152    if let Some(status_code) = chaos.fault_injector.get_http_error_status() {
153        warn!("Injecting HTTP error: {}", status_code);
154        return (
155            StatusCode::from_u16(status_code).unwrap_or(StatusCode::INTERNAL_SERVER_ERROR),
156            format!("Injected error: {}", status_code),
157        )
158            .into_response();
159    }
160
161    // Extract body size for bandwidth throttling
162    let (parts, body) = req.into_parts();
163    let body_bytes = match body.collect().await {
164        Ok(collected) => collected.to_bytes(),
165        Err(e) => {
166            warn!("Failed to read request body: {}", e);
167            return (StatusCode::BAD_REQUEST, "Failed to read request body").into_response();
168        }
169    };
170
171    let request_size = body_bytes.len();
172
173    // Throttle request bandwidth
174    chaos.traffic_shaper.throttle_bandwidth(request_size).await;
175
176    // Reconstruct request
177    let req = Request::from_parts(parts, Body::from(body_bytes));
178
179    // Pass to next handler
180    let response = next.run(req).await;
181
182    // Record circuit breaker result based on response status
183    let status = response.status();
184    if status.is_server_error() || status == StatusCode::SERVICE_UNAVAILABLE {
185        chaos.circuit_breaker.record_failure().await;
186    } else if status.is_success() {
187        chaos.circuit_breaker.record_success().await;
188    }
189
190    // Extract response body size for bandwidth throttling
191    let (parts, body) = response.into_parts();
192    let response_body_bytes = match body.collect().await {
193        Ok(collected) => collected.to_bytes(),
194        Err(e) => {
195            warn!("Failed to read response body: {}", e);
196            return (StatusCode::INTERNAL_SERVER_ERROR, "Failed to read response body")
197                .into_response();
198        }
199    };
200
201    let response_size = response_body_bytes.len();
202
203    // Check if should truncate response (partial response simulation)
204    let final_body = if chaos.fault_injector.should_truncate_response() {
205        warn!("Injecting partial response");
206        let truncate_at = response_size / 2;
207        Body::from(response_body_bytes.slice(0..truncate_at))
208    } else {
209        Body::from(response_body_bytes)
210    };
211
212    // Throttle response bandwidth
213    chaos.traffic_shaper.throttle_bandwidth(response_size).await;
214
215    Response::from_parts(parts, final_body)
216}
217
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{LatencyConfig, RateLimitConfig};

    /// A config with latency switched on must yield an enabled latency injector.
    #[tokio::test]
    async fn test_middleware_creation() {
        let latency = LatencyConfig {
            enabled: true,
            fixed_delay_ms: Some(10),
            ..Default::default()
        };
        let chaos = ChaosMiddleware::new(ChaosConfig {
            enabled: true,
            latency: Some(latency),
            ..Default::default()
        });

        assert!(chaos.latency_injector().is_enabled());
    }

    /// With a total capacity of two, the third back-to-back check is rejected.
    #[tokio::test]
    async fn test_rate_limiting() {
        let limits = RateLimitConfig {
            enabled: true,
            requests_per_second: 1,
            // burst_size is the total capacity, not additional requests
            burst_size: 2,
            ..Default::default()
        };
        let chaos = Arc::new(ChaosMiddleware::new(ChaosConfig {
            enabled: true,
            rate_limit: Some(limits),
            ..Default::default()
        }));

        // Same client and path every time so all checks hit the same limit.
        let attempt = || chaos.rate_limiter().check(Some("127.0.0.1"), Some("/test"));

        // The first two checks fit within the configured capacity.
        assert!(attempt().is_ok());
        assert!(attempt().is_ok());

        // Capacity is exhausted, so the next immediate check must fail.
        assert!(attempt().is_err());
    }
}