Skip to main content

grapsus_proxy/
shadow.rs

1//! Traffic mirroring / shadowing for safe canary testing
2//!
3//! This module implements fire-and-forget request duplication to shadow upstreams,
4//! enabling safe canary deployments and testing with production traffic.
5//!
6//! # Features
7//!
8//! - Sampling-based mirroring (percentage control)
9//! - Header-based sampling (selective mirroring)
10//! - Optional request body buffering
11//! - Fire-and-forget async execution (no blocking)
12//! - Comprehensive metrics
13
14use bytes::Bytes;
15use pingora::http::RequestHeader;
16use pingora::proxy::Session;
17use rand::RngExt;
18use reqwest::header::{HeaderMap, HeaderName, HeaderValue};
19use std::collections::HashMap;
20use std::str::FromStr;
21use std::sync::Arc;
22use std::time::Instant;
23use tokio::time::Duration;
24use tracing::{debug, error, trace, warn};
25use grapsus_common::errors::{GrapsusError, GrapsusResult};
26use grapsus_common::observability::RequestMetrics;
27use grapsus_config::routes::ShadowConfig;
28
29use crate::{RequestContext, UpstreamPool};
30
31/// Manager for traffic shadowing/mirroring
32///
33/// Handles request duplication to shadow upstreams with sampling control,
34/// optional body buffering, and fire-and-forget execution.
35#[derive(Clone)]
36pub struct ShadowManager {
37    /// Reference to upstream pools for shadow target selection
38    upstream_pools: Arc<HashMap<String, Arc<UpstreamPool>>>,
39
40    /// Shadow configuration
41    config: ShadowConfig,
42
43    /// Metrics collector for recording shadow metrics
44    metrics: Option<Arc<RequestMetrics>>,
45
46    /// Route ID for metrics labeling
47    route_id: String,
48
49    /// Shared HTTP client for shadow requests
50    client: reqwest::Client,
51}
52
53impl ShadowManager {
54    /// Create a new shadow manager
55    pub fn new(
56        upstream_pools: Arc<HashMap<String, Arc<UpstreamPool>>>,
57        config: ShadowConfig,
58        metrics: Option<Arc<RequestMetrics>>,
59        route_id: String,
60    ) -> Self {
61        // Build a reusable HTTP client with reasonable defaults for shadow traffic
62        let client = reqwest::Client::builder()
63            .timeout(Duration::from_millis(config.timeout_ms))
64            .pool_max_idle_per_host(10)
65            .pool_idle_timeout(Duration::from_secs(30))
66            // Accept invalid certs for shadow targets (they're often internal)
67            .danger_accept_invalid_certs(true)
68            .build()
69            .unwrap_or_else(|_| reqwest::Client::new());
70
71        Self {
72            upstream_pools,
73            config,
74            metrics,
75            route_id,
76            client,
77        }
78    }
79
80    /// Decide whether to shadow this request based on sampling rules
81    pub fn should_shadow(&self, headers: &RequestHeader) -> bool {
82        // Check sample_header if configured
83        if let Some((header_name, header_value)) = &self.config.sample_header {
84            if let Some(actual_value) = headers.headers.get(header_name) {
85                if actual_value.to_str().ok() != Some(header_value.as_str()) {
86                    trace!("Shadow skipped: sample-header mismatch");
87                    return false;
88                }
89            } else {
90                trace!("Shadow skipped: sample-header not present");
91                return false;
92            }
93        }
94
95        // Sample based on percentage
96        if self.config.percentage < 100.0 {
97            let mut rng = rand::rng();
98            let roll: f64 = rng.random_range(0.0..100.0);
99            if roll > self.config.percentage {
100                trace!(
101                    roll = roll,
102                    threshold = self.config.percentage,
103                    "Shadow skipped: sampling"
104                );
105                return false;
106            }
107        }
108
109        true
110    }
111
112    /// Shadow a request asynchronously (fire-and-forget)
113    ///
114    /// This method spawns a tokio task to mirror the request to the shadow upstream.
115    /// The task does not block the main request path and failures do not affect
116    /// the primary response.
117    ///
118    /// # Arguments
119    ///
120    /// * `original_headers` - Original request headers to clone
121    /// * `body` - Optional buffered request body (if buffer_body=true)
122    /// * `ctx` - Request context for correlation
123    pub fn shadow_request(
124        &self,
125        original_headers: RequestHeader,
126        body: Option<Vec<u8>>,
127        ctx: RequestContext,
128    ) {
129        // Check if upstream exists
130        if !self.upstream_pools.contains_key(&self.config.upstream) {
131            warn!(
132                upstream = %self.config.upstream,
133                "Shadow upstream not found in pools"
134            );
135            // Record error metric
136            if let Some(ref metrics) = self.metrics {
137                metrics.record_shadow_error(
138                    &self.route_id,
139                    &self.config.upstream,
140                    "upstream_not_found",
141                );
142            }
143            return;
144        }
145
146        let config = self.config.clone();
147        let upstream_id = self.config.upstream.clone();
148        let upstream_pools = Arc::clone(&self.upstream_pools);
149        let metrics = self.metrics.clone();
150        let route_id = self.route_id.clone();
151        let client = self.client.clone();
152
153        // Spawn fire-and-forget task
154        tokio::spawn(async move {
155            let start = Instant::now();
156
157            // Get upstream pool inside the async task
158            let upstream_pool = match upstream_pools.get(&upstream_id) {
159                Some(pool) => pool,
160                None => {
161                    // This shouldn't happen since we checked above, but handle gracefully
162                    warn!(upstream = %upstream_id, "Shadow upstream disappeared");
163                    return;
164                }
165            };
166
167            // Execute shadow request with timeout
168            let result = tokio::time::timeout(
169                Duration::from_millis(config.timeout_ms),
170                Self::execute_shadow_request(
171                    &client,
172                    upstream_pool,
173                    original_headers,
174                    body,
175                    ctx.clone(),
176                ),
177            )
178            .await;
179
180            let latency = start.elapsed();
181
182            match result {
183                Ok(Ok(())) => {
184                    debug!(
185                        upstream = %upstream_id,
186                        latency_ms = latency.as_millis(),
187                        path = %ctx.path,
188                        method = %ctx.method,
189                        "Shadow request completed successfully"
190                    );
191                    // Record success metrics
192                    if let Some(ref metrics) = metrics {
193                        metrics.record_shadow_success(&route_id, &upstream_id, latency);
194                    }
195                }
196                Ok(Err(e)) => {
197                    error!(
198                        upstream = %upstream_id,
199                        error = %e,
200                        latency_ms = latency.as_millis(),
201                        path = %ctx.path,
202                        method = %ctx.method,
203                        "Shadow request failed"
204                    );
205                    // Record error metrics
206                    if let Some(ref metrics) = metrics {
207                        metrics.record_shadow_error(&route_id, &upstream_id, "request_failed");
208                    }
209                }
210                Err(_) => {
211                    warn!(
212                        upstream = %upstream_id,
213                        timeout_ms = config.timeout_ms,
214                        path = %ctx.path,
215                        method = %ctx.method,
216                        "Shadow request timed out"
217                    );
218                    // Record timeout metrics
219                    if let Some(ref metrics) = metrics {
220                        metrics.record_shadow_timeout(&route_id, &upstream_id, latency);
221                    }
222                }
223            }
224        });
225    }
226
227    /// Execute the actual shadow request
228    ///
229    /// This is the internal implementation that sends the mirrored request
230    /// to the shadow upstream using reqwest.
231    async fn execute_shadow_request(
232        client: &reqwest::Client,
233        upstream_pool: &UpstreamPool,
234        headers: RequestHeader,
235        body: Option<Vec<u8>>,
236        ctx: RequestContext,
237    ) -> GrapsusResult<()> {
238        // Select shadow target from upstream pool
239        let target = upstream_pool.select_shadow_target(Some(&ctx)).await?;
240
241        // Build the full URL
242        let url = target.build_url(&ctx.path);
243
244        trace!(
245            url = %url,
246            method = %ctx.method,
247            body_size = body.as_ref().map(|b| b.len()).unwrap_or(0),
248            "Executing shadow request"
249        );
250
251        // Convert Pingora headers to reqwest headers
252        let mut reqwest_headers = HeaderMap::new();
253        for (name, value) in headers.headers.iter() {
254            // Skip hop-by-hop headers that shouldn't be forwarded
255            let name_str = name.as_str().to_lowercase();
256            if matches!(
257                name_str.as_str(),
258                "connection"
259                    | "keep-alive"
260                    | "proxy-authenticate"
261                    | "proxy-authorization"
262                    | "te"
263                    | "trailers"
264                    | "transfer-encoding"
265                    | "upgrade"
266            ) {
267                continue;
268            }
269
270            if let (Ok(header_name), Ok(header_value)) = (
271                HeaderName::from_str(name.as_str()),
272                HeaderValue::from_bytes(value.as_bytes()),
273            ) {
274                reqwest_headers.insert(header_name, header_value);
275            }
276        }
277
278        // Add shadow-specific header to identify mirrored traffic
279        reqwest_headers.insert("x-shadow-request", HeaderValue::from_static("true"));
280
281        // Update Host header to match shadow target
282        if let Ok(host_value) = HeaderValue::from_str(&target.host) {
283            reqwest_headers.insert("host", host_value);
284        }
285
286        // Build the request based on method
287        let method =
288            reqwest::Method::from_bytes(ctx.method.as_bytes()).unwrap_or(reqwest::Method::GET);
289
290        let mut request_builder = client.request(method, &url).headers(reqwest_headers);
291
292        // Add body if present
293        if let Some(body_bytes) = body {
294            request_builder = request_builder.body(body_bytes);
295        }
296
297        // Send the request and discard response
298        let response = request_builder.send().await.map_err(|e| {
299            GrapsusError::upstream(
300                upstream_pool.id().to_string(),
301                format!("Shadow request failed: {}", e),
302            )
303        })?;
304
305        let status = response.status();
306        trace!(
307            url = %url,
308            status = %status,
309            "Shadow request completed"
310        );
311
312        // We don't care about the response body, just that it was sent
313        // Drop the response to release the connection back to the pool
314        drop(response);
315
316        Ok(())
317    }
318}
319
/// Helper to determine if HTTP method should have body buffered.
///
/// Returns `true` for `POST`, `PUT` and `PATCH`, compared ASCII
/// case-insensitively, and `false` for every other input.
///
/// The comparison is ASCII-only and allocation-free: HTTP methods are ASCII
/// tokens, so Unicode case mapping must not make a non-ASCII look-alike such
/// as `"poſt"` match.
pub fn should_buffer_method(method: &str) -> bool {
    ["POST", "PUT", "PATCH"]
        .iter()
        .any(|candidate| method.eq_ignore_ascii_case(candidate))
}
324
325/// Buffer request body from session with size limits
326///
327/// Reads the request body from a Pingora session and buffers it up to
328/// the configured maximum size. Returns an error if the body exceeds
329/// the limit.
330///
331/// # Arguments
332///
333/// * `session` - Pingora session to read body from
334/// * `max_bytes` - Maximum bytes to buffer
335pub async fn buffer_request_body(
336    session: &mut Session,
337    max_bytes: usize,
338) -> GrapsusResult<Vec<u8>> {
339    if max_bytes == 0 {
340        return Err(GrapsusError::LimitExceeded {
341            limit_type: grapsus_common::errors::LimitType::BodySize,
342            message: "max_body_bytes must be > 0".to_string(),
343            current_value: 0,
344            limit: 0,
345        });
346    }
347
348    let mut buffer = Vec::with_capacity(max_bytes.min(65536)); // Start with reasonable capacity
349    let mut total_read = 0;
350
351    loop {
352        // Read next chunk from the session
353        let chunk = session
354            .read_request_body()
355            .await
356            .map_err(|e| GrapsusError::Internal {
357                message: format!("Failed to read request body for shadow: {}", e),
358                correlation_id: None,
359                source: None,
360            })?;
361
362        match chunk {
363            Some(data) => {
364                let chunk_len: usize = data.len();
365
366                // Check if this chunk would exceed the limit
367                if total_read + chunk_len > max_bytes {
368                    return Err(GrapsusError::LimitExceeded {
369                        limit_type: grapsus_common::errors::LimitType::BodySize,
370                        message: format!(
371                            "Request body exceeds maximum shadow buffer size of {} bytes",
372                            max_bytes
373                        ),
374                        current_value: total_read + chunk_len,
375                        limit: max_bytes,
376                    });
377                }
378
379                buffer.extend_from_slice(&data);
380                total_read += chunk_len;
381
382                trace!(
383                    chunk_size = chunk_len,
384                    total_buffered = total_read,
385                    max_bytes = max_bytes,
386                    "Buffered request body chunk for shadow"
387                );
388            }
389            None => {
390                // End of body
391                break;
392            }
393        }
394    }
395
396    debug!(
397        total_bytes = total_read,
398        "Finished buffering request body for shadow"
399    );
400
401    Ok(buffer)
402}
403
404/// Clone request body bytes for shadow traffic
405///
406/// This is a simpler version that takes already-buffered body bytes
407/// (from request_body_filter) and returns a clone for shadow use.
408pub fn clone_body_for_shadow(body: &Option<Bytes>) -> Option<Vec<u8>> {
409    body.as_ref().map(|b| b.to_vec())
410}
411
#[cfg(test)]
mod tests {
    use super::*;
    use pingora::http::RequestHeader as PingoraRequestHeader;

