1use std::sync::Arc;
7use std::time::Instant;
8
9use sentinel_config::{BodyStreamingMode, Config, RouteConfig, ServiceType};
10
11use crate::websocket::WebSocketHandler;
12
/// Rate-limit values recorded for a request.
///
/// Plain data made of integers only; it can be cloned and compared for
/// equality.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitHeaderInfo {
    /// Request limit value.
    pub limit: u32,
    /// Remaining-requests value.
    pub remaining: u32,
    /// Reset time.
    // NOTE(review): the values used in this file's tests look like Unix-epoch
    // seconds; confirm the unit against the code that fills this in.
    pub reset_at: u64,
}
23
24pub struct RequestContext {
30 start_time: Instant,
32
33 pub(crate) trace_id: String,
36
37 pub(crate) config: Option<Arc<Config>>,
40
41 pub(crate) route_id: Option<String>,
44 pub(crate) route_config: Option<Arc<RouteConfig>>,
46 pub(crate) upstream: Option<String>,
48 pub(crate) selected_upstream_address: Option<String>,
50 pub(crate) upstream_attempts: u32,
52
53 pub(crate) namespace: Option<String>,
56 pub(crate) service: Option<String>,
58
59 pub(crate) method: String,
62 pub(crate) path: String,
64 pub(crate) query: Option<String>,
66
67 pub(crate) client_ip: String,
70 pub(crate) user_agent: Option<String>,
72 pub(crate) referer: Option<String>,
74 pub(crate) host: Option<String>,
76
77 pub(crate) request_body_bytes: u64,
80 pub(crate) response_bytes: u64,
82
83 pub(crate) connection_reused: bool,
86 pub(crate) is_websocket_upgrade: bool,
88
89 pub(crate) websocket_inspection_enabled: bool,
92 pub(crate) websocket_skip_inspection: bool,
94 pub(crate) websocket_inspection_agents: Vec<String>,
96 pub(crate) websocket_handler: Option<Arc<WebSocketHandler>>,
98
99 pub(crate) cache_eligible: bool,
102
103 pub(crate) body_inspection_enabled: bool,
106 pub(crate) body_bytes_inspected: u64,
108 pub(crate) body_buffer: Vec<u8>,
110 pub(crate) body_inspection_agents: Vec<String>,
112
113 pub(crate) decompression_enabled: bool,
116 pub(crate) body_content_encoding: Option<String>,
118 pub(crate) max_decompression_ratio: f64,
120 pub(crate) max_decompression_bytes: usize,
122 pub(crate) body_was_decompressed: bool,
124
125 pub(crate) rate_limit_info: Option<RateLimitHeaderInfo>,
128
129 pub(crate) geo_country_code: Option<String>,
132 pub(crate) geo_lookup_performed: bool,
134
135 pub(crate) request_body_streaming_mode: BodyStreamingMode,
138 pub(crate) request_body_chunk_index: u32,
140 pub(crate) agent_needs_more: bool,
142 pub(crate) response_body_streaming_mode: BodyStreamingMode,
144 pub(crate) response_body_chunk_index: u32,
146 pub(crate) response_body_bytes_inspected: u64,
148 pub(crate) response_body_inspection_enabled: bool,
150 pub(crate) response_body_inspection_agents: Vec<String>,
152
153 pub(crate) otel_span: Option<crate::otel::RequestSpan>,
156 pub(crate) trace_context: Option<crate::otel::TraceContext>,
158}
159
160impl RequestContext {
161 pub fn new() -> Self {
163 Self {
164 start_time: Instant::now(),
165 trace_id: String::new(),
166 config: None,
167 route_id: None,
168 route_config: None,
169 upstream: None,
170 selected_upstream_address: None,
171 upstream_attempts: 0,
172 namespace: None,
173 service: None,
174 method: String::new(),
175 path: String::new(),
176 query: None,
177 client_ip: String::new(),
178 user_agent: None,
179 referer: None,
180 host: None,
181 request_body_bytes: 0,
182 response_bytes: 0,
183 connection_reused: false,
184 is_websocket_upgrade: false,
185 websocket_inspection_enabled: false,
186 websocket_skip_inspection: false,
187 websocket_inspection_agents: Vec::new(),
188 websocket_handler: None,
189 cache_eligible: false,
190 body_inspection_enabled: false,
191 body_bytes_inspected: 0,
192 body_buffer: Vec::new(),
193 body_inspection_agents: Vec::new(),
194 decompression_enabled: false,
195 body_content_encoding: None,
196 max_decompression_ratio: 100.0,
197 max_decompression_bytes: 10 * 1024 * 1024, body_was_decompressed: false,
199 rate_limit_info: None,
200 geo_country_code: None,
201 geo_lookup_performed: false,
202 request_body_streaming_mode: BodyStreamingMode::Buffer,
203 request_body_chunk_index: 0,
204 agent_needs_more: false,
205 response_body_streaming_mode: BodyStreamingMode::Buffer,
206 response_body_chunk_index: 0,
207 response_body_bytes_inspected: 0,
208 response_body_inspection_enabled: false,
209 response_body_inspection_agents: Vec::new(),
210 otel_span: None,
211 trace_context: None,
212 }
213 }
214
215 #[inline]
219 pub fn start_time(&self) -> Instant {
220 self.start_time
221 }
222
223 #[inline]
225 pub fn elapsed(&self) -> std::time::Duration {
226 self.start_time.elapsed()
227 }
228
229 #[inline]
233 pub fn correlation_id(&self) -> &str {
234 &self.trace_id
235 }
236
237 #[inline]
239 pub fn trace_id(&self) -> &str {
240 &self.trace_id
241 }
242
243 #[inline]
245 pub fn route_id(&self) -> Option<&str> {
246 self.route_id.as_deref()
247 }
248
249 #[inline]
251 pub fn upstream(&self) -> Option<&str> {
252 self.upstream.as_deref()
253 }
254
255 #[inline]
257 pub fn selected_upstream_address(&self) -> Option<&str> {
258 self.selected_upstream_address.as_deref()
259 }
260
261 #[inline]
263 pub fn route_config(&self) -> Option<&Arc<RouteConfig>> {
264 self.route_config.as_ref()
265 }
266
267 #[inline]
269 pub fn global_config(&self) -> Option<&Arc<Config>> {
270 self.config.as_ref()
271 }
272
273 #[inline]
275 pub fn service_type(&self) -> Option<ServiceType> {
276 self.route_config.as_ref().map(|c| c.service_type.clone())
277 }
278
279 #[inline]
281 pub fn upstream_attempts(&self) -> u32 {
282 self.upstream_attempts
283 }
284
285 #[inline]
287 pub fn method(&self) -> &str {
288 &self.method
289 }
290
291 #[inline]
293 pub fn path(&self) -> &str {
294 &self.path
295 }
296
297 #[inline]
299 pub fn query(&self) -> Option<&str> {
300 self.query.as_deref()
301 }
302
303 #[inline]
305 pub fn client_ip(&self) -> &str {
306 &self.client_ip
307 }
308
309 #[inline]
311 pub fn user_agent(&self) -> Option<&str> {
312 self.user_agent.as_deref()
313 }
314
315 #[inline]
317 pub fn referer(&self) -> Option<&str> {
318 self.referer.as_deref()
319 }
320
321 #[inline]
323 pub fn host(&self) -> Option<&str> {
324 self.host.as_deref()
325 }
326
327 #[inline]
329 pub fn response_bytes(&self) -> u64 {
330 self.response_bytes
331 }
332
333 #[inline]
335 pub fn geo_country_code(&self) -> Option<&str> {
336 self.geo_country_code.as_deref()
337 }
338
339 #[inline]
341 pub fn geo_lookup_performed(&self) -> bool {
342 self.geo_lookup_performed
343 }
344
345 #[inline]
350 pub fn traceparent(&self) -> Option<String> {
351 self.otel_span.as_ref().map(|span| {
352 let sampled = self.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
353 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled)
354 })
355 }
356
357 #[inline]
361 pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
362 self.trace_id = trace_id.into();
363 }
364
365 #[inline]
367 pub fn set_route_id(&mut self, route_id: impl Into<String>) {
368 self.route_id = Some(route_id.into());
369 }
370
371 #[inline]
373 pub fn set_upstream(&mut self, upstream: impl Into<String>) {
374 self.upstream = Some(upstream.into());
375 }
376
377 #[inline]
379 pub fn set_selected_upstream_address(&mut self, address: impl Into<String>) {
380 self.selected_upstream_address = Some(address.into());
381 }
382
383 #[inline]
385 pub fn inc_upstream_attempts(&mut self) {
386 self.upstream_attempts += 1;
387 }
388
389 #[inline]
391 pub fn set_response_bytes(&mut self, bytes: u64) {
392 self.response_bytes = bytes;
393 }
394}
395
396impl Default for RequestContext {
397 fn default() -> Self {
398 Self::new()
399 }
400}
401
/// Unit tests for `RateLimitHeaderInfo` and `RequestContext`.
#[cfg(test)]
mod tests {
    use super::*;

