1use std::sync::Arc;
7use std::time::Instant;
8
9use sentinel_config::{BodyStreamingMode, Config, RouteConfig, ServiceType};
10
11use crate::inference::StreamingTokenCounter;
12use crate::websocket::WebSocketHandler;
13
/// Reason a request was moved from its current upstream to a fallback upstream.
///
/// Stored on the request context by `RequestContext::record_fallback`. The type
/// derives `PartialEq`/`Eq` so reasons can be compared directly (for example with
/// `assert_eq!`) instead of only through pattern matching.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FallbackReason {
    /// The upstream failed a health check.
    HealthCheckFailed,
    /// A budget was exhausted (presumably the inference token budget; confirm
    /// against the code that constructs this variant).
    BudgetExhausted,
    /// Observed latency compared against a threshold, both in milliseconds.
    LatencyThreshold { observed_ms: u64, threshold_ms: u64 },
    /// A numeric error code (presumably an HTTP status code; confirm against callers).
    ErrorCode(u16),
    /// A connection-level failure, carrying a free-form message.
    ConnectionError(String),
}
28
29impl std::fmt::Display for FallbackReason {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 match self {
32 FallbackReason::HealthCheckFailed => write!(f, "health_check_failed"),
33 FallbackReason::BudgetExhausted => write!(f, "budget_exhausted"),
34 FallbackReason::LatencyThreshold {
35 observed_ms,
36 threshold_ms,
37 } => write!(f, "latency_threshold_{}ms_exceeded_{}ms", observed_ms, threshold_ms),
38 FallbackReason::ErrorCode(code) => write!(f, "error_code_{}", code),
39 FallbackReason::ConnectionError(msg) => write!(f, "connection_error_{}", msg),
40 }
41 }
42}
43
/// Rate-limit figures attached to a request context.
///
/// As the name suggests these are presumably used to populate rate-limit response
/// headers; confirm against the code that consumes `RequestContext::rate_limit_info`.
/// The type derives `PartialEq`/`Eq` so whole values can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RateLimitHeaderInfo {
    /// The configured limit.
    pub limit: u32,
    /// How much of the limit is still available.
    pub remaining: u32,
    /// When the limit resets (presumably a Unix timestamp in seconds; confirm).
    pub reset_at: u64,
}
54
55pub struct RequestContext {
61 start_time: Instant,
63
64 pub(crate) trace_id: String,
67
68 pub(crate) config: Option<Arc<Config>>,
71
72 pub(crate) route_id: Option<String>,
75 pub(crate) route_config: Option<Arc<RouteConfig>>,
77 pub(crate) upstream: Option<String>,
79 pub(crate) selected_upstream_address: Option<String>,
81 pub(crate) upstream_attempts: u32,
83
84 pub(crate) namespace: Option<String>,
87 pub(crate) service: Option<String>,
89
90 pub(crate) method: String,
93 pub(crate) path: String,
95 pub(crate) query: Option<String>,
97
98 pub(crate) client_ip: String,
101 pub(crate) user_agent: Option<String>,
103 pub(crate) referer: Option<String>,
105 pub(crate) host: Option<String>,
107
108 pub(crate) request_body_bytes: u64,
111 pub(crate) response_bytes: u64,
113
114 pub(crate) connection_reused: bool,
117 pub(crate) is_websocket_upgrade: bool,
119
120 pub(crate) websocket_inspection_enabled: bool,
123 pub(crate) websocket_skip_inspection: bool,
125 pub(crate) websocket_inspection_agents: Vec<String>,
127 pub(crate) websocket_handler: Option<Arc<WebSocketHandler>>,
129
130 pub(crate) cache_eligible: bool,
133
134 pub(crate) body_inspection_enabled: bool,
137 pub(crate) body_bytes_inspected: u64,
139 pub(crate) body_buffer: Vec<u8>,
141 pub(crate) body_inspection_agents: Vec<String>,
143
144 pub(crate) decompression_enabled: bool,
147 pub(crate) body_content_encoding: Option<String>,
149 pub(crate) max_decompression_ratio: f64,
151 pub(crate) max_decompression_bytes: usize,
153 pub(crate) body_was_decompressed: bool,
155
156 pub(crate) rate_limit_info: Option<RateLimitHeaderInfo>,
159
160 pub(crate) geo_country_code: Option<String>,
163 pub(crate) geo_lookup_performed: bool,
165
166 pub(crate) request_body_streaming_mode: BodyStreamingMode,
169 pub(crate) request_body_chunk_index: u32,
171 pub(crate) agent_needs_more: bool,
173 pub(crate) response_body_streaming_mode: BodyStreamingMode,
175 pub(crate) response_body_chunk_index: u32,
177 pub(crate) response_body_bytes_inspected: u64,
179 pub(crate) response_body_inspection_enabled: bool,
181 pub(crate) response_body_inspection_agents: Vec<String>,
183
184 pub(crate) otel_span: Option<crate::otel::RequestSpan>,
187 pub(crate) trace_context: Option<crate::otel::TraceContext>,
189
190 pub(crate) inference_rate_limit_enabled: bool,
193 pub(crate) inference_estimated_tokens: u64,
195 pub(crate) inference_rate_limit_key: Option<String>,
197 pub(crate) inference_model: Option<String>,
199 pub(crate) inference_provider_override: Option<sentinel_config::InferenceProvider>,
201 pub(crate) model_routing_used: bool,
203 pub(crate) inference_actual_tokens: Option<u64>,
205
206 pub(crate) inference_budget_enabled: bool,
209 pub(crate) inference_budget_remaining: Option<i64>,
211 pub(crate) inference_budget_period_reset: Option<u64>,
213 pub(crate) inference_budget_exhausted: bool,
215
216 pub(crate) inference_cost_enabled: bool,
219 pub(crate) inference_request_cost: Option<f64>,
221 pub(crate) inference_input_tokens: u64,
223 pub(crate) inference_output_tokens: u64,
225
226 pub(crate) inference_streaming_response: bool,
229 pub(crate) inference_streaming_counter: Option<StreamingTokenCounter>,
231
232 pub(crate) fallback_attempt: u32,
235 pub(crate) tried_upstreams: Vec<String>,
237 pub(crate) fallback_reason: Option<FallbackReason>,
239 pub(crate) original_upstream: Option<String>,
241 pub(crate) model_mapping_applied: Option<(String, String)>,
243 pub(crate) should_retry_with_fallback: bool,
245
246 pub(crate) guardrails_enabled: bool,
249 pub(crate) guardrail_warning: bool,
251 pub(crate) guardrail_detection_categories: Vec<String>,
253 pub(crate) pii_detection_categories: Vec<String>,
255
256 pub(crate) shadow_pending: Option<ShadowPendingRequest>,
259 pub(crate) shadow_sent: bool,
261
262 pub(crate) sticky_session_new_assignment: bool,
265 pub(crate) sticky_session_set_cookie: Option<String>,
267 pub(crate) sticky_target_index: Option<usize>,
269}
270
271#[derive(Clone)]
273pub struct ShadowPendingRequest {
274 pub headers: pingora::http::RequestHeader,
276 pub manager: std::sync::Arc<crate::shadow::ShadowManager>,
278 pub request_ctx: crate::upstream::RequestContext,
280 pub include_body: bool,
282}
283
284impl RequestContext {
285 pub fn new() -> Self {
287 Self {
288 start_time: Instant::now(),
289 trace_id: String::new(),
290 config: None,
291 route_id: None,
292 route_config: None,
293 upstream: None,
294 selected_upstream_address: None,
295 upstream_attempts: 0,
296 namespace: None,
297 service: None,
298 method: String::new(),
299 path: String::new(),
300 query: None,
301 client_ip: String::new(),
302 user_agent: None,
303 referer: None,
304 host: None,
305 request_body_bytes: 0,
306 response_bytes: 0,
307 connection_reused: false,
308 is_websocket_upgrade: false,
309 websocket_inspection_enabled: false,
310 websocket_skip_inspection: false,
311 websocket_inspection_agents: Vec::new(),
312 websocket_handler: None,
313 cache_eligible: false,
314 body_inspection_enabled: false,
315 body_bytes_inspected: 0,
316 body_buffer: Vec::new(),
317 body_inspection_agents: Vec::new(),
318 decompression_enabled: false,
319 body_content_encoding: None,
320 max_decompression_ratio: 100.0,
321 max_decompression_bytes: 10 * 1024 * 1024, body_was_decompressed: false,
323 rate_limit_info: None,
324 geo_country_code: None,
325 geo_lookup_performed: false,
326 request_body_streaming_mode: BodyStreamingMode::Buffer,
327 request_body_chunk_index: 0,
328 agent_needs_more: false,
329 response_body_streaming_mode: BodyStreamingMode::Buffer,
330 response_body_chunk_index: 0,
331 response_body_bytes_inspected: 0,
332 response_body_inspection_enabled: false,
333 response_body_inspection_agents: Vec::new(),
334 otel_span: None,
335 trace_context: None,
336 inference_rate_limit_enabled: false,
337 inference_estimated_tokens: 0,
338 inference_rate_limit_key: None,
339 inference_model: None,
340 inference_provider_override: None,
341 model_routing_used: false,
342 inference_actual_tokens: None,
343 inference_budget_enabled: false,
344 inference_budget_remaining: None,
345 inference_budget_period_reset: None,
346 inference_budget_exhausted: false,
347 inference_cost_enabled: false,
348 inference_request_cost: None,
349 inference_input_tokens: 0,
350 inference_output_tokens: 0,
351 inference_streaming_response: false,
352 inference_streaming_counter: None,
353 fallback_attempt: 0,
354 tried_upstreams: Vec::new(),
355 fallback_reason: None,
356 original_upstream: None,
357 model_mapping_applied: None,
358 should_retry_with_fallback: false,
359 guardrails_enabled: false,
360 guardrail_warning: false,
361 guardrail_detection_categories: Vec::new(),
362 pii_detection_categories: Vec::new(),
363 shadow_pending: None,
364 shadow_sent: false,
365 sticky_session_new_assignment: false,
366 sticky_session_set_cookie: None,
367 sticky_target_index: None,
368 }
369 }
370
371 #[inline]
375 pub fn start_time(&self) -> Instant {
376 self.start_time
377 }
378
379 #[inline]
381 pub fn elapsed(&self) -> std::time::Duration {
382 self.start_time.elapsed()
383 }
384
385 #[inline]
389 pub fn correlation_id(&self) -> &str {
390 &self.trace_id
391 }
392
393 #[inline]
395 pub fn trace_id(&self) -> &str {
396 &self.trace_id
397 }
398
399 #[inline]
401 pub fn route_id(&self) -> Option<&str> {
402 self.route_id.as_deref()
403 }
404
405 #[inline]
407 pub fn upstream(&self) -> Option<&str> {
408 self.upstream.as_deref()
409 }
410
411 #[inline]
413 pub fn selected_upstream_address(&self) -> Option<&str> {
414 self.selected_upstream_address.as_deref()
415 }
416
417 #[inline]
419 pub fn route_config(&self) -> Option<&Arc<RouteConfig>> {
420 self.route_config.as_ref()
421 }
422
423 #[inline]
425 pub fn global_config(&self) -> Option<&Arc<Config>> {
426 self.config.as_ref()
427 }
428
429 #[inline]
431 pub fn service_type(&self) -> Option<ServiceType> {
432 self.route_config.as_ref().map(|c| c.service_type.clone())
433 }
434
435 #[inline]
437 pub fn upstream_attempts(&self) -> u32 {
438 self.upstream_attempts
439 }
440
441 #[inline]
443 pub fn method(&self) -> &str {
444 &self.method
445 }
446
447 #[inline]
449 pub fn path(&self) -> &str {
450 &self.path
451 }
452
453 #[inline]
455 pub fn query(&self) -> Option<&str> {
456 self.query.as_deref()
457 }
458
459 #[inline]
461 pub fn client_ip(&self) -> &str {
462 &self.client_ip
463 }
464
465 #[inline]
467 pub fn user_agent(&self) -> Option<&str> {
468 self.user_agent.as_deref()
469 }
470
471 #[inline]
473 pub fn referer(&self) -> Option<&str> {
474 self.referer.as_deref()
475 }
476
477 #[inline]
479 pub fn host(&self) -> Option<&str> {
480 self.host.as_deref()
481 }
482
483 #[inline]
485 pub fn response_bytes(&self) -> u64 {
486 self.response_bytes
487 }
488
489 #[inline]
491 pub fn geo_country_code(&self) -> Option<&str> {
492 self.geo_country_code.as_deref()
493 }
494
495 #[inline]
497 pub fn geo_lookup_performed(&self) -> bool {
498 self.geo_lookup_performed
499 }
500
501 #[inline]
506 pub fn traceparent(&self) -> Option<String> {
507 self.otel_span.as_ref().map(|span| {
508 let sampled = self.trace_context.as_ref().map(|c| c.sampled).unwrap_or(true);
509 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled)
510 })
511 }
512
513 #[inline]
517 pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
518 self.trace_id = trace_id.into();
519 }
520
521 #[inline]
523 pub fn set_route_id(&mut self, route_id: impl Into<String>) {
524 self.route_id = Some(route_id.into());
525 }
526
527 #[inline]
529 pub fn set_upstream(&mut self, upstream: impl Into<String>) {
530 self.upstream = Some(upstream.into());
531 }
532
533 #[inline]
535 pub fn set_selected_upstream_address(&mut self, address: impl Into<String>) {
536 self.selected_upstream_address = Some(address.into());
537 }
538
539 #[inline]
541 pub fn inc_upstream_attempts(&mut self) {
542 self.upstream_attempts += 1;
543 }
544
545 #[inline]
547 pub fn set_response_bytes(&mut self, bytes: u64) {
548 self.response_bytes = bytes;
549 }
550
551 #[inline]
555 pub fn fallback_attempt(&self) -> u32 {
556 self.fallback_attempt
557 }
558
559 #[inline]
561 pub fn tried_upstreams(&self) -> &[String] {
562 &self.tried_upstreams
563 }
564
565 #[inline]
567 pub fn fallback_reason(&self) -> Option<&FallbackReason> {
568 self.fallback_reason.as_ref()
569 }
570
571 #[inline]
573 pub fn original_upstream(&self) -> Option<&str> {
574 self.original_upstream.as_deref()
575 }
576
577 #[inline]
579 pub fn model_mapping_applied(&self) -> Option<&(String, String)> {
580 self.model_mapping_applied.as_ref()
581 }
582
583 #[inline]
585 pub fn used_fallback(&self) -> bool {
586 self.fallback_attempt > 0
587 }
588
589 #[inline]
591 pub fn record_fallback(&mut self, reason: FallbackReason, new_upstream: &str) {
592 if self.fallback_attempt == 0 {
593 self.original_upstream = self.upstream.clone();
595 }
596 self.fallback_attempt += 1;
597 self.fallback_reason = Some(reason);
598 if let Some(current) = &self.upstream {
599 self.tried_upstreams.push(current.clone());
600 }
601 self.upstream = Some(new_upstream.to_string());
602 }
603
604 #[inline]
606 pub fn record_model_mapping(&mut self, original: String, mapped: String) {
607 self.model_mapping_applied = Some((original, mapped));
608 }
609
610 #[inline]
614 pub fn used_model_routing(&self) -> bool {
615 self.model_routing_used
616 }
617
618 #[inline]
620 pub fn inference_provider_override(&self) -> Option<sentinel_config::InferenceProvider> {
621 self.inference_provider_override
622 }
623
624 #[inline]
628 pub fn record_model_routing(
629 &mut self,
630 upstream: &str,
631 model: Option<String>,
632 provider_override: Option<sentinel_config::InferenceProvider>,
633 ) {
634 self.upstream = Some(upstream.to_string());
635 self.model_routing_used = true;
636 if model.is_some() {
637 self.inference_model = model;
638 }
639 self.inference_provider_override = provider_override;
640 }
641}
642
643impl Default for RequestContext {
644 fn default() -> Self {
645 Self::new()
646 }
647}
648
/// Unit tests for the request context and its helper types.
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rate_limit_header_info() {
        let info = RateLimitHeaderInfo {
            limit: 100,
            remaining: 42,
            reset_at: 1704067200,
        };

        assert_eq!(info.limit, 100);
        assert_eq!(info.remaining, 42);
        assert_eq!(info.reset_at, 1704067200);
    }

    #[test]
    fn test_request_context_default() {
        let ctx = RequestContext::new();

        assert!(ctx.trace_id.is_empty());
        assert!(ctx.rate_limit_info.is_none());
        assert!(ctx.route_id.is_none());
        assert!(ctx.config.is_none());

        // The `Default` impl must yield the same initial state as `new()`.
        let via_default = RequestContext::default();

        assert!(via_default.trace_id.is_empty());
        assert!(via_default.rate_limit_info.is_none());
        assert!(via_default.route_id.is_none());
        assert!(via_default.config.is_none());
        assert_eq!(via_default.upstream_attempts(), 0);
        assert_eq!(via_default.response_bytes(), 0);
        assert!(!via_default.used_fallback());
        assert!(!via_default.used_model_routing());
        assert!(via_default.traceparent().is_none());
        assert_eq!(via_default.max_decompression_bytes, 10 * 1024 * 1024);
    }

    #[test]
    fn test_request_context_rate_limit_info() {
        let mut ctx = RequestContext::new();

        // Starts out unset.
        assert!(ctx.rate_limit_info.is_none());

        ctx.rate_limit_info = Some(RateLimitHeaderInfo {
            limit: 50,
            remaining: 25,
            reset_at: 1704067300,
        });

        assert!(ctx.rate_limit_info.is_some());
        let info = ctx.rate_limit_info.as_ref().unwrap();
        assert_eq!(info.limit, 50);
        assert_eq!(info.remaining, 25);
        assert_eq!(info.reset_at, 1704067300);
    }

    #[test]
    fn test_request_context_elapsed() {
        let ctx = RequestContext::new();

        let elapsed = ctx.elapsed();
        // A freshly created context should report well under a second.
        assert!(elapsed.as_secs() < 1);
    }

    #[test]
    fn test_request_context_setters() {
        let mut ctx = RequestContext::new();

        ctx.set_trace_id("trace-123");
        assert_eq!(ctx.trace_id(), "trace-123");
        assert_eq!(ctx.correlation_id(), "trace-123");

        ctx.set_route_id("my-route");
        assert_eq!(ctx.route_id(), Some("my-route"));

        ctx.set_upstream("backend-pool");
        assert_eq!(ctx.upstream(), Some("backend-pool"));

        ctx.inc_upstream_attempts();
        ctx.inc_upstream_attempts();
        assert_eq!(ctx.upstream_attempts(), 2);

        ctx.set_response_bytes(1024);
        assert_eq!(ctx.response_bytes(), 1024);
    }

    #[test]
    fn test_fallback_tracking() {
        let mut ctx = RequestContext::new();

        // Initial state: no fallback recorded.
        assert_eq!(ctx.fallback_attempt(), 0);
        assert!(!ctx.used_fallback());
        assert!(ctx.tried_upstreams().is_empty());
        assert!(ctx.fallback_reason().is_none());
        assert!(ctx.original_upstream().is_none());

        ctx.set_upstream("openai-primary");

        // First fallback.
        ctx.record_fallback(FallbackReason::HealthCheckFailed, "anthropic-fallback");

        assert_eq!(ctx.fallback_attempt(), 1);
        assert!(ctx.used_fallback());
        assert_eq!(ctx.tried_upstreams(), &["openai-primary".to_string()]);
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::HealthCheckFailed)
        ));
        assert_eq!(ctx.original_upstream(), Some("openai-primary"));
        assert_eq!(ctx.upstream(), Some("anthropic-fallback"));

        // Second fallback.
        ctx.record_fallback(FallbackReason::ErrorCode(503), "local-gpu");

        assert_eq!(ctx.fallback_attempt(), 2);
        assert_eq!(
            ctx.tried_upstreams(),
            &["openai-primary".to_string(), "anthropic-fallback".to_string()]
        );
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::ErrorCode(503))
        ));
        // The original upstream is pinned by the first fallback only.
        assert_eq!(ctx.original_upstream(), Some("openai-primary"));
        assert_eq!(ctx.upstream(), Some("local-gpu"));
    }

    #[test]
    fn test_fallback_without_initial_upstream() {
        let mut ctx = RequestContext::new();

        // No upstream was ever set, so there is nothing to remember as tried
        // or original; the fallback target still becomes the current upstream.
        ctx.record_fallback(FallbackReason::BudgetExhausted, "backup-pool");

        assert_eq!(ctx.fallback_attempt(), 1);
        assert!(ctx.used_fallback());
        assert!(ctx.tried_upstreams().is_empty());
        assert!(ctx.original_upstream().is_none());
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::BudgetExhausted)
        ));
        assert_eq!(ctx.upstream(), Some("backup-pool"));
    }

    #[test]
    fn test_model_mapping_tracking() {
        let mut ctx = RequestContext::new();

        assert!(ctx.model_mapping_applied().is_none());

        ctx.record_model_mapping("gpt-4".to_string(), "claude-3-opus".to_string());

        let mapping = ctx.model_mapping_applied().unwrap();
        assert_eq!(mapping.0, "gpt-4");
        assert_eq!(mapping.1, "claude-3-opus");
    }

    #[test]
    fn test_model_routing_tracking() {
        let mut ctx = RequestContext::new();

        assert!(!ctx.used_model_routing());
        assert!(ctx.inference_provider_override().is_none());

        ctx.record_model_routing("gpu-pool", Some("gpt-4".to_string()), None);

        assert!(ctx.used_model_routing());
        assert_eq!(ctx.upstream(), Some("gpu-pool"));
        assert_eq!(ctx.inference_model.as_deref(), Some("gpt-4"));
        assert!(ctx.inference_provider_override().is_none());

        // Routing again without a model keeps the previously stored model.
        ctx.record_model_routing("cpu-pool", None, None);

        assert!(ctx.used_model_routing());
        assert_eq!(ctx.upstream(), Some("cpu-pool"));
        assert_eq!(ctx.inference_model.as_deref(), Some("gpt-4"));
    }

    #[test]
    fn test_fallback_reason_display() {
        assert_eq!(
            FallbackReason::HealthCheckFailed.to_string(),
            "health_check_failed"
        );
        assert_eq!(
            FallbackReason::BudgetExhausted.to_string(),
            "budget_exhausted"
        );
        assert_eq!(
            FallbackReason::LatencyThreshold {
                observed_ms: 5500,
                threshold_ms: 5000
            }
            .to_string(),
            "latency_threshold_5500ms_exceeded_5000ms"
        );
        assert_eq!(FallbackReason::ErrorCode(502).to_string(), "error_code_502");
        assert_eq!(
            FallbackReason::ConnectionError("timeout".to_string()).to_string(),
            "connection_error_timeout"
        );
    }
}