    /// Build a manager with an empty pool map and no metrics, varying only the
    /// sampling settings that the tests below exercise.
    fn manager_with(percentage: f64, sample_header: Option<(String, String)>) -> ShadowManager {
        let config = ShadowConfig {
            upstream: "shadow".to_string(),
            percentage,
            sample_header,
            timeout_ms: 5000,
            buffer_body: false,
            max_body_bytes: 1048576,
        };

        ShadowManager::new(
            Arc::new(HashMap::new()),
            config,
            None,
            "test-route".to_string(),
        )
    }

    #[test]
    fn test_should_buffer_method() {
        // Methods that carry a body are buffered, regardless of case.
        for method in ["POST", "PUT", "PATCH", "post"] {
            assert!(should_buffer_method(method));
        }

        // Everything else is not.
        for method in ["GET", "HEAD", "DELETE"] {
            assert!(!should_buffer_method(method));
        }
    }

    #[test]
    fn test_shadow_sampling_percentage() {
        // 0% should never shadow
        let manager = manager_with(0.0, None);
        let headers = PingoraRequestHeader::build("GET", b"/", None).unwrap();

        assert!((0..100).all(|_| !manager.should_shadow(&headers)));
    }

    #[test]
    fn test_shadow_sampling_always() {
        // 100% should always shadow
        let manager = manager_with(100.0, None);
        let headers = PingoraRequestHeader::build("GET", b"/", None).unwrap();

        assert!((0..100).all(|_| manager.should_shadow(&headers)));
    }

    #[test]
    fn test_shadow_sample_header_match() {
        let manager = manager_with(
            100.0,
            Some(("X-Shadow".to_string(), "true".to_string())),
        );

        // Matching header value: shadowed.
        let mut matching = PingoraRequestHeader::build("GET", b"/", None).unwrap();
        matching.insert_header("X-Shadow", "true").unwrap();
        assert!(manager.should_shadow(&matching));

        // Header absent: skipped.
        let missing = PingoraRequestHeader::build("GET", b"/", None).unwrap();
        assert!(!manager.should_shadow(&missing));

        // Header present with a different value: skipped.
        let mut mismatched = PingoraRequestHeader::build("GET", b"/", None).unwrap();
        mismatched.insert_header("X-Shadow", "false").unwrap();
        assert!(!manager.should_shadow(&mismatched));
    }

    #[test]
    fn test_clone_body_for_shadow() {
        // A present body is copied byte-for-byte.
        let body = Some(Bytes::from("test body content"));
        assert_eq!(
            clone_body_for_shadow(&body),
            Some(b"test body content".to_vec())
        );

        // An absent body stays absent.
        let no_body: Option<Bytes> = None;
        assert_eq!(clone_body_for_shadow(&no_body), None);
    }
}