1use std::sync::Arc;
7use std::time::Instant;
8
9use sentinel_config::{BodyStreamingMode, Config, RouteConfig, ServiceType};
10
11use crate::inference::StreamingTokenCounter;
12use crate::websocket::WebSocketHandler;
13
/// Why a request was moved from its current upstream to a fallback upstream.
///
/// Its [`Display`](std::fmt::Display) form is a compact snake_case token, e.g.
/// `health_check_failed` or `error_code_503`.
///
/// `PartialEq`/`Eq` are derived so reasons can be compared directly instead of
/// only through `matches!`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FallbackReason {
    /// The upstream failed its health check.
    HealthCheckFailed,
    /// A budget was exhausted.
    BudgetExhausted,
    /// Observed latency exceeded the configured threshold (both in milliseconds).
    LatencyThreshold { observed_ms: u64, threshold_ms: u64 },
    /// The upstream answered with the given error status code (e.g. `503`).
    ErrorCode(u16),
    /// A connection-level failure, with a short description.
    ConnectionError(String),
}

impl std::fmt::Display for FallbackReason {
    /// Renders the reason as its snake_case token.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::HealthCheckFailed => f.write_str("health_check_failed"),
            Self::BudgetExhausted => f.write_str("budget_exhausted"),
            Self::LatencyThreshold {
                observed_ms,
                threshold_ms,
            } => write!(
                f,
                "latency_threshold_{}ms_exceeded_{}ms",
                observed_ms, threshold_ms
            ),
            Self::ErrorCode(code) => write!(f, "error_code_{}", code),
            Self::ConnectionError(msg) => write!(f, "connection_error_{}", msg),
        }
    }
}
47
/// Rate-limit values recorded for a request (see `RequestContext::rate_limit_info`).
///
/// `PartialEq`/`Eq` are derived so two snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitHeaderInfo {
    /// Maximum number of requests allowed by the limit.
    pub limit: u32,
    /// Number of requests still available under the limit.
    pub remaining: u32,
    /// When the limit resets. NOTE(review): presumably a Unix timestamp in
    /// seconds (tests use 1704067200) — confirm against the producer.
    pub reset_at: u64,
}
58
/// Mutable state tracked for a single proxied request.
///
/// Created by [`RequestContext::new`], which starts the clock and sets every
/// flag to `false`, every counter to `0`, every string and collection to empty
/// and every optional value to `None`. The exceptions are the decompression
/// limits (ratio `100.0`, 10 MiB) and both body streaming modes
/// (`BodyStreamingMode::Buffer`). Every field except `start_time` is
/// `pub(crate)`, so crate code can set it directly. Accessor and `record_*`
/// methods cover the commonly used ones.
pub struct RequestContext {
    /// Instant captured when the context was created; `elapsed()` measures from here.
    start_time: Instant,

    /// Trace identifier; `trace_id()` and `correlation_id()` both return it.
    pub(crate) trace_id: String,

    /// Global configuration, when attached (`global_config()`).
    pub(crate) config: Option<Arc<Config>>,

    // ---- Routing / upstream selection ----
    /// Matched route id (`route_id()` / `set_route_id()`).
    pub(crate) route_id: Option<String>,
    /// Matched route configuration; `service_type()` reads from it.
    pub(crate) route_config: Option<Arc<RouteConfig>>,
    /// Current upstream name. Replaced by `record_fallback()` and `record_model_routing()`.
    pub(crate) upstream: Option<String>,
    /// Concrete address chosen within the upstream (`selected_upstream_address()`).
    pub(crate) selected_upstream_address: Option<String>,
    /// Incremented by `inc_upstream_attempts()`.
    pub(crate) upstream_attempts: u32,

    // ---- Namespace / service scoping ----
    pub(crate) namespace: Option<String>,
    pub(crate) service: Option<String>,

    // ---- Request line ----
    pub(crate) method: String,
    pub(crate) path: String,
    pub(crate) query: Option<String>,

    // ---- Client / request headers (exposed through same-named accessors) ----
    pub(crate) client_ip: String,
    pub(crate) user_agent: Option<String>,
    pub(crate) referer: Option<String>,
    pub(crate) host: Option<String>,

    // ---- Byte counters ----
    pub(crate) request_body_bytes: u64,
    /// Set via `set_response_bytes()`.
    pub(crate) response_bytes: u64,

