mockforge_chaos/protocols/
grpc.rs1use crate::{
4 config::ChaosConfig, fault::FaultInjector, latency::LatencyInjector, rate_limit::RateLimiter,
5 traffic_shaping::TrafficShaper, ChaosError, Result,
6};
7use std::sync::Arc;
8use tracing::{debug, warn};
9
10#[derive(Debug, Clone)]
12pub enum GrpcFault {
13 StatusCode(i32), StreamInterruption,
17 MetadataCorruption,
19 MessageCorruption,
21}
22
23#[derive(Clone)]
25pub struct GrpcChaos {
26 latency_injector: Arc<LatencyInjector>,
27 fault_injector: Arc<FaultInjector>,
28 rate_limiter: Arc<RateLimiter>,
29 traffic_shaper: Arc<TrafficShaper>,
30 config: Arc<ChaosConfig>,
31}
32
33impl GrpcChaos {
34 pub fn new(config: ChaosConfig) -> Self {
36 let latency_injector =
37 Arc::new(LatencyInjector::new(config.latency.clone().unwrap_or_default()));
38
39 let fault_injector =
40 Arc::new(FaultInjector::new(config.fault_injection.clone().unwrap_or_default()));
41
42 let rate_limiter =
43 Arc::new(RateLimiter::new(config.rate_limit.clone().unwrap_or_default()));
44
45 let traffic_shaper =
46 Arc::new(TrafficShaper::new(config.traffic_shaping.clone().unwrap_or_default()));
47
48 Self {
49 latency_injector,
50 fault_injector,
51 rate_limiter,
52 traffic_shaper,
53 config: Arc::new(config),
54 }
55 }
56
57 pub async fn apply_pre_request(
59 &self,
60 service: &str,
61 method: &str,
62 client_ip: Option<&str>,
63 ) -> Result<()> {
64 if !self.config.enabled {
65 return Ok(());
66 }
67
68 let endpoint = format!("{}/{}", service, method);
69 debug!("Applying gRPC chaos for: {}", endpoint);
70
71 if let Err(e) = self.rate_limiter.check(client_ip, Some(&endpoint)) {
73 warn!("gRPC rate limit exceeded: {}", endpoint);
74 return Err(e);
75 }
76
77 if !self.traffic_shaper.check_connection_limit() {
79 warn!("gRPC connection limit exceeded");
80 return Err(ChaosError::ConnectionThrottled);
81 }
82
83 self.latency_injector.inject().await;
85
86 self.fault_injector.inject()?;
88
89 Ok(())
90 }
91
92 pub async fn apply_post_response(&self, message_size: usize) -> Result<()> {
94 if !self.config.enabled {
95 return Ok(());
96 }
97
98 self.traffic_shaper.throttle_bandwidth(message_size).await;
100
101 if self.traffic_shaper.should_drop_packet() {
103 warn!("Simulating gRPC packet loss");
104 return Err(ChaosError::InjectedFault("Packet loss".to_string()));
105 }
106
107 Ok(())
108 }
109
110 pub fn get_grpc_status_code(&self) -> Option<i32> {
112 self.fault_injector.get_http_error_status().map(|http_code| match http_code {
113 400 => 3, 401 => 16, 403 => 7, 404 => 5, 429 => 8, 500 => 13, 501 => 12, 503 => 14, 504 => 4, _ => 2, })
124 }
125
126 pub fn should_interrupt_stream(&self) -> bool {
128 self.fault_injector.should_truncate_response()
129 }
130
131 pub fn traffic_shaper(&self) -> &Arc<TrafficShaper> {
133 &self.traffic_shaper
134 }
135}
136
137pub mod status {
139 pub const OK: i32 = 0;
140 pub const CANCELLED: i32 = 1;
141 pub const UNKNOWN: i32 = 2;
142 pub const INVALID_ARGUMENT: i32 = 3;
143 pub const DEADLINE_EXCEEDED: i32 = 4;
144 pub const NOT_FOUND: i32 = 5;
145 pub const ALREADY_EXISTS: i32 = 6;
146 pub const PERMISSION_DENIED: i32 = 7;
147 pub const RESOURCE_EXHAUSTED: i32 = 8;
148 pub const FAILED_PRECONDITION: i32 = 9;
149 pub const ABORTED: i32 = 10;
150 pub const OUT_OF_RANGE: i32 = 11;
151 pub const UNIMPLEMENTED: i32 = 12;
152 pub const INTERNAL: i32 = 13;
153 pub const UNAVAILABLE: i32 = 14;
154 pub const DATA_LOSS: i32 = 15;
155 pub const UNAUTHENTICATED: i32 = 16;
156}
157
158#[cfg(test)]
159mod tests {
160 use super::*;
161 use crate::config::{FaultInjectionConfig, LatencyConfig};
162
163 #[tokio::test]
164 async fn test_grpc_chaos_creation() {
165 let config = ChaosConfig {
166 enabled: true,
167 latency: Some(LatencyConfig {
168 enabled: true,
169 fixed_delay_ms: Some(10),
170 random_delay_range_ms: None,
171 jitter_percent: 0.0,
172 probability: 1.0,
173 }),
174 ..Default::default()
175 };
176
177 let chaos = GrpcChaos::new(config);
178 assert!(chaos.config.enabled);
179 }
180
181 #[tokio::test]
182 async fn test_grpc_status_code_mapping() {
183 let config = ChaosConfig {
184 enabled: true,
185 fault_injection: Some(FaultInjectionConfig {
186 enabled: true,
187 http_errors: vec![500],
188 http_error_probability: 1.0,
189 ..Default::default()
190 }),
191 ..Default::default()
192 };
193
194 let chaos = GrpcChaos::new(config);
195 let status = chaos.get_grpc_status_code();
196
197 if let Some(code) = status {
199 assert_eq!(code, 13);
200 }
201 }
202}