1use std::sync::Arc;
7use std::time::Instant;
8
9use grapsus_config::{BodyStreamingMode, Config, RouteConfig, ServiceType};
10
11use crate::inference::StreamingTokenCounter;
12use crate::websocket::WebSocketHandler;
13
/// Why a request was moved from its current upstream to a fallback upstream.
///
/// Recorded via `RequestContext::record_fallback`. The `Display` form is a
/// snake_case label such as `health_check_failed` or `error_code_503`.
#[derive(Debug, Clone)]
pub enum FallbackReason {
    /// The upstream failed its health check.
    HealthCheckFailed,

    /// The inference budget was exhausted.
    BudgetExhausted,

    /// Observed latency exceeded the threshold; both values are in milliseconds.
    LatencyThreshold {
        observed_ms: u64,
        threshold_ms: u64,
    },

    /// The upstream answered with this status code (presumably HTTP).
    ErrorCode(u16),

    /// Reaching the upstream failed; the string describes the error.
    ConnectionError(String),
}
28
29impl std::fmt::Display for FallbackReason {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 match self {
32 FallbackReason::HealthCheckFailed => write!(f, "health_check_failed"),
33 FallbackReason::BudgetExhausted => write!(f, "budget_exhausted"),
34 FallbackReason::LatencyThreshold {
35 observed_ms,
36 threshold_ms,
37 } => write!(
38 f,
39 "latency_threshold_{}ms_exceeded_{}ms",
40 observed_ms, threshold_ms
41 ),
42 FallbackReason::ErrorCode(code) => write!(f, "error_code_{}", code),
43 FallbackReason::ConnectionError(msg) => write!(f, "connection_error_{}", msg),
44 }
45 }
46}
47
/// Outcome of the cache stage for a request.
///
/// Stored in `RequestContext::cache_status`.
#[derive(Debug, Clone)]
pub(crate) enum CacheStatus {
    /// Cache hit attributed to the memory tier.
    HitMemory,
    /// Cache hit attributed to the disk tier.
    HitDisk,
    /// Cache hit without a specific tier.
    Hit,
    /// Cache hit on an entry considered stale.
    HitStale,
    /// No cached entry was found.
    Miss,
    /// The cache was not consulted; the static label presumably names the reason.
    Bypass(&'static str),
}
64
/// Rate-limit figures captured for a request.
///
/// Stored in `RequestContext::rate_limit_info`.
#[derive(Debug, Clone)]
pub struct RateLimitHeaderInfo {
    /// Maximum number of requests allowed in the current window.
    pub limit: u32,
    /// Requests still allowed in the current window.
    pub remaining: u32,
    /// When the window resets. Test values (e.g. 1704067200) look like Unix
    /// seconds — confirm against the producer.
    pub reset_at: u64,
}
75
76pub struct RequestContext {
82 start_time: Instant,
84
85 pub(crate) trace_id: String,
88
89 pub(crate) config: Option<Arc<Config>>,
92
93 pub(crate) route_id: Option<String>,
96 pub(crate) route_config: Option<Arc<RouteConfig>>,
98 pub(crate) upstream: Option<String>,
100 pub(crate) selected_upstream_address: Option<String>,
102 pub(crate) upstream_attempts: u32,
104
105 pub(crate) namespace: Option<String>,
108 pub(crate) service: Option<String>,
110
111 pub(crate) method: String,
114 pub(crate) path: String,
116 pub(crate) query: Option<String>,
118
119 pub(crate) client_ip: String,
122 pub(crate) user_agent: Option<String>,
124 pub(crate) referer: Option<String>,
126 pub(crate) host: Option<String>,
128
129 pub(crate) request_body_bytes: u64,
132 pub(crate) response_bytes: u64,
134
135 pub(crate) connection_reused: bool,
138 pub(crate) is_websocket_upgrade: bool,
140
141 pub(crate) websocket_inspection_enabled: bool,
144 pub(crate) websocket_skip_inspection: bool,
146 pub(crate) websocket_inspection_agents: Vec<String>,
148 pub(crate) websocket_handler: Option<Arc<WebSocketHandler>>,
150
151 pub(crate) cache_eligible: bool,
154 pub(crate) cache_status: Option<CacheStatus>,
156
157 pub(crate) body_inspection_enabled: bool,
160 pub(crate) body_bytes_inspected: u64,
162 pub(crate) body_buffer: Vec<u8>,
164 pub(crate) body_inspection_agents: Vec<String>,
166
167 pub(crate) decompression_enabled: bool,
170 pub(crate) body_content_encoding: Option<String>,
172 pub(crate) max_decompression_ratio: f64,
174 pub(crate) max_decompression_bytes: usize,
176 pub(crate) body_was_decompressed: bool,
178
179 pub(crate) rate_limit_info: Option<RateLimitHeaderInfo>,
182
183 pub(crate) geo_country_code: Option<String>,
186 pub(crate) geo_lookup_performed: bool,
188
189 pub(crate) request_body_streaming_mode: BodyStreamingMode,
192 pub(crate) request_body_chunk_index: u32,
194 pub(crate) agent_needs_more: bool,
196 pub(crate) response_body_streaming_mode: BodyStreamingMode,
198 pub(crate) response_body_chunk_index: u32,
200 pub(crate) response_body_bytes_inspected: u64,
202 pub(crate) response_body_inspection_enabled: bool,
204 pub(crate) response_body_inspection_agents: Vec<String>,
206
207 pub(crate) otel_span: Option<crate::otel::RequestSpan>,
210 pub(crate) trace_context: Option<crate::otel::TraceContext>,
212
213 pub(crate) inference_rate_limit_enabled: bool,
216 pub(crate) inference_estimated_tokens: u64,
218 pub(crate) inference_rate_limit_key: Option<String>,
220 pub(crate) inference_model: Option<String>,
222 pub(crate) inference_provider_override: Option<grapsus_config::InferenceProvider>,
224 pub(crate) model_routing_used: bool,
226 pub(crate) inference_actual_tokens: Option<u64>,
228
229 pub(crate) inference_budget_enabled: bool,
232 pub(crate) inference_budget_remaining: Option<i64>,
234 pub(crate) inference_budget_period_reset: Option<u64>,
236 pub(crate) inference_budget_exhausted: bool,
238
239 pub(crate) inference_cost_enabled: bool,
242 pub(crate) inference_request_cost: Option<f64>,
244 pub(crate) inference_input_tokens: u64,
246 pub(crate) inference_output_tokens: u64,
248
249 pub(crate) inference_streaming_response: bool,
252 pub(crate) inference_streaming_counter: Option<StreamingTokenCounter>,
254
255 pub(crate) fallback_attempt: u32,
258 pub(crate) tried_upstreams: Vec<String>,
260 pub(crate) fallback_reason: Option<FallbackReason>,
262 pub(crate) original_upstream: Option<String>,
264 pub(crate) model_mapping_applied: Option<(String, String)>,
266 pub(crate) should_retry_with_fallback: bool,
268
269 pub(crate) guardrails_enabled: bool,
272 pub(crate) guardrail_warning: bool,
274 pub(crate) guardrail_detection_categories: Vec<String>,
276 pub(crate) pii_detection_categories: Vec<String>,
278
279 pub(crate) shadow_pending: Option<ShadowPendingRequest>,
282 pub(crate) shadow_sent: bool,
284
285 pub(crate) sticky_session_new_assignment: bool,
288 pub(crate) sticky_session_set_cookie: Option<String>,
290 pub(crate) sticky_target_index: Option<usize>,
292
293 pub(crate) listener_keepalive_timeout_secs: Option<u64>,
296
297 pub(crate) filter_connect_timeout_secs: Option<u64>,
300 pub(crate) filter_upstream_timeout_secs: Option<u64>,
302 pub(crate) cors_origin: Option<String>,
304 pub(crate) compress_enabled: bool,
306
307 pub(crate) route_agent_ids: Vec<String>,
310 pub(crate) response_agent_processing_enabled: bool,
312 pub(crate) response_agent_body_buffer: Vec<u8>,
314 pub(crate) response_agent_body_complete: bool,
316}
317
318#[derive(Clone)]
320pub struct ShadowPendingRequest {
321 pub headers: pingora::http::RequestHeader,
323 pub manager: std::sync::Arc<crate::shadow::ShadowManager>,
325 pub request_ctx: crate::upstream::RequestContext,
327 pub include_body: bool,
329}
330
331impl RequestContext {
332 pub fn new() -> Self {
334 Self {
335 start_time: Instant::now(),
336 trace_id: String::new(),
337 config: None,
338 route_id: None,
339 route_config: None,
340 upstream: None,
341 selected_upstream_address: None,
342 upstream_attempts: 0,
343 namespace: None,
344 service: None,
345 method: String::new(),
346 path: String::new(),
347 query: None,
348 client_ip: String::new(),
349 user_agent: None,
350 referer: None,
351 host: None,
352 request_body_bytes: 0,
353 response_bytes: 0,
354 connection_reused: false,
355 is_websocket_upgrade: false,
356 websocket_inspection_enabled: false,
357 websocket_skip_inspection: false,
358 websocket_inspection_agents: Vec::new(),
359 websocket_handler: None,
360 cache_eligible: false,
361 cache_status: None,
362 body_inspection_enabled: false,
363 body_bytes_inspected: 0,
364 body_buffer: Vec::new(),
365 body_inspection_agents: Vec::new(),
366 decompression_enabled: false,
367 body_content_encoding: None,
368 max_decompression_ratio: 100.0,
369 max_decompression_bytes: 10 * 1024 * 1024, body_was_decompressed: false,
371 rate_limit_info: None,
372 geo_country_code: None,
373 geo_lookup_performed: false,
374 request_body_streaming_mode: BodyStreamingMode::Buffer,
375 request_body_chunk_index: 0,
376 agent_needs_more: false,
377 response_body_streaming_mode: BodyStreamingMode::Buffer,
378 response_body_chunk_index: 0,
379 response_body_bytes_inspected: 0,
380 response_body_inspection_enabled: false,
381 response_body_inspection_agents: Vec::new(),
382 otel_span: None,
383 trace_context: None,
384 inference_rate_limit_enabled: false,
385 inference_estimated_tokens: 0,
386 inference_rate_limit_key: None,
387 inference_model: None,
388 inference_provider_override: None,
389 model_routing_used: false,
390 inference_actual_tokens: None,
391 inference_budget_enabled: false,
392 inference_budget_remaining: None,
393 inference_budget_period_reset: None,
394 inference_budget_exhausted: false,
395 inference_cost_enabled: false,
396 inference_request_cost: None,
397 inference_input_tokens: 0,
398 inference_output_tokens: 0,
399 inference_streaming_response: false,
400 inference_streaming_counter: None,
401 fallback_attempt: 0,
402 tried_upstreams: Vec::new(),
403 fallback_reason: None,
404 original_upstream: None,
405 model_mapping_applied: None,
406 should_retry_with_fallback: false,
407 guardrails_enabled: false,
408 guardrail_warning: false,
409 guardrail_detection_categories: Vec::new(),
410 pii_detection_categories: Vec::new(),
411 shadow_pending: None,
412 shadow_sent: false,
413 sticky_session_new_assignment: false,
414 sticky_session_set_cookie: None,
415 sticky_target_index: None,
416 listener_keepalive_timeout_secs: None,
417 filter_connect_timeout_secs: None,
418 filter_upstream_timeout_secs: None,
419 cors_origin: None,
420 compress_enabled: false,
421 route_agent_ids: Vec::new(),
422 response_agent_processing_enabled: false,
423 response_agent_body_buffer: Vec::new(),
424 response_agent_body_complete: false,
425 }
426 }
427
428 #[inline]
432 pub fn start_time(&self) -> Instant {
433 self.start_time
434 }
435
436 #[inline]
438 pub fn elapsed(&self) -> std::time::Duration {
439 self.start_time.elapsed()
440 }
441
442 #[inline]
446 pub fn correlation_id(&self) -> &str {
447 &self.trace_id
448 }
449
450 #[inline]
452 pub fn trace_id(&self) -> &str {
453 &self.trace_id
454 }
455
456 #[inline]
458 pub fn route_id(&self) -> Option<&str> {
459 self.route_id.as_deref()
460 }
461
462 #[inline]
464 pub fn upstream(&self) -> Option<&str> {
465 self.upstream.as_deref()
466 }
467
468 #[inline]
470 pub fn selected_upstream_address(&self) -> Option<&str> {
471 self.selected_upstream_address.as_deref()
472 }
473
474 #[inline]
476 pub fn route_config(&self) -> Option<&Arc<RouteConfig>> {
477 self.route_config.as_ref()
478 }
479
480 #[inline]
482 pub fn global_config(&self) -> Option<&Arc<Config>> {
483 self.config.as_ref()
484 }
485
486 #[inline]
488 pub fn service_type(&self) -> Option<ServiceType> {
489 self.route_config.as_ref().map(|c| c.service_type.clone())
490 }
491
492 #[inline]
494 pub fn upstream_attempts(&self) -> u32 {
495 self.upstream_attempts
496 }
497
498 #[inline]
500 pub fn method(&self) -> &str {
501 &self.method
502 }
503
504 #[inline]
506 pub fn path(&self) -> &str {
507 &self.path
508 }
509
510 #[inline]
512 pub fn query(&self) -> Option<&str> {
513 self.query.as_deref()
514 }
515
516 #[inline]
518 pub fn client_ip(&self) -> &str {
519 &self.client_ip
520 }
521
522 #[inline]
524 pub fn user_agent(&self) -> Option<&str> {
525 self.user_agent.as_deref()
526 }
527
528 #[inline]
530 pub fn referer(&self) -> Option<&str> {
531 self.referer.as_deref()
532 }
533
534 #[inline]
536 pub fn host(&self) -> Option<&str> {
537 self.host.as_deref()
538 }
539
540 #[inline]
542 pub fn response_bytes(&self) -> u64 {
543 self.response_bytes
544 }
545
546 #[inline]
548 pub fn geo_country_code(&self) -> Option<&str> {
549 self.geo_country_code.as_deref()
550 }
551
552 #[inline]
554 pub fn geo_lookup_performed(&self) -> bool {
555 self.geo_lookup_performed
556 }
557
558 #[inline]
563 pub fn traceparent(&self) -> Option<String> {
564 self.otel_span.as_ref().map(|span| {
565 let sampled = self
566 .trace_context
567 .as_ref()
568 .map(|c| c.sampled)
569 .unwrap_or(true);
570 crate::otel::create_traceparent(&span.trace_id, &span.span_id, sampled)
571 })
572 }
573
574 #[inline]
578 pub fn set_trace_id(&mut self, trace_id: impl Into<String>) {
579 self.trace_id = trace_id.into();
580 }
581
582 #[inline]
584 pub fn set_route_id(&mut self, route_id: impl Into<String>) {
585 self.route_id = Some(route_id.into());
586 }
587
588 #[inline]
590 pub fn set_upstream(&mut self, upstream: impl Into<String>) {
591 self.upstream = Some(upstream.into());
592 }
593
594 #[inline]
596 pub fn set_selected_upstream_address(&mut self, address: impl Into<String>) {
597 self.selected_upstream_address = Some(address.into());
598 }
599
600 #[inline]
602 pub fn inc_upstream_attempts(&mut self) {
603 self.upstream_attempts += 1;
604 }
605
606 #[inline]
608 pub fn set_response_bytes(&mut self, bytes: u64) {
609 self.response_bytes = bytes;
610 }
611
612 #[inline]
616 pub fn fallback_attempt(&self) -> u32 {
617 self.fallback_attempt
618 }
619
620 #[inline]
622 pub fn tried_upstreams(&self) -> &[String] {
623 &self.tried_upstreams
624 }
625
626 #[inline]
628 pub fn fallback_reason(&self) -> Option<&FallbackReason> {
629 self.fallback_reason.as_ref()
630 }
631
632 #[inline]
634 pub fn original_upstream(&self) -> Option<&str> {
635 self.original_upstream.as_deref()
636 }
637
638 #[inline]
640 pub fn model_mapping_applied(&self) -> Option<&(String, String)> {
641 self.model_mapping_applied.as_ref()
642 }
643
644 #[inline]
646 pub fn used_fallback(&self) -> bool {
647 self.fallback_attempt > 0
648 }
649
650 #[inline]
652 pub fn record_fallback(&mut self, reason: FallbackReason, new_upstream: &str) {
653 if self.fallback_attempt == 0 {
654 self.original_upstream = self.upstream.clone();
656 }
657 self.fallback_attempt += 1;
658 self.fallback_reason = Some(reason);
659 if let Some(current) = &self.upstream {
660 self.tried_upstreams.push(current.clone());
661 }
662 self.upstream = Some(new_upstream.to_string());
663 }
664
665 #[inline]
667 pub fn record_model_mapping(&mut self, original: String, mapped: String) {
668 self.model_mapping_applied = Some((original, mapped));
669 }
670
671 #[inline]
675 pub fn used_model_routing(&self) -> bool {
676 self.model_routing_used
677 }
678
679 #[inline]
681 pub fn inference_provider_override(&self) -> Option<grapsus_config::InferenceProvider> {
682 self.inference_provider_override
683 }
684
685 #[inline]
689 pub fn record_model_routing(
690 &mut self,
691 upstream: &str,
692 model: Option<String>,
693 provider_override: Option<grapsus_config::InferenceProvider>,
694 ) {
695 self.upstream = Some(upstream.to_string());
696 self.model_routing_used = true;
697 if model.is_some() {
698 self.inference_model = model;
699 }
700 self.inference_provider_override = provider_override;
701 }
702}
703
704impl Default for RequestContext {
705 fn default() -> Self {
706 Self::new()
707 }
708}
709
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_rate_limit_header_info() {
        let info = RateLimitHeaderInfo {
            limit: 100,
            remaining: 42,
            reset_at: 1704067200,
        };

        assert_eq!(info.limit, 100);
        assert_eq!(info.remaining, 42);
        assert_eq!(info.reset_at, 1704067200);
    }

