// sentinel_proxy/proxy/context.rs

//! Request context for the proxy request lifecycle.
//!
//! The `RequestContext` struct maintains state throughout a single request,
//! including timing, routing decisions, and metadata for logging.
5
6use std::sync::Arc;
7use std::time::Instant;
8
9use sentinel_config::{BodyStreamingMode, Config, RouteConfig, ServiceType};
10
11use crate::websocket::WebSocketHandler;
12
/// Rate limit values to be surfaced in response headers.
///
/// A plain value type: all fields are public and the struct can be cloned
/// and compared for equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitHeaderInfo {
    /// Maximum requests allowed per window
    pub limit: u32,
    /// Remaining requests in current window
    pub remaining: u32,
    /// Unix timestamp (seconds) when the window resets
    pub reset_at: u64,
}
23
24/// Request context maintained throughout the request lifecycle.
25///
26/// This struct uses a hybrid approach:
27/// - Immutable fields (start_time) are private with getters
28/// - Mutable fields are public(crate) for efficient access within the proxy module
29pub struct RequestContext {
30    /// Request start time (immutable after creation)
31    start_time: Instant,
32
33    // === Tracing ===
34    /// Unique trace ID for request tracing (also used as correlation_id)
35    pub(crate) trace_id: String,
36
37    // === Global config (cached once per request) ===
38    /// Cached global configuration snapshot for this request
39    pub(crate) config: Option<Arc<Config>>,
40
41    // === Routing ===
42    /// Selected route ID
43    pub(crate) route_id: Option<String>,
44    /// Cached route configuration (avoids duplicate route matching)
45    pub(crate) route_config: Option<Arc<RouteConfig>>,
46    /// Selected upstream pool ID
47    pub(crate) upstream: Option<String>,
48    /// Selected upstream peer address (IP:port) for feedback reporting
49    pub(crate) selected_upstream_address: Option<String>,
50    /// Number of upstream attempts
51    pub(crate) upstream_attempts: u32,
52
53    // === Scope (for namespaced configurations) ===
54    /// Namespace for this request (if routed to a namespace scope)
55    pub(crate) namespace: Option<String>,
56    /// Service for this request (if routed to a service scope)
57    pub(crate) service: Option<String>,
58
59    // === Request metadata (cached for logging) ===
60    /// HTTP method
61    pub(crate) method: String,
62    /// Request path
63    pub(crate) path: String,
64    /// Query string
65    pub(crate) query: Option<String>,
66
67    // === Client info ===
68    /// Client IP address
69    pub(crate) client_ip: String,
70    /// User-Agent header
71    pub(crate) user_agent: Option<String>,
72    /// Referer header
73    pub(crate) referer: Option<String>,
74    /// Host header
75    pub(crate) host: Option<String>,
76
77    // === Body tracking ===
78    /// Request body bytes received
79    pub(crate) request_body_bytes: u64,
80    /// Response body bytes (set during response)
81    pub(crate) response_bytes: u64,
82
83    // === Connection tracking ===
84    /// Whether the upstream connection was reused
85    pub(crate) connection_reused: bool,
86    /// Whether this request is a WebSocket upgrade
87    pub(crate) is_websocket_upgrade: bool,
88
89    // === WebSocket Inspection ===
90    /// Whether WebSocket frame inspection is enabled for this connection
91    pub(crate) websocket_inspection_enabled: bool,
92    /// Whether to skip inspection (e.g., due to compression negotiation)
93    pub(crate) websocket_skip_inspection: bool,
94    /// Agent IDs for WebSocket frame inspection
95    pub(crate) websocket_inspection_agents: Vec<String>,
96    /// WebSocket frame handler (created after 101 upgrade)
97    pub(crate) websocket_handler: Option<Arc<WebSocketHandler>>,
98
99    // === Caching ===
100    /// Whether this request is eligible for caching
101    pub(crate) cache_eligible: bool,
102
103    // === Body Inspection ===
104    /// Whether body inspection is enabled for this request
105    pub(crate) body_inspection_enabled: bool,
106    /// Bytes already sent to agent for inspection
107    pub(crate) body_bytes_inspected: u64,
108    /// Accumulated body buffer for agent inspection
109    pub(crate) body_buffer: Vec<u8>,
110    /// Agent IDs to use for body inspection
111    pub(crate) body_inspection_agents: Vec<String>,
112
113    // === Body Decompression ===
114    /// Whether decompression is enabled for body inspection
115    pub(crate) decompression_enabled: bool,
116    /// Content-Encoding of the request body (if compressed)
117    pub(crate) body_content_encoding: Option<String>,
118    /// Maximum decompression ratio allowed
119    pub(crate) max_decompression_ratio: f64,
120    /// Maximum decompressed size allowed
121    pub(crate) max_decompression_bytes: usize,
122    /// Whether decompression was performed
123    pub(crate) body_was_decompressed: bool,
124
125    // === Rate Limiting ===
126    /// Rate limit info for response headers (set during request_filter)
127    pub(crate) rate_limit_info: Option<RateLimitHeaderInfo>,
128
129    // === GeoIP Filtering ===
130    /// Country code from GeoIP lookup (ISO 3166-1 alpha-2)
131    pub(crate) geo_country_code: Option<String>,
132    /// Whether a geo lookup was performed for this request
133    pub(crate) geo_lookup_performed: bool,
134
135    // === Body Streaming ===
136    /// Body streaming mode for request body inspection
137    pub(crate) request_body_streaming_mode: BodyStreamingMode,
138    /// Current chunk index for request body streaming
139    pub(crate) request_body_chunk_index: u32,
140    /// Whether agent needs more data (streaming mode)
141    pub(crate) agent_needs_more: bool,
142    /// Body streaming mode for response body inspection
143    pub(crate) response_body_streaming_mode: BodyStreamingMode,
144    /// Current chunk index for response body streaming
145    pub(crate) response_body_chunk_index: u32,
146    /// Response body bytes inspected
147    pub(crate) response_body_bytes_inspected: u64,
148    /// Response body inspection enabled
149    pub(crate) response_body_inspection_enabled: bool,
150    /// Agent IDs for response body inspection
151    pub(crate) response_body_inspection_agents: Vec<String>,
152
153    // === OpenTelemetry Tracing ===
154    /// OpenTelemetry request span (if tracing enabled)
155    pub(crate) otel_span: Option<crate::otel::RequestSpan>,
156    /// W3C trace context parsed from incoming request
157    pub(crate) trace_context: Option<crate::otel::TraceContext>,
158}
159
160impl RequestContext {
161    /// Create a new empty request context with the current timestamp.
162    pub fn new() -> Self {
163        Self {
164            start_time: Instant::now(),
165            trace_id: String::new(),
166            config: None,
167            route_id: None,
168            route_config: None,
169            upstream: None,
170            selected_upstream_address: None,
171            upstream_attempts: 0,
172            namespace: None,
173            service: None,
174            method: String::new(),
175            path: String::new(),
176            query: None,
177            client_ip: String::new(),
178            user_agent: None,
179            referer: None,
180            host: None,
181            request_body_bytes: 0,
182            response_bytes: 0,
183            connection_reused: false,
184            is_websocket_upgrade: false,
185            websocket_inspection_enabled: false,
186            websocket_skip_inspection: false,
187            websocket_inspection_agents: Vec::new(),
188            websocket_handler: None,
189            cache_eligible: false,
190            body_inspection_enabled: false,
191            body_bytes_inspected: 0,
192            body_buffer: Vec::new(),
193            body_inspection_agents: Vec::new(),
194            decompression_enabled: false,
195            body_content_encoding: None,
196            max_decompression_ratio: 100.0,
197            max_decompression_bytes: 10 * 1024 * 1024, // 10MB
198            body_was_decompressed: false,
199            rate_limit_info: None,
200            geo_country_code: None,
201            geo_lookup_performed: false,
202            request_body_streaming_mode: BodyStreamingMode::Buffer,
203            request_body_chunk_index: 0,
204            agent_needs_more: false,
205            response_body_streaming_mode: BodyStreamingMode::Buffer,
206            response_body_chunk_index: 0,
207            response_body_bytes_inspected: 0,
208            response_body_inspection_enabled: false,
209            response_body_inspection_agents: Vec::new(),
210            otel_span: None,
211            trace_context: None,
212        }
213    }
214
215    // === Immutable field accessors ===
216
217    /// Get the request start time.
218    #[inline]
219    pub fn start_time(&self) -> Instant {
220        self.start_time
221    }
222
223    /// Get elapsed duration since request start.
224    #[inline]
225    pub fn elapsed(&self) -> std::time::Duration {
226        self.start_time.elapsed()
227    }
228
229    // === Read-only accessors ===
230
231    /// Get trace_id (alias for backwards compatibility with correlation_id usage).
232    #[inline]
233    pub fn correlation_id(&self) -> &str {
234        &self.trace_id
235    }
236
237    /// Get the trace ID.
238    #[inline]
239    pub fn trace_id(&self) -> &str {
240        &self.trace_id
241    }
242
243    /// Get the route ID, if set.
244    #[inline]
245    pub fn route_id(&self) -> Option<&str> {
246        self.route_id.as_deref()
247    }
248
249    /// Get the upstream ID, if set.
250    #[inline]
251    pub fn upstream(&self) -> Option<&str> {
252        self.upstream.as_deref()
253    }
254
255    /// Get the selected upstream peer address (IP:port), if set.
256    #[inline]
257    pub fn selected_upstream_address(&self) -> Option<&str> {
258        self.selected_upstream_address.as_deref()
259    }
260
261    /// Get the cached route configuration, if set.
262    #[inline]
263    pub fn route_config(&self) -> Option<&Arc<RouteConfig>> {
264        self.route_config.as_ref()
265    }
266
267    /// Get the cached global configuration, if set.
268    #[inline]
269    pub fn global_config(&self) -> Option<&Arc<Config>> {
270        self.config.as_ref()
271    }
272
273    /// Get the service type from cached route config.
274    #[inline]
275    pub fn service_type(&self) -> Option<ServiceType> {
276        self.route_config.as_ref().map(|c| c.service_type.clone())
277    }
278
279    /// Get the number of upstream attempts.
280    #[inline]
281    pub fn upstream_attempts(&self) -> u32 {
282        self.upstream_attempts
283    }
284
285    /// Get the HTTP method.
286    #[inline]
287    pub fn method(&self) -> &str {
288        &self.method
289    }
290
291    /// Get the request path.
292    #[inline]
293    pub fn path(&self) -> &str {
294        &self.path
295    }
296
297    /// Get the query string, if present.
298    #[inline]
299    pub fn query(&self) -> Option<&str> {
300        self.query.as_deref()
301    }
302
303    /// Get the client IP address.
304    #[inline]
305    pub fn client_ip(&self) -> &str {
306        &self.client_ip
307    }
308
309    /// Get the User-Agent header, if present.
310    #[inline]
311    pub fn user_agent(&self) -> Option<&str> {
312        self.user_agent.as_deref()
313    }
314
315    /// Get the Referer header, if present.
316    #[inline]
317    pub fn referer(&self) -> Option<&str> {
318        self.referer.as_deref()
319    }
320
321    /// Get the Host header, if present.
322    #[inline]
323    pub fn host(&self) -> Option<&str> {
324        self.host.as_deref()
325    }
326
327    /// Get the response body size in bytes.
328    #[inline]
329    pub fn response_bytes(&self) -> u64 {
330        self.response_bytes
331    }
332
333    /// Get the GeoIP country code, if determined.
334    #[inline]
335    pub fn geo_country_code(&self) -> Option<&str> {
336        self.geo_country_code.as_deref()
337    }
338
339    /// Check if a geo lookup was performed for this request.
340    #[inline]
341    pub fn geo_lookup_performed(&self) -> bool {
342        self.geo_lookup_performed
343    }
344
345    /// Get traceparent header value for distributed tracing.
346    ///
347    /// Returns the W3C Trace Context traceparent header value if tracing is enabled.
348    /// Format: `{version}-{trace-id}-{span-id}-{trace-flags}`
349    #[inline]
350    pub fn traceparent(&self) -> Option<String> {
351        self.otel_span.as_ref().map(|span| {
352            let sampled = self.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
353            crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled)
354        })
355    }
356
357    // === Mutation helpers ===
358
359    /// Set the trace ID.
360    #[inline]
361    pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
362        self.trace_id = trace_id.into();
363    }
364
365    /// Set the route ID.
366    #[inline]
367    pub fn set_route_id(&mut self, route_id: impl Into<String>) {
368        self.route_id = Some(route_id.into());
369    }
370
371    /// Set the upstream ID.
372    #[inline]
373    pub fn set_upstream(&mut self, upstream: impl Into<String>) {
374        self.upstream = Some(upstream.into());
375    }
376
377    /// Set the selected upstream peer address (IP:port).
378    #[inline]
379    pub fn set_selected_upstream_address(&mut self, address: impl Into<String>) {
380        self.selected_upstream_address = Some(address.into());
381    }
382
383    /// Increment upstream attempt counter.
384    #[inline]
385    pub fn inc_upstream_attempts(&mut self) {
386        self.upstream_attempts += 1;
387    }
388
389    /// Set response bytes.
390    #[inline]
391    pub fn set_response_bytes(&mut self, bytes: u64) {
392        self.response_bytes = bytes;
393    }
394}
395
396impl Default for RequestContext {
397    fn default() -> Self {
398        Self::new()
399    }
400}
401
// ============================================================================
// Tests
// ============================================================================
405
#[cfg(test)]
mod tests {
    use super::*;