    // ---- Connection / upgrade ----
    pub(crate) connection_reused: bool,
    pub(crate) is_websocket_upgrade: bool,

    // ---- WebSocket inspection ----
    pub(crate) websocket_inspection_enabled: bool,
    pub(crate) websocket_skip_inspection: bool,
    pub(crate) websocket_inspection_agents: Vec<String>,
    pub(crate) websocket_handler: Option<Arc<WebSocketHandler>>,

    // ---- Caching ----
    pub(crate) cache_eligible: bool,

    // ---- Request body inspection ----
    pub(crate) body_inspection_enabled: bool,
    pub(crate) body_bytes_inspected: u64,
    pub(crate) body_buffer: Vec<u8>,
    pub(crate) body_inspection_agents: Vec<String>,

    // ---- Body decompression ----
    pub(crate) decompression_enabled: bool,
    pub(crate) body_content_encoding: Option<String>,
    /// Starts at `100.0` in `new()`.
    pub(crate) max_decompression_ratio: f64,
    /// Starts at 10 MiB (`10 * 1024 * 1024`) in `new()`.
    pub(crate) max_decompression_bytes: usize,
    pub(crate) body_was_decompressed: bool,

    // ---- Rate limiting ----
    /// Rate-limit values for this request, when computed.
    pub(crate) rate_limit_info: Option<RateLimitHeaderInfo>,

    // ---- GeoIP ----
    pub(crate) geo_country_code: Option<String>,
    pub(crate) geo_lookup_performed: bool,

    // ---- Body streaming (both modes start as `BodyStreamingMode::Buffer`) ----
    pub(crate) request_body_streaming_mode: BodyStreamingMode,
    pub(crate) request_body_chunk_index: u32,
    pub(crate) agent_needs_more: bool,
    pub(crate) response_body_streaming_mode: BodyStreamingMode,
    pub(crate) response_body_chunk_index: u32,
    pub(crate) response_body_bytes_inspected: u64,
    pub(crate) response_body_inspection_enabled: bool,
    pub(crate) response_body_inspection_agents: Vec<String>,

    // ---- OpenTelemetry ----
    /// Span used by `traceparent()`; without it `traceparent()` returns `None`.
    pub(crate) otel_span: Option<crate::otel::RequestSpan>,
    /// Supplies the `sampled` flag for `traceparent()` (treated as `true` when absent).
    pub(crate) trace_context: Option<crate::otel::TraceContext>,

    // ---- Inference rate limiting / model routing ----
    pub(crate) inference_rate_limit_enabled: bool,
    pub(crate) inference_estimated_tokens: u64,
    pub(crate) inference_rate_limit_key: Option<String>,
    /// Overwritten by `record_model_routing()` only when it is given a model.
    pub(crate) inference_model: Option<String>,
    /// Set (including to `None`) by `record_model_routing()`.
    pub(crate) inference_provider_override: Option<sentinel_config::InferenceProvider>,
    /// Set to `true` by `record_model_routing()`.
    pub(crate) model_routing_used: bool,
    pub(crate) inference_actual_tokens: Option<u64>,

    // ---- Inference budget ----
    pub(crate) inference_budget_enabled: bool,
    pub(crate) inference_budget_remaining: Option<i64>,
    pub(crate) inference_budget_period_reset: Option<u64>,
    pub(crate) inference_budget_exhausted: bool,

    // ---- Inference cost ----
    pub(crate) inference_cost_enabled: bool,
    pub(crate) inference_request_cost: Option<f64>,
    pub(crate) inference_input_tokens: u64,
    pub(crate) inference_output_tokens: u64,

    // ---- Streaming inference responses ----
    pub(crate) inference_streaming_response: bool,
    pub(crate) inference_streaming_counter: Option<StreamingTokenCounter>,

    // ---- Fallback tracking (maintained by `record_fallback()`) ----
    /// Number of fallbacks recorded; `used_fallback()` is `fallback_attempt > 0`.
    pub(crate) fallback_attempt: u32,
    /// Upstreams that were current when each fallback was recorded, in order.
    pub(crate) tried_upstreams: Vec<String>,
    /// Reason of the most recent fallback.
    pub(crate) fallback_reason: Option<FallbackReason>,
    /// Upstream that was current at the first fallback.
    pub(crate) original_upstream: Option<String>,
    /// `(original, mapped)` model names, set by `record_model_mapping()`.
    pub(crate) model_mapping_applied: Option<(String, String)>,
    pub(crate) should_retry_with_fallback: bool,