    #[test]
    fn test_request_context_default() {
        let ctx = RequestContext::new();

        assert!(ctx.trace_id.is_empty());
        assert!(ctx.rate_limit_info.is_none());
        assert!(ctx.route_id.is_none());
        assert!(ctx.config.is_none());
    }

    /// `Default` must produce the same empty state as `new()`.
    #[test]
    fn test_default_trait_matches_new() {
        let ctx = RequestContext::default();

        assert!(ctx.trace_id().is_empty());
        assert!(ctx.route_id().is_none());
        assert!(ctx.upstream().is_none());
        assert!(ctx.selected_upstream_address().is_none());
        assert_eq!(ctx.upstream_attempts(), 0);
        assert_eq!(ctx.response_bytes(), 0);
        assert_eq!(ctx.fallback_attempt(), 0);
        assert!(!ctx.used_fallback());
        assert!(!ctx.used_model_routing());
        // No span attached, so no traceparent can be produced.
        assert!(ctx.traceparent().is_none());
    }

    #[test]
    fn test_request_context_rate_limit_info() {
        let mut ctx = RequestContext::new();

        assert!(ctx.rate_limit_info.is_none());

        ctx.rate_limit_info = Some(RateLimitHeaderInfo {
            limit: 50,
            remaining: 25,
            reset_at: 1704067300,
        });

        assert!(ctx.rate_limit_info.is_some());
        let info = ctx.rate_limit_info.as_ref().unwrap();
        assert_eq!(info.limit, 50);
        assert_eq!(info.remaining, 25);
        assert_eq!(info.reset_at, 1704067300);
    }