    /// Field values written into `RateLimitHeaderInfo` are read back unchanged.
    #[test]
    fn test_rate_limit_header_info() {
        let info = RateLimitHeaderInfo {
            limit: 100,
            remaining: 42,
            reset_at: 1704067200,
        };

        assert_eq!(info.limit, 100);
        assert_eq!(info.remaining, 42);
        assert_eq!(info.reset_at, 1704067200);
    }

    /// A freshly created context has no identifiers, config, or counters set.
    #[test]
    fn test_request_context_default() {
        let ctx = RequestContext::new();

        assert!(ctx.trace_id.is_empty());
        assert!(ctx.rate_limit_info.is_none());
        assert!(ctx.route_id.is_none());
        assert!(ctx.config.is_none());
        assert_eq!(ctx.upstream_attempts(), 0);
        assert_eq!(ctx.response_bytes(), 0);
    }

    /// Rate limit info starts empty and can be attached to the context.
    #[test]
    fn test_request_context_rate_limit_info() {
        let mut ctx = RequestContext::new();

        // Initially no rate limit info
        assert!(ctx.rate_limit_info.is_none());

        // Set rate limit info
        ctx.rate_limit_info = Some(RateLimitHeaderInfo {
            limit: 50,
            remaining: 25,
            reset_at: 1704067300,
        });

        assert!(ctx.rate_limit_info.is_some());
        let info = ctx.rate_limit_info.as_ref().unwrap();
        assert_eq!(info.limit, 50);
        assert_eq!(info.remaining, 25);
        assert_eq!(info.reset_at, 1704067300);
    }