    /// A literal keeps the values it was built with.
    #[test]
    fn test_rate_limit_header_info() {
        let RateLimitHeaderInfo {
            limit,
            remaining,
            reset_at,
        } = RateLimitHeaderInfo {
            limit: 100,
            remaining: 42,
            reset_at: 1704067200,
        };

        assert_eq!(limit, 100);
        assert_eq!(remaining, 42);
        assert_eq!(reset_at, 1704067200);
    }

    /// A new context starts with an empty trace ID and no optional data.
    #[test]
    fn test_request_context_default() {
        let fresh = RequestContext::new();

        assert!(fresh.trace_id.is_empty());
        assert!(fresh.rate_limit_info.is_none());
        assert!(fresh.route_id.is_none());
        assert!(fresh.config.is_none());
    }

    /// Rate-limit info starts absent and can be stored and read back.
    #[test]
    fn test_request_context_rate_limit_info() {
        let mut request = RequestContext::new();

        assert!(request.rate_limit_info.is_none());

        let recorded = RateLimitHeaderInfo {
            limit: 50,
            remaining: 25,
            reset_at: 1704067300,
        };
        request.rate_limit_info = Some(recorded);

        assert!(request.rate_limit_info.is_some());
        let stored = request.rate_limit_info.as_ref().unwrap();
        assert_eq!(
            (stored.limit, stored.remaining, stored.reset_at),
            (50, 25, 1704067300)
        );
    }

    /// Less than one second passes between construction and the check.
    #[test]
    fn test_request_context_elapsed() {
        let request = RequestContext::new();

        assert!(request.elapsed() < std::time::Duration::from_secs(1));
    }

    /// Each setter is visible through its matching getter.
    #[test]
    fn test_request_context_setters() {
        let mut request = RequestContext::new();

        request.set_trace_id("trace-123");
        assert_eq!(request.trace_id(), "trace-123");
        assert_eq!(request.correlation_id(), "trace-123");

        request.set_route_id("my-route");
        assert_eq!(request.route_id(), Some("my-route"));

        request.set_upstream("backend-pool");
        assert_eq!(request.upstream(), Some("backend-pool"));

        for _ in 0..2 {
            request.inc_upstream_attempts();
        }
        assert_eq!(request.upstream_attempts(), 2);

        request.set_response_bytes(1024);
        assert_eq!(request.response_bytes(), 1024);
    }
}