    #[test]
    fn test_request_context_elapsed() {
        let ctx = RequestContext::new();

        let elapsed = ctx.elapsed();
        assert!(elapsed.as_secs() < 1);
    }

    #[test]
    fn test_request_context_setters() {
        let mut ctx = RequestContext::new();

        ctx.set_trace_id("trace-123");
        assert_eq!(ctx.trace_id(), "trace-123");
        assert_eq!(ctx.correlation_id(), "trace-123");

        ctx.set_route_id("my-route");
        assert_eq!(ctx.route_id(), Some("my-route"));

        ctx.set_upstream("backend-pool");
        assert_eq!(ctx.upstream(), Some("backend-pool"));

        ctx.inc_upstream_attempts();
        ctx.inc_upstream_attempts();
        assert_eq!(ctx.upstream_attempts(), 2);

        ctx.set_response_bytes(1024);
        assert_eq!(ctx.response_bytes(), 1024);
    }

    #[test]
    fn test_selected_upstream_address_setter() {
        let mut ctx = RequestContext::new();

        ctx.set_selected_upstream_address("10.0.0.1:8080");
        assert_eq!(ctx.selected_upstream_address(), Some("10.0.0.1:8080"));
    }

    #[test]
    fn test_fallback_tracking() {
        let mut ctx = RequestContext::new();

        assert_eq!(ctx.fallback_attempt(), 0);
        assert!(!ctx.used_fallback());
        assert!(ctx.tried_upstreams().is_empty());
        assert!(ctx.fallback_reason().is_none());
        assert!(ctx.original_upstream().is_none());

        ctx.set_upstream("openai-primary");

        ctx.record_fallback(FallbackReason::HealthCheckFailed, "anthropic-fallback");

        assert_eq!(ctx.fallback_attempt(), 1);
        assert!(ctx.used_fallback());
        assert_eq!(ctx.tried_upstreams(), &["openai-primary".to_string()]);
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::HealthCheckFailed)
        ));
        assert_eq!(ctx.original_upstream(), Some("openai-primary"));
        assert_eq!(ctx.upstream(), Some("anthropic-fallback"));

        ctx.record_fallback(FallbackReason::ErrorCode(503), "local-gpu");

        assert_eq!(ctx.fallback_attempt(), 2);
        assert_eq!(
            ctx.tried_upstreams(),
            &[
                "openai-primary".to_string(),
                "anthropic-fallback".to_string()
            ]
        );
        assert!(matches!(
            ctx.fallback_reason(),
            Some(FallbackReason::ErrorCode(503))
        ));
        // The original upstream is captured only on the first fallback.
        assert_eq!(ctx.original_upstream(), Some("openai-primary"));
        assert_eq!(ctx.upstream(), Some("local-gpu"));
    }

    /// A fallback recorded before any upstream was set leaves no history.
    #[test]
    fn test_fallback_without_initial_upstream() {
        let mut ctx = RequestContext::new();

        ctx.record_fallback(FallbackReason::BudgetExhausted, "secondary");

        assert_eq!(ctx.fallback_attempt(), 1);
        assert!(ctx.used_fallback());
        assert!(ctx.tried_upstreams().is_empty());
        assert!(ctx.original_upstream().is_none());
        assert_eq!(ctx.upstream(), Some("secondary"));
    }

    #[test]
    fn test_model_mapping_tracking() {
        let mut ctx = RequestContext::new();

        assert!(ctx.model_mapping_applied().is_none());

        ctx.record_model_mapping("gpt-4".to_string(), "claude-3-opus".to_string());

        let mapping = ctx.model_mapping_applied().unwrap();
        assert_eq!(mapping.0, "gpt-4");
        assert_eq!(mapping.1, "claude-3-opus");
    }

    #[test]
    fn test_model_routing_tracking() {
        let mut ctx = RequestContext::new();
        assert!(!ctx.used_model_routing());
        ctx.inference_model = Some("gpt-4".to_string());

        // A `None` model must not clear the model already recorded.
        ctx.record_model_routing("local-gpu", None, None);
        assert!(ctx.used_model_routing());
        assert_eq!(ctx.upstream(), Some("local-gpu"));
        assert_eq!(ctx.inference_model.as_deref(), Some("gpt-4"));
        assert!(ctx.inference_provider_override().is_none());

        // A `Some` model replaces it.
        ctx.record_model_routing("anthropic", Some("claude-3-opus".to_string()), None);
        assert_eq!(ctx.upstream(), Some("anthropic"));
        assert_eq!(ctx.inference_model.as_deref(), Some("claude-3-opus"));
    }

    #[test]
    fn test_fallback_reason_display() {
        assert_eq!(
            FallbackReason::HealthCheckFailed.to_string(),
            "health_check_failed"
        );
        assert_eq!(
            FallbackReason::BudgetExhausted.to_string(),
            "budget_exhausted"
        );
        assert_eq!(
            FallbackReason::LatencyThreshold {
                observed_ms: 5500,
                threshold_ms: 5000
            }
            .to_string(),
            "latency_threshold_5500ms_exceeded_5000ms"
        );
        assert_eq!(FallbackReason::ErrorCode(502).to_string(), "error_code_502");
        assert_eq!(
            FallbackReason::ConnectionError("timeout".to_string()).to_string(),
            "connection_error_timeout"
        );
    }
}