    // ---- Guardrails ----
    pub(crate) guardrails_enabled: bool,
    pub(crate) guardrail_warning: bool,
    pub(crate) guardrail_detection_categories: Vec<String>,
    pub(crate) pii_detection_categories: Vec<String>,

    // ---- Shadow traffic ----
    pub(crate) shadow_pending: Option<ShadowPendingRequest>,
    pub(crate) shadow_sent: bool,

    // ---- Sticky sessions ----
    pub(crate) sticky_session_new_assignment: bool,
    pub(crate) sticky_session_set_cookie: Option<String>,
    pub(crate) sticky_target_index: Option<usize>,
}
274
275#[derive(Clone)]
277pub struct ShadowPendingRequest {
278 pub headers: pingora::http::RequestHeader,
280 pub manager: std::sync::Arc<crate::shadow::ShadowManager>,
282 pub request_ctx: crate::upstream::RequestContext,
284 pub include_body: bool,
286}
287
288impl RequestContext {
289 pub fn new() -> Self {
291 Self {
292 start_time: Instant::now(),
293 trace_id: String::new(),
294 config: None,
295 route_id: None,
296 route_config: None,
297 upstream: None,
298 selected_upstream_address: None,
299 upstream_attempts: 0,
300 namespace: None,
301 service: None,
302 method: String::new(),
303 path: String::new(),
304 query: None,
305 client_ip: String::new(),
306 user_agent: None,
307 referer: None,
308 host: None,
309 request_body_bytes: 0,
310 response_bytes: 0,
311 connection_reused: false,
312 is_websocket_upgrade: false,
313 websocket_inspection_enabled: false,
314 websocket_skip_inspection: false,
315 websocket_inspection_agents: Vec::new(),
316 websocket_handler: None,
317 cache_eligible: false,
318 body_inspection_enabled: false,
319 body_bytes_inspected: 0,
320 body_buffer: Vec::new(),
321 body_inspection_agents: Vec::new(),
322 decompression_enabled: false,
323 body_content_encoding: None,
324 max_decompression_ratio: 100.0,
325 max_decompression_bytes: 10 * 1024 * 1024, body_was_decompressed: false,
327 rate_limit_info: None,
328 geo_country_code: None,
329 geo_lookup_performed: false,
330 request_body_streaming_mode: BodyStreamingMode::Buffer,
331 request_body_chunk_index: 0,
332 agent_needs_more: false,
333 response_body_streaming_mode: BodyStreamingMode::Buffer,
334 response_body_chunk_index: 0,
335 response_body_bytes_inspected: 0,
336 response_body_inspection_enabled: false,
337 response_body_inspection_agents: Vec::new(),
338 otel_span: None,
339 trace_context: None,
340 inference_rate_limit_enabled: false,
341 inference_estimated_tokens: 0,
342 inference_rate_limit_key: None,
343 inference_model: None,
344 inference_provider_override: None,
345 model_routing_used: false,
346 inference_actual_tokens: None,
347 inference_budget_enabled: false,
348 inference_budget_remaining: None,
349 inference_budget_period_reset: None,
350 inference_budget_exhausted: false,
351 inference_cost_enabled: false,
352 inference_request_cost: None,
353 inference_input_tokens: 0,
354 inference_output_tokens: 0,
355 inference_streaming_response: false,
356 inference_streaming_counter: None,
357 fallback_attempt: 0,
358 tried_upstreams: Vec::new(),
359 fallback_reason: None,
360 original_upstream: None,
361 model_mapping_applied: None,
362 should_retry_with_fallback: false,
363 guardrails_enabled: false,
364 guardrail_warning: false,
365 guardrail_detection_categories: Vec::new(),
366 pii_detection_categories: Vec::new(),
367 shadow_pending: None,
368 shadow_sent: false,
369 sticky_session_new_assignment: false,
370 sticky_session_set_cookie: None,
371 sticky_target_index: None,
372 }
373 }
374
375 #[inline]
379 pub fn start_time(&self) -> Instant {
380 self.start_time
381 }
382
383 #[inline]
385 pub fn elapsed(&self) -> std::time::Duration {
386 self.start_time.elapsed()
387 }
388
389 #[inline]
393 pub fn correlation_id(&self) -> &str {
394 &self.trace_id
395 }
396
397 #[inline]
399 pub fn trace_id(&self) -> &str {
400 &self.trace_id
401 }
402
403 #[inline]
405 pub fn route_id(&self) -> Option<&str> {
406 self.route_id.as_deref()
407 }
408
409 #[inline]
411 pub fn upstream(&self) -> Option<&str> {
412 self.upstream.as_deref()
413 }
414
415 #[inline]
417 pub fn selected_upstream_address(&self) -> Option<&str> {
418 self.selected_upstream_address.as_deref()
419 }
420
421 #[inline]
423 pub fn route_config(&self) -> Option<&Arc<RouteConfig>> {
424 self.route_config.as_ref()
425 }
426
427 #[inline]
429 pub fn global_config(&self) -> Option<&Arc<Config>> {
430 self.config.as_ref()
431 }
432
433 #[inline]
435 pub fn service_type(&self) -> Option<ServiceType> {
436 self.route_config.as_ref().map(|c| c.service_type.clone())
437 }
438
439 #[inline]
441 pub fn upstream_attempts(&self) -> u32 {
442 self.upstream_attempts
443 }
444
445 #[inline]
447 pub fn method(&self) -> &str {
448 &self.method
449 }
450
451 #[inline]
453 pub fn path(&self) -> &str {
454 &self.path
455 }
456
457 #[inline]
459 pub fn query(&self) -> Option<&str> {
460 self.query.as_deref()
461 }
462
463 #[inline]
465 pub fn client_ip(&self) -> &str {
466 &self.client_ip
467 }
468
469 #[inline]
471 pub fn user_agent(&self) -> Option<&str> {
472 self.user_agent.as_deref()
473 }
474
475 #[inline]
477 pub fn referer(&self) -> Option<&str> {
478 self.referer.as_deref()
479 }
480
481 #[inline]
483 pub fn host(&self) -> Option<&str> {
484 self.host.as_deref()
485 }
486
487 #[inline]
489 pub fn response_bytes(&self) -> u64 {
490 self.response_bytes
491 }
492
493 #[inline]
495 pub fn geo_country_code(&self) -> Option<&str> {
496 self.geo_country_code.as_deref()
497 }
498
499 #[inline]
501 pub fn geo_lookup_performed(&self) -> bool {
502 self.geo_lookup_performed
503 }
504
505 #[inline]
510 pub fn traceparent(&self) -> Option<String> {
511 self.otel_span.as_ref().map(|span| {
512 let sampled = self
513 .trace_context
514 .as_ref()
515 .map(|c| c.sampled)
516 .unwrap_or(true);
517 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled)
518 })
519 }
520
521 #[inline]
525 pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
526 self.trace_id = trace_id.into();
527 }
528
529 #[inline]
531 pub fn set_route_id(&mut self, route_id: impl Into<String>) {
532 self.route_id = Some(route_id.into());
533 }
534
535 #[inline]
537 pub fn set_upstream(&mut self, upstream: impl Into<String>) {
538 self.upstream = Some(upstream.into());
539 }
540
541 #[inline]
543 pub fn set_selected_upstream_address(&mut self, address: impl Into<String>) {
544 self.selected_upstream_address = Some(address.into());
545 }
546
547 #[inline]
549 pub fn inc_upstream_attempts(&mut self) {
550 self.upstream_attempts += 1;
551 }
552
553 #[inline]
555 pub fn set_response_bytes(&mut self, bytes: u64) {
556 self.response_bytes = bytes;
557 }
558
559 #[inline]
563 pub fn fallback_attempt(&self) -> u32 {
564 self.fallback_attempt
565 }
566
567 #[inline]
569 pub fn tried_upstreams(&self) -> &[String] {
570 &self.tried_upstreams
571 }
572
573 #[inline]
575 pub fn fallback_reason(&self) -> Option<&FallbackReason> {
576 self.fallback_reason.as_ref()
577 }
578
579 #[inline]
581 pub fn original_upstream(&self) -> Option<&str> {
582 self.original_upstream.as_deref()
583 }
584
585 #[inline]
587 pub fn model_mapping_applied(&self) -> Option<&(String, String)> {
588 self.model_mapping_applied.as_ref()
589 }
590
591 #[inline]
593 pub fn used_fallback(&self) -> bool {
594 self.fallback_attempt > 0
595 }
596
597 #[inline]
599 pub fn record_fallback(&mut self, reason: FallbackReason, new_upstream: &str) {
600 if self.fallback_attempt == 0 {
601 self.original_upstream = self.upstream.clone();
603 }
604 self.fallback_attempt += 1;
605 self.fallback_reason = Some(reason);
606 if let Some(current) = &self.upstream {
607 self.tried_upstreams.push(current.clone());
608 }
609 self.upstream = Some(new_upstream.to_string());
610 }
611
612 #[inline]
614 pub fn record_model_mapping(&mut self, original: String, mapped: String) {
615 self.model_mapping_applied = Some((original, mapped));
616 }
617
618 #[inline]
622 pub fn used_model_routing(&self) -> bool {
623 self.model_routing_used
624 }
625
626 #[inline]
628 pub fn inference_provider_override(&self) -> Option<sentinel_config::InferenceProvider> {
629 self.inference_provider_override
630 }
631
632 #[inline]
636 pub fn record_model_routing(
637 &mut self,
638 upstream: &str,
639 model: Option<String>,
640 provider_override: Option<sentinel_config::InferenceProvider>,
641 ) {
642 self.upstream = Some(upstream.to_string());
643 self.model_routing_used = true;
644 if model.is_some() {
645 self.inference_model = model;
646 }
647 self.inference_provider_override = provider_override;
648 }
649}
650
651impl Default for RequestContext {
652 fn default() -> Self {
653 Self::new()
654 }
655}
656
#[cfg(test)]
mod tests {
    //! Unit tests for `RequestContext`, `RateLimitHeaderInfo` and `FallbackReason`.