    /// `elapsed()` is measured from context creation.
    #[test]
    fn test_request_context_elapsed() {
        let ctx = RequestContext::new();

        // Elapsed time should be very small (less than 1 second)
        let elapsed = ctx.elapsed();
        assert!(elapsed.as_secs() < 1);
    }

    /// Each mutation helper is observable through its matching accessor.
    #[test]
    fn test_request_context_setters() {
        let mut ctx = RequestContext::new();

        ctx.set_trace_id("trace-123");
        assert_eq!(ctx.trace_id(), "trace-123");
        assert_eq!(ctx.correlation_id(), "trace-123");

        ctx.set_route_id("my-route");
        assert_eq!(ctx.route_id(), Some("my-route"));

        ctx.set_upstream("backend-pool");
        assert_eq!(ctx.upstream(), Some("backend-pool"));

        assert_eq!(ctx.selected_upstream_address(), None);
        ctx.set_selected_upstream_address("10.0.0.1:8080");
        assert_eq!(ctx.selected_upstream_address(), Some("10.0.0.1:8080"));

        ctx.inc_upstream_attempts();
        ctx.inc_upstream_attempts();
        assert_eq!(ctx.upstream_attempts(), 2);

        ctx.set_response_bytes(1024);
        assert_eq!(ctx.response_bytes(), 1024);
    }
}