1use pingora::http::RequestHeader;
15use rand::Rng;
16use sentinel_common::errors::{SentinelError, SentinelResult};
17use sentinel_common::observability::RequestMetrics;
18use sentinel_config::routes::ShadowConfig;
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Instant;
22use tokio::time::Duration;
23use tracing::{debug, error, info, trace, warn};
24
25use crate::{RequestContext, UpstreamPool};
26
27#[derive(Clone)]
32pub struct ShadowManager {
33 upstream_pools: Arc<HashMap<String, Arc<UpstreamPool>>>,
35
36 config: ShadowConfig,
38
39 metrics: Option<Arc<RequestMetrics>>,
41
42 route_id: String,
44}
45
46impl ShadowManager {
47 pub fn new(
49 upstream_pools: Arc<HashMap<String, Arc<UpstreamPool>>>,
50 config: ShadowConfig,
51 metrics: Option<Arc<RequestMetrics>>,
52 route_id: String,
53 ) -> Self {
54 Self {
55 upstream_pools,
56 config,
57 metrics,
58 route_id,
59 }
60 }
61
62 pub fn should_shadow(&self, headers: &RequestHeader) -> bool {
64 if let Some((header_name, header_value)) = &self.config.sample_header {
66 if let Some(actual_value) = headers.headers.get(header_name) {
67 if actual_value.to_str().ok() != Some(header_value.as_str()) {
68 trace!("Shadow skipped: sample-header mismatch");
69 return false;
70 }
71 } else {
72 trace!("Shadow skipped: sample-header not present");
73 return false;
74 }
75 }
76
77 if self.config.percentage < 100.0 {
79 let mut rng = rand::thread_rng();
80 let roll: f64 = rng.gen_range(0.0..100.0);
81 if roll > self.config.percentage {
82 trace!(
83 roll = roll,
84 threshold = self.config.percentage,
85 "Shadow skipped: sampling"
86 );
87 return false;
88 }
89 }
90
91 true
92 }
93
94 pub fn shadow_request(
106 &self,
107 original_headers: RequestHeader,
108 body: Option<Vec<u8>>,
109 ctx: RequestContext,
110 ) {
111 if !self.upstream_pools.contains_key(&self.config.upstream) {
113 warn!(
114 upstream = %self.config.upstream,
115 "Shadow upstream not found in pools"
116 );
117 if let Some(ref metrics) = self.metrics {
119 metrics.record_shadow_error(&self.route_id, &self.config.upstream, "upstream_not_found");
120 }
121 return;
122 }
123
124 let config = self.config.clone();
125 let upstream_id = self.config.upstream.clone();
126 let upstream_pools = Arc::clone(&self.upstream_pools);
127 let metrics = self.metrics.clone();
128 let route_id = self.route_id.clone();
129
130 tokio::spawn(async move {
132 let start = Instant::now();
133
134 let upstream_pool = match upstream_pools.get(&upstream_id) {
136 Some(pool) => pool,
137 None => {
138 warn!(upstream = %upstream_id, "Shadow upstream disappeared");
140 return;
141 }
142 };
143
144 let result = tokio::time::timeout(
146 Duration::from_millis(config.timeout_ms),
147 Self::execute_shadow_request(upstream_pool, original_headers, body, ctx.clone()),
148 )
149 .await;
150
151 let latency = start.elapsed();
152
153 match result {
154 Ok(Ok(())) => {
155 debug!(
156 upstream = %upstream_id,
157 latency_ms = latency.as_millis(),
158 path = %ctx.path,
159 method = %ctx.method,
160 "Shadow request completed successfully"
161 );
162 if let Some(ref metrics) = metrics {
164 metrics.record_shadow_success(&route_id, &upstream_id, latency);
165 }
166 }
167 Ok(Err(e)) => {
168 error!(
169 upstream = %upstream_id,
170 error = %e,
171 latency_ms = latency.as_millis(),
172 path = %ctx.path,
173 method = %ctx.method,
174 "Shadow request failed"
175 );
176 if let Some(ref metrics) = metrics {
178 metrics.record_shadow_error(&route_id, &upstream_id, "request_failed");
179 }
180 }
181 Err(_) => {
182 warn!(
183 upstream = %upstream_id,
184 timeout_ms = config.timeout_ms,
185 path = %ctx.path,
186 method = %ctx.method,
187 "Shadow request timed out"
188 );
189 if let Some(ref metrics) = metrics {
191 metrics.record_shadow_timeout(&route_id, &upstream_id, latency);
192 }
193 }
194 }
195 });
196 }
197
198 async fn execute_shadow_request(
203 _upstream_pool: &UpstreamPool,
204 _headers: RequestHeader,
205 _body: Option<Vec<u8>>,
206 _ctx: RequestContext,
207 ) -> SentinelResult<()> {
208 trace!("Shadow request execution (placeholder)");
219 Ok(())
220 }
221}
222
223pub fn should_buffer_method(method: &str) -> bool {
225 matches!(method, "POST" | "PUT" | "PATCH")
226}
227
228pub async fn buffer_request_body(
239 _session: &mut pingora::proxy::Session,
240 max_bytes: usize,
241) -> SentinelResult<Vec<u8>> {
242 if max_bytes > 0 {
251 Ok(Vec::new())
252 } else {
253 Err(SentinelError::LimitExceeded {
254 limit_type: sentinel_common::errors::LimitType::BodySize,
255 message: "max_body_bytes must be > 0".to_string(),
256 current_value: 0,
257 limit: 0,
258 })
259 }
260}
261
262#[cfg(test)]
263mod tests {
264 use super::*;
265 use pingora::http::RequestHeader as PingoraRequestHeader;
266
267 #[test]
268 fn test_should_buffer_method() {
269 assert!(should_buffer_method("POST"));
270 assert!(should_buffer_method("PUT"));
271 assert!(should_buffer_method("PATCH"));
272 assert!(!should_buffer_method("GET"));
273 assert!(!should_buffer_method("HEAD"));
274 assert!(!should_buffer_method("DELETE"));
275 }
276
277 #[test]
278 fn test_shadow_sampling_percentage() {
279 let pools = Arc::new(HashMap::new());
280 let config = ShadowConfig {
281 upstream: "shadow".to_string(),
282 percentage: 0.0, sample_header: None,
284 timeout_ms: 5000,
285 buffer_body: false,
286 max_body_bytes: 1048576,
287 };
288
289 let manager = ShadowManager::new(pools, config, None, "test-route".to_string());
290 let headers = PingoraRequestHeader::build("GET", b"/", None).unwrap();
291
292 for _ in 0..100 {
294 assert!(!manager.should_shadow(&headers));
295 }
296 }
297
298 #[test]
299 fn test_shadow_sampling_always() {
300 let pools = Arc::new(HashMap::new());
301 let config = ShadowConfig {
302 upstream: "shadow".to_string(),
303 percentage: 100.0, sample_header: None,
305 timeout_ms: 5000,
306 buffer_body: false,
307 max_body_bytes: 1048576,
308 };
309
310 let manager = ShadowManager::new(pools, config, None, "test-route".to_string());
311 let headers = PingoraRequestHeader::build("GET", b"/", None).unwrap();
312
313 for _ in 0..100 {
315 assert!(manager.should_shadow(&headers));
316 }
317 }
318
319 #[test]
320 fn test_shadow_sample_header_match() {
321 let pools = Arc::new(HashMap::new());
322 let config = ShadowConfig {
323 upstream: "shadow".to_string(),
324 percentage: 100.0,
325 sample_header: Some(("X-Shadow".to_string(), "true".to_string())),
326 timeout_ms: 5000,
327 buffer_body: false,
328 max_body_bytes: 1048576,
329 };
330
331 let manager = ShadowManager::new(pools, config, None, "test-route".to_string());
332
333 let mut headers = PingoraRequestHeader::build("GET", b"/", None).unwrap();
335 headers
336 .insert_header("X-Shadow", "true")
337 .unwrap();
338 assert!(manager.should_shadow(&headers));
339
340 let headers_no_match = PingoraRequestHeader::build("GET", b"/", None).unwrap();
342 assert!(!manager.should_shadow(&headers_no_match));
343
344 let mut headers_wrong = PingoraRequestHeader::build("GET", b"/", None).unwrap();
346 headers_wrong
347 .insert_header("X-Shadow", "false")
348 .unwrap();
349 assert!(!manager.should_shadow(&headers_wrong));
350 }
351}