    use super::*;

    #[test]
    fn test_rate_limit_header_info() {
        let info = RateLimitHeaderInfo {
            limit: 100,
            remaining: 42,
            reset_at: 1704067200,
        };

        assert_eq!(info.limit, 100);
        assert_eq!(info.remaining, 42);
        assert_eq!(info.reset_at, 1704067200);
    }

    #[test]
    fn test_request_context_default() {
        let ctx = RequestContext::new();

        assert!(ctx.trace_id.is_empty());
        assert!(ctx.rate_limit_info.is_none());
        assert!(ctx.route_id.is_none());
        assert!(ctx.config.is_none());
    }

    #[test]
    fn test_request_context_new_defaults() {
        let ctx = RequestContext::new();

        assert_eq!(ctx.upstream_attempts(), 0);
        assert_eq!(ctx.response_bytes(), 0);
        assert!(ctx.upstream().is_none());
        assert!(ctx.selected_upstream_address().is_none());
        assert!(ctx.method().is_empty());
        assert!(ctx.path().is_empty());
        assert!(ctx.client_ip().is_empty());
        assert!(!ctx.geo_lookup_performed());
        assert!(!ctx.used_model_routing());
        // No span attached, so no traceparent can be produced.
        assert!(ctx.traceparent().is_none());
        // Decompression limits start at 100x ratio and 10 MiB.
        assert_eq!(ctx.max_decompression_bytes, 10 * 1024 * 1024);
        assert!((ctx.max_decompression_ratio - 100.0).abs() < f64::EPSILON);
    }

