mockforge_chaos/protocols/
grpc.rs

1//! gRPC chaos engineering
2
3use crate::{
4    config::ChaosConfig, fault::FaultInjector, latency::LatencyInjector, rate_limit::RateLimiter,
5    traffic_shaping::TrafficShaper, ChaosError, Result,
6};
7use std::sync::Arc;
8use tracing::{debug, warn};
9
/// Faults that are specific to the gRPC protocol.
///
/// The type is comparable (`PartialEq`/`Eq`) so callers and tests can check
/// which fault was selected with `==` / `assert_eq!` instead of a `match`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum GrpcFault {
    /// Fail the call with the given numeric gRPC status code
    /// (0 = OK, 1 = CANCELLED, 2 = UNKNOWN, etc.).
    StatusCode(i32),
    /// Stream interruption
    StreamInterruption,
    /// Metadata corruption
    MetadataCorruption,
    /// Message corruption
    MessageCorruption,
}
22
23/// gRPC chaos handler
24#[derive(Clone)]
25pub struct GrpcChaos {
26    latency_injector: Arc<LatencyInjector>,
27    fault_injector: Arc<FaultInjector>,
28    rate_limiter: Arc<RateLimiter>,
29    traffic_shaper: Arc<TrafficShaper>,
30    config: Arc<ChaosConfig>,
31}
32
33impl GrpcChaos {
34    /// Create new gRPC chaos handler
35    pub fn new(config: ChaosConfig) -> Self {
36        let latency_injector =
37            Arc::new(LatencyInjector::new(config.latency.clone().unwrap_or_default()));
38
39        let fault_injector =
40            Arc::new(FaultInjector::new(config.fault_injection.clone().unwrap_or_default()));
41
42        let rate_limiter =
43            Arc::new(RateLimiter::new(config.rate_limit.clone().unwrap_or_default()));
44
45        let traffic_shaper =
46            Arc::new(TrafficShaper::new(config.traffic_shaping.clone().unwrap_or_default()));
47
48        Self {
49            latency_injector,
50            fault_injector,
51            rate_limiter,
52            traffic_shaper,
53            config: Arc::new(config),
54        }
55    }
56
57    /// Apply chaos before gRPC request processing
58    pub async fn apply_pre_request(
59        &self,
60        service: &str,
61        method: &str,
62        client_ip: Option<&str>,
63    ) -> Result<()> {
64        if !self.config.enabled {
65            return Ok(());
66        }
67
68        let endpoint = format!("{}/{}", service, method);
69        debug!("Applying gRPC chaos for: {}", endpoint);
70
71        // Check rate limits
72        if let Err(e) = self.rate_limiter.check(client_ip, Some(&endpoint)) {
73            warn!("gRPC rate limit exceeded: {}", endpoint);
74            return Err(e);
75        }
76
77        // Check connection limits
78        if !self.traffic_shaper.check_connection_limit() {
79            warn!("gRPC connection limit exceeded");
80            return Err(ChaosError::ConnectionThrottled);
81        }
82
83        // Inject latency
84        self.latency_injector.inject().await;
85
86        // Check for fault injection
87        self.fault_injector.inject()?;
88
89        Ok(())
90    }
91
92    /// Apply chaos after gRPC response
93    pub async fn apply_post_response(&self, message_size: usize) -> Result<()> {
94        if !self.config.enabled {
95            return Ok(());
96        }
97
98        // Throttle bandwidth based on message size
99        self.traffic_shaper.throttle_bandwidth(message_size).await;
100
101        // Check for packet loss (simulated)
102        if self.traffic_shaper.should_drop_packet() {
103            warn!("Simulating gRPC packet loss");
104            return Err(ChaosError::InjectedFault("Packet loss".to_string()));
105        }
106
107        Ok(())
108    }
109
110    /// Get gRPC status code for fault injection
111    pub fn get_grpc_status_code(&self) -> Option<i32> {
112        self.fault_injector.get_http_error_status().map(|http_code| match http_code {
113            400 => 3,  // INVALID_ARGUMENT
114            401 => 16, // UNAUTHENTICATED
115            403 => 7,  // PERMISSION_DENIED
116            404 => 5,  // NOT_FOUND
117            429 => 8,  // RESOURCE_EXHAUSTED
118            500 => 13, // INTERNAL
119            501 => 12, // UNIMPLEMENTED
120            503 => 14, // UNAVAILABLE
121            504 => 4,  // DEADLINE_EXCEEDED
122            _ => 2,    // UNKNOWN
123        })
124    }
125
126    /// Check if should interrupt stream
127    pub fn should_interrupt_stream(&self) -> bool {
128        self.fault_injector.should_truncate_response()
129    }
130
131    /// Get traffic shaper for connection management
132    pub fn traffic_shaper(&self) -> &Arc<TrafficShaper> {
133        &self.traffic_shaper
134    }
135}
136
/// Numeric gRPC status codes, named as in the gRPC specification.
pub mod status {
    /// Success; not an error.
    pub const OK: i32 = 0;
    /// The operation was cancelled.
    pub const CANCELLED: i32 = 1;
    /// Unknown error.
    pub const UNKNOWN: i32 = 2;
    /// The client supplied an invalid argument.
    pub const INVALID_ARGUMENT: i32 = 3;
    /// The deadline expired before the operation completed.
    pub const DEADLINE_EXCEEDED: i32 = 4;
    /// The requested entity was not found.
    pub const NOT_FOUND: i32 = 5;
    /// The entity being created already exists.
    pub const ALREADY_EXISTS: i32 = 6;
    /// The caller lacks permission for the operation.
    pub const PERMISSION_DENIED: i32 = 7;
    /// A resource (such as a quota) has been exhausted.
    pub const RESOURCE_EXHAUSTED: i32 = 8;
    /// The system is not in the state the operation requires.
    pub const FAILED_PRECONDITION: i32 = 9;
    /// The operation was aborted.
    pub const ABORTED: i32 = 10;
    /// The operation was attempted past the valid range.
    pub const OUT_OF_RANGE: i32 = 11;
    /// The operation is not implemented or not supported.
    pub const UNIMPLEMENTED: i32 = 12;
    /// Internal error.
    pub const INTERNAL: i32 = 13;
    /// The service is currently unavailable.
    pub const UNAVAILABLE: i32 = 14;
    /// Unrecoverable data loss or corruption.
    pub const DATA_LOSS: i32 = 15;
    /// The request lacks valid authentication credentials.
    pub const UNAUTHENTICATED: i32 = 16;
}
157
#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::{FaultInjectionConfig, LatencyConfig};

    /// Constructing a handler keeps the `enabled` flag of the supplied config.
    #[tokio::test]
    async fn test_grpc_chaos_creation() {
        let config = ChaosConfig {
            enabled: true,
            latency: Some(LatencyConfig {
                enabled: true,
                fixed_delay_ms: Some(10),
                random_delay_range_ms: None,
                jitter_percent: 0.0,
                probability: 1.0,
            }),
            ..Default::default()
        };

        let chaos = GrpcChaos::new(config);
        assert!(chaos.config.enabled);
    }

    /// An injected HTTP 500 is reported as gRPC INTERNAL (13).
    #[tokio::test]
    async fn test_grpc_status_code_mapping() {
        let config = ChaosConfig {
            enabled: true,
            fault_injection: Some(FaultInjectionConfig {
                enabled: true,
                http_errors: vec![500],
                http_error_probability: 1.0,
                ..Default::default()
            }),
            ..Default::default()
        };

        let chaos = GrpcChaos::new(config);

        // With a single configured error and probability 1.0 the injector is
        // expected to always yield a status. Asserting on the whole `Option`
        // keeps the test from passing vacuously when nothing is returned.
        assert_eq!(
            chaos.get_grpc_status_code(),
            Some(13),
            "HTTP 500 should map to gRPC INTERNAL (13)"
        );
    }

    /// With chaos disabled both hooks return `Ok(())` without doing anything.
    #[tokio::test]
    async fn test_disabled_chaos_is_noop() {
        let chaos = GrpcChaos::new(ChaosConfig {
            enabled: false,
            ..Default::default()
        });

        assert!(chaos
            .apply_pre_request("svc", "Method", None)
            .await
            .is_ok());
        assert!(chaos.apply_post_response(1024).await.is_ok());
    }
}
202}