sentinel_proxy/
shadow.rs

1//! Traffic mirroring / shadowing for safe canary testing
2//!
3//! This module implements fire-and-forget request duplication to shadow upstreams,
4//! enabling safe canary deployments and testing with production traffic.
5//!
6//! # Features
7//!
8//! - Sampling-based mirroring (percentage control)
9//! - Header-based sampling (selective mirroring)
//! - Optional request body buffering (currently a placeholder; see `buffer_request_body`)
//! - Fire-and-forget async execution (no blocking)
//! - Metrics for shadow request successes, failures, and timeouts
13
14use pingora::http::RequestHeader;
15use rand::Rng;
16use sentinel_common::errors::{SentinelError, SentinelResult};
17use sentinel_common::observability::RequestMetrics;
18use sentinel_config::routes::ShadowConfig;
19use std::collections::HashMap;
20use std::sync::Arc;
21use std::time::Instant;
22use tokio::time::Duration;
23use tracing::{debug, error, info, trace, warn};
24
25use crate::{RequestContext, UpstreamPool};
26
27/// Manager for traffic shadowing/mirroring
28///
29/// Handles request duplication to shadow upstreams with sampling control,
30/// optional body buffering, and fire-and-forget execution.
31#[derive(Clone)]
32pub struct ShadowManager {
33    /// Reference to upstream pools for shadow target selection
34    upstream_pools: Arc<HashMap<String, Arc<UpstreamPool>>>,
35
36    /// Shadow configuration
37    config: ShadowConfig,
38
39    /// Metrics collector for recording shadow metrics
40    metrics: Option<Arc<RequestMetrics>>,
41
42    /// Route ID for metrics labeling
43    route_id: String,
44}
45
46impl ShadowManager {
47    /// Create a new shadow manager
48    pub fn new(
49        upstream_pools: Arc<HashMap<String, Arc<UpstreamPool>>>,
50        config: ShadowConfig,
51        metrics: Option<Arc<RequestMetrics>>,
52        route_id: String,
53    ) -> Self {
54        Self {
55            upstream_pools,
56            config,
57            metrics,
58            route_id,
59        }
60    }
61
62    /// Decide whether to shadow this request based on sampling rules
63    pub fn should_shadow(&self, headers: &RequestHeader) -> bool {
64        // Check sample_header if configured
65        if let Some((header_name, header_value)) = &self.config.sample_header {
66            if let Some(actual_value) = headers.headers.get(header_name) {
67                if actual_value.to_str().ok() != Some(header_value.as_str()) {
68                    trace!("Shadow skipped: sample-header mismatch");
69                    return false;
70                }
71            } else {
72                trace!("Shadow skipped: sample-header not present");
73                return false;
74            }
75        }
76
77        // Sample based on percentage
78        if self.config.percentage < 100.0 {
79            let mut rng = rand::thread_rng();
80            let roll: f64 = rng.gen_range(0.0..100.0);
81            if roll > self.config.percentage {
82                trace!(
83                    roll = roll,
84                    threshold = self.config.percentage,
85                    "Shadow skipped: sampling"
86                );
87                return false;
88            }
89        }
90
91        true
92    }
93
94    /// Shadow a request asynchronously (fire-and-forget)
95    ///
96    /// This method spawns a tokio task to mirror the request to the shadow upstream.
97    /// The task does not block the main request path and failures do not affect
98    /// the primary response.
99    ///
100    /// # Arguments
101    ///
102    /// * `original_headers` - Original request headers to clone
103    /// * `body` - Optional buffered request body (if buffer_body=true)
104    /// * `ctx` - Request context for correlation
105    pub fn shadow_request(
106        &self,
107        original_headers: RequestHeader,
108        body: Option<Vec<u8>>,
109        ctx: RequestContext,
110    ) {
111        // Check if upstream exists
112        if !self.upstream_pools.contains_key(&self.config.upstream) {
113            warn!(
114                upstream = %self.config.upstream,
115                "Shadow upstream not found in pools"
116            );
117            // Record error metric
118            if let Some(ref metrics) = self.metrics {
119                metrics.record_shadow_error(&self.route_id, &self.config.upstream, "upstream_not_found");
120            }
121            return;
122        }
123
124        let config = self.config.clone();
125        let upstream_id = self.config.upstream.clone();
126        let upstream_pools = Arc::clone(&self.upstream_pools);
127        let metrics = self.metrics.clone();
128        let route_id = self.route_id.clone();
129
130        // Spawn fire-and-forget task
131        tokio::spawn(async move {
132            let start = Instant::now();
133
134            // Get upstream pool inside the async task
135            let upstream_pool = match upstream_pools.get(&upstream_id) {
136                Some(pool) => pool,
137                None => {
138                    // This shouldn't happen since we checked above, but handle gracefully
139                    warn!(upstream = %upstream_id, "Shadow upstream disappeared");
140                    return;
141                }
142            };
143
144            // Execute shadow request with timeout
145            let result = tokio::time::timeout(
146                Duration::from_millis(config.timeout_ms),
147                Self::execute_shadow_request(upstream_pool, original_headers, body, ctx.clone()),
148            )
149            .await;
150
151            let latency = start.elapsed();
152
153            match result {
154                Ok(Ok(())) => {
155                    debug!(
156                        upstream = %upstream_id,
157                        latency_ms = latency.as_millis(),
158                        path = %ctx.path,
159                        method = %ctx.method,
160                        "Shadow request completed successfully"
161                    );
162                    // Record success metrics
163                    if let Some(ref metrics) = metrics {
164                        metrics.record_shadow_success(&route_id, &upstream_id, latency);
165                    }
166                }
167                Ok(Err(e)) => {
168                    error!(
169                        upstream = %upstream_id,
170                        error = %e,
171                        latency_ms = latency.as_millis(),
172                        path = %ctx.path,
173                        method = %ctx.method,
174                        "Shadow request failed"
175                    );
176                    // Record error metrics
177                    if let Some(ref metrics) = metrics {
178                        metrics.record_shadow_error(&route_id, &upstream_id, "request_failed");
179                    }
180                }
181                Err(_) => {
182                    warn!(
183                        upstream = %upstream_id,
184                        timeout_ms = config.timeout_ms,
185                        path = %ctx.path,
186                        method = %ctx.method,
187                        "Shadow request timed out"
188                    );
189                    // Record timeout metrics
190                    if let Some(ref metrics) = metrics {
191                        metrics.record_shadow_timeout(&route_id, &upstream_id, latency);
192                    }
193                }
194            }
195        });
196    }
197
198    /// Execute the actual shadow request
199    ///
200    /// This is the internal implementation that sends the mirrored request
201    /// to the shadow upstream.
202    async fn execute_shadow_request(
203        _upstream_pool: &UpstreamPool,
204        _headers: RequestHeader,
205        _body: Option<Vec<u8>>,
206        _ctx: RequestContext,
207    ) -> SentinelResult<()> {
208        // TODO: Implement actual request execution
209        // This will require:
210        // 1. Select target from upstream_pool
211        // 2. Connect to shadow target
212        // 3. Send headers
213        // 4. Send body if present
214        // 5. Read response (and discard)
215        // 6. Close connection
216
217        // For now, placeholder implementation
218        trace!("Shadow request execution (placeholder)");
219        Ok(())
220    }
221}
222
/// Returns `true` for HTTP methods whose request body should be buffered for
/// shadowing: `POST`, `PUT`, and `PATCH`.
///
/// Matching is exact and case-sensitive (HTTP method tokens are case-sensitive),
/// so `"post"` returns `false`.
pub fn should_buffer_method(method: &str) -> bool {
    const BODY_METHODS: [&str; 3] = ["POST", "PUT", "PATCH"];
    BODY_METHODS.contains(&method)
}
227
228/// Buffer request body from session with size limits
229///
230/// Reads the request body from a Pingora session and buffers it up to
231/// the configured maximum size. Returns an error if the body exceeds
232/// the limit.
233///
234/// # Arguments
235///
236/// * `session` - Pingora session to read body from
237/// * `max_bytes` - Maximum bytes to buffer
238pub async fn buffer_request_body(
239    _session: &mut pingora::proxy::Session,
240    max_bytes: usize,
241) -> SentinelResult<Vec<u8>> {
242    // TODO: Implement body buffering
243    // This will require:
244    // 1. Read chunks from session.read_request_body()
245    // 2. Accumulate into buffer
246    // 3. Check total size against max_bytes
247    // 4. Return error if exceeded
248
249    // For now, placeholder
250    if max_bytes > 0 {
251        Ok(Vec::new())
252    } else {
253        Err(SentinelError::LimitExceeded {
254            limit_type: sentinel_common::errors::LimitType::BodySize,
255            message: "max_body_bytes must be > 0".to_string(),
256            current_value: 0,
257            limit: 0,
258        })
259    }
260}
261
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a config targeting the "shadow" upstream with the given sampling
    /// settings; all other fields use fixed test defaults.
    fn shadow_config(percentage: f64, sample_header: Option<(String, String)>) -> ShadowConfig {
        ShadowConfig {
            upstream: "shadow".to_string(),
            percentage,
            sample_header,
            timeout_ms: 5000,
            buffer_body: false,
            max_body_bytes: 1048576,
        }
    }

    /// Builds a manager with no upstream pools and no metrics sink.
    fn manager(config: ShadowConfig) -> ShadowManager {
        ShadowManager::new(Arc::new(HashMap::new()), config, None, "test-route".to_string())
    }

    /// Builds a `GET /` request, optionally carrying an `X-Shadow` header.
    fn get_request(x_shadow: Option<&'static str>) -> RequestHeader {
        let mut headers = RequestHeader::build("GET", b"/", None).unwrap();
        if let Some(value) = x_shadow {
            headers.insert_header("X-Shadow", value).unwrap();
        }
        headers
    }

    fn x_shadow_true() -> Option<(String, String)> {
        Some(("X-Shadow".to_string(), "true".to_string()))
    }

    #[test]
    fn test_should_buffer_method() {
        assert!(should_buffer_method("POST"));
        assert!(should_buffer_method("PUT"));
        assert!(should_buffer_method("PATCH"));
        assert!(!should_buffer_method("GET"));
        assert!(!should_buffer_method("HEAD"));
        assert!(!should_buffer_method("DELETE"));
        // Method tokens are matched case-sensitively.
        assert!(!should_buffer_method("post"));
    }

    #[test]
    fn test_shadow_sampling_percentage() {
        let manager = manager(shadow_config(0.0, None));
        let headers = get_request(None);

        // With 0% sampling, should never shadow
        for _ in 0..100 {
            assert!(!manager.should_shadow(&headers));
        }
    }

    #[test]
    fn test_shadow_sampling_always() {
        let manager = manager(shadow_config(100.0, None));
        let headers = get_request(None);

        // With 100% sampling, should always shadow
        for _ in 0..100 {
            assert!(manager.should_shadow(&headers));
        }
    }

    #[test]
    fn test_shadow_sampling_above_hundred_always() {
        let manager = manager(shadow_config(150.0, None));
        let headers = get_request(None);

        for _ in 0..100 {
            assert!(manager.should_shadow(&headers));
        }
    }

    #[test]
    fn test_shadow_sample_header_match() {
        let manager = manager(shadow_config(100.0, x_shadow_true()));

        // Request with matching header
        assert!(manager.should_shadow(&get_request(Some("true"))));

        // Request without header
        assert!(!manager.should_shadow(&get_request(None)));

        // Request with wrong header value
        assert!(!manager.should_shadow(&get_request(Some("false"))));
    }

    #[test]
    fn test_shadow_sample_header_still_subject_to_sampling() {
        // A matching header passes the header gate but 0% sampling still rejects.
        let manager = manager(shadow_config(0.0, x_shadow_true()));
        let headers = get_request(Some("true"));

        for _ in 0..100 {
            assert!(!manager.should_shadow(&headers));
        }
    }
}