    #[test]
    fn test_request_context_rate_limit_info() {
        let mut ctx = RequestContext::new();

        assert!(ctx.rate_limit_info.is_none());

        ctx.rate_limit_info = Some(RateLimitHeaderInfo {
            limit: 50,
            remaining: 25,
            reset_at: 1704067300,
        });

        assert!(ctx.rate_limit_info.is_some());
        let info = ctx.rate_limit_info.as_ref().unwrap();
        assert_eq!(info.limit, 50);
        assert_eq!(info.remaining, 25);
        assert_eq!(info.reset_at, 1704067300);
    }

    #[test]
    fn test_request_context_elapsed() {
        let ctx = RequestContext::new();

        let elapsed = ctx.elapsed();
        assert!(elapsed.as_secs() < 1);
    }

    #[test]
    fn test_request_context_setters() {
        let mut ctx = RequestContext::new();

        ctx.set_trace_id("trace-123");
        assert_eq!(ctx.trace_id(), "trace-123");
        assert_eq!(ctx.correlation_id(), "trace-123");

        ctx.set_route_id("my-route");
        assert_eq!(ctx.route_id(), Some("my-route"));

        ctx.set_upstream("backend-pool");
        assert_eq!(ctx.upstream(), Some("backend-pool"));

        ctx.inc_upstream_attempts();
        ctx.inc_upstream_attempts();
        assert_eq!(ctx.upstream_attempts(), 2);

        ctx.set_response_bytes(1024);
        assert_eq!(ctx.response_bytes(), 1024);
    }

    #[test]
    fn test_fallback_tracking() {
        let mut ctx = RequestContext::new();

        assert_eq!(ctx.fallback_attempt(), 0);
        assert!(!ctx.used_fallback());
        assert!(ctx.tried_upstreams().is_empty());
        assert!(ctx.fallback_reason().is_none());
        assert!(ctx.original_upstream().is_none());

        ctx.set_upstream("openai-primary");

        ctx.record_fallback(FallbackReason::HealthCheckFailed, "anthropic-fallback");

        assert_eq!(ctx.fallback_attempt(), 1);
        assert!(ctx.used_fallback());
        assert_eq!(ctx.tried_upstreams(), &["openai-primary".to_string()]);
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::HealthCheckFailed)
        ));
        assert_eq!(ctx.original_upstream(), Some("openai-primary"));
        assert_eq!(ctx.upstream(), Some("anthropic-fallback"));

        ctx.record_fallback(FallbackReason::ErrorCode(503), "local-gpu");

        assert_eq!(ctx.fallback_attempt(), 2);
        assert_eq!(
            ctx.tried_upstreams(),
            &[
                "openai-primary".to_string(),
                "anthropic-fallback".to_string()
            ]
        );
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::ErrorCode(503))
        ));
        // The original upstream is captured only on the first fallback.
        assert_eq!(ctx.original_upstream(), Some("openai-primary"));
        assert_eq!(ctx.upstream(), Some("local-gpu"));
    }

    #[test]
    fn test_fallback_without_initial_upstream() {
        let mut ctx = RequestContext::new();

        ctx.record_fallback(FallbackReason::BudgetExhausted, "backup");

        assert_eq!(ctx.fallback_attempt(), 1);
        assert!(ctx.used_fallback());
        // Nothing was current, so nothing is recorded as tried or original.
        assert!(ctx.tried_upstreams().is_empty());
        assert!(ctx.original_upstream().is_none());
        assert_eq!(ctx.upstream(), Some("backup"));
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::BudgetExhausted)
        ));
    }

    #[test]
    fn test_model_mapping_tracking() {
        let mut ctx = RequestContext::new();

        assert!(ctx.model_mapping_applied().is_none());

        ctx.record_model_mapping("gpt-4".to_string(), "claude-3-opus".to_string());

        let mapping = ctx.model_mapping_applied().unwrap();
        assert_eq!(mapping.0, "gpt-4");
        assert_eq!(mapping.1, "claude-3-opus");
    }

    #[test]
    fn test_model_routing_tracking() {
        let mut ctx = RequestContext::new();
        assert!(!ctx.used_model_routing());

        ctx.record_model_routing("claude-pool", Some("claude-3-opus".to_string()), None);

        assert!(ctx.used_model_routing());
        assert_eq!(ctx.upstream(), Some("claude-pool"));
        assert_eq!(ctx.inference_model.as_deref(), Some("claude-3-opus"));
        assert!(ctx.inference_provider_override().is_none());

        // A later decision without a model keeps the previously chosen model.
        ctx.record_model_routing("claude-backup", None, None);
        assert_eq!(ctx.upstream(), Some("claude-backup"));
        assert_eq!(ctx.inference_model.as_deref(), Some("claude-3-opus"));
    }

    #[test]
    fn test_fallback_reason_display() {
        assert_eq!(
            FallbackReason::HealthCheckFailed.to_string(),
            "health_check_failed"
        );
        assert_eq!(
            FallbackReason::BudgetExhausted.to_string(),
            "budget_exhausted"
        );
        assert_eq!(
            FallbackReason::LatencyThreshold {
                observed_ms: 5500,
                threshold_ms: 5000
            }
            .to_string(),
            "latency_threshold_5500ms_exceeded_5000ms"
        );
        assert_eq!(FallbackReason::ErrorCode(502).to_string(), "error_code_502");
        assert_eq!(
            FallbackReason::ConnectionError("timeout".to_string()).to_string(),
            "connection_error_timeout"
        );
    }
}